2 Commits

Author SHA1 Message Date
poprhythm 8593808cf3 Add --retry-failed mode and mosaic retry estimates to progress report
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:40:15 -04:00
poprhythm 6390f5d529 Scraping resilience, metadata tooling, and repository hygiene
Consolidates mosaic and session hardening (login retry, skip processed scans, no retry on 404, started_at), progress reporting (Markdown tables, by-year rollup, rolling-window rate/ETA), and metadata workflow scripts (run_metadata_scan.sh, scan_progress_report.py, export_machine_metadata.py). Adds mosaic reconstruction sample JPEGs referenced by the report. Updates .gitignore for backup/ and .claude/; sample_random_scans helper is documented for branch testing/sample-runs only (see README).
2026-05-14 19:52:53 -04:00
25 changed files with 1170 additions and 198 deletions
+3
View File
@@ -7,3 +7,6 @@ __pycache__/
.DS_Store .DS_Store
explore_dumps/ explore_dumps/
.venv/ .venv/
scripts/sync_to_nas.sh
backup/
.claude/
+2 -4
View File
@@ -97,10 +97,8 @@ python scraper.py --machine "BW3-20 [AMR-26]" --mosaic-only
# Download mosaics for all machines # Download mosaics for all machines
python scraper.py --mosaic-only python scraper.py --mosaic-only
# One random completed scan per machine: mosaic + all tiles (from machines.txt; uses --list-scans + --scan-id) # One random completed scan per machine (helper script): check out branch `testing/sample-runs`,
# MOSAIC_ONLY=1 ./scripts/sample_random_scans.sh machines.txt # optional: mosaics only, no tiles # then see `scripts/sample_random_scans.sh` and `docs/sample_random_scans_run_progress.md`.
# cp scripts/machines.example.txt machines.txt # then edit: one label per line
# ./scripts/sample_random_scans.sh machines.txt
# Download all tiles for a specific scan # Download all tiles for a specific scan
python scraper.py --machine "BW3-20 [AMR-26]" --scan-id 158374 --workers 4 python scraper.py --machine "BW3-20 [AMR-26]" --scan-id 158374 --workers 4
Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.6 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 52 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 218 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

+15
View File
@@ -0,0 +1,15 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
VENV="/tmp/spruce_venv"
if [[ ! -x "$VENV/bin/python" ]]; then
echo "Setting up venv at $VENV..."
python3 -m venv "$VENV"
"$VENV/bin/python" -m ensurepip --upgrade
"$VENV/bin/pip" install -q -r "$SCRIPT_DIR/requirements.txt"
fi
echo "Starting metadata-only scan of all machines..."
"$VENV/bin/python" "$SCRIPT_DIR/scraper.py" --metadata-only "$@"
+105
View File
@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Split scans.csv into per-machine metadata CSVs.
Reads the combined scans.csv produced by the scraper and writes one CSV per
machine containing only the website-sourced metadata columns (no mosaic paths,
download status, or error fields).
Usage:
python scripts/export_machine_metadata.py
python scripts/export_machine_metadata.py --input archives/scans.csv --output-dir archives/by_machine
"""
from __future__ import annotations
import argparse
import csv
import sys
from collections import defaultdict
from pathlib import Path
METADATA_COLUMNS = [
"machine",
"machine_id",
"scan_id",
"name",
"scan_time",
"start_x",
"start_y",
"end_x",
"end_y",
"dx",
"dy",
"nx",
"ny",
"total_tiles",
"scan_lines",
"scan_mode",
"start_datetime",
"end_datetime",
"status",
"user",
"disk_space_mb",
]
def sanitize_machine_label(label: str) -> str:
return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Split scans.csv into per-machine metadata CSVs.")
p.add_argument(
"--input",
default="archives/scans.csv",
metavar="FILE",
help="Path to scans.csv (default: archives/scans.csv)",
)
p.add_argument(
"--output-dir",
default="archives/by_machine",
metavar="DIR",
help="Directory for output CSVs (default: archives/by_machine)",
)
return p.parse_args()
def main() -> None:
args = parse_args()
input_path = Path(args.input)
output_dir = Path(args.output_dir)
if not input_path.exists():
sys.exit(f"Input file not found: {input_path}")
with input_path.open(newline="") as fh:
reader = csv.DictReader(fh)
if reader.fieldnames is None:
sys.exit(f"{input_path} appears to be empty.")
missing = [c for c in METADATA_COLUMNS if c not in reader.fieldnames]
if missing:
sys.exit(f"Expected columns not found in {input_path}: {missing}")
rows_by_machine: dict[str, list[dict]] = defaultdict(list)
for row in reader:
rows_by_machine[row["machine"]].append(row)
output_dir.mkdir(parents=True, exist_ok=True)
for machine_label, rows in sorted(rows_by_machine.items()):
safe_name = sanitize_machine_label(machine_label)
out_path = output_dir / f"{safe_name}_scans_metadata.csv"
with out_path.open("w", newline="") as fh:
writer = csv.DictWriter(fh, fieldnames=METADATA_COLUMNS, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
print(f" {out_path} ({len(rows)} rows)")
total = sum(len(r) for r in rows_by_machine.values())
print(f"\n{len(rows_by_machine)} machine(s), {total} total rows → {output_dir}/")
if __name__ == "__main__":
main()
+1 -1
View File
@@ -1,6 +1,6 @@
# All RootView minirhizotron machine labels (same set as `machine_metadata` in config.example.yaml). # All RootView minirhizotron machine labels (same set as `machine_metadata` in config.example.yaml).
# Copy to the repo root as machines.txt, or: cp scripts/machines.example.txt machines.txt # Copy to the repo root as machines.txt, or: cp scripts/machines.example.txt machines.txt
# sample_random_scans.sh: by default one random scan per line = mosaic + tiles; use MOSAIC_ONLY=1 for mosaics only # Random-sample helper `scripts/sample_random_scans.sh` lives on branch `testing/sample-runs` only.
BW1-4 [AMR-15] BW1-4 [AMR-15]
BW1-6 [AMR-19] BW1-6 [AMR-19]
BW1-7 [AMR-18] BW1-7 [AMR-18]
+481
View File
@@ -0,0 +1,481 @@
#!/usr/bin/env python3
"""
Report mosaic download progress from archives/scans.csv.
Output is Markdown. Use ``--by-year`` for a per-machine × per-year
done/failed table. When the first mosaic pass is complete (no pending rows)
but failures remain, a **Mosaic retry estimates** section is printed with
queue counts and duration hints.
Rate/ETA use a 30-minute rolling window when snapshots show progress.
Mean mosaic size is sampled from up to 100 downloads (1-hour cache).
Usage:
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
"""
import argparse
import csv
import json
import os
import random
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
# Year-based viability model derived from BW1-4 observations:
# pre-2019 → kept on server long-term (~100 %)
# 2019-2022 → purged (~ 0 %)
# 2023+ → recent, mostly available (~82 %)
_R_PRE19 = 1.00
_R_PURGED = 0.00
_R_RECENT = 0.82
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
RETRY_REALISTIC_RATE_PER_HR = 1100.0
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_dt(s: str) -> datetime | None:
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def _fmt_duration(seconds: float) -> str:
if seconds < 0:
return "?"
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
if h >= 48:
return f"{h // 24}d {h % 24}h"
if h > 0:
return f"{h}h {m:02d}m"
return f"{m}m {int(seconds % 60):02d}s"
def _fmt_size(b: float) -> str:
if b >= 1e12:
return f"{b/1e12:.2f} TB"
if b >= 1e9:
return f"{b/1e9:.2f} GB"
if b >= 1e6:
return f"{b/1e6:.1f} MB"
return f"{b/1e3:.0f} KB"
def _md_table(headers: list[str], rows: list[list[str]], *, align: list[str] | None = None) -> str:
"""Render a Markdown table. align values: 'l', 'r', 'c' (default 'l')."""
if align is None:
align = ["l"] * len(headers)
sep_map = {"l": ":---", "r": "---:", "c": ":---:"}
def row_str(cells: list[str]) -> str:
return "| " + " | ".join(cells) + " |"
lines = [
row_str(headers),
row_str([sep_map.get(a, ":---") for a in align]),
]
for r in rows:
lines.append(row_str(r))
return "\n".join(lines)
def _sample_mean_bytes(rows: list[dict], cache: dict, max_sample: int = 100) -> float | None:
cached_mean = cache.get("mean_bytes")
cached_n = cache.get("sample_n", 0)
cached_ts = _parse_dt(cache.get("size_ts", ""))
now = datetime.now(timezone.utc)
if (
cached_mean and cached_ts
and (now - cached_ts).total_seconds() < 3600
and cached_n >= min(max_sample, len(rows))
):
return float(cached_mean)
sample = random.sample(rows, min(max_sample, len(rows)))
sizes = []
for row in sample:
p = row.get("mosaic_local_path", "")
if p:
try:
sz = os.path.getsize(p)
if sz > 0:
sizes.append(sz)
except OSError:
pass
if not sizes:
return None
mean = sum(sizes) / len(sizes)
cache["mean_bytes"] = mean
cache["sample_n"] = len(sizes)
cache["size_ts"] = now.isoformat()
return mean
def _expected_remaining(pending_rows: list[dict]) -> float:
count = 0.0
for row in pending_rows:
yr = row.get("scan_time", "")[:4]
if yr < "2019":
count += _R_PRE19
elif yr <= "2022":
count += _R_PURGED
else:
count += _R_RECENT
return count
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
if n_scans <= 0 or rate_per_hr <= 0:
return ""
return _fmt_duration(n_scans / rate_per_hr * 3600.0)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--archive", default="archives")
parser.add_argument(
"--by-year", action="store_true",
help="Add a per-machine × per-year done/failed breakdown table",
)
args = parser.parse_args()
archive = Path(args.archive)
scans_csv = archive / "scans.csv"
progress_json = archive / ".progress.json"
rate_cache_path = archive / ".mosaic_rate_cache.json"
if not scans_csv.exists():
sys.exit(f"scans.csv not found: {scans_csv}")
# --- Load & deduplicate (last row per machine+scan_id) ---
latest: dict[tuple[str, str], dict] = {}
with open(scans_csv, newline="", encoding="utf-8") as f:
for row in csv.DictReader(f):
key = (row.get("machine", ""), row.get("scan_id", ""))
latest[key] = row
by_machine: dict[str, Counter] = {}
# machine -> year -> Counter(status -> count)
by_machine_year: dict[str, dict[str, Counter]] = {}
total_counts: Counter = Counter()
downloaded_rows: list[dict] = []
pending_rows: list[dict] = []
for (_m, _sid), row in latest.items():
status = row.get("mosaic_download_status", "")
m = row.get("machine", "")
yr = (row.get("scan_time") or "")[:4] or "????"
by_machine.setdefault(m, Counter())[status] += 1
by_machine_year.setdefault(m, {}).setdefault(yr, Counter())[status] += 1
total_counts[status] += 1
if status == "downloaded":
downloaded_rows.append(row)
elif status == "skipped_metadata_only":
pending_rows.append(row)
total = sum(total_counts.values())
downloaded = total_counts["downloaded"]
failed = total_counts["failed"]
zero_skipped = total_counts["skipped_zero_disk_space"]
pending = total_counts["skipped_metadata_only"]
processed = downloaded + failed + zero_skipped
attempted = downloaded + failed
now = datetime.now(timezone.utc)
# --- Elapsed ---
elapsed_str = ""
if progress_json.exists():
try:
data = json.loads(progress_json.read_text())
started_at = _parse_dt(data.get("started_at", ""))
if started_at:
elapsed_str = _fmt_duration((now - started_at).total_seconds())
except Exception:
pass
# --- Rate cache ---
cache: dict = {}
if rate_cache_path.exists():
try:
cache = json.loads(rate_cache_path.read_text())
except Exception:
pass
# Rolling rate: keep up to 60 snapshots; compute rate from the oldest
# snapshot within the last 30 minutes for a smoothed estimate.
snapshots: list[dict] = cache.get("snapshots", [])
# Prune snapshots older than 30 minutes, but keep at least one
cutoff = now.timestamp() - 1800
recent = [s for s in snapshots if s.get("ts", 0) >= cutoff]
if not recent and snapshots:
recent = [snapshots[-1]] # always keep one for continuity
rate_per_sec: float | None = None
rate_window_str = ""
snap_delta_proc = 0
if recent:
oldest = recent[0]
dt = now.timestamp() - oldest["ts"]
dp = processed - oldest["proc"]
snap_delta_proc = dp
if dt >= 60 and dp > 0:
rate_per_sec = dp / dt
window_min = dt / 60
rate_window_str = f"{window_min:.0f}-min avg"
# One-time baseline after initial mosaic crawl finished (no pending rows).
if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
cache["first_pass_completed_at"] = now.isoformat()
cache["first_pass_processed"] = total
cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR
first_pass_rate_hr = float(
cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
)
live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
# Active scrape shows progress in snapshots; idle archive shows dp == 0.
retry_estimate_rate_hr = (
live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
)
# --- Disk space ---
mean_bytes: float | None = None
size_note = ""
if downloaded_rows:
mean_bytes = _sample_mean_bytes(downloaded_rows, cache)
if mean_bytes and cache.get("sample_n"):
size_note = f"mean {_fmt_size(mean_bytes)} × {cache['sample_n']} sampled files"
dl_bytes: float | None = None
rem_bytes: float | None = None
if mean_bytes:
dl_bytes = downloaded * mean_bytes
rem_bytes = _expected_remaining(pending_rows) * mean_bytes
# Update cache: append new snapshot, keep last 60
recent.append({"ts": now.timestamp(), "proc": processed})
cache["snapshots"] = recent[-60:]
# Keep legacy keys for backwards compat
cache["timestamp"] = now.isoformat()
cache["processed"] = processed
try:
rate_cache_path.write_text(json.dumps(cache))
except Exception:
pass
rate_str = eta_str = ""
if rate_per_sec and rate_per_sec > 0:
rate_str = f"{rate_per_sec * 3600:,.0f} scans/hr ({rate_window_str})"
eta_str = _fmt_duration(pending / rate_per_sec)
# -----------------------------------------------------------------------
# Output — Markdown
# -----------------------------------------------------------------------
ts = datetime.now().strftime("%Y-%m-%d %H:%M")
print(f"# Mosaic download progress — {ts}\n")
print(f"**Archive:** `{archive.resolve()}` ")
meta_parts = []
if elapsed_str:
meta_parts.append(f"**Elapsed:** {elapsed_str}")
if rate_str:
meta_parts.append(f"**Rate:** {rate_str}")
if eta_str:
meta_parts.append(f"**ETA:** {eta_str}")
if meta_parts:
print(" ".join(meta_parts) + " ")
print()
# Summary table
summary_rows = [
["Downloaded", f"{downloaded:,}", f"{100*downloaded/total:.1f}%"],
["Failed", f"{failed:,}", f"{100*failed/total:.1f}%"],
["Skipped (disk=0)",f"{zero_skipped:,}", f"{100*zero_skipped/total:.1f}%"],
["Pending", f"{pending:,}", f"{100*pending/total:.1f}%"],
["**Total**", f"**{total:,}**", ""],
]
if attempted:
summary_rows.append(["**Success rate**", f"**{100*downloaded/attempted:.1f}%**", "*(of attempted)*"])
print(_md_table(["Metric", "Count", ""], summary_rows, align=["l", "r", "l"]))
print()
# Disk space
if dl_bytes is not None and rem_bytes is not None:
total_bytes = dl_bytes + rem_bytes
print(f"### Disk space\n")
print(f"_{size_note}_\n")
ds_rows = [
["Downloaded so far", _fmt_size(dl_bytes), ""],
["Estimated remaining", _fmt_size(rem_bytes), "*(model-based)*"],
["**Grand total**", f"**{_fmt_size(total_bytes)}**", ""],
]
print(_md_table(["", "Size", ""], ds_rows, align=["l", "r", "l"]))
print()
# Per-machine breakdown
print("### Per-machine breakdown\n")
machines = sorted(by_machine)
mc_rows = []
for m in machines:
mc = by_machine[m]
mt = sum(mc.values())
mc_rows.append([
m,
f"{mc['downloaded']:,}",
f"{mc['failed']:,}",
f"{mc['skipped_zero_disk_space']:,}",
f"{mc['skipped_metadata_only']:,}",
f"{mt:,}",
])
mc_rows.append([
"**TOTAL**",
f"**{downloaded:,}**",
f"**{failed:,}**",
f"**{zero_skipped:,}**",
f"**{pending:,}**",
f"**{total:,}**",
])
print(_md_table(
["Machine", "Done", "Failed", "Skip0", "Pending", "Total"],
mc_rows,
align=["l", "r", "r", "r", "r", "r"],
))
# -----------------------------------------------------------------------
# Retry estimates (first pass complete: pending == 0, failures remain)
# -----------------------------------------------------------------------
if failed > 0 and pending == 0:
failed_rows_list = [
r for r in latest.values()
if r.get("mosaic_download_status") == "failed"
]
n_all = len(failed_rows_list)
n_2023 = sum(
1 for r in failed_rows_list
if (r.get("scan_time") or "")[:4] >= "2023"
and len((r.get("scan_time") or "")[:4]) == 4
)
n_200 = sum(
1 for r in failed_rows_list
if r.get("mosaic_error_code") == "200"
)
rate_note = (
"rolling 30-min window"
if snap_delta_proc > 0
else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
)
print()
print("### Mosaic retry estimates\n")
print(
f"*Suggested command after server fix:* "
f"`python scraper.py --retry-failed --workers 2` "
f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n"
)
print(
f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
f"({rate_note}). Fixed columns use scenario rates.*\n"
)
est_hdr = (
"Retry scope",
"Count",
f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr",
f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr",
f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr",
f"@{retry_estimate_rate_hr:.0f}/hr",
)
retry_tbl_rows = [
[
"HTTP 200 (empty body)",
f"{n_200:,}",
_retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, retry_estimate_rate_hr),
],
[
"Failed, scan_time ≥ 2023",
f"{n_2023:,}",
_retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, retry_estimate_rate_hr),
],
[
"**All failed**",
f"**{n_all:,}**",
_retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, retry_estimate_rate_hr),
],
]
print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))
# -----------------------------------------------------------------------
# --by-year table
# -----------------------------------------------------------------------
if args.by_year:
print()
print("### Downloads by machine and year\n")
print("*Format: done / failed*\n")
# Only include years that have at least one downloaded or failed scan
all_years = sorted(
yr for yr in set(
yr for m_data in by_machine_year.values() for yr in m_data
)
if any(
by_machine_year[m].get(yr, Counter()).get("downloaded", 0)
+ by_machine_year[m].get(yr, Counter()).get("failed", 0) > 0
for m in by_machine_year
)
)
# Totals per year
yr_totals: dict[str, Counter] = {}
for yr in all_years:
yr_totals[yr] = Counter()
for m in machines:
yr_totals[yr] += by_machine_year.get(m, {}).get(yr, Counter())
year_rows = []
for m in machines:
row_cells = [m]
for yr in all_years:
c = by_machine_year.get(m, {}).get(yr, Counter())
d = c.get("downloaded", 0)
f = c.get("failed", 0)
row_cells.append(f"{d:,} / {f:,}" if (d or f) else "")
year_rows.append(row_cells)
# Totals row
total_cells = ["**TOTAL**"]
for yr in all_years:
d = yr_totals[yr].get("downloaded", 0)
f = yr_totals[yr].get("failed", 0)
total_cells.append(f"**{d:,} / {f:,}**" if (d or f) else "")
year_rows.append(total_cells)
print(_md_table(
["Machine"] + all_years,
year_rows,
align=["l"] + ["r"] * len(all_years),
))
if __name__ == "__main__":
main()
-178
View File
@@ -1,178 +0,0 @@
#!/usr/bin/env bash
# For each machine label in a text file, pick one random completed scan and download
# it: by default the mosaic and all tiles (same as: --machine "…" --scan-id N).
# For mosaic only (faster, no tile downloads), set: MOSAIC_ONLY=1
#
# Usage:
# ./scripts/sample_random_scans.sh [PATH_TO_machines.txt]
# Config path defaults to config.yaml in the repo root. Override with:
# CONFIG=/path/to/config.yaml ./scripts/sample_random_scans.sh machines.txt
# Dry-run the download step (listing still does real HTTP to fetch scan list):
# DRY_RUN=1 ./scripts/sample_random_scans.sh machines.txt
# Verbose / debug (extra per-step lines, scan counts from the list step):
# DEBUG=1 ./scripts/sample_random_scans.sh machines.txt
# By default, --list-scans fetches only the first page (one HTTP request, up to
# 320 scans). To paginate the full archive for the random pick (slower when many
# LIST_SCANS_ALL_PAGES=1 ./scripts/sample_random_scans.sh machines.txt
#
# machines.txt: one machine label per line (same as --machine and config machine names).
# See scripts/machines.example.txt
set -euo pipefail
REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
CONFIG="${CONFIG:-$REPO_ROOT/config.yaml}"
MACHINES_FILE="${1:-$REPO_ROOT/machines.txt}"
SCRAPER=(python3 "$REPO_ROOT/scraper.py" --config "$CONFIG")
log() { echo "[sample_random_scans] $*" >&2; }
log_debug() {
if [[ -n "${DEBUG:-}" ]]; then
echo "[sample_random_scans] debug: $*" >&2
fi
}
if [[ ! -f "$MACHINES_FILE" ]]; then
log "error: file not found: $MACHINES_FILE"
log "Create it with one machine label per line, or: cp scripts/machines.example.txt machines.txt"
exit 1
fi
if [[ ! -f "$CONFIG" ]]; then
log "error: config not found: $CONFIG"
exit 1
fi
# Non-empty, non-comment lines (same rules as the main loop)
TOTAL_MACHINES="$(
grep -v '^[[:space:]]*#' "$MACHINES_FILE" | grep -c -v '^[[:space:]]*$' || true
)"
if [[ -z "$TOTAL_MACHINES" || "$TOTAL_MACHINES" -eq 0 ]]; then
log "error: no machine lines in: $MACHINES_FILE"
exit 1
fi
log "starting repo=$REPO_ROOT"
log " config=$CONFIG"
log " machines_file=$MACHINES_FILE (${TOTAL_MACHINES} machine(s) in file)"
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
if [[ -n "${DRY_RUN:-}" ]]; then
log " mode: MOSAIC_ONLY + DRY_RUN (mosaic only, --dry-run on download step)"
else
log " mode: MOSAIC_ONLY=1 (mosaics only, no tiles; use for a lighter sample)"
fi
else
if [[ -n "${DRY_RUN:-}" ]]; then
log " mode: DRY_RUN (list + full scan download use --dry-run; no files written)"
else
log " mode: full scan — mosaic + all tiles (workers from config)"
fi
fi
if [[ -n "${DEBUG:-}" ]]; then
log " DEBUG=1 (extra diagnostics enabled)"
fi
if [[ -n "${LIST_SCANS_ALL_PAGES:-}" ]]; then
log " list step: list-scans = full archive (all pages, slower)"
else
log " list step: list-scans --list-scans-first-page-only (one page, up to 320 IDs)"
fi
log "────────────────────────────────────────"
export REPO_ROOT CONFIG
[[ -n "${DEBUG:-}" ]] && export DEBUG
[[ -n "${LIST_SCANS_ALL_PAGES:-}" ]] && export LIST_SCANS_ALL_PAGES
PROCESSED=0
SKIPPED=0
IDX=0
while IFS= read -r line || [[ -n "${line-}" ]]; do
# trim, strip CR, skip blanks / comments
line="${line//$'\r'/}"
label="${line#"${line%%[![:space:]]*}"}"
label="${label%"${label##*[![:space:]]}"}"
[[ -z "$label" || "$label" == \#* ]] && continue
IDX=$((IDX + 1))
log "[$IDX/$TOTAL_MACHINES] machine: $label"
log " status: listing scans (--list-scans) …"
random_id="$(
REPO_ROOT="$REPO_ROOT" CONFIG="$CONFIG" LABEL="$label" python3 - <<'PY'
import os, random, subprocess, sys
label = os.environ["LABEL"]
repo = os.environ["REPO_ROOT"]
cfg = os.environ["CONFIG"]
debug = bool(os.environ.get("DEBUG"))
full = bool(os.environ.get("LIST_SCANS_ALL_PAGES"))
scraper = os.path.join(repo, "scraper.py")
if debug:
print(
f"[sample_random_scans] debug: running list-scans for {label!r} "
f"({'all pages' if full else 'first page only'})",
file=sys.stderr,
)
cmd = [sys.executable, scraper, "--list-scans", "--machine", label, "--config", cfg]
if not full:
cmd.insert(3, "--list-scans-first-page-only")
out = subprocess.check_output(
cmd,
text=True,
stderr=subprocess.STDOUT,
)
ids = []
for line in out.splitlines():
line = line.rstrip()
if not line or line.startswith("---") or "Total" in line:
continue
parts = line.split()
if parts and parts[0].isdigit():
ids.append(parts[0])
if not ids:
print(f"no scans parsed for {label!r} — check login and output", file=sys.stderr)
sys.exit(1)
if debug:
print(
f"[sample_random_scans] debug: parsed {len(ids)} scan id(s) for {label!r}",
file=sys.stderr,
)
print(random.choice(ids), end="")
PY
)" || {
log " status: SKIPPED (could not get scan list or pick id)"
SKIPPED=$((SKIPPED + 1))
continue
}
log " status: picked random scan_id=$random_id (uniform among IDs from this list step — first page by default, see start banner)"
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
log " status: running scraper: --mosaic-only --scan-id (mosaic only) …"
else
log " status: running scraper: --scan-id (mosaic + tiles) …"
fi
if [[ -n "${DRY_RUN:-}" ]]; then
log " status: (dry-run — no files written for this scan)"
fi
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
run_cmd=("${SCRAPER[@]}" --mosaic-only --machine "$label" --scan-id "$random_id")
else
run_cmd=("${SCRAPER[@]}" --machine "$label" --scan-id "$random_id")
fi
if [[ -n "${DRY_RUN:-}" ]]; then
run_cmd+=(--dry-run)
fi
if "${run_cmd[@]}"; then
log " status: OK — finished this machine (exit 0)"
PROCESSED=$((PROCESSED + 1))
else
rc=$?
log " status: FAILED — scraper exit code $rc (stopping; fix or remove this machine and re-run)"
exit "$rc"
fi
log "────────────────────────────────────────"
done < "$MACHINES_FILE"
log "done. summary: $PROCESSED machine(s) with sampled scan download completed, $SKIPPED skipped, $IDX line(s) processed out of $TOTAL_MACHINES in file."
exit 0
+218
View File
@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
Report metadata-scan progress and projected completion times for all machines.
Usage:
python scripts/scan_progress_report.py [--archive ARCHIVE_DIR] [--recent N] [--mermaid] [--rate-chart]
Options:
--archive DIR Path to archives directory (default: archives)
--recent N Number of recent files used to compute current rate (default: 500)
--mermaid Also print a Mermaid Gantt chart
--rate-chart Also print a Mermaid XY chart of s/scan rate by hour
"""
import argparse
import glob
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
# Canonical machine order and total scan counts (from README inventory, April 2026)
MACHINES = [
("BW1-4 [AMR-15]", 6121),
("BW1-6 [AMR-19]", 18198),
("BW1-7 [AMR-18]", 430),
("BW2-8 [AMR-25]", 8191),
("BW2-10 [AMR-22]", 16537),
("BW2-11 [AMR-23]", 26763),
("BW2-13 [AMR-24]", 13537),
("BW3-16 [AMR-16]", 7325),
("BW3-17 [AMR-20]", 471),
("BW3-19 [AMR-21]", 15186),
("BW3-20 [AMR-26]", 23052),
("BW3-21 [AMR-17]", 10115),
]
TOTAL_SCANS = sum(t for _, t in MACHINES)
def dir_name(label: str) -> str:
return re.sub(r"[^\w\-.]", "_", label).strip("_")
def get_timestamps(machine_dir: Path) -> list[float]:
files = glob.glob(str(machine_dir / "**" / "metadata.json"), recursive=True)
return sorted(os.path.getmtime(f) for f in files)
def print_rate_chart(all_timestamps: list[float]) -> None:
"""Print a Mermaid xychart-beta of average s/scan per hour."""
# One avg rate per hour
bins: dict[str, list[float]] = defaultdict(list)
start_hour: datetime | None = None
for i in range(1, len(all_timestamps)):
gap = all_timestamps[i] - all_timestamps[i - 1]
if gap < 300: # ignore inter-machine gaps
dt = datetime.fromtimestamp(all_timestamps[i])
hour_key = dt.strftime("%m-%d %Hh")
bins[hour_key].append(gap)
if start_hour is None:
start_hour = dt.replace(minute=0, second=0, microsecond=0)
# Drop the last (partial) hour
hours = sorted(bins.keys())
if hours:
hours = hours[:-1]
if not hours or start_hour is None:
print("(not enough data for rate chart)")
return
values = [f"{sum(bins[h])/len(bins[h]):.2f}" for h in hours]
y_max = max(float(v) for v in values)
y_ceil = int(y_max) + 3
n = len(hours)
# Numeric x-axis: Mermaid auto-picks readable tick positions
start_label = start_hour.strftime("%b %d %H:%M")
print("```mermaid")
print("xychart-beta")
print(f' title "Metadata scan rate (s/scan) — hourly, starting {start_label}"')
print(f' x-axis "Hours elapsed" 0 --> {n}')
print(f' y-axis "s / scan" 0 --> {y_ceil}')
print(f" line [{', '.join(values)}]")
print("```")
def fmt_dt(dt: datetime) -> str:
return dt.strftime("%a %b %d %H:%M")
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--archive", default="archives", help="Archives directory")
parser.add_argument("--recent", type=int, default=500,
help="Files used to compute recent rate (default: 500)")
parser.add_argument("--mermaid", action="store_true", help="Print Mermaid Gantt chart")
parser.add_argument("--rate-chart", action="store_true", help="Print Mermaid XY rate-over-time chart")
args = parser.parse_args()
archive = Path(args.archive)
if not archive.is_dir():
sys.exit(f"Archive directory not found: {archive}")
now = datetime.now()
# --- Gather per-machine data ---
machine_data = [] # (label, total, done, first_ts, last_ts)
all_timestamps: list[float] = []
for label, total in MACHINES:
mdir = archive / dir_name(label)
if mdir.is_dir():
times = get_timestamps(mdir)
else:
times = []
done = len(times)
first_ts = datetime.fromtimestamp(times[0]) if times else None
last_ts = datetime.fromtimestamp(times[-1]) if times else None
machine_data.append((label, total, done, first_ts, last_ts, times))
all_timestamps.extend(times)
all_timestamps.sort()
total_done = sum(d for _, _, d, *_ in machine_data)
# --- Rate calculation ---
recent_times = all_timestamps[-args.recent:] if len(all_timestamps) >= 2 else all_timestamps
if len(recent_times) >= 2:
recent_rate = (recent_times[-1] - recent_times[0]) / len(recent_times)
else:
recent_rate = None
if len(all_timestamps) >= 2:
overall_rate = (all_timestamps[-1] - all_timestamps[0]) / len(all_timestamps)
else:
overall_rate = None
rate = recent_rate or overall_rate or 5.0 # fallback
# --- Print timetable ---
print(f"Metadata scan progress — {now.strftime('%Y-%m-%d %H:%M')}")
print(f"Overall rate : {overall_rate:.2f} s/scan" if overall_rate else "Overall rate : n/a")
print(f"Recent rate : {recent_rate:.2f} s/scan (last {args.recent} files)" if recent_rate else "Recent rate : n/a")
print(f"Rate used : {rate:.2f} s/scan")
print(f"Done : {total_done:,} / {TOTAL_SCANS:,} ({100*total_done/TOTAL_SCANS:.1f}%)")
print()
print(f"{'Machine':<20} {'Total':>7} {'Done':>7} {'Pct':>6} {'Completion'}")
print("-" * 68)
cursor = now
gantt_rows: list[tuple[str, datetime, datetime, str]] = [] # label, start, end, status
for label, total, done, first_ts, last_ts, times in machine_data:
pct = 100 * done / total if total else 0
complete = done >= total or (done > 0 and done / total >= 0.999)
if done == 0:
# Not started yet
start = cursor
finish = cursor + timedelta(seconds=total * rate)
status = "pending"
print(f"{label:<20} {total:>7,} {'':>7} {'':>5} {fmt_dt(finish)}")
elif complete:
# Complete — use actual timestamps
start = first_ts
finish = last_ts
status = "done"
print(f"{label:<20} {total:>7,} {done:>7,} {pct:>5.1f}% complete")
else:
# In progress
remaining = total - done
start = first_ts
finish = cursor + timedelta(seconds=remaining * rate)
status = "active"
print(f"{label:<20} {total:>7,} {done:>7,} {pct:>5.1f}% {fmt_dt(finish)} ← in progress")
gantt_rows.append((label, start or cursor, finish, status))
if status != "done":
cursor = finish
print("-" * 68)
print(f"{'All done':<20} {TOTAL_SCANS:>7,} {total_done:>7,} {100*total_done/TOTAL_SCANS:>5.1f}% {fmt_dt(cursor)}")
# --- Mermaid rate chart ---
if args.rate_chart:
print()
print_rate_chart(all_timestamps)
# --- Mermaid Gantt ---
if args.mermaid:
print()
print("```mermaid")
print("gantt")
print(" title Metadata scan progress — all 12 machines")
print(" dateFormat YYYY-MM-DD HH:mm")
print(" axisFormat %b %d")
print()
section = None
for label, start, finish, status in gantt_rows:
prefix = label.split()[0][:3] # BW1, BW2, BW3
if prefix != section:
section = prefix
print(f" section {section}")
safe = label.replace("[", "").replace("]", "").replace(" ", "-")
tag = f"done, " if status == "done" else ("active, " if status == "active" else "")
s = start.strftime("%Y-%m-%d %H:%M")
e = finish.strftime("%Y-%m-%d %H:%M")
print(f" {label} :{tag}{safe}, {s}, {e}")
print("```")
if __name__ == "__main__":
main()
+46
View File
@@ -84,6 +84,33 @@ def parse_args() -> argparse.Namespace:
"inventorying all scans across all machines." "inventorying all scans across all machines."
), ),
) )
p.add_argument(
"--retry-failed",
action="store_true",
help=(
"Mosaic-only: re-attempt scans whose latest scans.csv row has "
"mosaic_download_status=failed (queue from CSV, not the server list). "
"Implies --mosaic-only."
),
)
p.add_argument(
"--retry-since",
metavar="YEAR",
default=None,
help=(
"With --retry-failed: only scans with scan_time year >= YEAR "
"(e.g. 2023)."
),
)
p.add_argument(
"--retry-error-code",
metavar="CODE",
default=None,
help=(
"With --retry-failed: filter by mosaic_error_code "
"(e.g. 200 for empty-body failures)."
),
)
p.add_argument( p.add_argument(
"--dry-run", "--dry-run",
action="store_true", action="store_true",
@@ -159,6 +186,16 @@ def main() -> None:
if args.scan_id is not None and args.scan_id <= 0: if args.scan_id is not None and args.scan_id <= 0:
sys.exit("--scan-id must be a positive integer") sys.exit("--scan-id must be a positive integer")
if args.retry_since and not args.retry_failed:
sys.exit("--retry-since requires --retry-failed.")
if args.retry_error_code and not args.retry_failed:
sys.exit("--retry-error-code requires --retry-failed.")
if args.retry_failed:
if args.metadata_only:
sys.exit("--retry-failed cannot be used with --metadata-only.")
args.mosaic_only = True # implied
# --list-machines doesn't need credentials # --list-machines doesn't need credentials
if args.list_machines: if args.list_machines:
base_url = "http://205.149.147.131:8010/" base_url = "http://205.149.147.131:8010/"
@@ -261,6 +298,12 @@ def main() -> None:
if args.metadata_only: if args.metadata_only:
log.info("Mode: metadata only (mosaics and tiles skipped)") log.info("Mode: metadata only (mosaics and tiles skipped)")
elif args.mosaic_only: elif args.mosaic_only:
if args.retry_failed:
log.info(
"Mode: mosaic retry (failed scans from %s)",
SCANS_CSV_FILENAME,
)
else:
log.info("Mode: mosaics only (individual tiles skipped)") log.info("Mode: mosaics only (individual tiles skipped)")
if args.dry_run: if args.dry_run:
log.info("Mode: dry-run (no files will be written)") log.info("Mode: dry-run (no files will be written)")
@@ -285,6 +328,9 @@ def main() -> None:
metadata_only=args.metadata_only, metadata_only=args.metadata_only,
scan_id_filter=args.scan_id, scan_id_filter=args.scan_id,
max_tiles=args.max_tiles, max_tiles=args.max_tiles,
retry_failed=args.retry_failed,
retry_since_year=args.retry_since,
retry_error_code=args.retry_error_code,
) )
totals.merge(stats) totals.merge(stats)
finally: finally:
+150 -11
View File
@@ -2,14 +2,22 @@
High-level scrape orchestration: drives the per-machine and per-scan loops. High-level scrape orchestration: drives the per-machine and per-scan loops.
""" """
import csv
import json import json
import logging import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from tqdm import tqdm
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well # RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
# below smallest observed real tile (~7 KiB in production samples). # below smallest observed real tile (~7 KiB in production samples).
@@ -49,16 +57,64 @@ class RunStats:
self.scans_probe_skipped += other.scans_probe_skipped self.scans_probe_skipped += other.scans_probe_skipped
self.scans_disk_space_skipped += other.scans_disk_space_skipped self.scans_disk_space_skipped += other.scans_disk_space_skipped
from tqdm import tqdm
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
"""Last row wins per (machine, scan_id)."""
latest: dict[tuple[str, str], dict[str, str]] = {}
if not scans_csv_path.exists():
return latest
with open(scans_csv_path, newline="", encoding="utf-8") as fh:
for row in csv.DictReader(fh):
key = (row.get("machine", ""), row.get("scan_id", ""))
latest[key] = row
return latest
def load_failed_scans_from_csv(
scans_csv_path: Path,
machine_label: str,
*,
since_year: str | None = None,
error_code: str | None = None,
) -> list[dict[str, Any]]:
"""
Dedupe scans.csv by (machine, scan_id); return failed mosaic rows for one machine.
Each dict is suitable as the ``scan`` argument to ``process_scan`` (scan_id,
scan_time, name, status).
"""
latest = _read_scans_csv_latest(scans_csv_path)
out: list[dict[str, Any]] = []
for (_m, _sid), row in latest.items():
if row.get("machine") != machine_label:
continue
if row.get("mosaic_download_status") != "failed":
continue
if error_code is not None and row.get("mosaic_error_code", "") != error_code:
continue
st = row.get("scan_time", "") or ""
if since_year is not None:
yr = st[:4]
if len(yr) < 4 or yr < since_year:
continue
sid = int(row["scan_id"])
out.append(
{
"scan_id": sid,
"scan_time": st,
"name": row.get("name", ""),
"status": row.get("status", "") or "Completed",
"user": row.get("user", ""),
"scan_lines": row.get("scan_lines", ""),
"scan_mode": row.get("scan_mode", ""),
}
)
out.sort(key=lambda s: s["scan_id"])
return out
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Per-scan helpers # Per-scan helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -241,6 +297,7 @@ def process_scan(
mosaic_only: bool, mosaic_only: bool,
metadata_only: bool = False, metadata_only: bool = False,
max_tiles: int | None = None, max_tiles: int | None = None,
scans_csv_existing_ids: set[int] | None = None,
) -> RunStats: ) -> RunStats:
""" """
Process one scan: fetch metadata, download mosaic and (optionally) tiles. Process one scan: fetch metadata, download mosaic and (optionally) tiles.
@@ -379,9 +436,16 @@ def process_scan(
mds, mer, mco, mcl = "", "", "", "" mds, mer, mco, mcl = "", "", "", ""
# Write scan-level CSV row only if this scan hasn't been recorded before. # Write scan-level CSV row only if this scan hasn't been recorded before.
if mosaic_already_done and not metadata_only: # Skip if: (1) mosaic URL already in .progress.json, or (2) scan already
# has a non-pending row in scans.csv from a prior run.
already_recorded = (mosaic_already_done and not metadata_only) or (
not metadata_only
and scans_csv_existing_ids is not None
and scan_id in scans_csv_existing_ids
)
if already_recorded:
log.debug( log.debug(
"[%s] Scan %d: already in scans.csv (mosaic was previously downloaded), skipping CSV row.", "[%s] Scan %d: already in scans.csv, skipping CSV row.",
machine["label"], machine["label"],
scan_id, scan_id,
) )
@@ -491,14 +555,59 @@ def scrape_machine(
metadata_only: bool = False, metadata_only: bool = False,
scan_id_filter: int | None = None, scan_id_filter: int | None = None,
max_tiles: int | None = None, max_tiles: int | None = None,
retry_failed: bool = False,
retry_since_year: str | None = None,
retry_error_code: str | None = None,
) -> RunStats: ) -> RunStats:
"""Login, fetch scans, and download all content for one machine.""" """Login, fetch scans, and download all content for one machine."""
sess = MachineSession(machine, config) sess = MachineSession(machine, config)
if not sess.login(): login_ok = False
for attempt in range(1, 4):
if sess.login():
login_ok = True
break
if attempt < 3:
log.warning(
"[%s] Login failed (attempt %d/3) — retrying in 10s.",
machine["label"],
attempt,
)
time.sleep(10)
if not login_ok:
log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
return RunStats() return RunStats()
if retry_failed:
scans = load_failed_scans_from_csv(
scans_csv.path,
machine["label"],
since_year=retry_since_year,
error_code=retry_error_code,
)
if scan_id_filter is not None: if scan_id_filter is not None:
scans: list[dict[str, Any]] = [ scans = [s for s in scans if s["scan_id"] == scan_id_filter]
if not scans:
log.warning(
"[%s] Retry: scan_id %d not among failed rows for this machine.",
machine["label"],
scan_id_filter,
)
return RunStats()
log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter)
elif not scans:
log.warning(
"[%s] No failed mosaic rows in scans.csv match retry filters.",
machine["label"],
)
return RunStats()
else:
log.info(
"[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
machine["label"],
len(scans),
)
elif scan_id_filter is not None:
scans = [
{"scan_id": scan_id_filter, "status": "Completed"} {"scan_id": scan_id_filter, "status": "Completed"}
] ]
log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter) log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
@@ -508,8 +617,37 @@ def scrape_machine(
log.warning("[%s] No scans found.", machine["label"]) log.warning("[%s] No scans found.", machine["label"])
return RunStats() return RunStats()
# Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP).
# In normal mode: skip anything with a definitive non-pending status.
# In retry mode: only skip scans that are already downloaded or skipped for
# disk-space reasons — failed scans must be re-attempted.
PENDING_STATUSES = {"skipped_metadata_only", ""}
BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"}
existing_ids: set[int] = set()
if not metadata_only:
latest_rows = _read_scans_csv_latest(scans_csv.path)
for (_mlabel, _sid), _row in latest_rows.items():
if _mlabel != machine["label"]:
continue
st = _row.get("mosaic_download_status", "")
if retry_failed:
if st in BLOCK_AFTER_RETRY_STATUSES:
existing_ids.add(int(_row["scan_id"]))
else:
if st not in PENDING_STATUSES:
existing_ids.add(int(_row["scan_id"]))
stats = RunStats() stats = RunStats()
for scan in scans: for scan in scans:
# Skip scans already fully processed in a prior run — avoids redundant
# metadata fetches and mosaic requests for known-failed / known-done scans.
if not metadata_only and scan["scan_id"] in existing_ids:
log.debug(
"[%s] Scan %d: already processed, skipping.",
machine["label"],
scan["scan_id"],
)
continue
stats.merge(process_scan( stats.merge(process_scan(
sess=sess, sess=sess,
scan=scan, scan=scan,
@@ -523,5 +661,6 @@ def scrape_machine(
mosaic_only=mosaic_only, mosaic_only=mosaic_only,
metadata_only=metadata_only, metadata_only=metadata_only,
max_tiles=max_tiles, max_tiles=max_tiles,
scans_csv_existing_ids=existing_ids,
)) ))
return stats return stats
+9 -1
View File
@@ -5,6 +5,7 @@ Progress tracking (JSON) and CSV writing.
import csv import csv
import json import json
import logging import logging
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Iterator from typing import Iterator
@@ -27,6 +28,7 @@ class ProgressTracker:
def __init__(self, path: Path) -> None: def __init__(self, path: Path) -> None:
self.path = path self.path = path
self._done: set[str] = set() self._done: set[str] = set()
self.started_at: str = datetime.now(timezone.utc).isoformat()
self._load() self._load()
def _load(self) -> None: def _load(self) -> None:
@@ -34,6 +36,8 @@ class ProgressTracker:
try: try:
data = json.loads(self.path.read_text()) data = json.loads(self.path.read_text())
self._done = set(data.get("completed_urls", [])) self._done = set(data.get("completed_urls", []))
if "started_at" in data:
self.started_at = data["started_at"]
log.info("Resuming: %d URLs already downloaded.", len(self._done)) log.info("Resuming: %d URLs already downloaded.", len(self._done))
except Exception: except Exception:
log.warning("Could not read progress file; starting fresh.") log.warning("Could not read progress file; starting fresh.")
@@ -59,7 +63,10 @@ class ProgressTracker:
self.path.parent.mkdir(parents=True, exist_ok=True) self.path.parent.mkdir(parents=True, exist_ok=True)
tmp = self.path.with_suffix(".json.tmp") tmp = self.path.with_suffix(".json.tmp")
tmp.write_text( tmp.write_text(
json.dumps({"completed_urls": sorted(self._done)}, indent=2) json.dumps(
{"started_at": self.started_at, "completed_urls": sorted(self._done)},
indent=2,
)
) )
tmp.replace(self.path) # atomic on POSIX; avoids corrupt JSON on crash tmp.replace(self.path) # atomic on POSIX; avoids corrupt JSON on crash
@@ -70,6 +77,7 @@ class CsvWriter:
def __init__(self, path: Path, fields: list[str]) -> None: def __init__(self, path: Path, fields: list[str]) -> None:
is_new = not path.exists() is_new = not path.exists()
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
self.path = path
self._fh = open(path, "a", newline="", encoding="utf-8") self._fh = open(path, "a", newline="", encoding="utf-8")
self._writer = csv.DictWriter(self._fh, fieldnames=fields) self._writer = csv.DictWriter(self._fh, fieldnames=fields)
if is_new: if is_new:
+5 -1
View File
@@ -14,6 +14,7 @@ from bs4 import BeautifulSoup
from spruce.download_result import ( from spruce.download_result import (
OK, OK,
PERMANENT_MISSING,
UNKNOWN, UNKNOWN,
DownloadResult, DownloadResult,
classify_http_error, classify_http_error,
@@ -263,6 +264,10 @@ class MachineSession:
and exc.response is not None and exc.response is not None
): ):
sc = exc.response.status_code sc = exc.response.status_code
cl = classify_http_error(sc, exc)
if cl == PERMANENT_MISSING:
# 404/410 will never succeed — don't waste time retrying.
return DownloadResult(0, sc, str(exc), cl)
if attempt < retries: if attempt < retries:
log.warning( log.warning(
"Attempt %d/%d failed %s: %s — retrying in %.0fs", "Attempt %d/%d failed %s: %s — retrying in %.0fs",
@@ -281,7 +286,6 @@ class MachineSession:
url, url,
exc, exc,
) )
cl = classify_http_error(sc, exc)
return DownloadResult(0, sc, str(exc), cl) return DownloadResult(0, sc, str(exc), cl)
return DownloadResult(0, None, "download_file: exhausted", UNKNOWN) return DownloadResult(0, None, "download_file: exhausted", UNKNOWN)
+133
View File
@@ -0,0 +1,133 @@
"""Retry queue loading from scans.csv (mosaic_download_status=failed)."""
import csv
from pathlib import Path
import pytest
from spruce.orchestrator import load_failed_scans_from_csv
from spruce.settings import SCANS_CSV_FIELDS
def _blank_row(**kwargs: str) -> dict[str, str]:
row = {k: "" for k in SCANS_CSV_FIELDS}
row.update(kwargs)
return row
def _write_scans_csv(path: Path, rows: list[dict[str, str]]) -> None:
with open(path, "w", newline="", encoding="utf-8") as fh:
w = csv.DictWriter(fh, fieldnames=SCANS_CSV_FIELDS)
w.writeheader()
for r in rows:
w.writerow({k: r.get(k, "") for k in SCANS_CSV_FIELDS})
def test_load_failed_scans_dedup_keeps_last_row(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
common = {
"machine": "BW1 [X]",
"machine_id": "1",
"scan_id": "100",
"mosaic_url": "http://x/m.jpg",
"mosaic_local_path": "",
"mosaic_on_disk": "False",
}
_write_scans_csv(
path,
[
_blank_row(
**common,
mosaic_download_status="failed",
mosaic_error_code="404",
scan_time="2020-01-01",
),
_blank_row(
**common,
mosaic_download_status="failed",
mosaic_error_code="404",
scan_time="2020-06-01",
),
],
)
out = load_failed_scans_from_csv(path, "BW1 [X]")
assert len(out) == 1
assert out[0]["scan_id"] == 100
assert out[0]["scan_time"] == "2020-06-01"
def test_load_failed_scans_since_year(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "",
"mosaic_download_status": "failed",
"mosaic_error_code": "404",
}
_write_scans_csv(
path,
[
_blank_row(**base, scan_id="1", scan_time="2022-12-31"),
_blank_row(**base, scan_id="2", scan_time="2023-01-01"),
_blank_row(**base, scan_id="3", scan_time=""),
],
)
out = load_failed_scans_from_csv(path, "M", since_year="2023")
ids = {s["scan_id"] for s in out}
assert ids == {2}
def test_load_failed_scans_error_code(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"scan_time": "2024-01-01",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "",
"mosaic_download_status": "failed",
}
_write_scans_csv(
path,
[
_blank_row(**base, scan_id="10", mosaic_error_code="404"),
_blank_row(**base, scan_id="11", mosaic_error_code="200"),
],
)
out = load_failed_scans_from_csv(path, "M", error_code="200")
assert [s["scan_id"] for s in out] == [11]
def test_load_failed_scans_excludes_downloaded(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"scan_time": "2024-01-01",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "True",
}
_write_scans_csv(
path,
[
_blank_row(
**base,
scan_id="5",
mosaic_download_status="downloaded",
mosaic_error_code="",
),
_blank_row(
**base,
scan_id="6",
mosaic_download_status="failed",
mosaic_error_code="404",
),
],
)
out = load_failed_scans_from_csv(path, "M")
assert [s["scan_id"] for s in out] == [6]