Files
SPRUCE-scraper/spruce/progress.py
T

92 lines
2.9 KiB
Python

"""
Progress tracking (JSON) and CSV writing.
"""
import csv
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
log = logging.getLogger(__name__)
class ProgressTracker:
    """
    Tracks successfully downloaded URLs in a JSON file so runs can be resumed.

    Public API (all external code should use only these methods):
        is_done(url)   — True if url has been downloaded
        mark_done(url) — Record url as complete (call save() to persist)
        discard(url)   — Remove url from the completed set
        iter_urls()    — Iterate over all completed URLs
        __len__()      — Number of completed URLs
        save()         — Flush state to disk

    The progress file is a JSON object with two keys: "started_at" (a
    timestamp string) and "completed_urls" (a list of URL strings).
    """

    def __init__(self, path: Path) -> None:
        """Create a tracker backed by *path*, loading earlier state if present."""
        self.path = path
        self._done: set[str] = set()
        # Replaced by _load() when the file carries an earlier run's timestamp.
        self.started_at: str = datetime.now(timezone.utc).isoformat()
        self._load()

    def _load(self) -> None:
        """Populate state from the progress file, if a usable one exists.

        A missing file is silent. Any failure to read or interpret an
        existing file (I/O error, invalid JSON, unexpected structure) is
        logged as a warning and the tracker starts empty instead of raising.
        """
        if not self.path.exists():
            return
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
            if not isinstance(data, dict):
                raise ValueError("progress file must contain a JSON object")
            urls = data.get("completed_urls", [])
            # Without this check a bare string would be split into single
            # characters by set(), silently producing bogus "URLs".
            if not isinstance(urls, list) or not all(
                isinstance(url, str) for url in urls
            ):
                raise ValueError('"completed_urls" must be a list of strings')
        except Exception:
            # Deliberately broad: a damaged progress file must never abort a
            # run. exc_info keeps the actual cause visible in the log.
            log.warning(
                "Could not read progress file; starting fresh.", exc_info=True
            )
            return

        self._done = set(urls)
        started_at = data.get("started_at")
        if isinstance(started_at, str):
            self.started_at = started_at
        log.info("Resuming: %d URLs already downloaded.", len(self._done))

    def is_done(self, url: str) -> bool:
        """Return True if *url* is in the completed set."""
        return url in self._done

    def mark_done(self, url: str) -> None:
        """Add *url* to the completed set (in memory only until save())."""
        self._done.add(url)

    def discard(self, url: str) -> None:
        """Remove a URL from the completed set (re-queues it for download)."""
        self._done.discard(url)

    def iter_urls(self) -> Iterator[str]:
        """Iterate over all completed URLs (in no particular order)."""
        return iter(self._done)

    def __len__(self) -> int:
        """Return the number of completed URLs."""
        return len(self._done)

    def save(self) -> None:
        """Write the current state to ``self.path``.

        The JSON is written to a sibling temporary file first and then moved
        over the real file, so a crash mid-write cannot leave truncated JSON
        at ``self.path``. Parent directories are created as needed.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Derive the temp name from the full file name so that files differing
        # only by extension never share a temporary file.
        tmp = self.path.with_name(self.path.name + ".tmp")
        payload = {
            "started_at": self.started_at,
            "completed_urls": sorted(self._done),
        }
        tmp.write_text(json.dumps(payload, indent=2), encoding="utf-8")
        tmp.replace(self.path)  # atomic on POSIX; avoids corrupt JSON on crash
class CsvWriter:
    """Append-mode CSV writer that emits a header when the file is empty.

    The file is opened for appending as UTF-8. A header row is written if the
    file holds no data yet, which covers both a newly created file and an
    existing zero-byte one. Instances can be used as context managers; the
    file is closed on exit.
    """

    def __init__(self, path: Path, fields: list[str]) -> None:
        """Open *path* for appending and write the header if it is empty.

        Args:
            path: Destination CSV file; parent directories are created.
            fields: Column names, in output order.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        # Private copy so later mutation of the caller's list cannot make
        # rows disagree with the header.
        self._fields = list(fields)
        self._fh = open(path, "a", newline="", encoding="utf-8")
        try:
            self._writer = csv.DictWriter(self._fh, fieldnames=self._fields)
            # Check the size rather than prior existence: an empty file left
            # behind earlier still needs its header.
            if path.stat().st_size == 0:
                self._writer.writeheader()
                self._fh.flush()
        except Exception:
            # Do not leak the handle if header setup fails.
            self._fh.close()
            raise

    def __enter__(self) -> "CsvWriter":
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the underlying file; exceptions are not suppressed."""
        self.close()

    def write(self, row: dict) -> None:
        """Append one row and flush it to the file.

        Only the configured fields are written: keys missing from *row*
        become empty strings and keys not in the field list are ignored.
        """
        self._writer.writerow({f: row.get(f, "") for f in self._fields})
        self._fh.flush()

    def close(self) -> None:
        """Close the underlying file."""
        self._fh.close()