""" Archive integrity checks — find corrupt / missing tiles and remove them from the progress tracker so they are re-downloaded on the next run. """ import logging import urllib.parse from pathlib import Path from typing import Any from spruce.progress import ProgressTracker log = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _parse_tile_url(url: str) -> dict[str, str]: """Extract scan_id, x, y from a tile URL query string.""" qs = dict(urllib.parse.parse_qsl(urllib.parse.urlparse(url).query)) return { "scan_id": qs.get("id", ""), "x": qs.get("x", ""), "y": qs.get("y", ""), } def _build_disk_index(output_dir: Path) -> dict[Path, int]: """Return {tile_path: size_bytes} for every tile file found on disk.""" return {p: p.stat().st_size for p in output_dir.rglob("tile_r*.jpg")} # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def recheck_tile_files(output_dir: Path, progress: ProgressTracker) -> int: """ Walk every tile file on disk and delete any that are zero bytes. Also removes the corresponding URL from progress in the same pass, so a single --recheck call is sufficient before resuming. Returns the count of files deleted. """ # Build a reverse map: (scan_id, x, y) -> url for all completed tile URLs coord_to_url: dict[tuple[str, str, str], str] = {} for url in progress.iter_urls(): if "cmd=image" in url: p = _parse_tile_url(url) key = (p["scan_id"], p["x"], p["y"]) coord_to_url[key] = url deleted = 0 for tile_path in output_dir.rglob("tile_r*.jpg"): if tile_path.stat().st_size == 0: log.warning("Deleting zero-byte tile: %s", tile_path) tile_path.unlink() deleted += 1 # Try to find the matching URL from progress and discard it scan_id = _scan_id_from_path(tile_path) if scan_id: # Discard any URL for this scan_id — precise x/y matching # requires metadata.json; scan-level discard is safe because # recheck_archive will clean up any remaining stale URLs. for key, url in list(coord_to_url.items()): if key[0] == scan_id: progress.discard(url) del coord_to_url[key] if deleted: log.info("Deleted %d zero-byte tile file(s).", deleted) progress.save() else: log.info("No zero-byte tile files found on disk.") return deleted def recheck_archive(output_dir: Path, progress: ProgressTracker) -> int: """ Walk every URL in .progress.json and verify its local file exists and is non-empty. Removes bad entries from progress so the next run re-downloads them. Returns the count of entries removed. Only tile URLs are checked (mosaic URLs are skipped — mosaics are large single files and are unlikely to be partially written due to streaming). """ if len(progress) == 0: log.info("Progress file is empty — nothing to recheck.") return 0 tile_urls = [u for u in progress.iter_urls() if "cmd=image" in u] mosaic_count = len(progress) - len(tile_urls) log.info( "Rechecking %d tile URLs (%d mosaic URLs not rechecked) …", len(tile_urls), mosaic_count, ) # Build a disk index once existing_files = _build_disk_index(output_dir) log.debug("Found %d tile files on disk.", len(existing_files)) bad_urls: list[str] = [] for url in tile_urls: p = _parse_tile_url(url) scan_id = p["scan_id"] # Find tile files that live under a directory named after this scan_id candidates = [path for path in existing_files if str(scan_id) in path.parts] if not candidates: bad_urls.append(url) continue if not any(existing_files[path] > 0 for path in candidates): bad_urls.append(url) if not bad_urls: log.info("All %d tile URLs look healthy.", len(tile_urls)) return 0 log.warning( "Found %d suspect tile URL(s). Removing from progress.", len(bad_urls), ) for url in bad_urls: progress.discard(url) progress.save() log.info( "Removed %d URL(s) from .progress.json — they will be re-downloaded on next run.", len(bad_urls), ) return len(bad_urls) # --------------------------------------------------------------------------- # Internal utility # --------------------------------------------------------------------------- def _scan_id_from_path(tile_path: Path) -> str | None: """ Given a tile path like .../158374/tiles/tile_r0_c0.jpg, return '158374'. Looks for the directory two levels above the filename (parent.parent.name). """ try: # structure: ///tiles/ return tile_path.parent.parent.name except Exception: return None