#!/usr/bin/env python3 """ Build a chronological MP4 from downloaded mosaic.jpg files for one machine ROI. Reads archives/scans.csv, filters by machine and mosaic_on_disk, optionally restricts to one (start_x, start_y, end_x, end_y) ROI, dedupes by scan_id (last row wins), sorts by scan_time, and encodes frames with imageio/ffmpeg. Usage: .venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" .venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\ --roi "195.65,219.22,219.73,235.04" --fps 8 --output /tmp/out.mp4 .venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" --dry-run # Lighter preview (caps tall full-tube mosaics by height — easier on players): .venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\ --roi "0.0,0.0,310.0,740.0" --preview # Metadata is drawn on each frame by default (semi-transparent bar at the top); # use --no-metadata-overlay to disable. """ from __future__ import annotations import argparse import csv import os import sys import time from collections import Counter, defaultdict from dataclasses import dataclass from pathlib import Path import imageio import numpy as np from PIL import Image, ImageDraw, ImageFont class MovieEncodeError(Exception): """Raised from encoding helpers; caught by encode_movie for batch-safe handling.""" @dataclass class EncodedMovieResult: success: bool machine: str roi: tuple[float, float, float, float] csv_frame_count: int written: int missing: int dropped_read: int output_path: Path | None skipped_reason: str | None size_mb: float | None elapsed_s: float | None def sanitize_machine_label(label: str) -> str: return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_") def parse_roi(s: str) -> tuple[float, float, float, float]: parts = [p.strip() for p in s.split(",")] if len(parts) != 4: sys.exit("--roi must be four comma-separated numbers: start_x,start_y,end_x,end_y") try: return tuple(float(p) for p in parts) # type: ignore[return-value] except ValueError as e: sys.exit(f"Invalid --roi numbers: {e}") def extent_close( row: dict, roi: tuple[float, float, float, float], *, tol: float = 1e-4, ) -> bool: keys = ("start_x", "start_y", "end_x", "end_y") try: vals = tuple(float(row[k]) for k in keys) except (KeyError, ValueError): return False return all(abs(a - b) < tol for a, b in zip(vals, roi)) def extent_key(row: dict) -> tuple[str, str, str, str]: """Stable grouping key from CSV string fields.""" return ( row.get("start_x", "").strip(), row.get("start_y", "").strip(), row.get("end_x", "").strip(), row.get("end_y", "").strip(), ) def key_to_roi_floats(key: tuple[str, str, str, str]) -> tuple[float, float, float, float]: return tuple(float(x) for x in key) # type: ignore[return-value] def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description=__doc__) p.add_argument("--machine", required=True, help='RootView machine label, e.g. "BW1-4 [AMR-15]"') p.add_argument( "--roi", metavar="SX,SY,EX,EY", help="Restrict to this extent (mm). If omitted, pick the ROI with the most on-disk mosaics.", ) p.add_argument("--archive", default="archives", type=Path, help="Archive root (default: archives)") p.add_argument( "--scans-csv", default=None, type=Path, help="Path to scans.csv (default: /scans.csv)", ) p.add_argument( "--output", "-o", type=Path, default=None, help="Output .mp4 path (default: /movies//roi_<...>.mp4)", ) p.add_argument("--fps", type=float, default=10.0, help="Frames per second (default: 10)") p.add_argument( "--max-height", type=int, default=None, metavar="PX", help="Scale each frame so height is at most PX pixels (width keeps aspect); " "suited to tall full-tube mosaics. Both dimensions are rounded to even pixels for H.264.", ) p.add_argument( "--preview", action="store_true", help="Shorthand for --max-height 1080 (overridden if --max-height is also set).", ) p.add_argument( "--dry-run", action="store_true", help="List frames that would be written (no MP4)", ) p.add_argument( "--no-metadata-overlay", action="store_true", help="Do not draw scan metadata on each frame (default: overlay on).", ) args = p.parse_args() if args.preview and args.max_height is None: args.max_height = 1080 return args def _csv_required_fieldnames() -> tuple[str, ...]: return ( "machine", "scan_id", "scan_time", "mosaic_on_disk", "mosaic_local_path", "start_x", "start_y", "end_x", "end_y", ) def validate_scans_csv_header(reader: csv.DictReader, scans_csv: Path) -> None: if reader.fieldnames is None: sys.exit(f"Empty CSV: {scans_csv}") required = _csv_required_fieldnames() missing = [c for c in required if c not in reader.fieldnames] if missing: sys.exit(f"{scans_csv} missing columns: {missing}") def load_latest_rows( scans_csv: Path, machine: str, roi: tuple[float, float, float, float] | None, ) -> list[dict]: """Last row per scan_id for matching machine; mosaic_on_disk True; optional ROI.""" latest: dict[str, dict] = {} with scans_csv.open(newline="", encoding="utf-8") as fh: reader = csv.DictReader(fh) validate_scans_csv_header(reader, scans_csv) for row in reader: if row.get("machine", "") != machine: continue if row.get("mosaic_on_disk", "").strip() != "True": continue if roi is not None and not extent_close(row, roi): continue sid = row.get("scan_id", "").strip() if not sid: continue latest[sid] = row return list(latest.values()) def load_on_disk_rows_by_machine(scans_csv: Path) -> dict[str, list[dict]]: """One pass: last row per (machine, scan_id) where mosaic_on_disk True; group by machine.""" latest: dict[tuple[str, str], dict] = {} with scans_csv.open(newline="", encoding="utf-8") as fh: reader = csv.DictReader(fh) validate_scans_csv_header(reader, scans_csv) for row in reader: if row.get("mosaic_on_disk", "").strip() != "True": continue sid = row.get("scan_id", "").strip() m = row.get("machine", "").strip() if not sid or not m: continue latest[(m, sid)] = row by_machine: dict[str, list[dict]] = defaultdict(list) for (m, _sid), r in latest.items(): by_machine[m].append(r) return {k: v for k, v in by_machine.items()} def pick_top_rois(rows: list[dict], n: int) -> list[tuple[tuple[float, float, float, float], int]]: """Top n distinct extents by count of deduped rows. Empty if rows empty or n < 1.""" if not rows or n < 1: return [] counts = Counter(extent_key(r) for r in rows) return [(key_to_roi_floats(key), cnt) for key, cnt in counts.most_common(n)] def pick_top_roi(rows: list[dict]) -> tuple[float, float, float, float]: if not rows: sys.exit("No rows with mosaic_on_disk=True for this machine (and ROI filter, if any).") return pick_top_rois(rows, 1)[0][0] def default_output_path( archive: Path, machine: str, roi: tuple[float, float, float, float], *, max_height: int | None, metadata_overlay: bool, rank: int | None = None, ) -> Path: safe = sanitize_machine_label(machine) sx, sy, ex, ey = roi base = f"roi_{sx}_{sy}_{ex}_{ey}".replace(" ", "") if rank is not None: base = f"{base}_r{rank}" if max_height is not None: base = f"{base}_h{max_height}" if metadata_overlay: base = f"{base}_meta" name = f"{base}.mp4" return archive / "movies" / safe / name def resolve_mosaic_path(rel: str, archive: Path) -> Path: """CSV paths are usually repo-relative, e.g. archives/BW1-4__AMR-15/.../mosaic.jpg.""" p = Path(rel) if p.is_absolute(): return p.resolve() ar = archive.resolve() norm = rel.replace("\\", "/") if norm.startswith("archives/") or norm.startswith("./archives/"): return (ar.parent / rel).resolve() return (ar / rel).resolve() def even_dimensions(w: int, h: int) -> tuple[int, int]: """libx264 requires even width and height.""" w2 = w - (w % 2) h2 = h - (h % 2) if w2 < 2 or h2 < 2: raise MovieEncodeError(f"Frame dimensions too small after evenizing: {w}x{h}") return w2, h2 def frame_size_mode(paths: list[Path]) -> tuple[int, int]: sizes: list[tuple[int, int]] = [] for p in paths: try: with Image.open(p) as im: sizes.append(im.size) except OSError: continue if not sizes: raise MovieEncodeError("No readable mosaic images to determine frame size.") w, h = Counter(sizes).most_common(1)[0][0] return even_dimensions(w, h) def encode_size(native_w: int, native_h: int, max_height: int | None) -> tuple[int, int]: """Native size is already even; optional downscale for preview encodes (cap height).""" if max_height is None: return native_w, native_h if max_height < 32: raise MovieEncodeError("--max-height must be at least 32") cap = max_height - (max_height % 2) if cap < 2: raise MovieEncodeError("--max-height must allow an even height of at least 2") if native_h <= cap: return native_w, native_h new_h = cap new_w = int(round(native_w * (new_h / native_h))) new_w -= new_w % 2 if new_w < 2: raise MovieEncodeError("Computed preview width too small; try a larger --max-height") return new_w, new_h def _truetype_font_candidates() -> list[Path]: windir = os.environ.get("WINDIR", r"C:\Windows") return [ Path(windir) / "Fonts" / "arial.ttf", Path(windir) / "Fonts" / "consola.ttf", Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"), Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"), ] def get_overlay_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont: for path in _truetype_font_candidates(): if path.is_file(): try: return ImageFont.truetype(str(path), size=size) except OSError: continue return ImageFont.load_default() def _truncate(s: str, max_len: int) -> str: s = s.strip() if len(s) <= max_len: return s if max_len <= 3: return s[:max_len] return s[: max_len - 3] + "..." def metadata_overlay_lines(row: dict, *, max_name_chars: int) -> list[str]: sid = row.get("scan_id", "").strip() st = row.get("scan_time", "").strip() name = _truncate(row.get("name", "").strip(), max_name_chars) nx = row.get("nx", "").strip() ny = row.get("ny", "").strip() dx = row.get("dx", "").strip() dy = row.get("dy", "").strip() lines = row.get("scan_lines", "").strip() mode = row.get("scan_mode", "").strip() user = row.get("user", "").strip() status = row.get("status", "").strip() sx = row.get("start_x", "").strip() sy = row.get("start_y", "").strip() ex = row.get("end_x", "").strip() ey = row.get("end_y", "").strip() machine = row.get("machine", "").strip() grid = f"{nx}x{ny}" if nx and ny else "" step = f"{dx}x{dy} mm" if dx and dy else "" geom = " ".join(p for p in (grid, step) if p) orient = f"{lines} / {mode}" if lines or mode else "" out: list[str] = [] if machine: out.append(machine) if sid or st: parts = [] if sid: parts.append(f"id {sid}") if st: parts.append(st) out.append(" ".join(parts)) if name: out.append(name) if geom or orient: out.append(" ".join(p for p in (geom, orient) if p)) if sx and sy and ex and ey: out.append(f"ROI mm {sx},{sy} .. {ex},{ey}") if user or status: tail: list[str] = [] if user: tail.append(f"user {user}") if status: tail.append(status) out.append(" ".join(tail)) return out if out else ["(no metadata)"] def draw_metadata_overlay( rgb: Image.Image, row: dict, *, margin: int, ) -> None: """Draw a semi-transparent label block along the top; mutates rgb in place.""" # Panel alpha: ~50% so roots stay visible through the bar. panel_fill = (0, 0, 0, 128) panel_outline = (220, 220, 230, 100) w, h = rgb.size margin = max(4, min(margin, w // 8)) font_size = max(10, min(22, h // 50)) pad = max(4, font_size // 3) line_gap = max(2, font_size // 6) def measure_block(fs: int, name_max: int) -> tuple[ImageFont.FreeTypeFont | ImageFont.ImageFont, list[str], int, int]: font = get_overlay_font(fs) lines = metadata_overlay_lines(row, max_name_chars=name_max) tmp = Image.new("RGB", (1, 1)) draw = ImageDraw.Draw(tmp) max_tw = 0 total_h = 0 for line in lines: bbox = draw.textbbox((0, 0), line, font=font) tw = bbox[2] - bbox[0] th = bbox[3] - bbox[1] max_tw = max(max_tw, tw) total_h += th if len(lines) > 1: total_h += line_gap * (len(lines) - 1) block_w = max_tw + 2 * pad block_h = total_h + 2 * pad return font, lines, block_w, block_h name_max = max(24, w // max(6, font_size // 2)) font, lines, block_w, block_h = measure_block(font_size, name_max) max_block_w = w - 2 * margin max_block_h = min(h // 2, h - 2 * margin) while (block_w > max_block_w or block_h > max_block_h) and font_size > 9: font_size -= 1 name_max = max(16, w // max(7, font_size // 2)) font, lines, block_w, block_h = measure_block(font_size, name_max) while block_w > max_block_w and name_max > 12: name_max -= 4 font, lines, block_w, block_h = measure_block(font_size, name_max) bw = min(block_w, max_block_w) bh = min(block_h, max_block_h) x0 = margin x1 = x0 + bw y0 = margin y1 = y0 + bh overlay = Image.new("RGBA", (w, h), (0, 0, 0, 0)) draw_ov = ImageDraw.Draw(overlay) draw_ov.rounded_rectangle( [x0, y0, x1, y1], radius=max(4, pad // 2), fill=panel_fill, outline=panel_outline, width=1, ) base = rgb.convert("RGBA") composited = Image.alpha_composite(base, overlay) draw = ImageDraw.Draw(composited) cx, cy = x0 + pad, y0 + pad text_bottom_limit = y1 - pad for line in lines: bbox = draw.textbbox((0, 0), line, font=font) th = bbox[3] - bbox[1] if cy + th > text_bottom_limit: break draw.text( (cx, cy), line, fill=(245, 245, 245, 255), font=font, stroke_width=1, stroke_fill=(0, 0, 0, 255), ) cy += th + line_gap rgb.paste(composited.convert("RGB")) def encode_movie( *, machine: str, roi: tuple[float, float, float, float], rows: list[dict], archive: Path, max_height: int | None, metadata_overlay: bool, fps: float, output: Path | None = None, dry_run: bool = False, rank: int | None = None, quiet: bool = False, ) -> EncodedMovieResult: """Build MP4 from pre-filtered rows for one ROI. Does not sys.exit on encode failures.""" rows_sorted = sorted( rows, key=lambda r: (r.get("scan_time") or "", r.get("scan_id") or ""), ) csv_frame_count = len(rows_sorted) row_path_candidates: list[tuple[dict, Path]] = [] for r in rows_sorted: rel = (r.get("mosaic_local_path") or "").strip() if not rel: continue row_path_candidates.append((r, resolve_mosaic_path(rel, archive))) out: Path = output or default_output_path( archive, machine, roi, max_height=max_height, metadata_overlay=metadata_overlay, rank=rank, ) if not quiet: print(f"Machine: {machine}") print(f"ROI (mm): {roi[0]}, {roi[1]}, {roi[2]}, {roi[3]}") print(f"Frames (from CSV, deduped): {csv_frame_count}") if max_height is not None: print(f"Preview max height: {max_height}px") print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}") on_disk = sum(1 for _r, p in row_path_candidates if p.is_file()) missing_paths = len(row_path_candidates) - on_disk if dry_run: if not quiet: print(f"On-disk files among ordered list: {on_disk} / {len(row_path_candidates)}") for i, (_r, p) in enumerate(row_path_candidates[:5]): print(f" [{i}] {p} exists={p.is_file()}") if len(row_path_candidates) > 10: print(" ...") start = max(0, len(row_path_candidates) - 3) for i, (_r, p) in enumerate(row_path_candidates[start:], start=start): print(f" [{i}] {p} exists={p.is_file()}") return EncodedMovieResult( success=True, machine=machine, roi=roi, csv_frame_count=csv_frame_count, written=0, missing=missing_paths, dropped_read=0, output_path=out, skipped_reason=None, size_mb=None, elapsed_s=None, ) row_path_pairs = [(r, p) for r, p in row_path_candidates if p.is_file()] if not row_path_pairs: if not quiet: print("No mosaic files on disk for the selected rows.") return EncodedMovieResult( success=False, machine=machine, roi=roi, csv_frame_count=csv_frame_count, written=0, missing=missing_paths, dropped_read=0, output_path=out, skipped_reason="No mosaic files on disk for the selected rows.", size_mb=None, elapsed_s=None, ) t0 = time.perf_counter() try: ordered_paths = [p for _r, p in row_path_pairs] target_w, target_h = frame_size_mode(ordered_paths) enc_w, enc_h = encode_size(target_w, target_h, max_height) except MovieEncodeError as e: if not quiet: print(str(e)) return EncodedMovieResult( success=False, machine=machine, roi=roi, csv_frame_count=csv_frame_count, written=0, missing=missing_paths, dropped_read=0, output_path=out, skipped_reason=str(e), size_mb=None, elapsed_s=None, ) if not quiet: print(f"Target frame size (mode): {target_w} x {target_h}") if (enc_w, enc_h) != (target_w, target_h): print(f"Encode size (after max-height): {enc_w} x {enc_h}") out.parent.mkdir(parents=True, exist_ok=True) written = 0 dropped = 0 resized = 0 scaled_preview = 0 try: writer = imageio.get_writer( str(out), fps=float(fps), codec="libx264", quality=8, macro_block_size=1, ) try: for row, p in row_path_pairs: try: with Image.open(p) as im: rgb = im.convert("RGB") if rgb.size != (target_w, target_h): rgb = rgb.resize((target_w, target_h), Image.Resampling.LANCZOS) resized += 1 if rgb.size != (enc_w, enc_h): rgb = rgb.resize((enc_w, enc_h), Image.Resampling.LANCZOS) scaled_preview += 1 if metadata_overlay: draw_metadata_overlay(rgb, row, margin=max(6, enc_w // 80)) frame = np.asarray(rgb) writer.append_data(frame) written += 1 except OSError: dropped += 1 finally: writer.close() except Exception as e: if not quiet: print(f"Encode error: {e}") return EncodedMovieResult( success=False, machine=machine, roi=roi, csv_frame_count=csv_frame_count, written=written, missing=missing_paths, dropped_read=dropped, output_path=out, skipped_reason=f"encode_error: {e}", size_mb=None, elapsed_s=None, ) elapsed = time.perf_counter() - t0 size_mb = out.stat().st_size / (1024 * 1024) if out.is_file() else 0.0 if not quiet: print( f"Written: {written} frames (normalized to mode: {resized}, " f"preview scale: {scaled_preview})" ) print(f"Dropped (read error): {dropped}") print(f"Missing paths (not on disk): {missing_paths}") print(f"Output: {out.resolve()} ({size_mb:.2f} MB)") return EncodedMovieResult( success=True, machine=machine, roi=roi, csv_frame_count=csv_frame_count, written=written, missing=missing_paths, dropped_read=dropped, output_path=out, skipped_reason=None, size_mb=size_mb, elapsed_s=elapsed, ) def main() -> None: args = parse_args() archive: Path = args.archive scans_csv: Path = args.scans_csv or (archive / "scans.csv") if not scans_csv.is_file(): sys.exit(f"scans.csv not found: {scans_csv}") roi_sel: tuple[float, float, float, float] | None if args.roi: roi_sel = parse_roi(args.roi) else: roi_sel = None rows = load_latest_rows(scans_csv, args.machine, roi_sel) if roi_sel is None: roi_sel = pick_top_roi(rows) rows = [r for r in rows if extent_close(r, roi_sel)] assert roi_sel is not None max_height: int | None = args.max_height metadata_overlay = not args.no_metadata_overlay res = encode_movie( machine=args.machine, roi=roi_sel, rows=rows, archive=archive, max_height=max_height, metadata_overlay=metadata_overlay, fps=float(args.fps), output=args.output, dry_run=bool(args.dry_run), rank=None, quiet=False, ) if not res.success and not args.dry_run: sys.exit(1 if res.skipped_reason else 1) if __name__ == "__main__": main()