8593808cf3
Co-authored-by: Cursor <cursoragent@cursor.com>
482 lines
17 KiB
Python
482 lines
17 KiB
Python
#!/usr/bin/env python3
"""
Report mosaic download progress from archives/scans.csv.

Output is Markdown. Use ``--by-year`` for a per-machine × per-year
done/failed table. When the first mosaic pass is complete (no pending rows)
but failures remain, a **Mosaic retry estimates** section is printed with
queue counts and duration hints.

Rate/ETA use a 30-minute rolling window when snapshots show progress.
Mean mosaic size is sampled from up to 100 downloads (1-hour cache).

Usage:
    python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
"""
|
||
|
||
import argparse
|
||
import csv
|
||
import json
|
||
import os
|
||
import random
|
||
import sys
|
||
from collections import Counter
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
# Year-based viability model derived from BW1-4 observations:
#   pre-2019  → kept on server long-term  (~100 %)
#   2019-2022 → purged                    (~  0 %)
#   2023+     → recent, mostly available  (~ 82 %)
# Each value is the weight _expected_remaining() adds per pending scan of
# that period, i.e. the expected fraction that will still download.
_R_PRE19 = 1.00
_R_PURGED = 0.00
_R_RECENT = 0.82

# Throughput assumptions, all in scans per hour.
# Baseline stored in the rate cache once no pending rows remain; main() uses
# it for the retry ETA column whenever no live rolling rate is available.
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
# Fixed scenario rates for the three "@N/hr" columns of the retry table.
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
RETRY_REALISTIC_RATE_PER_HR = 1100.0
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _parse_dt(s: str) -> datetime | None:
|
||
try:
|
||
return datetime.fromisoformat(s.replace("Z", "+00:00"))
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _fmt_duration(seconds: float) -> str:
    """Format a duration in seconds as a short human-readable string.

    Returns ``"?"`` for negative input, ``"<d>d <h>h"`` from 48 hours up,
    ``"<h>h <mm>m"`` from one hour up, and ``"<m>m <ss>s"`` below one hour.
    """
    if seconds < 0:
        return "?"

    whole_hours, leftover = divmod(seconds, 3600)
    hours = int(whole_hours)
    minutes = int(leftover // 60)

    if hours >= 48:
        days, spare_hours = divmod(hours, 24)
        return f"{days}d {spare_hours}h"
    if hours > 0:
        return f"{hours}h {minutes:02d}m"
    return f"{minutes}m {int(seconds % 60):02d}s"
|
||
|
||
|
||
def _fmt_size(b: float) -> str:
    """Format a byte count with decimal (SI) units.

    Uses TB and GB with two decimals, MB with one decimal, and whole KB for
    anything below one megabyte.
    """
    # (threshold in bytes, unit label, decimals), largest first.
    tiers = ((1e12, "TB", 2), (1e9, "GB", 2), (1e6, "MB", 1))
    for scale, unit, decimals in tiers:
        if b >= scale:
            return f"{b / scale:.{decimals}f} {unit}"
    return f"{b / 1e3:.0f} KB"
|
||
|
||
|
||
def _md_table(headers: list[str], rows: list[list[str]], *, align: list[str] | None = None) -> str:
|
||
"""Render a Markdown table. align values: 'l', 'r', 'c' (default 'l')."""
|
||
if align is None:
|
||
align = ["l"] * len(headers)
|
||
sep_map = {"l": ":---", "r": "---:", "c": ":---:"}
|
||
|
||
def row_str(cells: list[str]) -> str:
|
||
return "| " + " | ".join(cells) + " |"
|
||
|
||
lines = [
|
||
row_str(headers),
|
||
row_str([sep_map.get(a, ":---") for a in align]),
|
||
]
|
||
for r in rows:
|
||
lines.append(row_str(r))
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _sample_mean_bytes(rows: list[dict], cache: dict, max_sample: int = 100) -> float | None:
    """Estimate the mean on-disk size of downloaded mosaics.

    A cached mean in ``cache`` is reused when it is less than an hour old
    and was computed from at least ``min(max_sample, len(rows))`` files.
    Otherwise a random sample of rows is taken, each ``mosaic_local_path``
    is stat'ed, and the mean of the non-empty files is stored back into
    ``cache`` (keys ``mean_bytes``, ``sample_n``, ``size_ts``).

    Returns:
        The mean size in bytes, or ``None`` when no sampled file could be
        measured (``cache`` is left untouched in that case).
    """
    cached_mean = cache.get("mean_bytes")
    cached_n = cache.get("sample_n", 0)
    cached_ts = _parse_dt(cache.get("size_ts", ""))
    now = datetime.now(timezone.utc)
    sample_size = min(max_sample, len(rows))

    if cached_mean and cached_ts:
        is_fresh = (now - cached_ts).total_seconds() < 3600
        if is_fresh and cached_n >= sample_size:
            return float(cached_mean)

    def size_on_disk(path: str) -> int:
        # Missing or unreadable files count as size 0 and are dropped below.
        try:
            return os.path.getsize(path)
        except OSError:
            return 0

    candidate_paths = (
        row.get("mosaic_local_path", "")
        for row in random.sample(rows, sample_size)
    )
    measured = (size_on_disk(path) for path in candidate_paths if path)
    sizes = [size for size in measured if size > 0]
    if not sizes:
        return None

    mean = sum(sizes) / len(sizes)
    cache.update(
        mean_bytes=mean,
        sample_n=len(sizes),
        size_ts=now.isoformat(),
    )
    return mean
|
||
|
||
|
||
def _expected_remaining(pending_rows: list[dict]) -> float:
    """Return the expected number of pending scans that will still download.

    Each row is weighted by the year-based viability model, using the first
    four characters of its ``scan_time``: years before 2019 add ``_R_PRE19``,
    2019-2022 add ``_R_PURGED``, anything later adds ``_R_RECENT``.

    Rows whose ``scan_time`` is missing, ``None`` or empty compare as ``""``
    and therefore fall into the pre-2019 bucket.
    """
    expected = 0.0
    for row in pending_rows:
        # csv.DictReader fills short rows with None, so ``.get(key, "")``
        # alone is not enough; normalise with ``or ""`` before slicing.
        yr = (row.get("scan_time") or "")[:4]
        if yr < "2019":
            expected += _R_PRE19
        elif yr <= "2022":
            expected += _R_PURGED
        else:
            expected += _R_RECENT
    return expected
|
||
|
||
|
||
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
    """Format how long retrying ``n_scans`` takes at ``rate_per_hr`` scans/hour.

    Returns an em dash when there is nothing to retry or the rate is not
    positive; otherwise the duration rendered by ``_fmt_duration``.
    """
    nothing_to_estimate = n_scans <= 0 or rate_per_hr <= 0
    if nothing_to_estimate:
        return "—"
    seconds_needed = n_scans / rate_per_hr * 3600.0
    return _fmt_duration(seconds_needed)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Print the Markdown mosaic progress report for one archive directory.

    Steps:
      1. Read ``<archive>/scans.csv`` and keep the last row per
         (machine, scan_id).
      2. Tally ``mosaic_download_status`` overall, per machine, and per
         machine/year.
      3. Derive elapsed time (``.progress.json``), a rolling processing rate
         and ETA (``.mosaic_rate_cache.json`` snapshots), and disk-space
         estimates from a sampled mean file size.
      4. Print the summary, disk-space and per-machine tables, the retry
         estimates once no rows are pending but failures remain, and the
         per-year table when ``--by-year`` is given.

    Side effects: rewrites ``<archive>/.mosaic_rate_cache.json`` (best
    effort) and writes the report to stdout.

    Raises:
        SystemExit: if ``scans.csv`` does not exist.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--archive", default="archives")
    parser.add_argument(
        "--by-year", action="store_true",
        help="Add a per-machine × per-year done/failed breakdown table",
    )
    args = parser.parse_args()

    archive = Path(args.archive)
    scans_csv = archive / "scans.csv"
    progress_json = archive / ".progress.json"
    rate_cache_path = archive / ".mosaic_rate_cache.json"

    if not scans_csv.exists():
        sys.exit(f"scans.csv not found: {scans_csv}")

    # --- Load & deduplicate (last row per machine+scan_id) ---
    latest: dict[tuple[str, str], dict] = {}
    with open(scans_csv, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            key = (row.get("machine", ""), row.get("scan_id", ""))
            latest[key] = row

    by_machine: dict[str, Counter] = {}
    # machine -> year -> Counter(status -> count)
    by_machine_year: dict[str, dict[str, Counter]] = {}
    total_counts: Counter = Counter()
    downloaded_rows: list[dict] = []
    pending_rows: list[dict] = []

    for row in latest.values():
        # DictReader yields None for fields missing from a short (e.g.
        # partially written) row; normalise so that sorting machine names
        # and counting statuses cannot fail on mixed None/str keys.
        status = row.get("mosaic_download_status") or ""
        m = row.get("machine") or ""
        yr = (row.get("scan_time") or "")[:4] or "????"
        by_machine.setdefault(m, Counter())[status] += 1
        by_machine_year.setdefault(m, {}).setdefault(yr, Counter())[status] += 1
        total_counts[status] += 1
        if status == "downloaded":
            downloaded_rows.append(row)
        elif status == "skipped_metadata_only":
            pending_rows.append(row)

    total = sum(total_counts.values())
    downloaded = total_counts["downloaded"]
    failed = total_counts["failed"]
    zero_skipped = total_counts["skipped_zero_disk_space"]
    pending = total_counts["skipped_metadata_only"]
    processed = downloaded + failed + zero_skipped
    attempted = downloaded + failed
    now = datetime.now(timezone.utc)

    def pct(count: int) -> str:
        # A header-only scans.csv gives total == 0; show a placeholder
        # instead of dividing by zero.
        return f"{100 * count / total:.1f}%" if total else "—"

    # --- Elapsed ---
    elapsed_str = ""
    if progress_json.exists():
        try:
            data = json.loads(progress_json.read_text(encoding="utf-8"))
            started_at = _parse_dt(data.get("started_at", ""))
            if started_at:
                elapsed_str = _fmt_duration((now - started_at).total_seconds())
        except (OSError, ValueError, TypeError, AttributeError):
            # Best effort: unreadable/corrupt file, non-object JSON, or a
            # naive timestamp that cannot be subtracted from aware ``now``.
            pass

    # --- Rate cache ---
    cache: dict = {}
    if rate_cache_path.exists():
        try:
            loaded = json.loads(rate_cache_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            loaded = None  # unreadable or corrupt cache: start fresh
        if isinstance(loaded, dict):
            cache = loaded

    # Rolling rate: keep up to 60 snapshots; compute rate from the oldest
    # snapshot within the last 30 minutes for a smoothed estimate.
    # Only well-formed snapshots are kept so a damaged cache cannot raise.
    raw_snapshots = cache.get("snapshots", [])
    snapshots: list[dict] = [
        s for s in (raw_snapshots if isinstance(raw_snapshots, list) else [])
        if isinstance(s, dict)
        and isinstance(s.get("ts"), (int, float))
        and isinstance(s.get("proc"), (int, float))
    ]
    # Prune snapshots older than 30 minutes, but keep at least one
    cutoff = now.timestamp() - 1800
    recent = [s for s in snapshots if s["ts"] >= cutoff]
    if not recent and snapshots:
        recent = [snapshots[-1]]  # always keep one for continuity

    rate_per_sec: float | None = None
    rate_window_str = ""
    if recent:
        oldest = recent[0]
        dt = now.timestamp() - oldest["ts"]
        dp = processed - oldest["proc"]
        # Require at least a minute of history and real progress.
        if dt >= 60 and dp > 0:
            rate_per_sec = dp / dt
            rate_window_str = f"{dt / 60:.0f}-min avg"

    # One-time baseline after initial mosaic crawl finished (no pending rows).
    if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
        cache["first_pass_completed_at"] = now.isoformat()
        cache["first_pass_processed"] = total
        cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR

    try:
        first_pass_rate_hr = float(
            cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
        )
    except (TypeError, ValueError):
        first_pass_rate_hr = FIRST_PASS_FALLBACK_RATE_PER_HR
    live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
    # Active scrape shows progress in snapshots; idle archive has no live rate.
    retry_estimate_rate_hr = (
        live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
    )

    # --- Disk space ---
    mean_bytes: float | None = None
    size_note = ""
    if downloaded_rows:
        mean_bytes = _sample_mean_bytes(downloaded_rows, cache)
        if mean_bytes and cache.get("sample_n"):
            size_note = f"mean {_fmt_size(mean_bytes)} × {cache['sample_n']} sampled files"

    dl_bytes: float | None = None
    rem_bytes: float | None = None
    if mean_bytes:
        dl_bytes = downloaded * mean_bytes
        rem_bytes = _expected_remaining(pending_rows) * mean_bytes

    # Update cache: append new snapshot, keep last 60
    recent.append({"ts": now.timestamp(), "proc": processed})
    cache["snapshots"] = recent[-60:]
    # Keep legacy keys for backwards compat
    cache["timestamp"] = now.isoformat()
    cache["processed"] = processed
    try:
        rate_cache_path.write_text(json.dumps(cache), encoding="utf-8")
    except (OSError, TypeError, ValueError):
        # Best effort: the report is still valid without a persisted cache
        # (e.g. read-only archive directory).
        pass

    rate_str = eta_str = ""
    if rate_per_sec and rate_per_sec > 0:
        rate_str = f"{rate_per_sec * 3600:,.0f} scans/hr ({rate_window_str})"
        eta_str = _fmt_duration(pending / rate_per_sec)

    # -----------------------------------------------------------------------
    # Output — Markdown
    # -----------------------------------------------------------------------

    ts = datetime.now().strftime("%Y-%m-%d %H:%M")
    print(f"# Mosaic download progress — {ts}\n")
    # Two trailing spaces form a Markdown hard line break, so the archive
    # line and the metadata line render on separate lines.
    print(f"**Archive:** `{archive.resolve()}`  ")
    meta_parts = []
    if elapsed_str:
        meta_parts.append(f"**Elapsed:** {elapsed_str}")
    if rate_str:
        meta_parts.append(f"**Rate:** {rate_str}")
    if eta_str:
        meta_parts.append(f"**ETA:** {eta_str}")
    if meta_parts:
        print(" ".join(meta_parts) + "  ")
    print()

    # Summary table
    summary_rows = [
        ["Downloaded", f"{downloaded:,}", pct(downloaded)],
        ["Failed", f"{failed:,}", pct(failed)],
        ["Skipped (disk=0)", f"{zero_skipped:,}", pct(zero_skipped)],
        ["Pending", f"{pending:,}", pct(pending)],
        ["**Total**", f"**{total:,}**", ""],
    ]
    if attempted:
        summary_rows.append(
            ["**Success rate**", f"**{100*downloaded/attempted:.1f}%**", "*(of attempted)*"]
        )
    print(_md_table(["Metric", "Count", ""], summary_rows, align=["l", "r", "l"]))
    print()

    # Disk space
    if dl_bytes is not None and rem_bytes is not None:
        total_bytes = dl_bytes + rem_bytes
        print("### Disk space\n")
        print(f"_{size_note}_\n")
        ds_rows = [
            ["Downloaded so far", _fmt_size(dl_bytes), ""],
            ["Estimated remaining", _fmt_size(rem_bytes), "*(model-based)*"],
            ["**Grand total**", f"**{_fmt_size(total_bytes)}**", ""],
        ]
        print(_md_table(["", "Size", ""], ds_rows, align=["l", "r", "l"]))
        print()

    # Per-machine breakdown
    print("### Per-machine breakdown\n")
    machines = sorted(by_machine)
    mc_rows = []
    for m in machines:
        mc = by_machine[m]
        mt = sum(mc.values())
        mc_rows.append([
            m,
            f"{mc['downloaded']:,}",
            f"{mc['failed']:,}",
            f"{mc['skipped_zero_disk_space']:,}",
            f"{mc['skipped_metadata_only']:,}",
            f"{mt:,}",
        ])
    mc_rows.append([
        "**TOTAL**",
        f"**{downloaded:,}**",
        f"**{failed:,}**",
        f"**{zero_skipped:,}**",
        f"**{pending:,}**",
        f"**{total:,}**",
    ])
    print(_md_table(
        ["Machine", "Done", "Failed", "Skip0", "Pending", "Total"],
        mc_rows,
        align=["l", "r", "r", "r", "r", "r"],
    ))

    # -----------------------------------------------------------------------
    # Retry estimates (first pass complete: pending == 0, failures remain)
    # -----------------------------------------------------------------------
    if failed > 0 and pending == 0:
        failed_rows_list = [
            r for r in latest.values()
            if r.get("mosaic_download_status") == "failed"
        ]
        n_all = len(failed_rows_list)
        failed_years = [(r.get("scan_time") or "")[:4] for r in failed_rows_list]
        # Require a full 4-character year so short/garbled values are skipped.
        n_2023 = sum(1 for y in failed_years if len(y) == 4 and y >= "2023")
        n_200 = sum(
            1 for r in failed_rows_list
            if r.get("mosaic_error_code") == "200"
        )
        # Describe the rate that is actually used for the ETA column: the
        # live rate only exists when the rolling window produced one.
        rate_note = (
            "rolling 30-min window"
            if live_rate_hr is not None
            else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
        )
        print()
        print("### Mosaic retry estimates\n")
        print(
            "*Suggested command after server fix:* "
            "`python scraper.py --retry-failed --workers 2` "
            "(filters: `--retry-since YEAR`, `--retry-error-code CODE`)\n"
        )
        print(
            f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
            f"({rate_note}). Fixed columns use scenario rates.*\n"
        )

        # Three fixed scenarios followed by the live/baseline estimate.
        scenario_rates = (
            RETRY_OPTIMISTIC_RATE_PER_HR,
            RETRY_REALISTIC_RATE_PER_HR,
            RETRY_PESSIMISTIC_RATE_PER_HR,
            retry_estimate_rate_hr,
        )
        est_hdr = ["Retry scope", "Count"] + [f"@{r:.0f}/hr" for r in scenario_rates]

        def retry_row(label: str, count: int, *, bold: bool = False) -> list[str]:
            count_cell = f"**{count:,}**" if bold else f"{count:,}"
            durations = [_retry_hours_from_rate(count, r) for r in scenario_rates]
            return [label, count_cell, *durations]

        retry_tbl_rows = [
            retry_row("HTTP 200 (empty body)", n_200),
            retry_row("Failed, scan_time ≥ 2023", n_2023),
            retry_row("**All failed**", n_all, bold=True),
        ]
        print(_md_table(est_hdr, retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))

    # -----------------------------------------------------------------------
    # --by-year table
    # -----------------------------------------------------------------------
    if args.by_year:
        print()
        print("### Downloads by machine and year\n")
        print("*Format: done / failed*\n")

        # Totals per year across all machines
        yr_totals: dict[str, Counter] = {}
        for per_year in by_machine_year.values():
            for yr, counts in per_year.items():
                yr_totals.setdefault(yr, Counter()).update(counts)

        # Only include years that have at least one downloaded or failed scan
        all_years = sorted(
            yr for yr, c in yr_totals.items()
            if c["downloaded"] + c["failed"] > 0
        )

        year_rows = []
        for m in machines:
            row_cells = [m]
            for yr in all_years:
                c = by_machine_year.get(m, {}).get(yr, Counter())
                d = c.get("downloaded", 0)
                n_failed = c.get("failed", 0)
                row_cells.append(f"{d:,} / {n_failed:,}" if (d or n_failed) else "—")
            year_rows.append(row_cells)

        # Totals row
        total_cells = ["**TOTAL**"]
        for yr in all_years:
            d = yr_totals[yr].get("downloaded", 0)
            n_failed = yr_totals[yr].get("failed", 0)
            total_cells.append(f"**{d:,} / {n_failed:,}**" if (d or n_failed) else "—")
        year_rows.append(total_cells)

        print(_md_table(
            ["Machine"] + all_years,
            year_rows,
            align=["l"] + ["r"] * len(all_years),
        ))
|
||
|
||
|
||
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|