08a29d124a
- spruce.exif: tag_mosaic_jpeg_for_scan_dir, resolve_machine_label_for_scan_dir; ProcessingSoftware for tile-stitched mosaics - spruce.settings: load_config(require_credentials=False) for config without login - scripts/tag_mosaic_exif.py and tests; stitch script --write-exif path
350 lines
12 KiB
Python
350 lines
12 KiB
Python
"""
|
|
Write EXIF metadata into downloaded mosaic JPEGs (piexif, no re-encode).
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import piexif
|
|
|
|
from spruce.paths import machine_dir_name
|
|
from piexif import ExifIFD, GPSIFD, ImageIFD
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
USER_COMMENT_ASCII = b"ASCII\x00\x00\x00"
|
|
|
|
# ProcessingSoftware for mosaics stitched from tiles (distinct from server download path).
|
|
DEFAULT_PROCESSING_SOFTWARE_MOSAIC_FROM_TILES = "spruce-scraper/1.0 mosaic_from_tiles"
|
|
|
|
|
|
def _fmt_exif_datetime(scan_time: str) -> str:
|
|
"""`YYYY-MM-DD HH:MM:SS` -> `YYYY:MM:DD HH:MM:SS` for EXIF; empty on failure."""
|
|
m = re.match(
|
|
r"^(\d{4})-(\d{2})-(\d{2})\s+(\d{2}):(\d{2}):(\d{2})$", scan_time.strip()
|
|
)
|
|
if not m:
|
|
return ""
|
|
y, mo, d, h, mi, s = m.groups()
|
|
return f"{y}:{mo}:{d} {h}:{mi}:{s}"
|
|
|
|
|
|
def _fmt_dim(v: Any) -> str:
|
|
if v is None:
|
|
return "?"
|
|
if isinstance(v, float) and v == int(v):
|
|
return str(int(v))
|
|
return str(v)
|
|
|
|
|
|
def _fmt_machine_meta_scalar(v: Any) -> str:
|
|
if isinstance(v, float) and v == int(v):
|
|
return str(int(v))
|
|
if isinstance(v, float):
|
|
return format(v, "g")
|
|
return str(v).strip()
|
|
|
|
|
|
def _user_comment_treatment_suffix(machine_meta: dict[str, Any] | None) -> str:
|
|
if not machine_meta:
|
|
return ""
|
|
parts: list[str] = []
|
|
pn = machine_meta.get("plot_number")
|
|
if pn is not None and str(pn).strip() != "":
|
|
parts.append(f"plot_number {_fmt_machine_meta_scalar(pn)}")
|
|
if machine_meta.get("enclosure") is not None:
|
|
enc = machine_meta["enclosure"]
|
|
if isinstance(enc, bool):
|
|
parts.append("enclosure yes" if enc else "enclosure no")
|
|
else:
|
|
parts.append(f"enclosure {_fmt_machine_meta_scalar(enc)}")
|
|
if machine_meta.get("temp_treatment") is not None:
|
|
tt = machine_meta["temp_treatment"]
|
|
parts.append(f"temp_treatment {_fmt_machine_meta_scalar(tt)}")
|
|
if machine_meta.get("co2_treatment") is not None:
|
|
c = str(machine_meta["co2_treatment"]).strip().lower()
|
|
parts.append(f"co2_treatment {c}")
|
|
if not parts:
|
|
return ""
|
|
return " | " + " | ".join(parts)
|
|
|
|
|
|
def _build_user_comment(
    scan_meta: dict[str, Any],
    machine: dict[str, Any],
    scan_id: int,
    machine_meta: dict[str, Any] | None = None,
) -> bytes:
    """Build the EXIF UserComment payload for a mosaic.

    The text summarises the scan id, machine label and grid geometry read
    from ``scan_meta`` (``nx``, ``ny``, ``dx``, ``dy``, ``end_x``, ``end_y``),
    followed by any treatment suffix derived from ``machine_meta``.

    Returns:
        ``USER_COMMENT_ASCII`` followed by the ASCII-encoded text;
        non-ASCII characters are replaced.

    Raises:
        KeyError: if ``machine`` has no ``"label"`` key.
    """
    grid_nx = scan_meta.get("nx", "?")
    grid_ny = scan_meta.get("ny", "?")
    step_x, step_y, extent_x, extent_y = (
        _fmt_dim(scan_meta.get(key)) for key in ("dx", "dy", "end_x", "end_y")
    )
    segments = [
        f"SPRUCE scan {scan_id}",
        f"machine {machine['label']}",
        f"grid {grid_nx}x{grid_ny} @ {step_x}x{step_y}mm over {extent_x}x{extent_y}mm",
        "see metadata.json",
    ]
    comment = " | ".join(segments) + _user_comment_treatment_suffix(machine_meta)
    return USER_COMMENT_ASCII + comment.encode("ascii", errors="replace")
|
|
|
|
|
|
def _co2_keyword(co2: str) -> str:
|
|
c = (co2 or "").strip().lower()
|
|
if c == "ambient":
|
|
return "aCO2"
|
|
if c == "elevated":
|
|
return "eCO2"
|
|
return c or "co2"
|
|
|
|
|
|
def _enclosure_keyword(enclosure: Any) -> str | None:
|
|
if enclosure is None:
|
|
return None
|
|
if isinstance(enclosure, bool):
|
|
return "enclosed" if enclosure else "no enclosure"
|
|
s = str(enclosure).strip().lower()
|
|
if s in ("yes", "y", "true", "1"):
|
|
return "enclosed"
|
|
if s in ("no", "n", "false", "0"):
|
|
return "no enclosure"
|
|
return None
|
|
|
|
|
|
def _temp_treatment_keyword(temp: Any) -> str | None:
|
|
if temp is None or isinstance(temp, bool):
|
|
return None
|
|
if isinstance(temp, (int, float)):
|
|
w = float(temp)
|
|
s = f"{w:g}C" if w != int(w) else f"{int(w)}C"
|
|
return f"temp +{s}"
|
|
t = str(temp).strip()
|
|
if not t:
|
|
return None
|
|
try:
|
|
w = float(t)
|
|
s = f"{w:g}C" if w != int(w) else f"{int(w)}C"
|
|
return f"temp +{s}"
|
|
except (TypeError, ValueError):
|
|
return f"temp {t}"
|
|
|
|
|
|
def _build_xp_keywords(machine_meta: dict[str, Any] | None) -> bytes | None:
|
|
if not machine_meta:
|
|
return None
|
|
parts: list[str] = []
|
|
pn = machine_meta.get("plot_number")
|
|
if pn is not None and str(pn).strip() != "":
|
|
parts.append(f"plot {pn}")
|
|
enc = _enclosure_keyword(machine_meta.get("enclosure"))
|
|
if enc:
|
|
parts.append(enc)
|
|
if machine_meta.get("temp_treatment") is not None:
|
|
tk = _temp_treatment_keyword(machine_meta["temp_treatment"])
|
|
if tk:
|
|
parts.append(tk)
|
|
if machine_meta.get("co2_treatment") is not None:
|
|
parts.append(_co2_keyword(str(machine_meta["co2_treatment"])))
|
|
if not parts:
|
|
return None
|
|
text = "SPRUCE; " + "; ".join(parts)
|
|
return text.encode("utf-16le") + b"\x00\x00"
|
|
|
|
def _decimal_to_dms_rational(deg: float) -> list[tuple[tuple[int, int], ...]]:
|
|
abs_deg = abs(deg)
|
|
d = int(abs_deg)
|
|
t_min = (abs_deg - d) * 60.0
|
|
m = int(t_min)
|
|
s = (t_min - m) * 60.0
|
|
sec = round(s * 1_000_000)
|
|
return [(d, 1), (m, 1), (sec, 1_000_000)]
|
|
|
|
|
|
def _build_gps_ifd(
    machine_meta: dict[str, Any] | None,
    jpeg_path: Path,
) -> dict[int, Any] | None:
    """Build the GPS IFD from ``machine_meta``, or ``None`` if unavailable.

    Reads ``latitude_wgs_84``, ``longitude_wgs_84`` and the optional
    ``elevation_masl``. Unparseable or out-of-range coordinates log a warning
    and yield ``None``; an unusable elevation only drops the altitude tags.
    ``jpeg_path`` is used for log messages only.
    """
    if not machine_meta:
        return None
    lat_raw = machine_meta.get("latitude_wgs_84")
    lon_raw = machine_meta.get("longitude_wgs_84")
    if lat_raw is None or lon_raw is None:
        return None

    try:
        lat = float(lat_raw)
        lon = float(lon_raw)
    except (TypeError, ValueError):
        # NaN fails both range checks below, so malformed values take the
        # same "skip GPS" path as out-of-range ones.
        lat = lon = float("nan")
    if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
        log.warning("Invalid lat/lon for EXIF GPS, skipping GPS: %s", jpeg_path)
        return None

    gps: dict[int, Any] = {
        GPSIFD.GPSVersionID: (2, 0, 0, 0),
        GPSIFD.GPSLatitudeRef: b"N" if lat >= 0 else b"S",
        GPSIFD.GPSLatitude: _decimal_to_dms_rational(abs(lat)),
        GPSIFD.GPSLongitudeRef: b"E" if lon >= 0 else b"W",
        GPSIFD.GPSLongitude: _decimal_to_dms_rational(abs(lon)),
    }

    elev_raw = machine_meta.get("elevation_masl")
    if elev_raw is not None:
        try:
            alt = float(elev_raw)
            alt_abs = abs(alt)
            # round() raises ValueError for NaN and OverflowError for inf.
            alt_milli = round(alt_abs * 1000)
        except (TypeError, ValueError, OverflowError):
            log.warning(
                "Invalid elevation for EXIF GPS, skipping altitude: %s", jpeg_path
            )
        else:
            # AltitudeRef 0 = at/above sea level, 1 = below.
            gps[GPSIFD.GPSAltitudeRef] = 0 if alt >= 0 else 1
            if alt_abs.is_integer():
                gps[GPSIFD.GPSAltitude] = (int(alt_abs), 1)
            else:
                gps[GPSIFD.GPSAltitude] = (alt_milli, 1000)
    return gps


def write_mosaic_exif(
    jpeg_path: Path,
    scan_meta: dict[str, Any],
    machine: dict[str, Any],
    scan_id: int,
    machine_meta: dict[str, Any] | None,
    processing_software: str = "spruce-scraper/1.0",
) -> bool:
    """
    Insert EXIF into a mosaic JPEG. Returns True on success.

    On failure, logs a warning and returns False.

    Args:
        jpeg_path: JPEG file to tag in place via ``piexif.insert``.
        scan_meta: Scan metadata; ``name``, ``user`` and ``scan_time`` are
            read here, grid fields are read when building the user comment.
        machine: Must contain ``label``; ``version`` is optional.
        scan_id: Scan identifier used in the description and user comment.
        machine_meta: Optional per-machine metadata (treatments, location).
        processing_software: Value for the ProcessingSoftware tag.

    Invalid location data in ``machine_meta`` does not fail the write: the
    GPS block (or just the altitude) is omitted with a warning.
    """
    try:
        name = (scan_meta.get("name") or "").strip()
        desc = f"{machine['label']} scan {scan_id}"
        if name:
            desc = f"{desc} ({name})"

        ver = (machine.get("version") or "").strip()
        artist = (scan_meta.get("user") or "").strip()

        scan_time = (scan_meta.get("scan_time") or "").strip()
        dt = _fmt_exif_datetime(scan_time)
        # Encode once and reuse for all three date tags; a non-ASCII value
        # is treated as "no date" rather than failing the whole write.
        dt_b = dt.encode("ascii") if dt.isascii() else b""

        zeroth: dict[int, Any] = {
            ImageIFD.ImageDescription: desc.encode("utf-8", errors="replace"),
            ImageIFD.Make: b"RootView",
            ImageIFD.Model: str(machine.get("label", "")).encode(
                "utf-8", errors="replace"
            ),
            ImageIFD.Software: (
                f"RootView {ver}".encode("utf-8") if ver else b"RootView"
            ),
            ImageIFD.ProcessingSoftware: processing_software.encode(
                "utf-8", errors="replace"
            ),
        }
        if artist:
            zeroth[ImageIFD.Artist] = artist.encode("utf-8", errors="replace")
        if dt_b:
            zeroth[ImageIFD.DateTime] = dt_b

        xp_keywords = _build_xp_keywords(machine_meta)
        if xp_keywords is not None:
            zeroth[ImageIFD.XPKeywords] = xp_keywords

        exif_ifd: dict[int, Any] = {
            ExifIFD.UserComment: _build_user_comment(
                scan_meta, machine, scan_id, machine_meta
            ),
        }
        if dt_b:
            exif_ifd[ExifIFD.DateTimeOriginal] = dt_b
            exif_ifd[ExifIFD.DateTimeDigitized] = dt_b

        exif_dict: dict[str, Any] = {
            "0th": zeroth,
            "Exif": exif_ifd,
        }

        gps = _build_gps_ifd(machine_meta, jpeg_path)
        if gps is not None:
            exif_dict["GPS"] = gps

        exif_bytes = piexif.dump(exif_dict)
        piexif.insert(exif_bytes, str(jpeg_path))
    except Exception as exc:
        # Deliberately best-effort: tagging must never abort the caller.
        log.warning("EXIF write failed for %s: %s", jpeg_path, exc)
        return False
    return True
|
|
|
|
|
|
def resolve_machine_label_for_scan_dir(
|
|
scan_dir: Path,
|
|
config: dict[str, Any],
|
|
machine_label: str | None,
|
|
) -> str:
|
|
"""
|
|
Return the RootView machine label for EXIF.
|
|
|
|
If ``machine_label`` is set, it is returned stripped. Otherwise the parent
|
|
of ``scan_dir`` is walked (archive slug under ``machine_dir_name``) and
|
|
matched uniquely against keys in ``config['machine_metadata']``.
|
|
"""
|
|
if machine_label is not None and str(machine_label).strip() != "":
|
|
return str(machine_label).strip()
|
|
slug = scan_dir.parent.parent.name
|
|
meta = config.get("machine_metadata") or {}
|
|
matches = [lbl for lbl in meta if machine_dir_name({"label": lbl}) == slug]
|
|
if len(matches) == 1:
|
|
return matches[0]
|
|
if not matches:
|
|
raise ValueError(
|
|
f"Could not map archive folder {slug!r} to a machine label. "
|
|
f"Add it under machine_metadata in config or pass machine_label explicitly."
|
|
)
|
|
raise ValueError(
|
|
f"Ambiguous archive folder {slug!r}: multiple machine_metadata keys "
|
|
f"match ({matches!r}). Pass machine_label explicitly."
|
|
)
|
|
|
|
|
|
def tag_mosaic_jpeg_for_scan_dir(
|
|
scan_dir: Path,
|
|
jpeg_path: Path,
|
|
config: dict[str, Any],
|
|
*,
|
|
machine_label: str | None = None,
|
|
processing_software: str | None = None,
|
|
force: bool = False,
|
|
) -> bool:
|
|
"""
|
|
Write the same mosaic EXIF as the scraper, using ``metadata.json`` in
|
|
``scan_dir`` and optional ``machine_metadata`` from ``config``.
|
|
|
|
Returns False if tagging was skipped (e.g. ``write_exif`` disabled and not
|
|
``force``). Raises ``ValueError`` for invalid paths or JPEG suffix.
|
|
|
|
``force=True`` tags even when ``config['write_exif']`` is false (e.g. stitch
|
|
script ``--write-exif``).
|
|
"""
|
|
scan_dir = scan_dir.resolve()
|
|
jpeg_path = jpeg_path.resolve()
|
|
suffix = jpeg_path.suffix.lower()
|
|
if suffix not in (".jpg", ".jpeg"):
|
|
raise ValueError(
|
|
f"EXIF tagging only supports JPEG files; got suffix {jpeg_path.suffix!r}: {jpeg_path}"
|
|
)
|
|
|
|
if not force and not config.get("write_exif", True):
|
|
log.warning(
|
|
"Skipping EXIF: write_exif is false in config (use force=True to override)."
|
|
)
|
|
return False
|
|
|
|
meta_path = scan_dir / "metadata.json"
|
|
if not meta_path.is_file():
|
|
raise ValueError(f"Missing metadata.json: {meta_path}")
|
|
with meta_path.open(encoding="utf-8") as fh:
|
|
scan_meta: dict[str, Any] = json.load(fh)
|
|
scan_id = int(scan_meta["scan_id"])
|
|
if scan_dir.name != str(scan_id):
|
|
raise ValueError(
|
|
f"scan_dir must be the scan id folder (got name {scan_dir.name!r}, "
|
|
f"metadata scan_id={scan_id}). Expected …/<date>/{scan_id}/"
|
|
)
|
|
|
|
label = resolve_machine_label_for_scan_dir(scan_dir, config, machine_label)
|
|
machine: dict[str, Any] = {"label": label, "version": ""}
|
|
mmeta = (config.get("machine_metadata") or {}).get(label)
|
|
proc = processing_software or DEFAULT_PROCESSING_SOFTWARE_MOSAIC_FROM_TILES
|
|
return write_mosaic_exif(
|
|
jpeg_path,
|
|
scan_meta,
|
|
machine,
|
|
scan_id,
|
|
mmeta,
|
|
processing_software=proc,
|
|
)
|