6390f5d529
Consolidates mosaic and session hardening (login retry, skip processed scans, no retry on 404, started_at), progress reporting (Markdown tables, by-year rollup, rolling-window rate/ETA), and metadata workflow scripts (run_metadata_scan.sh, scan_progress_report.py, export_machine_metadata.py). Adds mosaic reconstruction sample JPEGs referenced by the report. Updates .gitignore for backup/ and .claude/; sample_random_scans helper is documented for branch testing/sample-runs only (see README).
575 lines
20 KiB
Python
575 lines
20 KiB
Python
"""
|
||
High-level scrape orchestration: drives the per-machine and per-scan loops.
|
||
"""
|
||
|
||
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any

from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
|
||
|
||
# Empty grid cells come back from RootView as ~43-byte 1×1 JPEG placeholders.
# The threshold sits far below the smallest real tile observed in production
# samples (~7 KiB), so genuine imagery is never mistaken for a placeholder.
PLACEHOLDER_MAX_BYTES = 200


def _is_placeholder_tile(path: Path) -> bool:
    """Report whether *path* is a regular file no larger than a placeholder.

    Missing paths, non-files, and any OSError while inspecting the file all
    yield False.
    """
    try:
        if not path.is_file():
            return False
        size_bytes = path.stat().st_size
    except OSError:
        return False
    return size_bytes <= PLACEHOLDER_MAX_BYTES
|
||
|
||
|
||
@dataclass
class RunStats:
    """Accumulated counters for one or more machines.

    Every field is an integer counter. ``merge`` folds another instance into
    this one, so per-scan results can be summed into per-machine and per-run
    totals.
    """

    scans_fetched: int = 0  # scan detail page fetched (metadata), not tiles/mosaics
    scans_skipped: int = 0  # metadata.json already on disk; no HTTP request
    scans_failed: int = 0  # metadata fetch error or missing grid params
    metadata_written: int = 0  # new metadata.json files created
    mosaics_downloaded: int = 0
    mosaics_failed: int = 0  # mosaic URL attempted but 0 bytes / HTTP error
    tiles_downloaded: int = 0
    scans_probe_skipped: int = 0  # probe tile was 404 or placeholder; full tile pool skipped
    scans_disk_space_skipped: int = 0  # disk_space_mb == 0; no mosaic or tiles attempted

    def merge(self, other: "RunStats") -> None:
        """Add every counter of *other* into this instance, in place.

        Iterates the dataclass fields instead of naming each counter, so a
        newly added counter is aggregated automatically. *other* is not
        modified.
        """
        for counter in fields(self):
            name = counter.name
            setattr(self, name, getattr(self, name) + getattr(other, name))
|
||
|
||
from tqdm import tqdm
|
||
|
||
from spruce.exif import write_mosaic_exif
|
||
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
|
||
from spruce.progress import ProgressTracker, CsvWriter
|
||
from spruce.session import MachineSession
|
||
|
||
# Module-level logger used by the scrape helpers and drivers in this module.
log = logging.getLogger(__name__)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-scan helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@dataclass
class MosaicAttempt:
    """Result of one attempt to fetch a scan mosaic.

    process_scan copies these values into the mosaic_* columns of scans.csv
    and uses ``downloaded`` / ``csv_status`` to update RunStats.
    """

    downloaded: bool  # True only when this call fetched and saved the mosaic
    csv_status: str  # value written to scans.csv "mosaic_download_status"
    error: str  # error text from the download result; "" when not failed
    error_code: str  # error_code_str(status_code) on failure; "" otherwise
    error_class: str  # error classification from the download result on failure
|
||
|
||
|
||
def _download_mosaic(
    sess: MachineSession,
    scan_meta: dict[str, Any],
    scan_id: int,
    mosaic_path: Path,
    progress: ProgressTracker,
    machine: dict[str, Any],
    config: dict[str, Any],
    dry_run: bool,
) -> MosaicAttempt:
    """Fetch the scan's mosaic into *mosaic_path* unless progress already has it.

    Outcomes (reported via ``MosaicAttempt.csv_status``):
      * "already_done" - the mosaic URL is already marked done; no request.
      * "dry_run"      - only the intended download is logged.
      * "downloaded"   - file saved; EXIF written when ``config["write_exif"]``
                         is truthy (default True); URL marked done and
                         progress saved.
      * "failed"       - the download result was not ok; error details are
                         copied from it.
    """
    mosaic_url = sess.mosaic_url(scan_id)

    if progress.is_done(mosaic_url):
        return MosaicAttempt(False, "already_done", "", "", "")

    if dry_run:
        log.info("[DRY-RUN] Mosaic: %s → %s", mosaic_url, mosaic_path)
        return MosaicAttempt(False, "dry_run", "", "", "")

    log.info("[%s] Downloading mosaic for scan %d …", machine["label"], scan_id)
    result = sess.download_file(mosaic_url, mosaic_path)

    if not result.ok:
        return MosaicAttempt(
            downloaded=False,
            csv_status="failed",
            error=result.error or "",
            error_code=error_code_str(result.status_code),
            error_class=result.error_class,
        )

    if config.get("write_exif", True):
        # Optional per-machine metadata, keyed by machine label.
        per_machine_meta: dict[str, Any] | None = config.get(
            "machine_metadata", {}
        ).get(machine["label"])
        write_mosaic_exif(mosaic_path, scan_meta, machine, scan_id, per_machine_meta)

    progress.mark_done(mosaic_url)
    progress.save()
    size_mb = result.size / 1e6
    log.info(
        "[%s] Mosaic saved: %s (%.1f MB)",
        machine["label"],
        mosaic_path,
        size_mb,
    )
    return MosaicAttempt(True, "downloaded", "", "", "")
|
||
|
||
|
||
def _flush_tile_batch(
    batch: list[dict[str, Any]],
    tiles_csv: CsvWriter,
    progress: ProgressTracker,
) -> None:
    """Write buffered tile result rows to tiles.csv, save progress, and empty *batch*."""
    for row in batch:
        tiles_csv.write(row)
    progress.save()
    batch.clear()


def _download_tiles_for_scan(
    sess: MachineSession,
    tiles: list[dict[str, Any]],
    scan_meta: dict[str, Any],
    scan_id: int,
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    dry_run: bool,
) -> int:
    """Download all pending tiles for a scan. Returns count of tiles downloaded.

    Steps:
      1. Heal: tiles not marked done in progress but already on disk with a
         non-zero size (e.g. a crash between the file write and the batched
         progress save) are marked done without a tiles.csv row, so they are
         neither re-downloaded nor recorded twice. This step, including its
         progress save, also runs in dry-run mode.
      2. Dry run: log up to five pending tile URLs and return 0.
      3. Otherwise download pending tiles on ``config["workers"]`` threads.
         Each result dict is appended to tiles.csv; results are checkpointed
         (CSV rows + progress save) every ``max(50, workers * 4)`` results and
         once more at the end.

    Returns:
        Number of tile results whose ``status`` was "downloaded".
    """
    healed = 0
    for t in tiles:
        if progress.is_done(t["url"]):
            continue
        dest = tile_dest(output_dir, machine, scan_meta, t)
        if dest.exists() and dest.stat().st_size > 0:
            progress.mark_done(t["url"])
            healed += 1
    if healed:
        log.debug(
            "[%s] Scan %d: healed %d tile(s) already on disk into progress.",
            machine["label"],
            scan_id,
            healed,
        )
        progress.save()

    pending = [t for t in tiles if not progress.is_done(t["url"])]
    log.info(
        "[%s] Scan %d: %d tiles total, %d pending.",
        machine["label"],
        scan_id,
        len(tiles),
        len(pending),
    )

    if dry_run:
        for t in pending[:5]:
            log.info("[DRY-RUN] Tile: %s", t["url"])
        if len(pending) > 5:
            log.info("[DRY-RUN] … and %d more tiles.", len(pending) - 5)
        return 0

    # Attach scan_time so it lands in each tile's CSV row.
    scan_time = scan_meta.get("scan_time", "")
    for t in pending:
        t["scan_time"] = scan_time

    workers: int = config["workers"]
    save_every = max(50, workers * 4)
    downloaded = 0
    batch: list[dict[str, Any]] = []

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(
                sess.download_tile,
                tile,
                tile_dest(output_dir, machine, scan_meta, tile),
                False,
            )
            for tile in pending
        ]
        try:
            with tqdm(
                total=len(pending),
                desc=f"{machine['label']} scan {scan_id}",
                unit="tile",
                leave=True,
            ) as pbar:
                for future in as_completed(futures):
                    result = future.result()
                    batch.append(result)
                    if result.get("status") == "downloaded":
                        progress.mark_done(result["url"])
                        downloaded += 1
                    pbar.update(1)
                    if len(batch) >= save_every:
                        _flush_tile_batch(batch, tiles_csv, progress)
        finally:
            # Runs on success and when a download raises: rows for tiles that
            # already finished are written instead of being lost. Otherwise the
            # next run would heal them into progress with no tiles.csv row.
            _flush_tile_batch(batch, tiles_csv, progress)

    log.info(
        "[%s] Scan %d complete: %d tiles downloaded.",
        machine["label"],
        scan_id,
        downloaded,
    )
    return downloaded
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-scan driver
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _reports_zero_disk_space(scan_meta: dict[str, Any]) -> bool:
    """Return True when ``scan_meta["disk_space_mb"]`` parses as exactly zero.

    Numbers are accepted as well as numeric strings, so an int/float ``0`` is
    recognised just like ``"0"``. Missing, empty, boolean, or unparseable
    values return False, and the scan is then processed normally.
    """
    raw = scan_meta.get("disk_space_mb")
    if raw is None or isinstance(raw, bool) or raw == "":
        return False
    try:
        return float(raw) == 0.0
    except (ValueError, TypeError):
        return False


def process_scan(
    sess: MachineSession,
    scan: dict[str, Any],
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    scans_csv: CsvWriter,
    tiles_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    max_tiles: int | None = None,
    scans_csv_existing_ids: set[int] | None = None,
) -> RunStats:
    """
    Process one scan: fetch metadata, download mosaic and (optionally) tiles.

    Returns a RunStats with counters for what happened this call.

    If metadata_only is True, writes metadata.json and the scans.csv row but
    skips both the mosaic and the tiles. In that mode a scan whose
    metadata.json is already on disk is skipped before any HTTP request.

    Args:
        scan: List-level scan dict; must contain "scan_id". Its other fields
            fill gaps in the detail-page metadata.
        mosaic_only: Stop after the mosaic (no tiles).
        max_tiles: If given, only the first ``max_tiles`` enumerated tiles are
            considered.
        scans_csv_existing_ids: Scan IDs that already have a finished
            scans.csv row; no new row is written for them.
    """
    scan_id: int = scan["scan_id"]
    stats = RunStats()

    # In metadata-only mode, skip the HTTP fetch if metadata.json already exists.
    # The date-hinted path is tried first; the glob fallback covers scan dicts
    # with no usable scan_time (e.g. the synthetic dict built for --scan-id).
    if metadata_only and not dry_run:
        machine_root = output_dir / machine_dir_name(machine)
        scan_date_hint = _extract_date(scan.get("scan_time", ""))
        found_meta: Path | None = None
        if scan_date_hint and scan_date_hint != "unknown":
            candidate = machine_root / scan_date_hint / str(scan_id) / "metadata.json"
            if candidate.exists():
                found_meta = candidate
            # Date hint is reliable — don't glob if candidate wasn't found.
        else:
            matches = list(machine_root.glob(f"*/{scan_id}/metadata.json"))
            if matches:
                found_meta = matches[0]
        if found_meta is not None:
            log.debug(
                "[%s] Scan %d: metadata.json already exists, skipping fetch.",
                machine["label"],
                scan_id,
            )
            stats.scans_skipped += 1
            return stats

    log.info("[%s] Processing scan %d …", machine["label"], scan_id)

    try:
        scan_meta = sess.get_scan_metadata(scan_id)
    except Exception as exc:
        # Any fetch error counts as one failed scan; the caller moves on to
        # the next scan.
        log.error(
            "[%s] Cannot fetch scan %d metadata: %s", machine["label"], scan_id, exc
        )
        stats.scans_failed += 1
        return stats

    if not scan_meta.get("nx") or not scan_meta.get("ny"):
        log.warning(
            "[%s] Scan %d: missing grid params, skipping.",
            machine["label"],
            scan_id,
        )
        stats.scans_failed += 1
        return stats

    stats.scans_fetched += 1

    # Merge list-level metadata into scan_meta (detail page takes precedence).
    for key in (
        "name",
        "scan_time",
        "start_datetime",
        "end_datetime",
        "status",
        "user",
        "scan_lines",
        "scan_mode",
    ):
        scan_meta.setdefault(key, scan.get(key, ""))

    # disk_space_mb == 0 is a reliable signal that the scan has no imagery.
    # A 300-scan investigation (50 per bucket) found 0% viability in this bucket.
    # Skip the mosaic and tile downloads entirely; write a record so scans.csv
    # stays complete.
    disk_space_skip = False
    if not metadata_only and _reports_zero_disk_space(scan_meta):
        disk_space_skip = True
        log.info(
            "[%s] Scan %d: disk_space_mb=0 — skipping mosaic and tiles.",
            machine["label"],
            scan_id,
        )

    # Save per-scan metadata.json (never overwrites an existing file).
    scan_date = _extract_date(scan_meta.get("scan_time", ""))
    scan_dir = output_dir / machine_dir_name(machine) / scan_date / str(scan_id)
    if not dry_run:
        scan_dir.mkdir(parents=True, exist_ok=True)
        meta_file = scan_dir / "metadata.json"
        if not meta_file.exists():
            meta_file.write_text(
                json.dumps(scan_meta, indent=2, default=str), encoding="utf-8"
            )
            stats.metadata_written += 1

    # Mosaic (skipped entirely in metadata-only or disk_space_skip mode).
    # mosaic_already_done is captured before the attempt so the CSV logic
    # below knows whether this scan was recorded by an earlier run.
    mosaic_path = mosaic_dest(output_dir, machine, scan_meta, scan_id)
    mosaic_url = sess.mosaic_url(scan_id)
    mosaic_already_done = progress.is_done(mosaic_url)
    mosaic_attempt: MosaicAttempt | None = None
    if not (metadata_only or disk_space_skip):
        mosaic_attempt = _download_mosaic(
            sess,
            scan_meta,
            scan_id,
            mosaic_path,
            progress,
            machine,
            config,
            dry_run,
        )
        if mosaic_attempt.downloaded:
            stats.mosaics_downloaded += 1
        elif not dry_run and mosaic_attempt.csv_status == "failed":
            stats.mosaics_failed += 1

    if metadata_only:
        mds, mer, mco, mcl = "skipped_metadata_only", "", "", ""
    elif disk_space_skip:
        mds, mer, mco, mcl = "skipped_zero_disk_space", "", "", ""
        stats.scans_disk_space_skipped += 1
    elif mosaic_attempt is not None:
        mds = mosaic_attempt.csv_status
        mer = mosaic_attempt.error
        mco = mosaic_attempt.error_code
        mcl = mosaic_attempt.error_class
    else:
        mds, mer, mco, mcl = "", "", "", ""

    # Write scan-level CSV row only if this scan hasn't been recorded before.
    # Skip if: (1) mosaic URL already in .progress.json, or (2) scan already
    # has a non-pending row in scans.csv from a prior run. Metadata-only runs
    # always write a row.
    already_recorded = not metadata_only and (
        mosaic_already_done
        or (
            scans_csv_existing_ids is not None
            and scan_id in scans_csv_existing_ids
        )
    )
    if already_recorded:
        log.debug(
            "[%s] Scan %d: already in scans.csv, skipping CSV row.",
            machine["label"],
            scan_id,
        )
    else:
        scans_csv.write(
            {
                "machine": machine["label"],
                "machine_id": machine["machine_id"],
                "scan_id": scan_id,
                "name": scan_meta.get("name", ""),
                "scan_time": scan_meta.get("scan_time", ""),
                "start_x": scan_meta.get("start_x", ""),
                "start_y": scan_meta.get("start_y", ""),
                "end_x": scan_meta.get("end_x", ""),
                "end_y": scan_meta.get("end_y", ""),
                "dx": scan_meta.get("dx", ""),
                "dy": scan_meta.get("dy", ""),
                "nx": scan_meta.get("nx", ""),
                "ny": scan_meta.get("ny", ""),
                "total_tiles": scan_meta.get("total_tiles", ""),
                "scan_lines": scan_meta.get("scan_lines", ""),
                "scan_mode": scan_meta.get("scan_mode", ""),
                "start_datetime": scan_meta.get("start_datetime", ""),
                "end_datetime": scan_meta.get("end_datetime", ""),
                "status": scan_meta.get("status", ""),
                "user": scan_meta.get("user", ""),
                "disk_space_mb": scan_meta.get("disk_space_mb", ""),
                "mosaic_url": mosaic_url,
                "mosaic_local_path": str(mosaic_path),
                "mosaic_on_disk": mosaic_path.exists(),
                "mosaic_download_status": mds,
                "mosaic_error": mer,
                "mosaic_error_code": mco,
                "mosaic_error_class": mcl,
            }
        )

    if mosaic_only or metadata_only or disk_space_skip:
        return stats

    # Tiles
    tiles = sess.enumerate_tiles(scan_meta)
    if max_tiles is not None:
        tiles = tiles[:max_tiles]

    # Tile probe: always download one tile before launching the full thread
    # pool. Two failure modes justify this:
    #   1. Mosaic failed (404/410 or empty body) — scan was set up but never
    #      run; tile grid is all placeholders or 404s.
    #   2. Mosaic succeeded but tiles are server-side placeholders (1x1 JPEG,
    #      ~43 B) — mosaic was generated from empty data; downloading the full
    #      grid would fire thousands of guaranteed-placeholder requests.
    # NOTE(review): a successful probe leaves tile 0 on disk without marking it
    # done; _download_tiles_for_scan then heals it into progress without a
    # tiles.csv row, so tile 0 appears to be missing from tiles.csv. Confirm
    # whether that is intended.
    if not dry_run and tiles and not progress.is_done(tiles[0]["url"]):
        probe_tile = tiles[0]
        probe_dest = tile_dest(output_dir, machine, scan_meta, probe_tile)
        probe_res = sess.download_file(probe_tile["url"], probe_dest)
        if not probe_res.ok or _is_placeholder_tile(probe_dest):
            probe_dest.unlink(missing_ok=True)
            detail = (
                "is placeholder"
                if probe_res.ok
                else f"failed ({probe_res.error_class or probe_res.error or 'unknown'})"
            )
            log.info(
                "[%s] Scan %d: probe tile %s — empty/placeholder scan, skipping %d tile(s).",
                machine["label"],
                scan_id,
                detail,
                len(tiles),
            )
            stats.scans_probe_skipped += 1
            return stats

    stats.tiles_downloaded += _download_tiles_for_scan(
        sess,
        tiles,
        scan_meta,
        scan_id,
        output_dir,
        machine,
        config,
        progress,
        tiles_csv,
        dry_run,
    )
    return stats
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-machine driver
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
# scans.csv mosaic_download_status values that do NOT mean a scan is finished:
# metadata-only rows still need a mosaic pass, "dry_run" rows record that no
# download was made, and "" means no mosaic status was recorded.
_PENDING_SCAN_STATUSES = frozenset({"skipped_metadata_only", "dry_run", ""})


def _login_with_retry(sess: MachineSession, label: str) -> bool:
    """Call ``sess.login()`` up to 3 times, sleeping 10 s between failures.

    Returns True as soon as one attempt succeeds, False after 3 failures.
    """
    for attempt in range(1, 4):
        if sess.login():
            return True
        if attempt < 3:
            log.warning(
                "[%s] Login failed (attempt %d/3) — retrying in 10s.",
                label,
                attempt,
            )
            time.sleep(10)
    return False


def _load_processed_scan_ids(csv_path: Path, label: str) -> set[int]:
    """Return IDs of *label*'s scans that have a finished row in scans.csv.

    Rows whose mosaic_download_status is pending (see _PENDING_SCAN_STATUSES)
    are ignored. Rows with a missing or non-integer scan_id are skipped with a
    warning instead of aborting the run. A missing file yields an empty set.
    """
    import csv

    processed: set[int] = set()
    if not csv_path.exists():
        return processed
    with open(csv_path, newline="", encoding="utf-8") as fh:
        for row in csv.DictReader(fh):
            if row.get("machine") != label:
                continue
            if row.get("mosaic_download_status", "") in _PENDING_SCAN_STATUSES:
                continue
            try:
                processed.add(int(row["scan_id"]))
            except (KeyError, TypeError, ValueError):
                log.warning(
                    "[%s] Ignoring scans.csv row with invalid scan_id %r.",
                    label,
                    row.get("scan_id"),
                )
    return processed


def scrape_machine(
    machine: dict[str, Any],
    config: dict[str, Any],
    output_dir: Path,
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    scans_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    scan_id_filter: int | None = None,
    max_tiles: int | None = None,
) -> RunStats:
    """Login, fetch scans, and download all content for one machine.

    Steps:
      1. Log in (3 attempts, 10 s apart). On failure, return empty stats.
      2. Build the scan list: a single synthetic entry for *scan_id_filter*,
         or every scan reported by the session.
      3. Outside metadata-only mode, read scans.csv and skip scans that
         already have a finished row, so known-done / known-failed scans cost
         no HTTP requests.
      4. Run process_scan for each remaining scan and sum the RunStats.
    """
    sess = MachineSession(machine, config)
    label = machine["label"]

    if not _login_with_retry(sess, label):
        log.error("[%s] Login failed after 3 attempts — skipping machine.", label)
        return RunStats()

    if scan_id_filter is not None:
        scans: list[dict[str, Any]] = [
            {"scan_id": scan_id_filter, "status": "Completed"}
        ]
        log.info("[%s] Targeting scan ID %d.", label, scan_id_filter)
    else:
        scans = sess.get_all_scans()
    if not scans:
        log.warning("[%s] No scans found.", label)
        return RunStats()

    # Scan IDs fully processed by a prior run (metadata-only rows and dry-run
    # rows still count as pending and are processed again).
    existing_ids: set[int] = set()
    if not metadata_only and scans_csv._fh.name:
        existing_ids = _load_processed_scan_ids(Path(scans_csv._fh.name), label)

    stats = RunStats()
    for scan in scans:
        if not metadata_only and scan["scan_id"] in existing_ids:
            log.debug(
                "[%s] Scan %d: already processed, skipping.",
                label,
                scan["scan_id"],
            )
            continue
        stats.merge(
            process_scan(
                sess=sess,
                scan=scan,
                output_dir=output_dir,
                machine=machine,
                config=config,
                progress=progress,
                scans_csv=scans_csv,
                tiles_csv=tiles_csv,
                dry_run=dry_run,
                mosaic_only=mosaic_only,
                metadata_only=metadata_only,
                max_tiles=max_tiles,
                scans_csv_existing_ids=existing_ids,
            )
        )
    return stats
|