Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 655c376a97 | |||
| 1ef9e0206c | |||
| 6390f5d529 |
@@ -6,3 +6,5 @@ tqdm>=4.66.0
|
||||
piexif>=1.1.3
|
||||
Pillow>=10.0.0
|
||||
pytest>=8.0
|
||||
imageio>=2.34
|
||||
imageio-ffmpeg>=0.5
|
||||
|
||||
@@ -0,0 +1,710 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Build a chronological MP4 from downloaded mosaic.jpg files for one machine ROI.
|
||||
|
||||
Reads archives/scans.csv, filters by machine and mosaic_on_disk, optionally
|
||||
restricts to one (start_x, start_y, end_x, end_y) ROI, dedupes by scan_id
|
||||
(last row wins), sorts by scan_time, and encodes frames with imageio/ffmpeg.
|
||||
|
||||
Usage:
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]"
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
|
||||
--roi "195.65,219.22,219.73,235.04" --fps 8 --output /tmp/out.mp4
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" --dry-run
|
||||
# Lighter preview (caps tall full-tube mosaics by height — easier on players):
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
|
||||
--roi "0.0,0.0,310.0,740.0" --preview
|
||||
# Metadata is drawn on each frame by default (semi-transparent bar at the top);
|
||||
# use --no-metadata-overlay to disable.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from collections import Counter, defaultdict
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import imageio
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
|
||||
class MovieEncodeError(Exception):
|
||||
"""Raised from encoding helpers; caught by encode_movie for batch-safe handling."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class EncodedMovieResult:
|
||||
success: bool
|
||||
machine: str
|
||||
roi: tuple[float, float, float, float]
|
||||
csv_frame_count: int
|
||||
written: int
|
||||
missing: int
|
||||
dropped_read: int
|
||||
output_path: Path | None
|
||||
skipped_reason: str | None
|
||||
size_mb: float | None
|
||||
elapsed_s: float | None
|
||||
|
||||
|
||||
def sanitize_machine_label(label: str) -> str:
|
||||
return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_")
|
||||
|
||||
|
||||
def parse_roi(s: str) -> tuple[float, float, float, float]:
|
||||
parts = [p.strip() for p in s.split(",")]
|
||||
if len(parts) != 4:
|
||||
sys.exit("--roi must be four comma-separated numbers: start_x,start_y,end_x,end_y")
|
||||
try:
|
||||
return tuple(float(p) for p in parts) # type: ignore[return-value]
|
||||
except ValueError as e:
|
||||
sys.exit(f"Invalid --roi numbers: {e}")
|
||||
|
||||
|
||||
def extent_close(
|
||||
row: dict,
|
||||
roi: tuple[float, float, float, float],
|
||||
*,
|
||||
tol: float = 1e-4,
|
||||
) -> bool:
|
||||
keys = ("start_x", "start_y", "end_x", "end_y")
|
||||
try:
|
||||
vals = tuple(float(row[k]) for k in keys)
|
||||
except (KeyError, ValueError):
|
||||
return False
|
||||
return all(abs(a - b) < tol for a, b in zip(vals, roi))
|
||||
|
||||
|
||||
def extent_key(row: dict) -> tuple[str, str, str, str]:
|
||||
"""Stable grouping key from CSV string fields."""
|
||||
return (
|
||||
row.get("start_x", "").strip(),
|
||||
row.get("start_y", "").strip(),
|
||||
row.get("end_x", "").strip(),
|
||||
row.get("end_y", "").strip(),
|
||||
)
|
||||
|
||||
|
||||
def key_to_roi_floats(key: tuple[str, str, str, str]) -> tuple[float, float, float, float]:
|
||||
return tuple(float(x) for x in key) # type: ignore[return-value]
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("--machine", required=True, help='RootView machine label, e.g. "BW1-4 [AMR-15]"')
|
||||
p.add_argument(
|
||||
"--roi",
|
||||
metavar="SX,SY,EX,EY",
|
||||
help="Restrict to this extent (mm). If omitted, pick the ROI with the most on-disk mosaics.",
|
||||
)
|
||||
p.add_argument("--archive", default="archives", type=Path, help="Archive root (default: archives)")
|
||||
p.add_argument(
|
||||
"--scans-csv",
|
||||
default=None,
|
||||
type=Path,
|
||||
help="Path to scans.csv (default: <archive>/scans.csv)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--output",
|
||||
"-o",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Output .mp4 path (default: <archive>/movies/<machine>/roi_<...>.mp4)",
|
||||
)
|
||||
p.add_argument("--fps", type=float, default=10.0, help="Frames per second (default: 10)")
|
||||
p.add_argument(
|
||||
"--max-height",
|
||||
type=int,
|
||||
default=None,
|
||||
metavar="PX",
|
||||
help="Scale each frame so height is at most PX pixels (width keeps aspect); "
|
||||
"suited to tall full-tube mosaics. Both dimensions are rounded to even pixels for H.264.",
|
||||
)
|
||||
p.add_argument(
|
||||
"--preview",
|
||||
action="store_true",
|
||||
help="Shorthand for --max-height 1080 (overridden if --max-height is also set).",
|
||||
)
|
||||
p.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="List frames that would be written (no MP4)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--no-metadata-overlay",
|
||||
action="store_true",
|
||||
help="Do not draw scan metadata on each frame (default: overlay on).",
|
||||
)
|
||||
args = p.parse_args()
|
||||
if args.preview and args.max_height is None:
|
||||
args.max_height = 1080
|
||||
return args
|
||||
|
||||
|
||||
def _csv_required_fieldnames() -> tuple[str, ...]:
|
||||
return (
|
||||
"machine",
|
||||
"scan_id",
|
||||
"scan_time",
|
||||
"mosaic_on_disk",
|
||||
"mosaic_local_path",
|
||||
"start_x",
|
||||
"start_y",
|
||||
"end_x",
|
||||
"end_y",
|
||||
)
|
||||
|
||||
|
||||
def validate_scans_csv_header(reader: csv.DictReader, scans_csv: Path) -> None:
|
||||
if reader.fieldnames is None:
|
||||
sys.exit(f"Empty CSV: {scans_csv}")
|
||||
required = _csv_required_fieldnames()
|
||||
missing = [c for c in required if c not in reader.fieldnames]
|
||||
if missing:
|
||||
sys.exit(f"{scans_csv} missing columns: {missing}")
|
||||
|
||||
|
||||
def load_latest_rows(
|
||||
scans_csv: Path,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float] | None,
|
||||
) -> list[dict]:
|
||||
"""Last row per scan_id for matching machine; mosaic_on_disk True; optional ROI."""
|
||||
latest: dict[str, dict] = {}
|
||||
with scans_csv.open(newline="", encoding="utf-8") as fh:
|
||||
reader = csv.DictReader(fh)
|
||||
validate_scans_csv_header(reader, scans_csv)
|
||||
|
||||
for row in reader:
|
||||
if row.get("machine", "") != machine:
|
||||
continue
|
||||
if row.get("mosaic_on_disk", "").strip() != "True":
|
||||
continue
|
||||
if roi is not None and not extent_close(row, roi):
|
||||
continue
|
||||
sid = row.get("scan_id", "").strip()
|
||||
if not sid:
|
||||
continue
|
||||
latest[sid] = row
|
||||
|
||||
return list(latest.values())
|
||||
|
||||
|
||||
def load_on_disk_rows_by_machine(scans_csv: Path) -> dict[str, list[dict]]:
|
||||
"""One pass: last row per (machine, scan_id) where mosaic_on_disk True; group by machine."""
|
||||
latest: dict[tuple[str, str], dict] = {}
|
||||
with scans_csv.open(newline="", encoding="utf-8") as fh:
|
||||
reader = csv.DictReader(fh)
|
||||
validate_scans_csv_header(reader, scans_csv)
|
||||
for row in reader:
|
||||
if row.get("mosaic_on_disk", "").strip() != "True":
|
||||
continue
|
||||
sid = row.get("scan_id", "").strip()
|
||||
m = row.get("machine", "").strip()
|
||||
if not sid or not m:
|
||||
continue
|
||||
latest[(m, sid)] = row
|
||||
|
||||
by_machine: dict[str, list[dict]] = defaultdict(list)
|
||||
for (m, _sid), r in latest.items():
|
||||
by_machine[m].append(r)
|
||||
return {k: v for k, v in by_machine.items()}
|
||||
|
||||
|
||||
def pick_top_rois(rows: list[dict], n: int) -> list[tuple[tuple[float, float, float, float], int]]:
|
||||
"""Top n distinct extents by count of deduped rows. Empty if rows empty or n < 1."""
|
||||
if not rows or n < 1:
|
||||
return []
|
||||
counts = Counter(extent_key(r) for r in rows)
|
||||
return [(key_to_roi_floats(key), cnt) for key, cnt in counts.most_common(n)]
|
||||
|
||||
|
||||
def pick_top_roi(rows: list[dict]) -> tuple[float, float, float, float]:
|
||||
if not rows:
|
||||
sys.exit("No rows with mosaic_on_disk=True for this machine (and ROI filter, if any).")
|
||||
return pick_top_rois(rows, 1)[0][0]
|
||||
|
||||
|
||||
def default_output_path(
|
||||
archive: Path,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float],
|
||||
*,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
rank: int | None = None,
|
||||
) -> Path:
|
||||
safe = sanitize_machine_label(machine)
|
||||
sx, sy, ex, ey = roi
|
||||
base = f"roi_{sx}_{sy}_{ex}_{ey}".replace(" ", "")
|
||||
if rank is not None:
|
||||
base = f"{base}_r{rank}"
|
||||
if max_height is not None:
|
||||
base = f"{base}_h{max_height}"
|
||||
if metadata_overlay:
|
||||
base = f"{base}_meta"
|
||||
name = f"{base}.mp4"
|
||||
return archive / "movies" / safe / name
|
||||
|
||||
|
||||
def resolve_mosaic_path(rel: str, archive: Path) -> Path:
|
||||
"""CSV paths are usually repo-relative, e.g. archives/BW1-4__AMR-15/.../mosaic.jpg."""
|
||||
p = Path(rel)
|
||||
if p.is_absolute():
|
||||
return p.resolve()
|
||||
ar = archive.resolve()
|
||||
norm = rel.replace("\\", "/")
|
||||
if norm.startswith("archives/") or norm.startswith("./archives/"):
|
||||
return (ar.parent / rel).resolve()
|
||||
return (ar / rel).resolve()
|
||||
|
||||
|
||||
def even_dimensions(w: int, h: int) -> tuple[int, int]:
|
||||
"""libx264 requires even width and height."""
|
||||
w2 = w - (w % 2)
|
||||
h2 = h - (h % 2)
|
||||
if w2 < 2 or h2 < 2:
|
||||
raise MovieEncodeError(f"Frame dimensions too small after evenizing: {w}x{h}")
|
||||
return w2, h2
|
||||
|
||||
|
||||
def frame_size_mode(paths: list[Path]) -> tuple[int, int]:
|
||||
sizes: list[tuple[int, int]] = []
|
||||
for p in paths:
|
||||
try:
|
||||
with Image.open(p) as im:
|
||||
sizes.append(im.size)
|
||||
except OSError:
|
||||
continue
|
||||
if not sizes:
|
||||
raise MovieEncodeError("No readable mosaic images to determine frame size.")
|
||||
w, h = Counter(sizes).most_common(1)[0][0]
|
||||
return even_dimensions(w, h)
|
||||
|
||||
|
||||
def encode_size(native_w: int, native_h: int, max_height: int | None) -> tuple[int, int]:
|
||||
"""Native size is already even; optional downscale for preview encodes (cap height)."""
|
||||
if max_height is None:
|
||||
return native_w, native_h
|
||||
if max_height < 32:
|
||||
raise MovieEncodeError("--max-height must be at least 32")
|
||||
cap = max_height - (max_height % 2)
|
||||
if cap < 2:
|
||||
raise MovieEncodeError("--max-height must allow an even height of at least 2")
|
||||
if native_h <= cap:
|
||||
return native_w, native_h
|
||||
new_h = cap
|
||||
new_w = int(round(native_w * (new_h / native_h)))
|
||||
new_w -= new_w % 2
|
||||
if new_w < 2:
|
||||
raise MovieEncodeError("Computed preview width too small; try a larger --max-height")
|
||||
return new_w, new_h
|
||||
|
||||
|
||||
def _truetype_font_candidates() -> list[Path]:
|
||||
windir = os.environ.get("WINDIR", r"C:\Windows")
|
||||
return [
|
||||
Path(windir) / "Fonts" / "arial.ttf",
|
||||
Path(windir) / "Fonts" / "consola.ttf",
|
||||
Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"),
|
||||
Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"),
|
||||
]
|
||||
|
||||
|
||||
def get_overlay_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
|
||||
for path in _truetype_font_candidates():
|
||||
if path.is_file():
|
||||
try:
|
||||
return ImageFont.truetype(str(path), size=size)
|
||||
except OSError:
|
||||
continue
|
||||
return ImageFont.load_default()
|
||||
|
||||
|
||||
def _truncate(s: str, max_len: int) -> str:
|
||||
s = s.strip()
|
||||
if len(s) <= max_len:
|
||||
return s
|
||||
if max_len <= 3:
|
||||
return s[:max_len]
|
||||
return s[: max_len - 3] + "..."
|
||||
|
||||
|
||||
def metadata_overlay_lines(row: dict, *, max_name_chars: int) -> list[str]:
|
||||
sid = row.get("scan_id", "").strip()
|
||||
st = row.get("scan_time", "").strip()
|
||||
name = _truncate(row.get("name", "").strip(), max_name_chars)
|
||||
nx = row.get("nx", "").strip()
|
||||
ny = row.get("ny", "").strip()
|
||||
dx = row.get("dx", "").strip()
|
||||
dy = row.get("dy", "").strip()
|
||||
lines = row.get("scan_lines", "").strip()
|
||||
mode = row.get("scan_mode", "").strip()
|
||||
user = row.get("user", "").strip()
|
||||
status = row.get("status", "").strip()
|
||||
sx = row.get("start_x", "").strip()
|
||||
sy = row.get("start_y", "").strip()
|
||||
ex = row.get("end_x", "").strip()
|
||||
ey = row.get("end_y", "").strip()
|
||||
machine = row.get("machine", "").strip()
|
||||
|
||||
grid = f"{nx}x{ny}" if nx and ny else ""
|
||||
step = f"{dx}x{dy} mm" if dx and dy else ""
|
||||
geom = " ".join(p for p in (grid, step) if p)
|
||||
orient = f"{lines} / {mode}" if lines or mode else ""
|
||||
|
||||
out: list[str] = []
|
||||
if machine:
|
||||
out.append(machine)
|
||||
if sid or st:
|
||||
parts = []
|
||||
if sid:
|
||||
parts.append(f"id {sid}")
|
||||
if st:
|
||||
parts.append(st)
|
||||
out.append(" ".join(parts))
|
||||
if name:
|
||||
out.append(name)
|
||||
if geom or orient:
|
||||
out.append(" ".join(p for p in (geom, orient) if p))
|
||||
if sx and sy and ex and ey:
|
||||
out.append(f"ROI mm {sx},{sy} .. {ex},{ey}")
|
||||
if user or status:
|
||||
tail: list[str] = []
|
||||
if user:
|
||||
tail.append(f"user {user}")
|
||||
if status:
|
||||
tail.append(status)
|
||||
out.append(" ".join(tail))
|
||||
return out if out else ["(no metadata)"]
|
||||
|
||||
|
||||
def draw_metadata_overlay(
|
||||
rgb: Image.Image,
|
||||
row: dict,
|
||||
*,
|
||||
margin: int,
|
||||
) -> None:
|
||||
"""Draw a semi-transparent label block along the top; mutates rgb in place."""
|
||||
# Panel alpha: ~50% so roots stay visible through the bar.
|
||||
panel_fill = (0, 0, 0, 128)
|
||||
panel_outline = (220, 220, 230, 100)
|
||||
|
||||
w, h = rgb.size
|
||||
margin = max(4, min(margin, w // 8))
|
||||
font_size = max(10, min(22, h // 50))
|
||||
pad = max(4, font_size // 3)
|
||||
line_gap = max(2, font_size // 6)
|
||||
|
||||
def measure_block(fs: int, name_max: int) -> tuple[ImageFont.FreeTypeFont | ImageFont.ImageFont, list[str], int, int]:
|
||||
font = get_overlay_font(fs)
|
||||
lines = metadata_overlay_lines(row, max_name_chars=name_max)
|
||||
tmp = Image.new("RGB", (1, 1))
|
||||
draw = ImageDraw.Draw(tmp)
|
||||
max_tw = 0
|
||||
total_h = 0
|
||||
for line in lines:
|
||||
bbox = draw.textbbox((0, 0), line, font=font)
|
||||
tw = bbox[2] - bbox[0]
|
||||
th = bbox[3] - bbox[1]
|
||||
max_tw = max(max_tw, tw)
|
||||
total_h += th
|
||||
if len(lines) > 1:
|
||||
total_h += line_gap * (len(lines) - 1)
|
||||
block_w = max_tw + 2 * pad
|
||||
block_h = total_h + 2 * pad
|
||||
return font, lines, block_w, block_h
|
||||
|
||||
name_max = max(24, w // max(6, font_size // 2))
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
max_block_w = w - 2 * margin
|
||||
max_block_h = min(h // 2, h - 2 * margin)
|
||||
while (block_w > max_block_w or block_h > max_block_h) and font_size > 9:
|
||||
font_size -= 1
|
||||
name_max = max(16, w // max(7, font_size // 2))
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
while block_w > max_block_w and name_max > 12:
|
||||
name_max -= 4
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
|
||||
bw = min(block_w, max_block_w)
|
||||
bh = min(block_h, max_block_h)
|
||||
x0 = margin
|
||||
x1 = x0 + bw
|
||||
y0 = margin
|
||||
y1 = y0 + bh
|
||||
|
||||
overlay = Image.new("RGBA", (w, h), (0, 0, 0, 0))
|
||||
draw_ov = ImageDraw.Draw(overlay)
|
||||
draw_ov.rounded_rectangle(
|
||||
[x0, y0, x1, y1],
|
||||
radius=max(4, pad // 2),
|
||||
fill=panel_fill,
|
||||
outline=panel_outline,
|
||||
width=1,
|
||||
)
|
||||
|
||||
base = rgb.convert("RGBA")
|
||||
composited = Image.alpha_composite(base, overlay)
|
||||
draw = ImageDraw.Draw(composited)
|
||||
|
||||
cx, cy = x0 + pad, y0 + pad
|
||||
text_bottom_limit = y1 - pad
|
||||
for line in lines:
|
||||
bbox = draw.textbbox((0, 0), line, font=font)
|
||||
th = bbox[3] - bbox[1]
|
||||
if cy + th > text_bottom_limit:
|
||||
break
|
||||
draw.text(
|
||||
(cx, cy),
|
||||
line,
|
||||
fill=(245, 245, 245, 255),
|
||||
font=font,
|
||||
stroke_width=1,
|
||||
stroke_fill=(0, 0, 0, 255),
|
||||
)
|
||||
cy += th + line_gap
|
||||
|
||||
rgb.paste(composited.convert("RGB"))
|
||||
|
||||
|
||||
def encode_movie(
|
||||
*,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float],
|
||||
rows: list[dict],
|
||||
archive: Path,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
fps: float,
|
||||
output: Path | None = None,
|
||||
dry_run: bool = False,
|
||||
rank: int | None = None,
|
||||
quiet: bool = False,
|
||||
) -> EncodedMovieResult:
|
||||
"""Build MP4 from pre-filtered rows for one ROI. Does not sys.exit on encode failures."""
|
||||
rows_sorted = sorted(
|
||||
rows,
|
||||
key=lambda r: (r.get("scan_time") or "", r.get("scan_id") or ""),
|
||||
)
|
||||
csv_frame_count = len(rows_sorted)
|
||||
|
||||
row_path_candidates: list[tuple[dict, Path]] = []
|
||||
for r in rows_sorted:
|
||||
rel = (r.get("mosaic_local_path") or "").strip()
|
||||
if not rel:
|
||||
continue
|
||||
row_path_candidates.append((r, resolve_mosaic_path(rel, archive)))
|
||||
|
||||
out: Path = output or default_output_path(
|
||||
archive,
|
||||
machine,
|
||||
roi,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
rank=rank,
|
||||
)
|
||||
|
||||
if not quiet:
|
||||
print(f"Machine: {machine}")
|
||||
print(f"ROI (mm): {roi[0]}, {roi[1]}, {roi[2]}, {roi[3]}")
|
||||
print(f"Frames (from CSV, deduped): {csv_frame_count}")
|
||||
if max_height is not None:
|
||||
print(f"Preview max height: {max_height}px")
|
||||
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
|
||||
|
||||
on_disk = sum(1 for _r, p in row_path_candidates if p.is_file())
|
||||
missing_paths = len(row_path_candidates) - on_disk
|
||||
|
||||
if dry_run:
|
||||
if not quiet:
|
||||
print(f"On-disk files among ordered list: {on_disk} / {len(row_path_candidates)}")
|
||||
for i, (_r, p) in enumerate(row_path_candidates[:5]):
|
||||
print(f" [{i}] {p} exists={p.is_file()}")
|
||||
if len(row_path_candidates) > 10:
|
||||
print(" ...")
|
||||
start = max(0, len(row_path_candidates) - 3)
|
||||
for i, (_r, p) in enumerate(row_path_candidates[start:], start=start):
|
||||
print(f" [{i}] {p} exists={p.is_file()}")
|
||||
return EncodedMovieResult(
|
||||
success=True,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason=None,
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
row_path_pairs = [(r, p) for r, p in row_path_candidates if p.is_file()]
|
||||
if not row_path_pairs:
|
||||
if not quiet:
|
||||
print("No mosaic files on disk for the selected rows.")
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason="No mosaic files on disk for the selected rows.",
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
t0 = time.perf_counter()
|
||||
try:
|
||||
ordered_paths = [p for _r, p in row_path_pairs]
|
||||
target_w, target_h = frame_size_mode(ordered_paths)
|
||||
enc_w, enc_h = encode_size(target_w, target_h, max_height)
|
||||
except MovieEncodeError as e:
|
||||
if not quiet:
|
||||
print(str(e))
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason=str(e),
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
if not quiet:
|
||||
print(f"Target frame size (mode): {target_w} x {target_h}")
|
||||
if (enc_w, enc_h) != (target_w, target_h):
|
||||
print(f"Encode size (after max-height): {enc_w} x {enc_h}")
|
||||
|
||||
out.parent.mkdir(parents=True, exist_ok=True)
|
||||
written = 0
|
||||
dropped = 0
|
||||
resized = 0
|
||||
scaled_preview = 0
|
||||
try:
|
||||
writer = imageio.get_writer(
|
||||
str(out),
|
||||
fps=float(fps),
|
||||
codec="libx264",
|
||||
quality=8,
|
||||
macro_block_size=1,
|
||||
)
|
||||
try:
|
||||
for row, p in row_path_pairs:
|
||||
try:
|
||||
with Image.open(p) as im:
|
||||
rgb = im.convert("RGB")
|
||||
if rgb.size != (target_w, target_h):
|
||||
rgb = rgb.resize((target_w, target_h), Image.Resampling.LANCZOS)
|
||||
resized += 1
|
||||
if rgb.size != (enc_w, enc_h):
|
||||
rgb = rgb.resize((enc_w, enc_h), Image.Resampling.LANCZOS)
|
||||
scaled_preview += 1
|
||||
if metadata_overlay:
|
||||
draw_metadata_overlay(rgb, row, margin=max(6, enc_w // 80))
|
||||
frame = np.asarray(rgb)
|
||||
writer.append_data(frame)
|
||||
written += 1
|
||||
except OSError:
|
||||
dropped += 1
|
||||
finally:
|
||||
writer.close()
|
||||
except Exception as e:
|
||||
if not quiet:
|
||||
print(f"Encode error: {e}")
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=written,
|
||||
missing=missing_paths,
|
||||
dropped_read=dropped,
|
||||
output_path=out,
|
||||
skipped_reason=f"encode_error: {e}",
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
elapsed = time.perf_counter() - t0
|
||||
size_mb = out.stat().st_size / (1024 * 1024) if out.is_file() else 0.0
|
||||
if not quiet:
|
||||
print(
|
||||
f"Written: {written} frames (normalized to mode: {resized}, "
|
||||
f"preview scale: {scaled_preview})"
|
||||
)
|
||||
print(f"Dropped (read error): {dropped}")
|
||||
print(f"Missing paths (not on disk): {missing_paths}")
|
||||
print(f"Output: {out.resolve()} ({size_mb:.2f} MB)")
|
||||
|
||||
return EncodedMovieResult(
|
||||
success=True,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=written,
|
||||
missing=missing_paths,
|
||||
dropped_read=dropped,
|
||||
output_path=out,
|
||||
skipped_reason=None,
|
||||
size_mb=size_mb,
|
||||
elapsed_s=elapsed,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
archive: Path = args.archive
|
||||
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
|
||||
if not scans_csv.is_file():
|
||||
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||
|
||||
roi_sel: tuple[float, float, float, float] | None
|
||||
if args.roi:
|
||||
roi_sel = parse_roi(args.roi)
|
||||
else:
|
||||
roi_sel = None
|
||||
|
||||
rows = load_latest_rows(scans_csv, args.machine, roi_sel)
|
||||
if roi_sel is None:
|
||||
roi_sel = pick_top_roi(rows)
|
||||
rows = [r for r in rows if extent_close(r, roi_sel)]
|
||||
|
||||
assert roi_sel is not None
|
||||
max_height: int | None = args.max_height
|
||||
metadata_overlay = not args.no_metadata_overlay
|
||||
|
||||
res = encode_movie(
|
||||
machine=args.machine,
|
||||
roi=roi_sel,
|
||||
rows=rows,
|
||||
archive=archive,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
fps=float(args.fps),
|
||||
output=args.output,
|
||||
dry_run=bool(args.dry_run),
|
||||
rank=None,
|
||||
quiet=False,
|
||||
)
|
||||
if not res.success and not args.dry_run:
|
||||
sys.exit(1 if res.skipped_reason else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,269 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Build preview MP4s for the top N ROIs per machine (default N=2, max-height 1080).
|
||||
|
||||
Reads archives/scans.csv once, groups on-disk mosaic rows by machine, then for
|
||||
each machine picks the most frequent ROI extents and calls encode_movie().
|
||||
|
||||
Usage:
|
||||
python scripts/build_mosaic_movies_batch.py
|
||||
python scripts/build_mosaic_movies_batch.py --dry-run
|
||||
python scripts/build_mosaic_movies_batch.py --skip-existing
|
||||
python scripts/build_mosaic_movies_batch.py --machine "BW2-10 [AMR-22]"
|
||||
python scripts/build_mosaic_movies_batch.py --full-res # no max-height cap
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
_SCRIPTS_DIR = Path(__file__).resolve().parent
|
||||
|
||||
# Import sibling module (run as python scripts/build_mosaic_movies_batch.py from repo root)
|
||||
sys.path.insert(0, str(_SCRIPTS_DIR))
|
||||
import build_mosaic_movie as bmm # noqa: E402
|
||||
|
||||
|
||||
def read_machine_labels(path: Path) -> list[str]:
|
||||
out: list[str] = []
|
||||
with path.open(encoding="utf-8") as fh:
|
||||
for line in fh:
|
||||
s = line.strip()
|
||||
if not s or s.startswith("#"):
|
||||
continue
|
||||
out.append(s)
|
||||
return out
|
||||
|
||||
|
||||
@dataclass
|
||||
class Job:
|
||||
machine: str
|
||||
rank: int
|
||||
roi: tuple[float, float, float, float]
|
||||
extent_count: int
|
||||
rows: list[dict]
|
||||
output_path: Path
|
||||
|
||||
|
||||
def collect_jobs(
|
||||
*,
|
||||
machines: list[str],
|
||||
by_machine: dict[str, list[dict]],
|
||||
archive: Path,
|
||||
top_rois: int,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
) -> list[Job]:
|
||||
jobs: list[Job] = []
|
||||
for machine in machines:
|
||||
rows = by_machine.get(machine, [])
|
||||
if not rows:
|
||||
continue
|
||||
picks = bmm.pick_top_rois(rows, top_rois)
|
||||
for rank, (roi, extent_count) in enumerate(picks, start=1):
|
||||
rows_roi = [r for r in rows if bmm.extent_close(r, roi)]
|
||||
out = bmm.default_output_path(
|
||||
archive,
|
||||
machine,
|
||||
roi,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
rank=rank,
|
||||
)
|
||||
jobs.append(
|
||||
Job(
|
||||
machine=machine,
|
||||
rank=rank,
|
||||
roi=roi,
|
||||
extent_count=extent_count,
|
||||
rows=rows_roi,
|
||||
output_path=out,
|
||||
)
|
||||
)
|
||||
return jobs
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument(
|
||||
"--machines-file",
|
||||
type=Path,
|
||||
default=_SCRIPTS_DIR / "machines.example.txt",
|
||||
help="One machine label per line (default: scripts/machines.example.txt next to this script)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--machine",
|
||||
metavar="LABEL",
|
||||
help='If set, only this machine (overrides list to a single job set), e.g. "BW2-10 [AMR-22]"',
|
||||
)
|
||||
p.add_argument("--archive", type=Path, default=Path("archives"))
|
||||
p.add_argument("--scans-csv", type=Path, default=None)
|
||||
p.add_argument("--top-rois", type=int, default=2, help="How many top extents per machine (default: 2)")
|
||||
p.add_argument("--max-height", type=int, default=1080, help="Preview cap in px (default: 1080)")
|
||||
p.add_argument(
|
||||
"--full-res",
|
||||
action="store_true",
|
||||
help="Disable max-height cap (full mosaic resolution; can be huge)",
|
||||
)
|
||||
p.add_argument("--fps", type=float, default=10.0)
|
||||
p.add_argument("--dry-run", action="store_true")
|
||||
p.add_argument(
|
||||
"--skip-existing",
|
||||
action="store_true",
|
||||
help="Skip encode if output MP4 exists and is non-empty",
|
||||
)
|
||||
p.add_argument("--no-metadata-overlay", action="store_true")
|
||||
args = p.parse_args()
|
||||
if args.full_res:
|
||||
args.max_height = None
|
||||
return args
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
archive: Path = args.archive
|
||||
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
|
||||
if not scans_csv.is_file():
|
||||
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||
|
||||
if args.machine:
|
||||
machines = [args.machine.strip()]
|
||||
else:
|
||||
if not args.machines_file.is_file():
|
||||
sys.exit(f"Machines file not found: {args.machines_file}")
|
||||
machines = read_machine_labels(args.machines_file)
|
||||
if not machines:
|
||||
sys.exit(f"No machine labels in {args.machines_file}")
|
||||
|
||||
t_load0 = time.perf_counter()
|
||||
by_machine = bmm.load_on_disk_rows_by_machine(scans_csv)
|
||||
load_s = time.perf_counter() - t_load0
|
||||
|
||||
max_height: int | None = args.max_height
|
||||
metadata_overlay = not args.no_metadata_overlay
|
||||
|
||||
jobs = collect_jobs(
|
||||
machines=machines,
|
||||
by_machine=by_machine,
|
||||
archive=archive,
|
||||
top_rois=args.top_rois,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
)
|
||||
total = len(jobs)
|
||||
if total == 0:
|
||||
sys.exit("No jobs (no on-disk mosaics for selected machines).")
|
||||
|
||||
print(f"Loaded scans.csv grouped by machine in {load_s:.2f}s ({total} job(s))")
|
||||
if max_height is not None:
|
||||
print(f"Max height: {max_height}px")
|
||||
else:
|
||||
print("Max height: (full resolution)")
|
||||
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
|
||||
print()
|
||||
|
||||
summary_rows: list[tuple[str, ...]] = []
|
||||
|
||||
for idx, job in enumerate(jobs, start=1):
|
||||
sx, sy, ex, ey = job.roi
|
||||
roi_s = f"{sx},{sy}..{ex},{ey}"
|
||||
print(
|
||||
f"[{idx}/{total}] {job.machine} rank={job.rank} ROI {roi_s} "
|
||||
f"({job.extent_count} CSV rows this extent, {len(job.rows)} deduped rows)"
|
||||
)
|
||||
|
||||
if args.skip_existing and job.output_path.is_file() and job.output_path.stat().st_size > 0:
|
||||
sz = job.output_path.stat().st_size / (1024 * 1024)
|
||||
print(f" SKIP (exists): {job.output_path}")
|
||||
summary_rows.append(
|
||||
(
|
||||
job.machine,
|
||||
str(job.rank),
|
||||
roi_s,
|
||||
str(len(job.rows)),
|
||||
"-",
|
||||
"-",
|
||||
str(job.output_path),
|
||||
"SKIP (exists)",
|
||||
f"{sz:.2f}",
|
||||
"-",
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
enc_t0 = time.perf_counter()
|
||||
res = bmm.encode_movie(
|
||||
machine=job.machine,
|
||||
roi=job.roi,
|
||||
rows=job.rows,
|
||||
archive=archive,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
fps=float(args.fps),
|
||||
output=None,
|
||||
dry_run=args.dry_run,
|
||||
rank=job.rank,
|
||||
quiet=True,
|
||||
)
|
||||
enc_elapsed = time.perf_counter() - enc_t0
|
||||
|
||||
if res.success:
|
||||
if args.dry_run:
|
||||
print(f" dry-run OK -> {res.output_path} (missing files: {res.missing})")
|
||||
else:
|
||||
print(
|
||||
f" OK {res.written} frames "
|
||||
f"{(res.size_mb or 0):.2f} MB {enc_elapsed:.1f}s"
|
||||
)
|
||||
else:
|
||||
print(f" FAIL {res.skipped_reason or 'unknown'}")
|
||||
|
||||
status = "OK" if res.success else "FAIL"
|
||||
if not res.success and res.skipped_reason:
|
||||
status = f"FAIL: {res.skipped_reason[:40]}"
|
||||
mb = f"{res.size_mb:.2f}" if res.size_mb is not None else "-"
|
||||
es = f"{enc_elapsed:.1f}" if not args.dry_run else "-"
|
||||
w = str(res.written) if not args.dry_run else "0"
|
||||
if args.dry_run:
|
||||
status = "dry-run"
|
||||
mb = "-"
|
||||
|
||||
summary_rows.append(
|
||||
(
|
||||
job.machine,
|
||||
str(job.rank),
|
||||
roi_s,
|
||||
str(len(job.rows)),
|
||||
w,
|
||||
str(res.missing),
|
||||
str(res.output_path or job.output_path),
|
||||
status,
|
||||
mb,
|
||||
es,
|
||||
)
|
||||
)
|
||||
print()
|
||||
|
||||
print("=" * 120)
|
||||
hdr = (
|
||||
f"{'machine':<26} {'rk':>2} {'ROI (mm)':<36} {'csv':>4} {'out':>5} "
|
||||
f"{'miss':>4} {'MB':>7} {'s':>6} {'status':<22}"
|
||||
)
|
||||
print(hdr)
|
||||
print("-" * 120)
|
||||
for row in summary_rows:
|
||||
m, rk, roi_s, csv_n, wrt, miss, path, status, mb, es = row
|
||||
print(
|
||||
f"{m:<26} {rk:>2} {roi_s:<36} {csv_n:>4} {wrt:>5} {miss:>4} "
|
||||
f"{mb:>7} {es:>6} {status:<22}"
|
||||
)
|
||||
print(f" -> {path}")
|
||||
print("=" * 120)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -2,11 +2,13 @@
|
||||
"""
|
||||
Report mosaic download progress from archives/scans.csv.
|
||||
|
||||
Output is formatted as Markdown. Add --by-year for a per-machine ×
|
||||
per-year breakdown table.
|
||||
Output is Markdown. Use ``--by-year`` for a per-machine × per-year
|
||||
done/failed table. When the first mosaic pass is complete (no pending rows)
|
||||
but failures remain, a **Mosaic retry estimates** section is printed with
|
||||
queue counts and duration hints.
|
||||
|
||||
Rate/ETA require two calls at least 60 s apart. Mean mosaic size is
|
||||
sampled from up to 100 already-downloaded files and cached for 1 hour.
|
||||
Rate/ETA use a 30-minute rolling window when snapshots show progress.
|
||||
Mean mosaic size is sampled from up to 100 downloads (1-hour cache).
|
||||
|
||||
Usage:
|
||||
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
|
||||
@@ -30,6 +32,11 @@ _R_PRE19 = 1.00
|
||||
_R_PURGED = 0.00
|
||||
_R_RECENT = 0.82
|
||||
|
||||
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
|
||||
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
|
||||
RETRY_REALISTIC_RATE_PER_HR = 1100.0
|
||||
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
@@ -127,6 +134,12 @@ def _expected_remaining(pending_rows: list[dict]) -> float:
|
||||
return count
|
||||
|
||||
|
||||
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
|
||||
if n_scans <= 0 or rate_per_hr <= 0:
|
||||
return "—"
|
||||
return _fmt_duration(n_scans / rate_per_hr * 3600.0)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -213,15 +226,32 @@ def main() -> None:
|
||||
|
||||
rate_per_sec: float | None = None
|
||||
rate_window_str = ""
|
||||
snap_delta_proc = 0
|
||||
if recent:
|
||||
oldest = recent[0]
|
||||
dt = now.timestamp() - oldest["ts"]
|
||||
dp = processed - oldest["proc"]
|
||||
snap_delta_proc = dp
|
||||
if dt >= 60 and dp > 0:
|
||||
rate_per_sec = dp / dt
|
||||
window_min = dt / 60
|
||||
rate_window_str = f"{window_min:.0f}-min avg"
|
||||
|
||||
# One-time baseline after initial mosaic crawl finished (no pending rows).
|
||||
if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
|
||||
cache["first_pass_completed_at"] = now.isoformat()
|
||||
cache["first_pass_processed"] = total
|
||||
cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR
|
||||
|
||||
first_pass_rate_hr = float(
|
||||
cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
|
||||
)
|
||||
live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
|
||||
# Active scrape shows progress in snapshots; idle archive shows dp == 0.
|
||||
retry_estimate_rate_hr = (
|
||||
live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
|
||||
)
|
||||
|
||||
# --- Disk space ---
|
||||
mean_bytes: float | None = None
|
||||
size_note = ""
|
||||
@@ -325,6 +355,76 @@ def main() -> None:
|
||||
align=["l", "r", "r", "r", "r", "r"],
|
||||
))
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Retry estimates (first pass complete: pending == 0, failures remain)
|
||||
# -----------------------------------------------------------------------
|
||||
if failed > 0 and pending == 0:
|
||||
failed_rows_list = [
|
||||
r for r in latest.values()
|
||||
if r.get("mosaic_download_status") == "failed"
|
||||
]
|
||||
n_all = len(failed_rows_list)
|
||||
n_2023 = sum(
|
||||
1 for r in failed_rows_list
|
||||
if (r.get("scan_time") or "")[:4] >= "2023"
|
||||
and len((r.get("scan_time") or "")[:4]) == 4
|
||||
)
|
||||
n_200 = sum(
|
||||
1 for r in failed_rows_list
|
||||
if r.get("mosaic_error_code") == "200"
|
||||
)
|
||||
rate_note = (
|
||||
"rolling 30-min window"
|
||||
if snap_delta_proc > 0
|
||||
else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
|
||||
)
|
||||
print()
|
||||
print("### Mosaic retry estimates\n")
|
||||
print(
|
||||
f"*Suggested command after server fix:* "
|
||||
f"`python scraper.py --retry-failed --workers 2` "
|
||||
f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n"
|
||||
)
|
||||
print(
|
||||
f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
|
||||
f"({rate_note}). Fixed columns use scenario rates.*\n"
|
||||
)
|
||||
est_hdr = (
|
||||
"Retry scope",
|
||||
"Count",
|
||||
f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr",
|
||||
f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr",
|
||||
f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr",
|
||||
f"@{retry_estimate_rate_hr:.0f}/hr",
|
||||
)
|
||||
retry_tbl_rows = [
|
||||
[
|
||||
"HTTP 200 (empty body)",
|
||||
f"{n_200:,}",
|
||||
_retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_200, retry_estimate_rate_hr),
|
||||
],
|
||||
[
|
||||
"Failed, scan_time ≥ 2023",
|
||||
f"{n_2023:,}",
|
||||
_retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_2023, retry_estimate_rate_hr),
|
||||
],
|
||||
[
|
||||
"**All failed**",
|
||||
f"**{n_all:,}**",
|
||||
_retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_all, retry_estimate_rate_hr),
|
||||
],
|
||||
]
|
||||
print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# --by-year table
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
+47
-1
@@ -84,6 +84,33 @@ def parse_args() -> argparse.Namespace:
|
||||
"inventorying all scans across all machines."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--retry-failed",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Mosaic-only: re-attempt scans whose latest scans.csv row has "
|
||||
"mosaic_download_status=failed (queue from CSV, not the server list). "
|
||||
"Implies --mosaic-only."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--retry-since",
|
||||
metavar="YEAR",
|
||||
default=None,
|
||||
help=(
|
||||
"With --retry-failed: only scans with scan_time year >= YEAR "
|
||||
"(e.g. 2023)."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--retry-error-code",
|
||||
metavar="CODE",
|
||||
default=None,
|
||||
help=(
|
||||
"With --retry-failed: filter by mosaic_error_code "
|
||||
"(e.g. 200 for empty-body failures)."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
@@ -159,6 +186,16 @@ def main() -> None:
|
||||
if args.scan_id is not None and args.scan_id <= 0:
|
||||
sys.exit("--scan-id must be a positive integer")
|
||||
|
||||
if args.retry_since and not args.retry_failed:
|
||||
sys.exit("--retry-since requires --retry-failed.")
|
||||
if args.retry_error_code and not args.retry_failed:
|
||||
sys.exit("--retry-error-code requires --retry-failed.")
|
||||
|
||||
if args.retry_failed:
|
||||
if args.metadata_only:
|
||||
sys.exit("--retry-failed cannot be used with --metadata-only.")
|
||||
args.mosaic_only = True # implied
|
||||
|
||||
# --list-machines doesn't need credentials
|
||||
if args.list_machines:
|
||||
base_url = "http://205.149.147.131:8010/"
|
||||
@@ -261,7 +298,13 @@ def main() -> None:
|
||||
if args.metadata_only:
|
||||
log.info("Mode: metadata only (mosaics and tiles skipped)")
|
||||
elif args.mosaic_only:
|
||||
log.info("Mode: mosaics only (individual tiles skipped)")
|
||||
if args.retry_failed:
|
||||
log.info(
|
||||
"Mode: mosaic retry (failed scans from %s)",
|
||||
SCANS_CSV_FILENAME,
|
||||
)
|
||||
else:
|
||||
log.info("Mode: mosaics only (individual tiles skipped)")
|
||||
if args.dry_run:
|
||||
log.info("Mode: dry-run (no files will be written)")
|
||||
|
||||
@@ -285,6 +328,9 @@ def main() -> None:
|
||||
metadata_only=args.metadata_only,
|
||||
scan_id_filter=args.scan_id,
|
||||
max_tiles=args.max_tiles,
|
||||
retry_failed=args.retry_failed,
|
||||
retry_since_year=args.retry_since,
|
||||
retry_error_code=args.retry_error_code,
|
||||
)
|
||||
totals.merge(stats)
|
||||
finally:
|
||||
|
||||
+114
-22
@@ -2,14 +2,22 @@
|
||||
High-level scrape orchestration: drives the per-machine and per-scan loops.
|
||||
"""
|
||||
|
||||
import csv
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from tqdm import tqdm
|
||||
|
||||
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
|
||||
from spruce.exif import write_mosaic_exif
|
||||
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
|
||||
from spruce.progress import ProgressTracker, CsvWriter
|
||||
from spruce.session import MachineSession
|
||||
|
||||
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
|
||||
# below smallest observed real tile (~7 KiB in production samples).
|
||||
@@ -49,16 +57,64 @@ class RunStats:
|
||||
self.scans_probe_skipped += other.scans_probe_skipped
|
||||
self.scans_disk_space_skipped += other.scans_disk_space_skipped
|
||||
|
||||
from tqdm import tqdm
|
||||
|
||||
from spruce.exif import write_mosaic_exif
|
||||
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
|
||||
from spruce.progress import ProgressTracker, CsvWriter
|
||||
from spruce.session import MachineSession
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
|
||||
"""Last row wins per (machine, scan_id)."""
|
||||
latest: dict[tuple[str, str], dict[str, str]] = {}
|
||||
if not scans_csv_path.exists():
|
||||
return latest
|
||||
with open(scans_csv_path, newline="", encoding="utf-8") as fh:
|
||||
for row in csv.DictReader(fh):
|
||||
key = (row.get("machine", ""), row.get("scan_id", ""))
|
||||
latest[key] = row
|
||||
return latest
|
||||
|
||||
|
||||
def load_failed_scans_from_csv(
|
||||
scans_csv_path: Path,
|
||||
machine_label: str,
|
||||
*,
|
||||
since_year: str | None = None,
|
||||
error_code: str | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Dedupe scans.csv by (machine, scan_id); return failed mosaic rows for one machine.
|
||||
|
||||
Each dict is suitable as the ``scan`` argument to ``process_scan`` (scan_id,
|
||||
scan_time, name, status).
|
||||
"""
|
||||
latest = _read_scans_csv_latest(scans_csv_path)
|
||||
out: list[dict[str, Any]] = []
|
||||
for (_m, _sid), row in latest.items():
|
||||
if row.get("machine") != machine_label:
|
||||
continue
|
||||
if row.get("mosaic_download_status") != "failed":
|
||||
continue
|
||||
if error_code is not None and row.get("mosaic_error_code", "") != error_code:
|
||||
continue
|
||||
st = row.get("scan_time", "") or ""
|
||||
if since_year is not None:
|
||||
yr = st[:4]
|
||||
if len(yr) < 4 or yr < since_year:
|
||||
continue
|
||||
sid = int(row["scan_id"])
|
||||
out.append(
|
||||
{
|
||||
"scan_id": sid,
|
||||
"scan_time": st,
|
||||
"name": row.get("name", ""),
|
||||
"status": row.get("status", "") or "Completed",
|
||||
"user": row.get("user", ""),
|
||||
"scan_lines": row.get("scan_lines", ""),
|
||||
"scan_mode": row.get("scan_mode", ""),
|
||||
}
|
||||
)
|
||||
out.sort(key=lambda s: s["scan_id"])
|
||||
return out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-scan helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -499,6 +555,9 @@ def scrape_machine(
|
||||
metadata_only: bool = False,
|
||||
scan_id_filter: int | None = None,
|
||||
max_tiles: int | None = None,
|
||||
retry_failed: bool = False,
|
||||
retry_since_year: str | None = None,
|
||||
retry_error_code: str | None = None,
|
||||
) -> RunStats:
|
||||
"""Login, fetch scans, and download all content for one machine."""
|
||||
sess = MachineSession(machine, config)
|
||||
@@ -518,8 +577,37 @@ def scrape_machine(
|
||||
log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
|
||||
return RunStats()
|
||||
|
||||
if scan_id_filter is not None:
|
||||
scans: list[dict[str, Any]] = [
|
||||
if retry_failed:
|
||||
scans = load_failed_scans_from_csv(
|
||||
scans_csv.path,
|
||||
machine["label"],
|
||||
since_year=retry_since_year,
|
||||
error_code=retry_error_code,
|
||||
)
|
||||
if scan_id_filter is not None:
|
||||
scans = [s for s in scans if s["scan_id"] == scan_id_filter]
|
||||
if not scans:
|
||||
log.warning(
|
||||
"[%s] Retry: scan_id %d not among failed rows for this machine.",
|
||||
machine["label"],
|
||||
scan_id_filter,
|
||||
)
|
||||
return RunStats()
|
||||
log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter)
|
||||
elif not scans:
|
||||
log.warning(
|
||||
"[%s] No failed mosaic rows in scans.csv match retry filters.",
|
||||
machine["label"],
|
||||
)
|
||||
return RunStats()
|
||||
else:
|
||||
log.info(
|
||||
"[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
|
||||
machine["label"],
|
||||
len(scans),
|
||||
)
|
||||
elif scan_id_filter is not None:
|
||||
scans = [
|
||||
{"scan_id": scan_id_filter, "status": "Completed"}
|
||||
]
|
||||
log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
|
||||
@@ -529,21 +617,25 @@ def scrape_machine(
|
||||
log.warning("[%s] No scans found.", machine["label"])
|
||||
return RunStats()
|
||||
|
||||
# Build a set of scan_ids already fully processed in a prior run so we can
|
||||
# skip them entirely (no metadata fetch, no mosaic request).
|
||||
# Only scans with a definitive non-pending status count; skipped_metadata_only
|
||||
# rows still need to be processed in mosaic mode.
|
||||
# Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP).
|
||||
# In normal mode: skip anything with a definitive non-pending status.
|
||||
# In retry mode: only skip scans that are already downloaded or skipped for
|
||||
# disk-space reasons — failed scans must be re-attempted.
|
||||
PENDING_STATUSES = {"skipped_metadata_only", ""}
|
||||
BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"}
|
||||
existing_ids: set[int] = set()
|
||||
if not metadata_only and scans_csv._fh.name:
|
||||
existing_path = Path(scans_csv._fh.name)
|
||||
if existing_path.exists():
|
||||
import csv as _csv
|
||||
with open(existing_path, newline="", encoding="utf-8") as _f:
|
||||
for _row in _csv.DictReader(_f):
|
||||
if _row.get("machine") == machine["label"]:
|
||||
if _row.get("mosaic_download_status", "") not in PENDING_STATUSES:
|
||||
existing_ids.add(int(_row["scan_id"]))
|
||||
if not metadata_only:
|
||||
latest_rows = _read_scans_csv_latest(scans_csv.path)
|
||||
for (_mlabel, _sid), _row in latest_rows.items():
|
||||
if _mlabel != machine["label"]:
|
||||
continue
|
||||
st = _row.get("mosaic_download_status", "")
|
||||
if retry_failed:
|
||||
if st in BLOCK_AFTER_RETRY_STATUSES:
|
||||
existing_ids.add(int(_row["scan_id"]))
|
||||
else:
|
||||
if st not in PENDING_STATUSES:
|
||||
existing_ids.add(int(_row["scan_id"]))
|
||||
|
||||
stats = RunStats()
|
||||
for scan in scans:
|
||||
|
||||
@@ -77,6 +77,7 @@ class CsvWriter:
|
||||
def __init__(self, path: Path, fields: list[str]) -> None:
|
||||
is_new = not path.exists()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self.path = path
|
||||
self._fh = open(path, "a", newline="", encoding="utf-8")
|
||||
self._writer = csv.DictWriter(self._fh, fieldnames=fields)
|
||||
if is_new:
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
"""Retry queue loading from scans.csv (mosaic_download_status=failed)."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from spruce.orchestrator import load_failed_scans_from_csv
|
||||
from spruce.settings import SCANS_CSV_FIELDS
|
||||
|
||||
|
||||
def _blank_row(**kwargs: str) -> dict[str, str]:
|
||||
row = {k: "" for k in SCANS_CSV_FIELDS}
|
||||
row.update(kwargs)
|
||||
return row
|
||||
|
||||
|
||||
def _write_scans_csv(path: Path, rows: list[dict[str, str]]) -> None:
|
||||
with open(path, "w", newline="", encoding="utf-8") as fh:
|
||||
w = csv.DictWriter(fh, fieldnames=SCANS_CSV_FIELDS)
|
||||
w.writeheader()
|
||||
for r in rows:
|
||||
w.writerow({k: r.get(k, "") for k in SCANS_CSV_FIELDS})
|
||||
|
||||
|
||||
def test_load_failed_scans_dedup_keeps_last_row(tmp_path: Path) -> None:
|
||||
path = tmp_path / "scans.csv"
|
||||
common = {
|
||||
"machine": "BW1 [X]",
|
||||
"machine_id": "1",
|
||||
"scan_id": "100",
|
||||
"mosaic_url": "http://x/m.jpg",
|
||||
"mosaic_local_path": "",
|
||||
"mosaic_on_disk": "False",
|
||||
}
|
||||
_write_scans_csv(
|
||||
path,
|
||||
[
|
||||
_blank_row(
|
||||
**common,
|
||||
mosaic_download_status="failed",
|
||||
mosaic_error_code="404",
|
||||
scan_time="2020-01-01",
|
||||
),
|
||||
_blank_row(
|
||||
**common,
|
||||
mosaic_download_status="failed",
|
||||
mosaic_error_code="404",
|
||||
scan_time="2020-06-01",
|
||||
),
|
||||
],
|
||||
)
|
||||
out = load_failed_scans_from_csv(path, "BW1 [X]")
|
||||
assert len(out) == 1
|
||||
assert out[0]["scan_id"] == 100
|
||||
assert out[0]["scan_time"] == "2020-06-01"
|
||||
|
||||
|
||||
def test_load_failed_scans_since_year(tmp_path: Path) -> None:
|
||||
path = tmp_path / "scans.csv"
|
||||
base = {
|
||||
"machine": "M",
|
||||
"machine_id": "1",
|
||||
"mosaic_url": "",
|
||||
"mosaic_local_path": "",
|
||||
"mosaic_on_disk": "",
|
||||
"mosaic_download_status": "failed",
|
||||
"mosaic_error_code": "404",
|
||||
}
|
||||
_write_scans_csv(
|
||||
path,
|
||||
[
|
||||
_blank_row(**base, scan_id="1", scan_time="2022-12-31"),
|
||||
_blank_row(**base, scan_id="2", scan_time="2023-01-01"),
|
||||
_blank_row(**base, scan_id="3", scan_time=""),
|
||||
],
|
||||
)
|
||||
out = load_failed_scans_from_csv(path, "M", since_year="2023")
|
||||
ids = {s["scan_id"] for s in out}
|
||||
assert ids == {2}
|
||||
|
||||
|
||||
def test_load_failed_scans_error_code(tmp_path: Path) -> None:
|
||||
path = tmp_path / "scans.csv"
|
||||
base = {
|
||||
"machine": "M",
|
||||
"machine_id": "1",
|
||||
"scan_time": "2024-01-01",
|
||||
"mosaic_url": "",
|
||||
"mosaic_local_path": "",
|
||||
"mosaic_on_disk": "",
|
||||
"mosaic_download_status": "failed",
|
||||
}
|
||||
_write_scans_csv(
|
||||
path,
|
||||
[
|
||||
_blank_row(**base, scan_id="10", mosaic_error_code="404"),
|
||||
_blank_row(**base, scan_id="11", mosaic_error_code="200"),
|
||||
],
|
||||
)
|
||||
out = load_failed_scans_from_csv(path, "M", error_code="200")
|
||||
assert [s["scan_id"] for s in out] == [11]
|
||||
|
||||
|
||||
def test_load_failed_scans_excludes_downloaded(tmp_path: Path) -> None:
|
||||
path = tmp_path / "scans.csv"
|
||||
base = {
|
||||
"machine": "M",
|
||||
"machine_id": "1",
|
||||
"scan_time": "2024-01-01",
|
||||
"mosaic_url": "",
|
||||
"mosaic_local_path": "",
|
||||
"mosaic_on_disk": "True",
|
||||
}
|
||||
_write_scans_csv(
|
||||
path,
|
||||
[
|
||||
_blank_row(
|
||||
**base,
|
||||
scan_id="5",
|
||||
mosaic_download_status="downloaded",
|
||||
mosaic_error_code="",
|
||||
),
|
||||
_blank_row(
|
||||
**base,
|
||||
scan_id="6",
|
||||
mosaic_download_status="failed",
|
||||
mosaic_error_code="404",
|
||||
),
|
||||
],
|
||||
)
|
||||
out = load_failed_scans_from_csv(path, "M")
|
||||
assert [s["scan_id"] for s in out] == [6]
|
||||
Reference in New Issue
Block a user