diff --git a/CLAUDE.md b/CLAUDE.md index 5c94ca7..e3f4a9d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -5,6 +5,7 @@ You are a **principal Python engineer** maintaining a CLI tool and library for r ## How to run - `uv run remove-ai-watermarks all -o ` +- `uv run remove-ai-watermarks identify ` — provenance verdict (platform + watermark inventory + confidence); `--json` for machine output, `--no-visible` to skip the cv2 sparkle detector - `uv run remove-ai-watermarks metadata --check` — inspect AI metadata (C2PA, EXIF, PNG chunks) - `uv run remove-ai-watermarks metadata --remove -o ` — strip all AI metadata @@ -25,7 +26,9 @@ You are a **principal Python engineer** maintaining a CLI tool and library for r - `noai/c2pa.py` — PNG chunk parser; use `extract_c2pa_chunk(path)` to get raw caBX payload, `has_c2pa_metadata(path)` to detect. Do not reimplement chunk parsing. `extract_c2pa_info(path)` sets `synthid_watermark`/`synthid_vendors` when the manifest is signed by a SynthID-using vendor. - `noai/constants.py` — PNG_SIGNATURE, C2PA_CHUNK_TYPE, C2PA_SIGNATURES, C2PA_ISSUERS, and `SYNTHID_C2PA_ISSUERS` (issuers that pair SynthID with C2PA: Google, OpenAI). Add a new issuer here, not inline. -- `metadata.py` — `synthid_source(path)` returns the vendor name(s) if the C2PA manifest implies a SynthID pixel watermark, else None. Format-agnostic: PNG via the caBX parser, JPEG/WebP/AVIF/HEIF/JXL via a binary scan (C2PA marker + SynthID issuer + AI-source marker). `get_ai_metadata` surfaces the verdict, and `metadata --check` prints it as a callout. +- `metadata.py` — `synthid_source(path)` returns the vendor name(s) if the C2PA manifest implies a SynthID pixel watermark, else None. Format-agnostic: PNG via the caBX parser, JPEG/WebP/AVIF/HEIF/JXL via a binary scan (C2PA marker + SynthID issuer + AI-source marker). `get_ai_metadata` surfaces the verdict, and `metadata --check` prints it as a callout. 
Both `get_ai_metadata` and `has_ai_metadata` guard the PIL open with `except Exception` (HEIC/unknown formats raise non-OSError) and fall through to the binary scan. +- `identify.py` — `identify(path)` aggregates every locally-readable signal (C2PA issuer→platform, IPTC "Made with AI", embedded SD/ComfyUI params, SynthID proxy, visible Gemini sparkle) into one `ProvenanceReport`. `is_ai_generated` is True or None (never asserted False — stripped metadata is not proof of clean origin). Visible-sparkle is promoted only at confidence ≥ `_SPARKLE_THRESHOLD` (0.5; corpus-tuned to separate Gemini sparkles ≥0.56 from non-sparkle ≤0.49). The cv2 dependency lives in `gemini_engine.detect_sparkle_confidence`, not here. Add platform mappings to `_ISSUER_PLATFORM`, not inline. For non-PNG containers (JPEG/WebP/AVIF/HEIF/JXL) the caBX parser returns nothing, so issuer (`_issuers_in`) and generator (`_ai_tools_in`, reusing `C2PA_AI_TOOLS`) are recovered by binary-scanning the first MB; EXIF/XMP *fields* inside ISOBMFF are still not parsed (no positive fixtures to validate against). +- `gemini_engine.py` — visible Gemini-sparkle remover/detector (cv2/numpy, no GPU). `detect_sparkle_confidence(path)` is the file-level entry point used by `identify.py`. - `face_protector.py` — YOLO detect + soft-blend pattern; mirror this for any "protect region during diffusion" features ## Known limitations diff --git a/README.md b/README.md index 2f5a083..3676322 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,13 @@ remove-ai-watermarks batch ./images/ --mode all #### Individual commands ```bash +# Identify provenance: where an image was made + its watermark inventory. +# Aggregates C2PA, IPTC "Made with AI", embedded SD/ComfyUI params, the +# SynthID proxy, and the visible Gemini sparkle into one verdict. Reports +# "unknown" (never "clean") when no signal is found, since stripped metadata +# is not proof of a clean origin. Add --json for machine-readable output. 
# ── README.md usage example added by this change ────────────────────
#   remove-ai-watermarks identify image.png

# ── src/remove_ai_watermarks/cli.py ─────────────────────────────────
import json  # new top-level import: needed by `identify --json`


# ── Provenance identification ───────────────────────────────────────


@main.command("identify")
@click.argument("source", type=click.Path(exists=True, path_type=Path))
@click.option("--no-visible", is_flag=True, help="Skip the visible-sparkle detector (metadata-only, no cv2).")
@click.option("--json", "as_json", is_flag=True, help="Emit the report as JSON instead of a table.")
@click.pass_context
def cmd_identify(ctx: click.Context, source: Path, no_visible: bool, as_json: bool) -> None:
    """Identify where an image was made and what watermarks it carries.

    Aggregates C2PA Content Credentials, IPTC "Made with AI" tags, embedded
    generation parameters, the SynthID metadata proxy, and the visible Gemini
    sparkle into a single provenance verdict. Absence of signals is reported as
    "unknown", never as "clean" (stripped metadata leaves no local proof).
    """
    # NOTE: the docstring above is click's --help text; keep it user-facing.
    from dataclasses import asdict

    from rich.markup import escape

    from remove_ai_watermarks.identify import identify

    source = _validate_image(source)
    report = identify(source, check_visible=not no_visible)

    if as_json:
        # default=str serialises the report's Path field.
        click.echo(json.dumps(asdict(report), default=str, indent=2))
        return

    _banner()
    # identify() only ever yields True or None; the False entry keeps the
    # lookup total should that contract change.
    verdict = {True: "[yellow]AI-generated[/]", False: "[green]not AI[/]", None: "[dim]unknown[/]"}[
        report.is_ai_generated
    ]
    console.print(f"\n Verdict: {verdict} [dim](confidence: {report.confidence})[/]")

    # Platform and watermark strings can embed issuer names read out of the
    # image, so escape them: a stray "[...]" would otherwise be interpreted as
    # Rich markup (restyling the output or raising MarkupError).
    platform = escape(report.platform) if report.platform else "[dim]undetermined[/]"
    console.print(f" Platform: {platform}")

    if report.watermarks:
        table = Table(show_header=True, header_style="bold", title="Watermarks / provenance markers")
        table.add_column("Marker", style="cyan")
        for wm in report.watermarks:
            table.add_row(escape(wm))
        console.print(table)
    else:
        console.print(" [dim]No watermarks or provenance markers found.[/]")

    if report.caveats:
        console.print("\n [dim]Caveats:[/]")
        for c in report.caveats:
            console.print(f" [dim]- {escape(c)}[/]")


# ── src/remove_ai_watermarks/gemini_engine.py (module level, after GeminiEngine) ──


def detect_sparkle_confidence(image_path: Path) -> float | None:
    """Visible-sparkle detection confidence for a file, for provenance use.

    Loads the image with cv2 and runs :meth:`GeminiEngine.detect_watermark`.
    Kept here so the cv2 dependency stays in this module; callers apply their
    own threshold.

    Args:
        image_path: Image file to inspect.

    Returns:
        The detector's ``confidence`` as a float (documented upstream as an
        NCC score in [0, 1] -- not re-validated here), or None if the image
        cannot be read (cv2 returns None for unsupported containers such as
        HEIC).
    """
    img = cv2.imread(str(image_path))
    if img is None:
        return None
    return float(GeminiEngine().detect_watermark(img).confidence)
"""Image provenance: identify where an image was made and what watermarks it carries.

Aggregates every locally-readable signal into a single :class:`ProvenanceReport`:

- **C2PA Content Credentials** (issuer, claim generator, digital source type) ->
  the signing platform (OpenAI, Google, Adobe, Microsoft).
- **IPTC ``digitalSourceType``** "Made with AI" marker (Meta, X, others).
- **PNG text / EXIF generation parameters** (Stable Diffusion, ComfyUI, InvokeAI).
- **SynthID metadata proxy** -- a C2PA companion from a SynthID-using vendor
  (Google / OpenAI) implies the invisible pixel watermark.
- **Visible Gemini sparkle** (optional; needs cv2/numpy, no GPU).

Hard limit: a stripped image (re-encoded, screenshotted, social-media upload)
loses all metadata, and the SynthID *pixel* watermark is not locally decodable
(proprietary decoder). Absence of signals is therefore reported as ``Unknown``,
never as "clean". See CLAUDE.md "SynthID detection is metadata-only".
"""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from remove_ai_watermarks.metadata import (
    AI_METADATA_KEYS,
    C2PA_UUID,
    IPTC_AI_MARKERS,
    get_ai_metadata,
)
from remove_ai_watermarks.noai.c2pa import extract_c2pa_info
from remove_ai_watermarks.noai.constants import C2PA_AI_TOOLS, C2PA_ISSUERS

if TYPE_CHECKING:
    # Path is only used in annotations, which are lazy under the __future__ import.
    from pathlib import Path

log = logging.getLogger(__name__)

# How much of a non-PNG container to binary-scan for the C2PA issuer
# (identify() reads this many leading bytes of the file: 1 MiB).
_SCAN_BYTES = 1024 * 1024

# Visible-sparkle confidence at or above which the signal is trusted as
# provenance (identify() compares with >=).
# Stricter than the removal default (0.25): on the corpus, Gemini-family
# sparkles score >= 0.56 while non-sparkle images top out at 0.49, so 0.5
# cleanly separates them and avoids false positives when sparkle is the only
# signal (e.g. an OpenAI image scored 0.37 -- below threshold, correctly dropped).
_SPARKLE_THRESHOLD = 0.5

# Issuer (C2PA signer) -> human-readable generating platform. Ordered: when a
# manifest names several issuers (Microsoft Designer signs as "OpenAI,
# Microsoft"), the first match wins so the product, not the backend, is named.
# _attribute_platform() walks this tuple in order and substring-matches each
# needle against the space-joined issuer names.
_ISSUER_PLATFORM: tuple[tuple[str, str], ...] = (
    ("Microsoft", "Microsoft Designer (DALL-E / OpenAI backend)"),
    ("Adobe", "Adobe Firefly"),
    ("OpenAI", "OpenAI (ChatGPT / gpt-image / DALL-E / Sora)"),
    ("Google", "Google (Gemini / Imagen)"),
)

# PNG-text / EXIF keys that indicate a local diffusion pipeline (vs. a hosted
# platform's C2PA). Subset of AI_METADATA_KEYS; excludes the C2PA/Software keys.
# identify() lower-cases each metadata key before testing membership here.
# NOTE(review): the `&` assumes AI_METADATA_KEYS is a set/frozenset -- confirm
# against metadata.py.
_LOCAL_GEN_KEYS = frozenset(
    AI_METADATA_KEYS & {"parameters", "prompt", "negative_prompt", "workflow", "comfyui", "invokeai_metadata", "dream"}
)

# Caveat texts attached to a ProvenanceReport by identify().
# Appended to every report, whatever the verdict.
_STRIP_CAVEAT = (
    "Absence of metadata is not proof the image is clean: C2PA, EXIF, and PNG "
    "text chunks are stripped by re-encoding, screenshots, or social-media upload."
)
# Added whenever the SynthID metadata proxy fires.
_SYNTHID_CAVEAT = (
    "SynthID is a metadata proxy here; the pixel watermark is not locally "
    "verifiable (proprietary decoder). Confirm via the Gemini app or openai.com/verify."
)
# Added on top of _SYNTHID_CAVEAT when "OpenAI" appears among the issuers/vendors.
_OPENAI_CAVEAT = (
    "OpenAI began pairing SynthID with C2PA around 2026-05; OpenAI images from "
    "before the rollout carry C2PA without SynthID, so the SynthID verdict is 'likely'."
)
# Added when an IPTC marker is found and no C2PA manifest is present.
_IPTC_ONLY_CAVEAT = "The IPTC 'Made with AI' tag flags AI provenance but does not identify the specific platform."
@dataclass
class Signal:
    """A single provenance signal that was found by :func:`identify`."""

    name: str  # signal id: "c2pa" | "iptc" | "gen_params" | "visible_sparkle"
    detail: str  # human-readable evidence backing the signal
    confidence: str  # "high" | "medium"


@dataclass
class ProvenanceReport:
    """Aggregated provenance verdict for one image."""

    path: Path
    is_ai_generated: bool | None  # True or None (unknown); False is never asserted
    platform: str | None
    confidence: str  # "high" | "medium" | "none"
    watermarks: list[str] = field(default_factory=list[str])
    signals: list[Signal] = field(default_factory=list["Signal"])
    caveats: list[str] = field(default_factory=list[str])


def _issuers_in(data: bytes) -> list[str]:
    """Return the sorted, de-duplicated C2PA issuer names whose byte signature occurs in ``data``."""
    return sorted({name for sig, name in C2PA_ISSUERS.items() if sig in data})


def _ai_tools_in(data: bytes) -> list[str]:
    """Return the sorted, de-duplicated C2PA AI-tool / generator names found in ``data``.

    PNG has a structured claim_generator; for JPEG/WebP/AVIF/HEIF/JXL the
    generator lives in a JUMBF/EXIF/XMP blob the PNG parser can't reach, so a
    byte scan recovers the same attribution (e.g. "Imagen", "DALL-E").
    """
    return sorted({name for sig, name in C2PA_AI_TOOLS.items() if sig in data})


def _attribute_platform(issuers: list[str]) -> str | None:
    """Map C2PA issuer names to a human-readable generating platform.

    Args:
        issuers: Issuer names taken from the manifest or the binary scan.

    Returns:
        The platform of the first ``_ISSUER_PLATFORM`` entry whose needle is a
        substring of the joined issuer names; a generic "C2PA signer: ..."
        label when issuers exist but none is a known generator; None when
        ``issuers`` is empty.
    """
    joined = " ".join(issuers)
    for needle, platform in _ISSUER_PLATFORM:
        if needle in joined:
            return platform
    if issuers:  # e.g. Truepic alone -- a signing authority, not a generator
        return f"C2PA signer: {', '.join(issuers)} (no known AI generator named)"
    return None


def _visible_sparkle(image_path: Path) -> float | None:
    """Visible Gemini-sparkle confidence, or None if unavailable.

    Optional: needs cv2/numpy (no GPU). The cv2 work lives in gemini_engine so
    this module stays dependency-light. Returns None if cv2 or the engine
    assets are missing, the detector fails, or the image can't be read.
    """
    try:
        from remove_ai_watermarks.gemini_engine import detect_sparkle_confidence

        # The call sits inside the guard too: a failure while running the
        # detector (not just while importing it) must degrade to "no visible
        # signal" instead of aborting the metadata verdict.
        return detect_sparkle_confidence(image_path)
    except Exception as exc:  # best-effort optional signal
        log.debug("visible-sparkle detector unavailable: %s", exc)
        return None


def identify(image_path: Path, *, check_visible: bool = True) -> ProvenanceReport:
    """Identify an image's origin platform and watermark inventory.

    Args:
        image_path: Path to the image (PNG, JPEG, WebP, or ISOBMFF container).
        check_visible: Also run the visible Gemini-sparkle detector (cv2). Set
            False for a pure-metadata, dependency-light scan.

    Returns:
        A :class:`ProvenanceReport`. ``is_ai_generated`` is True when any AI
        signal is found and None (unknown) when none is -- it is never asserted
        False, because stripped metadata leaves no local proof of a clean origin.

    Raises:
        OSError: If ``image_path`` cannot be opened for the binary scan.
    """
    info = extract_c2pa_info(image_path)  # PNG-structured; {} for other formats
    meta = get_ai_metadata(image_path)  # PNG text + EXIF + C2PA fields + synthid

    # First MB covers C2PA (PNG caBX, JPEG APP11, AVIF/HEIF/JXL uuid box) and
    # IPTC markers for the non-PNG path where extract_c2pa_info returns {}.
    with open(image_path, "rb") as f:
        head = f.read(_SCAN_BYTES)

    signals: list[Signal] = []
    watermarks: list[str] = []
    caveats: list[str] = []

    # ── C2PA Content Credentials ────────────────────────────────────
    has_c2pa = bool(info) or b"c2pa" in head.lower() or C2PA_UUID in head
    issuers = [info["issuer"]] if info.get("issuer") else _issuers_in(head)
    platform = _attribute_platform(issuers) if has_c2pa else None
    # `or ""` (not a .get default) so a key that is present but None cannot
    # break the substring test below.
    source_type = info.get("source_type") or ""
    c2pa_is_ai = "trainedAlgorithmicMedia" in source_type or any(
        m in head for m in (b"trainedAlgorithmicMedia", b"compositeWithTrainedAlgorithmicMedia")
    )
    if has_c2pa:
        # Generator: structured for PNG, binary-scanned for other containers.
        generator = info.get("claim_generator") or (", ".join(tools) if (tools := _ai_tools_in(head)) else None)
        detail = ", ".join(filter(None, [", ".join(issuers), generator, source_type]))
        signals.append(Signal("c2pa", detail or "C2PA manifest present", "high"))
        watermarks.append(f"C2PA Content Credentials ({', '.join(issuers) or 'unknown signer'})")

    # ── SynthID metadata proxy ──────────────────────────────────────
    # get_ai_metadata already sets synthid_watermark for both PNG (caBX parser)
    # and non-PNG (its own synthid_source fallback), so no extra scan is needed.
    synthid = meta.get("synthid_watermark")
    if synthid:
        watermarks.append(f"SynthID pixel watermark ({synthid})")
        caveats.append(_SYNTHID_CAVEAT)
        # Formatted rather than concatenated: the proxy value is not
        # guaranteed to be a str.
        if "OpenAI" in f"{' '.join(issuers)} {synthid}":
            caveats.append(_OPENAI_CAVEAT)

    # ── IPTC "Made with AI" (Meta etc.), only meaningful without C2PA ─
    iptc = any(m in head for m in IPTC_AI_MARKERS)
    if iptc and not has_c2pa:
        signals.append(Signal("iptc", "digitalSourceType (Made with AI)", "high"))
        watermarks.append("IPTC digitalSourceType (Made with AI)")
        caveats.append(_IPTC_ONLY_CAVEAT)
        if platform is None:
            platform = "Made-with-AI tag (e.g. Meta AI); platform not specified"

    # ── Local diffusion parameters (Stable Diffusion / ComfyUI) ──────
    local_keys = sorted(k for k in meta if k.lower() in _LOCAL_GEN_KEYS)
    if local_keys:
        signals.append(Signal("gen_params", f"embedded keys: {', '.join(local_keys)}", "high"))
        watermarks.append("Embedded generation parameters (Stable Diffusion / ComfyUI)")
        if platform is None:
            platform = "Stable Diffusion / local pipeline (Automatic1111, ComfyUI, InvokeAI)"

    # ── Verdict so far (metadata) ───────────────────────────────────
    # A bare C2PA manifest is not an AI verdict: it must also carry an AI
    # source type or the SynthID proxy.
    ai_from_metadata = bool((has_c2pa and (c2pa_is_ai or synthid)) or iptc or local_keys)

    # ── Visible Gemini sparkle (fallback for stripped-metadata case) ─
    sparkle_found = False
    if check_visible and (conf := _visible_sparkle(image_path)) is not None and conf >= _SPARKLE_THRESHOLD:
        sparkle_found = True
        signals.append(Signal("visible_sparkle", f"NCC confidence {conf:.2f}", "medium"))
        watermarks.append(f"Visible Gemini sparkle (confidence {conf:.2f})")
        if platform is None:
            platform = "Google Gemini family (visible sparkle detected)"

    is_ai: bool | None
    if ai_from_metadata:
        is_ai, confidence = True, "high"
    elif sparkle_found:  # sparkle is the only evidence
        is_ai, confidence = True, "medium"
    else:
        is_ai, confidence = None, "none"

    caveats.append(_STRIP_CAVEAT)
    # De-duplicate while preserving order.
    caveats = list(dict.fromkeys(caveats))

    return ProvenanceReport(
        path=image_path,
        is_ai_generated=is_ai,
        platform=platform,
        confidence=confidence,
        watermarks=watermarks,
        signals=signals,
        caveats=caveats,
    )
"""Tests for the provenance identifier (identify.py).

Pure attribution logic is unit-tested directly; end-to-end verdicts assert
against the real committed C2PA / IPTC fixtures in data/samples/.
"""

from __future__ import annotations

from pathlib import Path

import pytest

from remove_ai_watermarks.identify import (
    ProvenanceReport,
    _ai_tools_in,
    _attribute_platform,
    _issuers_in,
    identify,
)

# Fixture directory, resolved relative to this test file (repo_root/data/samples).
SAMPLES_DIR = Path(__file__).resolve().parent.parent / "data" / "samples"


# ── Pure attribution logic (no file IO) ─────────────────────────────


class TestAttributePlatform:
    """Issuer-name -> platform mapping, including ordering and fallbacks."""

    def test_openai(self):
        assert "OpenAI" in (_attribute_platform(["OpenAI"]) or "")

    def test_designer_wins_over_openai_backend(self):
        # Microsoft Designer signs as "OpenAI, Microsoft"; name the product.
        platform = _attribute_platform(["OpenAI", "Microsoft"])
        assert platform
        assert "Designer" in platform

    def test_adobe(self):
        assert _attribute_platform(["Adobe"]) == "Adobe Firefly"

    def test_google(self):
        # Substring match: "Google LLC" still attributes to Google.
        assert "Google" in (_attribute_platform(["Google LLC"]) or "")

    def test_truepic_is_signer_not_generator(self):
        # An issuer with no known generator falls back to the "C2PA signer" label.
        platform = _attribute_platform(["Truepic"])
        assert platform
        assert "signer" in platform.lower()

    def test_empty_is_none(self):
        assert _attribute_platform([]) is None


class TestIssuersIn:
    """Binary scan for issuer signatures; results are sorted and de-duplicated."""

    def test_finds_openai(self):
        assert _issuers_in(b"...OpenAI...trainedAlgorithmicMedia") == ["OpenAI"]

    def test_finds_multiple_sorted(self):
        assert _issuers_in(b"Microsoft and OpenAI") == ["Microsoft", "OpenAI"]

    def test_none_present(self):
        assert _issuers_in(b"just some bytes") == []


class TestAiToolsIn:
    """Binary scan for known generator / AI-tool names."""

    def test_finds_generator(self):
        assert _ai_tools_in(b"...claim_generator Imagen 3...") == ["Imagen"]

    def test_none_present(self):
        assert _ai_tools_in(b"a regular photo, no tools") == []


class TestIdentifyNonPng:
    """Non-PNG containers (JPEG/WebP/AVIF) carry C2PA where the caBX parser can't
    reach; identify recovers issuer + generator via the binary scan. Synthetic
    byte blobs mirror tests/test_metadata.py::TestSynthIDSourceNonPng.
    """

    def _c2pa_jpeg(self, tmp_path: Path, blob: bytes) -> Path:
        # Synthetic fixture: JPEG SOI/APP1 bytes + a "jumbc2pa" marker, then the
        # caller's blob, then EOI. Not a decodable image -- callers pass
        # check_visible=False so only the metadata path runs.
        path = tmp_path / "img.jpg"
        path.write_bytes(b"\xff\xd8\xff\xe1jumbc2pa" + blob + b"\xff\xd9")
        return path

    def test_google_imagen_jpeg(self, tmp_path: Path):
        path = self._c2pa_jpeg(tmp_path, b"Google Imagen ... trainedAlgorithmicMedia")
        r = identify(path, check_visible=False)
        assert r.is_ai_generated is True
        assert r.platform is not None
        assert "Google" in r.platform
        # Generator recovered from the non-PNG blob shows up in the c2pa signal.
        c2pa_signal = next(s for s in r.signals if s.name == "c2pa")
        assert "Imagen" in c2pa_signal.detail

    def test_openai_jpeg_has_synthid(self, tmp_path: Path):
        path = self._c2pa_jpeg(tmp_path, b"OpenAI DALL-E ... trainedAlgorithmicMedia")
        r = identify(path, check_visible=False)
        assert any("SynthID" in w for w in r.watermarks)


# ── End-to-end verdicts on real fixtures ────────────────────────────


@pytest.mark.skipif(not SAMPLES_DIR.exists(), reason="data/samples not present")
class TestIdentifyRealSamples:
    """Verdicts on committed sample images; skipped when data/samples is absent."""

    def test_openai_chatgpt(self):
        r = identify(SAMPLES_DIR / "chatgpt-1.png", check_visible=False)
        assert r.is_ai_generated is True
        assert r.confidence == "high"
        assert r.platform
        assert "OpenAI" in r.platform
        assert any("C2PA" in w for w in r.watermarks)
        assert any("SynthID" in w for w in r.watermarks)

    def test_adobe_firefly_has_no_synthid(self):
        r = identify(SAMPLES_DIR / "firefly-1.png", check_visible=False)
        assert r.is_ai_generated is True
        assert r.platform == "Adobe Firefly"
        assert not any("SynthID" in w for w in r.watermarks)

    def test_iptc_made_with_ai(self):
        # mj-1.png carries the IPTC digitalSourceType "Made with AI" marker.
        r = identify(SAMPLES_DIR / "mj-1.png", check_visible=False)
        assert r.is_ai_generated is True
        assert any("IPTC" in w for w in r.watermarks)

    def test_clean_photo_is_unknown_not_clean(self):
        r = identify(SAMPLES_DIR / "not-ai-1.jpeg", check_visible=False)
        assert r.is_ai_generated is None  # never asserted False
        assert r.platform is None
        assert r.confidence == "none"
        assert r.watermarks == []

    def test_strip_caveat_always_present(self):
        # Exercised on the no-signal sample: the strip caveat must still be attached.
        r = identify(SAMPLES_DIR / "not-ai-1.jpeg", check_visible=False)
        assert any("not proof" in c for c in r.caveats)

    def test_returns_report_dataclass(self):
        assert isinstance(identify(SAMPLES_DIR / "firefly-1.png", check_visible=False), ProvenanceReport)