Files

667 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
High-level scrape orchestration: drives the per-machine and per-scan loops.
"""
import csv
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from tqdm import tqdm
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
# below smallest observed real tile (~7 KiB in production samples).
PLACEHOLDER_MAX_BYTES = 200
def _is_placeholder_tile(path: Path) -> bool:
"""Return True if a downloaded tile looks like a 1×1 server placeholder."""
try:
return path.is_file() and path.stat().st_size <= PLACEHOLDER_MAX_BYTES
except OSError:
return False
@dataclass
class RunStats:
    """
    Accumulated counters for one or more machines.

    Every field is an ``int`` counter starting at 0. ``merge`` adds another
    instance's counters into this one, field by field.
    """

    scans_fetched: int = 0  # scan detail page fetched (metadata), not tiles/mosaics
    scans_skipped: int = 0  # metadata.json already on disk; no HTTP request
    scans_failed: int = 0  # metadata fetch error or missing grid params
    metadata_written: int = 0  # new metadata.json files created
    mosaics_downloaded: int = 0
    mosaics_failed: int = 0  # mosaic URL attempted but 0 bytes / HTTP error
    tiles_downloaded: int = 0
    scans_probe_skipped: int = 0  # probe tile was 404 or placeholder; full tile pool skipped
    scans_disk_space_skipped: int = 0  # disk_space_mb == 0; no mosaic or tiles attempted

    def merge(self, other: "RunStats") -> None:
        """
        Add every counter of *other* into this instance, in place.

        The dataclass fields are iterated rather than listed by name. A
        counter added to the class later is therefore merged automatically
        instead of being silently dropped. *other* is not modified.
        """
        from dataclasses import fields

        for counter in fields(self):
            name = counter.name
            setattr(self, name, getattr(self, name) + getattr(other, name))
# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
"""Last row wins per (machine, scan_id)."""
latest: dict[tuple[str, str], dict[str, str]] = {}
if not scans_csv_path.exists():
return latest
with open(scans_csv_path, newline="", encoding="utf-8") as fh:
for row in csv.DictReader(fh):
key = (row.get("machine", ""), row.get("scan_id", ""))
latest[key] = row
return latest
def load_failed_scans_from_csv(
    scans_csv_path: Path,
    machine_label: str,
    *,
    since_year: str | None = None,
    error_code: str | None = None,
) -> list[dict[str, Any]]:
    """
    Collect the failed-mosaic scans for one machine from scans.csv.

    scans.csv is first collapsed so only the latest row per (machine, scan_id)
    survives. A row is kept when it belongs to *machine_label*, its
    ``mosaic_download_status`` is ``"failed"``, and it passes the optional
    filters:

    - *error_code*: exact match against ``mosaic_error_code``.
    - *since_year*: the first four characters of ``scan_time`` must be at
      least four long and compare (as strings) >= *since_year*.

    Each returned dict is suitable as the ``scan`` argument to ``process_scan``.
    It holds scan_id (as int), scan_time, name, status (defaulting to
    "Completed"), user, scan_lines and scan_mode. The list is sorted by
    scan_id ascending.
    """

    def _wanted(record: dict[str, str]) -> bool:
        if record.get("machine") != machine_label:
            return False
        if record.get("mosaic_download_status") != "failed":
            return False
        if error_code is not None and record.get("mosaic_error_code", "") != error_code:
            return False
        if since_year is None:
            return True
        year = (record.get("scan_time", "") or "")[:4]
        return len(year) >= 4 and year >= since_year

    selected = [
        {
            "scan_id": int(record["scan_id"]),
            "scan_time": record.get("scan_time", "") or "",
            "name": record.get("name", ""),
            "status": record.get("status", "") or "Completed",
            "user": record.get("user", ""),
            "scan_lines": record.get("scan_lines", ""),
            "scan_mode": record.get("scan_mode", ""),
        }
        for record in _read_scans_csv_latest(scans_csv_path).values()
        if _wanted(record)
    ]
    return sorted(selected, key=lambda entry: entry["scan_id"])
# ---------------------------------------------------------------------------
# Per-scan helpers
# ---------------------------------------------------------------------------
@dataclass
class MosaicAttempt:
    """
    Outcome of a mosaic download attempt (for scans.csv and RunStats).

    Built by ``_download_mosaic`` with ``csv_status`` set to one of
    "already_done", "dry_run", "downloaded" or "failed". Only "failed"
    carries non-empty error fields.
    """

    downloaded: bool  # True only when the mosaic was fetched successfully in this call
    csv_status: str  # written to scans.csv as "mosaic_download_status"
    error: str  # error text from the download result ("" unless failed)
    error_code: str  # error_code_str() of the HTTP status ("" unless failed)
    error_class: str  # error classification from the download result ("" unless failed)
def _download_mosaic(
    sess: MachineSession,
    scan_meta: dict[str, Any],
    scan_id: int,
    mosaic_path: Path,
    progress: ProgressTracker,
    machine: dict[str, Any],
    config: dict[str, Any],
    dry_run: bool,
) -> MosaicAttempt:
    """
    Download the mosaic image for one scan unless progress says it is done.

    Args:
        sess: logged-in session providing ``mosaic_url`` and ``download_file``.
        scan_meta: scan metadata, forwarded to EXIF writing.
        scan_id: numeric scan identifier.
        mosaic_path: destination file for the mosaic.
        progress: tracker of completed URLs; updated and saved on success.
        machine: machine config dict (``label`` is used for logs and lookup).
        config: global config; reads ``write_exif`` (default True) and
            ``machine_metadata`` (per-label dict, may be absent or null).
        dry_run: when True, only log what would be downloaded.

    Returns:
        A MosaicAttempt whose ``csv_status`` is one of:
        - "already_done": the URL is already in progress; no request is made.
        - "dry_run": the request is only logged.
        - "downloaded": the file was written, EXIF optionally embedded, and
          the URL marked done and progress saved.
        - "failed": error text, code and class come from the download result.
    """
    label = machine["label"]
    url = sess.mosaic_url(scan_id)
    if progress.is_done(url):
        return MosaicAttempt(False, "already_done", "", "", "")
    if dry_run:
        # Explicit separator so URL and destination path stay readable.
        log.info("[DRY-RUN] Mosaic: %s -> %s", url, mosaic_path)
        return MosaicAttempt(False, "dry_run", "", "", "")

    log.info("[%s] Downloading mosaic for scan %d …", label, scan_id)
    res = sess.download_file(url, mosaic_path)
    if not res.ok:
        return MosaicAttempt(
            False,
            "failed",
            res.error or "",
            error_code_str(res.status_code),
            res.error_class,
        )

    if config.get("write_exif", True):
        # `or {}` also covers an explicit null "machine_metadata" entry,
        # which would otherwise raise AttributeError on `.get`.
        mmeta: dict[str, Any] | None = (config.get("machine_metadata") or {}).get(label)
        write_mosaic_exif(mosaic_path, scan_meta, machine, scan_id, mmeta)
    # Marked done only after EXIF writing, so an exception there leaves the
    # URL unmarked and the mosaic is attempted again next run.
    progress.mark_done(url)
    progress.save()
    log.info(
        "[%s] Mosaic saved: %s (%.1f MB)",
        label,
        mosaic_path,
        res.size / 1e6,
    )
    return MosaicAttempt(True, "downloaded", "", "", "")
def _download_tiles_for_scan(
    sess: MachineSession,
    tiles: list[dict[str, Any]],
    scan_meta: dict[str, Any],
    scan_id: int,
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    dry_run: bool,
) -> int:
    """
    Fetch every tile of one scan that progress has not yet recorded.

    First, tiles already on disk with a non-zero size are recorded in progress
    ("healed") without writing a tiles.csv row. This covers a crash between
    file write and progress save without producing duplicate rows.

    The remaining tiles are then fetched on a thread pool of
    ``config["workers"]`` workers. Each result row is appended to tiles.csv.
    Rows and progress are flushed every ``max(50, workers * 4)`` completed
    tiles and once more at the end.

    Returns:
        Number of tiles whose result status was "downloaded". Always 0 in
        dry-run mode, which only logs up to five pending URLs.
    """

    def _dest(tile: dict[str, Any]) -> Path:
        return tile_dest(output_dir, machine, scan_meta, tile)

    healed = 0
    for tile in tiles:
        if progress.is_done(tile["url"]):
            continue
        target = _dest(tile)
        if target.exists() and target.stat().st_size > 0:
            progress.mark_done(tile["url"])
            healed += 1

    label = machine["label"]
    if healed:
        log.debug(
            "[%s] Scan %d: healed %d tile(s) already on disk into progress.",
            label,
            scan_id,
            healed,
        )
        progress.save()

    pending = [tile for tile in tiles if not progress.is_done(tile["url"])]
    log.info(
        "[%s] Scan %d: %d tiles total, %d pending.",
        label,
        scan_id,
        len(tiles),
        len(pending),
    )

    if dry_run:
        preview = pending[:5]
        for tile in preview:
            log.info("[DRY-RUN] Tile: %s", tile["url"])
        remaining = len(pending) - len(preview)
        if remaining > 0:
            log.info("[DRY-RUN] … and %d more tiles.", remaining)
        return 0

    # tiles.csv rows carry the scan timestamp.
    scan_time = scan_meta.get("scan_time", "")
    for tile in pending:
        tile["scan_time"] = scan_time

    workers: int = config["workers"]
    unflushed: list[dict[str, Any]] = []

    def _flush() -> None:
        """Write buffered rows to tiles.csv, then persist progress."""
        for row in unflushed:
            tiles_csv.write(row)
        progress.save()
        unflushed.clear()

    downloaded = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        jobs = [
            pool.submit(sess.download_tile, tile, _dest(tile), False)
            for tile in pending
        ]
        flush_threshold = max(50, workers * 4)
        with tqdm(
            total=len(pending),
            desc=f"{label} scan {scan_id}",
            unit="tile",
            leave=True,
        ) as pbar:
            for job in as_completed(jobs):
                row = job.result()
                unflushed.append(row)
                if row.get("status") == "downloaded":
                    progress.mark_done(row["url"])
                    downloaded += 1
                pbar.update(1)
                if len(unflushed) >= flush_threshold:
                    _flush()
        _flush()

    log.info(
        "[%s] Scan %d complete: %d tiles downloaded.",
        label,
        scan_id,
        downloaded,
    )
    return downloaded
# ---------------------------------------------------------------------------
# Per-scan driver
# ---------------------------------------------------------------------------
def _has_zero_disk_space(scan_meta: dict[str, Any]) -> bool:
    """
    Return True when ``scan_meta["disk_space_mb"]`` parses to exactly zero.

    Numeric values and numeric strings are both accepted. A missing, empty or
    unparsable value returns False, so an unknown size is not treated as
    "empty". (A falsy fallback such as ``x or "nan"`` would wrongly treat a
    numeric 0 as missing.)
    """
    raw = scan_meta.get("disk_space_mb")
    if raw is None or raw == "":
        return False
    try:
        return float(raw) == 0.0
    except (ValueError, TypeError):
        return False


def _find_existing_metadata(
    output_dir: Path,
    machine: dict[str, Any],
    scan: dict[str, Any],
    scan_id: int,
) -> Path | None:
    """
    Locate an already-written metadata.json for *scan_id*, if any.

    When the list-level ``scan_time`` yields a known date, only that date
    directory is checked. Otherwise (e.g. a synthetic scan dict built for
    --scan-id with no scan_time) every date directory under the machine root
    is globbed.
    """
    machine_root = output_dir / machine_dir_name(machine)
    scan_date_hint = _extract_date(scan.get("scan_time", ""))
    if scan_date_hint and scan_date_hint != "unknown":
        candidate = machine_root / scan_date_hint / str(scan_id) / "metadata.json"
        # Date hint is reliable — don't glob if the candidate wasn't found.
        return candidate if candidate.exists() else None
    return next(iter(machine_root.glob(f"*/{scan_id}/metadata.json")), None)


def process_scan(
    sess: MachineSession,
    scan: dict[str, Any],
    output_dir: Path,
    machine: dict[str, Any],
    config: dict[str, Any],
    progress: ProgressTracker,
    scans_csv: CsvWriter,
    tiles_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    max_tiles: int | None = None,
    scans_csv_existing_ids: set[int] | None = None,
) -> RunStats:
    """
    Process one scan: fetch metadata, download mosaic and (optionally) tiles.

    Steps, in order:
    1. In metadata-only (non-dry-run) mode, return early (``scans_skipped``)
       if metadata.json already exists on disk.
    2. Fetch the scan detail page. A fetch error or missing nx/ny counts as
       ``scans_failed`` and returns.
    3. Write metadata.json unless dry-run or the file already exists.
    4. Download the mosaic, unless metadata-only or disk_space_mb is zero.
    5. Append a scans.csv row unless the scan was already recorded.
    6. Unless mosaic_only / metadata_only / zero disk space: probe the first
       tile, then download all pending tiles if the probe looks real.

    Args:
        sess: logged-in session for this machine.
        scan: list-level scan dict; must contain ``scan_id``.
        output_dir: root output directory.
        machine: machine config dict (``label``, ``machine_id`` used here).
        config: global config, passed through to the download helpers.
        progress: tracker of completed URLs.
        scans_csv, tiles_csv: CSV writers for scan and tile rows.
        dry_run: log instead of downloading or writing files.
        mosaic_only: stop after the mosaic and scans.csv row.
        metadata_only: write metadata.json and the scans.csv row; skip the
            mosaic and tiles.
        max_tiles: if given, only the first ``max_tiles`` enumerated tiles.
        scans_csv_existing_ids: scan_ids that already have a final
            scans.csv row; no new row is written for them.

    Returns:
        A RunStats with counters for what happened in this call only.
    """
    scan_id: int = scan["scan_id"]
    label = machine["label"]
    stats = RunStats()

    # In metadata-only mode, skip the HTTP fetch if metadata.json already exists.
    if metadata_only and not dry_run:
        if _find_existing_metadata(output_dir, machine, scan, scan_id) is not None:
            log.debug(
                "[%s] Scan %d: metadata.json already exists, skipping fetch.",
                label,
                scan_id,
            )
            stats.scans_skipped += 1
            return stats

    log.info("[%s] Processing scan %d …", label, scan_id)
    try:
        scan_meta = sess.get_scan_metadata(scan_id)
    except Exception as exc:
        # Per-scan boundary: record the failure and let the remaining scans
        # of this machine proceed.
        log.error("[%s] Cannot fetch scan %d metadata: %s", label, scan_id, exc)
        stats.scans_failed += 1
        return stats
    if not scan_meta.get("nx") or not scan_meta.get("ny"):
        log.warning(
            "[%s] Scan %d: missing grid params, skipping.",
            label,
            scan_id,
        )
        stats.scans_failed += 1
        return stats
    stats.scans_fetched += 1

    # Merge list-level metadata into scan_meta (detail page takes precedence).
    for key in (
        "name",
        "scan_time",
        "start_datetime",
        "end_datetime",
        "status",
        "user",
        "scan_lines",
        "scan_mode",
    ):
        scan_meta.setdefault(key, scan.get(key, ""))

    # disk_space_mb == 0 is a reliable signal that the scan has no imagery.
    # A 300-scan investigation (50 per bucket) found 0% viability in this
    # bucket. Skip the mosaic and tile downloads entirely, but still write a
    # record so scans.csv stays complete.
    disk_space_skip = not metadata_only and _has_zero_disk_space(scan_meta)
    if disk_space_skip:
        log.info(
            "[%s] Scan %d: disk_space_mb=0 — skipping mosaic and tiles.",
            label,
            scan_id,
        )

    # Save per-scan metadata.json (an existing file is never overwritten).
    scan_date = _extract_date(scan_meta.get("scan_time", ""))
    scan_dir = output_dir / machine_dir_name(machine) / scan_date / str(scan_id)
    if not dry_run:
        scan_dir.mkdir(parents=True, exist_ok=True)
        meta_file = scan_dir / "metadata.json"
        if not meta_file.exists():
            meta_file.write_text(
                json.dumps(scan_meta, indent=2, default=str), encoding="utf-8"
            )
            stats.metadata_written += 1

    # Mosaic (skipped entirely in metadata-only or zero-disk-space mode).
    mosaic_path = mosaic_dest(output_dir, machine, scan_meta, scan_id)
    mosaic_url = sess.mosaic_url(scan_id)
    # Captured before the download so "already recorded" reflects prior runs.
    mosaic_already_done = progress.is_done(mosaic_url)
    if metadata_only:
        mds, mer, mco, mcl = "skipped_metadata_only", "", "", ""
    elif disk_space_skip:
        mds, mer, mco, mcl = "skipped_zero_disk_space", "", "", ""
        stats.scans_disk_space_skipped += 1
    else:
        attempt = _download_mosaic(
            sess,
            scan_meta,
            scan_id,
            mosaic_path,
            progress,
            machine,
            config,
            dry_run,
        )
        if attempt.downloaded:
            stats.mosaics_downloaded += 1
        elif not dry_run and attempt.csv_status == "failed":
            stats.mosaics_failed += 1
        mds, mer, mco, mcl = (
            attempt.csv_status,
            attempt.error,
            attempt.error_code,
            attempt.error_class,
        )

    # Write the scan-level CSV row only if this scan hasn't been recorded
    # before (never applies in metadata-only mode). It counts as recorded if
    # (1) the mosaic URL was already in progress, or (2) the scan already has
    # a non-pending scans.csv row from a prior run.
    already_recorded = not metadata_only and (
        mosaic_already_done
        or (scans_csv_existing_ids is not None and scan_id in scans_csv_existing_ids)
    )
    if already_recorded:
        log.debug(
            "[%s] Scan %d: already in scans.csv, skipping CSV row.",
            label,
            scan_id,
        )
    else:
        scans_csv.write(
            {
                "machine": label,
                "machine_id": machine["machine_id"],
                "scan_id": scan_id,
                "name": scan_meta.get("name", ""),
                "scan_time": scan_meta.get("scan_time", ""),
                "start_x": scan_meta.get("start_x", ""),
                "start_y": scan_meta.get("start_y", ""),
                "end_x": scan_meta.get("end_x", ""),
                "end_y": scan_meta.get("end_y", ""),
                "dx": scan_meta.get("dx", ""),
                "dy": scan_meta.get("dy", ""),
                "nx": scan_meta.get("nx", ""),
                "ny": scan_meta.get("ny", ""),
                "total_tiles": scan_meta.get("total_tiles", ""),
                "scan_lines": scan_meta.get("scan_lines", ""),
                "scan_mode": scan_meta.get("scan_mode", ""),
                "start_datetime": scan_meta.get("start_datetime", ""),
                "end_datetime": scan_meta.get("end_datetime", ""),
                "status": scan_meta.get("status", ""),
                "user": scan_meta.get("user", ""),
                "disk_space_mb": scan_meta.get("disk_space_mb", ""),
                "mosaic_url": mosaic_url,
                "mosaic_local_path": str(mosaic_path),
                "mosaic_on_disk": mosaic_path.exists(),
                "mosaic_download_status": mds,
                "mosaic_error": mer,
                "mosaic_error_code": mco,
                "mosaic_error_class": mcl,
            }
        )

    if mosaic_only or metadata_only or disk_space_skip:
        return stats

    # Tiles
    tiles = sess.enumerate_tiles(scan_meta)
    if max_tiles is not None:
        tiles = tiles[:max_tiles]

    # Tile probe: always download one tile before launching the full thread
    # pool. Two failure modes justify this:
    #   1. Mosaic failed (404/410 or empty body): the scan was set up but never
    #      run, so the tile grid is all placeholders or 404s.
    #   2. Mosaic succeeded but tiles are server-side placeholders (1x1 JPEG,
    #      ~43 B): the mosaic was generated from empty data, and downloading
    #      the full grid would fire thousands of guaranteed-placeholder requests.
    # NOTE(review): a successful probe leaves tile 0 on disk without marking
    # it done. The heal pass in _download_tiles_for_scan then records it in
    # progress, so tile 0 gets no tiles.csv row and is not counted in
    # tiles_downloaded. Confirm whether that is intended.
    if not dry_run and tiles and not progress.is_done(tiles[0]["url"]):
        probe_tile = tiles[0]
        probe_dest = tile_dest(output_dir, machine, scan_meta, probe_tile)
        probe_res = sess.download_file(probe_tile["url"], probe_dest)
        if not probe_res.ok or _is_placeholder_tile(probe_dest):
            probe_dest.unlink(missing_ok=True)
            detail = (
                "is placeholder"
                if probe_res.ok
                else f"failed ({probe_res.error_class or probe_res.error or 'unknown'})"
            )
            log.info(
                "[%s] Scan %d: probe tile %s — empty/placeholder scan, skipping %d tile(s).",
                label,
                scan_id,
                detail,
                len(tiles),
            )
            stats.scans_probe_skipped += 1
            return stats

    stats.tiles_downloaded += _download_tiles_for_scan(
        sess,
        tiles,
        scan_meta,
        scan_id,
        output_dir,
        machine,
        config,
        progress,
        tiles_csv,
        dry_run,
    )
    return stats
# ---------------------------------------------------------------------------
# Per-machine driver
# ---------------------------------------------------------------------------
# scans.csv mosaic statuses that do NOT mark a scan as already processed in a
# normal run, so the scan is still (re)attempted. "dry_run" is included
# because dry-run passes record that status without downloading anything; a
# dry run must not block a later real run.
_PENDING_STATUSES = frozenset({"skipped_metadata_only", "", "dry_run"})
# In --retry-failed mode only these statuses block a scan; "failed" scans
# (and anything else) are re-attempted.
_BLOCK_AFTER_RETRY_STATUSES = frozenset({"downloaded", "skipped_zero_disk_space"})


def _login_with_retries(sess: MachineSession, label: str) -> bool:
    """Call ``sess.login()`` up to 3 times, sleeping 10 s between failures."""
    for attempt in range(1, 4):
        if sess.login():
            return True
        if attempt < 3:
            log.warning(
                "[%s] Login failed (attempt %d/3) — retrying in 10s.",
                label,
                attempt,
            )
            time.sleep(10)
    return False


def _already_processed_ids(scans_csv_path: Path, label: str, retry_failed: bool) -> set[int]:
    """
    Return scan_ids for *label* whose latest scans.csv row means "skip this scan".

    Normal mode skips every status outside _PENDING_STATUSES. Retry mode skips
    only _BLOCK_AFTER_RETRY_STATUSES, so failed scans are re-attempted.
    """
    processed: set[int] = set()
    for (row_label, _scan_id), row in _read_scans_csv_latest(scans_csv_path).items():
        if row_label != label:
            continue
        status = row.get("mosaic_download_status", "")
        if retry_failed:
            blocked = status in _BLOCK_AFTER_RETRY_STATUSES
        else:
            blocked = status not in _PENDING_STATUSES
        if blocked:
            processed.add(int(row["scan_id"]))
    return processed


def scrape_machine(
    machine: dict[str, Any],
    config: dict[str, Any],
    output_dir: Path,
    progress: ProgressTracker,
    tiles_csv: CsvWriter,
    scans_csv: CsvWriter,
    dry_run: bool,
    mosaic_only: bool,
    metadata_only: bool = False,
    scan_id_filter: int | None = None,
    max_tiles: int | None = None,
    retry_failed: bool = False,
    retry_since_year: str | None = None,
    retry_error_code: str | None = None,
) -> RunStats:
    """
    Login, fetch scans, and download all content for one machine.

    The scan list comes from one of three sources:
    - retry mode: failed-mosaic rows in scans.csv, filtered by year, error
      code and optionally scan_id;
    - ``scan_id_filter``: a single synthetic scan dict;
    - otherwise: ``sess.get_all_scans()``.

    Outside metadata-only mode, scans whose latest scans.csv row marks them
    as processed are skipped without any HTTP request. Every other scan goes
    through ``process_scan``.

    Returns:
        The merged RunStats of all processed scans. An empty RunStats is
        returned when login fails or no scans are selected.
    """
    label = machine["label"]
    sess = MachineSession(machine, config)
    if not _login_with_retries(sess, label):
        log.error("[%s] Login failed after 3 attempts — skipping machine.", label)
        return RunStats()

    if retry_failed:
        scans = load_failed_scans_from_csv(
            scans_csv.path,
            label,
            since_year=retry_since_year,
            error_code=retry_error_code,
        )
        if scan_id_filter is not None:
            scans = [s for s in scans if s["scan_id"] == scan_id_filter]
            if not scans:
                log.warning(
                    "[%s] Retry: scan_id %d not among failed rows for this machine.",
                    label,
                    scan_id_filter,
                )
                return RunStats()
            log.info("[%s] Mosaic retry: single scan %d.", label, scan_id_filter)
        elif not scans:
            log.warning(
                "[%s] No failed mosaic rows in scans.csv match retry filters.",
                label,
            )
            return RunStats()
        else:
            log.info(
                "[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
                label,
                len(scans),
            )
    elif scan_id_filter is not None:
        scans = [{"scan_id": scan_id_filter, "status": "Completed"}]
        log.info("[%s] Targeting scan ID %d.", label, scan_id_filter)
    else:
        scans = sess.get_all_scans()
        if not scans:
            log.warning("[%s] No scans found.", label)
            return RunStats()

    # scan_ids to skip entirely (no metadata fetch, no HTTP).
    existing_ids: set[int] = (
        set() if metadata_only else _already_processed_ids(scans_csv.path, label, retry_failed)
    )

    stats = RunStats()
    for scan in scans:
        # Skip scans already fully processed in a prior run. This avoids
        # redundant metadata fetches and mosaic requests for known-failed /
        # known-done scans.
        if not metadata_only and scan["scan_id"] in existing_ids:
            log.debug(
                "[%s] Scan %d: already processed, skipping.",
                label,
                scan["scan_id"],
            )
            continue
        stats.merge(
            process_scan(
                sess=sess,
                scan=scan,
                output_dir=output_dir,
                machine=machine,
                config=config,
                progress=progress,
                scans_csv=scans_csv,
                tiles_csv=tiles_csv,
                dry_run=dry_run,
                mosaic_only=mosaic_only,
                metadata_only=metadata_only,
                max_tiles=max_tiles,
                scans_csv_existing_ids=existing_ids,
            )
        )
    return stats