Files
SPRUCE-scraper/spruce/exif.py
poprhythm 08a29d124a Add offline mosaic EXIF tagging (stitch --write-exif, tag_mosaic_exif CLI)
- spruce.exif: tag_mosaic_jpeg_for_scan_dir, resolve_machine_label_for_scan_dir; ProcessingSoftware for tile-stitched mosaics
- spruce.settings: load_config(require_credentials=False) for config without login
- scripts/tag_mosaic_exif.py and tests; stitch script --write-exif path
2026-04-26 20:47:23 -04:00

350 lines
12 KiB
Python

"""
Write EXIF metadata into downloaded mosaic JPEGs (piexif, no re-encode).
"""
import json
import logging
import re
from pathlib import Path
from typing import Any
import piexif
from spruce.paths import machine_dir_name
from piexif import ExifIFD, GPSIFD, ImageIFD
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# 8-byte prefix ("ASCII" followed by three NUL bytes) that is prepended to the
# ASCII-encoded text when the EXIF UserComment value is built.
USER_COMMENT_ASCII = b"ASCII\x00\x00\x00"
# ProcessingSoftware for mosaics stitched from tiles (distinct from server download path).
DEFAULT_PROCESSING_SOFTWARE_MOSAIC_FROM_TILES = "spruce-scraper/1.0 mosaic_from_tiles"
def _fmt_exif_datetime(scan_time: str) -> str:
"""`YYYY-MM-DD HH:MM:SS` -> `YYYY:MM:DD HH:MM:SS` for EXIF; empty on failure."""
m = re.match(
r"^(\d{4})-(\d{2})-(\d{2})\s+(\d{2}):(\d{2}):(\d{2})$", scan_time.strip()
)
if not m:
return ""
y, mo, d, h, mi, s = m.groups()
return f"{y}:{mo}:{d} {h}:{mi}:{s}"
def _fmt_dim(v: Any) -> str:
if v is None:
return "?"
if isinstance(v, float) and v == int(v):
return str(int(v))
return str(v)
def _fmt_machine_meta_scalar(v: Any) -> str:
if isinstance(v, float) and v == int(v):
return str(int(v))
if isinstance(v, float):
return format(v, "g")
return str(v).strip()
def _user_comment_treatment_suffix(machine_meta: dict[str, Any] | None) -> str:
if not machine_meta:
return ""
parts: list[str] = []
pn = machine_meta.get("plot_number")
if pn is not None and str(pn).strip() != "":
parts.append(f"plot_number {_fmt_machine_meta_scalar(pn)}")
if machine_meta.get("enclosure") is not None:
enc = machine_meta["enclosure"]
if isinstance(enc, bool):
parts.append("enclosure yes" if enc else "enclosure no")
else:
parts.append(f"enclosure {_fmt_machine_meta_scalar(enc)}")
if machine_meta.get("temp_treatment") is not None:
tt = machine_meta["temp_treatment"]
parts.append(f"temp_treatment {_fmt_machine_meta_scalar(tt)}")
if machine_meta.get("co2_treatment") is not None:
c = str(machine_meta["co2_treatment"]).strip().lower()
parts.append(f"co2_treatment {c}")
if not parts:
return ""
return " | " + " | ".join(parts)
def _build_user_comment(
    scan_meta: dict[str, Any],
    machine: dict[str, Any],
    scan_id: int,
    machine_meta: dict[str, Any] | None = None,
) -> bytes:
    """Build the bytes stored in the EXIF UserComment tag.

    The text names the scan id and machine label, summarises the grid values
    read from ``scan_meta`` (``nx``, ``ny``, ``dx``, ``dy``, ``end_x``,
    ``end_y``), and ends with the treatment suffix derived from
    ``machine_meta``. It is ASCII-encoded (unencodable characters replaced)
    and prefixed with ``USER_COMMENT_ASCII``.
    """
    nx = scan_meta.get("nx", "?")
    ny = scan_meta.get("ny", "?")
    # The four dimension fields share the same formatting rule.
    dx, dy, ex, ey = (
        _fmt_dim(scan_meta.get(key)) for key in ("dx", "dy", "end_x", "end_y")
    )
    summary = (
        f"SPRUCE scan {scan_id} | machine {machine['label']} | "
        f"grid {nx}x{ny} @ {dx}x{dy}mm over {ex}x{ey}mm | "
        f"see metadata.json"
    )
    payload = summary + _user_comment_treatment_suffix(machine_meta)
    return USER_COMMENT_ASCII + payload.encode("ascii", errors="replace")
def _co2_keyword(co2: str) -> str:
c = (co2 or "").strip().lower()
if c == "ambient":
return "aCO2"
if c == "elevated":
return "eCO2"
return c or "co2"
def _enclosure_keyword(enclosure: Any) -> str | None:
if enclosure is None:
return None
if isinstance(enclosure, bool):
return "enclosed" if enclosure else "no enclosure"
s = str(enclosure).strip().lower()
if s in ("yes", "y", "true", "1"):
return "enclosed"
if s in ("no", "n", "false", "0"):
return "no enclosure"
return None
def _temp_treatment_keyword(temp: Any) -> str | None:
if temp is None or isinstance(temp, bool):
return None
if isinstance(temp, (int, float)):
w = float(temp)
s = f"{w:g}C" if w != int(w) else f"{int(w)}C"
return f"temp +{s}"
t = str(temp).strip()
if not t:
return None
try:
w = float(t)
s = f"{w:g}C" if w != int(w) else f"{int(w)}C"
return f"temp +{s}"
except (TypeError, ValueError):
return f"temp {t}"
def _build_xp_keywords(machine_meta: dict[str, Any] | None) -> bytes | None:
if not machine_meta:
return None
parts: list[str] = []
pn = machine_meta.get("plot_number")
if pn is not None and str(pn).strip() != "":
parts.append(f"plot {pn}")
enc = _enclosure_keyword(machine_meta.get("enclosure"))
if enc:
parts.append(enc)
if machine_meta.get("temp_treatment") is not None:
tk = _temp_treatment_keyword(machine_meta["temp_treatment"])
if tk:
parts.append(tk)
if machine_meta.get("co2_treatment") is not None:
parts.append(_co2_keyword(str(machine_meta["co2_treatment"])))
if not parts:
return None
text = "SPRUCE; " + "; ".join(parts)
return text.encode("utf-16le") + b"\x00\x00"
def _decimal_to_dms_rational(deg: float) -> list[tuple[tuple[int, int], ...]]:
abs_deg = abs(deg)
d = int(abs_deg)
t_min = (abs_deg - d) * 60.0
m = int(t_min)
s = (t_min - m) * 60.0
sec = round(s * 1_000_000)
return [(d, 1), (m, 1), (sec, 1_000_000)]
def _build_gps_ifd(
    machine_meta: dict[str, Any] | None,
    jpeg_path: Path,
) -> dict[int, Any] | None:
    """Build the EXIF GPS IFD from ``machine_meta``; ``None`` when GPS is skipped.

    GPS is skipped silently when latitude or longitude is absent, and with a
    warning (mentioning ``jpeg_path``) when either is non-numeric or out of
    range. An unusable ``elevation_masl`` only drops the altitude tags.
    """
    if not machine_meta:
        return None
    lat_raw = machine_meta.get("latitude_wgs_84")
    lon_raw = machine_meta.get("longitude_wgs_84")
    if lat_raw is None or lon_raw is None:
        return None

    try:
        lat = float(lat_raw)
        lon = float(lon_raw)
    except (TypeError, ValueError, OverflowError):
        # Treat unparsable coordinates like out-of-range ones: drop GPS only,
        # rather than failing the whole EXIF write.
        log.warning("Invalid lat/lon for EXIF GPS, skipping GPS: %s", jpeg_path)
        return None
    # NaN fails both range comparisons, so it is rejected here as well.
    if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
        log.warning("Invalid lat/lon for EXIF GPS, skipping GPS: %s", jpeg_path)
        return None

    gps: dict[int, Any] = {
        GPSIFD.GPSVersionID: (2, 0, 0, 0),
        GPSIFD.GPSLatitudeRef: b"N" if lat >= 0 else b"S",
        GPSIFD.GPSLatitude: _decimal_to_dms_rational(abs(lat)),
        GPSIFD.GPSLongitudeRef: b"E" if lon >= 0 else b"W",
        GPSIFD.GPSLongitude: _decimal_to_dms_rational(abs(lon)),
    }

    elev_raw = machine_meta.get("elevation_masl")
    if elev_raw is not None:
        try:
            alt = float(elev_raw)
        except (TypeError, ValueError, OverflowError):
            alt = float("nan")
        if alt != alt or abs(alt) == float("inf"):
            # Non-finite or unparsable elevation cannot be stored as a rational.
            log.warning(
                "Invalid elevation for EXIF GPS, skipping altitude: %s", jpeg_path
            )
        else:
            alt_abs = abs(alt)
            # AltitudeRef: 0 for values >= 0, 1 for negative values; the
            # altitude itself is stored as an absolute rational.
            gps[GPSIFD.GPSAltitudeRef] = 0 if alt >= 0 else 1
            if alt_abs.is_integer():
                gps[GPSIFD.GPSAltitude] = (int(alt_abs), 1)
            else:
                gps[GPSIFD.GPSAltitude] = (round(alt_abs * 1000), 1000)
    return gps


def write_mosaic_exif(
    jpeg_path: Path,
    scan_meta: dict[str, Any],
    machine: dict[str, Any],
    scan_id: int,
    machine_meta: dict[str, Any] | None,
    processing_software: str = "spruce-scraper/1.0",
) -> bool:
    """
    Insert EXIF into a mosaic JPEG. Returns True on success.
    On failure, logs a warning and returns False.

    The 0th IFD gets ImageDescription, Make, Model, Software and
    ProcessingSoftware, plus Artist, DateTime and XPKeywords when the
    corresponding metadata is available. The Exif IFD gets UserComment and,
    when ``scan_meta['scan_time']`` parses, DateTimeOriginal/DateTimeDigitized.
    A GPS IFD is added when ``machine_meta`` carries usable coordinates;
    unusable coordinates only skip the GPS IFD.
    """
    try:
        name = (scan_meta.get("name") or "").strip()
        desc = f"{machine['label']} scan {scan_id}"
        if name:
            desc = f"{desc} ({name})"
        ver = (machine.get("version") or "").strip()
        artist = (scan_meta.get("user") or "").strip()
        scan_time = (scan_meta.get("scan_time") or "").strip()
        # Empty when the scan time is missing or not in the expected format.
        dt_b = _fmt_exif_datetime(scan_time).encode("ascii")

        zeroth: dict[int, Any] = {
            ImageIFD.ImageDescription: desc.encode("utf-8", errors="replace"),
            ImageIFD.Make: b"RootView",
            ImageIFD.Model: str(machine.get("label", "")).encode(
                "utf-8", errors="replace"
            ),
            ImageIFD.Software: (
                f"RootView {ver}".encode("utf-8") if ver else b"RootView"
            ),
            ImageIFD.ProcessingSoftware: processing_software.encode(
                "utf-8", errors="replace"
            ),
        }
        if artist:
            zeroth[ImageIFD.Artist] = artist.encode("utf-8", errors="replace")
        if dt_b:
            zeroth[ImageIFD.DateTime] = dt_b
        xp_keywords = _build_xp_keywords(machine_meta)
        if xp_keywords is not None:
            zeroth[ImageIFD.XPKeywords] = xp_keywords

        exif_ifd: dict[int, Any] = {
            ExifIFD.UserComment: _build_user_comment(
                scan_meta, machine, scan_id, machine_meta
            ),
        }
        if dt_b:
            exif_ifd[ExifIFD.DateTimeOriginal] = dt_b
            exif_ifd[ExifIFD.DateTimeDigitized] = dt_b

        exif_dict: dict[str, Any] = {
            "0th": zeroth,
            "Exif": exif_ifd,
        }
        gps = _build_gps_ifd(machine_meta, jpeg_path)
        if gps is not None:
            exif_dict["GPS"] = gps

        exif_bytes = piexif.dump(exif_dict)
        piexif.insert(exif_bytes, str(jpeg_path))
    except Exception as exc:
        # Deliberate best-effort boundary: tagging problems are reported to
        # the caller through the return value, not raised.
        log.warning("EXIF write failed for %s: %s", jpeg_path, exc)
        return False
    return True
def resolve_machine_label_for_scan_dir(
scan_dir: Path,
config: dict[str, Any],
machine_label: str | None,
) -> str:
"""
Return the RootView machine label for EXIF.
If ``machine_label`` is set, it is returned stripped. Otherwise the parent
of ``scan_dir`` is walked (archive slug under ``machine_dir_name``) and
matched uniquely against keys in ``config['machine_metadata']``.
"""
if machine_label is not None and str(machine_label).strip() != "":
return str(machine_label).strip()
slug = scan_dir.parent.parent.name
meta = config.get("machine_metadata") or {}
matches = [lbl for lbl in meta if machine_dir_name({"label": lbl}) == slug]
if len(matches) == 1:
return matches[0]
if not matches:
raise ValueError(
f"Could not map archive folder {slug!r} to a machine label. "
f"Add it under machine_metadata in config or pass machine_label explicitly."
)
raise ValueError(
f"Ambiguous archive folder {slug!r}: multiple machine_metadata keys "
f"match ({matches!r}). Pass machine_label explicitly."
)
def tag_mosaic_jpeg_for_scan_dir(
    scan_dir: Path,
    jpeg_path: Path,
    config: dict[str, Any],
    *,
    machine_label: str | None = None,
    processing_software: str | None = None,
    force: bool = False,
) -> bool:
    """
    Tag ``jpeg_path`` with mosaic EXIF built from ``scan_dir/metadata.json``
    and the optional ``machine_metadata`` entry in ``config``.

    Returns the result of ``write_mosaic_exif``, or False when tagging is
    skipped because ``config['write_exif']`` is false and ``force`` is not
    set. ``force=True`` tags regardless of that setting (e.g. stitch script
    ``--write-exif``).

    Raises ``ValueError`` when ``jpeg_path`` does not have a JPEG suffix,
    ``metadata.json`` is missing, ``scan_dir`` is not named after the scan id
    in the metadata, or the machine label cannot be resolved.
    """
    scan_dir = scan_dir.resolve()
    jpeg_path = jpeg_path.resolve()

    if jpeg_path.suffix.lower() not in (".jpg", ".jpeg"):
        raise ValueError(
            f"EXIF tagging only supports JPEG files; got suffix {jpeg_path.suffix!r}: {jpeg_path}"
        )

    if not (force or config.get("write_exif", True)):
        log.warning(
            "Skipping EXIF: write_exif is false in config (use force=True to override)."
        )
        return False

    meta_path = scan_dir / "metadata.json"
    if not meta_path.is_file():
        raise ValueError(f"Missing metadata.json: {meta_path}")
    scan_meta: dict[str, Any] = json.loads(meta_path.read_text(encoding="utf-8"))

    scan_id = int(scan_meta["scan_id"])
    # The folder name must agree with the metadata before the grandparent
    # folder is used to resolve the machine label.
    if scan_dir.name != str(scan_id):
        raise ValueError(
            f"scan_dir must be the scan id folder (got name {scan_dir.name!r}, "
            f"metadata scan_id={scan_id}). Expected …/<date>/{scan_id}/"
        )

    label = resolve_machine_label_for_scan_dir(scan_dir, config, machine_label)
    machine_metadata = (config.get("machine_metadata") or {}).get(label)
    return write_mosaic_exif(
        jpeg_path,
        scan_meta,
        {"label": label, "version": ""},
        scan_id,
        machine_metadata,
        processing_software=(
            processing_software or DEFAULT_PROCESSING_SOFTWARE_MOSAIC_FROM_TILES
        ),
    )