Compare commits
2 Commits
6390f5d529
...
655c376a97
| Author | SHA1 | Date | |
|---|---|---|---|
| 655c376a97 | |||
| 1ef9e0206c |
@@ -6,3 +6,5 @@ tqdm>=4.66.0
|
|||||||
piexif>=1.1.3
|
piexif>=1.1.3
|
||||||
Pillow>=10.0.0
|
Pillow>=10.0.0
|
||||||
pytest>=8.0
|
pytest>=8.0
|
||||||
|
imageio>=2.34
|
||||||
|
imageio-ffmpeg>=0.5
|
||||||
|
|||||||
@@ -0,0 +1,710 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Build a chronological MP4 from downloaded mosaic.jpg files for one machine ROI.
|
||||||
|
|
||||||
|
Reads archives/scans.csv, filters by machine and mosaic_on_disk, optionally
|
||||||
|
restricts to one (start_x, start_y, end_x, end_y) ROI, dedupes by scan_id
|
||||||
|
(last row wins), sorts by scan_time, and encodes frames with imageio/ffmpeg.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]"
|
||||||
|
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
|
||||||
|
--roi "195.65,219.22,219.73,235.04" --fps 8 --output /tmp/out.mp4
|
||||||
|
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" --dry-run
|
||||||
|
# Lighter preview (caps tall full-tube mosaics by height — easier on players):
|
||||||
|
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
|
||||||
|
--roi "0.0,0.0,310.0,740.0" --preview
|
||||||
|
# Metadata is drawn on each frame by default (semi-transparent bar at the top);
|
||||||
|
# use --no-metadata-overlay to disable.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import csv
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from collections import Counter, defaultdict
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import imageio
|
||||||
|
import numpy as np
|
||||||
|
from PIL import Image, ImageDraw, ImageFont
|
||||||
|
|
||||||
|
|
||||||
|
class MovieEncodeError(Exception):
|
||||||
|
"""Raised from encoding helpers; caught by encode_movie for batch-safe handling."""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EncodedMovieResult:
|
||||||
|
success: bool
|
||||||
|
machine: str
|
||||||
|
roi: tuple[float, float, float, float]
|
||||||
|
csv_frame_count: int
|
||||||
|
written: int
|
||||||
|
missing: int
|
||||||
|
dropped_read: int
|
||||||
|
output_path: Path | None
|
||||||
|
skipped_reason: str | None
|
||||||
|
size_mb: float | None
|
||||||
|
elapsed_s: float | None
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_machine_label(label: str) -> str:
|
||||||
|
return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_roi(s: str) -> tuple[float, float, float, float]:
|
||||||
|
parts = [p.strip() for p in s.split(",")]
|
||||||
|
if len(parts) != 4:
|
||||||
|
sys.exit("--roi must be four comma-separated numbers: start_x,start_y,end_x,end_y")
|
||||||
|
try:
|
||||||
|
return tuple(float(p) for p in parts) # type: ignore[return-value]
|
||||||
|
except ValueError as e:
|
||||||
|
sys.exit(f"Invalid --roi numbers: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def extent_close(
|
||||||
|
row: dict,
|
||||||
|
roi: tuple[float, float, float, float],
|
||||||
|
*,
|
||||||
|
tol: float = 1e-4,
|
||||||
|
) -> bool:
|
||||||
|
keys = ("start_x", "start_y", "end_x", "end_y")
|
||||||
|
try:
|
||||||
|
vals = tuple(float(row[k]) for k in keys)
|
||||||
|
except (KeyError, ValueError):
|
||||||
|
return False
|
||||||
|
return all(abs(a - b) < tol for a, b in zip(vals, roi))
|
||||||
|
|
||||||
|
|
||||||
|
def extent_key(row: dict) -> tuple[str, str, str, str]:
|
||||||
|
"""Stable grouping key from CSV string fields."""
|
||||||
|
return (
|
||||||
|
row.get("start_x", "").strip(),
|
||||||
|
row.get("start_y", "").strip(),
|
||||||
|
row.get("end_x", "").strip(),
|
||||||
|
row.get("end_y", "").strip(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def key_to_roi_floats(key: tuple[str, str, str, str]) -> tuple[float, float, float, float]:
|
||||||
|
return tuple(float(x) for x in key) # type: ignore[return-value]
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args() -> argparse.Namespace:
|
||||||
|
p = argparse.ArgumentParser(description=__doc__)
|
||||||
|
p.add_argument("--machine", required=True, help='RootView machine label, e.g. "BW1-4 [AMR-15]"')
|
||||||
|
p.add_argument(
|
||||||
|
"--roi",
|
||||||
|
metavar="SX,SY,EX,EY",
|
||||||
|
help="Restrict to this extent (mm). If omitted, pick the ROI with the most on-disk mosaics.",
|
||||||
|
)
|
||||||
|
p.add_argument("--archive", default="archives", type=Path, help="Archive root (default: archives)")
|
||||||
|
p.add_argument(
|
||||||
|
"--scans-csv",
|
||||||
|
default=None,
|
||||||
|
type=Path,
|
||||||
|
help="Path to scans.csv (default: <archive>/scans.csv)",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--output",
|
||||||
|
"-o",
|
||||||
|
type=Path,
|
||||||
|
default=None,
|
||||||
|
help="Output .mp4 path (default: <archive>/movies/<machine>/roi_<...>.mp4)",
|
||||||
|
)
|
||||||
|
p.add_argument("--fps", type=float, default=10.0, help="Frames per second (default: 10)")
|
||||||
|
p.add_argument(
|
||||||
|
"--max-height",
|
||||||
|
type=int,
|
||||||
|
default=None,
|
||||||
|
metavar="PX",
|
||||||
|
help="Scale each frame so height is at most PX pixels (width keeps aspect); "
|
||||||
|
"suited to tall full-tube mosaics. Both dimensions are rounded to even pixels for H.264.",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--preview",
|
||||||
|
action="store_true",
|
||||||
|
help="Shorthand for --max-height 1080 (overridden if --max-height is also set).",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--dry-run",
|
||||||
|
action="store_true",
|
||||||
|
help="List frames that would be written (no MP4)",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--no-metadata-overlay",
|
||||||
|
action="store_true",
|
||||||
|
help="Do not draw scan metadata on each frame (default: overlay on).",
|
||||||
|
)
|
||||||
|
args = p.parse_args()
|
||||||
|
if args.preview and args.max_height is None:
|
||||||
|
args.max_height = 1080
|
||||||
|
return args
|
||||||
|
|
||||||
|
|
||||||
|
def _csv_required_fieldnames() -> tuple[str, ...]:
|
||||||
|
return (
|
||||||
|
"machine",
|
||||||
|
"scan_id",
|
||||||
|
"scan_time",
|
||||||
|
"mosaic_on_disk",
|
||||||
|
"mosaic_local_path",
|
||||||
|
"start_x",
|
||||||
|
"start_y",
|
||||||
|
"end_x",
|
||||||
|
"end_y",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_scans_csv_header(reader: csv.DictReader, scans_csv: Path) -> None:
|
||||||
|
if reader.fieldnames is None:
|
||||||
|
sys.exit(f"Empty CSV: {scans_csv}")
|
||||||
|
required = _csv_required_fieldnames()
|
||||||
|
missing = [c for c in required if c not in reader.fieldnames]
|
||||||
|
if missing:
|
||||||
|
sys.exit(f"{scans_csv} missing columns: {missing}")
|
||||||
|
|
||||||
|
|
||||||
|
def load_latest_rows(
|
||||||
|
scans_csv: Path,
|
||||||
|
machine: str,
|
||||||
|
roi: tuple[float, float, float, float] | None,
|
||||||
|
) -> list[dict]:
|
||||||
|
"""Last row per scan_id for matching machine; mosaic_on_disk True; optional ROI."""
|
||||||
|
latest: dict[str, dict] = {}
|
||||||
|
with scans_csv.open(newline="", encoding="utf-8") as fh:
|
||||||
|
reader = csv.DictReader(fh)
|
||||||
|
validate_scans_csv_header(reader, scans_csv)
|
||||||
|
|
||||||
|
for row in reader:
|
||||||
|
if row.get("machine", "") != machine:
|
||||||
|
continue
|
||||||
|
if row.get("mosaic_on_disk", "").strip() != "True":
|
||||||
|
continue
|
||||||
|
if roi is not None and not extent_close(row, roi):
|
||||||
|
continue
|
||||||
|
sid = row.get("scan_id", "").strip()
|
||||||
|
if not sid:
|
||||||
|
continue
|
||||||
|
latest[sid] = row
|
||||||
|
|
||||||
|
return list(latest.values())
|
||||||
|
|
||||||
|
|
||||||
|
def load_on_disk_rows_by_machine(scans_csv: Path) -> dict[str, list[dict]]:
|
||||||
|
"""One pass: last row per (machine, scan_id) where mosaic_on_disk True; group by machine."""
|
||||||
|
latest: dict[tuple[str, str], dict] = {}
|
||||||
|
with scans_csv.open(newline="", encoding="utf-8") as fh:
|
||||||
|
reader = csv.DictReader(fh)
|
||||||
|
validate_scans_csv_header(reader, scans_csv)
|
||||||
|
for row in reader:
|
||||||
|
if row.get("mosaic_on_disk", "").strip() != "True":
|
||||||
|
continue
|
||||||
|
sid = row.get("scan_id", "").strip()
|
||||||
|
m = row.get("machine", "").strip()
|
||||||
|
if not sid or not m:
|
||||||
|
continue
|
||||||
|
latest[(m, sid)] = row
|
||||||
|
|
||||||
|
by_machine: dict[str, list[dict]] = defaultdict(list)
|
||||||
|
for (m, _sid), r in latest.items():
|
||||||
|
by_machine[m].append(r)
|
||||||
|
return {k: v for k, v in by_machine.items()}
|
||||||
|
|
||||||
|
|
||||||
|
def pick_top_rois(rows: list[dict], n: int) -> list[tuple[tuple[float, float, float, float], int]]:
|
||||||
|
"""Top n distinct extents by count of deduped rows. Empty if rows empty or n < 1."""
|
||||||
|
if not rows or n < 1:
|
||||||
|
return []
|
||||||
|
counts = Counter(extent_key(r) for r in rows)
|
||||||
|
return [(key_to_roi_floats(key), cnt) for key, cnt in counts.most_common(n)]
|
||||||
|
|
||||||
|
|
||||||
|
def pick_top_roi(rows: list[dict]) -> tuple[float, float, float, float]:
|
||||||
|
if not rows:
|
||||||
|
sys.exit("No rows with mosaic_on_disk=True for this machine (and ROI filter, if any).")
|
||||||
|
return pick_top_rois(rows, 1)[0][0]
|
||||||
|
|
||||||
|
|
||||||
|
def default_output_path(
|
||||||
|
archive: Path,
|
||||||
|
machine: str,
|
||||||
|
roi: tuple[float, float, float, float],
|
||||||
|
*,
|
||||||
|
max_height: int | None,
|
||||||
|
metadata_overlay: bool,
|
||||||
|
rank: int | None = None,
|
||||||
|
) -> Path:
|
||||||
|
safe = sanitize_machine_label(machine)
|
||||||
|
sx, sy, ex, ey = roi
|
||||||
|
base = f"roi_{sx}_{sy}_{ex}_{ey}".replace(" ", "")
|
||||||
|
if rank is not None:
|
||||||
|
base = f"{base}_r{rank}"
|
||||||
|
if max_height is not None:
|
||||||
|
base = f"{base}_h{max_height}"
|
||||||
|
if metadata_overlay:
|
||||||
|
base = f"{base}_meta"
|
||||||
|
name = f"{base}.mp4"
|
||||||
|
return archive / "movies" / safe / name
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_mosaic_path(rel: str, archive: Path) -> Path:
|
||||||
|
"""CSV paths are usually repo-relative, e.g. archives/BW1-4__AMR-15/.../mosaic.jpg."""
|
||||||
|
p = Path(rel)
|
||||||
|
if p.is_absolute():
|
||||||
|
return p.resolve()
|
||||||
|
ar = archive.resolve()
|
||||||
|
norm = rel.replace("\\", "/")
|
||||||
|
if norm.startswith("archives/") or norm.startswith("./archives/"):
|
||||||
|
return (ar.parent / rel).resolve()
|
||||||
|
return (ar / rel).resolve()
|
||||||
|
|
||||||
|
|
||||||
|
def even_dimensions(w: int, h: int) -> tuple[int, int]:
|
||||||
|
"""libx264 requires even width and height."""
|
||||||
|
w2 = w - (w % 2)
|
||||||
|
h2 = h - (h % 2)
|
||||||
|
if w2 < 2 or h2 < 2:
|
||||||
|
raise MovieEncodeError(f"Frame dimensions too small after evenizing: {w}x{h}")
|
||||||
|
return w2, h2
|
||||||
|
|
||||||
|
|
||||||
|
def frame_size_mode(paths: list[Path]) -> tuple[int, int]:
|
||||||
|
sizes: list[tuple[int, int]] = []
|
||||||
|
for p in paths:
|
||||||
|
try:
|
||||||
|
with Image.open(p) as im:
|
||||||
|
sizes.append(im.size)
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
if not sizes:
|
||||||
|
raise MovieEncodeError("No readable mosaic images to determine frame size.")
|
||||||
|
w, h = Counter(sizes).most_common(1)[0][0]
|
||||||
|
return even_dimensions(w, h)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_size(native_w: int, native_h: int, max_height: int | None) -> tuple[int, int]:
|
||||||
|
"""Native size is already even; optional downscale for preview encodes (cap height)."""
|
||||||
|
if max_height is None:
|
||||||
|
return native_w, native_h
|
||||||
|
if max_height < 32:
|
||||||
|
raise MovieEncodeError("--max-height must be at least 32")
|
||||||
|
cap = max_height - (max_height % 2)
|
||||||
|
if cap < 2:
|
||||||
|
raise MovieEncodeError("--max-height must allow an even height of at least 2")
|
||||||
|
if native_h <= cap:
|
||||||
|
return native_w, native_h
|
||||||
|
new_h = cap
|
||||||
|
new_w = int(round(native_w * (new_h / native_h)))
|
||||||
|
new_w -= new_w % 2
|
||||||
|
if new_w < 2:
|
||||||
|
raise MovieEncodeError("Computed preview width too small; try a larger --max-height")
|
||||||
|
return new_w, new_h
|
||||||
|
|
||||||
|
|
||||||
|
def _truetype_font_candidates() -> list[Path]:
|
||||||
|
windir = os.environ.get("WINDIR", r"C:\Windows")
|
||||||
|
return [
|
||||||
|
Path(windir) / "Fonts" / "arial.ttf",
|
||||||
|
Path(windir) / "Fonts" / "consola.ttf",
|
||||||
|
Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"),
|
||||||
|
Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def get_overlay_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
|
||||||
|
for path in _truetype_font_candidates():
|
||||||
|
if path.is_file():
|
||||||
|
try:
|
||||||
|
return ImageFont.truetype(str(path), size=size)
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
return ImageFont.load_default()
|
||||||
|
|
||||||
|
|
||||||
|
def _truncate(s: str, max_len: int) -> str:
|
||||||
|
s = s.strip()
|
||||||
|
if len(s) <= max_len:
|
||||||
|
return s
|
||||||
|
if max_len <= 3:
|
||||||
|
return s[:max_len]
|
||||||
|
return s[: max_len - 3] + "..."
|
||||||
|
|
||||||
|
|
||||||
|
def metadata_overlay_lines(row: dict, *, max_name_chars: int) -> list[str]:
|
||||||
|
sid = row.get("scan_id", "").strip()
|
||||||
|
st = row.get("scan_time", "").strip()
|
||||||
|
name = _truncate(row.get("name", "").strip(), max_name_chars)
|
||||||
|
nx = row.get("nx", "").strip()
|
||||||
|
ny = row.get("ny", "").strip()
|
||||||
|
dx = row.get("dx", "").strip()
|
||||||
|
dy = row.get("dy", "").strip()
|
||||||
|
lines = row.get("scan_lines", "").strip()
|
||||||
|
mode = row.get("scan_mode", "").strip()
|
||||||
|
user = row.get("user", "").strip()
|
||||||
|
status = row.get("status", "").strip()
|
||||||
|
sx = row.get("start_x", "").strip()
|
||||||
|
sy = row.get("start_y", "").strip()
|
||||||
|
ex = row.get("end_x", "").strip()
|
||||||
|
ey = row.get("end_y", "").strip()
|
||||||
|
machine = row.get("machine", "").strip()
|
||||||
|
|
||||||
|
grid = f"{nx}x{ny}" if nx and ny else ""
|
||||||
|
step = f"{dx}x{dy} mm" if dx and dy else ""
|
||||||
|
geom = " ".join(p for p in (grid, step) if p)
|
||||||
|
orient = f"{lines} / {mode}" if lines or mode else ""
|
||||||
|
|
||||||
|
out: list[str] = []
|
||||||
|
if machine:
|
||||||
|
out.append(machine)
|
||||||
|
if sid or st:
|
||||||
|
parts = []
|
||||||
|
if sid:
|
||||||
|
parts.append(f"id {sid}")
|
||||||
|
if st:
|
||||||
|
parts.append(st)
|
||||||
|
out.append(" ".join(parts))
|
||||||
|
if name:
|
||||||
|
out.append(name)
|
||||||
|
if geom or orient:
|
||||||
|
out.append(" ".join(p for p in (geom, orient) if p))
|
||||||
|
if sx and sy and ex and ey:
|
||||||
|
out.append(f"ROI mm {sx},{sy} .. {ex},{ey}")
|
||||||
|
if user or status:
|
||||||
|
tail: list[str] = []
|
||||||
|
if user:
|
||||||
|
tail.append(f"user {user}")
|
||||||
|
if status:
|
||||||
|
tail.append(status)
|
||||||
|
out.append(" ".join(tail))
|
||||||
|
return out if out else ["(no metadata)"]
|
||||||
|
|
||||||
|
|
||||||
|
def draw_metadata_overlay(
|
||||||
|
rgb: Image.Image,
|
||||||
|
row: dict,
|
||||||
|
*,
|
||||||
|
margin: int,
|
||||||
|
) -> None:
|
||||||
|
"""Draw a semi-transparent label block along the top; mutates rgb in place."""
|
||||||
|
# Panel alpha: ~50% so roots stay visible through the bar.
|
||||||
|
panel_fill = (0, 0, 0, 128)
|
||||||
|
panel_outline = (220, 220, 230, 100)
|
||||||
|
|
||||||
|
w, h = rgb.size
|
||||||
|
margin = max(4, min(margin, w // 8))
|
||||||
|
font_size = max(10, min(22, h // 50))
|
||||||
|
pad = max(4, font_size // 3)
|
||||||
|
line_gap = max(2, font_size // 6)
|
||||||
|
|
||||||
|
def measure_block(fs: int, name_max: int) -> tuple[ImageFont.FreeTypeFont | ImageFont.ImageFont, list[str], int, int]:
|
||||||
|
font = get_overlay_font(fs)
|
||||||
|
lines = metadata_overlay_lines(row, max_name_chars=name_max)
|
||||||
|
tmp = Image.new("RGB", (1, 1))
|
||||||
|
draw = ImageDraw.Draw(tmp)
|
||||||
|
max_tw = 0
|
||||||
|
total_h = 0
|
||||||
|
for line in lines:
|
||||||
|
bbox = draw.textbbox((0, 0), line, font=font)
|
||||||
|
tw = bbox[2] - bbox[0]
|
||||||
|
th = bbox[3] - bbox[1]
|
||||||
|
max_tw = max(max_tw, tw)
|
||||||
|
total_h += th
|
||||||
|
if len(lines) > 1:
|
||||||
|
total_h += line_gap * (len(lines) - 1)
|
||||||
|
block_w = max_tw + 2 * pad
|
||||||
|
block_h = total_h + 2 * pad
|
||||||
|
return font, lines, block_w, block_h
|
||||||
|
|
||||||
|
name_max = max(24, w // max(6, font_size // 2))
|
||||||
|
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||||
|
max_block_w = w - 2 * margin
|
||||||
|
max_block_h = min(h // 2, h - 2 * margin)
|
||||||
|
while (block_w > max_block_w or block_h > max_block_h) and font_size > 9:
|
||||||
|
font_size -= 1
|
||||||
|
name_max = max(16, w // max(7, font_size // 2))
|
||||||
|
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||||
|
while block_w > max_block_w and name_max > 12:
|
||||||
|
name_max -= 4
|
||||||
|
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||||
|
|
||||||
|
bw = min(block_w, max_block_w)
|
||||||
|
bh = min(block_h, max_block_h)
|
||||||
|
x0 = margin
|
||||||
|
x1 = x0 + bw
|
||||||
|
y0 = margin
|
||||||
|
y1 = y0 + bh
|
||||||
|
|
||||||
|
overlay = Image.new("RGBA", (w, h), (0, 0, 0, 0))
|
||||||
|
draw_ov = ImageDraw.Draw(overlay)
|
||||||
|
draw_ov.rounded_rectangle(
|
||||||
|
[x0, y0, x1, y1],
|
||||||
|
radius=max(4, pad // 2),
|
||||||
|
fill=panel_fill,
|
||||||
|
outline=panel_outline,
|
||||||
|
width=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
base = rgb.convert("RGBA")
|
||||||
|
composited = Image.alpha_composite(base, overlay)
|
||||||
|
draw = ImageDraw.Draw(composited)
|
||||||
|
|
||||||
|
cx, cy = x0 + pad, y0 + pad
|
||||||
|
text_bottom_limit = y1 - pad
|
||||||
|
for line in lines:
|
||||||
|
bbox = draw.textbbox((0, 0), line, font=font)
|
||||||
|
th = bbox[3] - bbox[1]
|
||||||
|
if cy + th > text_bottom_limit:
|
||||||
|
break
|
||||||
|
draw.text(
|
||||||
|
(cx, cy),
|
||||||
|
line,
|
||||||
|
fill=(245, 245, 245, 255),
|
||||||
|
font=font,
|
||||||
|
stroke_width=1,
|
||||||
|
stroke_fill=(0, 0, 0, 255),
|
||||||
|
)
|
||||||
|
cy += th + line_gap
|
||||||
|
|
||||||
|
rgb.paste(composited.convert("RGB"))
|
||||||
|
|
||||||
|
|
||||||
|
def encode_movie(
|
||||||
|
*,
|
||||||
|
machine: str,
|
||||||
|
roi: tuple[float, float, float, float],
|
||||||
|
rows: list[dict],
|
||||||
|
archive: Path,
|
||||||
|
max_height: int | None,
|
||||||
|
metadata_overlay: bool,
|
||||||
|
fps: float,
|
||||||
|
output: Path | None = None,
|
||||||
|
dry_run: bool = False,
|
||||||
|
rank: int | None = None,
|
||||||
|
quiet: bool = False,
|
||||||
|
) -> EncodedMovieResult:
|
||||||
|
"""Build MP4 from pre-filtered rows for one ROI. Does not sys.exit on encode failures."""
|
||||||
|
rows_sorted = sorted(
|
||||||
|
rows,
|
||||||
|
key=lambda r: (r.get("scan_time") or "", r.get("scan_id") or ""),
|
||||||
|
)
|
||||||
|
csv_frame_count = len(rows_sorted)
|
||||||
|
|
||||||
|
row_path_candidates: list[tuple[dict, Path]] = []
|
||||||
|
for r in rows_sorted:
|
||||||
|
rel = (r.get("mosaic_local_path") or "").strip()
|
||||||
|
if not rel:
|
||||||
|
continue
|
||||||
|
row_path_candidates.append((r, resolve_mosaic_path(rel, archive)))
|
||||||
|
|
||||||
|
out: Path = output or default_output_path(
|
||||||
|
archive,
|
||||||
|
machine,
|
||||||
|
roi,
|
||||||
|
max_height=max_height,
|
||||||
|
metadata_overlay=metadata_overlay,
|
||||||
|
rank=rank,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
print(f"Machine: {machine}")
|
||||||
|
print(f"ROI (mm): {roi[0]}, {roi[1]}, {roi[2]}, {roi[3]}")
|
||||||
|
print(f"Frames (from CSV, deduped): {csv_frame_count}")
|
||||||
|
if max_height is not None:
|
||||||
|
print(f"Preview max height: {max_height}px")
|
||||||
|
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
|
||||||
|
|
||||||
|
on_disk = sum(1 for _r, p in row_path_candidates if p.is_file())
|
||||||
|
missing_paths = len(row_path_candidates) - on_disk
|
||||||
|
|
||||||
|
if dry_run:
|
||||||
|
if not quiet:
|
||||||
|
print(f"On-disk files among ordered list: {on_disk} / {len(row_path_candidates)}")
|
||||||
|
for i, (_r, p) in enumerate(row_path_candidates[:5]):
|
||||||
|
print(f" [{i}] {p} exists={p.is_file()}")
|
||||||
|
if len(row_path_candidates) > 10:
|
||||||
|
print(" ...")
|
||||||
|
start = max(0, len(row_path_candidates) - 3)
|
||||||
|
for i, (_r, p) in enumerate(row_path_candidates[start:], start=start):
|
||||||
|
print(f" [{i}] {p} exists={p.is_file()}")
|
||||||
|
return EncodedMovieResult(
|
||||||
|
success=True,
|
||||||
|
machine=machine,
|
||||||
|
roi=roi,
|
||||||
|
csv_frame_count=csv_frame_count,
|
||||||
|
written=0,
|
||||||
|
missing=missing_paths,
|
||||||
|
dropped_read=0,
|
||||||
|
output_path=out,
|
||||||
|
skipped_reason=None,
|
||||||
|
size_mb=None,
|
||||||
|
elapsed_s=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
row_path_pairs = [(r, p) for r, p in row_path_candidates if p.is_file()]
|
||||||
|
if not row_path_pairs:
|
||||||
|
if not quiet:
|
||||||
|
print("No mosaic files on disk for the selected rows.")
|
||||||
|
return EncodedMovieResult(
|
||||||
|
success=False,
|
||||||
|
machine=machine,
|
||||||
|
roi=roi,
|
||||||
|
csv_frame_count=csv_frame_count,
|
||||||
|
written=0,
|
||||||
|
missing=missing_paths,
|
||||||
|
dropped_read=0,
|
||||||
|
output_path=out,
|
||||||
|
skipped_reason="No mosaic files on disk for the selected rows.",
|
||||||
|
size_mb=None,
|
||||||
|
elapsed_s=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
try:
|
||||||
|
ordered_paths = [p for _r, p in row_path_pairs]
|
||||||
|
target_w, target_h = frame_size_mode(ordered_paths)
|
||||||
|
enc_w, enc_h = encode_size(target_w, target_h, max_height)
|
||||||
|
except MovieEncodeError as e:
|
||||||
|
if not quiet:
|
||||||
|
print(str(e))
|
||||||
|
return EncodedMovieResult(
|
||||||
|
success=False,
|
||||||
|
machine=machine,
|
||||||
|
roi=roi,
|
||||||
|
csv_frame_count=csv_frame_count,
|
||||||
|
written=0,
|
||||||
|
missing=missing_paths,
|
||||||
|
dropped_read=0,
|
||||||
|
output_path=out,
|
||||||
|
skipped_reason=str(e),
|
||||||
|
size_mb=None,
|
||||||
|
elapsed_s=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not quiet:
|
||||||
|
print(f"Target frame size (mode): {target_w} x {target_h}")
|
||||||
|
if (enc_w, enc_h) != (target_w, target_h):
|
||||||
|
print(f"Encode size (after max-height): {enc_w} x {enc_h}")
|
||||||
|
|
||||||
|
out.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
written = 0
|
||||||
|
dropped = 0
|
||||||
|
resized = 0
|
||||||
|
scaled_preview = 0
|
||||||
|
try:
|
||||||
|
writer = imageio.get_writer(
|
||||||
|
str(out),
|
||||||
|
fps=float(fps),
|
||||||
|
codec="libx264",
|
||||||
|
quality=8,
|
||||||
|
macro_block_size=1,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
for row, p in row_path_pairs:
|
||||||
|
try:
|
||||||
|
with Image.open(p) as im:
|
||||||
|
rgb = im.convert("RGB")
|
||||||
|
if rgb.size != (target_w, target_h):
|
||||||
|
rgb = rgb.resize((target_w, target_h), Image.Resampling.LANCZOS)
|
||||||
|
resized += 1
|
||||||
|
if rgb.size != (enc_w, enc_h):
|
||||||
|
rgb = rgb.resize((enc_w, enc_h), Image.Resampling.LANCZOS)
|
||||||
|
scaled_preview += 1
|
||||||
|
if metadata_overlay:
|
||||||
|
draw_metadata_overlay(rgb, row, margin=max(6, enc_w // 80))
|
||||||
|
frame = np.asarray(rgb)
|
||||||
|
writer.append_data(frame)
|
||||||
|
written += 1
|
||||||
|
except OSError:
|
||||||
|
dropped += 1
|
||||||
|
finally:
|
||||||
|
writer.close()
|
||||||
|
except Exception as e:
|
||||||
|
if not quiet:
|
||||||
|
print(f"Encode error: {e}")
|
||||||
|
return EncodedMovieResult(
|
||||||
|
success=False,
|
||||||
|
machine=machine,
|
||||||
|
roi=roi,
|
||||||
|
csv_frame_count=csv_frame_count,
|
||||||
|
written=written,
|
||||||
|
missing=missing_paths,
|
||||||
|
dropped_read=dropped,
|
||||||
|
output_path=out,
|
||||||
|
skipped_reason=f"encode_error: {e}",
|
||||||
|
size_mb=None,
|
||||||
|
elapsed_s=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
elapsed = time.perf_counter() - t0
|
||||||
|
size_mb = out.stat().st_size / (1024 * 1024) if out.is_file() else 0.0
|
||||||
|
if not quiet:
|
||||||
|
print(
|
||||||
|
f"Written: {written} frames (normalized to mode: {resized}, "
|
||||||
|
f"preview scale: {scaled_preview})"
|
||||||
|
)
|
||||||
|
print(f"Dropped (read error): {dropped}")
|
||||||
|
print(f"Missing paths (not on disk): {missing_paths}")
|
||||||
|
print(f"Output: {out.resolve()} ({size_mb:.2f} MB)")
|
||||||
|
|
||||||
|
return EncodedMovieResult(
|
||||||
|
success=True,
|
||||||
|
machine=machine,
|
||||||
|
roi=roi,
|
||||||
|
csv_frame_count=csv_frame_count,
|
||||||
|
written=written,
|
||||||
|
missing=missing_paths,
|
||||||
|
dropped_read=dropped,
|
||||||
|
output_path=out,
|
||||||
|
skipped_reason=None,
|
||||||
|
size_mb=size_mb,
|
||||||
|
elapsed_s=elapsed,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
args = parse_args()
|
||||||
|
archive: Path = args.archive
|
||||||
|
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
|
||||||
|
if not scans_csv.is_file():
|
||||||
|
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||||
|
|
||||||
|
roi_sel: tuple[float, float, float, float] | None
|
||||||
|
if args.roi:
|
||||||
|
roi_sel = parse_roi(args.roi)
|
||||||
|
else:
|
||||||
|
roi_sel = None
|
||||||
|
|
||||||
|
rows = load_latest_rows(scans_csv, args.machine, roi_sel)
|
||||||
|
if roi_sel is None:
|
||||||
|
roi_sel = pick_top_roi(rows)
|
||||||
|
rows = [r for r in rows if extent_close(r, roi_sel)]
|
||||||
|
|
||||||
|
assert roi_sel is not None
|
||||||
|
max_height: int | None = args.max_height
|
||||||
|
metadata_overlay = not args.no_metadata_overlay
|
||||||
|
|
||||||
|
res = encode_movie(
|
||||||
|
machine=args.machine,
|
||||||
|
roi=roi_sel,
|
||||||
|
rows=rows,
|
||||||
|
archive=archive,
|
||||||
|
max_height=max_height,
|
||||||
|
metadata_overlay=metadata_overlay,
|
||||||
|
fps=float(args.fps),
|
||||||
|
output=args.output,
|
||||||
|
dry_run=bool(args.dry_run),
|
||||||
|
rank=None,
|
||||||
|
quiet=False,
|
||||||
|
)
|
||||||
|
if not res.success and not args.dry_run:
|
||||||
|
sys.exit(1 if res.skipped_reason else 1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,269 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Build preview MP4s for the top N ROIs per machine (default N=2, max-height 1080).
|
||||||
|
|
||||||
|
Reads archives/scans.csv once, groups on-disk mosaic rows by machine, then for
|
||||||
|
each machine picks the most frequent ROI extents and calls encode_movie().
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python scripts/build_mosaic_movies_batch.py
|
||||||
|
python scripts/build_mosaic_movies_batch.py --dry-run
|
||||||
|
python scripts/build_mosaic_movies_batch.py --skip-existing
|
||||||
|
python scripts/build_mosaic_movies_batch.py --machine "BW2-10 [AMR-22]"
|
||||||
|
python scripts/build_mosaic_movies_batch.py --full-res # no max-height cap
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
_SCRIPTS_DIR = Path(__file__).resolve().parent
|
||||||
|
|
||||||
|
# Import sibling module (run as python scripts/build_mosaic_movies_batch.py from repo root)
|
||||||
|
sys.path.insert(0, str(_SCRIPTS_DIR))
|
||||||
|
import build_mosaic_movie as bmm # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
|
def read_machine_labels(path: Path) -> list[str]:
|
||||||
|
out: list[str] = []
|
||||||
|
with path.open(encoding="utf-8") as fh:
|
||||||
|
for line in fh:
|
||||||
|
s = line.strip()
|
||||||
|
if not s or s.startswith("#"):
|
||||||
|
continue
|
||||||
|
out.append(s)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Job:
|
||||||
|
machine: str
|
||||||
|
rank: int
|
||||||
|
roi: tuple[float, float, float, float]
|
||||||
|
extent_count: int
|
||||||
|
rows: list[dict]
|
||||||
|
output_path: Path
|
||||||
|
|
||||||
|
|
||||||
|
def collect_jobs(
|
||||||
|
*,
|
||||||
|
machines: list[str],
|
||||||
|
by_machine: dict[str, list[dict]],
|
||||||
|
archive: Path,
|
||||||
|
top_rois: int,
|
||||||
|
max_height: int | None,
|
||||||
|
metadata_overlay: bool,
|
||||||
|
) -> list[Job]:
|
||||||
|
jobs: list[Job] = []
|
||||||
|
for machine in machines:
|
||||||
|
rows = by_machine.get(machine, [])
|
||||||
|
if not rows:
|
||||||
|
continue
|
||||||
|
picks = bmm.pick_top_rois(rows, top_rois)
|
||||||
|
for rank, (roi, extent_count) in enumerate(picks, start=1):
|
||||||
|
rows_roi = [r for r in rows if bmm.extent_close(r, roi)]
|
||||||
|
out = bmm.default_output_path(
|
||||||
|
archive,
|
||||||
|
machine,
|
||||||
|
roi,
|
||||||
|
max_height=max_height,
|
||||||
|
metadata_overlay=metadata_overlay,
|
||||||
|
rank=rank,
|
||||||
|
)
|
||||||
|
jobs.append(
|
||||||
|
Job(
|
||||||
|
machine=machine,
|
||||||
|
rank=rank,
|
||||||
|
roi=roi,
|
||||||
|
extent_count=extent_count,
|
||||||
|
rows=rows_roi,
|
||||||
|
output_path=out,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return jobs
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args() -> argparse.Namespace:
|
||||||
|
p = argparse.ArgumentParser(description=__doc__)
|
||||||
|
p.add_argument(
|
||||||
|
"--machines-file",
|
||||||
|
type=Path,
|
||||||
|
default=_SCRIPTS_DIR / "machines.example.txt",
|
||||||
|
help="One machine label per line (default: scripts/machines.example.txt next to this script)",
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--machine",
|
||||||
|
metavar="LABEL",
|
||||||
|
help='If set, only this machine (overrides list to a single job set), e.g. "BW2-10 [AMR-22]"',
|
||||||
|
)
|
||||||
|
p.add_argument("--archive", type=Path, default=Path("archives"))
|
||||||
|
p.add_argument("--scans-csv", type=Path, default=None)
|
||||||
|
p.add_argument("--top-rois", type=int, default=2, help="How many top extents per machine (default: 2)")
|
||||||
|
p.add_argument("--max-height", type=int, default=1080, help="Preview cap in px (default: 1080)")
|
||||||
|
p.add_argument(
|
||||||
|
"--full-res",
|
||||||
|
action="store_true",
|
||||||
|
help="Disable max-height cap (full mosaic resolution; can be huge)",
|
||||||
|
)
|
||||||
|
p.add_argument("--fps", type=float, default=10.0)
|
||||||
|
p.add_argument("--dry-run", action="store_true")
|
||||||
|
p.add_argument(
|
||||||
|
"--skip-existing",
|
||||||
|
action="store_true",
|
||||||
|
help="Skip encode if output MP4 exists and is non-empty",
|
||||||
|
)
|
||||||
|
p.add_argument("--no-metadata-overlay", action="store_true")
|
||||||
|
args = p.parse_args()
|
||||||
|
if args.full_res:
|
||||||
|
args.max_height = None
|
||||||
|
return args
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
args = parse_args()
|
||||||
|
archive: Path = args.archive
|
||||||
|
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
|
||||||
|
if not scans_csv.is_file():
|
||||||
|
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||||
|
|
||||||
|
if args.machine:
|
||||||
|
machines = [args.machine.strip()]
|
||||||
|
else:
|
||||||
|
if not args.machines_file.is_file():
|
||||||
|
sys.exit(f"Machines file not found: {args.machines_file}")
|
||||||
|
machines = read_machine_labels(args.machines_file)
|
||||||
|
if not machines:
|
||||||
|
sys.exit(f"No machine labels in {args.machines_file}")
|
||||||
|
|
||||||
|
t_load0 = time.perf_counter()
|
||||||
|
by_machine = bmm.load_on_disk_rows_by_machine(scans_csv)
|
||||||
|
load_s = time.perf_counter() - t_load0
|
||||||
|
|
||||||
|
max_height: int | None = args.max_height
|
||||||
|
metadata_overlay = not args.no_metadata_overlay
|
||||||
|
|
||||||
|
jobs = collect_jobs(
|
||||||
|
machines=machines,
|
||||||
|
by_machine=by_machine,
|
||||||
|
archive=archive,
|
||||||
|
top_rois=args.top_rois,
|
||||||
|
max_height=max_height,
|
||||||
|
metadata_overlay=metadata_overlay,
|
||||||
|
)
|
||||||
|
total = len(jobs)
|
||||||
|
if total == 0:
|
||||||
|
sys.exit("No jobs (no on-disk mosaics for selected machines).")
|
||||||
|
|
||||||
|
print(f"Loaded scans.csv grouped by machine in {load_s:.2f}s ({total} job(s))")
|
||||||
|
if max_height is not None:
|
||||||
|
print(f"Max height: {max_height}px")
|
||||||
|
else:
|
||||||
|
print("Max height: (full resolution)")
|
||||||
|
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
summary_rows: list[tuple[str, ...]] = []
|
||||||
|
|
||||||
|
for idx, job in enumerate(jobs, start=1):
|
||||||
|
sx, sy, ex, ey = job.roi
|
||||||
|
roi_s = f"{sx},{sy}..{ex},{ey}"
|
||||||
|
print(
|
||||||
|
f"[{idx}/{total}] {job.machine} rank={job.rank} ROI {roi_s} "
|
||||||
|
f"({job.extent_count} CSV rows this extent, {len(job.rows)} deduped rows)"
|
||||||
|
)
|
||||||
|
|
||||||
|
if args.skip_existing and job.output_path.is_file() and job.output_path.stat().st_size > 0:
|
||||||
|
sz = job.output_path.stat().st_size / (1024 * 1024)
|
||||||
|
print(f" SKIP (exists): {job.output_path}")
|
||||||
|
summary_rows.append(
|
||||||
|
(
|
||||||
|
job.machine,
|
||||||
|
str(job.rank),
|
||||||
|
roi_s,
|
||||||
|
str(len(job.rows)),
|
||||||
|
"-",
|
||||||
|
"-",
|
||||||
|
str(job.output_path),
|
||||||
|
"SKIP (exists)",
|
||||||
|
f"{sz:.2f}",
|
||||||
|
"-",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
enc_t0 = time.perf_counter()
|
||||||
|
res = bmm.encode_movie(
|
||||||
|
machine=job.machine,
|
||||||
|
roi=job.roi,
|
||||||
|
rows=job.rows,
|
||||||
|
archive=archive,
|
||||||
|
max_height=max_height,
|
||||||
|
metadata_overlay=metadata_overlay,
|
||||||
|
fps=float(args.fps),
|
||||||
|
output=None,
|
||||||
|
dry_run=args.dry_run,
|
||||||
|
rank=job.rank,
|
||||||
|
quiet=True,
|
||||||
|
)
|
||||||
|
enc_elapsed = time.perf_counter() - enc_t0
|
||||||
|
|
||||||
|
if res.success:
|
||||||
|
if args.dry_run:
|
||||||
|
print(f" dry-run OK -> {res.output_path} (missing files: {res.missing})")
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
f" OK {res.written} frames "
|
||||||
|
f"{(res.size_mb or 0):.2f} MB {enc_elapsed:.1f}s"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print(f" FAIL {res.skipped_reason or 'unknown'}")
|
||||||
|
|
||||||
|
status = "OK" if res.success else "FAIL"
|
||||||
|
if not res.success and res.skipped_reason:
|
||||||
|
status = f"FAIL: {res.skipped_reason[:40]}"
|
||||||
|
mb = f"{res.size_mb:.2f}" if res.size_mb is not None else "-"
|
||||||
|
es = f"{enc_elapsed:.1f}" if not args.dry_run else "-"
|
||||||
|
w = str(res.written) if not args.dry_run else "0"
|
||||||
|
if args.dry_run:
|
||||||
|
status = "dry-run"
|
||||||
|
mb = "-"
|
||||||
|
|
||||||
|
summary_rows.append(
|
||||||
|
(
|
||||||
|
job.machine,
|
||||||
|
str(job.rank),
|
||||||
|
roi_s,
|
||||||
|
str(len(job.rows)),
|
||||||
|
w,
|
||||||
|
str(res.missing),
|
||||||
|
str(res.output_path or job.output_path),
|
||||||
|
status,
|
||||||
|
mb,
|
||||||
|
es,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
print()
|
||||||
|
|
||||||
|
print("=" * 120)
|
||||||
|
hdr = (
|
||||||
|
f"{'machine':<26} {'rk':>2} {'ROI (mm)':<36} {'csv':>4} {'out':>5} "
|
||||||
|
f"{'miss':>4} {'MB':>7} {'s':>6} {'status':<22}"
|
||||||
|
)
|
||||||
|
print(hdr)
|
||||||
|
print("-" * 120)
|
||||||
|
for row in summary_rows:
|
||||||
|
m, rk, roi_s, csv_n, wrt, miss, path, status, mb, es = row
|
||||||
|
print(
|
||||||
|
f"{m:<26} {rk:>2} {roi_s:<36} {csv_n:>4} {wrt:>5} {miss:>4} "
|
||||||
|
f"{mb:>7} {es:>6} {status:<22}"
|
||||||
|
)
|
||||||
|
print(f" -> {path}")
|
||||||
|
print("=" * 120)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -2,11 +2,13 @@
|
|||||||
"""
|
"""
|
||||||
Report mosaic download progress from archives/scans.csv.
|
Report mosaic download progress from archives/scans.csv.
|
||||||
|
|
||||||
Output is formatted as Markdown. Add --by-year for a per-machine ×
|
Output is Markdown. Use ``--by-year`` for a per-machine × per-year
|
||||||
per-year breakdown table.
|
done/failed table. When the first mosaic pass is complete (no pending rows)
|
||||||
|
but failures remain, a **Mosaic retry estimates** section is printed with
|
||||||
|
queue counts and duration hints.
|
||||||
|
|
||||||
Rate/ETA require two calls at least 60 s apart. Mean mosaic size is
|
Rate/ETA use a 30-minute rolling window when snapshots show progress.
|
||||||
sampled from up to 100 already-downloaded files and cached for 1 hour.
|
Mean mosaic size is sampled from up to 100 downloads (1-hour cache).
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
|
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
|
||||||
@@ -30,6 +32,11 @@ _R_PRE19 = 1.00
|
|||||||
_R_PURGED = 0.00
|
_R_PURGED = 0.00
|
||||||
_R_RECENT = 0.82
|
_R_RECENT = 0.82
|
||||||
|
|
||||||
|
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
|
||||||
|
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
|
||||||
|
RETRY_REALISTIC_RATE_PER_HR = 1100.0
|
||||||
|
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Helpers
|
# Helpers
|
||||||
@@ -127,6 +134,12 @@ def _expected_remaining(pending_rows: list[dict]) -> float:
|
|||||||
return count
|
return count
|
||||||
|
|
||||||
|
|
||||||
|
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
|
||||||
|
if n_scans <= 0 or rate_per_hr <= 0:
|
||||||
|
return "—"
|
||||||
|
return _fmt_duration(n_scans / rate_per_hr * 3600.0)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Main
|
# Main
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -213,15 +226,32 @@ def main() -> None:
|
|||||||
|
|
||||||
rate_per_sec: float | None = None
|
rate_per_sec: float | None = None
|
||||||
rate_window_str = ""
|
rate_window_str = ""
|
||||||
|
snap_delta_proc = 0
|
||||||
if recent:
|
if recent:
|
||||||
oldest = recent[0]
|
oldest = recent[0]
|
||||||
dt = now.timestamp() - oldest["ts"]
|
dt = now.timestamp() - oldest["ts"]
|
||||||
dp = processed - oldest["proc"]
|
dp = processed - oldest["proc"]
|
||||||
|
snap_delta_proc = dp
|
||||||
if dt >= 60 and dp > 0:
|
if dt >= 60 and dp > 0:
|
||||||
rate_per_sec = dp / dt
|
rate_per_sec = dp / dt
|
||||||
window_min = dt / 60
|
window_min = dt / 60
|
||||||
rate_window_str = f"{window_min:.0f}-min avg"
|
rate_window_str = f"{window_min:.0f}-min avg"
|
||||||
|
|
||||||
|
# One-time baseline after initial mosaic crawl finished (no pending rows).
|
||||||
|
if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
|
||||||
|
cache["first_pass_completed_at"] = now.isoformat()
|
||||||
|
cache["first_pass_processed"] = total
|
||||||
|
cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR
|
||||||
|
|
||||||
|
first_pass_rate_hr = float(
|
||||||
|
cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
|
||||||
|
)
|
||||||
|
live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
|
||||||
|
# Active scrape shows progress in snapshots; idle archive shows dp == 0.
|
||||||
|
retry_estimate_rate_hr = (
|
||||||
|
live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
|
||||||
|
)
|
||||||
|
|
||||||
# --- Disk space ---
|
# --- Disk space ---
|
||||||
mean_bytes: float | None = None
|
mean_bytes: float | None = None
|
||||||
size_note = ""
|
size_note = ""
|
||||||
@@ -325,6 +355,76 @@ def main() -> None:
|
|||||||
align=["l", "r", "r", "r", "r", "r"],
|
align=["l", "r", "r", "r", "r", "r"],
|
||||||
))
|
))
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
# Retry estimates (first pass complete: pending == 0, failures remain)
|
||||||
|
# -----------------------------------------------------------------------
|
||||||
|
if failed > 0 and pending == 0:
|
||||||
|
failed_rows_list = [
|
||||||
|
r for r in latest.values()
|
||||||
|
if r.get("mosaic_download_status") == "failed"
|
||||||
|
]
|
||||||
|
n_all = len(failed_rows_list)
|
||||||
|
n_2023 = sum(
|
||||||
|
1 for r in failed_rows_list
|
||||||
|
if (r.get("scan_time") or "")[:4] >= "2023"
|
||||||
|
and len((r.get("scan_time") or "")[:4]) == 4
|
||||||
|
)
|
||||||
|
n_200 = sum(
|
||||||
|
1 for r in failed_rows_list
|
||||||
|
if r.get("mosaic_error_code") == "200"
|
||||||
|
)
|
||||||
|
rate_note = (
|
||||||
|
"rolling 30-min window"
|
||||||
|
if snap_delta_proc > 0
|
||||||
|
else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
|
||||||
|
)
|
||||||
|
print()
|
||||||
|
print("### Mosaic retry estimates\n")
|
||||||
|
print(
|
||||||
|
f"*Suggested command after server fix:* "
|
||||||
|
f"`python scraper.py --retry-failed --workers 2` "
|
||||||
|
f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
|
||||||
|
f"({rate_note}). Fixed columns use scenario rates.*\n"
|
||||||
|
)
|
||||||
|
est_hdr = (
|
||||||
|
"Retry scope",
|
||||||
|
"Count",
|
||||||
|
f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr",
|
||||||
|
f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr",
|
||||||
|
f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr",
|
||||||
|
f"@{retry_estimate_rate_hr:.0f}/hr",
|
||||||
|
)
|
||||||
|
retry_tbl_rows = [
|
||||||
|
[
|
||||||
|
"HTTP 200 (empty body)",
|
||||||
|
f"{n_200:,}",
|
||||||
|
_retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_200, retry_estimate_rate_hr),
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"Failed, scan_time ≥ 2023",
|
||||||
|
f"{n_2023:,}",
|
||||||
|
_retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_2023, retry_estimate_rate_hr),
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"**All failed**",
|
||||||
|
f"**{n_all:,}**",
|
||||||
|
_retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||||
|
_retry_hours_from_rate(n_all, retry_estimate_rate_hr),
|
||||||
|
],
|
||||||
|
]
|
||||||
|
print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))
|
||||||
|
|
||||||
# -----------------------------------------------------------------------
|
# -----------------------------------------------------------------------
|
||||||
# --by-year table
|
# --by-year table
|
||||||
# -----------------------------------------------------------------------
|
# -----------------------------------------------------------------------
|
||||||
|
|||||||
+47
-1
@@ -84,6 +84,33 @@ def parse_args() -> argparse.Namespace:
|
|||||||
"inventorying all scans across all machines."
|
"inventorying all scans across all machines."
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--retry-failed",
|
||||||
|
action="store_true",
|
||||||
|
help=(
|
||||||
|
"Mosaic-only: re-attempt scans whose latest scans.csv row has "
|
||||||
|
"mosaic_download_status=failed (queue from CSV, not the server list). "
|
||||||
|
"Implies --mosaic-only."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--retry-since",
|
||||||
|
metavar="YEAR",
|
||||||
|
default=None,
|
||||||
|
help=(
|
||||||
|
"With --retry-failed: only scans with scan_time year >= YEAR "
|
||||||
|
"(e.g. 2023)."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
p.add_argument(
|
||||||
|
"--retry-error-code",
|
||||||
|
metavar="CODE",
|
||||||
|
default=None,
|
||||||
|
help=(
|
||||||
|
"With --retry-failed: filter by mosaic_error_code "
|
||||||
|
"(e.g. 200 for empty-body failures)."
|
||||||
|
),
|
||||||
|
)
|
||||||
p.add_argument(
|
p.add_argument(
|
||||||
"--dry-run",
|
"--dry-run",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
@@ -159,6 +186,16 @@ def main() -> None:
|
|||||||
if args.scan_id is not None and args.scan_id <= 0:
|
if args.scan_id is not None and args.scan_id <= 0:
|
||||||
sys.exit("--scan-id must be a positive integer")
|
sys.exit("--scan-id must be a positive integer")
|
||||||
|
|
||||||
|
if args.retry_since and not args.retry_failed:
|
||||||
|
sys.exit("--retry-since requires --retry-failed.")
|
||||||
|
if args.retry_error_code and not args.retry_failed:
|
||||||
|
sys.exit("--retry-error-code requires --retry-failed.")
|
||||||
|
|
||||||
|
if args.retry_failed:
|
||||||
|
if args.metadata_only:
|
||||||
|
sys.exit("--retry-failed cannot be used with --metadata-only.")
|
||||||
|
args.mosaic_only = True # implied
|
||||||
|
|
||||||
# --list-machines doesn't need credentials
|
# --list-machines doesn't need credentials
|
||||||
if args.list_machines:
|
if args.list_machines:
|
||||||
base_url = "http://205.149.147.131:8010/"
|
base_url = "http://205.149.147.131:8010/"
|
||||||
@@ -261,7 +298,13 @@ def main() -> None:
|
|||||||
if args.metadata_only:
|
if args.metadata_only:
|
||||||
log.info("Mode: metadata only (mosaics and tiles skipped)")
|
log.info("Mode: metadata only (mosaics and tiles skipped)")
|
||||||
elif args.mosaic_only:
|
elif args.mosaic_only:
|
||||||
log.info("Mode: mosaics only (individual tiles skipped)")
|
if args.retry_failed:
|
||||||
|
log.info(
|
||||||
|
"Mode: mosaic retry (failed scans from %s)",
|
||||||
|
SCANS_CSV_FILENAME,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
log.info("Mode: mosaics only (individual tiles skipped)")
|
||||||
if args.dry_run:
|
if args.dry_run:
|
||||||
log.info("Mode: dry-run (no files will be written)")
|
log.info("Mode: dry-run (no files will be written)")
|
||||||
|
|
||||||
@@ -285,6 +328,9 @@ def main() -> None:
|
|||||||
metadata_only=args.metadata_only,
|
metadata_only=args.metadata_only,
|
||||||
scan_id_filter=args.scan_id,
|
scan_id_filter=args.scan_id,
|
||||||
max_tiles=args.max_tiles,
|
max_tiles=args.max_tiles,
|
||||||
|
retry_failed=args.retry_failed,
|
||||||
|
retry_since_year=args.retry_since,
|
||||||
|
retry_error_code=args.retry_error_code,
|
||||||
)
|
)
|
||||||
totals.merge(stats)
|
totals.merge(stats)
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
+114
-22
@@ -2,14 +2,22 @@
|
|||||||
High-level scrape orchestration: drives the per-machine and per-scan loops.
|
High-level scrape orchestration: drives the per-machine and per-scan loops.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import csv
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
|
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
|
||||||
|
from spruce.exif import write_mosaic_exif
|
||||||
|
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
|
||||||
|
from spruce.progress import ProgressTracker, CsvWriter
|
||||||
|
from spruce.session import MachineSession
|
||||||
|
|
||||||
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
|
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
|
||||||
# below smallest observed real tile (~7 KiB in production samples).
|
# below smallest observed real tile (~7 KiB in production samples).
|
||||||
@@ -49,16 +57,64 @@ class RunStats:
|
|||||||
self.scans_probe_skipped += other.scans_probe_skipped
|
self.scans_probe_skipped += other.scans_probe_skipped
|
||||||
self.scans_disk_space_skipped += other.scans_disk_space_skipped
|
self.scans_disk_space_skipped += other.scans_disk_space_skipped
|
||||||
|
|
||||||
from tqdm import tqdm
|
|
||||||
|
|
||||||
from spruce.exif import write_mosaic_exif
|
|
||||||
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
|
|
||||||
from spruce.progress import ProgressTracker, CsvWriter
|
|
||||||
from spruce.session import MachineSession
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
|
||||||
|
"""Last row wins per (machine, scan_id)."""
|
||||||
|
latest: dict[tuple[str, str], dict[str, str]] = {}
|
||||||
|
if not scans_csv_path.exists():
|
||||||
|
return latest
|
||||||
|
with open(scans_csv_path, newline="", encoding="utf-8") as fh:
|
||||||
|
for row in csv.DictReader(fh):
|
||||||
|
key = (row.get("machine", ""), row.get("scan_id", ""))
|
||||||
|
latest[key] = row
|
||||||
|
return latest
|
||||||
|
|
||||||
|
|
||||||
|
def load_failed_scans_from_csv(
|
||||||
|
scans_csv_path: Path,
|
||||||
|
machine_label: str,
|
||||||
|
*,
|
||||||
|
since_year: str | None = None,
|
||||||
|
error_code: str | None = None,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Dedupe scans.csv by (machine, scan_id); return failed mosaic rows for one machine.
|
||||||
|
|
||||||
|
Each dict is suitable as the ``scan`` argument to ``process_scan`` (scan_id,
|
||||||
|
scan_time, name, status).
|
||||||
|
"""
|
||||||
|
latest = _read_scans_csv_latest(scans_csv_path)
|
||||||
|
out: list[dict[str, Any]] = []
|
||||||
|
for (_m, _sid), row in latest.items():
|
||||||
|
if row.get("machine") != machine_label:
|
||||||
|
continue
|
||||||
|
if row.get("mosaic_download_status") != "failed":
|
||||||
|
continue
|
||||||
|
if error_code is not None and row.get("mosaic_error_code", "") != error_code:
|
||||||
|
continue
|
||||||
|
st = row.get("scan_time", "") or ""
|
||||||
|
if since_year is not None:
|
||||||
|
yr = st[:4]
|
||||||
|
if len(yr) < 4 or yr < since_year:
|
||||||
|
continue
|
||||||
|
sid = int(row["scan_id"])
|
||||||
|
out.append(
|
||||||
|
{
|
||||||
|
"scan_id": sid,
|
||||||
|
"scan_time": st,
|
||||||
|
"name": row.get("name", ""),
|
||||||
|
"status": row.get("status", "") or "Completed",
|
||||||
|
"user": row.get("user", ""),
|
||||||
|
"scan_lines": row.get("scan_lines", ""),
|
||||||
|
"scan_mode": row.get("scan_mode", ""),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
out.sort(key=lambda s: s["scan_id"])
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Per-scan helpers
|
# Per-scan helpers
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -499,6 +555,9 @@ def scrape_machine(
|
|||||||
metadata_only: bool = False,
|
metadata_only: bool = False,
|
||||||
scan_id_filter: int | None = None,
|
scan_id_filter: int | None = None,
|
||||||
max_tiles: int | None = None,
|
max_tiles: int | None = None,
|
||||||
|
retry_failed: bool = False,
|
||||||
|
retry_since_year: str | None = None,
|
||||||
|
retry_error_code: str | None = None,
|
||||||
) -> RunStats:
|
) -> RunStats:
|
||||||
"""Login, fetch scans, and download all content for one machine."""
|
"""Login, fetch scans, and download all content for one machine."""
|
||||||
sess = MachineSession(machine, config)
|
sess = MachineSession(machine, config)
|
||||||
@@ -518,8 +577,37 @@ def scrape_machine(
|
|||||||
log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
|
log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
|
||||||
return RunStats()
|
return RunStats()
|
||||||
|
|
||||||
if scan_id_filter is not None:
|
if retry_failed:
|
||||||
scans: list[dict[str, Any]] = [
|
scans = load_failed_scans_from_csv(
|
||||||
|
scans_csv.path,
|
||||||
|
machine["label"],
|
||||||
|
since_year=retry_since_year,
|
||||||
|
error_code=retry_error_code,
|
||||||
|
)
|
||||||
|
if scan_id_filter is not None:
|
||||||
|
scans = [s for s in scans if s["scan_id"] == scan_id_filter]
|
||||||
|
if not scans:
|
||||||
|
log.warning(
|
||||||
|
"[%s] Retry: scan_id %d not among failed rows for this machine.",
|
||||||
|
machine["label"],
|
||||||
|
scan_id_filter,
|
||||||
|
)
|
||||||
|
return RunStats()
|
||||||
|
log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter)
|
||||||
|
elif not scans:
|
||||||
|
log.warning(
|
||||||
|
"[%s] No failed mosaic rows in scans.csv match retry filters.",
|
||||||
|
machine["label"],
|
||||||
|
)
|
||||||
|
return RunStats()
|
||||||
|
else:
|
||||||
|
log.info(
|
||||||
|
"[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
|
||||||
|
machine["label"],
|
||||||
|
len(scans),
|
||||||
|
)
|
||||||
|
elif scan_id_filter is not None:
|
||||||
|
scans = [
|
||||||
{"scan_id": scan_id_filter, "status": "Completed"}
|
{"scan_id": scan_id_filter, "status": "Completed"}
|
||||||
]
|
]
|
||||||
log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
|
log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
|
||||||
@@ -529,21 +617,25 @@ def scrape_machine(
|
|||||||
log.warning("[%s] No scans found.", machine["label"])
|
log.warning("[%s] No scans found.", machine["label"])
|
||||||
return RunStats()
|
return RunStats()
|
||||||
|
|
||||||
# Build a set of scan_ids already fully processed in a prior run so we can
|
# Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP).
|
||||||
# skip them entirely (no metadata fetch, no mosaic request).
|
# In normal mode: skip anything with a definitive non-pending status.
|
||||||
# Only scans with a definitive non-pending status count; skipped_metadata_only
|
# In retry mode: only skip scans that are already downloaded or skipped for
|
||||||
# rows still need to be processed in mosaic mode.
|
# disk-space reasons — failed scans must be re-attempted.
|
||||||
PENDING_STATUSES = {"skipped_metadata_only", ""}
|
PENDING_STATUSES = {"skipped_metadata_only", ""}
|
||||||
|
BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"}
|
||||||
existing_ids: set[int] = set()
|
existing_ids: set[int] = set()
|
||||||
if not metadata_only and scans_csv._fh.name:
|
if not metadata_only:
|
||||||
existing_path = Path(scans_csv._fh.name)
|
latest_rows = _read_scans_csv_latest(scans_csv.path)
|
||||||
if existing_path.exists():
|
for (_mlabel, _sid), _row in latest_rows.items():
|
||||||
import csv as _csv
|
if _mlabel != machine["label"]:
|
||||||
with open(existing_path, newline="", encoding="utf-8") as _f:
|
continue
|
||||||
for _row in _csv.DictReader(_f):
|
st = _row.get("mosaic_download_status", "")
|
||||||
if _row.get("machine") == machine["label"]:
|
if retry_failed:
|
||||||
if _row.get("mosaic_download_status", "") not in PENDING_STATUSES:
|
if st in BLOCK_AFTER_RETRY_STATUSES:
|
||||||
existing_ids.add(int(_row["scan_id"]))
|
existing_ids.add(int(_row["scan_id"]))
|
||||||
|
else:
|
||||||
|
if st not in PENDING_STATUSES:
|
||||||
|
existing_ids.add(int(_row["scan_id"]))
|
||||||
|
|
||||||
stats = RunStats()
|
stats = RunStats()
|
||||||
for scan in scans:
|
for scan in scans:
|
||||||
|
|||||||
@@ -77,6 +77,7 @@ class CsvWriter:
|
|||||||
def __init__(self, path: Path, fields: list[str]) -> None:
|
def __init__(self, path: Path, fields: list[str]) -> None:
|
||||||
is_new = not path.exists()
|
is_new = not path.exists()
|
||||||
path.parent.mkdir(parents=True, exist_ok=True)
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
self.path = path
|
||||||
self._fh = open(path, "a", newline="", encoding="utf-8")
|
self._fh = open(path, "a", newline="", encoding="utf-8")
|
||||||
self._writer = csv.DictWriter(self._fh, fieldnames=fields)
|
self._writer = csv.DictWriter(self._fh, fieldnames=fields)
|
||||||
if is_new:
|
if is_new:
|
||||||
|
|||||||
@@ -0,0 +1,133 @@
|
|||||||
|
"""Retry queue loading from scans.csv (mosaic_download_status=failed)."""
|
||||||
|
|
||||||
|
import csv
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from spruce.orchestrator import load_failed_scans_from_csv
|
||||||
|
from spruce.settings import SCANS_CSV_FIELDS
|
||||||
|
|
||||||
|
|
||||||
|
def _blank_row(**kwargs: str) -> dict[str, str]:
|
||||||
|
row = {k: "" for k in SCANS_CSV_FIELDS}
|
||||||
|
row.update(kwargs)
|
||||||
|
return row
|
||||||
|
|
||||||
|
|
||||||
|
def _write_scans_csv(path: Path, rows: list[dict[str, str]]) -> None:
|
||||||
|
with open(path, "w", newline="", encoding="utf-8") as fh:
|
||||||
|
w = csv.DictWriter(fh, fieldnames=SCANS_CSV_FIELDS)
|
||||||
|
w.writeheader()
|
||||||
|
for r in rows:
|
||||||
|
w.writerow({k: r.get(k, "") for k in SCANS_CSV_FIELDS})
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_failed_scans_dedup_keeps_last_row(tmp_path: Path) -> None:
|
||||||
|
path = tmp_path / "scans.csv"
|
||||||
|
common = {
|
||||||
|
"machine": "BW1 [X]",
|
||||||
|
"machine_id": "1",
|
||||||
|
"scan_id": "100",
|
||||||
|
"mosaic_url": "http://x/m.jpg",
|
||||||
|
"mosaic_local_path": "",
|
||||||
|
"mosaic_on_disk": "False",
|
||||||
|
}
|
||||||
|
_write_scans_csv(
|
||||||
|
path,
|
||||||
|
[
|
||||||
|
_blank_row(
|
||||||
|
**common,
|
||||||
|
mosaic_download_status="failed",
|
||||||
|
mosaic_error_code="404",
|
||||||
|
scan_time="2020-01-01",
|
||||||
|
),
|
||||||
|
_blank_row(
|
||||||
|
**common,
|
||||||
|
mosaic_download_status="failed",
|
||||||
|
mosaic_error_code="404",
|
||||||
|
scan_time="2020-06-01",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
out = load_failed_scans_from_csv(path, "BW1 [X]")
|
||||||
|
assert len(out) == 1
|
||||||
|
assert out[0]["scan_id"] == 100
|
||||||
|
assert out[0]["scan_time"] == "2020-06-01"
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_failed_scans_since_year(tmp_path: Path) -> None:
|
||||||
|
path = tmp_path / "scans.csv"
|
||||||
|
base = {
|
||||||
|
"machine": "M",
|
||||||
|
"machine_id": "1",
|
||||||
|
"mosaic_url": "",
|
||||||
|
"mosaic_local_path": "",
|
||||||
|
"mosaic_on_disk": "",
|
||||||
|
"mosaic_download_status": "failed",
|
||||||
|
"mosaic_error_code": "404",
|
||||||
|
}
|
||||||
|
_write_scans_csv(
|
||||||
|
path,
|
||||||
|
[
|
||||||
|
_blank_row(**base, scan_id="1", scan_time="2022-12-31"),
|
||||||
|
_blank_row(**base, scan_id="2", scan_time="2023-01-01"),
|
||||||
|
_blank_row(**base, scan_id="3", scan_time=""),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
out = load_failed_scans_from_csv(path, "M", since_year="2023")
|
||||||
|
ids = {s["scan_id"] for s in out}
|
||||||
|
assert ids == {2}
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_failed_scans_error_code(tmp_path: Path) -> None:
|
||||||
|
path = tmp_path / "scans.csv"
|
||||||
|
base = {
|
||||||
|
"machine": "M",
|
||||||
|
"machine_id": "1",
|
||||||
|
"scan_time": "2024-01-01",
|
||||||
|
"mosaic_url": "",
|
||||||
|
"mosaic_local_path": "",
|
||||||
|
"mosaic_on_disk": "",
|
||||||
|
"mosaic_download_status": "failed",
|
||||||
|
}
|
||||||
|
_write_scans_csv(
|
||||||
|
path,
|
||||||
|
[
|
||||||
|
_blank_row(**base, scan_id="10", mosaic_error_code="404"),
|
||||||
|
_blank_row(**base, scan_id="11", mosaic_error_code="200"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
out = load_failed_scans_from_csv(path, "M", error_code="200")
|
||||||
|
assert [s["scan_id"] for s in out] == [11]
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_failed_scans_excludes_downloaded(tmp_path: Path) -> None:
|
||||||
|
path = tmp_path / "scans.csv"
|
||||||
|
base = {
|
||||||
|
"machine": "M",
|
||||||
|
"machine_id": "1",
|
||||||
|
"scan_time": "2024-01-01",
|
||||||
|
"mosaic_url": "",
|
||||||
|
"mosaic_local_path": "",
|
||||||
|
"mosaic_on_disk": "True",
|
||||||
|
}
|
||||||
|
_write_scans_csv(
|
||||||
|
path,
|
||||||
|
[
|
||||||
|
_blank_row(
|
||||||
|
**base,
|
||||||
|
scan_id="5",
|
||||||
|
mosaic_download_status="downloaded",
|
||||||
|
mosaic_error_code="",
|
||||||
|
),
|
||||||
|
_blank_row(
|
||||||
|
**base,
|
||||||
|
scan_id="6",
|
||||||
|
mosaic_download_status="failed",
|
||||||
|
mosaic_error_code="404",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
out = load_failed_scans_from_csv(path, "M")
|
||||||
|
assert [s["scan_id"] for s in out] == [6]
|
||||||
Reference in New Issue
Block a user