diff --git a/requirements.txt b/requirements.txt index 4cf59c3..81389a7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,5 @@ tqdm>=4.66.0 piexif>=1.1.3 Pillow>=10.0.0 pytest>=8.0 +imageio>=2.34 +imageio-ffmpeg>=0.5 diff --git a/scripts/build_mosaic_movie.py b/scripts/build_mosaic_movie.py new file mode 100644 index 0000000..8c420c7 --- /dev/null +++ b/scripts/build_mosaic_movie.py @@ -0,0 +1,710 @@ +#!/usr/bin/env python3 +""" +Build a chronological MP4 from downloaded mosaic.jpg files for one machine ROI. + +Reads archives/scans.csv, filters by machine and mosaic_on_disk, optionally +restricts to one (start_x, start_y, end_x, end_y) ROI, dedupes by scan_id +(last row wins), sorts by scan_time, and encodes frames with imageio/ffmpeg. + +Usage: + .venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" + .venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\ + --roi "195.65,219.22,219.73,235.04" --fps 8 --output /tmp/out.mp4 + .venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" --dry-run + # Lighter preview (caps tall full-tube mosaics by height — easier on players): + .venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\ + --roi "0.0,0.0,310.0,740.0" --preview + # Metadata is drawn on each frame by default (semi-transparent bar at the top); + # use --no-metadata-overlay to disable. 
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the single-movie builder.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses.

    Returns:
        The parsed namespace. ``max_height`` is set to 1080 when ``--preview``
        is given without an explicit ``--max-height``.

    Raises:
        SystemExit: On invalid arguments (argparse exits with status 2),
            including a ``--fps`` that is not a positive, finite number.
    """
    p = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring carries multi-line usage examples; the default
        # formatter would re-wrap them into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument("--machine", required=True, help='RootView machine label, e.g. "BW1-4 [AMR-15]"')
    p.add_argument(
        "--roi",
        metavar="SX,SY,EX,EY",
        help="Restrict to this extent (mm). If omitted, pick the ROI with the most on-disk mosaics.",
    )
    p.add_argument("--archive", default="archives", type=Path, help="Archive root (default: archives)")
    p.add_argument(
        "--scans-csv",
        default=None,
        type=Path,
        help="Path to scans.csv (default: <archive>/scans.csv)",
    )
    p.add_argument(
        "--output",
        "-o",
        type=Path,
        default=None,
        help="Output .mp4 path (default: <archive>/movies/<machine>/roi_<...>.mp4)",
    )
    p.add_argument("--fps", type=float, default=10.0, help="Frames per second (default: 10)")
    p.add_argument(
        "--max-height",
        type=int,
        default=None,
        metavar="PX",
        help="Scale each frame so height is at most PX pixels (width keeps aspect); "
        "suited to tall full-tube mosaics. Both dimensions are rounded to even pixels for H.264.",
    )
    p.add_argument(
        "--preview",
        action="store_true",
        help="Shorthand for --max-height 1080 (overridden if --max-height is also set).",
    )
    p.add_argument(
        "--dry-run",
        action="store_true",
        help="List frames that would be written (no MP4)",
    )
    p.add_argument(
        "--no-metadata-overlay",
        action="store_true",
        help="Do not draw scan metadata on each frame (default: overlay on).",
    )
    args = p.parse_args(argv)

    # Chained comparison is False for NaN as well as for <= 0 and +inf, so one
    # test rejects every frame rate the encoder cannot use.
    if not 0 < args.fps < float("inf"):
        p.error("--fps must be a positive, finite number")

    # An explicit --max-height always wins over the --preview shorthand.
    if args.preview and args.max_height is None:
        args.max_height = 1080
    return args
def load_latest_rows(
    scans_csv: Path,
    machine: str,
    roi: tuple[float, float, float, float] | None,
) -> list[dict]:
    """Return one row per scan_id for *machine*, keeping the last match.

    A row is kept only when its ``machine`` equals *machine* (surrounding
    whitespace ignored on both sides), its ``mosaic_on_disk`` field is the
    literal ``"True"``, it has a non-empty ``scan_id`` and, when *roi* is
    given, its extent matches according to ``extent_close``. Filtering
    happens before de-duplication, so "last row wins" applies among the rows
    that passed the filters.

    Args:
        scans_csv: Path to the scans CSV file.
        machine: Machine label to select.
        roi: Optional ``(start_x, start_y, end_x, end_y)`` extent filter.

    Returns:
        The surviving rows, in order of first appearance of each scan_id.

    Raises:
        SystemExit: From ``validate_scans_csv_header`` when the header is
            empty or lacks a required column.
    """
    wanted = machine.strip()
    latest: dict[str, dict] = {}
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise become part of the first column name and make the
    # header check report that column as missing.
    with scans_csv.open(newline="", encoding="utf-8-sig") as fh:
        reader = csv.DictReader(fh)
        validate_scans_csv_header(reader, scans_csv)

        for row in reader:
            # DictReader fills the missing fields of a short row with None,
            # hence "or ''" before every .strip().
            if (row.get("machine") or "").strip() != wanted:
                continue
            if (row.get("mosaic_on_disk") or "").strip() != "True":
                continue
            if roi is not None:
                try:
                    matches = extent_close(row, roi)
                except TypeError:
                    # extent_close converts with float(), which raises
                    # TypeError (not ValueError) for a None extent field.
                    matches = False
                if not matches:
                    continue
            sid = (row.get("scan_id") or "").strip()
            if not sid:
                continue
            latest[sid] = row

    return list(latest.values())
def resolve_mosaic_path(rel: str, archive: Path) -> Path:
    """Turn a ``mosaic_local_path`` CSV value into an absolute path.

    CSV paths are usually repo-relative, e.g.
    ``archives/BW1-4__AMR-15/.../mosaic.jpg``, and may use backslashes as
    separators.

    Args:
        rel: Path string from the CSV; absolute or relative.
        archive: Archive root directory.

    Returns:
        * *rel* resolved as-is when it is already absolute;
        * otherwise, when it starts with ``archives/`` or ``./archives/``,
          it is resolved against the parent of *archive*;
        * otherwise it is resolved against *archive* itself.
    """
    raw = Path(rel)
    if raw.is_absolute():
        return raw.resolve()

    root = archive.resolve()
    # Backslashes are treated as separators. The normalized form must be used
    # for the join as well as for the prefix test: on POSIX a raw
    # "archives\\x\\mosaic.jpg" would otherwise be a single file name.
    norm = rel.replace("\\", "/")
    if norm.startswith(("archives/", "./archives/")):
        return (root.parent / norm).resolve()
    return (root / norm).resolve()
def _truncate(s: str, max_len: int) -> str:
    """Strip *s* and shorten it to at most *max_len* characters.

    Strings longer than *max_len* end in ``...`` when there is room for it
    (``max_len > 3``); otherwise they are simply cut.
    """
    s = s.strip()
    # A negative bound would turn the slice below into "all but the last n
    # characters" instead of an upper limit.
    max_len = max(0, max_len)
    if len(s) <= max_len:
        return s
    if max_len <= 3:
        return s[:max_len]
    return s[: max_len - 3] + "..."


def metadata_overlay_lines(row: dict, *, max_name_chars: int) -> list[str]:
    """Build the text lines shown in the per-frame metadata overlay.

    Args:
        row: One scans.csv row. Missing keys and ``None`` values are treated
            as empty strings.
        max_name_chars: Maximum length of the ``name`` line; longer names are
            truncated.

    Returns:
        Up to six lines, each emitted only when it has content, in this
        order: machine; scan id and time; name; grid/step geometry plus scan
        orientation; ROI extent; user and status. ``["(no metadata)"]`` when
        no line has content.
    """

    def field(key: str) -> str:
        # csv.DictReader stores None for fields missing from a short row.
        return (row.get(key) or "").strip()

    sid = field("scan_id")
    st = field("scan_time")
    name = _truncate(field("name"), max_name_chars)
    nx, ny = field("nx"), field("ny")
    dx, dy = field("dx"), field("dy")
    lines = field("scan_lines")
    mode = field("scan_mode")
    user = field("user")
    status = field("status")
    sx, sy = field("start_x"), field("start_y")
    ex, ey = field("end_x"), field("end_y")
    machine = field("machine")

    grid = f"{nx}x{ny}" if nx and ny else ""
    step = f"{dx}x{dy} mm" if dx and dy else ""
    geom = " ".join(p for p in (grid, step) if p)
    # Join only the parts that exist so a lone value has no dangling " / ".
    orient = " / ".join(p for p in (lines, mode) if p)

    out: list[str] = []
    if machine:
        out.append(machine)
    if sid or st:
        parts = []
        if sid:
            parts.append(f"id {sid}")
        if st:
            parts.append(st)
        out.append(" ".join(parts))
    if name:
        out.append(name)
    if geom or orient:
        out.append(" ".join(p for p in (geom, orient) if p))
    # The ROI line needs all four coordinates; a partial extent is omitted.
    if sx and sy and ex and ey:
        out.append(f"ROI mm {sx},{sy} .. {ex},{ey}")
    if user or status:
        tail: list[str] = []
        if user:
            tail.append(f"user {user}")
        if status:
            tail.append(status)
        out.append(" ".join(tail))
    return out if out else ["(no metadata)"]
def encode_movie(
    *,
    machine: str,
    roi: tuple[float, float, float, float],
    rows: list[dict],
    archive: Path,
    max_height: int | None,
    metadata_overlay: bool,
    fps: float,
    output: Path | None = None,
    dry_run: bool = False,
    rank: int | None = None,
    quiet: bool = False,
) -> EncodedMovieResult:
    """Build an MP4 from pre-filtered rows for one ROI.

    Failures are reported through the returned result rather than by exiting
    or raising, so a batch caller can continue with the next job.

    Args:
        machine: Machine label (used for messages and the default path).
        roi: Extent the rows belong to.
        rows: CSV rows to encode; sorted here by the ``scan_time`` string,
            then by ``scan_id``.
        archive: Archive root used to resolve ``mosaic_local_path`` values.
        max_height: Optional height cap passed to ``encode_size``.
        metadata_overlay: Draw the metadata panel on every frame.
        fps: Output frame rate.
        output: Output path; ``default_output_path`` is used when ``None``.
        dry_run: Only report what would be encoded; nothing is written.
        rank: Optional rank forwarded to ``default_output_path``.
        quiet: Suppress progress output.

    Returns:
        An ``EncodedMovieResult``. ``success`` is False when no mosaic file is
        on disk, the frame size cannot be determined, the encoder fails, or
        no frame could be written. A dry run always reports success with
        ``written=0``.
    """
    rows_sorted = sorted(
        rows,
        key=lambda r: (r.get("scan_time") or "", r.get("scan_id") or ""),
    )
    csv_frame_count = len(rows_sorted)

    # Rows without a recorded path are left out of the candidates, so they
    # count neither as frames nor as missing files.
    row_path_candidates: list[tuple[dict, Path]] = []
    for r in rows_sorted:
        rel = (r.get("mosaic_local_path") or "").strip()
        if rel:
            row_path_candidates.append((r, resolve_mosaic_path(rel, archive)))

    out: Path = output or default_output_path(
        archive,
        machine,
        roi,
        max_height=max_height,
        metadata_overlay=metadata_overlay,
        rank=rank,
    )

    if not quiet:
        print(f"Machine: {machine}")
        print(f"ROI (mm): {roi[0]}, {roi[1]}, {roi[2]}, {roi[3]}")
        print(f"Frames (from CSV, deduped): {csv_frame_count}")
        if max_height is not None:
            print(f"Preview max height: {max_height}px")
        print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")

    row_path_pairs = [(r, p) for r, p in row_path_candidates if p.is_file()]
    on_disk = len(row_path_pairs)
    missing_paths = len(row_path_candidates) - on_disk

    def result(
        success: bool,
        *,
        written: int = 0,
        dropped: int = 0,
        reason: str | None = None,
        size_mb: float | None = None,
        elapsed_s: float | None = None,
    ) -> EncodedMovieResult:
        # Every exit path shares the same identifying fields.
        return EncodedMovieResult(
            success=success,
            machine=machine,
            roi=roi,
            csv_frame_count=csv_frame_count,
            written=written,
            missing=missing_paths,
            dropped_read=dropped,
            output_path=out,
            skipped_reason=reason,
            size_mb=size_mb,
            elapsed_s=elapsed_s,
        )

    if dry_run:
        if not quiet:
            total = len(row_path_candidates)
            print(f"On-disk files among ordered list: {on_disk} / {total}")
            head, tail = 5, 3
            indexed = list(enumerate(row_path_candidates))
            # Elide the middle only when head and tail cannot overlap;
            # otherwise short lists would print entries twice.
            if total > head + tail:
                first, last = indexed[:head], indexed[-tail:]
            else:
                first, last = indexed, []
            for i, (_r, p) in first:
                print(f" [{i}] {p} exists={p.is_file()}")
            if last:
                print(" ...")
                for i, (_r, p) in last:
                    print(f" [{i}] {p} exists={p.is_file()}")
        return result(True)

    if not row_path_pairs:
        reason = "No mosaic files on disk for the selected rows."
        if not quiet:
            print(reason)
        return result(False, reason=reason)

    t0 = time.perf_counter()
    try:
        target_w, target_h = frame_size_mode([p for _r, p in row_path_pairs])
        enc_w, enc_h = encode_size(target_w, target_h, max_height)
    except MovieEncodeError as e:
        if not quiet:
            print(str(e))
        return result(False, reason=str(e))

    if not quiet:
        print(f"Target frame size (mode): {target_w} x {target_h}")
        if (enc_w, enc_h) != (target_w, target_h):
            print(f"Encode size (after max-height): {enc_w} x {enc_h}")

    written = 0
    dropped = 0
    resized = 0
    scaled_preview = 0
    # True once a frame has been handed to the writer, i.e. once the output
    # file may have been created or overwritten by this call.
    output_touched = False
    try:
        # Inside the try so that an unwritable output directory is reported
        # as a failed job instead of aborting a whole batch.
        out.parent.mkdir(parents=True, exist_ok=True)
        writer = imageio.get_writer(
            str(out),
            fps=float(fps),
            codec="libx264",
            quality=8,
            macro_block_size=1,
        )
        try:
            for row, p in row_path_pairs:
                # Only image reading/preparation is treated as a per-frame,
                # recoverable error.
                try:
                    with Image.open(p) as im:
                        rgb = im.convert("RGB")
                    if rgb.size != (target_w, target_h):
                        rgb = rgb.resize((target_w, target_h), Image.Resampling.LANCZOS)
                        resized += 1
                    if rgb.size != (enc_w, enc_h):
                        rgb = rgb.resize((enc_w, enc_h), Image.Resampling.LANCZOS)
                        scaled_preview += 1
                    if metadata_overlay:
                        draw_metadata_overlay(rgb, row, margin=max(6, enc_w // 80))
                    frame = np.asarray(rgb)
                except OSError:
                    dropped += 1
                    continue
                # Kept outside the inner try: an OSError raised while writing
                # is an encoder failure, not an unreadable input image.
                output_touched = True
                writer.append_data(frame)
                written += 1
        finally:
            writer.close()
    except Exception as e:  # batch boundary: report, never propagate
        if output_touched:
            # Do not leave a truncated MP4 that a later run could mistake for
            # a finished one. Best effort only.
            try:
                out.unlink()
            except OSError:
                pass
        if not quiet:
            print(f"Encode error: {e}")
        return result(
            False,
            written=written,
            dropped=dropped,
            reason=f"encode_error: {e}",
        )

    if written == 0:
        # Files existed but none could be decoded; reporting success here
        # would describe a movie that was never produced.
        reason = "No readable mosaic frames; nothing was encoded."
        if not quiet:
            print(reason)
        return result(False, dropped=dropped, reason=reason)

    elapsed = time.perf_counter() - t0
    size_mb = out.stat().st_size / (1024 * 1024) if out.is_file() else 0.0
    if not quiet:
        print(
            f"Written: {written} frames (normalized to mode: {resized}, "
            f"preview scale: {scaled_preview})"
        )
        print(f"Dropped (read error): {dropped}")
        print(f"Missing paths (not on disk): {missing_paths}")
        print(f"Output: {out.resolve()} ({size_mb:.2f} MB)")

    return result(
        True,
        written=written,
        dropped=dropped,
        size_mb=size_mb,
        elapsed_s=elapsed,
    )
def read_machine_labels(path: Path) -> list[str]:
    """Read machine labels from a text file, one label per line.

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored, and surrounding whitespace is stripped. A label listed more than
    once is returned once, at its first position, so it does not produce
    duplicate jobs that write the same output file.

    Args:
        path: Text file to read.

    Returns:
        The distinct labels in file order.
    """
    labels: dict[str, None] = {}  # dict keeps insertion order and de-duplicates
    # "utf-8-sig" drops a leading BOM, which would otherwise be glued to the
    # first label and stop it from matching any machine.
    with path.open(encoding="utf-8-sig") as fh:
        for line in fh:
            label = line.strip()
            if not label or label.startswith("#"):
                continue
            labels.setdefault(label)
    return list(labels)
def main() -> None:
    """Encode the top ROIs for every selected machine and print a summary.

    Loads scans.csv once, builds the job list with ``collect_jobs``, runs
    ``bmm.encode_movie`` per job (unless ``--skip-existing`` finds a
    non-empty output) and prints a summary table.

    Raises:
        SystemExit: When scans.csv or the machines file is missing, when
            there is nothing to do, or, after the summary has been printed,
            when at least one job failed.
    """
    args = parse_args()
    archive: Path = args.archive
    scans_csv: Path = args.scans_csv or (archive / "scans.csv")
    if not scans_csv.is_file():
        sys.exit(f"scans.csv not found: {scans_csv}")

    if args.machine:
        machines = [args.machine.strip()]
    else:
        if not args.machines_file.is_file():
            sys.exit(f"Machines file not found: {args.machines_file}")
        machines = read_machine_labels(args.machines_file)
        if not machines:
            sys.exit(f"No machine labels in {args.machines_file}")

    t_load0 = time.perf_counter()
    by_machine = bmm.load_on_disk_rows_by_machine(scans_csv)
    load_s = time.perf_counter() - t_load0

    max_height: int | None = args.max_height
    metadata_overlay = not args.no_metadata_overlay

    jobs = collect_jobs(
        machines=machines,
        by_machine=by_machine,
        archive=archive,
        top_rois=args.top_rois,
        max_height=max_height,
        metadata_overlay=metadata_overlay,
    )
    total = len(jobs)
    if total == 0:
        sys.exit("No jobs (no on-disk mosaics for selected machines).")

    print(f"Loaded scans.csv grouped by machine in {load_s:.2f}s ({total} job(s))")
    if max_height is not None:
        print(f"Max height: {max_height}px")
    else:
        print("Max height: (full resolution)")
    print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
    print()

    # Column order of each tuple: machine, rank, ROI, csv rows, written,
    # missing, output path, status, MB, seconds.
    summary_rows: list[tuple[str, ...]] = []
    failed = 0

    for idx, job in enumerate(jobs, start=1):
        sx, sy, ex, ey = job.roi
        roi_s = f"{sx},{sy}..{ex},{ey}"
        print(
            f"[{idx}/{total}] {job.machine} rank={job.rank} ROI {roi_s} "
            f"({job.extent_count} CSV rows this extent, {len(job.rows)} deduped rows)"
        )

        if args.skip_existing and job.output_path.is_file():
            size_bytes = job.output_path.stat().st_size
            # An empty file is treated as missing and gets re-encoded.
            if size_bytes > 0:
                sz = size_bytes / (1024 * 1024)
                print(f" SKIP (exists): {job.output_path}")
                summary_rows.append(
                    (
                        job.machine,
                        str(job.rank),
                        roi_s,
                        str(len(job.rows)),
                        "-",
                        "-",
                        str(job.output_path),
                        "SKIP (exists)",
                        f"{sz:.2f}",
                        "-",
                    )
                )
                continue

        enc_t0 = time.perf_counter()
        res = bmm.encode_movie(
            machine=job.machine,
            roi=job.roi,
            rows=job.rows,
            archive=archive,
            max_height=max_height,
            metadata_overlay=metadata_overlay,
            fps=float(args.fps),
            output=None,
            dry_run=args.dry_run,
            rank=job.rank,
            quiet=True,
        )
        enc_elapsed = time.perf_counter() - enc_t0

        if res.success:
            if args.dry_run:
                print(f" dry-run OK -> {res.output_path} (missing files: {res.missing})")
            else:
                print(
                    f" OK {res.written} frames "
                    f"{(res.size_mb or 0):.2f} MB {enc_elapsed:.1f}s"
                )
        else:
            failed += 1
            print(f" FAIL {res.skipped_reason or 'unknown'}")

        if args.dry_run:
            status, mb, es, w = "dry-run", "-", "-", "0"
        else:
            status = "OK" if res.success else "FAIL"
            if not res.success and res.skipped_reason:
                status = f"FAIL: {res.skipped_reason[:40]}"
            mb = f"{res.size_mb:.2f}" if res.size_mb is not None else "-"
            es = f"{enc_elapsed:.1f}"
            w = str(res.written)

        summary_rows.append(
            (
                job.machine,
                str(job.rank),
                roi_s,
                str(len(job.rows)),
                w,
                str(res.missing),
                str(res.output_path or job.output_path),
                status,
                mb,
                es,
            )
        )
        print()

    print("=" * 120)
    hdr = (
        f"{'machine':<26} {'rk':>2} {'ROI (mm)':<36} {'csv':>4} {'out':>5} "
        f"{'miss':>4} {'MB':>7} {'s':>6} {'status':<22}"
    )
    print(hdr)
    print("-" * 120)
    for row in summary_rows:
        m, rk, roi_s, csv_n, wrt, miss, path, status, mb, es = row
        print(
            f"{m:<26} {rk:>2} {roi_s:<36} {csv_n:>4} {wrt:>5} {miss:>4} "
            f"{mb:>7} {es:>6} {status:<22}"
        )
        print(f" -> {path}")
    print("=" * 120)

    # Exit non-zero only after the full summary is out, so callers such as
    # cron or CI can detect failures without losing the report.
    if failed:
        sys.exit(f"{failed} of {total} job(s) failed")