#!/usr/bin/env python3 """ Report mosaic download progress from archives/scans.csv. Output is Markdown. Use ``--by-year`` for a per-machine × per-year done/failed table. When the first mosaic pass is complete (no pending rows) but failures remain, a **Mosaic retry estimates** section is printed with queue counts and duration hints. Rate/ETA use a 30-minute rolling window when snapshots show progress. Mean mosaic size is sampled from up to 100 downloads (1-hour cache). Usage: python scripts/mosaic_progress_report.py [--archive DIR] [--by-year] """ import argparse import csv import json import os import random import sys from collections import Counter from datetime import datetime, timezone from pathlib import Path # Year-based viability model derived from BW1-4 observations: # pre-2019 → kept on server long-term (~100 %) # 2019-2022 → purged (~ 0 %) # 2023+ → recent, mostly available (~82 %) _R_PRE19 = 1.00 _R_PURGED = 0.00 _R_RECENT = 0.82 FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0 RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0 RETRY_REALISTIC_RATE_PER_HR = 1100.0 RETRY_PESSIMISTIC_RATE_PER_HR = 300.0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _parse_dt(s: str) -> datetime | None: try: return datetime.fromisoformat(s.replace("Z", "+00:00")) except Exception: return None def _fmt_duration(seconds: float) -> str: if seconds < 0: return "?" h = int(seconds // 3600) m = int((seconds % 3600) // 60) if h >= 48: return f"{h // 24}d {h % 24}h" if h > 0: return f"{h}h {m:02d}m" return f"{m}m {int(seconds % 60):02d}s" def _fmt_size(b: float) -> str: if b >= 1e12: return f"{b/1e12:.2f} TB" if b >= 1e9: return f"{b/1e9:.2f} GB" if b >= 1e6: return f"{b/1e6:.1f} MB" return f"{b/1e3:.0f} KB" def _md_table(headers: list[str], rows: list[list[str]], *, align: list[str] | None = None) -> str: """Render a Markdown table. align values: 'l', 'r', 'c' (default 'l').""" if align is None: align = ["l"] * len(headers) sep_map = {"l": ":---", "r": "---:", "c": ":---:"} def row_str(cells: list[str]) -> str: return "| " + " | ".join(cells) + " |" lines = [ row_str(headers), row_str([sep_map.get(a, ":---") for a in align]), ] for r in rows: lines.append(row_str(r)) return "\n".join(lines) def _sample_mean_bytes(rows: list[dict], cache: dict, max_sample: int = 100) -> float | None: cached_mean = cache.get("mean_bytes") cached_n = cache.get("sample_n", 0) cached_ts = _parse_dt(cache.get("size_ts", "")) now = datetime.now(timezone.utc) if ( cached_mean and cached_ts and (now - cached_ts).total_seconds() < 3600 and cached_n >= min(max_sample, len(rows)) ): return float(cached_mean) sample = random.sample(rows, min(max_sample, len(rows))) sizes = [] for row in sample: p = row.get("mosaic_local_path", "") if p: try: sz = os.path.getsize(p) if sz > 0: sizes.append(sz) except OSError: pass if not sizes: return None mean = sum(sizes) / len(sizes) cache["mean_bytes"] = mean cache["sample_n"] = len(sizes) cache["size_ts"] = now.isoformat() return mean def _expected_remaining(pending_rows: list[dict]) -> float: count = 0.0 for row in pending_rows: yr = row.get("scan_time", "")[:4] if yr < "2019": count += _R_PRE19 elif yr <= "2022": count += _R_PURGED else: count += _R_RECENT return count def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str: if n_scans <= 0 or rate_per_hr <= 0: return "—" return _fmt_duration(n_scans / rate_per_hr * 3600.0) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--archive", default="archives") parser.add_argument( "--by-year", action="store_true", help="Add a per-machine × per-year done/failed breakdown table", ) args = parser.parse_args() archive = Path(args.archive) scans_csv = archive / "scans.csv" progress_json = archive / ".progress.json" rate_cache_path = archive / ".mosaic_rate_cache.json" if not scans_csv.exists(): sys.exit(f"scans.csv not found: {scans_csv}") # --- Load & deduplicate (last row per machine+scan_id) --- latest: dict[tuple[str, str], dict] = {} with open(scans_csv, newline="", encoding="utf-8") as f: for row in csv.DictReader(f): key = (row.get("machine", ""), row.get("scan_id", "")) latest[key] = row by_machine: dict[str, Counter] = {} # machine -> year -> Counter(status -> count) by_machine_year: dict[str, dict[str, Counter]] = {} total_counts: Counter = Counter() downloaded_rows: list[dict] = [] pending_rows: list[dict] = [] for (_m, _sid), row in latest.items(): status = row.get("mosaic_download_status", "") m = row.get("machine", "") yr = (row.get("scan_time") or "")[:4] or "????" by_machine.setdefault(m, Counter())[status] += 1 by_machine_year.setdefault(m, {}).setdefault(yr, Counter())[status] += 1 total_counts[status] += 1 if status == "downloaded": downloaded_rows.append(row) elif status == "skipped_metadata_only": pending_rows.append(row) total = sum(total_counts.values()) downloaded = total_counts["downloaded"] failed = total_counts["failed"] zero_skipped = total_counts["skipped_zero_disk_space"] pending = total_counts["skipped_metadata_only"] processed = downloaded + failed + zero_skipped attempted = downloaded + failed now = datetime.now(timezone.utc) # --- Elapsed --- elapsed_str = "" if progress_json.exists(): try: data = json.loads(progress_json.read_text()) started_at = _parse_dt(data.get("started_at", "")) if started_at: elapsed_str = _fmt_duration((now - started_at).total_seconds()) except Exception: pass # --- Rate cache --- cache: dict = {} if rate_cache_path.exists(): try: cache = json.loads(rate_cache_path.read_text()) except Exception: pass # Rolling rate: keep up to 60 snapshots; compute rate from the oldest # snapshot within the last 30 minutes for a smoothed estimate. snapshots: list[dict] = cache.get("snapshots", []) # Prune snapshots older than 30 minutes, but keep at least one cutoff = now.timestamp() - 1800 recent = [s for s in snapshots if s.get("ts", 0) >= cutoff] if not recent and snapshots: recent = [snapshots[-1]] # always keep one for continuity rate_per_sec: float | None = None rate_window_str = "" snap_delta_proc = 0 if recent: oldest = recent[0] dt = now.timestamp() - oldest["ts"] dp = processed - oldest["proc"] snap_delta_proc = dp if dt >= 60 and dp > 0: rate_per_sec = dp / dt window_min = dt / 60 rate_window_str = f"{window_min:.0f}-min avg" # One-time baseline after initial mosaic crawl finished (no pending rows). if pending == 0 and "first_pass_mean_rate_per_hr" not in cache: cache["first_pass_completed_at"] = now.isoformat() cache["first_pass_processed"] = total cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR first_pass_rate_hr = float( cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR) ) live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None # Active scrape shows progress in snapshots; idle archive shows dp == 0. retry_estimate_rate_hr = ( live_rate_hr if live_rate_hr is not None else first_pass_rate_hr ) # --- Disk space --- mean_bytes: float | None = None size_note = "" if downloaded_rows: mean_bytes = _sample_mean_bytes(downloaded_rows, cache) if mean_bytes and cache.get("sample_n"): size_note = f"mean {_fmt_size(mean_bytes)} × {cache['sample_n']} sampled files" dl_bytes: float | None = None rem_bytes: float | None = None if mean_bytes: dl_bytes = downloaded * mean_bytes rem_bytes = _expected_remaining(pending_rows) * mean_bytes # Update cache: append new snapshot, keep last 60 recent.append({"ts": now.timestamp(), "proc": processed}) cache["snapshots"] = recent[-60:] # Keep legacy keys for backwards compat cache["timestamp"] = now.isoformat() cache["processed"] = processed try: rate_cache_path.write_text(json.dumps(cache)) except Exception: pass rate_str = eta_str = "" if rate_per_sec and rate_per_sec > 0: rate_str = f"{rate_per_sec * 3600:,.0f} scans/hr ({rate_window_str})" eta_str = _fmt_duration(pending / rate_per_sec) # ----------------------------------------------------------------------- # Output — Markdown # ----------------------------------------------------------------------- ts = datetime.now().strftime("%Y-%m-%d %H:%M") print(f"# Mosaic download progress — {ts}\n") print(f"**Archive:** `{archive.resolve()}` ") meta_parts = [] if elapsed_str: meta_parts.append(f"**Elapsed:** {elapsed_str}") if rate_str: meta_parts.append(f"**Rate:** {rate_str}") if eta_str: meta_parts.append(f"**ETA:** {eta_str}") if meta_parts: print(" ".join(meta_parts) + " ") print() # Summary table summary_rows = [ ["Downloaded", f"{downloaded:,}", f"{100*downloaded/total:.1f}%"], ["Failed", f"{failed:,}", f"{100*failed/total:.1f}%"], ["Skipped (disk=0)",f"{zero_skipped:,}", f"{100*zero_skipped/total:.1f}%"], ["Pending", f"{pending:,}", f"{100*pending/total:.1f}%"], ["**Total**", f"**{total:,}**", ""], ] if attempted: summary_rows.append(["**Success rate**", f"**{100*downloaded/attempted:.1f}%**", "*(of attempted)*"]) print(_md_table(["Metric", "Count", ""], summary_rows, align=["l", "r", "l"])) print() # Disk space if dl_bytes is not None and rem_bytes is not None: total_bytes = dl_bytes + rem_bytes print(f"### Disk space\n") print(f"_{size_note}_\n") ds_rows = [ ["Downloaded so far", _fmt_size(dl_bytes), ""], ["Estimated remaining", _fmt_size(rem_bytes), "*(model-based)*"], ["**Grand total**", f"**{_fmt_size(total_bytes)}**", ""], ] print(_md_table(["", "Size", ""], ds_rows, align=["l", "r", "l"])) print() # Per-machine breakdown print("### Per-machine breakdown\n") machines = sorted(by_machine) mc_rows = [] for m in machines: mc = by_machine[m] mt = sum(mc.values()) mc_rows.append([ m, f"{mc['downloaded']:,}", f"{mc['failed']:,}", f"{mc['skipped_zero_disk_space']:,}", f"{mc['skipped_metadata_only']:,}", f"{mt:,}", ]) mc_rows.append([ "**TOTAL**", f"**{downloaded:,}**", f"**{failed:,}**", f"**{zero_skipped:,}**", f"**{pending:,}**", f"**{total:,}**", ]) print(_md_table( ["Machine", "Done", "Failed", "Skip0", "Pending", "Total"], mc_rows, align=["l", "r", "r", "r", "r", "r"], )) # ----------------------------------------------------------------------- # Retry estimates (first pass complete: pending == 0, failures remain) # ----------------------------------------------------------------------- if failed > 0 and pending == 0: failed_rows_list = [ r for r in latest.values() if r.get("mosaic_download_status") == "failed" ] n_all = len(failed_rows_list) n_2023 = sum( 1 for r in failed_rows_list if (r.get("scan_time") or "")[:4] >= "2023" and len((r.get("scan_time") or "")[:4]) == 4 ) n_200 = sum( 1 for r in failed_rows_list if r.get("mosaic_error_code") == "200" ) rate_note = ( "rolling 30-min window" if snap_delta_proc > 0 else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)" ) print() print("### Mosaic retry estimates\n") print( f"*Suggested command after server fix:* " f"`python scraper.py --retry-failed --workers 2` " f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n" ) print( f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** " f"({rate_note}). Fixed columns use scenario rates.*\n" ) est_hdr = ( "Retry scope", "Count", f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr", f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr", f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr", f"@{retry_estimate_rate_hr:.0f}/hr", ) retry_tbl_rows = [ [ "HTTP 200 (empty body)", f"{n_200:,}", _retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR), _retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR), _retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR), _retry_hours_from_rate(n_200, retry_estimate_rate_hr), ], [ "Failed, scan_time ≥ 2023", f"{n_2023:,}", _retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR), _retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR), _retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR), _retry_hours_from_rate(n_2023, retry_estimate_rate_hr), ], [ "**All failed**", f"**{n_all:,}**", _retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR), _retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR), _retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR), _retry_hours_from_rate(n_all, retry_estimate_rate_hr), ], ] print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"])) # ----------------------------------------------------------------------- # --by-year table # ----------------------------------------------------------------------- if args.by_year: print() print("### Downloads by machine and year\n") print("*Format: done / failed*\n") # Only include years that have at least one downloaded or failed scan all_years = sorted( yr for yr in set( yr for m_data in by_machine_year.values() for yr in m_data ) if any( by_machine_year[m].get(yr, Counter()).get("downloaded", 0) + by_machine_year[m].get(yr, Counter()).get("failed", 0) > 0 for m in by_machine_year ) ) # Totals per year yr_totals: dict[str, Counter] = {} for yr in all_years: yr_totals[yr] = Counter() for m in machines: yr_totals[yr] += by_machine_year.get(m, {}).get(yr, Counter()) year_rows = [] for m in machines: row_cells = [m] for yr in all_years: c = by_machine_year.get(m, {}).get(yr, Counter()) d = c.get("downloaded", 0) f = c.get("failed", 0) row_cells.append(f"{d:,} / {f:,}" if (d or f) else "—") year_rows.append(row_cells) # Totals row total_cells = ["**TOTAL**"] for yr in all_years: d = yr_totals[yr].get("downloaded", 0) f = yr_totals[yr].get("failed", 0) total_cells.append(f"**{d:,} / {f:,}**" if (d or f) else "—") year_rows.append(total_cells) print(_md_table( ["Machine"] + all_years, year_rows, align=["l"] + ["r"] * len(all_years), )) if __name__ == "__main__": main()