#!/usr/bin/env -S uv run --script # /// script # dependencies = [ # "numpy", # "scipy", # "Pillow", # "imageio", # "warp-lang", # "rawpy", # ] # /// import warp as wp import numpy as np import math import argparse import imageio.v3 as iio from scipy.integrate import quad from scipy.signal.windows import gaussian # For creating Gaussian kernel import rawpy wp.init() # --- Helper functions based on the paper --- def save_debug_image(wp_array, filename): """Saves a Warp 2D array as a grayscale image, scaling values.""" try: numpy_array = wp_array.numpy() min_val, max_val = np.min(numpy_array), np.max(numpy_array) if max_val > min_val: norm_array = (numpy_array - min_val) / (max_val - min_val) else: norm_array = np.full_like(numpy_array, 0.5) img_uint8 = (norm_array * 255.0).clip(0, 255).astype(np.uint8) iio.imwrite(filename, img_uint8) print(f"Debug image saved to {filename}") except Exception as e: print(f"Error saving debug image {filename}: {e}") @wp.func def w_func(x: float): if x >= 2.0: return 0.0 elif x < 0.0: return 1.0 else: arg = x / 2.0 if arg > 1.0: arg = 1.0 elif arg < -1.0: arg = -1.0 sqrt_arg = 4.0 - x * x if sqrt_arg < 0.0: sqrt_val = 0.0 else: sqrt_val = wp.sqrt(sqrt_arg) # This should be sqrt(1 - (x/2)^2) for unit disk overlap area formula # Corrected w(x) based on overlap area of two unit disks divided by pi # overlap_area = 2 * acos(x/2) - x * sqrt(1 - (x/2)^2) # w(x) = overlap_area / pi # Ensure acos argument is clamped acos_arg = x / 2.0 if acos_arg > 1.0: acos_arg = 1.0 if acos_arg < -1.0: acos_arg = -1.0 sqrt_term_arg = 1.0 - acos_arg * acos_arg if sqrt_term_arg < 0.0: sqrt_term_arg = 0.0 overlap_area_over_pi = ( 2.0 * wp.acos(acos_arg) - x * wp.sqrt(sqrt_term_arg) ) / math.pi return overlap_area_over_pi @wp.func def CB_const_radius_unit(u_pixel: float, x: float): safe_u = wp.min(u_pixel, 0.99999) if safe_u < 0.0: safe_u = 0.0 one_minus_u = 1.0 - safe_u if one_minus_u <= 1e-9: # Avoid pow(small_negative_base) or pow(0, neg_exponent from wx) return 0.0 wx = w_func(x) # Ensure 2.0 - wx does not lead to issues if wx is large, though wx should be <= 1 exponent = 2.0 - wx # type: ignore term1 = wp.pow(one_minus_u, exponent) term2 = one_minus_u * one_minus_u return term1 - term2 def integrand_variance(x, u_pixel): if x < 0: return 0.0 if x >= 2.0: return 0.0 safe_u = np.clip(u_pixel, 0.0, 0.99999) one_minus_u = 1.0 - safe_u if one_minus_u <= 1e-9: return 0.0 acos_arg = x / 2.0 if acos_arg > 1.0: acos_arg = 1.0 if acos_arg < -1.0: acos_arg = -1.0 sqrt_term_arg = 1.0 - acos_arg * acos_arg if sqrt_term_arg < 0.0: sqrt_term_arg = 0.0 wx = (2.0 * np.arccos(acos_arg) - x * np.sqrt(sqrt_term_arg)) / np.pi cb = np.power(one_minus_u, 2.0 - wx) - np.power(one_minus_u, 2.0) return cb * x def precompute_variance_lut(num_u_samples=256): """ Precomputes the integral I(u) = ∫[0 to 2] CB(u, x, 1) * x dx for different u values. Creates a LUT with num_u_samples + 1 entries (for u=0 to u=1 inclusive). """ print(f"Precomputing variance LUT with {num_u_samples+1} entries...") # Samples u from 0 to 1 inclusive for the LUT u_values_for_lut = np.linspace(0.0, 1.0, num_u_samples + 1, endpoint=True) lut = np.zeros(num_u_samples + 1, dtype=np.float32) for i, u in enumerate(u_values_for_lut): result, error = quad( integrand_variance, 0, 2, args=(u,), epsabs=1e-6, limit=100 ) if result < 0: result = 0.0 lut[i] = result if i % ((num_u_samples + 1) // 10) == 0 : print(f" LUT progress: {i}/{num_u_samples+1}") print("Variance LUT computed.") return lut @wp.kernel def generate_noise_kernel( u_image: wp.array2d(dtype=float), variance_lut: wp.array(dtype=float), noise_out: wp.array2d(dtype=float), mu_r: float, sigma_filter: float, seed: int, ): ix, iy = wp.tid() height = u_image.shape[0] width = u_image.shape[1] if ix >= height or iy >= width: return lut_size = variance_lut.shape[0] u_val = u_image[ix, iy] lut_pos = u_val * float(lut_size - 1) lut_index0 = int(lut_pos) lut_index0 = wp.min(wp.max(lut_index0, 0), lut_size - 2) # Ensure lut_index0 and lut_index0+1 are valid lut_index1 = lut_index0 + 1 t = lut_pos - float(lut_index0) if t < 0.0: t = 0.0 # Clamp t to avoid issues with precision if t > 1.0: t = 1.0 integral_val = wp.lerp(variance_lut[lut_index0], variance_lut[lut_index1], t) var_bp = 0.0 if sigma_filter > 1e-6 and mu_r > 1e-6: # mu_r check also important var_bp = wp.max(0.0, (mu_r * mu_r) / (2.0 * sigma_filter * sigma_filter) * integral_val) std_dev = wp.sqrt(var_bp) state = wp.rand_init(seed, ix * width + iy + seed) # Add seed to sequence as well noise_sample = wp.randn(state) * std_dev noise_out[ix, iy] = noise_sample @wp.kernel def convolve_2d_kernel( input_array: wp.array2d(dtype=float), kernel: wp.array(dtype=float), kernel_radius: int, output_array: wp.array2d(dtype=float), ): ix, iy = wp.tid() height = input_array.shape[0] width = input_array.shape[1] if ix >= height or iy >= width: return kernel_dim = 2 * kernel_radius + 1 accum = float(0.0) for ky_offset in range(kernel_dim): for kx_offset in range(kernel_dim): k_idx = ky_offset * kernel_dim + kx_offset weight = kernel[k_idx] # Image coordinates to sample from read_row = ix + (ky_offset - kernel_radius) # Corrected: ix is row, iy is col usually read_col = iy + (kx_offset - kernel_radius) clamped_row = wp.max(0, wp.min(read_row, height - 1)) clamped_col = wp.max(0, wp.min(read_col, width - 1)) sample_val = input_array[clamped_row, clamped_col] accum += weight * sample_val output_array[ix, iy] = accum @wp.kernel def add_rgb_noise_and_clip_kernel( r_in: wp.array2d(dtype=float), g_in: wp.array2d(dtype=float), b_in: wp.array2d(dtype=float), noise_r: wp.array2d(dtype=float), noise_g: wp.array2d(dtype=float), noise_b: wp.array2d(dtype=float), r_out: wp.array2d(dtype=float), g_out: wp.array2d(dtype=float), b_out: wp.array2d(dtype=float)): """Adds channel-specific filtered noise to each channel and clips.""" ix, iy = wp.tid() # type: ignore height = r_in.shape[0] width = r_in.shape[1] if ix >= height or iy >= width: return r_out[ix, iy] = wp.clamp(r_in[ix, iy] + noise_r[ix, iy], 0.0, 1.0) # type: ignore g_out[ix, iy] = wp.clamp(g_in[ix, iy] + noise_g[ix, iy], 0.0, 1.0) # type: ignore b_out[ix, iy] = wp.clamp(b_in[ix, iy] + noise_b[ix, iy], 0.0, 1.0) # type: ignore def create_gaussian_kernel_2d(sigma, radius): kernel_size = 2 * radius + 1 g = gaussian(kernel_size, sigma, sym=True) # Ensure symmetry for odd kernel_size kernel_2d = np.outer(g, g) sum_sq = np.sum(kernel_2d**2) if sum_sq > 1e-9: # Avoid division by zero if kernel is all zeros kernel_2d /= np.sqrt(sum_sq) return kernel_2d.flatten().astype(np.float32) def render_film_grain(image_path, mu_r, sigma_filter, output_path, seed=42, mono=False): try: if image_path.lower().endswith('.arw') or image_path.lower().endswith('.dng'): # Use rawpy for TIFF images to handle metadata correctly with rawpy.imread(image_path) as raw: img_np = raw.postprocess( use_camera_wb=True, no_auto_bright=True, output_bps=16, half_size=False, gamma=(1.0, 1.0), # No gamma correction ) elif image_path.lower().endswith('.tiff') or image_path.lower().endswith('.tif') or image_path.lower().endswith('.png') or image_path.lower().endswith('.jpg') or image_path.lower().endswith('.jpeg'): img_np = iio.imread(image_path) else: raise ValueError("Unsupported image format. Please use TIFF, PNG, JPG, or RAW (ARW, DNG) formats.") except FileNotFoundError: print(f"Error: Input image not found at {image_path}") return except Exception as e: print(f"Error loading image: {e}") return if img_np.ndim == 2: img_np = img_np[..., np.newaxis] if img_np.shape[2] == 4: img_np = img_np[..., :3] if img_np.dtype == np.uint8: img_np_float = img_np.astype(np.float32) / 255.0 elif img_np.dtype == np.uint16: img_np_float = img_np.astype(np.float32) / 65535.0 else: img_np_float = img_np.astype(np.float32) height, width, channels = img_np_float.shape print(f"Input image: {width}x{height}x{channels}") print(f"Parameters: μr = {mu_r}, σ_filter = {sigma_filter}") # Use 256 u_samples for LUT, resulting in 257 entries (0 to 256 for u=0 to u=1) variance_lut_np = precompute_variance_lut(num_u_samples=256) variance_lut_wp = wp.array(variance_lut_np, dtype=float, device="cuda") kernel_radius = max(1, int(np.ceil(3 * sigma_filter))) kernel_np = create_gaussian_kernel_2d(sigma_filter, kernel_radius) kernel_wp = wp.array(kernel_np, dtype=float, device="cuda") print(f"Using Gaussian filter h with sigma={sigma_filter}, radius={kernel_radius}") # --- Prepare original channel data on GPU --- r_original_wp = wp.array(img_np_float[:, :, 0], dtype=float, device="cuda") if channels == 3: g_original_wp = wp.array(img_np_float[:, :, 1], dtype=float, device="cuda") b_original_wp = wp.array(img_np_float[:, :, 2], dtype=float, device="cuda") else: # Grayscale input g_original_wp = r_original_wp b_original_wp = r_original_wp # --- Allocate noise arrays on GPU --- noise_r_unfiltered_wp = wp.empty_like(r_original_wp) noise_g_unfiltered_wp = wp.empty_like(g_original_wp) noise_b_unfiltered_wp = wp.empty_like(b_original_wp) noise_r_filtered_wp = wp.empty_like(r_original_wp) noise_g_filtered_wp = wp.empty_like(g_original_wp) noise_b_filtered_wp = wp.empty_like(b_original_wp) if mono: if channels == 1: img_gray_np = img_np_float[:, :, 0] else: # Standard RGB to Luminance weights img_gray_np = (0.299 * img_np_float[:, :, 0] + 0.587 * img_np_float[:, :, 1] + 0.114 * img_np_float[:, :, 2]) print("Generating monochromatic noise...") u_gray_wp = wp.array(img_gray_np, dtype=float, device="cuda") noise_image_wp = wp.empty_like(u_gray_wp) wp.launch(kernel=generate_noise_kernel, dim=(height, width), inputs=[u_gray_wp, variance_lut_wp, noise_image_wp, mu_r, sigma_filter, seed], device="cuda") noise_filtered_wp = wp.empty_like(u_gray_wp) wp.launch(kernel=convolve_2d_kernel, dim=(height, width), inputs=[noise_image_wp, kernel_wp, kernel_radius, noise_filtered_wp], device="cuda") noise_r_filtered_wp.assign(noise_filtered_wp) noise_g_filtered_wp.assign(noise_filtered_wp) noise_b_filtered_wp.assign(noise_filtered_wp) else: # --- Process R Channel --- print("Processing R channel...") wp.launch(kernel=generate_noise_kernel, dim=(height, width), inputs=[r_original_wp, variance_lut_wp, noise_r_unfiltered_wp, mu_r, sigma_filter, seed], device="cuda") wp.launch(kernel=convolve_2d_kernel, dim=(height, width), inputs=[noise_r_unfiltered_wp, kernel_wp, kernel_radius, noise_r_filtered_wp], device="cuda") if channels == 3: # --- Process G Channel --- print("Processing G channel...") wp.launch(kernel=generate_noise_kernel, dim=(height, width), inputs=[g_original_wp, variance_lut_wp, noise_g_unfiltered_wp, mu_r, sigma_filter, seed + 1], device="cuda") # Offset seed wp.launch(kernel=convolve_2d_kernel, dim=(height, width), inputs=[noise_g_unfiltered_wp, kernel_wp, kernel_radius, noise_g_filtered_wp], device="cuda") # --- Process B Channel --- print("Processing B channel...") wp.launch(kernel=generate_noise_kernel, dim=(height, width), inputs=[b_original_wp, variance_lut_wp, noise_b_unfiltered_wp, mu_r, sigma_filter, seed + 2], device="cuda") # Offset seed wp.launch(kernel=convolve_2d_kernel, dim=(height, width), inputs=[noise_b_unfiltered_wp, kernel_wp, kernel_radius, noise_b_filtered_wp], device="cuda") else: # Grayscale: copy R channel's filtered noise to G and B components noise_g_filtered_wp.assign(noise_r_filtered_wp) # Use assign for Warp arrays noise_b_filtered_wp.assign(noise_r_filtered_wp) # --- Add noise and clip --- print("Adding noise to channels and clipping...") r_output_wp = wp.empty_like(r_original_wp) g_output_wp = wp.empty_like(g_original_wp) b_output_wp = wp.empty_like(b_original_wp) wp.launch(kernel=add_rgb_noise_and_clip_kernel, dim=(height, width), inputs=[r_original_wp, g_original_wp, b_original_wp, noise_r_filtered_wp, noise_g_filtered_wp, noise_b_filtered_wp, r_output_wp, g_output_wp, b_output_wp], device="cuda") # --- Copy back to host --- output_img_np = np.zeros((height,width,3), dtype=np.float32) # Always create 3-channel output buffer output_img_np[:, :, 0] = r_output_wp.numpy() output_img_np[:, :, 1] = g_output_wp.numpy() output_img_np[:, :, 2] = b_output_wp.numpy() try: if output_path.lower().endswith('.tiff') or output_path.lower().endswith('.tif'): output_img_uint16 = (output_img_np * 65535.0).clip(0, 65535).astype(np.uint16) iio.imwrite(output_path, output_img_uint16) print(f"Output image saved to {output_path}") elif output_path.lower().endswith('.png'): output_img_uint8 = (output_img_np * 255.0).clip(0, 255).astype(np.uint8) iio.imwrite(output_path, output_img_uint8) print(f"Output image saved to {output_path}") elif output_path.lower().endswith('.jpg') or output_path.lower().endswith('.jpeg'): output_img_uint8 = (output_img_np * 255.0).clip(0, 255).astype(np.uint8) iio.imwrite(output_path, output_img_uint8, quality=95) print(f"Output image saved to {output_path}") except Exception as e: print(f"Error saving image: {e}") if __name__ == "__main__": parser = argparse.ArgumentParser( description="Apply realistic film grain (Zhang et al. 2023 method)." ) parser.add_argument("input_image", help="Path to the input image (TIFF, PNG, JPG, or RAW (ARW/DNG) format)") parser.add_argument("output_image", help="Path to save the output image (TIFF (16-bit), PNG, JPG format)") parser.add_argument( "--mu_r", type=float, default=0.1, help="Mean grain radius (relative to pixel size)" ) parser.add_argument( "--sigma", type=float, default=0.8, help="Standard deviation of the Gaussian Filter for noise blurring (sigma_filter).", ) parser.add_argument( "--seed", type=int, default=42, help="Random seed for noise generation" ) parser.add_argument( "--mono", action="store_true", help="Apply monochrome film grain across channels based on luminance", default=False ) args = parser.parse_args() if args.mu_r <= 0: print("Warning: mu_r should be positive. Using default 0.1") args.mu_r = 0.1 if args.sigma <= 0: print("Warning: sigma_filter should be positive. Using default 0.8") args.sigma = 0.8 if args.sigma < 3 * args.mu_r: print( f"Warning: sigma_filter ({args.sigma}) is less than 3*mu_r ({3 * args.mu_r:.2f}). Approximations in the model might be less accurate." ) render_film_grain( args.input_image, args.mu_r, args.sigma, args.output_image, args.seed, args.mono )