4 Commits

Author SHA1 Message Date
poprhythm 8593808cf3 Add --retry-failed mode and mosaic retry estimates to progress report
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 17:40:15 -04:00
poprhythm 6390f5d529 Scraping resilience, metadata tooling, and repository hygiene
Consolidates mosaic and session hardening (login retry, skip processed scans, no retry on 404, started_at), progress reporting (Markdown tables, by-year rollup, rolling-window rate/ETA), and metadata workflow scripts (run_metadata_scan.sh, scan_progress_report.py, export_machine_metadata.py). Adds mosaic reconstruction sample JPEGs referenced by the report. Updates .gitignore for backup/ and .claude/; sample_random_scans helper is documented for branch testing/sample-runs only (see README).
2026-05-14 19:52:53 -04:00
poprhythm 752c278dff Added skip logic - Based on random sampling, when disk_space_mb=0, it is safe to entirely skip it 2026-05-10 21:20:56 -04:00
poprhythm 5a7fdd820b Update mosaic URL formatting to zero-pad scan IDs to 6 digits 2026-05-09 22:06:14 -04:00
25 changed files with 1286 additions and 206 deletions
+3
View File
@@ -7,3 +7,6 @@ __pycache__/
.DS_Store
explore_dumps/
.venv/
scripts/sync_to_nas.sh
backup/
.claude/
+2 -4
View File
@@ -97,10 +97,8 @@ python scraper.py --machine "BW3-20 [AMR-26]" --mosaic-only
# Download mosaics for all machines
python scraper.py --mosaic-only
# One random completed scan per machine: mosaic + all tiles (from machines.txt; uses --list-scans + --scan-id)
# MOSAIC_ONLY=1 ./scripts/sample_random_scans.sh machines.txt # optional: mosaics only, no tiles
# cp scripts/machines.example.txt machines.txt # then edit: one label per line
# ./scripts/sample_random_scans.sh machines.txt
# One random completed scan per machine (helper script): check out branch `testing/sample-runs`,
# then see `scripts/sample_random_scans.sh` and `docs/sample_random_scans_run_progress.md`.
# Download all tiles for a specific scan
python scraper.py --machine "BW3-20 [AMR-26]" --scan-id 158374 --workers 4
Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.6 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 52 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 218 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

+15
View File
@@ -0,0 +1,15 @@
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
VENV="/tmp/spruce_venv"
if [[ ! -x "$VENV/bin/python" ]]; then
echo "Setting up venv at $VENV..."
python3 -m venv "$VENV"
"$VENV/bin/python" -m ensurepip --upgrade
"$VENV/bin/pip" install -q -r "$SCRIPT_DIR/requirements.txt"
fi
echo "Starting metadata-only scan of all machines..."
"$VENV/bin/python" "$SCRIPT_DIR/scraper.py" --metadata-only "$@"
+105
View File
@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Split scans.csv into per-machine metadata CSVs.
Reads the combined scans.csv produced by the scraper and writes one CSV per
machine containing only the website-sourced metadata columns (no mosaic paths,
download status, or error fields).
Usage:
python scripts/export_machine_metadata.py
python scripts/export_machine_metadata.py --input archives/scans.csv --output-dir archives/by_machine
"""
from __future__ import annotations
import argparse
import csv
import sys
from collections import defaultdict
from pathlib import Path
METADATA_COLUMNS = [
"machine",
"machine_id",
"scan_id",
"name",
"scan_time",
"start_x",
"start_y",
"end_x",
"end_y",
"dx",
"dy",
"nx",
"ny",
"total_tiles",
"scan_lines",
"scan_mode",
"start_datetime",
"end_datetime",
"status",
"user",
"disk_space_mb",
]
def sanitize_machine_label(label: str) -> str:
return label.replace("[", "").replace("]", "").replace(" ", "_").strip("_")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Split scans.csv into per-machine metadata CSVs.")
p.add_argument(
"--input",
default="archives/scans.csv",
metavar="FILE",
help="Path to scans.csv (default: archives/scans.csv)",
)
p.add_argument(
"--output-dir",
default="archives/by_machine",
metavar="DIR",
help="Directory for output CSVs (default: archives/by_machine)",
)
return p.parse_args()
def main() -> None:
args = parse_args()
input_path = Path(args.input)
output_dir = Path(args.output_dir)
if not input_path.exists():
sys.exit(f"Input file not found: {input_path}")
with input_path.open(newline="") as fh:
reader = csv.DictReader(fh)
if reader.fieldnames is None:
sys.exit(f"{input_path} appears to be empty.")
missing = [c for c in METADATA_COLUMNS if c not in reader.fieldnames]
if missing:
sys.exit(f"Expected columns not found in {input_path}: {missing}")
rows_by_machine: dict[str, list[dict]] = defaultdict(list)
for row in reader:
rows_by_machine[row["machine"]].append(row)
output_dir.mkdir(parents=True, exist_ok=True)
for machine_label, rows in sorted(rows_by_machine.items()):
safe_name = sanitize_machine_label(machine_label)
out_path = output_dir / f"{safe_name}_scans_metadata.csv"
with out_path.open("w", newline="") as fh:
writer = csv.DictWriter(fh, fieldnames=METADATA_COLUMNS, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
print(f" {out_path} ({len(rows)} rows)")
total = sum(len(r) for r in rows_by_machine.values())
print(f"\n{len(rows_by_machine)} machine(s), {total} total rows → {output_dir}/")
if __name__ == "__main__":
main()
+1 -1
View File
@@ -1,6 +1,6 @@
# All RootView minirhizotron machine labels (same set as `machine_metadata` in config.example.yaml).
# Copy to the repo root as machines.txt, or: cp scripts/machines.example.txt machines.txt
# sample_random_scans.sh: by default one random scan per line = mosaic + tiles; use MOSAIC_ONLY=1 for mosaics only
# Random-sample helper `scripts/sample_random_scans.sh` lives on branch `testing/sample-runs` only.
BW1-4 [AMR-15]
BW1-6 [AMR-19]
BW1-7 [AMR-18]
+481
View File
@@ -0,0 +1,481 @@
#!/usr/bin/env python3
"""
Report mosaic download progress from archives/scans.csv.
Output is Markdown. Use ``--by-year`` for a per-machine × per-year
done/failed table. When the first mosaic pass is complete (no pending rows)
but failures remain, a **Mosaic retry estimates** section is printed with
queue counts and duration hints.
Rate/ETA use a 30-minute rolling window when snapshots show progress.
Mean mosaic size is sampled from up to 100 downloads (1-hour cache).
Usage:
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
"""
import argparse
import csv
import json
import os
import random
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
# Year-based viability model derived from BW1-4 observations:
# pre-2019 → kept on server long-term (~100 %)
# 2019-2022 → purged (~ 0 %)
# 2023+ → recent, mostly available (~82 %)
_R_PRE19 = 1.00
_R_PURGED = 0.00
_R_RECENT = 0.82
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
RETRY_REALISTIC_RATE_PER_HR = 1100.0
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_dt(s: str) -> datetime | None:
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def _fmt_duration(seconds: float) -> str:
if seconds < 0:
return "?"
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
if h >= 48:
return f"{h // 24}d {h % 24}h"
if h > 0:
return f"{h}h {m:02d}m"
return f"{m}m {int(seconds % 60):02d}s"
def _fmt_size(b: float) -> str:
if b >= 1e12:
return f"{b/1e12:.2f} TB"
if b >= 1e9:
return f"{b/1e9:.2f} GB"
if b >= 1e6:
return f"{b/1e6:.1f} MB"
return f"{b/1e3:.0f} KB"
def _md_table(headers: list[str], rows: list[list[str]], *, align: list[str] | None = None) -> str:
"""Render a Markdown table. align values: 'l', 'r', 'c' (default 'l')."""
if align is None:
align = ["l"] * len(headers)
sep_map = {"l": ":---", "r": "---:", "c": ":---:"}
def row_str(cells: list[str]) -> str:
return "| " + " | ".join(cells) + " |"
lines = [
row_str(headers),
row_str([sep_map.get(a, ":---") for a in align]),
]
for r in rows:
lines.append(row_str(r))
return "\n".join(lines)
def _sample_mean_bytes(rows: list[dict], cache: dict, max_sample: int = 100) -> float | None:
cached_mean = cache.get("mean_bytes")
cached_n = cache.get("sample_n", 0)
cached_ts = _parse_dt(cache.get("size_ts", ""))
now = datetime.now(timezone.utc)
if (
cached_mean and cached_ts
and (now - cached_ts).total_seconds() < 3600
and cached_n >= min(max_sample, len(rows))
):
return float(cached_mean)
sample = random.sample(rows, min(max_sample, len(rows)))
sizes = []
for row in sample:
p = row.get("mosaic_local_path", "")
if p:
try:
sz = os.path.getsize(p)
if sz > 0:
sizes.append(sz)
except OSError:
pass
if not sizes:
return None
mean = sum(sizes) / len(sizes)
cache["mean_bytes"] = mean
cache["sample_n"] = len(sizes)
cache["size_ts"] = now.isoformat()
return mean
def _expected_remaining(pending_rows: list[dict]) -> float:
count = 0.0
for row in pending_rows:
yr = row.get("scan_time", "")[:4]
if yr < "2019":
count += _R_PRE19
elif yr <= "2022":
count += _R_PURGED
else:
count += _R_RECENT
return count
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
if n_scans <= 0 or rate_per_hr <= 0:
return ""
return _fmt_duration(n_scans / rate_per_hr * 3600.0)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--archive", default="archives")
parser.add_argument(
"--by-year", action="store_true",
help="Add a per-machine × per-year done/failed breakdown table",
)
args = parser.parse_args()
archive = Path(args.archive)
scans_csv = archive / "scans.csv"
progress_json = archive / ".progress.json"
rate_cache_path = archive / ".mosaic_rate_cache.json"
if not scans_csv.exists():
sys.exit(f"scans.csv not found: {scans_csv}")
# --- Load & deduplicate (last row per machine+scan_id) ---
latest: dict[tuple[str, str], dict] = {}
with open(scans_csv, newline="", encoding="utf-8") as f:
for row in csv.DictReader(f):
key = (row.get("machine", ""), row.get("scan_id", ""))
latest[key] = row
by_machine: dict[str, Counter] = {}
# machine -> year -> Counter(status -> count)
by_machine_year: dict[str, dict[str, Counter]] = {}
total_counts: Counter = Counter()
downloaded_rows: list[dict] = []
pending_rows: list[dict] = []
for (_m, _sid), row in latest.items():
status = row.get("mosaic_download_status", "")
m = row.get("machine", "")
yr = (row.get("scan_time") or "")[:4] or "????"
by_machine.setdefault(m, Counter())[status] += 1
by_machine_year.setdefault(m, {}).setdefault(yr, Counter())[status] += 1
total_counts[status] += 1
if status == "downloaded":
downloaded_rows.append(row)
elif status == "skipped_metadata_only":
pending_rows.append(row)
total = sum(total_counts.values())
downloaded = total_counts["downloaded"]
failed = total_counts["failed"]
zero_skipped = total_counts["skipped_zero_disk_space"]
pending = total_counts["skipped_metadata_only"]
processed = downloaded + failed + zero_skipped
attempted = downloaded + failed
now = datetime.now(timezone.utc)
# --- Elapsed ---
elapsed_str = ""
if progress_json.exists():
try:
data = json.loads(progress_json.read_text())
started_at = _parse_dt(data.get("started_at", ""))
if started_at:
elapsed_str = _fmt_duration((now - started_at).total_seconds())
except Exception:
pass
# --- Rate cache ---
cache: dict = {}
if rate_cache_path.exists():
try:
cache = json.loads(rate_cache_path.read_text())
except Exception:
pass
# Rolling rate: keep up to 60 snapshots; compute rate from the oldest
# snapshot within the last 30 minutes for a smoothed estimate.
snapshots: list[dict] = cache.get("snapshots", [])
# Prune snapshots older than 30 minutes, but keep at least one
cutoff = now.timestamp() - 1800
recent = [s for s in snapshots if s.get("ts", 0) >= cutoff]
if not recent and snapshots:
recent = [snapshots[-1]] # always keep one for continuity
rate_per_sec: float | None = None
rate_window_str = ""
snap_delta_proc = 0
if recent:
oldest = recent[0]
dt = now.timestamp() - oldest["ts"]
dp = processed - oldest["proc"]
snap_delta_proc = dp
if dt >= 60 and dp > 0:
rate_per_sec = dp / dt
window_min = dt / 60
rate_window_str = f"{window_min:.0f}-min avg"
# One-time baseline after initial mosaic crawl finished (no pending rows).
if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
cache["first_pass_completed_at"] = now.isoformat()
cache["first_pass_processed"] = total
cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR
first_pass_rate_hr = float(
cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
)
live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
# Active scrape shows progress in snapshots; idle archive shows dp == 0.
retry_estimate_rate_hr = (
live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
)
# --- Disk space ---
mean_bytes: float | None = None
size_note = ""
if downloaded_rows:
mean_bytes = _sample_mean_bytes(downloaded_rows, cache)
if mean_bytes and cache.get("sample_n"):
size_note = f"mean {_fmt_size(mean_bytes)} × {cache['sample_n']} sampled files"
dl_bytes: float | None = None
rem_bytes: float | None = None
if mean_bytes:
dl_bytes = downloaded * mean_bytes
rem_bytes = _expected_remaining(pending_rows) * mean_bytes
# Update cache: append new snapshot, keep last 60
recent.append({"ts": now.timestamp(), "proc": processed})
cache["snapshots"] = recent[-60:]
# Keep legacy keys for backwards compat
cache["timestamp"] = now.isoformat()
cache["processed"] = processed
try:
rate_cache_path.write_text(json.dumps(cache))
except Exception:
pass
rate_str = eta_str = ""
if rate_per_sec and rate_per_sec > 0:
rate_str = f"{rate_per_sec * 3600:,.0f} scans/hr ({rate_window_str})"
eta_str = _fmt_duration(pending / rate_per_sec)
# -----------------------------------------------------------------------
# Output — Markdown
# -----------------------------------------------------------------------
ts = datetime.now().strftime("%Y-%m-%d %H:%M")
print(f"# Mosaic download progress — {ts}\n")
print(f"**Archive:** `{archive.resolve()}` ")
meta_parts = []
if elapsed_str:
meta_parts.append(f"**Elapsed:** {elapsed_str}")
if rate_str:
meta_parts.append(f"**Rate:** {rate_str}")
if eta_str:
meta_parts.append(f"**ETA:** {eta_str}")
if meta_parts:
print(" ".join(meta_parts) + " ")
print()
# Summary table
summary_rows = [
["Downloaded", f"{downloaded:,}", f"{100*downloaded/total:.1f}%"],
["Failed", f"{failed:,}", f"{100*failed/total:.1f}%"],
["Skipped (disk=0)",f"{zero_skipped:,}", f"{100*zero_skipped/total:.1f}%"],
["Pending", f"{pending:,}", f"{100*pending/total:.1f}%"],
["**Total**", f"**{total:,}**", ""],
]
if attempted:
summary_rows.append(["**Success rate**", f"**{100*downloaded/attempted:.1f}%**", "*(of attempted)*"])
print(_md_table(["Metric", "Count", ""], summary_rows, align=["l", "r", "l"]))
print()
# Disk space
if dl_bytes is not None and rem_bytes is not None:
total_bytes = dl_bytes + rem_bytes
print(f"### Disk space\n")
print(f"_{size_note}_\n")
ds_rows = [
["Downloaded so far", _fmt_size(dl_bytes), ""],
["Estimated remaining", _fmt_size(rem_bytes), "*(model-based)*"],
["**Grand total**", f"**{_fmt_size(total_bytes)}**", ""],
]
print(_md_table(["", "Size", ""], ds_rows, align=["l", "r", "l"]))
print()
# Per-machine breakdown
print("### Per-machine breakdown\n")
machines = sorted(by_machine)
mc_rows = []
for m in machines:
mc = by_machine[m]
mt = sum(mc.values())
mc_rows.append([
m,
f"{mc['downloaded']:,}",
f"{mc['failed']:,}",
f"{mc['skipped_zero_disk_space']:,}",
f"{mc['skipped_metadata_only']:,}",
f"{mt:,}",
])
mc_rows.append([
"**TOTAL**",
f"**{downloaded:,}**",
f"**{failed:,}**",
f"**{zero_skipped:,}**",
f"**{pending:,}**",
f"**{total:,}**",
])
print(_md_table(
["Machine", "Done", "Failed", "Skip0", "Pending", "Total"],
mc_rows,
align=["l", "r", "r", "r", "r", "r"],
))
# -----------------------------------------------------------------------
# Retry estimates (first pass complete: pending == 0, failures remain)
# -----------------------------------------------------------------------
if failed > 0 and pending == 0:
failed_rows_list = [
r for r in latest.values()
if r.get("mosaic_download_status") == "failed"
]
n_all = len(failed_rows_list)
n_2023 = sum(
1 for r in failed_rows_list
if (r.get("scan_time") or "")[:4] >= "2023"
and len((r.get("scan_time") or "")[:4]) == 4
)
n_200 = sum(
1 for r in failed_rows_list
if r.get("mosaic_error_code") == "200"
)
rate_note = (
"rolling 30-min window"
if snap_delta_proc > 0
else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
)
print()
print("### Mosaic retry estimates\n")
print(
f"*Suggested command after server fix:* "
f"`python scraper.py --retry-failed --workers 2` "
f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n"
)
print(
f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
f"({rate_note}). Fixed columns use scenario rates.*\n"
)
est_hdr = (
"Retry scope",
"Count",
f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr",
f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr",
f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr",
f"@{retry_estimate_rate_hr:.0f}/hr",
)
retry_tbl_rows = [
[
"HTTP 200 (empty body)",
f"{n_200:,}",
_retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, retry_estimate_rate_hr),
],
[
"Failed, scan_time ≥ 2023",
f"{n_2023:,}",
_retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, retry_estimate_rate_hr),
],
[
"**All failed**",
f"**{n_all:,}**",
_retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, retry_estimate_rate_hr),
],
]
print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))
# -----------------------------------------------------------------------
# --by-year table
# -----------------------------------------------------------------------
if args.by_year:
print()
print("### Downloads by machine and year\n")
print("*Format: done / failed*\n")
# Only include years that have at least one downloaded or failed scan
all_years = sorted(
yr for yr in set(
yr for m_data in by_machine_year.values() for yr in m_data
)
if any(
by_machine_year[m].get(yr, Counter()).get("downloaded", 0)
+ by_machine_year[m].get(yr, Counter()).get("failed", 0) > 0
for m in by_machine_year
)
)
# Totals per year
yr_totals: dict[str, Counter] = {}
for yr in all_years:
yr_totals[yr] = Counter()
for m in machines:
yr_totals[yr] += by_machine_year.get(m, {}).get(yr, Counter())
year_rows = []
for m in machines:
row_cells = [m]
for yr in all_years:
c = by_machine_year.get(m, {}).get(yr, Counter())
d = c.get("downloaded", 0)
f = c.get("failed", 0)
row_cells.append(f"{d:,} / {f:,}" if (d or f) else "")
year_rows.append(row_cells)
# Totals row
total_cells = ["**TOTAL**"]
for yr in all_years:
d = yr_totals[yr].get("downloaded", 0)
f = yr_totals[yr].get("failed", 0)
total_cells.append(f"**{d:,} / {f:,}**" if (d or f) else "")
year_rows.append(total_cells)
print(_md_table(
["Machine"] + all_years,
year_rows,
align=["l"] + ["r"] * len(all_years),
))
if __name__ == "__main__":
main()
-178
View File
@@ -1,178 +0,0 @@
#!/usr/bin/env bash
# For each machine label in a text file, pick one random completed scan and download
# it: by default the mosaic and all tiles (same as: --machine "…" --scan-id N).
# For mosaic only (faster, no tile downloads), set: MOSAIC_ONLY=1
#
# Usage:
# ./scripts/sample_random_scans.sh [PATH_TO_machines.txt]
# Config path defaults to config.yaml in the repo root. Override with:
# CONFIG=/path/to/config.yaml ./scripts/sample_random_scans.sh machines.txt
# Dry-run the download step (listing still does real HTTP to fetch scan list):
# DRY_RUN=1 ./scripts/sample_random_scans.sh machines.txt
# Verbose / debug (extra per-step lines, scan counts from the list step):
# DEBUG=1 ./scripts/sample_random_scans.sh machines.txt
# By default, --list-scans fetches only the first page (one HTTP request, up to
# 320 scans). To paginate the full archive for the random pick (slower when many
# LIST_SCANS_ALL_PAGES=1 ./scripts/sample_random_scans.sh machines.txt
#
# machines.txt: one machine label per line (same as --machine and config machine names).
# See scripts/machines.example.txt
set -euo pipefail
REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
CONFIG="${CONFIG:-$REPO_ROOT/config.yaml}"
MACHINES_FILE="${1:-$REPO_ROOT/machines.txt}"
SCRAPER=(python3 "$REPO_ROOT/scraper.py" --config "$CONFIG")
log() { echo "[sample_random_scans] $*" >&2; }
log_debug() {
if [[ -n "${DEBUG:-}" ]]; then
echo "[sample_random_scans] debug: $*" >&2
fi
}
if [[ ! -f "$MACHINES_FILE" ]]; then
log "error: file not found: $MACHINES_FILE"
log "Create it with one machine label per line, or: cp scripts/machines.example.txt machines.txt"
exit 1
fi
if [[ ! -f "$CONFIG" ]]; then
log "error: config not found: $CONFIG"
exit 1
fi
# Non-empty, non-comment lines (same rules as the main loop)
TOTAL_MACHINES="$(
grep -v '^[[:space:]]*#' "$MACHINES_FILE" | grep -c -v '^[[:space:]]*$' || true
)"
if [[ -z "$TOTAL_MACHINES" || "$TOTAL_MACHINES" -eq 0 ]]; then
log "error: no machine lines in: $MACHINES_FILE"
exit 1
fi
log "starting repo=$REPO_ROOT"
log " config=$CONFIG"
log " machines_file=$MACHINES_FILE (${TOTAL_MACHINES} machine(s) in file)"
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
if [[ -n "${DRY_RUN:-}" ]]; then
log " mode: MOSAIC_ONLY + DRY_RUN (mosaic only, --dry-run on download step)"
else
log " mode: MOSAIC_ONLY=1 (mosaics only, no tiles; use for a lighter sample)"
fi
else
if [[ -n "${DRY_RUN:-}" ]]; then
log " mode: DRY_RUN (list + full scan download use --dry-run; no files written)"
else
log " mode: full scan — mosaic + all tiles (workers from config)"
fi
fi
if [[ -n "${DEBUG:-}" ]]; then
log " DEBUG=1 (extra diagnostics enabled)"
fi
if [[ -n "${LIST_SCANS_ALL_PAGES:-}" ]]; then
log " list step: list-scans = full archive (all pages, slower)"
else
log " list step: list-scans --list-scans-first-page-only (one page, up to 320 IDs)"
fi
log "────────────────────────────────────────"
export REPO_ROOT CONFIG
[[ -n "${DEBUG:-}" ]] && export DEBUG
[[ -n "${LIST_SCANS_ALL_PAGES:-}" ]] && export LIST_SCANS_ALL_PAGES
PROCESSED=0
SKIPPED=0
IDX=0
while IFS= read -r line || [[ -n "${line-}" ]]; do
# trim, strip CR, skip blanks / comments
line="${line//$'\r'/}"
label="${line#"${line%%[![:space:]]*}"}"
label="${label%"${label##*[![:space:]]}"}"
[[ -z "$label" || "$label" == \#* ]] && continue
IDX=$((IDX + 1))
log "[$IDX/$TOTAL_MACHINES] machine: $label"
log " status: listing scans (--list-scans) …"
random_id="$(
REPO_ROOT="$REPO_ROOT" CONFIG="$CONFIG" LABEL="$label" python3 - <<'PY'
import os, random, subprocess, sys
label = os.environ["LABEL"]
repo = os.environ["REPO_ROOT"]
cfg = os.environ["CONFIG"]
debug = bool(os.environ.get("DEBUG"))
full = bool(os.environ.get("LIST_SCANS_ALL_PAGES"))
scraper = os.path.join(repo, "scraper.py")
if debug:
print(
f"[sample_random_scans] debug: running list-scans for {label!r} "
f"({'all pages' if full else 'first page only'})",
file=sys.stderr,
)
cmd = [sys.executable, scraper, "--list-scans", "--machine", label, "--config", cfg]
if not full:
cmd.insert(3, "--list-scans-first-page-only")
out = subprocess.check_output(
cmd,
text=True,
stderr=subprocess.STDOUT,
)
ids = []
for line in out.splitlines():
line = line.rstrip()
if not line or line.startswith("---") or "Total" in line:
continue
parts = line.split()
if parts and parts[0].isdigit():
ids.append(parts[0])
if not ids:
print(f"no scans parsed for {label!r} — check login and output", file=sys.stderr)
sys.exit(1)
if debug:
print(
f"[sample_random_scans] debug: parsed {len(ids)} scan id(s) for {label!r}",
file=sys.stderr,
)
print(random.choice(ids), end="")
PY
)" || {
log " status: SKIPPED (could not get scan list or pick id)"
SKIPPED=$((SKIPPED + 1))
continue
}
log " status: picked random scan_id=$random_id (uniform among IDs from this list step — first page by default, see start banner)"
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
log " status: running scraper: --mosaic-only --scan-id (mosaic only) …"
else
log " status: running scraper: --scan-id (mosaic + tiles) …"
fi
if [[ -n "${DRY_RUN:-}" ]]; then
log " status: (dry-run — no files written for this scan)"
fi
if [[ -n "${MOSAIC_ONLY:-}" ]]; then
run_cmd=("${SCRAPER[@]}" --mosaic-only --machine "$label" --scan-id "$random_id")
else
run_cmd=("${SCRAPER[@]}" --machine "$label" --scan-id "$random_id")
fi
if [[ -n "${DRY_RUN:-}" ]]; then
run_cmd+=(--dry-run)
fi
if "${run_cmd[@]}"; then
log " status: OK — finished this machine (exit 0)"
PROCESSED=$((PROCESSED + 1))
else
rc=$?
log " status: FAILED — scraper exit code $rc (stopping; fix or remove this machine and re-run)"
exit "$rc"
fi
log "────────────────────────────────────────"
done < "$MACHINES_FILE"
log "done. summary: $PROCESSED machine(s) with sampled scan download completed, $SKIPPED skipped, $IDX line(s) processed out of $TOTAL_MACHINES in file."
exit 0
+218
View File
@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
Report metadata-scan progress and projected completion times for all machines.
Usage:
python scripts/scan_progress_report.py [--archive ARCHIVE_DIR] [--recent N] [--mermaid] [--rate-chart]
Options:
--archive DIR Path to archives directory (default: archives)
--recent N Number of recent files used to compute current rate (default: 500)
--mermaid Also print a Mermaid Gantt chart
--rate-chart Also print a Mermaid XY chart of s/scan rate by hour
"""
import argparse
import glob
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
# Canonical machine order and total scan counts (from README inventory, April 2026)
MACHINES = [
("BW1-4 [AMR-15]", 6121),
("BW1-6 [AMR-19]", 18198),
("BW1-7 [AMR-18]", 430),
("BW2-8 [AMR-25]", 8191),
("BW2-10 [AMR-22]", 16537),
("BW2-11 [AMR-23]", 26763),
("BW2-13 [AMR-24]", 13537),
("BW3-16 [AMR-16]", 7325),
("BW3-17 [AMR-20]", 471),
("BW3-19 [AMR-21]", 15186),
("BW3-20 [AMR-26]", 23052),
("BW3-21 [AMR-17]", 10115),
]
TOTAL_SCANS = sum(t for _, t in MACHINES)
def dir_name(label: str) -> str:
return re.sub(r"[^\w\-.]", "_", label).strip("_")
def get_timestamps(machine_dir: Path) -> list[float]:
files = glob.glob(str(machine_dir / "**" / "metadata.json"), recursive=True)
return sorted(os.path.getmtime(f) for f in files)
def print_rate_chart(all_timestamps: list[float]) -> None:
"""Print a Mermaid xychart-beta of average s/scan per hour."""
# One avg rate per hour
bins: dict[str, list[float]] = defaultdict(list)
start_hour: datetime | None = None
for i in range(1, len(all_timestamps)):
gap = all_timestamps[i] - all_timestamps[i - 1]
if gap < 300: # ignore inter-machine gaps
dt = datetime.fromtimestamp(all_timestamps[i])
hour_key = dt.strftime("%m-%d %Hh")
bins[hour_key].append(gap)
if start_hour is None:
start_hour = dt.replace(minute=0, second=0, microsecond=0)
# Drop the last (partial) hour
hours = sorted(bins.keys())
if hours:
hours = hours[:-1]
if not hours or start_hour is None:
print("(not enough data for rate chart)")
return
values = [f"{sum(bins[h])/len(bins[h]):.2f}" for h in hours]
y_max = max(float(v) for v in values)
y_ceil = int(y_max) + 3
n = len(hours)
# Numeric x-axis: Mermaid auto-picks readable tick positions
start_label = start_hour.strftime("%b %d %H:%M")
print("```mermaid")
print("xychart-beta")
print(f' title "Metadata scan rate (s/scan) — hourly, starting {start_label}"')
print(f' x-axis "Hours elapsed" 0 --> {n}')
print(f' y-axis "s / scan" 0 --> {y_ceil}')
print(f" line [{', '.join(values)}]")
print("```")
def fmt_dt(dt: datetime) -> str:
return dt.strftime("%a %b %d %H:%M")
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--archive", default="archives", help="Archives directory")
parser.add_argument("--recent", type=int, default=500,
help="Files used to compute recent rate (default: 500)")
parser.add_argument("--mermaid", action="store_true", help="Print Mermaid Gantt chart")
parser.add_argument("--rate-chart", action="store_true", help="Print Mermaid XY rate-over-time chart")
args = parser.parse_args()
archive = Path(args.archive)
if not archive.is_dir():
sys.exit(f"Archive directory not found: {archive}")
now = datetime.now()
# --- Gather per-machine data ---
machine_data = [] # (label, total, done, first_ts, last_ts)
all_timestamps: list[float] = []
for label, total in MACHINES:
mdir = archive / dir_name(label)
if mdir.is_dir():
times = get_timestamps(mdir)
else:
times = []
done = len(times)
first_ts = datetime.fromtimestamp(times[0]) if times else None
last_ts = datetime.fromtimestamp(times[-1]) if times else None
machine_data.append((label, total, done, first_ts, last_ts, times))
all_timestamps.extend(times)
all_timestamps.sort()
total_done = sum(d for _, _, d, *_ in machine_data)
# --- Rate calculation ---
recent_times = all_timestamps[-args.recent:] if len(all_timestamps) >= 2 else all_timestamps
if len(recent_times) >= 2:
recent_rate = (recent_times[-1] - recent_times[0]) / len(recent_times)
else:
recent_rate = None
if len(all_timestamps) >= 2:
overall_rate = (all_timestamps[-1] - all_timestamps[0]) / len(all_timestamps)
else:
overall_rate = None
rate = recent_rate or overall_rate or 5.0 # fallback
# --- Print timetable ---
print(f"Metadata scan progress — {now.strftime('%Y-%m-%d %H:%M')}")
print(f"Overall rate : {overall_rate:.2f} s/scan" if overall_rate else "Overall rate : n/a")
print(f"Recent rate : {recent_rate:.2f} s/scan (last {args.recent} files)" if recent_rate else "Recent rate : n/a")
print(f"Rate used : {rate:.2f} s/scan")
print(f"Done : {total_done:,} / {TOTAL_SCANS:,} ({100*total_done/TOTAL_SCANS:.1f}%)")
print()
print(f"{'Machine':<20} {'Total':>7} {'Done':>7} {'Pct':>6} {'Completion'}")
print("-" * 68)
cursor = now
gantt_rows: list[tuple[str, datetime, datetime, str]] = [] # label, start, end, status
for label, total, done, first_ts, last_ts, times in machine_data:
pct = 100 * done / total if total else 0
complete = done >= total or (done > 0 and done / total >= 0.999)
if done == 0:
# Not started yet
start = cursor
finish = cursor + timedelta(seconds=total * rate)
status = "pending"
print(f"{label:<20} {total:>7,} {'':>7} {'':>5} {fmt_dt(finish)}")
elif complete:
# Complete — use actual timestamps
start = first_ts
finish = last_ts
status = "done"
print(f"{label:<20} {total:>7,} {done:>7,} {pct:>5.1f}% complete")
else:
# In progress
remaining = total - done
start = first_ts
finish = cursor + timedelta(seconds=remaining * rate)
status = "active"
print(f"{label:<20} {total:>7,} {done:>7,} {pct:>5.1f}% {fmt_dt(finish)} ← in progress")
gantt_rows.append((label, start or cursor, finish, status))
if status != "done":
cursor = finish
print("-" * 68)
print(f"{'All done':<20} {TOTAL_SCANS:>7,} {total_done:>7,} {100*total_done/TOTAL_SCANS:>5.1f}% {fmt_dt(cursor)}")
# --- Mermaid rate chart ---
if args.rate_chart:
print()
print_rate_chart(all_timestamps)
# --- Mermaid Gantt ---
if args.mermaid:
print()
print("```mermaid")
print("gantt")
print(" title Metadata scan progress — all 12 machines")
print(" dateFormat YYYY-MM-DD HH:mm")
print(" axisFormat %b %d")
print()
section = None
for label, start, finish, status in gantt_rows:
prefix = label.split()[0][:3] # BW1, BW2, BW3
if prefix != section:
section = prefix
print(f" section {section}")
safe = label.replace("[", "").replace("]", "").replace(" ", "-")
tag = f"done, " if status == "done" else ("active, " if status == "active" else "")
s = start.strftime("%Y-%m-%d %H:%M")
e = finish.strftime("%Y-%m-%d %H:%M")
print(f" {label} :{tag}{safe}, {s}, {e}")
print("```")
if __name__ == "__main__":
main()
+77
View File
@@ -84,6 +84,33 @@ def parse_args() -> argparse.Namespace:
"inventorying all scans across all machines."
),
)
p.add_argument(
"--retry-failed",
action="store_true",
help=(
"Mosaic-only: re-attempt scans whose latest scans.csv row has "
"mosaic_download_status=failed (queue from CSV, not the server list). "
"Implies --mosaic-only."
),
)
p.add_argument(
"--retry-since",
metavar="YEAR",
default=None,
help=(
"With --retry-failed: only scans with scan_time year >= YEAR "
"(e.g. 2023)."
),
)
p.add_argument(
"--retry-error-code",
metavar="CODE",
default=None,
help=(
"With --retry-failed: filter by mosaic_error_code "
"(e.g. 200 for empty-body failures)."
),
)
p.add_argument(
"--dry-run",
action="store_true",
@@ -122,6 +149,17 @@ def parse_args() -> argparse.Namespace:
"and report how many were re-queued. Run before resuming after a crash."
),
)
p.add_argument(
"--max-tiles",
type=int,
default=None,
metavar="N",
help=(
"Download at most N tiles per scan (default: all). "
"Pass 1 to probe a single tile — useful for quickly checking "
"whether a scan has real imagery or only placeholder responses."
),
)
p.add_argument(
"--verbose",
"-v",
@@ -145,6 +183,19 @@ def main() -> None:
if args.list_scans_first_page_only and not args.list_scans:
sys.exit("--list-scans-first-page-only requires --list-scans")
if args.scan_id is not None and args.scan_id <= 0:
sys.exit("--scan-id must be a positive integer")
if args.retry_since and not args.retry_failed:
sys.exit("--retry-since requires --retry-failed.")
if args.retry_error_code and not args.retry_failed:
sys.exit("--retry-error-code requires --retry-failed.")
if args.retry_failed:
if args.metadata_only:
sys.exit("--retry-failed cannot be used with --metadata-only.")
args.mosaic_only = True # implied
# --list-machines doesn't need credentials
if args.list_machines:
base_url = "http://205.149.147.131:8010/"
@@ -247,6 +298,12 @@ def main() -> None:
if args.metadata_only:
log.info("Mode: metadata only (mosaics and tiles skipped)")
elif args.mosaic_only:
if args.retry_failed:
log.info(
"Mode: mosaic retry (failed scans from %s)",
SCANS_CSV_FILENAME,
)
else:
log.info("Mode: mosaics only (individual tiles skipped)")
if args.dry_run:
log.info("Mode: dry-run (no files will be written)")
@@ -270,6 +327,10 @@ def main() -> None:
mosaic_only=args.mosaic_only,
metadata_only=args.metadata_only,
scan_id_filter=args.scan_id,
max_tiles=args.max_tiles,
retry_failed=args.retry_failed,
retry_since_year=args.retry_since,
retry_error_code=args.retry_error_code,
)
totals.merge(stats)
finally:
@@ -331,6 +392,22 @@ def _print_summary(
)
if not metadata_only and not mosaic_only:
log.info(row("Tiles downloaded:", str(totals.tiles_downloaded)))
if totals.scans_probe_skipped:
log.info(
row(
"Probe-skipped scans:",
str(totals.scans_probe_skipped),
"probe tile was 404 or placeholder; tile pool skipped",
)
)
if not metadata_only and totals.scans_disk_space_skipped:
log.info(
row(
"Zero-disk-space skipped:",
str(totals.scans_disk_space_skipped),
"disk_space_mb=0; mosaic and tiles not attempted",
)
)
if not dry_run and not metadata_only:
log.info(
row(
+230 -16
View File
@@ -2,14 +2,34 @@
High-level scrape orchestration: drives the per-machine and per-scan loops.
"""
import csv
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from spruce.download_result import error_code_str
from tqdm import tqdm
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
# below smallest observed real tile (~7 KiB in production samples).
PLACEHOLDER_MAX_BYTES = 200
def _is_placeholder_tile(path: Path) -> bool:
"""Return True if a downloaded tile looks like a 1×1 server placeholder."""
try:
return path.is_file() and path.stat().st_size <= PLACEHOLDER_MAX_BYTES
except OSError:
return False
@dataclass
@@ -23,6 +43,8 @@ class RunStats:
mosaics_downloaded: int = 0
mosaics_failed: int = 0 # mosaic URL attempted but 0 bytes / HTTP error
tiles_downloaded: int = 0
scans_probe_skipped: int = 0 # probe tile was 404 or placeholder; full tile pool skipped
scans_disk_space_skipped: int = 0 # disk_space_mb == 0; no mosaic or tiles attempted
def merge(self, other: "RunStats") -> None:
self.scans_fetched += other.scans_fetched
@@ -32,17 +54,67 @@ class RunStats:
self.mosaics_downloaded += other.mosaics_downloaded
self.mosaics_failed += other.mosaics_failed
self.tiles_downloaded += other.tiles_downloaded
from tqdm import tqdm
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
self.scans_probe_skipped += other.scans_probe_skipped
self.scans_disk_space_skipped += other.scans_disk_space_skipped
log = logging.getLogger(__name__)
def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
"""Last row wins per (machine, scan_id)."""
latest: dict[tuple[str, str], dict[str, str]] = {}
if not scans_csv_path.exists():
return latest
with open(scans_csv_path, newline="", encoding="utf-8") as fh:
for row in csv.DictReader(fh):
key = (row.get("machine", ""), row.get("scan_id", ""))
latest[key] = row
return latest
def load_failed_scans_from_csv(
scans_csv_path: Path,
machine_label: str,
*,
since_year: str | None = None,
error_code: str | None = None,
) -> list[dict[str, Any]]:
"""
Dedupe scans.csv by (machine, scan_id); return failed mosaic rows for one machine.
Each dict is suitable as the ``scan`` argument to ``process_scan`` (scan_id,
scan_time, name, status).
"""
latest = _read_scans_csv_latest(scans_csv_path)
out: list[dict[str, Any]] = []
for (_m, _sid), row in latest.items():
if row.get("machine") != machine_label:
continue
if row.get("mosaic_download_status") != "failed":
continue
if error_code is not None and row.get("mosaic_error_code", "") != error_code:
continue
st = row.get("scan_time", "") or ""
if since_year is not None:
yr = st[:4]
if len(yr) < 4 or yr < since_year:
continue
sid = int(row["scan_id"])
out.append(
{
"scan_id": sid,
"scan_time": st,
"name": row.get("name", ""),
"status": row.get("status", "") or "Completed",
"user": row.get("user", ""),
"scan_lines": row.get("scan_lines", ""),
"scan_mode": row.get("scan_mode", ""),
}
)
out.sort(key=lambda s: s["scan_id"])
return out
# ---------------------------------------------------------------------------
# Per-scan helpers
# ---------------------------------------------------------------------------
@@ -224,6 +296,8 @@ def process_scan(
dry_run: bool,
mosaic_only: bool,
metadata_only: bool = False,
max_tiles: int | None = None,
scans_csv_existing_ids: set[int] | None = None,
) -> RunStats:
"""
Process one scan: fetch metadata, download mosaic and (optionally) tiles.
@@ -247,7 +321,8 @@ def process_scan(
candidate = machine_root / scan_date_hint / str(scan_id) / "metadata.json"
if candidate.exists():
found_meta = candidate
if found_meta is None:
# Date hint is reliable — don't glob if candidate wasn't found.
else:
matches = list(machine_root.glob(f"*/{scan_id}/metadata.json"))
if matches:
found_meta = matches[0]
@@ -295,6 +370,23 @@ def process_scan(
):
scan_meta.setdefault(k, scan.get(k, ""))
# disk_space_mb == 0 is a reliable signal that the scan has no imagery.
# A 300-scan investigation (50 per bucket) found 0% viability in this bucket.
# Skip the mosaic and tile downloads entirely; write a record so scans.csv
# stays complete.
disk_space_skip = False
if not metadata_only:
try:
if float(scan_meta.get("disk_space_mb") or "nan") == 0.0:
disk_space_skip = True
log.info(
"[%s] Scan %d: disk_space_mb=0 — skipping mosaic and tiles.",
machine["label"],
scan_id,
)
except (ValueError, TypeError):
pass
# Save per-scan metadata.json
scan_date = _extract_date(scan_meta.get("scan_time", ""))
scan_dir = output_dir / machine_dir_name(machine) / scan_date / str(scan_id)
@@ -307,11 +399,11 @@ def process_scan(
)
stats.metadata_written += 1
# Mosaic (skipped entirely in metadata-only mode)
# Mosaic (skipped entirely in metadata-only or disk_space_skip mode)
mosaic_path = mosaic_dest(output_dir, machine, scan_meta, scan_id)
mosaic_url = sess.mosaic_url(scan_id)
mosaic_already_done = progress.is_done(mosaic_url)
if metadata_only:
if metadata_only or disk_space_skip:
mosaic_attempt: MosaicAttempt | None = None
else:
mosaic_attempt = _download_mosaic(
@@ -332,6 +424,9 @@ def process_scan(
if metadata_only:
mds, mer, mco, mcl = "skipped_metadata_only", "", "", ""
elif disk_space_skip:
mds, mer, mco, mcl = "skipped_zero_disk_space", "", "", ""
stats.scans_disk_space_skipped += 1
elif mosaic_attempt is not None:
mds = mosaic_attempt.csv_status
mer = mosaic_attempt.error
@@ -341,9 +436,16 @@ def process_scan(
mds, mer, mco, mcl = "", "", "", ""
# Write scan-level CSV row only if this scan hasn't been recorded before.
if mosaic_already_done and not metadata_only:
# Skip if: (1) mosaic URL already in .progress.json, or (2) scan already
# has a non-pending row in scans.csv from a prior run.
already_recorded = (mosaic_already_done and not metadata_only) or (
not metadata_only
and scans_csv_existing_ids is not None
and scan_id in scans_csv_existing_ids
)
if already_recorded:
log.debug(
"[%s] Scan %d: already in scans.csv (mosaic was previously downloaded), skipping CSV row.",
"[%s] Scan %d: already in scans.csv, skipping CSV row.",
machine["label"],
scan_id,
)
@@ -381,11 +483,46 @@ def process_scan(
}
)
if mosaic_only or metadata_only:
if mosaic_only or metadata_only or disk_space_skip:
return stats
# Tiles
tiles = sess.enumerate_tiles(scan_meta)
if max_tiles is not None:
tiles = tiles[:max_tiles]
# Tile probe: always download one tile before launching the full thread
# pool. Two failure modes justify this:
# 1. Mosaic failed (404/410 or empty body) — scan was set up but never
# run; tile grid is all placeholders or 404s.
# 2. Mosaic succeeded but tiles are server-side placeholders (1x1 JPEG,
# ~43 B) — mosaic was generated from empty data; downloading the full
# grid would fire thousands of guaranteed-placeholder requests.
if (
not dry_run
and tiles
and not progress.is_done(tiles[0]["url"])
):
probe_tile = tiles[0]
probe_dest = tile_dest(output_dir, machine, scan_meta, probe_tile)
probe_res = sess.download_file(probe_tile["url"], probe_dest)
if not probe_res.ok or _is_placeholder_tile(probe_dest):
probe_dest.unlink(missing_ok=True)
detail = (
"is placeholder"
if probe_res.ok
else f"failed ({probe_res.error_class or probe_res.error or 'unknown'})"
)
log.info(
"[%s] Scan %d: probe tile %s — empty/placeholder scan, skipping %d tile(s).",
machine["label"],
scan_id,
detail,
len(tiles),
)
stats.scans_probe_skipped += 1
return stats
stats.tiles_downloaded += _download_tiles_for_scan(
sess,
tiles,
@@ -417,14 +554,60 @@ def scrape_machine(
mosaic_only: bool,
metadata_only: bool = False,
scan_id_filter: int | None = None,
max_tiles: int | None = None,
retry_failed: bool = False,
retry_since_year: str | None = None,
retry_error_code: str | None = None,
) -> RunStats:
"""Login, fetch scans, and download all content for one machine."""
sess = MachineSession(machine, config)
if not sess.login():
login_ok = False
for attempt in range(1, 4):
if sess.login():
login_ok = True
break
if attempt < 3:
log.warning(
"[%s] Login failed (attempt %d/3) — retrying in 10s.",
machine["label"],
attempt,
)
time.sleep(10)
if not login_ok:
log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
return RunStats()
if retry_failed:
scans = load_failed_scans_from_csv(
scans_csv.path,
machine["label"],
since_year=retry_since_year,
error_code=retry_error_code,
)
if scan_id_filter is not None:
scans: list[dict[str, Any]] = [
scans = [s for s in scans if s["scan_id"] == scan_id_filter]
if not scans:
log.warning(
"[%s] Retry: scan_id %d not among failed rows for this machine.",
machine["label"],
scan_id_filter,
)
return RunStats()
log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter)
elif not scans:
log.warning(
"[%s] No failed mosaic rows in scans.csv match retry filters.",
machine["label"],
)
return RunStats()
else:
log.info(
"[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
machine["label"],
len(scans),
)
elif scan_id_filter is not None:
scans = [
{"scan_id": scan_id_filter, "status": "Completed"}
]
log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
@@ -434,8 +617,37 @@ def scrape_machine(
log.warning("[%s] No scans found.", machine["label"])
return RunStats()
# Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP).
# In normal mode: skip anything with a definitive non-pending status.
# In retry mode: only skip scans that are already downloaded or skipped for
# disk-space reasons — failed scans must be re-attempted.
PENDING_STATUSES = {"skipped_metadata_only", ""}
BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"}
existing_ids: set[int] = set()
if not metadata_only:
latest_rows = _read_scans_csv_latest(scans_csv.path)
for (_mlabel, _sid), _row in latest_rows.items():
if _mlabel != machine["label"]:
continue
st = _row.get("mosaic_download_status", "")
if retry_failed:
if st in BLOCK_AFTER_RETRY_STATUSES:
existing_ids.add(int(_row["scan_id"]))
else:
if st not in PENDING_STATUSES:
existing_ids.add(int(_row["scan_id"]))
stats = RunStats()
for scan in scans:
# Skip scans already fully processed in a prior run — avoids redundant
# metadata fetches and mosaic requests for known-failed / known-done scans.
if not metadata_only and scan["scan_id"] in existing_ids:
log.debug(
"[%s] Scan %d: already processed, skipping.",
machine["label"],
scan["scan_id"],
)
continue
stats.merge(process_scan(
sess=sess,
scan=scan,
@@ -448,5 +660,7 @@ def scrape_machine(
dry_run=dry_run,
mosaic_only=mosaic_only,
metadata_only=metadata_only,
max_tiles=max_tiles,
scans_csv_existing_ids=existing_ids,
))
return stats
+9 -1
View File
@@ -5,6 +5,7 @@ Progress tracking (JSON) and CSV writing.
import csv
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
@@ -27,6 +28,7 @@ class ProgressTracker:
def __init__(self, path: Path) -> None:
self.path = path
self._done: set[str] = set()
self.started_at: str = datetime.now(timezone.utc).isoformat()
self._load()
def _load(self) -> None:
@@ -34,6 +36,8 @@ class ProgressTracker:
try:
data = json.loads(self.path.read_text())
self._done = set(data.get("completed_urls", []))
if "started_at" in data:
self.started_at = data["started_at"]
log.info("Resuming: %d URLs already downloaded.", len(self._done))
except Exception:
log.warning("Could not read progress file; starting fresh.")
@@ -59,7 +63,10 @@ class ProgressTracker:
self.path.parent.mkdir(parents=True, exist_ok=True)
tmp = self.path.with_suffix(".json.tmp")
tmp.write_text(
json.dumps({"completed_urls": sorted(self._done)}, indent=2)
json.dumps(
{"started_at": self.started_at, "completed_urls": sorted(self._done)},
indent=2,
)
)
tmp.replace(self.path) # atomic on POSIX; avoids corrupt JSON on crash
@@ -70,6 +77,7 @@ class CsvWriter:
def __init__(self, path: Path, fields: list[str]) -> None:
is_new = not path.exists()
path.parent.mkdir(parents=True, exist_ok=True)
self.path = path
self._fh = open(path, "a", newline="", encoding="utf-8")
self._writer = csv.DictWriter(self._fh, fieldnames=fields)
if is_new:
+9 -3
View File
@@ -14,6 +14,7 @@ from bs4 import BeautifulSoup
from spruce.download_result import (
OK,
PERMANENT_MISSING,
UNKNOWN,
DownloadResult,
classify_http_error,
@@ -216,8 +217,10 @@ class MachineSession:
# ------------------------------------------------------------------
def mosaic_url(self, scan_id: int) -> str:
# The server stores scan directories zero-padded to 6 digits (e.g. 010700/).
# Scans with IDs >= 100000 are unaffected since they are already 6 digits.
return urljoin(
self.image_base_url, f"RootView_Database/{scan_id}/mosaic.jpg"
self.image_base_url, f"RootView_Database/{scan_id:06d}/mosaic.jpg"
)
# ------------------------------------------------------------------
@@ -261,8 +264,12 @@ class MachineSession:
and exc.response is not None
):
sc = exc.response.status_code
cl = classify_http_error(sc, exc)
if cl == PERMANENT_MISSING:
# 404/410 will never succeed — don't waste time retrying.
return DownloadResult(0, sc, str(exc), cl)
if attempt < retries:
log.debug(
log.warning(
"Attempt %d/%d failed %s: %s — retrying in %.0fs",
attempt,
retries,
@@ -279,7 +286,6 @@ class MachineSession:
url,
exc,
)
cl = classify_http_error(sc, exc)
return DownloadResult(0, sc, str(exc), cl)
return DownloadResult(0, None, "download_file: exhausted", UNKNOWN)
+133
View File
@@ -0,0 +1,133 @@
"""Retry queue loading from scans.csv (mosaic_download_status=failed)."""
import csv
from pathlib import Path
import pytest
from spruce.orchestrator import load_failed_scans_from_csv
from spruce.settings import SCANS_CSV_FIELDS
def _blank_row(**kwargs: str) -> dict[str, str]:
row = {k: "" for k in SCANS_CSV_FIELDS}
row.update(kwargs)
return row
def _write_scans_csv(path: Path, rows: list[dict[str, str]]) -> None:
with open(path, "w", newline="", encoding="utf-8") as fh:
w = csv.DictWriter(fh, fieldnames=SCANS_CSV_FIELDS)
w.writeheader()
for r in rows:
w.writerow({k: r.get(k, "") for k in SCANS_CSV_FIELDS})
def test_load_failed_scans_dedup_keeps_last_row(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
common = {
"machine": "BW1 [X]",
"machine_id": "1",
"scan_id": "100",
"mosaic_url": "http://x/m.jpg",
"mosaic_local_path": "",
"mosaic_on_disk": "False",
}
_write_scans_csv(
path,
[
_blank_row(
**common,
mosaic_download_status="failed",
mosaic_error_code="404",
scan_time="2020-01-01",
),
_blank_row(
**common,
mosaic_download_status="failed",
mosaic_error_code="404",
scan_time="2020-06-01",
),
],
)
out = load_failed_scans_from_csv(path, "BW1 [X]")
assert len(out) == 1
assert out[0]["scan_id"] == 100
assert out[0]["scan_time"] == "2020-06-01"
def test_load_failed_scans_since_year(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "",
"mosaic_download_status": "failed",
"mosaic_error_code": "404",
}
_write_scans_csv(
path,
[
_blank_row(**base, scan_id="1", scan_time="2022-12-31"),
_blank_row(**base, scan_id="2", scan_time="2023-01-01"),
_blank_row(**base, scan_id="3", scan_time=""),
],
)
out = load_failed_scans_from_csv(path, "M", since_year="2023")
ids = {s["scan_id"] for s in out}
assert ids == {2}
def test_load_failed_scans_error_code(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"scan_time": "2024-01-01",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "",
"mosaic_download_status": "failed",
}
_write_scans_csv(
path,
[
_blank_row(**base, scan_id="10", mosaic_error_code="404"),
_blank_row(**base, scan_id="11", mosaic_error_code="200"),
],
)
out = load_failed_scans_from_csv(path, "M", error_code="200")
assert [s["scan_id"] for s in out] == [11]
def test_load_failed_scans_excludes_downloaded(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"scan_time": "2024-01-01",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "True",
}
_write_scans_csv(
path,
[
_blank_row(
**base,
scan_id="5",
mosaic_download_status="downloaded",
mosaic_error_code="",
),
_blank_row(
**base,
scan_id="6",
mosaic_download_status="failed",
mosaic_error_code="404",
),
],
)
out = load_failed_scans_from_csv(path, "M")
assert [s["scan_id"] for s in out] == [6]