Files
SPRUCE-scraper/scripts/mosaic_progress_report.py
T

482 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Report mosaic download progress from archives/scans.csv.
Output is Markdown. Use ``--by-year`` for a per-machine × per-year
done/failed table. When the first mosaic pass is complete (no pending rows)
but failures remain, a **Mosaic retry estimates** section is printed with
queue counts and duration hints.
Rate/ETA use a 30-minute rolling window when snapshots show progress.
Mean mosaic size is sampled from up to 100 downloads (1-hour cache).
Usage:
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
"""
import argparse
import csv
import json
import os
import random
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
# Year-based viability model derived from BW1-4 observations:
# pre-2019 → kept on server long-term (~100 %)
# 2019-2022 → purged (~ 0 %)
# 2023+ → recent, mostly available (~82 %)
# _expected_remaining() adds one of these weights per pending scan, chosen by
# the four-character year prefix of the row's scan_time.
_R_PRE19 = 1.00
_R_PURGED = 0.00
_R_RECENT = 0.82
# Scans/hour stored as the first-pass baseline, and used as the default when
# the rate cache has no "first_pass_mean_rate_per_hr" entry.
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
# Fixed scenario rates (scans/hour) shown as columns of the retry-estimate
# table printed by main().
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
RETRY_REALISTIC_RATE_PER_HR = 1100.0
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_dt(s: str) -> datetime | None:
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def _fmt_duration(seconds: float) -> str:
if seconds < 0:
return "?"
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
if h >= 48:
return f"{h // 24}d {h % 24}h"
if h > 0:
return f"{h}h {m:02d}m"
return f"{m}m {int(seconds % 60):02d}s"
def _fmt_size(b: float) -> str:
if b >= 1e12:
return f"{b/1e12:.2f} TB"
if b >= 1e9:
return f"{b/1e9:.2f} GB"
if b >= 1e6:
return f"{b/1e6:.1f} MB"
return f"{b/1e3:.0f} KB"
def _md_table(headers: list[str], rows: list[list[str]], *, align: list[str] | None = None) -> str:
"""Render a Markdown table. align values: 'l', 'r', 'c' (default 'l')."""
if align is None:
align = ["l"] * len(headers)
sep_map = {"l": ":---", "r": "---:", "c": ":---:"}
def row_str(cells: list[str]) -> str:
return "| " + " | ".join(cells) + " |"
lines = [
row_str(headers),
row_str([sep_map.get(a, ":---") for a in align]),
]
for r in rows:
lines.append(row_str(r))
return "\n".join(lines)
def _sample_mean_bytes(rows: list[dict], cache: dict, max_sample: int = 100) -> float | None:
    """Estimate the mean on-disk size of mosaics from a random sample of rows.

    The cached ``mean_bytes`` is reused when ``size_ts`` is less than an hour
    old and the cached ``sample_n`` is at least ``min(max_sample, len(rows))``.
    Otherwise up to ``max_sample`` rows are drawn at random and the file named
    by each row's ``mosaic_local_path`` is measured; missing, unreadable and
    empty files are ignored.

    On a fresh measurement ``cache`` is updated in place with ``mean_bytes``,
    ``sample_n`` (the number of files actually measured) and ``size_ts``.
    Returns ``None``, leaving ``cache`` untouched, when no file could be
    measured.
    """
    prior_mean = cache.get("mean_bytes")
    prior_n = cache.get("sample_n", 0)
    stamp = _parse_dt(cache.get("size_ts", ""))
    now = datetime.now(timezone.utc)

    still_fresh = (
        bool(prior_mean)
        and bool(stamp)
        and (now - stamp).total_seconds() < 3600
    )
    if still_fresh and prior_n >= min(max_sample, len(rows)):
        return float(prior_mean)

    sizes = []
    for row in random.sample(rows, min(max_sample, len(rows))):
        path = row.get("mosaic_local_path", "")
        if not path:
            continue
        try:
            nbytes = os.path.getsize(path)
        except OSError:
            # File missing or unreadable: leave it out of the sample.
            continue
        if nbytes > 0:
            sizes.append(nbytes)

    if not sizes:
        return None

    mean = sum(sizes) / len(sizes)
    cache.update(
        mean_bytes=mean,
        sample_n=len(sizes),
        size_ts=now.isoformat(),
    )
    return mean
def _expected_remaining(pending_rows: list[dict]) -> float:
count = 0.0
for row in pending_rows:
yr = row.get("scan_time", "")[:4]
if yr < "2019":
count += _R_PRE19
elif yr <= "2022":
count += _R_PURGED
else:
count += _R_RECENT
return count
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
if n_scans <= 0 or rate_per_hr <= 0:
return "—"
return _fmt_duration(n_scans / rate_per_hr * 3600.0)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Print a Markdown mosaic-download progress report for one archive.

    Reads ``<archive>/scans.csv`` (keeping the last row per machine/scan_id),
    tallies the ``mosaic_download_status`` values, derives a rolling download
    rate from snapshots kept in ``<archive>/.mosaic_rate_cache.json`` (that
    file is rewritten on every run, best-effort), and prints summary,
    disk-space and per-machine tables. A retry-estimate section is added when
    nothing is pending but failures remain; ``--by-year`` adds a
    machine × year table.

    Exits through ``sys.exit`` with a message when ``scans.csv`` is missing.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--archive", default="archives")
    parser.add_argument(
        "--by-year", action="store_true",
        help="Add a per-machine × per-year done/failed breakdown table",
    )
    args = parser.parse_args()

    archive = Path(args.archive)
    scans_csv = archive / "scans.csv"
    progress_json = archive / ".progress.json"
    rate_cache_path = archive / ".mosaic_rate_cache.json"
    if not scans_csv.exists():
        sys.exit(f"scans.csv not found: {scans_csv}")

    # --- Load & deduplicate (last row per machine+scan_id) ---
    latest: dict[tuple[str, str], dict] = {}
    with open(scans_csv, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            key = (row.get("machine", ""), row.get("scan_id", ""))
            latest[key] = row

    by_machine: dict[str, Counter] = {}
    # machine -> year -> Counter(status -> count)
    by_machine_year: dict[str, dict[str, Counter]] = {}
    total_counts: Counter = Counter()
    downloaded_rows: list[dict] = []
    pending_rows: list[dict] = []
    for row in latest.values():
        status = row.get("mosaic_download_status", "")
        # DictReader fills absent fields with None; normalise to "" so the
        # machine names stay sortable further down.
        m = row.get("machine") or ""
        yr = (row.get("scan_time") or "")[:4] or "????"
        by_machine.setdefault(m, Counter())[status] += 1
        by_machine_year.setdefault(m, {}).setdefault(yr, Counter())[status] += 1
        total_counts[status] += 1
        if status == "downloaded":
            downloaded_rows.append(row)
        elif status == "skipped_metadata_only":
            pending_rows.append(row)

    total = sum(total_counts.values())
    downloaded = total_counts["downloaded"]
    failed = total_counts["failed"]
    zero_skipped = total_counts["skipped_zero_disk_space"]
    pending = total_counts["skipped_metadata_only"]
    processed = downloaded + failed + zero_skipped
    attempted = downloaded + failed
    now = datetime.now(timezone.utc)

    # --- Elapsed ---
    elapsed_str = ""
    if progress_json.exists():
        try:
            data = json.loads(progress_json.read_text())
            started_at = _parse_dt(data.get("started_at", ""))
            if started_at:
                elapsed_str = _fmt_duration((now - started_at).total_seconds())
        except (OSError, ValueError, AttributeError, TypeError):
            # Best effort: an unreadable, malformed or unexpectedly shaped
            # progress file just means no "Elapsed" entry in the report.
            pass

    # --- Rate cache ---
    cache: dict = {}
    if rate_cache_path.exists():
        try:
            loaded = json.loads(rate_cache_path.read_text())
        except (OSError, ValueError):
            loaded = None
        # Anything other than a JSON object is treated as "no cache".
        if isinstance(loaded, dict):
            cache = loaded

    # Rolling rate: keep up to 60 snapshots; compute rate from the oldest
    # snapshot within the last 30 minutes for a smoothed estimate.
    # Entries without numeric "ts"/"proc" are dropped so a damaged cache
    # cannot break the arithmetic below.
    raw_snapshots = cache.get("snapshots")
    snapshots: list[dict] = [
        s for s in (raw_snapshots if isinstance(raw_snapshots, list) else [])
        if isinstance(s, dict)
        and isinstance(s.get("ts"), (int, float))
        and isinstance(s.get("proc"), (int, float))
    ]
    # Prune snapshots older than 30 minutes, but keep at least one
    cutoff = now.timestamp() - 1800
    recent = [s for s in snapshots if s["ts"] >= cutoff]
    if not recent and snapshots:
        recent = [snapshots[-1]]  # always keep one for continuity

    rate_per_sec: float | None = None
    rate_window_str = ""
    if recent:
        oldest = recent[0]
        dt = now.timestamp() - oldest["ts"]
        dp = processed - oldest["proc"]
        # Require at least a minute of history and real progress.
        if dt >= 60 and dp > 0:
            rate_per_sec = dp / dt
            rate_window_str = f"{dt / 60:.0f}-min avg"

    # One-time baseline after initial mosaic crawl finished (no pending rows).
    if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
        cache["first_pass_completed_at"] = now.isoformat()
        cache["first_pass_processed"] = total
        cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR
    try:
        first_pass_rate_hr = float(
            cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
        )
    except (TypeError, ValueError):
        first_pass_rate_hr = FIRST_PASS_FALLBACK_RATE_PER_HR

    live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
    # Use the live rate when the snapshots produced one; otherwise fall back
    # to the first-pass baseline.
    retry_estimate_rate_hr = (
        live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
    )

    # --- Disk space ---
    mean_bytes: float | None = None
    size_note = ""
    if downloaded_rows:
        mean_bytes = _sample_mean_bytes(downloaded_rows, cache)
        if mean_bytes and cache.get("sample_n"):
            size_note = f"mean {_fmt_size(mean_bytes)} × {cache['sample_n']} sampled files"
    dl_bytes: float | None = None
    rem_bytes: float | None = None
    if mean_bytes:
        dl_bytes = downloaded * mean_bytes
        rem_bytes = _expected_remaining(pending_rows) * mean_bytes

    # Update cache: append new snapshot, keep last 60
    recent.append({"ts": now.timestamp(), "proc": processed})
    cache["snapshots"] = recent[-60:]
    # Keep legacy keys for backwards compat
    cache["timestamp"] = now.isoformat()
    cache["processed"] = processed
    try:
        rate_cache_path.write_text(json.dumps(cache))
    except (OSError, TypeError, ValueError):
        # Best effort: the report is still printed if the cache can't be saved.
        pass

    rate_str = eta_str = ""
    if rate_per_sec and rate_per_sec > 0:
        rate_str = f"{rate_per_sec * 3600:,.0f} scans/hr ({rate_window_str})"
        eta_str = _fmt_duration(pending / rate_per_sec)

    # -----------------------------------------------------------------------
    # Output — Markdown
    # -----------------------------------------------------------------------
    ts = datetime.now().strftime("%Y-%m-%d %H:%M")
    print(f"# Mosaic download progress — {ts}\n")
    print(f"**Archive:** `{archive.resolve()}` ")
    meta_parts = []
    if elapsed_str:
        meta_parts.append(f"**Elapsed:** {elapsed_str}")
    if rate_str:
        meta_parts.append(f"**Rate:** {rate_str}")
    if eta_str:
        meta_parts.append(f"**ETA:** {eta_str}")
    if meta_parts:
        print(" ".join(meta_parts) + " ")
    print()

    def pct(n: int) -> str:
        """Share of all scans as a percentage; a dash when there are none."""
        return f"{100 * n / total:.1f}%" if total else "—"

    # Summary table
    summary_rows = [
        ["Downloaded", f"{downloaded:,}", pct(downloaded)],
        ["Failed", f"{failed:,}", pct(failed)],
        ["Skipped (disk=0)", f"{zero_skipped:,}", pct(zero_skipped)],
        ["Pending", f"{pending:,}", pct(pending)],
        ["**Total**", f"**{total:,}**", ""],
    ]
    if attempted:
        summary_rows.append(
            ["**Success rate**", f"**{100*downloaded/attempted:.1f}%**", "*(of attempted)*"]
        )
    print(_md_table(["Metric", "Count", ""], summary_rows, align=["l", "r", "l"]))
    print()

    # Disk space
    if dl_bytes is not None and rem_bytes is not None:
        total_bytes = dl_bytes + rem_bytes
        print("### Disk space\n")
        print(f"_{size_note}_\n")
        ds_rows = [
            ["Downloaded so far", _fmt_size(dl_bytes), ""],
            ["Estimated remaining", _fmt_size(rem_bytes), "*(model-based)*"],
            ["**Grand total**", f"**{_fmt_size(total_bytes)}**", ""],
        ]
        print(_md_table(["", "Size", ""], ds_rows, align=["l", "r", "l"]))
        print()

    # Per-machine breakdown
    print("### Per-machine breakdown\n")
    machines = sorted(by_machine)
    mc_rows = []
    for m in machines:
        mc = by_machine[m]
        mt = sum(mc.values())
        mc_rows.append([
            m,
            f"{mc['downloaded']:,}",
            f"{mc['failed']:,}",
            f"{mc['skipped_zero_disk_space']:,}",
            f"{mc['skipped_metadata_only']:,}",
            f"{mt:,}",
        ])
    mc_rows.append([
        "**TOTAL**",
        f"**{downloaded:,}**",
        f"**{failed:,}**",
        f"**{zero_skipped:,}**",
        f"**{pending:,}**",
        f"**{total:,}**",
    ])
    print(_md_table(
        ["Machine", "Done", "Failed", "Skip0", "Pending", "Total"],
        mc_rows,
        align=["l", "r", "r", "r", "r", "r"],
    ))

    # -----------------------------------------------------------------------
    # Retry estimates (first pass complete: pending == 0, failures remain)
    # -----------------------------------------------------------------------
    if failed > 0 and pending == 0:
        failed_rows_list = [
            r for r in latest.values()
            if r.get("mosaic_download_status") == "failed"
        ]
        n_all = len(failed_rows_list)
        n_2023 = sum(
            1 for r in failed_rows_list
            if (r.get("scan_time") or "")[:4] >= "2023"
            and len((r.get("scan_time") or "")[:4]) == 4
        )
        n_200 = sum(
            1 for r in failed_rows_list
            if r.get("mosaic_error_code") == "200"
        )
        # Describe the rate that is actually used for the ETA column: the
        # rolling rate only exists when the snapshot window was long enough.
        rate_note = (
            "rolling 30-min window"
            if live_rate_hr is not None
            else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
        )
        print()
        print("### Mosaic retry estimates\n")
        print(
            "*Suggested command after server fix:* "
            "`python scraper.py --retry-failed --workers 2` "
            "(filters: `--retry-since YEAR`, `--retry-error-code CODE`)\n"
        )
        print(
            f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
            f"({rate_note}). Fixed columns use scenario rates.*\n"
        )
        # Column order: three fixed scenarios, then the measured/baseline rate.
        column_rates = (
            RETRY_OPTIMISTIC_RATE_PER_HR,
            RETRY_REALISTIC_RATE_PER_HR,
            RETRY_PESSIMISTIC_RATE_PER_HR,
            retry_estimate_rate_hr,
        )
        est_hdr = ["Retry scope", "Count"] + [
            f"@{rate:.0f}/hr" for rate in column_rates
        ]

        def retry_row(label: str, count_cell: str, n: int) -> list[str]:
            """One table row: label, count, then a duration per column rate."""
            return [label, count_cell] + [
                _retry_hours_from_rate(n, rate) for rate in column_rates
            ]

        retry_tbl_rows = [
            retry_row("HTTP 200 (empty body)", f"{n_200:,}", n_200),
            retry_row("Failed, scan_time ≥ 2023", f"{n_2023:,}", n_2023),
            retry_row("**All failed**", f"**{n_all:,}**", n_all),
        ]
        print(_md_table(est_hdr, retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))

    # -----------------------------------------------------------------------
    # --by-year table
    # -----------------------------------------------------------------------
    if args.by_year:
        print()
        print("### Downloads by machine and year\n")
        print("*Format: done / failed*\n")

        # Totals per year across all machines
        yr_totals: dict[str, Counter] = {}
        for per_year in by_machine_year.values():
            for yr, counts in per_year.items():
                yr_totals.setdefault(yr, Counter())
                yr_totals[yr] += counts

        # Only include years that have at least one downloaded or failed scan
        all_years = sorted(
            yr for yr, counts in yr_totals.items()
            if counts["downloaded"] + counts["failed"] > 0
        )

        year_rows = []
        for m in machines:
            row_cells = [m]
            for yr in all_years:
                c = by_machine_year.get(m, {}).get(yr, Counter())
                n_done = c.get("downloaded", 0)
                n_failed = c.get("failed", 0)
                row_cells.append(
                    f"{n_done:,} / {n_failed:,}" if (n_done or n_failed) else "—"
                )
            year_rows.append(row_cells)

        # Totals row
        total_cells = ["**TOTAL**"]
        for yr in all_years:
            n_done = yr_totals[yr].get("downloaded", 0)
            n_failed = yr_totals[yr].get("failed", 0)
            total_cells.append(
                f"**{n_done:,} / {n_failed:,}**" if (n_done or n_failed) else "—"
            )
        year_rows.append(total_cells)

        print(_md_table(
            ["Machine"] + all_years,
            year_rows,
            align=["l"] + ["r"] * len(all_years),
        ))
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()