From c1f0fd8b58226573264f849fdd52b769a3f63c3d Mon Sep 17 00:00:00 2001 From: Alosh Denny Date: Sat, 28 Mar 2026 17:20:37 +0530 Subject: [PATCH] multi-resolution carriers --- src/extraction/synthid_bypass.py | 1113 ++++++++++++++++++------------ 1 file changed, 662 insertions(+), 451 deletions(-) diff --git a/src/extraction/synthid_bypass.py b/src/extraction/synthid_bypass.py index 6df88ad..f3180aa 100644 --- a/src/extraction/synthid_bypass.py +++ b/src/extraction/synthid_bypass.py @@ -49,14 +49,25 @@ class SynthIDBypass: deep learning models. """ - # Known SynthID carrier frequencies (from analysis) + # SynthID carrier frequencies extracted from 288 Gemini reference images. + # Carriers sit on a (48, 88) grid in frequency space at 512x512 resolution. KNOWN_CARRIERS = [ - (14, 14), (-14, -14), - (126, 14), (-126, -14), - (98, -14), (-98, 14), - (128, 128), (-128, -128), - (210, -14), (-210, 14), - (238, 14), (-238, -14), + # Axis-aligned (strong) + (48, 0), (-48, 0), + (96, 0), (-96, 0), + (192, 0), (-192, 0), + (210, 0), (-210, 0), + (238, 0), (-238, 0), + (0, 88), (0, -88), + (0, 176), (0, -176), + (0, 192), (0, -192), + # Off-axis (grid intersections) + (48, 88), (-48, -88), + (48, -88), (-48, 88), + (96, 88), (-96, -88), + (96, -88), (-96, 88), + (96, 176), (-96, -176), + (96, -176), (-96, 176), ] def __init__( @@ -1362,132 +1373,127 @@ class SynthIDBypass: image: np.ndarray, codebook: 'SpectralCodebook', strength: str = 'moderate', - verify: bool = True + passes: int = 0, + verify: bool = True, ) -> BypassResult: """ - V3 Spectral Bypass — Surgical frequency-domain watermark removal. - - Uses a SpectralCodebook (extracted from reference black/white images) - to estimate and subtract the watermark component at EVERY frequency bin, - weighted by phase consistency and content-adaptive scaling. - - This is fundamentally different from v2 (blind transforms): - - v2: applies broad distortions hoping to disrupt the watermark - - v3: surgically identifies and removes the watermark signal - - Args: - image: Input image (RGB, uint8) - codebook: Pre-extracted SpectralCodebook with watermark profile - strength: 'gentle', 'moderate', 'aggressive', 'maximum' - verify: Whether to run detection before/after - - Returns: - BypassResult with cleaned image and metrics + V3 Spectral Bypass — multi-resolution codebook subtraction. + + Automatically selects the best-matching profile from the codebook + for the input image's resolution. If an exact resolution match + exists, operates entirely in the FFT domain (fast path). + Otherwise, constructs the watermark in the spatial domain at the + profile's native resolution, resizes to the target, and subtracts. """ - # Save original immediately as uint8 before any float operations - # (avoids Python 3.14 JIT aliasing bugs with float arrays) - original_uint8 = np.clip(image, 0, 255).astype(np.uint8) if image.dtype != np.uint8 else image.copy() - - # Convert image to float64 in [0, 255] range for FFT processing + if passes <= 0: + passes = {'gentle': 1, 'moderate': 2, 'aggressive': 3, + 'maximum': 3}.get(strength, 2) + + original_uint8 = (np.clip(image, 0, 255).astype(np.uint8) + if image.dtype != np.uint8 else image.copy()) + if image.dtype == np.uint8: work = image.astype(np.float64) elif np.max(image) > 1.5: work = image.astype(np.float64) else: work = image.astype(np.float64) * 255.0 - + h, w = work.shape[:2] - - # Compute average luminance (0-1 scale) avg_luminance = float(np.mean(work)) / 255.0 - - # Handle size mismatch with codebook - cb_h, cb_w = int(codebook.ref_shape[0]), int(codebook.ref_shape[1]) - need_resize = (h != cb_h or w != cb_w) - - if need_resize: - work = cv2.resize(work, (cb_w, cb_h), interpolation=cv2.INTER_LANCZOS4) - - # --- FFT subtraction per channel --- - cleaned_channels = [] - stages = ['spectral_subtraction'] - - for ch in range(3): - fft_ch = np.fft.fft2(work[:, :, ch]) - wm_est = codebook.estimate_watermark_fft( - fft_ch, ch, strength=strength, image_luminance=avg_luminance - ) - cleaned_ch = np.real(np.fft.ifft2(fft_ch - wm_est)) - cleaned_channels.append(cleaned_ch) - - cleaned = np.clip(np.stack(cleaned_channels, axis=-1), 0, 255) - - # Resize back to original dimensions if needed - if need_resize: - cleaned = cv2.resize(cleaned, (w, h), interpolation=cv2.INTER_LANCZOS4) - - # Light anti-aliasing (gentle Gaussian to smooth FFT artifacts) - cleaned = cv2.GaussianBlur(cleaned, (3, 3), 0.5) + + profile, (prof_h, prof_w), exact = codebook.get_profile(h, w) + + stages = [] + str_sequence = { + 'gentle': ['gentle'], + 'moderate': ['moderate', 'gentle'], + 'aggressive': ['aggressive', 'moderate', 'gentle'], + 'maximum': ['maximum', 'aggressive', 'moderate'], + }.get(strength, ['moderate']) + while len(str_sequence) < passes: + str_sequence.append(str_sequence[-1]) + str_sequence = str_sequence[:passes] + + for p_idx, p_str in enumerate(str_sequence): + if exact: + cleaned_chs = [] + for ch in range(3): + fft_ch = np.fft.fft2(work[:, :, ch]) + wm_est = codebook.estimate_watermark_fft( + fft_ch, ch, strength=p_str, + image_luminance=avg_luminance, + profile=profile, ref_shape=(prof_h, prof_w), + ) + cleaned_chs.append( + np.real(np.fft.ifft2(fft_ch - wm_est))) + work = np.clip(np.stack(cleaned_chs, axis=-1), 0, 255) + else: + cleaned_chs = [] + for ch in range(3): + wm_cb = codebook.watermark_spatial( + ch, strength=p_str, + image_luminance=avg_luminance, + profile=profile, ref_shape=(prof_h, prof_w), + ) + wm_resized = cv2.resize( + wm_cb, (w, h), + interpolation=cv2.INTER_LANCZOS4, + ) + cleaned_chs.append(work[:, :, ch] - wm_resized) + work = np.clip(np.stack(cleaned_chs, axis=-1), 0, 255) + + stages.append(f'pass_{p_idx}({p_str})') + + work = cv2.GaussianBlur(work, (3, 3), 0.4) stages.append('anti_alias') - - # Final uint8 conversion - cleaned_uint8 = np.clip(cleaned, 0, 255).astype(np.uint8) - - # --- Quality metrics --- - orig_q = original_uint8.astype(np.float64) / 255.0 - clean_q = cleaned_uint8.astype(np.float64) / 255.0 - - # PSNR - mse = float(np.mean((orig_q - clean_q) ** 2)) + + cleaned_uint8 = np.clip(work, 0, 255).astype(np.uint8) + + oq = original_uint8.astype(np.float64) / 255.0 + cq = cleaned_uint8.astype(np.float64) / 255.0 + mse = float(np.mean((oq - cq) ** 2)) psnr = float('inf') if mse == 0 else float(10 * np.log10(1.0 / mse)) - - # SSIM (block-based) - _go = 0.299 * orig_q[:,:,0] + 0.587 * orig_q[:,:,1] + 0.114 * orig_q[:,:,2] - _gm = 0.299 * clean_q[:,:,0] + 0.587 * clean_q[:,:,1] + 0.114 * clean_q[:,:,2] - _blk = 8 - _rc = (_go.shape[0] // _blk) * _blk - _cc = (_go.shape[1] // _blk) * _blk - _a = _go[:_rc, :_cc].reshape(_rc // _blk, _blk, _cc // _blk, _blk).transpose(0, 2, 1, 3).reshape(-1, _blk, _blk) - _b = _gm[:_rc, :_cc].reshape(_rc // _blk, _blk, _cc // _blk, _blk).transpose(0, 2, 1, 3).reshape(-1, _blk, _blk) - _ma = _a.mean(axis=(1, 2)); _mb = _b.mean(axis=(1, 2)) - _va = _a.var(axis=(1, 2)); _vb = _b.var(axis=(1, 2)) - _cv = ((_a - _ma[:, None, None]) * (_b - _mb[:, None, None])).mean(axis=(1, 2)) + + _go = 0.299*oq[:,:,0] + 0.587*oq[:,:,1] + 0.114*oq[:,:,2] + _gm = 0.299*cq[:,:,0] + 0.587*cq[:,:,1] + 0.114*cq[:,:,2] + _b = 8 + _rc = (_go.shape[0]//_b)*_b; _cc = (_go.shape[1]//_b)*_b + _ao = _go[:_rc,:_cc].reshape(_rc//_b,_b,_cc//_b,_b).transpose(0,2,1,3).reshape(-1,_b,_b) + _am = _gm[:_rc,:_cc].reshape(_rc//_b,_b,_cc//_b,_b).transpose(0,2,1,3).reshape(-1,_b,_b) + _ma=_ao.mean(axis=(1,2)); _mb=_am.mean(axis=(1,2)) + _va=_ao.var(axis=(1,2)); _vb=_am.var(axis=(1,2)) + _cv=((_ao-_ma[:,None,None])*(_am-_mb[:,None,None])).mean(axis=(1,2)) ssim = float(np.mean( - (2.0 * _ma * _mb + 0.0001) * (2.0 * _cv + 0.0009) / - ((_ma**2 + _mb**2 + 0.0001) * (_va + _vb + 0.0009)) + (2*_ma*_mb+1e-4)*(2*_cv+9e-4) / + ((_ma**2+_mb**2+1e-4)*(_va+_vb+9e-4)) )) - - # --- Detection --- - detection_before = None - detection_after = None - + + detection_before = detection_after = None if verify and self.extractor is not None: try: - res_b = self.extractor.detect_array(original_uint8) - detection_before = { - 'is_watermarked': res_b.is_watermarked, - 'confidence': res_b.confidence, - 'phase_match': res_b.phase_match, - } + rb = self.extractor.detect_array(original_uint8) + detection_before = dict(is_watermarked=rb.is_watermarked, + confidence=rb.confidence, + phase_match=rb.phase_match) except Exception: pass - try: - res_a = self.extractor.detect_array(cleaned_uint8) - detection_after = { - 'is_watermarked': res_a.is_watermarked, - 'confidence': res_a.confidence, - 'phase_match': res_a.phase_match, - } + ra = self.extractor.detect_array(cleaned_uint8) + detection_after = dict(is_watermarked=ra.is_watermarked, + confidence=ra.confidence, + phase_match=ra.phase_match) except Exception: pass - - # Determine success - success = psnr > 30 and ssim > 0.90 + + nb = profile['n_black_refs'] + nw = profile['n_white_refs'] + nr = profile['n_random_refs'] + success = psnr > 28 and ssim > 0.88 if detection_before and detection_after: - conf_drop = detection_before['confidence'] - detection_after['confidence'] - success = success and (conf_drop > 0.15 or not detection_after['is_watermarked']) - + cd = detection_before['confidence'] - detection_after['confidence'] + success = success and (cd > 0.10 or not detection_after['is_watermarked']) + return BypassResult( success=success, cleaned_image=cleaned_uint8, @@ -1497,12 +1503,15 @@ class SynthIDBypass: detection_after=detection_after, stages_applied=stages, details={ - 'version': 'v3_spectral', + 'version': 'v3_multi_res', 'strength': strength, + 'passes': passes, + 'pass_schedule': str_sequence, 'avg_luminance': avg_luminance, - 'codebook_refs': f"{codebook.n_black_refs}b+{codebook.n_white_refs}w", - 'resized': need_resize, - } + 'profile_resolution': f'{prof_h}x{prof_w}', + 'exact_match': exact, + 'codebook_refs': f'{nb}b+{nw}w+{nr}r', + }, ) def bypass_v3_file( @@ -1528,46 +1537,83 @@ class SynthIDBypass: # ================================================================ -# SPECTRAL CODEBOOK — V3 Frequency-Domain Watermark Profile +# SPECTRAL CODEBOOK — Multi-Resolution V3 Frequency-Domain Profile # ================================================================ class SpectralCodebook: """ - Full frequency-domain watermark profile extracted from reference images. - - Unlike discrete carrier lists, this captures the ENTIRE spectral envelope - of the SynthID watermark — including the dense low-frequency cloud - discovered in analysis (magnitudes ~95K-103K at small frequencies). - - The codebook stores: - - magnitude_profile: average |FFT| of the watermark per channel (from black images) - - phase_template: average phase angle of the watermark per channel - - phase_consistency: per-bin measure of how stable the phase is across images - (high consistency = fixed key component, low = content-adaptive) - - white_magnitude_profile: complementary profile from white images + Multi-resolution watermark profile for SynthID bypass. + + Stores per-resolution profiles so a single codebook file handles + images of any size. Each profile captures the full spectral + envelope at one (H, W) resolution: + + magnitude_profile — avg |FFT| of the watermark signal + phase_template — circular-mean phase angle per bin + phase_consistency — coherence (0..1) per bin + content_magnitude_baseline — avg |FFT| of content (Wiener) + white_* / black_white_agreement — cross-validation data + + Build profiles with: + extract_from_references() — from pure-black / pure-white refs + build_from_watermarked() — from diverse watermarked content + Both detect native resolution automatically and add a profile + keyed by (H, W). Call both to cover multiple resolutions. """ - + + CHANNEL_WEIGHTS = np.array([0.85, 1.0, 0.70]) + + _PROFILE_ARRAYS = [ + 'magnitude_profile', 'phase_template', 'phase_consistency', + 'content_magnitude_baseline', 'white_magnitude_profile', + 'white_phase_template', 'white_phase_consistency', + 'black_white_agreement', + ] + _PROFILE_SCALARS = ['n_black_refs', 'n_white_refs', 'n_random_refs'] + def __init__(self): - self.magnitude_profile = None # (H, W, 3) avg |FFT| from black refs - self.phase_template = None # (H, W, 3) avg angle(FFT) from black refs - self.phase_consistency = None # (H, W, 3) 1 - circular_std/pi — higher = more consistent - self.white_magnitude_profile = None # (H, W, 3) avg |FFT| from white refs (inverted) - self.white_phase_template = None - self.ref_shape = None # (H, W) of reference images - self.n_black_refs = 0 - self.n_white_refs = 0 - + self.profiles = {} # {(H, W): profile_dict} + + @property + def resolutions(self): + return list(self.profiles.keys()) + + @property + def ref_shape(self): + """Primary resolution (first added). Backward-compat helper.""" + if not self.profiles: + return None + return next(iter(self.profiles)) + + def get_profile(self, h, w): + """Best-matching profile for target (H, W). + + Returns (profile_dict, (prof_H, prof_W), exact_match: bool). + Prefers exact resolution match, else closest aspect ratio. + """ + if (h, w) in self.profiles: + return self.profiles[(h, w)], (h, w), True + if not self.profiles: + raise ValueError("Codebook has no profiles") + target_ar = h / (w + 1e-9) + best_key, best_score = None, float('inf') + for kh, kw in self.profiles: + ar_diff = abs(kh / (kw + 1e-9) - target_ar) / (target_ar + 1e-9) + px_diff = abs(kh * kw - h * w) / (h * w + 1e-9) + score = ar_diff * 2 + px_diff + if score < best_score: + best_score, best_key = score, (kh, kw) + return self.profiles[best_key], best_key, False + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + @staticmethod - def _list_reference_images( - directory: str, - max_images: int = None, - ) -> list: - """ - Collect .png/.jpg/.jpeg/.webp in a directory, excluding common unwatermarked - baseline filenames (e.g. black.jpg, white.png) when folders also hold those. - """ + def _list_reference_images(directory: str, max_images: int = None) -> list: + """Collect image files, excluding unwatermarked baselines.""" import glob - + excl = { 'black.jpg', 'black.jpeg', 'black.png', 'white.jpg', 'white.jpeg', 'white.png', @@ -1576,12 +1622,10 @@ class SpectralCodebook: for ext in ('*.png', '*.jpg', '*.jpeg', '*.webp'): paths.extend(glob.glob(os.path.join(directory, ext))) paths.extend(glob.glob(os.path.join(directory, ext.upper()))) - # Dedupe while preserving sort order seen = set() out = [] for p in sorted(paths): - base = os.path.basename(p).lower() - if base in excl: + if os.path.basename(p).lower() in excl: continue if p not in seen: seen.add(p) @@ -1589,318 +1633,399 @@ class SpectralCodebook: if max_images: out = out[:max_images] return out - - def extract_from_references(self, black_dir: str, white_dir: str = None, - max_images: int = None): + + @staticmethod + def _load_image(fpath, target_shape=None): + """Load image, optionally resizing to target_shape=(H, W).""" + img = cv2.imread(fpath) + if img is None: + return None + rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float64) + if target_shape and (rgb.shape[0], rgb.shape[1]) != target_shape: + rgb = cv2.resize(rgb, (target_shape[1], target_shape[0]), + interpolation=cv2.INTER_LANCZOS4).astype(np.float64) + return rgb + + @staticmethod + def _image_shape(fpath): + """Read image dimensions without loading full array.""" + img = cv2.imread(fpath, cv2.IMREAD_UNCHANGED) + if img is None: + return None + return (img.shape[0], img.shape[1]) + + @staticmethod + def _accumulate_fft(img_rgb: np.ndarray): + """Return (magnitude[3], phase_unit[3]) stacks for one image.""" + mags, units = [], [] + for ch in range(3): + fft_r = np.fft.fft2(img_rgb[:, :, ch]) + mags.append(np.abs(fft_r)) + units.append(np.exp(1j * np.angle(fft_r))) + return np.stack(mags, axis=-1), np.stack(units, axis=-1) + + # ------------------------------------------------------------------ + # Extraction + # ------------------------------------------------------------------ + + def extract_from_references( + self, + black_dir: str, + white_dir: str = None, + random_dir: str = None, + max_images: int = None, + ): """ - Build spectral envelope from reference black/white Gemini images. - - For black images: watermark = pixel values themselves (deviations from 0). - For white images: watermark = 255 - pixel values (deviations from 255). - - Args: - black_dir: Directory of pure-black SynthID-watermarked images (.png/.jpg/.webp) - white_dir: Optional directory of pure-white watermarked images - max_images: Max images to use per color (None = all) - - Files named black.jpg / white.png (etc.) are skipped so unwatermarked baselines - in the same folder are not averaged with watermarked outputs. + Build a profile from black / white / random reference directories. + + The profile is stored at the native resolution of the black images. """ - # --- Black images --- - black_files = self._list_reference_images(black_dir, max_images=max_images) - + black_files = self._list_reference_images(black_dir, max_images) if not black_files: - raise ValueError( - f"No image files (.png/.jpg/.jpeg/.webp) found in {black_dir} " - f"(after excluding black.* / white.* baselines)" - ) - - print(f"Extracting spectral envelope from {len(black_files)} black images...") - - # Accumulate FFT data - all_magnitudes = [] - all_phases = [] # store as complex unit vectors for circular averaging - - for i, fpath in enumerate(black_files): - img = cv2.imread(fpath) + raise ValueError(f"No images in {black_dir} (baselines excluded)") + + build_shape = self._image_shape(black_files[0]) + print(f"[black] {len(black_files)} images resolution={build_shape}") + + mag_sum = phase_unit_sum = None + n = 0 + for i, fp in enumerate(black_files): + img = self._load_image(fp, target_shape=build_shape) if img is None: continue - img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float64) - h, w = img_rgb.shape[:2] - - if self.ref_shape is None: - self.ref_shape = (h, w) - elif (h, w) != self.ref_shape: - # Resize to match first image - img_rgb = cv2.resize(img_rgb, (self.ref_shape[1], self.ref_shape[0])).astype(np.float64) - - mag_channels = [] - phase_unit_channels = [] - for ch in range(3): - channel = img_rgb[:, :, ch] - fft_result = np.fft.fft2(channel) - mag_channels.append(np.abs(fft_result)) - # Store phase as unit complex number for circular averaging - phase_unit_channels.append(np.exp(1j * np.angle(fft_result))) - - all_magnitudes.append(np.stack(mag_channels, axis=-1)) - all_phases.append(np.stack(phase_unit_channels, axis=-1)) - - if (i + 1) % 5 == 0: - print(f" Processed {i + 1}/{len(black_files)} black images") - - self.n_black_refs = len(all_magnitudes) - - # Average magnitude - self.magnitude_profile = np.mean(all_magnitudes, axis=0) - - # Circular mean of phase (average the unit vectors, then take angle) - phase_mean_vec = np.mean(all_phases, axis=0) # complex array - self.phase_template = np.angle(phase_mean_vec) # resultant angle - - # Phase consistency: |mean of unit vectors| — 1.0 means perfectly consistent, 0.0 means random - self.phase_consistency = np.abs(phase_mean_vec) - - print(f" Black envelope extracted: shape={self.magnitude_profile.shape}") - - # --- Statistics --- - # Identify the most consistent carriers - h, w = self.ref_shape - consistency_g = self.phase_consistency[:, :, 1] # G channel - # Flatten and find top consistent non-DC bins - consistency_flat = consistency_g.copy() - consistency_flat[0, 0] = 0 # exclude DC - top_indices = np.unravel_index( - np.argsort(consistency_flat.ravel())[-20:], consistency_g.shape - ) - print(f" Top 10 most phase-consistent carriers (G channel):") - for fy, fx in zip(top_indices[0][-10:], top_indices[1][-10:]): - # Convert to signed frequency - fy_s = fy if fy <= h // 2 else fy - h - fx_s = fx if fx <= w // 2 else fx - w - mag = self.magnitude_profile[fy, fx, 1] - cons = consistency_g[fy, fx] - print(f" ({fy_s:+4d},{fx_s:+4d}) mag={mag:8.0f} consistency={cons:.4f}") - - # --- White images (optional) --- + m, u = self._accumulate_fft(img) + if mag_sum is None: + mag_sum, phase_unit_sum = m.copy(), u.copy() + else: + mag_sum += m + phase_unit_sum += u + n += 1 + if (i + 1) % 10 == 0: + print(f" {i + 1}/{len(black_files)}") + del m, u, img + + avg_mag = mag_sum / n + pv = phase_unit_sum / n + del mag_sum, phase_unit_sum + + profile = { + 'magnitude_profile': avg_mag, + 'phase_template': np.angle(pv), + 'phase_consistency': np.abs(pv), + 'content_magnitude_baseline': None, + 'white_magnitude_profile': None, + 'white_phase_template': None, + 'white_phase_consistency': None, + 'black_white_agreement': None, + 'n_black_refs': n, + 'n_white_refs': 0, + 'n_random_refs': 0, + } + print(f" mean consistency (G)=" + f"{np.mean(profile['phase_consistency'][:,:,1]):.4f}") + + # ---- White refs ---- if white_dir: - white_files = self._list_reference_images(white_dir, max_images=max_images) - + white_files = self._list_reference_images(white_dir, max_images) if white_files: - print(f"\nExtracting from {len(white_files)} white images...") - w_magnitudes = [] - w_phases = [] - - for fpath in white_files: - img = cv2.imread(fpath) + print(f"[white] {len(white_files)} images") + wm_sum = wu_sum = None + nw = 0 + for fp in white_files: + img = self._load_image(fp, target_shape=build_shape) if img is None: continue - img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float64) - # Invert: watermark = deviation from white - inverted = 255.0 - img_rgb - - if img_rgb.shape[:2] != self.ref_shape: - inverted = cv2.resize(inverted, (self.ref_shape[1], self.ref_shape[0])) - - mag_ch = [] - phase_ch = [] - for ch in range(3): - fft_r = np.fft.fft2(inverted[:, :, ch]) - mag_ch.append(np.abs(fft_r)) - phase_ch.append(np.exp(1j * np.angle(fft_r))) - - w_magnitudes.append(np.stack(mag_ch, axis=-1)) - w_phases.append(np.stack(phase_ch, axis=-1)) - - self.n_white_refs = len(w_magnitudes) - self.white_magnitude_profile = np.mean(w_magnitudes, axis=0) - self.white_phase_template = np.angle(np.mean(w_phases, axis=0)) - print(f" White envelope extracted: {self.n_white_refs} images") - - print(f"\nCodebook complete: {self.n_black_refs} black + {self.n_white_refs} white references") - + m, u = self._accumulate_fft(255.0 - img) + if wm_sum is None: + wm_sum, wu_sum = m.copy(), u.copy() + else: + wm_sum += m + wu_sum += u + nw += 1 + del m, u, img + profile['white_magnitude_profile'] = wm_sum / nw + wpv = wu_sum / nw + profile['white_phase_template'] = np.angle(wpv) + profile['white_phase_consistency'] = np.abs(wpv) + profile['n_white_refs'] = nw + profile['black_white_agreement'] = np.abs(np.cos( + profile['phase_template'] - profile['white_phase_template'] + )) + agree_g = profile['black_white_agreement'][:, :, 1] + print(f" cross-validated bins (|cos|>0.90, G): " + f"{int(np.sum(agree_g > 0.90))}") + + # ---- Random / content baseline ---- + if random_dir: + rand_files = self._list_reference_images(random_dir, max_images) + if rand_files: + print(f"[random] {len(rand_files)} images (content baseline)") + rm_sum = None + nr = 0 + for fp in rand_files: + img = self._load_image(fp, target_shape=build_shape) + if img is None: + continue + m, _ = self._accumulate_fft(img) + if rm_sum is None: + rm_sum = m.copy() + else: + rm_sum += m + nr += 1 + del m, img + profile['content_magnitude_baseline'] = rm_sum / nr + profile['n_random_refs'] = nr + + self.profiles[build_shape] = profile + self._print_top_carriers(profile, build_shape) + nb = profile['n_black_refs'] + nw = profile['n_white_refs'] + nr = profile['n_random_refs'] + print(f"\nProfile added: {build_shape[0]}x{build_shape[1]} " + f"({nb}b+{nw}w+{nr}r refs)") + + def build_from_watermarked( + self, + image_dir: str, + max_images: int = None, + ): + """ + Build a profile from diverse watermarked images at native resolution. + + Content averages out across images; the fixed watermark signal + survives in phase coherence. Watermark magnitude is estimated as + ``avg_magnitude × coherence²``. + + The profile is stored at the native resolution of the images. + """ + files = self._list_reference_images(image_dir, max_images) + if not files: + raise ValueError(f"No images in {image_dir}") + + build_shape = self._image_shape(files[0]) + print(f"[watermarked] {len(files)} images resolution={build_shape}") + + mag_sum = phase_unit_sum = None + n = 0 + for i, fp in enumerate(files): + img = self._load_image(fp, target_shape=build_shape) + if img is None: + continue + m, u = self._accumulate_fft(img) + if mag_sum is None: + mag_sum, phase_unit_sum = m.copy(), u.copy() + else: + mag_sum += m + phase_unit_sum += u + n += 1 + if (i + 1) % 10 == 0: + print(f" {i + 1}/{len(files)}") + del m, u, img + + avg_mag = mag_sum / n + pv = phase_unit_sum / n + coherence = np.abs(pv) + del mag_sum, phase_unit_sum + + profile = { + 'magnitude_profile': avg_mag * (coherence ** 2), + 'phase_template': np.angle(pv), + 'phase_consistency': coherence, + 'content_magnitude_baseline': avg_mag, + 'white_magnitude_profile': None, + 'white_phase_template': None, + 'white_phase_consistency': None, + 'black_white_agreement': None, + 'n_black_refs': 0, + 'n_white_refs': 0, + 'n_random_refs': n, + } + + self.profiles[build_shape] = profile + self._print_top_carriers(profile, build_shape) + print(f"\nProfile added: {build_shape[0]}x{build_shape[1]} " + f"({n} watermarked refs)") + + @staticmethod + def _print_top_carriers(profile, ref_shape): + h, w = ref_shape + cg = profile['phase_consistency'][:, :, 1].copy() + cg[0, 0] = 0 + top = np.unravel_index(np.argsort(cg.ravel())[-10:], cg.shape) + print(" Top-10 phase-consistent carriers (G):") + bwa = profile.get('black_white_agreement') + for fy, fx in zip(top[0][::-1], top[1][::-1]): + fy_s = fy if fy <= h // 2 else fy - h + fx_s = fx if fx <= w // 2 else fx - w + mg = profile['magnitude_profile'][fy, fx, 1] + cs = cg[fy, fx] + xv = (f" agree={bwa[fy, fx, 1]:.3f}" if bwa is not None else "") + print(f" ({fy_s:+4d},{fx_s:+4d}) mag={mg:9.0f} " + f"cons={cs:.4f}{xv}") + + # ------------------------------------------------------------------ + # Watermark estimation (Wiener-style) + # ------------------------------------------------------------------ + def estimate_watermark_fft( self, image_fft: np.ndarray, channel: int, strength: str = 'moderate', - image_luminance: float = 0.5 + image_luminance: float = 0.5, + profile: dict = None, + ref_shape: tuple = None, ) -> np.ndarray: """ - Estimate the watermark component in the frequency domain for a channel. - - Uses a SELECTIVE notch-filter approach: only targets frequency bins - that are BOTH high-magnitude (watermark carriers) AND phase-consistent - (fixed-key component). This avoids the catastrophic quality loss of - blanket subtraction. - - Args: - image_fft: FFT of one channel of the target image - channel: Channel index (0=R, 1=G, 2=B) - strength: 'gentle', 'moderate', 'aggressive', 'maximum' - image_luminance: Average luminance of the image (0-1) for adaptive scaling - - Returns: - Complex array — estimated watermark FFT to subtract + Estimate watermark FFT for subtraction. + + If *profile* / *ref_shape* are not supplied, auto-selects the + best-matching profile from ``image_fft.shape``. """ - if self.magnitude_profile is None: - raise ValueError("Codebook not extracted. Call extract_from_references first.") - - # Strength controls HOW MANY bins we target and HOW MUCH we remove - strength_config = { - 'gentle': {'mag_pct': 99.0, 'cons_thresh': 0.98, 'removal_frac': 0.5}, - 'moderate': {'mag_pct': 97.0, 'cons_thresh': 0.95, 'removal_frac': 0.7}, - 'aggressive': {'mag_pct': 95.0, 'cons_thresh': 0.90, 'removal_frac': 0.9}, - 'maximum': {'mag_pct': 90.0, 'cons_thresh': 0.80, 'removal_frac': 1.0}, - } - cfg = strength_config.get(strength, strength_config['moderate']) - - # Get the reference magnitude and phase for this channel - ref_mag = self.magnitude_profile[:, :, channel] - ref_phase = self.phase_template[:, :, channel] - consistency = self.phase_consistency[:, :, channel] - - # --- Content-adaptive magnitude --- - if self.white_magnitude_profile is not None: - white_mag = self.white_magnitude_profile[:, :, channel] - effective_mag = ref_mag * (1.0 - image_luminance) + white_mag * image_luminance + if profile is None: + profile, ref_shape, _ = self.get_profile(*image_fft.shape) + if ref_shape is None: + ref_shape = image_fft.shape + + cfg = { + 'gentle': {'removal': 0.60, 'cons_floor': 0.70, 'mag_cap': 0.50, + 'dc_radius': 30}, + 'moderate': {'removal': 0.80, 'cons_floor': 0.50, 'mag_cap': 0.70, + 'dc_radius': 25}, + 'aggressive': {'removal': 0.95, 'cons_floor': 0.30, 'mag_cap': 0.90, + 'dc_radius': 20}, + 'maximum': {'removal': 1.00, 'cons_floor': 0.15, 'mag_cap': 0.95, + 'dc_radius': 15}, + }.get(strength, {'removal': 0.80, 'cons_floor': 0.50, 'mag_cap': 0.70, + 'dc_radius': 25}) + + ch_w = float(self.CHANNEL_WEIGHTS[channel]) + H, W = ref_shape + + ref_mag = profile['magnitude_profile'][:, :, channel] + ref_phase = profile['phase_template'][:, :, channel] + consistency = profile['phase_consistency'][:, :, channel] + + w_mp = profile.get('white_magnitude_profile') + if w_mp is not None: + wm_mag = (ref_mag * (1.0 - image_luminance) + + w_mp[:, :, channel] * image_luminance) else: - effective_mag = ref_mag.copy() - - # --- SELECTIVE bin targeting --- - # Only target bins that are BOTH: - # 1. High magnitude (actual watermark carriers, not noise floor) - # 2. High phase consistency (fixed key, reliably identifiable) - mag_threshold = np.percentile(effective_mag, cfg['mag_pct']) - - # Binary mask of targeted bins - target_mask = ( - (effective_mag >= mag_threshold) & - (consistency >= cfg['cons_thresh']) - ).astype(np.float64) - - # Number of targeted bins - n_targeted = int(np.sum(target_mask)) - total_bins = target_mask.size - - # --- Safe subtraction amount --- - # The watermark magnitude from the codebook is the ISOLATED watermark signal - # (from pure-black images). In a real image, the total FFT magnitude at a bin is - # content + watermark, which is typically much larger than watermark alone. - # - # We subtract the codebook's watermark magnitude × removal_frac. - # BUT we cap the subtraction at a safe fraction of the image's total magnitude - # to prevent destroying content (watermark phase may not align perfectly - # with the actual watermark in the target image). - image_mag = np.abs(image_fft) - - # Watermark amount to subtract: codebook magnitude × strength - wm_subtract = effective_mag * cfg['removal_frac'] - - # Safety cap: never remove more than 30% of the image's magnitude at any bin - # (watermark energy is typically <10% of content energy at non-carrier bins) - max_safe_subtract = image_mag * 0.30 - subtract_mag = np.minimum(wm_subtract, max_safe_subtract) - - # Apply the target mask — only affect selected bins - subtract_mag = subtract_mag * target_mask - - # --- Phase for subtraction --- - # Use the codebook template phase (known fixed-key phase) - # This is the key: we know the watermark's phase from the references, - # so we subtract at the RIGHT phase to destructively interfere - wm_estimate = subtract_mag * np.exp(1j * ref_phase) - - return wm_estimate - + wm_mag = ref_mag.copy() + + bwa = profile.get('black_white_agreement') + if bwa is not None: + wm_mag = wm_mag * bwa[:, :, channel] + + dc_r = cfg['dc_radius'] + fy = np.arange(H).reshape(-1, 1).astype(np.float64) + fx = np.arange(W).reshape(1, -1).astype(np.float64) + fy = np.where(fy > H / 2, fy - H, fy) + fx = np.where(fx > W / 2, fx - W, fx) + dc_ramp = np.clip(np.sqrt(fy**2 + fx**2) / dc_r, 0, 1) + wm_mag = wm_mag * dc_ramp + + cons_weight = np.clip( + (consistency - cfg['cons_floor']) / (1.0 - cfg['cons_floor'] + 1e-9), + 0, 1, + ) + + subtract_mag = wm_mag * cons_weight * cfg['removal'] * ch_w + subtract_mag = np.minimum(subtract_mag, np.abs(image_fft) * cfg['mag_cap']) + + return subtract_mag * np.exp(1j * ref_phase) + + def watermark_spatial( + self, + channel: int, + strength: str = 'moderate', + image_luminance: float = 0.5, + profile: dict = None, + ref_shape: tuple = None, + ) -> np.ndarray: + """ + Estimated watermark in spatial domain at a profile's native resolution. + + The caller can ``cv2.resize`` the result to any target resolution. + """ + if profile is None: + if not self.profiles: + raise ValueError("No profiles") + ref_shape = next(iter(self.profiles)) + profile = self.profiles[ref_shape] + if ref_shape is None: + ref_shape = next(iter(self.profiles)) + + cb = profile.get('content_magnitude_baseline') + pt = profile['phase_template'][:, :, channel] + if cb is not None: + synth_fft = cb[:, :, channel] * np.exp(1j * pt) + else: + synth_fft = (profile['magnitude_profile'][:, :, channel] * 10 + * np.exp(1j * pt)) + + wm_fft = self.estimate_watermark_fft( + synth_fft, channel, strength=strength, + image_luminance=image_luminance, + profile=profile, ref_shape=ref_shape, + ) + return np.real(np.fft.ifft2(wm_fft)) + + # ------------------------------------------------------------------ + # Save / Load + # ------------------------------------------------------------------ + def save(self, path: str): - """Save codebook to .npz file.""" - data = { - 'magnitude_profile': self.magnitude_profile, - 'phase_template': self.phase_template, - 'phase_consistency': self.phase_consistency, - 'ref_shape': np.array(self.ref_shape), - 'n_black_refs': np.array(self.n_black_refs), - 'n_white_refs': np.array(self.n_white_refs), - } - if self.white_magnitude_profile is not None: - data['white_magnitude_profile'] = self.white_magnitude_profile - data['white_phase_template'] = self.white_phase_template + """Save all profiles to a single .npz file.""" + data = {'resolutions': np.array(list(self.profiles.keys()))} + for (h, w), prof in self.profiles.items(): + pfx = f'{h}x{w}/' + for key in self._PROFILE_ARRAYS: + if prof.get(key) is not None: + data[pfx + key] = prof[key] + for key in self._PROFILE_SCALARS: + data[pfx + key] = np.array(prof.get(key, 0)) np.savez_compressed(path, **data) - print(f"Codebook saved to {path}") - + res_str = ', '.join(f'{h}x{w}' for h, w in self.profiles) + print(f"Codebook saved → {path} [{res_str}]") + def load(self, path: str): - """Load codebook from .npz file.""" - data = np.load(path) - self.magnitude_profile = data['magnitude_profile'] - self.phase_template = data['phase_template'] - self.phase_consistency = data['phase_consistency'] - self.ref_shape = (int(data['ref_shape'][0]), int(data['ref_shape'][1])) - self.n_black_refs = int(data['n_black_refs']) - self.n_white_refs = int(data['n_white_refs']) - if 'white_magnitude_profile' in data: - self.white_magnitude_profile = data['white_magnitude_profile'] - self.white_phase_template = data['white_phase_template'] - print(f"Codebook loaded: {self.ref_shape}, {self.n_black_refs}b+{self.n_white_refs}w refs") + """Load profiles. Supports multi-res and legacy single-res files.""" + d = np.load(path) + if 'resolutions' in d: + for res in d['resolutions']: + h, w = int(res[0]), int(res[1]) + pfx = f'{h}x{w}/' + prof = {} + for key in self._PROFILE_ARRAYS: + fk = pfx + key + prof[key] = d[fk] if fk in d else None + for key in self._PROFILE_SCALARS: + fk = pfx + key + prof[key] = int(d[fk]) if fk in d else 0 + self.profiles[(h, w)] = prof + else: + h, w = int(d['ref_shape'][0]), int(d['ref_shape'][1]) + prof = {} + for key in self._PROFILE_ARRAYS: + prof[key] = d[key] if key in d else None + prof['n_black_refs'] = int(d['n_black_refs']) if 'n_black_refs' in d else 0 + prof['n_white_refs'] = int(d['n_white_refs']) if 'n_white_refs' in d else 0 + prof['n_random_refs'] = int(d['n_random_refs']) if 'n_random_refs' in d else 0 + self.profiles[(h, w)] = prof + + res_str = ', '.join(f'{h}x{w}' for h, w in self.profiles) + print(f"Codebook loaded: [{res_str}]") + for (h, w), prof in self.profiles.items(): + nb, nw, nr = prof['n_black_refs'], prof['n_white_refs'], prof['n_random_refs'] + print(f" {h}x{w}: {nb}b+{nw}w+{nr}r") # ================================================================ # CLI INTERFACE # ================================================================ -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser(description='SynthID Watermark Bypass (Non-DL)') - parser.add_argument('input', type=str, help='Input image path') - parser.add_argument('output', type=str, help='Output image path') - parser.add_argument('--v2', action='store_true', - help='Use V2 combined worst-case pipeline (recommended)') - parser.add_argument('--mode', type=str, default='balanced', - choices=['light', 'balanced', 'aggressive', 'maximum'], - help='V1 bypass mode (ignored with --v2)') - parser.add_argument('--strength', type=str, default='aggressive', - choices=['moderate', 'aggressive', 'maximum'], - help='V2 bypass strength') - parser.add_argument('--iterations', type=int, default=None, - help='Number of V2 pipeline iterations (default: auto)') - parser.add_argument('--codebook', type=str, default=None, - help='Codebook path for verification') - parser.add_argument('--no-verify', action='store_true', - help='Skip verification') - - args = parser.parse_args() - - # Initialize bypass - extractor = None - if args.codebook and not args.no_verify: - try: - from robust_extractor import RobustSynthIDExtractor - extractor = RobustSynthIDExtractor() - extractor.load_codebook(args.codebook) - except Exception as e: - print(f"Warning: Could not load extractor: {e}") - - bypass = SynthIDBypass(extractor=extractor) - - # Run bypass - if args.v2: - result = bypass.bypass_v2_file( - args.input, args.output, - strength=args.strength, - iterations=args.iterations, - verify=not args.no_verify - ) - mode_str = f"v2/{args.strength}" - else: - result = bypass.bypass_file( - args.input, args.output, - mode=args.mode, - verify=not args.no_verify - ) - mode_str = f"v1/{args.mode}" - - # Print results +def _print_bypass_result(result, mode_str): print("\n" + "=" * 60) print("SYNTHID BYPASS RESULTS") print("=" * 60) @@ -1909,25 +2034,111 @@ if __name__ == '__main__': print(f" PSNR: {result.psnr:.2f} dB") print(f" SSIM: {result.ssim:.4f}") print(f" Stages: {', '.join(result.stages_applied)}") - + if result.details: + for k, v in result.details.items(): + print(f" {k}: {v}") if result.detection_before: print("\n Before Bypass:") - print(f" Watermarked: {result.detection_before['is_watermarked']}") - conf_key = 'confidence' if 'confidence' in result.detection_before else 'phase_match' - print(f" Confidence: {result.detection_before[conf_key]:.4f}") - + for k, v in result.detection_before.items(): + print(f" {k}: {v}") if result.detection_after: print("\n After Bypass:") - print(f" Watermarked: {result.detection_after['is_watermarked']}") - conf_key = 'confidence' if 'confidence' in result.detection_after else 'phase_match' - print(f" Confidence: {result.detection_after[conf_key]:.4f}") - - if result.detection_before: - bk = 'confidence' if 'confidence' in result.detection_before else 'phase_match' - ak = 'confidence' if 'confidence' in result.detection_after else 'phase_match' - drop = result.detection_before[bk] - result.detection_after[ak] - pct = 100 * drop / (result.detection_before[bk] + 1e-10) - print(f"\n Confidence Drop: {drop:.4f} ({pct:.1f}%)") - + for k, v in result.detection_after.items(): + print(f" {k}: {v}") print("=" * 60) - print(f"Saved to: {args.output}") + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser( + description='SynthID Watermark Bypass & Codebook Builder') + sub = parser.add_subparsers(dest='command') + + # --- build-codebook --- + bp = sub.add_parser('build-codebook', + help='Build a multi-resolution spectral codebook') + bp.add_argument('--black', help='Black reference images directory') + bp.add_argument('--white', help='White reference images directory') + bp.add_argument('--watermarked', nargs='+', default=[], + help='Watermarked image dirs (one profile per dir)') + bp.add_argument('--output', required=True, help='Output .npz path') + + # --- bypass --- + byp = sub.add_parser('bypass', help='Remove SynthID watermark from image') + byp.add_argument('input', help='Input image path') + byp.add_argument('output', help='Output image path') + byp.add_argument('--version', choices=['v1', 'v2', 'v3'], default='v3') + byp.add_argument('--strength', default='aggressive', + choices=['gentle', 'moderate', 'aggressive', 'maximum']) + byp.add_argument('--codebook', help='Spectral codebook .npz (V3)') + byp.add_argument('--detector', help='Detector codebook .pkl') + byp.add_argument('--no-verify', action='store_true') + + # --- legacy positional (backward compat) --- + byp_legacy = sub.add_parser('legacy', help=argparse.SUPPRESS) + byp_legacy.add_argument('input', help='Input image path') + byp_legacy.add_argument('output', help='Output image path') + byp_legacy.add_argument('--v2', action='store_true') + byp_legacy.add_argument('--mode', default='balanced') + byp_legacy.add_argument('--strength', default='aggressive') + byp_legacy.add_argument('--codebook', default=None) + byp_legacy.add_argument('--no-verify', action='store_true') + + args = parser.parse_args() + + if args.command == 'build-codebook': + codebook = SpectralCodebook() + if args.black: + codebook.extract_from_references( + args.black, white_dir=args.white) + for d in args.watermarked: + codebook.build_from_watermarked(d) + if not codebook.profiles: + parser.error("Provide --black and/or --watermarked directories") + os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True) + codebook.save(args.output) + + elif args.command in ('bypass', 'legacy'): + extractor = None + det_path = getattr(args, 'detector', None) or getattr(args, 'codebook', None) + if det_path and not args.no_verify: + try: + from robust_extractor import RobustSynthIDExtractor + extractor = RobustSynthIDExtractor() + extractor.load_codebook(det_path) + except Exception as e: + print(f"Warning: Could not load extractor: {e}") + + bypass = SynthIDBypass(extractor=extractor) + ver = getattr(args, 'version', None) + if ver == 'v3' or (args.command == 'bypass' and ver is None): + ver = 'v3' + if getattr(args, 'v2', False): + ver = 'v2' + + if ver == 'v3': + cb_path = args.codebook + if not cb_path: + parser.error("V3 requires --codebook path") + codebook = SpectralCodebook() + codebook.load(cb_path) + result = bypass.bypass_v3_file( + args.input, args.output, codebook, + strength=args.strength, verify=not args.no_verify) + _print_bypass_result(result, f"v3/{args.strength}") + elif ver == 'v2': + result = bypass.bypass_v2_file( + args.input, args.output, + strength=args.strength, verify=not args.no_verify) + _print_bypass_result(result, f"v2/{args.strength}") + else: + mode = getattr(args, 'mode', 'balanced') + result = bypass.bypass_file( + args.input, args.output, + mode=mode, verify=not args.no_verify) + _print_bypass_result(result, f"v1/{mode}") + print(f"Saved to: {args.output}") + + else: + parser.print_help()