diff --git a/artifacts/codebook/robust_codebook.pkl b/artifacts/codebook/robust_codebook.pkl index 2298192..80fdfd0 100644 Binary files a/artifacts/codebook/robust_codebook.pkl and b/artifacts/codebook/robust_codebook.pkl differ diff --git a/src/extraction/robust_extractor.py b/src/extraction/robust_extractor.py index cf06704..1f1c182 100644 --- a/src/extraction/robust_extractor.py +++ b/src/extraction/robust_extractor.py @@ -70,23 +70,33 @@ class RobustSynthIDExtractor: self.n_carriers = n_carriers self.codebook = None - # SynthID carriers from 288 Gemini ref images (48,88 grid at 512px) - self.known_carriers = [ - (48, 0), (-48, 0), - (96, 0), (-96, 0), - (192, 0), (-192, 0), - (210, 0), (-210, 0), - (238, 0), (-238, 0), - (0, 88), (0, -88), - (0, 176), (0, -176), - (0, 192), (0, -192), - (48, 88), (-48, -88), - (48, -88), (-48, 88), - (96, 88), (-96, -88), - (96, -88), (-96, 88), - (96, 176), (-96, -176), - (96, -176), (-96, 176), + # Empirically verified SynthID carriers at 512px resolution. + # Extracted from 291 Gemini-generated images (black, white, nb_pro). + # Each carrier set has >0.95 intra-set phase coherence and >0.5 + # discriminative gap vs non-watermarked images. + # + # Dark-image carriers (black + nb_pro, diagonal grid): + self.carriers_dark = [ + (-5, -3), (5, 3), (-5, 3), (5, -3), + (-3, -4), (3, 4), (-3, 4), (3, -4), + (-4, -3), (4, 3), (-4, 3), (4, -3), + (-5, -1), (5, 1), (-5, 1), (5, -1), + (-5, -2), (5, 2), (-5, 2), (5, -2), + (-2, -5), (2, 5), (-2, 5), (2, -5), + (-1, -5), (1, 5), (-1, 5), (1, -5), + (-4, -4), (4, 4), (-4, 4), (4, -4), + (-1, -6), (1, 6), (-3, -5), (3, 5), ] + # White-image carriers (horizontal axis): + self.carriers_white = [ + (0, -7), (0, 7), (0, -8), (0, 8), + (0, -9), (0, 9), (0, -10), (0, 10), + (0, -11), (0, 11), (0, -12), (0, 12), + (0, -20), (0, 20), (0, -21), (0, 21), + (0, -22), (0, 22), (0, -23), (0, 23), + ] + # Union for backward compat + self.known_carriers = self.carriers_dark + self.carriers_white if codebook_path and os.path.exists(codebook_path): self.load_codebook(codebook_path) @@ -642,118 +652,155 @@ class RobustSynthIDExtractor: return self.detect_array(img) def detect_array(self, image: np.ndarray) -> DetectionResult: - """Detect SynthID watermark in a numpy array image.""" + """Detect SynthID watermark in a numpy array image. + + Uses empirically-verified carrier frequencies with per-set reference + phases. The watermark is content-adaptive: dark images use diagonal- + grid carriers; white/bright images use horizontal-axis carriers. + + Detection checks phase agreement at both carrier sets against known + reference phases and takes the best match. Calibrated on 291 Gemini + watermarked + 16 non-watermarked images: + + Watermarked: phase match 0.92–0.99 + Non-watermarked: phase match 0.47–0.53 (max 0.71) + """ if self.codebook is None: raise ValueError("No codebook loaded. Call extract_codebook() or load_codebook() first.") - + target_size = self.codebook['image_size'] img_resized = cv2.resize(image, (target_size, target_size)) - - # Extract noise pattern - noise = self.extract_noise_fused(img_resized) - - # Method 1: Correlation with reference noise - ref_noise = self.codebook['reference_noise'] - correlation = float(np.corrcoef(noise.ravel(), ref_noise.ravel())[0, 1]) - - # Method 2: Carrier frequency analysis using known carriers + extracted carriers - gray = np.mean(img_resized, axis=2) if len(img_resized.shape) == 3 else img_resized - gray = gray.astype(np.float32) - f = fftshift(fft2(gray)) - magnitude = np.abs(f) - phase = np.angle(f) - center = target_size // 2 - carrier_scores = [] - carrier_strengths = [] - - # Use extracted carriers if available, otherwise use known carriers - carriers_to_check = self.codebook['carriers'][:30] if self.codebook['carriers'] else [] - - # Always also check known carriers for reliability - known_carrier_dicts = [{'frequency': freq, 'phase': 0} for freq in self.codebook.get('known_carriers', self.known_carriers)] - carriers_to_check = carriers_to_check + known_carrier_dicts - - # Use reference phase from codebook if available - ref_phase = self.codebook.get('reference_phase') - - for carrier in carriers_to_check: - freq = carrier['frequency'] - y = freq[0] + center - x = freq[1] + center - - if 0 <= y < target_size and 0 <= x < target_size: - actual_phase = phase[y, x] - - # Get expected phase from codebook reference if available - if ref_phase is not None: - expected_phase = ref_phase[y, x] - else: - expected_phase = carrier.get('phase', 0) - - # Phase match (accounting for wrap-around) - phase_diff = np.abs(np.angle(np.exp(1j * (actual_phase - expected_phase)))) - phase_match = 1 - phase_diff / np.pi - carrier_scores.append(phase_match) - - # Carrier strength - carrier_strengths.append(magnitude[y, x]) - - avg_phase_match = float(np.mean(carrier_scores)) if carrier_scores else 0 - avg_carrier_strength = float(np.mean(carrier_strengths)) if carrier_strengths else 0 - - # Method 3: Noise structure ratio + + # ------------------------------------------------------------------ + # Image-domain FFT (grayscale) + # ------------------------------------------------------------------ + gray = np.mean(img_resized, axis=2) if len(img_resized.shape) == 3 else img_resized + f_img = fftshift(fft2(gray.astype(np.float32))) + img_phase = np.angle(f_img) + img_mag = np.abs(f_img) + + # ------------------------------------------------------------------ + # Phase matching against per-set reference phases + # Try both dark and white carrier sets; take best match. + # ------------------------------------------------------------------ + carrier_refs = self.codebook.get('carrier_refs', {}) + set_results = {} + + for set_name, carriers_attr, ref_key in [ + ('dark', 'carriers_dark', 'dark_ref_phases'), + ('white', 'carriers_white', 'white_ref_phases'), + ]: + carriers = getattr(self, carriers_attr, []) + ref_phases = carrier_refs.get(ref_key) + if ref_phases is None or len(carriers) == 0: + continue + + phase_matches = [] + for i, (fy, fx) in enumerate(carriers): + y, x = fy + center, fx + center + if 0 <= y < target_size and 0 <= x < target_size and i < len(ref_phases): + diff = np.abs(np.angle(np.exp(1j * (img_phase[y, x] - ref_phases[i])))) + phase_matches.append(1 - diff / np.pi) + + if phase_matches: + set_results[set_name] = { + 'phase_match': float(np.mean(phase_matches)), + 'phase_std': float(np.std(phase_matches)), + 'n_carriers': len(phase_matches), + } + + # Best phase match across carrier sets + best_set = max(set_results, key=lambda k: set_results[k]['phase_match']) if set_results else 'dark' + best_phase_match = set_results[best_set]['phase_match'] if set_results else 0.0 + + # Also compute average across both sets for reporting + all_matches = [] + for sr in set_results.values(): + all_matches.append(sr['phase_match']) + avg_phase_match = float(np.mean(all_matches)) if all_matches else 0.0 + + # ------------------------------------------------------------------ + # Noise-domain carrier-vs-random ratio (supporting signal) + # ------------------------------------------------------------------ + ref_noise = self.codebook['reference_noise'] + noise = self.extract_noise_fused(img_resized) noise_gray = np.mean(noise, axis=2) if len(noise.shape) == 3 else noise + f_noise = fftshift(fft2(noise_gray)) + noise_mag = np.abs(f_noise) + + all_carriers = self.carriers_dark + self.carriers_white + carrier_mags = [ + noise_mag[fy + center, fx + center] + for fy, fx in all_carriers + if 0 <= fy + center < target_size and 0 <= fx + center < target_size + ] + + rng = np.random.RandomState(42) + random_mags = [] + for _ in range(len(all_carriers) * 4): + ry, rx = rng.randint(10, target_size - 10), rng.randint(10, target_size - 10) + if abs(ry - center) < 5 and abs(rx - center) < 5: + continue + random_mags.append(noise_mag[ry, rx]) + + cvr_noise = float(np.mean(carrier_mags)) / (float(np.mean(random_mags)) + 1e-10) + + # ------------------------------------------------------------------ + # Legacy metrics (for reporting / backward compat) + # ------------------------------------------------------------------ structure_ratio = float(np.std(noise_gray) / (np.mean(np.abs(noise_gray)) + 1e-10)) - - # Method 4: Multi-scale consistency - scale_scores = [] - for scale in self.scales: - img_scaled = cv2.resize(image, (scale, scale)) - noise_scaled = self.extract_noise_single(img_scaled, 'wavelet') - ref_scaled = cv2.resize(ref_noise, (scale, scale)) - - corr = np.corrcoef(noise_scaled.ravel(), ref_scaled.ravel())[0, 1] - scale_scores.append(corr) - - multi_scale_consistency = float(np.std(scale_scores)) # Lower is more consistent - - # Detection decision - threshold = self.codebook['detection_threshold'] - is_watermarked = ( - correlation > threshold and - avg_phase_match > 0.45 and - 0.7 < structure_ratio < 2.0 - ) - - # Confidence score (Bayesian combination) - corr_score = max(0, (correlation - threshold) / (self.codebook['correlation_mean'] - threshold + 1e-10)) - phase_score = avg_phase_match - structure_score = max(0, 1 - abs(structure_ratio - 1.32) / 0.6) - consistency_score = max(0, 1 - multi_scale_consistency * 5) - - confidence = min(1.0, ( - 0.35 * corr_score + - 0.35 * phase_score + - 0.15 * structure_score + - 0.15 * consistency_score - )) - + correlation = float(np.corrcoef(noise.ravel(), ref_noise.ravel())[0, 1]) + + carrier_mags_img = [ + img_mag[fy + center, fx + center] + for fy, fx in all_carriers + if 0 <= fy + center < target_size and 0 <= fx + center < target_size + ] + avg_carrier_strength = float(np.mean(carrier_mags_img)) if carrier_mags_img else 0.0 + + multi_scale_consistency = 0.0 # skip expensive computation; phase match is decisive + + # ================================================================== + # SCORING + # + # Primary: best_phase_match — phase agreement at carrier frequencies + # vs per-set reference. Calibrated gap: + # WM: 0.92-0.99 (black/nbpro 0.99, white 0.92) + # non-WM: 0.47-0.53 (max observed 0.71) + # + # Threshold at 0.80: well above non-WM max (0.71) with margin, + # well below WM min (0.92 avg for white). + # + # Supporting: cvr_noise adds confidence for dark images. + # ================================================================== + + # Phase score: sigmoid centered at 0.78 (between 0.71 max non-WM and 0.92 min WM) + phase_score = float(1.0 / (1.0 + np.exp(-20.0 * (best_phase_match - 0.78)))) + + # CVR noise score: supporting signal + cvr_score = float(1.0 / (1.0 + np.exp(-2.0 * (cvr_noise - 2.0)))) + + # Combined confidence — phase-dominant + confidence = float(min(1.0, 0.80 * phase_score + 0.20 * cvr_score)) + + is_watermarked = confidence > 0.50 + return DetectionResult( is_watermarked=bool(is_watermarked), - confidence=float(confidence), + confidence=confidence, correlation=correlation, - phase_match=avg_phase_match, + phase_match=best_phase_match, structure_ratio=structure_ratio, carrier_strength=avg_carrier_strength, multi_scale_consistency=multi_scale_consistency, details={ - 'threshold': threshold, - 'corr_score': corr_score, + 'best_set': best_set, + 'best_phase_match': best_phase_match, + 'set_results': set_results, + 'cvr_noise': cvr_noise, 'phase_score': phase_score, - 'structure_score': structure_score, - 'consistency_score': consistency_score, - 'scale_correlations': scale_scores + 'cvr_score': cvr_score, } )