commit e7cdb8dd6f2b9636e8ec5dc84cf73dc0776f7e9f Author: Frank Schwenk Date: Sat May 30 11:33:07 2026 +0200 amayer5125 is savage diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..493c3aa --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +# Copy to .env and fill in your key: cp .env.example .env +# OpenRouter API key — https://openrouter.ai/keys +OPENROUTER_API_KEY= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4417c1f --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +.eggs/ +dist/ +build/ +.pytest_cache/ +.venv/ +venv/ +.env diff --git a/README.md b/README.md new file mode 100644 index 0000000..ea98af4 --- /dev/null +++ b/README.md @@ -0,0 +1,78 @@ +# Image Pipeline + +Modular Python framework for chaining image processing steps after Darktable export. + +Each pipeline is a Python script that defines a DAG of processing steps. Every step writes its output to a numbered subfolder inside a timestamped run directory. + +## Requirements + +- Python 3.11+ +- [ImageMagick](https://imagemagick.org/) (`magick` or `convert` on PATH) + +## Installation + +```bash +cd /path/to/imagepipeline +pip install -e ".[dev]" +``` + +## Quick Start + +Edit the input path in `pipelines/example_grayscale.py`, then run: + +```bash +python pipelines/example_grayscale.py +``` + +Or from Python: + +```python +from pathlib import Path +from imagepipeline import Pipeline + +with Pipeline(name="my_run", input_dir=Path("/path/to/export")) as p: + gray = p.step("imagemagick_grayscale", inputs="input") + p.run() +``` + +## Output Structure + +Each run creates a folder like `my_run_20260527143022/`: + +``` +my_run_20260527143022/ +├── pipeline_manifest.json +├── input/ # symlinks to source images +├── imagemagick_grayscale_01/ +│ └── photo.jpg +└── ... +``` + +Step folders are named `{module_name}_{nn}` (two-digit counter per module name). + +## Writing Pipelines + +Pipelines are plain Python scripts. Reference previous steps via `StepRef` objects returned by `p.step()`: + +```python +with Pipeline(name="colorsplash", input_dir=INPUT) as p: + rembg_out = p.step("rembg", inputs="input") + bw = p.step("imagemagick_grayscale", inputs="input") + combined = p.step("composite", inputs=[bw, rembg_out], mode="foreground_over") + p.step("darktable_style", inputs=combined, style="vintage.dtstyle") + p.run() +``` + +- `"input"` refers to the original input directory +- Parameters are passed as kwargs and validated against each module's schema +- Multiple uses of the same module get separate numbered folders + +## Adding Modules + +See [docs/MODULE_DEVELOPMENT.md](docs/MODULE_DEVELOPMENT.md). + +## Tests + +```bash +pytest +``` diff --git a/docs/MODULE_DEVELOPMENT.md b/docs/MODULE_DEVELOPMENT.md new file mode 100644 index 0000000..d354293 --- /dev/null +++ b/docs/MODULE_DEVELOPMENT.md @@ -0,0 +1,275 @@ +# Module development guide + +This document explains how to add new processing modules to the image pipeline framework. + +## Overview + +Each module is a Python class that: + +1. Declares a unique `name` +2. Defines accepted parameters via `parameters()` +3. Implements `run(ctx)` to process images +4. Registers itself with `@register` + +Pipeline scripts reference modules by name and pass parameters as kwargs. + +## Quick Start + +Create a new file in `imagepipeline/modules/`, e.g. `my_module.py`: + +```python +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import BaseModule +from imagepipeline.modules.registry import register + + +@register +class MyModule(BaseModule): + name = "my_module" + description = "Short description for documentation" + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "strength": Param( + "float", + default=1.0, + help="Processing strength from 0.0 to 1.0", + ), + } + + def run(self, ctx: ModuleContext) -> None: + ctx.output_dir.mkdir(parents=True, exist_ok=True) + for src in ctx.input_paths: + dst = ctx.output_dir / src.name + # ... write dst ... +``` + +Import the module in `imagepipeline/modules/__init__.py` so it registers on load: + +```python +import imagepipeline.modules.my_module # noqa: F401 +``` + +Use it in a pipeline script: + +```python +with Pipeline(name="my_run", input_dir=INPUT) as p: + result = p.step("my_module", inputs="input", strength=0.8) + p.run() +``` + +## ModuleContext + +Every `run()` receives a `ModuleContext`: + +| Field | Description | +|-------|-------------| +| `input_paths` | Flat list of input images for this step | +| `matched_groups` | For multi-input steps: list of path groups matched by stem | +| `output_dir` | Directory where outputs must be written | +| `params` | Validated parameters | +| `pipeline_output_root` | Root folder of the current run | +| `step_id` | Internal step identifier (e.g. `my_module_01`) | + +### Single-input modules + +Use `ctx.input_paths` — one path per image in the batch: + +```python +for src in ctx.input_paths: + dst = ctx.output_dir / src.name + process(src, dst) +``` + +### Multi-input modules + +When a step receives multiple input references (e.g. background + foreground), the runner matches images by filename stem across all sources. + +Use `ctx.matched_groups` — each entry is a list of paths with the same stem: + +```python +for group in ctx.matched_groups: + background, foreground = group + dst = ctx.output_dir / foreground.name + composite(background, foreground, dst) +``` + +If a stem is missing in any source, the pipeline fails with a clear error before your module runs. + +## Parameters + +Define parameters with `Param`: + +```python +Param("string", default="value", help="Description") +Param("int", required=True, help="Required integer") +Param("float", default=0.5) +Param("bool", default=False) +Param("path") +Param("list", default=[]) +Param("string", choices=("a", "b", "c")) +``` + +Supported types: `string`, `int`, `float`, `bool`, `path`, `list`. + +Unknown kwargs passed to `p.step()` raise a validation error. Missing required parameters raise as well. + +## Base Classes + +### `BaseModule` + +Use for pure Python processing or custom logic. + +Optional class attributes: + +- `supported_input_formats` — tuple of extensions (informational) +- `description` — short module description + +Optional methods: + +- `check_dependencies()` — raise `DependencyError` if tools are missing +- `list_output_images(ctx)` — default: all images in `output_dir` + +### `SubprocessModule` + +Use for CLI tools (ImageMagick, gmic, darktable-cli, rembg): + +```python +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.utils.subprocess import run_command + + +@register +class MyCliModule(SubprocessModule): + name = "my_cli_module" + command_candidates = ("my-tool", "my-tool-fallback") + + @classmethod + def check_dependencies(cls) -> None: + super().check_dependencies() # verifies command_candidates + + def run(self, ctx: ModuleContext) -> None: + tool = self.resolve_command() + for src in ctx.input_paths: + dst = ctx.output_dir / src.name + run_command([tool, str(src), str(dst)]) +``` + +`run_command()` captures stderr and raises `RuntimeError` with the command output on failure. + +## Output Conventions + +- Write one output file per input, keeping the **same filename** (especially the stem) so downstream steps can match batches. +- Only write image files into `ctx.output_dir` — the runner discovers outputs by scanning for known image extensions. +- Create `ctx.output_dir` if your tool does not do so automatically. + +## Step Folder Naming + +The runner assigns output folders automatically: `{module_name}_{nn}`. + +Using the same module twice in one pipeline produces separate folders: + +``` +imagemagick_grayscale_01/ +imagemagick_grayscale_02/ +darktable_style_01/ +darktable_style_02/ +``` + +You do not choose folder names in the module. + +## Format Warnings + +Declare `supported_input_formats` on your module. When chaining modules with incompatible formats (e.g. JPEG without alpha after rembg), document expected behavior in your module's `description` and consider converting explicitly in `run()`. + +## Testing + +Add tests under `tests/`: + +1. **Unit tests** — parameter validation, matching logic (no external tools) +2. **Integration tests** — run the module against a real image; skip if CLI tool missing: + +```python +import shutil +import pytest + +pytestmark = pytest.mark.skipif( + not shutil.which("magick") and not shutil.which("convert"), + reason="ImageMagick not installed", +) +``` + +## Checklist for New Modules + +- [ ] Unique `name` (snake_case) +- [ ] `@register` decorator +- [ ] `parameters()` documents all kwargs +- [ ] `run()` writes outputs preserving stems +- [ ] `check_dependencies()` if external tools required +- [ ] Import in `imagepipeline/modules/__init__.py` +- [ ] Test added (unit and/or integration) + +## AI Modules + +Optional dependencies: `pip install -e ".[ai]"` (numpy, Pillow, torch). + +All AI modules inherit from `AIModule` (`imagepipeline/modules/ai_base.py`), which adds: + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `skip_existing` | `True` | Skip when output file already exists | +| `max_edge` | `2048` | Downscale long edge before inference (`0` = full resolution) | +| `device` | `"cpu"` | Inference device (v1: CPU only) | + +Use `iter_input_images(ctx, processor)` in `run()` — it handles skip, resize, upscale, and per-image ETA logging. + +### Built-in AI modules + +| Module | Backend | Notes | +|--------|---------|-------| +| `ai_exposure` | Zero-DCE++ (PyTorch) | Weights auto-download on first run | +| `ai_tone_map` | HDRNet or CLAHE fallback | Set `checkpoint` to a creotiv-format `.pth`; empty uses CLAHE | +| `openrouter_edit` | OpenRouter API | Requires `OPENROUTER_API_KEY`; default model Flux Klein 4B | +| `comfy_flux_edit` | ComfyUI HTTP API | Experimental; export workflow to `workflows/comfy/flux_klein_edit_api.json` | + +Example pipeline: `pipelines/example_ai.py`. + +### Adding a new AI module + +```python +from imagepipeline.modules.ai_base import AIModule + +@register +class MyAIModule(AIModule): + name = "my_ai_module" + + @classmethod + def parameters(cls) -> dict[str, Param]: + params = dict(super().parameters()) + params["strength"] = Param("float", default=1.0) + return params + + def run(self, ctx: ModuleContext) -> None: + self.configure_torch(ctx.params["device"]) + + def process(src: Path, dst: Path, index: int, total: int) -> None: + ... + + self.iter_input_images(ctx, process) +``` + +Put shared inference code under `imagepipeline/ai/`. + +## Planned Modules + +These slots follow the same interface — no pipeline API changes needed: + +| Module | Base class | Tool | +|--------|-----------|------| +| `imagemagick_resize` | SubprocessModule | ImageMagick | +| `darktable_style` | SubprocessModule | darktable-cli | +| `rembg` | SubprocessModule | rembg | +| `gmic` | SubprocessModule | gmic | +| `composite` | BaseModule | Pillow | diff --git a/imagepipeline/__init__.py b/imagepipeline/__init__.py new file mode 100644 index 0000000..e1824be --- /dev/null +++ b/imagepipeline/__init__.py @@ -0,0 +1,6 @@ +"""Image pipeline framework.""" + +from imagepipeline.core.pipeline import Pipeline + +__all__ = ["Pipeline"] +__version__ = "0.1.0" diff --git a/imagepipeline/ai/__init__.py b/imagepipeline/ai/__init__.py new file mode 100644 index 0000000..609310f --- /dev/null +++ b/imagepipeline/ai/__init__.py @@ -0,0 +1 @@ +"""Local AI inference helpers.""" diff --git a/imagepipeline/ai/cache.py b/imagepipeline/ai/cache.py new file mode 100644 index 0000000..8669057 --- /dev/null +++ b/imagepipeline/ai/cache.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import hashlib +import urllib.request +from pathlib import Path + +CACHE_DIR = Path.home() / ".cache" / "imagepipeline" / "weights" + +WEIGHT_URLS: dict[str, str] = { + "zero_dce_pp": ( + "https://github.com/Li-Chongyi/Zero-DCE_extension/raw/main/" + "Zero-DCE++/snapshots_Zero_DCE++/Epoch99.pth" + ), +} + + +def cache_path(name: str, filename: str | None = None) -> Path: + CACHE_DIR.mkdir(parents=True, exist_ok=True) + if filename: + return CACHE_DIR / filename + return CACHE_DIR / name + + +def ensure_weight(name: str, *, filename: str | None = None) -> Path: + if name not in WEIGHT_URLS: + raise KeyError(f"Unknown weight bundle: {name}") + dest_name = filename or f"{name}.pth" + dest = cache_path(name, dest_name) + if dest.is_file() and dest.stat().st_size > 0: + return dest + url = WEIGHT_URLS[name] + tmp = dest.with_suffix(dest.suffix + ".part") + request = urllib.request.Request(url, headers={"User-Agent": "imagepipeline/0.1"}) + with urllib.request.urlopen(request, timeout=300) as response: + data = response.read() + tmp.write_bytes(data) + tmp.replace(dest) + return dest diff --git a/imagepipeline/ai/classical_tone.py b/imagepipeline/ai/classical_tone.py new file mode 100644 index 0000000..3e2eeda --- /dev/null +++ b/imagepipeline/ai/classical_tone.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import numpy as np + + +def apply_clahe_tone(image_rgb, *, clip_limit: float = 2.0, strength: float = 1.0): + """Classical CLAHE tone mapping on the L channel in LAB space.""" + from PIL import Image + + if not isinstance(image_rgb, Image.Image): + image_rgb = Image.fromarray(image_rgb) + + arr = np.asarray(image_rgb, dtype=np.uint8) + lab = _rgb_to_lab(arr.astype(np.float32)) + l_channel = lab[:, :, 0] + enhanced_l = _clahe_1d(l_channel, clip_limit=clip_limit) + lab[:, :, 0] = l_channel * (1.0 - strength) + enhanced_l * strength + out = _lab_to_rgb(lab) + return Image.fromarray(np.clip(out, 0, 255).astype(np.uint8)) + + +def _rgb_to_lab(rgb: np.ndarray) -> np.ndarray: + matrix = np.array( + [ + [0.4124564, 0.3575761, 0.1804375], + [0.2126729, 0.7151522, 0.0721750], + [0.0193339, 0.1191920, 0.9503041], + ], + dtype=np.float32, + ) + linear = np.where(rgb <= 0.04045, rgb / 12.92, ((rgb + 0.055) / 1.055) ** 2.4) + linear = linear / 255.0 + xyz = linear @ matrix.T + xyz = xyz * np.array([1 / 0.95047, 1.0, 1 / 1.08883], dtype=np.float32) + epsilon = 216 / 24389 + kappa = 24389 / 27 + + def f(t): + return np.where(t > epsilon, np.cbrt(t), (kappa * t + 16) / 116) + + fx, fy, fz = f(xyz[..., 0]), f(xyz[..., 1]), f(xyz[..., 2]) + lab = np.stack([116 * fy - 16, 500 * (fx - fy), 200 * (fy - fz)], axis=-1) + return lab + + +def _lab_to_rgb(lab: np.ndarray) -> np.ndarray: + fy = (lab[..., 0] + 16) / 116 + fx = lab[..., 1] / 500 + fy + fz = fy - lab[..., 2] / 200 + + def finv(t): + t3 = t ** 3 + return np.where(t3 > 216 / 24389, t3, (116 * t - 16) / kappa) + + kappa = 24389 / 27 + x = finv(fx) * 0.95047 + y = finv(fy) + z = finv(fz) * 1.08883 + xyz = np.stack([x, y, z], axis=-1) + + matrix = np.array( + [ + [3.2404542, -1.5371385, -0.4985314], + [-0.9692660, 1.8760108, 0.0415560], + [0.0556434, -0.2040259, 1.0572252], + ], + dtype=np.float32, + ) + linear = xyz @ matrix.T + rgb = np.where( + linear <= 0.0031308, + 12.92 * linear, + 1.055 * np.power(np.clip(linear, 0, None), 1 / 2.4) - 0.055, + ) + return rgb * 255.0 + + +def _clahe_1d(l_channel: np.ndarray, *, clip_limit: float, tile_size: int = 8) -> np.ndarray: + height, width = l_channel.shape + tile_h = max(1, height // tile_size) + tile_w = max(1, width // tile_size) + out = np.zeros_like(l_channel, dtype=np.float32) + counts = np.zeros_like(l_channel, dtype=np.float32) + + for y in range(0, height, tile_h): + for x in range(0, width, tile_w): + tile = l_channel[y : y + tile_h, x : x + tile_w] + hist, _ = np.histogram(tile, bins=256, range=(0, 256)) + clip_val = max(1, int(clip_limit * tile.size / 256)) + excess = np.maximum(hist - clip_val, 0).sum() + hist = np.minimum(hist, clip_val) + hist += excess // 256 + cdf = hist.cumsum().astype(np.float32) + cdf = (cdf - cdf.min()) / max(cdf.max() - cdf.min(), 1.0) * 255.0 + mapped = cdf[tile.astype(np.int32).clip(0, 255)] + out[y : y + tile_h, x : x + tile_w] += mapped + counts[y : y + tile_h, x : x + tile_w] += 1.0 + + return out / np.maximum(counts, 1.0) diff --git a/imagepipeline/ai/hdrnet/__init__.py b/imagepipeline/ai/hdrnet/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/imagepipeline/ai/hdrnet/model.py b/imagepipeline/ai/hdrnet/model.py new file mode 100644 index 0000000..6b3b3cb --- /dev/null +++ b/imagepipeline/ai/hdrnet/model.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +import math + +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F + +from imagepipeline.ai.hdrnet.slice import batch_bilateral_slice + + +class ConvBlock(nn.Module): + def __init__( + self, + inc, + outc, + kernel_size=3, + padding=1, + stride=1, + use_bias=True, + activation=nn.ReLU, + batch_norm=False, + ) -> None: + super().__init__() + self.conv = nn.Conv2d( + int(inc), int(outc), kernel_size, padding=padding, stride=stride, bias=use_bias + ) + self.activation = activation() if activation else None + self.bn = nn.BatchNorm2d(outc) if batch_norm else None + if use_bias and not batch_norm: + self.conv.bias.data.fill_(0.0) + torch.nn.init.kaiming_uniform_(self.conv.weight) + + def forward(self, x): + x = self.conv(x) + if self.bn is not None: + x = self.bn(x) + if self.activation is not None: + x = self.activation(x) + return x + + +class FC(nn.Module): + def __init__(self, inc, outc, activation=nn.ReLU, batch_norm=False) -> None: + super().__init__() + self.fc = nn.Linear(int(inc), int(outc), bias=(not batch_norm)) + self.activation = activation() if activation else None + self.bn = nn.BatchNorm1d(outc) if batch_norm else None + if not batch_norm: + self.fc.bias.data.fill_(0.0) + torch.nn.init.kaiming_uniform_(self.fc.weight) + + def forward(self, x): + x = self.fc(x) + if self.bn is not None: + x = self.bn(x) + if self.activation is not None: + x = self.activation(x) + return x + + +class Slice(nn.Module): + def forward(self, bilateral_grid, guidemap): + bilateral_grid = bilateral_grid.permute(0, 3, 4, 2, 1) + guidemap = guidemap.squeeze(1) + coeffs = batch_bilateral_slice(bilateral_grid, guidemap).permute(0, 3, 1, 2) + return coeffs + + +class ApplyCoeffs(nn.Module): + def forward(self, coeff, full_res_input): + r = torch.sum(full_res_input * coeff[:, 0:3, :, :], dim=1, keepdim=True) + coeff[ + :, 9:10, :, : + ] + g = torch.sum(full_res_input * coeff[:, 3:6, :, :], dim=1, keepdim=True) + coeff[ + :, 10:11, :, : + ] + b = torch.sum(full_res_input * coeff[:, 6:9, :, :], dim=1, keepdim=True) + coeff[ + :, 11:12, :, : + ] + return torch.cat([r, g, b], dim=1) + + +class GuideNN(nn.Module): + def __init__(self, params) -> None: + super().__init__() + self.conv1 = ConvBlock(3, params["guide_complexity"], kernel_size=1, padding=0, batch_norm=True) + self.conv2 = ConvBlock( + params["guide_complexity"], 1, kernel_size=1, padding=0, activation=nn.Sigmoid + ) + + def forward(self, x): + return self.conv2(self.conv1(x)) + + +class Coeffs(nn.Module): + def __init__(self, nin=4, nout=3, params=None) -> None: + super().__init__() + self.params = params + self.nin = nin + self.nout = nout + lb = params["luma_bins"] + cm = params["channel_multiplier"] + sb = params["spatial_bin"] + bn = params["batch_norm"] + nsize = params["net_input_size"] + + n_layers_splat = int(np.log2(nsize / sb)) + self.splat_features = nn.ModuleList() + prev_ch = 3 + for index in range(n_layers_splat): + use_bn = bn if index > 0 else False + out_ch = cm * (2**index) * lb + self.splat_features.append( + ConvBlock(prev_ch, out_ch, 3, stride=2, batch_norm=use_bn) + ) + prev_ch = out_ch + splat_ch = prev_ch + + n_layers_global = int(np.log2(sb / 4)) + self.global_features_conv = nn.ModuleList() + self.global_features_fc = nn.ModuleList() + for _ in range(n_layers_global): + self.global_features_conv.append( + ConvBlock(prev_ch, cm * 8 * lb, 3, stride=2, batch_norm=bn) + ) + prev_ch = cm * 8 * lb + + n_total = n_layers_splat + n_layers_global + prev_ch = int(prev_ch * (nsize / 2**n_total) ** 2) + self.global_features_fc.append(FC(prev_ch, 32 * cm * lb, batch_norm=bn)) + self.global_features_fc.append(FC(32 * cm * lb, 16 * cm * lb, batch_norm=bn)) + self.global_features_fc.append(FC(16 * cm * lb, 8 * cm * lb, activation=None, batch_norm=bn)) + + self.local_features = nn.ModuleList( + [ + ConvBlock(splat_ch, 8 * cm * lb, 3, batch_norm=bn), + ConvBlock(8 * cm * lb, 8 * cm * lb, 3, activation=None, use_bias=False), + ] + ) + self.conv_out = ConvBlock( + 8 * cm * lb, lb * nout * nin, 1, padding=0, activation=None + ) + self.relu = nn.ReLU() + + def forward(self, lowres_input): + params = self.params + bs = lowres_input.shape[0] + lb = params["luma_bins"] + cm = params["channel_multiplier"] + + x = lowres_input + for layer in self.splat_features: + x = layer(x) + splat_features = x + + for layer in self.global_features_conv: + x = layer(x) + x = x.view(bs, -1) + for layer in self.global_features_fc: + x = layer(x) + global_features = x + + x = splat_features + for layer in self.local_features: + x = layer(x) + fusion = self.relu(x + global_features.view(bs, 8 * cm * lb, 1, 1)) + x = self.conv_out(fusion) + return torch.stack(torch.split(x, self.nin * self.nout, 1), 2) + + +class HDRPointwiseNN(nn.Module): + def __init__(self, params) -> None: + super().__init__() + self.coeffs = Coeffs(params=params) + self.guide = GuideNN(params=params) + self.slice = Slice() + self.apply_coeffs = ApplyCoeffs() + + def forward(self, lowres, fullres): + coeffs = self.coeffs(lowres) + guide = self.guide(fullres) + slice_coeffs = self.slice(coeffs, guide) + return self.apply_coeffs(slice_coeffs, fullres) + + +def default_hdrnet_params(net_input_size: int = 256) -> dict: + return { + "luma_bins": 8, + "channel_multiplier": 1, + "spatial_bin": 16, + "batch_norm": True, + "net_input_size": net_input_size, + "guide_complexity": 16, + } + + +def load_hdrnet_checkpoint(checkpoint_path, device: torch.device): + state = torch.load(checkpoint_path, map_location=device, weights_only=False) + if "model_params" in state: + params = state["model_params"] + del state["model_params"] + else: + params = default_hdrnet_params() + model = HDRPointwiseNN(params=params) + model.load_state_dict(state) + model.to(device) + model.eval() + return model, params + + +def resize_rgb_array(arr: np.ndarray, size: int) -> np.ndarray: + from PIL import Image + + image = Image.fromarray(arr.astype(np.uint8)) + short = min(image.size) + scale = size / short + new_size = (max(1, round(image.size[0] * scale)), max(1, round(image.size[1] * scale))) + return np.asarray(image.resize(new_size, Image.Resampling.NEAREST)) + + +def enhance_image_hdrnet( + model: HDRPointwiseNN, + image_rgb, + *, + device: torch.device, + net_input_size: int, + strength: float = 1.0, +): + from PIL import Image + + if not isinstance(image_rgb, Image.Image): + image_rgb = Image.fromarray(image_rgb) + + full_arr = np.asarray(image_rgb, dtype=np.float32) + low_arr = resize_rgb_array(full_arr, net_input_size) + low = torch.from_numpy(low_arr).permute(2, 0, 1).unsqueeze(0).float() / 255.0 + full = torch.from_numpy(full_arr).permute(2, 0, 1).unsqueeze(0).float() / 255.0 + low = low.to(device) + full = full.to(device) + + with torch.no_grad(): + out = model(low, full) + if strength < 1.0: + out = full * (1.0 - strength) + out * strength + out = torch.clamp(out, 0.0, 1.0) + + result = (out.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255.0).astype(np.uint8) + return Image.fromarray(result) diff --git a/imagepipeline/ai/hdrnet/slice.py b/imagepipeline/ai/hdrnet/slice.py new file mode 100644 index 0000000..6bc964f --- /dev/null +++ b/imagepipeline/ai/hdrnet/slice.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import torch + + +def lerp_weight(x, xs): + dx = x - xs + abs_dx = torch.abs(dx) + return torch.maximum( + torch.tensor(1.0, device=x.device) - abs_dx, + torch.tensor(0.0, device=x.device), + ) + + +def smoothed_abs(x, eps): + return torch.sqrt(torch.multiply(x, x) + eps) + + +def smoothed_lerp_weight(x, xs): + eps = torch.tensor(1e-8, dtype=torch.float32, device=x.device) + dx = x - xs + abs_dx = smoothed_abs(dx, eps) + return torch.maximum( + torch.tensor(1.0, device=x.device) - abs_dx, + torch.tensor(0.0, device=x.device), + ) + + +def _bilateral_slice(grid, guide): + device = grid.device + ii, jj = torch.meshgrid( + [ + torch.arange(guide.shape[0], device=device), + torch.arange(guide.shape[1], device=device), + ], + indexing="ij", + ) + + scale_i = grid.shape[0] / guide.shape[0] + scale_j = grid.shape[1] / guide.shape[1] + + gif = (ii + 0.5) * scale_i + gjf = (jj + 0.5) * scale_j + gkf = guide * grid.shape[2] + + gi0 = torch.floor(gif - 0.5).to(torch.int32) + gj0 = torch.floor(gjf - 0.5).to(torch.int32) + gk0 = torch.floor(gkf - 0.5).to(torch.int32) + gi1 = gi0 + 1 + gj1 = gj0 + 1 + gk1 = gk0 + 1 + + wi0 = lerp_weight(gi0 + 0.5, gif) + wi1 = lerp_weight(gi1 + 0.5, gif) + wj0 = lerp_weight(gj0 + 0.5, gjf) + wj1 = lerp_weight(gj1 + 0.5, gjf) + wk0 = smoothed_lerp_weight(gk0 + 0.5, gkf) + wk1 = smoothed_lerp_weight(gk1 + 0.5, gkf) + + w_000 = wi0 * wj0 * wk0 + w_001 = wi0 * wj0 * wk1 + w_010 = wi0 * wj1 * wk0 + w_011 = wi0 * wj1 * wk1 + w_100 = wi1 * wj0 * wk0 + w_101 = wi1 * wj0 * wk1 + w_110 = wi1 * wj1 * wk0 + w_111 = wi1 * wj1 * wk1 + + gi0c = gi0.clip(0, grid.shape[0] - 1).to(torch.long) + gj0c = gj0.clip(0, grid.shape[1] - 1).to(torch.long) + gk0c = gk0.clip(0, grid.shape[2] - 1).to(torch.long) + gi1c = (gi0 + 1).clip(0, grid.shape[0] - 1).to(torch.long) + gj1c = (gj0 + 1).clip(0, grid.shape[1] - 1).to(torch.long) + gk1c = (gk0 + 1).clip(0, grid.shape[2] - 1).to(torch.long) + + grid_val_000 = grid[gi0c, gj0c, gk0c, :] + grid_val_001 = grid[gi0c, gj0c, gk1c, :] + grid_val_010 = grid[gi0c, gj1c, gk0c, :] + grid_val_011 = grid[gi0c, gj1c, gk1c, :] + grid_val_100 = grid[gi1c, gj0c, gk0c, :] + grid_val_101 = grid[gi1c, gj0c, gk1c, :] + grid_val_110 = grid[gi1c, gj1c, gk0c, :] + grid_val_111 = grid[gi1c, gj1c, gk1c, :] + + w_000, w_001, w_010, w_011 = map( + torch.atleast_3d, (w_000, w_001, w_010, w_011) + ) + w_100, w_101, w_110, w_111 = map( + torch.atleast_3d, (w_100, w_101, w_110, w_111) + ) + + return ( + torch.multiply(w_000, grid_val_000) + + torch.multiply(w_001, grid_val_001) + + torch.multiply(w_010, grid_val_010) + + torch.multiply(w_011, grid_val_011) + + torch.multiply(w_100, grid_val_100) + + torch.multiply(w_101, grid_val_101) + + torch.multiply(w_110, grid_val_110) + + torch.multiply(w_111, grid_val_111) + ) + + +def batch_bilateral_slice(grid, guide): + results = [] + for index in range(grid.shape[0]): + results.append(_bilateral_slice(grid[index], guide[index]).unsqueeze(0)) + return torch.concat(results, dim=0) diff --git a/imagepipeline/ai/imaging.py b/imagepipeline/ai/imaging.py new file mode 100644 index 0000000..894278b --- /dev/null +++ b/imagepipeline/ai/imaging.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import shutil +from pathlib import Path + +from imagepipeline.utils.subprocess import require_command, run_command + + +def magick_command() -> str: + return require_command("magick", "convert") + + +def resize_max_edge(src: Path, dst: Path, max_edge: int) -> tuple[int, int, int, int]: + """Resize so longest edge is at most max_edge. Returns (orig_w, orig_h, work_w, work_h).""" + from PIL import Image + + with Image.open(src) as image: + orig_w, orig_h = image.size + if max_edge <= 0 or max(orig_w, orig_h) <= max_edge: + shutil.copy2(src, dst) + return orig_w, orig_h, orig_w, orig_h + + command = magick_command() + run_command( + [ + command, + str(src), + "-resize", + f"{max_edge}x{max_edge}>", + str(dst), + ] + ) + with Image.open(dst) as image: + new_w, new_h = image.size + return orig_w, orig_h, new_w, new_h + + +def resize_to_size(src: Path, dst: Path, width: int, height: int) -> None: + command = magick_command() + run_command( + [ + command, + str(src), + "-resize", + f"{width}x{height}!", + str(dst), + ] + ) + + +def load_pil_rgb(path: Path): + from PIL import Image + + return Image.open(path).convert("RGB") + + +def save_pil_image(image, path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + image.save(path, quality=95) diff --git a/imagepipeline/ai/zero_dce.py b/imagepipeline/ai/zero_dce.py new file mode 100644 index 0000000..be1c69d --- /dev/null +++ b/imagepipeline/ai/zero_dce.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import torch +import torch.nn as nn +import torch.nn.functional as F + + +class CSDN_Tem(nn.Module): + def __init__(self, in_ch: int, out_ch: int) -> None: + super().__init__() + self.depth_conv = nn.Conv2d( + in_channels=in_ch, + out_channels=in_ch, + kernel_size=3, + stride=1, + padding=1, + groups=in_ch, + ) + self.point_conv = nn.Conv2d( + in_channels=in_ch, + out_channels=out_ch, + kernel_size=1, + stride=1, + padding=0, + groups=1, + ) + + def forward(self, input_tensor: torch.Tensor) -> torch.Tensor: + out = self.depth_conv(input_tensor) + return self.point_conv(out) + + +class EnhanceNetNoPool(nn.Module): + def __init__(self, scale_factor: int = 1) -> None: + super().__init__() + self.relu = nn.ReLU(inplace=True) + self.scale_factor = scale_factor + self.upsample = nn.UpsamplingBilinear2d(scale_factor=self.scale_factor) + number_f = 32 + + self.e_conv1 = CSDN_Tem(3, number_f) + self.e_conv2 = CSDN_Tem(number_f, number_f) + self.e_conv3 = CSDN_Tem(number_f, number_f) + self.e_conv4 = CSDN_Tem(number_f, number_f) + self.e_conv5 = CSDN_Tem(number_f * 2, number_f) + self.e_conv6 = CSDN_Tem(number_f * 2, number_f) + self.e_conv7 = CSDN_Tem(number_f * 2, 3) + + def enhance(self, x: torch.Tensor, x_r: torch.Tensor) -> torch.Tensor: + x = x + x_r * (torch.pow(x, 2) - x) + x = x + x_r * (torch.pow(x, 2) - x) + x = x + x_r * (torch.pow(x, 2) - x) + enhance_image_1 = x + x_r * (torch.pow(x, 2) - x) + x = enhance_image_1 + x_r * (torch.pow(enhance_image_1, 2) - enhance_image_1) + x = x + x_r * (torch.pow(x, 2) - x) + x = x + x_r * (torch.pow(x, 2) - x) + return x + x_r * (torch.pow(x, 2) - x) + + def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: + if self.scale_factor == 1: + x_down = x + else: + x_down = F.interpolate( + x, scale_factor=1 / self.scale_factor, mode="bilinear" + ) + + x1 = self.relu(self.e_conv1(x_down)) + x2 = self.relu(self.e_conv2(x1)) + x3 = self.relu(self.e_conv3(x2)) + x4 = self.relu(self.e_conv4(x3)) + x5 = self.relu(self.e_conv5(torch.cat([x3, x4], 1))) + x6 = self.relu(self.e_conv6(torch.cat([x2, x5], 1))) + x_r = F.tanh(self.e_conv7(torch.cat([x1, x6], 1))) + if self.scale_factor != 1: + x_r = self.upsample(x_r) + return self.enhance(x, x_r), x_r + + +def load_zero_dce_model(weights_path, device: torch.device) -> EnhanceNetNoPool: + model = EnhanceNetNoPool(scale_factor=1) + state = torch.load(weights_path, map_location=device, weights_only=False) + model.load_state_dict(state) + model.to(device) + model.eval() + return model + + +def enhance_image( + model: EnhanceNetNoPool, + image_rgb, + *, + device: torch.device, + strength: float = 1.0, +): + import numpy as np + from PIL import Image + + if not isinstance(image_rgb, Image.Image): + image_rgb = Image.fromarray(image_rgb) + + arr = np.asarray(image_rgb, dtype=np.float32) / 255.0 + tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0).to(device) + with torch.no_grad(): + enhanced, _ = model(tensor) + if strength < 1.0: + enhanced = tensor * (1.0 - strength) + enhanced * strength + enhanced = torch.clamp(enhanced, 0.0, 1.0) + out = (enhanced.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255.0).astype( + np.uint8 + ) + return Image.fromarray(out) diff --git a/imagepipeline/cli.py b/imagepipeline/cli.py new file mode 100644 index 0000000..0a841e2 --- /dev/null +++ b/imagepipeline/cli.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +from imagepipeline.modules.registry import list_modules + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description="Image pipeline utilities", + ) + subparsers = parser.add_subparsers(dest="command") + + list_parser = subparsers.add_parser("list-modules", help="List registered modules") + list_parser.set_defaults(func=_cmd_list_modules) + + args = parser.parse_args(argv) + if not getattr(args, "func", None): + parser.print_help() + return 1 + return args.func(args) + + +def _cmd_list_modules(args: argparse.Namespace) -> int: + for name in list_modules(): + print(name) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/imagepipeline/core/__init__.py b/imagepipeline/core/__init__.py new file mode 100644 index 0000000..729c980 --- /dev/null +++ b/imagepipeline/core/__init__.py @@ -0,0 +1 @@ +"""Core pipeline engine.""" diff --git a/imagepipeline/core/context.py b/imagepipeline/core/context.py new file mode 100644 index 0000000..a4db3cd --- /dev/null +++ b/imagepipeline/core/context.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, TYPE_CHECKING + +if TYPE_CHECKING: + from imagepipeline.core.log import PipelineLogger + + +@dataclass +class ModuleContext: + """Runtime context passed to each module's run() method.""" + + input_paths: list[Path] + output_dir: Path + params: dict[str, Any] + pipeline_output_root: Path + step_id: str + matched_groups: list[list[Path]] = field(default_factory=list) + logger: PipelineLogger | None = None + + @property + def is_multi_input(self) -> bool: + return len(self.matched_groups) > 1 + + def log_image(self, module_name: str, index: int, total: int, path: Path) -> None: + if self.logger is not None: + self.logger.image(module_name, index, total, path.name) diff --git a/imagepipeline/core/exceptions.py b/imagepipeline/core/exceptions.py new file mode 100644 index 0000000..c663b8f --- /dev/null +++ b/imagepipeline/core/exceptions.py @@ -0,0 +1,18 @@ +class PipelineError(Exception): + """Base error for pipeline failures.""" + + +class StepError(PipelineError): + """Error during step execution.""" + + +class DependencyError(PipelineError): + """Missing external tool or module dependency.""" + + +class ValidationError(PipelineError): + """Invalid parameters or configuration.""" + + +class CycleError(PipelineError): + """Cycle detected in pipeline DAG.""" diff --git a/imagepipeline/core/log.py b/imagepipeline/core/log.py new file mode 100644 index 0000000..a9f383b --- /dev/null +++ b/imagepipeline/core/log.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import sys +from typing import TextIO + + +def _format_eta(seconds: float) -> str: + seconds = max(0, int(seconds)) + minutes, secs = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours}h{minutes:02d}m" + if minutes: + return f"{minutes}m{secs:02d}s" + return f"{secs}s" + + +class PipelineLogger: + """Simple stdout logger for pipeline progress.""" + + def __init__(self, *, verbose: bool = True, stream: TextIO | None = None) -> None: + self.verbose = verbose + self.stream = stream or sys.stdout + + def info(self, message: str) -> None: + if self.verbose: + print(message, file=self.stream, flush=True) + + def step_start( + self, + step_index: int, + step_total: int, + step_id: str, + module_name: str, + *, + inputs: list[str], + params: dict, + ) -> None: + self.info( + f"Step {step_index}/{step_total}: {step_id} ({module_name})" + ) + self.info(f" inputs: {', '.join(inputs)}") + if params: + rendered = ", ".join(f"{key}={value!r}" for key, value in params.items()) + self.info(f" params: {rendered}") + + def image(self, module_name: str, index: int, total: int, filename: str) -> None: + self.info( + f" Applying module {module_name} to image [{index}/{total}]: {filename}" + ) + + def skipped(self, module_name: str, index: int, total: int, filename: str) -> None: + self.info( + f" Skipped module {module_name} [{index}/{total}] {filename} (output exists)" + ) + + def image_done( + self, + module_name: str, + index: int, + total: int, + filename: str, + elapsed: float, + *, + eta_seconds: float | None = None, + ) -> None: + eta_part = "" + if eta_seconds is not None and eta_seconds > 0: + eta_part = f" (ETA ~{_format_eta(eta_seconds)})" + self.info( + f" Finished module {module_name} [{index}/{total}] {filename} " + f"in {elapsed:.1f}s{eta_part}" + ) + + def warning(self, message: str) -> None: + self.info(f" Warning: {message}") + + def step_done(self, step_id: str, output_dir: str, count: int) -> None: + self.info(f" Done: {count} image(s) -> {output_dir}/") + + def blank(self) -> None: + self.info("") diff --git a/imagepipeline/core/manifest.py b/imagepipeline/core/manifest.py new file mode 100644 index 0000000..d49b057 --- /dev/null +++ b/imagepipeline/core/manifest.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +@dataclass +class StepManifestEntry: + step_id: str + output_dir: str + module: str + inputs: list[str] + params: dict[str, Any] + input_files: list[str] = field(default_factory=list) + output_files: list[str] = field(default_factory=list) + + +@dataclass +class PipelineManifest: + name: str + output_root: str + input_dir: str + started_at: str + finished_at: str | None = None + steps: list[StepManifestEntry] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + +def write_manifest(path: Path, manifest: PipelineManifest) -> None: + path.write_text( + json.dumps(manifest.to_dict(), indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + + +def utc_now_iso() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() diff --git a/imagepipeline/core/params.py b/imagepipeline/core/params.py new file mode 100644 index 0000000..4ba2e1c --- /dev/null +++ b/imagepipeline/core/params.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(frozen=True) +class Param: + """Schema entry for a module parameter.""" + + type: str + default: Any = None + required: bool = False + help: str = "" + choices: tuple[Any, ...] | None = None + + def validate_value(self, name: str, value: Any) -> Any: + if value is None: + if self.required: + raise ValueError(f"Parameter '{name}' is required") + return self.default + + if self.choices is not None and value not in self.choices: + allowed = ", ".join(repr(c) for c in self.choices) + raise ValueError( + f"Parameter '{name}' must be one of [{allowed}], got {value!r}" + ) + + if self.type == "string": + if not isinstance(value, str): + raise ValueError(f"Parameter '{name}' must be a string") + return value + if self.type == "int": + if isinstance(value, bool) or not isinstance(value, int): + raise ValueError(f"Parameter '{name}' must be an integer") + return value + if self.type == "float": + if isinstance(value, bool) or not isinstance(value, (int, float)): + raise ValueError(f"Parameter '{name}' must be a number") + return float(value) + if self.type == "bool": + if not isinstance(value, bool): + raise ValueError(f"Parameter '{name}' must be a boolean") + return value + if self.type == "path": + from pathlib import Path + + if not isinstance(value, (str, Path)): + raise ValueError(f"Parameter '{name}' must be a path") + return Path(value) + if self.type == "list": + if not isinstance(value, list): + raise ValueError(f"Parameter '{name}' must be a list") + return value + + raise ValueError(f"Unknown parameter type '{self.type}' for '{name}'") + + +def validate_params( + schema: dict[str, Param], raw: dict[str, Any] +) -> dict[str, Any]: + unknown = set(raw) - set(schema) + if unknown: + names = ", ".join(sorted(unknown)) + raise ValueError(f"Unknown parameters: {names}") + + validated: dict[str, Any] = {} + for name, spec in schema.items(): + validated[name] = spec.validate_value(name, raw.get(name)) + return validated diff --git a/imagepipeline/core/pipeline.py b/imagepipeline/core/pipeline.py new file mode 100644 index 0000000..f645fde --- /dev/null +++ b/imagepipeline/core/pipeline.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +from collections import defaultdict +from pathlib import Path +from typing import Any + +from imagepipeline.core.exceptions import ValidationError +from imagepipeline.core.runner import PipelineRunner +from imagepipeline.core.step import INPUT_SOURCE, StepDefinition, StepRef +from imagepipeline.modules.registry import get_module + +# Import built-in modules so they register on package load. +import imagepipeline.modules.imagemagick_grayscale # noqa: F401 + + +class Pipeline: + """Define and run an image processing pipeline.""" + + def __init__( + self, + *, + input_dir: Path | str, + name: str = "pipeline", + output_base: Path | str | None = None, + symlink_input: bool = True, + verbose: bool = True, + ) -> None: + self.input_dir = Path(input_dir) + self.name = name + self.output_base = Path(output_base) if output_base else None + self.symlink_input = symlink_input + self.verbose = verbose + self._steps: list[StepDefinition] = [] + self._module_counters: dict[str, int] = defaultdict(int) + self._output_root: Path | None = None + + def step( + self, + module_name: str, + *, + inputs: StepRef | str | list[StepRef | str], + **params: Any, + ) -> StepRef: + module_cls = get_module(module_name) + self._module_counters[module_name] += 1 + counter = self._module_counters[module_name] + output_dir_name = f"{module_name}_{counter:02d}" + step_id = output_dir_name + + input_refs = self._normalize_inputs(inputs) + reserved = {"inputs", "input"} + if reserved & set(params): + raise ValidationError( + "Do not pass 'inputs' or 'input' as module parameters" + ) + + definition = StepDefinition( + step_id=step_id, + module_name=module_name, + module=module_cls, + input_refs=input_refs, + params=params, + output_dir_name=output_dir_name, + ) + self._steps.append(definition) + return StepRef(step_id=step_id, output_dir_name=output_dir_name) + + def run(self) -> Path: + if not self._steps: + raise ValidationError("Pipeline has no steps") + runner = PipelineRunner( + name=self.name, + input_dir=self.input_dir, + output_base=self.output_base, + steps=self._steps, + symlink_input=self.symlink_input, + verbose=self.verbose, + ) + self._output_root = runner.run() + return self._output_root + + @property + def output_root(self) -> Path | None: + return self._output_root + + def _normalize_inputs( + self, inputs: StepRef | str | list[StepRef | str] + ) -> list[str]: + if isinstance(inputs, list): + if not inputs: + raise ValidationError("inputs must not be an empty list") + return [self._input_ref(item) for item in inputs] + return [self._input_ref(inputs)] + + @staticmethod + def _input_ref(value: StepRef | str) -> str: + if isinstance(value, StepRef): + return value.step_id + if value == INPUT_SOURCE: + return INPUT_SOURCE + if isinstance(value, str): + return value + raise ValidationError(f"Invalid input reference: {value!r}") + + def __enter__(self) -> Pipeline: + return self + + def __exit__(self, exc_type, exc, tb) -> None: + return None diff --git a/imagepipeline/core/runner.py b/imagepipeline/core/runner.py new file mode 100644 index 0000000..2ae6b82 --- /dev/null +++ b/imagepipeline/core/runner.py @@ -0,0 +1,207 @@ +from __future__ import annotations + +import os +from collections import defaultdict, deque +from datetime import datetime +from pathlib import Path + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.exceptions import CycleError, StepError, ValidationError +from imagepipeline.core.manifest import ( + PipelineManifest, + StepManifestEntry, + utc_now_iso, + write_manifest, +) +from imagepipeline.core.step import ( + INPUT_SOURCE, + StepDefinition, + StepResult, +) +from imagepipeline.utils.files import flatten_matched, list_images, match_by_stem + + +class PipelineRunner: + def __init__( + self, + *, + name: str, + input_dir: Path, + output_base: Path | None, + steps: list[StepDefinition], + symlink_input: bool = True, + verbose: bool = True, + ) -> None: + from imagepipeline.core.log import PipelineLogger + + self.name = name + self.input_dir = input_dir.resolve() + self.output_base = (output_base or Path.cwd()).resolve() + self.steps = steps + self.symlink_input = symlink_input + self.logger = PipelineLogger(verbose=verbose) + self.output_root = self._build_output_root() + self._input_link_dir = self.output_root / "input" + self._results: dict[str, StepResult] = {} + + def _build_output_root(self) -> Path: + timestamp = datetime.now().strftime("%y%m%d%H%M%S") + folder_name = f"{self.name}_{timestamp}" + output_root = self.output_base / folder_name + output_root.mkdir(parents=True, exist_ok=False) + return output_root + + def run(self) -> Path: + images = list_images(self.input_dir) + self.logger.info(f"Pipeline: {self.name}") + self.logger.info(f"Input: {self.input_dir}") + self.logger.info(f"Output: {self.output_root}") + self.logger.blank() + self.logger.info(f"Found {len(images)} photo(s)") + self.logger.blank() + + self._prepare_input_dir(images) + order = self._topological_sort() + step_total = len(order) + manifest = PipelineManifest( + name=self.name, + output_root=str(self.output_root), + input_dir=str(self.input_dir), + started_at=utc_now_iso(), + ) + + for step_index, step in enumerate(order, start=1): + result = self._execute_step(step, step_index=step_index, step_total=step_total) + self._results[step.step_id] = result + manifest.steps.append( + StepManifestEntry( + step_id=step.step_id, + output_dir=step.output_dir_name, + module=step.module_name, + inputs=step.input_refs, + params=step.params, + input_files=[str(p) for p in result.input_paths], + output_files=[str(p) for p in result.output_paths], + ) + ) + + manifest.finished_at = utc_now_iso() + write_manifest(self.output_root / "pipeline_manifest.json", manifest) + self.logger.blank() + self.logger.info("Pipeline finished.") + self.logger.info(f"Manifest: {self.output_root / 'pipeline_manifest.json'}") + return self.output_root + + def _prepare_input_dir(self, images: list[Path]) -> None: + self._input_link_dir.mkdir(parents=True, exist_ok=True) + for src in images: + dst = self._input_link_dir / src.name + if dst.exists() or dst.is_symlink(): + continue + if self.symlink_input: + os.symlink(src, dst) + else: + import shutil + + shutil.copy2(src, dst) + + def _resolve_source_paths(self, ref: str) -> list[Path]: + if ref == INPUT_SOURCE: + return list_images(self._input_link_dir) + if ref not in self._results: + raise StepError(f"Unknown step reference: {ref}") + return self._results[ref].output_paths + + def _execute_step( + self, + step: StepDefinition, + *, + step_index: int, + step_total: int, + ) -> StepResult: + step.module.check_dependencies() + validated = step.module.validate_module_params(step.params) + + source_lists = [self._resolve_source_paths(ref) for ref in step.input_refs] + matched_groups = match_by_stem(source_lists) + input_paths = flatten_matched(matched_groups) + + output_dir = self.output_root / step.output_dir_name + output_dir.mkdir(parents=True, exist_ok=True) + + self.logger.step_start( + step_index, + step_total, + step.output_dir_name, + step.module_name, + inputs=step.input_refs, + params=validated, + ) + + ctx = ModuleContext( + input_paths=input_paths, + output_dir=output_dir, + params=validated, + pipeline_output_root=self.output_root, + step_id=step.step_id, + matched_groups=matched_groups, + logger=self.logger, + ) + + try: + step.module().run(ctx) + except Exception as exc: + raise StepError( + f"Step '{step.output_dir_name}' ({step.module_name}) failed: {exc}" + ) from exc + + output_paths = step.module().list_output_images(ctx) + if not output_paths: + raise StepError( + f"Step '{step.output_dir_name}' ({step.module_name}) " + "produced no output images" + ) + + self.logger.step_done(step.output_dir_name, step.output_dir_name, len(output_paths)) + self.logger.blank() + + return StepResult( + step_id=step.step_id, + output_dir_name=step.output_dir_name, + module_name=step.module_name, + output_dir=output_dir, + input_paths=input_paths, + output_paths=output_paths, + params=validated, + ) + + def _topological_sort(self) -> list[StepDefinition]: + step_by_id = {step.step_id: step for step in self.steps} + in_degree: dict[str, int] = {step.step_id: 0 for step in self.steps} + dependents: dict[str, list[str]] = defaultdict(list) + + for step in self.steps: + deps = [ref for ref in step.input_refs if ref != INPUT_SOURCE] + in_degree[step.step_id] = len(deps) + for dep in deps: + if dep not in step_by_id: + raise ValidationError(f"Step '{step.step_id}' references unknown step '{dep}'") + dependents[dep].append(step.step_id) + + queue = deque( + step_id for step_id, degree in in_degree.items() if degree == 0 + ) + ordered_ids: list[str] = [] + + while queue: + step_id = queue.popleft() + ordered_ids.append(step_id) + for dependent in dependents[step_id]: + in_degree[dependent] -= 1 + if in_degree[dependent] == 0: + queue.append(dependent) + + if len(ordered_ids) != len(self.steps): + raise CycleError("Pipeline contains a cycle in step dependencies") + + return [step_by_id[step_id] for step_id in ordered_ids] diff --git a/imagepipeline/core/step.py b/imagepipeline/core/step.py new file mode 100644 index 0000000..356efc3 --- /dev/null +++ b/imagepipeline/core/step.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, TYPE_CHECKING + +if TYPE_CHECKING: + from imagepipeline.modules.base import BaseModule + + +INPUT_SOURCE = "input" + + +@dataclass +class StepDefinition: + """Internal step registered on a pipeline.""" + + step_id: str + module_name: str + module: BaseModule + input_refs: list[str] + params: dict[str, Any] + output_dir_name: str + + +@dataclass +class StepRef: + """Reference to a pipeline step, returned by Pipeline.step().""" + + step_id: str + output_dir_name: str + + def __repr__(self) -> str: + return f"StepRef({self.output_dir_name!r})" + + +@dataclass +class StepResult: + """Result of an executed step.""" + + step_id: str + output_dir_name: str + module_name: str + output_dir: Path + input_paths: list[Path] + output_paths: list[Path] = field(default_factory=list) + params: dict[str, Any] = field(default_factory=dict) diff --git a/imagepipeline/modules/__init__.py b/imagepipeline/modules/__init__.py new file mode 100644 index 0000000..ad32c77 --- /dev/null +++ b/imagepipeline/modules/__init__.py @@ -0,0 +1,15 @@ +"""Built-in pipeline modules.""" + +import imagepipeline.modules.ai_exposure # noqa: F401 +import imagepipeline.modules.ai_tone_map # noqa: F401 +import imagepipeline.modules.comfy_flux_edit # noqa: F401 +import imagepipeline.modules.composite # noqa: F401 +import imagepipeline.modules.crop_square # noqa: F401 +import imagepipeline.modules.darktable_style # noqa: F401 +import imagepipeline.modules.gmic # noqa: F401 +import imagepipeline.modules.gmic_grayscale # noqa: F401 +import imagepipeline.modules.imagemagick_fill # noqa: F401 +import imagepipeline.modules.imagemagick_grayscale # noqa: F401 +import imagepipeline.modules.imagemagick_scale_crop # noqa: F401 +import imagepipeline.modules.openrouter_edit # noqa: F401 +import imagepipeline.modules.rembg # noqa: F401 diff --git a/imagepipeline/modules/ai_base.py b/imagepipeline/modules/ai_base.py new file mode 100644 index 0000000..b60b721 --- /dev/null +++ b/imagepipeline/modules/ai_base.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +import time +from collections.abc import Callable +from pathlib import Path +from typing import Any, ClassVar + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import BaseModule + + +def _format_eta(seconds: float) -> str: + seconds = max(0, int(seconds)) + minutes, secs = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours}h{minutes:02d}m" + if minutes: + return f"{minutes}m{secs:02d}s" + return f"{secs}s" + + +class AIModule(BaseModule): + """Base class for AI pipeline modules (CPU-first, batch-friendly).""" + + ai_common_parameters: ClassVar[dict[str, Param]] = { + "skip_existing": Param( + "bool", + default=True, + help="Skip images whose output file already exists", + ), + "max_edge": Param( + "int", + default=2048, + help="Max long edge for inference (0 = full resolution)", + ), + "device": Param( + "string", + default="cpu", + choices=("cpu",), + help="Inference device (v1 supports cpu only)", + ), + } + + @classmethod + def parameters(cls) -> dict[str, Param]: + return dict(cls.ai_common_parameters) + + @classmethod + def all_parameters(cls) -> dict[str, Param]: + merged = dict(cls.ai_common_parameters) + merged.update(cls.parameters()) + return merged + + @classmethod + def validate_module_params(cls, raw: dict[str, Any]) -> dict[str, Any]: + from imagepipeline.core.params import validate_params + + return validate_params(cls.all_parameters(), raw) + + def output_path(self, src: Path, ctx: ModuleContext) -> Path: + return ctx.output_dir / src.name + + def configure_torch(self, device_name: str) -> None: + import torch + + if device_name != "cpu": + raise ValueError(f"Unsupported device: {device_name!r} (only 'cpu' in v1)") + torch.set_num_threads(24) + + def iter_input_images( + self, + ctx: ModuleContext, + processor: Callable[[Path, Path, int, int], None], + ) -> None: + import tempfile + + from imagepipeline.ai.imaging import resize_max_edge, resize_to_size + + ctx.output_dir.mkdir(parents=True, exist_ok=True) + skip_existing = ctx.params["skip_existing"] + max_edge = ctx.params["max_edge"] + total = len(ctx.input_paths) + elapsed_times: list[float] = [] + + for index, src in enumerate(ctx.input_paths, start=1): + dst = self.output_path(src, ctx) + if skip_existing and dst.is_file(): + if ctx.logger is not None: + ctx.logger.skipped(self.name, index, total, dst.name) + continue + + self.log_image(ctx, index, total, src) + started = time.perf_counter() + + if max_edge > 0: + with tempfile.TemporaryDirectory(prefix="imagepipeline_ai_") as tmp: + tmp_path = Path(tmp) + work_src = tmp_path / f"work_{src.name}" + work_out = tmp_path / f"out_{src.name}" + orig_w, orig_h, _, _ = resize_max_edge(src, work_src, max_edge) + processor(work_src, work_out, index, total) + if (orig_w, orig_h) != self._image_size(work_out): + resize_to_size(work_out, dst, orig_w, orig_h) + else: + work_out.replace(dst) + else: + processor(src, dst, index, total) + + elapsed = time.perf_counter() - started + elapsed_times.append(elapsed) + remaining = total - index + avg = sum(elapsed_times) / len(elapsed_times) + eta = avg * remaining if remaining else 0.0 + if ctx.logger is not None: + ctx.logger.image_done( + self.name, index, total, dst.name, elapsed, eta_seconds=eta + ) + + @staticmethod + def _image_size(path: Path) -> tuple[int, int]: + from PIL import Image + + with Image.open(path) as image: + return image.size diff --git a/imagepipeline/modules/ai_exposure.py b/imagepipeline/modules/ai_exposure.py new file mode 100644 index 0000000..eb0471a --- /dev/null +++ b/imagepipeline/modules/ai_exposure.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from pathlib import Path + +from imagepipeline.ai.cache import ensure_weight +from imagepipeline.ai.imaging import load_pil_rgb, save_pil_image +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.exceptions import DependencyError +from imagepipeline.core.params import Param +from imagepipeline.modules.ai_base import AIModule +from imagepipeline.modules.registry import register + + +@register +class AIExposureModule(AIModule): + name = "ai_exposure" + description = "Exposure enhancement using Zero-DCE++ (CPU)" + + _model = None + _model_device = None + + @classmethod + def parameters(cls) -> dict[str, Param]: + params = dict(super().parameters()) + params["strength"] = Param( + "float", + default=1.0, + help="Blend factor between original (0) and enhanced (1)", + ) + return params + + @classmethod + def check_dependencies(cls) -> None: + try: + import numpy # noqa: F401 + import torch # noqa: F401 + from PIL import Image # noqa: F401 + except ImportError as exc: + raise DependencyError( + "ai_exposure requires optional deps: pip install -e '.[ai]'" + ) from exc + + @classmethod + def _get_model(cls, device_name: str): + import torch + + from imagepipeline.ai.zero_dce import load_zero_dce_model + + device = torch.device(device_name) + if cls._model is None or cls._model_device != device: + weights = ensure_weight("zero_dce_pp", filename="zero_dce_pp_epoch99.pth") + cls._model = load_zero_dce_model(weights, device) + cls._model_device = device + return cls._model, device + + def run(self, ctx: ModuleContext) -> None: + from imagepipeline.ai.zero_dce import enhance_image + + self.check_dependencies() + self.configure_torch(ctx.params["device"]) + model, device = self._get_model(ctx.params["device"]) + strength = ctx.params["strength"] + + def process(src: Path, dst: Path, _index: int, _total: int) -> None: + image = load_pil_rgb(src) + result = enhance_image(model, image, device=device, strength=strength) + save_pil_image(result, dst) + + self.iter_input_images(ctx, process) diff --git a/imagepipeline/modules/ai_tone_map.py b/imagepipeline/modules/ai_tone_map.py new file mode 100644 index 0000000..013aa49 --- /dev/null +++ b/imagepipeline/modules/ai_tone_map.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +from pathlib import Path + +from imagepipeline.ai.classical_tone import apply_clahe_tone +from imagepipeline.ai.imaging import load_pil_rgb, save_pil_image +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.ai_base import AIModule +from imagepipeline.modules.registry import register + + +@register +class AIToneMapModule(AIModule): + name = "ai_tone_map" + description = "Tone mapping via HDRNet checkpoint or CLAHE fallback (CPU)" + + _hdrnet_model = None + _hdrnet_checkpoint: Path | None = None + _warned_fallback = False + + @classmethod + def parameters(cls) -> dict[str, Param]: + params = dict(super().parameters()) + params.update( + { + "checkpoint": Param( + "string", + default="", + help="HDRNet .pth checkpoint (creotiv format); empty uses CLAHE", + ), + "strength": Param( + "float", + default=1.0, + help="Blend factor between original and tone-mapped result", + ), + "net_input_size": Param( + "int", + default=256, + help="HDRNet low-res branch input size", + ), + } + ) + return params + + @classmethod + def check_dependencies(cls) -> None: + try: + import numpy # noqa: F401 + from PIL import Image # noqa: F401 + except ImportError as exc: + from imagepipeline.core.exceptions import DependencyError + + raise DependencyError( + "ai_tone_map requires optional deps: pip install -e '.[ai]'" + ) from exc + + def _load_hdrnet(self, checkpoint: Path, device_name: str): + import torch + + from imagepipeline.ai.hdrnet.model import load_hdrnet_checkpoint + + device = torch.device(device_name) + if self._hdrnet_model is None or self._hdrnet_checkpoint != checkpoint: + self._hdrnet_model, _params = load_hdrnet_checkpoint(checkpoint, device) + self._hdrnet_checkpoint = checkpoint + return self._hdrnet_model, device + + def run(self, ctx: ModuleContext) -> None: + self.check_dependencies() + checkpoint = ctx.params["checkpoint"] + strength = ctx.params["strength"] + use_hdrnet = bool(checkpoint) + + if use_hdrnet: + try: + import torch # noqa: F401 + except ImportError as exc: + raise RuntimeError( + "HDRNet tone mapping requires torch: pip install -e '.[ai]'" + ) from exc + self.configure_torch(ctx.params["device"]) + model, device = self._load_hdrnet(Path(checkpoint), ctx.params["device"]) + elif ctx.logger is not None and not self._warned_fallback: + ctx.logger.warning( + "Using classical CLAHE tone mapping (no HDRNet checkpoint configured)" + ) + self._warned_fallback = True + + net_input_size = ctx.params["net_input_size"] + + def process(src: Path, dst: Path, _index: int, _total: int) -> None: + image = load_pil_rgb(src) + if use_hdrnet: + from imagepipeline.ai.hdrnet.model import enhance_image_hdrnet + + result = enhance_image_hdrnet( + model, + image, + device=device, + net_input_size=net_input_size, + strength=strength, + ) + else: + result = apply_clahe_tone(image, strength=strength) + save_pil_image(result, dst) + + self.iter_input_images(ctx, process) diff --git a/imagepipeline/modules/base.py b/imagepipeline/modules/base.py new file mode 100644 index 0000000..cfdfe9b --- /dev/null +++ b/imagepipeline/modules/base.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any, ClassVar + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param, validate_params +from imagepipeline.utils.files import is_image + + +class BaseModule(ABC): + """Base class for all pipeline modules.""" + + name: ClassVar[str] + description: ClassVar[str] = "" + supported_input_formats: ClassVar[tuple[str, ...]] = ( + ".jpg", + ".jpeg", + ".png", + ".tif", + ".tiff", + ".webp", + ) + + @classmethod + def parameters(cls) -> dict[str, Param]: + return {} + + @classmethod + def validate_module_params(cls, raw: dict[str, Any]) -> dict[str, Any]: + return validate_params(cls.parameters(), raw) + + @classmethod + def check_dependencies(cls) -> None: + """Raise DependencyError if required external tools are missing.""" + + @abstractmethod + def run(self, ctx: ModuleContext) -> None: + """Process ctx.input_paths and write outputs into ctx.output_dir.""" + + def log_image(self, ctx: ModuleContext, index: int, total: int, path: Path) -> None: + ctx.log_image(self.name, index, total, path) + + def list_output_images(self, ctx: ModuleContext) -> list[Path]: + return sorted(p for p in ctx.output_dir.iterdir() if is_image(p)) + + +class SubprocessModule(BaseModule): + """Base class for modules that shell out to CLI tools.""" + + command_candidates: ClassVar[tuple[str, ...]] = () + default_timeout: ClassVar[float | None] = None + + @classmethod + def check_dependencies(cls) -> None: + from imagepipeline.utils.subprocess import require_command + + if cls.command_candidates: + require_command(*cls.command_candidates) + + @classmethod + def resolve_command(cls) -> str: + from imagepipeline.utils.subprocess import require_command + + return require_command(*cls.command_candidates) diff --git a/imagepipeline/modules/comfy_flux_edit.py b/imagepipeline/modules/comfy_flux_edit.py new file mode 100644 index 0000000..c391763 --- /dev/null +++ b/imagepipeline/modules/comfy_flux_edit.py @@ -0,0 +1,221 @@ +from __future__ import annotations + +import json +import random +import time +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.exceptions import DependencyError +from imagepipeline.core.params import Param +from imagepipeline.modules.ai_base import AIModule +from imagepipeline.modules.registry import register + +REPO_ROOT = Path(__file__).resolve().parents[2] + + +@register +class ComfyFluxEditModule(AIModule): + name = "comfy_flux_edit" + description = "Experimental FLUX img2img editing via ComfyUI HTTP API (CPU: very slow)" + + @classmethod + def parameters(cls) -> dict[str, Param]: + params = dict(super().parameters()) + params.update( + { + "prompt": Param( + "string", + required=True, + help="Edit prompt for the ComfyUI workflow", + ), + "denoise": Param( + "float", + default=0.35, + help="KSampler denoise strength", + ), + "seed": Param( + "int", + default=-1, + help="Random seed (-1 = random per image)", + ), + "server_url": Param( + "string", + default="http://127.0.0.1:8188", + help="ComfyUI server base URL", + ), + "workflow_path": Param( + "path", + default=REPO_ROOT / "workflows" / "comfy" / "flux_klein_edit_api.json", + help="ComfyUI workflow exported in API format", + ), + "poll_interval": Param( + "float", + default=2.0, + help="Seconds between history polls", + ), + } + ) + return params + + @classmethod + def check_dependencies(cls) -> None: + pass + + def run(self, ctx: ModuleContext) -> None: + workflow_path = Path(ctx.params["workflow_path"]) + if not workflow_path.is_file(): + raise DependencyError( + f"ComfyUI workflow not found: {workflow_path}. " + "Export a Flux Klein img2img workflow to this path (see workflows/comfy/README.md)." + ) + + server_url = ctx.params["server_url"].rstrip("/") + self._ensure_server(server_url) + if ctx.logger is not None: + ctx.logger.warning( + "ComfyUI on CPU: expect hours per full-resolution image" + ) + + workflow_template = json.loads(workflow_path.read_text(encoding="utf-8")) + denoise = ctx.params["denoise"] + prompt = ctx.params["prompt"] + poll_interval = ctx.params["poll_interval"] + + def process(src: Path, dst: Path, _index: int, _total: int) -> None: + uploaded_name = self._upload_image(server_url, src) + seed = ctx.params["seed"] + if seed < 0: + seed = random.randint(0, 2**32 - 1) + workflow = self._patch_workflow( + workflow_template, + image_name=uploaded_name, + prompt=prompt, + denoise=denoise, + seed=seed, + ) + prompt_id = self._queue_prompt(server_url, workflow) + output_info = self._wait_for_output( + server_url, prompt_id, poll_interval=poll_interval + ) + image_bytes = self._download_view(server_url, output_info) + dst.write_bytes(image_bytes) + + self.iter_input_images(ctx, process) + + @staticmethod + def _ensure_server(server_url: str) -> None: + try: + urllib.request.urlopen(f"{server_url}/system_stats", timeout=5) + except urllib.error.URLError as exc: + raise DependencyError( + f"ComfyUI server not reachable at {server_url}: {exc}" + ) from exc + + @staticmethod + def _upload_image(server_url: str, src: Path) -> str: + boundary = "----imagepipelineboundary" + body_parts = [ + f"--{boundary}\r\n".encode(), + ( + f'Content-Disposition: form-data; name="image"; filename="{src.name}"\r\n' + f"Content-Type: application/octet-stream\r\n\r\n" + ).encode(), + src.read_bytes(), + b"\r\n", + f"--{boundary}\r\n".encode(), + b'Content-Disposition: form-data; name="overwrite"\r\n\r\n', + b"true\r\n", + f"--{boundary}--\r\n".encode(), + ] + body = b"".join(body_parts) + request = urllib.request.Request( + f"{server_url}/upload/image", + data=body, + headers={"Content-Type": f"multipart/form-data; boundary={boundary}"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=120) as response: + data = json.loads(response.read().decode("utf-8")) + return data["name"] + + @staticmethod + def _patch_workflow( + template: dict, + *, + image_name: str, + prompt: str, + denoise: float, + seed: int, + ) -> dict: + workflow = json.loads(json.dumps(template)) + for node in workflow.values(): + if not isinstance(node, dict): + continue + class_type = node.get("class_type", "") + inputs = node.get("inputs", {}) + if class_type == "LoadImage": + inputs["image"] = image_name + elif class_type in {"CLIPTextEncode", "TextEncode"} and "text" in inputs: + inputs["text"] = prompt + elif class_type == "KSampler": + inputs["denoise"] = denoise + inputs["seed"] = seed + elif "denoise" in inputs: + inputs["denoise"] = denoise + if "seed" in inputs and class_type != "KSampler": + inputs["seed"] = seed + return workflow + + @staticmethod + def _queue_prompt(server_url: str, workflow: dict) -> str: + payload = json.dumps({"prompt": workflow}).encode("utf-8") + request = urllib.request.Request( + f"{server_url}/prompt", + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=60) as response: + data = json.loads(response.read().decode("utf-8")) + return data["prompt_id"] + + @staticmethod + def _wait_for_output( + server_url: str, + prompt_id: str, + *, + poll_interval: float, + timeout: float = 86400.0, + ) -> dict: + deadline = time.time() + timeout + while time.time() < deadline: + request = urllib.request.Request(f"{server_url}/history/{prompt_id}") + with urllib.request.urlopen(request, timeout=30) as response: + history = json.loads(response.read().decode("utf-8")) + if prompt_id in history: + outputs = history[prompt_id].get("outputs") or {} + for node_output in outputs.values(): + images = node_output.get("images") or [] + if images: + return images[0] + status = history[prompt_id].get("status", {}) + if status.get("status_str") == "error": + raise RuntimeError(f"ComfyUI workflow failed: {status}") + time.sleep(poll_interval) + raise TimeoutError(f"ComfyUI prompt {prompt_id} did not finish within {timeout}s") + + @staticmethod + def _download_view(server_url: str, image_info: dict) -> bytes: + query = urllib.parse.urlencode( + { + "filename": image_info["filename"], + "subfolder": image_info.get("subfolder", ""), + "type": image_info.get("type", "output"), + } + ) + with urllib.request.urlopen(f"{server_url}/view?{query}", timeout=120) as response: + return response.read() diff --git a/imagepipeline/modules/composite.py b/imagepipeline/modules/composite.py new file mode 100644 index 0000000..74efd79 --- /dev/null +++ b/imagepipeline/modules/composite.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + + +@register +class CompositeModule(SubprocessModule): + name = "composite" + description = ( + "Composite multiple inputs. With two inputs: first=background, " + "second=foreground (with alpha)." + ) + command_candidates = ("magick", "convert") + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "mode": Param( + "string", + default="over", + choices=("over", "multiply", "screen"), + help="ImageMagick -compose mode", + ), + "output_ext": Param( + "string", + default=".png", + help="Output file extension including dot", + ), + "foreground_opacity": Param( + "float", + default=1.0, + help="Foreground layer opacity from 0.0 to 1.0", + ), + } + + def run(self, ctx: ModuleContext) -> None: + if not ctx.matched_groups: + raise ValueError("composite requires at least one matched input group") + + command = self.resolve_command() + mode = ctx.params["mode"] + output_ext = ctx.params["output_ext"] + foreground_opacity = ctx.params["foreground_opacity"] + ctx.output_dir.mkdir(parents=True, exist_ok=True) + total = len(ctx.matched_groups) + + for index, group in enumerate(ctx.matched_groups, start=1): + if len(group) < 2: + raise ValueError( + "composite requires at least two input sources per image" + ) + + background, foreground = group[0], group[1] + self.log_image(ctx, index, total, foreground) + dst = ctx.output_dir / f"{foreground.stem}{output_ext}" + + background_args = [ + "(", + str(background), + "-define", + "png:color-type=2", + "-type", + "TrueColor", + "-colorspace", + "sRGB", + ")", + ] + if foreground_opacity < 1.0: + foreground_args = [ + "(", + str(foreground), + "-alpha", + "on", + "-channel", + "A", + "-evaluate", + "multiply", + str(foreground_opacity), + "+channel", + ")", + ] + else: + foreground_args = [str(foreground)] + + cmd = [ + command, + *background_args, + *foreground_args, + "-compose", + mode, + "-composite", + str(dst), + ] + run_command(cmd) diff --git a/imagepipeline/modules/crop_square.py b/imagepipeline/modules/crop_square.py new file mode 100644 index 0000000..d410199 --- /dev/null +++ b/imagepipeline/modules/crop_square.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from pathlib import Path + +from imagepipeline.core.context import ModuleContext +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + + +@register +class CropSquareModule(SubprocessModule): + name = "crop_square" + description = "Center-crop images to the largest possible square" + command_candidates = ("magick", "convert") + + def run(self, ctx: ModuleContext) -> None: + command = self.resolve_command() + ctx.output_dir.mkdir(parents=True, exist_ok=True) + total = len(ctx.input_paths) + + for index, src in enumerate(ctx.input_paths, start=1): + self.log_image(ctx, index, total, src) + dst = ctx.output_dir / src.name + size, left, top = self._center_square_crop(src, command) + run_command( + [ + command, + str(src), + "-auto-orient", + "-crop", + f"{size}x{size}+{left}+{top}", + "+repage", + str(dst), + ] + ) + + @staticmethod + def _center_square_crop(src: Path, command: str) -> tuple[int, int, int]: + result = run_command([command, "-format", "%w %h", str(src), "info:"]) + width, height = map(int, result.stdout.strip().split()) + size = min(width, height) + left = (width - size) // 2 + top = (height - size) // 2 + return size, left, top diff --git a/imagepipeline/modules/darktable_style.py b/imagepipeline/modules/darktable_style.py new file mode 100644 index 0000000..79ed33e --- /dev/null +++ b/imagepipeline/modules/darktable_style.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +from pathlib import Path + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + +JPEG_EXTENSIONS = {".jpg", ".jpeg"} +PNG_EXTENSIONS = {".png"} +SUPPORTED_EXTENSIONS = JPEG_EXTENSIONS | PNG_EXTENSIONS + + +def _normalize_format(ext: str) -> str: + ext = ext.lower().lstrip(".") + if ext in {"jpg", "jpeg"}: + return "jpeg" + if ext == "png": + return "png" + raise ValueError( + f"Unsupported output format {ext!r}; darktable_style supports jpeg and png only" + ) + + +def export_conf_options(format_name: str) -> list[str]: + if format_name == "jpeg": + return ["plugins/imageio/format/jpeg/quality=90"] + if format_name == "png": + return [ + "plugins/imageio/format/png/bpp=8", + "plugins/imageio/format/png/compression=5", + ] + raise ValueError(f"Unsupported format {format_name!r}") + + +def resolve_export_format(src: Path, out_ext: str) -> tuple[str, list[str]]: + ext = out_ext or src.suffix.lstrip(".") + format_name = _normalize_format(ext) + return format_name, export_conf_options(format_name) + + +@register +class DarktableStyleModule(SubprocessModule): + name = "darktable_style" + description = "Apply a darktable style via darktable-cli (JPEG/PNG export)" + command_candidates = ("darktable-cli",) + default_timeout = 600.0 + supported_input_formats = (".jpg", ".jpeg", ".png") + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "style": Param( + "string", + required=True, + help='darktable style name (e.g. "Watermark F12.rocks")', + ), + "style_overwrite": Param( + "bool", + default=True, + help="Pass --style-overwrite to darktable-cli", + ), + "out_ext": Param( + "string", + default="", + help="Output format (jpeg or png); empty keeps input format", + ), + "config_dir": Param( + "path", + default=Path.home() / ".config" / "darktable", + help="darktable config directory (required when applying styles)", + ), + } + + def run(self, ctx: ModuleContext) -> None: + ctx.output_dir.mkdir(parents=True, exist_ok=True) + style = ctx.params["style"] + style_overwrite = ctx.params["style_overwrite"] + out_ext = ctx.params["out_ext"] + config_dir = Path(ctx.params["config_dir"]) + total = len(ctx.input_paths) + + for index, src in enumerate(ctx.input_paths, start=1): + if src.suffix.lower() not in SUPPORTED_EXTENSIONS: + raise ValueError( + f"Unsupported input format {src.suffix!r} for {src.name}; " + "darktable_style supports .jpg, .jpeg, and .png only" + ) + self.log_image(ctx, index, total, src) + format_name, conf_options = resolve_export_format(src, out_ext) + cmd = [ + "darktable-cli", + str(src), + str(ctx.output_dir), + "--style", + style, + "--out-ext", + format_name, + "--core", + "--configdir", + str(config_dir), + ] + for conf in conf_options: + cmd.extend(["--conf", conf]) + if style_overwrite: + cmd.append("--style-overwrite") + run_command(cmd, timeout=self.default_timeout) diff --git a/imagepipeline/modules/gmic.py b/imagepipeline/modules/gmic.py new file mode 100644 index 0000000..444c91b --- /dev/null +++ b/imagepipeline/modules/gmic.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + + +@register +class GmicModule(SubprocessModule): + name = "gmic" + description = "Run a G'MIC command on each input image" + command_candidates = ("gmic",) + default_timeout = 300.0 + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "command": Param( + "string", + required=True, + help="G'MIC command(s) applied before -output (e.g. '-fx_drop_shadow3d ...')", + ), + } + + def run(self, ctx: ModuleContext) -> None: + ctx.output_dir.mkdir(parents=True, exist_ok=True) + gmic_command = ctx.params["command"] + total = len(ctx.input_paths) + + for index, src in enumerate(ctx.input_paths, start=1): + self.log_image(ctx, index, total, src) + dst = ctx.output_dir / src.name + cmd = ["gmic", str(src), gmic_command, "-output", str(dst)] + run_command(cmd, timeout=self.default_timeout) diff --git a/imagepipeline/modules/gmic_grayscale.py b/imagepipeline/modules/gmic_grayscale.py new file mode 100644 index 0000000..7fdf210 --- /dev/null +++ b/imagepipeline/modules/gmic_grayscale.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + + +@register +class GmicGrayscale(SubprocessModule): + name = "gmic_grayscale" + description = "Convert images to grayscale using G'MIC" + command_candidates = ("gmic",) + default_timeout = 300.0 + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "command": Param( + "string", + default="-to_gray", + help="G'MIC command(s) applied before -output", + ), + } + + def run(self, ctx: ModuleContext) -> None: + ctx.output_dir.mkdir(parents=True, exist_ok=True) + gmic_command = ctx.params["command"] + total = len(ctx.input_paths) + + for index, src in enumerate(ctx.input_paths, start=1): + self.log_image(ctx, index, total, src) + dst = ctx.output_dir / src.name + cmd = ["gmic", str(src), gmic_command, "-output", str(dst)] + run_command(cmd, timeout=self.default_timeout) diff --git a/imagepipeline/modules/imagemagick_fill.py b/imagepipeline/modules/imagemagick_fill.py new file mode 100644 index 0000000..a237d2a --- /dev/null +++ b/imagepipeline/modules/imagemagick_fill.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +from pathlib import Path + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + + +def normalize_color(color: str) -> str: + color = color.strip() + if not color: + raise ValueError("Color must not be empty") + if color.startswith("#"): + return color + if color.replace(".", "", 1).isdigit(): + return color + return f"#{color}" + + +def build_fill_arguments( + width: int, + height: int, + *, + color1: str, + color2: str | None, + gradient: bool, + radial: bool, + angle: float | None, +) -> list[str]: + c1 = normalize_color(color1) + size = f"{width}x{height}" + + if not gradient: + return ["-size", size, f"xc:{c1}"] + + c2 = normalize_color(color2 or color1) + if radial: + return ["-size", size, f"radial-gradient:{c1}-{c2}"] + + args = ["-size", size] + if angle is not None: + args.extend(["-define", f"gradient:angle={angle}"]) + args.append(f"gradient:{c1}-{c2}") + return args + + +@register +class ImageMagickFillModule(SubprocessModule): + name = "imagemagick_fill" + description = ( + "Create solid-color or gradient images sized to match each input image" + ) + command_candidates = ("magick", "convert") + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "color1": Param( + "string", + required=True, + help="Primary color (hex, e.g. #d7fd00, or ImageMagick color name)", + ), + "color2": Param( + "string", + default="", + help="Second gradient color; ignored for solid fills", + ), + "gradient": Param( + "bool", + default=False, + help="Create a gradient instead of a solid fill", + ), + "radial": Param( + "bool", + default=False, + help="Use a radial gradient (linear when false)", + ), + "angle": Param( + "float", + default=None, + help="Linear gradient angle in degrees (ignored for radial/solid)", + ), + } + + def run(self, ctx: ModuleContext) -> None: + command = self.resolve_command() + color1 = ctx.params["color1"] + color2 = ctx.params["color2"] or None + gradient = ctx.params["gradient"] + radial = ctx.params["radial"] + angle = ctx.params["angle"] + ctx.output_dir.mkdir(parents=True, exist_ok=True) + total = len(ctx.input_paths) + + for index, src in enumerate(ctx.input_paths, start=1): + self.log_image(ctx, index, total, src) + width, height = self._image_size(command, src) + fill_args = build_fill_arguments( + width, + height, + color1=color1, + color2=color2, + gradient=gradient, + radial=radial, + angle=angle, + ) + dst = ctx.output_dir / src.name + run_command([command, *fill_args, str(dst)]) + + @staticmethod + def _image_size(command: str, src: Path) -> tuple[int, int]: + result = run_command([command, "-format", "%w %h", str(src), "info:"]) + width, height = map(int, result.stdout.strip().split()) + return width, height diff --git a/imagepipeline/modules/imagemagick_grayscale.py b/imagepipeline/modules/imagemagick_grayscale.py new file mode 100644 index 0000000..aec0e75 --- /dev/null +++ b/imagepipeline/modules/imagemagick_grayscale.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from pathlib import Path + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + + +@register +class ImageMagickGrayscale(SubprocessModule): + name = "imagemagick_grayscale" + description = "Convert images to grayscale using ImageMagick" + command_candidates = ("magick", "convert") + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "colorspace": Param( + "string", + default="Gray", + help="ImageMagick -colorspace value", + ), + } + + def run(self, ctx: ModuleContext) -> None: + command = self.resolve_command() + colorspace = ctx.params["colorspace"] + ctx.output_dir.mkdir(parents=True, exist_ok=True) + total = len(ctx.input_paths) + + for index, src in enumerate(ctx.input_paths, start=1): + self.log_image(ctx, index, total, src) + dst = ctx.output_dir / src.name + if command == "magick": + cmd = [command, str(src), "-colorspace", colorspace, str(dst)] + else: + cmd = [command, str(src), "-colorspace", colorspace, str(dst)] + run_command(cmd) diff --git a/imagepipeline/modules/imagemagick_scale_crop.py b/imagepipeline/modules/imagemagick_scale_crop.py new file mode 100644 index 0000000..7982621 --- /dev/null +++ b/imagepipeline/modules/imagemagick_scale_crop.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from pathlib import Path + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + + +@register +class ImageMagickScaleCropModule(SubprocessModule): + name = "imagemagick_scale_crop" + description = ( + "Scale an image then center-crop back to its original dimensions" + ) + command_candidates = ("magick", "convert") + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "scale": Param( + "float", + default=1.05, + help="Uniform scale factor before center crop (1.05 = 5% larger)", + ), + } + + def run(self, ctx: ModuleContext) -> None: + command = self.resolve_command() + scale = ctx.params["scale"] + ctx.output_dir.mkdir(parents=True, exist_ok=True) + total = len(ctx.input_paths) + + for index, src in enumerate(ctx.input_paths, start=1): + self.log_image(ctx, index, total, src) + width, height = self._image_size(command, src) + dst = ctx.output_dir / src.name + scaled_w = max(1, round(width * scale)) + scaled_h = max(1, round(height * scale)) + run_command( + [ + command, + str(src), + "-resize", + f"{scaled_w}x{scaled_h}!", + "-gravity", + "Center", + "-crop", + f"{width}x{height}+0+0", + "+repage", + str(dst), + ] + ) + + @staticmethod + def _image_size(command: str, src: Path) -> tuple[int, int]: + result = run_command([command, "-format", "%w %h", str(src), "info:"]) + width, height = map(int, result.stdout.strip().split()) + return width, height diff --git a/imagepipeline/modules/openrouter_edit.py b/imagepipeline/modules/openrouter_edit.py new file mode 100644 index 0000000..5196286 --- /dev/null +++ b/imagepipeline/modules/openrouter_edit.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +import base64 +import io +import json +import os +import urllib.error +import urllib.request +from pathlib import Path + +from imagepipeline.ai.imaging import load_pil_rgb +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.exceptions import DependencyError +from imagepipeline.core.params import Param +from imagepipeline.modules.ai_base import AIModule +from imagepipeline.modules.registry import register + +OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" + + +@register +class OpenRouterEditModule(AIModule): + name = "openrouter_edit" + description = "Generative image editing via OpenRouter (cloud)" + + @classmethod + def parameters(cls) -> dict[str, Param]: + params = dict(super().parameters()) + params.update( + { + "prompt": Param( + "string", + required=True, + help="Edit instruction for the model", + ), + "model": Param( + "string", + default="black-forest-labs/flux.2-klein-4b", + help="OpenRouter model id with image output", + ), + "strength": Param( + "float", + default=0.3, + help="Edit strength (image_config.strength where supported)", + ), + "api_key_env": Param( + "string", + default="OPENROUTER_API_KEY", + help="Environment variable containing the API key", + ), + } + ) + return params + + @classmethod + def check_dependencies(cls) -> None: + if not os.environ.get("OPENROUTER_API_KEY"): + raise DependencyError( + "OPENROUTER_API_KEY environment variable is not set" + ) + + def run(self, ctx: ModuleContext) -> None: + api_key_env = ctx.params["api_key_env"] + api_key = os.environ.get(api_key_env) + if not api_key: + raise DependencyError(f"Environment variable {api_key_env!r} is not set") + + prompt = ctx.params["prompt"] + model = ctx.params["model"] + strength = ctx.params["strength"] + + def process(src: Path, dst: Path, index: int, total: int) -> None: + image = load_pil_rgb(src) + megapixels = (image.size[0] * image.size[1]) / 1_000_000 + if ctx.logger is not None: + ctx.logger.info( + f" OpenRouter request [{index}/{total}]: model={model!r}, " + f"~{megapixels:.1f} MP (cost varies by model)" + ) + payload = self._build_payload(image, prompt, model, strength) + response = self._post(api_key, payload) + result_bytes = self._extract_image_bytes(response) + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_bytes(result_bytes) + + self.iter_input_images(ctx, process) + + @staticmethod + def _build_payload(image, prompt: str, model: str, strength: float) -> dict: + buffer = io.BytesIO() + image.save(buffer, format="JPEG", quality=92) + encoded = base64.b64encode(buffer.getvalue()).decode("ascii") + data_url = f"data:image/jpeg;base64,{encoded}" + payload = { + "model": model, + "modalities": ["image"], + "messages": [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + {"type": "image_url", "image_url": {"url": data_url}}, + ], + } + ], + } + if strength is not None: + payload["image_config"] = {"strength": strength} + return payload + + @staticmethod + def _post(api_key: str, payload: dict) -> dict: + body = json.dumps(payload).encode("utf-8") + request = urllib.request.Request( + OPENROUTER_URL, + data=body, + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + "HTTP-Referer": "https://github.com/froxxxy/imagepipeline", + "X-Title": "imagepipeline", + }, + method="POST", + ) + try: + with urllib.request.urlopen(request, timeout=600) as response: + return json.loads(response.read().decode("utf-8")) + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace") + raise RuntimeError( + f"OpenRouter API error ({exc.code}): {detail}" + ) from exc + + @staticmethod + def _extract_image_bytes(response: dict) -> bytes: + choices = response.get("choices") or [] + if not choices: + raise RuntimeError("OpenRouter response contained no choices") + message = choices[0].get("message") or {} + images = message.get("images") or [] + if not images: + raise RuntimeError("OpenRouter response contained no images") + url = images[0].get("image_url", {}).get("url", "") + if url.startswith("data:"): + _, encoded = url.split(",", 1) + return base64.b64decode(encoded) + if url.startswith("http://") or url.startswith("https://"): + with urllib.request.urlopen(url, timeout=120) as image_response: + return image_response.read() + raise RuntimeError(f"Unsupported image URL in OpenRouter response: {url!r}") diff --git a/imagepipeline/modules/registry.py b/imagepipeline/modules/registry.py new file mode 100644 index 0000000..a3b2890 --- /dev/null +++ b/imagepipeline/modules/registry.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from imagepipeline.modules.base import BaseModule + +_REGISTRY: dict[str, type[BaseModule]] = {} + + +def register(cls: type[BaseModule]) -> type[BaseModule]: + if not getattr(cls, "name", None): + raise ValueError(f"Module {cls.__name__} must define 'name'") + existing = _REGISTRY.get(cls.name) + if existing is not None: + if existing is cls: + return cls + raise ValueError(f"Module name already registered: {cls.name}") + _REGISTRY[cls.name] = cls + return cls + + +def unregister(name: str) -> None: + _REGISTRY.pop(name, None) + + +def get_module(name: str) -> type[BaseModule]: + if name not in _REGISTRY: + available = ", ".join(sorted(_REGISTRY)) or "(none)" + raise KeyError(f"Unknown module '{name}'. Available: {available}") + return _REGISTRY[name] + + +def list_modules() -> list[str]: + return sorted(_REGISTRY) + + +def clear_registry() -> None: + """Clear registry — intended for tests only.""" + _REGISTRY.clear() diff --git a/imagepipeline/modules/rembg.py b/imagepipeline/modules/rembg.py new file mode 100644 index 0000000..1e543d4 --- /dev/null +++ b/imagepipeline/modules/rembg.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.params import Param +from imagepipeline.modules.base import SubprocessModule +from imagepipeline.modules.registry import register +from imagepipeline.utils.subprocess import run_command + + +@register +class RembgModule(SubprocessModule): + name = "rembg" + description = "Remove image background using rembg" + command_candidates = ("rembg",) + default_timeout = 600.0 + supported_input_formats = (".jpg", ".jpeg", ".png", ".webp", ".tif", ".tiff", ".bmp") + + @classmethod + def parameters(cls) -> dict[str, Param]: + return { + "model": Param( + "string", + default="birefnet-general", + help="rembg model name (-m)", + ), + "alpha_matting": Param( + "bool", + default=True, + help="Enable alpha matting (-a)", + ), + } + + def run(self, ctx: ModuleContext) -> None: + ctx.output_dir.mkdir(parents=True, exist_ok=True) + model = ctx.params["model"] + alpha_matting = ctx.params["alpha_matting"] + total = len(ctx.input_paths) + + for index, src in enumerate(ctx.input_paths, start=1): + self.log_image(ctx, index, total, src) + dst = ctx.output_dir / f"{src.stem}.png" + cmd = ["rembg", "i", "-m", model] + if alpha_matting: + cmd.append("-a") + cmd.extend([str(src), str(dst)]) + run_command(cmd, timeout=self.default_timeout) diff --git a/imagepipeline/utils/__init__.py b/imagepipeline/utils/__init__.py new file mode 100644 index 0000000..9453f12 --- /dev/null +++ b/imagepipeline/utils/__init__.py @@ -0,0 +1 @@ +"""Shared utilities.""" diff --git a/imagepipeline/utils/files.py b/imagepipeline/utils/files.py new file mode 100644 index 0000000..4750f3d --- /dev/null +++ b/imagepipeline/utils/files.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from pathlib import Path + +IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".webp", ".bmp", ".gif"} + + +def is_image(path: Path) -> bool: + return path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS + + +def list_images(directory: Path) -> list[Path]: + if not directory.is_dir(): + raise FileNotFoundError(f"Input directory not found: {directory}") + images = sorted(p for p in directory.iterdir() if is_image(p)) + if not images: + raise ValueError(f"No image files found in {directory}") + return images + + +def stem_key(path: Path) -> str: + return path.stem.lower() + + +def match_by_stem(sources: list[list[Path]]) -> list[list[Path]]: + """Match image paths across multiple source lists by filename stem.""" + if not sources: + return [] + if len(sources) == 1: + return [[p] for p in sources[0]] + + key_maps: list[dict[str, Path]] = [] + for source in sources: + mapping: dict[str, Path] = {} + for path in source: + key = stem_key(path) + if key in mapping: + raise ValueError( + f"Duplicate stem '{key}' in {path.parent}: " + f"{mapping[key].name} and {path.name}" + ) + mapping[key] = path + key_maps.append(mapping) + + base_keys = set(key_maps[0]) + for idx, mapping in enumerate(key_maps[1:], start=2): + keys = set(mapping) + missing = base_keys - keys + extra = keys - base_keys + if missing or extra: + parts = [f"source 1 has {len(base_keys)} image(s)"] + if missing: + sample = ", ".join(sorted(missing)[:5]) + parts.append(f"source {idx} missing: {sample}") + if extra: + sample = ", ".join(sorted(extra)[:5]) + parts.append(f"source {idx} extra: {sample}") + raise ValueError("Cannot match images by stem: " + "; ".join(parts)) + + return [[mapping[key] for mapping in key_maps] for key in sorted(base_keys)] + + +def flatten_matched(groups: list[list[Path]]) -> list[Path]: + """For single-input steps, return the first path from each group.""" + return [group[0] for group in groups] diff --git a/imagepipeline/utils/subprocess.py b/imagepipeline/utils/subprocess.py new file mode 100644 index 0000000..471c806 --- /dev/null +++ b/imagepipeline/utils/subprocess.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import shutil +import subprocess +from pathlib import Path + +from imagepipeline.core.exceptions import DependencyError + + +def command_exists(name: str) -> bool: + return shutil.which(name) is not None + + +def run_command( + cmd: list[str], + *, + timeout: float | None = None, + cwd: Path | None = None, +) -> subprocess.CompletedProcess[str]: + try: + return subprocess.run( + cmd, + check=True, + capture_output=True, + text=True, + timeout=timeout, + cwd=str(cwd) if cwd else None, + ) + except subprocess.CalledProcessError as exc: + stderr = (exc.stderr or "").strip() + stdout = (exc.stdout or "").strip() + detail = stderr or stdout or str(exc) + raise RuntimeError( + f"Command failed ({exc.returncode}): {' '.join(cmd)}\n{detail}" + ) from exc + except subprocess.TimeoutExpired as exc: + raise RuntimeError( + f"Command timed out after {timeout}s: {' '.join(cmd)}" + ) from exc + + +def require_command(*names: str) -> str: + for name in names: + if command_exists(name): + return name + options = " or ".join(names) + raise DependencyError(f"Required command not found on PATH: {options}") diff --git a/pipelines/example_ai.py b/pipelines/example_ai.py new file mode 100644 index 0000000..d759007 --- /dev/null +++ b/pipelines/example_ai.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +"""Example pipeline: OpenRouter photo enhancement (Darktable export → cloud edit).""" + +import os +from pathlib import Path + +from imagepipeline import Pipeline + +REPO_ROOT = Path(__file__).resolve().parents[1] +ENV_FILE = REPO_ROOT / ".env" + +# Change this to your Darktable export folder. +INPUT = Path("/home/frank/tmp/aiedittest") + +# Optional: where run folders are created (defaults to current working directory). +OUTPUT_BASE = Path.home() / "pipeline_output" + +OPENROUTER_PROMPT = ( + "Subtle photo enhancement only. Improve exposure, color, and skin tones. " + "Sharpen where the blur seems to be unintended." + "Keep the same people, poses, background, and composition. " + "Natural, realistic look — no oversaturation or plastic skin." + "Do not change image ratio, do not crop or enlarge the image, just improve the photo." + "In group photos, improve the photo of the group, not the individual photos." + "Also in group photos make faces same brightness and remove any glare or reflections." + "Think of it like an amateur developed the RAW file in Lightroom and you are the senior who makes final touches to make it look professional." +) + +# Test cheap with Klein; switch to flux.2-pro or flux.2-max for final exports. +OPENROUTER_MODEL = "x-ai/grok-imagine-image-quality" + + +def _load_env_file(path: Path) -> None: + if not path.is_file(): + return + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#"): + continue + key, sep, value = line.partition("=") + if not sep: + continue + key = key.strip() + value = value.strip() + if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'": + value = value[1:-1] + os.environ.setdefault(key, value) + + +def main() -> None: + _load_env_file(ENV_FILE) + + with Pipeline( + name="example_ai", + input_dir=INPUT, + output_base=OUTPUT_BASE, + ) as p: + p.step( + "openrouter_edit", + inputs="input", + prompt=OPENROUTER_PROMPT, + model=OPENROUTER_MODEL, + max_edge=2048, + ) + # Local CPU fallback (usually worse on already-graded Darktable exports): + # exp = p.step("ai_exposure", inputs="input", max_edge=2048, strength=0.5) + # p.step("ai_tone_map", inputs=exp, strength=0.5) + output_root = p.run() + + print(f"Pipeline finished. Output: {output_root}") + + +if __name__ == "__main__": + main() diff --git a/pipelines/example_grayscale.py b/pipelines/example_grayscale.py new file mode 100644 index 0000000..edc0499 --- /dev/null +++ b/pipelines/example_grayscale.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +"""Example pipeline: convert all exported images to grayscale.""" + +from pathlib import Path + +from imagepipeline import Pipeline + +# Change this to your Darktable export folder. +INPUT = Path("/path/to/darktable/export") + +# Optional: where run folders are created (defaults to current working directory). +OUTPUT_BASE = Path.home() / "pipeline_output" + + +def main() -> None: + with Pipeline( + name="example_grayscale", + input_dir=INPUT, + output_base=OUTPUT_BASE, + ) as p: + gray = p.step("imagemagick_grayscale", inputs="input") + # Chain another step on the result: + # p.step("imagemagick_grayscale", inputs=gray, colorspace="Gray") + output_root = p.run() + + print(f"Pipeline finished. Output: {output_root}") + + +if __name__ == "__main__": + main() diff --git a/pipelines/pipeline_baxxter.py b/pipelines/pipeline_baxxter.py new file mode 100644 index 0000000..867b05e --- /dev/null +++ b/pipelines/pipeline_baxxter.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +"""Baxxter pipeline: rembg variants composited over backgrounds and originals.""" + +from pathlib import Path + +from imagepipeline import Pipeline + +INPUT = Path("/home/frank/pics/20260525_Shooting Baxxter Boys/darktable_exported") +OUTPUT_BASE = Path.home() / "pipeline_output" + +GRADIENT_COLOR1 = "#d7fd00ff" +GRADIENT_COLOR2 = "#fc0adeff" + +GMIC_STEREO = "-gcd_stereo_img 0,0,2.028,1,1.714,3.06,4,1,0" +GMIC_DROP_SHADOW = "-fx_drop_shadow3d 0,0,0,10,1,1,2,0.5,252,10,222,200,0" +GMIC_BWRECOLOR = ( + "-fx_bwrecolorize 0,0,0,0,0,1,0,2,252,10,222,255,215,253,0,255," + "158,137,189,255,224,191,228,255,215,253,0,255,255,255,255,255,255,255," + "255,255,215,253,0,255" +) +GMIC_GRADIENT_A = ( + '-fx_custom_gradient 0,0,0,1,2,1,0,128,100,100,2,0,1,0,"",1,0,215,253,0,255,' + "252,10,222,255,255,255,0,255,255,255,255,255,0,255,255,255,0,255,0,255,0,0," + "255,255,128,128,128,255,255,0,255,255,0,0,0,0" +) +GMIC_GRADIENT_B = ( + '-fx_custom_gradient 0,0,0,1,2,1,0,128,100,100,2,0,1,0,"",1,0,252,10,222,255,' + "215,253,0,255,255,255,0,255,255,255,255,255,0,255,255,255,0,255,0,255,0,0," + "255,255,128,128,128,255,255,0,255,255,0,0,0,0" +) +GMIC_JPR_SMOOTH = "-jpr_gradient_smooth 0,1.5" + + +def main() -> None: + with Pipeline( + name="baxxter", + input_dir=INPUT, + output_base=OUTPUT_BASE, + ) as p: + rembg_out = p.step("rembg", inputs="input") + + white_bg = p.step("imagemagick_fill", inputs="input", color1="#ffffff") + black_bg = p.step("imagemagick_fill", inputs="input", color1="#000000") + gradient_45_bg = p.step( + "imagemagick_fill", + inputs="input", + color1=GRADIENT_COLOR1, + color2=GRADIENT_COLOR2, + gradient=True, + angle=45, + ) + gradient_radial_bg = p.step( + "imagemagick_fill", + inputs="input", + color1=GRADIENT_COLOR1, + color2=GRADIENT_COLOR2, + gradient=True, + radial=True, + ) + grayscale = p.step("gmic_grayscale", inputs="input") + + rembg_stereo = p.step("gmic", inputs=rembg_out, command=GMIC_STEREO) + rembg_shadow = p.step("gmic", inputs=rembg_out, command=GMIC_DROP_SHADOW) + rembg_bwrecolor = p.step("gmic", inputs=rembg_out, command=GMIC_BWRECOLOR) + rembg_gradient_a = p.step("gmic", inputs=rembg_out, command=GMIC_GRADIENT_A) + rembg_gradient_b = p.step("gmic", inputs=rembg_out, command=GMIC_GRADIENT_B) + rembg_jpr_smooth = p.step("gmic", inputs=rembg_out, command=GMIC_JPR_SMOOTH) + rembg_jpr_smooth_sized = p.step( + "imagemagick_scale_crop", + inputs=rembg_jpr_smooth, + scale=1.05, + ) + + # combine: white background, rembg + p.step("composite", inputs=[white_bg, rembg_out]) + + # combine: black background, rembg + p.step("composite", inputs=[black_bg, rembg_out]) + + # combine: linear gradient background, rembg + p.step("composite", inputs=[gradient_45_bg, rembg_out]) + + # combine: radial gradient background, rembg + p.step("composite", inputs=[gradient_radial_bg, rembg_out]) + + # combine: original, rembg (stereo), rembg + stereo_mid = p.step("composite", inputs=["input", rembg_stereo]) + p.step("composite", inputs=[stereo_mid, rembg_out]) + + # combine: original, rembg (drop shadow), rembg + shadow_mid = p.step("composite", inputs=["input", rembg_shadow]) + p.step("composite", inputs=[shadow_mid, rembg_out]) + + # combine: original, rembg (bw recolorize @ 50%), rembg + bw_mid = p.step( + "composite", + inputs=["input", rembg_bwrecolor], + foreground_opacity=0.5, + ) + p.step("composite", inputs=[bw_mid, rembg_out]) + + # combine: rembg (custom gradient A), rembg + p.step("composite", inputs=[rembg_gradient_a, rembg_out]) + + # combine: rembg (custom gradient B), rembg + p.step("composite", inputs=[rembg_gradient_b, rembg_out]) + + # combine: original, rembg (jpr smooth, scaled), rembg + smooth_mid = p.step("composite", inputs=["input", rembg_jpr_smooth_sized]) + p.step("composite", inputs=[smooth_mid, rembg_out]) + + # combine: original (grayscale), rembg + p.step("composite", inputs=[grayscale, rembg_out]) + + output_root = p.run() + + print(f"Pipeline finished. Output: {output_root}") + + +if __name__ == "__main__": + main() diff --git a/pipelines/pipeline_colorsplash_watermark_f12.py b/pipelines/pipeline_colorsplash_watermark_f12.py new file mode 100644 index 0000000..d710750 --- /dev/null +++ b/pipelines/pipeline_colorsplash_watermark_f12.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +"""Watermark pipeline: rembg + grayscale composite + darktable style.""" + +from pathlib import Path + +from imagepipeline import Pipeline + +# Darktable export folder. +INPUT = Path("/home/frank/pics/20260517_Albershausen Crusaders - Biberach Beavers/darktable_exported/png") + +# Where timestamped run folders are created. +OUTPUT_BASE = Path.home() / "pipeline_output" + +# darktable style name (must exist in ~/.config/darktable/styles/). +STYLE = "Watermark F12.rocks" + + +def main() -> None: + with Pipeline( + name="pipeline_watermark_f12", + input_dir=INPUT, + output_base=OUTPUT_BASE, + ) as p: + rembg_out = p.step("rembg", inputs="input") + grayscale = p.step("gmic_grayscale", inputs="input") + combined = p.step("composite", inputs=[grayscale, rembg_out]) + p.step("darktable_style", inputs=combined, style=STYLE) + output_root = p.run() + + print(f"Pipeline finished. Output: {output_root}") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..bbb0508 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,27 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "imagepipeline" +version = "0.1.0" +description = "Modular image processing pipeline framework" +readme = "README.md" +requires-python = ">=3.11" +license = { text = "MIT" } +authors = [{ name = "Frank" }] +dependencies = [] + +[project.optional-dependencies] +ai = ["numpy>=1.26", "Pillow>=10.0", "torch>=2.0"] +dev = ["pytest>=8.0"] + +[project.scripts] +imagepipeline = "imagepipeline.cli:main" + +[tool.setuptools.packages.find] +where = ["."] +include = ["imagepipeline*"] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..8c8fa42 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,56 @@ +import struct +import zlib +from pathlib import Path + +import pytest + +import imagepipeline.modules # noqa: F401 + + +def make_png( + path: Path, + width: int = 2, + height: int = 2, + rgb: tuple[int, int, int] = (200, 100, 50), +) -> None: + """Write a minimal valid PNG without external dependencies.""" + r, g, b = rgb + + def chunk(tag: bytes, data: bytes) -> bytes: + crc = zlib.crc32(tag + data) & 0xFFFFFFFF + return struct.pack(">I", len(data)) + tag + data + struct.pack(">I", crc) + + raw = b"".join( + b"\x00" + bytes([r, g, b] * width) + for _ in range(height) + ) + compressed = zlib.compress(raw, 9) + ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0) + png = ( + b"\x89PNG\r\n\x1a\n" + + chunk(b"IHDR", ihdr) + + chunk(b"IDAT", compressed) + + chunk(b"IEND", b"") + ) + path.write_bytes(png) + + +@pytest.fixture(autouse=True) +def _ensure_builtin_modules() -> None: + import imagepipeline.modules # noqa: F401 + + +@pytest.fixture +def input_dir(tmp_path: Path) -> Path: + directory = tmp_path / "input" + directory.mkdir() + make_png(directory / "photo_a.png") + make_png(directory / "photo_b.png", rgb=(10, 20, 30)) + return directory + + +@pytest.fixture +def output_base(tmp_path: Path) -> Path: + base = tmp_path / "output" + base.mkdir() + return base diff --git a/tests/test_ai_modules.py b/tests/test_ai_modules.py new file mode 100644 index 0000000..da1c9ec --- /dev/null +++ b/tests/test_ai_modules.py @@ -0,0 +1,222 @@ +from __future__ import annotations + +import shutil +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from imagepipeline.core.context import ModuleContext +from imagepipeline.core.exceptions import DependencyError +from imagepipeline.modules.ai_exposure import AIExposureModule +from imagepipeline.modules.ai_tone_map import AIToneMapModule +from imagepipeline.modules.comfy_flux_edit import ComfyFluxEditModule +from imagepipeline.modules.openrouter_edit import OpenRouterEditModule +from imagepipeline.modules.registry import get_module, list_modules +from tests.conftest import make_png + +try: + import numpy # noqa: F401 + + has_numpy = True +except ImportError: + has_numpy = False + +try: + import torch # noqa: F401 + + has_torch = True +except ImportError: + has_torch = False + +has_magick = bool(shutil.which("magick") or shutil.which("convert")) + + +class TestAIModuleRegistration: + def test_ai_modules_registered(self) -> None: + names = list_modules() + for name in ( + "ai_exposure", + "ai_tone_map", + "openrouter_edit", + "comfy_flux_edit", + ): + assert name in names + + def test_get_ai_modules(self) -> None: + assert get_module("ai_exposure") is AIExposureModule + assert get_module("ai_tone_map") is AIToneMapModule + assert get_module("openrouter_edit") is OpenRouterEditModule + assert get_module("comfy_flux_edit") is ComfyFluxEditModule + + +class TestAIParameters: + def test_ai_exposure_defaults(self) -> None: + params = AIExposureModule.validate_module_params({}) + assert params["skip_existing"] is True + assert params["max_edge"] == 2048 + assert params["device"] == "cpu" + assert params["strength"] == 1.0 + + def test_ai_tone_map_defaults(self) -> None: + params = AIToneMapModule.validate_module_params({}) + assert params["checkpoint"] == "" + assert params["strength"] == 1.0 + assert params["net_input_size"] == 256 + + def test_openrouter_requires_prompt(self) -> None: + with pytest.raises(ValueError, match="required"): + OpenRouterEditModule.validate_module_params({}) + + def test_openrouter_defaults(self) -> None: + params = OpenRouterEditModule.validate_module_params({"prompt": "brighten shadows"}) + assert params["model"] == "black-forest-labs/flux.2-klein-4b" + assert params["strength"] == 0.3 + assert params["api_key_env"] == "OPENROUTER_API_KEY" + + def test_comfy_requires_prompt(self) -> None: + with pytest.raises(ValueError, match="required"): + ComfyFluxEditModule.validate_module_params({}) + + +class TestAIToneMapFallback: + @pytest.mark.skipif(not has_numpy, reason="numpy not installed") + def test_clahe_fallback_writes_output(self, tmp_path: Path) -> None: + src = tmp_path / "photo.png" + make_png(src, width=8, height=8, rgb=(40, 80, 120)) + output_dir = tmp_path / "out" + output_dir.mkdir() + ctx = ModuleContext( + input_paths=[src], + matched_groups=[], + output_dir=output_dir, + params=AIToneMapModule.validate_module_params({"strength": 1.0}), + pipeline_output_root=tmp_path, + step_id="ai_tone_map_01", + logger=None, + ) + AIToneMapModule().run(ctx) + dst = output_dir / "photo.png" + assert dst.is_file() + assert dst.stat().st_size > 0 + + +class TestSkipExisting: + @pytest.mark.skipif(not has_numpy, reason="numpy not installed") + def test_second_run_skips_existing_outputs(self, tmp_path: Path, capsys) -> None: + src = tmp_path / "photo.png" + make_png(src, width=8, height=8) + output_dir = tmp_path / "out" + output_dir.mkdir() + params = AIToneMapModule.validate_module_params({"checkpoint": ""}) + from imagepipeline.core.log import PipelineLogger + + logger = PipelineLogger(verbose=True) + ctx = ModuleContext( + input_paths=[src], + matched_groups=[], + output_dir=output_dir, + params=params, + pipeline_output_root=tmp_path, + step_id="ai_tone_map_01", + logger=logger, + ) + module = AIToneMapModule() + module.run(ctx) + assert (output_dir / "photo.png").is_file() + + module.run(ctx) + output = capsys.readouterr().out + assert "Skipped module ai_tone_map" in output + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +class TestAIExposure: + def test_ai_exposure_processes_image(self, tmp_path: Path) -> None: + src = tmp_path / "photo.png" + make_png(src, width=4, height=4, rgb=(30, 60, 90)) + output_dir = tmp_path / "out" + output_dir.mkdir() + + mock_model = MagicMock() + mock_device = MagicMock() + + def fake_enhance(_model, image, *, device, strength): + return image + + with ( + patch.object(AIExposureModule, "_get_model", return_value=(mock_model, mock_device)), + patch( + "imagepipeline.ai.zero_dce.enhance_image", + side_effect=fake_enhance, + ), + ): + ctx = ModuleContext( + input_paths=[src], + matched_groups=[], + output_dir=output_dir, + params=AIExposureModule.validate_module_params({"max_edge": 0}), + pipeline_output_root=tmp_path, + step_id="ai_exposure_01", + logger=None, + ) + AIExposureModule().run(ctx) + + assert (output_dir / "photo.png").is_file() + + +class TestComfyFluxEdit: + def test_server_unreachable_raises(self, tmp_path: Path) -> None: + src = tmp_path / "photo.png" + make_png(src) + output_dir = tmp_path / "out" + output_dir.mkdir() + workflow = tmp_path / "workflow.json" + workflow.write_text('{"1": {"class_type": "LoadImage", "inputs": {"image": "x"}}}') + + ctx = ModuleContext( + input_paths=[src], + matched_groups=[], + output_dir=output_dir, + params=ComfyFluxEditModule.validate_module_params( + { + "prompt": "test", + "server_url": "http://127.0.0.1:1", + "workflow_path": workflow, + } + ), + pipeline_output_root=tmp_path, + step_id="comfy_flux_edit_01", + logger=None, + ) + with pytest.raises(DependencyError, match="not reachable"): + ComfyFluxEditModule().run(ctx) + + def test_missing_workflow_raises(self, tmp_path: Path) -> None: + src = tmp_path / "photo.png" + make_png(src) + output_dir = tmp_path / "out" + output_dir.mkdir() + ctx = ModuleContext( + input_paths=[src], + matched_groups=[], + output_dir=output_dir, + params=ComfyFluxEditModule.validate_module_params( + { + "prompt": "test", + "workflow_path": tmp_path / "missing.json", + } + ), + pipeline_output_root=tmp_path, + step_id="comfy_flux_edit_01", + logger=None, + ) + with pytest.raises(DependencyError, match="workflow not found"): + ComfyFluxEditModule().run(ctx) + + +class TestOpenRouterDependencies: + def test_missing_api_key_raises(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("OPENROUTER_API_KEY", raising=False) + with pytest.raises(DependencyError, match="OPENROUTER_API_KEY"): + OpenRouterEditModule.check_dependencies() diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..5a60bba --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +import json +import shutil +from pathlib import Path + +import pytest + +from imagepipeline.core.exceptions import CycleError, ValidationError +from imagepipeline.core.manifest import PipelineManifest, write_manifest +from imagepipeline.core.params import Param, validate_params +from imagepipeline.core.pipeline import Pipeline +from imagepipeline.core.runner import PipelineRunner +from imagepipeline.core.step import StepDefinition +from imagepipeline.modules.base import BaseModule +from imagepipeline.modules.registry import get_module, register, unregister +from imagepipeline.utils.files import list_images, match_by_stem +from tests.conftest import make_png + + +class TestParams: + def test_defaults_applied(self) -> None: + schema = {"colorspace": Param("string", default="Gray")} + result = validate_params(schema, {}) + assert result == {"colorspace": "Gray"} + + def test_unknown_param_rejected(self) -> None: + schema = {"a": Param("int", default=1)} + with pytest.raises(ValueError, match="Unknown parameters"): + validate_params(schema, {"b": 2}) + + def test_choices_enforced(self) -> None: + schema = {"mode": Param("string", choices=("a", "b"))} + with pytest.raises(ValueError, match="must be one of"): + validate_params(schema, {"mode": "c"}) + + +class TestFileMatching: + def test_match_by_stem_pairs_sources(self, tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + a.mkdir() + b.mkdir() + make_png(a / "img1.png") + make_png(a / "img2.png") + make_png(b / "img1.png") + make_png(b / "img2.png") + + groups = match_by_stem([list_images(a), list_images(b)]) + assert len(groups) == 2 + assert groups[0][0].parent == a + assert groups[0][1].parent == b + + def test_missing_stem_raises(self, tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + a.mkdir() + b.mkdir() + make_png(a / "only_a.png") + make_png(b / "only_b.png") + + with pytest.raises(ValueError, match="Cannot match images by stem"): + match_by_stem([list_images(a), list_images(b)]) + + +class TestPipelineRunner: + def test_cycle_detection(self, input_dir: Path, output_base: Path) -> None: + @register + class DummyModule(BaseModule): + name = "dummy_cycle" + + def run(self, ctx) -> None: + pass + + step_a = StepDefinition( + step_id="dummy_cycle_01", + module_name="dummy_cycle", + module=DummyModule, + input_refs=["dummy_cycle_02"], + params={}, + output_dir_name="dummy_cycle_01", + ) + step_b = StepDefinition( + step_id="dummy_cycle_02", + module_name="dummy_cycle", + module=DummyModule, + input_refs=["dummy_cycle_01"], + params={}, + output_dir_name="dummy_cycle_02", + ) + + runner = PipelineRunner( + name="cycle_test", + input_dir=input_dir, + output_base=output_base, + steps=[step_a, step_b], + ) + with pytest.raises(CycleError): + runner.run() + + unregister("dummy_cycle") + + def test_dag_execution_order(self, input_dir: Path, output_base: Path) -> None: + order: list[str] = [] + + @register + class OrderModule(BaseModule): + name = "order_tracker" + + def run(self, ctx) -> None: + order.append(ctx.step_id) + ctx.output_dir.mkdir(parents=True, exist_ok=True) + for src in ctx.input_paths: + shutil.copy2(src, ctx.output_dir / src.name) + + with Pipeline(name="order_test", input_dir=input_dir, output_base=output_base, verbose=False) as p: + step_b = p.step("order_tracker", inputs="input") + step_a = p.step("order_tracker", inputs=step_b) + p.run() + + assert order == ["order_tracker_01", "order_tracker_02"] + + unregister("order_tracker") + + def test_duplicate_module_numbering(self, input_dir: Path, output_base: Path) -> None: + @register + class NumberModule(BaseModule): + name = "number_tracker" + + def run(self, ctx) -> None: + ctx.output_dir.mkdir(parents=True, exist_ok=True) + for src in ctx.input_paths: + shutil.copy2(src, ctx.output_dir / src.name) + + with Pipeline(name="dup_test", input_dir=input_dir, output_base=output_base, verbose=False) as p: + first = p.step("number_tracker", inputs="input") + p.step("number_tracker", inputs=first) + root = p.run() + + assert (root / "number_tracker_01").is_dir() + assert (root / "number_tracker_02").is_dir() + + unregister("number_tracker") + + +class TestManifest: + def test_write_manifest(self, tmp_path: Path) -> None: + manifest = PipelineManifest( + name="test", + output_root=str(tmp_path), + input_dir="/in", + started_at="2026-01-01T00:00:00+00:00", + finished_at="2026-01-01T00:00:01+00:00", + ) + path = tmp_path / "pipeline_manifest.json" + write_manifest(path, manifest) + data = json.loads(path.read_text(encoding="utf-8")) + assert data["name"] == "test" + assert data["finished_at"] is not None + + +class TestPipelineValidation: + def test_empty_pipeline_rejected(self, input_dir: Path) -> None: + with Pipeline(name="empty", input_dir=input_dir, verbose=False) as p: + with pytest.raises(ValidationError, match="no steps"): + p.run() + + def test_unknown_module_rejected(self, input_dir: Path) -> None: + with Pipeline(name="bad", input_dir=input_dir, verbose=False) as p: + with pytest.raises(KeyError, match="Unknown module"): + p.step("does_not_exist", inputs="input") + + +has_imagemagick = bool(shutil.which("magick") or shutil.which("convert")) + + +class TestPipelineLogging: + @pytest.mark.skipif(not has_imagemagick, reason="ImageMagick not installed") + def test_verbose_output(self, input_dir: Path, output_base: Path, capsys) -> None: + with Pipeline( + name="log_test", + input_dir=input_dir, + output_base=output_base, + verbose=True, + ) as p: + p.step("imagemagick_grayscale", inputs="input") + p.run() + + output = capsys.readouterr().out + assert "Found 2 photo(s)" in output + assert "Step 1/1: imagemagick_grayscale_01 (imagemagick_grayscale)" in output + assert "Applying module imagemagick_grayscale to image [1/2]" in output + assert "Pipeline finished." in output + + @pytest.mark.skipif(not has_imagemagick, reason="ImageMagick not installed") + def test_quiet_output(self, input_dir: Path, output_base: Path, capsys) -> None: + with Pipeline( + name="quiet_test", + input_dir=input_dir, + output_base=output_base, + verbose=False, + ) as p: + p.step("imagemagick_grayscale", inputs="input") + p.run() + + assert capsys.readouterr().out == "" + + +@pytest.mark.skipif(not has_imagemagick, reason="ImageMagick not installed") +class TestImageMagickGrayscaleIntegration: + def test_grayscale_pipeline(self, input_dir: Path, output_base: Path) -> None: + with Pipeline( + name="gray_integration", + input_dir=input_dir, + output_base=output_base, + verbose=False, + ) as p: + p.step("imagemagick_grayscale", inputs="input") + root = p.run() + + manifest_path = root / "pipeline_manifest.json" + assert manifest_path.is_file() + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + assert len(manifest["steps"]) == 1 + assert manifest["steps"][0]["module"] == "imagemagick_grayscale" + + out_dir = root / "imagemagick_grayscale_01" + outputs = list_images(out_dir) + assert len(outputs) == 2 + + module = get_module("imagemagick_grayscale") + module.check_dependencies() diff --git a/tests/test_workflow_modules.py b/tests/test_workflow_modules.py new file mode 100644 index 0000000..9d0eada --- /dev/null +++ b/tests/test_workflow_modules.py @@ -0,0 +1,298 @@ +from __future__ import annotations + +import shutil +from pathlib import Path + +import pytest + +from imagepipeline.core.params import validate_params +from imagepipeline.modules.composite import CompositeModule +from imagepipeline.modules.crop_square import CropSquareModule +from imagepipeline.modules.darktable_style import DarktableStyleModule +from imagepipeline.modules.imagemagick_fill import ( + ImageMagickFillModule, + build_fill_arguments, +) +from imagepipeline.modules.imagemagick_grayscale import ImageMagickGrayscale +from imagepipeline.modules.gmic_grayscale import GmicGrayscale +from imagepipeline.modules.registry import get_module, list_modules +from imagepipeline.modules.rembg import RembgModule + +has_magick = bool(shutil.which("magick") or shutil.which("convert")) + + +class TestModuleRegistration: + def test_workflow_modules_registered(self) -> None: + names = list_modules() + for name in ( + "rembg", + "gmic_grayscale", + "composite", + "darktable_style", + "imagemagick_grayscale", + "imagemagick_fill", + "crop_square", + ): + assert name in names + + def test_get_module_returns_class(self) -> None: + assert get_module("rembg") is RembgModule + assert get_module("gmic_grayscale") is GmicGrayscale + assert get_module("composite") is CompositeModule + assert get_module("darktable_style") is DarktableStyleModule + assert get_module("crop_square") is CropSquareModule + assert get_module("imagemagick_fill") is ImageMagickFillModule + + +class TestImageMagickFill: + def test_solid_fill_arguments(self) -> None: + args = build_fill_arguments( + 800, + 600, + color1="#d7fd00", + color2=None, + gradient=False, + radial=False, + angle=None, + ) + assert args == ["-size", "800x600", "xc:#d7fd00"] + + def test_linear_gradient_arguments(self) -> None: + args = build_fill_arguments( + 100, + 50, + color1="#d7fd00", + color2="#fc0ade", + gradient=True, + radial=False, + angle=45.0, + ) + assert args == [ + "-size", + "100x50", + "-define", + "gradient:angle=45.0", + "gradient:#d7fd00-#fc0ade", + ] + + def test_radial_gradient_arguments(self) -> None: + args = build_fill_arguments( + 100, + 50, + color1="d7fd00", + color2="fc0ade", + gradient=True, + radial=True, + angle=90.0, + ) + assert args == ["-size", "100x50", "radial-gradient:#d7fd00-#fc0ade"] + + def test_requires_color1(self) -> None: + with pytest.raises(ValueError, match="required"): + ImageMagickFillModule.validate_module_params({}) + + @pytest.mark.skipif(not has_magick, reason="ImageMagick not installed") + def test_output_matches_input_size(self, tmp_path: Path) -> None: + from imagepipeline.core.context import ModuleContext + from imagepipeline.utils.subprocess import run_command + + from tests.conftest import make_png + + src = tmp_path / "ref.png" + make_png(src, width=40, height=20) + output_dir = tmp_path / "out" + output_dir.mkdir() + ctx = ModuleContext( + input_paths=[src], + matched_groups=[], + output_dir=output_dir, + params=ImageMagickFillModule.validate_module_params( + { + "color1": "#d7fd00", + "color2": "#fc0ade", + "gradient": True, + "angle": 45, + } + ), + pipeline_output_root=tmp_path, + step_id="imagemagick_fill_01", + logger=None, + ) + ImageMagickFillModule().run(ctx) + + dst = output_dir / "ref.png" + assert dst.is_file() + magick = shutil.which("magick") or shutil.which("convert") + result = run_command([magick, "-format", "%w %h", str(dst), "info:"]) + assert result.stdout.strip() == "40 20" + + +class TestCropSquare: + @pytest.mark.skipif(not has_magick, reason="ImageMagick not installed") + def test_center_crops_to_square(self, tmp_path: Path) -> None: + from imagepipeline.core.context import ModuleContext + from imagepipeline.utils.subprocess import run_command + + from tests.conftest import make_png + + src = tmp_path / "wide.png" + make_png(src, width=40, height=20) + output_dir = tmp_path / "out" + output_dir.mkdir() + ctx = ModuleContext( + input_paths=[src], + matched_groups=[], + output_dir=output_dir, + params={}, + pipeline_output_root=tmp_path, + step_id="crop_square_01", + logger=None, + ) + CropSquareModule().run(ctx) + + dst = output_dir / "wide.png" + assert dst.is_file() + magick = shutil.which("magick") or shutil.which("convert") + result = run_command( + [magick, "-format", "%w %h", str(dst), "info:"], + ) + width, height = map(int, result.stdout.strip().split()) + assert width == height == 20 + + +class TestModuleParameters: + def test_darktable_style_requires_style(self) -> None: + with pytest.raises(ValueError, match="required"): + DarktableStyleModule.validate_module_params({}) + + def test_darktable_style_accepts_style(self) -> None: + params = DarktableStyleModule.validate_module_params( + {"style": "Watermark F12.rocks"} + ) + assert params["style"] == "Watermark F12.rocks" + assert params["style_overwrite"] is True + + def test_composite_mode_choices(self) -> None: + with pytest.raises(ValueError, match="must be one of"): + CompositeModule.validate_module_params({"mode": "invalid"}) + + def test_rembg_defaults(self) -> None: + params = RembgModule.validate_module_params({}) + assert params["model"] == "birefnet-general" + assert params["alpha_matting"] is True + + def test_darktable_export_conf_jpeg(self) -> None: + from imagepipeline.modules.darktable_style import export_conf_options + + assert export_conf_options("jpeg") == [ + "plugins/imageio/format/jpeg/quality=90" + ] + + def test_darktable_export_conf_png(self) -> None: + from imagepipeline.modules.darktable_style import export_conf_options + + assert export_conf_options("png") == [ + "plugins/imageio/format/png/bpp=8", + "plugins/imageio/format/png/compression=5", + ] + + def test_darktable_resolve_format_from_input(self) -> None: + from imagepipeline.modules.darktable_style import resolve_export_format + + format_name, conf = resolve_export_format(Path("photo.jpg"), "") + assert format_name == "jpeg" + assert conf == ["plugins/imageio/format/jpeg/quality=90"] + + format_name, conf = resolve_export_format(Path("photo.png"), "") + assert format_name == "png" + assert "plugins/imageio/format/png/compression=5" in conf + + def test_darktable_rejects_unsupported_format(self) -> None: + from imagepipeline.modules.darktable_style import resolve_export_format + + with pytest.raises(ValueError, match="Unsupported output format"): + resolve_export_format(Path("photo.tiff"), "tiff") + + +@pytest.mark.skipif(not has_magick, reason="ImageMagick not installed") +class TestCompositeColor: + def test_preserves_color_over_grayscale_background(self, tmp_path: Path) -> None: + from imagepipeline.core.context import ModuleContext + from imagepipeline.utils.subprocess import run_command + + from tests.conftest import make_png + + src = tmp_path / "src.png" + make_png(src, width=40, height=40, rgb=(200, 50, 50)) + background = tmp_path / "bg.png" + foreground = tmp_path / "fg.png" + output_dir = tmp_path / "composite_out" + output_dir.mkdir() + magick = shutil.which("magick") or shutil.which("convert") + + run_command([magick, str(src), "-colorspace", "Gray", str(background)]) + run_command( + [ + magick, + "-size", + "40x40", + "xc:none", + "-fill", + "rgba(255,0,0,0.8)", + "-draw", + "circle 20,20 20,2", + str(foreground), + ] + ) + + module = CompositeModule() + ctx = ModuleContext( + input_paths=[background, foreground], + output_dir=output_dir, + params=CompositeModule.validate_module_params({}), + pipeline_output_root=tmp_path, + step_id="composite_01", + matched_groups=[[background, foreground]], + ) + module.run(ctx) + + output = output_dir / "fg.png" + assert output.is_file() + result = run_command( + [magick, "identify", "-format", "%[type]", str(output)] + ) + assert result.stdout.strip() != "Grayscale" + + +has_workflow_tools = all( + shutil.which(name) + for name in ("rembg", "gmic", "magick", "darktable-cli") +) or all(shutil.which(name) for name in ("rembg", "gmic", "convert", "darktable-cli")) + + +@pytest.mark.skipif(not has_workflow_tools, reason="Workflow CLI tools not installed") +class TestWorkflowIntegration: + def test_watermark_pipeline(self, input_dir, output_base) -> None: + from imagepipeline import Pipeline + + with Pipeline( + name="workflow_test", + input_dir=input_dir, + output_base=output_base, + verbose=False, + ) as p: + rembg_out = p.step("rembg", inputs="input") + grayscale = p.step("gmic_grayscale", inputs="input") + combined = p.step("composite", inputs=[grayscale, rembg_out]) + p.step( + "darktable_style", + inputs=combined, + style="Watermark F12.rocks", + ) + root = p.run() + + assert (root / "rembg_01").is_dir() + assert (root / "gmic_grayscale_01").is_dir() + assert (root / "composite_01").is_dir() + assert (root / "darktable_style_01").is_dir() + assert list((root / "darktable_style_01").iterdir()) diff --git a/workflows/comfy/README.md b/workflows/comfy/README.md new file mode 100644 index 0000000..3a59986 --- /dev/null +++ b/workflows/comfy/README.md @@ -0,0 +1,34 @@ +# ComfyUI Flux Klein img2img workflow (API format) + +Export a Flux Klein img2img workflow from ComfyUI once and save it as: + +``` +workflows/comfy/flux_klein_edit_api.json +``` + +Copy `flux_klein_edit_api.json.example` as a starting skeleton, then replace node IDs and wiring with your exported workflow. + +## Export steps + +1. Build an img2img workflow in ComfyUI with **Load Image**, prompt encoding, sampler, and **Save Image** nodes. +2. Enable **Dev mode** in ComfyUI settings if needed. +3. Use **Save (API Format)** on the workflow. +4. Save the JSON to `workflows/comfy/flux_klein_edit_api.json`. + +## Nodes patched by `comfy_flux_edit` + +The module walks workflow nodes by `class_type` and updates: + +| class_type | Field | Value | +|------------|-------|-------| +| `LoadImage` | `inputs.image` | Uploaded filename | +| `CLIPTextEncode` / `TextEncode` | `inputs.text` | Step `prompt` | +| `KSampler` | `inputs.denoise`, `inputs.seed` | Step params | + +## CPU warning + +ComfyUI on CPU can take hours per full-resolution image. Use `max_edge=1024` or lower in the pipeline step for experiments. + +## Server + +Default URL: `http://127.0.0.1:8188`. Start ComfyUI before running the pipeline step. diff --git a/workflows/comfy/flux_klein_edit_api.json.example b/workflows/comfy/flux_klein_edit_api.json.example new file mode 100644 index 0000000..caf1153 --- /dev/null +++ b/workflows/comfy/flux_klein_edit_api.json.example @@ -0,0 +1,37 @@ +{ + "1": { + "class_type": "LoadImage", + "inputs": { + "image": "example.png" + } + }, + "2": { + "class_type": "CLIPTextEncode", + "inputs": { + "text": "placeholder prompt", + "clip": ["3", 0] + } + }, + "4": { + "class_type": "KSampler", + "inputs": { + "seed": 0, + "steps": 20, + "cfg": 1.0, + "sampler_name": "euler", + "scheduler": "simple", + "denoise": 0.35, + "model": ["5", 0], + "positive": ["2", 0], + "negative": ["6", 0], + "latent_image": ["7", 0] + } + }, + "8": { + "class_type": "SaveImage", + "inputs": { + "filename_prefix": "imagepipeline_flux_edit", + "images": ["9", 0] + } + } +}