Files
SPRUCE-scraper/spruce/orchestrator.py
poprhythm 6390f5d529 Scraping resilience, metadata tooling, and repository hygiene
Consolidates mosaic and session hardening (login retry, skip processed scans, no retry on 404, started_at), progress reporting (Markdown tables, by-year rollup, rolling-window rate/ETA), and metadata workflow scripts (run_metadata_scan.sh, scan_progress_report.py, export_machine_metadata.py). Adds mosaic reconstruction sample JPEGs referenced by the report. Updates .gitignore for backup/ and .claude/; sample_random_scans helper is documented for branch testing/sample-runs only (see README).
2026-05-14 19:52:53 -04:00

575 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
High-level scrape orchestration: drives the per-machine and per-scan loops.
"""
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any

from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
# RootView answers requests for empty cells with a ~43-byte 1×1 JPEG. The
# threshold sits far below the smallest real tile seen in production samples
# (~7 KiB), so genuine tiles are never mistaken for placeholders.
PLACEHOLDER_MAX_BYTES = 200


def _is_placeholder_tile(path: Path) -> bool:
    """Report whether *path* is a regular file of at most PLACEHOLDER_MAX_BYTES.

    Anything that is not a regular file, or whose size cannot be read because
    of an ``OSError``, is reported as "not a placeholder" (False).
    """
    try:
        if not path.is_file():
            return False
        size = path.stat().st_size
    except OSError:
        return False
    return size <= PLACEHOLDER_MAX_BYTES
@dataclass
class RunStats:
    """Accumulated counters for one or more machines.

    Every field is an integer counter that starts at zero. ``merge`` adds the
    counters of another instance onto this one in place.
    """

    scans_fetched: int = 0  # scan detail page fetched (metadata), not tiles/mosaics
    scans_skipped: int = 0  # metadata.json already on disk; no HTTP request
    scans_failed: int = 0  # metadata fetch error or missing grid params
    metadata_written: int = 0  # new metadata.json files created
    mosaics_downloaded: int = 0
    mosaics_failed: int = 0  # mosaic URL attempted but 0 bytes / HTTP error
    tiles_downloaded: int = 0
    scans_probe_skipped: int = 0  # probe tile was 404 or placeholder; full tile pool skipped
    scans_disk_space_skipped: int = 0  # disk_space_mb == 0; no mosaic or tiles attempted

    def merge(self, other: "RunStats") -> None:
        """Add every counter of *other* onto the matching counter of *self*.

        The dataclass fields are iterated instead of being listed by hand, so
        a counter added to the class later is merged automatically rather than
        being silently dropped. *other* is left unchanged.
        """
        for counter in fields(self):
            name = counter.name
            setattr(self, name, getattr(self, name) + getattr(other, name))
from tqdm import tqdm
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Per-scan helpers
# ---------------------------------------------------------------------------
@dataclass
class MosaicAttempt:
    """Result of one attempt to obtain a scan mosaic.

    The values feed the scans.csv row and the RunStats counters. Field order
    matters: instances are built positionally.
    """

    # True only when the mosaic file was fetched during this call.
    downloaded: bool
    # Status text for scans.csv, e.g. "already_done", "dry_run", "downloaded"
    # or "failed".
    csv_status: str
    # Error message of a failed download; empty string otherwise.
    error: str
    # Error code of a failed download as a string; empty string otherwise.
    error_code: str
    # Error classification of a failed download; empty string otherwise.
    error_class: str
def _download_mosaic(
    sess: MachineSession,
    scan_meta: dict[str, Any],
    scan_id: int,
    mosaic_path: Path,
    progress: ProgressTracker,
    machine: dict[str, Any],
    config: dict[str, Any],
    dry_run: bool,
) -> MosaicAttempt:
    """Fetch the mosaic of one scan unless progress already lists it as done.

    Args:
        sess: Session used to build the mosaic URL and download the file.
        scan_meta: Scan metadata, passed through to the EXIF writer.
        scan_id: Identifier of the scan.
        mosaic_path: Destination file for the mosaic.
        progress: Tracker consulted before, and updated after, the download.
        machine: Machine record; ``machine["label"]`` is used for logging and
            for the ``machine_metadata`` lookup in *config*.
        config: Run configuration; reads ``write_exif`` (default True) and
            ``machine_metadata``.
        dry_run: When True, only log what would be downloaded.

    Returns:
        A MosaicAttempt with csv_status "already_done", "dry_run",
        "downloaded" or "failed".
    """
    url = sess.mosaic_url(scan_id)

    if progress.is_done(url):
        return MosaicAttempt(False, "already_done", "", "", "")

    if dry_run:
        # NOTE(review): the format string puts no separator between the URL
        # and the destination path; confirm whether that is intended.
        log.info("[DRY-RUN] Mosaic: %s%s", url, mosaic_path)
        return MosaicAttempt(False, "dry_run", "", "", "")

    label = machine["label"]
    log.info("[%s] Downloading mosaic for scan %d …", label, scan_id)
    res = sess.download_file(url, mosaic_path)

    if not res.ok:
        return MosaicAttempt(
            False,
            "failed",
            res.error or "",
            error_code_str(res.status_code),
            res.error_class,
        )

    if config.get("write_exif", True):
        machine_meta: dict[str, Any] | None = config.get(
            "machine_metadata", {}
        ).get(label)
        write_mosaic_exif(mosaic_path, scan_meta, machine, scan_id, machine_meta)

    # Record success only after the EXIF step has run.
    progress.mark_done(url)
    progress.save()
    log.info(
        "[%s] Mosaic saved: %s (%.1f MB)",
        label,
        mosaic_path,
        res.size / 1e6,
    )
    return MosaicAttempt(True, "downloaded", "", "", "")
def _download_tiles_for_scan(
    sess: MachineSession,
    tiles: list[dict[str, Any]],
    scan_meta: dict[str, Any],
    scan_id: int,
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    dry_run: bool,
) -> int:
    """Fetch every tile of a scan that progress does not yet list as done.

    Args:
        sess: Session whose ``download_tile`` performs each download.
        tiles: Tile records; each has a ``"url"`` key. Pending records get a
            ``"scan_time"`` key attached in place.
        scan_meta: Scan metadata (source of ``scan_time`` and of tile paths).
        scan_id: Identifier of the scan, used for logging.
        output_dir: Root directory under which tile paths are built.
        machine: Machine record; ``machine["label"]`` is used for logging.
        config: Run configuration; ``config["workers"]`` sets the pool size.
        progress: Tracker of completed URLs.
        tiles_csv: Writer that receives one row per download result.
        dry_run: When True, log a preview of pending tiles and download nothing.

    Returns:
        Number of tiles whose result status was "downloaded" (0 in dry-run).
    """
    # A tile can be on disk without being recorded (e.g. a crash between the
    # file write and the batched progress save). Recording it now keeps it out
    # of the pending list and so avoids duplicate tiles.csv rows.
    recovered = 0
    for tile in tiles:
        if progress.is_done(tile["url"]):
            continue
        on_disk = tile_dest(output_dir, machine, scan_meta, tile)
        if on_disk.exists() and on_disk.stat().st_size > 0:
            progress.mark_done(tile["url"])
            recovered += 1
    if recovered:
        log.debug(
            "[%s] Scan %d: healed %d tile(s) already on disk into progress.",
            machine["label"],
            scan_id,
            recovered,
        )
        progress.save()

    pending = [tile for tile in tiles if not progress.is_done(tile["url"])]
    log.info(
        "[%s] Scan %d: %d tiles total, %d pending.",
        machine["label"],
        scan_id,
        len(tiles),
        len(pending),
    )

    if dry_run:
        preview_limit = 5
        for tile in pending[:preview_limit]:
            log.info("[DRY-RUN] Tile: %s", tile["url"])
        overflow = len(pending) - preview_limit
        if overflow > 0:
            log.info("[DRY-RUN] … and %d more tiles.", overflow)
        return 0

    # The CSV rows carry the scan time, so attach it to each pending tile.
    scan_time = scan_meta.get("scan_time", "")
    for tile in pending:
        tile["scan_time"] = scan_time

    workers: int = config["workers"]
    fetched = 0
    unsaved_rows: list[dict[str, Any]] = []

    def flush() -> None:
        """Write buffered result rows to tiles.csv and persist progress."""
        for row in unsaved_rows:
            tiles_csv.write(row)
        progress.save()
        unsaved_rows.clear()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = []
        for tile in pending:
            target = tile_dest(output_dir, machine, scan_meta, tile)
            futures.append(pool.submit(sess.download_tile, tile, target, False))

        # Persist in batches rather than after every tile.
        flush_threshold = max(50, workers * 4)
        with tqdm(
            total=len(pending),
            desc=f"{machine['label']} scan {scan_id}",
            unit="tile",
            leave=True,
        ) as bar:
            for finished in as_completed(futures):
                outcome = finished.result()
                unsaved_rows.append(outcome)
                if outcome.get("status") == "downloaded":
                    progress.mark_done(outcome["url"])
                    fetched += 1
                bar.update(1)
                if len(unsaved_rows) >= flush_threshold:
                    flush()
        # Final partial batch (also saves progress when the batch is empty).
        flush()

    log.info(
        "[%s] Scan %d complete: %d tiles downloaded.",
        machine["label"],
        scan_id,
        fetched,
    )
    return fetched
# ---------------------------------------------------------------------------
# Per-scan driver
# ---------------------------------------------------------------------------
def _reports_zero_disk_space(scan_meta: dict[str, Any]) -> bool:
    """Return True when ``scan_meta["disk_space_mb"]`` is present and equals zero.

    Missing, ``None``, empty-string, boolean and unparseable values all return
    False. A numeric ``0`` / ``0.0`` is recognised as zero: those values are
    falsy, so an ``value or "nan"`` fallback would turn them into NaN and the
    zero check could never fire.
    """
    raw = scan_meta.get("disk_space_mb")
    if raw is None or raw == "" or isinstance(raw, bool):
        return False
    try:
        return float(raw) == 0.0
    except (ValueError, TypeError):
        return False


def process_scan(
    sess: MachineSession,
    scan: dict[str, Any],
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    scans_csv: CsvWriter,
    tiles_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    max_tiles: int | None = None,
    scans_csv_existing_ids: set[int] | None = None,
) -> RunStats:
    """
    Process one scan: fetch metadata, download mosaic and (optionally) tiles.

    Args:
        sess: Session used for metadata, mosaic and tile requests.
        scan: List-level scan record; must contain ``"scan_id"``. Its other
            fields only fill keys missing from the detail-page metadata.
        output_dir: Root of the on-disk output tree.
        machine: Machine record (``"label"`` and ``"machine_id"`` are read).
        config: Run configuration, passed through to the download helpers.
        progress: Tracker of completed URLs.
        scans_csv: Writer for the scan-level CSV row.
        tiles_csv: Writer for tile-level CSV rows.
        dry_run: When True, nothing is written under *output_dir* and nothing
            is downloaded; the scans.csv row is still written.
        mosaic_only: When True, stop after the mosaic (no tiles).
        metadata_only: When True, write metadata.json and the scans.csv row
            but skip both the mosaic and the tiles.
        max_tiles: Optional cap on the number of tiles considered.
        scans_csv_existing_ids: Scan ids that already have a row in scans.csv;
            no further row is written for them (ignored in metadata-only mode).

    Returns:
        A RunStats with counters for what happened this call.
    """
    scan_id: int = scan["scan_id"]
    stats = RunStats()

    # In metadata-only mode, skip the HTTP fetch if metadata.json already exists.
    # Try the date-hinted path first; fall back to a glob when scan_time is
    # absent (e.g. when --scan-id is used and the synthetic scan dict has no
    # scan_time field).
    if metadata_only and not dry_run:
        machine_root = output_dir / machine_dir_name(machine)
        scan_date_hint = _extract_date(scan.get("scan_time", ""))
        found_meta: Path | None = None
        if scan_date_hint and scan_date_hint != "unknown":
            candidate = machine_root / scan_date_hint / str(scan_id) / "metadata.json"
            if candidate.exists():
                found_meta = candidate
            # Date hint is reliable — don't glob if candidate wasn't found.
        else:
            matches = list(machine_root.glob(f"*/{scan_id}/metadata.json"))
            if matches:
                found_meta = matches[0]
        if found_meta is not None:
            log.debug(
                "[%s] Scan %d: metadata.json already exists, skipping fetch.",
                machine["label"],
                scan_id,
            )
            stats.scans_skipped += 1
            return stats

    log.info("[%s] Processing scan %d …", machine["label"], scan_id)
    try:
        scan_meta = sess.get_scan_metadata(scan_id)
    except Exception as exc:
        # Deliberately broad: one unreadable scan must not abort the machine.
        log.error(
            "[%s] Cannot fetch scan %d metadata: %s", machine["label"], scan_id, exc
        )
        stats.scans_failed += 1
        return stats

    if not scan_meta.get("nx") or not scan_meta.get("ny"):
        log.warning(
            "[%s] Scan %d: missing grid params, skipping.",
            machine["label"],
            scan_id,
        )
        stats.scans_failed += 1
        return stats
    stats.scans_fetched += 1

    # Merge list-level metadata into scan_meta (detail page takes precedence:
    # setdefault only fills keys the detail page did not provide).
    for k in (
        "name",
        "scan_time",
        "start_datetime",
        "end_datetime",
        "status",
        "user",
        "scan_lines",
        "scan_mode",
    ):
        scan_meta.setdefault(k, scan.get(k, ""))

    # disk_space_mb == 0 is a reliable signal that the scan has no imagery.
    # A 300-scan investigation (50 per bucket) found 0% viability in this bucket.
    # Skip the mosaic and tile downloads entirely; write a record so scans.csv
    # stays complete.
    disk_space_skip = not metadata_only and _reports_zero_disk_space(scan_meta)
    if disk_space_skip:
        log.info(
            "[%s] Scan %d: disk_space_mb=0 — skipping mosaic and tiles.",
            machine["label"],
            scan_id,
        )

    # Save per-scan metadata.json (never overwrites an existing file).
    scan_date = _extract_date(scan_meta.get("scan_time", ""))
    scan_dir = output_dir / machine_dir_name(machine) / scan_date / str(scan_id)
    if not dry_run:
        scan_dir.mkdir(parents=True, exist_ok=True)
        meta_file = scan_dir / "metadata.json"
        if not meta_file.exists():
            meta_file.write_text(
                json.dumps(scan_meta, indent=2, default=str), encoding="utf-8"
            )
            stats.metadata_written += 1

    # Mosaic (skipped entirely in metadata-only or disk_space_skip mode).
    mosaic_path = mosaic_dest(output_dir, machine, scan_meta, scan_id)
    mosaic_url = sess.mosaic_url(scan_id)
    # Captured BEFORE the download attempt: a successful download marks the
    # URL done, and the CSV-row decision below needs the prior state.
    mosaic_already_done = progress.is_done(mosaic_url)
    if metadata_only or disk_space_skip:
        mosaic_attempt: MosaicAttempt | None = None
    else:
        mosaic_attempt = _download_mosaic(
            sess,
            scan_meta,
            scan_id,
            mosaic_path,
            progress,
            machine,
            config,
            dry_run,
        )

    if not metadata_only and mosaic_attempt:
        if mosaic_attempt.downloaded:
            stats.mosaics_downloaded += 1
        elif not dry_run and mosaic_attempt.csv_status == "failed":
            stats.mosaics_failed += 1

    # Mosaic columns of the scans.csv row: status, error, error code, error class.
    if metadata_only:
        mds, mer, mco, mcl = "skipped_metadata_only", "", "", ""
    elif disk_space_skip:
        mds, mer, mco, mcl = "skipped_zero_disk_space", "", "", ""
        stats.scans_disk_space_skipped += 1
    elif mosaic_attempt is not None:
        mds = mosaic_attempt.csv_status
        mer = mosaic_attempt.error
        mco = mosaic_attempt.error_code
        mcl = mosaic_attempt.error_class
    else:
        mds, mer, mco, mcl = "", "", "", ""

    # Write scan-level CSV row only if this scan hasn't been recorded before.
    # Skip if: (1) mosaic URL already in .progress.json, or (2) scan already
    # has a non-pending row in scans.csv from a prior run.
    already_recorded = (mosaic_already_done and not metadata_only) or (
        not metadata_only
        and scans_csv_existing_ids is not None
        and scan_id in scans_csv_existing_ids
    )
    if already_recorded:
        log.debug(
            "[%s] Scan %d: already in scans.csv, skipping CSV row.",
            machine["label"],
            scan_id,
        )
    else:
        scans_csv.write(
            {
                "machine": machine["label"],
                "machine_id": machine["machine_id"],
                "scan_id": scan_id,
                "name": scan_meta.get("name", ""),
                "scan_time": scan_meta.get("scan_time", ""),
                "start_x": scan_meta.get("start_x", ""),
                "start_y": scan_meta.get("start_y", ""),
                "end_x": scan_meta.get("end_x", ""),
                "end_y": scan_meta.get("end_y", ""),
                "dx": scan_meta.get("dx", ""),
                "dy": scan_meta.get("dy", ""),
                "nx": scan_meta.get("nx", ""),
                "ny": scan_meta.get("ny", ""),
                "total_tiles": scan_meta.get("total_tiles", ""),
                "scan_lines": scan_meta.get("scan_lines", ""),
                "scan_mode": scan_meta.get("scan_mode", ""),
                "start_datetime": scan_meta.get("start_datetime", ""),
                "end_datetime": scan_meta.get("end_datetime", ""),
                "status": scan_meta.get("status", ""),
                "user": scan_meta.get("user", ""),
                "disk_space_mb": scan_meta.get("disk_space_mb", ""),
                "mosaic_url": mosaic_url,
                "mosaic_local_path": str(mosaic_path),
                "mosaic_on_disk": mosaic_path.exists(),
                "mosaic_download_status": mds,
                "mosaic_error": mer,
                "mosaic_error_code": mco,
                "mosaic_error_class": mcl,
            }
        )

    if mosaic_only or metadata_only or disk_space_skip:
        return stats

    # Tiles
    tiles = sess.enumerate_tiles(scan_meta)
    if max_tiles is not None:
        tiles = tiles[:max_tiles]

    # Tile probe: always download one tile before launching the full thread
    # pool. Two failure modes justify this:
    #   1. Mosaic failed (404/410 or empty body) — scan was set up but never
    #      run; tile grid is all placeholders or 404s.
    #   2. Mosaic succeeded but tiles are server-side placeholders (1x1 JPEG,
    #      ~43 B) — mosaic was generated from empty data; downloading the full
    #      grid would fire thousands of guaranteed-placeholder requests.
    if (
        not dry_run
        and tiles
        and not progress.is_done(tiles[0]["url"])
    ):
        probe_tile = tiles[0]
        probe_dest = tile_dest(output_dir, machine, scan_meta, probe_tile)
        probe_res = sess.download_file(probe_tile["url"], probe_dest)
        if not probe_res.ok or _is_placeholder_tile(probe_dest):
            # Do not leave a placeholder or partial file behind.
            probe_dest.unlink(missing_ok=True)
            detail = (
                "is placeholder"
                if probe_res.ok
                else f"failed ({probe_res.error_class or probe_res.error or 'unknown'})"
            )
            log.info(
                "[%s] Scan %d: probe tile %s — empty/placeholder scan, skipping %d tile(s).",
                machine["label"],
                scan_id,
                detail,
                len(tiles),
            )
            stats.scans_probe_skipped += 1
            return stats

    stats.tiles_downloaded += _download_tiles_for_scan(
        sess,
        tiles,
        scan_meta,
        scan_id,
        output_dir,
        machine,
        config,
        progress,
        tiles_csv,
        dry_run,
    )
    return stats
# ---------------------------------------------------------------------------
# Per-machine driver
# ---------------------------------------------------------------------------
# Login retry policy: total attempts and pause (seconds) between them.
_LOGIN_ATTEMPTS = 3
_LOGIN_RETRY_DELAY_S = 10

# scans.csv mosaic statuses that do NOT count as "already processed":
#   - "skipped_metadata_only": the row came from a metadata-only pass, so the
#     mosaic still has to be attempted;
#   - "dry_run": the row was written by a dry run, nothing was downloaded;
#   - "": no status recorded.
_PENDING_MOSAIC_STATUSES = frozenset({"skipped_metadata_only", "dry_run", ""})


def _login_with_retry(
    sess: MachineSession,
    machine: dict[str, Any],
    attempts: int = _LOGIN_ATTEMPTS,
    delay_s: int = _LOGIN_RETRY_DELAY_S,
) -> bool:
    """Call ``sess.login()`` up to *attempts* times; return True on first success.

    Sleeps *delay_s* seconds between failed attempts (not after the last one).
    """
    for attempt in range(1, attempts + 1):
        if sess.login():
            return True
        if attempt < attempts:
            log.warning(
                "[%s] Login failed (attempt %d/%d) — retrying in %ds.",
                machine["label"],
                attempt,
                attempts,
                delay_s,
            )
            time.sleep(delay_s)
    return False


def _processed_scan_ids(csv_path: Path, machine_label: str) -> set[int]:
    """Return scan ids of *machine_label* that have a definitive row in *csv_path*.

    A row is definitive when its ``mosaic_download_status`` is not one of
    ``_PENDING_MOSAIC_STATUSES``. A missing file yields an empty set. Rows
    whose ``scan_id`` is absent or not an integer are skipped instead of
    aborting the whole machine.
    """
    import csv

    processed: set[int] = set()
    if not csv_path.exists():
        return processed
    with open(csv_path, newline="", encoding="utf-8") as fh:
        for row in csv.DictReader(fh):
            if row.get("machine") != machine_label:
                continue
            # A short (truncated) row reads back as None; treat it as pending.
            status = row.get("mosaic_download_status") or ""
            if status in _PENDING_MOSAIC_STATUSES:
                continue
            try:
                processed.add(int(row["scan_id"]))
            except (KeyError, TypeError, ValueError):
                log.warning(
                    "[%s] Ignoring scans.csv row with unusable scan_id: %r",
                    machine_label,
                    row.get("scan_id"),
                )
    return processed


def scrape_machine(
    machine: dict[str, Any],
    config: dict[str, Any],
    output_dir: Path,
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    scans_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    scan_id_filter: int | None = None,
    max_tiles: int | None = None,
) -> RunStats:
    """Login, fetch scans, and download all content for one machine.

    Args:
        machine: Machine record; ``machine["label"]`` identifies it in logs
            and in scans.csv.
        config: Run configuration, passed to the session and to process_scan.
        output_dir: Root of the on-disk output tree.
        progress: Tracker of completed URLs.
        tiles_csv: Writer for tile-level CSV rows.
        scans_csv: Writer for scan-level CSV rows; the file behind it is also
            read to find scans handled by a prior run.
        dry_run: Passed through to process_scan.
        mosaic_only: Passed through to process_scan.
        metadata_only: Passed through to process_scan; also disables the
            "already processed" skip.
        scan_id_filter: When given, only this scan id is processed and the
            scan list is not fetched.
        max_tiles: Passed through to process_scan.

    Returns:
        Merged RunStats of every processed scan; an empty RunStats when login
        fails or no scans are found.
    """
    sess = MachineSession(machine, config)
    if not _login_with_retry(sess, machine):
        log.error(
            "[%s] Login failed after %d attempts — skipping machine.",
            machine["label"],
            _LOGIN_ATTEMPTS,
        )
        return RunStats()

    if scan_id_filter is not None:
        # Synthetic single-entry scan list; it carries no scan_time.
        scans: list[dict[str, Any]] = [
            {"scan_id": scan_id_filter, "status": "Completed"}
        ]
        log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
    else:
        scans = sess.get_all_scans()
    if not scans:
        log.warning("[%s] No scans found.", machine["label"])
        return RunStats()

    # Build a set of scan_ids already fully processed in a prior run so we can
    # skip them entirely (no metadata fetch, no mosaic request).
    # Only scans with a definitive non-pending status count; skipped_metadata_only
    # rows still need to be processed in mosaic mode.
    # NOTE(review): a scan whose row says the mosaic was handled is skipped
    # here even if its tiles were only partly downloaded — confirm intended.
    existing_ids: set[int] = set()
    if not metadata_only and scans_csv._fh.name:
        existing_ids = _processed_scan_ids(
            Path(scans_csv._fh.name), machine["label"]
        )

    stats = RunStats()
    for scan in scans:
        # Skip scans already fully processed in a prior run — avoids redundant
        # metadata fetches and mosaic requests for known-failed / known-done scans.
        if not metadata_only and scan["scan_id"] in existing_ids:
            log.debug(
                "[%s] Scan %d: already processed, skipping.",
                machine["label"],
                scan["scan_id"],
            )
            continue
        stats.merge(
            process_scan(
                sess=sess,
                scan=scan,
                output_dir=output_dir,
                machine=machine,
                config=config,
                progress=progress,
                scans_csv=scans_csv,
                tiles_csv=tiles_csv,
                dry_run=dry_run,
                mosaic_only=mosaic_only,
                metadata_only=metadata_only,
                max_tiles=max_tiles,
                scans_csv_existing_ids=existing_ids,
            )
        )
    return stats