08a29d124a
- spruce.exif: tag_mosaic_jpeg_for_scan_dir, resolve_machine_label_for_scan_dir; ProcessingSoftware for tile-stitched mosaics - spruce.settings: load_config(require_credentials=False) for config without login - scripts/tag_mosaic_exif.py and tests; stitch script --write-exif path
379 lines
12 KiB
Python
379 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Reconstruct a scan mosaic from archived tiles (grid layout matches the scraper).
|
||
|
||
Usage:
|
||
python scripts/stitch_mosaic_from_tiles.py /path/to/scan_dir
|
||
python scripts/stitch_mosaic_from_tiles.py /path/to/scan_dir --no-flip-x # raw col index = left-to-right
|
||
python scripts/stitch_mosaic_from_tiles.py /path/to/scan_dir --flip-y --compare-mosaic
|
||
|
||
By default, columns are mirrored horizontally so the stitched image matches RootView's
|
||
downloaded mosaic.jpg (low ``col_index`` tiles sit on the right in the server preview).
|
||
Default ``--tile-gap`` is 1 (white gutters like the server); use ``--tile-gap 0`` for flush tiles.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
# Put the directory two levels above this file at the front of sys.path
# (unless it is already listed) before the ``spruce`` imports are executed.
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))
|
||
|
||
from PIL import Image, ImageChops
|
||
|
||
from spruce.exif import tag_mosaic_jpeg_for_scan_dir
|
||
from spruce.settings import DEFAULT_CONFIG, load_config
|
||
|
||
# Tile filenames look like ``tile_r<digits>_c<digits>.jpg`` (case-insensitive).
# Group 1 is used as the row index and group 2 as the column index.
TILE_FILENAME_RE = re.compile(r"tile_r(\d+)_c(\d+)\.jpg$", re.IGNORECASE)
|
||
|
||
|
||
def _load_metadata(scan_dir: Path) -> dict:
|
||
path = scan_dir / "metadata.json"
|
||
if not path.is_file():
|
||
raise SystemExit(f"Missing metadata.json: {path}")
|
||
with path.open(encoding="utf-8") as f:
|
||
return json.load(f)
|
||
|
||
|
||
def _index_tiles(tiles_dir: Path) -> dict[tuple[int, int], Path]:
    """Map ``(row, col)`` indices to the tile files found in *tiles_dir*.

    Only regular files whose names match ``TILE_FILENAME_RE`` are indexed;
    everything else in the directory is ignored.

    Raises:
        SystemExit: If *tiles_dir* is not a directory, or two files resolve
            to the same ``(row, col)`` index.
    """
    if not tiles_dir.is_dir():
        raise SystemExit(f"Missing tiles directory: {tiles_dir}")

    tiles: dict[tuple[int, int], Path] = {}
    for entry in tiles_dir.iterdir():
        parsed = TILE_FILENAME_RE.match(entry.name) if entry.is_file() else None
        if parsed is None:
            continue
        row, col = (int(text) for text in parsed.groups())
        # setdefault returns the earlier path when the index is already taken.
        previous = tiles.setdefault((row, col), entry)
        if previous is not entry:
            raise SystemExit(f"Duplicate tile index {(row, col)}: {previous} and {entry}")
    return tiles
|
||
|
||
|
||
def _open_tile_rgb(path: Path) -> Image.Image:
    """Open *path*, fully decode it, and return it as an ``RGB`` image.

    Images in ``RGBA`` or ``LA`` mode are pasted onto a white background using
    their last band as the mask; every other mode goes through
    ``convert("RGB")``.

    Raises:
        SystemExit: If opening or decoding the file raises ``OSError``.
    """
    try:
        source = Image.open(path)
        source.load()  # force the decode now so a bad file is reported here
    except OSError as exc:
        raise SystemExit(f"Cannot read tile image {path}: {exc}") from exc

    has_alpha = source.mode == "RGBA" or source.mode == "LA"
    if not has_alpha:
        return source.convert("RGB")

    *_, alpha = source.split()
    flattened = Image.new("RGB", source.size, (255, 255, 255))
    flattened.paste(source, mask=alpha)
    return flattened
|
||
|
||
|
||
def _paste_tile(
|
||
canvas: Image.Image,
|
||
tile: Image.Image,
|
||
row: int,
|
||
col: int,
|
||
nx: int,
|
||
ny: int,
|
||
tw: int,
|
||
th: int,
|
||
tile_gap: int,
|
||
flip_y: bool,
|
||
flip_x: bool,
|
||
) -> None:
|
||
if flip_x:
|
||
col_x = nx - 1 - col
|
||
else:
|
||
col_x = col
|
||
if flip_y:
|
||
row_y = ny - 1 - row
|
||
else:
|
||
row_y = row
|
||
stride_w = tw + tile_gap
|
||
stride_h = th + tile_gap
|
||
x = col_x * stride_w
|
||
y = row_y * stride_h
|
||
canvas.paste(tile, (x, y))
|
||
|
||
|
||
def stitch_scan(
    scan_dir: Path,
    *,
    flip_y: bool,
    flip_x: bool,
    allow_missing: bool,
    tile_gap: int = 1,
    gap_fill: tuple[int, int, int] = (255, 255, 255),
) -> tuple[Image.Image, int, int, int, int]:
    """Stitch the tiles under ``scan_dir / "tiles"`` into one RGB canvas.

    The grid size comes from the ``nx`` / ``ny`` keys of ``metadata.json``.
    Each tile is decoded exactly once: the first tile (in row-major order)
    fixes the tile size and the canvas dimensions, and every later tile must
    have that same size.

    Args:
        scan_dir: Directory holding ``metadata.json`` and ``tiles/``.
        flip_y: Mirror the row order when placing tiles.
        flip_x: Mirror the column order when placing tiles.
        allow_missing: Skip absent tiles instead of failing. Their cells keep
            the canvas background (``gap_fill`` when ``tile_gap > 0``,
            otherwise black).
        tile_gap: Pixels of spacing between adjacent tiles; must be >= 0.
        gap_fill: RGB background used when ``tile_gap > 0``.

    Returns:
        ``(canvas, nx, ny, tile_width, tile_height)``.

    Raises:
        SystemExit: On invalid ``nx``/``ny``, negative ``tile_gap``, no tiles,
            too many tiles, missing tiles (unless ``allow_missing``), a tile
            index outside the grid, or tiles of differing sizes.
    """
    meta = _load_metadata(scan_dir)
    nx = int(meta.get("nx") or 0)
    ny = int(meta.get("ny") or 0)
    if nx < 1 or ny < 1:
        raise SystemExit(f"Invalid nx/ny in metadata: nx={nx} ny={ny}")
    if tile_gap < 0:
        raise SystemExit(f"tile_gap must be >= 0, got {tile_gap}")

    tiles_dir = scan_dir / "tiles"
    by_rc = _index_tiles(tiles_dir)
    expected = nx * ny
    if not by_rc:
        raise SystemExit(f"No tile files matching tile_r*_c*.jpg in {tiles_dir}")
    if len(by_rc) > expected:
        raise SystemExit(
            f"Too many tile files: {len(by_rc)} (expected at most {expected} for nx={nx} ny={ny})"
        )

    missing = [(r, c) for r in range(ny) for c in range(nx) if (r, c) not in by_rc]
    if missing and not allow_missing:
        sample = ", ".join(f"r{a}_c{b}" for a, b in missing[:5])
        more = f" (+{len(missing) - 5} more)" if len(missing) > 5 else ""
        raise SystemExit(
            f"Missing {len(missing)} tile(s): {sample}{more}. "
            f"Use --allow-missing to leave black holes."
        )

    # Reject stray indices up front; this needs no image decoding.
    for (r, c), tpath in by_rc.items():
        if not (0 <= r < ny and 0 <= c < nx):
            raise SystemExit(f"Tile index out of range r={r} c={c} (nx={nx} ny={ny}): {tpath}")

    bg = gap_fill if tile_gap > 0 else (0, 0, 0)
    canvas = None
    tw = th = 0
    for r in range(ny):
        for c in range(nx):
            tpath = by_rc.get((r, c))
            if tpath is None:
                continue
            tile = _open_tile_rgb(tpath)
            w, h = tile.size
            if canvas is None:
                # First decoded tile defines the tile size and the canvas.
                tw, th = w, h
                cw = nx * tw + (nx - 1) * tile_gap
                ch = ny * th + (ny - 1) * tile_gap
                canvas = Image.new("RGB", (cw, ch), bg)
            elif (w, h) != (tw, th):
                raise SystemExit(
                    f"Non-uniform tile size: {tpath} is {w}x{h}, expected {tw}x{th}"
                )
            _paste_tile(
                canvas, tile, r, c, nx, ny, tw, th, tile_gap, flip_y, flip_x
            )

    # by_rc is non-empty and every key is inside the grid, so a tile was pasted.
    assert canvas is not None
    return canvas, nx, ny, tw, th
|
||
|
||
|
||
def _histogram_mae_max(diff: Image.Image) -> tuple[float, int]:
|
||
"""Mean absolute error (0–255 per channel) and max channel diff."""
|
||
bands = diff.split()
|
||
total_px = diff.width * diff.height
|
||
mae_sum = 0.0
|
||
ch_max = 0
|
||
for band in bands:
|
||
h = band.histogram()
|
||
mae_sum += sum(i * cnt for i, cnt in enumerate(h))
|
||
_mn, mx = band.getextrema()
|
||
ch_max = max(ch_max, mx)
|
||
mae = mae_sum / (total_px * len(bands))
|
||
return mae, ch_max
|
||
|
||
|
||
def _exact_pixel_fraction(diff: Image.Image) -> float:
|
||
"""Fraction of pixels where all channels of `diff` are zero."""
|
||
bands = diff.getbands()
|
||
bpp = len(bands)
|
||
raw = diff.tobytes()
|
||
if bpp == 0 or len(raw) == 0:
|
||
return 1.0
|
||
n = len(raw) // bpp
|
||
exact = sum(
|
||
1
|
||
for i in range(0, len(raw), bpp)
|
||
if all(raw[i + j] == 0 for j in range(bpp))
|
||
)
|
||
return exact / n
|
||
|
||
|
||
def compare_mosaics(
    reconstructed: Image.Image,
    reference_path: Path,
    *,
    fit: bool,
) -> None:
    """Print difference statistics between *reconstructed* and a reference file.

    If *reference_path* is not a file, a note is written to stderr and nothing
    is compared. If the sizes differ, the mismatch is reported on stderr; the
    reference is then resized to the reconstructed size when *fit* is true,
    otherwise the comparison is abandoned. The summary line (MAE, maximum
    channel difference, percentage of exactly equal pixels) goes to stdout.
    """
    if not reference_path.is_file():
        print(f"No reference mosaic at {reference_path}; skipping comparison.", file=sys.stderr)
        return

    reference = _open_tile_rgb(reference_path)
    if reconstructed.size != reference.size:
        rec_w, rec_h = reconstructed.size
        ref_w, ref_h = reference.size
        print(
            f"Dimension mismatch: reconstructed {rec_w}x{rec_h} vs "
            f"reference {ref_w}x{ref_h}",
            file=sys.stderr,
        )
        if fit:
            reference = reference.resize(reconstructed.size, Image.Resampling.LANCZOS)
        else:
            print("Re-run with --fit to resize reference to reconstructed size.", file=sys.stderr)
            return

    delta = ImageChops.difference(reconstructed, reference)
    mean_abs, worst = _histogram_mae_max(delta)
    exact_percent = _exact_pixel_fraction(delta) * 100
    print(
        f"Compare vs {reference_path.name}: MAE={mean_abs:.4f} max_diff={worst} "
        f"exact_pixels={exact_percent:.4f}%"
    )
|
||
|
||
|
||
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the stitch script."""
    parser = argparse.ArgumentParser(
        description="Stitch mosaic from archived tiles using metadata.json grid (nx, ny)."
    )
    parser.add_argument(
        "scan_dir",
        type=Path,
        help="Directory containing metadata.json, tiles/, and optionally mosaic.jpg",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=Path,
        default=None,
        help="Output JPEG path (default: <scan_dir>/mosaic_reconstructed.jpg)",
    )
    parser.add_argument(
        "--flip-y",
        action="store_true",
        help="Flip row order vertically to match server mosaic orientation.",
    )
    parser.add_argument(
        "--no-flip-x",
        dest="flip_x",
        action="store_false",
        help=(
            "Do not mirror columns: tile column 0 is on the left (matches URL/grid order). "
            "Default flips X so the layout matches mosaic.jpg from the server."
        ),
    )
    parser.set_defaults(flip_x=True)
    parser.add_argument(
        "--allow-missing",
        action="store_true",
        help="Leave black pixels for missing tiles instead of failing.",
    )
    parser.add_argument(
        "--compare-mosaic",
        action="store_true",
        help="Compare reconstructed image to mosaic.jpg in scan_dir (decoded RGB).",
    )
    parser.add_argument(
        "--fit",
        action="store_true",
        help="When comparing, resize reference mosaic.jpg to reconstructed size.",
    )
    parser.add_argument(
        "--jpeg-quality",
        type=int,
        default=95,
        help="JPEG quality for output (default: 95).",
    )
    parser.add_argument(
        "--write-exif",
        action="store_true",
        help=(
            "After saving, write mosaic EXIF using metadata.json and config "
            "(implies override if write_exif is false in config). JPEG output only."
        ),
    )
    parser.add_argument(
        "--config",
        default=DEFAULT_CONFIG,
        metavar="FILE",
        help=f"YAML config for EXIF machine_metadata (default: {DEFAULT_CONFIG})",
    )
    parser.add_argument(
        "--tile-gap",
        type=int,
        default=1,
        metavar="PX",
        help=(
            "Insert PX pixels of spacing between adjacent tiles (horizontal and vertical), "
            "like RootView server mosaics. Gap is filled with --gap-color (default: white). "
            "Use 0 for flush tiles. Default: 1."
        ),
    )
    parser.add_argument(
        "--gap-color",
        default="255,255,255",
        metavar="R,G,B",
        help="RGB for tile gutters and canvas background when --tile-gap > 0 (default: 255,255,255).",
    )
    return parser


def _parse_gap_color(text: str) -> tuple[int, int, int]:
    """Parse an ``R,G,B`` string into an RGB tuple, clamping each value to 0-255."""
    parts = [p.strip() for p in text.split(",")]
    if len(parts) != 3:
        raise SystemExit("--gap-color must be R,G,B with three integers.")
    try:
        red, green, blue = (max(0, min(255, int(x))) for x in parts)
    except ValueError as exc:
        raise SystemExit("--gap-color must be three integers.") from exc
    return red, green, blue


def main() -> None:
    """Command-line entry point: stitch, save, optionally tag EXIF and compare.

    All argument problems, including the ``--write-exif`` preconditions
    (JPEG output suffix, existing config file), are reported before any tile
    is decoded, so a bad invocation does not run a full stitch and save first.

    Raises:
        SystemExit: On invalid arguments or when stitching or EXIF tagging
            fails.
    """
    args = _build_arg_parser().parse_args()
    scan_dir = args.scan_dir.expanduser().resolve()
    if not scan_dir.is_dir():
        raise SystemExit(f"Not a directory: {scan_dir}")

    if args.output is None:
        out = scan_dir / "mosaic_reconstructed.jpg"
    else:
        out = args.output.expanduser().resolve()

    gap_fill = _parse_gap_color(args.gap_color)

    # Validate the EXIF request before the expensive stitch and save.
    cfg_path = None
    if args.write_exif:
        if out.suffix.lower() not in (".jpg", ".jpeg"):
            raise SystemExit("--write-exif requires a .jpg or .jpeg output path.")
        cfg_path = Path(args.config).expanduser()
        if not cfg_path.is_file():
            raise SystemExit(f"Config not found: {cfg_path}")

    canvas, nx, ny, tw, th = stitch_scan(
        scan_dir,
        flip_y=args.flip_y,
        flip_x=args.flip_x,
        allow_missing=args.allow_missing,
        tile_gap=args.tile_gap,
        gap_fill=gap_fill,
    )
    est_bytes = canvas.width * canvas.height * 3  # 3 bytes per RGB pixel
    if est_bytes > 512 * 1024 * 1024:
        print(
            f"Warning: decoded canvas ~{est_bytes / (1024**3):.2f} GiB in memory.",
            file=sys.stderr,
        )

    canvas.save(out, format="JPEG", quality=args.jpeg_quality, subsampling=0)
    print(
        f"Wrote {out} ({canvas.width}x{canvas.height}, tile {tw}x{th}, "
        f"grid {nx}x{ny}, tile_gap={args.tile_gap})"
    )

    if cfg_path is not None:
        config = load_config(str(cfg_path), require_credentials=False)
        try:
            ok = tag_mosaic_jpeg_for_scan_dir(scan_dir, out, config, force=True)
        except ValueError as exc:
            raise SystemExit(str(exc)) from exc
        if not ok:
            raise SystemExit("EXIF tagging failed (see log).")

    if args.compare_mosaic:
        compare_mosaics(canvas, scan_dir / "mosaic.jpg", fit=args.fit)
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|