Files
SPRUCE-scraper/scripts/mosaic_progress_report.py

382 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Report mosaic download progress from archives/scans.csv.
Output is formatted as Markdown. Add --by-year for a per-machine ×
per-year breakdown table.
Rate/ETA require two calls at least 60 s apart. Mean mosaic size is
sampled from up to 100 already-downloaded files and cached for 1 hour.
Usage:
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
"""
import argparse
import csv
import json
import os
import random
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
# Year-based viability model derived from BW1-4 observations:
# pre-2019 → kept on server long-term (~100 %)
# 2019-2022 → purged (~ 0 %)
# 2023+ → recent, mostly available (~82 %)
_R_PRE19 = 1.00
_R_PURGED = 0.00
_R_RECENT = 0.82
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_dt(s: str) -> datetime | None:
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def _fmt_duration(seconds: float) -> str:
if seconds < 0:
return "?"
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
if h >= 48:
return f"{h // 24}d {h % 24}h"
if h > 0:
return f"{h}h {m:02d}m"
return f"{m}m {int(seconds % 60):02d}s"
def _fmt_size(b: float) -> str:
if b >= 1e12:
return f"{b/1e12:.2f} TB"
if b >= 1e9:
return f"{b/1e9:.2f} GB"
if b >= 1e6:
return f"{b/1e6:.1f} MB"
return f"{b/1e3:.0f} KB"
def _md_table(headers: list[str], rows: list[list[str]], *, align: list[str] | None = None) -> str:
"""Render a Markdown table. align values: 'l', 'r', 'c' (default 'l')."""
if align is None:
align = ["l"] * len(headers)
sep_map = {"l": ":---", "r": "---:", "c": ":---:"}
def row_str(cells: list[str]) -> str:
return "| " + " | ".join(cells) + " |"
lines = [
row_str(headers),
row_str([sep_map.get(a, ":---") for a in align]),
]
for r in rows:
lines.append(row_str(r))
return "\n".join(lines)
def _sample_mean_bytes(rows: list[dict], cache: dict, max_sample: int = 100) -> float | None:
cached_mean = cache.get("mean_bytes")
cached_n = cache.get("sample_n", 0)
cached_ts = _parse_dt(cache.get("size_ts", ""))
now = datetime.now(timezone.utc)
if (
cached_mean and cached_ts
and (now - cached_ts).total_seconds() < 3600
and cached_n >= min(max_sample, len(rows))
):
return float(cached_mean)
sample = random.sample(rows, min(max_sample, len(rows)))
sizes = []
for row in sample:
p = row.get("mosaic_local_path", "")
if p:
try:
sz = os.path.getsize(p)
if sz > 0:
sizes.append(sz)
except OSError:
pass
if not sizes:
return None
mean = sum(sizes) / len(sizes)
cache["mean_bytes"] = mean
cache["sample_n"] = len(sizes)
cache["size_ts"] = now.isoformat()
return mean
def _expected_remaining(pending_rows: list[dict]) -> float:
count = 0.0
for row in pending_rows:
yr = row.get("scan_time", "")[:4]
if yr < "2019":
count += _R_PRE19
elif yr <= "2022":
count += _R_PURGED
else:
count += _R_RECENT
return count
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--archive", default="archives")
parser.add_argument(
"--by-year", action="store_true",
help="Add a per-machine × per-year done/failed breakdown table",
)
args = parser.parse_args()
archive = Path(args.archive)
scans_csv = archive / "scans.csv"
progress_json = archive / ".progress.json"
rate_cache_path = archive / ".mosaic_rate_cache.json"
if not scans_csv.exists():
sys.exit(f"scans.csv not found: {scans_csv}")
# --- Load & deduplicate (last row per machine+scan_id) ---
latest: dict[tuple[str, str], dict] = {}
with open(scans_csv, newline="", encoding="utf-8") as f:
for row in csv.DictReader(f):
key = (row.get("machine", ""), row.get("scan_id", ""))
latest[key] = row
by_machine: dict[str, Counter] = {}
# machine -> year -> Counter(status -> count)
by_machine_year: dict[str, dict[str, Counter]] = {}
total_counts: Counter = Counter()
downloaded_rows: list[dict] = []
pending_rows: list[dict] = []
for (_m, _sid), row in latest.items():
status = row.get("mosaic_download_status", "")
m = row.get("machine", "")
yr = (row.get("scan_time") or "")[:4] or "????"
by_machine.setdefault(m, Counter())[status] += 1
by_machine_year.setdefault(m, {}).setdefault(yr, Counter())[status] += 1
total_counts[status] += 1
if status == "downloaded":
downloaded_rows.append(row)
elif status == "skipped_metadata_only":
pending_rows.append(row)
total = sum(total_counts.values())
downloaded = total_counts["downloaded"]
failed = total_counts["failed"]
zero_skipped = total_counts["skipped_zero_disk_space"]
pending = total_counts["skipped_metadata_only"]
processed = downloaded + failed + zero_skipped
attempted = downloaded + failed
now = datetime.now(timezone.utc)
# --- Elapsed ---
elapsed_str = ""
if progress_json.exists():
try:
data = json.loads(progress_json.read_text())
started_at = _parse_dt(data.get("started_at", ""))
if started_at:
elapsed_str = _fmt_duration((now - started_at).total_seconds())
except Exception:
pass
# --- Rate cache ---
cache: dict = {}
if rate_cache_path.exists():
try:
cache = json.loads(rate_cache_path.read_text())
except Exception:
pass
# Rolling rate: keep up to 60 snapshots; compute rate from the oldest
# snapshot within the last 30 minutes for a smoothed estimate.
snapshots: list[dict] = cache.get("snapshots", [])
# Prune snapshots older than 30 minutes, but keep at least one
cutoff = now.timestamp() - 1800
recent = [s for s in snapshots if s.get("ts", 0) >= cutoff]
if not recent and snapshots:
recent = [snapshots[-1]] # always keep one for continuity
rate_per_sec: float | None = None
rate_window_str = ""
if recent:
oldest = recent[0]
dt = now.timestamp() - oldest["ts"]
dp = processed - oldest["proc"]
if dt >= 60 and dp > 0:
rate_per_sec = dp / dt
window_min = dt / 60
rate_window_str = f"{window_min:.0f}-min avg"
# --- Disk space ---
mean_bytes: float | None = None
size_note = ""
if downloaded_rows:
mean_bytes = _sample_mean_bytes(downloaded_rows, cache)
if mean_bytes and cache.get("sample_n"):
size_note = f"mean {_fmt_size(mean_bytes)} × {cache['sample_n']} sampled files"
dl_bytes: float | None = None
rem_bytes: float | None = None
if mean_bytes:
dl_bytes = downloaded * mean_bytes
rem_bytes = _expected_remaining(pending_rows) * mean_bytes
# Update cache: append new snapshot, keep last 60
recent.append({"ts": now.timestamp(), "proc": processed})
cache["snapshots"] = recent[-60:]
# Keep legacy keys for backwards compat
cache["timestamp"] = now.isoformat()
cache["processed"] = processed
try:
rate_cache_path.write_text(json.dumps(cache))
except Exception:
pass
rate_str = eta_str = ""
if rate_per_sec and rate_per_sec > 0:
rate_str = f"{rate_per_sec * 3600:,.0f} scans/hr ({rate_window_str})"
eta_str = _fmt_duration(pending / rate_per_sec)
# -----------------------------------------------------------------------
# Output — Markdown
# -----------------------------------------------------------------------
ts = datetime.now().strftime("%Y-%m-%d %H:%M")
print(f"# Mosaic download progress — {ts}\n")
print(f"**Archive:** `{archive.resolve()}` ")
meta_parts = []
if elapsed_str:
meta_parts.append(f"**Elapsed:** {elapsed_str}")
if rate_str:
meta_parts.append(f"**Rate:** {rate_str}")
if eta_str:
meta_parts.append(f"**ETA:** {eta_str}")
if meta_parts:
print(" ".join(meta_parts) + " ")
print()
# Summary table
summary_rows = [
["Downloaded", f"{downloaded:,}", f"{100*downloaded/total:.1f}%"],
["Failed", f"{failed:,}", f"{100*failed/total:.1f}%"],
["Skipped (disk=0)",f"{zero_skipped:,}", f"{100*zero_skipped/total:.1f}%"],
["Pending", f"{pending:,}", f"{100*pending/total:.1f}%"],
["**Total**", f"**{total:,}**", ""],
]
if attempted:
summary_rows.append(["**Success rate**", f"**{100*downloaded/attempted:.1f}%**", "*(of attempted)*"])
print(_md_table(["Metric", "Count", ""], summary_rows, align=["l", "r", "l"]))
print()
# Disk space
if dl_bytes is not None and rem_bytes is not None:
total_bytes = dl_bytes + rem_bytes
print(f"### Disk space\n")
print(f"_{size_note}_\n")
ds_rows = [
["Downloaded so far", _fmt_size(dl_bytes), ""],
["Estimated remaining", _fmt_size(rem_bytes), "*(model-based)*"],
["**Grand total**", f"**{_fmt_size(total_bytes)}**", ""],
]
print(_md_table(["", "Size", ""], ds_rows, align=["l", "r", "l"]))
print()
# Per-machine breakdown
print("### Per-machine breakdown\n")
machines = sorted(by_machine)
mc_rows = []
for m in machines:
mc = by_machine[m]
mt = sum(mc.values())
mc_rows.append([
m,
f"{mc['downloaded']:,}",
f"{mc['failed']:,}",
f"{mc['skipped_zero_disk_space']:,}",
f"{mc['skipped_metadata_only']:,}",
f"{mt:,}",
])
mc_rows.append([
"**TOTAL**",
f"**{downloaded:,}**",
f"**{failed:,}**",
f"**{zero_skipped:,}**",
f"**{pending:,}**",
f"**{total:,}**",
])
print(_md_table(
["Machine", "Done", "Failed", "Skip0", "Pending", "Total"],
mc_rows,
align=["l", "r", "r", "r", "r", "r"],
))
# -----------------------------------------------------------------------
# --by-year table
# -----------------------------------------------------------------------
if args.by_year:
print()
print("### Downloads by machine and year\n")
print("*Format: done / failed*\n")
# Only include years that have at least one downloaded or failed scan
all_years = sorted(
yr for yr in set(
yr for m_data in by_machine_year.values() for yr in m_data
)
if any(
by_machine_year[m].get(yr, Counter()).get("downloaded", 0)
+ by_machine_year[m].get(yr, Counter()).get("failed", 0) > 0
for m in by_machine_year
)
)
# Totals per year
yr_totals: dict[str, Counter] = {}
for yr in all_years:
yr_totals[yr] = Counter()
for m in machines:
yr_totals[yr] += by_machine_year.get(m, {}).get(yr, Counter())
year_rows = []
for m in machines:
row_cells = [m]
for yr in all_years:
c = by_machine_year.get(m, {}).get(yr, Counter())
d = c.get("downloaded", 0)
f = c.get("failed", 0)
row_cells.append(f"{d:,} / {f:,}" if (d or f) else "—")
year_rows.append(row_cells)
# Totals row
total_cells = ["**TOTAL**"]
for yr in all_years:
d = yr_totals[yr].get("downloaded", 0)
f = yr_totals[yr].get("failed", 0)
total_cells.append(f"**{d:,} / {f:,}**" if (d or f) else "—")
year_rows.append(total_cells)
print(_md_table(
["Machine"] + all_years,
year_rows,
align=["l"] + ["r"] * len(all_years),
))
if __name__ == "__main__":
main()