"""
HTTP session for a single RootView machine: login, scan listing, tile downloads.
"""

import logging
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlencode, urljoin

import requests
from bs4 import BeautifulSoup

from spruce.download_result import (
    OK,
    UNKNOWN,
    DownloadResult,
    classify_http_error,
    error_code_str,
)
from spruce.parsers import parse_scan_row, parse_scan_view, _grid_values

log = logging.getLogger(__name__)

USER_AGENT = "spruce-scraper/1.0"


class MachineSession:
    """Manages an authenticated HTTP session for one RootView machine."""

    def __init__(self, machine: dict[str, Any], config: dict[str, Any]) -> None:
        """
        Store the machine/config dicts and create the shared requests session.

        ``config`` must contain ``base_url``; ``image_base_url`` is optional and
        falls back to the built-in default below.
        """
        self.machine = machine
        self.cfg = config
        self.http = requests.Session()
        self.http.headers["User-Agent"] = USER_AGENT
        self.base_url: str = config["base_url"]
        self.image_base_url: str = config.get(
            "image_base_url", "http://205.149.147.131:8011/"
        )

    # ------------------------------------------------------------------
    # Auth
    # ------------------------------------------------------------------

    def login(self) -> bool:
        """
        POST the login form to ``index.php``.

        Returns False when the request fails (transport error or HTTP error
        status) or when the response page contains a non-empty element with
        class ``error``; returns True otherwise.
        """
        url = urljoin(self.base_url, "index.php")
        payload = {
            "RTLLogin": "1",
            "RTLNAME": self.machine["option_value"],
            "RTLUSER": self.cfg["username"],
            "RTLPWD": self.cfg["password"],
            "rtl_latest_version": "3.0.0.18",
            "submit": " submit ",
        }
        try:
            resp = self.http.post(url, data=payload, timeout=self.cfg["timeout"])
            resp.raise_for_status()
        except requests.RequestException as exc:
            log.error("[%s] Login failed: %s", self.machine["label"], exc)
            return False

        soup = BeautifulSoup(resp.text, "lxml")
        error_tag = soup.find(class_="error")
        if error_tag and error_tag.get_text(strip=True):
            log.error(
                "[%s] Login rejected: %s",
                self.machine["label"],
                error_tag.get_text(strip=True),
            )
            return False

        log.info("[%s] Login succeeded.", self.machine["label"])
        return True

    # ------------------------------------------------------------------
    # Scan list (paginated)
    # ------------------------------------------------------------------

    def get_all_scans(
        self, first_page_only: bool = False
    ) -> list[dict[str, Any]]:
        """
        Fetch the scan list from the RootView table.

        By default, walks all pages. With first_page_only=True, only the first
        request is made (FilterCount 320) — enough for a random pick without
        paginating a large history.

        Pagination stops on an empty page, on a short page (fewer rows than
        the page size), or when a page is identical to the previous one.
        Errors raised by ``_fetch_scan_page`` propagate to the caller.
        """
        page_size = 320
        label = self.machine["label"]

        if first_page_only:
            first_page = self._fetch_scan_page(0, page_size)
            log.info(
                "[%s] First page only: %d scan(s) (not paginating).",
                label,
                len(first_page),
            )
            return first_page

        all_scans: list[dict[str, Any]] = []
        previous_page: list[dict[str, Any]] | None = None
        start = 0
        while True:
            # _fetch_scan_page already sleeps request_delay before each
            # request, so no additional sleep is needed between pages.
            page_scans = self._fetch_scan_page(start, page_size)
            if not page_scans:
                break
            if page_scans == previous_page:
                # A repeated full page means the offset is not advancing the
                # listing; bail out instead of looping forever.
                log.warning(
                    "[%s] Page start=%d repeats the previous page; "
                    "stopping pagination.",
                    label,
                    start,
                )
                break
            all_scans.extend(page_scans)
            log.debug(
                "[%s] Page start=%d: %d scans (total so far: %d)",
                label,
                start,
                len(page_scans),
                len(all_scans),
            )
            if len(page_scans) < page_size:
                break
            previous_page = page_scans
            start += page_size

        log.info("[%s] Found %d scans.", label, len(all_scans))
        return all_scans

    def _fetch_scan_page(
        self, start: int, page_size: int
    ) -> list[dict[str, Any]]:
        """
        POST the scan list form and parse the returned table.

        Sleeps ``request_delay`` before the request. Every ``<tr>`` in the
        response is passed to ``parse_scan_row`` as a list of stripped cell
        texts; rows for which it returns a falsy value are skipped.
        Raises ``requests.HTTPError`` on an HTTP error status.
        """
        time.sleep(self.cfg["request_delay"])
        resp = self.http.post(
            urljoin(self.base_url, "index.php"),
            data={
                "cmd": "scan",
                "start": str(start),
                "order": "0",
                "order_dir": "1",
                "FilterScanStatus": "2",  # Completed scans
                "FilterUser": "",
                "hidedate": "",
                "FilterDtFrom": "",
                "FilterDtTo": "",
                "FilterIdFrom": "0",
                "FilterIdTo": "0",
                "FilterCount": str(page_size),
            },
            timeout=self.cfg["timeout"],
        )
        resp.raise_for_status()

        soup = BeautifulSoup(resp.text, "lxml")
        scans: list[dict[str, Any]] = []
        for row in soup.find_all("tr"):
            cells = [td.get_text(strip=True) for td in row.find_all("td")]
            scan = parse_scan_row(cells)
            if scan:
                scans.append(scan)
        return scans

    # ------------------------------------------------------------------
    # Scan detail
    # ------------------------------------------------------------------

    def get_scan_metadata(self, scan_id: int) -> dict[str, Any]:
        """
        Fetch the scan view page and extract grid parameters.

        Sleeps ``request_delay`` first; raises ``requests.HTTPError`` on an
        HTTP error status. The page text is parsed by ``parse_scan_view``.
        """
        time.sleep(self.cfg["request_delay"])
        resp = self.http.get(
            urljoin(self.base_url, "index.php"),
            params={"cmd": "scan", "mode": "view", "id": str(scan_id)},
            timeout=self.cfg["timeout"],
        )
        resp.raise_for_status()
        return parse_scan_view(resp.text)

    # ------------------------------------------------------------------
    # Tile enumeration
    # ------------------------------------------------------------------

    def enumerate_tiles(self, scan_meta: dict[str, Any]) -> list[dict[str, Any]]:
        """
        Generate the full list of tile descriptors for a scan.

        ``scan_meta`` must contain ``scan_id``; ``nx``/``ny`` default to 0,
        ``start_x``/``start_y`` to 0.0 and ``dx``/``dy`` to 1.0 when absent.
        The scale comes from the ``tile_scale`` config key (default 1).

        Each descriptor has:
          scan_id, row_index, col_index, x_mm, y_mm, url
        Tiles are ordered row by row (outer loop over y, inner loop over x).
        """
        scan_id = scan_meta["scan_id"]
        nx: int = scan_meta.get("nx", 0)
        ny: int = scan_meta.get("ny", 0)
        start_x: float = scan_meta.get("start_x", 0.0)
        start_y: float = scan_meta.get("start_y", 0.0)
        dx: float = scan_meta.get("dx", 1.0)
        dy: float = scan_meta.get("dy", 1.0)
        scale: int = self.cfg.get("tile_scale", 1)

        # xs is replayed once per row, so materialise it in case
        # _grid_values returns a one-shot iterator.
        xs = list(_grid_values(start_x, nx, dx))
        ys = _grid_values(start_y, ny, dy)

        # Loop-invariant: resolve the endpoint once, not once per tile.
        endpoint = urljoin(self.base_url, "index.php")

        tiles: list[dict[str, Any]] = []
        for row_idx, y in enumerate(ys):
            for col_idx, x in enumerate(xs):
                query = urlencode(
                    {
                        "cmd": "image",
                        "mode": "image_scan",
                        "id": scan_id,
                        "s": scale,
                        "x": x,
                        "y": y,
                    }
                )
                tiles.append(
                    {
                        "scan_id": scan_id,
                        "row_index": row_idx,
                        "col_index": col_idx,
                        "x_mm": x,
                        "y_mm": y,
                        "url": f"{endpoint}?{query}",
                    }
                )
        return tiles

    # ------------------------------------------------------------------
    # Mosaic URL
    # ------------------------------------------------------------------

    def mosaic_url(self, scan_id: int) -> str:
        """Return the mosaic.jpg URL for ``scan_id`` under ``image_base_url``."""
        # The server stores scan directories zero-padded to 6 digits (e.g. 010700/).
        # Scans with IDs >= 100000 are unaffected since they are already 6 digits.
        # int() lets a numeric string be formatted with the integer format spec.
        return urljoin(
            self.image_base_url,
            f"RootView_Database/{int(scan_id):06d}/mosaic.jpg",
        )

    # ------------------------------------------------------------------
    # Downloads
    # ------------------------------------------------------------------

    def download_file(
        self, url: str, dest: Path, retries: int = 3
    ) -> DownloadResult:
        """
        Stream-download url to dest with retries.

        The body is written to a sibling ``<name>.part`` file and moved onto
        ``dest`` only after a complete, non-empty download, so a failed
        attempt never leaves a truncated file at ``dest``.

        Failed attempts are retried after a delay that starts at 5 seconds
        and doubles each time. An empty response body is reported
        immediately, without retrying.

        Returns a DownloadResult (bytes, optional HTTP code, final error, class).
        Creating the parent directory of ``dest`` may raise ``OSError``.
        """
        dest.parent.mkdir(parents=True, exist_ok=True)
        part = dest.with_name(dest.name + ".part")
        backoff = 5.0
        for attempt in range(1, retries + 1):
            try:
                size, status = self._stream_to_file(url, part)
                if size == 0:
                    self._discard(part)
                    return DownloadResult(
                        0,
                        status,
                        "0 bytes in response body",
                        UNKNOWN,
                    )
                part.replace(dest)
                return DownloadResult(size, status, None, OK)
            except Exception as exc:
                # Deliberately broad: this method reports every failure
                # through DownloadResult instead of raising.
                self._discard(part)
                sc: int | None = None
                if (
                    isinstance(exc, requests.HTTPError)
                    and exc.response is not None
                ):
                    sc = exc.response.status_code
                if attempt < retries:
                    log.warning(
                        "Attempt %d/%d failed %s: %s — retrying in %.0fs",
                        attempt,
                        retries,
                        url,
                        exc,
                        backoff,
                    )
                    time.sleep(backoff)
                    backoff *= 2
                else:
                    log.warning(
                        "Download failed after %d attempts %s: %s",
                        retries,
                        url,
                        exc,
                    )
                    cl = classify_http_error(sc, exc)
                    return DownloadResult(0, sc, str(exc), cl)
        # Only reached when retries < 1 and the loop body never ran.
        return DownloadResult(0, None, "download_file: exhausted", UNKNOWN)

    def _stream_to_file(self, url: str, target: Path) -> tuple[int, int]:
        """GET url, stream the body into target; return (bytes written, status)."""
        resp = self.http.get(url, timeout=self.cfg["timeout"], stream=True)
        try:
            resp.raise_for_status()
            size = 0
            with open(target, "wb") as fh:
                for chunk in resp.iter_content(chunk_size=65536):
                    if chunk:
                        fh.write(chunk)
                        size += len(chunk)
            return size, resp.status_code
        finally:
            # A streamed response keeps its connection until it is closed.
            resp.close()

    @staticmethod
    def _discard(path: Path) -> None:
        """Best-effort removal of a leftover partial download."""
        try:
            path.unlink(missing_ok=True)
        except OSError as exc:
            log.debug("Could not remove partial file %s: %s", path, exc)

    def download_tile(
        self, tile: dict[str, Any], dest: Path, dry_run: bool
    ) -> dict[str, Any]:
        """
        Download a single tile. Returns a metadata row dict.

        With ``dry_run`` true nothing is fetched and ``status`` is
        ``"dry_run"``. Otherwise ``status`` is ``"downloaded"`` (with a UTC
        ISO timestamp and the byte size) or ``"failed"`` (with the error
        text, error code string and error class from the DownloadResult).
        """
        row: dict[str, Any] = {
            "machine": self.machine["label"],
            "machine_id": self.machine["machine_id"],
            "scan_id": tile["scan_id"],
            "scan_time": tile.get("scan_time", ""),
            "row_index": tile["row_index"],
            "col_index": tile["col_index"],
            "x_mm": tile["x_mm"],
            "y_mm": tile["y_mm"],
            "url": tile["url"],
            "local_path": str(dest),
            "downloaded_at": "",
            "file_size_bytes": "",
            "status": "",
            "error": "",
            "error_code": "",
            "error_class": "",
        }

        if dry_run:
            row["status"] = "dry_run"
            return row

        res = self.download_file(tile["url"], dest)
        if res.ok:
            row["status"] = "downloaded"
            row["downloaded_at"] = datetime.now(timezone.utc).isoformat()
            row["file_size_bytes"] = res.size
        else:
            row["status"] = "failed"
            row["error"] = res.error or ""
            row["error_code"] = error_code_str(res.status_code)
            row["error_class"] = res.error_class
        return row