mirror of
https://github.com/wiltodelta/remove-ai-watermarks.git
synced 2026-07-04 23:47:49 +02:00
19f9ab0947
Regenerating pixels removes SynthID / open watermarks but degrades a real photo, so running it on a clean image is the dominant paid score-0 cause on no-watermark uploads. Gate invisible/all/batch on identify.has_invisible_target: when no invisible AI signal is locally detectable and --force is unset, skip the regeneration. Per-command semantics: - invisible: write no output, exit EXIT_NO_INVISIBLE_SIGNAL (2) - all: skip step 2 but keep visible-removed pixels + strip metadata, exit 0 - batch: skip the scrub; copy the input through in invisible mode A skip never claims the image is clean (a pixel SynthID is undetectable once its metadata proxy is gone); the message says so and routes to --force. The gate fails safe (a detector error runs the removal). has_invisible_target wraps identify(check_visible=False, check_invisible=True) and returns the new ProvenanceReport.ai_from_metadata field (the confidence==high union), so the raiw.cc worker can reuse the same gate. Gate placed before engine construction so the skip path is cheap; shared via cli._should_skip_invisible_scrub. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1053 lines
48 KiB
Python
1053 lines
48 KiB
Python
"""Tests for the provenance identifier (identify.py).
|
|
|
|
Pure attribution logic is unit-tested directly; end-to-end verdicts assert
|
|
against the real committed C2PA / IPTC fixtures in data/samples/.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import asdict
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from remove_ai_watermarks.identify import (
|
|
ProvenanceReport,
|
|
_ai_tools_in,
|
|
_attribute_platform,
|
|
_integrity_clashes,
|
|
_issuers_in,
|
|
_vendor_of,
|
|
has_invisible_target,
|
|
identify,
|
|
)
|
|
from remove_ai_watermarks.watermark_registry import GEMINI_SPARKLE_TRUST_CONF
|
|
|
|
# Where the lazy import inside identify._visible_sparkle resolves the detector.
|
|
_SPARKLE_TARGET = "remove_ai_watermarks.gemini_engine.detect_sparkle_confidence"
|
|
|
|
SAMPLES_DIR = Path(__file__).resolve().parent.parent / "data" / "samples"
|
|
|
|
|
|
# ── Pure attribution logic (no file IO) ─────────────────────────────
|
|
|
|
|
|
class TestAttributePlatform:
|
|
def test_openai(self):
|
|
assert "OpenAI" in (_attribute_platform(["OpenAI"]) or "")
|
|
|
|
def test_designer_wins_over_openai_backend(self):
|
|
# Microsoft Designer signs as "OpenAI, Microsoft"; name the product.
|
|
platform = _attribute_platform(["OpenAI", "Microsoft"])
|
|
assert platform
|
|
assert "Designer" in platform
|
|
|
|
def test_adobe(self):
|
|
assert _attribute_platform(["Adobe"]) == "Adobe Firefly"
|
|
|
|
def test_google(self):
|
|
assert "Google" in (_attribute_platform(["Google LLC"]) or "")
|
|
|
|
def test_truepic_is_signer_not_generator(self):
|
|
platform = _attribute_platform(["Truepic"])
|
|
assert platform
|
|
assert "signer" in platform.lower()
|
|
|
|
def test_microsoft_label_is_model_neutral(self):
|
|
# Bing now runs MAI-Image, not DALL-E; the label must not claim DALL-E.
|
|
platform = _attribute_platform(["Microsoft"])
|
|
assert platform
|
|
assert "DALL-E" not in platform
|
|
|
|
def test_stability(self):
|
|
platform = _attribute_platform(["Stability AI"])
|
|
assert platform
|
|
assert "Stability AI" in platform
|
|
|
|
def test_canva(self):
|
|
platform = _attribute_platform(["Canva"])
|
|
assert platform
|
|
assert "Canva" in platform
|
|
|
|
def test_byteplus_attributes_to_bytedance(self):
|
|
# ByteDance's intl brand signs as "Byteplus Pte. Ltd."; the registry maps
|
|
# it to the ByteDance platform (was mis-read as Adobe via an incidental
|
|
# "Adobe XMP" file string before the entry existed).
|
|
platform = _attribute_platform(["BytePlus (ByteDance)"])
|
|
assert platform
|
|
assert "ByteDance" in platform
|
|
|
|
def test_empty_is_none(self):
|
|
assert _attribute_platform([]) is None
|
|
|
|
|
|
class TestIssuersIn:
|
|
def test_finds_openai(self):
|
|
assert _issuers_in(b"...OpenAI...trainedAlgorithmicMedia") == ["OpenAI"]
|
|
|
|
def test_finds_multiple_sorted(self):
|
|
assert _issuers_in(b"Microsoft and OpenAI") == ["Microsoft", "OpenAI"]
|
|
|
|
def test_none_present(self):
|
|
assert _issuers_in(b"just some bytes") == []
|
|
|
|
|
|
class TestAiToolsIn:
|
|
def test_finds_generator(self):
|
|
assert _ai_tools_in(b"...claim_generator Imagen 3...") == ["Imagen"]
|
|
|
|
def test_none_present(self):
|
|
assert _ai_tools_in(b"a regular photo, no tools") == []
|
|
|
|
|
|
class TestIdentifyNonPng:
|
|
"""Non-PNG containers (JPEG/WebP/AVIF) carry C2PA where the caBX parser can't
|
|
reach; identify recovers issuer + generator via the binary scan. Synthetic
|
|
byte blobs mirror tests/test_metadata.py::TestSynthIDSourceNonPng.
|
|
"""
|
|
|
|
def _c2pa_jpeg(self, tmp_path: Path, blob: bytes) -> Path:
|
|
path = tmp_path / "img.jpg"
|
|
path.write_bytes(b"\xff\xd8\xff\xe1jumbc2pa" + blob + b"\xff\xd9")
|
|
return path
|
|
|
|
def test_google_imagen_jpeg(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"Google Imagen ... trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform is not None
|
|
assert "Google" in r.platform
|
|
# Generator recovered from the non-PNG blob shows up in the c2pa signal.
|
|
c2pa_signal = next(s for s in r.signals if s.name == "c2pa")
|
|
assert "Imagen" in c2pa_signal.detail
|
|
|
|
def test_openai_jpeg_has_synthid(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"OpenAI DALL-E ... trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False)
|
|
assert any("SynthID" in w for w in r.watermarks)
|
|
|
|
def test_black_forest_labs_flux_attributed(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"Black Forest Labs API ... trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform == "Black Forest Labs (FLUX)"
|
|
|
|
def test_bytedance_volcengine_attributed(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"certificate_center@volcengine.com ... trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert "ByteDance" in (r.platform or "")
|
|
|
|
def test_bytedance_chinese_legal_name_attributed(self, tmp_path: Path):
|
|
# Some Volcano Engine certs name the signer with the Chinese legal entity
|
|
# rather than the latin "volcengine"; the latin needle misses it, so the
|
|
# Chinese-name registry entry is what attributes real ByteDance output.
|
|
blob = "北京火山引擎科技有限公司".encode() + b" ... trainedAlgorithmicMedia"
|
|
path = self._c2pa_jpeg(tmp_path, blob)
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert "ByteDance" in (r.platform or "")
|
|
|
|
def test_elevenlabs_attributed(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"Eleven Labs Inc. ... trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform == "ElevenLabs"
|
|
assert not any("SynthID" in w for w in r.watermarks) # ElevenLabs does not use SynthID
|
|
|
|
def test_stability_ai_issuer_attributed_no_synthid(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"Stability AI ... trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform is not None
|
|
assert "Stability AI" in r.platform
|
|
assert not any("SynthID" in w for w in r.watermarks) # Stability does not use SynthID
|
|
|
|
def test_trained_source_is_generated_kind(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"OpenAI ... trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.ai_source_kind == "generated"
|
|
|
|
def test_composite_source_is_enhanced_kind(self, tmp_path: Path):
|
|
# compositeWithTrainedAlgorithmicMedia: a real photo with an AI-composited
|
|
# region. Still AI (is_ai True), but the kind must read "enhanced" so a
|
|
# caller can do region-targeted cleaning instead of a full-frame regen.
|
|
path = self._c2pa_jpeg(tmp_path, b"Adobe ... compositeWithTrainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.ai_source_kind == "enhanced"
|
|
|
|
def test_c2pa_without_ai_marker_is_unknown(self, tmp_path: Path):
|
|
# Adobe signs C2PA on plain Photoshop edits too. Without an AI digital-
|
|
# source marker, the honest verdict is unknown -- the C2PA watermark is
|
|
# still listed, but is_ai_generated is not asserted True.
|
|
path = self._c2pa_jpeg(tmp_path, b"Adobe ... no ai marker here")
|
|
r = identify(path, check_visible=False)
|
|
assert r.is_ai_generated is None
|
|
assert any("C2PA" in w for w in r.watermarks)
|
|
assert not any("SynthID" in w for w in r.watermarks)
|
|
|
|
|
|
class TestIdentifySamsungGalaxy:
|
|
"""Samsung Galaxy / ASUS Gallery C2PA signers (verified on real signed files
|
|
2026-05-29; synthetic byte blobs here since the originals are private).
|
|
|
|
Galaxy AI edits stamp BOTH the device cert AND an AI source-type / genAIType,
|
|
so the signer attribution must NOT trip the camera-vs-AI integrity clash.
|
|
"""
|
|
|
|
def _jpeg(self, tmp_path: Path, name: str, blob: bytes) -> Path:
|
|
path = tmp_path / name
|
|
path.write_bytes(b"\xff\xd8\xff\xe1jumbc2pa" + blob + b"\xff\xd9")
|
|
return path
|
|
|
|
def test_galaxy_trained_source_is_high_ai(self, tmp_path: Path):
|
|
path = self._jpeg(tmp_path, "s25.jpg", b"Samsung Galaxy Galaxy S25 c2pa-rs trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "high"
|
|
assert r.platform == "Samsung Galaxy (C2PA)"
|
|
assert r.integrity_clashes == [] # device cert + AI source-type is legitimate, not a clash
|
|
|
|
def test_galaxy_genai_only_is_medium_ai(self, tmp_path: Path):
|
|
# The Galaxy S24 case: no trainedAlgorithmicMedia, genAIType is the only
|
|
# AI marker -- previously missed, now a medium-confidence verdict.
|
|
path = self._jpeg(
|
|
tmp_path, "s24.jpg", b'Samsung Galaxy Galaxy S24 c2pa-rs PhotoEditor_Re_Edit_Data{"genAIType":1}'
|
|
)
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "medium"
|
|
assert r.platform == "Samsung Galaxy (C2PA)"
|
|
assert any(s.name == "samsung_genai" for s in r.signals)
|
|
assert r.integrity_clashes == []
|
|
|
|
def test_asus_gallery_signer_not_ai(self, tmp_path: Path):
|
|
# ASUS Gallery signs edited photos; no AI source-type or genAIType, so the
|
|
# platform is attributed but the verdict stays unknown.
|
|
path = self._jpeg(tmp_path, "asus.jpg", b"/com.asus.gallery/3.8.0.98 c2pa-rs no ai marker")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is None
|
|
assert r.platform == "ASUS Gallery (C2PA signer)"
|
|
assert any("C2PA" in w for w in r.watermarks)
|
|
|
|
def test_galaxy_capture_without_ai_marker_is_not_ai(self, tmp_path: Path):
|
|
# A genuine Galaxy phone capture carries Samsung Galaxy C2PA provenance but
|
|
# NO AI source-type / genAIType. It must stay is_ai=None -- the device cert
|
|
# is authenticity provenance of a real photo, not an AI-generation signal.
|
|
path = self._jpeg(tmp_path, "s25_capture.jpg", b"Samsung Galaxy Galaxy S25 c2pa-rs no ai marker")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is None
|
|
assert r.platform == "Samsung Galaxy (C2PA)"
|
|
assert any("C2PA" in w for w in r.watermarks)
|
|
|
|
|
|
# ── End-to-end verdicts on real fixtures ────────────────────────────
|
|
|
|
|
|
@pytest.mark.skipif(not SAMPLES_DIR.exists(), reason="data/samples not present")
|
|
class TestIdentifyRealSamples:
|
|
def test_openai_chatgpt(self):
|
|
r = identify(SAMPLES_DIR / "chatgpt-1.png", check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "high"
|
|
assert r.platform
|
|
assert "OpenAI" in r.platform
|
|
assert any("C2PA" in w for w in r.watermarks)
|
|
assert any("SynthID" in w for w in r.watermarks)
|
|
|
|
def test_adobe_firefly_has_no_synthid(self):
|
|
r = identify(SAMPLES_DIR / "firefly-1.png", check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform == "Adobe Firefly"
|
|
assert not any("SynthID" in w for w in r.watermarks)
|
|
|
|
def test_iptc_made_with_ai(self):
|
|
# mj-1.png carries the IPTC digitalSourceType "Made with AI" marker.
|
|
r = identify(SAMPLES_DIR / "mj-1.png", check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert any("IPTC" in w for w in r.watermarks)
|
|
|
|
def test_flux_bfl_c2pa_png(self):
|
|
# flux-1.png: real Black Forest Labs FLUX.2 Playground output (signed C2PA).
|
|
r = identify(SAMPLES_DIR / "flux-1.png", check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform == "Black Forest Labs (FLUX)"
|
|
|
|
def test_flux_bfl_c2pa_jpeg_via_reader(self):
|
|
# flux-1.jpg: same source as a JPEG -- the real committed JPEG-with-C2PA
|
|
# fixture that exercises the c2pa-python non-PNG reader path end to end.
|
|
r = identify(SAMPLES_DIR / "flux-1.jpg", check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform == "Black Forest Labs (FLUX)"
|
|
|
|
def test_clean_photo_is_unknown_not_clean(self, clean_photo: Path):
|
|
r = identify(clean_photo, check_visible=False)
|
|
assert r.is_ai_generated is None # never asserted False
|
|
assert r.platform is None
|
|
assert r.confidence == "none"
|
|
assert r.watermarks == []
|
|
|
|
def test_has_invisible_target_true_on_metadata_ai(self):
|
|
# The scrub gate: a C2PA/SynthID image and an IPTC "Made with AI" image are
|
|
# both invisible/metadata targets, so the diffusion scrub should run.
|
|
assert has_invisible_target(SAMPLES_DIR / "chatgpt-1.png") is True
|
|
assert has_invisible_target(SAMPLES_DIR / "mj-1.png") is True
|
|
# ai_from_metadata mirrors confidence == "high" and backs the helper.
|
|
assert identify(SAMPLES_DIR / "chatgpt-1.png", check_visible=False).ai_from_metadata is True
|
|
|
|
def test_has_invisible_target_false_on_clean_photo(self, clean_photo: Path):
|
|
# No detectable invisible signal -> skip the scrub (do not degrade a clean image).
|
|
assert has_invisible_target(clean_photo) is False
|
|
assert identify(clean_photo, check_visible=False).ai_from_metadata is False
|
|
|
|
def test_strip_caveat_always_present(self, clean_photo: Path):
|
|
r = identify(clean_photo, check_visible=False)
|
|
assert any("not proof" in c for c in r.caveats)
|
|
|
|
def test_returns_report_dataclass(self):
|
|
assert isinstance(identify(SAMPLES_DIR / "firefly-1.png", check_visible=False), ProvenanceReport)
|
|
|
|
|
|
class TestHasInvisibleTargetFailSafe:
|
|
"""The scrub gate fails SAFE: when a detector errors, it runs the removal."""
|
|
|
|
def test_detector_error_defaults_to_run(self, tmp_path: Path):
|
|
# If identify raises (a detector crash), the gate must return True so the
|
|
# caller still attempts removal -- leaving a watermark on a paid removal is
|
|
# worse than over-regenerating. (Garbage bytes do NOT raise; identify returns
|
|
# a clean None verdict there, so that path correctly skips -- see below.)
|
|
bad = tmp_path / "x.png"
|
|
bad.write_bytes(b"not image bytes")
|
|
with patch("remove_ai_watermarks.identify.identify", side_effect=RuntimeError("boom")):
|
|
assert has_invisible_target(bad) is True
|
|
|
|
def test_unreadable_bytes_are_not_a_target(self, tmp_path: Path):
|
|
# No raise, no signal -> not a scrub target (the CLI rejects undecodable
|
|
# images earlier anyway; this only documents the gate's own verdict).
|
|
bad = tmp_path / "x.png"
|
|
bad.write_bytes(b"not image bytes")
|
|
assert has_invisible_target(bad) is False
|
|
|
|
def test_local_ai_params_are_a_target(self, tmp_png_with_ai_metadata: Path):
|
|
assert has_invisible_target(tmp_png_with_ai_metadata) is True
|
|
|
|
|
|
# ── Local diffusion parameters (Stable Diffusion / ComfyUI) ─────────
|
|
|
|
|
|
class TestIdentifyLocalParams:
|
|
"""A PNG carrying SD-style generation params is attributed to a local pipeline."""
|
|
|
|
def test_sd_params_attributed_to_local_pipeline(self, tmp_png_with_ai_metadata: Path):
|
|
r = identify(tmp_png_with_ai_metadata, check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "high"
|
|
assert r.platform is not None
|
|
assert "Stable Diffusion" in r.platform
|
|
assert any("generation parameters" in w for w in r.watermarks)
|
|
|
|
def test_gen_params_signal_lists_keys(self, tmp_png_with_ai_metadata: Path):
|
|
r = identify(tmp_png_with_ai_metadata, check_visible=False)
|
|
signal = next(s for s in r.signals if s.name == "gen_params")
|
|
assert "parameters" in signal.detail
|
|
assert signal.confidence == "high"
|
|
|
|
def test_local_gen_params_have_no_c2pa_source_kind(self, tmp_png_with_ai_metadata: Path):
|
|
# AI verdict from local SD params (not C2PA) -> ai_source_kind stays None.
|
|
r = identify(tmp_png_with_ai_metadata, check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.ai_source_kind is None
|
|
|
|
def test_clean_png_is_unknown(self, tmp_clean_png: Path):
|
|
r = identify(tmp_clean_png, check_visible=False)
|
|
assert r.is_ai_generated is None
|
|
assert r.platform is None
|
|
assert r.confidence == "none"
|
|
assert r.signals == []
|
|
|
|
|
|
# ── China TC260 AIGC label as a PNG text chunk (Doubao) ─────────────
|
|
|
|
|
|
class TestIdentifyAigcPngChunk:
|
|
"""The raw-JSON ``AIGC`` PNG chunk (no namespaced XMP marker) is a high-
|
|
confidence AI verdict, same as the XMP form."""
|
|
|
|
def _aigc_chunk_png(self, tmp_path: Path) -> Path:
|
|
from PIL import Image
|
|
from PIL.PngImagePlugin import PngInfo
|
|
|
|
p = tmp_path / "doubao_chunk.png"
|
|
pnginfo = PngInfo()
|
|
pnginfo.add_text("AIGC", json.dumps({"Label": "1", "ContentProducer": "doubao"}))
|
|
Image.new("RGB", (32, 32)).save(p, pnginfo=pnginfo)
|
|
return p
|
|
|
|
def test_png_chunk_detected_high(self, tmp_path: Path):
|
|
r = identify(self._aigc_chunk_png(tmp_path), check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "high"
|
|
assert r.platform is not None
|
|
assert "AIGC" in r.platform
|
|
signal = next(s for s in r.signals if s.name == "aigc")
|
|
assert "doubao" in signal.detail
|
|
|
|
|
|
# ── HuggingFace-hosted job marker (medium confidence) ───────────────
|
|
|
|
|
|
class TestIdentifyHuggingFaceJob:
|
|
"""The hf-job-id chunk lifts an otherwise-Unknown verdict to a tentative
|
|
(medium) AI, never overriding a high-confidence metadata signal."""
|
|
|
|
def _hf_png(self, tmp_path: Path) -> Path:
|
|
from PIL import Image
|
|
from PIL.PngImagePlugin import PngInfo
|
|
|
|
p = tmp_path / "hfjob.png"
|
|
pnginfo = PngInfo()
|
|
pnginfo.add_text("hf-job-id", "ec8380a6-2091-423a-b835-209420f99ee1")
|
|
Image.new("RGB", (32, 32)).save(p, pnginfo=pnginfo)
|
|
return p
|
|
|
|
def test_hf_job_promotes_to_medium(self, tmp_path: Path):
|
|
r = identify(self._hf_png(tmp_path), check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "medium"
|
|
assert r.platform is not None
|
|
assert "HuggingFace" in r.platform
|
|
signal = next(s for s in r.signals if s.name == "hf_job")
|
|
assert signal.confidence == "medium"
|
|
|
|
def test_hf_job_caveat_present(self, tmp_path: Path):
|
|
r = identify(self._hf_png(tmp_path), check_visible=False)
|
|
assert any("hf-job-id" in c for c in r.caveats)
|
|
|
|
def test_metadata_keeps_high_even_with_hf_job(self, tmp_png_with_ai_metadata: Path):
|
|
# A high-confidence metadata verdict is not downgraded by an hf-job hit.
|
|
from PIL import Image
|
|
from PIL.PngImagePlugin import PngInfo
|
|
|
|
img = Image.open(tmp_png_with_ai_metadata)
|
|
pnginfo = PngInfo()
|
|
for k, v in img.text.items():
|
|
pnginfo.add_text(k, v)
|
|
pnginfo.add_text("hf-job-id", "ec8380a6-2091-423a-b835-209420f99ee1")
|
|
img.save(tmp_png_with_ai_metadata, pnginfo=pnginfo)
|
|
r = identify(tmp_png_with_ai_metadata, check_visible=False)
|
|
assert r.confidence == "high"
|
|
|
|
|
|
# ── Visible-sparkle fallback (mocked detector) ──────────────────────
|
|
|
|
|
|
class TestIdentifyVisibleSparkle:
|
|
"""The visible-sparkle signal gates on the corpus-tuned threshold (0.5)."""
|
|
|
|
def test_above_threshold_promotes_to_medium(self, tmp_clean_png: Path):
|
|
with patch(_SPARKLE_TARGET, return_value=0.7):
|
|
r = identify(tmp_clean_png, check_visible=True)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "medium"
|
|
assert r.platform is not None
|
|
assert "Gemini" in r.platform
|
|
signal = next(s for s in r.signals if s.name == "visible_sparkle")
|
|
assert signal.confidence == "medium"
|
|
|
|
def test_below_threshold_not_promoted(self, tmp_clean_png: Path):
|
|
with patch(_SPARKLE_TARGET, return_value=0.4):
|
|
r = identify(tmp_clean_png, check_visible=True)
|
|
assert r.is_ai_generated is None
|
|
assert not any(s.name == "visible_sparkle" for s in r.signals)
|
|
|
|
def test_detector_unavailable_does_not_crash(self, tmp_clean_png: Path):
|
|
with patch(_SPARKLE_TARGET, return_value=None):
|
|
r = identify(tmp_clean_png, check_visible=True)
|
|
assert r.is_ai_generated is None
|
|
assert not any(s.name == "visible_sparkle" for s in r.signals)
|
|
|
|
def test_check_visible_false_skips_detector(self, tmp_clean_png: Path):
|
|
# Even a strong detection is ignored when the caller opts out.
|
|
with patch(_SPARKLE_TARGET, return_value=0.99) as mock_detect:
|
|
r = identify(tmp_clean_png, check_visible=False)
|
|
mock_detect.assert_not_called()
|
|
assert not any(s.name == "visible_sparkle" for s in r.signals)
|
|
|
|
def test_metadata_keeps_high_even_with_sparkle(self, tmp_png_with_ai_metadata: Path):
|
|
# Metadata verdict (high) is not downgraded by an additional sparkle hit.
|
|
with patch(_SPARKLE_TARGET, return_value=0.7):
|
|
r = identify(tmp_png_with_ai_metadata, check_visible=True)
|
|
assert r.confidence == "high"
|
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
_DEMO_BEFORE = REPO_ROOT / "demo_banana_before.png"
|
|
_DEMO_AFTER = REPO_ROOT / "demo_banana_after.png"
|
|
|
|
|
|
@pytest.mark.skipif(not (_DEMO_BEFORE.exists() and _DEMO_AFTER.exists()), reason="demo banana pair not present")
|
|
class TestSparkleDetectRemoveAlignment:
|
|
"""Detect (identify) and remove (registry.best_auto_mark) must agree on the
|
|
same image -- the retained-corpus desync where identify reported a sparkle the
|
|
removal arbitration declined (or vice versa). Both gate on the single shared
|
|
GEMINI_SPARKLE_TRUST_CONF, so a sparkle just over the line is taken by BOTH
|
|
and one just under is declined by BOTH. Fixtures composite the real captured
|
|
sparkle (before-minus-after) back at reduced opacity to land on either side.
|
|
"""
|
|
|
|
@staticmethod
|
|
def _faint_sparkle(tmp_path: Path, opacity: float) -> Path:
|
|
import numpy as np
|
|
|
|
from remove_ai_watermarks import image_io
|
|
|
|
before = image_io.imread(_DEMO_BEFORE).astype("float32")
|
|
after = image_io.imread(_DEMO_AFTER).astype("float32")
|
|
faint = np.clip(after + opacity * (before - after), 0, 255).astype("uint8")
|
|
out = tmp_path / f"sparkle_{int(opacity * 100)}.png"
|
|
image_io.imwrite(out, faint)
|
|
return out
|
|
|
|
def _detect_remove(self, path: Path) -> tuple[bool, bool, float]:
|
|
from remove_ai_watermarks import image_io, watermark_registry
|
|
from remove_ai_watermarks.gemini_engine import detect_sparkle_confidence
|
|
|
|
conf = detect_sparkle_confidence(path) or 0.0
|
|
identify_fires = conf >= GEMINI_SPARKLE_TRUST_CONF
|
|
best = watermark_registry.best_auto_mark(image_io.imread(path))
|
|
remove_takes_gemini = best is not None and best.key == "gemini"
|
|
return identify_fires, remove_takes_gemini, conf
|
|
|
|
def test_above_threshold_both_fire(self, tmp_path: Path):
|
|
path = self._faint_sparkle(tmp_path, 0.7) # ~0.55 conf, just over the line
|
|
identify_fires, remove_takes, conf = self._detect_remove(path)
|
|
assert conf >= GEMINI_SPARKLE_TRUST_CONF
|
|
assert identify_fires, f"identify declined a sparkle above threshold (conf={conf:.3f})"
|
|
assert remove_takes, f"removal declined a sparkle above threshold (conf={conf:.3f})"
|
|
|
|
def test_below_threshold_both_decline(self, tmp_path: Path):
|
|
path = self._faint_sparkle(tmp_path, 0.5) # ~0.37 conf, just under the line
|
|
identify_fires, remove_takes, conf = self._detect_remove(path)
|
|
assert conf < GEMINI_SPARKLE_TRUST_CONF
|
|
assert not identify_fires, f"identify fired below threshold (conf={conf:.3f})"
|
|
assert not remove_takes, f"removal fired below threshold (conf={conf:.3f})"
|
|
|
|
def test_full_strength_both_fire(self):
|
|
# The shipped demo sparkle at full strength: unambiguous agreement.
|
|
identify_fires, remove_takes, conf = self._detect_remove(_DEMO_BEFORE)
|
|
assert conf >= GEMINI_SPARKLE_TRUST_CONF
|
|
assert identify_fires
|
|
assert remove_takes
|
|
|
|
|
|
class TestIdentifyImportIsLight:
|
|
"""`import identify` must stay torch-free (lazy noai/__init__): the package
|
|
is deployed on a 512 MB host where eagerly pulling torch/diffusers OOMs."""
|
|
|
|
def test_import_identify_does_not_pull_torch(self):
|
|
# Only meaningful where torch is installed (the gpu/detect extra); on a
|
|
# core-only CI runner torch can't be in sys.modules anyway.
|
|
pytest.importorskip("torch")
|
|
code = "import sys, remove_ai_watermarks.identify; sys.exit(1 if 'torch' in sys.modules else 0)"
|
|
result = subprocess.run([sys.executable, "-c", code], capture_output=True, check=False) # noqa: S603
|
|
assert result.returncode == 0, f"import identify pulled torch: {result.stderr.decode()[-500:]}"
|
|
|
|
|
|
# Where the registry-backed Doubao/Jimeng visible detector resolves.
|
|
_TEXT_MARKS_TARGET = "remove_ai_watermarks.identify._visible_text_marks"
|
|
|
|
|
|
class TestIdentifyVisibleTextMarks:
|
|
"""The visible Doubao/Jimeng marks are a stripped-metadata visual fallback,
|
|
parallel to the Gemini sparkle: each lifts an Unknown verdict to medium."""
|
|
|
|
@staticmethod
|
|
def _detection(key: str, label: str, conf: float):
|
|
from remove_ai_watermarks.watermark_registry import MarkDetection
|
|
|
|
return MarkDetection(key, label, "bottom-right", True, conf, (0, 0, 10, 10))
|
|
|
|
def test_doubao_promotes_to_medium(self, tmp_clean_png: Path):
|
|
det = self._detection("doubao", "Doubao 豆包AI生成 text", 0.8)
|
|
with patch(_SPARKLE_TARGET, return_value=None), patch(_TEXT_MARKS_TARGET, return_value=[det]):
|
|
r = identify(tmp_clean_png, check_visible=True)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "medium"
|
|
assert r.platform is not None
|
|
assert "Doubao" in r.platform
|
|
signal = next(s for s in r.signals if s.name == "visible_doubao")
|
|
assert signal.confidence == "medium"
|
|
|
|
def test_jimeng_promotes_to_medium(self, tmp_clean_png: Path):
|
|
det = self._detection("jimeng", "Jimeng 即梦AI wordmark", 0.9)
|
|
with patch(_SPARKLE_TARGET, return_value=None), patch(_TEXT_MARKS_TARGET, return_value=[det]):
|
|
r = identify(tmp_clean_png, check_visible=True)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "medium"
|
|
assert r.platform is not None
|
|
assert "Jimeng" in r.platform
|
|
assert any(s.name == "visible_jimeng" for s in r.signals)
|
|
|
|
def test_check_visible_false_skips_text_marks(self, tmp_clean_png: Path):
|
|
det = self._detection("doubao", "Doubao 豆包AI生成 text", 0.99)
|
|
with patch(_SPARKLE_TARGET, return_value=None), patch(_TEXT_MARKS_TARGET, return_value=[det]) as mock:
|
|
r = identify(tmp_clean_png, check_visible=False)
|
|
mock.assert_not_called()
|
|
assert not any(s.name == "visible_doubao" for s in r.signals)
|
|
|
|
def test_metadata_keeps_high_even_with_text_mark(self, tmp_png_with_ai_metadata: Path):
|
|
det = self._detection("doubao", "Doubao 豆包AI生成 text", 0.8)
|
|
with patch(_SPARKLE_TARGET, return_value=None), patch(_TEXT_MARKS_TARGET, return_value=[det]):
|
|
r = identify(tmp_png_with_ai_metadata, check_visible=True)
|
|
assert r.confidence == "high"
|
|
|
|
def test_visible_path_decodes_file_once(self, tmp_clean_png: Path):
|
|
"""The web path identify(check_visible=True, check_invisible=False) must
|
|
decode the image exactly once and share the array across the sparkle +
|
|
text-mark detectors. Two decodes of the same bitmap spiked memory on the
|
|
small web worker (the OOM the decode-once refactor addresses)."""
|
|
import remove_ai_watermarks.image_io as image_io
|
|
|
|
real_imread = image_io.imread
|
|
with patch.object(image_io, "imread", side_effect=real_imread) as mock_imread:
|
|
identify(tmp_clean_png, check_visible=True, check_invisible=False)
|
|
assert mock_imread.call_count == 1
|
|
|
|
|
|
# ── Caveats and serialization ───────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.skipif(not SAMPLES_DIR.exists(), reason="data/samples not present")
|
|
class TestIdentifyCaveats:
|
|
def test_openai_hedge_caveat_present(self):
|
|
r = identify(SAMPLES_DIR / "chatgpt-1.png", check_visible=False)
|
|
assert any("before the rollout" in c for c in r.caveats)
|
|
|
|
def test_synthid_proxy_caveat_present(self):
|
|
r = identify(SAMPLES_DIR / "chatgpt-1.png", check_visible=False)
|
|
assert any("not locally" in c for c in r.caveats)
|
|
|
|
def test_caveats_are_deduplicated(self):
|
|
r = identify(SAMPLES_DIR / "chatgpt-1.png", check_visible=False)
|
|
assert len(r.caveats) == len(set(r.caveats))
|
|
|
|
|
|
class TestOpenAiCaveatVendorScoped:
|
|
"""The OpenAI rollout caveat keys on the normalized SynthID vendor, not a raw
|
|
"OpenAI" substring over the issuer + verdict blob -- so a Google-SynthID
|
|
manifest with an incidental "OpenAI" byte elsewhere is not mislabeled, while
|
|
a genuine OpenAI manifest still gets the hedge.
|
|
"""
|
|
|
|
@staticmethod
|
|
def _png_chunk(ctype: bytes, data: bytes) -> bytes:
|
|
import struct
|
|
import zlib
|
|
|
|
return struct.pack(">I", len(data)) + ctype + data + struct.pack(">I", zlib.crc32(ctype + data) & 0xFFFFFFFF)
|
|
|
|
def _png(self, tmp_path: Path, name: str, *extra: bytes) -> Path:
|
|
import struct
|
|
import zlib
|
|
|
|
ihdr = struct.pack(">IIBBBBB", 1, 1, 8, 6, 0, 0, 0)
|
|
body = (
|
|
b"\x89PNG\r\n\x1a\n"
|
|
+ self._png_chunk(b"IHDR", ihdr)
|
|
+ self._png_chunk(b"IDAT", zlib.compress(b"\x00" * 6, 9))
|
|
+ b"".join(extra)
|
|
+ self._png_chunk(b"IEND", b"")
|
|
)
|
|
path = tmp_path / name
|
|
path.write_bytes(body)
|
|
return path
|
|
|
|
def test_google_synthid_with_incidental_openai_byte_no_caveat(self, tmp_path: Path):
|
|
# Google C2PA/SynthID manifest in caBX; the byte "OpenAI" lives in a
|
|
# separate tEXt chunk (e.g. a trust-chain note), not as a SynthID vendor.
|
|
png = self._png(
|
|
tmp_path,
|
|
"g.png",
|
|
self._png_chunk(b"caBX", b"jumbc2pa Google ... trainedAlgorithmicMedia"),
|
|
self._png_chunk(b"tEXt", b"note\x00signed via OpenAI trust chain"),
|
|
)
|
|
r = identify(png, check_visible=False, check_invisible=False)
|
|
assert any("SynthID watermark, inferred from C2PA metadata (likely present (Google" in w for w in r.watermarks)
|
|
assert not any("before the rollout" in c for c in r.caveats)
|
|
|
|
def test_openai_synthid_still_gets_caveat(self, tmp_path: Path):
|
|
png = self._png(tmp_path, "oa.png", self._png_chunk(b"caBX", b"jumbc2pa OpenAI ... trainedAlgorithmicMedia"))
|
|
r = identify(png, check_visible=False, check_invisible=False)
|
|
assert any("SynthID watermark, inferred from C2PA metadata (likely present (OpenAI" in w for w in r.watermarks)
|
|
assert any("before the rollout" in c for c in r.caveats)
|
|
|
|
|
|
class TestReportSerializable:
|
|
def test_report_is_json_serializable(self, tmp_png_with_ai_metadata: Path):
|
|
# The CLI --json path relies on asdict + json.dumps(default=str).
|
|
report = identify(tmp_png_with_ai_metadata, check_visible=False)
|
|
dumped = json.dumps(asdict(report), default=str)
|
|
assert "is_ai_generated" in dumped
|
|
|
|
|
|
class TestIdentifyExifGenerator:
|
|
"""An AI generator tag in EXIF/XMP (incl. AVIF) drives attribution."""
|
|
|
|
def test_avif_firefly_software_attributed(self, tmp_path: Path):
|
|
import piexif
|
|
from PIL import Image
|
|
|
|
exif = piexif.dump({"0th": {piexif.ImageIFD.Software: b"Adobe Firefly"}, "Exif": {}, "GPS": {}, "1st": {}})
|
|
path = tmp_path / "firefly.avif"
|
|
Image.new("RGB", (64, 64), (90, 80, 70)).save(path, exif=exif)
|
|
r = identify(path, check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform is not None
|
|
assert "Firefly" in r.platform
|
|
assert any("generator tag" in w for w in r.watermarks)
|
|
|
|
|
|
class TestIdentifyXaiSignature:
|
|
"""xAI / Grok's EXIF Signature + UUID-Artist drives an xAI verdict."""
|
|
|
|
def test_grok_signature_attributed(self, tmp_path: Path):
|
|
import piexif
|
|
from PIL import Image
|
|
|
|
exif = piexif.dump(
|
|
{
|
|
"0th": {
|
|
piexif.ImageIFD.ImageDescription: b"Signature: " + b"A" * 120,
|
|
piexif.ImageIFD.Artist: b"12345678-1234-1234-1234-123456789abc",
|
|
},
|
|
"Exif": {},
|
|
"GPS": {},
|
|
"1st": {},
|
|
}
|
|
)
|
|
path = tmp_path / "grok.jpg"
|
|
Image.new("RGB", (64, 64), (70, 80, 90)).save(path, exif=exif)
|
|
r = identify(path, check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform is not None
|
|
assert "xAI" in r.platform
|
|
assert any("xAI/Grok" in w for w in r.watermarks)
|
|
|
|
|
|
class TestIdentifySoftBinding:
|
|
"""A C2PA soft-binding alg names a forensic-watermark vendor in the inventory."""
|
|
|
|
def test_soft_binding_vendor_listed(self, tmp_path: Path):
|
|
p = tmp_path / "sb.jpg"
|
|
p.write_bytes(b"\xff\xd8\xff\xe1 c2pa jumb com.digimarc.validate.1 \xff\xd9")
|
|
r = identify(p, check_visible=False, check_invisible=False)
|
|
assert any("Digimarc" in w for w in r.watermarks)
|
|
assert any(s.name == "soft_binding" for s in r.signals)
|
|
|
|
|
|
class TestIdentifyIptcAi:
|
|
"""IPTC 2025.1 AISystemUsed drives an AI verdict + platform attribution."""
|
|
|
|
def test_iptc_ai_system_attributed(self, tmp_path: Path):
|
|
p = tmp_path / "iptc.jpg"
|
|
p.write_bytes(
|
|
b"\xff\xd8\xff\xe1<x:xmpmeta><Iptc4xmpExt:AISystemUsed>Google Gemini"
|
|
b"</Iptc4xmpExt:AISystemUsed></x:xmpmeta>\xff\xd9"
|
|
)
|
|
r = identify(p, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform is not None
|
|
assert "Gemini" in r.platform
|
|
|
|
|
|
class TestIdentifyC2paDevice:
|
|
"""A distinctive C2PA device token wins platform attribution over incidental
|
|
issuer-name mentions (regression guard for real-sample mis-attribution:
|
|
Leica->Truepic, Nikon->Adobe, Pixel->Google Gemini)."""
|
|
|
|
def test_leica_token_beats_incidental_tokens(self, tmp_path: Path):
|
|
# "Adobe"/"Google"/"Truepic" appear incidentally; Leica's lc_c2pa wins.
|
|
blob = b"\xff\xd8\xff\xe1 c2pa.claim jumbf Adobe Google Truepic lc_c2pa \xff\xd9"
|
|
p = tmp_path / "leica_like.jpg"
|
|
p.write_bytes(blob)
|
|
r = identify(p, check_visible=False, check_invisible=False)
|
|
assert r.platform == "Leica (camera, C2PA capture)"
|
|
|
|
def test_pixel_camera_cert_beats_incidental_google(self, tmp_path: Path):
|
|
# Pixel's cert CN is "Pixel Camera"; "Google LLC" appears as the cert org
|
|
# but must NOT yield "Google (Gemini / Imagen)" -- it is a camera capture.
|
|
blob = b"\xff\xd8\xff\xe1 c2pa.claim jumbf Google LLC Adobe Pixel Camera \xff\xd9"
|
|
p = tmp_path / "pixel_like.jpg"
|
|
p.write_bytes(blob)
|
|
r = identify(p, check_visible=False, check_invisible=False)
|
|
assert r.platform == "Google Pixel (camera, C2PA capture)"
|
|
assert r.is_ai_generated is None # camera capture, not AI
|
|
|
|
def test_sony_namespace_beats_bare_make(self, tmp_path: Path):
|
|
# Sony's own C2PA assertion namespace (sony.sig), not the bare "Sony"
|
|
# EXIF Make that appears on ordinary photos.
|
|
blob = b"\xff\xd8\xff\xe1 c2pa.claim jumbf Adobe Sony sony.sig.v1_1 \xff\xd9"
|
|
p = tmp_path / "sony_like.jpg"
|
|
p.write_bytes(blob)
|
|
r = identify(p, check_visible=False, check_invisible=False)
|
|
assert r.platform == "Sony (camera, C2PA capture)"
|
|
|
|
def test_unmapped_device_not_mislabeled_via_incidental_issuer(self, tmp_path: Path):
|
|
# An unmapped camera (Canon) whose manifest incidentally contains the
|
|
# "Adobe" XMP-toolkit string, with NO AI source type, must NOT be labeled
|
|
# "Adobe Firefly". The issuer->generator mapping only applies to AI content.
|
|
blob = b"\xff\xd8\xff\xe1 c2pa.claim jumbf Canon EOS Adobe XMP Core \xff\xd9"
|
|
p = tmp_path / "canon_like.jpg"
|
|
p.write_bytes(blob)
|
|
r = identify(p, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is None # camera capture, not AI
|
|
assert r.platform is not None
|
|
assert "Firefly" not in r.platform # not mislabeled as an AI generator
|
|
|
|
|
|
# ── Open invisible watermark (SD/SDXL/FLUX) integration ─────────────
|
|
|
|
from remove_ai_watermarks.invisible_watermark import is_available as _wm_available # noqa: E402
|
|
|
|
|
|
@pytest.mark.skipif(not _wm_available(), reason="invisible-watermark not installed")
|
|
class TestIdentifyInvisibleWatermark:
|
|
def _sdxl_watermarked(self, tmp_path: Path) -> Path:
|
|
import cv2
|
|
import numpy as np
|
|
from imwatermark import WatermarkEncoder
|
|
|
|
from remove_ai_watermarks.invisible_watermark import _BITS_48
|
|
|
|
bits = [int(b) for b in format(_BITS_48["Stable Diffusion XL"], "048b")]
|
|
enc = WatermarkEncoder()
|
|
enc.set_watermark("bits", bits)
|
|
img = np.random.default_rng(0).integers(0, 255, (512, 512, 3), dtype=np.uint8)
|
|
path = tmp_path / "sdxl.png"
|
|
cv2.imwrite(str(path), enc.encode(img, "dwtDct"))
|
|
return path
|
|
|
|
def test_sdxl_watermark_identified(self, tmp_path: Path):
|
|
r = identify(self._sdxl_watermarked(tmp_path), check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.confidence == "high"
|
|
assert r.platform is not None
|
|
assert "Stable Diffusion XL" in r.platform
|
|
assert any("invisible watermark" in w.lower() for w in r.watermarks)
|
|
|
|
def test_check_invisible_false_skips(self, tmp_path: Path):
|
|
r = identify(self._sdxl_watermarked(tmp_path), check_visible=False, check_invisible=False)
|
|
assert not any(s.name == "invisible_watermark" for s in r.signals)
|
|
|
|
|
|
class TestIdentifyAIGC:
|
|
"""China TC260 AIGC label is detected and attributed (e.g. Doubao)."""
|
|
|
|
def _aigc_png(self, tmp_path: Path) -> Path:
|
|
from PIL import Image
|
|
|
|
p = tmp_path / "doubao.png"
|
|
Image.new("RGB", (32, 32)).save(p)
|
|
xmp = (
|
|
'<x:xmpmeta xmlns:x="adobe:ns:meta/"><rdf:RDF '
|
|
'xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">'
|
|
'<rdf:Description xmlns:TC260="http://www.tc260.org.cn/ns/AIGC/1.0/">'
|
|
"<TC260:AIGC>{"Label":"1","ContentProducer":"BYTEDANCE001"}"
|
|
"</TC260:AIGC></rdf:Description></rdf:RDF></x:xmpmeta>"
|
|
)
|
|
with open(p, "ab") as f:
|
|
f.write(xmp.encode())
|
|
return p
|
|
|
|
def test_aigc_detected(self, tmp_path: Path):
|
|
r = identify(self._aigc_png(tmp_path), check_visible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.platform is not None
|
|
assert "AIGC" in r.platform or "TC260" in r.platform
|
|
assert any("AIGC" in w for w in r.watermarks)
|
|
|
|
def test_aigc_signal_carries_producer(self, tmp_path: Path):
|
|
r = identify(self._aigc_png(tmp_path), check_visible=False)
|
|
sig = next(s for s in r.signals if s.name == "aigc")
|
|
assert "BYTEDANCE001" in sig.detail
|
|
|
|
|
|
# ── Integrity clashes (contradictions between independent signals) ──────
|
|
|
|
|
|
class TestVendorOf:
|
|
def test_openai_variants(self):
|
|
assert _vendor_of("OpenAI (ChatGPT / gpt-image / DALL-E / Sora)") == "OpenAI"
|
|
assert _vendor_of("DALL-E 3") == "OpenAI"
|
|
|
|
def test_google_variants(self):
|
|
assert _vendor_of("Google (Gemini / Imagen)") == "Google"
|
|
assert _vendor_of("Imagen 3") == "Google"
|
|
|
|
def test_other_vendors(self):
|
|
assert _vendor_of("Ideogram AI") == "Ideogram"
|
|
assert _vendor_of("Adobe Firefly") == "Adobe"
|
|
assert _vendor_of("Stability AI (Stable Image)") == "Stability AI"
|
|
|
|
def test_camera_label_is_not_an_ai_vendor(self):
|
|
# Camera platform labels must NOT normalize to an AI vendor, or a camera
|
|
# capture would be mistaken for AI-generation in clash detection.
|
|
assert _vendor_of("Leica (camera, C2PA capture)") is None
|
|
|
|
def test_unknown_is_none(self):
|
|
assert _vendor_of("a regular photo") is None
|
|
assert _vendor_of(None) is None
|
|
|
|
|
|
class TestIntegrityClashesHelper:
|
|
def test_two_ai_vendors_clash(self):
|
|
clashes = _integrity_clashes({"c2pa": "OpenAI", "exif_generator": "Ideogram"}, None, camera_has_ai_marker=True)
|
|
assert len(clashes) == 1
|
|
assert "OpenAI" in clashes[0]
|
|
assert "Ideogram" in clashes[0]
|
|
|
|
def test_same_vendor_two_signals_no_clash(self):
|
|
# C2PA Google + SynthID-Google proxy is consistent, not a contradiction.
|
|
assert _integrity_clashes({"c2pa": "Google", "synthid": "Google"}, None, camera_has_ai_marker=True) == []
|
|
|
|
def test_multi_actor_manifest_no_clash(self):
|
|
# A multi-actor C2PA manifest names a product + the engine it wraps in ONE
|
|
# valid chain (Microsoft Designer on OpenAI, Microsoft on Google, Adobe over
|
|
# a Gemini original). The c2pa issuer attribution and the SynthID proxy share
|
|
# the same manifest source, so the differing vendors must NOT read as a clash.
|
|
for c2pa_vendor, synthid_vendor in (("Microsoft", "OpenAI"), ("Microsoft", "Google"), ("Adobe", "Google")):
|
|
assert (
|
|
_integrity_clashes({"c2pa": c2pa_vendor, "synthid": synthid_vendor}, None, camera_has_ai_marker=True)
|
|
== []
|
|
)
|
|
|
|
def test_manifest_vendor_vs_independent_signal_clashes(self):
|
|
# A vendor named only inside the manifest still clashes with a genuinely
|
|
# independent stamp (here an EXIF/XMP generator tag) naming a third vendor.
|
|
clashes = _integrity_clashes(
|
|
{"c2pa": "Microsoft", "synthid": "Google", "exif_generator": "Ideogram"},
|
|
None,
|
|
camera_has_ai_marker=True,
|
|
)
|
|
assert len(clashes) == 1
|
|
assert "Ideogram" in clashes[0]
|
|
|
|
def test_single_vendor_no_clash(self):
|
|
assert _integrity_clashes({"c2pa": "OpenAI"}, None, camera_has_ai_marker=True) == []
|
|
|
|
def test_empty_no_clash(self):
|
|
assert _integrity_clashes({}, None, camera_has_ai_marker=False) == []
|
|
|
|
def test_camera_plus_ai_marker_clashes(self):
|
|
clashes = _integrity_clashes(
|
|
{"exif_generator": "Ideogram"},
|
|
"Google Pixel (camera, C2PA capture)",
|
|
camera_has_ai_marker=True,
|
|
)
|
|
assert any("Camera-capture" in c and "Pixel" in c for c in clashes)
|
|
|
|
def test_camera_without_ai_marker_no_clash(self):
|
|
# A clean camera capture (the normal case for our Pixel/Leica/Sony files)
|
|
# must NOT raise a clash.
|
|
assert _integrity_clashes({}, "Leica (camera, C2PA capture)", camera_has_ai_marker=False) == []
|
|
|
|
def test_pixel_generative_edit_same_manifest_no_clash(self):
|
|
# A Google Pixel that BOTH captures and runs on-device generative AI
|
|
# (Magic Editor / Pixel Studio) records the capture and the AI edit in
|
|
# ONE C2PA manifest -- the AI vendor is named only from that same
|
|
# manifest (c2pa / synthid), independent of nothing. That is a legitimate
|
|
# edit chain, NOT a camera-vs-AI contradiction, so rule 2 must stay quiet.
|
|
assert (
|
|
_integrity_clashes(
|
|
{"c2pa": "Google", "synthid": "Google"},
|
|
"Google Pixel (camera, C2PA capture)",
|
|
camera_has_ai_marker=True,
|
|
)
|
|
== []
|
|
)
|
|
|
|
def test_camera_plus_independent_ai_marker_still_clashes(self):
|
|
# But a camera capture next to an AI marker from a genuinely INDEPENDENT
|
|
# source (EXIF/XMP generator, TC260 AIGC, ...) is still a laundering tell.
|
|
clashes = _integrity_clashes(
|
|
{"c2pa": "Google", "aigc": "China AIGC (TC260)"},
|
|
"Google Pixel (camera, C2PA capture)",
|
|
camera_has_ai_marker=True,
|
|
)
|
|
assert any("Camera-capture" in c for c in clashes)
|
|
|
|
|
|
class TestIntegrityClashEndToEnd:
|
|
def _c2pa_jpeg(self, tmp_path: Path, blob: bytes) -> Path:
|
|
path = tmp_path / "img.jpg"
|
|
path.write_bytes(b"\xff\xd8\xff\xe1jumbc2pa" + blob + b"\xff\xd9")
|
|
return path
|
|
|
|
def test_two_generator_stamps_clash(self, tmp_path: Path):
|
|
# An OpenAI C2PA manifest (AI source) on an image that ALSO carries a
|
|
# China TC260 AIGC label = two independent generator stamps naming
|
|
# different origins -> a laundering tell.
|
|
path = self._c2pa_jpeg(tmp_path, b"OpenAI ... trainedAlgorithmicMedia ... TC260:AIGC label")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.integrity_clashes
|
|
assert any("Conflicting AI-origin" in c for c in r.integrity_clashes)
|
|
|
|
def test_single_stamp_no_clash(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"OpenAI ... trainedAlgorithmicMedia")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.integrity_clashes == []
|
|
|
|
def test_camera_device_plus_ai_marker_clash(self, tmp_path: Path):
|
|
# Integrity-clash rule #2: a camera-capture C2PA device token (Pixel
|
|
# Camera) coexisting with an independent AI-generation marker (a China
|
|
# TC260 AIGC label) -- a genuine camera capture is not AI-generated, so
|
|
# the provenance is inconsistent (a laundering / spoofing tell).
|
|
path = self._c2pa_jpeg(
|
|
tmp_path,
|
|
b'Pixel Camera ... <TC260:AIGC>{"Label":"1","ContentProducer":"BYTEDANCE001"}</TC260:AIGC>',
|
|
)
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.platform == "Google Pixel (camera, C2PA capture)"
|
|
assert any("Camera-capture C2PA credentials" in c and "AI-generation markers" in c for c in r.integrity_clashes)
|
|
|
|
def test_pixel_generative_edit_no_clash(self, tmp_path: Path):
|
|
# A real Google Pixel generative edit (Magic Editor / Pixel Studio) signs
|
|
# ONE manifest carrying both the Pixel Camera capture and a Google
|
|
# Generative AI edit (trainedAlgorithmicMedia + "Applied imperceptible
|
|
# SynthID watermark"). The AI marker lives in the SAME manifest as the
|
|
# device, so it is an edit chain, not a camera-vs-AI contradiction.
|
|
path = self._c2pa_jpeg(
|
|
tmp_path,
|
|
b"Pixel Camera ... Created by Pixel Camera ... computationalCapture ... "
|
|
b"Created by Google Generative AI ... trainedAlgorithmicMedia ... "
|
|
b"Applied imperceptible SynthID watermark",
|
|
)
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.is_ai_generated is True
|
|
assert r.integrity_clashes == []
|
|
|
|
def test_clash_serializes_to_json(self, tmp_path: Path):
|
|
path = self._c2pa_jpeg(tmp_path, b"OpenAI ... trainedAlgorithmicMedia ... TC260:AIGC label")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
payload = json.loads(json.dumps(asdict(r), default=str))
|
|
assert payload["integrity_clashes"] == r.integrity_clashes
|
|
|
|
|
|
@pytest.mark.skipif(not SAMPLES_DIR.exists(), reason="data/samples not present")
|
|
@pytest.mark.parametrize("fixture", ["chatgpt-1.png", "firefly-1.png", "doubao-1.png", "grok-1.jpg", "mj-1.png"])
|
|
class TestRealSamplesHaveNoClash:
|
|
"""Every real single-origin fixture must report zero clashes (false-positive guard)."""
|
|
|
|
def test_no_false_positive_clash(self, fixture: str):
|
|
path = SAMPLES_DIR / fixture
|
|
if not path.exists():
|
|
pytest.skip(f"{fixture} not present")
|
|
r = identify(path, check_visible=False, check_invisible=False)
|
|
assert r.integrity_clashes == []
|