Files
filmsim/testbench.py
2025-06-19 15:31:45 -04:00

194 lines
6.9 KiB
Python

#!/usr/bin/env python3
import argparse
import itertools
import os
import shlex
import subprocess
import sys
from functools import partial
from multiprocessing import Pool
# --- Configuration ---
# Optional filmcolor flags exercised by the testbench, keyed by a short
# abbreviation. Each keyword below is the abbreviation and its value is the
# full command-line flag; comment an entry in or out to change which flags
# take part in the generated combinations.
ARGS_MAP = dict(
    # fd='--force-d65',
    # pnc='--perform-negative-correction',
    pwb='--perform-white-balance',
    pec='--perform-exposure-correction',
    # rae='--raw-auto-exposure',
    sg='--simulate-grain',
    # mg='--mono-grain',
)
# --- Worker Function for Multiprocessing ---
def run_filmcolor_command(job_info, filmcolor_path):
    """
    Execute a single filmcolor command and describe the outcome.

    This function is designed to be called by a multiprocessing Pool: errors
    raised while launching or running the subprocess are converted into a
    status string instead of propagating.

    Args:
        job_info: A ``(input_file, datasheet, output_file, flags)`` tuple,
            where ``flags`` is an iterable of extra command-line arguments.
        filmcolor_path: Path to the filmcolor executable.

    Returns:
        A status line starting with "✅" on success or "❌" on any failure.
    """
    input_file, datasheet, output_file, flags = job_info
    command = [filmcolor_path, input_file, datasheet, output_file, *flags]
    print(f"🚀 Starting job: {os.path.basename(output_file)}")
    try:
        # capture_output=True keeps the tool's stdout/stderr off the main display.
        # check=True raises CalledProcessError on a non-zero exit code.
        # errors='replace' prevents undecodable output bytes from raising
        # UnicodeDecodeError and masking the job's real exit status.
        subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='replace',
        )
    except FileNotFoundError:
        return f"❌ ERROR: filmcolor executable not found at '{filmcolor_path}'"
    except subprocess.CalledProcessError as e:
        # Quote every argument so the logged command is unambiguous and can be
        # pasted into a shell even when paths contain spaces.
        command_str = " ".join(shlex.quote(str(part)) for part in command)
        return (
            f"❌ FAILURE: Could not process {os.path.basename(input_file)} with {os.path.basename(datasheet)}\n"
            f" Command: {command_str}\n"
            f" Exit Code: {e.returncode}\n"
            f" Stderr: {(e.stderr or '').strip()}"
        )
    except Exception as e:
        # Worker boundary: report anything else rather than crash the pool.
        return f"❌ UNEXPECTED ERROR: {e}"
    return f"✅ SUCCESS: Created {output_file}"
# --- Main Script Logic ---
def _positive_int(text):
    """argparse ``type`` callable: parse *text* as an integer that is >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1, got {value}")
    return value


def _find_raw_files(input_dir):
    """Return the sorted paths of every .arw/.dng file found under *input_dir*."""
    return sorted(
        os.path.join(root, name)
        for root, _, names in os.walk(input_dir)
        for name in names
        if name.lower().endswith(('.arw', '.dng'))
    )


def _find_datasheets(datasheet_dir):
    """Return the sorted paths of the .json files directly inside *datasheet_dir*."""
    return sorted(
        os.path.join(datasheet_dir, name)
        for name in os.listdir(datasheet_dir)
        if name.lower().endswith('.json')
    )


def _flag_combinations(abbreviations):
    """Return every subset of *abbreviations*, the empty one included, each sorted."""
    return [
        sorted(combo)  # Sort for consistent naming
        for size in range(len(abbreviations) + 1)
        for combo in itertools.combinations(abbreviations, size)
    ]


def _build_jobs(raw_files, datasheet_files, arg_combos):
    """Return one (raw, datasheet, output, flags) job per input combination."""
    always_on = '--perform-negative-correction'  # always include this flag
    jobs = []
    for raw_path in raw_files:
        out_dir, raw_name = os.path.split(raw_path)
        for datasheet_path in datasheet_files:
            sheet_name = os.path.splitext(os.path.basename(datasheet_path))[0]
            for abbrs in arg_combos:
                # Joining all parts at once avoids a trailing hyphen when the
                # combination is empty.
                output_name = "-".join([raw_name, sheet_name, *abbrs]) + ".jpg"
                flags = [ARGS_MAP[abbr] for abbr in abbrs]
                # Guard against a duplicate if the flag is also enabled in ARGS_MAP.
                if always_on not in flags:
                    flags.append(always_on)
                jobs.append(
                    (raw_path, datasheet_path, os.path.join(out_dir, output_name), flags)
                )
    return jobs


def main():
    """
    Parse the command line, build the job list and run it in a process pool.

    Every RAW file under ``input_dir`` is paired with every datasheet in
    ``datasheet_dir`` and every combination of the flags in ARGS_MAP. Output
    images are written next to their RAW input. The user is asked to confirm
    before any job starts.

    Exits with status 1 when no RAW files or datasheets are found, 0 when
    there is nothing to do or the user declines, and 2 (via argparse) on
    invalid arguments such as a non-positive ``--jobs`` value.
    """
    parser = argparse.ArgumentParser(
        description="A testbench runner for the 'filmcolor' script.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "input_dir",
        help="The root directory containing subfolders with RAW images (ARW, DNG).",
    )
    parser.add_argument(
        "datasheet_dir",
        help="The directory containing the film datasheet JSON files.",
    )
    parser.add_argument(
        "filmcolor_path",
        help="The path to the 'filmcolor' executable script.",
    )
    parser.add_argument(
        "-j", "--jobs",
        type=_positive_int,  # Pool() rejects values below 1; fail early instead
        default=3,
        help="Number of parallel jobs to run. (Default: 3)",
    )
    args = parser.parse_args()

    # 1. Find all input RAW files
    print(f"🔎 Scanning for RAW files in '{args.input_dir}'...")
    raw_files = _find_raw_files(args.input_dir)
    if not raw_files:
        print("❌ No RAW (.ARW or .DNG) files found. Exiting.")
        sys.exit(1)
    print(f" Found {len(raw_files)} RAW files.")

    # 2. Find all datasheet JSON files
    print(f"🔎 Scanning for JSON files in '{args.datasheet_dir}'...")
    try:
        datasheet_files = _find_datasheets(args.datasheet_dir)
    except (FileNotFoundError, NotADirectoryError):
        print(f"❌ Datasheet directory not found at '{args.datasheet_dir}'. Exiting.")
        sys.exit(1)
    if not datasheet_files:
        print("❌ No datasheet (.json) files found. Exiting.")
        sys.exit(1)
    print(f" Found {len(datasheet_files)} datasheet files.")

    # 3. Generate all argument combinations
    all_arg_combos = _flag_combinations(list(ARGS_MAP))

    # 4. Create the full list of jobs to run
    jobs_to_run = _build_jobs(raw_files, datasheet_files, all_arg_combos)
    total_jobs = len(jobs_to_run)
    print(f"\n✨ Generated {total_jobs} total jobs to run.")
    if total_jobs == 0:
        print("Nothing to do. Exiting.")
        sys.exit(0)

    # Ask for confirmation before starting a large number of jobs.
    # EOFError covers a closed or non-interactive stdin; treat it as "no".
    try:
        confirm = input(f"Proceed with running {total_jobs} jobs using {args.jobs} parallel processes? (y/N): ")
    except (KeyboardInterrupt, EOFError):
        print("\nAborted by user.")
        sys.exit(0)
    if confirm.strip().lower() != 'y':
        print("Aborted by user.")
        sys.exit(0)

    # 5. Run the jobs in a multiprocessing pool
    print("\n--- Starting Testbench ---\n")
    # `partial` pre-fills the filmcolor_path argument of the worker function
    worker_func = partial(run_filmcolor_command, filmcolor_path=args.filmcolor_path)
    with Pool(processes=args.jobs) as pool:
        # imap_unordered yields results as they complete, giving real-time
        # feedback without waiting for all jobs to finish.
        for i, result in enumerate(pool.imap_unordered(worker_func, jobs_to_run), 1):
            print(f"[{i}/{total_jobs}] {result}")
    print("\n--- Testbench Finished ---")
# Script entry point: run the testbench only when executed directly, so that
# importing this module (e.g. by multiprocessing workers) has no side effects.
if __name__ == "__main__":
    sys.exit(main())