Add --metadata-only mode; harden resume and idempotency

- Add --metadata-only flag: fetches scan detail pages, writes
  metadata.json + scans.csv rows, skips all image downloads.
  Re-runs skip scans whose metadata.json already exists.
- Atomic progress.json saves (temp-file rename).
- Heal-on-resume: tiles on disk but not in progress are silently
  re-marked before building the pending list.
- scans.csv dedup: skip row if mosaic URL already in progress.
- Rename mosaic_downloaded -> mosaic_on_disk (reflects disk state).
- --recheck now checks mosaics as well as tiles.
- RunStats dataclass replaces raw int return; richer run summary.
- Fix argparse allow_abbrev reverted; fix --scan-id + --metadata-only
  glob fallback when scan_time is absent.
- Add .venv/ to .gitignore.
- README: fix typo, update worker counts, document all new behaviour.
This commit is contained in:
2026-04-24 09:44:57 -04:00
parent e122f6435a
commit f2193011ca
8 changed files with 294 additions and 93 deletions
+68 -11
View File
@@ -10,7 +10,7 @@ from pathlib import Path
import yaml
from spruce.orchestrator import scrape_machine
from spruce.orchestrator import scrape_machine, RunStats
from spruce.parsers import parse_machine_option
from spruce.progress import ProgressTracker, CsvWriter
from spruce.recheck import recheck_archive, recheck_tile_files
@@ -75,6 +75,15 @@ def parse_args() -> argparse.Namespace:
action="store_true",
help="Download mosaics only; skip individual tiles",
)
p.add_argument(
"--metadata-only",
action="store_true",
help=(
"Fetch scan parameters only; write metadata.json and scans.csv "
"rows but skip mosaics and tiles. Very fast — suitable for "
"inventorying all scans across all machines."
),
)
p.add_argument(
"--dry-run",
action="store_true",
@@ -220,7 +229,11 @@ def main() -> None:
len(machines),
", ".join(m["label"] for m in machines),
)
if args.mosaic_only:
if args.mosaic_only and args.metadata_only:
sys.exit("--mosaic-only and --metadata-only are mutually exclusive.")
if args.metadata_only:
log.info("Mode: metadata only (mosaics and tiles skipped)")
elif args.mosaic_only:
log.info("Mode: mosaics only (individual tiles skipped)")
if args.dry_run:
log.info("Mode: dry-run (no files will be written)")
@@ -230,10 +243,10 @@ def main() -> None:
tiles_csv = CsvWriter(output_dir / TILES_CSV_FILENAME, TILES_CSV_FIELDS)
scans_csv = CsvWriter(output_dir / SCANS_CSV_FILENAME, SCANS_CSV_FIELDS)
total = 0
totals = RunStats()
try:
for machine in machines:
count = scrape_machine(
stats = scrape_machine(
machine=machine,
config=config,
output_dir=output_dir,
@@ -242,18 +255,62 @@ def main() -> None:
scans_csv=scans_csv,
dry_run=args.dry_run,
mosaic_only=args.mosaic_only,
metadata_only=args.metadata_only,
scan_id_filter=args.scan_id,
)
total += count
totals.merge(stats)
finally:
tiles_csv.close()
scans_csv.close()
progress.save()
if args.dry_run:
log.info("Dry run complete.")
_print_summary(
totals=totals,
machines=machines,
output_dir=output_dir,
dry_run=args.dry_run,
metadata_only=args.metadata_only,
mosaic_only=args.mosaic_only,
)
def _print_summary(
totals: RunStats,
machines: list[dict],
output_dir: Path,
dry_run: bool,
metadata_only: bool,
mosaic_only: bool,
) -> None:
W = 46
sep = "" * W
def row(label: str, value: str, note: str = "") -> str:
note_str = f" ({note})" if note else ""
return f" {label:<22}{value}{note_str}"
log.info(sep)
if dry_run:
log.info(" Dry run complete — no files written.")
else:
log.info("Done. Total files downloaded: %d", total)
log.info("Tiles CSV : %s", output_dir / TILES_CSV_FILENAME)
log.info("Scans CSV : %s", output_dir / SCANS_CSV_FILENAME)
log.info("Progress : %s", output_dir / PROGRESS_FILENAME)
log.info(" Run complete")
log.info(sep)
log.info(row("Machines:", str(len(machines))))
log.info(
row("Scans fetched:", str(totals.scans_fetched),
f"{totals.scans_skipped} already cached, "
f"{totals.scans_failed} failed"
if totals.scans_skipped or totals.scans_failed else "")
)
if not metadata_only:
log.info(row("Mosaics downloaded:", str(totals.mosaics_downloaded)))
if not metadata_only and not mosaic_only:
log.info(row("Tiles downloaded:", str(totals.tiles_downloaded)))
if metadata_only:
log.info(row("Metadata written:", str(totals.metadata_written), "new JSON files"))
log.info(sep)
log.info(row("Scans CSV:", str(output_dir / SCANS_CSV_FILENAME)))
if not metadata_only:
log.info(row("Tiles CSV:", str(output_dir / TILES_CSV_FILENAME)))
log.info(row("Progress:", str(output_dir / PROGRESS_FILENAME)))
log.info(sep)
+139 -50
View File
@@ -5,9 +5,30 @@ High-level scrape orchestration: drives the per-machine and per-scan loops.
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
@dataclass
class RunStats:
"""Accumulated counters for one or more machines."""
scans_fetched: int = 0 # metadata fetched from server this run
scans_skipped: int = 0 # metadata.json already on disk; no HTTP request
scans_failed: int = 0 # fetch error or missing grid params
metadata_written: int = 0 # new metadata.json files created
mosaics_downloaded: int = 0
tiles_downloaded: int = 0
def merge(self, other: "RunStats") -> None:
self.scans_fetched += other.scans_fetched
self.scans_skipped += other.scans_skipped
self.scans_failed += other.scans_failed
self.metadata_written += other.metadata_written
self.mosaics_downloaded += other.mosaics_downloaded
self.tiles_downloaded += other.tiles_downloaded
from tqdm import tqdm
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
@@ -66,6 +87,24 @@ def _download_tiles_for_scan(
dry_run: bool,
) -> int:
"""Download all pending tiles for a scan. Returns count of tiles downloaded."""
# Heal progress for tiles that exist on disk but weren't recorded (e.g.
# crash between write and batch save). Prevents duplicate tiles.csv rows.
healed = 0
for t in tiles:
if not progress.is_done(t["url"]):
dest = tile_dest(output_dir, machine, scan_meta, t)
if dest.exists() and dest.stat().st_size > 0:
progress.mark_done(t["url"])
healed += 1
if healed:
log.debug(
"[%s] Scan %d: healed %d tile(s) already on disk into progress.",
machine["label"],
scan_id,
healed,
)
progress.save()
pending = [t for t in tiles if not progress.is_done(t["url"])]
log.info(
"[%s] Scan %d: %d tiles total, %d pending.",
@@ -152,12 +191,43 @@ def process_scan(
tiles_csv: CsvWriter,
dry_run: bool,
mosaic_only: bool,
) -> int:
metadata_only: bool = False,
) -> RunStats:
"""
Process one scan: fetch metadata, download mosaic and (optionally) tiles.
Returns total files downloaded for this scan.
Returns a RunStats with counters for what happened this call.
If metadata_only is True, writes metadata.json and the scans.csv row but
skips both the mosaic and the tiles.
"""
scan_id: int = scan["scan_id"]
stats = RunStats()
# In metadata-only mode, skip the HTTP fetch if metadata.json already exists.
# Try the date-hinted path first; fall back to a glob when scan_time is
# absent (e.g. when --scan-id is used and the synthetic scan dict has no
# scan_time field).
if metadata_only and not dry_run:
machine_root = output_dir / machine_dir_name(machine)
scan_date_hint = _extract_date(scan.get("scan_time", ""))
found_meta: Path | None = None
if scan_date_hint and scan_date_hint != "unknown":
candidate = machine_root / scan_date_hint / str(scan_id) / "metadata.json"
if candidate.exists():
found_meta = candidate
if found_meta is None:
matches = list(machine_root.glob(f"*/{scan_id}/metadata.json"))
if matches:
found_meta = matches[0]
if found_meta is not None:
log.debug(
"[%s] Scan %d: metadata.json already exists, skipping fetch.",
machine["label"],
scan_id,
)
stats.scans_skipped += 1
return stats
log.info("[%s] Processing scan %d", machine["label"], scan_id)
try:
@@ -166,7 +236,8 @@ def process_scan(
log.error(
"[%s] Cannot fetch scan %d metadata: %s", machine["label"], scan_id, exc
)
return 0
stats.scans_failed += 1
return stats
if not scan_meta.get("nx") or not scan_meta.get("ny"):
log.warning(
@@ -174,7 +245,10 @@ def process_scan(
machine["label"],
scan_id,
)
return 0
stats.scans_failed += 1
return stats
stats.scans_fetched += 1
# Merge list-level metadata into scan_meta (detail page takes precedence)
for k in (
@@ -199,51 +273,64 @@ def process_scan(
meta_file.write_text(
json.dumps(scan_meta, indent=2, default=str), encoding="utf-8"
)
stats.metadata_written += 1
# Mosaic
# Mosaic (skipped entirely in metadata-only mode)
mosaic_path = mosaic_dest(output_dir, machine, scan_meta, scan_id)
mosaic_url = sess.mosaic_url(scan_id)
mosaic_downloaded = _download_mosaic(
sess, scan_meta, scan_id, mosaic_path, progress, machine, dry_run
)
total = 1 if mosaic_downloaded else 0
mosaic_already_done = progress.is_done(mosaic_url)
if metadata_only:
mosaic_just_downloaded = False
else:
mosaic_just_downloaded = _download_mosaic(
sess, scan_meta, scan_id, mosaic_path, progress, machine, dry_run
)
if mosaic_just_downloaded:
stats.mosaics_downloaded += 1
# Write scan-level CSV row
scans_csv.write(
{
"machine": machine["label"],
"machine_id": machine["machine_id"],
"scan_id": scan_id,
"name": scan_meta.get("name", ""),
"scan_time": scan_meta.get("scan_time", ""),
"start_x": scan_meta.get("start_x", ""),
"start_y": scan_meta.get("start_y", ""),
"end_x": scan_meta.get("end_x", ""),
"end_y": scan_meta.get("end_y", ""),
"dx": scan_meta.get("dx", ""),
"dy": scan_meta.get("dy", ""),
"nx": scan_meta.get("nx", ""),
"ny": scan_meta.get("ny", ""),
"total_tiles": scan_meta.get("total_tiles", ""),
"scan_lines": scan_meta.get("scan_lines", ""),
"scan_mode": scan_meta.get("scan_mode", ""),
"start_datetime": scan_meta.get("start_datetime", ""),
"end_datetime": scan_meta.get("end_datetime", ""),
"status": scan_meta.get("status", ""),
"user": scan_meta.get("user", ""),
"disk_space_mb": scan_meta.get("disk_space_mb", ""),
"mosaic_url": mosaic_url,
"mosaic_local_path": str(mosaic_path),
"mosaic_downloaded": mosaic_downloaded,
}
)
# Write scan-level CSV row only if this scan hasn't been recorded before.
if mosaic_already_done and not metadata_only:
log.debug(
"[%s] Scan %d: already in scans.csv (mosaic was previously downloaded), skipping CSV row.",
machine["label"],
scan_id,
)
else:
scans_csv.write(
{
"machine": machine["label"],
"machine_id": machine["machine_id"],
"scan_id": scan_id,
"name": scan_meta.get("name", ""),
"scan_time": scan_meta.get("scan_time", ""),
"start_x": scan_meta.get("start_x", ""),
"start_y": scan_meta.get("start_y", ""),
"end_x": scan_meta.get("end_x", ""),
"end_y": scan_meta.get("end_y", ""),
"dx": scan_meta.get("dx", ""),
"dy": scan_meta.get("dy", ""),
"nx": scan_meta.get("nx", ""),
"ny": scan_meta.get("ny", ""),
"total_tiles": scan_meta.get("total_tiles", ""),
"scan_lines": scan_meta.get("scan_lines", ""),
"scan_mode": scan_meta.get("scan_mode", ""),
"start_datetime": scan_meta.get("start_datetime", ""),
"end_datetime": scan_meta.get("end_datetime", ""),
"status": scan_meta.get("status", ""),
"user": scan_meta.get("user", ""),
"disk_space_mb": scan_meta.get("disk_space_mb", ""),
"mosaic_url": mosaic_url,
"mosaic_local_path": str(mosaic_path),
"mosaic_on_disk": mosaic_path.exists(),
}
)
if mosaic_only:
return total
if mosaic_only or metadata_only:
return stats
# Tiles
tiles = sess.enumerate_tiles(scan_meta)
total += _download_tiles_for_scan(
stats.tiles_downloaded += _download_tiles_for_scan(
sess,
tiles,
scan_meta,
@@ -255,7 +342,7 @@ def process_scan(
tiles_csv,
dry_run,
)
return total
return stats
# ---------------------------------------------------------------------------
@@ -272,12 +359,13 @@ def scrape_machine(
scans_csv: CsvWriter,
dry_run: bool,
mosaic_only: bool,
scan_id_filter: int | None,
) -> int:
metadata_only: bool = False,
scan_id_filter: int | None = None,
) -> RunStats:
"""Login, fetch scans, and download all content for one machine."""
sess = MachineSession(machine, config)
if not sess.login():
return 0
return RunStats()
if scan_id_filter is not None:
scans: list[dict[str, Any]] = [
@@ -288,11 +376,11 @@ def scrape_machine(
scans = sess.get_all_scans()
if not scans:
log.warning("[%s] No scans found.", machine["label"])
return 0
return RunStats()
total = 0
stats = RunStats()
for scan in scans:
total += process_scan(
stats.merge(process_scan(
sess=sess,
scan=scan,
output_dir=output_dir,
@@ -303,5 +391,6 @@ def scrape_machine(
tiles_csv=tiles_csv,
dry_run=dry_run,
mosaic_only=mosaic_only,
)
return total
metadata_only=metadata_only,
))
return stats
+3 -1
View File
@@ -57,9 +57,11 @@ class ProgressTracker:
def save(self) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
self.path.write_text(
tmp = self.path.with_suffix(".json.tmp")
tmp.write_text(
json.dumps({"completed_urls": sorted(self._done)}, indent=2)
)
tmp.replace(self.path) # atomic on POSIX; avoids corrupt JSON on crash
class CsvWriter:
+33 -13
View File
@@ -86,47 +86,67 @@ def recheck_archive(output_dir: Path, progress: ProgressTracker) -> int:
non-empty. Removes bad entries from progress so the next run re-downloads
them. Returns the count of entries removed.
Only tile URLs are checked (mosaic URLs are skipped — mosaics are large
single files and are unlikely to be partially written due to streaming).
Both tile URLs and mosaic URLs are checked.
"""
if len(progress) == 0:
log.info("Progress file is empty — nothing to recheck.")
return 0
tile_urls = [u for u in progress.iter_urls() if "cmd=image" in u]
mosaic_count = len(progress) - len(tile_urls)
all_urls = list(progress.iter_urls())
tile_urls = [u for u in all_urls if "cmd=image" in u]
mosaic_urls = [u for u in all_urls if "mosaic.jpg" in u]
log.info(
"Rechecking %d tile URLs (%d mosaic URLs not rechecked) …",
"Rechecking %d tile URL(s) and %d mosaic URL(s) …",
len(tile_urls),
mosaic_count,
len(mosaic_urls),
)
# Build a disk index once
existing_files = _build_disk_index(output_dir)
log.debug("Found %d tile files on disk.", len(existing_files))
# Build a disk index of all tile files once
existing_tiles = _build_disk_index(output_dir)
log.debug("Found %d tile files on disk.", len(existing_tiles))
bad_urls: list[str] = []
# --- Tile check ---
for url in tile_urls:
p = _parse_tile_url(url)
scan_id = p["scan_id"]
# Find tile files that live under a directory named after this scan_id
candidates = [path for path in existing_files if str(scan_id) in path.parts]
candidates = [path for path in existing_tiles if str(scan_id) in path.parts]
if not candidates:
bad_urls.append(url)
continue
if not any(existing_files[path] > 0 for path in candidates):
if not any(existing_tiles[path] > 0 for path in candidates):
bad_urls.append(url)
# --- Mosaic check ---
for url in mosaic_urls:
# Mosaic URLs: http://<host>:8011/RootView_Database/<scan_id>/mosaic.jpg
# Corresponding local path: <output_dir>/**/<scan_id>/mosaic.jpg
try:
scan_id = url.rstrip("/").split("/")[-2]
except IndexError:
bad_urls.append(url)
continue
matches = list(output_dir.glob(f"*/*/{scan_id}/mosaic.jpg"))
if not matches or not any(p.stat().st_size > 0 for p in matches):
log.debug("Mosaic missing or zero-byte for scan %s: %s", scan_id, url)
bad_urls.append(url)
if not bad_urls:
log.info("All %d tile URLs look healthy.", len(tile_urls))
log.info(
"All %d tile URL(s) and %d mosaic URL(s) look healthy.",
len(tile_urls),
len(mosaic_urls),
)
return 0
log.warning(
"Found %d suspect tile URL(s). Removing from progress.",
"Found %d suspect URL(s). Removing from progress.",
len(bad_urls),
)
for url in bad_urls:
-4
View File
@@ -263,10 +263,6 @@ class MachineSession:
}
if dry_run:
return row
if dest.exists():
row["downloaded_at"] = "already_exists"
row["file_size_bytes"] = dest.stat().st_size
return row
size = self.download_file(tile["url"], dest)
if size:
row["downloaded_at"] = datetime.now(timezone.utc).isoformat()
+1 -1
View File
@@ -46,7 +46,7 @@ SCANS_CSV_FIELDS: list[str] = [
"disk_space_mb",
"mosaic_url",
"mosaic_local_path",
"mosaic_downloaded",
"mosaic_on_disk",
]
TILES_CSV_FIELDS: list[str] = [