"""
Write EXIF metadata into downloaded mosaic JPEGs (piexif, no re-encode).
"""

import json
import logging
import math
import re
from pathlib import Path
from typing import Any

import piexif
from piexif import ExifIFD, GPSIFD, ImageIFD

from spruce.paths import machine_dir_name

log = logging.getLogger(__name__)

# 8-byte character-code header prepended to the UserComment payload (ASCII).
USER_COMMENT_ASCII = b"ASCII\x00\x00\x00"

# ProcessingSoftware for mosaics stitched from tiles (distinct from server download path).
DEFAULT_PROCESSING_SOFTWARE_MOSAIC_FROM_TILES = "spruce-scraper/1.0 mosaic_from_tiles"

# ``re.ASCII`` keeps ``\d`` from matching non-ASCII digits, which would later
# fail the ASCII encoding of the EXIF date/time tags.
_SCAN_TIME_RE = re.compile(
    r"^(\d{4})-(\d{2})-(\d{2})\s+(\d{2}):(\d{2}):(\d{2})$", re.ASCII
)

_MICRO = 1_000_000


def _fmt_exif_datetime(scan_time: str) -> str:
    """`YYYY-MM-DD HH:MM:SS` -> `YYYY:MM:DD HH:MM:SS` for EXIF; empty on failure."""
    m = _SCAN_TIME_RE.match(scan_time.strip())
    if not m:
        return ""
    y, mo, d, h, mi, s = m.groups()
    return f"{y}:{mo}:{d} {h}:{mi}:{s}"


def _fmt_dim(v: Any) -> str:
    """Format a grid dimension for the user comment.

    ``None`` becomes ``"?"``; integer-valued floats drop the trailing ``.0``;
    everything else is passed through ``str``.
    """
    if v is None:
        return "?"
    # float.is_integer() is False for NaN/inf, where int(v) would raise.
    if isinstance(v, float) and v.is_integer():
        return str(int(v))
    return str(v)


def _fmt_machine_meta_scalar(v: Any) -> str:
    """Format one machine-metadata value as compact text.

    Integer-valued floats print without a fractional part, other floats use
    ``"g"`` formatting, and non-floats are stringified and stripped.
    """
    if isinstance(v, float):
        # is_integer() is False for NaN/inf, so those fall through to "g".
        return str(int(v)) if v.is_integer() else format(v, "g")
    return str(v).strip()


def _user_comment_treatment_suffix(machine_meta: dict[str, Any] | None) -> str:
    """Return the `` | key value | ...`` suffix describing plot treatments.

    Reads ``plot_number``, ``enclosure``, ``temp_treatment`` and
    ``co2_treatment`` from ``machine_meta``; returns ``""`` when none are set.
    """
    if not machine_meta:
        return ""
    parts: list[str] = []

    pn = machine_meta.get("plot_number")
    if pn is not None and str(pn).strip() != "":
        parts.append(f"plot_number {_fmt_machine_meta_scalar(pn)}")

    enc = machine_meta.get("enclosure")
    if enc is not None:
        if isinstance(enc, bool):
            parts.append("enclosure yes" if enc else "enclosure no")
        else:
            parts.append(f"enclosure {_fmt_machine_meta_scalar(enc)}")

    tt = machine_meta.get("temp_treatment")
    if tt is not None:
        parts.append(f"temp_treatment {_fmt_machine_meta_scalar(tt)}")

    co2 = machine_meta.get("co2_treatment")
    if co2 is not None:
        parts.append(f"co2_treatment {str(co2).strip().lower()}")

    if not parts:
        return ""
    return " | " + " | ".join(parts)


def _build_user_comment(
    scan_meta: dict[str, Any],
    machine: dict[str, Any],
    scan_id: int,
    machine_meta: dict[str, Any] | None = None,
) -> bytes:
    """Build the EXIF UserComment payload (ASCII header + summary text).

    Non-ASCII characters in the text are replaced with ``?``. Raises
    ``KeyError`` if ``machine`` has no ``"label"``.
    """
    nx = scan_meta.get("nx", "?")
    ny = scan_meta.get("ny", "?")
    dx = _fmt_dim(scan_meta.get("dx"))
    dy = _fmt_dim(scan_meta.get("dy"))
    ex = _fmt_dim(scan_meta.get("end_x"))
    ey = _fmt_dim(scan_meta.get("end_y"))
    text = (
        f"SPRUCE scan {scan_id} | machine {machine['label']} | "
        f"grid {nx}x{ny} @ {dx}x{dy}mm over {ex}x{ey}mm | "
        f"see metadata.json"
    )
    text += _user_comment_treatment_suffix(machine_meta)
    return USER_COMMENT_ASCII + text.encode("ascii", errors="replace")


def _co2_keyword(co2: str) -> str:
    """Map a CO2 treatment to its keyword (``aCO2``/``eCO2``, else lowercased)."""
    c = (co2 or "").strip().lower()
    if c == "ambient":
        return "aCO2"
    if c == "elevated":
        return "eCO2"
    return c or "co2"


def _enclosure_keyword(enclosure: Any) -> str | None:
    """Map an enclosure flag to ``"enclosed"`` / ``"no enclosure"``.

    Accepts booleans and common yes/no spellings; returns ``None`` for
    ``None`` or any unrecognised value.
    """
    if enclosure is None:
        return None
    if isinstance(enclosure, bool):
        return "enclosed" if enclosure else "no enclosure"
    s = str(enclosure).strip().lower()
    if s in ("yes", "y", "true", "1"):
        return "enclosed"
    if s in ("no", "n", "false", "0"):
        return "no enclosure"
    return None


def _fmt_temp_offset(w: float) -> str:
    """Format a finite temperature offset, e.g. ``+2.25C``, ``+9C``, ``-1C``."""
    magnitude = str(int(w)) if w.is_integer() else f"{w:g}"
    # Negative values already carry their sign; avoid emitting "+-1C".
    sign = "" if magnitude.startswith("-") else "+"
    return f"{sign}{magnitude}C"


def _temp_treatment_keyword(temp: Any) -> str | None:
    """Return the ``temp ...`` keyword for a temperature treatment.

    Numbers (and numeric strings) become ``temp +<n>C``; non-numeric or
    non-finite strings are passed through as ``temp <text>``. ``None``,
    booleans, blank strings and non-finite numbers yield ``None``.
    """
    if temp is None or isinstance(temp, bool):
        return None
    if isinstance(temp, (int, float)):
        w = float(temp)
        return f"temp {_fmt_temp_offset(w)}" if math.isfinite(w) else None
    t = str(temp).strip()
    if not t:
        return None
    try:
        w = float(t)
    except ValueError:
        return f"temp {t}"
    if not math.isfinite(w):
        # "nan"/"inf" parse as floats but are not offsets; keep the raw text.
        return f"temp {t}"
    return f"temp {_fmt_temp_offset(w)}"


def _build_xp_keywords(machine_meta: dict[str, Any] | None) -> bytes | None:
    """Build the XPKeywords value from machine metadata.

    Returns ``"SPRUCE; <kw>; ..."`` encoded as UTF-16LE with a two-byte NUL
    terminator, or ``None`` when there is no metadata or no keyword applies.
    """
    if not machine_meta:
        return None
    parts: list[str] = []

    pn = machine_meta.get("plot_number")
    if pn is not None and str(pn).strip() != "":
        # Same scalar formatting as the user comment (7.0 -> "7").
        parts.append(f"plot {_fmt_machine_meta_scalar(pn)}")

    enc = _enclosure_keyword(machine_meta.get("enclosure"))
    if enc:
        parts.append(enc)

    tk = _temp_treatment_keyword(machine_meta.get("temp_treatment"))
    if tk:
        parts.append(tk)

    co2 = machine_meta.get("co2_treatment")
    if co2 is not None:
        parts.append(_co2_keyword(str(co2)))

    if not parts:
        return None
    text = "SPRUCE; " + "; ".join(parts)
    return text.encode("utf-16le") + b"\x00\x00"


def _decimal_to_dms_rational(deg: float) -> list[tuple[int, int]]:
    """Convert decimal degrees to ``[(deg, 1), (min, 1), (sec * 1e6, 1e6)]``.

    The sign is discarded (callers encode it in the hemisphere ref tag).
    The value is rounded once to whole micro-arcseconds and then split, so a
    fractional part just below a minute boundary carries over instead of
    producing ``60`` seconds (e.g. 10.1 -> 10 deg 6 min 0 sec).
    """
    total = round(abs(deg) * 3600 * _MICRO)
    d, rem = divmod(total, 3600 * _MICRO)
    m, sec = divmod(rem, 60 * _MICRO)
    return [(d, 1), (m, 1), (sec, _MICRO)]


def _build_gps_ifd(
    machine_meta: dict[str, Any] | None, jpeg_path: Path
) -> dict[int, Any] | None:
    """Build the GPS IFD from machine metadata, or ``None`` to omit GPS.

    Reads ``latitude_wgs_84``, ``longitude_wgs_84`` and optional
    ``elevation_masl``. Missing coordinates return ``None`` silently;
    unparseable or out-of-range coordinates log a warning and return
    ``None``; an unusable elevation only drops the altitude tags.
    """
    if not machine_meta:
        return None
    lat_raw = machine_meta.get("latitude_wgs_84")
    lon_raw = machine_meta.get("longitude_wgs_84")
    if lat_raw is None or lon_raw is None:
        return None

    try:
        lat = float(lat_raw)
        lon = float(lon_raw)
    except (TypeError, ValueError):
        log.warning("Invalid lat/lon for EXIF GPS, skipping GPS: %s", jpeg_path)
        return None
    # NaN fails both comparisons, so it is rejected here as well.
    if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
        log.warning("Invalid lat/lon for EXIF GPS, skipping GPS: %s", jpeg_path)
        return None

    gps: dict[int, Any] = {
        GPSIFD.GPSVersionID: (2, 0, 0, 0),
        GPSIFD.GPSLatitudeRef: b"N" if lat >= 0 else b"S",
        GPSIFD.GPSLatitude: _decimal_to_dms_rational(abs(lat)),
        GPSIFD.GPSLongitudeRef: b"E" if lon >= 0 else b"W",
        GPSIFD.GPSLongitude: _decimal_to_dms_rational(abs(lon)),
    }

    elev_raw = machine_meta.get("elevation_masl")
    if elev_raw is not None:
        try:
            alt = float(elev_raw)
        except (TypeError, ValueError):
            alt = math.nan
        if math.isfinite(alt):
            alt_abs = abs(alt)
            # Ref 0 = at/above sea level, 1 = below; magnitude stored unsigned.
            gps[GPSIFD.GPSAltitudeRef] = 0 if alt >= 0 else 1
            if alt_abs.is_integer():
                gps[GPSIFD.GPSAltitude] = (int(alt_abs), 1)
            else:
                gps[GPSIFD.GPSAltitude] = (round(alt_abs * 1000), 1000)
        else:
            log.warning(
                "Invalid elevation for EXIF GPS, skipping altitude: %s", jpeg_path
            )
    return gps


def write_mosaic_exif(
    jpeg_path: Path,
    scan_meta: dict[str, Any],
    machine: dict[str, Any],
    scan_id: int,
    machine_meta: dict[str, Any] | None,
    processing_software: str = "spruce-scraper/1.0",
) -> bool:
    """
    Insert EXIF into a mosaic JPEG. Returns True on success.
    On failure, logs a warning and returns False.

    ``scan_meta`` supplies ``name``, ``user``, ``scan_time`` and grid fields;
    ``machine`` supplies ``label`` (required) and ``version``;
    ``machine_meta`` optionally supplies treatment keywords and GPS position.
    """
    try:
        name = (scan_meta.get("name") or "").strip()
        desc = f"{machine['label']} scan {scan_id}"
        if name:
            desc = f"{desc} ({name})"
        desc_b = desc.encode("utf-8", errors="replace")

        make_b = b"RootView"
        ver = (machine.get("version") or "").strip()
        software_b = f"RootView {ver}".encode("utf-8") if ver else b"RootView"
        model_b = str(machine.get("label", "")).encode("utf-8", errors="replace")
        proc_b = processing_software.encode("utf-8", errors="replace")

        artist = (scan_meta.get("user") or "").strip()
        artist_b = artist.encode("utf-8", errors="replace") if artist else b""

        scan_time = (scan_meta.get("scan_time") or "").strip()
        dt = _fmt_exif_datetime(scan_time)
        dt_b = dt.encode("ascii") if dt else b""

        zeroth: dict[int, Any] = {
            ImageIFD.ImageDescription: desc_b,
            ImageIFD.Make: make_b,
            ImageIFD.Model: model_b,
            ImageIFD.Software: software_b,
            ImageIFD.ProcessingSoftware: proc_b,
        }
        if artist_b:
            zeroth[ImageIFD.Artist] = artist_b
        if dt_b:
            zeroth[ImageIFD.DateTime] = dt_b
        keywords = _build_xp_keywords(machine_meta)
        if keywords is not None:
            zeroth[ImageIFD.XPKeywords] = keywords

        exif_ifd: dict[int, Any] = {
            ExifIFD.UserComment: _build_user_comment(
                scan_meta, machine, scan_id, machine_meta
            ),
        }
        if dt_b:
            exif_ifd[ExifIFD.DateTimeOriginal] = dt_b
            exif_ifd[ExifIFD.DateTimeDigitized] = dt_b

        exif_dict: dict[str, Any] = {
            "0th": zeroth,
            "Exif": exif_ifd,
        }

        gps = _build_gps_ifd(machine_meta, jpeg_path)
        if gps is not None:
            exif_dict["GPS"] = gps

        exif_bytes = piexif.dump(exif_dict)
        piexif.insert(exif_bytes, str(jpeg_path))
    except Exception as exc:
        # Deliberate best-effort boundary: tagging must never abort the caller.
        log.warning("EXIF write failed for %s: %s", jpeg_path, exc)
        return False
    return True


def resolve_machine_label_for_scan_dir(
    scan_dir: Path,
    config: dict[str, Any],
    machine_label: str | None,
) -> str:
    """
    Return the RootView machine label for EXIF.

    If ``machine_label`` is set (non-blank), it is returned stripped.
    Otherwise the name of the directory two levels above ``scan_dir``
    (``scan_dir.parent.parent``) is taken as the archive slug and matched
    against ``machine_dir_name({"label": key})`` for every key in
    ``config['machine_metadata']``.

    Raises ``ValueError`` when no key, or more than one key, matches.
    """
    if machine_label is not None and str(machine_label).strip() != "":
        return str(machine_label).strip()

    slug = scan_dir.parent.parent.name
    meta = config.get("machine_metadata") or {}
    matches = [lbl for lbl in meta if machine_dir_name({"label": lbl}) == slug]
    if len(matches) == 1:
        return matches[0]
    if not matches:
        raise ValueError(
            f"Could not map archive folder {slug!r} to a machine label. "
            f"Add it under machine_metadata in config or pass machine_label explicitly."
        )
    raise ValueError(
        f"Ambiguous archive folder {slug!r}: multiple machine_metadata keys "
        f"match ({matches!r}). Pass machine_label explicitly."
    )


def tag_mosaic_jpeg_for_scan_dir(
    scan_dir: Path,
    jpeg_path: Path,
    config: dict[str, Any],
    *,
    machine_label: str | None = None,
    processing_software: str | None = None,
    force: bool = False,
) -> bool:
    """
    Write the same mosaic EXIF as the scraper, using ``metadata.json`` in ``scan_dir``
    and optional ``machine_metadata`` from ``config``.

    Returns the result of ``write_mosaic_exif``, or False if tagging was
    skipped (``write_exif`` disabled and not ``force``).

    Raises ``ValueError`` for a non-JPEG suffix, a missing ``metadata.json``,
    a ``scan_dir`` whose name differs from the metadata ``scan_id``, or an
    unresolvable machine label. A ``metadata.json`` without ``scan_id``
    raises ``KeyError``.

    ``force=True`` tags even when ``config['write_exif']`` is false (e.g. stitch script
    ``--write-exif``).
    """
    scan_dir = scan_dir.resolve()
    jpeg_path = jpeg_path.resolve()

    suffix = jpeg_path.suffix.lower()
    if suffix not in (".jpg", ".jpeg"):
        raise ValueError(
            f"EXIF tagging only supports JPEG files; got suffix {jpeg_path.suffix!r}: {jpeg_path}"
        )

    if not force and not config.get("write_exif", True):
        log.warning(
            "Skipping EXIF: write_exif is false in config (use force=True to override)."
        )
        return False

    meta_path = scan_dir / "metadata.json"
    if not meta_path.is_file():
        raise ValueError(f"Missing metadata.json: {meta_path}")
    with meta_path.open(encoding="utf-8") as fh:
        scan_meta: dict[str, Any] = json.load(fh)

    scan_id = int(scan_meta["scan_id"])
    if scan_dir.name != str(scan_id):
        raise ValueError(
            f"scan_dir must be the scan id folder (got name {scan_dir.name!r}, "
            f"metadata scan_id={scan_id}). Expected …//{scan_id}/"
        )

    label = resolve_machine_label_for_scan_dir(scan_dir, config, machine_label)
    # The machine version is not known on this path, so Software is "RootView".
    machine: dict[str, Any] = {"label": label, "version": ""}
    mmeta = (config.get("machine_metadata") or {}).get(label)
    proc = processing_software or DEFAULT_PROCESSING_SOFTWARE_MOSAIC_FROM_TILES

    return write_mosaic_exif(
        jpeg_path,
        scan_meta,
        machine,
        scan_id,
        mmeta,
        processing_software=proc,
    )