amayer5125 is savage

This commit is contained in:
Frank Schwenk
2026-05-30 11:33:07 +02:00
commit e7cdb8dd6f
55 changed files with 4339 additions and 0 deletions
+3
View File
@@ -0,0 +1,3 @@
# Copy to .env and fill in your key: cp .env.example .env
# OpenRouter API key — https://openrouter.ai/keys
OPENROUTER_API_KEY=
+10
View File
@@ -0,0 +1,10 @@
__pycache__/
*.py[cod]
*.egg-info/
.eggs/
dist/
build/
.pytest_cache/
.venv/
venv/
.env
+78
View File
@@ -0,0 +1,78 @@
# Image Pipeline
Modular Python framework for chaining image processing steps after Darktable export.
Each pipeline is a Python script that defines a DAG of processing steps. Every step writes its output to a numbered subfolder inside a timestamped run directory.
## Requirements
- Python 3.11+
- [ImageMagick](https://imagemagick.org/) (`magick` or `convert` on PATH)
## Installation
```bash
cd /path/to/imagepipeline
pip install -e ".[dev]"
```
## Quick Start
Edit the input path in `pipelines/example_grayscale.py`, then run:
```bash
python pipelines/example_grayscale.py
```
Or from Python:
```python
from pathlib import Path
from imagepipeline import Pipeline
with Pipeline(name="my_run", input_dir=Path("/path/to/export")) as p:
gray = p.step("imagemagick_grayscale", inputs="input")
p.run()
```
## Output Structure
Each run creates a folder like `my_run_20260527143022/`:
```
my_run_20260527143022/
├── pipeline_manifest.json
├── input/ # symlinks to source images
├── imagemagick_grayscale_01/
│ └── photo.jpg
└── ...
```
Step folders are named `{module_name}_{nn}` (two-digit counter per module name).
## Writing Pipelines
Pipelines are plain Python scripts. Reference previous steps via `StepRef` objects returned by `p.step()`:
```python
with Pipeline(name="colorsplash", input_dir=INPUT) as p:
rembg_out = p.step("rembg", inputs="input")
bw = p.step("imagemagick_grayscale", inputs="input")
combined = p.step("composite", inputs=[bw, rembg_out], mode="foreground_over")
p.step("darktable_style", inputs=combined, style="vintage.dtstyle")
p.run()
```
- `"input"` refers to the original input directory
- Parameters are passed as kwargs and validated against each module's schema
- Multiple uses of the same module get separate numbered folders
## Adding Modules
See [docs/MODULE_DEVELOPMENT.md](docs/MODULE_DEVELOPMENT.md).
## Tests
```bash
pytest
```
+275
View File
@@ -0,0 +1,275 @@
# Module development guide
This document explains how to add new processing modules to the image pipeline framework.
## Overview
Each module is a Python class that:
1. Declares a unique `name`
2. Defines accepted parameters via `parameters()`
3. Implements `run(ctx)` to process images
4. Registers itself with `@register`
Pipeline scripts reference modules by name and pass parameters as kwargs.
## Quick Start
Create a new file in `imagepipeline/modules/`, e.g. `my_module.py`:
```python
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import BaseModule
from imagepipeline.modules.registry import register
@register
class MyModule(BaseModule):
name = "my_module"
description = "Short description for documentation"
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"strength": Param(
"float",
default=1.0,
help="Processing strength from 0.0 to 1.0",
),
}
def run(self, ctx: ModuleContext) -> None:
ctx.output_dir.mkdir(parents=True, exist_ok=True)
for src in ctx.input_paths:
dst = ctx.output_dir / src.name
# ... write dst ...
```
Import the module in `imagepipeline/modules/__init__.py` so it registers on load:
```python
import imagepipeline.modules.my_module # noqa: F401
```
Use it in a pipeline script:
```python
with Pipeline(name="my_run", input_dir=INPUT) as p:
result = p.step("my_module", inputs="input", strength=0.8)
p.run()
```
## ModuleContext
Every `run()` receives a `ModuleContext`:
| Field | Description |
|-------|-------------|
| `input_paths` | Flat list of input images for this step |
| `matched_groups` | For multi-input steps: list of path groups matched by stem |
| `output_dir` | Directory where outputs must be written |
| `params` | Validated parameters |
| `pipeline_output_root` | Root folder of the current run |
| `step_id` | Internal step identifier (e.g. `my_module_01`) |
### Single-input modules
Use `ctx.input_paths` — one path per image in the batch:
```python
for src in ctx.input_paths:
dst = ctx.output_dir / src.name
process(src, dst)
```
### Multi-input modules
When a step receives multiple input references (e.g. background + foreground), the runner matches images by filename stem across all sources.
Use `ctx.matched_groups` — each entry is a list of paths with the same stem:
```python
for group in ctx.matched_groups:
background, foreground = group
dst = ctx.output_dir / foreground.name
composite(background, foreground, dst)
```
If a stem is missing in any source, the pipeline fails with a clear error before your module runs.
## Parameters
Define parameters with `Param`:
```python
Param("string", default="value", help="Description")
Param("int", required=True, help="Required integer")
Param("float", default=0.5)
Param("bool", default=False)
Param("path")
Param("list", default=[])
Param("string", choices=("a", "b", "c"))
```
Supported types: `string`, `int`, `float`, `bool`, `path`, `list`.
Unknown kwargs passed to `p.step()` raise a validation error. Missing required parameters raise as well.
## Base Classes
### `BaseModule`
Use for pure Python processing or custom logic.
Optional class attributes:
- `supported_input_formats` — tuple of extensions (informational)
- `description` — short module description
Optional methods:
- `check_dependencies()` — raise `DependencyError` if tools are missing
- `list_output_images(ctx)` — default: all images in `output_dir`
### `SubprocessModule`
Use for CLI tools (ImageMagick, gmic, darktable-cli, rembg):
```python
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.utils.subprocess import run_command
@register
class MyCliModule(SubprocessModule):
name = "my_cli_module"
command_candidates = ("my-tool", "my-tool-fallback")
@classmethod
def check_dependencies(cls) -> None:
super().check_dependencies() # verifies command_candidates
def run(self, ctx: ModuleContext) -> None:
tool = self.resolve_command()
for src in ctx.input_paths:
dst = ctx.output_dir / src.name
run_command([tool, str(src), str(dst)])
```
`run_command()` captures stderr and raises `RuntimeError` with the command output on failure.
## Output Conventions
- Write one output file per input, keeping the **same filename** (especially the stem) so downstream steps can match batches.
- Only write image files into `ctx.output_dir` — the runner discovers outputs by scanning for known image extensions.
- Create `ctx.output_dir` if your tool does not do so automatically.
## Step Folder Naming
The runner assigns output folders automatically: `{module_name}_{nn}`.
Using the same module twice in one pipeline produces separate folders:
```
imagemagick_grayscale_01/
imagemagick_grayscale_02/
darktable_style_01/
darktable_style_02/
```
You do not choose folder names in the module.
## Format Warnings
Declare `supported_input_formats` on your module. When chaining modules with incompatible formats (e.g. JPEG without alpha after rembg), document expected behavior in your module's `description` and consider converting explicitly in `run()`.
## Testing
Add tests under `tests/`:
1. **Unit tests** — parameter validation, matching logic (no external tools)
2. **Integration tests** — run the module against a real image; skip if CLI tool missing:
```python
import shutil
import pytest
pytestmark = pytest.mark.skipif(
not shutil.which("magick") and not shutil.which("convert"),
reason="ImageMagick not installed",
)
```
## Checklist for New Modules
- [ ] Unique `name` (snake_case)
- [ ] `@register` decorator
- [ ] `parameters()` documents all kwargs
- [ ] `run()` writes outputs preserving stems
- [ ] `check_dependencies()` if external tools required
- [ ] Import in `imagepipeline/modules/__init__.py`
- [ ] Test added (unit and/or integration)
## AI Modules
Optional dependencies: `pip install -e ".[ai]"` (numpy, Pillow, torch).
All AI modules inherit from `AIModule` (`imagepipeline/modules/ai_base.py`), which adds:
| Parameter | Default | Description |
|-----------|---------|-------------|
| `skip_existing` | `True` | Skip when output file already exists |
| `max_edge` | `2048` | Downscale long edge before inference (`0` = full resolution) |
| `device` | `"cpu"` | Inference device (v1: CPU only) |
Use `iter_input_images(ctx, processor)` in `run()` — it handles skip, resize, upscale, and per-image ETA logging.
### Built-in AI modules
| Module | Backend | Notes |
|--------|---------|-------|
| `ai_exposure` | Zero-DCE++ (PyTorch) | Weights auto-download on first run |
| `ai_tone_map` | HDRNet or CLAHE fallback | Set `checkpoint` to a creotiv-format `.pth`; empty uses CLAHE |
| `openrouter_edit` | OpenRouter API | Requires `OPENROUTER_API_KEY`; default model Flux Klein 4B |
| `comfy_flux_edit` | ComfyUI HTTP API | Experimental; export workflow to `workflows/comfy/flux_klein_edit_api.json` |
Example pipeline: `pipelines/example_ai.py`.
### Adding a new AI module
```python
from imagepipeline.modules.ai_base import AIModule
@register
class MyAIModule(AIModule):
name = "my_ai_module"
@classmethod
def parameters(cls) -> dict[str, Param]:
params = dict(super().parameters())
params["strength"] = Param("float", default=1.0)
return params
def run(self, ctx: ModuleContext) -> None:
self.configure_torch(ctx.params["device"])
def process(src: Path, dst: Path, index: int, total: int) -> None:
...
self.iter_input_images(ctx, process)
```
Put shared inference code under `imagepipeline/ai/`.
## Planned Modules
These slots follow the same interface — no pipeline API changes needed:
| Module | Base class | Tool |
|--------|-----------|------|
| `imagemagick_resize` | SubprocessModule | ImageMagick |
| `darktable_style` | SubprocessModule | darktable-cli |
| `rembg` | SubprocessModule | rembg |
| `gmic` | SubprocessModule | gmic |
| `composite` | BaseModule | Pillow |
+6
View File
@@ -0,0 +1,6 @@
"""Image pipeline framework."""
from imagepipeline.core.pipeline import Pipeline
__all__ = ["Pipeline"]
__version__ = "0.1.0"
+1
View File
@@ -0,0 +1 @@
"""Local AI inference helpers."""
+38
View File
@@ -0,0 +1,38 @@
from __future__ import annotations
import hashlib
import urllib.request
from pathlib import Path
CACHE_DIR = Path.home() / ".cache" / "imagepipeline" / "weights"
WEIGHT_URLS: dict[str, str] = {
"zero_dce_pp": (
"https://github.com/Li-Chongyi/Zero-DCE_extension/raw/main/"
"Zero-DCE++/snapshots_Zero_DCE++/Epoch99.pth"
),
}
def cache_path(name: str, filename: str | None = None) -> Path:
CACHE_DIR.mkdir(parents=True, exist_ok=True)
if filename:
return CACHE_DIR / filename
return CACHE_DIR / name
def ensure_weight(name: str, *, filename: str | None = None) -> Path:
if name not in WEIGHT_URLS:
raise KeyError(f"Unknown weight bundle: {name}")
dest_name = filename or f"{name}.pth"
dest = cache_path(name, dest_name)
if dest.is_file() and dest.stat().st_size > 0:
return dest
url = WEIGHT_URLS[name]
tmp = dest.with_suffix(dest.suffix + ".part")
request = urllib.request.Request(url, headers={"User-Agent": "imagepipeline/0.1"})
with urllib.request.urlopen(request, timeout=300) as response:
data = response.read()
tmp.write_bytes(data)
tmp.replace(dest)
return dest
+99
View File
@@ -0,0 +1,99 @@
from __future__ import annotations
import numpy as np
def apply_clahe_tone(image_rgb, *, clip_limit: float = 2.0, strength: float = 1.0):
"""Classical CLAHE tone mapping on the L channel in LAB space."""
from PIL import Image
if not isinstance(image_rgb, Image.Image):
image_rgb = Image.fromarray(image_rgb)
arr = np.asarray(image_rgb, dtype=np.uint8)
lab = _rgb_to_lab(arr.astype(np.float32))
l_channel = lab[:, :, 0]
enhanced_l = _clahe_1d(l_channel, clip_limit=clip_limit)
lab[:, :, 0] = l_channel * (1.0 - strength) + enhanced_l * strength
out = _lab_to_rgb(lab)
return Image.fromarray(np.clip(out, 0, 255).astype(np.uint8))
def _rgb_to_lab(rgb: np.ndarray) -> np.ndarray:
matrix = np.array(
[
[0.4124564, 0.3575761, 0.1804375],
[0.2126729, 0.7151522, 0.0721750],
[0.0193339, 0.1191920, 0.9503041],
],
dtype=np.float32,
)
linear = np.where(rgb <= 0.04045, rgb / 12.92, ((rgb + 0.055) / 1.055) ** 2.4)
linear = linear / 255.0
xyz = linear @ matrix.T
xyz = xyz * np.array([1 / 0.95047, 1.0, 1 / 1.08883], dtype=np.float32)
epsilon = 216 / 24389
kappa = 24389 / 27
def f(t):
return np.where(t > epsilon, np.cbrt(t), (kappa * t + 16) / 116)
fx, fy, fz = f(xyz[..., 0]), f(xyz[..., 1]), f(xyz[..., 2])
lab = np.stack([116 * fy - 16, 500 * (fx - fy), 200 * (fy - fz)], axis=-1)
return lab
def _lab_to_rgb(lab: np.ndarray) -> np.ndarray:
fy = (lab[..., 0] + 16) / 116
fx = lab[..., 1] / 500 + fy
fz = fy - lab[..., 2] / 200
def finv(t):
t3 = t ** 3
return np.where(t3 > 216 / 24389, t3, (116 * t - 16) / kappa)
kappa = 24389 / 27
x = finv(fx) * 0.95047
y = finv(fy)
z = finv(fz) * 1.08883
xyz = np.stack([x, y, z], axis=-1)
matrix = np.array(
[
[3.2404542, -1.5371385, -0.4985314],
[-0.9692660, 1.8760108, 0.0415560],
[0.0556434, -0.2040259, 1.0572252],
],
dtype=np.float32,
)
linear = xyz @ matrix.T
rgb = np.where(
linear <= 0.0031308,
12.92 * linear,
1.055 * np.power(np.clip(linear, 0, None), 1 / 2.4) - 0.055,
)
return rgb * 255.0
def _clahe_1d(l_channel: np.ndarray, *, clip_limit: float, tile_size: int = 8) -> np.ndarray:
height, width = l_channel.shape
tile_h = max(1, height // tile_size)
tile_w = max(1, width // tile_size)
out = np.zeros_like(l_channel, dtype=np.float32)
counts = np.zeros_like(l_channel, dtype=np.float32)
for y in range(0, height, tile_h):
for x in range(0, width, tile_w):
tile = l_channel[y : y + tile_h, x : x + tile_w]
hist, _ = np.histogram(tile, bins=256, range=(0, 256))
clip_val = max(1, int(clip_limit * tile.size / 256))
excess = np.maximum(hist - clip_val, 0).sum()
hist = np.minimum(hist, clip_val)
hist += excess // 256
cdf = hist.cumsum().astype(np.float32)
cdf = (cdf - cdf.min()) / max(cdf.max() - cdf.min(), 1.0) * 255.0
mapped = cdf[tile.astype(np.int32).clip(0, 255)]
out[y : y + tile_h, x : x + tile_w] += mapped
counts[y : y + tile_h, x : x + tile_w] += 1.0
return out / np.maximum(counts, 1.0)
View File
+250
View File
@@ -0,0 +1,250 @@
from __future__ import annotations
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from imagepipeline.ai.hdrnet.slice import batch_bilateral_slice
class ConvBlock(nn.Module):
def __init__(
self,
inc,
outc,
kernel_size=3,
padding=1,
stride=1,
use_bias=True,
activation=nn.ReLU,
batch_norm=False,
) -> None:
super().__init__()
self.conv = nn.Conv2d(
int(inc), int(outc), kernel_size, padding=padding, stride=stride, bias=use_bias
)
self.activation = activation() if activation else None
self.bn = nn.BatchNorm2d(outc) if batch_norm else None
if use_bias and not batch_norm:
self.conv.bias.data.fill_(0.0)
torch.nn.init.kaiming_uniform_(self.conv.weight)
def forward(self, x):
x = self.conv(x)
if self.bn is not None:
x = self.bn(x)
if self.activation is not None:
x = self.activation(x)
return x
class FC(nn.Module):
def __init__(self, inc, outc, activation=nn.ReLU, batch_norm=False) -> None:
super().__init__()
self.fc = nn.Linear(int(inc), int(outc), bias=(not batch_norm))
self.activation = activation() if activation else None
self.bn = nn.BatchNorm1d(outc) if batch_norm else None
if not batch_norm:
self.fc.bias.data.fill_(0.0)
torch.nn.init.kaiming_uniform_(self.fc.weight)
def forward(self, x):
x = self.fc(x)
if self.bn is not None:
x = self.bn(x)
if self.activation is not None:
x = self.activation(x)
return x
class Slice(nn.Module):
def forward(self, bilateral_grid, guidemap):
bilateral_grid = bilateral_grid.permute(0, 3, 4, 2, 1)
guidemap = guidemap.squeeze(1)
coeffs = batch_bilateral_slice(bilateral_grid, guidemap).permute(0, 3, 1, 2)
return coeffs
class ApplyCoeffs(nn.Module):
def forward(self, coeff, full_res_input):
r = torch.sum(full_res_input * coeff[:, 0:3, :, :], dim=1, keepdim=True) + coeff[
:, 9:10, :, :
]
g = torch.sum(full_res_input * coeff[:, 3:6, :, :], dim=1, keepdim=True) + coeff[
:, 10:11, :, :
]
b = torch.sum(full_res_input * coeff[:, 6:9, :, :], dim=1, keepdim=True) + coeff[
:, 11:12, :, :
]
return torch.cat([r, g, b], dim=1)
class GuideNN(nn.Module):
def __init__(self, params) -> None:
super().__init__()
self.conv1 = ConvBlock(3, params["guide_complexity"], kernel_size=1, padding=0, batch_norm=True)
self.conv2 = ConvBlock(
params["guide_complexity"], 1, kernel_size=1, padding=0, activation=nn.Sigmoid
)
def forward(self, x):
return self.conv2(self.conv1(x))
class Coeffs(nn.Module):
def __init__(self, nin=4, nout=3, params=None) -> None:
super().__init__()
self.params = params
self.nin = nin
self.nout = nout
lb = params["luma_bins"]
cm = params["channel_multiplier"]
sb = params["spatial_bin"]
bn = params["batch_norm"]
nsize = params["net_input_size"]
n_layers_splat = int(np.log2(nsize / sb))
self.splat_features = nn.ModuleList()
prev_ch = 3
for index in range(n_layers_splat):
use_bn = bn if index > 0 else False
out_ch = cm * (2**index) * lb
self.splat_features.append(
ConvBlock(prev_ch, out_ch, 3, stride=2, batch_norm=use_bn)
)
prev_ch = out_ch
splat_ch = prev_ch
n_layers_global = int(np.log2(sb / 4))
self.global_features_conv = nn.ModuleList()
self.global_features_fc = nn.ModuleList()
for _ in range(n_layers_global):
self.global_features_conv.append(
ConvBlock(prev_ch, cm * 8 * lb, 3, stride=2, batch_norm=bn)
)
prev_ch = cm * 8 * lb
n_total = n_layers_splat + n_layers_global
prev_ch = int(prev_ch * (nsize / 2**n_total) ** 2)
self.global_features_fc.append(FC(prev_ch, 32 * cm * lb, batch_norm=bn))
self.global_features_fc.append(FC(32 * cm * lb, 16 * cm * lb, batch_norm=bn))
self.global_features_fc.append(FC(16 * cm * lb, 8 * cm * lb, activation=None, batch_norm=bn))
self.local_features = nn.ModuleList(
[
ConvBlock(splat_ch, 8 * cm * lb, 3, batch_norm=bn),
ConvBlock(8 * cm * lb, 8 * cm * lb, 3, activation=None, use_bias=False),
]
)
self.conv_out = ConvBlock(
8 * cm * lb, lb * nout * nin, 1, padding=0, activation=None
)
self.relu = nn.ReLU()
def forward(self, lowres_input):
params = self.params
bs = lowres_input.shape[0]
lb = params["luma_bins"]
cm = params["channel_multiplier"]
x = lowres_input
for layer in self.splat_features:
x = layer(x)
splat_features = x
for layer in self.global_features_conv:
x = layer(x)
x = x.view(bs, -1)
for layer in self.global_features_fc:
x = layer(x)
global_features = x
x = splat_features
for layer in self.local_features:
x = layer(x)
fusion = self.relu(x + global_features.view(bs, 8 * cm * lb, 1, 1))
x = self.conv_out(fusion)
return torch.stack(torch.split(x, self.nin * self.nout, 1), 2)
class HDRPointwiseNN(nn.Module):
def __init__(self, params) -> None:
super().__init__()
self.coeffs = Coeffs(params=params)
self.guide = GuideNN(params=params)
self.slice = Slice()
self.apply_coeffs = ApplyCoeffs()
def forward(self, lowres, fullres):
coeffs = self.coeffs(lowres)
guide = self.guide(fullres)
slice_coeffs = self.slice(coeffs, guide)
return self.apply_coeffs(slice_coeffs, fullres)
def default_hdrnet_params(net_input_size: int = 256) -> dict:
return {
"luma_bins": 8,
"channel_multiplier": 1,
"spatial_bin": 16,
"batch_norm": True,
"net_input_size": net_input_size,
"guide_complexity": 16,
}
def load_hdrnet_checkpoint(checkpoint_path, device: torch.device):
state = torch.load(checkpoint_path, map_location=device, weights_only=False)
if "model_params" in state:
params = state["model_params"]
del state["model_params"]
else:
params = default_hdrnet_params()
model = HDRPointwiseNN(params=params)
model.load_state_dict(state)
model.to(device)
model.eval()
return model, params
def resize_rgb_array(arr: np.ndarray, size: int) -> np.ndarray:
from PIL import Image
image = Image.fromarray(arr.astype(np.uint8))
short = min(image.size)
scale = size / short
new_size = (max(1, round(image.size[0] * scale)), max(1, round(image.size[1] * scale)))
return np.asarray(image.resize(new_size, Image.Resampling.NEAREST))
def enhance_image_hdrnet(
model: HDRPointwiseNN,
image_rgb,
*,
device: torch.device,
net_input_size: int,
strength: float = 1.0,
):
from PIL import Image
if not isinstance(image_rgb, Image.Image):
image_rgb = Image.fromarray(image_rgb)
full_arr = np.asarray(image_rgb, dtype=np.float32)
low_arr = resize_rgb_array(full_arr, net_input_size)
low = torch.from_numpy(low_arr).permute(2, 0, 1).unsqueeze(0).float() / 255.0
full = torch.from_numpy(full_arr).permute(2, 0, 1).unsqueeze(0).float() / 255.0
low = low.to(device)
full = full.to(device)
with torch.no_grad():
out = model(low, full)
if strength < 1.0:
out = full * (1.0 - strength) + out * strength
out = torch.clamp(out, 0.0, 1.0)
result = (out.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255.0).astype(np.uint8)
return Image.fromarray(result)
+108
View File
@@ -0,0 +1,108 @@
from __future__ import annotations
import torch
def lerp_weight(x, xs):
dx = x - xs
abs_dx = torch.abs(dx)
return torch.maximum(
torch.tensor(1.0, device=x.device) - abs_dx,
torch.tensor(0.0, device=x.device),
)
def smoothed_abs(x, eps):
return torch.sqrt(torch.multiply(x, x) + eps)
def smoothed_lerp_weight(x, xs):
eps = torch.tensor(1e-8, dtype=torch.float32, device=x.device)
dx = x - xs
abs_dx = smoothed_abs(dx, eps)
return torch.maximum(
torch.tensor(1.0, device=x.device) - abs_dx,
torch.tensor(0.0, device=x.device),
)
def _bilateral_slice(grid, guide):
device = grid.device
ii, jj = torch.meshgrid(
[
torch.arange(guide.shape[0], device=device),
torch.arange(guide.shape[1], device=device),
],
indexing="ij",
)
scale_i = grid.shape[0] / guide.shape[0]
scale_j = grid.shape[1] / guide.shape[1]
gif = (ii + 0.5) * scale_i
gjf = (jj + 0.5) * scale_j
gkf = guide * grid.shape[2]
gi0 = torch.floor(gif - 0.5).to(torch.int32)
gj0 = torch.floor(gjf - 0.5).to(torch.int32)
gk0 = torch.floor(gkf - 0.5).to(torch.int32)
gi1 = gi0 + 1
gj1 = gj0 + 1
gk1 = gk0 + 1
wi0 = lerp_weight(gi0 + 0.5, gif)
wi1 = lerp_weight(gi1 + 0.5, gif)
wj0 = lerp_weight(gj0 + 0.5, gjf)
wj1 = lerp_weight(gj1 + 0.5, gjf)
wk0 = smoothed_lerp_weight(gk0 + 0.5, gkf)
wk1 = smoothed_lerp_weight(gk1 + 0.5, gkf)
w_000 = wi0 * wj0 * wk0
w_001 = wi0 * wj0 * wk1
w_010 = wi0 * wj1 * wk0
w_011 = wi0 * wj1 * wk1
w_100 = wi1 * wj0 * wk0
w_101 = wi1 * wj0 * wk1
w_110 = wi1 * wj1 * wk0
w_111 = wi1 * wj1 * wk1
gi0c = gi0.clip(0, grid.shape[0] - 1).to(torch.long)
gj0c = gj0.clip(0, grid.shape[1] - 1).to(torch.long)
gk0c = gk0.clip(0, grid.shape[2] - 1).to(torch.long)
gi1c = (gi0 + 1).clip(0, grid.shape[0] - 1).to(torch.long)
gj1c = (gj0 + 1).clip(0, grid.shape[1] - 1).to(torch.long)
gk1c = (gk0 + 1).clip(0, grid.shape[2] - 1).to(torch.long)
grid_val_000 = grid[gi0c, gj0c, gk0c, :]
grid_val_001 = grid[gi0c, gj0c, gk1c, :]
grid_val_010 = grid[gi0c, gj1c, gk0c, :]
grid_val_011 = grid[gi0c, gj1c, gk1c, :]
grid_val_100 = grid[gi1c, gj0c, gk0c, :]
grid_val_101 = grid[gi1c, gj0c, gk1c, :]
grid_val_110 = grid[gi1c, gj1c, gk0c, :]
grid_val_111 = grid[gi1c, gj1c, gk1c, :]
w_000, w_001, w_010, w_011 = map(
torch.atleast_3d, (w_000, w_001, w_010, w_011)
)
w_100, w_101, w_110, w_111 = map(
torch.atleast_3d, (w_100, w_101, w_110, w_111)
)
return (
torch.multiply(w_000, grid_val_000)
+ torch.multiply(w_001, grid_val_001)
+ torch.multiply(w_010, grid_val_010)
+ torch.multiply(w_011, grid_val_011)
+ torch.multiply(w_100, grid_val_100)
+ torch.multiply(w_101, grid_val_101)
+ torch.multiply(w_110, grid_val_110)
+ torch.multiply(w_111, grid_val_111)
)
def batch_bilateral_slice(grid, guide):
results = []
for index in range(grid.shape[0]):
results.append(_bilateral_slice(grid[index], guide[index]).unsqueeze(0))
return torch.concat(results, dim=0)
+59
View File
@@ -0,0 +1,59 @@
from __future__ import annotations
import shutil
from pathlib import Path
from imagepipeline.utils.subprocess import require_command, run_command
def magick_command() -> str:
return require_command("magick", "convert")
def resize_max_edge(src: Path, dst: Path, max_edge: int) -> tuple[int, int, int, int]:
"""Resize so longest edge is at most max_edge. Returns (orig_w, orig_h, work_w, work_h)."""
from PIL import Image
with Image.open(src) as image:
orig_w, orig_h = image.size
if max_edge <= 0 or max(orig_w, orig_h) <= max_edge:
shutil.copy2(src, dst)
return orig_w, orig_h, orig_w, orig_h
command = magick_command()
run_command(
[
command,
str(src),
"-resize",
f"{max_edge}x{max_edge}>",
str(dst),
]
)
with Image.open(dst) as image:
new_w, new_h = image.size
return orig_w, orig_h, new_w, new_h
def resize_to_size(src: Path, dst: Path, width: int, height: int) -> None:
command = magick_command()
run_command(
[
command,
str(src),
"-resize",
f"{width}x{height}!",
str(dst),
]
)
def load_pil_rgb(path: Path):
from PIL import Image
return Image.open(path).convert("RGB")
def save_pil_image(image, path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
image.save(path, quality=95)
+111
View File
@@ -0,0 +1,111 @@
from __future__ import annotations
import torch
import torch.nn as nn
import torch.nn.functional as F
class CSDN_Tem(nn.Module):
def __init__(self, in_ch: int, out_ch: int) -> None:
super().__init__()
self.depth_conv = nn.Conv2d(
in_channels=in_ch,
out_channels=in_ch,
kernel_size=3,
stride=1,
padding=1,
groups=in_ch,
)
self.point_conv = nn.Conv2d(
in_channels=in_ch,
out_channels=out_ch,
kernel_size=1,
stride=1,
padding=0,
groups=1,
)
def forward(self, input_tensor: torch.Tensor) -> torch.Tensor:
out = self.depth_conv(input_tensor)
return self.point_conv(out)
class EnhanceNetNoPool(nn.Module):
def __init__(self, scale_factor: int = 1) -> None:
super().__init__()
self.relu = nn.ReLU(inplace=True)
self.scale_factor = scale_factor
self.upsample = nn.UpsamplingBilinear2d(scale_factor=self.scale_factor)
number_f = 32
self.e_conv1 = CSDN_Tem(3, number_f)
self.e_conv2 = CSDN_Tem(number_f, number_f)
self.e_conv3 = CSDN_Tem(number_f, number_f)
self.e_conv4 = CSDN_Tem(number_f, number_f)
self.e_conv5 = CSDN_Tem(number_f * 2, number_f)
self.e_conv6 = CSDN_Tem(number_f * 2, number_f)
self.e_conv7 = CSDN_Tem(number_f * 2, 3)
def enhance(self, x: torch.Tensor, x_r: torch.Tensor) -> torch.Tensor:
x = x + x_r * (torch.pow(x, 2) - x)
x = x + x_r * (torch.pow(x, 2) - x)
x = x + x_r * (torch.pow(x, 2) - x)
enhance_image_1 = x + x_r * (torch.pow(x, 2) - x)
x = enhance_image_1 + x_r * (torch.pow(enhance_image_1, 2) - enhance_image_1)
x = x + x_r * (torch.pow(x, 2) - x)
x = x + x_r * (torch.pow(x, 2) - x)
return x + x_r * (torch.pow(x, 2) - x)
def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
if self.scale_factor == 1:
x_down = x
else:
x_down = F.interpolate(
x, scale_factor=1 / self.scale_factor, mode="bilinear"
)
x1 = self.relu(self.e_conv1(x_down))
x2 = self.relu(self.e_conv2(x1))
x3 = self.relu(self.e_conv3(x2))
x4 = self.relu(self.e_conv4(x3))
x5 = self.relu(self.e_conv5(torch.cat([x3, x4], 1)))
x6 = self.relu(self.e_conv6(torch.cat([x2, x5], 1)))
x_r = F.tanh(self.e_conv7(torch.cat([x1, x6], 1)))
if self.scale_factor != 1:
x_r = self.upsample(x_r)
return self.enhance(x, x_r), x_r
def load_zero_dce_model(weights_path, device: torch.device) -> EnhanceNetNoPool:
model = EnhanceNetNoPool(scale_factor=1)
state = torch.load(weights_path, map_location=device, weights_only=False)
model.load_state_dict(state)
model.to(device)
model.eval()
return model
def enhance_image(
model: EnhanceNetNoPool,
image_rgb,
*,
device: torch.device,
strength: float = 1.0,
):
import numpy as np
from PIL import Image
if not isinstance(image_rgb, Image.Image):
image_rgb = Image.fromarray(image_rgb)
arr = np.asarray(image_rgb, dtype=np.float32) / 255.0
tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0).to(device)
with torch.no_grad():
enhanced, _ = model(tensor)
if strength < 1.0:
enhanced = tensor * (1.0 - strength) + enhanced * strength
enhanced = torch.clamp(enhanced, 0.0, 1.0)
out = (enhanced.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255.0).astype(
np.uint8
)
return Image.fromarray(out)
+33
View File
@@ -0,0 +1,33 @@
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from imagepipeline.modules.registry import list_modules
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(
description="Image pipeline utilities",
)
subparsers = parser.add_subparsers(dest="command")
list_parser = subparsers.add_parser("list-modules", help="List registered modules")
list_parser.set_defaults(func=_cmd_list_modules)
args = parser.parse_args(argv)
if not getattr(args, "func", None):
parser.print_help()
return 1
return args.func(args)
def _cmd_list_modules(args: argparse.Namespace) -> int:
for name in list_modules():
print(name)
return 0
if __name__ == "__main__":
sys.exit(main())
+1
View File
@@ -0,0 +1 @@
"""Core pipeline engine."""
+29
View File
@@ -0,0 +1,29 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from imagepipeline.core.log import PipelineLogger
@dataclass
class ModuleContext:
"""Runtime context passed to each module's run() method."""
input_paths: list[Path]
output_dir: Path
params: dict[str, Any]
pipeline_output_root: Path
step_id: str
matched_groups: list[list[Path]] = field(default_factory=list)
logger: PipelineLogger | None = None
@property
def is_multi_input(self) -> bool:
return len(self.matched_groups) > 1
def log_image(self, module_name: str, index: int, total: int, path: Path) -> None:
if self.logger is not None:
self.logger.image(module_name, index, total, path.name)
+18
View File
@@ -0,0 +1,18 @@
class PipelineError(Exception):
"""Base error for pipeline failures."""
class StepError(PipelineError):
"""Error during step execution."""
class DependencyError(PipelineError):
"""Missing external tool or module dependency."""
class ValidationError(PipelineError):
"""Invalid parameters or configuration."""
class CycleError(PipelineError):
"""Cycle detected in pipeline DAG."""
+82
View File
@@ -0,0 +1,82 @@
from __future__ import annotations
import sys
from typing import TextIO
def _format_eta(seconds: float) -> str:
seconds = max(0, int(seconds))
minutes, secs = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}h{minutes:02d}m"
if minutes:
return f"{minutes}m{secs:02d}s"
return f"{secs}s"
class PipelineLogger:
"""Simple stdout logger for pipeline progress."""
def __init__(self, *, verbose: bool = True, stream: TextIO | None = None) -> None:
self.verbose = verbose
self.stream = stream or sys.stdout
def info(self, message: str) -> None:
if self.verbose:
print(message, file=self.stream, flush=True)
def step_start(
self,
step_index: int,
step_total: int,
step_id: str,
module_name: str,
*,
inputs: list[str],
params: dict,
) -> None:
self.info(
f"Step {step_index}/{step_total}: {step_id} ({module_name})"
)
self.info(f" inputs: {', '.join(inputs)}")
if params:
rendered = ", ".join(f"{key}={value!r}" for key, value in params.items())
self.info(f" params: {rendered}")
def image(self, module_name: str, index: int, total: int, filename: str) -> None:
self.info(
f" Applying module {module_name} to image [{index}/{total}]: {filename}"
)
def skipped(self, module_name: str, index: int, total: int, filename: str) -> None:
self.info(
f" Skipped module {module_name} [{index}/{total}] {filename} (output exists)"
)
def image_done(
self,
module_name: str,
index: int,
total: int,
filename: str,
elapsed: float,
*,
eta_seconds: float | None = None,
) -> None:
eta_part = ""
if eta_seconds is not None and eta_seconds > 0:
eta_part = f" (ETA ~{_format_eta(eta_seconds)})"
self.info(
f" Finished module {module_name} [{index}/{total}] {filename} "
f"in {elapsed:.1f}s{eta_part}"
)
def warning(self, message: str) -> None:
self.info(f" Warning: {message}")
def step_done(self, step_id: str, output_dir: str, count: int) -> None:
self.info(f" Done: {count} image(s) -> {output_dir}/")
def blank(self) -> None:
self.info("")
+42
View File
@@ -0,0 +1,42 @@
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
@dataclass
class StepManifestEntry:
step_id: str
output_dir: str
module: str
inputs: list[str]
params: dict[str, Any]
input_files: list[str] = field(default_factory=list)
output_files: list[str] = field(default_factory=list)
@dataclass
class PipelineManifest:
name: str
output_root: str
input_dir: str
started_at: str
finished_at: str | None = None
steps: list[StepManifestEntry] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def write_manifest(path: Path, manifest: PipelineManifest) -> None:
path.write_text(
json.dumps(manifest.to_dict(), indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
def utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
+70
View File
@@ -0,0 +1,70 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(frozen=True)
class Param:
"""Schema entry for a module parameter."""
type: str
default: Any = None
required: bool = False
help: str = ""
choices: tuple[Any, ...] | None = None
def validate_value(self, name: str, value: Any) -> Any:
if value is None:
if self.required:
raise ValueError(f"Parameter '{name}' is required")
return self.default
if self.choices is not None and value not in self.choices:
allowed = ", ".join(repr(c) for c in self.choices)
raise ValueError(
f"Parameter '{name}' must be one of [{allowed}], got {value!r}"
)
if self.type == "string":
if not isinstance(value, str):
raise ValueError(f"Parameter '{name}' must be a string")
return value
if self.type == "int":
if isinstance(value, bool) or not isinstance(value, int):
raise ValueError(f"Parameter '{name}' must be an integer")
return value
if self.type == "float":
if isinstance(value, bool) or not isinstance(value, (int, float)):
raise ValueError(f"Parameter '{name}' must be a number")
return float(value)
if self.type == "bool":
if not isinstance(value, bool):
raise ValueError(f"Parameter '{name}' must be a boolean")
return value
if self.type == "path":
from pathlib import Path
if not isinstance(value, (str, Path)):
raise ValueError(f"Parameter '{name}' must be a path")
return Path(value)
if self.type == "list":
if not isinstance(value, list):
raise ValueError(f"Parameter '{name}' must be a list")
return value
raise ValueError(f"Unknown parameter type '{self.type}' for '{name}'")
def validate_params(
schema: dict[str, Param], raw: dict[str, Any]
) -> dict[str, Any]:
unknown = set(raw) - set(schema)
if unknown:
names = ", ".join(sorted(unknown))
raise ValueError(f"Unknown parameters: {names}")
validated: dict[str, Any] = {}
for name, spec in schema.items():
validated[name] = spec.validate_value(name, raw.get(name))
return validated
+109
View File
@@ -0,0 +1,109 @@
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from typing import Any
from imagepipeline.core.exceptions import ValidationError
from imagepipeline.core.runner import PipelineRunner
from imagepipeline.core.step import INPUT_SOURCE, StepDefinition, StepRef
from imagepipeline.modules.registry import get_module
# Import built-in modules so they register on package load.
import imagepipeline.modules.imagemagick_grayscale # noqa: F401
class Pipeline:
"""Define and run an image processing pipeline."""
def __init__(
self,
*,
input_dir: Path | str,
name: str = "pipeline",
output_base: Path | str | None = None,
symlink_input: bool = True,
verbose: bool = True,
) -> None:
self.input_dir = Path(input_dir)
self.name = name
self.output_base = Path(output_base) if output_base else None
self.symlink_input = symlink_input
self.verbose = verbose
self._steps: list[StepDefinition] = []
self._module_counters: dict[str, int] = defaultdict(int)
self._output_root: Path | None = None
def step(
self,
module_name: str,
*,
inputs: StepRef | str | list[StepRef | str],
**params: Any,
) -> StepRef:
module_cls = get_module(module_name)
self._module_counters[module_name] += 1
counter = self._module_counters[module_name]
output_dir_name = f"{module_name}_{counter:02d}"
step_id = output_dir_name
input_refs = self._normalize_inputs(inputs)
reserved = {"inputs", "input"}
if reserved & set(params):
raise ValidationError(
"Do not pass 'inputs' or 'input' as module parameters"
)
definition = StepDefinition(
step_id=step_id,
module_name=module_name,
module=module_cls,
input_refs=input_refs,
params=params,
output_dir_name=output_dir_name,
)
self._steps.append(definition)
return StepRef(step_id=step_id, output_dir_name=output_dir_name)
def run(self) -> Path:
if not self._steps:
raise ValidationError("Pipeline has no steps")
runner = PipelineRunner(
name=self.name,
input_dir=self.input_dir,
output_base=self.output_base,
steps=self._steps,
symlink_input=self.symlink_input,
verbose=self.verbose,
)
self._output_root = runner.run()
return self._output_root
@property
def output_root(self) -> Path | None:
return self._output_root
def _normalize_inputs(
self, inputs: StepRef | str | list[StepRef | str]
) -> list[str]:
if isinstance(inputs, list):
if not inputs:
raise ValidationError("inputs must not be an empty list")
return [self._input_ref(item) for item in inputs]
return [self._input_ref(inputs)]
@staticmethod
def _input_ref(value: StepRef | str) -> str:
if isinstance(value, StepRef):
return value.step_id
if value == INPUT_SOURCE:
return INPUT_SOURCE
if isinstance(value, str):
return value
raise ValidationError(f"Invalid input reference: {value!r}")
def __enter__(self) -> Pipeline:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
+207
View File
@@ -0,0 +1,207 @@
from __future__ import annotations
import os
from collections import defaultdict, deque
from datetime import datetime
from pathlib import Path
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.exceptions import CycleError, StepError, ValidationError
from imagepipeline.core.manifest import (
PipelineManifest,
StepManifestEntry,
utc_now_iso,
write_manifest,
)
from imagepipeline.core.step import (
INPUT_SOURCE,
StepDefinition,
StepResult,
)
from imagepipeline.utils.files import flatten_matched, list_images, match_by_stem
class PipelineRunner:
def __init__(
self,
*,
name: str,
input_dir: Path,
output_base: Path | None,
steps: list[StepDefinition],
symlink_input: bool = True,
verbose: bool = True,
) -> None:
from imagepipeline.core.log import PipelineLogger
self.name = name
self.input_dir = input_dir.resolve()
self.output_base = (output_base or Path.cwd()).resolve()
self.steps = steps
self.symlink_input = symlink_input
self.logger = PipelineLogger(verbose=verbose)
self.output_root = self._build_output_root()
self._input_link_dir = self.output_root / "input"
self._results: dict[str, StepResult] = {}
def _build_output_root(self) -> Path:
timestamp = datetime.now().strftime("%y%m%d%H%M%S")
folder_name = f"{self.name}_{timestamp}"
output_root = self.output_base / folder_name
output_root.mkdir(parents=True, exist_ok=False)
return output_root
def run(self) -> Path:
images = list_images(self.input_dir)
self.logger.info(f"Pipeline: {self.name}")
self.logger.info(f"Input: {self.input_dir}")
self.logger.info(f"Output: {self.output_root}")
self.logger.blank()
self.logger.info(f"Found {len(images)} photo(s)")
self.logger.blank()
self._prepare_input_dir(images)
order = self._topological_sort()
step_total = len(order)
manifest = PipelineManifest(
name=self.name,
output_root=str(self.output_root),
input_dir=str(self.input_dir),
started_at=utc_now_iso(),
)
for step_index, step in enumerate(order, start=1):
result = self._execute_step(step, step_index=step_index, step_total=step_total)
self._results[step.step_id] = result
manifest.steps.append(
StepManifestEntry(
step_id=step.step_id,
output_dir=step.output_dir_name,
module=step.module_name,
inputs=step.input_refs,
params=step.params,
input_files=[str(p) for p in result.input_paths],
output_files=[str(p) for p in result.output_paths],
)
)
manifest.finished_at = utc_now_iso()
write_manifest(self.output_root / "pipeline_manifest.json", manifest)
self.logger.blank()
self.logger.info("Pipeline finished.")
self.logger.info(f"Manifest: {self.output_root / 'pipeline_manifest.json'}")
return self.output_root
def _prepare_input_dir(self, images: list[Path]) -> None:
self._input_link_dir.mkdir(parents=True, exist_ok=True)
for src in images:
dst = self._input_link_dir / src.name
if dst.exists() or dst.is_symlink():
continue
if self.symlink_input:
os.symlink(src, dst)
else:
import shutil
shutil.copy2(src, dst)
def _resolve_source_paths(self, ref: str) -> list[Path]:
if ref == INPUT_SOURCE:
return list_images(self._input_link_dir)
if ref not in self._results:
raise StepError(f"Unknown step reference: {ref}")
return self._results[ref].output_paths
def _execute_step(
self,
step: StepDefinition,
*,
step_index: int,
step_total: int,
) -> StepResult:
step.module.check_dependencies()
validated = step.module.validate_module_params(step.params)
source_lists = [self._resolve_source_paths(ref) for ref in step.input_refs]
matched_groups = match_by_stem(source_lists)
input_paths = flatten_matched(matched_groups)
output_dir = self.output_root / step.output_dir_name
output_dir.mkdir(parents=True, exist_ok=True)
self.logger.step_start(
step_index,
step_total,
step.output_dir_name,
step.module_name,
inputs=step.input_refs,
params=validated,
)
ctx = ModuleContext(
input_paths=input_paths,
output_dir=output_dir,
params=validated,
pipeline_output_root=self.output_root,
step_id=step.step_id,
matched_groups=matched_groups,
logger=self.logger,
)
try:
step.module().run(ctx)
except Exception as exc:
raise StepError(
f"Step '{step.output_dir_name}' ({step.module_name}) failed: {exc}"
) from exc
output_paths = step.module().list_output_images(ctx)
if not output_paths:
raise StepError(
f"Step '{step.output_dir_name}' ({step.module_name}) "
"produced no output images"
)
self.logger.step_done(step.output_dir_name, step.output_dir_name, len(output_paths))
self.logger.blank()
return StepResult(
step_id=step.step_id,
output_dir_name=step.output_dir_name,
module_name=step.module_name,
output_dir=output_dir,
input_paths=input_paths,
output_paths=output_paths,
params=validated,
)
def _topological_sort(self) -> list[StepDefinition]:
step_by_id = {step.step_id: step for step in self.steps}
in_degree: dict[str, int] = {step.step_id: 0 for step in self.steps}
dependents: dict[str, list[str]] = defaultdict(list)
for step in self.steps:
deps = [ref for ref in step.input_refs if ref != INPUT_SOURCE]
in_degree[step.step_id] = len(deps)
for dep in deps:
if dep not in step_by_id:
raise ValidationError(f"Step '{step.step_id}' references unknown step '{dep}'")
dependents[dep].append(step.step_id)
queue = deque(
step_id for step_id, degree in in_degree.items() if degree == 0
)
ordered_ids: list[str] = []
while queue:
step_id = queue.popleft()
ordered_ids.append(step_id)
for dependent in dependents[step_id]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
if len(ordered_ids) != len(self.steps):
raise CycleError("Pipeline contains a cycle in step dependencies")
return [step_by_id[step_id] for step_id in ordered_ids]
+47
View File
@@ -0,0 +1,47 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from imagepipeline.modules.base import BaseModule
INPUT_SOURCE = "input"
@dataclass
class StepDefinition:
"""Internal step registered on a pipeline."""
step_id: str
module_name: str
module: BaseModule
input_refs: list[str]
params: dict[str, Any]
output_dir_name: str
@dataclass
class StepRef:
"""Reference to a pipeline step, returned by Pipeline.step()."""
step_id: str
output_dir_name: str
def __repr__(self) -> str:
return f"StepRef({self.output_dir_name!r})"
@dataclass
class StepResult:
"""Result of an executed step."""
step_id: str
output_dir_name: str
module_name: str
output_dir: Path
input_paths: list[Path]
output_paths: list[Path] = field(default_factory=list)
params: dict[str, Any] = field(default_factory=dict)
+15
View File
@@ -0,0 +1,15 @@
"""Built-in pipeline modules."""
import imagepipeline.modules.ai_exposure # noqa: F401
import imagepipeline.modules.ai_tone_map # noqa: F401
import imagepipeline.modules.comfy_flux_edit # noqa: F401
import imagepipeline.modules.composite # noqa: F401
import imagepipeline.modules.crop_square # noqa: F401
import imagepipeline.modules.darktable_style # noqa: F401
import imagepipeline.modules.gmic # noqa: F401
import imagepipeline.modules.gmic_grayscale # noqa: F401
import imagepipeline.modules.imagemagick_fill # noqa: F401
import imagepipeline.modules.imagemagick_grayscale # noqa: F401
import imagepipeline.modules.imagemagick_scale_crop # noqa: F401
import imagepipeline.modules.openrouter_edit # noqa: F401
import imagepipeline.modules.rembg # noqa: F401
+126
View File
@@ -0,0 +1,126 @@
from __future__ import annotations
import time
from collections.abc import Callable
from pathlib import Path
from typing import Any, ClassVar
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import BaseModule
def _format_eta(seconds: float) -> str:
seconds = max(0, int(seconds))
minutes, secs = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}h{minutes:02d}m"
if minutes:
return f"{minutes}m{secs:02d}s"
return f"{secs}s"
class AIModule(BaseModule):
"""Base class for AI pipeline modules (CPU-first, batch-friendly)."""
ai_common_parameters: ClassVar[dict[str, Param]] = {
"skip_existing": Param(
"bool",
default=True,
help="Skip images whose output file already exists",
),
"max_edge": Param(
"int",
default=2048,
help="Max long edge for inference (0 = full resolution)",
),
"device": Param(
"string",
default="cpu",
choices=("cpu",),
help="Inference device (v1 supports cpu only)",
),
}
@classmethod
def parameters(cls) -> dict[str, Param]:
return dict(cls.ai_common_parameters)
@classmethod
def all_parameters(cls) -> dict[str, Param]:
merged = dict(cls.ai_common_parameters)
merged.update(cls.parameters())
return merged
@classmethod
def validate_module_params(cls, raw: dict[str, Any]) -> dict[str, Any]:
from imagepipeline.core.params import validate_params
return validate_params(cls.all_parameters(), raw)
def output_path(self, src: Path, ctx: ModuleContext) -> Path:
return ctx.output_dir / src.name
def configure_torch(self, device_name: str) -> None:
import torch
if device_name != "cpu":
raise ValueError(f"Unsupported device: {device_name!r} (only 'cpu' in v1)")
torch.set_num_threads(24)
def iter_input_images(
self,
ctx: ModuleContext,
processor: Callable[[Path, Path, int, int], None],
) -> None:
import tempfile
from imagepipeline.ai.imaging import resize_max_edge, resize_to_size
ctx.output_dir.mkdir(parents=True, exist_ok=True)
skip_existing = ctx.params["skip_existing"]
max_edge = ctx.params["max_edge"]
total = len(ctx.input_paths)
elapsed_times: list[float] = []
for index, src in enumerate(ctx.input_paths, start=1):
dst = self.output_path(src, ctx)
if skip_existing and dst.is_file():
if ctx.logger is not None:
ctx.logger.skipped(self.name, index, total, dst.name)
continue
self.log_image(ctx, index, total, src)
started = time.perf_counter()
if max_edge > 0:
with tempfile.TemporaryDirectory(prefix="imagepipeline_ai_") as tmp:
tmp_path = Path(tmp)
work_src = tmp_path / f"work_{src.name}"
work_out = tmp_path / f"out_{src.name}"
orig_w, orig_h, _, _ = resize_max_edge(src, work_src, max_edge)
processor(work_src, work_out, index, total)
if (orig_w, orig_h) != self._image_size(work_out):
resize_to_size(work_out, dst, orig_w, orig_h)
else:
work_out.replace(dst)
else:
processor(src, dst, index, total)
elapsed = time.perf_counter() - started
elapsed_times.append(elapsed)
remaining = total - index
avg = sum(elapsed_times) / len(elapsed_times)
eta = avg * remaining if remaining else 0.0
if ctx.logger is not None:
ctx.logger.image_done(
self.name, index, total, dst.name, elapsed, eta_seconds=eta
)
@staticmethod
def _image_size(path: Path) -> tuple[int, int]:
from PIL import Image
with Image.open(path) as image:
return image.size
+69
View File
@@ -0,0 +1,69 @@
from __future__ import annotations
from pathlib import Path
from imagepipeline.ai.cache import ensure_weight
from imagepipeline.ai.imaging import load_pil_rgb, save_pil_image
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.exceptions import DependencyError
from imagepipeline.core.params import Param
from imagepipeline.modules.ai_base import AIModule
from imagepipeline.modules.registry import register
@register
class AIExposureModule(AIModule):
name = "ai_exposure"
description = "Exposure enhancement using Zero-DCE++ (CPU)"
_model = None
_model_device = None
@classmethod
def parameters(cls) -> dict[str, Param]:
params = dict(super().parameters())
params["strength"] = Param(
"float",
default=1.0,
help="Blend factor between original (0) and enhanced (1)",
)
return params
@classmethod
def check_dependencies(cls) -> None:
try:
import numpy # noqa: F401
import torch # noqa: F401
from PIL import Image # noqa: F401
except ImportError as exc:
raise DependencyError(
"ai_exposure requires optional deps: pip install -e '.[ai]'"
) from exc
@classmethod
def _get_model(cls, device_name: str):
import torch
from imagepipeline.ai.zero_dce import load_zero_dce_model
device = torch.device(device_name)
if cls._model is None or cls._model_device != device:
weights = ensure_weight("zero_dce_pp", filename="zero_dce_pp_epoch99.pth")
cls._model = load_zero_dce_model(weights, device)
cls._model_device = device
return cls._model, device
def run(self, ctx: ModuleContext) -> None:
from imagepipeline.ai.zero_dce import enhance_image
self.check_dependencies()
self.configure_torch(ctx.params["device"])
model, device = self._get_model(ctx.params["device"])
strength = ctx.params["strength"]
def process(src: Path, dst: Path, _index: int, _total: int) -> None:
image = load_pil_rgb(src)
result = enhance_image(model, image, device=device, strength=strength)
save_pil_image(result, dst)
self.iter_input_images(ctx, process)
+108
View File
@@ -0,0 +1,108 @@
from __future__ import annotations
from pathlib import Path
from imagepipeline.ai.classical_tone import apply_clahe_tone
from imagepipeline.ai.imaging import load_pil_rgb, save_pil_image
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.ai_base import AIModule
from imagepipeline.modules.registry import register
@register
class AIToneMapModule(AIModule):
name = "ai_tone_map"
description = "Tone mapping via HDRNet checkpoint or CLAHE fallback (CPU)"
_hdrnet_model = None
_hdrnet_checkpoint: Path | None = None
_warned_fallback = False
@classmethod
def parameters(cls) -> dict[str, Param]:
params = dict(super().parameters())
params.update(
{
"checkpoint": Param(
"string",
default="",
help="HDRNet .pth checkpoint (creotiv format); empty uses CLAHE",
),
"strength": Param(
"float",
default=1.0,
help="Blend factor between original and tone-mapped result",
),
"net_input_size": Param(
"int",
default=256,
help="HDRNet low-res branch input size",
),
}
)
return params
@classmethod
def check_dependencies(cls) -> None:
try:
import numpy # noqa: F401
from PIL import Image # noqa: F401
except ImportError as exc:
from imagepipeline.core.exceptions import DependencyError
raise DependencyError(
"ai_tone_map requires optional deps: pip install -e '.[ai]'"
) from exc
def _load_hdrnet(self, checkpoint: Path, device_name: str):
import torch
from imagepipeline.ai.hdrnet.model import load_hdrnet_checkpoint
device = torch.device(device_name)
if self._hdrnet_model is None or self._hdrnet_checkpoint != checkpoint:
self._hdrnet_model, _params = load_hdrnet_checkpoint(checkpoint, device)
self._hdrnet_checkpoint = checkpoint
return self._hdrnet_model, device
def run(self, ctx: ModuleContext) -> None:
self.check_dependencies()
checkpoint = ctx.params["checkpoint"]
strength = ctx.params["strength"]
use_hdrnet = bool(checkpoint)
if use_hdrnet:
try:
import torch # noqa: F401
except ImportError as exc:
raise RuntimeError(
"HDRNet tone mapping requires torch: pip install -e '.[ai]'"
) from exc
self.configure_torch(ctx.params["device"])
model, device = self._load_hdrnet(Path(checkpoint), ctx.params["device"])
elif ctx.logger is not None and not self._warned_fallback:
ctx.logger.warning(
"Using classical CLAHE tone mapping (no HDRNet checkpoint configured)"
)
self._warned_fallback = True
net_input_size = ctx.params["net_input_size"]
def process(src: Path, dst: Path, _index: int, _total: int) -> None:
image = load_pil_rgb(src)
if use_hdrnet:
from imagepipeline.ai.hdrnet.model import enhance_image_hdrnet
result = enhance_image_hdrnet(
model,
image,
device=device,
net_input_size=net_input_size,
strength=strength,
)
else:
result = apply_clahe_tone(image, strength=strength)
save_pil_image(result, dst)
self.iter_input_images(ctx, process)
+66
View File
@@ -0,0 +1,66 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, ClassVar
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param, validate_params
from imagepipeline.utils.files import is_image
class BaseModule(ABC):
"""Base class for all pipeline modules."""
name: ClassVar[str]
description: ClassVar[str] = ""
supported_input_formats: ClassVar[tuple[str, ...]] = (
".jpg",
".jpeg",
".png",
".tif",
".tiff",
".webp",
)
@classmethod
def parameters(cls) -> dict[str, Param]:
return {}
@classmethod
def validate_module_params(cls, raw: dict[str, Any]) -> dict[str, Any]:
return validate_params(cls.parameters(), raw)
@classmethod
def check_dependencies(cls) -> None:
"""Raise DependencyError if required external tools are missing."""
@abstractmethod
def run(self, ctx: ModuleContext) -> None:
"""Process ctx.input_paths and write outputs into ctx.output_dir."""
def log_image(self, ctx: ModuleContext, index: int, total: int, path: Path) -> None:
ctx.log_image(self.name, index, total, path)
def list_output_images(self, ctx: ModuleContext) -> list[Path]:
return sorted(p for p in ctx.output_dir.iterdir() if is_image(p))
class SubprocessModule(BaseModule):
"""Base class for modules that shell out to CLI tools."""
command_candidates: ClassVar[tuple[str, ...]] = ()
default_timeout: ClassVar[float | None] = None
@classmethod
def check_dependencies(cls) -> None:
from imagepipeline.utils.subprocess import require_command
if cls.command_candidates:
require_command(*cls.command_candidates)
@classmethod
def resolve_command(cls) -> str:
from imagepipeline.utils.subprocess import require_command
return require_command(*cls.command_candidates)
+221
View File
@@ -0,0 +1,221 @@
from __future__ import annotations
import json
import random
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.exceptions import DependencyError
from imagepipeline.core.params import Param
from imagepipeline.modules.ai_base import AIModule
from imagepipeline.modules.registry import register
REPO_ROOT = Path(__file__).resolve().parents[2]
@register
class ComfyFluxEditModule(AIModule):
name = "comfy_flux_edit"
description = "Experimental FLUX img2img editing via ComfyUI HTTP API (CPU: very slow)"
@classmethod
def parameters(cls) -> dict[str, Param]:
params = dict(super().parameters())
params.update(
{
"prompt": Param(
"string",
required=True,
help="Edit prompt for the ComfyUI workflow",
),
"denoise": Param(
"float",
default=0.35,
help="KSampler denoise strength",
),
"seed": Param(
"int",
default=-1,
help="Random seed (-1 = random per image)",
),
"server_url": Param(
"string",
default="http://127.0.0.1:8188",
help="ComfyUI server base URL",
),
"workflow_path": Param(
"path",
default=REPO_ROOT / "workflows" / "comfy" / "flux_klein_edit_api.json",
help="ComfyUI workflow exported in API format",
),
"poll_interval": Param(
"float",
default=2.0,
help="Seconds between history polls",
),
}
)
return params
@classmethod
def check_dependencies(cls) -> None:
pass
def run(self, ctx: ModuleContext) -> None:
workflow_path = Path(ctx.params["workflow_path"])
if not workflow_path.is_file():
raise DependencyError(
f"ComfyUI workflow not found: {workflow_path}. "
"Export a Flux Klein img2img workflow to this path (see workflows/comfy/README.md)."
)
server_url = ctx.params["server_url"].rstrip("/")
self._ensure_server(server_url)
if ctx.logger is not None:
ctx.logger.warning(
"ComfyUI on CPU: expect hours per full-resolution image"
)
workflow_template = json.loads(workflow_path.read_text(encoding="utf-8"))
denoise = ctx.params["denoise"]
prompt = ctx.params["prompt"]
poll_interval = ctx.params["poll_interval"]
def process(src: Path, dst: Path, _index: int, _total: int) -> None:
uploaded_name = self._upload_image(server_url, src)
seed = ctx.params["seed"]
if seed < 0:
seed = random.randint(0, 2**32 - 1)
workflow = self._patch_workflow(
workflow_template,
image_name=uploaded_name,
prompt=prompt,
denoise=denoise,
seed=seed,
)
prompt_id = self._queue_prompt(server_url, workflow)
output_info = self._wait_for_output(
server_url, prompt_id, poll_interval=poll_interval
)
image_bytes = self._download_view(server_url, output_info)
dst.write_bytes(image_bytes)
self.iter_input_images(ctx, process)
@staticmethod
def _ensure_server(server_url: str) -> None:
try:
urllib.request.urlopen(f"{server_url}/system_stats", timeout=5)
except urllib.error.URLError as exc:
raise DependencyError(
f"ComfyUI server not reachable at {server_url}: {exc}"
) from exc
@staticmethod
def _upload_image(server_url: str, src: Path) -> str:
boundary = "----imagepipelineboundary"
body_parts = [
f"--{boundary}\r\n".encode(),
(
f'Content-Disposition: form-data; name="image"; filename="{src.name}"\r\n'
f"Content-Type: application/octet-stream\r\n\r\n"
).encode(),
src.read_bytes(),
b"\r\n",
f"--{boundary}\r\n".encode(),
b'Content-Disposition: form-data; name="overwrite"\r\n\r\n',
b"true\r\n",
f"--{boundary}--\r\n".encode(),
]
body = b"".join(body_parts)
request = urllib.request.Request(
f"{server_url}/upload/image",
data=body,
headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
method="POST",
)
with urllib.request.urlopen(request, timeout=120) as response:
data = json.loads(response.read().decode("utf-8"))
return data["name"]
@staticmethod
def _patch_workflow(
template: dict,
*,
image_name: str,
prompt: str,
denoise: float,
seed: int,
) -> dict:
workflow = json.loads(json.dumps(template))
for node in workflow.values():
if not isinstance(node, dict):
continue
class_type = node.get("class_type", "")
inputs = node.get("inputs", {})
if class_type == "LoadImage":
inputs["image"] = image_name
elif class_type in {"CLIPTextEncode", "TextEncode"} and "text" in inputs:
inputs["text"] = prompt
elif class_type == "KSampler":
inputs["denoise"] = denoise
inputs["seed"] = seed
elif "denoise" in inputs:
inputs["denoise"] = denoise
if "seed" in inputs and class_type != "KSampler":
inputs["seed"] = seed
return workflow
@staticmethod
def _queue_prompt(server_url: str, workflow: dict) -> str:
payload = json.dumps({"prompt": workflow}).encode("utf-8")
request = urllib.request.Request(
f"{server_url}/prompt",
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=60) as response:
data = json.loads(response.read().decode("utf-8"))
return data["prompt_id"]
@staticmethod
def _wait_for_output(
server_url: str,
prompt_id: str,
*,
poll_interval: float,
timeout: float = 86400.0,
) -> dict:
deadline = time.time() + timeout
while time.time() < deadline:
request = urllib.request.Request(f"{server_url}/history/{prompt_id}")
with urllib.request.urlopen(request, timeout=30) as response:
history = json.loads(response.read().decode("utf-8"))
if prompt_id in history:
outputs = history[prompt_id].get("outputs") or {}
for node_output in outputs.values():
images = node_output.get("images") or []
if images:
return images[0]
status = history[prompt_id].get("status", {})
if status.get("status_str") == "error":
raise RuntimeError(f"ComfyUI workflow failed: {status}")
time.sleep(poll_interval)
raise TimeoutError(f"ComfyUI prompt {prompt_id} did not finish within {timeout}s")
@staticmethod
def _download_view(server_url: str, image_info: dict) -> bytes:
query = urllib.parse.urlencode(
{
"filename": image_info["filename"],
"subfolder": image_info.get("subfolder", ""),
"type": image_info.get("type", "output"),
}
)
with urllib.request.urlopen(f"{server_url}/view?{query}", timeout=120) as response:
return response.read()
+98
View File
@@ -0,0 +1,98 @@
from __future__ import annotations
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
@register
class CompositeModule(SubprocessModule):
name = "composite"
description = (
"Composite multiple inputs. With two inputs: first=background, "
"second=foreground (with alpha)."
)
command_candidates = ("magick", "convert")
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"mode": Param(
"string",
default="over",
choices=("over", "multiply", "screen"),
help="ImageMagick -compose mode",
),
"output_ext": Param(
"string",
default=".png",
help="Output file extension including dot",
),
"foreground_opacity": Param(
"float",
default=1.0,
help="Foreground layer opacity from 0.0 to 1.0",
),
}
def run(self, ctx: ModuleContext) -> None:
if not ctx.matched_groups:
raise ValueError("composite requires at least one matched input group")
command = self.resolve_command()
mode = ctx.params["mode"]
output_ext = ctx.params["output_ext"]
foreground_opacity = ctx.params["foreground_opacity"]
ctx.output_dir.mkdir(parents=True, exist_ok=True)
total = len(ctx.matched_groups)
for index, group in enumerate(ctx.matched_groups, start=1):
if len(group) < 2:
raise ValueError(
"composite requires at least two input sources per image"
)
background, foreground = group[0], group[1]
self.log_image(ctx, index, total, foreground)
dst = ctx.output_dir / f"{foreground.stem}{output_ext}"
background_args = [
"(",
str(background),
"-define",
"png:color-type=2",
"-type",
"TrueColor",
"-colorspace",
"sRGB",
")",
]
if foreground_opacity < 1.0:
foreground_args = [
"(",
str(foreground),
"-alpha",
"on",
"-channel",
"A",
"-evaluate",
"multiply",
str(foreground_opacity),
"+channel",
")",
]
else:
foreground_args = [str(foreground)]
cmd = [
command,
*background_args,
*foreground_args,
"-compose",
mode,
"-composite",
str(dst),
]
run_command(cmd)
+45
View File
@@ -0,0 +1,45 @@
from __future__ import annotations
from pathlib import Path
from imagepipeline.core.context import ModuleContext
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
@register
class CropSquareModule(SubprocessModule):
name = "crop_square"
description = "Center-crop images to the largest possible square"
command_candidates = ("magick", "convert")
def run(self, ctx: ModuleContext) -> None:
command = self.resolve_command()
ctx.output_dir.mkdir(parents=True, exist_ok=True)
total = len(ctx.input_paths)
for index, src in enumerate(ctx.input_paths, start=1):
self.log_image(ctx, index, total, src)
dst = ctx.output_dir / src.name
size, left, top = self._center_square_crop(src, command)
run_command(
[
command,
str(src),
"-auto-orient",
"-crop",
f"{size}x{size}+{left}+{top}",
"+repage",
str(dst),
]
)
@staticmethod
def _center_square_crop(src: Path, command: str) -> tuple[int, int, int]:
result = run_command([command, "-format", "%w %h", str(src), "info:"])
width, height = map(int, result.stdout.strip().split())
size = min(width, height)
left = (width - size) // 2
top = (height - size) // 2
return size, left, top
+109
View File
@@ -0,0 +1,109 @@
from __future__ import annotations
from pathlib import Path
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
JPEG_EXTENSIONS = {".jpg", ".jpeg"}
PNG_EXTENSIONS = {".png"}
SUPPORTED_EXTENSIONS = JPEG_EXTENSIONS | PNG_EXTENSIONS
def _normalize_format(ext: str) -> str:
ext = ext.lower().lstrip(".")
if ext in {"jpg", "jpeg"}:
return "jpeg"
if ext == "png":
return "png"
raise ValueError(
f"Unsupported output format {ext!r}; darktable_style supports jpeg and png only"
)
def export_conf_options(format_name: str) -> list[str]:
if format_name == "jpeg":
return ["plugins/imageio/format/jpeg/quality=90"]
if format_name == "png":
return [
"plugins/imageio/format/png/bpp=8",
"plugins/imageio/format/png/compression=5",
]
raise ValueError(f"Unsupported format {format_name!r}")
def resolve_export_format(src: Path, out_ext: str) -> tuple[str, list[str]]:
ext = out_ext or src.suffix.lstrip(".")
format_name = _normalize_format(ext)
return format_name, export_conf_options(format_name)
@register
class DarktableStyleModule(SubprocessModule):
name = "darktable_style"
description = "Apply a darktable style via darktable-cli (JPEG/PNG export)"
command_candidates = ("darktable-cli",)
default_timeout = 600.0
supported_input_formats = (".jpg", ".jpeg", ".png")
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"style": Param(
"string",
required=True,
help='darktable style name (e.g. "Watermark F12.rocks")',
),
"style_overwrite": Param(
"bool",
default=True,
help="Pass --style-overwrite to darktable-cli",
),
"out_ext": Param(
"string",
default="",
help="Output format (jpeg or png); empty keeps input format",
),
"config_dir": Param(
"path",
default=Path.home() / ".config" / "darktable",
help="darktable config directory (required when applying styles)",
),
}
def run(self, ctx: ModuleContext) -> None:
ctx.output_dir.mkdir(parents=True, exist_ok=True)
style = ctx.params["style"]
style_overwrite = ctx.params["style_overwrite"]
out_ext = ctx.params["out_ext"]
config_dir = Path(ctx.params["config_dir"])
total = len(ctx.input_paths)
for index, src in enumerate(ctx.input_paths, start=1):
if src.suffix.lower() not in SUPPORTED_EXTENSIONS:
raise ValueError(
f"Unsupported input format {src.suffix!r} for {src.name}; "
"darktable_style supports .jpg, .jpeg, and .png only"
)
self.log_image(ctx, index, total, src)
format_name, conf_options = resolve_export_format(src, out_ext)
cmd = [
"darktable-cli",
str(src),
str(ctx.output_dir),
"--style",
style,
"--out-ext",
format_name,
"--core",
"--configdir",
str(config_dir),
]
for conf in conf_options:
cmd.extend(["--conf", conf])
if style_overwrite:
cmd.append("--style-overwrite")
run_command(cmd, timeout=self.default_timeout)
+36
View File
@@ -0,0 +1,36 @@
from __future__ import annotations
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
@register
class GmicModule(SubprocessModule):
name = "gmic"
description = "Run a G'MIC command on each input image"
command_candidates = ("gmic",)
default_timeout = 300.0
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"command": Param(
"string",
required=True,
help="G'MIC command(s) applied before -output (e.g. '-fx_drop_shadow3d ...')",
),
}
def run(self, ctx: ModuleContext) -> None:
ctx.output_dir.mkdir(parents=True, exist_ok=True)
gmic_command = ctx.params["command"]
total = len(ctx.input_paths)
for index, src in enumerate(ctx.input_paths, start=1):
self.log_image(ctx, index, total, src)
dst = ctx.output_dir / src.name
cmd = ["gmic", str(src), gmic_command, "-output", str(dst)]
run_command(cmd, timeout=self.default_timeout)
+36
View File
@@ -0,0 +1,36 @@
from __future__ import annotations
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
@register
class GmicGrayscale(SubprocessModule):
name = "gmic_grayscale"
description = "Convert images to grayscale using G'MIC"
command_candidates = ("gmic",)
default_timeout = 300.0
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"command": Param(
"string",
default="-to_gray",
help="G'MIC command(s) applied before -output",
),
}
def run(self, ctx: ModuleContext) -> None:
ctx.output_dir.mkdir(parents=True, exist_ok=True)
gmic_command = ctx.params["command"]
total = len(ctx.input_paths)
for index, src in enumerate(ctx.input_paths, start=1):
self.log_image(ctx, index, total, src)
dst = ctx.output_dir / src.name
cmd = ["gmic", str(src), gmic_command, "-output", str(dst)]
run_command(cmd, timeout=self.default_timeout)
+117
View File
@@ -0,0 +1,117 @@
from __future__ import annotations
from pathlib import Path
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
def normalize_color(color: str) -> str:
color = color.strip()
if not color:
raise ValueError("Color must not be empty")
if color.startswith("#"):
return color
if color.replace(".", "", 1).isdigit():
return color
return f"#{color}"
def build_fill_arguments(
width: int,
height: int,
*,
color1: str,
color2: str | None,
gradient: bool,
radial: bool,
angle: float | None,
) -> list[str]:
c1 = normalize_color(color1)
size = f"{width}x{height}"
if not gradient:
return ["-size", size, f"xc:{c1}"]
c2 = normalize_color(color2 or color1)
if radial:
return ["-size", size, f"radial-gradient:{c1}-{c2}"]
args = ["-size", size]
if angle is not None:
args.extend(["-define", f"gradient:angle={angle}"])
args.append(f"gradient:{c1}-{c2}")
return args
@register
class ImageMagickFillModule(SubprocessModule):
name = "imagemagick_fill"
description = (
"Create solid-color or gradient images sized to match each input image"
)
command_candidates = ("magick", "convert")
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"color1": Param(
"string",
required=True,
help="Primary color (hex, e.g. #d7fd00, or ImageMagick color name)",
),
"color2": Param(
"string",
default="",
help="Second gradient color; ignored for solid fills",
),
"gradient": Param(
"bool",
default=False,
help="Create a gradient instead of a solid fill",
),
"radial": Param(
"bool",
default=False,
help="Use a radial gradient (linear when false)",
),
"angle": Param(
"float",
default=None,
help="Linear gradient angle in degrees (ignored for radial/solid)",
),
}
def run(self, ctx: ModuleContext) -> None:
command = self.resolve_command()
color1 = ctx.params["color1"]
color2 = ctx.params["color2"] or None
gradient = ctx.params["gradient"]
radial = ctx.params["radial"]
angle = ctx.params["angle"]
ctx.output_dir.mkdir(parents=True, exist_ok=True)
total = len(ctx.input_paths)
for index, src in enumerate(ctx.input_paths, start=1):
self.log_image(ctx, index, total, src)
width, height = self._image_size(command, src)
fill_args = build_fill_arguments(
width,
height,
color1=color1,
color2=color2,
gradient=gradient,
radial=radial,
angle=angle,
)
dst = ctx.output_dir / src.name
run_command([command, *fill_args, str(dst)])
@staticmethod
def _image_size(command: str, src: Path) -> tuple[int, int]:
result = run_command([command, "-format", "%w %h", str(src), "info:"])
width, height = map(int, result.stdout.strip().split())
return width, height
@@ -0,0 +1,41 @@
from __future__ import annotations
from pathlib import Path
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
@register
class ImageMagickGrayscale(SubprocessModule):
name = "imagemagick_grayscale"
description = "Convert images to grayscale using ImageMagick"
command_candidates = ("magick", "convert")
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"colorspace": Param(
"string",
default="Gray",
help="ImageMagick -colorspace value",
),
}
def run(self, ctx: ModuleContext) -> None:
command = self.resolve_command()
colorspace = ctx.params["colorspace"]
ctx.output_dir.mkdir(parents=True, exist_ok=True)
total = len(ctx.input_paths)
for index, src in enumerate(ctx.input_paths, start=1):
self.log_image(ctx, index, total, src)
dst = ctx.output_dir / src.name
if command == "magick":
cmd = [command, str(src), "-colorspace", colorspace, str(dst)]
else:
cmd = [command, str(src), "-colorspace", colorspace, str(dst)]
run_command(cmd)
@@ -0,0 +1,61 @@
from __future__ import annotations
from pathlib import Path
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
@register
class ImageMagickScaleCropModule(SubprocessModule):
name = "imagemagick_scale_crop"
description = (
"Scale an image then center-crop back to its original dimensions"
)
command_candidates = ("magick", "convert")
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"scale": Param(
"float",
default=1.05,
help="Uniform scale factor before center crop (1.05 = 5% larger)",
),
}
def run(self, ctx: ModuleContext) -> None:
command = self.resolve_command()
scale = ctx.params["scale"]
ctx.output_dir.mkdir(parents=True, exist_ok=True)
total = len(ctx.input_paths)
for index, src in enumerate(ctx.input_paths, start=1):
self.log_image(ctx, index, total, src)
width, height = self._image_size(command, src)
dst = ctx.output_dir / src.name
scaled_w = max(1, round(width * scale))
scaled_h = max(1, round(height * scale))
run_command(
[
command,
str(src),
"-resize",
f"{scaled_w}x{scaled_h}!",
"-gravity",
"Center",
"-crop",
f"{width}x{height}+0+0",
"+repage",
str(dst),
]
)
@staticmethod
def _image_size(command: str, src: Path) -> tuple[int, int]:
result = run_command([command, "-format", "%w %h", str(src), "info:"])
width, height = map(int, result.stdout.strip().split())
return width, height
+150
View File
@@ -0,0 +1,150 @@
from __future__ import annotations
import base64
import io
import json
import os
import urllib.error
import urllib.request
from pathlib import Path
from imagepipeline.ai.imaging import load_pil_rgb
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.exceptions import DependencyError
from imagepipeline.core.params import Param
from imagepipeline.modules.ai_base import AIModule
from imagepipeline.modules.registry import register
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
@register
class OpenRouterEditModule(AIModule):
name = "openrouter_edit"
description = "Generative image editing via OpenRouter (cloud)"
@classmethod
def parameters(cls) -> dict[str, Param]:
params = dict(super().parameters())
params.update(
{
"prompt": Param(
"string",
required=True,
help="Edit instruction for the model",
),
"model": Param(
"string",
default="black-forest-labs/flux.2-klein-4b",
help="OpenRouter model id with image output",
),
"strength": Param(
"float",
default=0.3,
help="Edit strength (image_config.strength where supported)",
),
"api_key_env": Param(
"string",
default="OPENROUTER_API_KEY",
help="Environment variable containing the API key",
),
}
)
return params
@classmethod
def check_dependencies(cls) -> None:
if not os.environ.get("OPENROUTER_API_KEY"):
raise DependencyError(
"OPENROUTER_API_KEY environment variable is not set"
)
def run(self, ctx: ModuleContext) -> None:
api_key_env = ctx.params["api_key_env"]
api_key = os.environ.get(api_key_env)
if not api_key:
raise DependencyError(f"Environment variable {api_key_env!r} is not set")
prompt = ctx.params["prompt"]
model = ctx.params["model"]
strength = ctx.params["strength"]
def process(src: Path, dst: Path, index: int, total: int) -> None:
image = load_pil_rgb(src)
megapixels = (image.size[0] * image.size[1]) / 1_000_000
if ctx.logger is not None:
ctx.logger.info(
f" OpenRouter request [{index}/{total}]: model={model!r}, "
f"~{megapixels:.1f} MP (cost varies by model)"
)
payload = self._build_payload(image, prompt, model, strength)
response = self._post(api_key, payload)
result_bytes = self._extract_image_bytes(response)
dst.parent.mkdir(parents=True, exist_ok=True)
dst.write_bytes(result_bytes)
self.iter_input_images(ctx, process)
@staticmethod
def _build_payload(image, prompt: str, model: str, strength: float) -> dict:
buffer = io.BytesIO()
image.save(buffer, format="JPEG", quality=92)
encoded = base64.b64encode(buffer.getvalue()).decode("ascii")
data_url = f"data:image/jpeg;base64,{encoded}"
payload = {
"model": model,
"modalities": ["image"],
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": data_url}},
],
}
],
}
if strength is not None:
payload["image_config"] = {"strength": strength}
return payload
@staticmethod
def _post(api_key: str, payload: dict) -> dict:
body = json.dumps(payload).encode("utf-8")
request = urllib.request.Request(
OPENROUTER_URL,
data=body,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/froxxxy/imagepipeline",
"X-Title": "imagepipeline",
},
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=600) as response:
return json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
raise RuntimeError(
f"OpenRouter API error ({exc.code}): {detail}"
) from exc
@staticmethod
def _extract_image_bytes(response: dict) -> bytes:
choices = response.get("choices") or []
if not choices:
raise RuntimeError("OpenRouter response contained no choices")
message = choices[0].get("message") or {}
images = message.get("images") or []
if not images:
raise RuntimeError("OpenRouter response contained no images")
url = images[0].get("image_url", {}).get("url", "")
if url.startswith("data:"):
_, encoded = url.split(",", 1)
return base64.b64decode(encoded)
if url.startswith("http://") or url.startswith("https://"):
with urllib.request.urlopen(url, timeout=120) as image_response:
return image_response.read()
raise RuntimeError(f"Unsupported image URL in OpenRouter response: {url!r}")
+40
View File
@@ -0,0 +1,40 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from imagepipeline.modules.base import BaseModule
_REGISTRY: dict[str, type[BaseModule]] = {}
def register(cls: type[BaseModule]) -> type[BaseModule]:
if not getattr(cls, "name", None):
raise ValueError(f"Module {cls.__name__} must define 'name'")
existing = _REGISTRY.get(cls.name)
if existing is not None:
if existing is cls:
return cls
raise ValueError(f"Module name already registered: {cls.name}")
_REGISTRY[cls.name] = cls
return cls
def unregister(name: str) -> None:
_REGISTRY.pop(name, None)
def get_module(name: str) -> type[BaseModule]:
if name not in _REGISTRY:
available = ", ".join(sorted(_REGISTRY)) or "(none)"
raise KeyError(f"Unknown module '{name}'. Available: {available}")
return _REGISTRY[name]
def list_modules() -> list[str]:
return sorted(_REGISTRY)
def clear_registry() -> None:
"""Clear registry — intended for tests only."""
_REGISTRY.clear()
+46
View File
@@ -0,0 +1,46 @@
from __future__ import annotations
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.params import Param
from imagepipeline.modules.base import SubprocessModule
from imagepipeline.modules.registry import register
from imagepipeline.utils.subprocess import run_command
@register
class RembgModule(SubprocessModule):
name = "rembg"
description = "Remove image background using rembg"
command_candidates = ("rembg",)
default_timeout = 600.0
supported_input_formats = (".jpg", ".jpeg", ".png", ".webp", ".tif", ".tiff", ".bmp")
@classmethod
def parameters(cls) -> dict[str, Param]:
return {
"model": Param(
"string",
default="birefnet-general",
help="rembg model name (-m)",
),
"alpha_matting": Param(
"bool",
default=True,
help="Enable alpha matting (-a)",
),
}
def run(self, ctx: ModuleContext) -> None:
ctx.output_dir.mkdir(parents=True, exist_ok=True)
model = ctx.params["model"]
alpha_matting = ctx.params["alpha_matting"]
total = len(ctx.input_paths)
for index, src in enumerate(ctx.input_paths, start=1):
self.log_image(ctx, index, total, src)
dst = ctx.output_dir / f"{src.stem}.png"
cmd = ["rembg", "i", "-m", model]
if alpha_matting:
cmd.append("-a")
cmd.extend([str(src), str(dst)])
run_command(cmd, timeout=self.default_timeout)
+1
View File
@@ -0,0 +1 @@
"""Shared utilities."""
+65
View File
@@ -0,0 +1,65 @@
from __future__ import annotations
from pathlib import Path
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".webp", ".bmp", ".gif"}
def is_image(path: Path) -> bool:
return path.is_file() and path.suffix.lower() in IMAGE_EXTENSIONS
def list_images(directory: Path) -> list[Path]:
if not directory.is_dir():
raise FileNotFoundError(f"Input directory not found: {directory}")
images = sorted(p for p in directory.iterdir() if is_image(p))
if not images:
raise ValueError(f"No image files found in {directory}")
return images
def stem_key(path: Path) -> str:
return path.stem.lower()
def match_by_stem(sources: list[list[Path]]) -> list[list[Path]]:
"""Match image paths across multiple source lists by filename stem."""
if not sources:
return []
if len(sources) == 1:
return [[p] for p in sources[0]]
key_maps: list[dict[str, Path]] = []
for source in sources:
mapping: dict[str, Path] = {}
for path in source:
key = stem_key(path)
if key in mapping:
raise ValueError(
f"Duplicate stem '{key}' in {path.parent}: "
f"{mapping[key].name} and {path.name}"
)
mapping[key] = path
key_maps.append(mapping)
base_keys = set(key_maps[0])
for idx, mapping in enumerate(key_maps[1:], start=2):
keys = set(mapping)
missing = base_keys - keys
extra = keys - base_keys
if missing or extra:
parts = [f"source 1 has {len(base_keys)} image(s)"]
if missing:
sample = ", ".join(sorted(missing)[:5])
parts.append(f"source {idx} missing: {sample}")
if extra:
sample = ", ".join(sorted(extra)[:5])
parts.append(f"source {idx} extra: {sample}")
raise ValueError("Cannot match images by stem: " + "; ".join(parts))
return [[mapping[key] for mapping in key_maps] for key in sorted(base_keys)]
def flatten_matched(groups: list[list[Path]]) -> list[Path]:
"""For single-input steps, return the first path from each group."""
return [group[0] for group in groups]
+47
View File
@@ -0,0 +1,47 @@
from __future__ import annotations
import shutil
import subprocess
from pathlib import Path
from imagepipeline.core.exceptions import DependencyError
def command_exists(name: str) -> bool:
return shutil.which(name) is not None
def run_command(
cmd: list[str],
*,
timeout: float | None = None,
cwd: Path | None = None,
) -> subprocess.CompletedProcess[str]:
try:
return subprocess.run(
cmd,
check=True,
capture_output=True,
text=True,
timeout=timeout,
cwd=str(cwd) if cwd else None,
)
except subprocess.CalledProcessError as exc:
stderr = (exc.stderr or "").strip()
stdout = (exc.stdout or "").strip()
detail = stderr or stdout or str(exc)
raise RuntimeError(
f"Command failed ({exc.returncode}): {' '.join(cmd)}\n{detail}"
) from exc
except subprocess.TimeoutExpired as exc:
raise RuntimeError(
f"Command timed out after {timeout}s: {' '.join(cmd)}"
) from exc
def require_command(*names: str) -> str:
for name in names:
if command_exists(name):
return name
options = " or ".join(names)
raise DependencyError(f"Required command not found on PATH: {options}")
+74
View File
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""Example pipeline: OpenRouter photo enhancement (Darktable export → cloud edit)."""
import os
from pathlib import Path
from imagepipeline import Pipeline
REPO_ROOT = Path(__file__).resolve().parents[1]
ENV_FILE = REPO_ROOT / ".env"
# Change this to your Darktable export folder.
INPUT = Path("/home/frank/tmp/aiedittest")
# Optional: where run folders are created (defaults to current working directory).
OUTPUT_BASE = Path.home() / "pipeline_output"
OPENROUTER_PROMPT = (
"Subtle photo enhancement only. Improve exposure, color, and skin tones. "
"Sharpen where the blur seems to be unintended."
"Keep the same people, poses, background, and composition. "
"Natural, realistic look — no oversaturation or plastic skin."
"Do not change image ratio, do not crop or enlarge the image, just improve the photo."
"In group photos, improve the photo of the group, not the individual photos."
"Also in group photos make faces same brightness and remove any glare or reflections."
"Think of it like an amateur developed the RAW file in Lightroom and you are the senior who makes final touches to make it look professional."
)
# Test cheap with Klein; switch to flux.2-pro or flux.2-max for final exports.
OPENROUTER_MODEL = "x-ai/grok-imagine-image-quality"
def _load_env_file(path: Path) -> None:
if not path.is_file():
return
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
key, sep, value = line.partition("=")
if not sep:
continue
key = key.strip()
value = value.strip()
if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
value = value[1:-1]
os.environ.setdefault(key, value)
def main() -> None:
_load_env_file(ENV_FILE)
with Pipeline(
name="example_ai",
input_dir=INPUT,
output_base=OUTPUT_BASE,
) as p:
p.step(
"openrouter_edit",
inputs="input",
prompt=OPENROUTER_PROMPT,
model=OPENROUTER_MODEL,
max_edge=2048,
)
# Local CPU fallback (usually worse on already-graded Darktable exports):
# exp = p.step("ai_exposure", inputs="input", max_edge=2048, strength=0.5)
# p.step("ai_tone_map", inputs=exp, strength=0.5)
output_root = p.run()
print(f"Pipeline finished. Output: {output_root}")
if __name__ == "__main__":
main()
+30
View File
@@ -0,0 +1,30 @@
#!/usr/bin/env python3
"""Example pipeline: convert all exported images to grayscale."""
from pathlib import Path
from imagepipeline import Pipeline
# Change this to your Darktable export folder.
INPUT = Path("/path/to/darktable/export")
# Optional: where run folders are created (defaults to current working directory).
OUTPUT_BASE = Path.home() / "pipeline_output"
def main() -> None:
with Pipeline(
name="example_grayscale",
input_dir=INPUT,
output_base=OUTPUT_BASE,
) as p:
gray = p.step("imagemagick_grayscale", inputs="input")
# Chain another step on the result:
# p.step("imagemagick_grayscale", inputs=gray, colorspace="Gray")
output_root = p.run()
print(f"Pipeline finished. Output: {output_root}")
if __name__ == "__main__":
main()
+121
View File
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""Baxxter pipeline: rembg variants composited over backgrounds and originals."""
from pathlib import Path
from imagepipeline import Pipeline
INPUT = Path("/home/frank/pics/20260525_Shooting Baxxter Boys/darktable_exported")
OUTPUT_BASE = Path.home() / "pipeline_output"
GRADIENT_COLOR1 = "#d7fd00ff"
GRADIENT_COLOR2 = "#fc0adeff"
GMIC_STEREO = "-gcd_stereo_img 0,0,2.028,1,1.714,3.06,4,1,0"
GMIC_DROP_SHADOW = "-fx_drop_shadow3d 0,0,0,10,1,1,2,0.5,252,10,222,200,0"
GMIC_BWRECOLOR = (
"-fx_bwrecolorize 0,0,0,0,0,1,0,2,252,10,222,255,215,253,0,255,"
"158,137,189,255,224,191,228,255,215,253,0,255,255,255,255,255,255,255,"
"255,255,215,253,0,255"
)
GMIC_GRADIENT_A = (
'-fx_custom_gradient 0,0,0,1,2,1,0,128,100,100,2,0,1,0,"",1,0,215,253,0,255,'
"252,10,222,255,255,255,0,255,255,255,255,255,0,255,255,255,0,255,0,255,0,0,"
"255,255,128,128,128,255,255,0,255,255,0,0,0,0"
)
GMIC_GRADIENT_B = (
'-fx_custom_gradient 0,0,0,1,2,1,0,128,100,100,2,0,1,0,"",1,0,252,10,222,255,'
"215,253,0,255,255,255,0,255,255,255,255,255,0,255,255,255,0,255,0,255,0,0,"
"255,255,128,128,128,255,255,0,255,255,0,0,0,0"
)
GMIC_JPR_SMOOTH = "-jpr_gradient_smooth 0,1.5"
def main() -> None:
with Pipeline(
name="baxxter",
input_dir=INPUT,
output_base=OUTPUT_BASE,
) as p:
rembg_out = p.step("rembg", inputs="input")
white_bg = p.step("imagemagick_fill", inputs="input", color1="#ffffff")
black_bg = p.step("imagemagick_fill", inputs="input", color1="#000000")
gradient_45_bg = p.step(
"imagemagick_fill",
inputs="input",
color1=GRADIENT_COLOR1,
color2=GRADIENT_COLOR2,
gradient=True,
angle=45,
)
gradient_radial_bg = p.step(
"imagemagick_fill",
inputs="input",
color1=GRADIENT_COLOR1,
color2=GRADIENT_COLOR2,
gradient=True,
radial=True,
)
grayscale = p.step("gmic_grayscale", inputs="input")
rembg_stereo = p.step("gmic", inputs=rembg_out, command=GMIC_STEREO)
rembg_shadow = p.step("gmic", inputs=rembg_out, command=GMIC_DROP_SHADOW)
rembg_bwrecolor = p.step("gmic", inputs=rembg_out, command=GMIC_BWRECOLOR)
rembg_gradient_a = p.step("gmic", inputs=rembg_out, command=GMIC_GRADIENT_A)
rembg_gradient_b = p.step("gmic", inputs=rembg_out, command=GMIC_GRADIENT_B)
rembg_jpr_smooth = p.step("gmic", inputs=rembg_out, command=GMIC_JPR_SMOOTH)
rembg_jpr_smooth_sized = p.step(
"imagemagick_scale_crop",
inputs=rembg_jpr_smooth,
scale=1.05,
)
# combine: white background, rembg
p.step("composite", inputs=[white_bg, rembg_out])
# combine: black background, rembg
p.step("composite", inputs=[black_bg, rembg_out])
# combine: linear gradient background, rembg
p.step("composite", inputs=[gradient_45_bg, rembg_out])
# combine: radial gradient background, rembg
p.step("composite", inputs=[gradient_radial_bg, rembg_out])
# combine: original, rembg (stereo), rembg
stereo_mid = p.step("composite", inputs=["input", rembg_stereo])
p.step("composite", inputs=[stereo_mid, rembg_out])
# combine: original, rembg (drop shadow), rembg
shadow_mid = p.step("composite", inputs=["input", rembg_shadow])
p.step("composite", inputs=[shadow_mid, rembg_out])
# combine: original, rembg (bw recolorize @ 50%), rembg
bw_mid = p.step(
"composite",
inputs=["input", rembg_bwrecolor],
foreground_opacity=0.5,
)
p.step("composite", inputs=[bw_mid, rembg_out])
# combine: rembg (custom gradient A), rembg
p.step("composite", inputs=[rembg_gradient_a, rembg_out])
# combine: rembg (custom gradient B), rembg
p.step("composite", inputs=[rembg_gradient_b, rembg_out])
# combine: original, rembg (jpr smooth, scaled), rembg
smooth_mid = p.step("composite", inputs=["input", rembg_jpr_smooth_sized])
p.step("composite", inputs=[smooth_mid, rembg_out])
# combine: original (grayscale), rembg
p.step("composite", inputs=[grayscale, rembg_out])
output_root = p.run()
print(f"Pipeline finished. Output: {output_root}")
if __name__ == "__main__":
main()
@@ -0,0 +1,34 @@
#!/usr/bin/env python3
"""Watermark pipeline: rembg + grayscale composite + darktable style."""
from pathlib import Path
from imagepipeline import Pipeline
# Darktable export folder.
INPUT = Path("/home/frank/pics/20260517_Albershausen Crusaders - Biberach Beavers/darktable_exported/png")
# Where timestamped run folders are created.
OUTPUT_BASE = Path.home() / "pipeline_output"
# darktable style name (must exist in ~/.config/darktable/styles/).
STYLE = "Watermark F12.rocks"
def main() -> None:
with Pipeline(
name="pipeline_watermark_f12",
input_dir=INPUT,
output_base=OUTPUT_BASE,
) as p:
rembg_out = p.step("rembg", inputs="input")
grayscale = p.step("gmic_grayscale", inputs="input")
combined = p.step("composite", inputs=[grayscale, rembg_out])
p.step("darktable_style", inputs=combined, style=STYLE)
output_root = p.run()
print(f"Pipeline finished. Output: {output_root}")
if __name__ == "__main__":
main()
+27
View File
@@ -0,0 +1,27 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "imagepipeline"
version = "0.1.0"
description = "Modular image processing pipeline framework"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Frank" }]
dependencies = []
[project.optional-dependencies]
ai = ["numpy>=1.26", "Pillow>=10.0", "torch>=2.0"]
dev = ["pytest>=8.0"]
[project.scripts]
imagepipeline = "imagepipeline.cli:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["imagepipeline*"]
[tool.pytest.ini_options]
testpaths = ["tests"]
+1
View File
@@ -0,0 +1 @@
+56
View File
@@ -0,0 +1,56 @@
import struct
import zlib
from pathlib import Path
import pytest
import imagepipeline.modules # noqa: F401
def make_png(
path: Path,
width: int = 2,
height: int = 2,
rgb: tuple[int, int, int] = (200, 100, 50),
) -> None:
"""Write a minimal valid PNG without external dependencies."""
r, g, b = rgb
def chunk(tag: bytes, data: bytes) -> bytes:
crc = zlib.crc32(tag + data) & 0xFFFFFFFF
return struct.pack(">I", len(data)) + tag + data + struct.pack(">I", crc)
raw = b"".join(
b"\x00" + bytes([r, g, b] * width)
for _ in range(height)
)
compressed = zlib.compress(raw, 9)
ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
png = (
b"\x89PNG\r\n\x1a\n"
+ chunk(b"IHDR", ihdr)
+ chunk(b"IDAT", compressed)
+ chunk(b"IEND", b"")
)
path.write_bytes(png)
@pytest.fixture(autouse=True)
def _ensure_builtin_modules() -> None:
import imagepipeline.modules # noqa: F401
@pytest.fixture
def input_dir(tmp_path: Path) -> Path:
directory = tmp_path / "input"
directory.mkdir()
make_png(directory / "photo_a.png")
make_png(directory / "photo_b.png", rgb=(10, 20, 30))
return directory
@pytest.fixture
def output_base(tmp_path: Path) -> Path:
base = tmp_path / "output"
base.mkdir()
return base
+222
View File
@@ -0,0 +1,222 @@
from __future__ import annotations
import shutil
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from imagepipeline.core.context import ModuleContext
from imagepipeline.core.exceptions import DependencyError
from imagepipeline.modules.ai_exposure import AIExposureModule
from imagepipeline.modules.ai_tone_map import AIToneMapModule
from imagepipeline.modules.comfy_flux_edit import ComfyFluxEditModule
from imagepipeline.modules.openrouter_edit import OpenRouterEditModule
from imagepipeline.modules.registry import get_module, list_modules
from tests.conftest import make_png
try:
import numpy # noqa: F401
has_numpy = True
except ImportError:
has_numpy = False
try:
import torch # noqa: F401
has_torch = True
except ImportError:
has_torch = False
has_magick = bool(shutil.which("magick") or shutil.which("convert"))
class TestAIModuleRegistration:
def test_ai_modules_registered(self) -> None:
names = list_modules()
for name in (
"ai_exposure",
"ai_tone_map",
"openrouter_edit",
"comfy_flux_edit",
):
assert name in names
def test_get_ai_modules(self) -> None:
assert get_module("ai_exposure") is AIExposureModule
assert get_module("ai_tone_map") is AIToneMapModule
assert get_module("openrouter_edit") is OpenRouterEditModule
assert get_module("comfy_flux_edit") is ComfyFluxEditModule
class TestAIParameters:
def test_ai_exposure_defaults(self) -> None:
params = AIExposureModule.validate_module_params({})
assert params["skip_existing"] is True
assert params["max_edge"] == 2048
assert params["device"] == "cpu"
assert params["strength"] == 1.0
def test_ai_tone_map_defaults(self) -> None:
params = AIToneMapModule.validate_module_params({})
assert params["checkpoint"] == ""
assert params["strength"] == 1.0
assert params["net_input_size"] == 256
def test_openrouter_requires_prompt(self) -> None:
with pytest.raises(ValueError, match="required"):
OpenRouterEditModule.validate_module_params({})
def test_openrouter_defaults(self) -> None:
params = OpenRouterEditModule.validate_module_params({"prompt": "brighten shadows"})
assert params["model"] == "black-forest-labs/flux.2-klein-4b"
assert params["strength"] == 0.3
assert params["api_key_env"] == "OPENROUTER_API_KEY"
def test_comfy_requires_prompt(self) -> None:
with pytest.raises(ValueError, match="required"):
ComfyFluxEditModule.validate_module_params({})
class TestAIToneMapFallback:
@pytest.mark.skipif(not has_numpy, reason="numpy not installed")
def test_clahe_fallback_writes_output(self, tmp_path: Path) -> None:
src = tmp_path / "photo.png"
make_png(src, width=8, height=8, rgb=(40, 80, 120))
output_dir = tmp_path / "out"
output_dir.mkdir()
ctx = ModuleContext(
input_paths=[src],
matched_groups=[],
output_dir=output_dir,
params=AIToneMapModule.validate_module_params({"strength": 1.0}),
pipeline_output_root=tmp_path,
step_id="ai_tone_map_01",
logger=None,
)
AIToneMapModule().run(ctx)
dst = output_dir / "photo.png"
assert dst.is_file()
assert dst.stat().st_size > 0
class TestSkipExisting:
@pytest.mark.skipif(not has_numpy, reason="numpy not installed")
def test_second_run_skips_existing_outputs(self, tmp_path: Path, capsys) -> None:
src = tmp_path / "photo.png"
make_png(src, width=8, height=8)
output_dir = tmp_path / "out"
output_dir.mkdir()
params = AIToneMapModule.validate_module_params({"checkpoint": ""})
from imagepipeline.core.log import PipelineLogger
logger = PipelineLogger(verbose=True)
ctx = ModuleContext(
input_paths=[src],
matched_groups=[],
output_dir=output_dir,
params=params,
pipeline_output_root=tmp_path,
step_id="ai_tone_map_01",
logger=logger,
)
module = AIToneMapModule()
module.run(ctx)
assert (output_dir / "photo.png").is_file()
module.run(ctx)
output = capsys.readouterr().out
assert "Skipped module ai_tone_map" in output
@pytest.mark.skipif(not has_torch, reason="torch not installed")
class TestAIExposure:
def test_ai_exposure_processes_image(self, tmp_path: Path) -> None:
src = tmp_path / "photo.png"
make_png(src, width=4, height=4, rgb=(30, 60, 90))
output_dir = tmp_path / "out"
output_dir.mkdir()
mock_model = MagicMock()
mock_device = MagicMock()
def fake_enhance(_model, image, *, device, strength):
return image
with (
patch.object(AIExposureModule, "_get_model", return_value=(mock_model, mock_device)),
patch(
"imagepipeline.ai.zero_dce.enhance_image",
side_effect=fake_enhance,
),
):
ctx = ModuleContext(
input_paths=[src],
matched_groups=[],
output_dir=output_dir,
params=AIExposureModule.validate_module_params({"max_edge": 0}),
pipeline_output_root=tmp_path,
step_id="ai_exposure_01",
logger=None,
)
AIExposureModule().run(ctx)
assert (output_dir / "photo.png").is_file()
class TestComfyFluxEdit:
def test_server_unreachable_raises(self, tmp_path: Path) -> None:
src = tmp_path / "photo.png"
make_png(src)
output_dir = tmp_path / "out"
output_dir.mkdir()
workflow = tmp_path / "workflow.json"
workflow.write_text('{"1": {"class_type": "LoadImage", "inputs": {"image": "x"}}}')
ctx = ModuleContext(
input_paths=[src],
matched_groups=[],
output_dir=output_dir,
params=ComfyFluxEditModule.validate_module_params(
{
"prompt": "test",
"server_url": "http://127.0.0.1:1",
"workflow_path": workflow,
}
),
pipeline_output_root=tmp_path,
step_id="comfy_flux_edit_01",
logger=None,
)
with pytest.raises(DependencyError, match="not reachable"):
ComfyFluxEditModule().run(ctx)
def test_missing_workflow_raises(self, tmp_path: Path) -> None:
src = tmp_path / "photo.png"
make_png(src)
output_dir = tmp_path / "out"
output_dir.mkdir()
ctx = ModuleContext(
input_paths=[src],
matched_groups=[],
output_dir=output_dir,
params=ComfyFluxEditModule.validate_module_params(
{
"prompt": "test",
"workflow_path": tmp_path / "missing.json",
}
),
pipeline_output_root=tmp_path,
step_id="comfy_flux_edit_01",
logger=None,
)
with pytest.raises(DependencyError, match="workflow not found"):
ComfyFluxEditModule().run(ctx)
class TestOpenRouterDependencies:
def test_missing_api_key_raises(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
with pytest.raises(DependencyError, match="OPENROUTER_API_KEY"):
OpenRouterEditModule.check_dependencies()
+232
View File
@@ -0,0 +1,232 @@
from __future__ import annotations
import json
import shutil
from pathlib import Path
import pytest
from imagepipeline.core.exceptions import CycleError, ValidationError
from imagepipeline.core.manifest import PipelineManifest, write_manifest
from imagepipeline.core.params import Param, validate_params
from imagepipeline.core.pipeline import Pipeline
from imagepipeline.core.runner import PipelineRunner
from imagepipeline.core.step import StepDefinition
from imagepipeline.modules.base import BaseModule
from imagepipeline.modules.registry import get_module, register, unregister
from imagepipeline.utils.files import list_images, match_by_stem
from tests.conftest import make_png
class TestParams:
def test_defaults_applied(self) -> None:
schema = {"colorspace": Param("string", default="Gray")}
result = validate_params(schema, {})
assert result == {"colorspace": "Gray"}
def test_unknown_param_rejected(self) -> None:
schema = {"a": Param("int", default=1)}
with pytest.raises(ValueError, match="Unknown parameters"):
validate_params(schema, {"b": 2})
def test_choices_enforced(self) -> None:
schema = {"mode": Param("string", choices=("a", "b"))}
with pytest.raises(ValueError, match="must be one of"):
validate_params(schema, {"mode": "c"})
class TestFileMatching:
def test_match_by_stem_pairs_sources(self, tmp_path: Path) -> None:
a = tmp_path / "a"
b = tmp_path / "b"
a.mkdir()
b.mkdir()
make_png(a / "img1.png")
make_png(a / "img2.png")
make_png(b / "img1.png")
make_png(b / "img2.png")
groups = match_by_stem([list_images(a), list_images(b)])
assert len(groups) == 2
assert groups[0][0].parent == a
assert groups[0][1].parent == b
def test_missing_stem_raises(self, tmp_path: Path) -> None:
a = tmp_path / "a"
b = tmp_path / "b"
a.mkdir()
b.mkdir()
make_png(a / "only_a.png")
make_png(b / "only_b.png")
with pytest.raises(ValueError, match="Cannot match images by stem"):
match_by_stem([list_images(a), list_images(b)])
class TestPipelineRunner:
def test_cycle_detection(self, input_dir: Path, output_base: Path) -> None:
@register
class DummyModule(BaseModule):
name = "dummy_cycle"
def run(self, ctx) -> None:
pass
step_a = StepDefinition(
step_id="dummy_cycle_01",
module_name="dummy_cycle",
module=DummyModule,
input_refs=["dummy_cycle_02"],
params={},
output_dir_name="dummy_cycle_01",
)
step_b = StepDefinition(
step_id="dummy_cycle_02",
module_name="dummy_cycle",
module=DummyModule,
input_refs=["dummy_cycle_01"],
params={},
output_dir_name="dummy_cycle_02",
)
runner = PipelineRunner(
name="cycle_test",
input_dir=input_dir,
output_base=output_base,
steps=[step_a, step_b],
)
with pytest.raises(CycleError):
runner.run()
unregister("dummy_cycle")
def test_dag_execution_order(self, input_dir: Path, output_base: Path) -> None:
order: list[str] = []
@register
class OrderModule(BaseModule):
name = "order_tracker"
def run(self, ctx) -> None:
order.append(ctx.step_id)
ctx.output_dir.mkdir(parents=True, exist_ok=True)
for src in ctx.input_paths:
shutil.copy2(src, ctx.output_dir / src.name)
with Pipeline(name="order_test", input_dir=input_dir, output_base=output_base, verbose=False) as p:
step_b = p.step("order_tracker", inputs="input")
step_a = p.step("order_tracker", inputs=step_b)
p.run()
assert order == ["order_tracker_01", "order_tracker_02"]
unregister("order_tracker")
def test_duplicate_module_numbering(self, input_dir: Path, output_base: Path) -> None:
@register
class NumberModule(BaseModule):
name = "number_tracker"
def run(self, ctx) -> None:
ctx.output_dir.mkdir(parents=True, exist_ok=True)
for src in ctx.input_paths:
shutil.copy2(src, ctx.output_dir / src.name)
with Pipeline(name="dup_test", input_dir=input_dir, output_base=output_base, verbose=False) as p:
first = p.step("number_tracker", inputs="input")
p.step("number_tracker", inputs=first)
root = p.run()
assert (root / "number_tracker_01").is_dir()
assert (root / "number_tracker_02").is_dir()
unregister("number_tracker")
class TestManifest:
def test_write_manifest(self, tmp_path: Path) -> None:
manifest = PipelineManifest(
name="test",
output_root=str(tmp_path),
input_dir="/in",
started_at="2026-01-01T00:00:00+00:00",
finished_at="2026-01-01T00:00:01+00:00",
)
path = tmp_path / "pipeline_manifest.json"
write_manifest(path, manifest)
data = json.loads(path.read_text(encoding="utf-8"))
assert data["name"] == "test"
assert data["finished_at"] is not None
class TestPipelineValidation:
def test_empty_pipeline_rejected(self, input_dir: Path) -> None:
with Pipeline(name="empty", input_dir=input_dir, verbose=False) as p:
with pytest.raises(ValidationError, match="no steps"):
p.run()
def test_unknown_module_rejected(self, input_dir: Path) -> None:
with Pipeline(name="bad", input_dir=input_dir, verbose=False) as p:
with pytest.raises(KeyError, match="Unknown module"):
p.step("does_not_exist", inputs="input")
has_imagemagick = bool(shutil.which("magick") or shutil.which("convert"))
class TestPipelineLogging:
@pytest.mark.skipif(not has_imagemagick, reason="ImageMagick not installed")
def test_verbose_output(self, input_dir: Path, output_base: Path, capsys) -> None:
with Pipeline(
name="log_test",
input_dir=input_dir,
output_base=output_base,
verbose=True,
) as p:
p.step("imagemagick_grayscale", inputs="input")
p.run()
output = capsys.readouterr().out
assert "Found 2 photo(s)" in output
assert "Step 1/1: imagemagick_grayscale_01 (imagemagick_grayscale)" in output
assert "Applying module imagemagick_grayscale to image [1/2]" in output
assert "Pipeline finished." in output
@pytest.mark.skipif(not has_imagemagick, reason="ImageMagick not installed")
def test_quiet_output(self, input_dir: Path, output_base: Path, capsys) -> None:
with Pipeline(
name="quiet_test",
input_dir=input_dir,
output_base=output_base,
verbose=False,
) as p:
p.step("imagemagick_grayscale", inputs="input")
p.run()
assert capsys.readouterr().out == ""
@pytest.mark.skipif(not has_imagemagick, reason="ImageMagick not installed")
class TestImageMagickGrayscaleIntegration:
def test_grayscale_pipeline(self, input_dir: Path, output_base: Path) -> None:
with Pipeline(
name="gray_integration",
input_dir=input_dir,
output_base=output_base,
verbose=False,
) as p:
p.step("imagemagick_grayscale", inputs="input")
root = p.run()
manifest_path = root / "pipeline_manifest.json"
assert manifest_path.is_file()
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
assert len(manifest["steps"]) == 1
assert manifest["steps"][0]["module"] == "imagemagick_grayscale"
out_dir = root / "imagemagick_grayscale_01"
outputs = list_images(out_dir)
assert len(outputs) == 2
module = get_module("imagemagick_grayscale")
module.check_dependencies()
+298
View File
@@ -0,0 +1,298 @@
from __future__ import annotations
import shutil
from pathlib import Path
import pytest
from imagepipeline.core.params import validate_params
from imagepipeline.modules.composite import CompositeModule
from imagepipeline.modules.crop_square import CropSquareModule
from imagepipeline.modules.darktable_style import DarktableStyleModule
from imagepipeline.modules.imagemagick_fill import (
ImageMagickFillModule,
build_fill_arguments,
)
from imagepipeline.modules.imagemagick_grayscale import ImageMagickGrayscale
from imagepipeline.modules.gmic_grayscale import GmicGrayscale
from imagepipeline.modules.registry import get_module, list_modules
from imagepipeline.modules.rembg import RembgModule
has_magick = bool(shutil.which("magick") or shutil.which("convert"))
class TestModuleRegistration:
def test_workflow_modules_registered(self) -> None:
names = list_modules()
for name in (
"rembg",
"gmic_grayscale",
"composite",
"darktable_style",
"imagemagick_grayscale",
"imagemagick_fill",
"crop_square",
):
assert name in names
def test_get_module_returns_class(self) -> None:
assert get_module("rembg") is RembgModule
assert get_module("gmic_grayscale") is GmicGrayscale
assert get_module("composite") is CompositeModule
assert get_module("darktable_style") is DarktableStyleModule
assert get_module("crop_square") is CropSquareModule
assert get_module("imagemagick_fill") is ImageMagickFillModule
class TestImageMagickFill:
def test_solid_fill_arguments(self) -> None:
args = build_fill_arguments(
800,
600,
color1="#d7fd00",
color2=None,
gradient=False,
radial=False,
angle=None,
)
assert args == ["-size", "800x600", "xc:#d7fd00"]
def test_linear_gradient_arguments(self) -> None:
args = build_fill_arguments(
100,
50,
color1="#d7fd00",
color2="#fc0ade",
gradient=True,
radial=False,
angle=45.0,
)
assert args == [
"-size",
"100x50",
"-define",
"gradient:angle=45.0",
"gradient:#d7fd00-#fc0ade",
]
def test_radial_gradient_arguments(self) -> None:
args = build_fill_arguments(
100,
50,
color1="d7fd00",
color2="fc0ade",
gradient=True,
radial=True,
angle=90.0,
)
assert args == ["-size", "100x50", "radial-gradient:#d7fd00-#fc0ade"]
def test_requires_color1(self) -> None:
with pytest.raises(ValueError, match="required"):
ImageMagickFillModule.validate_module_params({})
@pytest.mark.skipif(not has_magick, reason="ImageMagick not installed")
def test_output_matches_input_size(self, tmp_path: Path) -> None:
from imagepipeline.core.context import ModuleContext
from imagepipeline.utils.subprocess import run_command
from tests.conftest import make_png
src = tmp_path / "ref.png"
make_png(src, width=40, height=20)
output_dir = tmp_path / "out"
output_dir.mkdir()
ctx = ModuleContext(
input_paths=[src],
matched_groups=[],
output_dir=output_dir,
params=ImageMagickFillModule.validate_module_params(
{
"color1": "#d7fd00",
"color2": "#fc0ade",
"gradient": True,
"angle": 45,
}
),
pipeline_output_root=tmp_path,
step_id="imagemagick_fill_01",
logger=None,
)
ImageMagickFillModule().run(ctx)
dst = output_dir / "ref.png"
assert dst.is_file()
magick = shutil.which("magick") or shutil.which("convert")
result = run_command([magick, "-format", "%w %h", str(dst), "info:"])
assert result.stdout.strip() == "40 20"
class TestCropSquare:
@pytest.mark.skipif(not has_magick, reason="ImageMagick not installed")
def test_center_crops_to_square(self, tmp_path: Path) -> None:
from imagepipeline.core.context import ModuleContext
from imagepipeline.utils.subprocess import run_command
from tests.conftest import make_png
src = tmp_path / "wide.png"
make_png(src, width=40, height=20)
output_dir = tmp_path / "out"
output_dir.mkdir()
ctx = ModuleContext(
input_paths=[src],
matched_groups=[],
output_dir=output_dir,
params={},
pipeline_output_root=tmp_path,
step_id="crop_square_01",
logger=None,
)
CropSquareModule().run(ctx)
dst = output_dir / "wide.png"
assert dst.is_file()
magick = shutil.which("magick") or shutil.which("convert")
result = run_command(
[magick, "-format", "%w %h", str(dst), "info:"],
)
width, height = map(int, result.stdout.strip().split())
assert width == height == 20
class TestModuleParameters:
def test_darktable_style_requires_style(self) -> None:
with pytest.raises(ValueError, match="required"):
DarktableStyleModule.validate_module_params({})
def test_darktable_style_accepts_style(self) -> None:
params = DarktableStyleModule.validate_module_params(
{"style": "Watermark F12.rocks"}
)
assert params["style"] == "Watermark F12.rocks"
assert params["style_overwrite"] is True
def test_composite_mode_choices(self) -> None:
with pytest.raises(ValueError, match="must be one of"):
CompositeModule.validate_module_params({"mode": "invalid"})
def test_rembg_defaults(self) -> None:
params = RembgModule.validate_module_params({})
assert params["model"] == "birefnet-general"
assert params["alpha_matting"] is True
def test_darktable_export_conf_jpeg(self) -> None:
from imagepipeline.modules.darktable_style import export_conf_options
assert export_conf_options("jpeg") == [
"plugins/imageio/format/jpeg/quality=90"
]
def test_darktable_export_conf_png(self) -> None:
from imagepipeline.modules.darktable_style import export_conf_options
assert export_conf_options("png") == [
"plugins/imageio/format/png/bpp=8",
"plugins/imageio/format/png/compression=5",
]
def test_darktable_resolve_format_from_input(self) -> None:
from imagepipeline.modules.darktable_style import resolve_export_format
format_name, conf = resolve_export_format(Path("photo.jpg"), "")
assert format_name == "jpeg"
assert conf == ["plugins/imageio/format/jpeg/quality=90"]
format_name, conf = resolve_export_format(Path("photo.png"), "")
assert format_name == "png"
assert "plugins/imageio/format/png/compression=5" in conf
def test_darktable_rejects_unsupported_format(self) -> None:
from imagepipeline.modules.darktable_style import resolve_export_format
with pytest.raises(ValueError, match="Unsupported output format"):
resolve_export_format(Path("photo.tiff"), "tiff")
@pytest.mark.skipif(not has_magick, reason="ImageMagick not installed")
class TestCompositeColor:
def test_preserves_color_over_grayscale_background(self, tmp_path: Path) -> None:
from imagepipeline.core.context import ModuleContext
from imagepipeline.utils.subprocess import run_command
from tests.conftest import make_png
src = tmp_path / "src.png"
make_png(src, width=40, height=40, rgb=(200, 50, 50))
background = tmp_path / "bg.png"
foreground = tmp_path / "fg.png"
output_dir = tmp_path / "composite_out"
output_dir.mkdir()
magick = shutil.which("magick") or shutil.which("convert")
run_command([magick, str(src), "-colorspace", "Gray", str(background)])
run_command(
[
magick,
"-size",
"40x40",
"xc:none",
"-fill",
"rgba(255,0,0,0.8)",
"-draw",
"circle 20,20 20,2",
str(foreground),
]
)
module = CompositeModule()
ctx = ModuleContext(
input_paths=[background, foreground],
output_dir=output_dir,
params=CompositeModule.validate_module_params({}),
pipeline_output_root=tmp_path,
step_id="composite_01",
matched_groups=[[background, foreground]],
)
module.run(ctx)
output = output_dir / "fg.png"
assert output.is_file()
result = run_command(
[magick, "identify", "-format", "%[type]", str(output)]
)
assert result.stdout.strip() != "Grayscale"
has_workflow_tools = all(
shutil.which(name)
for name in ("rembg", "gmic", "magick", "darktable-cli")
) or all(shutil.which(name) for name in ("rembg", "gmic", "convert", "darktable-cli"))
@pytest.mark.skipif(not has_workflow_tools, reason="Workflow CLI tools not installed")
class TestWorkflowIntegration:
def test_watermark_pipeline(self, input_dir, output_base) -> None:
from imagepipeline import Pipeline
with Pipeline(
name="workflow_test",
input_dir=input_dir,
output_base=output_base,
verbose=False,
) as p:
rembg_out = p.step("rembg", inputs="input")
grayscale = p.step("gmic_grayscale", inputs="input")
combined = p.step("composite", inputs=[grayscale, rembg_out])
p.step(
"darktable_style",
inputs=combined,
style="Watermark F12.rocks",
)
root = p.run()
assert (root / "rembg_01").is_dir()
assert (root / "gmic_grayscale_01").is_dir()
assert (root / "composite_01").is_dir()
assert (root / "darktable_style_01").is_dir()
assert list((root / "darktable_style_01").iterdir())
+34
View File
@@ -0,0 +1,34 @@
# ComfyUI Flux Klein img2img workflow (API format)
Export a Flux Klein img2img workflow from ComfyUI once and save it as:
```
workflows/comfy/flux_klein_edit_api.json
```
Copy `flux_klein_edit_api.json.example` as a starting skeleton, then replace node IDs and wiring with your exported workflow.
## Export steps
1. Build an img2img workflow in ComfyUI with **Load Image**, prompt encoding, sampler, and **Save Image** nodes.
2. Enable **Dev mode** in ComfyUI settings if needed.
3. Use **Save (API Format)** on the workflow.
4. Save the JSON to `workflows/comfy/flux_klein_edit_api.json`.
## Nodes patched by `comfy_flux_edit`
The module walks workflow nodes by `class_type` and updates:
| class_type | Field | Value |
|------------|-------|-------|
| `LoadImage` | `inputs.image` | Uploaded filename |
| `CLIPTextEncode` / `TextEncode` | `inputs.text` | Step `prompt` |
| `KSampler` | `inputs.denoise`, `inputs.seed` | Step params |
## CPU warning
ComfyUI on CPU can take hours per full-resolution image. Use `max_edge=1024` or lower in the pipeline step for experiments.
## Server
Default URL: `http://127.0.0.1:8188`. Start ComfyUI before running the pipeline step.
@@ -0,0 +1,37 @@
{
"1": {
"class_type": "LoadImage",
"inputs": {
"image": "example.png"
}
},
"2": {
"class_type": "CLIPTextEncode",
"inputs": {
"text": "placeholder prompt",
"clip": ["3", 0]
}
},
"4": {
"class_type": "KSampler",
"inputs": {
"seed": 0,
"steps": 20,
"cfg": 1.0,
"sampler_name": "euler",
"scheduler": "simple",
"denoise": 0.35,
"model": ["5", 0],
"positive": ["2", 0],
"negative": ["6", 0],
"latent_image": ["7", 0]
}
},
"8": {
"class_type": "SaveImage",
"inputs": {
"filename_prefix": "imagepipeline_flux_edit",
"images": ["9", 0]
}
}
}