Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 655c376a97 | |||
| 1ef9e0206c |
@@ -6,3 +6,5 @@ tqdm>=4.66.0
|
||||
piexif>=1.1.3
|
||||
Pillow>=10.0.0
|
||||
pytest>=8.0
|
||||
imageio>=2.34
|
||||
imageio-ffmpeg>=0.5
|
||||
|
||||
@@ -0,0 +1,710 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Build a chronological MP4 from downloaded mosaic.jpg files for one machine ROI.
|
||||
|
||||
Reads archives/scans.csv, filters by machine and mosaic_on_disk, optionally
|
||||
restricts to one (start_x, start_y, end_x, end_y) ROI, dedupes by scan_id
|
||||
(last row wins), sorts by scan_time, and encodes frames with imageio/ffmpeg.
|
||||
|
||||
Usage:
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]"
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
|
||||
--roi "195.65,219.22,219.73,235.04" --fps 8 --output /tmp/out.mp4
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" --dry-run
|
||||
# Lighter preview (caps tall full-tube mosaics by height — easier on players):
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
|
||||
--roi "0.0,0.0,310.0,740.0" --preview
|
||||
# Metadata is drawn on each frame by default (semi-transparent bar at the top);
|
||||
# use --no-metadata-overlay to disable.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from collections import Counter, defaultdict
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import imageio
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
|
||||
class MovieEncodeError(Exception):
|
||||
"""Raised from encoding helpers; caught by encode_movie for batch-safe handling."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class EncodedMovieResult:
|
||||
success: bool
|
||||
machine: str
|
||||
roi: tuple[float, float, float, float]
|
||||
csv_frame_count: int
|
||||
written: int
|
||||
missing: int
|
||||
dropped_read: int
|
||||
output_path: Path | None
|
||||
skipped_reason: str | None
|
||||
size_mb: float | None
|
||||
elapsed_s: float | None
|
||||
|
||||
|
||||
def sanitize_machine_label(label: str) -> str:
|
||||
return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_")
|
||||
|
||||
|
||||
def parse_roi(s: str) -> tuple[float, float, float, float]:
|
||||
parts = [p.strip() for p in s.split(",")]
|
||||
if len(parts) != 4:
|
||||
sys.exit("--roi must be four comma-separated numbers: start_x,start_y,end_x,end_y")
|
||||
try:
|
||||
return tuple(float(p) for p in parts) # type: ignore[return-value]
|
||||
except ValueError as e:
|
||||
sys.exit(f"Invalid --roi numbers: {e}")
|
||||
|
||||
|
||||
def extent_close(
|
||||
row: dict,
|
||||
roi: tuple[float, float, float, float],
|
||||
*,
|
||||
tol: float = 1e-4,
|
||||
) -> bool:
|
||||
keys = ("start_x", "start_y", "end_x", "end_y")
|
||||
try:
|
||||
vals = tuple(float(row[k]) for k in keys)
|
||||
except (KeyError, ValueError):
|
||||
return False
|
||||
return all(abs(a - b) < tol for a, b in zip(vals, roi))
|
||||
|
||||
|
||||
def extent_key(row: dict) -> tuple[str, str, str, str]:
|
||||
"""Stable grouping key from CSV string fields."""
|
||||
return (
|
||||
row.get("start_x", "").strip(),
|
||||
row.get("start_y", "").strip(),
|
||||
row.get("end_x", "").strip(),
|
||||
row.get("end_y", "").strip(),
|
||||
)
|
||||
|
||||
|
||||
def key_to_roi_floats(key: tuple[str, str, str, str]) -> tuple[float, float, float, float]:
|
||||
return tuple(float(x) for x in key) # type: ignore[return-value]
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("--machine", required=True, help='RootView machine label, e.g. "BW1-4 [AMR-15]"')
|
||||
p.add_argument(
|
||||
"--roi",
|
||||
metavar="SX,SY,EX,EY",
|
||||
help="Restrict to this extent (mm). If omitted, pick the ROI with the most on-disk mosaics.",
|
||||
)
|
||||
p.add_argument("--archive", default="archives", type=Path, help="Archive root (default: archives)")
|
||||
p.add_argument(
|
||||
"--scans-csv",
|
||||
default=None,
|
||||
type=Path,
|
||||
help="Path to scans.csv (default: <archive>/scans.csv)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--output",
|
||||
"-o",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Output .mp4 path (default: <archive>/movies/<machine>/roi_<...>.mp4)",
|
||||
)
|
||||
p.add_argument("--fps", type=float, default=10.0, help="Frames per second (default: 10)")
|
||||
p.add_argument(
|
||||
"--max-height",
|
||||
type=int,
|
||||
default=None,
|
||||
metavar="PX",
|
||||
help="Scale each frame so height is at most PX pixels (width keeps aspect); "
|
||||
"suited to tall full-tube mosaics. Both dimensions are rounded to even pixels for H.264.",
|
||||
)
|
||||
p.add_argument(
|
||||
"--preview",
|
||||
action="store_true",
|
||||
help="Shorthand for --max-height 1080 (overridden if --max-height is also set).",
|
||||
)
|
||||
p.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="List frames that would be written (no MP4)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--no-metadata-overlay",
|
||||
action="store_true",
|
||||
help="Do not draw scan metadata on each frame (default: overlay on).",
|
||||
)
|
||||
args = p.parse_args()
|
||||
if args.preview and args.max_height is None:
|
||||
args.max_height = 1080
|
||||
return args
|
||||
|
||||
|
||||
def _csv_required_fieldnames() -> tuple[str, ...]:
|
||||
return (
|
||||
"machine",
|
||||
"scan_id",
|
||||
"scan_time",
|
||||
"mosaic_on_disk",
|
||||
"mosaic_local_path",
|
||||
"start_x",
|
||||
"start_y",
|
||||
"end_x",
|
||||
"end_y",
|
||||
)
|
||||
|
||||
|
||||
def validate_scans_csv_header(reader: csv.DictReader, scans_csv: Path) -> None:
|
||||
if reader.fieldnames is None:
|
||||
sys.exit(f"Empty CSV: {scans_csv}")
|
||||
required = _csv_required_fieldnames()
|
||||
missing = [c for c in required if c not in reader.fieldnames]
|
||||
if missing:
|
||||
sys.exit(f"{scans_csv} missing columns: {missing}")
|
||||
|
||||
|
||||
def load_latest_rows(
|
||||
scans_csv: Path,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float] | None,
|
||||
) -> list[dict]:
|
||||
"""Last row per scan_id for matching machine; mosaic_on_disk True; optional ROI."""
|
||||
latest: dict[str, dict] = {}
|
||||
with scans_csv.open(newline="", encoding="utf-8") as fh:
|
||||
reader = csv.DictReader(fh)
|
||||
validate_scans_csv_header(reader, scans_csv)
|
||||
|
||||
for row in reader:
|
||||
if row.get("machine", "") != machine:
|
||||
continue
|
||||
if row.get("mosaic_on_disk", "").strip() != "True":
|
||||
continue
|
||||
if roi is not None and not extent_close(row, roi):
|
||||
continue
|
||||
sid = row.get("scan_id", "").strip()
|
||||
if not sid:
|
||||
continue
|
||||
latest[sid] = row
|
||||
|
||||
return list(latest.values())
|
||||
|
||||
|
||||
def load_on_disk_rows_by_machine(scans_csv: Path) -> dict[str, list[dict]]:
|
||||
"""One pass: last row per (machine, scan_id) where mosaic_on_disk True; group by machine."""
|
||||
latest: dict[tuple[str, str], dict] = {}
|
||||
with scans_csv.open(newline="", encoding="utf-8") as fh:
|
||||
reader = csv.DictReader(fh)
|
||||
validate_scans_csv_header(reader, scans_csv)
|
||||
for row in reader:
|
||||
if row.get("mosaic_on_disk", "").strip() != "True":
|
||||
continue
|
||||
sid = row.get("scan_id", "").strip()
|
||||
m = row.get("machine", "").strip()
|
||||
if not sid or not m:
|
||||
continue
|
||||
latest[(m, sid)] = row
|
||||
|
||||
by_machine: dict[str, list[dict]] = defaultdict(list)
|
||||
for (m, _sid), r in latest.items():
|
||||
by_machine[m].append(r)
|
||||
return {k: v for k, v in by_machine.items()}
|
||||
|
||||
|
||||
def pick_top_rois(rows: list[dict], n: int) -> list[tuple[tuple[float, float, float, float], int]]:
|
||||
"""Top n distinct extents by count of deduped rows. Empty if rows empty or n < 1."""
|
||||
if not rows or n < 1:
|
||||
return []
|
||||
counts = Counter(extent_key(r) for r in rows)
|
||||
return [(key_to_roi_floats(key), cnt) for key, cnt in counts.most_common(n)]
|
||||
|
||||
|
||||
def pick_top_roi(rows: list[dict]) -> tuple[float, float, float, float]:
|
||||
if not rows:
|
||||
sys.exit("No rows with mosaic_on_disk=True for this machine (and ROI filter, if any).")
|
||||
return pick_top_rois(rows, 1)[0][0]
|
||||
|
||||
|
||||
def default_output_path(
|
||||
archive: Path,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float],
|
||||
*,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
rank: int | None = None,
|
||||
) -> Path:
|
||||
safe = sanitize_machine_label(machine)
|
||||
sx, sy, ex, ey = roi
|
||||
base = f"roi_{sx}_{sy}_{ex}_{ey}".replace(" ", "")
|
||||
if rank is not None:
|
||||
base = f"{base}_r{rank}"
|
||||
if max_height is not None:
|
||||
base = f"{base}_h{max_height}"
|
||||
if metadata_overlay:
|
||||
base = f"{base}_meta"
|
||||
name = f"{base}.mp4"
|
||||
return archive / "movies" / safe / name
|
||||
|
||||
|
||||
def resolve_mosaic_path(rel: str, archive: Path) -> Path:
|
||||
"""CSV paths are usually repo-relative, e.g. archives/BW1-4__AMR-15/.../mosaic.jpg."""
|
||||
p = Path(rel)
|
||||
if p.is_absolute():
|
||||
return p.resolve()
|
||||
ar = archive.resolve()
|
||||
norm = rel.replace("\\", "/")
|
||||
if norm.startswith("archives/") or norm.startswith("./archives/"):
|
||||
return (ar.parent / rel).resolve()
|
||||
return (ar / rel).resolve()
|
||||
|
||||
|
||||
def even_dimensions(w: int, h: int) -> tuple[int, int]:
|
||||
"""libx264 requires even width and height."""
|
||||
w2 = w - (w % 2)
|
||||
h2 = h - (h % 2)
|
||||
if w2 < 2 or h2 < 2:
|
||||
raise MovieEncodeError(f"Frame dimensions too small after evenizing: {w}x{h}")
|
||||
return w2, h2
|
||||
|
||||
|
||||
def frame_size_mode(paths: list[Path]) -> tuple[int, int]:
|
||||
sizes: list[tuple[int, int]] = []
|
||||
for p in paths:
|
||||
try:
|
||||
with Image.open(p) as im:
|
||||
sizes.append(im.size)
|
||||
except OSError:
|
||||
continue
|
||||
if not sizes:
|
||||
raise MovieEncodeError("No readable mosaic images to determine frame size.")
|
||||
w, h = Counter(sizes).most_common(1)[0][0]
|
||||
return even_dimensions(w, h)
|
||||
|
||||
|
||||
def encode_size(native_w: int, native_h: int, max_height: int | None) -> tuple[int, int]:
|
||||
"""Native size is already even; optional downscale for preview encodes (cap height)."""
|
||||
if max_height is None:
|
||||
return native_w, native_h
|
||||
if max_height < 32:
|
||||
raise MovieEncodeError("--max-height must be at least 32")
|
||||
cap = max_height - (max_height % 2)
|
||||
if cap < 2:
|
||||
raise MovieEncodeError("--max-height must allow an even height of at least 2")
|
||||
if native_h <= cap:
|
||||
return native_w, native_h
|
||||
new_h = cap
|
||||
new_w = int(round(native_w * (new_h / native_h)))
|
||||
new_w -= new_w % 2
|
||||
if new_w < 2:
|
||||
raise MovieEncodeError("Computed preview width too small; try a larger --max-height")
|
||||
return new_w, new_h
|
||||
|
||||
|
||||
def _truetype_font_candidates() -> list[Path]:
|
||||
windir = os.environ.get("WINDIR", r"C:\Windows")
|
||||
return [
|
||||
Path(windir) / "Fonts" / "arial.ttf",
|
||||
Path(windir) / "Fonts" / "consola.ttf",
|
||||
Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"),
|
||||
Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"),
|
||||
]
|
||||
|
||||
|
||||
def get_overlay_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
|
||||
for path in _truetype_font_candidates():
|
||||
if path.is_file():
|
||||
try:
|
||||
return ImageFont.truetype(str(path), size=size)
|
||||
except OSError:
|
||||
continue
|
||||
return ImageFont.load_default()
|
||||
|
||||
|
||||
def _truncate(s: str, max_len: int) -> str:
|
||||
s = s.strip()
|
||||
if len(s) <= max_len:
|
||||
return s
|
||||
if max_len <= 3:
|
||||
return s[:max_len]
|
||||
return s[: max_len - 3] + "..."
|
||||
|
||||
|
||||
def metadata_overlay_lines(row: dict, *, max_name_chars: int) -> list[str]:
|
||||
sid = row.get("scan_id", "").strip()
|
||||
st = row.get("scan_time", "").strip()
|
||||
name = _truncate(row.get("name", "").strip(), max_name_chars)
|
||||
nx = row.get("nx", "").strip()
|
||||
ny = row.get("ny", "").strip()
|
||||
dx = row.get("dx", "").strip()
|
||||
dy = row.get("dy", "").strip()
|
||||
lines = row.get("scan_lines", "").strip()
|
||||
mode = row.get("scan_mode", "").strip()
|
||||
user = row.get("user", "").strip()
|
||||
status = row.get("status", "").strip()
|
||||
sx = row.get("start_x", "").strip()
|
||||
sy = row.get("start_y", "").strip()
|
||||
ex = row.get("end_x", "").strip()
|
||||
ey = row.get("end_y", "").strip()
|
||||
machine = row.get("machine", "").strip()
|
||||
|
||||
grid = f"{nx}x{ny}" if nx and ny else ""
|
||||
step = f"{dx}x{dy} mm" if dx and dy else ""
|
||||
geom = " ".join(p for p in (grid, step) if p)
|
||||
orient = f"{lines} / {mode}" if lines or mode else ""
|
||||
|
||||
out: list[str] = []
|
||||
if machine:
|
||||
out.append(machine)
|
||||
if sid or st:
|
||||
parts = []
|
||||
if sid:
|
||||
parts.append(f"id {sid}")
|
||||
if st:
|
||||
parts.append(st)
|
||||
out.append(" ".join(parts))
|
||||
if name:
|
||||
out.append(name)
|
||||
if geom or orient:
|
||||
out.append(" ".join(p for p in (geom, orient) if p))
|
||||
if sx and sy and ex and ey:
|
||||
out.append(f"ROI mm {sx},{sy} .. {ex},{ey}")
|
||||
if user or status:
|
||||
tail: list[str] = []
|
||||
if user:
|
||||
tail.append(f"user {user}")
|
||||
if status:
|
||||
tail.append(status)
|
||||
out.append(" ".join(tail))
|
||||
return out if out else ["(no metadata)"]
|
||||
|
||||
|
||||
def draw_metadata_overlay(
|
||||
rgb: Image.Image,
|
||||
row: dict,
|
||||
*,
|
||||
margin: int,
|
||||
) -> None:
|
||||
"""Draw a semi-transparent label block along the top; mutates rgb in place."""
|
||||
# Panel alpha: ~50% so roots stay visible through the bar.
|
||||
panel_fill = (0, 0, 0, 128)
|
||||
panel_outline = (220, 220, 230, 100)
|
||||
|
||||
w, h = rgb.size
|
||||
margin = max(4, min(margin, w // 8))
|
||||
font_size = max(10, min(22, h // 50))
|
||||
pad = max(4, font_size // 3)
|
||||
line_gap = max(2, font_size // 6)
|
||||
|
||||
def measure_block(fs: int, name_max: int) -> tuple[ImageFont.FreeTypeFont | ImageFont.ImageFont, list[str], int, int]:
|
||||
font = get_overlay_font(fs)
|
||||
lines = metadata_overlay_lines(row, max_name_chars=name_max)
|
||||
tmp = Image.new("RGB", (1, 1))
|
||||
draw = ImageDraw.Draw(tmp)
|
||||
max_tw = 0
|
||||
total_h = 0
|
||||
for line in lines:
|
||||
bbox = draw.textbbox((0, 0), line, font=font)
|
||||
tw = bbox[2] - bbox[0]
|
||||
th = bbox[3] - bbox[1]
|
||||
max_tw = max(max_tw, tw)
|
||||
total_h += th
|
||||
if len(lines) > 1:
|
||||
total_h += line_gap * (len(lines) - 1)
|
||||
block_w = max_tw + 2 * pad
|
||||
block_h = total_h + 2 * pad
|
||||
return font, lines, block_w, block_h
|
||||
|
||||
name_max = max(24, w // max(6, font_size // 2))
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
max_block_w = w - 2 * margin
|
||||
max_block_h = min(h // 2, h - 2 * margin)
|
||||
while (block_w > max_block_w or block_h > max_block_h) and font_size > 9:
|
||||
font_size -= 1
|
||||
name_max = max(16, w // max(7, font_size // 2))
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
while block_w > max_block_w and name_max > 12:
|
||||
name_max -= 4
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
|
||||
bw = min(block_w, max_block_w)
|
||||
bh = min(block_h, max_block_h)
|
||||
x0 = margin
|
||||
x1 = x0 + bw
|
||||
y0 = margin
|
||||
y1 = y0 + bh
|
||||
|
||||
overlay = Image.new("RGBA", (w, h), (0, 0, 0, 0))
|
||||
draw_ov = ImageDraw.Draw(overlay)
|
||||
draw_ov.rounded_rectangle(
|
||||
[x0, y0, x1, y1],
|
||||
radius=max(4, pad // 2),
|
||||
fill=panel_fill,
|
||||
outline=panel_outline,
|
||||
width=1,
|
||||
)
|
||||
|
||||
base = rgb.convert("RGBA")
|
||||
composited = Image.alpha_composite(base, overlay)
|
||||
draw = ImageDraw.Draw(composited)
|
||||
|
||||
cx, cy = x0 + pad, y0 + pad
|
||||
text_bottom_limit = y1 - pad
|
||||
for line in lines:
|
||||
bbox = draw.textbbox((0, 0), line, font=font)
|
||||
th = bbox[3] - bbox[1]
|
||||
if cy + th > text_bottom_limit:
|
||||
break
|
||||
draw.text(
|
||||
(cx, cy),
|
||||
line,
|
||||
fill=(245, 245, 245, 255),
|
||||
font=font,
|
||||
stroke_width=1,
|
||||
stroke_fill=(0, 0, 0, 255),
|
||||
)
|
||||
cy += th + line_gap
|
||||
|
||||
rgb.paste(composited.convert("RGB"))
|
||||
|
||||
|
||||
def encode_movie(
|
||||
*,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float],
|
||||
rows: list[dict],
|
||||
archive: Path,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
fps: float,
|
||||
output: Path | None = None,
|
||||
dry_run: bool = False,
|
||||
rank: int | None = None,
|
||||
quiet: bool = False,
|
||||
) -> EncodedMovieResult:
|
||||
"""Build MP4 from pre-filtered rows for one ROI. Does not sys.exit on encode failures."""
|
||||
rows_sorted = sorted(
|
||||
rows,
|
||||
key=lambda r: (r.get("scan_time") or "", r.get("scan_id") or ""),
|
||||
)
|
||||
csv_frame_count = len(rows_sorted)
|
||||
|
||||
row_path_candidates: list[tuple[dict, Path]] = []
|
||||
for r in rows_sorted:
|
||||
rel = (r.get("mosaic_local_path") or "").strip()
|
||||
if not rel:
|
||||
continue
|
||||
row_path_candidates.append((r, resolve_mosaic_path(rel, archive)))
|
||||
|
||||
out: Path = output or default_output_path(
|
||||
archive,
|
||||
machine,
|
||||
roi,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
rank=rank,
|
||||
)
|
||||
|
||||
if not quiet:
|
||||
print(f"Machine: {machine}")
|
||||
print(f"ROI (mm): {roi[0]}, {roi[1]}, {roi[2]}, {roi[3]}")
|
||||
print(f"Frames (from CSV, deduped): {csv_frame_count}")
|
||||
if max_height is not None:
|
||||
print(f"Preview max height: {max_height}px")
|
||||
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
|
||||
|
||||
on_disk = sum(1 for _r, p in row_path_candidates if p.is_file())
|
||||
missing_paths = len(row_path_candidates) - on_disk
|
||||
|
||||
if dry_run:
|
||||
if not quiet:
|
||||
print(f"On-disk files among ordered list: {on_disk} / {len(row_path_candidates)}")
|
||||
for i, (_r, p) in enumerate(row_path_candidates[:5]):
|
||||
print(f" [{i}] {p} exists={p.is_file()}")
|
||||
if len(row_path_candidates) > 10:
|
||||
print(" ...")
|
||||
start = max(0, len(row_path_candidates) - 3)
|
||||
for i, (_r, p) in enumerate(row_path_candidates[start:], start=start):
|
||||
print(f" [{i}] {p} exists={p.is_file()}")
|
||||
return EncodedMovieResult(
|
||||
success=True,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason=None,
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
row_path_pairs = [(r, p) for r, p in row_path_candidates if p.is_file()]
|
||||
if not row_path_pairs:
|
||||
if not quiet:
|
||||
print("No mosaic files on disk for the selected rows.")
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason="No mosaic files on disk for the selected rows.",
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
t0 = time.perf_counter()
|
||||
try:
|
||||
ordered_paths = [p for _r, p in row_path_pairs]
|
||||
target_w, target_h = frame_size_mode(ordered_paths)
|
||||
enc_w, enc_h = encode_size(target_w, target_h, max_height)
|
||||
except MovieEncodeError as e:
|
||||
if not quiet:
|
||||
print(str(e))
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason=str(e),
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
if not quiet:
|
||||
print(f"Target frame size (mode): {target_w} x {target_h}")
|
||||
if (enc_w, enc_h) != (target_w, target_h):
|
||||
print(f"Encode size (after max-height): {enc_w} x {enc_h}")
|
||||
|
||||
out.parent.mkdir(parents=True, exist_ok=True)
|
||||
written = 0
|
||||
dropped = 0
|
||||
resized = 0
|
||||
scaled_preview = 0
|
||||
try:
|
||||
writer = imageio.get_writer(
|
||||
str(out),
|
||||
fps=float(fps),
|
||||
codec="libx264",
|
||||
quality=8,
|
||||
macro_block_size=1,
|
||||
)
|
||||
try:
|
||||
for row, p in row_path_pairs:
|
||||
try:
|
||||
with Image.open(p) as im:
|
||||
rgb = im.convert("RGB")
|
||||
if rgb.size != (target_w, target_h):
|
||||
rgb = rgb.resize((target_w, target_h), Image.Resampling.LANCZOS)
|
||||
resized += 1
|
||||
if rgb.size != (enc_w, enc_h):
|
||||
rgb = rgb.resize((enc_w, enc_h), Image.Resampling.LANCZOS)
|
||||
scaled_preview += 1
|
||||
if metadata_overlay:
|
||||
draw_metadata_overlay(rgb, row, margin=max(6, enc_w // 80))
|
||||
frame = np.asarray(rgb)
|
||||
writer.append_data(frame)
|
||||
written += 1
|
||||
except OSError:
|
||||
dropped += 1
|
||||
finally:
|
||||
writer.close()
|
||||
except Exception as e:
|
||||
if not quiet:
|
||||
print(f"Encode error: {e}")
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=written,
|
||||
missing=missing_paths,
|
||||
dropped_read=dropped,
|
||||
output_path=out,
|
||||
skipped_reason=f"encode_error: {e}",
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
elapsed = time.perf_counter() - t0
|
||||
size_mb = out.stat().st_size / (1024 * 1024) if out.is_file() else 0.0
|
||||
if not quiet:
|
||||
print(
|
||||
f"Written: {written} frames (normalized to mode: {resized}, "
|
||||
f"preview scale: {scaled_preview})"
|
||||
)
|
||||
print(f"Dropped (read error): {dropped}")
|
||||
print(f"Missing paths (not on disk): {missing_paths}")
|
||||
print(f"Output: {out.resolve()} ({size_mb:.2f} MB)")
|
||||
|
||||
return EncodedMovieResult(
|
||||
success=True,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=written,
|
||||
missing=missing_paths,
|
||||
dropped_read=dropped,
|
||||
output_path=out,
|
||||
skipped_reason=None,
|
||||
size_mb=size_mb,
|
||||
elapsed_s=elapsed,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
archive: Path = args.archive
|
||||
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
|
||||
if not scans_csv.is_file():
|
||||
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||
|
||||
roi_sel: tuple[float, float, float, float] | None
|
||||
if args.roi:
|
||||
roi_sel = parse_roi(args.roi)
|
||||
else:
|
||||
roi_sel = None
|
||||
|
||||
rows = load_latest_rows(scans_csv, args.machine, roi_sel)
|
||||
if roi_sel is None:
|
||||
roi_sel = pick_top_roi(rows)
|
||||
rows = [r for r in rows if extent_close(r, roi_sel)]
|
||||
|
||||
assert roi_sel is not None
|
||||
max_height: int | None = args.max_height
|
||||
metadata_overlay = not args.no_metadata_overlay
|
||||
|
||||
res = encode_movie(
|
||||
machine=args.machine,
|
||||
roi=roi_sel,
|
||||
rows=rows,
|
||||
archive=archive,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
fps=float(args.fps),
|
||||
output=args.output,
|
||||
dry_run=bool(args.dry_run),
|
||||
rank=None,
|
||||
quiet=False,
|
||||
)
|
||||
if not res.success and not args.dry_run:
|
||||
sys.exit(1 if res.skipped_reason else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,269 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Build preview MP4s for the top N ROIs per machine (default N=2, max-height 1080).
|
||||
|
||||
Reads archives/scans.csv once, groups on-disk mosaic rows by machine, then for
|
||||
each machine picks the most frequent ROI extents and calls encode_movie().
|
||||
|
||||
Usage:
|
||||
python scripts/build_mosaic_movies_batch.py
|
||||
python scripts/build_mosaic_movies_batch.py --dry-run
|
||||
python scripts/build_mosaic_movies_batch.py --skip-existing
|
||||
python scripts/build_mosaic_movies_batch.py --machine "BW2-10 [AMR-22]"
|
||||
python scripts/build_mosaic_movies_batch.py --full-res # no max-height cap
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
_SCRIPTS_DIR = Path(__file__).resolve().parent
|
||||
|
||||
# Import sibling module (run as python scripts/build_mosaic_movies_batch.py from repo root)
|
||||
sys.path.insert(0, str(_SCRIPTS_DIR))
|
||||
import build_mosaic_movie as bmm # noqa: E402
|
||||
|
||||
|
||||
def read_machine_labels(path: Path) -> list[str]:
|
||||
out: list[str] = []
|
||||
with path.open(encoding="utf-8") as fh:
|
||||
for line in fh:
|
||||
s = line.strip()
|
||||
if not s or s.startswith("#"):
|
||||
continue
|
||||
out.append(s)
|
||||
return out
|
||||
|
||||
|
||||
@dataclass
|
||||
class Job:
|
||||
machine: str
|
||||
rank: int
|
||||
roi: tuple[float, float, float, float]
|
||||
extent_count: int
|
||||
rows: list[dict]
|
||||
output_path: Path
|
||||
|
||||
|
||||
def collect_jobs(
|
||||
*,
|
||||
machines: list[str],
|
||||
by_machine: dict[str, list[dict]],
|
||||
archive: Path,
|
||||
top_rois: int,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
) -> list[Job]:
|
||||
jobs: list[Job] = []
|
||||
for machine in machines:
|
||||
rows = by_machine.get(machine, [])
|
||||
if not rows:
|
||||
continue
|
||||
picks = bmm.pick_top_rois(rows, top_rois)
|
||||
for rank, (roi, extent_count) in enumerate(picks, start=1):
|
||||
rows_roi = [r for r in rows if bmm.extent_close(r, roi)]
|
||||
out = bmm.default_output_path(
|
||||
archive,
|
||||
machine,
|
||||
roi,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
rank=rank,
|
||||
)
|
||||
jobs.append(
|
||||
Job(
|
||||
machine=machine,
|
||||
rank=rank,
|
||||
roi=roi,
|
||||
extent_count=extent_count,
|
||||
rows=rows_roi,
|
||||
output_path=out,
|
||||
)
|
||||
)
|
||||
return jobs
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument(
|
||||
"--machines-file",
|
||||
type=Path,
|
||||
default=_SCRIPTS_DIR / "machines.example.txt",
|
||||
help="One machine label per line (default: scripts/machines.example.txt next to this script)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--machine",
|
||||
metavar="LABEL",
|
||||
help='If set, only this machine (overrides list to a single job set), e.g. "BW2-10 [AMR-22]"',
|
||||
)
|
||||
p.add_argument("--archive", type=Path, default=Path("archives"))
|
||||
p.add_argument("--scans-csv", type=Path, default=None)
|
||||
p.add_argument("--top-rois", type=int, default=2, help="How many top extents per machine (default: 2)")
|
||||
p.add_argument("--max-height", type=int, default=1080, help="Preview cap in px (default: 1080)")
|
||||
p.add_argument(
|
||||
"--full-res",
|
||||
action="store_true",
|
||||
help="Disable max-height cap (full mosaic resolution; can be huge)",
|
||||
)
|
||||
p.add_argument("--fps", type=float, default=10.0)
|
||||
p.add_argument("--dry-run", action="store_true")
|
||||
p.add_argument(
|
||||
"--skip-existing",
|
||||
action="store_true",
|
||||
help="Skip encode if output MP4 exists and is non-empty",
|
||||
)
|
||||
p.add_argument("--no-metadata-overlay", action="store_true")
|
||||
args = p.parse_args()
|
||||
if args.full_res:
|
||||
args.max_height = None
|
||||
return args
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
archive: Path = args.archive
|
||||
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
|
||||
if not scans_csv.is_file():
|
||||
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||
|
||||
if args.machine:
|
||||
machines = [args.machine.strip()]
|
||||
else:
|
||||
if not args.machines_file.is_file():
|
||||
sys.exit(f"Machines file not found: {args.machines_file}")
|
||||
machines = read_machine_labels(args.machines_file)
|
||||
if not machines:
|
||||
sys.exit(f"No machine labels in {args.machines_file}")
|
||||
|
||||
t_load0 = time.perf_counter()
|
||||
by_machine = bmm.load_on_disk_rows_by_machine(scans_csv)
|
||||
load_s = time.perf_counter() - t_load0
|
||||
|
||||
max_height: int | None = args.max_height
|
||||
metadata_overlay = not args.no_metadata_overlay
|
||||
|
||||
jobs = collect_jobs(
|
||||
machines=machines,
|
||||
by_machine=by_machine,
|
||||
archive=archive,
|
||||
top_rois=args.top_rois,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
)
|
||||
total = len(jobs)
|
||||
if total == 0:
|
||||
sys.exit("No jobs (no on-disk mosaics for selected machines).")
|
||||
|
||||
print(f"Loaded scans.csv grouped by machine in {load_s:.2f}s ({total} job(s))")
|
||||
if max_height is not None:
|
||||
print(f"Max height: {max_height}px")
|
||||
else:
|
||||
print("Max height: (full resolution)")
|
||||
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
|
||||
print()
|
||||
|
||||
summary_rows: list[tuple[str, ...]] = []
|
||||
|
||||
for idx, job in enumerate(jobs, start=1):
|
||||
sx, sy, ex, ey = job.roi
|
||||
roi_s = f"{sx},{sy}..{ex},{ey}"
|
||||
print(
|
||||
f"[{idx}/{total}] {job.machine} rank={job.rank} ROI {roi_s} "
|
||||
f"({job.extent_count} CSV rows this extent, {len(job.rows)} deduped rows)"
|
||||
)
|
||||
|
||||
if args.skip_existing and job.output_path.is_file() and job.output_path.stat().st_size > 0:
|
||||
sz = job.output_path.stat().st_size / (1024 * 1024)
|
||||
print(f" SKIP (exists): {job.output_path}")
|
||||
summary_rows.append(
|
||||
(
|
||||
job.machine,
|
||||
str(job.rank),
|
||||
roi_s,
|
||||
str(len(job.rows)),
|
||||
"-",
|
||||
"-",
|
||||
str(job.output_path),
|
||||
"SKIP (exists)",
|
||||
f"{sz:.2f}",
|
||||
"-",
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
enc_t0 = time.perf_counter()
|
||||
res = bmm.encode_movie(
|
||||
machine=job.machine,
|
||||
roi=job.roi,
|
||||
rows=job.rows,
|
||||
archive=archive,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
fps=float(args.fps),
|
||||
output=None,
|
||||
dry_run=args.dry_run,
|
||||
rank=job.rank,
|
||||
quiet=True,
|
||||
)
|
||||
enc_elapsed = time.perf_counter() - enc_t0
|
||||
|
||||
if res.success:
|
||||
if args.dry_run:
|
||||
print(f" dry-run OK -> {res.output_path} (missing files: {res.missing})")
|
||||
else:
|
||||
print(
|
||||
f" OK {res.written} frames "
|
||||
f"{(res.size_mb or 0):.2f} MB {enc_elapsed:.1f}s"
|
||||
)
|
||||
else:
|
||||
print(f" FAIL {res.skipped_reason or 'unknown'}")
|
||||
|
||||
status = "OK" if res.success else "FAIL"
|
||||
if not res.success and res.skipped_reason:
|
||||
status = f"FAIL: {res.skipped_reason[:40]}"
|
||||
mb = f"{res.size_mb:.2f}" if res.size_mb is not None else "-"
|
||||
es = f"{enc_elapsed:.1f}" if not args.dry_run else "-"
|
||||
w = str(res.written) if not args.dry_run else "0"
|
||||
if args.dry_run:
|
||||
status = "dry-run"
|
||||
mb = "-"
|
||||
|
||||
summary_rows.append(
|
||||
(
|
||||
job.machine,
|
||||
str(job.rank),
|
||||
roi_s,
|
||||
str(len(job.rows)),
|
||||
w,
|
||||
str(res.missing),
|
||||
str(res.output_path or job.output_path),
|
||||
status,
|
||||
mb,
|
||||
es,
|
||||
)
|
||||
)
|
||||
print()
|
||||
|
||||
print("=" * 120)
|
||||
hdr = (
|
||||
f"{'machine':<26} {'rk':>2} {'ROI (mm)':<36} {'csv':>4} {'out':>5} "
|
||||
f"{'miss':>4} {'MB':>7} {'s':>6} {'status':<22}"
|
||||
)
|
||||
print(hdr)
|
||||
print("-" * 120)
|
||||
for row in summary_rows:
|
||||
m, rk, roi_s, csv_n, wrt, miss, path, status, mb, es = row
|
||||
print(
|
||||
f"{m:<26} {rk:>2} {roi_s:<36} {csv_n:>4} {wrt:>5} {miss:>4} "
|
||||
f"{mb:>7} {es:>6} {status:<22}"
|
||||
)
|
||||
print(f" -> {path}")
|
||||
print("=" * 120)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user