"""
High-level scrape orchestration: drives the per-machine and per-scan loops.
"""

import csv
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from tqdm import tqdm

from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession

# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
# below smallest observed real tile (~7 KiB in production samples).
PLACEHOLDER_MAX_BYTES = 200


def _is_placeholder_tile(path: Path) -> bool:
    """Return True if a downloaded tile looks like a 1×1 server placeholder.

    A tile counts as a placeholder when it is a regular file of at most
    ``PLACEHOLDER_MAX_BYTES`` bytes.  Any ``OSError`` while inspecting the
    file is treated as "not a placeholder".
    """
    try:
        return path.is_file() and path.stat().st_size <= PLACEHOLDER_MAX_BYTES
    except OSError:
        return False


@dataclass
class RunStats:
    """Accumulated counters for one or more machines."""

    scans_fetched: int = 0  # scan detail page fetched (metadata), not tiles/mosaics
    scans_skipped: int = 0  # metadata.json already on disk; no HTTP request
    scans_failed: int = 0  # metadata fetch error or missing grid params
    metadata_written: int = 0  # new metadata.json files created
    mosaics_downloaded: int = 0
    mosaics_failed: int = 0  # mosaic URL attempted but 0 bytes / HTTP error
    tiles_downloaded: int = 0
    scans_probe_skipped: int = 0  # probe tile was 404 or placeholder; full tile pool skipped
    scans_disk_space_skipped: int = 0  # disk_space_mb == 0; no mosaic or tiles attempted

    def merge(self, other: "RunStats") -> None:
        """Add every counter of *other* into this instance (in place)."""
        self.scans_fetched += other.scans_fetched
        self.scans_skipped += other.scans_skipped
        self.scans_failed += other.scans_failed
        self.metadata_written += other.metadata_written
        self.mosaics_downloaded += other.mosaics_downloaded
        self.mosaics_failed += other.mosaics_failed
        self.tiles_downloaded += other.tiles_downloaded
        self.scans_probe_skipped += other.scans_probe_skipped
        self.scans_disk_space_skipped += other.scans_disk_space_skipped


log = logging.getLogger(__name__)


def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
    """Read scans.csv and keep one row per (machine, scan_id); last row wins.

    Returns an empty dict when the file does not exist.
    """
    latest: dict[tuple[str, str], dict[str, str]] = {}
    if not scans_csv_path.exists():
        return latest
    with open(scans_csv_path, newline="", encoding="utf-8") as fh:
        for row in csv.DictReader(fh):
            key = (row.get("machine", ""), row.get("scan_id", ""))
            latest[key] = row
    return latest


def _row_scan_id(row: dict[str, Any]) -> int | None:
    """Return the row's ``scan_id`` as an int, or None if missing or malformed.

    ``csv.DictReader`` fills absent trailing fields with ``None``, so a
    truncated row can surface here as a ``TypeError`` rather than a
    ``ValueError``; both are treated as "no usable id".
    """
    try:
        return int(row["scan_id"])
    except (KeyError, TypeError, ValueError):
        return None


def _has_zero_disk_space(scan_meta: dict[str, Any]) -> bool:
    """Return True when ``scan_meta["disk_space_mb"]`` is present and equals 0.

    Missing, empty, or unparseable values return False.  The value is checked
    against ``None``/``""`` explicitly instead of using an ``or`` fallback:
    a numeric ``0`` is falsy, so a truthiness-based default would mask exactly
    the value this check is looking for.
    """
    raw = scan_meta.get("disk_space_mb")
    if raw is None or raw == "":
        return False
    try:
        return float(raw) == 0.0
    except (ValueError, TypeError):
        return False


def load_failed_scans_from_csv(
    scans_csv_path: Path,
    machine_label: str,
    *,
    since_year: str | None = None,
    error_code: str | None = None,
) -> list[dict[str, Any]]:
    """
    Dedupe scans.csv by (machine, scan_id); return failed mosaic rows for one machine.

    Each dict is suitable as the ``scan`` argument to ``process_scan``
    (scan_id, scan_time, name, status).

    Args:
        scans_csv_path: Location of scans.csv.
        machine_label: Only rows whose ``machine`` column equals this are kept.
        since_year: If given, keep only rows whose ``scan_time`` starts with a
            4-character year that compares ``>=`` this string.
        error_code: If given, keep only rows whose ``mosaic_error_code``
            equals this string.

    Returns:
        Matching scans sorted by ascending ``scan_id``.  Rows whose
        ``scan_id`` cannot be parsed as an integer are skipped with a warning.
    """
    latest = _read_scans_csv_latest(scans_csv_path)
    out: list[dict[str, Any]] = []
    for row in latest.values():
        if row.get("machine") != machine_label:
            continue
        if row.get("mosaic_download_status") != "failed":
            continue
        if error_code is not None and row.get("mosaic_error_code", "") != error_code:
            continue
        st = row.get("scan_time", "") or ""
        if since_year is not None:
            yr = st[:4]
            if len(yr) < 4 or yr < since_year:
                continue
        sid = _row_scan_id(row)
        if sid is None:
            # A damaged row must not abort the whole retry run.
            log.warning(
                "[%s] Ignoring scans.csv row with invalid scan_id %r.",
                machine_label,
                row.get("scan_id"),
            )
            continue
        out.append(
            {
                "scan_id": sid,
                "scan_time": st,
                "name": row.get("name", ""),
                "status": row.get("status", "") or "Completed",
                "user": row.get("user", ""),
                "scan_lines": row.get("scan_lines", ""),
                "scan_mode": row.get("scan_mode", ""),
            }
        )
    out.sort(key=lambda s: s["scan_id"])
    return out


# ---------------------------------------------------------------------------
# Per-scan helpers
# ---------------------------------------------------------------------------


@dataclass
class MosaicAttempt:
    """Outcome of a mosaic download attempt (for scans.csv and RunStats)."""

    downloaded: bool
    csv_status: str
    error: str
    error_code: str
    error_class: str


def _download_mosaic(
    sess: MachineSession,
    scan_meta: dict[str, Any],
    scan_id: int,
    mosaic_path: Path,
    progress: ProgressTracker,
    machine: dict[str, Any],
    config: dict[str, Any],
    dry_run: bool,
) -> MosaicAttempt:
    """Download the scan mosaic if not already done.

    Returns a ``MosaicAttempt`` whose ``csv_status`` is one of
    ``"already_done"`` (URL already recorded in *progress*), ``"dry_run"``,
    ``"downloaded"`` or ``"failed"``.  On success the URL is marked done and
    progress is saved; EXIF is written first unless ``config["write_exif"]``
    is falsy.
    """
    url = sess.mosaic_url(scan_id)
    if progress.is_done(url):
        return MosaicAttempt(False, "already_done", "", "", "")

    if dry_run:
        log.info("[DRY-RUN] Mosaic: %s → %s", url, mosaic_path)
        return MosaicAttempt(False, "dry_run", "", "", "")

    log.info("[%s] Downloading mosaic for scan %d …", machine["label"], scan_id)
    res = sess.download_file(url, mosaic_path)
    if res.ok:
        if config.get("write_exif", True):
            mmeta: dict[str, Any] | None = config.get("machine_metadata", {}).get(
                machine["label"]
            )
            write_mosaic_exif(mosaic_path, scan_meta, machine, scan_id, mmeta)
        progress.mark_done(url)
        progress.save()
        log.info(
            "[%s] Mosaic saved: %s (%.1f MB)",
            machine["label"],
            mosaic_path,
            res.size / 1e6,
        )
        return MosaicAttempt(True, "downloaded", "", "", "")

    # Coalesce both error fields so MosaicAttempt's ``str`` fields never
    # carry None into scans.csv.
    return MosaicAttempt(
        False,
        "failed",
        res.error or "",
        error_code_str(res.status_code),
        res.error_class or "",
    )


def _download_tiles_for_scan(
    sess: MachineSession,
    tiles: list[dict[str, Any]],
    scan_meta: dict[str, Any],
    scan_id: int,
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    dry_run: bool,
) -> int:
    """Download all pending tiles for a scan. Returns count of tiles downloaded.

    Tiles are fetched through a thread pool of ``config["workers"]`` workers.
    Result rows are buffered and flushed to *tiles_csv* (followed by a
    progress save) every ``max(50, workers * 4)`` completions and once more at
    the end.  In dry-run mode nothing is downloaded and 0 is returned.

    Note: each pending tile dict is mutated to carry a ``scan_time`` key.
    """
    # Heal progress for tiles that exist on disk but weren't recorded (e.g.
    # crash between write and batch save). Prevents duplicate tiles.csv rows.
    healed = 0
    for t in tiles:
        if not progress.is_done(t["url"]):
            dest = tile_dest(output_dir, machine, scan_meta, t)
            if dest.exists() and dest.stat().st_size > 0:
                progress.mark_done(t["url"])
                healed += 1
    if healed:
        log.debug(
            "[%s] Scan %d: healed %d tile(s) already on disk into progress.",
            machine["label"],
            scan_id,
            healed,
        )
        progress.save()

    pending = [t for t in tiles if not progress.is_done(t["url"])]
    log.info(
        "[%s] Scan %d: %d tiles total, %d pending.",
        machine["label"],
        scan_id,
        len(tiles),
        len(pending),
    )

    if dry_run:
        for t in pending[:5]:
            log.info("[DRY-RUN] Tile: %s", t["url"])
        if len(pending) > 5:
            log.info("[DRY-RUN] … and %d more tiles.", len(pending) - 5)
        return 0

    # Attach scan_time for CSV rows
    for t in pending:
        t["scan_time"] = scan_meta.get("scan_time", "")

    workers: int = config["workers"]
    downloaded = 0

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {
            pool.submit(
                sess.download_tile,
                tile,
                tile_dest(output_dir, machine, scan_meta, tile),
                False,
            ): tile
            for tile in pending
        }
        save_every = max(50, workers * 4)
        batch: list[dict[str, Any]] = []

        with tqdm(
            total=len(pending),
            desc=f"{machine['label']} scan {scan_id}",
            unit="tile",
            leave=True,
        ) as pbar:
            for future in as_completed(futures):
                result = future.result()
                batch.append(result)
                if result.get("status") == "downloaded":
                    progress.mark_done(result["url"])
                    downloaded += 1
                # The bar tracks completed futures, not only successes.
                pbar.update(1)

                if len(batch) >= save_every:
                    for row in batch:
                        tiles_csv.write(row)
                    progress.save()
                    batch.clear()

    # Flush whatever is left from the final partial batch.
    for row in batch:
        tiles_csv.write(row)
    progress.save()

    log.info(
        "[%s] Scan %d complete: %d tiles downloaded.",
        machine["label"],
        scan_id,
        downloaded,
    )
    return downloaded


# ---------------------------------------------------------------------------
# Per-scan driver
# ---------------------------------------------------------------------------


def process_scan(
    sess: MachineSession,
    scan: dict[str, Any],
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    scans_csv: CsvWriter,
    tiles_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    max_tiles: int | None = None,
    scans_csv_existing_ids: set[int] | None = None,
) -> RunStats:
    """
    Process one scan: fetch metadata, download mosaic and (optionally) tiles.

    Returns a RunStats with counters for what happened this call.

    If metadata_only is True, writes metadata.json and the scans.csv row but
    skips both the mosaic and the tiles.

    Other early exits, each reflected in the returned counters:
      * metadata.json already on disk in metadata-only mode (``scans_skipped``);
      * metadata fetch raised, or ``nx``/``ny`` missing (``scans_failed``);
      * ``disk_space_mb`` equals zero (``scans_disk_space_skipped``);
      * probe tile failed or is a placeholder (``scans_probe_skipped``).
    """
    scan_id: int = scan["scan_id"]
    stats = RunStats()

    # In metadata-only mode, skip the HTTP fetch if metadata.json already exists.
    # Try the date-hinted path first; fall back to a glob when scan_time is
    # absent (e.g. when --scan-id is used and the synthetic scan dict has no
    # scan_time field).
    if metadata_only and not dry_run:
        machine_root = output_dir / machine_dir_name(machine)
        scan_date_hint = _extract_date(scan.get("scan_time", ""))
        found_meta: Path | None = None
        if scan_date_hint and scan_date_hint != "unknown":
            candidate = machine_root / scan_date_hint / str(scan_id) / "metadata.json"
            if candidate.exists():
                found_meta = candidate
            # Date hint is reliable — don't glob if candidate wasn't found.
        else:
            found_meta = next(machine_root.glob(f"*/{scan_id}/metadata.json"), None)
        if found_meta is not None:
            log.debug(
                "[%s] Scan %d: metadata.json already exists, skipping fetch.",
                machine["label"],
                scan_id,
            )
            stats.scans_skipped += 1
            return stats

    log.info("[%s] Processing scan %d …", machine["label"], scan_id)

    try:
        scan_meta = sess.get_scan_metadata(scan_id)
    except Exception as exc:
        # Per-scan boundary: one bad scan must not abort the machine loop.
        log.error(
            "[%s] Cannot fetch scan %d metadata: %s", machine["label"], scan_id, exc
        )
        stats.scans_failed += 1
        return stats

    if not scan_meta.get("nx") or not scan_meta.get("ny"):
        log.warning(
            "[%s] Scan %d: missing grid params, skipping.",
            machine["label"],
            scan_id,
        )
        stats.scans_failed += 1
        return stats

    stats.scans_fetched += 1

    # Merge list-level metadata into scan_meta (detail page takes precedence)
    for k in (
        "name",
        "scan_time",
        "start_datetime",
        "end_datetime",
        "status",
        "user",
        "scan_lines",
        "scan_mode",
    ):
        scan_meta.setdefault(k, scan.get(k, ""))

    # disk_space_mb == 0 is a reliable signal that the scan has no imagery.
    # A 300-scan investigation (50 per bucket) found 0% viability in this bucket.
    # Skip the mosaic and tile downloads entirely; write a record so scans.csv
    # stays complete.
    disk_space_skip = not metadata_only and _has_zero_disk_space(scan_meta)
    if disk_space_skip:
        log.info(
            "[%s] Scan %d: disk_space_mb=0 — skipping mosaic and tiles.",
            machine["label"],
            scan_id,
        )

    # Save per-scan metadata.json
    scan_date = _extract_date(scan_meta.get("scan_time", ""))
    scan_dir = output_dir / machine_dir_name(machine) / scan_date / str(scan_id)
    if not dry_run:
        scan_dir.mkdir(parents=True, exist_ok=True)
        meta_file = scan_dir / "metadata.json"
        if not meta_file.exists():
            meta_file.write_text(
                json.dumps(scan_meta, indent=2, default=str), encoding="utf-8"
            )
            stats.metadata_written += 1

    # Mosaic (skipped entirely in metadata-only or disk_space_skip mode)
    mosaic_path = mosaic_dest(output_dir, machine, scan_meta, scan_id)
    mosaic_url = sess.mosaic_url(scan_id)
    # Captured BEFORE the download attempt: it describes prior runs only.
    mosaic_already_done = progress.is_done(mosaic_url)

    mosaic_attempt: MosaicAttempt | None = None
    if not (metadata_only or disk_space_skip):
        mosaic_attempt = _download_mosaic(
            sess,
            scan_meta,
            scan_id,
            mosaic_path,
            progress,
            machine,
            config,
            dry_run,
        )

    if mosaic_attempt is not None:
        if mosaic_attempt.downloaded:
            stats.mosaics_downloaded += 1
        elif not dry_run and mosaic_attempt.csv_status == "failed":
            stats.mosaics_failed += 1

    if metadata_only:
        mds, mer, mco, mcl = "skipped_metadata_only", "", "", ""
    elif disk_space_skip:
        mds, mer, mco, mcl = "skipped_zero_disk_space", "", "", ""
        stats.scans_disk_space_skipped += 1
    elif mosaic_attempt is not None:
        mds = mosaic_attempt.csv_status
        mer = mosaic_attempt.error
        mco = mosaic_attempt.error_code
        mcl = mosaic_attempt.error_class
    else:
        mds, mer, mco, mcl = "", "", "", ""

    # Write scan-level CSV row only if this scan hasn't been recorded before.
    # Skip if: (1) mosaic URL already in .progress.json, or (2) scan already
    # has a non-pending row in scans.csv from a prior run.
    already_recorded = (mosaic_already_done and not metadata_only) or (
        not metadata_only
        and scans_csv_existing_ids is not None
        and scan_id in scans_csv_existing_ids
    )
    if already_recorded:
        log.debug(
            "[%s] Scan %d: already in scans.csv, skipping CSV row.",
            machine["label"],
            scan_id,
        )
    else:
        scans_csv.write(
            {
                "machine": machine["label"],
                "machine_id": machine["machine_id"],
                "scan_id": scan_id,
                "name": scan_meta.get("name", ""),
                "scan_time": scan_meta.get("scan_time", ""),
                "start_x": scan_meta.get("start_x", ""),
                "start_y": scan_meta.get("start_y", ""),
                "end_x": scan_meta.get("end_x", ""),
                "end_y": scan_meta.get("end_y", ""),
                "dx": scan_meta.get("dx", ""),
                "dy": scan_meta.get("dy", ""),
                "nx": scan_meta.get("nx", ""),
                "ny": scan_meta.get("ny", ""),
                "total_tiles": scan_meta.get("total_tiles", ""),
                "scan_lines": scan_meta.get("scan_lines", ""),
                "scan_mode": scan_meta.get("scan_mode", ""),
                "start_datetime": scan_meta.get("start_datetime", ""),
                "end_datetime": scan_meta.get("end_datetime", ""),
                "status": scan_meta.get("status", ""),
                "user": scan_meta.get("user", ""),
                "disk_space_mb": scan_meta.get("disk_space_mb", ""),
                "mosaic_url": mosaic_url,
                "mosaic_local_path": str(mosaic_path),
                "mosaic_on_disk": mosaic_path.exists(),
                "mosaic_download_status": mds,
                "mosaic_error": mer,
                "mosaic_error_code": mco,
                "mosaic_error_class": mcl,
            }
        )

    if mosaic_only or metadata_only or disk_space_skip:
        return stats

    # Tiles
    tiles = sess.enumerate_tiles(scan_meta)
    if max_tiles is not None:
        tiles = tiles[:max_tiles]

    # Tile probe: always download one tile before launching the full thread
    # pool. Two failure modes justify this:
    #   1. Mosaic failed (404/410 or empty body) — scan was set up but never
    #      run; tile grid is all placeholders or 404s.
    #   2. Mosaic succeeded but tiles are server-side placeholders (1x1 JPEG,
    #      ~43 B) — mosaic was generated from empty data; downloading the full
    #      grid would fire thousands of guaranteed-placeholder requests.
    if not dry_run and tiles and not progress.is_done(tiles[0]["url"]):
        probe_tile = tiles[0]
        probe_dest = tile_dest(output_dir, machine, scan_meta, probe_tile)
        probe_res = sess.download_file(probe_tile["url"], probe_dest)
        if not probe_res.ok or _is_placeholder_tile(probe_dest):
            # Remove the useless file so the on-disk healing pass in
            # _download_tiles_for_scan cannot later record it as done.
            probe_dest.unlink(missing_ok=True)
            detail = (
                "is placeholder"
                if probe_res.ok
                else f"failed ({probe_res.error_class or probe_res.error or 'unknown'})"
            )
            log.info(
                "[%s] Scan %d: probe tile %s — empty/placeholder scan, skipping %d tile(s).",
                machine["label"],
                scan_id,
                detail,
                len(tiles),
            )
            stats.scans_probe_skipped += 1
            return stats

    stats.tiles_downloaded += _download_tiles_for_scan(
        sess,
        tiles,
        scan_meta,
        scan_id,
        output_dir,
        machine,
        config,
        progress,
        tiles_csv,
        dry_run,
    )
    return stats


# ---------------------------------------------------------------------------
# Per-machine driver
# ---------------------------------------------------------------------------


def scrape_machine(
    machine: dict[str, Any],
    config: dict[str, Any],
    output_dir: Path,
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    scans_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    scan_id_filter: int | None = None,
    max_tiles: int | None = None,
    retry_failed: bool = False,
    retry_since_year: str | None = None,
    retry_error_code: str | None = None,
) -> RunStats:
    """Login, fetch scans, and download all content for one machine.

    The scan list comes from one of three sources:
      * ``retry_failed``: failed mosaic rows in scans.csv (optionally narrowed
        by ``retry_since_year``, ``retry_error_code`` and ``scan_id_filter``);
      * ``scan_id_filter`` alone: a single synthetic scan dict;
      * otherwise: ``sess.get_all_scans()``.

    Returns the merged ``RunStats`` of every processed scan, or an empty
    ``RunStats`` when login fails three times or no scans are selected.
    """
    sess = MachineSession(machine, config)

    login_ok = False
    for attempt in range(1, 4):
        if sess.login():
            login_ok = True
            break
        if attempt < 3:
            log.warning(
                "[%s] Login failed (attempt %d/3) — retrying in 10s.",
                machine["label"],
                attempt,
            )
            time.sleep(10)
    if not login_ok:
        log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
        return RunStats()

    if retry_failed:
        scans = load_failed_scans_from_csv(
            scans_csv.path,
            machine["label"],
            since_year=retry_since_year,
            error_code=retry_error_code,
        )
        if scan_id_filter is not None:
            scans = [s for s in scans if s["scan_id"] == scan_id_filter]
            if not scans:
                log.warning(
                    "[%s] Retry: scan_id %d not among failed rows for this machine.",
                    machine["label"],
                    scan_id_filter,
                )
                return RunStats()
            log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter)
        elif not scans:
            log.warning(
                "[%s] No failed mosaic rows in scans.csv match retry filters.",
                machine["label"],
            )
            return RunStats()
        else:
            log.info(
                "[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
                machine["label"],
                len(scans),
            )
    elif scan_id_filter is not None:
        scans = [{"scan_id": scan_id_filter, "status": "Completed"}]
        log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
    else:
        scans = sess.get_all_scans()
        if not scans:
            log.warning("[%s] No scans found.", machine["label"])
            return RunStats()

    # Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP).
    # In normal mode: skip anything with a definitive non-pending status.
    # In retry mode: only skip scans that are already downloaded or skipped for
    # disk-space reasons — failed scans must be re-attempted.
    PENDING_STATUSES = {"skipped_metadata_only", ""}
    BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"}
    existing_ids: set[int] = set()
    if not metadata_only:
        latest_rows = _read_scans_csv_latest(scans_csv.path)
        for (_mlabel, _sid), _row in latest_rows.items():
            if _mlabel != machine["label"]:
                continue
            st = _row.get("mosaic_download_status", "")
            if retry_failed:
                blocks = st in BLOCK_AFTER_RETRY_STATUSES
            else:
                blocks = st not in PENDING_STATUSES
            if not blocks:
                continue
            sid = _row_scan_id(_row)
            if sid is None:
                # A damaged row must not abort the run; it simply blocks nothing.
                log.warning(
                    "[%s] Ignoring scans.csv row with invalid scan_id %r.",
                    machine["label"],
                    _sid,
                )
                continue
            existing_ids.add(sid)

    stats = RunStats()
    for scan in scans:
        # Skip scans already fully processed in a prior run — avoids redundant
        # metadata fetches and mosaic requests for known-failed / known-done scans.
        if not metadata_only and scan["scan_id"] in existing_ids:
            log.debug(
                "[%s] Scan %d: already processed, skipping.",
                machine["label"],
                scan["scan_id"],
            )
            continue
        stats.merge(
            process_scan(
                sess=sess,
                scan=scan,
                output_dir=output_dir,
                machine=machine,
                config=config,
                progress=progress,
                scans_csv=scans_csv,
                tiles_csv=tiles_csv,
                dry_run=dry_run,
                mosaic_only=mosaic_only,
                metadata_only=metadata_only,
                max_tiles=max_tiles,
                scans_csv_existing_ids=existing_ids,
            )
        )

    return stats