Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 655c376a97 | |||
| 1ef9e0206c | |||
| 6390f5d529 |
@@ -7,3 +7,6 @@ __pycache__/
|
||||
.DS_Store
|
||||
explore_dumps/
|
||||
.venv/
|
||||
scripts/sync_to_nas.sh
|
||||
backup/
|
||||
.claude/
|
||||
|
||||
@@ -97,10 +97,8 @@ python scraper.py --machine "BW3-20 [AMR-26]" --mosaic-only
|
||||
# Download mosaics for all machines
|
||||
python scraper.py --mosaic-only
|
||||
|
||||
# One random completed scan per machine: mosaic + all tiles (from machines.txt; uses --list-scans + --scan-id)
|
||||
# MOSAIC_ONLY=1 ./scripts/sample_random_scans.sh machines.txt # optional: mosaics only, no tiles
|
||||
# cp scripts/machines.example.txt machines.txt # then edit: one label per line
|
||||
# ./scripts/sample_random_scans.sh machines.txt
|
||||
# One random completed scan per machine (helper script): check out branch `testing/sample-runs`,
|
||||
# then see `scripts/sample_random_scans.sh` and `docs/sample_random_scans_run_progress.md`.
|
||||
|
||||
# Download all tiles for a specific scan
|
||||
python scraper.py --machine "BW3-20 [AMR-26]" --scan-id 158374 --workers 4
|
||||
|
||||
|
After Width: | Height: | Size: 53 KiB |
|
After Width: | Height: | Size: 13 MiB |
|
After Width: | Height: | Size: 41 KiB |
|
After Width: | Height: | Size: 37 KiB |
|
After Width: | Height: | Size: 7.6 MiB |
|
After Width: | Height: | Size: 50 KiB |
|
After Width: | Height: | Size: 58 KiB |
|
After Width: | Height: | Size: 15 MiB |
|
After Width: | Height: | Size: 52 KiB |
|
After Width: | Height: | Size: 218 KiB |
|
After Width: | Height: | Size: 50 MiB |
|
After Width: | Height: | Size: 62 KiB |
@@ -6,3 +6,5 @@ tqdm>=4.66.0
|
||||
piexif>=1.1.3
|
||||
Pillow>=10.0.0
|
||||
pytest>=8.0
|
||||
imageio>=2.34
|
||||
imageio-ffmpeg>=0.5
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
VENV="/tmp/spruce_venv"
|
||||
|
||||
if [[ ! -x "$VENV/bin/python" ]]; then
|
||||
echo "Setting up venv at $VENV..."
|
||||
python3 -m venv "$VENV"
|
||||
"$VENV/bin/python" -m ensurepip --upgrade
|
||||
"$VENV/bin/pip" install -q -r "$SCRIPT_DIR/requirements.txt"
|
||||
fi
|
||||
|
||||
echo "Starting metadata-only scan of all machines..."
|
||||
"$VENV/bin/python" "$SCRIPT_DIR/scraper.py" --metadata-only "$@"
|
||||
@@ -0,0 +1,710 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Build a chronological MP4 from downloaded mosaic.jpg files for one machine ROI.
|
||||
|
||||
Reads archives/scans.csv, filters by machine and mosaic_on_disk, optionally
|
||||
restricts to one (start_x, start_y, end_x, end_y) ROI, dedupes by scan_id
|
||||
(last row wins), sorts by scan_time, and encodes frames with imageio/ffmpeg.
|
||||
|
||||
Usage:
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]"
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
|
||||
--roi "195.65,219.22,219.73,235.04" --fps 8 --output /tmp/out.mp4
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" --dry-run
|
||||
# Lighter preview (caps tall full-tube mosaics by height — easier on players):
|
||||
.venv/bin/python scripts/build_mosaic_movie.py --machine "BW1-4 [AMR-15]" \\
|
||||
--roi "0.0,0.0,310.0,740.0" --preview
|
||||
# Metadata is drawn on each frame by default (semi-transparent bar at the top);
|
||||
# use --no-metadata-overlay to disable.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from collections import Counter, defaultdict
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import imageio
|
||||
import numpy as np
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
|
||||
class MovieEncodeError(Exception):
|
||||
"""Raised from encoding helpers; caught by encode_movie for batch-safe handling."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class EncodedMovieResult:
|
||||
success: bool
|
||||
machine: str
|
||||
roi: tuple[float, float, float, float]
|
||||
csv_frame_count: int
|
||||
written: int
|
||||
missing: int
|
||||
dropped_read: int
|
||||
output_path: Path | None
|
||||
skipped_reason: str | None
|
||||
size_mb: float | None
|
||||
elapsed_s: float | None
|
||||
|
||||
|
||||
def sanitize_machine_label(label: str) -> str:
|
||||
return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_")
|
||||
|
||||
|
||||
def parse_roi(s: str) -> tuple[float, float, float, float]:
|
||||
parts = [p.strip() for p in s.split(",")]
|
||||
if len(parts) != 4:
|
||||
sys.exit("--roi must be four comma-separated numbers: start_x,start_y,end_x,end_y")
|
||||
try:
|
||||
return tuple(float(p) for p in parts) # type: ignore[return-value]
|
||||
except ValueError as e:
|
||||
sys.exit(f"Invalid --roi numbers: {e}")
|
||||
|
||||
|
||||
def extent_close(
|
||||
row: dict,
|
||||
roi: tuple[float, float, float, float],
|
||||
*,
|
||||
tol: float = 1e-4,
|
||||
) -> bool:
|
||||
keys = ("start_x", "start_y", "end_x", "end_y")
|
||||
try:
|
||||
vals = tuple(float(row[k]) for k in keys)
|
||||
except (KeyError, ValueError):
|
||||
return False
|
||||
return all(abs(a - b) < tol for a, b in zip(vals, roi))
|
||||
|
||||
|
||||
def extent_key(row: dict) -> tuple[str, str, str, str]:
|
||||
"""Stable grouping key from CSV string fields."""
|
||||
return (
|
||||
row.get("start_x", "").strip(),
|
||||
row.get("start_y", "").strip(),
|
||||
row.get("end_x", "").strip(),
|
||||
row.get("end_y", "").strip(),
|
||||
)
|
||||
|
||||
|
||||
def key_to_roi_floats(key: tuple[str, str, str, str]) -> tuple[float, float, float, float]:
|
||||
return tuple(float(x) for x in key) # type: ignore[return-value]
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument("--machine", required=True, help='RootView machine label, e.g. "BW1-4 [AMR-15]"')
|
||||
p.add_argument(
|
||||
"--roi",
|
||||
metavar="SX,SY,EX,EY",
|
||||
help="Restrict to this extent (mm). If omitted, pick the ROI with the most on-disk mosaics.",
|
||||
)
|
||||
p.add_argument("--archive", default="archives", type=Path, help="Archive root (default: archives)")
|
||||
p.add_argument(
|
||||
"--scans-csv",
|
||||
default=None,
|
||||
type=Path,
|
||||
help="Path to scans.csv (default: <archive>/scans.csv)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--output",
|
||||
"-o",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Output .mp4 path (default: <archive>/movies/<machine>/roi_<...>.mp4)",
|
||||
)
|
||||
p.add_argument("--fps", type=float, default=10.0, help="Frames per second (default: 10)")
|
||||
p.add_argument(
|
||||
"--max-height",
|
||||
type=int,
|
||||
default=None,
|
||||
metavar="PX",
|
||||
help="Scale each frame so height is at most PX pixels (width keeps aspect); "
|
||||
"suited to tall full-tube mosaics. Both dimensions are rounded to even pixels for H.264.",
|
||||
)
|
||||
p.add_argument(
|
||||
"--preview",
|
||||
action="store_true",
|
||||
help="Shorthand for --max-height 1080 (overridden if --max-height is also set).",
|
||||
)
|
||||
p.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="List frames that would be written (no MP4)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--no-metadata-overlay",
|
||||
action="store_true",
|
||||
help="Do not draw scan metadata on each frame (default: overlay on).",
|
||||
)
|
||||
args = p.parse_args()
|
||||
if args.preview and args.max_height is None:
|
||||
args.max_height = 1080
|
||||
return args
|
||||
|
||||
|
||||
def _csv_required_fieldnames() -> tuple[str, ...]:
|
||||
return (
|
||||
"machine",
|
||||
"scan_id",
|
||||
"scan_time",
|
||||
"mosaic_on_disk",
|
||||
"mosaic_local_path",
|
||||
"start_x",
|
||||
"start_y",
|
||||
"end_x",
|
||||
"end_y",
|
||||
)
|
||||
|
||||
|
||||
def validate_scans_csv_header(reader: csv.DictReader, scans_csv: Path) -> None:
|
||||
if reader.fieldnames is None:
|
||||
sys.exit(f"Empty CSV: {scans_csv}")
|
||||
required = _csv_required_fieldnames()
|
||||
missing = [c for c in required if c not in reader.fieldnames]
|
||||
if missing:
|
||||
sys.exit(f"{scans_csv} missing columns: {missing}")
|
||||
|
||||
|
||||
def load_latest_rows(
|
||||
scans_csv: Path,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float] | None,
|
||||
) -> list[dict]:
|
||||
"""Last row per scan_id for matching machine; mosaic_on_disk True; optional ROI."""
|
||||
latest: dict[str, dict] = {}
|
||||
with scans_csv.open(newline="", encoding="utf-8") as fh:
|
||||
reader = csv.DictReader(fh)
|
||||
validate_scans_csv_header(reader, scans_csv)
|
||||
|
||||
for row in reader:
|
||||
if row.get("machine", "") != machine:
|
||||
continue
|
||||
if row.get("mosaic_on_disk", "").strip() != "True":
|
||||
continue
|
||||
if roi is not None and not extent_close(row, roi):
|
||||
continue
|
||||
sid = row.get("scan_id", "").strip()
|
||||
if not sid:
|
||||
continue
|
||||
latest[sid] = row
|
||||
|
||||
return list(latest.values())
|
||||
|
||||
|
||||
def load_on_disk_rows_by_machine(scans_csv: Path) -> dict[str, list[dict]]:
|
||||
"""One pass: last row per (machine, scan_id) where mosaic_on_disk True; group by machine."""
|
||||
latest: dict[tuple[str, str], dict] = {}
|
||||
with scans_csv.open(newline="", encoding="utf-8") as fh:
|
||||
reader = csv.DictReader(fh)
|
||||
validate_scans_csv_header(reader, scans_csv)
|
||||
for row in reader:
|
||||
if row.get("mosaic_on_disk", "").strip() != "True":
|
||||
continue
|
||||
sid = row.get("scan_id", "").strip()
|
||||
m = row.get("machine", "").strip()
|
||||
if not sid or not m:
|
||||
continue
|
||||
latest[(m, sid)] = row
|
||||
|
||||
by_machine: dict[str, list[dict]] = defaultdict(list)
|
||||
for (m, _sid), r in latest.items():
|
||||
by_machine[m].append(r)
|
||||
return {k: v for k, v in by_machine.items()}
|
||||
|
||||
|
||||
def pick_top_rois(rows: list[dict], n: int) -> list[tuple[tuple[float, float, float, float], int]]:
|
||||
"""Top n distinct extents by count of deduped rows. Empty if rows empty or n < 1."""
|
||||
if not rows or n < 1:
|
||||
return []
|
||||
counts = Counter(extent_key(r) for r in rows)
|
||||
return [(key_to_roi_floats(key), cnt) for key, cnt in counts.most_common(n)]
|
||||
|
||||
|
||||
def pick_top_roi(rows: list[dict]) -> tuple[float, float, float, float]:
|
||||
if not rows:
|
||||
sys.exit("No rows with mosaic_on_disk=True for this machine (and ROI filter, if any).")
|
||||
return pick_top_rois(rows, 1)[0][0]
|
||||
|
||||
|
||||
def default_output_path(
|
||||
archive: Path,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float],
|
||||
*,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
rank: int | None = None,
|
||||
) -> Path:
|
||||
safe = sanitize_machine_label(machine)
|
||||
sx, sy, ex, ey = roi
|
||||
base = f"roi_{sx}_{sy}_{ex}_{ey}".replace(" ", "")
|
||||
if rank is not None:
|
||||
base = f"{base}_r{rank}"
|
||||
if max_height is not None:
|
||||
base = f"{base}_h{max_height}"
|
||||
if metadata_overlay:
|
||||
base = f"{base}_meta"
|
||||
name = f"{base}.mp4"
|
||||
return archive / "movies" / safe / name
|
||||
|
||||
|
||||
def resolve_mosaic_path(rel: str, archive: Path) -> Path:
|
||||
"""CSV paths are usually repo-relative, e.g. archives/BW1-4__AMR-15/.../mosaic.jpg."""
|
||||
p = Path(rel)
|
||||
if p.is_absolute():
|
||||
return p.resolve()
|
||||
ar = archive.resolve()
|
||||
norm = rel.replace("\\", "/")
|
||||
if norm.startswith("archives/") or norm.startswith("./archives/"):
|
||||
return (ar.parent / rel).resolve()
|
||||
return (ar / rel).resolve()
|
||||
|
||||
|
||||
def even_dimensions(w: int, h: int) -> tuple[int, int]:
|
||||
"""libx264 requires even width and height."""
|
||||
w2 = w - (w % 2)
|
||||
h2 = h - (h % 2)
|
||||
if w2 < 2 or h2 < 2:
|
||||
raise MovieEncodeError(f"Frame dimensions too small after evenizing: {w}x{h}")
|
||||
return w2, h2
|
||||
|
||||
|
||||
def frame_size_mode(paths: list[Path]) -> tuple[int, int]:
|
||||
sizes: list[tuple[int, int]] = []
|
||||
for p in paths:
|
||||
try:
|
||||
with Image.open(p) as im:
|
||||
sizes.append(im.size)
|
||||
except OSError:
|
||||
continue
|
||||
if not sizes:
|
||||
raise MovieEncodeError("No readable mosaic images to determine frame size.")
|
||||
w, h = Counter(sizes).most_common(1)[0][0]
|
||||
return even_dimensions(w, h)
|
||||
|
||||
|
||||
def encode_size(native_w: int, native_h: int, max_height: int | None) -> tuple[int, int]:
|
||||
"""Native size is already even; optional downscale for preview encodes (cap height)."""
|
||||
if max_height is None:
|
||||
return native_w, native_h
|
||||
if max_height < 32:
|
||||
raise MovieEncodeError("--max-height must be at least 32")
|
||||
cap = max_height - (max_height % 2)
|
||||
if cap < 2:
|
||||
raise MovieEncodeError("--max-height must allow an even height of at least 2")
|
||||
if native_h <= cap:
|
||||
return native_w, native_h
|
||||
new_h = cap
|
||||
new_w = int(round(native_w * (new_h / native_h)))
|
||||
new_w -= new_w % 2
|
||||
if new_w < 2:
|
||||
raise MovieEncodeError("Computed preview width too small; try a larger --max-height")
|
||||
return new_w, new_h
|
||||
|
||||
|
||||
def _truetype_font_candidates() -> list[Path]:
|
||||
windir = os.environ.get("WINDIR", r"C:\Windows")
|
||||
return [
|
||||
Path(windir) / "Fonts" / "arial.ttf",
|
||||
Path(windir) / "Fonts" / "consola.ttf",
|
||||
Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"),
|
||||
Path("/usr/share/fonts/truetype/liberation/LiberationSans-Regular.ttf"),
|
||||
]
|
||||
|
||||
|
||||
def get_overlay_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
|
||||
for path in _truetype_font_candidates():
|
||||
if path.is_file():
|
||||
try:
|
||||
return ImageFont.truetype(str(path), size=size)
|
||||
except OSError:
|
||||
continue
|
||||
return ImageFont.load_default()
|
||||
|
||||
|
||||
def _truncate(s: str, max_len: int) -> str:
|
||||
s = s.strip()
|
||||
if len(s) <= max_len:
|
||||
return s
|
||||
if max_len <= 3:
|
||||
return s[:max_len]
|
||||
return s[: max_len - 3] + "..."
|
||||
|
||||
|
||||
def metadata_overlay_lines(row: dict, *, max_name_chars: int) -> list[str]:
|
||||
sid = row.get("scan_id", "").strip()
|
||||
st = row.get("scan_time", "").strip()
|
||||
name = _truncate(row.get("name", "").strip(), max_name_chars)
|
||||
nx = row.get("nx", "").strip()
|
||||
ny = row.get("ny", "").strip()
|
||||
dx = row.get("dx", "").strip()
|
||||
dy = row.get("dy", "").strip()
|
||||
lines = row.get("scan_lines", "").strip()
|
||||
mode = row.get("scan_mode", "").strip()
|
||||
user = row.get("user", "").strip()
|
||||
status = row.get("status", "").strip()
|
||||
sx = row.get("start_x", "").strip()
|
||||
sy = row.get("start_y", "").strip()
|
||||
ex = row.get("end_x", "").strip()
|
||||
ey = row.get("end_y", "").strip()
|
||||
machine = row.get("machine", "").strip()
|
||||
|
||||
grid = f"{nx}x{ny}" if nx and ny else ""
|
||||
step = f"{dx}x{dy} mm" if dx and dy else ""
|
||||
geom = " ".join(p for p in (grid, step) if p)
|
||||
orient = f"{lines} / {mode}" if lines or mode else ""
|
||||
|
||||
out: list[str] = []
|
||||
if machine:
|
||||
out.append(machine)
|
||||
if sid or st:
|
||||
parts = []
|
||||
if sid:
|
||||
parts.append(f"id {sid}")
|
||||
if st:
|
||||
parts.append(st)
|
||||
out.append(" ".join(parts))
|
||||
if name:
|
||||
out.append(name)
|
||||
if geom or orient:
|
||||
out.append(" ".join(p for p in (geom, orient) if p))
|
||||
if sx and sy and ex and ey:
|
||||
out.append(f"ROI mm {sx},{sy} .. {ex},{ey}")
|
||||
if user or status:
|
||||
tail: list[str] = []
|
||||
if user:
|
||||
tail.append(f"user {user}")
|
||||
if status:
|
||||
tail.append(status)
|
||||
out.append(" ".join(tail))
|
||||
return out if out else ["(no metadata)"]
|
||||
|
||||
|
||||
def draw_metadata_overlay(
|
||||
rgb: Image.Image,
|
||||
row: dict,
|
||||
*,
|
||||
margin: int,
|
||||
) -> None:
|
||||
"""Draw a semi-transparent label block along the top; mutates rgb in place."""
|
||||
# Panel alpha: ~50% so roots stay visible through the bar.
|
||||
panel_fill = (0, 0, 0, 128)
|
||||
panel_outline = (220, 220, 230, 100)
|
||||
|
||||
w, h = rgb.size
|
||||
margin = max(4, min(margin, w // 8))
|
||||
font_size = max(10, min(22, h // 50))
|
||||
pad = max(4, font_size // 3)
|
||||
line_gap = max(2, font_size // 6)
|
||||
|
||||
def measure_block(fs: int, name_max: int) -> tuple[ImageFont.FreeTypeFont | ImageFont.ImageFont, list[str], int, int]:
|
||||
font = get_overlay_font(fs)
|
||||
lines = metadata_overlay_lines(row, max_name_chars=name_max)
|
||||
tmp = Image.new("RGB", (1, 1))
|
||||
draw = ImageDraw.Draw(tmp)
|
||||
max_tw = 0
|
||||
total_h = 0
|
||||
for line in lines:
|
||||
bbox = draw.textbbox((0, 0), line, font=font)
|
||||
tw = bbox[2] - bbox[0]
|
||||
th = bbox[3] - bbox[1]
|
||||
max_tw = max(max_tw, tw)
|
||||
total_h += th
|
||||
if len(lines) > 1:
|
||||
total_h += line_gap * (len(lines) - 1)
|
||||
block_w = max_tw + 2 * pad
|
||||
block_h = total_h + 2 * pad
|
||||
return font, lines, block_w, block_h
|
||||
|
||||
name_max = max(24, w // max(6, font_size // 2))
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
max_block_w = w - 2 * margin
|
||||
max_block_h = min(h // 2, h - 2 * margin)
|
||||
while (block_w > max_block_w or block_h > max_block_h) and font_size > 9:
|
||||
font_size -= 1
|
||||
name_max = max(16, w // max(7, font_size // 2))
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
while block_w > max_block_w and name_max > 12:
|
||||
name_max -= 4
|
||||
font, lines, block_w, block_h = measure_block(font_size, name_max)
|
||||
|
||||
bw = min(block_w, max_block_w)
|
||||
bh = min(block_h, max_block_h)
|
||||
x0 = margin
|
||||
x1 = x0 + bw
|
||||
y0 = margin
|
||||
y1 = y0 + bh
|
||||
|
||||
overlay = Image.new("RGBA", (w, h), (0, 0, 0, 0))
|
||||
draw_ov = ImageDraw.Draw(overlay)
|
||||
draw_ov.rounded_rectangle(
|
||||
[x0, y0, x1, y1],
|
||||
radius=max(4, pad // 2),
|
||||
fill=panel_fill,
|
||||
outline=panel_outline,
|
||||
width=1,
|
||||
)
|
||||
|
||||
base = rgb.convert("RGBA")
|
||||
composited = Image.alpha_composite(base, overlay)
|
||||
draw = ImageDraw.Draw(composited)
|
||||
|
||||
cx, cy = x0 + pad, y0 + pad
|
||||
text_bottom_limit = y1 - pad
|
||||
for line in lines:
|
||||
bbox = draw.textbbox((0, 0), line, font=font)
|
||||
th = bbox[3] - bbox[1]
|
||||
if cy + th > text_bottom_limit:
|
||||
break
|
||||
draw.text(
|
||||
(cx, cy),
|
||||
line,
|
||||
fill=(245, 245, 245, 255),
|
||||
font=font,
|
||||
stroke_width=1,
|
||||
stroke_fill=(0, 0, 0, 255),
|
||||
)
|
||||
cy += th + line_gap
|
||||
|
||||
rgb.paste(composited.convert("RGB"))
|
||||
|
||||
|
||||
def encode_movie(
|
||||
*,
|
||||
machine: str,
|
||||
roi: tuple[float, float, float, float],
|
||||
rows: list[dict],
|
||||
archive: Path,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
fps: float,
|
||||
output: Path | None = None,
|
||||
dry_run: bool = False,
|
||||
rank: int | None = None,
|
||||
quiet: bool = False,
|
||||
) -> EncodedMovieResult:
|
||||
"""Build MP4 from pre-filtered rows for one ROI. Does not sys.exit on encode failures."""
|
||||
rows_sorted = sorted(
|
||||
rows,
|
||||
key=lambda r: (r.get("scan_time") or "", r.get("scan_id") or ""),
|
||||
)
|
||||
csv_frame_count = len(rows_sorted)
|
||||
|
||||
row_path_candidates: list[tuple[dict, Path]] = []
|
||||
for r in rows_sorted:
|
||||
rel = (r.get("mosaic_local_path") or "").strip()
|
||||
if not rel:
|
||||
continue
|
||||
row_path_candidates.append((r, resolve_mosaic_path(rel, archive)))
|
||||
|
||||
out: Path = output or default_output_path(
|
||||
archive,
|
||||
machine,
|
||||
roi,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
rank=rank,
|
||||
)
|
||||
|
||||
if not quiet:
|
||||
print(f"Machine: {machine}")
|
||||
print(f"ROI (mm): {roi[0]}, {roi[1]}, {roi[2]}, {roi[3]}")
|
||||
print(f"Frames (from CSV, deduped): {csv_frame_count}")
|
||||
if max_height is not None:
|
||||
print(f"Preview max height: {max_height}px")
|
||||
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
|
||||
|
||||
on_disk = sum(1 for _r, p in row_path_candidates if p.is_file())
|
||||
missing_paths = len(row_path_candidates) - on_disk
|
||||
|
||||
if dry_run:
|
||||
if not quiet:
|
||||
print(f"On-disk files among ordered list: {on_disk} / {len(row_path_candidates)}")
|
||||
for i, (_r, p) in enumerate(row_path_candidates[:5]):
|
||||
print(f" [{i}] {p} exists={p.is_file()}")
|
||||
if len(row_path_candidates) > 10:
|
||||
print(" ...")
|
||||
start = max(0, len(row_path_candidates) - 3)
|
||||
for i, (_r, p) in enumerate(row_path_candidates[start:], start=start):
|
||||
print(f" [{i}] {p} exists={p.is_file()}")
|
||||
return EncodedMovieResult(
|
||||
success=True,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason=None,
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
row_path_pairs = [(r, p) for r, p in row_path_candidates if p.is_file()]
|
||||
if not row_path_pairs:
|
||||
if not quiet:
|
||||
print("No mosaic files on disk for the selected rows.")
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason="No mosaic files on disk for the selected rows.",
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
t0 = time.perf_counter()
|
||||
try:
|
||||
ordered_paths = [p for _r, p in row_path_pairs]
|
||||
target_w, target_h = frame_size_mode(ordered_paths)
|
||||
enc_w, enc_h = encode_size(target_w, target_h, max_height)
|
||||
except MovieEncodeError as e:
|
||||
if not quiet:
|
||||
print(str(e))
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=0,
|
||||
missing=missing_paths,
|
||||
dropped_read=0,
|
||||
output_path=out,
|
||||
skipped_reason=str(e),
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
if not quiet:
|
||||
print(f"Target frame size (mode): {target_w} x {target_h}")
|
||||
if (enc_w, enc_h) != (target_w, target_h):
|
||||
print(f"Encode size (after max-height): {enc_w} x {enc_h}")
|
||||
|
||||
out.parent.mkdir(parents=True, exist_ok=True)
|
||||
written = 0
|
||||
dropped = 0
|
||||
resized = 0
|
||||
scaled_preview = 0
|
||||
try:
|
||||
writer = imageio.get_writer(
|
||||
str(out),
|
||||
fps=float(fps),
|
||||
codec="libx264",
|
||||
quality=8,
|
||||
macro_block_size=1,
|
||||
)
|
||||
try:
|
||||
for row, p in row_path_pairs:
|
||||
try:
|
||||
with Image.open(p) as im:
|
||||
rgb = im.convert("RGB")
|
||||
if rgb.size != (target_w, target_h):
|
||||
rgb = rgb.resize((target_w, target_h), Image.Resampling.LANCZOS)
|
||||
resized += 1
|
||||
if rgb.size != (enc_w, enc_h):
|
||||
rgb = rgb.resize((enc_w, enc_h), Image.Resampling.LANCZOS)
|
||||
scaled_preview += 1
|
||||
if metadata_overlay:
|
||||
draw_metadata_overlay(rgb, row, margin=max(6, enc_w // 80))
|
||||
frame = np.asarray(rgb)
|
||||
writer.append_data(frame)
|
||||
written += 1
|
||||
except OSError:
|
||||
dropped += 1
|
||||
finally:
|
||||
writer.close()
|
||||
except Exception as e:
|
||||
if not quiet:
|
||||
print(f"Encode error: {e}")
|
||||
return EncodedMovieResult(
|
||||
success=False,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=written,
|
||||
missing=missing_paths,
|
||||
dropped_read=dropped,
|
||||
output_path=out,
|
||||
skipped_reason=f"encode_error: {e}",
|
||||
size_mb=None,
|
||||
elapsed_s=None,
|
||||
)
|
||||
|
||||
elapsed = time.perf_counter() - t0
|
||||
size_mb = out.stat().st_size / (1024 * 1024) if out.is_file() else 0.0
|
||||
if not quiet:
|
||||
print(
|
||||
f"Written: {written} frames (normalized to mode: {resized}, "
|
||||
f"preview scale: {scaled_preview})"
|
||||
)
|
||||
print(f"Dropped (read error): {dropped}")
|
||||
print(f"Missing paths (not on disk): {missing_paths}")
|
||||
print(f"Output: {out.resolve()} ({size_mb:.2f} MB)")
|
||||
|
||||
return EncodedMovieResult(
|
||||
success=True,
|
||||
machine=machine,
|
||||
roi=roi,
|
||||
csv_frame_count=csv_frame_count,
|
||||
written=written,
|
||||
missing=missing_paths,
|
||||
dropped_read=dropped,
|
||||
output_path=out,
|
||||
skipped_reason=None,
|
||||
size_mb=size_mb,
|
||||
elapsed_s=elapsed,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
archive: Path = args.archive
|
||||
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
|
||||
if not scans_csv.is_file():
|
||||
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||
|
||||
roi_sel: tuple[float, float, float, float] | None
|
||||
if args.roi:
|
||||
roi_sel = parse_roi(args.roi)
|
||||
else:
|
||||
roi_sel = None
|
||||
|
||||
rows = load_latest_rows(scans_csv, args.machine, roi_sel)
|
||||
if roi_sel is None:
|
||||
roi_sel = pick_top_roi(rows)
|
||||
rows = [r for r in rows if extent_close(r, roi_sel)]
|
||||
|
||||
assert roi_sel is not None
|
||||
max_height: int | None = args.max_height
|
||||
metadata_overlay = not args.no_metadata_overlay
|
||||
|
||||
res = encode_movie(
|
||||
machine=args.machine,
|
||||
roi=roi_sel,
|
||||
rows=rows,
|
||||
archive=archive,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
fps=float(args.fps),
|
||||
output=args.output,
|
||||
dry_run=bool(args.dry_run),
|
||||
rank=None,
|
||||
quiet=False,
|
||||
)
|
||||
if not res.success and not args.dry_run:
|
||||
sys.exit(1 if res.skipped_reason else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,269 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Build preview MP4s for the top N ROIs per machine (default N=2, max-height 1080).
|
||||
|
||||
Reads archives/scans.csv once, groups on-disk mosaic rows by machine, then for
|
||||
each machine picks the most frequent ROI extents and calls encode_movie().
|
||||
|
||||
Usage:
|
||||
python scripts/build_mosaic_movies_batch.py
|
||||
python scripts/build_mosaic_movies_batch.py --dry-run
|
||||
python scripts/build_mosaic_movies_batch.py --skip-existing
|
||||
python scripts/build_mosaic_movies_batch.py --machine "BW2-10 [AMR-22]"
|
||||
python scripts/build_mosaic_movies_batch.py --full-res # no max-height cap
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
_SCRIPTS_DIR = Path(__file__).resolve().parent
|
||||
|
||||
# Import sibling module (run as python scripts/build_mosaic_movies_batch.py from repo root)
|
||||
sys.path.insert(0, str(_SCRIPTS_DIR))
|
||||
import build_mosaic_movie as bmm # noqa: E402
|
||||
|
||||
|
||||
def read_machine_labels(path: Path) -> list[str]:
|
||||
out: list[str] = []
|
||||
with path.open(encoding="utf-8") as fh:
|
||||
for line in fh:
|
||||
s = line.strip()
|
||||
if not s or s.startswith("#"):
|
||||
continue
|
||||
out.append(s)
|
||||
return out
|
||||
|
||||
|
||||
@dataclass
|
||||
class Job:
|
||||
machine: str
|
||||
rank: int
|
||||
roi: tuple[float, float, float, float]
|
||||
extent_count: int
|
||||
rows: list[dict]
|
||||
output_path: Path
|
||||
|
||||
|
||||
def collect_jobs(
|
||||
*,
|
||||
machines: list[str],
|
||||
by_machine: dict[str, list[dict]],
|
||||
archive: Path,
|
||||
top_rois: int,
|
||||
max_height: int | None,
|
||||
metadata_overlay: bool,
|
||||
) -> list[Job]:
|
||||
jobs: list[Job] = []
|
||||
for machine in machines:
|
||||
rows = by_machine.get(machine, [])
|
||||
if not rows:
|
||||
continue
|
||||
picks = bmm.pick_top_rois(rows, top_rois)
|
||||
for rank, (roi, extent_count) in enumerate(picks, start=1):
|
||||
rows_roi = [r for r in rows if bmm.extent_close(r, roi)]
|
||||
out = bmm.default_output_path(
|
||||
archive,
|
||||
machine,
|
||||
roi,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
rank=rank,
|
||||
)
|
||||
jobs.append(
|
||||
Job(
|
||||
machine=machine,
|
||||
rank=rank,
|
||||
roi=roi,
|
||||
extent_count=extent_count,
|
||||
rows=rows_roi,
|
||||
output_path=out,
|
||||
)
|
||||
)
|
||||
return jobs
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description=__doc__)
|
||||
p.add_argument(
|
||||
"--machines-file",
|
||||
type=Path,
|
||||
default=_SCRIPTS_DIR / "machines.example.txt",
|
||||
help="One machine label per line (default: scripts/machines.example.txt next to this script)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--machine",
|
||||
metavar="LABEL",
|
||||
help='If set, only this machine (overrides list to a single job set), e.g. "BW2-10 [AMR-22]"',
|
||||
)
|
||||
p.add_argument("--archive", type=Path, default=Path("archives"))
|
||||
p.add_argument("--scans-csv", type=Path, default=None)
|
||||
p.add_argument("--top-rois", type=int, default=2, help="How many top extents per machine (default: 2)")
|
||||
p.add_argument("--max-height", type=int, default=1080, help="Preview cap in px (default: 1080)")
|
||||
p.add_argument(
|
||||
"--full-res",
|
||||
action="store_true",
|
||||
help="Disable max-height cap (full mosaic resolution; can be huge)",
|
||||
)
|
||||
p.add_argument("--fps", type=float, default=10.0)
|
||||
p.add_argument("--dry-run", action="store_true")
|
||||
p.add_argument(
|
||||
"--skip-existing",
|
||||
action="store_true",
|
||||
help="Skip encode if output MP4 exists and is non-empty",
|
||||
)
|
||||
p.add_argument("--no-metadata-overlay", action="store_true")
|
||||
args = p.parse_args()
|
||||
if args.full_res:
|
||||
args.max_height = None
|
||||
return args
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
archive: Path = args.archive
|
||||
scans_csv: Path = args.scans_csv or (archive / "scans.csv")
|
||||
if not scans_csv.is_file():
|
||||
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||
|
||||
if args.machine:
|
||||
machines = [args.machine.strip()]
|
||||
else:
|
||||
if not args.machines_file.is_file():
|
||||
sys.exit(f"Machines file not found: {args.machines_file}")
|
||||
machines = read_machine_labels(args.machines_file)
|
||||
if not machines:
|
||||
sys.exit(f"No machine labels in {args.machines_file}")
|
||||
|
||||
t_load0 = time.perf_counter()
|
||||
by_machine = bmm.load_on_disk_rows_by_machine(scans_csv)
|
||||
load_s = time.perf_counter() - t_load0
|
||||
|
||||
max_height: int | None = args.max_height
|
||||
metadata_overlay = not args.no_metadata_overlay
|
||||
|
||||
jobs = collect_jobs(
|
||||
machines=machines,
|
||||
by_machine=by_machine,
|
||||
archive=archive,
|
||||
top_rois=args.top_rois,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
)
|
||||
total = len(jobs)
|
||||
if total == 0:
|
||||
sys.exit("No jobs (no on-disk mosaics for selected machines).")
|
||||
|
||||
print(f"Loaded scans.csv grouped by machine in {load_s:.2f}s ({total} job(s))")
|
||||
if max_height is not None:
|
||||
print(f"Max height: {max_height}px")
|
||||
else:
|
||||
print("Max height: (full resolution)")
|
||||
print(f"Metadata overlay: {'on' if metadata_overlay else 'off'}")
|
||||
print()
|
||||
|
||||
summary_rows: list[tuple[str, ...]] = []
|
||||
|
||||
for idx, job in enumerate(jobs, start=1):
|
||||
sx, sy, ex, ey = job.roi
|
||||
roi_s = f"{sx},{sy}..{ex},{ey}"
|
||||
print(
|
||||
f"[{idx}/{total}] {job.machine} rank={job.rank} ROI {roi_s} "
|
||||
f"({job.extent_count} CSV rows this extent, {len(job.rows)} deduped rows)"
|
||||
)
|
||||
|
||||
if args.skip_existing and job.output_path.is_file() and job.output_path.stat().st_size > 0:
|
||||
sz = job.output_path.stat().st_size / (1024 * 1024)
|
||||
print(f" SKIP (exists): {job.output_path}")
|
||||
summary_rows.append(
|
||||
(
|
||||
job.machine,
|
||||
str(job.rank),
|
||||
roi_s,
|
||||
str(len(job.rows)),
|
||||
"-",
|
||||
"-",
|
||||
str(job.output_path),
|
||||
"SKIP (exists)",
|
||||
f"{sz:.2f}",
|
||||
"-",
|
||||
)
|
||||
)
|
||||
continue
|
||||
|
||||
enc_t0 = time.perf_counter()
|
||||
res = bmm.encode_movie(
|
||||
machine=job.machine,
|
||||
roi=job.roi,
|
||||
rows=job.rows,
|
||||
archive=archive,
|
||||
max_height=max_height,
|
||||
metadata_overlay=metadata_overlay,
|
||||
fps=float(args.fps),
|
||||
output=None,
|
||||
dry_run=args.dry_run,
|
||||
rank=job.rank,
|
||||
quiet=True,
|
||||
)
|
||||
enc_elapsed = time.perf_counter() - enc_t0
|
||||
|
||||
if res.success:
|
||||
if args.dry_run:
|
||||
print(f" dry-run OK -> {res.output_path} (missing files: {res.missing})")
|
||||
else:
|
||||
print(
|
||||
f" OK {res.written} frames "
|
||||
f"{(res.size_mb or 0):.2f} MB {enc_elapsed:.1f}s"
|
||||
)
|
||||
else:
|
||||
print(f" FAIL {res.skipped_reason or 'unknown'}")
|
||||
|
||||
status = "OK" if res.success else "FAIL"
|
||||
if not res.success and res.skipped_reason:
|
||||
status = f"FAIL: {res.skipped_reason[:40]}"
|
||||
mb = f"{res.size_mb:.2f}" if res.size_mb is not None else "-"
|
||||
es = f"{enc_elapsed:.1f}" if not args.dry_run else "-"
|
||||
w = str(res.written) if not args.dry_run else "0"
|
||||
if args.dry_run:
|
||||
status = "dry-run"
|
||||
mb = "-"
|
||||
|
||||
summary_rows.append(
|
||||
(
|
||||
job.machine,
|
||||
str(job.rank),
|
||||
roi_s,
|
||||
str(len(job.rows)),
|
||||
w,
|
||||
str(res.missing),
|
||||
str(res.output_path or job.output_path),
|
||||
status,
|
||||
mb,
|
||||
es,
|
||||
)
|
||||
)
|
||||
print()
|
||||
|
||||
print("=" * 120)
|
||||
hdr = (
|
||||
f"{'machine':<26} {'rk':>2} {'ROI (mm)':<36} {'csv':>4} {'out':>5} "
|
||||
f"{'miss':>4} {'MB':>7} {'s':>6} {'status':<22}"
|
||||
)
|
||||
print(hdr)
|
||||
print("-" * 120)
|
||||
for row in summary_rows:
|
||||
m, rk, roi_s, csv_n, wrt, miss, path, status, mb, es = row
|
||||
print(
|
||||
f"{m:<26} {rk:>2} {roi_s:<36} {csv_n:>4} {wrt:>5} {miss:>4} "
|
||||
f"{mb:>7} {es:>6} {status:<22}"
|
||||
)
|
||||
print(f" -> {path}")
|
||||
print("=" * 120)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,105 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Split scans.csv into per-machine metadata CSVs.
|
||||
|
||||
Reads the combined scans.csv produced by the scraper and writes one CSV per
|
||||
machine containing only the website-sourced metadata columns (no mosaic paths,
|
||||
download status, or error fields).
|
||||
|
||||
Usage:
|
||||
python scripts/export_machine_metadata.py
|
||||
python scripts/export_machine_metadata.py --input archives/scans.csv --output-dir archives/by_machine
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
|
||||
METADATA_COLUMNS = [
|
||||
"machine",
|
||||
"machine_id",
|
||||
"scan_id",
|
||||
"name",
|
||||
"scan_time",
|
||||
"start_x",
|
||||
"start_y",
|
||||
"end_x",
|
||||
"end_y",
|
||||
"dx",
|
||||
"dy",
|
||||
"nx",
|
||||
"ny",
|
||||
"total_tiles",
|
||||
"scan_lines",
|
||||
"scan_mode",
|
||||
"start_datetime",
|
||||
"end_datetime",
|
||||
"status",
|
||||
"user",
|
||||
"disk_space_mb",
|
||||
]
|
||||
|
||||
|
||||
def sanitize_machine_label(label: str) -> str:
|
||||
return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_")
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
p = argparse.ArgumentParser(description="Split scans.csv into per-machine metadata CSVs.")
|
||||
p.add_argument(
|
||||
"--input",
|
||||
default="archives/scans.csv",
|
||||
metavar="FILE",
|
||||
help="Path to scans.csv (default: archives/scans.csv)",
|
||||
)
|
||||
p.add_argument(
|
||||
"--output-dir",
|
||||
default="archives/by_machine",
|
||||
metavar="DIR",
|
||||
help="Directory for output CSVs (default: archives/by_machine)",
|
||||
)
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
input_path = Path(args.input)
|
||||
output_dir = Path(args.output_dir)
|
||||
|
||||
if not input_path.exists():
|
||||
sys.exit(f"Input file not found: {input_path}")
|
||||
|
||||
with input_path.open(newline="") as fh:
|
||||
reader = csv.DictReader(fh)
|
||||
if reader.fieldnames is None:
|
||||
sys.exit(f"{input_path} appears to be empty.")
|
||||
|
||||
missing = [c for c in METADATA_COLUMNS if c not in reader.fieldnames]
|
||||
if missing:
|
||||
sys.exit(f"Expected columns not found in {input_path}: {missing}")
|
||||
|
||||
rows_by_machine: dict[str, list[dict]] = defaultdict(list)
|
||||
for row in reader:
|
||||
rows_by_machine[row["machine"]].append(row)
|
||||
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for machine_label, rows in sorted(rows_by_machine.items()):
|
||||
safe_name = sanitize_machine_label(machine_label)
|
||||
out_path = output_dir / f"{safe_name}_scans_metadata.csv"
|
||||
with out_path.open("w", newline="") as fh:
|
||||
writer = csv.DictWriter(fh, fieldnames=METADATA_COLUMNS, extrasaction="ignore")
|
||||
writer.writeheader()
|
||||
writer.writerows(rows)
|
||||
print(f" {out_path} ({len(rows)} rows)")
|
||||
|
||||
total = sum(len(r) for r in rows_by_machine.values())
|
||||
print(f"\n{len(rows_by_machine)} machine(s), {total} total rows → {output_dir}/")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,6 +1,6 @@
|
||||
# All RootView minirhizotron machine labels (same set as `machine_metadata` in config.example.yaml).
|
||||
# Copy to the repo root as machines.txt, or: cp scripts/machines.example.txt machines.txt
|
||||
# sample_random_scans.sh: by default one random scan per line = mosaic + tiles; use MOSAIC_ONLY=1 for mosaics only
|
||||
# Random-sample helper `scripts/sample_random_scans.sh` lives on branch `testing/sample-runs` only.
|
||||
BW1-4 [AMR-15]
|
||||
BW1-6 [AMR-19]
|
||||
BW1-7 [AMR-18]
|
||||
|
||||
@@ -0,0 +1,481 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Report mosaic download progress from archives/scans.csv.
|
||||
|
||||
Output is Markdown. Use ``--by-year`` for a per-machine × per-year
|
||||
done/failed table. When the first mosaic pass is complete (no pending rows)
|
||||
but failures remain, a **Mosaic retry estimates** section is printed with
|
||||
queue counts and duration hints.
|
||||
|
||||
Rate/ETA use a 30-minute rolling window when snapshots show progress.
|
||||
Mean mosaic size is sampled from up to 100 downloads (1-hour cache).
|
||||
|
||||
Usage:
|
||||
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
from collections import Counter
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
# Year-based viability model derived from BW1-4 observations:
|
||||
# pre-2019 → kept on server long-term (~100 %)
|
||||
# 2019-2022 → purged (~ 0 %)
|
||||
# 2023+ → recent, mostly available (~82 %)
|
||||
_R_PRE19 = 1.00
|
||||
_R_PURGED = 0.00
|
||||
_R_RECENT = 0.82
|
||||
|
||||
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
|
||||
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
|
||||
RETRY_REALISTIC_RATE_PER_HR = 1100.0
|
||||
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _parse_dt(s: str) -> datetime | None:
|
||||
try:
|
||||
return datetime.fromisoformat(s.replace("Z", "+00:00"))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _fmt_duration(seconds: float) -> str:
|
||||
if seconds < 0:
|
||||
return "?"
|
||||
h = int(seconds // 3600)
|
||||
m = int((seconds % 3600) // 60)
|
||||
if h >= 48:
|
||||
return f"{h // 24}d {h % 24}h"
|
||||
if h > 0:
|
||||
return f"{h}h {m:02d}m"
|
||||
return f"{m}m {int(seconds % 60):02d}s"
|
||||
|
||||
|
||||
def _fmt_size(b: float) -> str:
|
||||
if b >= 1e12:
|
||||
return f"{b/1e12:.2f} TB"
|
||||
if b >= 1e9:
|
||||
return f"{b/1e9:.2f} GB"
|
||||
if b >= 1e6:
|
||||
return f"{b/1e6:.1f} MB"
|
||||
return f"{b/1e3:.0f} KB"
|
||||
|
||||
|
||||
def _md_table(headers: list[str], rows: list[list[str]], *, align: list[str] | None = None) -> str:
|
||||
"""Render a Markdown table. align values: 'l', 'r', 'c' (default 'l')."""
|
||||
if align is None:
|
||||
align = ["l"] * len(headers)
|
||||
sep_map = {"l": ":---", "r": "---:", "c": ":---:"}
|
||||
|
||||
def row_str(cells: list[str]) -> str:
|
||||
return "| " + " | ".join(cells) + " |"
|
||||
|
||||
lines = [
|
||||
row_str(headers),
|
||||
row_str([sep_map.get(a, ":---") for a in align]),
|
||||
]
|
||||
for r in rows:
|
||||
lines.append(row_str(r))
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _sample_mean_bytes(rows: list[dict], cache: dict, max_sample: int = 100) -> float | None:
|
||||
cached_mean = cache.get("mean_bytes")
|
||||
cached_n = cache.get("sample_n", 0)
|
||||
cached_ts = _parse_dt(cache.get("size_ts", ""))
|
||||
now = datetime.now(timezone.utc)
|
||||
if (
|
||||
cached_mean and cached_ts
|
||||
and (now - cached_ts).total_seconds() < 3600
|
||||
and cached_n >= min(max_sample, len(rows))
|
||||
):
|
||||
return float(cached_mean)
|
||||
|
||||
sample = random.sample(rows, min(max_sample, len(rows)))
|
||||
sizes = []
|
||||
for row in sample:
|
||||
p = row.get("mosaic_local_path", "")
|
||||
if p:
|
||||
try:
|
||||
sz = os.path.getsize(p)
|
||||
if sz > 0:
|
||||
sizes.append(sz)
|
||||
except OSError:
|
||||
pass
|
||||
if not sizes:
|
||||
return None
|
||||
mean = sum(sizes) / len(sizes)
|
||||
cache["mean_bytes"] = mean
|
||||
cache["sample_n"] = len(sizes)
|
||||
cache["size_ts"] = now.isoformat()
|
||||
return mean
|
||||
|
||||
|
||||
def _expected_remaining(pending_rows: list[dict]) -> float:
|
||||
count = 0.0
|
||||
for row in pending_rows:
|
||||
yr = row.get("scan_time", "")[:4]
|
||||
if yr < "2019":
|
||||
count += _R_PRE19
|
||||
elif yr <= "2022":
|
||||
count += _R_PURGED
|
||||
else:
|
||||
count += _R_RECENT
|
||||
return count
|
||||
|
||||
|
||||
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
|
||||
if n_scans <= 0 or rate_per_hr <= 0:
|
||||
return "—"
|
||||
return _fmt_duration(n_scans / rate_per_hr * 3600.0)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--archive", default="archives")
|
||||
parser.add_argument(
|
||||
"--by-year", action="store_true",
|
||||
help="Add a per-machine × per-year done/failed breakdown table",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
archive = Path(args.archive)
|
||||
scans_csv = archive / "scans.csv"
|
||||
progress_json = archive / ".progress.json"
|
||||
rate_cache_path = archive / ".mosaic_rate_cache.json"
|
||||
|
||||
if not scans_csv.exists():
|
||||
sys.exit(f"scans.csv not found: {scans_csv}")
|
||||
|
||||
# --- Load & deduplicate (last row per machine+scan_id) ---
|
||||
latest: dict[tuple[str, str], dict] = {}
|
||||
with open(scans_csv, newline="", encoding="utf-8") as f:
|
||||
for row in csv.DictReader(f):
|
||||
key = (row.get("machine", ""), row.get("scan_id", ""))
|
||||
latest[key] = row
|
||||
|
||||
by_machine: dict[str, Counter] = {}
|
||||
# machine -> year -> Counter(status -> count)
|
||||
by_machine_year: dict[str, dict[str, Counter]] = {}
|
||||
total_counts: Counter = Counter()
|
||||
downloaded_rows: list[dict] = []
|
||||
pending_rows: list[dict] = []
|
||||
|
||||
for (_m, _sid), row in latest.items():
|
||||
status = row.get("mosaic_download_status", "")
|
||||
m = row.get("machine", "")
|
||||
yr = (row.get("scan_time") or "")[:4] or "????"
|
||||
by_machine.setdefault(m, Counter())[status] += 1
|
||||
by_machine_year.setdefault(m, {}).setdefault(yr, Counter())[status] += 1
|
||||
total_counts[status] += 1
|
||||
if status == "downloaded":
|
||||
downloaded_rows.append(row)
|
||||
elif status == "skipped_metadata_only":
|
||||
pending_rows.append(row)
|
||||
|
||||
total = sum(total_counts.values())
|
||||
downloaded = total_counts["downloaded"]
|
||||
failed = total_counts["failed"]
|
||||
zero_skipped = total_counts["skipped_zero_disk_space"]
|
||||
pending = total_counts["skipped_metadata_only"]
|
||||
processed = downloaded + failed + zero_skipped
|
||||
attempted = downloaded + failed
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# --- Elapsed ---
|
||||
elapsed_str = ""
|
||||
if progress_json.exists():
|
||||
try:
|
||||
data = json.loads(progress_json.read_text())
|
||||
started_at = _parse_dt(data.get("started_at", ""))
|
||||
if started_at:
|
||||
elapsed_str = _fmt_duration((now - started_at).total_seconds())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# --- Rate cache ---
|
||||
cache: dict = {}
|
||||
if rate_cache_path.exists():
|
||||
try:
|
||||
cache = json.loads(rate_cache_path.read_text())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Rolling rate: keep up to 60 snapshots; compute rate from the oldest
|
||||
# snapshot within the last 30 minutes for a smoothed estimate.
|
||||
snapshots: list[dict] = cache.get("snapshots", [])
|
||||
# Prune snapshots older than 30 minutes, but keep at least one
|
||||
cutoff = now.timestamp() - 1800
|
||||
recent = [s for s in snapshots if s.get("ts", 0) >= cutoff]
|
||||
if not recent and snapshots:
|
||||
recent = [snapshots[-1]] # always keep one for continuity
|
||||
|
||||
rate_per_sec: float | None = None
|
||||
rate_window_str = ""
|
||||
snap_delta_proc = 0
|
||||
if recent:
|
||||
oldest = recent[0]
|
||||
dt = now.timestamp() - oldest["ts"]
|
||||
dp = processed - oldest["proc"]
|
||||
snap_delta_proc = dp
|
||||
if dt >= 60 and dp > 0:
|
||||
rate_per_sec = dp / dt
|
||||
window_min = dt / 60
|
||||
rate_window_str = f"{window_min:.0f}-min avg"
|
||||
|
||||
# One-time baseline after initial mosaic crawl finished (no pending rows).
|
||||
if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
|
||||
cache["first_pass_completed_at"] = now.isoformat()
|
||||
cache["first_pass_processed"] = total
|
||||
cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR
|
||||
|
||||
first_pass_rate_hr = float(
|
||||
cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
|
||||
)
|
||||
live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
|
||||
# Active scrape shows progress in snapshots; idle archive shows dp == 0.
|
||||
retry_estimate_rate_hr = (
|
||||
live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
|
||||
)
|
||||
|
||||
# --- Disk space ---
|
||||
mean_bytes: float | None = None
|
||||
size_note = ""
|
||||
if downloaded_rows:
|
||||
mean_bytes = _sample_mean_bytes(downloaded_rows, cache)
|
||||
if mean_bytes and cache.get("sample_n"):
|
||||
size_note = f"mean {_fmt_size(mean_bytes)} × {cache['sample_n']} sampled files"
|
||||
|
||||
dl_bytes: float | None = None
|
||||
rem_bytes: float | None = None
|
||||
if mean_bytes:
|
||||
dl_bytes = downloaded * mean_bytes
|
||||
rem_bytes = _expected_remaining(pending_rows) * mean_bytes
|
||||
|
||||
# Update cache: append new snapshot, keep last 60
|
||||
recent.append({"ts": now.timestamp(), "proc": processed})
|
||||
cache["snapshots"] = recent[-60:]
|
||||
# Keep legacy keys for backwards compat
|
||||
cache["timestamp"] = now.isoformat()
|
||||
cache["processed"] = processed
|
||||
try:
|
||||
rate_cache_path.write_text(json.dumps(cache))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
rate_str = eta_str = ""
|
||||
if rate_per_sec and rate_per_sec > 0:
|
||||
rate_str = f"{rate_per_sec * 3600:,.0f} scans/hr ({rate_window_str})"
|
||||
eta_str = _fmt_duration(pending / rate_per_sec)
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Output — Markdown
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
ts = datetime.now().strftime("%Y-%m-%d %H:%M")
|
||||
print(f"# Mosaic download progress — {ts}\n")
|
||||
print(f"**Archive:** `{archive.resolve()}` ")
|
||||
meta_parts = []
|
||||
if elapsed_str:
|
||||
meta_parts.append(f"**Elapsed:** {elapsed_str}")
|
||||
if rate_str:
|
||||
meta_parts.append(f"**Rate:** {rate_str}")
|
||||
if eta_str:
|
||||
meta_parts.append(f"**ETA:** {eta_str}")
|
||||
if meta_parts:
|
||||
print(" ".join(meta_parts) + " ")
|
||||
print()
|
||||
|
||||
# Summary table
|
||||
summary_rows = [
|
||||
["Downloaded", f"{downloaded:,}", f"{100*downloaded/total:.1f}%"],
|
||||
["Failed", f"{failed:,}", f"{100*failed/total:.1f}%"],
|
||||
["Skipped (disk=0)",f"{zero_skipped:,}", f"{100*zero_skipped/total:.1f}%"],
|
||||
["Pending", f"{pending:,}", f"{100*pending/total:.1f}%"],
|
||||
["**Total**", f"**{total:,}**", ""],
|
||||
]
|
||||
if attempted:
|
||||
summary_rows.append(["**Success rate**", f"**{100*downloaded/attempted:.1f}%**", "*(of attempted)*"])
|
||||
print(_md_table(["Metric", "Count", ""], summary_rows, align=["l", "r", "l"]))
|
||||
print()
|
||||
|
||||
# Disk space
|
||||
if dl_bytes is not None and rem_bytes is not None:
|
||||
total_bytes = dl_bytes + rem_bytes
|
||||
print(f"### Disk space\n")
|
||||
print(f"_{size_note}_\n")
|
||||
ds_rows = [
|
||||
["Downloaded so far", _fmt_size(dl_bytes), ""],
|
||||
["Estimated remaining", _fmt_size(rem_bytes), "*(model-based)*"],
|
||||
["**Grand total**", f"**{_fmt_size(total_bytes)}**", ""],
|
||||
]
|
||||
print(_md_table(["", "Size", ""], ds_rows, align=["l", "r", "l"]))
|
||||
print()
|
||||
|
||||
# Per-machine breakdown
|
||||
print("### Per-machine breakdown\n")
|
||||
machines = sorted(by_machine)
|
||||
mc_rows = []
|
||||
for m in machines:
|
||||
mc = by_machine[m]
|
||||
mt = sum(mc.values())
|
||||
mc_rows.append([
|
||||
m,
|
||||
f"{mc['downloaded']:,}",
|
||||
f"{mc['failed']:,}",
|
||||
f"{mc['skipped_zero_disk_space']:,}",
|
||||
f"{mc['skipped_metadata_only']:,}",
|
||||
f"{mt:,}",
|
||||
])
|
||||
mc_rows.append([
|
||||
"**TOTAL**",
|
||||
f"**{downloaded:,}**",
|
||||
f"**{failed:,}**",
|
||||
f"**{zero_skipped:,}**",
|
||||
f"**{pending:,}**",
|
||||
f"**{total:,}**",
|
||||
])
|
||||
print(_md_table(
|
||||
["Machine", "Done", "Failed", "Skip0", "Pending", "Total"],
|
||||
mc_rows,
|
||||
align=["l", "r", "r", "r", "r", "r"],
|
||||
))
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Retry estimates (first pass complete: pending == 0, failures remain)
|
||||
# -----------------------------------------------------------------------
|
||||
if failed > 0 and pending == 0:
|
||||
failed_rows_list = [
|
||||
r for r in latest.values()
|
||||
if r.get("mosaic_download_status") == "failed"
|
||||
]
|
||||
n_all = len(failed_rows_list)
|
||||
n_2023 = sum(
|
||||
1 for r in failed_rows_list
|
||||
if (r.get("scan_time") or "")[:4] >= "2023"
|
||||
and len((r.get("scan_time") or "")[:4]) == 4
|
||||
)
|
||||
n_200 = sum(
|
||||
1 for r in failed_rows_list
|
||||
if r.get("mosaic_error_code") == "200"
|
||||
)
|
||||
rate_note = (
|
||||
"rolling 30-min window"
|
||||
if snap_delta_proc > 0
|
||||
else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
|
||||
)
|
||||
print()
|
||||
print("### Mosaic retry estimates\n")
|
||||
print(
|
||||
f"*Suggested command after server fix:* "
|
||||
f"`python scraper.py --retry-failed --workers 2` "
|
||||
f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n"
|
||||
)
|
||||
print(
|
||||
f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
|
||||
f"({rate_note}). Fixed columns use scenario rates.*\n"
|
||||
)
|
||||
est_hdr = (
|
||||
"Retry scope",
|
||||
"Count",
|
||||
f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr",
|
||||
f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr",
|
||||
f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr",
|
||||
f"@{retry_estimate_rate_hr:.0f}/hr",
|
||||
)
|
||||
retry_tbl_rows = [
|
||||
[
|
||||
"HTTP 200 (empty body)",
|
||||
f"{n_200:,}",
|
||||
_retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_200, retry_estimate_rate_hr),
|
||||
],
|
||||
[
|
||||
"Failed, scan_time ≥ 2023",
|
||||
f"{n_2023:,}",
|
||||
_retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_2023, retry_estimate_rate_hr),
|
||||
],
|
||||
[
|
||||
"**All failed**",
|
||||
f"**{n_all:,}**",
|
||||
_retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR),
|
||||
_retry_hours_from_rate(n_all, retry_estimate_rate_hr),
|
||||
],
|
||||
]
|
||||
print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# --by-year table
|
||||
# -----------------------------------------------------------------------
|
||||
if args.by_year:
|
||||
print()
|
||||
print("### Downloads by machine and year\n")
|
||||
print("*Format: done / failed*\n")
|
||||
|
||||
# Only include years that have at least one downloaded or failed scan
|
||||
all_years = sorted(
|
||||
yr for yr in set(
|
||||
yr for m_data in by_machine_year.values() for yr in m_data
|
||||
)
|
||||
if any(
|
||||
by_machine_year[m].get(yr, Counter()).get("downloaded", 0)
|
||||
+ by_machine_year[m].get(yr, Counter()).get("failed", 0) > 0
|
||||
for m in by_machine_year
|
||||
)
|
||||
)
|
||||
|
||||
# Totals per year
|
||||
yr_totals: dict[str, Counter] = {}
|
||||
for yr in all_years:
|
||||
yr_totals[yr] = Counter()
|
||||
for m in machines:
|
||||
yr_totals[yr] += by_machine_year.get(m, {}).get(yr, Counter())
|
||||
|
||||
year_rows = []
|
||||
for m in machines:
|
||||
row_cells = [m]
|
||||
for yr in all_years:
|
||||
c = by_machine_year.get(m, {}).get(yr, Counter())
|
||||
d = c.get("downloaded", 0)
|
||||
f = c.get("failed", 0)
|
||||
row_cells.append(f"{d:,} / {f:,}" if (d or f) else "—")
|
||||
year_rows.append(row_cells)
|
||||
|
||||
# Totals row
|
||||
total_cells = ["**TOTAL**"]
|
||||
for yr in all_years:
|
||||
d = yr_totals[yr].get("downloaded", 0)
|
||||
f = yr_totals[yr].get("failed", 0)
|
||||
total_cells.append(f"**{d:,} / {f:,}**" if (d or f) else "—")
|
||||
year_rows.append(total_cells)
|
||||
|
||||
print(_md_table(
|
||||
["Machine"] + all_years,
|
||||
year_rows,
|
||||
align=["l"] + ["r"] * len(all_years),
|
||||
))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,178 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# For each machine label in a text file, pick one random completed scan and download
|
||||
# it: by default the mosaic and all tiles (same as: --machine "…" --scan-id N).
|
||||
# For mosaic only (faster, no tile downloads), set: MOSAIC_ONLY=1
|
||||
#
|
||||
# Usage:
|
||||
# ./scripts/sample_random_scans.sh [PATH_TO_machines.txt]
|
||||
# Config path defaults to config.yaml in the repo root. Override with:
|
||||
# CONFIG=/path/to/config.yaml ./scripts/sample_random_scans.sh machines.txt
|
||||
# Dry-run the download step (listing still does real HTTP to fetch scan list):
|
||||
# DRY_RUN=1 ./scripts/sample_random_scans.sh machines.txt
|
||||
# Verbose / debug (extra per-step lines, scan counts from the list step):
|
||||
# DEBUG=1 ./scripts/sample_random_scans.sh machines.txt
|
||||
# By default, --list-scans fetches only the first page (one HTTP request, up to
|
||||
# 320 scans). To paginate the full archive for the random pick (slower when many
|
||||
# LIST_SCANS_ALL_PAGES=1 ./scripts/sample_random_scans.sh machines.txt
|
||||
#
|
||||
# machines.txt: one machine label per line (same as --machine and config machine names).
|
||||
# See scripts/machines.example.txt
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
|
||||
CONFIG="${CONFIG:-$REPO_ROOT/config.yaml}"
|
||||
MACHINES_FILE="${1:-$REPO_ROOT/machines.txt}"
|
||||
SCRAPER=(python3 "$REPO_ROOT/scraper.py" --config "$CONFIG")
|
||||
|
||||
log() { echo "[sample_random_scans] $*" >&2; }
|
||||
log_debug() {
|
||||
if [[ -n "${DEBUG:-}" ]]; then
|
||||
echo "[sample_random_scans] debug: $*" >&2
|
||||
fi
|
||||
}
|
||||
|
||||
if [[ ! -f "$MACHINES_FILE" ]]; then
|
||||
log "error: file not found: $MACHINES_FILE"
|
||||
log "Create it with one machine label per line, or: cp scripts/machines.example.txt machines.txt"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [[ ! -f "$CONFIG" ]]; then
|
||||
log "error: config not found: $CONFIG"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Non-empty, non-comment lines (same rules as the main loop)
|
||||
TOTAL_MACHINES="$(
|
||||
grep -v '^[[:space:]]*#' "$MACHINES_FILE" | grep -c -v '^[[:space:]]*$' || true
|
||||
)"
|
||||
if [[ -z "$TOTAL_MACHINES" || "$TOTAL_MACHINES" -eq 0 ]]; then
|
||||
log "error: no machine lines in: $MACHINES_FILE"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
log "starting repo=$REPO_ROOT"
|
||||
log " config=$CONFIG"
|
||||
log " machines_file=$MACHINES_FILE (${TOTAL_MACHINES} machine(s) in file)"
|
||||
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
|
||||
if [[ -n "${DRY_RUN:-}" ]]; then
|
||||
log " mode: MOSAIC_ONLY + DRY_RUN (mosaic only, --dry-run on download step)"
|
||||
else
|
||||
log " mode: MOSAIC_ONLY=1 (mosaics only, no tiles; use for a lighter sample)"
|
||||
fi
|
||||
else
|
||||
if [[ -n "${DRY_RUN:-}" ]]; then
|
||||
log " mode: DRY_RUN (list + full scan download use --dry-run; no files written)"
|
||||
else
|
||||
log " mode: full scan — mosaic + all tiles (workers from config)"
|
||||
fi
|
||||
fi
|
||||
if [[ -n "${DEBUG:-}" ]]; then
|
||||
log " DEBUG=1 (extra diagnostics enabled)"
|
||||
fi
|
||||
if [[ -n "${LIST_SCANS_ALL_PAGES:-}" ]]; then
|
||||
log " list step: list-scans = full archive (all pages, slower)"
|
||||
else
|
||||
log " list step: list-scans --list-scans-first-page-only (one page, up to 320 IDs)"
|
||||
fi
|
||||
log "────────────────────────────────────────"
|
||||
|
||||
export REPO_ROOT CONFIG
|
||||
[[ -n "${DEBUG:-}" ]] && export DEBUG
|
||||
[[ -n "${LIST_SCANS_ALL_PAGES:-}" ]] && export LIST_SCANS_ALL_PAGES
|
||||
|
||||
PROCESSED=0
|
||||
SKIPPED=0
|
||||
IDX=0
|
||||
|
||||
while IFS= read -r line || [[ -n "${line-}" ]]; do
|
||||
# trim, strip CR, skip blanks / comments
|
||||
line="${line//$'\r'/}"
|
||||
label="${line#"${line%%[![:space:]]*}"}"
|
||||
label="${label%"${label##*[![:space:]]}"}"
|
||||
[[ -z "$label" || "$label" == \#* ]] && continue
|
||||
|
||||
IDX=$((IDX + 1))
|
||||
log "[$IDX/$TOTAL_MACHINES] machine: $label"
|
||||
log " status: listing scans (--list-scans) …"
|
||||
|
||||
random_id="$(
|
||||
REPO_ROOT="$REPO_ROOT" CONFIG="$CONFIG" LABEL="$label" python3 - <<'PY'
|
||||
import os, random, subprocess, sys
|
||||
|
||||
label = os.environ["LABEL"]
|
||||
repo = os.environ["REPO_ROOT"]
|
||||
cfg = os.environ["CONFIG"]
|
||||
debug = bool(os.environ.get("DEBUG"))
|
||||
full = bool(os.environ.get("LIST_SCANS_ALL_PAGES"))
|
||||
scraper = os.path.join(repo, "scraper.py")
|
||||
if debug:
|
||||
print(
|
||||
f"[sample_random_scans] debug: running list-scans for {label!r} "
|
||||
f"({'all pages' if full else 'first page only'})",
|
||||
file=sys.stderr,
|
||||
)
|
||||
cmd = [sys.executable, scraper, "--list-scans", "--machine", label, "--config", cfg]
|
||||
if not full:
|
||||
cmd.insert(3, "--list-scans-first-page-only")
|
||||
out = subprocess.check_output(
|
||||
cmd,
|
||||
text=True,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
ids = []
|
||||
for line in out.splitlines():
|
||||
line = line.rstrip()
|
||||
if not line or line.startswith("---") or "Total" in line:
|
||||
continue
|
||||
parts = line.split()
|
||||
if parts and parts[0].isdigit():
|
||||
ids.append(parts[0])
|
||||
if not ids:
|
||||
print(f"no scans parsed for {label!r} — check login and output", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
if debug:
|
||||
print(
|
||||
f"[sample_random_scans] debug: parsed {len(ids)} scan id(s) for {label!r}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
print(random.choice(ids), end="")
|
||||
PY
|
||||
)" || {
|
||||
log " status: SKIPPED (could not get scan list or pick id)"
|
||||
SKIPPED=$((SKIPPED + 1))
|
||||
continue
|
||||
}
|
||||
|
||||
log " status: picked random scan_id=$random_id (uniform among IDs from this list step — first page by default, see start banner)"
|
||||
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
|
||||
log " status: running scraper: --mosaic-only --scan-id (mosaic only) …"
|
||||
else
|
||||
log " status: running scraper: --scan-id (mosaic + tiles) …"
|
||||
fi
|
||||
if [[ -n "${DRY_RUN:-}" ]]; then
|
||||
log " status: (dry-run — no files written for this scan)"
|
||||
fi
|
||||
|
||||
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
|
||||
run_cmd=("${SCRAPER[@]}" --mosaic-only --machine "$label" --scan-id "$random_id")
|
||||
else
|
||||
run_cmd=("${SCRAPER[@]}" --machine "$label" --scan-id "$random_id")
|
||||
fi
|
||||
if [[ -n "${DRY_RUN:-}" ]]; then
|
||||
run_cmd+=(--dry-run)
|
||||
fi
|
||||
if "${run_cmd[@]}"; then
|
||||
log " status: OK — finished this machine (exit 0)"
|
||||
PROCESSED=$((PROCESSED + 1))
|
||||
else
|
||||
rc=$?
|
||||
log " status: FAILED — scraper exit code $rc (stopping; fix or remove this machine and re-run)"
|
||||
exit "$rc"
|
||||
fi
|
||||
log "────────────────────────────────────────"
|
||||
done < "$MACHINES_FILE"
|
||||
|
||||
log "done. summary: $PROCESSED machine(s) with sampled scan download completed, $SKIPPED skipped, $IDX line(s) processed out of $TOTAL_MACHINES in file."
|
||||
exit 0
|
||||
@@ -0,0 +1,218 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Report metadata-scan progress and projected completion times for all machines.
|
||||
|
||||
Usage:
|
||||
python scripts/scan_progress_report.py [--archive ARCHIVE_DIR] [--recent N] [--mermaid] [--rate-chart]
|
||||
|
||||
Options:
|
||||
--archive DIR Path to archives directory (default: archives)
|
||||
--recent N Number of recent files used to compute current rate (default: 500)
|
||||
--mermaid Also print a Mermaid Gantt chart
|
||||
--rate-chart Also print a Mermaid XY chart of s/scan rate by hour
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import glob
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# Canonical machine order and total scan counts (from README inventory, April 2026)
|
||||
MACHINES = [
|
||||
("BW1-4 [AMR-15]", 6121),
|
||||
("BW1-6 [AMR-19]", 18198),
|
||||
("BW1-7 [AMR-18]", 430),
|
||||
("BW2-8 [AMR-25]", 8191),
|
||||
("BW2-10 [AMR-22]", 16537),
|
||||
("BW2-11 [AMR-23]", 26763),
|
||||
("BW2-13 [AMR-24]", 13537),
|
||||
("BW3-16 [AMR-16]", 7325),
|
||||
("BW3-17 [AMR-20]", 471),
|
||||
("BW3-19 [AMR-21]", 15186),
|
||||
("BW3-20 [AMR-26]", 23052),
|
||||
("BW3-21 [AMR-17]", 10115),
|
||||
]
|
||||
TOTAL_SCANS = sum(t for _, t in MACHINES)
|
||||
|
||||
|
||||
def dir_name(label: str) -> str:
|
||||
return re.sub(r"[^\w\-.]", "_", label).strip("_")
|
||||
|
||||
|
||||
def get_timestamps(machine_dir: Path) -> list[float]:
|
||||
files = glob.glob(str(machine_dir / "**" / "metadata.json"), recursive=True)
|
||||
return sorted(os.path.getmtime(f) for f in files)
|
||||
|
||||
|
||||
def print_rate_chart(all_timestamps: list[float]) -> None:
|
||||
"""Print a Mermaid xychart-beta of average s/scan per hour."""
|
||||
# One avg rate per hour
|
||||
bins: dict[str, list[float]] = defaultdict(list)
|
||||
start_hour: datetime | None = None
|
||||
for i in range(1, len(all_timestamps)):
|
||||
gap = all_timestamps[i] - all_timestamps[i - 1]
|
||||
if gap < 300: # ignore inter-machine gaps
|
||||
dt = datetime.fromtimestamp(all_timestamps[i])
|
||||
hour_key = dt.strftime("%m-%d %Hh")
|
||||
bins[hour_key].append(gap)
|
||||
if start_hour is None:
|
||||
start_hour = dt.replace(minute=0, second=0, microsecond=0)
|
||||
|
||||
# Drop the last (partial) hour
|
||||
hours = sorted(bins.keys())
|
||||
if hours:
|
||||
hours = hours[:-1]
|
||||
|
||||
if not hours or start_hour is None:
|
||||
print("(not enough data for rate chart)")
|
||||
return
|
||||
|
||||
values = [f"{sum(bins[h])/len(bins[h]):.2f}" for h in hours]
|
||||
y_max = max(float(v) for v in values)
|
||||
y_ceil = int(y_max) + 3
|
||||
n = len(hours)
|
||||
|
||||
# Numeric x-axis: Mermaid auto-picks readable tick positions
|
||||
start_label = start_hour.strftime("%b %d %H:%M")
|
||||
print("```mermaid")
|
||||
print("xychart-beta")
|
||||
print(f' title "Metadata scan rate (s/scan) — hourly, starting {start_label}"')
|
||||
print(f' x-axis "Hours elapsed" 0 --> {n}')
|
||||
print(f' y-axis "s / scan" 0 --> {y_ceil}')
|
||||
print(f" line [{', '.join(values)}]")
|
||||
print("```")
|
||||
|
||||
|
||||
def fmt_dt(dt: datetime) -> str:
|
||||
return dt.strftime("%a %b %d %H:%M")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--archive", default="archives", help="Archives directory")
|
||||
parser.add_argument("--recent", type=int, default=500,
|
||||
help="Files used to compute recent rate (default: 500)")
|
||||
parser.add_argument("--mermaid", action="store_true", help="Print Mermaid Gantt chart")
|
||||
parser.add_argument("--rate-chart", action="store_true", help="Print Mermaid XY rate-over-time chart")
|
||||
args = parser.parse_args()
|
||||
|
||||
archive = Path(args.archive)
|
||||
if not archive.is_dir():
|
||||
sys.exit(f"Archive directory not found: {archive}")
|
||||
|
||||
now = datetime.now()
|
||||
|
||||
# --- Gather per-machine data ---
|
||||
machine_data = [] # (label, total, done, first_ts, last_ts)
|
||||
all_timestamps: list[float] = []
|
||||
|
||||
for label, total in MACHINES:
|
||||
mdir = archive / dir_name(label)
|
||||
if mdir.is_dir():
|
||||
times = get_timestamps(mdir)
|
||||
else:
|
||||
times = []
|
||||
done = len(times)
|
||||
first_ts = datetime.fromtimestamp(times[0]) if times else None
|
||||
last_ts = datetime.fromtimestamp(times[-1]) if times else None
|
||||
machine_data.append((label, total, done, first_ts, last_ts, times))
|
||||
all_timestamps.extend(times)
|
||||
|
||||
all_timestamps.sort()
|
||||
total_done = sum(d for _, _, d, *_ in machine_data)
|
||||
|
||||
# --- Rate calculation ---
|
||||
recent_times = all_timestamps[-args.recent:] if len(all_timestamps) >= 2 else all_timestamps
|
||||
if len(recent_times) >= 2:
|
||||
recent_rate = (recent_times[-1] - recent_times[0]) / len(recent_times)
|
||||
else:
|
||||
recent_rate = None
|
||||
|
||||
if len(all_timestamps) >= 2:
|
||||
overall_rate = (all_timestamps[-1] - all_timestamps[0]) / len(all_timestamps)
|
||||
else:
|
||||
overall_rate = None
|
||||
|
||||
rate = recent_rate or overall_rate or 5.0 # fallback
|
||||
|
||||
# --- Print timetable ---
|
||||
print(f"Metadata scan progress — {now.strftime('%Y-%m-%d %H:%M')}")
|
||||
print(f"Overall rate : {overall_rate:.2f} s/scan" if overall_rate else "Overall rate : n/a")
|
||||
print(f"Recent rate : {recent_rate:.2f} s/scan (last {args.recent} files)" if recent_rate else "Recent rate : n/a")
|
||||
print(f"Rate used : {rate:.2f} s/scan")
|
||||
print(f"Done : {total_done:,} / {TOTAL_SCANS:,} ({100*total_done/TOTAL_SCANS:.1f}%)")
|
||||
print()
|
||||
print(f"{'Machine':<20} {'Total':>7} {'Done':>7} {'Pct':>6} {'Completion'}")
|
||||
print("-" * 68)
|
||||
|
||||
cursor = now
|
||||
gantt_rows: list[tuple[str, datetime, datetime, str]] = [] # label, start, end, status
|
||||
|
||||
for label, total, done, first_ts, last_ts, times in machine_data:
|
||||
pct = 100 * done / total if total else 0
|
||||
|
||||
complete = done >= total or (done > 0 and done / total >= 0.999)
|
||||
|
||||
if done == 0:
|
||||
# Not started yet
|
||||
start = cursor
|
||||
finish = cursor + timedelta(seconds=total * rate)
|
||||
status = "pending"
|
||||
print(f"{label:<20} {total:>7,} {'—':>7} {'—':>5} {fmt_dt(finish)}")
|
||||
elif complete:
|
||||
# Complete — use actual timestamps
|
||||
start = first_ts
|
||||
finish = last_ts
|
||||
status = "done"
|
||||
print(f"{label:<20} {total:>7,} {done:>7,} {pct:>5.1f}% complete")
|
||||
else:
|
||||
# In progress
|
||||
remaining = total - done
|
||||
start = first_ts
|
||||
finish = cursor + timedelta(seconds=remaining * rate)
|
||||
status = "active"
|
||||
print(f"{label:<20} {total:>7,} {done:>7,} {pct:>5.1f}% {fmt_dt(finish)} ← in progress")
|
||||
|
||||
gantt_rows.append((label, start or cursor, finish, status))
|
||||
if status != "done":
|
||||
cursor = finish
|
||||
|
||||
print("-" * 68)
|
||||
print(f"{'All done':<20} {TOTAL_SCANS:>7,} {total_done:>7,} {100*total_done/TOTAL_SCANS:>5.1f}% {fmt_dt(cursor)}")
|
||||
|
||||
# --- Mermaid rate chart ---
|
||||
if args.rate_chart:
|
||||
print()
|
||||
print_rate_chart(all_timestamps)
|
||||
|
||||
# --- Mermaid Gantt ---
|
||||
if args.mermaid:
|
||||
print()
|
||||
print("```mermaid")
|
||||
print("gantt")
|
||||
print(" title Metadata scan progress — all 12 machines")
|
||||
print(" dateFormat YYYY-MM-DD HH:mm")
|
||||
print(" axisFormat %b %d")
|
||||
print()
|
||||
|
||||
section = None
|
||||
for label, start, finish, status in gantt_rows:
|
||||
prefix = label.split()[0][:3] # BW1, BW2, BW3
|
||||
if prefix != section:
|
||||
section = prefix
|
||||
print(f" section {section}")
|
||||
safe = label.replace("[", "").replace("]", "").replace(" ", "-")
|
||||
tag = f"done, " if status == "done" else ("active, " if status == "active" else "")
|
||||
s = start.strftime("%Y-%m-%d %H:%M")
|
||||
e = finish.strftime("%Y-%m-%d %H:%M")
|
||||
print(f" {label} :{tag}{safe}, {s}, {e}")
|
||||
print("```")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -84,6 +84,33 @@ def parse_args() -> argparse.Namespace:
|
||||
"inventorying all scans across all machines."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--retry-failed",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Mosaic-only: re-attempt scans whose latest scans.csv row has "
|
||||
"mosaic_download_status=failed (queue from CSV, not the server list). "
|
||||
"Implies --mosaic-only."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--retry-since",
|
||||
metavar="YEAR",
|
||||
default=None,
|
||||
help=(
|
||||
"With --retry-failed: only scans with scan_time year >= YEAR "
|
||||
"(e.g. 2023)."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--retry-error-code",
|
||||
metavar="CODE",
|
||||
default=None,
|
||||
help=(
|
||||
"With --retry-failed: filter by mosaic_error_code "
|
||||
"(e.g. 200 for empty-body failures)."
|
||||
),
|
||||
)
|
||||
p.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
@@ -159,6 +186,16 @@ def main() -> None:
|
||||
if args.scan_id is not None and args.scan_id <= 0:
|
||||
sys.exit("--scan-id must be a positive integer")
|
||||
|
||||
if args.retry_since and not args.retry_failed:
|
||||
sys.exit("--retry-since requires --retry-failed.")
|
||||
if args.retry_error_code and not args.retry_failed:
|
||||
sys.exit("--retry-error-code requires --retry-failed.")
|
||||
|
||||
if args.retry_failed:
|
||||
if args.metadata_only:
|
||||
sys.exit("--retry-failed cannot be used with --metadata-only.")
|
||||
args.mosaic_only = True # implied
|
||||
|
||||
# --list-machines doesn't need credentials
|
||||
if args.list_machines:
|
||||
base_url = "http://205.149.147.131:8010/"
|
||||
@@ -261,6 +298,12 @@ def main() -> None:
|
||||
if args.metadata_only:
|
||||
log.info("Mode: metadata only (mosaics and tiles skipped)")
|
||||
elif args.mosaic_only:
|
||||
if args.retry_failed:
|
||||
log.info(
|
||||
"Mode: mosaic retry (failed scans from %s)",
|
||||
SCANS_CSV_FILENAME,
|
||||
)
|
||||
else:
|
||||
log.info("Mode: mosaics only (individual tiles skipped)")
|
||||
if args.dry_run:
|
||||
log.info("Mode: dry-run (no files will be written)")
|
||||
@@ -285,6 +328,9 @@ def main() -> None:
|
||||
metadata_only=args.metadata_only,
|
||||
scan_id_filter=args.scan_id,
|
||||
max_tiles=args.max_tiles,
|
||||
retry_failed=args.retry_failed,
|
||||
retry_since_year=args.retry_since,
|
||||
retry_error_code=args.retry_error_code,
|
||||
)
|
||||
totals.merge(stats)
|
||||
finally:
|
||||
|
||||
@@ -2,14 +2,22 @@
|
||||
High-level scrape orchestration: drives the per-machine and per-scan loops.
|
||||
"""
|
||||
|
||||
import csv
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from tqdm import tqdm
|
||||
|
||||
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
|
||||
from spruce.exif import write_mosaic_exif
|
||||
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
|
||||
from spruce.progress import ProgressTracker, CsvWriter
|
||||
from spruce.session import MachineSession
|
||||
|
||||
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
|
||||
# below smallest observed real tile (~7 KiB in production samples).
|
||||
@@ -49,16 +57,64 @@ class RunStats:
|
||||
self.scans_probe_skipped += other.scans_probe_skipped
|
||||
self.scans_disk_space_skipped += other.scans_disk_space_skipped
|
||||
|
||||
from tqdm import tqdm
|
||||
|
||||
from spruce.exif import write_mosaic_exif
|
||||
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
|
||||
from spruce.progress import ProgressTracker, CsvWriter
|
||||
from spruce.session import MachineSession
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
|
||||
"""Last row wins per (machine, scan_id)."""
|
||||
latest: dict[tuple[str, str], dict[str, str]] = {}
|
||||
if not scans_csv_path.exists():
|
||||
return latest
|
||||
with open(scans_csv_path, newline="", encoding="utf-8") as fh:
|
||||
for row in csv.DictReader(fh):
|
||||
key = (row.get("machine", ""), row.get("scan_id", ""))
|
||||
latest[key] = row
|
||||
return latest
|
||||
|
||||
|
||||
def load_failed_scans_from_csv(
|
||||
scans_csv_path: Path,
|
||||
machine_label: str,
|
||||
*,
|
||||
since_year: str | None = None,
|
||||
error_code: str | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Dedupe scans.csv by (machine, scan_id); return failed mosaic rows for one machine.
|
||||
|
||||
Each dict is suitable as the ``scan`` argument to ``process_scan`` (scan_id,
|
||||
scan_time, name, status).
|
||||
"""
|
||||
latest = _read_scans_csv_latest(scans_csv_path)
|
||||
out: list[dict[str, Any]] = []
|
||||
for (_m, _sid), row in latest.items():
|
||||
if row.get("machine") != machine_label:
|
||||
continue
|
||||
if row.get("mosaic_download_status") != "failed":
|
||||
continue
|
||||
if error_code is not None and row.get("mosaic_error_code", "") != error_code:
|
||||
continue
|
||||
st = row.get("scan_time", "") or ""
|
||||
if since_year is not None:
|
||||
yr = st[:4]
|
||||
if len(yr) < 4 or yr < since_year:
|
||||
continue
|
||||
sid = int(row["scan_id"])
|
||||
out.append(
|
||||
{
|
||||
"scan_id": sid,
|
||||
"scan_time": st,
|
||||
"name": row.get("name", ""),
|
||||
"status": row.get("status", "") or "Completed",
|
||||
"user": row.get("user", ""),
|
||||
"scan_lines": row.get("scan_lines", ""),
|
||||
"scan_mode": row.get("scan_mode", ""),
|
||||
}
|
||||
)
|
||||
out.sort(key=lambda s: s["scan_id"])
|
||||
return out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-scan helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -241,6 +297,7 @@ def process_scan(
|
||||
mosaic_only: bool,
|
||||
metadata_only: bool = False,
|
||||
max_tiles: int | None = None,
|
||||
scans_csv_existing_ids: set[int] | None = None,
|
||||
) -> RunStats:
|
||||
"""
|
||||
Process one scan: fetch metadata, download mosaic and (optionally) tiles.
|
||||
@@ -379,9 +436,16 @@ def process_scan(
|
||||
mds, mer, mco, mcl = "", "", "", ""
|
||||
|
||||
# Write scan-level CSV row only if this scan hasn't been recorded before.
|
||||
if mosaic_already_done and not metadata_only:
|
||||
# Skip if: (1) mosaic URL already in .progress.json, or (2) scan already
|
||||
# has a non-pending row in scans.csv from a prior run.
|
||||
already_recorded = (mosaic_already_done and not metadata_only) or (
|
||||
not metadata_only
|
||||
and scans_csv_existing_ids is not None
|
||||
and scan_id in scans_csv_existing_ids
|
||||
)
|
||||
if already_recorded:
|
||||
log.debug(
|
||||
"[%s] Scan %d: already in scans.csv (mosaic was previously downloaded), skipping CSV row.",
|
||||
"[%s] Scan %d: already in scans.csv, skipping CSV row.",
|
||||
machine["label"],
|
||||
scan_id,
|
||||
)
|
||||
@@ -491,14 +555,59 @@ def scrape_machine(
|
||||
metadata_only: bool = False,
|
||||
scan_id_filter: int | None = None,
|
||||
max_tiles: int | None = None,
|
||||
retry_failed: bool = False,
|
||||
retry_since_year: str | None = None,
|
||||
retry_error_code: str | None = None,
|
||||
) -> RunStats:
|
||||
"""Login, fetch scans, and download all content for one machine."""
|
||||
sess = MachineSession(machine, config)
|
||||
if not sess.login():
|
||||
login_ok = False
|
||||
for attempt in range(1, 4):
|
||||
if sess.login():
|
||||
login_ok = True
|
||||
break
|
||||
if attempt < 3:
|
||||
log.warning(
|
||||
"[%s] Login failed (attempt %d/3) — retrying in 10s.",
|
||||
machine["label"],
|
||||
attempt,
|
||||
)
|
||||
time.sleep(10)
|
||||
if not login_ok:
|
||||
log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
|
||||
return RunStats()
|
||||
|
||||
if retry_failed:
|
||||
scans = load_failed_scans_from_csv(
|
||||
scans_csv.path,
|
||||
machine["label"],
|
||||
since_year=retry_since_year,
|
||||
error_code=retry_error_code,
|
||||
)
|
||||
if scan_id_filter is not None:
|
||||
scans: list[dict[str, Any]] = [
|
||||
scans = [s for s in scans if s["scan_id"] == scan_id_filter]
|
||||
if not scans:
|
||||
log.warning(
|
||||
"[%s] Retry: scan_id %d not among failed rows for this machine.",
|
||||
machine["label"],
|
||||
scan_id_filter,
|
||||
)
|
||||
return RunStats()
|
||||
log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter)
|
||||
elif not scans:
|
||||
log.warning(
|
||||
"[%s] No failed mosaic rows in scans.csv match retry filters.",
|
||||
machine["label"],
|
||||
)
|
||||
return RunStats()
|
||||
else:
|
||||
log.info(
|
||||
"[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
|
||||
machine["label"],
|
||||
len(scans),
|
||||
)
|
||||
elif scan_id_filter is not None:
|
||||
scans = [
|
||||
{"scan_id": scan_id_filter, "status": "Completed"}
|
||||
]
|
||||
log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
|
||||
@@ -508,8 +617,37 @@ def scrape_machine(
|
||||
log.warning("[%s] No scans found.", machine["label"])
|
||||
return RunStats()
|
||||
|
||||
# Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP).
|
||||
# In normal mode: skip anything with a definitive non-pending status.
|
||||
# In retry mode: only skip scans that are already downloaded or skipped for
|
||||
# disk-space reasons — failed scans must be re-attempted.
|
||||
PENDING_STATUSES = {"skipped_metadata_only", ""}
|
||||
BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"}
|
||||
existing_ids: set[int] = set()
|
||||
if not metadata_only:
|
||||
latest_rows = _read_scans_csv_latest(scans_csv.path)
|
||||
for (_mlabel, _sid), _row in latest_rows.items():
|
||||
if _mlabel != machine["label"]:
|
||||
continue
|
||||
st = _row.get("mosaic_download_status", "")
|
||||
if retry_failed:
|
||||
if st in BLOCK_AFTER_RETRY_STATUSES:
|
||||
existing_ids.add(int(_row["scan_id"]))
|
||||
else:
|
||||
if st not in PENDING_STATUSES:
|
||||
existing_ids.add(int(_row["scan_id"]))
|
||||
|
||||
stats = RunStats()
|
||||
for scan in scans:
|
||||
# Skip scans already fully processed in a prior run — avoids redundant
|
||||
# metadata fetches and mosaic requests for known-failed / known-done scans.
|
||||
if not metadata_only and scan["scan_id"] in existing_ids:
|
||||
log.debug(
|
||||
"[%s] Scan %d: already processed, skipping.",
|
||||
machine["label"],
|
||||
scan["scan_id"],
|
||||
)
|
||||
continue
|
||||
stats.merge(process_scan(
|
||||
sess=sess,
|
||||
scan=scan,
|
||||
@@ -523,5 +661,6 @@ def scrape_machine(
|
||||
mosaic_only=mosaic_only,
|
||||
metadata_only=metadata_only,
|
||||
max_tiles=max_tiles,
|
||||
scans_csv_existing_ids=existing_ids,
|
||||
))
|
||||
return stats
|
||||
|
||||
@@ -5,6 +5,7 @@ Progress tracking (JSON) and CSV writing.
|
||||
import csv
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Iterator
|
||||
|
||||
@@ -27,6 +28,7 @@ class ProgressTracker:
|
||||
def __init__(self, path: Path) -> None:
|
||||
self.path = path
|
||||
self._done: set[str] = set()
|
||||
self.started_at: str = datetime.now(timezone.utc).isoformat()
|
||||
self._load()
|
||||
|
||||
def _load(self) -> None:
|
||||
@@ -34,6 +36,8 @@ class ProgressTracker:
|
||||
try:
|
||||
data = json.loads(self.path.read_text())
|
||||
self._done = set(data.get("completed_urls", []))
|
||||
if "started_at" in data:
|
||||
self.started_at = data["started_at"]
|
||||
log.info("Resuming: %d URLs already downloaded.", len(self._done))
|
||||
except Exception:
|
||||
log.warning("Could not read progress file; starting fresh.")
|
||||
@@ -59,7 +63,10 @@ class ProgressTracker:
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp = self.path.with_suffix(".json.tmp")
|
||||
tmp.write_text(
|
||||
json.dumps({"completed_urls": sorted(self._done)}, indent=2)
|
||||
json.dumps(
|
||||
{"started_at": self.started_at, "completed_urls": sorted(self._done)},
|
||||
indent=2,
|
||||
)
|
||||
)
|
||||
tmp.replace(self.path) # atomic on POSIX; avoids corrupt JSON on crash
|
||||
|
||||
@@ -70,6 +77,7 @@ class CsvWriter:
|
||||
def __init__(self, path: Path, fields: list[str]) -> None:
|
||||
is_new = not path.exists()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self.path = path
|
||||
self._fh = open(path, "a", newline="", encoding="utf-8")
|
||||
self._writer = csv.DictWriter(self._fh, fieldnames=fields)
|
||||
if is_new:
|
||||
|
||||
@@ -14,6 +14,7 @@ from bs4 import BeautifulSoup
|
||||
|
||||
from spruce.download_result import (
|
||||
OK,
|
||||
PERMANENT_MISSING,
|
||||
UNKNOWN,
|
||||
DownloadResult,
|
||||
classify_http_error,
|
||||
@@ -263,6 +264,10 @@ class MachineSession:
|
||||
and exc.response is not None
|
||||
):
|
||||
sc = exc.response.status_code
|
||||
cl = classify_http_error(sc, exc)
|
||||
if cl == PERMANENT_MISSING:
|
||||
# 404/410 will never succeed — don't waste time retrying.
|
||||
return DownloadResult(0, sc, str(exc), cl)
|
||||
if attempt < retries:
|
||||
log.warning(
|
||||
"Attempt %d/%d failed %s: %s — retrying in %.0fs",
|
||||
@@ -281,7 +286,6 @@ class MachineSession:
|
||||
url,
|
||||
exc,
|
||||
)
|
||||
cl = classify_http_error(sc, exc)
|
||||
return DownloadResult(0, sc, str(exc), cl)
|
||||
return DownloadResult(0, None, "download_file: exhausted", UNKNOWN)
|
||||
|
||||
|
||||
@@ -0,0 +1,133 @@
|
||||
"""Retry queue loading from scans.csv (mosaic_download_status=failed)."""
|
||||
|
||||
import csv
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from spruce.orchestrator import load_failed_scans_from_csv
|
||||
from spruce.settings import SCANS_CSV_FIELDS
|
||||
|
||||
|
||||
def _blank_row(**kwargs: str) -> dict[str, str]:
|
||||
row = {k: "" for k in SCANS_CSV_FIELDS}
|
||||
row.update(kwargs)
|
||||
return row
|
||||
|
||||
|
||||
def _write_scans_csv(path: Path, rows: list[dict[str, str]]) -> None:
|
||||
with open(path, "w", newline="", encoding="utf-8") as fh:
|
||||
w = csv.DictWriter(fh, fieldnames=SCANS_CSV_FIELDS)
|
||||
w.writeheader()
|
||||
for r in rows:
|
||||
w.writerow({k: r.get(k, "") for k in SCANS_CSV_FIELDS})
|
||||
|
||||
|
||||
def test_load_failed_scans_dedup_keeps_last_row(tmp_path: Path) -> None:
|
||||
path = tmp_path / "scans.csv"
|
||||
common = {
|
||||
"machine": "BW1 [X]",
|
||||
"machine_id": "1",
|
||||
"scan_id": "100",
|
||||
"mosaic_url": "http://x/m.jpg",
|
||||
"mosaic_local_path": "",
|
||||
"mosaic_on_disk": "False",
|
||||
}
|
||||
_write_scans_csv(
|
||||
path,
|
||||
[
|
||||
_blank_row(
|
||||
**common,
|
||||
mosaic_download_status="failed",
|
||||
mosaic_error_code="404",
|
||||
scan_time="2020-01-01",
|
||||
),
|
||||
_blank_row(
|
||||
**common,
|
||||
mosaic_download_status="failed",
|
||||
mosaic_error_code="404",
|
||||
scan_time="2020-06-01",
|
||||
),
|
||||
],
|
||||
)
|
||||
out = load_failed_scans_from_csv(path, "BW1 [X]")
|
||||
assert len(out) == 1
|
||||
assert out[0]["scan_id"] == 100
|
||||
assert out[0]["scan_time"] == "2020-06-01"
|
||||
|
||||
|
||||
def test_load_failed_scans_since_year(tmp_path: Path) -> None:
|
||||
path = tmp_path / "scans.csv"
|
||||
base = {
|
||||
"machine": "M",
|
||||
"machine_id": "1",
|
||||
"mosaic_url": "",
|
||||
"mosaic_local_path": "",
|
||||
"mosaic_on_disk": "",
|
||||
"mosaic_download_status": "failed",
|
||||
"mosaic_error_code": "404",
|
||||
}
|
||||
_write_scans_csv(
|
||||
path,
|
||||
[
|
||||
_blank_row(**base, scan_id="1", scan_time="2022-12-31"),
|
||||
_blank_row(**base, scan_id="2", scan_time="2023-01-01"),
|
||||
_blank_row(**base, scan_id="3", scan_time=""),
|
||||
],
|
||||
)
|
||||
out = load_failed_scans_from_csv(path, "M", since_year="2023")
|
||||
ids = {s["scan_id"] for s in out}
|
||||
assert ids == {2}
|
||||
|
||||
|
||||
def test_load_failed_scans_error_code(tmp_path: Path) -> None:
|
||||
path = tmp_path / "scans.csv"
|
||||
base = {
|
||||
"machine": "M",
|
||||
"machine_id": "1",
|
||||
"scan_time": "2024-01-01",
|
||||
"mosaic_url": "",
|
||||
"mosaic_local_path": "",
|
||||
"mosaic_on_disk": "",
|
||||
"mosaic_download_status": "failed",
|
||||
}
|
||||
_write_scans_csv(
|
||||
path,
|
||||
[
|
||||
_blank_row(**base, scan_id="10", mosaic_error_code="404"),
|
||||
_blank_row(**base, scan_id="11", mosaic_error_code="200"),
|
||||
],
|
||||
)
|
||||
out = load_failed_scans_from_csv(path, "M", error_code="200")
|
||||
assert [s["scan_id"] for s in out] == [11]
|
||||
|
||||
|
||||
def test_load_failed_scans_excludes_downloaded(tmp_path: Path) -> None:
|
||||
path = tmp_path / "scans.csv"
|
||||
base = {
|
||||
"machine": "M",
|
||||
"machine_id": "1",
|
||||
"scan_time": "2024-01-01",
|
||||
"mosaic_url": "",
|
||||
"mosaic_local_path": "",
|
||||
"mosaic_on_disk": "True",
|
||||
}
|
||||
_write_scans_csv(
|
||||
path,
|
||||
[
|
||||
_blank_row(
|
||||
**base,
|
||||
scan_id="5",
|
||||
mosaic_download_status="downloaded",
|
||||
mosaic_error_code="",
|
||||
),
|
||||
_blank_row(
|
||||
**base,
|
||||
scan_id="6",
|
||||
mosaic_download_status="failed",
|
||||
mosaic_error_code="404",
|
||||
),
|
||||
],
|
||||
)
|
||||
out = load_failed_scans_from_csv(path, "M")
|
||||
assert [s["scan_id"] for s in out] == [6]
|
||||