Files
remove-ai-watermarks/src/remove_ai_watermarks/trustmark_detector.py
T
Victor Kuznetsov 5d0e6c3a65 fix: harden metadata parsers and engines; sync docs (full-repo review)
Apply fixes from a full-repo review (code, tests, docs).

Security / correctness:
- Clamp attacker-controlled PNG/caBX chunk lengths to the remaining file
  size in metadata.py and noai/c2pa.py (a malformed length no longer drives
  a multi-GB read); skipped chunks seek instead of read.
- noai/isobmff.strip_c2pa_boxes is now fail-safe on a malformed box: return
  the original bytes with a warning instead of silently truncating the tail,
  so metadata --remove can no longer emit a corrupt file.
- doubao_engine._fixed_alpha_map clamps the glyph box to the image (no crash
  on degenerate width-vs-height).
- watermark_remover._run_region_hires gates the phaseCorrelate offset on
  response and magnitude (a spurious shift no longer garbles text) and drops
  the generator after a CPU fallback (no MPS/CPU device mismatch).

Robustness:
- gemini_engine, doubao_engine, region_eraser normalize grayscale and RGBA
  inputs to BGR at the engine entry points.
- image_io.imwrite returns False on an unwritable path (matches cv2).
- invisible_engine guards a None imread result before use.
- trustmark_detector._decoder uses a double-checked threading lock.
- ctrlregen.tiling.tile_positions raises on overlap >= tile.
- humanizer chromatic shift no longer wraps opposite-edge pixels.
- identify OpenAI caveat keyed on the normalized vendor, not a substring.
- Remove the dead "visible --detect-threshold" CLI option.
- publish.yml verifies the release tag matches the package version.

Docs:
- README strength 0.05 to 0.10; .env.example HF_TOKEN marked optional;
  doubao_capture README updated to reverse-alpha-only; CLAUDE.md synced with
  the new behaviors and the batch command.

Tests: new test_security_clamp.py for the read clamp and isobmff fail-safe;
erase CLI coverage; integrity-clash rule 2 end-to-end; multi-tag EXIF
survival and cross-format strip guards; channel/size, tiling, humanizer, and
imwrite regressions. Full suite 493 passed, 2 skipped; ruff and pyright src/
clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-30 18:00:39 -07:00

119 lines
4.7 KiB
Python

"""Detect Adobe TrustMark invisible watermarks.
TrustMark (github.com/adobe/trustmark, MIT) is the open, keyless image watermark
behind Adobe "Durable Content Credentials": when a C2PA manifest is stripped, a
TrustMark soft binding can still re-link the asset to its manifest in a
repository. Unlike SynthID it has a PUBLIC decoder with no secret key, so a
TrustMark-stamped image can be identified locally. Adobe's shipping products use
Variant P (the ``com.adobe.trustmark.P`` soft-binding ``alg``); this wrapper
loads that model.
Optional dependency (extra: ``trustmark``); the model weights download on first
use. ``detect_trustmark`` returns None when the package is absent. This detects
provenance (Adobe Content Credentials), NOT AI generation as such -- TrustMark
also marks human-authored content -- so callers should treat it as a watermark
signal, not proof of AI origin.
"""
# trustmark ships no type stubs; relax untyped-library diagnostics for this thin
# wrapper module only.
# pyright: reportMissingTypeStubs=false, reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false, reportMissingImports=false
from __future__ import annotations
import logging
import threading
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from pathlib import Path
log = logging.getLogger(__name__)
# Adobe ships Variant P in production (com.adobe.trustmark.P).
_MODEL_TYPE = "P"
# Lazily constructed singleton -- model load + first-use download is expensive.
# Guarded by a lock so concurrent callers don't double-construct/double-download.
_tm: Any = None
_tm_lock = threading.Lock()
def is_available() -> bool:
"""True if the optional ``trustmark`` package is installed."""
import importlib.util
return importlib.util.find_spec("trustmark") is not None
def _decoder() -> Any:
global _tm
if _tm is None:
with _tm_lock:
if _tm is None:
from trustmark import TrustMark
_tm = TrustMark(verbose=False, model_type=_MODEL_TYPE)
return _tm
# JPEG quality for the false-positive durability gate (see detect_trustmark).
# Deliberately mild: a genuine TrustMark survives far harsher, while every
# observed false positive collapsed even at this quality.
_REENCODE_QUALITY = 95
def detect_trustmark(image_path: Path) -> str | None:
"""Return a TrustMark scheme note if a *durable* TrustMark watermark is
decoded, else None.
Returns e.g. ``"Adobe TrustMark (variant P, schema 0)"`` when the decoder
reports the watermark present AND it survives a mild JPEG re-encode, or None
if it is absent, the optional ``trustmark`` package is not installed, or the
image cannot be read/decoded.
**False-positive gate.** TrustMark's ``wm_present`` flag is a BCH
error-correction validity check, which spuriously validates on a small
fraction of un-watermarked images -- content-correlated, so AI-generated
textures trip it more often than camera photos (verified 2026-05-29 on real
files: the false "detections" were on Gemini / OpenAI / Doubao output that
cannot carry Adobe's watermark, and decoded a random-bytes secret). A genuine
TrustMark is a *durable* soft binding engineered to survive re-encoding (that
is its entire purpose once C2PA is stripped), so we re-decode after a mild
JPEG round-trip and require the same schema both times. Every observed false
positive collapsed under this gate.
"""
if not is_available():
return None
try:
from PIL import Image
with Image.open(image_path) as img:
cover = img.convert("RGB")
decoder = _decoder()
_wm_secret, wm_present, wm_schema = decoder.decode(cover)
if not wm_present:
return None
if not _survives_reencode(decoder, cover, wm_schema):
log.debug("TrustMark decode for %s did not survive re-encode; treating as false positive", image_path)
return None
except Exception as exc: # model download / decode failure / unreadable image
log.debug("TrustMark decode failed for %s: %s", image_path, exc)
return None
return f"Adobe TrustMark (variant {_MODEL_TYPE}, schema {wm_schema})"
def _survives_reencode(decoder: Any, cover: Any, schema: int) -> bool:
"""True if the watermark re-decodes with the same schema after a mild JPEG
round-trip -- the durability a genuine TrustMark guarantees, which a BCH
false positive (content noise) does not."""
import io
from PIL import Image
buffer = io.BytesIO()
cover.save(buffer, "JPEG", quality=_REENCODE_QUALITY)
buffer.seek(0)
with Image.open(buffer) as reencoded:
_secret, present, reencoded_schema = decoder.decode(reencoded.convert("RGB"))
return bool(present) and reencoded_schema == schema