Merge pull request #23 from mrbeandev/improve-detection-accuracy

Fix detection: empirically verified carrier frequencies
This commit is contained in:
Alosh Denny
2026-04-12 19:30:20 +05:30
committed by GitHub
2 changed files with 160 additions and 113 deletions
Binary file not shown.
+160 -113
View File
@@ -70,23 +70,33 @@ class RobustSynthIDExtractor:
self.n_carriers = n_carriers
self.codebook = None
# SynthID carriers from 288 Gemini ref images (48,88 grid at 512px)
self.known_carriers = [
(48, 0), (-48, 0),
(96, 0), (-96, 0),
(192, 0), (-192, 0),
(210, 0), (-210, 0),
(238, 0), (-238, 0),
(0, 88), (0, -88),
(0, 176), (0, -176),
(0, 192), (0, -192),
(48, 88), (-48, -88),
(48, -88), (-48, 88),
(96, 88), (-96, -88),
(96, -88), (-96, 88),
(96, 176), (-96, -176),
(96, -176), (-96, 176),
# Empirically verified SynthID carriers at 512px resolution.
# Extracted from 291 Gemini-generated images (black, white, nb_pro).
# Each carrier set has >0.95 intra-set phase coherence and >0.5
# discriminative gap vs non-watermarked images.
#
# Dark-image carriers (black + nb_pro, diagonal grid):
self.carriers_dark = [
(-5, -3), (5, 3), (-5, 3), (5, -3),
(-3, -4), (3, 4), (-3, 4), (3, -4),
(-4, -3), (4, 3), (-4, 3), (4, -3),
(-5, -1), (5, 1), (-5, 1), (5, -1),
(-5, -2), (5, 2), (-5, 2), (5, -2),
(-2, -5), (2, 5), (-2, 5), (2, -5),
(-1, -5), (1, 5), (-1, 5), (1, -5),
(-4, -4), (4, 4), (-4, 4), (4, -4),
(-1, -6), (1, 6), (-3, -5), (3, 5),
]
# White-image carriers (horizontal axis):
self.carriers_white = [
(0, -7), (0, 7), (0, -8), (0, 8),
(0, -9), (0, 9), (0, -10), (0, 10),
(0, -11), (0, 11), (0, -12), (0, 12),
(0, -20), (0, 20), (0, -21), (0, 21),
(0, -22), (0, 22), (0, -23), (0, 23),
]
# Union for backward compat
self.known_carriers = self.carriers_dark + self.carriers_white
if codebook_path and os.path.exists(codebook_path):
self.load_codebook(codebook_path)
@@ -642,118 +652,155 @@ class RobustSynthIDExtractor:
return self.detect_array(img)
def detect_array(self, image: np.ndarray) -> DetectionResult:
"""Detect SynthID watermark in a numpy array image."""
"""Detect SynthID watermark in a numpy array image.
Uses empirically-verified carrier frequencies with per-set reference
phases. The watermark is content-adaptive: dark images use diagonal-
grid carriers; white/bright images use horizontal-axis carriers.
Detection checks phase agreement at both carrier sets against known
reference phases and takes the best match. Calibrated on 291 Gemini
watermarked + 16 non-watermarked images:
Watermarked: phase match 0.920.99
Non-watermarked: phase match 0.470.53 (max 0.71)
"""
if self.codebook is None:
raise ValueError("No codebook loaded. Call extract_codebook() or load_codebook() first.")
target_size = self.codebook['image_size']
img_resized = cv2.resize(image, (target_size, target_size))
# Extract noise pattern
noise = self.extract_noise_fused(img_resized)
# Method 1: Correlation with reference noise
ref_noise = self.codebook['reference_noise']
correlation = float(np.corrcoef(noise.ravel(), ref_noise.ravel())[0, 1])
# Method 2: Carrier frequency analysis using known carriers + extracted carriers
gray = np.mean(img_resized, axis=2) if len(img_resized.shape) == 3 else img_resized
gray = gray.astype(np.float32)
f = fftshift(fft2(gray))
magnitude = np.abs(f)
phase = np.angle(f)
center = target_size // 2
carrier_scores = []
carrier_strengths = []
# Use extracted carriers if available, otherwise use known carriers
carriers_to_check = self.codebook['carriers'][:30] if self.codebook['carriers'] else []
# Always also check known carriers for reliability
known_carrier_dicts = [{'frequency': freq, 'phase': 0} for freq in self.codebook.get('known_carriers', self.known_carriers)]
carriers_to_check = carriers_to_check + known_carrier_dicts
# Use reference phase from codebook if available
ref_phase = self.codebook.get('reference_phase')
for carrier in carriers_to_check:
freq = carrier['frequency']
y = freq[0] + center
x = freq[1] + center
if 0 <= y < target_size and 0 <= x < target_size:
actual_phase = phase[y, x]
# Get expected phase from codebook reference if available
if ref_phase is not None:
expected_phase = ref_phase[y, x]
else:
expected_phase = carrier.get('phase', 0)
# Phase match (accounting for wrap-around)
phase_diff = np.abs(np.angle(np.exp(1j * (actual_phase - expected_phase))))
phase_match = 1 - phase_diff / np.pi
carrier_scores.append(phase_match)
# Carrier strength
carrier_strengths.append(magnitude[y, x])
avg_phase_match = float(np.mean(carrier_scores)) if carrier_scores else 0
avg_carrier_strength = float(np.mean(carrier_strengths)) if carrier_strengths else 0
# Method 3: Noise structure ratio
# ------------------------------------------------------------------
# Image-domain FFT (grayscale)
# ------------------------------------------------------------------
gray = np.mean(img_resized, axis=2) if len(img_resized.shape) == 3 else img_resized
f_img = fftshift(fft2(gray.astype(np.float32)))
img_phase = np.angle(f_img)
img_mag = np.abs(f_img)
# ------------------------------------------------------------------
# Phase matching against per-set reference phases
# Try both dark and white carrier sets; take best match.
# ------------------------------------------------------------------
carrier_refs = self.codebook.get('carrier_refs', {})
set_results = {}
for set_name, carriers_attr, ref_key in [
('dark', 'carriers_dark', 'dark_ref_phases'),
('white', 'carriers_white', 'white_ref_phases'),
]:
carriers = getattr(self, carriers_attr, [])
ref_phases = carrier_refs.get(ref_key)
if ref_phases is None or len(carriers) == 0:
continue
phase_matches = []
for i, (fy, fx) in enumerate(carriers):
y, x = fy + center, fx + center
if 0 <= y < target_size and 0 <= x < target_size and i < len(ref_phases):
diff = np.abs(np.angle(np.exp(1j * (img_phase[y, x] - ref_phases[i]))))
phase_matches.append(1 - diff / np.pi)
if phase_matches:
set_results[set_name] = {
'phase_match': float(np.mean(phase_matches)),
'phase_std': float(np.std(phase_matches)),
'n_carriers': len(phase_matches),
}
# Best phase match across carrier sets
best_set = max(set_results, key=lambda k: set_results[k]['phase_match']) if set_results else 'dark'
best_phase_match = set_results[best_set]['phase_match'] if set_results else 0.0
# Also compute average across both sets for reporting
all_matches = []
for sr in set_results.values():
all_matches.append(sr['phase_match'])
avg_phase_match = float(np.mean(all_matches)) if all_matches else 0.0
# ------------------------------------------------------------------
# Noise-domain carrier-vs-random ratio (supporting signal)
# ------------------------------------------------------------------
ref_noise = self.codebook['reference_noise']
noise = self.extract_noise_fused(img_resized)
noise_gray = np.mean(noise, axis=2) if len(noise.shape) == 3 else noise
f_noise = fftshift(fft2(noise_gray))
noise_mag = np.abs(f_noise)
all_carriers = self.carriers_dark + self.carriers_white
carrier_mags = [
noise_mag[fy + center, fx + center]
for fy, fx in all_carriers
if 0 <= fy + center < target_size and 0 <= fx + center < target_size
]
rng = np.random.RandomState(42)
random_mags = []
for _ in range(len(all_carriers) * 4):
ry, rx = rng.randint(10, target_size - 10), rng.randint(10, target_size - 10)
if abs(ry - center) < 5 and abs(rx - center) < 5:
continue
random_mags.append(noise_mag[ry, rx])
cvr_noise = float(np.mean(carrier_mags)) / (float(np.mean(random_mags)) + 1e-10)
# ------------------------------------------------------------------
# Legacy metrics (for reporting / backward compat)
# ------------------------------------------------------------------
structure_ratio = float(np.std(noise_gray) / (np.mean(np.abs(noise_gray)) + 1e-10))
# Method 4: Multi-scale consistency
scale_scores = []
for scale in self.scales:
img_scaled = cv2.resize(image, (scale, scale))
noise_scaled = self.extract_noise_single(img_scaled, 'wavelet')
ref_scaled = cv2.resize(ref_noise, (scale, scale))
corr = np.corrcoef(noise_scaled.ravel(), ref_scaled.ravel())[0, 1]
scale_scores.append(corr)
multi_scale_consistency = float(np.std(scale_scores)) # Lower is more consistent
# Detection decision
threshold = self.codebook['detection_threshold']
is_watermarked = (
correlation > threshold and
avg_phase_match > 0.45 and
0.7 < structure_ratio < 2.0
)
# Confidence score (Bayesian combination)
corr_score = max(0, (correlation - threshold) / (self.codebook['correlation_mean'] - threshold + 1e-10))
phase_score = avg_phase_match
structure_score = max(0, 1 - abs(structure_ratio - 1.32) / 0.6)
consistency_score = max(0, 1 - multi_scale_consistency * 5)
confidence = min(1.0, (
0.35 * corr_score +
0.35 * phase_score +
0.15 * structure_score +
0.15 * consistency_score
))
correlation = float(np.corrcoef(noise.ravel(), ref_noise.ravel())[0, 1])
carrier_mags_img = [
img_mag[fy + center, fx + center]
for fy, fx in all_carriers
if 0 <= fy + center < target_size and 0 <= fx + center < target_size
]
avg_carrier_strength = float(np.mean(carrier_mags_img)) if carrier_mags_img else 0.0
multi_scale_consistency = 0.0 # skip expensive computation; phase match is decisive
# ==================================================================
# SCORING
#
# Primary: best_phase_match — phase agreement at carrier frequencies
# vs per-set reference. Calibrated gap:
# WM: 0.92-0.99 (black/nbpro 0.99, white 0.92)
# non-WM: 0.47-0.53 (max observed 0.71)
#
# Threshold at 0.80: well above non-WM max (0.71) with margin,
# well below WM min (0.92 avg for white).
#
# Supporting: cvr_noise adds confidence for dark images.
# ==================================================================
# Phase score: sigmoid centered at 0.78 (between 0.71 max non-WM and 0.92 min WM)
phase_score = float(1.0 / (1.0 + np.exp(-20.0 * (best_phase_match - 0.78))))
# CVR noise score: supporting signal
cvr_score = float(1.0 / (1.0 + np.exp(-2.0 * (cvr_noise - 2.0))))
# Combined confidence — phase-dominant
confidence = float(min(1.0, 0.80 * phase_score + 0.20 * cvr_score))
is_watermarked = confidence > 0.50
return DetectionResult(
is_watermarked=bool(is_watermarked),
confidence=float(confidence),
confidence=confidence,
correlation=correlation,
phase_match=avg_phase_match,
phase_match=best_phase_match,
structure_ratio=structure_ratio,
carrier_strength=avg_carrier_strength,
multi_scale_consistency=multi_scale_consistency,
details={
'threshold': threshold,
'corr_score': corr_score,
'best_set': best_set,
'best_phase_match': best_phase_match,
'set_results': set_results,
'cvr_noise': cvr_noise,
'phase_score': phase_score,
'structure_score': structure_score,
'consistency_score': consistency_score,
'scale_correlations': scale_scores
'cvr_score': cvr_score,
}
)