#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "numpy",
#     "scipy",
#     "Pillow",
#     "imageio",
#     "warp-lang",
#     "rawpy",
# ]
# ///


import argparse
import math

import imageio.v3 as iio
import numpy as np
import rawpy
import warp as wp
from scipy.integrate import quad
from scipy.signal.windows import gaussian  # For building the Gaussian filter kernel


def get_grain_parameters(width, height, iso):
    """
    Calculates mu_r and sigma for the film grain script based on image
    dimensions (width, height) and target ISO.

    Args:
        width (int): The width of the source image in pixels.
        height (int): The height of the source image in pixels.
        iso (float): The target film ISO to simulate (e.g., 100, 400, 3200).

    Returns:
        tuple: A tuple containing the calculated (mu_r, sigma) for the script.
    """
    # --- Baseline parameters (calibrated for a 24 MP image @ ISO 400) ---
    # A 24 MP image (e.g., 6000x4000) has 24,000,000 pixels.
    PIXELS_BASE = 24_000_000.0
    ISO_BASE = 400.0
    MU_R_BASE = 0.15
    SIGMA_BASE = 0.5

    # --- Scaling exponents (artistically chosen for a natural feel) ---
    # The exponent for mu_r is larger than for sigma so that grain intensity
    # (related to mu_r^2 / sigma^2) increases with ISO.
    ISO_EXPONENT_MU = 0.4
    ISO_EXPONENT_SIGMA = 0.3

    # Clamp ISO to a reasonable range to avoid extreme/invalid values.
    iso = max(64.0, min(iso, 8000.0))

    # 1. Total number of pixels in the actual image.
    pixels_actual = float(width * height)

    # 2. Resolution scaler: scales the parameters with the image's linear
    #    dimensions (square root of the pixel-count ratio) relative to the
    #    24 MP baseline.
    resolution_scaler = math.sqrt(pixels_actual / PIXELS_BASE)
    print(f"Resolution scaler: {resolution_scaler:.4f} (for {width}x{height} image)")

    # 3. ISO scalers for mu_r and sigma.
    iso_ratio = iso / ISO_BASE
    iso_scaler_mu = iso_ratio ** ISO_EXPONENT_MU
    iso_scaler_sigma = iso_ratio ** ISO_EXPONENT_SIGMA
    print(f"ISO scaler: μ = {iso_scaler_mu:.4f}, σ = {iso_scaler_sigma:.4f} (for ISO {iso})")

    # 4. Final parameters: apply both scalers to the baseline values.
    final_mu_r = MU_R_BASE * resolution_scaler * iso_scaler_mu
    final_sigma = SIGMA_BASE * resolution_scaler * iso_scaler_sigma

    return (final_mu_r, final_sigma)

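# Worked example (illustrative only, not executed): at the calibration point the
# two scalers are both 1.0, so get_grain_parameters(6000, 4000, 400) returns the
# baseline (0.15, 0.5); a 6 MP image at ISO 1600 yields roughly (0.13, 0.38).
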
wp.init()


# --- Helper functions based on the paper ---

def save_debug_image(wp_array, filename):
    """Saves a Warp 2D array as a grayscale image, scaling values to the full range."""
    try:
        numpy_array = wp_array.numpy()
        min_val, max_val = np.min(numpy_array), np.max(numpy_array)
        if max_val > min_val:
            norm_array = (numpy_array - min_val) / (max_val - min_val)
        else:
            norm_array = np.full_like(numpy_array, 0.5)

        img_uint8 = (norm_array * 255.0).clip(0, 255).astype(np.uint8)
        iio.imwrite(filename, img_uint8)
        print(f"Debug image saved to {filename}")
    except Exception as e:
        print(f"Error saving debug image {filename}: {e}")


@wp.func
def w_func(x: float):
    # w(x): overlap area of two unit-radius disks whose centers are a distance x
    # apart, divided by pi:
    #   w(x) = (2 * acos(x/2) - x * sqrt(1 - (x/2)^2)) / pi
    if x >= 2.0:
        return 0.0
    elif x < 0.0:
        return 1.0
    else:
        # Clamp the acos argument against floating-point drift.
        acos_arg = x / 2.0
        if acos_arg > 1.0:
            acos_arg = 1.0
        elif acos_arg < -1.0:
            acos_arg = -1.0

        sqrt_term_arg = 1.0 - acos_arg * acos_arg
        if sqrt_term_arg < 0.0:
            sqrt_term_arg = 0.0

        overlap_area_over_pi = (
            2.0 * wp.acos(acos_arg) - x * wp.sqrt(sqrt_term_arg)
        ) / math.pi
        return overlap_area_over_pi


@wp.func
def CB_const_radius_unit(u_pixel: float, x: float):
    safe_u = wp.min(u_pixel, 0.99999)
    if safe_u < 0.0:
        safe_u = 0.0

    one_minus_u = 1.0 - safe_u
    if one_minus_u <= 1e-9:  # Avoid pow() on a vanishing or negative base
        return 0.0

    wx = w_func(x)
    # wx lies in [0, 1], so the exponent stays in [1, 2].
    exponent = 2.0 - wx

    term1 = wp.pow(one_minus_u, exponent)
    term2 = one_minus_u * one_minus_u
    return term1 - term2


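# Note: CB_const_radius_unit(u, x) mirrors the host-side integrand_variance()
# below. The per-pixel noise variance used in generate_noise_kernel is
# mu_r^2 / (2 * sigma_filter^2) * I(u), where I(u) = ∫_0^2 CB(u, x) * x dx is
# tabulated once by precompute_variance_lut().
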
def integrand_variance(x, u_pixel):
    """CPU-side counterpart of CB_const_radius_unit(u, x) * x, used for the LUT integral."""
    if x < 0:
        return 0.0
    if x >= 2.0:
        return 0.0

    safe_u = np.clip(u_pixel, 0.0, 0.99999)
    one_minus_u = 1.0 - safe_u
    if one_minus_u <= 1e-9:
        return 0.0

    acos_arg = x / 2.0
    if acos_arg > 1.0:
        acos_arg = 1.0
    if acos_arg < -1.0:
        acos_arg = -1.0

    sqrt_term_arg = 1.0 - acos_arg * acos_arg
    if sqrt_term_arg < 0.0:
        sqrt_term_arg = 0.0

    wx = (2.0 * np.arccos(acos_arg) - x * np.sqrt(sqrt_term_arg)) / np.pi

    cb = np.power(one_minus_u, 2.0 - wx) - np.power(one_minus_u, 2.0)
    return cb * x


def precompute_variance_lut(num_u_samples=256):
    """
    Precomputes the integral I(u) = ∫[0 to 2] CB(u, x) * x dx for different u values.
    Creates a LUT with num_u_samples + 1 entries (for u = 0 to u = 1 inclusive).
    """
    print(f"Precomputing variance LUT with {num_u_samples + 1} entries...")
    # Sample u from 0 to 1 inclusive for the LUT.
    u_values_for_lut = np.linspace(0.0, 1.0, num_u_samples + 1, endpoint=True)
    lut = np.zeros(num_u_samples + 1, dtype=np.float32)

    progress_step = max(1, (num_u_samples + 1) // 10)
    for i, u in enumerate(u_values_for_lut):
        result, error = quad(
            integrand_variance, 0, 2, args=(u,), epsabs=1e-6, limit=100
        )
        if result < 0:
            result = 0.0
        lut[i] = result
        if i % progress_step == 0:
            print(f"  LUT progress: {i}/{num_u_samples + 1}")
    print("Variance LUT computed.")
    return lut


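# Optional sketch (not used above): the LUT depends only on its sampling density,
# so it could be cached to disk between runs with something like
#   np.save("variance_lut.npy", precompute_variance_lut(256))   # hypothetical path
#   lut = np.load("variance_lut.npy")
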
@wp.kernel
def generate_noise_kernel(
    u_image: wp.array2d(dtype=float),
    variance_lut: wp.array(dtype=float),
    noise_out: wp.array2d(dtype=float),
    mu_r: float,
    sigma_filter: float,
    seed: int,
):
    ix, iy = wp.tid()
    height = u_image.shape[0]
    width = u_image.shape[1]
    if ix >= height or iy >= width:
        return

    lut_size = variance_lut.shape[0]
    u_val = u_image[ix, iy]

    # Linear interpolation into the variance LUT at position u_val.
    lut_pos = u_val * float(lut_size - 1)
    lut_index0 = int(lut_pos)
    lut_index0 = wp.min(wp.max(lut_index0, 0), lut_size - 2)  # Keep lut_index0 and lut_index0 + 1 valid
    lut_index1 = lut_index0 + 1
    t = lut_pos - float(lut_index0)
    if t < 0.0:
        t = 0.0  # Clamp t against precision issues
    if t > 1.0:
        t = 1.0

    integral_val = wp.lerp(variance_lut[lut_index0], variance_lut[lut_index1], t)

    # Per-pixel variance of the (pre-filter) grain noise.
    var_bp = 0.0
    if sigma_filter > 1e-6 and mu_r > 1e-6:
        var_bp = wp.max(0.0, (mu_r * mu_r) / (2.0 * sigma_filter * sigma_filter) * integral_val)

    std_dev = wp.sqrt(var_bp)
    state = wp.rand_init(seed, ix * width + iy + seed)  # seed also offsets the per-pixel sequence
    noise_sample = wp.randn(state) * std_dev
    noise_out[ix, iy] = noise_sample


@wp.kernel
def convolve_2d_kernel(
    input_array: wp.array2d(dtype=float),
    kernel: wp.array(dtype=float),
    kernel_radius: int,
    output_array: wp.array2d(dtype=float),
):
    ix, iy = wp.tid()
    height = input_array.shape[0]
    width = input_array.shape[1]
    if ix >= height or iy >= width:
        return

    kernel_dim = 2 * kernel_radius + 1
    accum = float(0.0)

    for ky_offset in range(kernel_dim):
        for kx_offset in range(kernel_dim):
            k_idx = ky_offset * kernel_dim + kx_offset
            weight = kernel[k_idx]

            # Image coordinates to sample from (ix is the row, iy the column).
            read_row = ix + (ky_offset - kernel_radius)
            read_col = iy + (kx_offset - kernel_radius)

            # Clamp-to-edge boundary handling.
            clamped_row = wp.max(0, wp.min(read_row, height - 1))
            clamped_col = wp.max(0, wp.min(read_col, width - 1))

            sample_val = input_array[clamped_row, clamped_col]
            accum += weight * sample_val
    output_array[ix, iy] = accum

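# Design note: this is a direct O(kernel_dim^2) convolution per pixel with
# clamp-to-edge borders. For large sigma_filter, a separable implementation
# (two 1-D Gaussian passes) would be a possible optimization; it is not done here.
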
@wp.kernel
def add_rgb_noise_and_clip_kernel(
    r_in: wp.array2d(dtype=float),
    g_in: wp.array2d(dtype=float),
    b_in: wp.array2d(dtype=float),
    noise_r: wp.array2d(dtype=float),
    noise_g: wp.array2d(dtype=float),
    noise_b: wp.array2d(dtype=float),
    r_out: wp.array2d(dtype=float),
    g_out: wp.array2d(dtype=float),
    b_out: wp.array2d(dtype=float),
):
    """Adds channel-specific filtered noise to each channel and clips to [0, 1]."""
    ix, iy = wp.tid()

    height = r_in.shape[0]
    width = r_in.shape[1]
    if ix >= height or iy >= width:
        return

    r_out[ix, iy] = wp.clamp(r_in[ix, iy] + noise_r[ix, iy], 0.0, 1.0)
    g_out[ix, iy] = wp.clamp(g_in[ix, iy] + noise_g[ix, iy], 0.0, 1.0)
    b_out[ix, iy] = wp.clamp(b_in[ix, iy] + noise_b[ix, iy], 0.0, 1.0)


def create_gaussian_kernel_2d(sigma, radius):
    kernel_size = 2 * radius + 1
    g = gaussian(kernel_size, sigma, sym=True)  # Symmetric window for an odd kernel_size
    kernel_2d = np.outer(g, g)
    sum_sq = np.sum(kernel_2d**2)
    if sum_sq > 1e-9:  # Avoid division by zero if the kernel is all zeros
        kernel_2d /= np.sqrt(sum_sq)
    return kernel_2d.flatten().astype(np.float32)

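# Note on the normalization above: the kernel is scaled so that the sum of its
# squared weights is 1 (an L2 normalization) rather than the usual sum-to-1.
# Convolving zero-mean, pixel-independent noise of variance v with such a kernel
# keeps the per-pixel variance at (approximately) v, so the filtered grain retains
# the variance computed in generate_noise_kernel.
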
def render_film_grain(image_path, iso, output_path, seed=42, mono=False):
    try:
        if image_path.lower().endswith(('.arw', '.dng')):
            # Use rawpy for RAW files so demosaicing and white balance are handled correctly
            with rawpy.imread(image_path) as raw:
                img_np = raw.postprocess(
                    use_camera_wb=True,
                    no_auto_bright=True,
                    output_bps=16,
                    half_size=False,
                    gamma=(1.0, 1.0),  # No gamma correction
                )
        elif image_path.lower().endswith(('.tiff', '.tif', '.png', '.jpg', '.jpeg')):
            img_np = iio.imread(image_path)
        else:
            raise ValueError("Unsupported image format. Please use TIFF, PNG, JPG, or RAW (ARW, DNG) formats.")
    except FileNotFoundError:
        print(f"Error: Input image not found at {image_path}")
        return
    except Exception as e:
        print(f"Error loading image: {e}")
        return

    if img_np.ndim == 2:
        img_np = img_np[..., np.newaxis]
    if img_np.shape[2] == 4:
        img_np = img_np[..., :3]  # Drop the alpha channel

    # Normalize to float32 in [0, 1]
    if img_np.dtype == np.uint8:
        img_np_float = img_np.astype(np.float32) / 255.0
    elif img_np.dtype == np.uint16:
        img_np_float = img_np.astype(np.float32) / 65535.0
    else:
        img_np_float = img_np.astype(np.float32)

    height, width, channels = img_np_float.shape
    mu_r, sigma_filter = get_grain_parameters(width, height, iso)

    print(f"Input image: {width}x{height}x{channels}")
    print(f"Parameters: μr = {mu_r}, σ_filter = {sigma_filter}")

    # 256 u-samples give a LUT with 257 entries covering u = 0 to u = 1
    variance_lut_np = precompute_variance_lut(num_u_samples=256)
    variance_lut_wp = wp.array(variance_lut_np, dtype=float, device="cuda")

    # Gaussian filter h: a radius of 3 sigma covers essentially all of its support
    kernel_radius = max(1, int(np.ceil(3 * sigma_filter)))
    kernel_np = create_gaussian_kernel_2d(sigma_filter, kernel_radius)
    kernel_wp = wp.array(kernel_np, dtype=float, device="cuda")
    print(f"Using Gaussian filter h with sigma={sigma_filter}, radius={kernel_radius}")

    # --- Prepare original channel data on GPU ---
    r_original_wp = wp.array(img_np_float[:, :, 0], dtype=float, device="cuda")
    if channels == 3:
        g_original_wp = wp.array(img_np_float[:, :, 1], dtype=float, device="cuda")
        b_original_wp = wp.array(img_np_float[:, :, 2], dtype=float, device="cuda")
    else:  # Grayscale input: alias the single channel
        g_original_wp = r_original_wp
        b_original_wp = r_original_wp

    # --- Allocate noise arrays on GPU ---
    noise_r_unfiltered_wp = wp.empty_like(r_original_wp)
    noise_g_unfiltered_wp = wp.empty_like(g_original_wp)
    noise_b_unfiltered_wp = wp.empty_like(b_original_wp)

    noise_r_filtered_wp = wp.empty_like(r_original_wp)
    noise_g_filtered_wp = wp.empty_like(g_original_wp)
    noise_b_filtered_wp = wp.empty_like(b_original_wp)

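    # Noise pipeline for each channel: generate per-pixel Gaussian noise whose
    # variance comes from the LUT (generate_noise_kernel), low-pass it with the
    # L2-normalized Gaussian (convolve_2d_kernel), then add it to the source
    # channel and clip (add_rgb_noise_and_clip_kernel). With --mono, a single
    # noise field derived from luminance is shared by all three channels.
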
    if mono:
        if channels == 1:
            img_gray_np = img_np_float[:, :, 0]
        else:
            # Standard RGB-to-luminance weights (Rec. 601)
            img_gray_np = (0.299 * img_np_float[:, :, 0] +
                           0.587 * img_np_float[:, :, 1] +
                           0.114 * img_np_float[:, :, 2])
        print("Generating monochromatic noise...")
        u_gray_wp = wp.array(img_gray_np, dtype=float, device="cuda")
        noise_image_wp = wp.empty_like(u_gray_wp)
        wp.launch(kernel=generate_noise_kernel,
                  dim=(height, width),
                  inputs=[u_gray_wp, variance_lut_wp, noise_image_wp, mu_r, sigma_filter, seed],
                  device="cuda")
        noise_filtered_wp = wp.empty_like(u_gray_wp)
        wp.launch(kernel=convolve_2d_kernel,
                  dim=(height, width),
                  inputs=[noise_image_wp, kernel_wp, kernel_radius, noise_filtered_wp],
                  device="cuda")
        noise_r_filtered_wp.assign(noise_filtered_wp)
        noise_g_filtered_wp.assign(noise_filtered_wp)
        noise_b_filtered_wp.assign(noise_filtered_wp)
    else:
        # --- Process R channel ---
        print("Processing R channel...")
        wp.launch(kernel=generate_noise_kernel, dim=(height, width),
                  inputs=[r_original_wp, variance_lut_wp, noise_r_unfiltered_wp, mu_r, sigma_filter, seed], device="cuda")
        wp.launch(kernel=convolve_2d_kernel, dim=(height, width),
                  inputs=[noise_r_unfiltered_wp, kernel_wp, kernel_radius, noise_r_filtered_wp], device="cuda")

        if channels == 3:
            # --- Process G channel ---
            print("Processing G channel...")
            wp.launch(kernel=generate_noise_kernel, dim=(height, width),
                      inputs=[g_original_wp, variance_lut_wp, noise_g_unfiltered_wp, mu_r, sigma_filter, seed + 1], device="cuda")  # Offset seed
            wp.launch(kernel=convolve_2d_kernel, dim=(height, width),
                      inputs=[noise_g_unfiltered_wp, kernel_wp, kernel_radius, noise_g_filtered_wp], device="cuda")

            # --- Process B channel ---
            print("Processing B channel...")
            wp.launch(kernel=generate_noise_kernel, dim=(height, width),
                      inputs=[b_original_wp, variance_lut_wp, noise_b_unfiltered_wp, mu_r, sigma_filter, seed + 2], device="cuda")  # Offset seed
            wp.launch(kernel=convolve_2d_kernel, dim=(height, width),
                      inputs=[noise_b_unfiltered_wp, kernel_wp, kernel_radius, noise_b_filtered_wp], device="cuda")
        else:  # Grayscale: reuse the R channel's filtered noise for G and B
            noise_g_filtered_wp.assign(noise_r_filtered_wp)
            noise_b_filtered_wp.assign(noise_r_filtered_wp)

    # --- Add noise and clip ---
    print("Adding noise to channels and clipping...")
    r_output_wp = wp.empty_like(r_original_wp)
    g_output_wp = wp.empty_like(g_original_wp)
    b_output_wp = wp.empty_like(b_original_wp)

    wp.launch(kernel=add_rgb_noise_and_clip_kernel,
              dim=(height, width),
              inputs=[r_original_wp, g_original_wp, b_original_wp,
                      noise_r_filtered_wp, noise_g_filtered_wp, noise_b_filtered_wp,
                      r_output_wp, g_output_wp, b_output_wp],
              device="cuda")

    # --- Copy back to host ---
    output_img_np = np.zeros((height, width, 3), dtype=np.float32)  # Always a 3-channel output buffer
    output_img_np[:, :, 0] = r_output_wp.numpy()
    output_img_np[:, :, 1] = g_output_wp.numpy()
    output_img_np[:, :, 2] = b_output_wp.numpy()

    try:
        if output_path.lower().endswith(('.tiff', '.tif')):
            output_img_uint16 = (output_img_np * 65535.0).clip(0, 65535).astype(np.uint16)
            iio.imwrite(output_path, output_img_uint16)
            print(f"Output image saved to {output_path}")
        elif output_path.lower().endswith('.png'):
            output_img_uint8 = (output_img_np * 255.0).clip(0, 255).astype(np.uint8)
            iio.imwrite(output_path, output_img_uint8)
            print(f"Output image saved to {output_path}")
        elif output_path.lower().endswith(('.jpg', '.jpeg')):
            output_img_uint8 = (output_img_np * 255.0).clip(0, 255).astype(np.uint8)
            iio.imwrite(output_path, output_img_uint8, quality=95)
            print(f"Output image saved to {output_path}")
        else:
            print(f"Error: Unsupported output format for {output_path}. Use TIFF, PNG, or JPG.")
    except Exception as e:
        print(f"Error saving image: {e}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Apply realistic film grain (Zhang et al. 2023 method)."
    )
    parser.add_argument("input_image", help="Path to the input image (TIFF, PNG, JPG, or RAW (ARW/DNG) format)")
    parser.add_argument("output_image", help="Path to save the output image (16-bit TIFF, PNG, or JPG format)")
    parser.add_argument(
        "--iso",
        type=int,
        default=400,
        help="Target film ISO to simulate (e.g., 100, 400, 1600).",
    )
    parser.add_argument(
        "--seed", type=int, default=42, help="Random seed for noise generation"
    )
    parser.add_argument(
        "--mono", action="store_true",
        help="Apply monochrome film grain across channels based on luminance",
    )

    args = parser.parse_args()

    render_film_grain(
        args.input_image, args.iso, args.output_image, args.seed, args.mono
    )
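
# Example invocation (illustrative; the script name and file names are placeholders).
# All Warp arrays and kernel launches above use device="cuda", so a CUDA-capable GPU
# is required:
#
#   ./film_grain.py input.tif grained.tif --iso 800 --seed 7 --mono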