1 Commits

Author SHA1 Message Date
poprhythm 8593808cf3 Add --retry-failed mode and mosaic retry estimates to progress report
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:40:15 -04:00
3 changed files with 0 additions and 981 deletions
-2
View File
@@ -6,5 +6,3 @@ tqdm>=4.66.0
piexif>=1.1.3
Pillow>=10.0.0
pytest>=8.0
imageio>=2.34
imageio-ffmpeg>=0.5
-710
View File
@@ -1,710 +0,0 @@
#!/usr/bin/env python3
"""
Build a chronological MP4 from downloaded mosaic.jpg files for one machine ROI.
Reads archives/scans.csv, filters by machine and mosaic_on_disk, optionally
restricts to one (start_x, start_y, end_x, end_y) ROI, dedupes by scan_id
(last row wins), sorts by scan_time, and encodes frames with imageio/ffmpeg.
Usage:
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]"
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
--roi "195.65,219.22,219.73,235.04" --fps 8 --output /tmp/out.mp4
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" --dry-run
# Lighter preview (caps tall full-tube mosaics by height — easier on players):
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
--roi "0.0,0.0,310.0,740.0" --preview
# Metadata is drawn on each frame by default (semi-transparent bar at the top);
# use --no-metadata-overlay to disable.
"""
from __future__ import annotations
import argparse
import csv
import os
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
import imageio
import numpy as np
from PIL import Image, ImageDraw, ImageFont
class MovieEncodeError(Exception):
"""Raised from encoding helpers; caught by encode_movie for batch-safe handling."""
@dataclass
class EncodedMovieResult:
success: bool
machine: str
roi: tuple[float, float, float, float]
csv_frame_count: int
written: int
missing: int
dropped_read: int
output_path: Path | None
skipped_reason: str | None
size_mb: float | None
elapsed_s: float | None
def sanitize_machine_label(label: str) -> str:
return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_")
def parse_roi(s: str) -> tuple[float, float, float, float]:
parts = [p.strip() for p in s.split(",")]
if len(parts) != 4:
sys.exit("--roi must be four comma-separated numbers: start_x,start_y,end_x,end_y")
try:
return tuple(float(p) for p in parts) # type: ignore[return-value]
except ValueError as e:
sys.exit(f"Invalid --roi numbers: {e}")
def extent_close(
row: dict,
roi: tuple[float, float, float, float],
*,
tol: float = 1e-4,
) -> bool:
keys = ("start_x", "start_y", "end_x", "end_y")
try:
vals = tuple(float(row[k]) for k in keys)
except (KeyError, ValueError):
return False
return all(abs(a - b) < tol for a, b in zip(vals, roi))
def extent_key(row: dict) -> tuple[str, str, str, str]:
"""Stable grouping key from CSV string fields."""
return (
row.get("start_x", "").strip(),
row.get("start_y", "").strip(),
row.get("end_x", "").strip(),
row.get("end_y", "").strip(),
)
def key_to_roi_floats(key: tuple[str, str, str, str]) -> tuple[float, float, float, float]:
return tuple(float(x) for x in key) # type: ignore[return-value]
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--machine", required=True, help='RootView machine label, e.g. "BW1-4 [AMR-15]"')
p.add_argument(
"--roi",
metavar="SX,SY,EX,EY",
help="Restrict to this extent (mm). If omitted, pick the ROI with the most on-disk mosaics.",
)
p.add_argument("--archive", default="archives", type=Path, help="Archive root (default: archives)")
p.add_argument(
"--scans-csv",
default=None,
type=Path,
help="Path to scans.csv (default: <archive>/scans.csv)",
)
p.add_argument(
"--output",
"-o",
type=Path,
default=None,
help="Output .mp4 path (default: <archive>/movies/<machine>/roi_<...>.mp4)",
)
p.add_argument("--fps", type=float, default=10.0, help="Frames per second (default: 10)")
p.add_argument(
"--max-height",
type=int,
default=None,
metavar="PX",
help="Scale each frame so height is at most PX pixels (width keeps aspect); "
"suited to tall full-tube mosaics. Both dimensions are rounded to even pixels for H.264.",
)
p.add_argument(
"--preview",
action="store_true",
help="Shorthand for --max-height 1080 (overridden if --max-height is also set).",
)
p.add_argument(
"--dry-run",
action="store_true",
help="List frames that would be written (no MP4)",
)
p.add_argument(
"--no-metadata-overlay",
action="store_true",
help="Do not draw scan metadata on each frame (default: overlay on).",
)
args = p.parse_args()
if args.preview and args.max_height is None:
args.max_height = 1080
return args
def _csv_required_fieldnames() -> tuple[str, ...]:
return (
"machine",
"scan_id",
"scan_time",
"mosaic_on_disk",
"mosaic_local_path",
"start_x",
"start_y",
"end_x",
"end_y",
)
def validate_scans_csv_header(reader: csv.DictReader, scans_csv: Path) -> None:
if reader.fieldnames is None:
sys.exit(f"Empty CSV: {scans_csv}")
required = _csv_required_fieldnames()
missing = [c for c in required if c not in reader.fieldnames]
if missing:
sys.exit(f"{scans_csv} missing columns: {missing}")
def load_latest_rows(
scans_csv: Path,
machine: str,
roi: tuple[float, float, float, float] | None,
) -> list[dict]:
"""Last row per scan_id for matching machine; mosaic_on_disk True; optional ROI."""
latest: dict[str, dict] = {}
with scans_csv.open(newline="", encoding="utf-8") as fh:
reader = csv.DictReader(fh)
validate_scans_csv_header(reader, scans_csv)
for row in reader:
if row.get("machine", "") != machine:
continue
if row.get("mosaic_on_disk", "").strip() != "True":
continue
if roi is not None and not extent_close(row, roi):
continue
sid = row.get("scan_id", "").strip()
if not sid:
continue
latest[sid] = row
return list(latest.values())
def load_on_disk_rows_by_machine(scans_csv: Path) -> dict[str, list[dict]]:
"""One pass: last row per (machine, scan_id) where mosaic_on_disk True; group by machine."""
latest: dict[tuple[str, str], dict] = {}
with scans_csv.open(newline="", encoding="utf-8") as fh:
reader = csv.DictReader(fh)
validate_scans_csv_header(reader, scans_csv)
for row in reader:
if row.get("mosaic_on_disk", "").strip() != "True":
continue
sid = row.get("scan_id", "").strip()
m = row.get("machine", "").strip()
if not sid or not m:
continue
latest[(m, sid)] = row
by_machine: dict[str, list[dict]] = defaultdict(list)
for (m, _sid), r in latest.items():
by_machine[m].append(r)
return {k: v for k, v in by_machine.items()}
def pick_top_rois(rows: list[dict], n: int) -> list[tuple[tuple[float, float, float, float], int]]:
"""Top n distinct extents by count of deduped rows. Empty if rows empty or n < 1."""
if not rows or n < 1:
return []
counts = Counter(extent_key(r) for r in rows)
return [(key_to_roi_floats(key), cnt) for key, cnt in counts.most_common(n)]
def pick_top_roi(rows: list[dict]) -> tuple[float, float, float, float]:
if not rows:
sys.exit("No rows with mosaic_on_disk=True for this machine (and ROI filter, if any).")
return pick_top_rois(rows, 1)[0][0]
def default_output_path(
archive: Path,
machine: str,
roi: tuple[float, float, float, float],
*,
max_height: int | None,
metadata_overlay: bool,
rank: int | None = None,
) -> Path:
safe = sanitize_machine_label(machine)
sx, sy, ex, ey = roi
base = f"roi_{sx}_{sy}_{ex}_{ey}".replace(" ", "")
if rank is not None:
base = f"{base}_r{rank}"
if max_height is not None:
base = f"{base}_h{max_height}"
if metadata_overlay:
base = f"{base}_meta"
name = f"{base}.mp4"
return archive / "movies" / safe / name
def resolve_mosaic_path(rel: str, archive: Path) -> Path:
"""CSV paths are usually repo-relative, e.g. archives/BW1-4__AMR-15/.../mosaic.jpg."""
p = Path(rel)
if p.is_absolute():
return p.resolve()
ar = archive.resolve()
norm = rel.replace("\\", "/")
if norm.startswith("archives/") or norm.startswith("./archives/"):
return (ar.parent / rel).resolve()
return (ar / rel).resolve()
def even_dimensions(w: int, h: int) -> tuple[int, int]:
"""libx264 requires even width and height."""
w2 = w - (w % 2)
h2 = h - (h % 2)
if w2 < 2 or h2 < 2:
raise MovieEncodeError(f"Frame dimensions too small after evenizing: {w}x{h}")
return w2, h2
def frame_size_mode(paths: list[Path]) -> tuple[int, int]:
sizes: list[tuple[int, int]] = []
for p in paths:
try:
with Image.open(p) as im:
sizes.append(im.size)
except OSError:
continue
if not sizes:
raise MovieEncodeError("No readable mosaic images to determine frame size.")
w, h = Counter(sizes).most_common(1)[0][0]
return even_dimensions(w, h)
def encode_size(native_w: int, native_h: int, max_height: int | None) -> tuple[int, int]:
"""Native size is already even; optional downscale for preview encodes (cap height)."""
if max_height is None:
return native_w, native_h
if max_height < 32:
raise MovieEncodeError("--max-height must be at least 32")
cap = max_height - (max_height % 2)
if cap < 2:
raise MovieEncodeError("--max-height must allow an even height of at least 2")
if native_h <= cap:
return native_w, native_h
new_h = cap
new_w = int(round(native_w * (new_h / native_h)))
new_w -= new_w % 2
if new_w < 2:
raise MovieEncodeError("Computed preview width too small; try a larger --max-height")
return new_w, new_h
def _truetype_font_candidates() -> list[Path]:
windir = os.environ.get("WINDIR", r"C:\Windows")
return [
Path(windir) / "Fonts" / "arial.ttf",
Path(windir) / "Fonts" / "consola.ttf",
Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"),
Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"),
]
def get_overlay_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
for path in _truetype_font_candidates():
if path.is_file():
try:
return ImageFont.truetype(str(path), size=size)
except OSError:
continue
return ImageFont.load_default()
def _truncate(s: str, max_len: int) -> str:
s = s.strip()
if len(s) <= max_len:
return s
if max_len <= 3:
return s[:max_len]
return s[: max_len - 3] + "..."
def metadata_overlay_lines(row: dict, *, max_name_chars: int) -> list[str]:
sid = row.get("scan_id", "").strip()
st = row.get("scan_time", "").strip()
name = _truncate(row.get("name", "").strip(), max_name_chars)
nx = row.get("nx", "").strip()
ny = row.get("ny", "").strip()
dx = row.get("dx", "").strip()
dy = row.get("dy", "").strip()
lines = row.get("scan_lines", "").strip()
mode = row.get("scan_mode", "").strip()
user = row.get("user", "").strip()
status = row.get("status", "").strip()
sx = row.get("start_x", "").strip()
sy = row.get("start_y", "").strip()
ex = row.get("end_x", "").strip()
ey = row.get("end_y", "").strip()
machine = row.get("machine", "").strip()
grid = f"{nx}x{ny}" if nx and ny else ""
step = f"{dx}x{dy} mm" if dx and dy else ""
geom = " ".join(p for p in (grid, step) if p)
orient = f"{lines} / {mode}" if lines or mode else ""
out: list[str] = []
if machine:
out.append(machine)
if sid or st:
parts = []
if sid:
parts.append(f"id {sid}")
if st:
parts.append(st)
out.append(" ".join(parts))
if name:
out.append(name)
if geom or orient:
out.append(" ".join(p for p in (geom, orient) if p))
if sx and sy and ex and ey:
out.append(f"ROI mm {sx},{sy} .. {ex},{ey}")
if user or status:
tail: list[str] = []
if user:
tail.append(f"user {user}")
if status:
tail.append(status)
out.append(" ".join(tail))
return out if out else ["(no metadata)"]
def draw_metadata_overlay(
rgb: Image.Image,
row: dict,
*,
margin: int,
) -> None:
"""Draw a semi-transparent label block along the top; mutates rgb in place."""
# Panel alpha: ~50% so roots stay visible through the bar.
panel_fill = (0, 0, 0, 128)
panel_outline = (220, 220, 230, 100)
w, h = rgb.size
margin = max(4, min(margin, w // 8))
font_size = max(10, min(22, h // 50))
pad = max(4, font_size // 3)
line_gap = max(2, font_size // 6)
def measure_block(fs: int, name_max: int) -> tuple[ImageFont.FreeTypeFont | ImageFont.ImageFont, list[str], int, int]:
font = get_overlay_font(fs)
lines = metadata_overlay_lines(row, max_name_chars=name_max)
tmp = Image.new("RGB", (1, 1))
draw = ImageDraw.Draw(tmp)
max_tw = 0
total_h = 0
for line in lines:
bbox = draw.textbbox((0, 0), line, font=font)
tw = bbox[2] - bbox[0]
th = bbox[3] - bbox[1]
max_tw = max(max_tw, tw)
total_h += th
if len(lines) > 1:
total_h += line_gap * (len(lines) - 1)
block_w = max_tw + 2 * pad
block_h = total_h + 2 * pad
return font, lines, block_w, block_h
name_max = max(24, w // max(6, font_size // 2))
font, lines, block_w, block_h = measure_block(font_size, name_max)
max_block_w = w - 2 * margin
max_block_h = min(h // 2, h - 2 * margin)
while (block_w > max_block_w or block_h > max_block_h) and font_size > 9:
font_size -= 1
name_max = max(16, w // max(7, font_size // 2))
font, lines, block_w, block_h = measure_block(font_size, name_max)
while block_w > max_block_w and name_max > 12:
name_max -= 4
font, lines, block_w, block_h = measure_block(font_size, name_max)
bw = min(block_w, max_block_w)
bh = min(block_h, max_block_h)
x0 = margin
x1 = x0 + bw
y0 = margin
y1 = y0 + bh
overlay = Image.new("RGBA", (w, h), (0, 0, 0, 0))
draw_ov = ImageDraw.Draw(overlay)
draw_ov.rounded_rectangle(
[x0, y0, x1, y1],
radius=max(4, pad // 2),
fill=panel_fill,
outline=panel_outline,
width=1,
)
base = rgb.convert("RGBA")
composited = Image.alpha_composite(base, overlay)
draw = ImageDraw.Draw(composited)
cx, cy = x0 + pad, y0 + pad
text_bottom_limit = y1 - pad
for line in lines:
bbox = draw.textbbox((0, 0), line, font=font)
th = bbox[3] - bbox[1]
if cy + th > text_bottom_limit:
break
draw.text(
(cx, cy),
line,
fill=(245, 245, 245, 255),
font=font,
stroke_width=1,
stroke_fill=(0, 0, 0, 255),
)
cy += th + line_gap
rgb.paste(composited.convert("RGB"))
def encode_movie(
*,
machine: str,
roi: tuple[float, float, float, float],
rows: list[dict],
archive: Path,
max_height: int | None,
metadata_overlay: bool,
fps: float,
output: Path | None = None,
dry_run: bool = False,
rank: int | None = None,
quiet: bool = False,
) -> EncodedMovieResult:
"""Build MP4 from pre-filtered rows for one ROI. Does not sys.exit on encode failures."""
rows_sorted = sorted(
rows,
key=lambda r: (r.get("scan_time") or "", r.get("scan_id") or ""),
)
csv_frame_count = len(rows_sorted)
row_path_candidates: list[tuple[dict, Path]] = []
for r in rows_sorted:
rel = (r.get("mosaic_local_path") or "").strip()
if not rel:
continue
row_path_candidates.append((r, resolve_mosaic_path(rel, archive)))
out: Path = output or default_output_path(
archive,
machine,
roi,
max_height=max_height,
metadata_overlay=metadata_overlay,
rank=rank,
)
if not quiet:
print(f"Machine: {machine}")
print(f"ROI (mm): {roi[0]}, {roi[1]}, {roi[2]}, {roi[3]}")
print(f"Frames (from CSV, deduped): {csv_frame_count}")
if max_height is not None:
print(f"Preview max height: {max_height}px")
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
on_disk = sum(1 for _r, p in row_path_candidates if p.is_file())
missing_paths = len(row_path_candidates) - on_disk
if dry_run:
if not quiet:
print(f"On-disk files among ordered list: {on_disk} / {len(row_path_candidates)}")
for i, (_r, p) in enumerate(row_path_candidates[:5]):
print(f" [{i}] {p} exists={p.is_file()}")
if len(row_path_candidates) > 10:
print(" ...")
start = max(0, len(row_path_candidates) - 3)
for i, (_r, p) in enumerate(row_path_candidates[start:], start=start):
print(f" [{i}] {p} exists={p.is_file()}")
return EncodedMovieResult(
success=True,
machine=machine,
roi=roi,
csv_frame_count=csv_frame_count,
written=0,
missing=missing_paths,
dropped_read=0,
output_path=out,
skipped_reason=None,
size_mb=None,
elapsed_s=None,
)
row_path_pairs = [(r, p) for r, p in row_path_candidates if p.is_file()]
if not row_path_pairs:
if not quiet:
print("No mosaic files on disk for the selected rows.")
return EncodedMovieResult(
success=False,
machine=machine,
roi=roi,
csv_frame_count=csv_frame_count,
written=0,
missing=missing_paths,
dropped_read=0,
output_path=out,
skipped_reason="No mosaic files on disk for the selected rows.",
size_mb=None,
elapsed_s=None,
)
t0 = time.perf_counter()
try:
ordered_paths = [p for _r, p in row_path_pairs]
target_w, target_h = frame_size_mode(ordered_paths)
enc_w, enc_h = encode_size(target_w, target_h, max_height)
except MovieEncodeError as e:
if not quiet:
print(str(e))
return EncodedMovieResult(
success=False,
machine=machine,
roi=roi,
csv_frame_count=csv_frame_count,
written=0,
missing=missing_paths,
dropped_read=0,
output_path=out,
skipped_reason=str(e),
size_mb=None,
elapsed_s=None,
)
if not quiet:
print(f"Target frame size (mode): {target_w} x {target_h}")
if (enc_w, enc_h) != (target_w, target_h):
print(f"Encode size (after max-height): {enc_w} x {enc_h}")
out.parent.mkdir(parents=True, exist_ok=True)
written = 0
dropped = 0
resized = 0
scaled_preview = 0
try:
writer = imageio.get_writer(
str(out),
fps=float(fps),
codec="libx264",
quality=8,
macro_block_size=1,
)
try:
for row, p in row_path_pairs:
try:
with Image.open(p) as im:
rgb = im.convert("RGB")
if rgb.size != (target_w, target_h):
rgb = rgb.resize((target_w, target_h), Image.Resampling.LANCZOS)
resized += 1
if rgb.size != (enc_w, enc_h):
rgb = rgb.resize((enc_w, enc_h), Image.Resampling.LANCZOS)
scaled_preview += 1
if metadata_overlay:
draw_metadata_overlay(rgb, row, margin=max(6, enc_w // 80))
frame = np.asarray(rgb)
writer.append_data(frame)
written += 1
except OSError:
dropped += 1
finally:
writer.close()
except Exception as e:
if not quiet:
print(f"Encode error: {e}")
return EncodedMovieResult(
success=False,
machine=machine,
roi=roi,
csv_frame_count=csv_frame_count,
written=written,
missing=missing_paths,
dropped_read=dropped,
output_path=out,
skipped_reason=f"encode_error: {e}",
size_mb=None,
elapsed_s=None,
)
elapsed = time.perf_counter() - t0
size_mb = out.stat().st_size / (1024 * 1024) if out.is_file() else 0.0
if not quiet:
print(
f"Written: {written} frames (normalized to mode: {resized}, "
f"preview scale: {scaled_preview})"
)
print(f"Dropped (read error): {dropped}")
print(f"Missing paths (not on disk): {missing_paths}")
print(f"Output: {out.resolve()} ({size_mb:.2f} MB)")
return EncodedMovieResult(
success=True,
machine=machine,
roi=roi,
csv_frame_count=csv_frame_count,
written=written,
missing=missing_paths,
dropped_read=dropped,
output_path=out,
skipped_reason=None,
size_mb=size_mb,
elapsed_s=elapsed,
)
def main() -> None:
args = parse_args()
archive: Path = args.archive
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
if not scans_csv.is_file():
sys.exit(f"scans.csv not found: {scans_csv}")
roi_sel: tuple[float, float, float, float] | None
if args.roi:
roi_sel = parse_roi(args.roi)
else:
roi_sel = None
rows = load_latest_rows(scans_csv, args.machine, roi_sel)
if roi_sel is None:
roi_sel = pick_top_roi(rows)
rows = [r for r in rows if extent_close(r, roi_sel)]
assert roi_sel is not None
max_height: int | None = args.max_height
metadata_overlay = not args.no_metadata_overlay
res = encode_movie(
machine=args.machine,
roi=roi_sel,
rows=rows,
archive=archive,
max_height=max_height,
metadata_overlay=metadata_overlay,
fps=float(args.fps),
output=args.output,
dry_run=bool(args.dry_run),
rank=None,
quiet=False,
)
if not res.success and not args.dry_run:
sys.exit(1 if res.skipped_reason else 1)
if __name__ == "__main__":
main()
-269
View File
@@ -1,269 +0,0 @@
#!/usr/bin/env python3
"""
Build preview MP4s for the top N ROIs per machine (default N=2, max-height 1080).
Reads archives/scans.csv once, groups on-disk mosaic rows by machine, then for
each machine picks the most frequent ROI extents and calls encode_movie().
Usage:
python scripts/build_mosaic_movies_batch.py
python scripts/build_mosaic_movies_batch.py --dry-run
python scripts/build_mosaic_movies_batch.py --skip-existing
python scripts/build_mosaic_movies_batch.py --machine "BW2-10 [AMR-22]"
python scripts/build_mosaic_movies_batch.py --full-res # no max-height cap
"""
from __future__ import annotations
import argparse
import sys
import time
from dataclasses import dataclass
from pathlib import Path
_SCRIPTS_DIR = Path(__file__).resolve().parent
# Import sibling module (run as python scripts/build_mosaic_movies_batch.py from repo root)
sys.path.insert(0, str(_SCRIPTS_DIR))
import build_mosaic_movie as bmm # noqa: E402
def read_machine_labels(path: Path) -> list[str]:
out: list[str] = []
with path.open(encoding="utf-8") as fh:
for line in fh:
s = line.strip()
if not s or s.startswith("#"):
continue
out.append(s)
return out
@dataclass
class Job:
machine: str
rank: int
roi: tuple[float, float, float, float]
extent_count: int
rows: list[dict]
output_path: Path
def collect_jobs(
*,
machines: list[str],
by_machine: dict[str, list[dict]],
archive: Path,
top_rois: int,
max_height: int | None,
metadata_overlay: bool,
) -> list[Job]:
jobs: list[Job] = []
for machine in machines:
rows = by_machine.get(machine, [])
if not rows:
continue
picks = bmm.pick_top_rois(rows, top_rois)
for rank, (roi, extent_count) in enumerate(picks, start=1):
rows_roi = [r for r in rows if bmm.extent_close(r, roi)]
out = bmm.default_output_path(
archive,
machine,
roi,
max_height=max_height,
metadata_overlay=metadata_overlay,
rank=rank,
)
jobs.append(
Job(
machine=machine,
rank=rank,
roi=roi,
extent_count=extent_count,
rows=rows_roi,
output_path=out,
)
)
return jobs
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument(
"--machines-file",
type=Path,
default=_SCRIPTS_DIR / "machines.example.txt",
help="One machine label per line (default: scripts/machines.example.txt next to this script)",
)
p.add_argument(
"--machine",
metavar="LABEL",
help='If set, only this machine (overrides list to a single job set), e.g. "BW2-10 [AMR-22]"',
)
p.add_argument("--archive", type=Path, default=Path("archives"))
p.add_argument("--scans-csv", type=Path, default=None)
p.add_argument("--top-rois", type=int, default=2, help="How many top extents per machine (default: 2)")
p.add_argument("--max-height", type=int, default=1080, help="Preview cap in px (default: 1080)")
p.add_argument(
"--full-res",
action="store_true",
help="Disable max-height cap (full mosaic resolution; can be huge)",
)
p.add_argument("--fps", type=float, default=10.0)
p.add_argument("--dry-run", action="store_true")
p.add_argument(
"--skip-existing",
action="store_true",
help="Skip encode if output MP4 exists and is non-empty",
)
p.add_argument("--no-metadata-overlay", action="store_true")
args = p.parse_args()
if args.full_res:
args.max_height = None
return args
def main() -> None:
args = parse_args()
archive: Path = args.archive
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
if not scans_csv.is_file():
sys.exit(f"scans.csv not found: {scans_csv}")
if args.machine:
machines = [args.machine.strip()]
else:
if not args.machines_file.is_file():
sys.exit(f"Machines file not found: {args.machines_file}")
machines = read_machine_labels(args.machines_file)
if not machines:
sys.exit(f"No machine labels in {args.machines_file}")
t_load0 = time.perf_counter()
by_machine = bmm.load_on_disk_rows_by_machine(scans_csv)
load_s = time.perf_counter() - t_load0
max_height: int | None = args.max_height
metadata_overlay = not args.no_metadata_overlay
jobs = collect_jobs(
machines=machines,
by_machine=by_machine,
archive=archive,
top_rois=args.top_rois,
max_height=max_height,
metadata_overlay=metadata_overlay,
)
total = len(jobs)
if total == 0:
sys.exit("No jobs (no on-disk mosaics for selected machines).")
print(f"Loaded scans.csv grouped by machine in {load_s:.2f}s ({total} job(s))")
if max_height is not None:
print(f"Max height: {max_height}px")
else:
print("Max height: (full resolution)")
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
print()
summary_rows: list[tuple[str, ...]] = []
for idx, job in enumerate(jobs, start=1):
sx, sy, ex, ey = job.roi
roi_s = f"{sx},{sy}..{ex},{ey}"
print(
f"[{idx}/{total}] {job.machine} rank={job.rank} ROI {roi_s} "
f"({job.extent_count} CSV rows this extent, {len(job.rows)} deduped rows)"
)
if args.skip_existing and job.output_path.is_file() and job.output_path.stat().st_size > 0:
sz = job.output_path.stat().st_size / (1024 * 1024)
print(f" SKIP (exists): {job.output_path}")
summary_rows.append(
(
job.machine,
str(job.rank),
roi_s,
str(len(job.rows)),
"-",
"-",
str(job.output_path),
"SKIP (exists)",
f"{sz:.2f}",
"-",
)
)
continue
enc_t0 = time.perf_counter()
res = bmm.encode_movie(
machine=job.machine,
roi=job.roi,
rows=job.rows,
archive=archive,
max_height=max_height,
metadata_overlay=metadata_overlay,
fps=float(args.fps),
output=None,
dry_run=args.dry_run,
rank=job.rank,
quiet=True,
)
enc_elapsed = time.perf_counter() - enc_t0
if res.success:
if args.dry_run:
print(f" dry-run OK -> {res.output_path} (missing files: {res.missing})")
else:
print(
f" OK {res.written} frames "
f"{(res.size_mb or 0):.2f} MB {enc_elapsed:.1f}s"
)
else:
print(f" FAIL {res.skipped_reason or 'unknown'}")
status = "OK" if res.success else "FAIL"
if not res.success and res.skipped_reason:
status = f"FAIL: {res.skipped_reason[:40]}"
mb = f"{res.size_mb:.2f}" if res.size_mb is not None else "-"
es = f"{enc_elapsed:.1f}" if not args.dry_run else "-"
w = str(res.written) if not args.dry_run else "0"
if args.dry_run:
status = "dry-run"
mb = "-"
summary_rows.append(
(
job.machine,
str(job.rank),
roi_s,
str(len(job.rows)),
w,
str(res.missing),
str(res.output_path or job.output_path),
status,
mb,
es,
)
)
print()
print("=" * 120)
hdr = (
f"{'machine':<26} {'rk':>2} {'ROI (mm)':<36} {'csv':>4} {'out':>5} "
f"{'miss':>4} {'MB':>7} {'s':>6} {'status':<22}"
)
print(hdr)
print("-" * 120)
for row in summary_rows:
m, rk, roi_s, csv_n, wrt, miss, path, status, mb, es = row
print(
f"{m:<26} {rk:>2} {roi_s:<36} {csv_n:>4} {wrt:>5} {miss:>4} "
f"{mb:>7} {es:>6} {status:<22}"
)
print(f" -> {path}")
print("=" * 120)
if __name__ == "__main__":
main()