Files
SPRUCE-scraper/spruce/orchestrator.py
T

528 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
High-level scrape orchestration: drives the per-machine and per-scan loops.
"""
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any

from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
# Empty grid cells come back from RootView as ~43-byte 1×1 JPEG placeholders.
# The cut-off sits far below the smallest real tile observed in production
# samples (~7 KiB), so genuine imagery is never mistaken for a placeholder.
PLACEHOLDER_MAX_BYTES = 200


def _is_placeholder_tile(path: Path) -> bool:
    """Report whether *path* is a regular file small enough to be a placeholder.

    A missing path, a non-regular file (e.g. a directory), or any ``OSError``
    raised while inspecting the path yields ``False``.
    """
    try:
        if not path.is_file():
            return False
        size = path.stat().st_size
    except OSError:
        return False
    return size <= PLACEHOLDER_MAX_BYTES
@dataclass
class RunStats:
    """Accumulated counters for one or more machines.

    Every field is an ``int`` counter starting at zero. :meth:`merge` sums
    the counters field-by-field, so a new counter only has to be declared
    here to be rolled up correctly.
    """

    scans_fetched: int = 0  # scan detail page fetched (metadata), not tiles/mosaics
    scans_skipped: int = 0  # metadata.json already on disk; no HTTP request
    scans_failed: int = 0  # metadata fetch error or missing grid params
    metadata_written: int = 0  # new metadata.json files created
    mosaics_downloaded: int = 0
    mosaics_failed: int = 0  # mosaic URL attempted but 0 bytes / HTTP error
    tiles_downloaded: int = 0
    scans_probe_skipped: int = 0  # probe tile was 404 or placeholder; full tile pool skipped
    scans_disk_space_skipped: int = 0  # disk_space_mb == 0; no mosaic or tiles attempted

    def merge(self, other: "RunStats") -> None:
        """Add every counter of *other* into this instance, in place.

        Iterating the dataclass fields, rather than listing them by hand,
        means a counter added later can never be silently dropped when
        per-scan stats are rolled up into machine or run totals.
        *other* is not modified.
        """
        for f in fields(self):
            setattr(self, f.name, getattr(self, f.name) + getattr(other, f.name))
from tqdm import tqdm
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
# Module-level logger, named after this module's import path.
log: logging.Logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Per-scan helpers
# ---------------------------------------------------------------------------
@dataclass
class MosaicAttempt:
    """Result of one attempt to fetch a scan mosaic.

    Feeds both the scans.csv row and the RunStats mosaic counters. Instances
    are built positionally, so the field order below is part of the contract.
    """

    downloaded: bool  # True only when the mosaic was fetched during this attempt
    csv_status: str  # written to scans.csv as "mosaic_download_status"
    error: str  # error message; empty unless the attempt failed
    error_code: str  # stringified status/error code; empty unless the attempt failed
    error_class: str  # error category from the download layer; empty unless failed
def _download_mosaic(
    sess: MachineSession,
    scan_meta: dict[str, Any],
    scan_id: int,
    mosaic_path: Path,
    progress: ProgressTracker,
    machine: dict[str, Any],
    config: dict[str, Any],
    dry_run: bool,
) -> MosaicAttempt:
    """Download the mosaic image for one scan unless progress says it is done.

    Args:
        sess: Logged-in session used to build the mosaic URL and fetch it.
        scan_meta: Scan metadata; passed through to the EXIF writer.
        scan_id: Numeric scan identifier.
        mosaic_path: Destination file for the mosaic.
        progress: Consulted via ``is_done`` and, after a successful download,
            updated via ``mark_done`` + ``save``.
        machine: Machine config; ``machine["label"]`` prefixes log lines and
            selects the per-machine EXIF metadata.
        config: Run config. ``write_exif`` (default True) toggles EXIF tagging;
            ``machine_metadata`` optionally maps machine labels to EXIF extras.
        dry_run: When True, only log what would be downloaded.

    Returns:
        A :class:`MosaicAttempt` whose ``csv_status`` is one of
        ``"already_done"``, ``"dry_run"``, ``"downloaded"`` or ``"failed"``.
        The error fields are empty strings unless the download failed.
    """
    url = sess.mosaic_url(scan_id)
    if progress.is_done(url):
        return MosaicAttempt(False, "already_done", "", "", "")
    if dry_run:
        # Keep URL and destination visibly separated in the log line.
        log.info("[DRY-RUN] Mosaic: %s → %s", url, mosaic_path)
        return MosaicAttempt(False, "dry_run", "", "", "")

    log.info("[%s] Downloading mosaic for scan %d …", machine["label"], scan_id)
    res = sess.download_file(url, mosaic_path)
    if not res.ok:
        # Normalize error_class like error so the MosaicAttempt str fields
        # (and the scans.csv columns they feed) never carry None.
        return MosaicAttempt(
            False,
            "failed",
            res.error or "",
            error_code_str(res.status_code),
            res.error_class or "",
        )

    if config.get("write_exif", True):
        # ``or {}`` also tolerates an explicit ``machine_metadata: null``.
        machine_meta: dict[str, Any] | None = (
            config.get("machine_metadata") or {}
        ).get(machine["label"])
        write_mosaic_exif(mosaic_path, scan_meta, machine, scan_id, machine_meta)
    progress.mark_done(url)
    progress.save()
    log.info(
        "[%s] Mosaic saved: %s (%.1f MB)",
        machine["label"],
        mosaic_path,
        res.size / 1e6,
    )
    return MosaicAttempt(True, "downloaded", "", "", "")
def _heal_tiles_on_disk(
    tiles: list[dict[str, Any]],
    scan_meta: dict[str, Any],
    output_dir: Path,
    machine: dict[str, Any],
    progress: ProgressTracker,
) -> int:
    """Mark unrecorded tiles that already exist (non-empty) on disk as done.

    Covers a crash between a tile being written and the next batched
    ``progress.save()``. Without it those tiles would be fetched again and
    get duplicate tiles.csv rows. Returns the number of tiles marked.
    """
    healed = 0
    for tile in tiles:
        if progress.is_done(tile["url"]):
            continue
        dest = tile_dest(output_dir, machine, scan_meta, tile)
        if dest.exists() and dest.stat().st_size > 0:
            progress.mark_done(tile["url"])
            healed += 1
    return healed


def _flush_tile_rows(
    rows: list[dict[str, Any]], tiles_csv: CsvWriter, progress: ProgressTracker
) -> None:
    """Append buffered result rows to tiles.csv, persist progress, empty *rows*."""
    for row in rows:
        tiles_csv.write(row)
    progress.save()
    rows.clear()


def _download_tiles_for_scan(
    sess: MachineSession,
    tiles: list[dict[str, Any]],
    scan_meta: dict[str, Any],
    scan_id: int,
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    dry_run: bool,
) -> int:
    """Download every tile of a scan that progress does not yet mark as done.

    Tiles are fetched concurrently with ``config["workers"]`` threads. Each
    result row is appended to *tiles_csv*; rows whose ``status`` is
    ``"downloaded"`` are also marked done in *progress*. Rows and progress
    are flushed in batches and once more at the end, including when the loop
    is aborted by an exception or Ctrl-C.

    Note: each pending tile dict gets a ``"scan_time"`` key added in place.

    Returns:
        The number of tiles downloaded in this call. Tiles healed from disk
        are not counted, and dry-run mode always returns 0.
    """
    healed = _heal_tiles_on_disk(tiles, scan_meta, output_dir, machine, progress)
    if healed:
        log.debug(
            "[%s] Scan %d: healed %d tile(s) already on disk into progress.",
            machine["label"],
            scan_id,
            healed,
        )
        # A dry run must not write the progress file. The in-memory marks are
        # still enough to make the pending count below accurate.
        if not dry_run:
            progress.save()

    pending = [t for t in tiles if not progress.is_done(t["url"])]
    log.info(
        "[%s] Scan %d: %d tiles total, %d pending.",
        machine["label"],
        scan_id,
        len(tiles),
        len(pending),
    )
    if dry_run:
        for t in pending[:5]:
            log.info("[DRY-RUN] Tile: %s", t["url"])
        if len(pending) > 5:
            log.info("[DRY-RUN] … and %d more tiles.", len(pending) - 5)
        return 0

    # Attach scan_time so it is carried into the tiles.csv rows.
    scan_time = scan_meta.get("scan_time", "")
    for t in pending:
        t["scan_time"] = scan_time

    workers: int = config["workers"]
    save_every = max(50, workers * 4)
    batch: list[dict[str, Any]] = []
    downloaded = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(
                sess.download_tile,
                tile,
                tile_dest(output_dir, machine, scan_meta, tile),
                False,
            )
            for tile in pending
        ]
        try:
            with tqdm(
                total=len(pending),
                desc=f"{machine['label']} scan {scan_id}",
                unit="tile",
                leave=True,
            ) as pbar:
                for future in as_completed(futures):
                    result = future.result()
                    batch.append(result)
                    if result.get("status") == "downloaded":
                        progress.mark_done(result["url"])
                        downloaded += 1
                    pbar.update(1)
                    if len(batch) >= save_every:
                        _flush_tile_rows(batch, tiles_csv, progress)
        except BaseException:
            # On an error or Ctrl-C, drop queued downloads rather than letting
            # the executor's exit block until every remaining tile finishes.
            for future in futures:
                future.cancel()
            raise
        finally:
            # Always persist completed rows. Otherwise an aborted run leaves
            # tiles on disk that the next run heals into progress without
            # ever writing their tiles.csv rows.
            _flush_tile_rows(batch, tiles_csv, progress)

    log.info(
        "[%s] Scan %d complete: %d tiles downloaded.",
        machine["label"],
        scan_id,
        downloaded,
    )
    return downloaded
# ---------------------------------------------------------------------------
# Per-scan driver
# ---------------------------------------------------------------------------
def _find_existing_metadata(
    machine_root: Path, scan_id: int, scan_time: Any
) -> Path | None:
    """Return the path of an existing metadata.json for *scan_id*, or None.

    If ``_extract_date(scan_time)`` gives a usable date (non-empty and not
    ``"unknown"``), only that date's directory is checked. Otherwise
    ``<machine_root>/*/<scan_id>/metadata.json`` is globbed and the first
    match is returned.
    """
    date_hint = _extract_date(scan_time)
    if date_hint and date_hint != "unknown":
        # Date hint is reliable — don't glob if candidate wasn't found.
        candidate = machine_root / date_hint / str(scan_id) / "metadata.json"
        return candidate if candidate.exists() else None
    return next(iter(machine_root.glob(f"*/{scan_id}/metadata.json")), None)


def _is_zero_disk_space(value: Any) -> bool:
    """Return True when *value* parses as exactly zero.

    Absence (``None`` or ``""``) is tested explicitly rather than through
    truthiness, so a numeric ``0`` still counts as zero. Unparseable values
    return False, and the scan is then attempted normally.
    """
    if value is None or value == "":
        return False
    try:
        return float(value) == 0.0
    except (ValueError, TypeError):
        return False


def _write_metadata_json(meta_file: Path, scan_meta: dict[str, Any]) -> bool:
    """Write *scan_meta* as JSON to *meta_file* unless it exists; True if written.

    The JSON is written to a sibling temp file and renamed into place. An
    interrupted run therefore never leaves a truncated metadata.json, which
    the metadata-only fast path would otherwise treat as complete.
    """
    if meta_file.exists():
        return False
    tmp_file = meta_file.with_name(meta_file.name + ".tmp")
    tmp_file.write_text(
        json.dumps(scan_meta, indent=2, default=str), encoding="utf-8"
    )
    tmp_file.replace(meta_file)
    return True


def _probe_empty_reason(
    sess: MachineSession, probe_tile: dict[str, Any], probe_dest: Path
) -> str | None:
    """Download one probe tile and decide whether the scan looks empty.

    Returns None when the probe is a real tile; it is left on disk. Otherwise
    the probe file is removed and a short reason for the log line is
    returned: ``"is placeholder"`` or ``"failed (<detail>)"``.
    """
    res = sess.download_file(probe_tile["url"], probe_dest)
    if res.ok and not _is_placeholder_tile(probe_dest):
        return None
    probe_dest.unlink(missing_ok=True)
    if res.ok:
        return "is placeholder"
    return f"failed ({res.error_class or res.error or 'unknown'})"


def process_scan(
    sess: MachineSession,
    scan: dict[str, Any],
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    scans_csv: CsvWriter,
    tiles_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    max_tiles: int | None = None,
) -> RunStats:
    """Process one scan: fetch metadata, download mosaic and (optionally) tiles.

    Each of the following steps may end processing early:

    1. With ``metadata_only`` and not ``dry_run``: if a metadata.json for the
       scan already exists, count the scan as skipped without any HTTP request.
    2. Fetch the scan detail page. A fetch error or missing ``nx``/``ny``
       counts as a failed scan.
    3. Write metadata.json. An existing file is never overwritten, and
       nothing is written in ``dry_run``.
    4. Download the mosaic, unless ``metadata_only`` is set or
       ``disk_space_mb`` is zero.
    5. Append a scans.csv row, unless the mosaic was already marked done
       before this call.
    6. Unless ``mosaic_only``, ``metadata_only`` or zero disk space applies:
       probe one tile and, if the scan is not empty, download its tiles
       (capped at ``max_tiles`` when given).

    Returns:
        A RunStats holding only the counters for this call.
    """
    scan_id: int = scan["scan_id"]
    stats = RunStats()

    # In metadata-only mode, skip the HTTP fetch if metadata.json already
    # exists. scan_time may be absent (e.g. the synthetic scan dict built for
    # --scan-id), in which case the lookup falls back to a glob.
    if metadata_only and not dry_run:
        existing = _find_existing_metadata(
            output_dir / machine_dir_name(machine),
            scan_id,
            scan.get("scan_time", ""),
        )
        if existing is not None:
            log.debug(
                "[%s] Scan %d: metadata.json already exists, skipping fetch.",
                machine["label"],
                scan_id,
            )
            stats.scans_skipped += 1
            return stats

    log.info("[%s] Processing scan %d …", machine["label"], scan_id)
    try:
        scan_meta = sess.get_scan_metadata(scan_id)
    except Exception as exc:
        # Per-scan boundary: a failed fetch is logged and counted, not raised.
        log.error(
            "[%s] Cannot fetch scan %d metadata: %s", machine["label"], scan_id, exc
        )
        stats.scans_failed += 1
        return stats
    if not scan_meta.get("nx") or not scan_meta.get("ny"):
        log.warning(
            "[%s] Scan %d: missing grid params, skipping.",
            machine["label"],
            scan_id,
        )
        stats.scans_failed += 1
        return stats
    stats.scans_fetched += 1

    # Fill gaps from the list-level entry; detail-page values take precedence.
    for key in (
        "name",
        "scan_time",
        "start_datetime",
        "end_datetime",
        "status",
        "user",
        "scan_lines",
        "scan_mode",
    ):
        scan_meta.setdefault(key, scan.get(key, ""))

    # disk_space_mb == 0 is a reliable signal that the scan has no imagery.
    # A 300-scan investigation (50 per bucket) found 0% viability in this
    # bucket. Skip mosaic and tiles, but still write the scans.csv row below.
    disk_space_skip = not metadata_only and _is_zero_disk_space(
        scan_meta.get("disk_space_mb")
    )
    if disk_space_skip:
        log.info(
            "[%s] Scan %d: disk_space_mb=0 — skipping mosaic and tiles.",
            machine["label"],
            scan_id,
        )

    # Save per-scan metadata.json.
    scan_date = _extract_date(scan_meta.get("scan_time", ""))
    scan_dir = output_dir / machine_dir_name(machine) / scan_date / str(scan_id)
    if not dry_run:
        scan_dir.mkdir(parents=True, exist_ok=True)
        if _write_metadata_json(scan_dir / "metadata.json", scan_meta):
            stats.metadata_written += 1

    mosaic_path = mosaic_dest(output_dir, machine, scan_meta, scan_id)
    mosaic_url = sess.mosaic_url(scan_id)
    # Sampled before any download. True means an earlier run already
    # downloaded the mosaic; that run is expected to have written this
    # scan's scans.csv row.
    mosaic_already_done = progress.is_done(mosaic_url)

    # Mosaic (skipped entirely in metadata-only or zero-disk-space mode).
    if metadata_only:
        mds, mer, mco, mcl = "skipped_metadata_only", "", "", ""
    elif disk_space_skip:
        mds, mer, mco, mcl = "skipped_zero_disk_space", "", "", ""
        stats.scans_disk_space_skipped += 1
    else:
        attempt = _download_mosaic(
            sess,
            scan_meta,
            scan_id,
            mosaic_path,
            progress,
            machine,
            config,
            dry_run,
        )
        if attempt.downloaded:
            stats.mosaics_downloaded += 1
        elif not dry_run and attempt.csv_status == "failed":
            stats.mosaics_failed += 1
        mds, mer, mco, mcl = (
            attempt.csv_status,
            attempt.error,
            attempt.error_code,
            attempt.error_class,
        )

    # Write the scan-level CSV row only if this scan hasn't been recorded before.
    if mosaic_already_done and not metadata_only:
        log.debug(
            "[%s] Scan %d: already in scans.csv (mosaic was previously downloaded), skipping CSV row.",
            machine["label"],
            scan_id,
        )
    else:
        scans_csv.write(
            {
                "machine": machine["label"],
                "machine_id": machine["machine_id"],
                "scan_id": scan_id,
                "name": scan_meta.get("name", ""),
                "scan_time": scan_meta.get("scan_time", ""),
                "start_x": scan_meta.get("start_x", ""),
                "start_y": scan_meta.get("start_y", ""),
                "end_x": scan_meta.get("end_x", ""),
                "end_y": scan_meta.get("end_y", ""),
                "dx": scan_meta.get("dx", ""),
                "dy": scan_meta.get("dy", ""),
                "nx": scan_meta.get("nx", ""),
                "ny": scan_meta.get("ny", ""),
                "total_tiles": scan_meta.get("total_tiles", ""),
                "scan_lines": scan_meta.get("scan_lines", ""),
                "scan_mode": scan_meta.get("scan_mode", ""),
                "start_datetime": scan_meta.get("start_datetime", ""),
                "end_datetime": scan_meta.get("end_datetime", ""),
                "status": scan_meta.get("status", ""),
                "user": scan_meta.get("user", ""),
                "disk_space_mb": scan_meta.get("disk_space_mb", ""),
                "mosaic_url": mosaic_url,
                "mosaic_local_path": str(mosaic_path),
                "mosaic_on_disk": mosaic_path.exists(),
                "mosaic_download_status": mds,
                "mosaic_error": mer,
                "mosaic_error_code": mco,
                "mosaic_error_class": mcl,
            }
        )

    if mosaic_only or metadata_only or disk_space_skip:
        return stats

    # Tiles
    tiles = sess.enumerate_tiles(scan_meta)
    if max_tiles is not None:
        tiles = tiles[:max_tiles]

    # Tile probe: always download one tile before launching the full thread
    # pool. Two failure modes justify this:
    #   1. Mosaic failed (404/410 or empty body): the scan was set up but never
    #      run, and the tile grid is all placeholders or 404s.
    #   2. Mosaic succeeded but tiles are server-side placeholders (1x1 JPEG,
    #      ~43 B): the mosaic was generated from empty data, and downloading
    #      the full grid would fire thousands of guaranteed-placeholder requests.
    if not dry_run and tiles and not progress.is_done(tiles[0]["url"]):
        probe_tile = tiles[0]
        # NOTE(review): a successful probe is neither marked done nor written
        # to tiles.csv here. _download_tiles_for_scan then heals it from disk,
        # so the first tile of each scan appears to get no tiles.csv row and
        # is not counted in tiles_downloaded. Confirm and consider recording it.
        reason = _probe_empty_reason(
            sess, probe_tile, tile_dest(output_dir, machine, scan_meta, probe_tile)
        )
        if reason is not None:
            log.info(
                "[%s] Scan %d: probe tile %s — empty/placeholder scan, skipping %d tile(s).",
                machine["label"],
                scan_id,
                reason,
                len(tiles),
            )
            stats.scans_probe_skipped += 1
            return stats

    stats.tiles_downloaded += _download_tiles_for_scan(
        sess,
        tiles,
        scan_meta,
        scan_id,
        output_dir,
        machine,
        config,
        progress,
        tiles_csv,
        dry_run,
    )
    return stats
# ---------------------------------------------------------------------------
# Per-machine driver
# ---------------------------------------------------------------------------
def scrape_machine(
    machine: dict[str, Any],
    config: dict[str, Any],
    output_dir: Path,
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    scans_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    scan_id_filter: int | None = None,
    max_tiles: int | None = None,
) -> RunStats:
    """Log in to one machine, list its scans and process each one in turn.

    When *scan_id_filter* is given, only that scan is processed, instead of
    the full list from ``get_all_scans``. The remaining flags are forwarded
    unchanged to :func:`process_scan`.

    Returns:
        The per-scan RunStats merged together. An empty RunStats is returned
        when login fails or no scans are found.
    """
    session = MachineSession(machine, config)
    if not session.login():
        return RunStats()

    if scan_id_filter is None:
        scans: list[dict[str, Any]] = session.get_all_scans()
    else:
        # Minimal stand-in for a scan-list entry; process_scan fetches the
        # scan's detail metadata itself.
        scans = [{"scan_id": scan_id_filter, "status": "Completed"}]
        log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)

    if not scans:
        log.warning("[%s] No scans found.", machine["label"])
        return RunStats()

    totals = RunStats()
    for entry in scans:
        per_scan = process_scan(
            sess=session,
            scan=entry,
            output_dir=output_dir,
            machine=machine,
            config=config,
            progress=progress,
            scans_csv=scans_csv,
            tiles_csv=tiles_csv,
            dry_run=dry_run,
            mosaic_only=mosaic_only,
            metadata_only=metadata_only,
            max_tiles=max_tiles,
        )
        totals.merge(per_scan)
    return totals