e3f0c07119
Co-authored-by: Cursor <cursoragent@cursor.com>
92 lines
2.9 KiB
Python
92 lines
2.9 KiB
Python
"""
Progress tracking (JSON) and CSV writing.
"""
|
|
|
|
import csv
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
# Module-level logger named after this module; ProgressTracker uses it to
# report resume status and unreadable progress files.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class ProgressTracker:
    """
    Tracks successfully downloaded URLs in a JSON file so runs can be resumed.

    On-disk format: a JSON object with ``started_at`` (ISO-8601 string) and
    ``completed_urls`` (sorted list of URL strings).

    Public API (all external code should use only these methods):
      is_done(url)    — True if url has been downloaded
      mark_done(url)  — Record url as complete (call save() to persist)
      discard(url)    — Remove url from the completed set
      iter_urls()     — Iterate over all completed URLs
      __len__()       — Number of completed URLs
      save()          — Flush state to disk
    """

    def __init__(self, path: Path) -> None:
        """Create a tracker backed by *path*, loading prior state if present."""
        self.path = path
        self._done: set[str] = set()
        # Default for a fresh run; replaced by the stored value when resuming.
        self.started_at: str = datetime.now(timezone.utc).isoformat()
        self._load()

    def _load(self) -> None:
        """Best-effort load of previous state; never raises.

        A missing file means a first run and is silent. Any other failure
        (unreadable file, invalid JSON, unexpected structure) is logged with
        its traceback and the tracker starts empty.
        """
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
            if not isinstance(data, dict):
                raise ValueError("progress file must contain a JSON object")
            urls = data.get("completed_urls", [])
            # A bare string would otherwise be exploded into a set of characters,
            # and non-string entries would later break sorted() in save().
            if not isinstance(urls, list) or not all(isinstance(u, str) for u in urls):
                raise ValueError("'completed_urls' must be a list of strings")
        except FileNotFoundError:
            return
        except Exception:
            # Deliberately broad: resuming is optional and must not abort the run.
            log.warning("Could not read progress file; starting fresh.", exc_info=True)
            return

        # Assign only after full validation so a bad file leaves no partial state.
        self._done = set(urls)
        started = data.get("started_at")
        if isinstance(started, str):
            self.started_at = started
        log.info("Resuming: %d URLs already downloaded.", len(self._done))

    def is_done(self, url: str) -> bool:
        """Return True if *url* is recorded as downloaded."""
        return url in self._done

    def mark_done(self, url: str) -> None:
        """Record *url* as complete in memory (call save() to persist)."""
        self._done.add(url)

    def discard(self, url: str) -> None:
        """Remove a URL from the completed set (re-queues it for download)."""
        self._done.discard(url)

    def iter_urls(self) -> Iterator[str]:
        """Iterate over all completed URLs."""
        return iter(self._done)

    def __len__(self) -> int:
        """Return the number of completed URLs."""
        return len(self._done)

    def save(self) -> None:
        """Write the current state to ``self.path``, creating parent directories.

        The JSON is written to a sibling temp file and then moved over the
        target, so a crash mid-write cannot leave a truncated progress file.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Append ".tmp" to the full name (instead of replacing the suffix) so
        # trackers such as "a.json" and "a.dat" never share a temp file.
        tmp = self.path.with_name(self.path.name + ".tmp")
        payload = json.dumps(
            {"started_at": self.started_at, "completed_urls": sorted(self._done)},
            indent=2,
        )
        tmp.write_text(payload, encoding="utf-8")
        tmp.replace(self.path)  # atomic on POSIX; avoids corrupt JSON on crash
|
|
|
|
|
|
class CsvWriter:
    """Append-mode CSV writer that emits the header whenever the file is empty.

    Rows are flushed to disk after every write. The writer owns its file
    handle: call close() when done, or use it as a context manager.
    """

    def __init__(self, path: Path, fields: list[str]) -> None:
        """Open *path* for appending, creating parent directories as needed.

        Args:
            path: Destination CSV file.
            fields: Column names, in output order.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        self._fields = fields
        self._fh = open(path, "a", newline="", encoding="utf-8")
        try:
            self._writer = csv.DictWriter(self._fh, fieldnames=fields)
            # Check the size rather than prior existence: a file that exists
            # but is empty (e.g. created and never written) still needs a header.
            if path.stat().st_size == 0:
                self._writer.writeheader()
        except BaseException:
            # Do not leak the handle if header setup fails.
            self._fh.close()
            raise

    def __enter__(self) -> "CsvWriter":
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the underlying file on leaving the ``with`` block."""
        self.close()

    def write(self, row: dict) -> None:
        """Append one row and flush it to disk.

        Keys not listed in the configured fields are ignored; missing fields
        are written as empty strings.
        """
        self._writer.writerow({f: row.get(f, "") for f in self._fields})
        self._fh.flush()

    def close(self) -> None:
        """Close the underlying file handle."""
        self._fh.close()
|