""" High-level scrape orchestration: drives the per-machine and per-scan loops. """ import json import logging from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from pathlib import Path from typing import Any @dataclass class RunStats: """Accumulated counters for one or more machines.""" scans_fetched: int = 0 # scan detail page fetched (metadata), not tiles/mosaics scans_skipped: int = 0 # metadata.json already on disk; no HTTP request scans_failed: int = 0 # metadata fetch error or missing grid params metadata_written: int = 0 # new metadata.json files created mosaics_downloaded: int = 0 mosaics_failed: int = 0 # mosaic URL attempted but 0 bytes / HTTP error tiles_downloaded: int = 0 def merge(self, other: "RunStats") -> None: self.scans_fetched += other.scans_fetched self.scans_skipped += other.scans_skipped self.scans_failed += other.scans_failed self.metadata_written += other.metadata_written self.mosaics_downloaded += other.mosaics_downloaded self.mosaics_failed += other.mosaics_failed self.tiles_downloaded += other.tiles_downloaded from tqdm import tqdm from spruce.exif import write_mosaic_exif from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date from spruce.progress import ProgressTracker, CsvWriter from spruce.session import MachineSession log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Per-scan helpers # --------------------------------------------------------------------------- def _download_mosaic( sess: MachineSession, scan_meta: dict[str, Any], scan_id: int, mosaic_path: Path, progress: ProgressTracker, machine: dict[str, Any], config: dict[str, Any], dry_run: bool, ) -> bool: """Download the scan mosaic if not already done. Returns True if downloaded.""" url = sess.mosaic_url(scan_id) if progress.is_done(url): return False if dry_run: log.info("[DRY-RUN] Mosaic: %s → %s", url, mosaic_path) return False log.info("[%s] Downloading mosaic for scan %d …", machine["label"], scan_id) size = sess.download_file(url, mosaic_path) if size: if config.get("write_exif", True): mmeta: dict[str, Any] | None = config.get("machine_metadata", {}).get( machine["label"] ) write_mosaic_exif( mosaic_path, scan_meta, machine, scan_id, mmeta ) progress.mark_done(url) progress.save() log.info( "[%s] Mosaic saved: %s (%.1f MB)", machine["label"], mosaic_path, size / 1e6, ) return True return False def _download_tiles_for_scan( sess: MachineSession, tiles: list[dict[str, Any]], scan_meta: dict[str, Any], scan_id: int, output_dir: Path, machine: dict[str, Any], config: dict[str, Any], progress: ProgressTracker, tiles_csv: CsvWriter, dry_run: bool, ) -> int: """Download all pending tiles for a scan. Returns count of tiles downloaded.""" # Heal progress for tiles that exist on disk but weren't recorded (e.g. # crash between write and batch save). Prevents duplicate tiles.csv rows. healed = 0 for t in tiles: if not progress.is_done(t["url"]): dest = tile_dest(output_dir, machine, scan_meta, t) if dest.exists() and dest.stat().st_size > 0: progress.mark_done(t["url"]) healed += 1 if healed: log.debug( "[%s] Scan %d: healed %d tile(s) already on disk into progress.", machine["label"], scan_id, healed, ) progress.save() pending = [t for t in tiles if not progress.is_done(t["url"])] log.info( "[%s] Scan %d: %d tiles total, %d pending.", machine["label"], scan_id, len(tiles), len(pending), ) if dry_run: for t in pending[:5]: log.info("[DRY-RUN] Tile: %s", t["url"]) if len(pending) > 5: log.info("[DRY-RUN] … and %d more tiles.", len(pending) - 5) return 0 # Attach scan_time for CSV rows for t in pending: t["scan_time"] = scan_meta.get("scan_time", "") workers: int = config["workers"] downloaded = 0 with ThreadPoolExecutor(max_workers=workers) as pool: futures = { pool.submit( sess.download_tile, tile, tile_dest(output_dir, machine, scan_meta, tile), False, ): tile for tile in pending } save_every = max(50, workers * 4) batch: list[dict[str, Any]] = [] with tqdm( total=len(pending), desc=f"{machine['label']} scan {scan_id}", unit="tile", leave=True, ) as pbar: for future in as_completed(futures): result = future.result() if result.get("file_size_bytes"): batch.append(result) progress.mark_done(result["url"]) downloaded += 1 pbar.update(1) if len(batch) >= save_every: for row in batch: tiles_csv.write(row) progress.save() batch.clear() for row in batch: tiles_csv.write(row) progress.save() log.info( "[%s] Scan %d complete: %d tiles downloaded.", machine["label"], scan_id, downloaded, ) return downloaded # --------------------------------------------------------------------------- # Per-scan driver # --------------------------------------------------------------------------- def process_scan( sess: MachineSession, scan: dict[str, Any], output_dir: Path, machine: dict[str, Any], config: dict[str, Any], progress: ProgressTracker, scans_csv: CsvWriter, tiles_csv: CsvWriter, dry_run: bool, mosaic_only: bool, metadata_only: bool = False, ) -> RunStats: """ Process one scan: fetch metadata, download mosaic and (optionally) tiles. Returns a RunStats with counters for what happened this call. If metadata_only is True, writes metadata.json and the scans.csv row but skips both the mosaic and the tiles. """ scan_id: int = scan["scan_id"] stats = RunStats() # In metadata-only mode, skip the HTTP fetch if metadata.json already exists. # Try the date-hinted path first; fall back to a glob when scan_time is # absent (e.g. when --scan-id is used and the synthetic scan dict has no # scan_time field). if metadata_only and not dry_run: machine_root = output_dir / machine_dir_name(machine) scan_date_hint = _extract_date(scan.get("scan_time", "")) found_meta: Path | None = None if scan_date_hint and scan_date_hint != "unknown": candidate = machine_root / scan_date_hint / str(scan_id) / "metadata.json" if candidate.exists(): found_meta = candidate if found_meta is None: matches = list(machine_root.glob(f"*/{scan_id}/metadata.json")) if matches: found_meta = matches[0] if found_meta is not None: log.debug( "[%s] Scan %d: metadata.json already exists, skipping fetch.", machine["label"], scan_id, ) stats.scans_skipped += 1 return stats log.info("[%s] Processing scan %d …", machine["label"], scan_id) try: scan_meta = sess.get_scan_metadata(scan_id) except Exception as exc: log.error( "[%s] Cannot fetch scan %d metadata: %s", machine["label"], scan_id, exc ) stats.scans_failed += 1 return stats if not scan_meta.get("nx") or not scan_meta.get("ny"): log.warning( "[%s] Scan %d: missing grid params, skipping.", machine["label"], scan_id, ) stats.scans_failed += 1 return stats stats.scans_fetched += 1 # Merge list-level metadata into scan_meta (detail page takes precedence) for k in ( "name", "scan_time", "start_datetime", "end_datetime", "status", "user", "scan_lines", "scan_mode", ): scan_meta.setdefault(k, scan.get(k, "")) # Save per-scan metadata.json scan_date = _extract_date(scan_meta.get("scan_time", "")) scan_dir = output_dir / machine_dir_name(machine) / scan_date / str(scan_id) if not dry_run: scan_dir.mkdir(parents=True, exist_ok=True) meta_file = scan_dir / "metadata.json" if not meta_file.exists(): meta_file.write_text( json.dumps(scan_meta, indent=2, default=str), encoding="utf-8" ) stats.metadata_written += 1 # Mosaic (skipped entirely in metadata-only mode) mosaic_path = mosaic_dest(output_dir, machine, scan_meta, scan_id) mosaic_url = sess.mosaic_url(scan_id) mosaic_already_done = progress.is_done(mosaic_url) if metadata_only: mosaic_just_downloaded = False else: mosaic_just_downloaded = _download_mosaic( sess, scan_meta, scan_id, mosaic_path, progress, machine, config, dry_run, ) if not metadata_only and mosaic_just_downloaded: stats.mosaics_downloaded += 1 elif ( not metadata_only and not dry_run and not mosaic_already_done and not mosaic_just_downloaded ): stats.mosaics_failed += 1 # Write scan-level CSV row only if this scan hasn't been recorded before. if mosaic_already_done and not metadata_only: log.debug( "[%s] Scan %d: already in scans.csv (mosaic was previously downloaded), skipping CSV row.", machine["label"], scan_id, ) else: scans_csv.write( { "machine": machine["label"], "machine_id": machine["machine_id"], "scan_id": scan_id, "name": scan_meta.get("name", ""), "scan_time": scan_meta.get("scan_time", ""), "start_x": scan_meta.get("start_x", ""), "start_y": scan_meta.get("start_y", ""), "end_x": scan_meta.get("end_x", ""), "end_y": scan_meta.get("end_y", ""), "dx": scan_meta.get("dx", ""), "dy": scan_meta.get("dy", ""), "nx": scan_meta.get("nx", ""), "ny": scan_meta.get("ny", ""), "total_tiles": scan_meta.get("total_tiles", ""), "scan_lines": scan_meta.get("scan_lines", ""), "scan_mode": scan_meta.get("scan_mode", ""), "start_datetime": scan_meta.get("start_datetime", ""), "end_datetime": scan_meta.get("end_datetime", ""), "status": scan_meta.get("status", ""), "user": scan_meta.get("user", ""), "disk_space_mb": scan_meta.get("disk_space_mb", ""), "mosaic_url": mosaic_url, "mosaic_local_path": str(mosaic_path), "mosaic_on_disk": mosaic_path.exists(), } ) if mosaic_only or metadata_only: return stats # Tiles tiles = sess.enumerate_tiles(scan_meta) stats.tiles_downloaded += _download_tiles_for_scan( sess, tiles, scan_meta, scan_id, output_dir, machine, config, progress, tiles_csv, dry_run, ) return stats # --------------------------------------------------------------------------- # Per-machine driver # --------------------------------------------------------------------------- def scrape_machine( machine: dict[str, Any], config: dict[str, Any], output_dir: Path, progress: ProgressTracker, tiles_csv: CsvWriter, scans_csv: CsvWriter, dry_run: bool, mosaic_only: bool, metadata_only: bool = False, scan_id_filter: int | None = None, ) -> RunStats: """Login, fetch scans, and download all content for one machine.""" sess = MachineSession(machine, config) if not sess.login(): return RunStats() if scan_id_filter is not None: scans: list[dict[str, Any]] = [ {"scan_id": scan_id_filter, "status": "Completed"} ] log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter) else: scans = sess.get_all_scans() if not scans: log.warning("[%s] No scans found.", machine["label"]) return RunStats() stats = RunStats() for scan in scans: stats.merge(process_scan( sess=sess, scan=scan, output_dir=output_dir, machine=machine, config=config, progress=progress, scans_csv=scans_csv, tiles_csv=tiles_csv, dry_run=dry_run, mosaic_only=mosaic_only, metadata_only=metadata_only, )) return stats