2 Commits

Author SHA1 Message Date
poprhythm 655c376a97 Add --retry-failed mode and mosaic retry estimates to progress report
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:38:51 -04:00
poprhythm 1ef9e0206c Add mosaic timelapse scripts and imageio dependencies.
Introduce build_mosaic_movie for single-ROI MP4s from archived mosaics, with optional max-height preview, semi-transparent metadata overlay, and encode_movie API for reuse. Add build_mosaic_movies_batch to encode the top N ROIs per machine using one scans.csv pass, progress output, and --skip-existing for safe reruns. Declare imageio and imageio-ffmpeg in requirements.txt.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:04:06 -04:00
8 changed files with 1380 additions and 27 deletions
+2
View File
@@ -6,3 +6,5 @@ tqdm>=4.66.0
piexif>=1.1.3
Pillow>=10.0.0
pytest>=8.0
imageio>=2.34
imageio-ffmpeg>=0.5
+710
View File
@@ -0,0 +1,710 @@
#!/usr/bin/env python3
"""
Build a chronological MP4 from downloaded mosaic.jpg files for one machine ROI.
Reads archives/scans.csv, filters by machine and mosaic_on_disk, optionally
restricts to one (start_x, start_y, end_x, end_y) ROI, dedupes by scan_id
(last row wins), sorts by scan_time, and encodes frames with imageio/ffmpeg.
Usage:
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]"
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
--roi "195.65,219.22,219.73,235.04" --fps 8 --output /tmp/out.mp4
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" --dry-run
# Lighter preview (caps tall full-tube mosaics by height — easier on players):
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
--roi "0.0,0.0,310.0,740.0" --preview
# Metadata is drawn on each frame by default (semi-transparent bar at the top);
# use --no-metadata-overlay to disable.
"""
from __future__ import annotations
import argparse
import csv
import os
import sys
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
import imageio
import numpy as np
from PIL import Image, ImageDraw, ImageFont
class MovieEncodeError(Exception):
    """Recoverable failure raised by the encoding helpers.

    ``encode_movie`` catches this exception and converts it into a failed
    ``EncodedMovieResult``, so one bad ROI does not abort a batch run.
    """
@dataclass
class EncodedMovieResult:
    """Outcome of one ``encode_movie`` call (encode, dry-run, or failure)."""

    # True for a finished encode or a dry-run; False when nothing usable was produced.
    success: bool
    # Machine label and ROI extent the movie was built for, as passed by the caller.
    machine: str
    roi: tuple[float, float, float, float]
    # Number of rows handed to encode_movie (after sorting).
    csv_frame_count: int
    # Frames actually appended to the video writer.
    written: int
    # Rows whose resolved mosaic path is not a file on disk.
    missing: int
    # Frames skipped because reading the image raised OSError.
    dropped_read: int
    # Destination path (explicit --output or the computed default).
    output_path: Path | None
    # Human-readable failure reason; None on success.
    skipped_reason: str | None
    # Size of the encoded file in MiB; None for dry-runs and failures.
    size_mb: float | None
    # Wall-clock encode time in seconds; None for dry-runs and failures.
    elapsed_s: float | None
def sanitize_machine_label(label: str) -> str:
    """Turn a machine label into a directory-friendly name.

    Square brackets are removed, spaces become underscores, and any leading
    or trailing underscores are trimmed.
    """
    without_brackets = label.replace("[", "").replace("]", "")
    underscored = without_brackets.replace(" ", "_")
    return underscored.strip("_")
def parse_roi(s: str) -> tuple[float, float, float, float]:
    """Parse a ``start_x,start_y,end_x,end_y`` string into four floats.

    Exits the process with an explanatory message when the text does not
    contain exactly four comma-separated numbers.
    """
    fields = [piece.strip() for piece in s.split(",")]
    if len(fields) != 4:
        sys.exit("--roi must be four comma-separated numbers: start_x,start_y,end_x,end_y")
    try:
        numbers = tuple(map(float, fields))
    except ValueError as e:
        sys.exit(f"Invalid --roi numbers: {e}")
    return numbers  # type: ignore[return-value]
def extent_close(
    row: dict,
    roi: tuple[float, float, float, float],
    *,
    tol: float = 1e-4,
) -> bool:
    """Return True when the row's extent matches *roi* within *tol*.

    The row's ``start_x``, ``start_y``, ``end_x`` and ``end_y`` fields are
    converted to floats and compared component-wise against *roi*.  A row
    that lacks one of the fields, or holds a value that is not a number,
    never matches.
    """
    keys = ("start_x", "start_y", "end_x", "end_y")
    try:
        vals = tuple(float(row[k]) for k in keys)
    except (KeyError, TypeError, ValueError):
        # TypeError: csv.DictReader fills the missing cells of a short
        # (e.g. truncated) row with None, and float(None) raises TypeError.
        return False
    return all(abs(a - b) < tol for a, b in zip(vals, roi))
def extent_key(row: dict) -> tuple[str, str, str, str]:
    """Return a stable grouping key built from the row's CSV extent strings.

    Each of ``start_x``, ``start_y``, ``end_x`` and ``end_y`` is stripped of
    surrounding whitespace.  A field that is absent, or present with a
    ``None`` value (as csv.DictReader produces for short rows), becomes an
    empty string.
    """
    keys = ("start_x", "start_y", "end_x", "end_y")
    # `or ""` rather than a .get() default: the default is not used when the
    # key exists with a None value.
    return tuple((row.get(k) or "").strip() for k in keys)  # type: ignore[return-value]
def key_to_roi_floats(key: tuple[str, str, str, str]) -> tuple[float, float, float, float]:
    """Convert a string extent key into the matching tuple of floats."""
    return tuple(map(float, key))  # type: ignore[return-value]
def parse_args() -> argparse.Namespace:
    """Parse the command line of the single-ROI movie builder.

    ``--preview`` is resolved here: when it is given without an explicit
    ``--max-height``, ``max_height`` is set to 1080.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument

    # Row selection.
    add("--machine", required=True, help='RootView machine label, e.g. "BW1-4 [AMR-15]"')
    add(
        "--roi",
        metavar="SX,SY,EX,EY",
        help="Restrict to this extent (mm). If omitted, pick the ROI with the most on-disk mosaics.",
    )

    # Input and output locations.
    add("--archive", default="archives", type=Path, help="Archive root (default: archives)")
    add(
        "--scans-csv",
        default=None,
        type=Path,
        help="Path to scans.csv (default: <archive>/scans.csv)",
    )
    add(
        "--output",
        "-o",
        type=Path,
        default=None,
        help="Output .mp4 path (default: <archive>/movies/<machine>/roi_<...>.mp4)",
    )

    # Encoding options.
    add("--fps", type=float, default=10.0, help="Frames per second (default: 10)")
    add(
        "--max-height",
        type=int,
        default=None,
        metavar="PX",
        help="Scale each frame so height is at most PX pixels (width keeps aspect); "
        "suited to tall full-tube mosaics. Both dimensions are rounded to even pixels for H.264.",
    )
    add(
        "--preview",
        action="store_true",
        help="Shorthand for --max-height 1080 (overridden if --max-height is also set).",
    )
    add(
        "--dry-run",
        action="store_true",
        help="List frames that would be written (no MP4)",
    )
    add(
        "--no-metadata-overlay",
        action="store_true",
        help="Do not draw scan metadata on each frame (default: overlay on).",
    )

    namespace = parser.parse_args()
    if namespace.max_height is None and namespace.preview:
        namespace.max_height = 1080
    return namespace
def _csv_required_fieldnames() -> tuple[str, ...]:
return (
"machine",
"scan_id",
"scan_time",
"mosaic_on_disk",
"mosaic_local_path",
"start_x",
"start_y",
"end_x",
"end_y",
)
def validate_scans_csv_header(reader: csv.DictReader, scans_csv: Path) -> None:
    """Exit with a message unless the CSV header has every required column.

    *scans_csv* is used only to name the file in the error message.
    """
    header = reader.fieldnames
    if header is None:
        sys.exit(f"Empty CSV: {scans_csv}")
    absent = [column for column in _csv_required_fieldnames() if column not in header]
    if absent:
        sys.exit(f"{scans_csv} missing columns: {absent}")
def load_latest_rows(
    scans_csv: Path,
    machine: str,
    roi: tuple[float, float, float, float] | None,
) -> list[dict]:
    """Return the last CSV row per ``scan_id`` for one machine.

    Only rows whose ``mosaic_on_disk`` field is the string ``"True"`` are
    kept.  When *roi* is given, rows must also match it (see
    ``extent_close``).  Rows with an empty ``scan_id`` are ignored.  Later
    rows replace earlier rows that share a ``scan_id``.

    The header is validated first; the process exits if required columns
    are missing.
    """
    wanted = machine.strip()
    latest: dict[str, dict] = {}
    with scans_csv.open(newline="", encoding="utf-8") as fh:
        reader = csv.DictReader(fh)
        validate_scans_csv_header(reader, scans_csv)
        for row in reader:
            # `or ""`: csv.DictReader fills the missing cells of a short row
            # with None, which has no .strip().
            # Labels are compared stripped, matching how
            # load_on_disk_rows_by_machine groups them.
            if (row.get("machine") or "").strip() != wanted:
                continue
            if (row.get("mosaic_on_disk") or "").strip() != "True":
                continue
            if roi is not None and not extent_close(row, roi):
                continue
            sid = (row.get("scan_id") or "").strip()
            if not sid:
                continue
            latest[sid] = row
    return list(latest.values())
def load_on_disk_rows_by_machine(scans_csv: Path) -> dict[str, list[dict]]:
    """Read scans.csv once and group on-disk mosaic rows by machine label.

    Only rows whose ``mosaic_on_disk`` field is the string ``"True"`` are
    kept.  Rows with an empty machine label or ``scan_id`` are ignored.
    For each ``(machine, scan_id)`` pair the last row in the file wins.

    The header is validated first; the process exits if required columns
    are missing.
    """
    latest: dict[tuple[str, str], dict] = {}
    with scans_csv.open(newline="", encoding="utf-8") as fh:
        reader = csv.DictReader(fh)
        validate_scans_csv_header(reader, scans_csv)
        for row in reader:
            # `or ""`: csv.DictReader fills the missing cells of a short row
            # with None, which has no .strip().
            if (row.get("mosaic_on_disk") or "").strip() != "True":
                continue
            sid = (row.get("scan_id") or "").strip()
            m = (row.get("machine") or "").strip()
            if not sid or not m:
                continue
            latest[(m, sid)] = row

    by_machine: dict[str, list[dict]] = defaultdict(list)
    for (m, _sid), r in latest.items():
        by_machine[m].append(r)
    # Return a plain dict so a lookup of an unknown machine does not create
    # an empty entry.
    return dict(by_machine)
def pick_top_rois(rows: list[dict], n: int) -> list[tuple[tuple[float, float, float, float], int]]:
    """Return up to *n* ``(roi, count)`` pairs, most frequent extent first.

    Extents are grouped by their CSV string form.  The result is empty when
    *rows* is empty or *n* is below 1.
    """
    if n < 1 or not rows:
        return []
    tally = Counter(map(extent_key, rows))
    ranked: list[tuple[tuple[float, float, float, float], int]] = []
    for key, count in tally.most_common(n):
        ranked.append((key_to_roi_floats(key), count))
    return ranked
def pick_top_roi(rows: list[dict]) -> tuple[float, float, float, float]:
    """Return the most frequent extent among *rows*; exit when there are none."""
    if rows:
        best_roi, _count = pick_top_rois(rows, 1)[0]
        return best_roi
    sys.exit("No rows with mosaic_on_disk=True for this machine (and ROI filter, if any).")
def default_output_path(
    archive: Path,
    machine: str,
    roi: tuple[float, float, float, float],
    *,
    max_height: int | None,
    metadata_overlay: bool,
    rank: int | None = None,
) -> Path:
    """Build ``<archive>/movies/<machine>/roi_<extent>[_rN][_hPX][_meta].mp4``.

    The optional suffixes record the ROI rank, the height cap, and whether
    the metadata overlay is on, so variants do not overwrite each other.
    """
    start_x, start_y, end_x, end_y = roi
    stem_parts = [f"roi_{start_x}_{start_y}_{end_x}_{end_y}".replace(" ", "")]
    if rank is not None:
        stem_parts.append(f"r{rank}")
    if max_height is not None:
        stem_parts.append(f"h{max_height}")
    if metadata_overlay:
        stem_parts.append("meta")
    filename = "_".join(stem_parts) + ".mp4"
    return archive / "movies" / sanitize_machine_label(machine) / filename
def resolve_mosaic_path(rel: str, archive: Path) -> Path:
    """Resolve a ``mosaic_local_path`` CSV value to an absolute path.

    CSV paths are usually repo-relative, e.g.
    ``archives/BW1-4__AMR-15/.../mosaic.jpg``.  Resolution rules:

    * an absolute path is returned as is (resolved);
    * a path starting with ``archives/`` or ``./archives/`` is taken
      relative to the parent of *archive*;
    * anything else is taken relative to *archive* itself.

    Backslashes are treated as path separators, so a CSV written on Windows
    also resolves on POSIX systems.
    """
    p = Path(rel)
    if p.is_absolute():
        return p.resolve()
    ar = archive.resolve()
    # Join the normalised form, not the raw string: on POSIX a backslash is
    # an ordinary filename character, so joining `rel` would yield a path
    # that does not exist.
    norm = rel.replace("\\", "/")
    if norm.startswith(("archives/", "./archives/")):
        return (ar.parent / norm).resolve()
    return (ar / norm).resolve()
def even_dimensions(w: int, h: int) -> tuple[int, int]:
    """Round both dimensions down to even values (libx264 requires even sizes).

    Raises MovieEncodeError when either rounded dimension is below 2.
    """
    even_w = 2 * (w // 2)
    even_h = 2 * (h // 2)
    if min(even_w, even_h) < 2:
        raise MovieEncodeError(f"Frame dimensions too small after evenizing: {w}x{h}")
    return even_w, even_h
def frame_size_mode(paths: list[Path]) -> tuple[int, int]:
    """Return the most common image size among *paths*, rounded down to even.

    Files that cannot be opened (OSError) are ignored.  Raises
    MovieEncodeError when no file is readable.
    """
    tally: Counter = Counter()
    for path in paths:
        try:
            with Image.open(path) as im:
                tally[im.size] += 1
        except OSError:
            continue
    if not tally:
        raise MovieEncodeError("No readable mosaic images to determine frame size.")
    (width, height), _count = tally.most_common(1)[0]
    return even_dimensions(width, height)
def encode_size(native_w: int, native_h: int, max_height: int | None) -> tuple[int, int]:
"""Native size is already even; optional downscale for preview encodes (cap height)."""
if max_height is None:
return native_w, native_h
if max_height < 32:
raise MovieEncodeError("--max-height must be at least 32")
cap = max_height - (max_height % 2)
if cap < 2:
raise MovieEncodeError("--max-height must allow an even height of at least 2")
if native_h <= cap:
return native_w, native_h
new_h = cap
new_w = int(round(native_w * (new_h / native_h)))
new_w -= new_w % 2
if new_w < 2:
raise MovieEncodeError("Computed preview width too small; try a larger --max-height")
return new_w, new_h
def _truetype_font_candidates() -> list[Path]:
windir = os.environ.get("WINDIR", r"C:\Windows")
return [
Path(windir) / "Fonts" / "arial.ttf",
Path(windir) / "Fonts" / "consola.ttf",
Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"),
Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"),
]
# Fonts already loaded by get_overlay_font, keyed by point size.  The overlay
# code asks for the same few sizes several times per frame, so loading each
# size once avoids re-reading the font file for every measurement.
_OVERLAY_FONT_CACHE: dict = {}


def get_overlay_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
    """Return a font of the given size for the metadata overlay.

    The first candidate TrueType file that exists and loads is used.  When
    none is available, Pillow's built-in default font is returned.  Results
    are cached per size for the lifetime of the process.
    """
    cached = _OVERLAY_FONT_CACHE.get(size)
    if cached is not None:
        return cached

    font = None
    for path in _truetype_font_candidates():
        if not path.is_file():
            continue
        try:
            font = ImageFont.truetype(str(path), size=size)
        except OSError:
            # Unreadable or invalid font file: try the next candidate.
            continue
        break
    if font is None:
        font = ImageFont.load_default()

    _OVERLAY_FONT_CACHE[size] = font
    return font
def _truncate(s: str, max_len: int) -> str:
s = s.strip()
if len(s) <= max_len:
return s
if max_len <= 3:
return s[:max_len]
return s[: max_len - 3] + "..."
def metadata_overlay_lines(row: dict, *, max_name_chars: int) -> list[str]:
    """Build the text lines shown in the per-frame metadata overlay.

    Lines are emitted in this order, each only when it has content:
    machine; scan id and time; scan name (truncated to *max_name_chars*);
    grid/step geometry with scan orientation; ROI extent; user and status.
    Returns ``["(no metadata)"]`` when the row yields no line at all.

    Fields that are absent, or present with a ``None`` value (as
    csv.DictReader produces for short rows), are treated as empty.
    """

    def field(key: str) -> str:
        # `or ""` rather than a .get() default: the default is not used when
        # the key exists with a None value.
        return (row.get(key) or "").strip()

    sid = field("scan_id")
    st = field("scan_time")
    name = _truncate(field("name"), max_name_chars)
    nx = field("nx")
    ny = field("ny")
    dx = field("dx")
    dy = field("dy")
    lines = field("scan_lines")
    mode = field("scan_mode")
    user = field("user")
    status = field("status")
    sx = field("start_x")
    sy = field("start_y")
    ex = field("end_x")
    ey = field("end_y")
    machine = field("machine")

    grid = f"{nx}x{ny}" if nx and ny else ""
    step = f"{dx}x{dy} mm" if dx and dy else ""
    geom = " ".join(p for p in (grid, step) if p)
    orient = f"{lines} / {mode}" if lines or mode else ""

    out: list[str] = []
    if machine:
        out.append(machine)
    if sid or st:
        parts = []
        if sid:
            parts.append(f"id {sid}")
        if st:
            parts.append(st)
        out.append(" ".join(parts))
    if name:
        out.append(name)
    if geom or orient:
        out.append(" ".join(p for p in (geom, orient) if p))
    if sx and sy and ex and ey:
        out.append(f"ROI mm {sx},{sy} .. {ex},{ey}")
    if user or status:
        tail: list[str] = []
        if user:
            tail.append(f"user {user}")
        if status:
            tail.append(status)
        out.append(" ".join(tail))
    return out if out else ["(no metadata)"]
def draw_metadata_overlay(
    rgb: Image.Image,
    row: dict,
    *,
    margin: int,
) -> None:
    """Draw a semi-transparent label block along the top; mutates rgb in place.

    The text comes from ``metadata_overlay_lines(row, ...)``.  The font size
    and the name-length budget are reduced step by step until the block fits
    the frame; lines that still do not fit vertically are left out.
    """
    # Panel alpha: ~50% so roots stay visible through the bar.
    panel_fill = (0, 0, 0, 128)
    panel_outline = (220, 220, 230, 100)
    w, h = rgb.size
    # Keep the margin between 4 px and one eighth of the frame width.
    margin = max(4, min(margin, w // 8))
    # Starting font size follows the frame height, limited to 10..22.
    font_size = max(10, min(22, h // 50))
    pad = max(4, font_size // 3)
    line_gap = max(2, font_size // 6)

    def measure_block(fs: int, name_max: int) -> tuple[ImageFont.FreeTypeFont | ImageFont.ImageFont, list[str], int, int]:
        """Return (font, lines, block width, block height) for a font size and name budget."""
        font = get_overlay_font(fs)
        lines = metadata_overlay_lines(row, max_name_chars=name_max)
        # A throwaway 1x1 image is enough to obtain a drawing context for measuring text.
        tmp = Image.new("RGB", (1, 1))
        draw = ImageDraw.Draw(tmp)
        max_tw = 0
        total_h = 0
        for line in lines:
            bbox = draw.textbbox((0, 0), line, font=font)
            tw = bbox[2] - bbox[0]
            th = bbox[3] - bbox[1]
            max_tw = max(max_tw, tw)
            total_h += th
        if len(lines) > 1:
            total_h += line_gap * (len(lines) - 1)
        block_w = max_tw + 2 * pad
        block_h = total_h + 2 * pad
        return font, lines, block_w, block_h

    name_max = max(24, w // max(6, font_size // 2))
    font, lines, block_w, block_h = measure_block(font_size, name_max)
    # The panel may use the width inside the margins and at most half the frame height.
    max_block_w = w - 2 * margin
    max_block_h = min(h // 2, h - 2 * margin)
    # First fitting pass: shrink the font (down to size 9) while the block is too large.
    while (block_w > max_block_w or block_h > max_block_h) and font_size > 9:
        font_size -= 1
        name_max = max(16, w // max(7, font_size // 2))
        font, lines, block_w, block_h = measure_block(font_size, name_max)
    # Second fitting pass: if still too wide, shorten the name budget in steps of 4.
    while block_w > max_block_w and name_max > 12:
        name_max -= 4
        font, lines, block_w, block_h = measure_block(font_size, name_max)
    # Clamp the panel to the allowed area even if the text could not be made to fit.
    bw = min(block_w, max_block_w)
    bh = min(block_h, max_block_h)
    x0 = margin
    x1 = x0 + bw
    y0 = margin
    y1 = y0 + bh
    # Draw the panel on a transparent layer and alpha-composite it over the frame.
    overlay = Image.new("RGBA", (w, h), (0, 0, 0, 0))
    draw_ov = ImageDraw.Draw(overlay)
    draw_ov.rounded_rectangle(
        [x0, y0, x1, y1],
        radius=max(4, pad // 2),
        fill=panel_fill,
        outline=panel_outline,
        width=1,
    )
    base = rgb.convert("RGBA")
    composited = Image.alpha_composite(base, overlay)
    draw = ImageDraw.Draw(composited)
    cx, cy = x0 + pad, y0 + pad
    text_bottom_limit = y1 - pad
    for line in lines:
        bbox = draw.textbbox((0, 0), line, font=font)
        th = bbox[3] - bbox[1]
        # Stop as soon as a line would run past the bottom padding of the panel.
        if cy + th > text_bottom_limit:
            break
        draw.text(
            (cx, cy),
            line,
            fill=(245, 245, 245, 255),
            font=font,
            stroke_width=1,
            stroke_fill=(0, 0, 0, 255),
        )
        cy += th + line_gap
    # Copy the composited pixels back into the caller's image (in-place update).
    rgb.paste(composited.convert("RGB"))
def encode_movie(
    *,
    machine: str,
    roi: tuple[float, float, float, float],
    rows: list[dict],
    archive: Path,
    max_height: int | None,
    metadata_overlay: bool,
    fps: float,
    output: Path | None = None,
    dry_run: bool = False,
    rank: int | None = None,
    quiet: bool = False,
) -> EncodedMovieResult:
    """Build an MP4 from pre-filtered rows for one ROI.

    Rows are ordered by ``(scan_time, scan_id)``; each row's
    ``mosaic_local_path`` is resolved against *archive*.  Frames are brought
    to the most common image size, optionally scaled down to *max_height*,
    optionally labelled with the metadata overlay, and encoded with libx264.

    The movie is written to a temporary ``*.partial<suffix>`` file next to
    the destination and moved into place only after a successful encode, so
    a failed or interrupted run never leaves a truncated file at the final
    path.

    Args:
        machine: Machine label (reporting and default output path).
        roi: Extent the rows belong to (reporting and default output path).
        rows: CSV rows to encode; the caller has already filtered them.
        archive: Archive root used to resolve mosaic paths.
        max_height: Optional height cap in pixels; None keeps native size.
        metadata_overlay: Draw the metadata panel on every frame.
        fps: Frames per second of the output.
        output: Destination path; None selects ``default_output_path``.
        dry_run: Only report which files would be used; write nothing.
        rank: Optional ROI rank, used in the default output name.
        quiet: Suppress progress output.

    Returns:
        An EncodedMovieResult.  Encode failures are reported through
        ``success=False`` and ``skipped_reason``; this function does not
        call sys.exit on them.
    """
    rows_sorted = sorted(
        rows,
        key=lambda r: (r.get("scan_time") or "", r.get("scan_id") or ""),
    )
    csv_frame_count = len(rows_sorted)

    row_path_candidates: list[tuple[dict, Path]] = []
    for r in rows_sorted:
        rel = (r.get("mosaic_local_path") or "").strip()
        if rel:
            row_path_candidates.append((r, resolve_mosaic_path(rel, archive)))

    out: Path = output or default_output_path(
        archive,
        machine,
        roi,
        max_height=max_height,
        metadata_overlay=metadata_overlay,
        rank=rank,
    )

    if not quiet:
        print(f"Machine: {machine}")
        print(f"ROI (mm): {roi[0]}, {roi[1]}, {roi[2]}, {roi[3]}")
        print(f"Frames (from CSV, deduped): {csv_frame_count}")
        if max_height is not None:
            print(f"Preview max height: {max_height}px")
        print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")

    row_path_pairs = [(r, p) for r, p in row_path_candidates if p.is_file()]
    on_disk = len(row_path_pairs)
    missing_paths = len(row_path_candidates) - on_disk

    def _result(
        success: bool,
        *,
        written: int = 0,
        dropped: int = 0,
        reason: str | None = None,
        size_mb: float | None = None,
        elapsed_s: float | None = None,
    ) -> EncodedMovieResult:
        # Every exit path reports the same identifying fields; only the
        # outcome-specific values differ.
        return EncodedMovieResult(
            success=success,
            machine=machine,
            roi=roi,
            csv_frame_count=csv_frame_count,
            written=written,
            missing=missing_paths,
            dropped_read=dropped,
            output_path=out,
            skipped_reason=reason,
            size_mb=size_mb,
            elapsed_s=elapsed_s,
        )

    if dry_run:
        if not quiet:
            total = len(row_path_candidates)
            print(f"On-disk files among ordered list: {on_disk} / {total}")
            head, tail = 5, 3
            if total <= head + tail:
                # Short list: show every entry exactly once.
                for i, (_r, p) in enumerate(row_path_candidates):
                    print(f" [{i}] {p} exists={p.is_file()}")
            else:
                for i, (_r, p) in enumerate(row_path_candidates[:head]):
                    print(f" [{i}] {p} exists={p.is_file()}")
                print(" ...")
                start = total - tail
                for i, (_r, p) in enumerate(row_path_candidates[start:], start=start):
                    print(f" [{i}] {p} exists={p.is_file()}")
        return _result(True)

    if not row_path_pairs:
        if not quiet:
            print("No mosaic files on disk for the selected rows.")
        return _result(False, reason="No mosaic files on disk for the selected rows.")

    t0 = time.perf_counter()
    try:
        target_w, target_h = frame_size_mode([p for _r, p in row_path_pairs])
        enc_w, enc_h = encode_size(target_w, target_h, max_height)
    except MovieEncodeError as e:
        if not quiet:
            print(str(e))
        return _result(False, reason=str(e))

    if not quiet:
        print(f"Target frame size (mode): {target_w} x {target_h}")
        if (enc_w, enc_h) != (target_w, target_h):
            print(f"Encode size (after max-height): {enc_w} x {enc_h}")

    out.parent.mkdir(parents=True, exist_ok=True)
    # Encode under a temporary name that keeps the destination's file
    # extension; it is renamed to `out` only after the writer closed cleanly.
    tmp_out = out.with_name(f"{out.stem}.partial{out.suffix}")

    def _discard_partial() -> None:
        try:
            tmp_out.unlink(missing_ok=True)
        except OSError:
            # Best effort: a leftover *.partial file is never mistaken for a
            # finished movie.
            pass

    written = 0
    dropped = 0
    resized = 0
    scaled_preview = 0
    try:
        writer = imageio.get_writer(
            str(tmp_out),
            fps=float(fps),
            codec="libx264",
            quality=8,
            macro_block_size=1,
        )
        try:
            for row, p in row_path_pairs:
                # Only an unreadable image counts as a dropped frame; errors
                # from the writer itself must fail the whole encode.
                try:
                    with Image.open(p) as im:
                        rgb = im.convert("RGB")
                except OSError:
                    dropped += 1
                    continue
                if rgb.size != (target_w, target_h):
                    rgb = rgb.resize((target_w, target_h), Image.Resampling.LANCZOS)
                    resized += 1
                if rgb.size != (enc_w, enc_h):
                    rgb = rgb.resize((enc_w, enc_h), Image.Resampling.LANCZOS)
                    scaled_preview += 1
                if metadata_overlay:
                    draw_metadata_overlay(rgb, row, margin=max(6, enc_w // 80))
                writer.append_data(np.asarray(rgb))
                written += 1
        finally:
            writer.close()
        if written:
            tmp_out.replace(out)
    except Exception as e:
        _discard_partial()
        if not quiet:
            print(f"Encode error: {e}")
        return _result(
            False,
            written=written,
            dropped=dropped,
            reason=f"encode_error: {e}",
        )

    if not written:
        # Every image failed to load: there is no movie to report.
        _discard_partial()
        reason = "No readable mosaic frames; nothing was encoded."
        if not quiet:
            print(reason)
        return _result(False, dropped=dropped, reason=reason)

    elapsed = time.perf_counter() - t0
    size_mb = out.stat().st_size / (1024 * 1024) if out.is_file() else 0.0
    if not quiet:
        print(
            f"Written: {written} frames (normalized to mode: {resized}, "
            f"preview scale: {scaled_preview})"
        )
        print(f"Dropped (read error): {dropped}")
        print(f"Missing paths (not on disk): {missing_paths}")
        print(f"Output: {out.resolve()} ({size_mb:.2f} MB)")
    return _result(
        True,
        written=written,
        dropped=dropped,
        size_mb=size_mb,
        elapsed_s=elapsed,
    )
def main() -> None:
    """Command-line entry point: build one movie for one machine ROI.

    Without ``--roi`` the extent with the most on-disk mosaics is used.
    Exits with status 1 when the encode fails.
    """
    args = parse_args()
    archive: Path = args.archive
    scans_csv: Path = args.scans_csv or (archive / "scans.csv")
    if not scans_csv.is_file():
        sys.exit(f"scans.csv not found: {scans_csv}")

    roi_sel: tuple[float, float, float, float] | None = (
        parse_roi(args.roi) if args.roi else None
    )
    rows = load_latest_rows(scans_csv, args.machine, roi_sel)
    if roi_sel is None:
        # No explicit ROI: take the most frequent extent and keep only its rows.
        roi_sel = pick_top_roi(rows)
        rows = [r for r in rows if extent_close(r, roi_sel)]

    max_height: int | None = args.max_height
    metadata_overlay = not args.no_metadata_overlay
    res = encode_movie(
        machine=args.machine,
        roi=roi_sel,
        rows=rows,
        archive=archive,
        max_height=max_height,
        metadata_overlay=metadata_overlay,
        fps=float(args.fps),
        output=args.output,
        dry_run=bool(args.dry_run),
        rank=None,
        quiet=False,
    )
    if not res.success and not args.dry_run:
        sys.exit(1)


if __name__ == "__main__":
    main()
+269
View File
@@ -0,0 +1,269 @@
#!/usr/bin/env python3
"""
Build preview MP4s for the top N ROIs per machine (default N=2, max-height 1080).
Reads archives/scans.csv once, groups on-disk mosaic rows by machine, then for
each machine picks the most frequent ROI extents and calls encode_movie().
Usage:
python scripts/build_mosaic_movies_batch.py
python scripts/build_mosaic_movies_batch.py --dry-run
python scripts/build_mosaic_movies_batch.py --skip-existing
python scripts/build_mosaic_movies_batch.py --machine "BW2-10 [AMR-22]"
python scripts/build_mosaic_movies_batch.py --full-res # no max-height cap
"""
from __future__ import annotations
import argparse
import sys
import time
from dataclasses import dataclass
from pathlib import Path
_SCRIPTS_DIR = Path(__file__).resolve().parent
# Import sibling module (run as python scripts/build_mosaic_movies_batch.py from repo root)
sys.path.insert(0, str(_SCRIPTS_DIR))
import build_mosaic_movie as bmm # noqa: E402
def read_machine_labels(path: Path) -> list[str]:
    """Read machine labels from a text file, one per line.

    Blank lines and lines starting with ``#`` are skipped; surrounding
    whitespace is removed from each label.
    """
    with path.open(encoding="utf-8") as fh:
        stripped = (raw.strip() for raw in fh)
        return [label for label in stripped if label and not label.startswith("#")]
@dataclass
class Job:
    """One movie to encode: a machine, one of its top ROIs, and the rows for it."""

    # Machine label the rows belong to.
    machine: str
    # 1-based position of this ROI in the machine's most-frequent-extent ranking.
    rank: int
    # Extent as floats (start_x, start_y, end_x, end_y).
    roi: tuple[float, float, float, float]
    # Count reported by pick_top_rois for this extent.
    extent_count: int
    # Rows of this machine that match the extent (per extent_close).
    rows: list[dict]
    # Default output path computed for this job.
    output_path: Path
def collect_jobs(
    *,
    machines: list[str],
    by_machine: dict[str, list[dict]],
    archive: Path,
    top_rois: int,
    max_height: int | None,
    metadata_overlay: bool,
) -> list[Job]:
    """Plan one Job per (machine, top ROI) pair.

    Machines without any rows are skipped.  For every other machine the
    *top_rois* most frequent extents are taken, ranked from 1.
    """
    planned: list[Job] = []
    for label in machines:
        machine_rows = by_machine.get(label, [])
        if not machine_rows:
            continue
        ranked = bmm.pick_top_rois(machine_rows, top_rois)
        for position, (extent, count) in enumerate(ranked, start=1):
            planned.append(
                Job(
                    machine=label,
                    rank=position,
                    roi=extent,
                    extent_count=count,
                    rows=[row for row in machine_rows if bmm.extent_close(row, extent)],
                    output_path=bmm.default_output_path(
                        archive,
                        label,
                        extent,
                        max_height=max_height,
                        metadata_overlay=metadata_overlay,
                        rank=position,
                    ),
                )
            )
    return planned
def parse_args() -> argparse.Namespace:
    """Parse the command line of the batch movie builder.

    ``--full-res`` is resolved here: it sets ``max_height`` to None.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument

    # Which machines to process.
    add(
        "--machines-file",
        type=Path,
        default=_SCRIPTS_DIR / "machines.example.txt",
        help="One machine label per line (default: scripts/machines.example.txt next to this script)",
    )
    add(
        "--machine",
        metavar="LABEL",
        help='If set, only this machine (overrides list to a single job set), e.g. "BW2-10 [AMR-22]"',
    )

    # Input locations.
    add("--archive", type=Path, default=Path("archives"))
    add("--scans-csv", type=Path, default=None)

    # Selection and encoding options.
    add("--top-rois", type=int, default=2, help="How many top extents per machine (default: 2)")
    add("--max-height", type=int, default=1080, help="Preview cap in px (default: 1080)")
    add(
        "--full-res",
        action="store_true",
        help="Disable max-height cap (full mosaic resolution; can be huge)",
    )
    add("--fps", type=float, default=10.0)
    add("--dry-run", action="store_true")
    add(
        "--skip-existing",
        action="store_true",
        help="Skip encode if output MP4 exists and is non-empty",
    )
    add("--no-metadata-overlay", action="store_true")

    namespace = parser.parse_args()
    if namespace.full_res:
        namespace.max_height = None
    return namespace
def main() -> None:
    """Command-line entry point: encode the top ROIs of every selected machine.

    Reads scans.csv once, plans the jobs, runs them in order (printing one
    progress line per job), and finishes with a summary table.
    """
    args = parse_args()
    archive: Path = args.archive
    scans_csv: Path = args.scans_csv or (archive / "scans.csv")
    if not scans_csv.is_file():
        sys.exit(f"scans.csv not found: {scans_csv}")
    # --machine takes precedence over the machines file.
    if args.machine:
        machines = [args.machine.strip()]
    else:
        if not args.machines_file.is_file():
            sys.exit(f"Machines file not found: {args.machines_file}")
        machines = read_machine_labels(args.machines_file)
        if not machines:
            sys.exit(f"No machine labels in {args.machines_file}")
    # Single pass over scans.csv; timed for the progress message below.
    t_load0 = time.perf_counter()
    by_machine = bmm.load_on_disk_rows_by_machine(scans_csv)
    load_s = time.perf_counter() - t_load0
    max_height: int | None = args.max_height
    metadata_overlay = not args.no_metadata_overlay
    jobs = collect_jobs(
        machines=machines,
        by_machine=by_machine,
        archive=archive,
        top_rois=args.top_rois,
        max_height=max_height,
        metadata_overlay=metadata_overlay,
    )
    total = len(jobs)
    if total == 0:
        sys.exit("No jobs (no on-disk mosaics for selected machines).")
    print(f"Loaded scans.csv grouped by machine in {load_s:.2f}s ({total} job(s))")
    if max_height is not None:
        print(f"Max height: {max_height}px")
    else:
        print("Max height: (full resolution)")
    print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
    print()
    # One tuple per job, in the order unpacked by the summary loop at the end:
    # (machine, rank, roi, csv rows, written, missing, path, status, MB, seconds).
    summary_rows: list[tuple[str, ...]] = []
    for idx, job in enumerate(jobs, start=1):
        sx, sy, ex, ey = job.roi
        roi_s = f"{sx},{sy}..{ex},{ey}"
        print(
            f"[{idx}/{total}] {job.machine} rank={job.rank} ROI {roi_s} "
            f"({job.extent_count} CSV rows this extent, {len(job.rows)} deduped rows)"
        )
        # --skip-existing: a non-empty file at the output path counts as done.
        if args.skip_existing and job.output_path.is_file() and job.output_path.stat().st_size > 0:
            sz = job.output_path.stat().st_size / (1024 * 1024)
            print(f" SKIP (exists): {job.output_path}")
            summary_rows.append(
                (
                    job.machine,
                    str(job.rank),
                    roi_s,
                    str(len(job.rows)),
                    "-",
                    "-",
                    str(job.output_path),
                    "SKIP (exists)",
                    f"{sz:.2f}",
                    "-",
                )
            )
            continue
        enc_t0 = time.perf_counter()
        # output=None lets encode_movie compute the default path from the same
        # arguments collect_jobs used for job.output_path.
        res = bmm.encode_movie(
            machine=job.machine,
            roi=job.roi,
            rows=job.rows,
            archive=archive,
            max_height=max_height,
            metadata_overlay=metadata_overlay,
            fps=float(args.fps),
            output=None,
            dry_run=args.dry_run,
            rank=job.rank,
            quiet=True,
        )
        enc_elapsed = time.perf_counter() - enc_t0
        if res.success:
            if args.dry_run:
                print(f" dry-run OK -> {res.output_path} (missing files: {res.missing})")
            else:
                print(
                    f" OK {res.written} frames "
                    f"{(res.size_mb or 0):.2f} MB {enc_elapsed:.1f}s"
                )
        else:
            print(f" FAIL {res.skipped_reason or 'unknown'}")
        # Summary cells; the failure reason is cut to 40 characters.
        status = "OK" if res.success else "FAIL"
        if not res.success and res.skipped_reason:
            status = f"FAIL: {res.skipped_reason[:40]}"
        mb = f"{res.size_mb:.2f}" if res.size_mb is not None else "-"
        es = f"{enc_elapsed:.1f}" if not args.dry_run else "-"
        w = str(res.written) if not args.dry_run else "0"
        # A dry-run overrides the status and size cells computed above.
        if args.dry_run:
            status = "dry-run"
            mb = "-"
        summary_rows.append(
            (
                job.machine,
                str(job.rank),
                roi_s,
                str(len(job.rows)),
                w,
                str(res.missing),
                str(res.output_path or job.output_path),
                status,
                mb,
                es,
            )
        )
    # Summary table: one aligned line per job, followed by its output path.
    print()
    print("=" * 120)
    hdr = (
        f"{'machine':<26} {'rk':>2} {'ROI (mm)':<36} {'csv':>4} {'out':>5} "
        f"{'miss':>4} {'MB':>7} {'s':>6} {'status':<22}"
    )
    print(hdr)
    print("-" * 120)
    for row in summary_rows:
        m, rk, roi_s, csv_n, wrt, miss, path, status, mb, es = row
        print(
            f"{m:<26} {rk:>2} {roi_s:<36} {csv_n:>4} {wrt:>5} {miss:>4} "
            f"{mb:>7} {es:>6} {status:<22}"
        )
        print(f" -> {path}")
    print("=" * 120)


if __name__ == "__main__":
    main()
+104 -4
View File
@@ -2,11 +2,13 @@
"""
Report mosaic download progress from archives/scans.csv.
Output is formatted as Markdown. Add --by-year for a per-machine ×
per-year breakdown table.
Output is Markdown. Use ``--by-year`` for a per-machine × per-year
done/failed table. When the first mosaic pass is complete (no pending rows)
but failures remain, a **Mosaic retry estimates** section is printed with
queue counts and duration hints.
Rate/ETA require two calls at least 60 s apart. Mean mosaic size is
sampled from up to 100 already-downloaded files and cached for 1 hour.
Rate/ETA use a 30-minute rolling window when snapshots show progress.
Mean mosaic size is sampled from up to 100 downloads (1-hour cache).
Usage:
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
@@ -30,6 +32,11 @@ _R_PRE19 = 1.00
_R_PURGED = 0.00
_R_RECENT = 0.82
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
RETRY_REALISTIC_RATE_PER_HR = 1100.0
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
# ---------------------------------------------------------------------------
# Helpers
@@ -127,6 +134,12 @@ def _expected_remaining(pending_rows: list[dict]) -> float:
return count
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
if n_scans <= 0 or rate_per_hr <= 0:
return ""
return _fmt_duration(n_scans / rate_per_hr * 3600.0)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
@@ -213,15 +226,32 @@ def main() -> None:
rate_per_sec: float | None = None
rate_window_str = ""
snap_delta_proc = 0
if recent:
oldest = recent[0]
dt = now.timestamp() - oldest["ts"]
dp = processed - oldest["proc"]
snap_delta_proc = dp
if dt >= 60 and dp > 0:
rate_per_sec = dp / dt
window_min = dt / 60
rate_window_str = f"{window_min:.0f}-min avg"
# One-time baseline after initial mosaic crawl finished (no pending rows).
if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
cache["first_pass_completed_at"] = now.isoformat()
cache["first_pass_processed"] = total
cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR
first_pass_rate_hr = float(
cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
)
live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
# Active scrape shows progress in snapshots; idle archive shows dp == 0.
retry_estimate_rate_hr = (
live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
)
# --- Disk space ---
mean_bytes: float | None = None
size_note = ""
@@ -325,6 +355,76 @@ def main() -> None:
align=["l", "r", "r", "r", "r", "r"],
))
# -----------------------------------------------------------------------
# Retry estimates (first pass complete: pending == 0, failures remain)
# -----------------------------------------------------------------------
if failed > 0 and pending == 0:
failed_rows_list = [
r for r in latest.values()
if r.get("mosaic_download_status") == "failed"
]
n_all = len(failed_rows_list)
n_2023 = sum(
1 for r in failed_rows_list
if (r.get("scan_time") or "")[:4] >= "2023"
and len((r.get("scan_time") or "")[:4]) == 4
)
n_200 = sum(
1 for r in failed_rows_list
if r.get("mosaic_error_code") == "200"
)
rate_note = (
"rolling 30-min window"
if snap_delta_proc > 0
else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
)
print()
print("### Mosaic retry estimates\n")
print(
f"*Suggested command after server fix:* "
f"`python scraper.py --retry-failed --workers 2` "
f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n"
)
print(
f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
f"({rate_note}). Fixed columns use scenario rates.*\n"
)
est_hdr = (
"Retry scope",
"Count",
f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr",
f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr",
f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr",
f"@{retry_estimate_rate_hr:.0f}/hr",
)
retry_tbl_rows = [
[
"HTTP 200 (empty body)",
f"{n_200:,}",
_retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, retry_estimate_rate_hr),
],
[
"Failed, scan_time ≥ 2023",
f"{n_2023:,}",
_retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, retry_estimate_rate_hr),
],
[
"**All failed**",
f"**{n_all:,}**",
_retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, retry_estimate_rate_hr),
],
]
print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))
# -----------------------------------------------------------------------
# --by-year table
# -----------------------------------------------------------------------
+47 -1
View File
@@ -84,6 +84,33 @@ def parse_args() -> argparse.Namespace:
"inventorying all scans across all machines."
),
)
p.add_argument(
"--retry-failed",
action="store_true",
help=(
"Mosaic-only: re-attempt scans whose latest scans.csv row has "
"mosaic_download_status=failed (queue from CSV, not the server list). "
"Implies --mosaic-only."
),
)
p.add_argument(
"--retry-since",
metavar="YEAR",
default=None,
help=(
"With --retry-failed: only scans with scan_time year >= YEAR "
"(e.g. 2023)."
),
)
p.add_argument(
"--retry-error-code",
metavar="CODE",
default=None,
help=(
"With --retry-failed: filter by mosaic_error_code "
"(e.g. 200 for empty-body failures)."
),
)
p.add_argument(
"--dry-run",
action="store_true",
@@ -159,6 +186,16 @@ def main() -> None:
if args.scan_id is not None and args.scan_id <= 0:
sys.exit("--scan-id must be a positive integer")
if args.retry_since and not args.retry_failed:
sys.exit("--retry-since requires --retry-failed.")
if args.retry_error_code and not args.retry_failed:
sys.exit("--retry-error-code requires --retry-failed.")
if args.retry_failed:
if args.metadata_only:
sys.exit("--retry-failed cannot be used with --metadata-only.")
args.mosaic_only = True # implied
# --list-machines doesn't need credentials
if args.list_machines:
base_url = "http://205.149.147.131:8010/"
@@ -261,7 +298,13 @@ def main() -> None:
if args.metadata_only:
log.info("Mode: metadata only (mosaics and tiles skipped)")
elif args.mosaic_only:
log.info("Mode: mosaics only (individual tiles skipped)")
if args.retry_failed:
log.info(
"Mode: mosaic retry (failed scans from %s)",
SCANS_CSV_FILENAME,
)
else:
log.info("Mode: mosaics only (individual tiles skipped)")
if args.dry_run:
log.info("Mode: dry-run (no files will be written)")
@@ -285,6 +328,9 @@ def main() -> None:
metadata_only=args.metadata_only,
scan_id_filter=args.scan_id,
max_tiles=args.max_tiles,
retry_failed=args.retry_failed,
retry_since_year=args.retry_since,
retry_error_code=args.retry_error_code,
)
totals.merge(stats)
finally:
+114 -22
View File
@@ -2,14 +2,22 @@
High-level scrape orchestration: drives the per-machine and per-scan loops.
"""
import csv
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from tqdm import tqdm
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
# below smallest observed real tile (~7 KiB in production samples).
@@ -49,16 +57,64 @@ class RunStats:
self.scans_probe_skipped += other.scans_probe_skipped
self.scans_disk_space_skipped += other.scans_disk_space_skipped
from tqdm import tqdm
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
log = logging.getLogger(__name__)
def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
    """Return the latest scans.csv row for every (machine, scan_id) pair.

    Rows are read in file order and a later row replaces an earlier one with
    the same key, so the result reflects the most recently written state of
    each scan.

    Args:
        scans_csv_path: Location of scans.csv.

    Returns:
        Mapping of ``(machine, scan_id)`` (both as the raw CSV strings) to the
        last row seen for that pair. Empty when the file does not exist.
    """
    latest: dict[tuple[str, str], dict[str, str]] = {}
    try:
        fh = open(scans_csv_path, newline="", encoding="utf-8")
    except FileNotFoundError:
        # No CSV yet (first run): nothing has been recorded.
        return latest
    with fh:
        for row in csv.DictReader(fh):
            # DictReader fills columns missing from a short row with None, and
            # dict.get's default is not used for a present-but-None value, so
            # normalise explicitly to keep the keys real strings.
            key = (row.get("machine") or "", row.get("scan_id") or "")
            latest[key] = row
    return latest
def load_failed_scans_from_csv(
    scans_csv_path: Path,
    machine_label: str,
    *,
    since_year: str | None = None,
    error_code: str | None = None,
) -> list[dict[str, Any]]:
    """
    Build the mosaic retry queue for one machine from scans.csv.

    scans.csv is deduplicated by (machine, scan_id) with the last row winning,
    so a scan is queued only when its most recent row has
    ``mosaic_download_status == "failed"``.

    Args:
        scans_csv_path: Location of scans.csv.
        machine_label: Only rows whose ``machine`` column equals this label
            are considered.
        since_year: When given, keep only rows whose ``scan_time`` starts with
            a 4-digit year that is >= this value (compared as strings, so pass
            a 4-digit year such as ``"2023"``). Rows without a parseable year
            are dropped while this filter is active.
        error_code: When given, keep only rows whose ``mosaic_error_code``
            equals this string exactly.

    Returns:
        Dicts sorted by ascending ``scan_id`` (an int), each suitable as the
        ``scan`` argument to ``process_scan`` (scan_id, scan_time, name,
        status, plus user / scan_lines / scan_mode). Rows whose ``scan_id`` is
        not an integer are skipped with a warning instead of aborting the run.
    """
    queue: list[dict[str, Any]] = []
    for row in _read_scans_csv_latest(scans_csv_path).values():
        if row.get("machine") != machine_label:
            continue
        if row.get("mosaic_download_status") != "failed":
            continue
        if error_code is not None and (row.get("mosaic_error_code") or "") != error_code:
            continue

        scan_time = row.get("scan_time") or ""
        if since_year is not None:
            year = scan_time[:4]
            # Insist on a real 4-digit year: a plain string comparison would
            # let non-numeric prefixes (e.g. "N/A ") sort after any year and
            # slip through the filter.
            if not (len(year) == 4 and year.isdigit()) or year < since_year:
                continue

        raw_scan_id = row.get("scan_id") or ""
        try:
            scan_id = int(raw_scan_id)
        except ValueError:
            # A row without a usable id cannot be retried; do not let it
            # take down the whole retry queue.
            log.warning(
                "[%s] Skipping failed row with invalid scan_id %r in %s.",
                machine_label,
                raw_scan_id,
                scans_csv_path,
            )
            continue

        queue.append(
            {
                "scan_id": scan_id,
                "scan_time": scan_time,
                "name": row.get("name") or "",
                # Rows recorded without a status are treated as completed scans.
                "status": row.get("status") or "Completed",
                "user": row.get("user") or "",
                "scan_lines": row.get("scan_lines") or "",
                "scan_mode": row.get("scan_mode") or "",
            }
        )
    queue.sort(key=lambda scan: scan["scan_id"])
    return queue
# ---------------------------------------------------------------------------
# Per-scan helpers
# ---------------------------------------------------------------------------
@@ -499,6 +555,9 @@ def scrape_machine(
metadata_only: bool = False,
scan_id_filter: int | None = None,
max_tiles: int | None = None,
retry_failed: bool = False,
retry_since_year: str | None = None,
retry_error_code: str | None = None,
) -> RunStats:
"""Login, fetch scans, and download all content for one machine."""
sess = MachineSession(machine, config)
@@ -518,8 +577,37 @@ def scrape_machine(
log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
return RunStats()
if scan_id_filter is not None:
scans: list[dict[str, Any]] = [
if retry_failed:
scans = load_failed_scans_from_csv(
scans_csv.path,
machine["label"],
since_year=retry_since_year,
error_code=retry_error_code,
)
if scan_id_filter is not None:
scans = [s for s in scans if s["scan_id"] == scan_id_filter]
if not scans:
log.warning(
"[%s] Retry: scan_id %d not among failed rows for this machine.",
machine["label"],
scan_id_filter,
)
return RunStats()
log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter)
elif not scans:
log.warning(
"[%s] No failed mosaic rows in scans.csv match retry filters.",
machine["label"],
)
return RunStats()
else:
log.info(
"[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
machine["label"],
len(scans),
)
elif scan_id_filter is not None:
scans = [
{"scan_id": scan_id_filter, "status": "Completed"}
]
log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
@@ -529,21 +617,25 @@ def scrape_machine(
log.warning("[%s] No scans found.", machine["label"])
return RunStats()
# Build a set of scan_ids already fully processed in a prior run so we can
# skip them entirely (no metadata fetch, no mosaic request).
# Only scans with a definitive non-pending status count; skipped_metadata_only
# rows still need to be processed in mosaic mode.
# Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP).
# In normal mode: skip anything with a definitive non-pending status.
# In retry mode: only skip scans that are already downloaded or skipped for
# disk-space reasons — failed scans must be re-attempted.
PENDING_STATUSES = {"skipped_metadata_only", ""}
BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"}
existing_ids: set[int] = set()
if not metadata_only and scans_csv._fh.name:
existing_path = Path(scans_csv._fh.name)
if existing_path.exists():
import csv as _csv
with open(existing_path, newline="", encoding="utf-8") as _f:
for _row in _csv.DictReader(_f):
if _row.get("machine") == machine["label"]:
if _row.get("mosaic_download_status", "") not in PENDING_STATUSES:
existing_ids.add(int(_row["scan_id"]))
if not metadata_only:
latest_rows = _read_scans_csv_latest(scans_csv.path)
for (_mlabel, _sid), _row in latest_rows.items():
if _mlabel != machine["label"]:
continue
st = _row.get("mosaic_download_status", "")
if retry_failed:
if st in BLOCK_AFTER_RETRY_STATUSES:
existing_ids.add(int(_row["scan_id"]))
else:
if st not in PENDING_STATUSES:
existing_ids.add(int(_row["scan_id"]))
stats = RunStats()
for scan in scans:
+1
View File
@@ -77,6 +77,7 @@ class CsvWriter:
def __init__(self, path: Path, fields: list[str]) -> None:
is_new = not path.exists()
path.parent.mkdir(parents=True, exist_ok=True)
self.path = path
self._fh = open(path, "a", newline="", encoding="utf-8")
self._writer = csv.DictWriter(self._fh, fieldnames=fields)
if is_new:
+133
View File
@@ -0,0 +1,133 @@
"""Retry queue loading from scans.csv (mosaic_download_status=failed)."""
import csv
from pathlib import Path
import pytest
from spruce.orchestrator import load_failed_scans_from_csv
from spruce.settings import SCANS_CSV_FIELDS
def _blank_row(**kwargs: str) -> dict[str, str]:
    """Return a scans.csv row with every known field empty, overridden by *kwargs*."""
    return {**dict.fromkeys(SCANS_CSV_FIELDS, ""), **kwargs}
def _write_scans_csv(path: Path, rows: list[dict[str, str]]) -> None:
    """Write *rows* to *path* as a scans.csv file with a header line.

    Only the columns in SCANS_CSV_FIELDS are written; a field missing from a
    row is emitted as an empty string.
    """
    with open(path, "w", newline="", encoding="utf-8") as out:
        writer = csv.DictWriter(out, fieldnames=SCANS_CSV_FIELDS)
        writer.writeheader()
        writer.writerows(
            {field: record.get(field, "") for field in SCANS_CSV_FIELDS}
            for record in rows
        )
def test_load_failed_scans_dedup_keeps_last_row(tmp_path: Path) -> None:
    """Two failed rows for the same scan collapse to the later one."""
    csv_path = tmp_path / "scans.csv"
    shared = dict(
        machine="BW1 [X]",
        machine_id="1",
        scan_id="100",
        mosaic_url="http://x/m.jpg",
        mosaic_local_path="",
        mosaic_on_disk="False",
        mosaic_download_status="failed",
        mosaic_error_code="404",
    )
    # Same scan written twice; only scan_time differs between the rows.
    rows = [
        _blank_row(**shared, scan_time=when)
        for when in ("2020-01-01", "2020-06-01")
    ]
    _write_scans_csv(csv_path, rows)

    queue = load_failed_scans_from_csv(csv_path, "BW1 [X]")

    assert len(queue) == 1
    only = queue[0]
    assert only["scan_id"] == 100
    assert only["scan_time"] == "2020-06-01"
def test_load_failed_scans_since_year(tmp_path: Path) -> None:
    """since_year keeps scans from that year on and drops rows with no scan_time."""
    csv_path = tmp_path / "scans.csv"
    shared = dict(
        machine="M",
        machine_id="1",
        mosaic_url="",
        mosaic_local_path="",
        mosaic_on_disk="",
        mosaic_download_status="failed",
        mosaic_error_code="404",
    )
    # scan_id -> scan_time: just before the cutoff, on the cutoff, and blank.
    scan_times = {"1": "2022-12-31", "2": "2023-01-01", "3": ""}
    _write_scans_csv(
        csv_path,
        [
            _blank_row(**shared, scan_id=sid, scan_time=when)
            for sid, when in scan_times.items()
        ],
    )

    queue = load_failed_scans_from_csv(csv_path, "M", since_year="2023")

    assert {scan["scan_id"] for scan in queue} == {2}
def test_load_failed_scans_error_code(tmp_path: Path) -> None:
    """error_code selects only failed rows with a matching mosaic_error_code."""
    csv_path = tmp_path / "scans.csv"
    shared = dict(
        machine="M",
        machine_id="1",
        scan_time="2024-01-01",
        mosaic_url="",
        mosaic_local_path="",
        mosaic_on_disk="",
        mosaic_download_status="failed",
    )
    _write_scans_csv(
        csv_path,
        [
            _blank_row(**shared, scan_id=sid, mosaic_error_code=code)
            for sid, code in (("10", "404"), ("11", "200"))
        ],
    )

    queue = load_failed_scans_from_csv(csv_path, "M", error_code="200")

    assert [scan["scan_id"] for scan in queue] == [11]
def test_load_failed_scans_excludes_downloaded(tmp_path: Path) -> None:
    """Rows whose status is "downloaded" never enter the retry queue."""
    csv_path = tmp_path / "scans.csv"
    shared = dict(
        machine="M",
        machine_id="1",
        scan_time="2024-01-01",
        mosaic_url="",
        mosaic_local_path="",
        mosaic_on_disk="True",
    )
    downloaded = _blank_row(
        **shared,
        scan_id="5",
        mosaic_download_status="downloaded",
        mosaic_error_code="",
    )
    failed = _blank_row(
        **shared,
        scan_id="6",
        mosaic_download_status="failed",
        mosaic_error_code="404",
    )
    _write_scans_csv(csv_path, [downloaded, failed])

    queue = load_failed_scans_from_csv(csv_path, "M")

    assert [scan["scan_id"] for scan in queue] == [6]