Add mosaic scrape resilience and progress reporting

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
2026-05-12 21:44:06 -04:00
parent 752c278dff
commit 0cd7243c8d
+269
View File
@@ -0,0 +1,269 @@
#!/usr/bin/env python3
"""
Report mosaic download progress from archives/scans.csv.
Shows elapsed time, processing rate, estimated time remaining, and
projected disk usage (downloaded so far, estimated as count x sampled mean
size, plus model-estimated remaining).
Rate/ETA require two calls at least 60s apart. Mean mosaic size is
sampled from up to 100 already-downloaded files and cached so later
calls skip the filesystem hit unless the sample is stale.
Usage:
python scripts/mosaic_progress_report.py [--archive ARCHIVE_DIR]
"""
import argparse
import csv
import json
import os
import random
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
# Year-based viability model derived from BW1-4 observations:
# pre-2019 → kept on server long-term (~100%)
# 2019-2022 → purged (~0%)
# 2023+ → recent, mostly available (~82%)
# Each constant is the fraction of pending scans from that era that is
# expected to yield a successful mosaic download; the values are summed
# per pending row by _expected_remaining_downloads().
_R_PRE19 = 1.00
_R_PURGED = 0.00
_R_RECENT = 0.82
def _parse_dt(s: str) -> datetime | None:
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def _fmt_duration(seconds: float) -> str:
    """Render a span of seconds as a short human-readable duration.

    Negative spans yield "?". Spans of 48 hours or more are shown as days
    and hours, spans of at least one hour as hours and zero-padded minutes,
    and anything shorter as minutes and zero-padded seconds.
    """
    if seconds < 0:
        return "?"

    whole_hours, leftover = divmod(seconds, 3600)
    hours = int(whole_hours)
    minutes = int(leftover // 60)

    if hours >= 48:
        days, extra_hours = divmod(hours, 24)
        return f"{days}d {extra_hours}h"
    if hours:
        return f"{hours}h {minutes:02d}m"

    secs = int(seconds % 60)
    return f"{minutes}m {secs:02d}s"
def _fmt_size(bytes_: float) -> str:
    """Format a byte count using decimal (power-of-1000) units.

    The largest unit among TB, GB and MB that the value reaches is used
    (two decimals for TB/GB, one for MB); anything below 1 MB is shown as
    whole kilobytes.
    """
    # (threshold in bytes, unit label, decimals shown), largest unit first.
    scales = (
        (1e12, "TB", 2),
        (1e9, "GB", 2),
        (1e6, "MB", 1),
    )
    for unit_bytes, label, decimals in scales:
        if bytes_ >= unit_bytes:
            return f"{bytes_ / unit_bytes:.{decimals}f} {label}"
    return f"{bytes_ / 1e3:.0f} KB"
def _sample_mean_bytes(rows: list[dict], cache: dict, max_sample: int = 100) -> float | None:
    """Return the mean mosaic file size in bytes, preferring a fresh cached value.

    The cached mean is reused when it is less than one hour old and was
    computed from a sample at least as large as the one that would be drawn
    now. Otherwise up to ``max_sample`` rows are picked at random and the
    files named by their ``mosaic_local_path`` are sized on disk; missing,
    unreadable and empty files are ignored.

    Args:
        rows: Rows for already-downloaded scans.
        cache: Mutable cache dict. On a successful re-sample the keys
            ``mean_bytes``, ``sample_n`` (files actually measured),
            ``sample_attempted`` (rows sampled) and ``size_ts`` are updated
            in place.
        max_sample: Upper bound on the number of rows to sample.

    Returns:
        The mean size in bytes, or None when no sampled file could be sized.
    """
    now = datetime.now(timezone.utc)
    wanted = min(max_sample, len(rows))

    cached_mean = cache.get("mean_bytes")
    cached_ts = _parse_dt(cache.get("size_ts", ""))
    # Freshness is judged on how many rows were *sampled*, not on how many
    # files could be sized: otherwise a single missing file in the sample
    # keeps the cache permanently "stale".  Caches written before
    # "sample_attempted" existed fall back to "sample_n".
    cached_attempted = cache.get("sample_attempted", cache.get("sample_n", 0))

    cache_is_fresh = (
        isinstance(cached_mean, (int, float))
        and cached_mean > 0
        and isinstance(cached_attempted, int)
        and cached_attempted >= wanted
        and cached_ts is not None
        # A naive timestamp cannot be subtracted from the aware `now`.
        and cached_ts.utcoffset() is not None
        # Reject future-dated stamps (clock skew) as well as old ones.
        and 0 <= (now - cached_ts).total_seconds() < 3600
    )
    if cache_is_fresh:
        return float(cached_mean)

    sizes = []
    for row in random.sample(rows, wanted):
        path = row.get("mosaic_local_path")
        if not path:
            continue
        try:
            size = os.path.getsize(path)
        except OSError:
            # File moved or deleted since the CSV was written; skip it.
            continue
        if size > 0:
            sizes.append(size)

    if not sizes:
        return None

    mean = sum(sizes) / len(sizes)
    cache["mean_bytes"] = mean
    cache["sample_n"] = len(sizes)
    cache["sample_attempted"] = wanted
    cache["size_ts"] = now.isoformat()
    return mean
def _expected_remaining_downloads(pending_rows: list[dict]) -> float:
    """Estimate how many pending scans will produce a successful mosaic.

    Each row contributes the viability fraction for the era of its
    ``scan_time`` year (the first four characters of the field):
    ``_R_PRE19`` before 2019, ``_R_PURGED`` for 2019-2022 and ``_R_RECENT``
    afterwards.

    Args:
        pending_rows: Rows for scans whose mosaic has not been attempted yet.

    Returns:
        The expected number of successful downloads (a fractional count).
    """
    expected = 0.0
    for row in pending_rows:
        # csv.DictReader fills fields missing from a short row with None,
        # so normalise to "" before slicing.
        year = (row.get("scan_time") or "")[:4]
        # Years are compared as text.  A blank year sorts before "2019" and
        # is therefore counted as a full expected download, which keeps the
        # disk projection on the high side.
        if year < "2019":
            expected += _R_PRE19
        elif year <= "2022":
            expected += _R_PURGED
        else:
            expected += _R_RECENT
    return expected
def _read_json_dict(path: Path) -> dict:
    """Return the JSON object stored at *path*, or {} if it is missing, unreadable or not an object."""
    try:
        loaded = json.loads(path.read_text())
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        return {}
    return loaded if isinstance(loaded, dict) else {}


def _pct(part: int, whole: int) -> float:
    """Return *part* as a percentage of *whole*, or 0.0 when *whole* is zero."""
    return 100 * part / whole if whole else 0.0


def main() -> None:
    """Print a mosaic download progress report for one archive directory.

    Reads ``scans.csv`` (required), ``.progress.json`` (optional, supplies
    the run start time) and ``.mosaic_rate_cache.json`` (optional) from the
    directory given by ``--archive``.  The rate cache is rewritten on every
    call with the current timestamp and processed count, so that the next
    call at least 60 seconds later can derive a processing rate and ETA.

    Exits with an error message if ``scans.csv`` does not exist.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--archive", default="archives", help="Archives directory")
    args = parser.parse_args()

    archive = Path(args.archive)
    scans_csv = archive / "scans.csv"
    progress_json = archive / ".progress.json"
    rate_cache_path = archive / ".mosaic_rate_cache.json"

    if not scans_csv.exists():
        sys.exit(f"scans.csv not found: {scans_csv}")

    # --- Load scans (deduplicated: last row wins per machine+scan_id) ---
    # csv.DictReader yields None for fields missing from a short row, so
    # every field read below is normalised with `or ""`.
    latest: dict[tuple[str, str], dict] = {}
    with open(scans_csv, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            latest[(row.get("machine") or "", row.get("scan_id") or "")] = row

    by_machine: dict[str, Counter] = {}
    total_counts: Counter = Counter()
    downloaded_rows: list[dict] = []
    pending_rows: list[dict] = []
    for row in latest.values():
        status = row.get("mosaic_download_status") or ""
        machine = row.get("machine") or ""
        by_machine.setdefault(machine, Counter())[status] += 1
        total_counts[status] += 1
        if status == "downloaded":
            downloaded_rows.append(row)
        elif status == "skipped_metadata_only":
            pending_rows.append(row)

    total = sum(total_counts.values())
    downloaded = total_counts["downloaded"]
    failed = total_counts["failed"]
    zero_skipped = total_counts["skipped_zero_disk_space"]
    pending = total_counts["skipped_metadata_only"]
    processed = downloaded + failed + zero_skipped
    attempted = downloaded + failed

    now = datetime.now(timezone.utc)

    # --- Elapsed time ---
    elapsed_str = ""
    started_at = _parse_dt(_read_json_dict(progress_json).get("started_at", ""))
    # A naive start time cannot be subtracted from the aware `now`; omit it.
    if started_at is not None and started_at.utcoffset() is not None:
        elapsed_str = _fmt_duration((now - started_at).total_seconds())

    # --- Rate cache (load, compute, update) ---
    cache = _read_json_dict(rate_cache_path)

    rate_scans_per_sec: float | None = None
    prev_ts = _parse_dt(cache.get("timestamp", ""))
    try:
        prev_processed = int(cache["processed"])
    except (KeyError, TypeError, ValueError):
        # No previous call recorded, or the cached value is corrupt.
        prev_processed = None
    if (
        prev_ts is not None
        and prev_ts.utcoffset() is not None
        and prev_processed is not None
    ):
        delta_t = (now - prev_ts).total_seconds()
        delta_p = processed - prev_processed
        # Shorter intervals are too noisy; no progress means no usable rate.
        if delta_t >= 60 and delta_p > 0:
            rate_scans_per_sec = delta_p / delta_t

    # --- Disk space ---
    mean_bytes: float | None = None
    size_note = ""
    if downloaded_rows:
        mean_bytes = _sample_mean_bytes(downloaded_rows, cache)
        if mean_bytes and cache.get("sample_n"):
            size_note = f"(mean {_fmt_size(mean_bytes)} from {cache['sample_n']} files)"

    downloaded_bytes: float | None = None
    remaining_bytes: float | None = None
    total_bytes: float | None = None
    if mean_bytes:
        # Both figures are estimates: count x sampled mean file size.
        downloaded_bytes = downloaded * mean_bytes
        exp_remaining = _expected_remaining_downloads(pending_rows)
        remaining_bytes = exp_remaining * mean_bytes
        total_bytes = downloaded_bytes + remaining_bytes

    # Update cache (best effort: a read-only archive must not block the report).
    cache["timestamp"] = now.isoformat()
    cache["processed"] = processed
    try:
        rate_cache_path.write_text(json.dumps(cache))
    except (OSError, TypeError, ValueError) as exc:
        print(f"warning: could not update {rate_cache_path}: {exc}", file=sys.stderr)

    # --- Formatted output ---
    rate_str = ""
    eta_str = ""
    if rate_scans_per_sec and rate_scans_per_sec > 0:
        rate_str = f"{rate_scans_per_sec * 3600:,.0f} scans/hr"
        eta_str = _fmt_duration(pending / rate_scans_per_sec)

    print(f"Mosaic download progress \u2014 {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"Archive: {archive.resolve()}")
    timing_parts = []
    if elapsed_str:
        timing_parts.append(f"Elapsed: {elapsed_str}")
    if rate_str:
        timing_parts.append(f"Rate: {rate_str}")
    if eta_str:
        timing_parts.append(f"ETA: {eta_str}")
    if timing_parts:
        print(" ".join(timing_parts))
    print()

    # _pct() keeps an empty scans.csv (header only) from dividing by zero.
    print(f" Downloaded: {downloaded:>8,} ({_pct(downloaded, total):.1f}%)")
    print(f" Failed: {failed:>8,} ({_pct(failed, total):.1f}%)")
    print(f" Skipped (disk=0): {zero_skipped:>8,} ({_pct(zero_skipped, total):.1f}%)")
    print(f" Pending: {pending:>8,} ({_pct(pending, total):.1f}%)")
    print(f" Total scans: {total:>8,}")
    if attempted:
        print(f" Success rate: {100*downloaded/attempted:.1f}% (of attempted)")
    print()

    if total_bytes is not None:
        print(f" Disk space {size_note}")
        print(f" Downloaded so far: {_fmt_size(downloaded_bytes):>10}")
        print(f" Estimated remaining: {_fmt_size(remaining_bytes):>10} (model-based)")
        print(f" Grand total: {_fmt_size(total_bytes):>10}")
        print()

    print(f" {'Machine':<25} {'Done':>7} {'Failed':>7} {'Skip0':>6} {'Pending':>8} {'Total':>7}")
    print(" " + "-" * 70)
    for machine in sorted(by_machine):
        mc = by_machine[machine]
        mt = sum(mc.values())
        print(
            f" {machine:<25} {mc['downloaded']:>7,} {mc['failed']:>7,} "
            f"{mc['skipped_zero_disk_space']:>6,} {mc['skipped_metadata_only']:>8,} {mt:>7,}"
        )
    print(" " + "-" * 70)
    print(
        f" {'TOTAL':<25} {downloaded:>7,} {failed:>7,} "
        f"{zero_skipped:>6,} {pending:>8,} {total:>7,}"
    )
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()