6 Commits

5 changed files with 27 additions and 399 deletions
+4 -104
View File
@@ -2,13 +2,11 @@
""" """
Report mosaic download progress from archives/scans.csv. Report mosaic download progress from archives/scans.csv.
Output is Markdown. Use ``--by-year`` for a per-machine × per-year Output is formatted as Markdown. Add --by-year for a per-machine ×
done/failed table. When the first mosaic pass is complete (no pending rows) per-year breakdown table.
but failures remain, a **Mosaic retry estimates** section is printed with
queue counts and duration hints.
Rate/ETA use a 30-minute rolling window when snapshots show progress. Rate/ETA require two calls at least 60 s apart. Mean mosaic size is
Mean mosaic size is sampled from up to 100 downloads (1-hour cache). sampled from up to 100 already-downloaded files and cached for 1 hour.
Usage: Usage:
python scripts/mosaic_progress_report.py [--archive DIR] [--by-year] python scripts/mosaic_progress_report.py [--archive DIR] [--by-year]
@@ -32,11 +30,6 @@ _R_PRE19 = 1.00
_R_PURGED = 0.00 _R_PURGED = 0.00
_R_RECENT = 0.82 _R_RECENT = 0.82
FIRST_PASS_FALLBACK_RATE_PER_HR = 1100.0
RETRY_OPTIMISTIC_RATE_PER_HR = 1800.0
RETRY_REALISTIC_RATE_PER_HR = 1100.0
RETRY_PESSIMISTIC_RATE_PER_HR = 300.0
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Helpers # Helpers
@@ -134,12 +127,6 @@ def _expected_remaining(pending_rows: list[dict]) -> float:
return count return count
def _retry_hours_from_rate(n_scans: int, rate_per_hr: float) -> str:
if n_scans <= 0 or rate_per_hr <= 0:
return ""
return _fmt_duration(n_scans / rate_per_hr * 3600.0)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Main # Main
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -226,32 +213,15 @@ def main() -> None:
rate_per_sec: float | None = None rate_per_sec: float | None = None
rate_window_str = "" rate_window_str = ""
snap_delta_proc = 0
if recent: if recent:
oldest = recent[0] oldest = recent[0]
dt = now.timestamp() - oldest["ts"] dt = now.timestamp() - oldest["ts"]
dp = processed - oldest["proc"] dp = processed - oldest["proc"]
snap_delta_proc = dp
if dt >= 60 and dp > 0: if dt >= 60 and dp > 0:
rate_per_sec = dp / dt rate_per_sec = dp / dt
window_min = dt / 60 window_min = dt / 60
rate_window_str = f"{window_min:.0f}-min avg" rate_window_str = f"{window_min:.0f}-min avg"
# One-time baseline after initial mosaic crawl finished (no pending rows).
if pending == 0 and "first_pass_mean_rate_per_hr" not in cache:
cache["first_pass_completed_at"] = now.isoformat()
cache["first_pass_processed"] = total
cache["first_pass_mean_rate_per_hr"] = FIRST_PASS_FALLBACK_RATE_PER_HR
first_pass_rate_hr = float(
cache.get("first_pass_mean_rate_per_hr", FIRST_PASS_FALLBACK_RATE_PER_HR)
)
live_rate_hr = rate_per_sec * 3600 if rate_per_sec else None
# Active scrape shows progress in snapshots; idle archive shows dp == 0.
retry_estimate_rate_hr = (
live_rate_hr if live_rate_hr is not None else first_pass_rate_hr
)
# --- Disk space --- # --- Disk space ---
mean_bytes: float | None = None mean_bytes: float | None = None
size_note = "" size_note = ""
@@ -355,76 +325,6 @@ def main() -> None:
align=["l", "r", "r", "r", "r", "r"], align=["l", "r", "r", "r", "r", "r"],
)) ))
# -----------------------------------------------------------------------
# Retry estimates (first pass complete: pending == 0, failures remain)
# -----------------------------------------------------------------------
if failed > 0 and pending == 0:
failed_rows_list = [
r for r in latest.values()
if r.get("mosaic_download_status") == "failed"
]
n_all = len(failed_rows_list)
n_2023 = sum(
1 for r in failed_rows_list
if (r.get("scan_time") or "")[:4] >= "2023"
and len((r.get("scan_time") or "")[:4]) == 4
)
n_200 = sum(
1 for r in failed_rows_list
if r.get("mosaic_error_code") == "200"
)
rate_note = (
"rolling 30-min window"
if snap_delta_proc > 0
else f"first-pass baseline ({first_pass_rate_hr:,.0f}/hr)"
)
print()
print("### Mosaic retry estimates\n")
print(
f"*Suggested command after server fix:* "
f"`python scraper.py --retry-failed --workers 2` "
f"(filters: `--retry-since YEAR`, `--retry-error-code CODE`)*\n"
)
print(
f"*ETA column uses **{retry_estimate_rate_hr:,.0f} scans/hr** "
f"({rate_note}). Fixed columns use scenario rates.*\n"
)
est_hdr = (
"Retry scope",
"Count",
f"@{RETRY_OPTIMISTIC_RATE_PER_HR:.0f}/hr",
f"@{RETRY_REALISTIC_RATE_PER_HR:.0f}/hr",
f"@{RETRY_PESSIMISTIC_RATE_PER_HR:.0f}/hr",
f"@{retry_estimate_rate_hr:.0f}/hr",
)
retry_tbl_rows = [
[
"HTTP 200 (empty body)",
f"{n_200:,}",
_retry_hours_from_rate(n_200, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_200, retry_estimate_rate_hr),
],
[
"Failed, scan_time ≥ 2023",
f"{n_2023:,}",
_retry_hours_from_rate(n_2023, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_2023, retry_estimate_rate_hr),
],
[
"**All failed**",
f"**{n_all:,}**",
_retry_hours_from_rate(n_all, RETRY_OPTIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, RETRY_REALISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, RETRY_PESSIMISTIC_RATE_PER_HR),
_retry_hours_from_rate(n_all, retry_estimate_rate_hr),
],
]
print(_md_table(list(est_hdr), retry_tbl_rows, align=["l", "r", "r", "r", "r", "r"]))
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
# --by-year table # --by-year table
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
+1 -47
View File
@@ -84,33 +84,6 @@ def parse_args() -> argparse.Namespace:
"inventorying all scans across all machines." "inventorying all scans across all machines."
), ),
) )
p.add_argument(
"--retry-failed",
action="store_true",
help=(
"Mosaic-only: re-attempt scans whose latest scans.csv row has "
"mosaic_download_status=failed (queue from CSV, not the server list). "
"Implies --mosaic-only."
),
)
p.add_argument(
"--retry-since",
metavar="YEAR",
default=None,
help=(
"With --retry-failed: only scans with scan_time year >= YEAR "
"(e.g. 2023)."
),
)
p.add_argument(
"--retry-error-code",
metavar="CODE",
default=None,
help=(
"With --retry-failed: filter by mosaic_error_code "
"(e.g. 200 for empty-body failures)."
),
)
p.add_argument( p.add_argument(
"--dry-run", "--dry-run",
action="store_true", action="store_true",
@@ -186,16 +159,6 @@ def main() -> None:
if args.scan_id is not None and args.scan_id <= 0: if args.scan_id is not None and args.scan_id <= 0:
sys.exit("--scan-id must be a positive integer") sys.exit("--scan-id must be a positive integer")
if args.retry_since and not args.retry_failed:
sys.exit("--retry-since requires --retry-failed.")
if args.retry_error_code and not args.retry_failed:
sys.exit("--retry-error-code requires --retry-failed.")
if args.retry_failed:
if args.metadata_only:
sys.exit("--retry-failed cannot be used with --metadata-only.")
args.mosaic_only = True # implied
# --list-machines doesn't need credentials # --list-machines doesn't need credentials
if args.list_machines: if args.list_machines:
base_url = "http://205.149.147.131:8010/" base_url = "http://205.149.147.131:8010/"
@@ -298,13 +261,7 @@ def main() -> None:
if args.metadata_only: if args.metadata_only:
log.info("Mode: metadata only (mosaics and tiles skipped)") log.info("Mode: metadata only (mosaics and tiles skipped)")
elif args.mosaic_only: elif args.mosaic_only:
if args.retry_failed: log.info("Mode: mosaics only (individual tiles skipped)")
log.info(
"Mode: mosaic retry (failed scans from %s)",
SCANS_CSV_FILENAME,
)
else:
log.info("Mode: mosaics only (individual tiles skipped)")
if args.dry_run: if args.dry_run:
log.info("Mode: dry-run (no files will be written)") log.info("Mode: dry-run (no files will be written)")
@@ -328,9 +285,6 @@ def main() -> None:
metadata_only=args.metadata_only, metadata_only=args.metadata_only,
scan_id_filter=args.scan_id, scan_id_filter=args.scan_id,
max_tiles=args.max_tiles, max_tiles=args.max_tiles,
retry_failed=args.retry_failed,
retry_since_year=args.retry_since,
retry_error_code=args.retry_error_code,
) )
totals.merge(stats) totals.merge(stats)
finally: finally:
+22 -114
View File
@@ -2,22 +2,14 @@
High-level scrape orchestration: drives the per-machine and per-scan loops. High-level scrape orchestration: drives the per-machine and per-scan loops.
""" """
import csv
import json import json
import logging import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from tqdm import tqdm
from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str from spruce.download_result import PERMANENT_MISSING, UNKNOWN, error_code_str
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
# RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well # RootView returns ~43-byte 1×1 JPEG placeholders for empty cells; stay well
# below smallest observed real tile (~7 KiB in production samples). # below smallest observed real tile (~7 KiB in production samples).
@@ -57,64 +49,16 @@ class RunStats:
self.scans_probe_skipped += other.scans_probe_skipped self.scans_probe_skipped += other.scans_probe_skipped
self.scans_disk_space_skipped += other.scans_disk_space_skipped self.scans_disk_space_skipped += other.scans_disk_space_skipped
from tqdm import tqdm
from spruce.exif import write_mosaic_exif
from spruce.paths import machine_dir_name, tile_dest, mosaic_dest, _extract_date
from spruce.progress import ProgressTracker, CsvWriter
from spruce.session import MachineSession
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def _read_scans_csv_latest(scans_csv_path: Path) -> dict[tuple[str, str], dict[str, str]]:
"""Last row wins per (machine, scan_id)."""
latest: dict[tuple[str, str], dict[str, str]] = {}
if not scans_csv_path.exists():
return latest
with open(scans_csv_path, newline="", encoding="utf-8") as fh:
for row in csv.DictReader(fh):
key = (row.get("machine", ""), row.get("scan_id", ""))
latest[key] = row
return latest
def load_failed_scans_from_csv(
scans_csv_path: Path,
machine_label: str,
*,
since_year: str | None = None,
error_code: str | None = None,
) -> list[dict[str, Any]]:
"""
Dedupe scans.csv by (machine, scan_id); return failed mosaic rows for one machine.
Each dict is suitable as the ``scan`` argument to ``process_scan`` (scan_id,
scan_time, name, status).
"""
latest = _read_scans_csv_latest(scans_csv_path)
out: list[dict[str, Any]] = []
for (_m, _sid), row in latest.items():
if row.get("machine") != machine_label:
continue
if row.get("mosaic_download_status") != "failed":
continue
if error_code is not None and row.get("mosaic_error_code", "") != error_code:
continue
st = row.get("scan_time", "") or ""
if since_year is not None:
yr = st[:4]
if len(yr) < 4 or yr < since_year:
continue
sid = int(row["scan_id"])
out.append(
{
"scan_id": sid,
"scan_time": st,
"name": row.get("name", ""),
"status": row.get("status", "") or "Completed",
"user": row.get("user", ""),
"scan_lines": row.get("scan_lines", ""),
"scan_mode": row.get("scan_mode", ""),
}
)
out.sort(key=lambda s: s["scan_id"])
return out
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Per-scan helpers # Per-scan helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -555,9 +499,6 @@ def scrape_machine(
metadata_only: bool = False, metadata_only: bool = False,
scan_id_filter: int | None = None, scan_id_filter: int | None = None,
max_tiles: int | None = None, max_tiles: int | None = None,
retry_failed: bool = False,
retry_since_year: str | None = None,
retry_error_code: str | None = None,
) -> RunStats: ) -> RunStats:
"""Login, fetch scans, and download all content for one machine.""" """Login, fetch scans, and download all content for one machine."""
sess = MachineSession(machine, config) sess = MachineSession(machine, config)
@@ -577,37 +518,8 @@ def scrape_machine(
log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"]) log.error("[%s] Login failed after 3 attempts — skipping machine.", machine["label"])
return RunStats() return RunStats()
if retry_failed: if scan_id_filter is not None:
scans = load_failed_scans_from_csv( scans: list[dict[str, Any]] = [
scans_csv.path,
machine["label"],
since_year=retry_since_year,
error_code=retry_error_code,
)
if scan_id_filter is not None:
scans = [s for s in scans if s["scan_id"] == scan_id_filter]
if not scans:
log.warning(
"[%s] Retry: scan_id %d not among failed rows for this machine.",
machine["label"],
scan_id_filter,
)
return RunStats()
log.info("[%s] Mosaic retry: single scan %d.", machine["label"], scan_id_filter)
elif not scans:
log.warning(
"[%s] No failed mosaic rows in scans.csv match retry filters.",
machine["label"],
)
return RunStats()
else:
log.info(
"[%s] Mosaic retry: %d failed scan(s) from scans.csv.",
machine["label"],
len(scans),
)
elif scan_id_filter is not None:
scans = [
{"scan_id": scan_id_filter, "status": "Completed"} {"scan_id": scan_id_filter, "status": "Completed"}
] ]
log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter) log.info("[%s] Targeting scan ID %d.", machine["label"], scan_id_filter)
@@ -617,25 +529,21 @@ def scrape_machine(
log.warning("[%s] No scans found.", machine["label"]) log.warning("[%s] No scans found.", machine["label"])
return RunStats() return RunStats()
# Build existing_ids: scan_ids to skip entirely (no metadata fetch, no HTTP). # Build a set of scan_ids already fully processed in a prior run so we can
# In normal mode: skip anything with a definitive non-pending status. # skip them entirely (no metadata fetch, no mosaic request).
# In retry mode: only skip scans that are already downloaded or skipped for # Only scans with a definitive non-pending status count; skipped_metadata_only
# disk-space reasons — failed scans must be re-attempted. # rows still need to be processed in mosaic mode.
PENDING_STATUSES = {"skipped_metadata_only", ""} PENDING_STATUSES = {"skipped_metadata_only", ""}
BLOCK_AFTER_RETRY_STATUSES = {"downloaded", "skipped_zero_disk_space"}
existing_ids: set[int] = set() existing_ids: set[int] = set()
if not metadata_only: if not metadata_only and scans_csv._fh.name:
latest_rows = _read_scans_csv_latest(scans_csv.path) existing_path = Path(scans_csv._fh.name)
for (_mlabel, _sid), _row in latest_rows.items(): if existing_path.exists():
if _mlabel != machine["label"]: import csv as _csv
continue with open(existing_path, newline="", encoding="utf-8") as _f:
st = _row.get("mosaic_download_status", "") for _row in _csv.DictReader(_f):
if retry_failed: if _row.get("machine") == machine["label"]:
if st in BLOCK_AFTER_RETRY_STATUSES: if _row.get("mosaic_download_status", "") not in PENDING_STATUSES:
existing_ids.add(int(_row["scan_id"])) existing_ids.add(int(_row["scan_id"]))
else:
if st not in PENDING_STATUSES:
existing_ids.add(int(_row["scan_id"]))
stats = RunStats() stats = RunStats()
for scan in scans: for scan in scans:
-1
View File
@@ -77,7 +77,6 @@ class CsvWriter:
def __init__(self, path: Path, fields: list[str]) -> None: def __init__(self, path: Path, fields: list[str]) -> None:
is_new = not path.exists() is_new = not path.exists()
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
self.path = path
self._fh = open(path, "a", newline="", encoding="utf-8") self._fh = open(path, "a", newline="", encoding="utf-8")
self._writer = csv.DictWriter(self._fh, fieldnames=fields) self._writer = csv.DictWriter(self._fh, fieldnames=fields)
if is_new: if is_new:
-133
View File
@@ -1,133 +0,0 @@
"""Retry queue loading from scans.csv (mosaic_download_status=failed)."""
import csv
from pathlib import Path
import pytest
from spruce.orchestrator import load_failed_scans_from_csv
from spruce.settings import SCANS_CSV_FIELDS
def _blank_row(**kwargs: str) -> dict[str, str]:
row = {k: "" for k in SCANS_CSV_FIELDS}
row.update(kwargs)
return row
def _write_scans_csv(path: Path, rows: list[dict[str, str]]) -> None:
with open(path, "w", newline="", encoding="utf-8") as fh:
w = csv.DictWriter(fh, fieldnames=SCANS_CSV_FIELDS)
w.writeheader()
for r in rows:
w.writerow({k: r.get(k, "") for k in SCANS_CSV_FIELDS})
def test_load_failed_scans_dedup_keeps_last_row(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
common = {
"machine": "BW1 [X]",
"machine_id": "1",
"scan_id": "100",
"mosaic_url": "http://x/m.jpg",
"mosaic_local_path": "",
"mosaic_on_disk": "False",
}
_write_scans_csv(
path,
[
_blank_row(
**common,
mosaic_download_status="failed",
mosaic_error_code="404",
scan_time="2020-01-01",
),
_blank_row(
**common,
mosaic_download_status="failed",
mosaic_error_code="404",
scan_time="2020-06-01",
),
],
)
out = load_failed_scans_from_csv(path, "BW1 [X]")
assert len(out) == 1
assert out[0]["scan_id"] == 100
assert out[0]["scan_time"] == "2020-06-01"
def test_load_failed_scans_since_year(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "",
"mosaic_download_status": "failed",
"mosaic_error_code": "404",
}
_write_scans_csv(
path,
[
_blank_row(**base, scan_id="1", scan_time="2022-12-31"),
_blank_row(**base, scan_id="2", scan_time="2023-01-01"),
_blank_row(**base, scan_id="3", scan_time=""),
],
)
out = load_failed_scans_from_csv(path, "M", since_year="2023")
ids = {s["scan_id"] for s in out}
assert ids == {2}
def test_load_failed_scans_error_code(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"scan_time": "2024-01-01",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "",
"mosaic_download_status": "failed",
}
_write_scans_csv(
path,
[
_blank_row(**base, scan_id="10", mosaic_error_code="404"),
_blank_row(**base, scan_id="11", mosaic_error_code="200"),
],
)
out = load_failed_scans_from_csv(path, "M", error_code="200")
assert [s["scan_id"] for s in out] == [11]
def test_load_failed_scans_excludes_downloaded(tmp_path: Path) -> None:
path = tmp_path / "scans.csv"
base = {
"machine": "M",
"machine_id": "1",
"scan_time": "2024-01-01",
"mosaic_url": "",
"mosaic_local_path": "",
"mosaic_on_disk": "True",
}
_write_scans_csv(
path,
[
_blank_row(
**base,
scan_id="5",
mosaic_download_status="downloaded",
mosaic_error_code="",
),
_blank_row(
**base,
scan_id="6",
mosaic_download_status="failed",
mosaic_error_code="404",
),
],
)
out = load_failed_scans_from_csv(path, "M")
assert [s["scan_id"] for s in out] == [6]