Files
SPRUCE-scraper/scripts/build_mosaic_movies_batch.py
T
poprhythm 1ef9e0206c Add mosaic timelapse scripts and imageio dependencies.
Introduce build_mosaic_movie for single-ROI MP4s from archived mosaics, with optional max-height preview, semi-transparent metadata overlay, and encode_movie API for reuse. Add build_mosaic_movies_batch to encode the top N ROIs per machine using one scans.csv pass, progress output, and --skip-existing for safe reruns. Declare imageio and imageio-ffmpeg in requirements.txt.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:04:06 -04:00

270 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""
Build preview MP4s for the top N ROIs per machine (default N=2, max-height 1080).
Reads archives/scans.csv once, groups on-disk mosaic rows by machine, then for
each machine picks the most frequent ROI extents and calls encode_movie().
Usage:
python scripts/build_mosaic_movies_batch.py
python scripts/build_mosaic_movies_batch.py --dry-run
python scripts/build_mosaic_movies_batch.py --skip-existing
python scripts/build_mosaic_movies_batch.py --machine "BW2-10 [AMR-22]"
python scripts/build_mosaic_movies_batch.py --full-res # no max-height cap
"""
from __future__ import annotations
import argparse
import sys
import time
from dataclasses import dataclass
from pathlib import Path
_SCRIPTS_DIR = Path(__file__).resolve().parent
# Import sibling module (run as python scripts/build_mosaic_movies_batch.py from repo root)
sys.path.insert(0, str(_SCRIPTS_DIR))
import build_mosaic_movie as bmm # noqa: E402
def read_machine_labels(path: Path) -> list[str]:
    """Read machine labels from a text file, one label per line.

    Surrounding whitespace is stripped from every line. Blank lines and lines
    that start with ``#`` (after stripping) are ignored.

    The file is decoded as ``utf-8-sig`` rather than plain ``utf-8`` so that a
    leading byte-order mark (often written by Windows editors) is dropped.
    With plain ``utf-8`` the BOM stays attached to the first line: the first
    label would no longer equal the intended text, and a leading ``#`` comment
    would be returned as a label. Files without a BOM decode identically.

    Args:
        path: Text file holding the labels.

    Returns:
        The labels in file order (duplicates are preserved).
    """
    with path.open(encoding="utf-8-sig") as fh:
        stripped = (line.strip() for line in fh)
        return [s for s in stripped if s and not s.startswith("#")]
@dataclass
class Job:
    """One movie to encode: a single ROI extent of a single machine.

    Instances are built by collect_jobs() and consumed by main(). The field
    order is the positional constructor order and must not be changed.
    """

    # Machine label whose rows this job covers.
    machine: str
    # 1-based position of this ROI among the picks for the machine.
    rank: int
    # ROI extent; main() unpacks it as (sx, sy, ex, ey).
    roi: tuple[float, float, float, float]
    # Count returned alongside the ROI by bmm.pick_top_rois().
    extent_count: int
    # Rows of this machine for which bmm.extent_close(row, roi) is true.
    rows: list[dict]
    # Destination path computed by bmm.default_output_path().
    output_path: Path
def collect_jobs(
    *,
    machines: list[str],
    by_machine: dict[str, list[dict]],
    archive: Path,
    top_rois: int,
    max_height: int | None,
    metadata_overlay: bool,
) -> list[Job]:
    """Plan one Job per (machine, picked ROI) pair.

    Machines are visited in the order given. A machine that has no entry (or
    an empty entry) in ``by_machine`` contributes no jobs. For the others,
    ``bmm.pick_top_rois`` selects up to ``top_rois`` extents, and each pick
    becomes a job ranked from 1 in the order the picks are returned.

    Args:
        machines: Machine labels to plan for.
        by_machine: Rows grouped by machine label.
        archive: Archive root, forwarded to ``bmm.default_output_path``.
        top_rois: Number of extents requested from ``bmm.pick_top_rois``.
        max_height: Forwarded to ``bmm.default_output_path``.
        metadata_overlay: Forwarded to ``bmm.default_output_path``.

    Returns:
        The planned jobs, grouped by machine and ordered by rank.
    """
    planned: list[Job] = []
    for label in machines:
        machine_rows = by_machine.get(label, [])
        if machine_rows:
            ranked = enumerate(bmm.pick_top_rois(machine_rows, top_rois), start=1)
            for position, (extent, hits) in ranked:
                # Keep only the rows whose extent matches this pick.
                matching = [
                    row for row in machine_rows if bmm.extent_close(row, extent)
                ]
                destination = bmm.default_output_path(
                    archive,
                    label,
                    extent,
                    max_height=max_height,
                    metadata_overlay=metadata_overlay,
                    rank=position,
                )
                planned.append(
                    Job(
                        machine=label,
                        rank=position,
                        roi=extent,
                        extent_count=hits,
                        rows=matching,
                        output_path=destination,
                    )
                )
    return planned
def parse_args() -> argparse.Namespace:
    """Parse and validate the command line taken from ``sys.argv``.

    The module docstring is used as the help description. It contains a
    multi-line usage section, so ``RawDescriptionHelpFormatter`` is used to
    keep its line breaks; the default formatter would re-wrap the examples
    into a single paragraph.

    Numeric options are range-checked here so that a bad value is reported as
    a normal usage error (exit status 2) instead of surfacing later as an
    empty job list or an encoder failure.

    Returns:
        The parsed namespace. ``max_height`` is ``None`` when ``--full-res``
        was given, otherwise a positive integer.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--machines-file",
        type=Path,
        default=_SCRIPTS_DIR / "machines.example.txt",
        help="One machine label per line (default: scripts/machines.example.txt next to this script)",
    )
    parser.add_argument(
        "--machine",
        metavar="LABEL",
        help='If set, only this machine (overrides list to a single job set), e.g. "BW2-10 [AMR-22]"',
    )
    parser.add_argument("--archive", type=Path, default=Path("archives"))
    parser.add_argument("--scans-csv", type=Path, default=None)
    parser.add_argument(
        "--top-rois",
        type=int,
        default=2,
        help="How many top extents per machine (default: 2)",
    )
    parser.add_argument(
        "--max-height",
        type=int,
        default=1080,
        help="Preview cap in px (default: 1080)",
    )
    parser.add_argument(
        "--full-res",
        action="store_true",
        help="Disable max-height cap (full mosaic resolution; can be huge)",
    )
    parser.add_argument("--fps", type=float, default=10.0)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        help="Skip encode if output MP4 exists and is non-empty",
    )
    parser.add_argument("--no-metadata-overlay", action="store_true")
    args = parser.parse_args()

    if args.top_rois < 1:
        parser.error("--top-rois must be at least 1")
    # Written as a chained comparison so that NaN and infinity are rejected too.
    if not 0 < args.fps < float("inf"):
        parser.error("--fps must be a finite number greater than 0")
    if args.full_res:
        # --full-res wins over any --max-height value.
        args.max_height = None
    elif args.max_height < 1:
        parser.error("--max-height must be at least 1 (use --full-res for no cap)")
    return args
def _existing_size(path: Path) -> int:
    """Return the size in bytes of regular file *path*, or 0 if unavailable."""
    if not path.is_file():
        return 0
    try:
        return path.stat().st_size
    except OSError:
        # The file vanished (or became unreadable) after the is_file() check.
        return 0


def _print_summary(summary_rows: list[tuple[str, ...]]) -> None:
    """Print the fixed-width per-job summary table followed by each output path."""
    print()
    print("=" * 120)
    hdr = (
        f"{'machine':<26} {'rk':>2} {'ROI (mm)':<36} {'csv':>4} {'out':>5} "
        f"{'miss':>4} {'MB':>7} {'s':>6} {'status':<22}"
    )
    print(hdr)
    print("-" * 120)
    for m, rk, roi_s, csv_n, wrt, miss, path, status, mb, es in summary_rows:
        print(
            f"{m:<26} {rk:>2} {roi_s:<36} {csv_n:>4} {wrt:>5} {miss:>4} "
            f"{mb:>7} {es:>6} {status:<22}"
        )
        print(f" -> {path}")
    print("=" * 120)


def main() -> None:
    """Run the batch: plan jobs, encode each one, then print a summary table.

    Steps:
      1. Parse arguments and resolve the machine list (``--machine`` or the
         machines file). Repeated labels are collapsed, because every repeat
         would plan identical jobs that write to the same output path.
      2. Load the scans CSV once via ``bmm.load_on_disk_rows_by_machine`` and
         plan jobs with ``collect_jobs``.
      3. For each job, either skip it (``--skip-existing`` and a non-empty
         output file already exists) or call ``bmm.encode_movie``.
      4. Print the summary table.

    Exits through ``sys.exit`` with a message when the inputs are missing or
    no jobs could be planned, and with a non-zero status after the summary
    when at least one job failed, so wrappers and schedulers can detect it.
    """
    args = parse_args()
    archive: Path = args.archive
    scans_csv: Path = args.scans_csv or (archive / "scans.csv")
    if not scans_csv.is_file():
        sys.exit(f"scans.csv not found: {scans_csv}")

    if args.machine:
        machines = [args.machine.strip()]
    else:
        if not args.machines_file.is_file():
            sys.exit(f"Machines file not found: {args.machines_file}")
        machines = read_machine_labels(args.machines_file)
        if not machines:
            sys.exit(f"No machine labels in {args.machines_file}")
    # Drop repeated labels while keeping first-seen order.
    machines = list(dict.fromkeys(machines))

    t_load0 = time.perf_counter()
    by_machine = bmm.load_on_disk_rows_by_machine(scans_csv)
    load_s = time.perf_counter() - t_load0

    max_height: int | None = args.max_height
    metadata_overlay = not args.no_metadata_overlay
    jobs = collect_jobs(
        machines=machines,
        by_machine=by_machine,
        archive=archive,
        top_rois=args.top_rois,
        max_height=max_height,
        metadata_overlay=metadata_overlay,
    )
    total = len(jobs)
    if total == 0:
        sys.exit("No jobs (no on-disk mosaics for selected machines).")

    # collect_jobs() silently drops machines without rows; surface them on
    # stderr so a mistyped label is noticed without disturbing stdout.
    for label in machines:
        if not by_machine.get(label):
            print(f"warning: no on-disk mosaic rows for machine {label!r}", file=sys.stderr)

    print(f"Loaded scans.csv grouped by machine in {load_s:.2f}s ({total} job(s))")
    if max_height is not None:
        print(f"Max height: {max_height}px")
    else:
        print("Max height: (full resolution)")
    print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
    print()

    # Each row: machine, rank, ROI, csv rows, frames written, missing,
    # output path, status, size in MB, elapsed seconds.
    summary_rows: list[tuple[str, ...]] = []
    failures = 0
    for idx, job in enumerate(jobs, start=1):
        sx, sy, ex, ey = job.roi
        roi_s = f"{sx},{sy}..{ex},{ey}"
        print(
            f"[{idx}/{total}] {job.machine} rank={job.rank} ROI {roi_s} "
            f"({job.extent_count} CSV rows this extent, {len(job.rows)} deduped rows)"
        )

        if args.skip_existing:
            existing_bytes = _existing_size(job.output_path)
            if existing_bytes > 0:
                sz = existing_bytes / (1024 * 1024)
                print(f" SKIP (exists): {job.output_path}")
                summary_rows.append(
                    (
                        job.machine,
                        str(job.rank),
                        roi_s,
                        str(len(job.rows)),
                        "-",
                        "-",
                        str(job.output_path),
                        "SKIP (exists)",
                        f"{sz:.2f}",
                        "-",
                    )
                )
                continue

        enc_t0 = time.perf_counter()
        res = bmm.encode_movie(
            machine=job.machine,
            roi=job.roi,
            rows=job.rows,
            archive=archive,
            max_height=max_height,
            metadata_overlay=metadata_overlay,
            fps=float(args.fps),
            output=None,
            dry_run=args.dry_run,
            rank=job.rank,
            quiet=True,
        )
        enc_elapsed = time.perf_counter() - enc_t0

        if res.success:
            if args.dry_run:
                print(f" dry-run OK -> {res.output_path} (missing files: {res.missing})")
            else:
                print(
                    f" OK {res.written} frames "
                    f"{(res.size_mb or 0):.2f} MB {enc_elapsed:.1f}s"
                )
            status = "dry-run" if args.dry_run else "OK"
        else:
            print(f" FAIL {res.skipped_reason or 'unknown'}")
            failures += 1
            # Keep the failure visible in the table even during a dry run.
            status = f"FAIL: {res.skipped_reason[:40]}" if res.skipped_reason else "FAIL"

        # A dry run writes nothing, so size and timing columns are blanked.
        mb = "-" if args.dry_run or res.size_mb is None else f"{res.size_mb:.2f}"
        es = "-" if args.dry_run else f"{enc_elapsed:.1f}"
        w = "0" if args.dry_run else str(res.written)
        summary_rows.append(
            (
                job.machine,
                str(job.rank),
                roi_s,
                str(len(job.rows)),
                w,
                str(res.missing),
                str(res.output_path or job.output_path),
                status,
                mb,
                es,
            )
        )

    _print_summary(summary_rows)
    if failures:
        sys.exit(f"{failures} of {total} job(s) failed")
# Run the batch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()