diff --git a/scripts/mosaic_progress_report.py b/scripts/mosaic_progress_report.py index 4730308..d69651d 100644 --- a/scripts/mosaic_progress_report.py +++ b/scripts/mosaic_progress_report.py @@ -2,11 +2,13 @@ """ Report mosaic download progress from archives/scans.csv. -Output is formatted as Markdown. Add --by-year for a per-machine × -per-year breakdown table. +Output is Markdown. Use ``--by-year`` for a per-machine × per-year +done/failed table. When the first mosaic pass is complete (no pending rows) +but failures remain, a **Mosaic retry estimates** section is printed with +queue counts and duration hints. -Rate/ETA require two calls at least 60 s apart. Mean mosaic size is -sampled from up to 100 already-downloaded files and cached for 1 hour. +Rate/ETA use a 30-minute rolling window when snapshots show progress. +Mean mosaic size is sampled from up to 100 downloads (1-hour cache). Usage: python scripts/mosaic_progress_report.py [--archive DIR] [--by-year] @@ -30,6 +32,11 @@ _R_PRE19 = 1.00 _R_PURGED = 0.00 _R_RECENT = 0.82 +FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0 +RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0 +RETRY_REALISTIC_RATE_PER_HR = 1100.0 +RETRY_PESSIMISTIC_RATE_PER_HR = 300.0 + # --------------------------------------------------------------------------- # Helpers @@ -127,6 +134,12 @@ def _expected_remaining(pending_rows: list[dict]) -> float: return count +def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str: + if n_scans <= 0 or rate_per_hr <= 0: + return "—" + return _fmt_duration(n_scans / rate_per_hr * 3600.0) + + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- @@ -213,15 +226,32 @@ def main() -> None: rate_per_sec: float | None = None rate_window_str = "" + snap_delta_proc = 0 if recent: oldest = recent[0] dt = now.timestamp() - oldest["ts"] dp = processed - oldest["proc"] + snap_delta_proc = dp if dt >= 60 and dp > 0: rate_per_sec = dp / dt window_min = dt / 60 rate_window_str = f"{window_min:.0f}-min avg" + # One-time baseline after initial mosaic crawl finished (no pending rows). + if pending == 0 and "first_pass_mean_rate_per_hr" not in cache: + cache["first_pass_completed_at"] = now.isoformat() + cache["first_pass_processed"] = total + cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR + + first_pass_rate_hr = float( + cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR) + ) + live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None + # Active scrape shows progress in snapshots; idle archive shows dp == 0. + retry_estimate_rate_hr = ( + live_rate_hr if live_rate_hr is not None else first_pass_rate_hr + ) + # --- Disk space --- mean_bytes: float | None = None size_note = "" @@ -325,6 +355,76 @@ def main() -> None: align=["l", "r", "r", "r", "r", "r"], )) + # ----------------------------------------------------------------------- + # Retry estimates (first pass complete: pending == 0, failures remain) + # ----------------------------------------------------------------------- + if failed > 0 and pending == 0: + failed_rows_list = [ + r for r in latest.values() + if r.get("mosaic_download_status") == "failed" + ] + n_all = len(failed_rows_list) + n_2023 = sum( + 1 for r in failed_rows_list + if (r.get("scan_time") or "")[:4] >= "2023" + and len((r.get("scan_time") or "")[:4]) == 4 + ) + n_200 = sum( + 1 for r in failed_rows_list + if r.get("mosaic_error_code") == "200" + ) + rate_note = ( + "rolling 30-min window" + if snap_delta_proc > 0 + else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)" + ) + print() + print("### Mosaic retry estimates\n") + print( + f"*Suggested command after server fix:* " + f"`python scraper.py --retry-failed --workers 2` " + f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n" + ) + print( + f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** " + f"({rate_note}). Fixed columns use scenario rates.*\n" + ) + est_hdr = ( + "Retry scope", + "Count", + f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr", + f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr", + f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr", + f"@{retry_estimate_rate_hr:.0f}/hr", + ) + retry_tbl_rows = [ + [ + "HTTP 200 (empty body)", + f"{n_200:,}", + _retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_200, retry_estimate_rate_hr), + ], + [ + "Failed, scan_time ≥ 2023", + f"{n_2023:,}", + _retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_2023, retry_estimate_rate_hr), + ], + [ + "**All failed**", + f"**{n_all:,}**", + _retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR), + _retry_hours_from_rate(n_all, retry_estimate_rate_hr), + ], + ] + print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"])) + # ----------------------------------------------------------------------- # --by-year table # ----------------------------------------------------------------------- diff --git a/spruce/cli.py b/spruce/cli.py index c5151b9..5b87e92 100644 --- a/spruce/cli.py +++ b/spruce/cli.py @@ -84,6 +84,33 @@ def parse_args() -> argparse.Namespace: "inventorying all scans across all machines." ), ) + p.add_argument( + "--retry-failed", + action="store_true", + help=( + "Mosaic-only: re-attempt scans whose latest scans.csv row has " + "mosaic_download_status=failed (queue from CSV, not the server list). " + "Implies --mosaic-only." + ), + ) + p.add_argument( + "--retry-since", + metavar="YEAR", + default=None, + help=( + "With --retry-failed: only scans with scan_time year >= YEAR " + "(e.g. 2023)." + ), + ) + p.add_argument( + "--retry-error-code", + metavar="CODE", + default=None, + help=( + "With --retry-failed: filter by mosaic_error_code " + "(e.g. 200 for empty-body failures)." + ), + ) p.add_argument( "--dry-run", action="store_true", @@ -159,6 +186,16 @@ def main() -> None: if args.scan_id is not None and args.scan_id <= 0: sys.exit("--scan-id must be a positive integer") + if args.retry_since and not args.retry_failed: + sys.exit("--retry-since requires --retry-failed.") + if args.retry_error_code and not args.retry_failed: + sys.exit("--retry-error-code requires --retry-failed.") + + if args.retry_failed: + if args.metadata_only: + sys.exit("--retry-failed cannot be used with --metadata-only.") + args.mosaic_only = True # implied + # --list-machines doesn't need credentials if args.list_machines: base_url = "http://205.149.147.131:8010/" @@ -261,7 +298,13 @@ def main() -> None: if args.metadata_only: log.info("Mode: metadata only (mosaics and tiles skipped)") elif args.mosaic_only: - log.info("Mode: mosaics only (individual tiles skipped)") + if args.retry_failed: + log.info( + "Mode: mosaic retry (failed scans from %s)", + SCANS_CSV_FILENAME, + ) + else: + log.info("Mode: mosaics only (individual tiles skipped)") if args.dry_run: log.info("Mode: dry-run (no files will be written)") @@ -285,6 +328,9 @@ def main() -> None: metadata_only=args.metadata_only, scan_id_filter=args.scan_id, max_tiles=args.max_tiles, + retry_failed=args.retry_failed, + retry_since_year=args.retry_since, + retry_error_code=args.retry_error_code, ) totals.merge(stats) finally: diff --git a/spruce/orchestrator.py b/spruce/orchestrator.py index 067f533..34e2c58 100644 --- a/spruce/orchestrator.py +++ b/spruce/orchestrator.py @@ -2,14 +2,22 @@ High-level scrape orchestration: drives the per-machine and per-scan loops. """ +import csv import json import logging +import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path from typing import Any +from tqdm import tqdm + from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str +from spruce.exif import write_mosaic_exif +from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date +from spruce.progress import ProgressTracker, CsvWriter +from spruce.session import MachineSession # RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well # below smallest observed real tile (~7 KiB in production samples). @@ -49,16 +57,64 @@ class RunStats: self.scans_probe_skipped += other.scans_probe_skipped self.scans_disk_space_skipped += other.scans_disk_space_skipped -from tqdm import tqdm - -from spruce.exif import write_mosaic_exif -from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date -from spruce.progress import ProgressTracker, CsvWriter -from spruce.session import MachineSession - log = logging.getLogger(__name__) +def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]: + """Last row wins per (machine, scan_id).""" + latest: dict[tuple[str, str], dict[str, str]] = {} + if not scans_csv_path.exists(): + return latest + with open(scans_csv_path, newline="", encoding="utf-8") as fh: + for row in csv.DictReader(fh): + key = (row.get("machine", ""), row.get("scan_id", "")) + latest[key] = row + return latest + + +def load_failed_scans_from_csv( + scans_csv_path: Path, + machine_label: str, + *, + since_year: str | None = None, + error_code: str | None = None, +) -> list[dict[str, Any]]: + """ + Dedupe scans.csv by (machine, scan_id); return failed mosaic rows for one machine. + + Each dict is suitable as the ``scan`` argument to ``process_scan`` (scan_id, + scan_time, name, status). + """ + latest = _read_scans_csv_latest(scans_csv_path) + out: list[dict[str, Any]] = [] + for (_m, _sid), row in latest.items(): + if row.get("machine") != machine_label: + continue + if row.get("mosaic_download_status") != "failed": + continue + if error_code is not None and row.get("mosaic_error_code", "") != error_code: + continue + st = row.get("scan_time", "") or "" + if since_year is not None: + yr = st[:4] + if len(yr) < 4 or yr < since_year: + continue + sid = int(row["scan_id"]) + out.append( + { + "scan_id": sid, + "scan_time": st, + "name": row.get("name", ""), + "status": row.get("status", "") or "Completed", + "user": row.get("user", ""), + "scan_lines": row.get("scan_lines", ""), + "scan_mode": row.get("scan_mode", ""), + } + ) + out.sort(key=lambda s: s["scan_id"]) + return out + + # --------------------------------------------------------------------------- # Per-scan helpers # --------------------------------------------------------------------------- @@ -499,6 +555,9 @@ def scrape_machine( metadata_only: bool = False, scan_id_filter: int | None = None, max_tiles: int | None = None, + retry_failed: bool = False, + retry_since_year: str | None = None, + retry_error_code: str | None = None, ) -> RunStats: """Login, fetch scans, and download all content for one machine.""" sess = MachineSession(machine, config) @@ -518,8 +577,37 @@ def scrape_machine( log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"]) return RunStats() - if scan_id_filter is not None: - scans: list[dict[str, Any]] = [ + if retry_failed: + scans = load_failed_scans_from_csv( + scans_csv.path, + machine["label"], + since_year=retry_since_year, + error_code=retry_error_code, + ) + if scan_id_filter is not None: + scans = [s for s in scans if s["scan_id"] == scan_id_filter] + if not scans: + log.warning( + "[%s] Retry: scan_id %d not among failed rows for this machine.", + machine["label"], + scan_id_filter, + ) + return RunStats() + log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter) + elif not scans: + log.warning( + "[%s] No failed mosaic rows in scans.csv match retry filters.", + machine["label"], + ) + return RunStats() + else: + log.info( + "[%s] Mosaic retry: %d failed scan(s) from scans.csv.", + machine["label"], + len(scans), + ) + elif scan_id_filter is not None: + scans = [ {"scan_id": scan_id_filter, "status": "Completed"} ] log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter) @@ -529,21 +617,25 @@ def scrape_machine( log.warning("[%s] No scans found.", machine["label"]) return RunStats() - # Build a set of scan_ids already fully processed in a prior run so we can - # skip them entirely (no metadata fetch, no mosaic request). - # Only scans with a definitive non-pending status count; skipped_metadata_only - # rows still need to be processed in mosaic mode. + # Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP). + # In normal mode: skip anything with a definitive non-pending status. + # In retry mode: only skip scans that are already downloaded or skipped for + # disk-space reasons — failed scans must be re-attempted. PENDING_STATUSES = {"skipped_metadata_only", ""} + BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"} existing_ids: set[int] = set() - if not metadata_only and scans_csv._fh.name: - existing_path = Path(scans_csv._fh.name) - if existing_path.exists(): - import csv as _csv - with open(existing_path, newline="", encoding="utf-8") as _f: - for _row in _csv.DictReader(_f): - if _row.get("machine") == machine["label"]: - if _row.get("mosaic_download_status", "") not in PENDING_STATUSES: - existing_ids.add(int(_row["scan_id"])) + if not metadata_only: + latest_rows = _read_scans_csv_latest(scans_csv.path) + for (_mlabel, _sid), _row in latest_rows.items(): + if _mlabel != machine["label"]: + continue + st = _row.get("mosaic_download_status", "") + if retry_failed: + if st in BLOCK_AFTER_RETRY_STATUSES: + existing_ids.add(int(_row["scan_id"])) + else: + if st not in PENDING_STATUSES: + existing_ids.add(int(_row["scan_id"])) stats = RunStats() for scan in scans: diff --git a/spruce/progress.py b/spruce/progress.py index 997b884..48f2149 100644 --- a/spruce/progress.py +++ b/spruce/progress.py @@ -77,6 +77,7 @@ class CsvWriter: def __init__(self, path: Path, fields: list[str]) -> None: is_new = not path.exists() path.parent.mkdir(parents=True, exist_ok=True) + self.path = path self._fh = open(path, "a", newline="", encoding="utf-8") self._writer = csv.DictWriter(self._fh, fieldnames=fields) if is_new: diff --git a/tests/test_retry_failed.py b/tests/test_retry_failed.py new file mode 100644 index 0000000..8a529a0 --- /dev/null +++ b/tests/test_retry_failed.py @@ -0,0 +1,133 @@ +"""Retry queue loading from scans.csv (mosaic_download_status=failed).""" + +import csv +from pathlib import Path + +import pytest + +from spruce.orchestrator import load_failed_scans_from_csv +from spruce.settings import SCANS_CSV_FIELDS + + +def _blank_row(**kwargs: str) -> dict[str, str]: + row = {k: "" for k in SCANS_CSV_FIELDS} + row.update(kwargs) + return row + + +def _write_scans_csv(path: Path, rows: list[dict[str, str]]) -> None: + with open(path, "w", newline="", encoding="utf-8") as fh: + w = csv.DictWriter(fh, fieldnames=SCANS_CSV_FIELDS) + w.writeheader() + for r in rows: + w.writerow({k: r.get(k, "") for k in SCANS_CSV_FIELDS}) + + +def test_load_failed_scans_dedup_keeps_last_row(tmp_path: Path) -> None: + path = tmp_path / "scans.csv" + common = { + "machine": "BW1 [X]", + "machine_id": "1", + "scan_id": "100", + "mosaic_url": "http://x/m.jpg", + "mosaic_local_path": "", + "mosaic_on_disk": "False", + } + _write_scans_csv( + path, + [ + _blank_row( + **common, + mosaic_download_status="failed", + mosaic_error_code="404", + scan_time="2020-01-01", + ), + _blank_row( + **common, + mosaic_download_status="failed", + mosaic_error_code="404", + scan_time="2020-06-01", + ), + ], + ) + out = load_failed_scans_from_csv(path, "BW1 [X]") + assert len(out) == 1 + assert out[0]["scan_id"] == 100 + assert out[0]["scan_time"] == "2020-06-01" + + +def test_load_failed_scans_since_year(tmp_path: Path) -> None: + path = tmp_path / "scans.csv" + base = { + "machine": "M", + "machine_id": "1", + "mosaic_url": "", + "mosaic_local_path": "", + "mosaic_on_disk": "", + "mosaic_download_status": "failed", + "mosaic_error_code": "404", + } + _write_scans_csv( + path, + [ + _blank_row(**base, scan_id="1", scan_time="2022-12-31"), + _blank_row(**base, scan_id="2", scan_time="2023-01-01"), + _blank_row(**base, scan_id="3", scan_time=""), + ], + ) + out = load_failed_scans_from_csv(path, "M", since_year="2023") + ids = {s["scan_id"] for s in out} + assert ids == {2} + + +def test_load_failed_scans_error_code(tmp_path: Path) -> None: + path = tmp_path / "scans.csv" + base = { + "machine": "M", + "machine_id": "1", + "scan_time": "2024-01-01", + "mosaic_url": "", + "mosaic_local_path": "", + "mosaic_on_disk": "", + "mosaic_download_status": "failed", + } + _write_scans_csv( + path, + [ + _blank_row(**base, scan_id="10", mosaic_error_code="404"), + _blank_row(**base, scan_id="11", mosaic_error_code="200"), + ], + ) + out = load_failed_scans_from_csv(path, "M", error_code="200") + assert [s["scan_id"] for s in out] == [11] + + +def test_load_failed_scans_excludes_downloaded(tmp_path: Path) -> None: + path = tmp_path / "scans.csv" + base = { + "machine": "M", + "machine_id": "1", + "scan_time": "2024-01-01", + "mosaic_url": "", + "mosaic_local_path": "", + "mosaic_on_disk": "True", + } + _write_scans_csv( + path, + [ + _blank_row( + **base, + scan_id="5", + mosaic_download_status="downloaded", + mosaic_error_code="", + ), + _blank_row( + **base, + scan_id="6", + mosaic_download_status="failed", + mosaic_error_code="404", + ), + ], + ) + out = load_failed_scans_from_csv(path, "M") + assert [s["scan_id"] for s in out] == [6]