This commit is contained in:
Alosh Denny
2026-02-15 18:04:20 +05:30
parent ad79ba532f
commit 25483c159a
22 changed files with 3341 additions and 296 deletions
+8 -1
@@ -26,8 +26,15 @@ Thumbs.db
!artifacts/codebook/*.pkl
# Data (too large for git)
data/pure_white/
data/
artifacts/spectral_codebook.npz
artifacts/samples/
artifacts/refs/
# Temporary
*.tmp
*.log
_temp_*
_output_*
push_randomly.sh
synthid_bypass.py.final
+305 -286
@@ -2,213 +2,156 @@
<img src="assets/synthid-watermark.jpeg" alt="SynthID Watermark Analysis" width="100%">
</p>
<h1 align="center">🔍 Reverse-Engineering SynthID</h1>
<p align="center">
<b>Discovering, detecting, and surgically removing Google's AI watermark through spectral analysis</b>
</p>
<p align="center">
<img src="https://img.shields.io/badge/Python-3.10+-blue?style=flat-square&logo=python" alt="Python">
<img src="https://img.shields.io/badge/License-Research-green?style=flat-square" alt="License">
<img src="https://img.shields.io/badge/Status-Complete-success?style=flat-square" alt="Status">
<img src="https://img.shields.io/badge/Images_Analyzed-123,268-brightgreen?style=flat-square" alt="Images">
<img src="https://img.shields.io/badge/Detection_Rate-90%25-success?style=flat-square" alt="Detection">
<img src="https://img.shields.io/badge/Bypass_V3-PSNR_40dB+-blueviolet?style=flat-square" alt="Bypass V3">
</p>
---
## 🎯 Overview
This project reverse-engineers **Google's SynthID** watermarking system - the invisible watermark embedded into every image generated by Google Gemini. Using only signal processing and spectral analysis (no access to the proprietary encoder/decoder), we:
1. **Discovered** the watermark's exact frequency-domain structure
2. **Built a detector** that identifies SynthID watermarks with 90% accuracy
3. **Developed a spectral bypass** (V3) that surgically removes watermark components while preserving image quality at **40+ dB PSNR**
### What Makes This Different
Unlike brute-force approaches (JPEG compression, noise injection), our V3 bypass uses a **SpectralCodebook** - a fingerprint of the watermark's exact frequency signature - extracted from reference images. This allows surgical, frequency-bin-level removal rather than blind signal destruction.
---
## 🔬 Key Findings
### The Watermark is a Fixed Spectral Pattern
By generating pure black and white images through Google Gemini, we isolated the watermark signal from content. The results are striking:
<p align="center">
<img src="assets/synthid_black.jpg" alt="Watermark on black background" width="45%">
<img src="assets/synthid_white.jpg" alt="Watermark on white background" width="45%">
</p>
<p align="center">
<i>Left: SynthID watermark extracted from a pure-black Gemini image (enhanced 100×). Right: Same watermark on a white background. The diagonal stripe pattern and carrier frequencies are clearly visible.</i>
</p>
### Carrier Frequency Discovery
The watermark embeds energy at specific carrier frequencies with **>99.9% phase coherence** across all images:
| Carrier Frequency (fy, fx) | Phase Coherence | Magnitude | Phase (rad) |
|:--------------------------:|:---------------:|:---------:|:-----------:|
| **(±14, ±14)** | 99.96% | 16,807 | ±1.44 |
| **(±126, ±14)** | 99.96% | 8,046 | ±2.37 |
| **(±98, ∓14)** | 99.94% | 6,283 | ±0.61 |
| **(±128, ±128)** | 99.25% | 6,908 | ±2.29 |
| **(±210, ∓14)** | 99.96% | 6,032 | ±1.13 |
| **(±238, ±14)** | 99.90% | 4,190 | ±1.61 |
> **Key insight:** Most carriers cluster along the `y = ±14` line in frequency space, suggesting a structured frequency selection algorithm. The diagonal stripe pattern visible in the enhanced images corresponds to these carrier frequencies.
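These coherence figures are straightforward to reproduce. Below is a minimal sketch (the `phase_coherence` helper is ours, not part of this repo) of how phase coherence at one carrier bin can be measured across a stack of grayscale images:

```python
import numpy as np

def phase_coherence(images, fy, fx):
    """Phase coherence of FFT bin (fy, fx) across a stack of grayscale images.

    1.0 means every image carries an identical phase at that bin (a fixed key);
    unrelated image content gives values near 0.
    """
    # One unit phasor per image at the carrier bin
    phasors = []
    for img in images:
        bin_val = np.fft.fft2(img)[fy, fx]
        phasors.append(bin_val / (np.abs(bin_val) + 1e-12))
    # Length of the mean phasor = circular coherence in [0, 1]
    return np.abs(np.mean(phasors))
```

A value near 1.0 at the same bin across many independent images indicates a fixed embedded phase rather than image content.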
### Phase Consistency - A Fixed Model-Level Key
The watermark's phase template is **identical across all images** from the same Gemini model:
- **Green channel phase std**: < 0.007 radians across 50 reference images
- **Cross-image correlation**: 21.8% mean pairwise noise correlation
- **Noise structure ratio**: 1.32 ± 0.02 (byproduct of the neural encoder)
This means SynthID does not embed per-image messages - it uses a **fixed spectral fingerprint** that can be profiled and subtracted.
### Frequency Spectrum Analysis
<p align="center">
<img src="artifacts/visualizations/synthid_watermark_spectrum.png" alt="FFT Magnitude Spectrum" width="35%">
<img src="artifacts/visualizations/synthid_watermark_carriers.png" alt="Carrier Frequencies" width="35%">
</p>
<p align="center">
<i>Left: FFT magnitude spectrum showing bright carrier frequency peaks. Right: Reconstructed carrier pattern showing the diagonal structure.</i>
</p>
<p align="center">
<img src="artifacts/visualizations/deep_analysis/frequency_analysis.png" alt="Frequency Analysis" width="80%">
</p>
<p align="center">
<i>Detailed frequency analysis: Average magnitude spectrum (left) and phase coherence map (right). The carrier positions are marked with crosshairs.</i>
</p>
---
## 🏗️ Architecture
### Three Generations of Bypass
| Version | Approach | PSNR | Detection Impact | Status |
|:-------:|:---------|:----:|:----------------:|:------:|
| **V1** | JPEG compression (Q50) | 37 dB | ~11% phase drop | ✅ Baseline |
| **V2** | Multi-stage transforms (noise, color, frequency) | 27-37 dB | ~0% confidence drop | ✅ Quality trade-off |
| **V3** | Spectral codebook subtraction | **33-43 dB** | **1-7% confidence drop** | ✅ Best quality |
### V3 Pipeline (Spectral Bypass)
```
Input Image → FFT per channel → Estimate Watermark → Subtract → IFFT → Clip → Output
                                        ↑
                                SpectralCodebook
                         (25 black + 25 white refs)
```
1. **SpectralCodebook** profiles the watermark from reference images (pure black/white Gemini outputs)
2. **Selective notch filter** targets only high-magnitude (P97+), high-consistency (≥95%) frequency bins
3. **Safe magnitude cap** limits subtraction to 30% of image energy per bin - preserving content
4. **Content-adaptive scaling** adjusts subtraction based on image luminance
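The four steps can be sketched for a single channel as follows. This is an illustration, not the repo's `bypass_v3`: the codebook arrays are assumed inputs, and the thresholds described above (P97 magnitude, ≥95% consistency, 30% energy cap) are wired in as plain NumPy operations.

```python
import numpy as np

def spectral_bypass(img, wm_mag, wm_phase, consistency, strength=0.3):
    """Sketch of the V3 pipeline for one channel (hypothetical codebook arrays).

    wm_mag / wm_phase : watermark magnitude envelope and phase template
    consistency       : per-bin phase-consistency score in [0, 1]
    strength          : cap on subtracted energy as a fraction of each bin's energy
    """
    F = np.fft.fft2(img.astype(np.float64))
    # Target only confirmed watermark bins: strong AND phase-stable
    mask = (wm_mag > np.percentile(wm_mag, 97)) & (consistency >= 0.95)
    # Estimated watermark component, capped at `strength` of the image's energy
    sub_mag = np.minimum(wm_mag, strength * np.abs(F))
    F = F - mask * sub_mag * np.exp(1j * wm_phase)
    out = np.real(np.fft.ifft2(F))
    return np.clip(out, 0, 255).astype(np.uint8)
```

Because only a small set of bins is touched and subtraction is energy-capped, most of the spectrum (and hence the image content) passes through unchanged.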
---
## 📊 Results
### V3 Bypass Quality (PSNR in dB - higher is better)
| Image | Gentle | Moderate | Aggressive | Maximum |
|:-----:|:------:|:--------:|:----------:|:-------:|
| 121407 | **42.9** | 41.4 | 40.0 | 39.3 |
| 110802 | 33.4 | 33.3 | 33.1 | 33.0 |
| 131614 | **38.4** | 38.0 | 37.5 | 37.2 |
| 119198 | **38.4** | 37.7 | 37.0 | 36.6 |
| 12085 | **42.5** | 41.6 | 40.7 | 40.3 |
> All images maintain **>33 dB PSNR** - visually indistinguishable from the original.
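For reference, the PSNR figures above follow the standard definition; a minimal implementation:

```python
import numpy as np

def psnr(original, processed, peak=255.0):
    """Peak signal-to-noise ratio in dB between two uint8 images."""
    mse = np.mean((original.astype(np.float64) - processed.astype(np.float64)) ** 2)
    if mse == 0:
        return float('inf')  # identical images
    return 10.0 * np.log10(peak ** 2 / mse)
```

At 40 dB the RMS error is about 2.5 gray levels out of 255, generally below the threshold of visibility.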
### Watermark Detection Confidence
| Image | Before | After (Aggressive) | Drop |
|:-----:|:------:|:------------------:|:----:|
| 121407 | 0.394 | 0.387 | 1.7% |
| 131614 | 0.437 | 0.422 | 3.4% |
| 12085 | 0.394 | 0.366 | **7.2%** |
| 119198 | 0.389 | 0.386 | 0.9% |
### Sample Output
<p align="center">
<img src="assets/sample_watermarked.png" alt="Original watermarked image" width="45%">
<img src="assets/sample_cleaned.png" alt="V3 bypass output" width="45%">
</p>
<p align="center">
<i>Left: Original SynthID-watermarked Gemini image. Right: After V3 spectral bypass - visually identical, watermark energy reduced.</i>
</p>
---
## 🚀 Quick Start
### Installation
@@ -217,159 +160,235 @@ reverse-SynthID/
git clone https://github.com/yourusername/reverse-SynthID.git
cd reverse-SynthID
# Create virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
```
### 1. Build Detection Codebook
```bash
python src/extraction/robust_extractor.py extract /path/to/watermarked/images \
--output artifacts/codebook/robust_codebook.pkl
```
### 2. Detect Watermark
```bash
python src/extraction/robust_extractor.py detect image.png \
--codebook artifacts/codebook/robust_codebook.pkl
```
**Output:**
```
Detection Results:
Watermarked: True
Confidence: 0.95
Phase Match: 0.6683
```
### 3. Build Spectral Codebook (V3)
```python
from synthid_bypass import SpectralCodebook
codebook = SpectralCodebook()
codebook.extract_from_references(
black_dir='assets/black/', # Pure-black Gemini images
white_dir='assets/white/' # Pure-white Gemini images
)
codebook.save('artifacts/spectral_codebook.npz')
```
### 4. Run V3 Bypass
```python
from synthid_bypass import SynthIDBypass, SpectralCodebook
codebook = SpectralCodebook()
codebook.load('artifacts/spectral_codebook.npz')
bypass = SynthIDBypass()
result = bypass.bypass_v3(image_rgb, codebook, strength='aggressive')
print(f"PSNR: {result.psnr:.1f} dB") # ~40 dB
```
**Strength levels:** `gentle` (minimal change, ~43 dB) → `moderate` → `aggressive` → `maximum` (strongest removal, ~33 dB)
---
## 📁 Project Structure
```
reverse-SynthID/
├── src/
│ ├── extraction/
│ │ ├── synthid_bypass.py # V1/V2/V3 bypass implementations + SpectralCodebook
│ │ ├── robust_extractor.py # Multi-scale watermark detection (90% accuracy)
│ │ ├── watermark_remover.py # Frequency-domain watermark removal
│ │ ├── benchmark_extraction.py # Performance benchmarking suite
│ │ └── synthid_codebook_extractor.py # Original codebook extractor (legacy)
│ └── analysis/
│ ├── deep_synthid_analysis.py # FFT/phase analysis scripts
│ └── synthid_codebook_finder.py # Carrier frequency discovery
├── assets/
│ ├── synthid_black.jpg # Watermark on black (enhanced)
│ ├── synthid_white.jpg # Watermark on white (enhanced)
│ ├── black/ # Reference black images from Gemini
│ └── white/ # Reference white images from Gemini
├── artifacts/
│ ├── codebook/ # Detection codebooks (.pkl)
│ ├── spectral_codebook.npz # V3 spectral fingerprint (119 MB)
│ ├── v3_output/ # V3 bypass output samples
│ └── visualizations/ # FFT, phase, carrier visualizations
├── watermark_investigation/ # Early-stage Nano-150k analysis (archived)
├── SYNTHID_CODEBOOK_ANALYSIS.md # Detailed codebook reverse-engineering report
├── synthid.pdf # SynthID paper reference
└── requirements.txt
```
---
## 🔬 Technical Deep Dive
### How SynthID Works (Reverse-Engineered)
```
┌──────────────────────────────────────────────────────────────┐
│ SynthID Encoder (in Gemini) │
├──────────────────────────────────────────────────────────────┤
│ 1. Generate carrier frequencies: {(14,14), (126,14), ...} │
│ 2. Assign fixed phase values to each carrier │
│ 3. Neural encoder adds learned noise pattern to image │
│ 4. Watermark is imperceptible - spread across spectrum │
├──────────────────────────────────────────────────────────────┤
│ SynthID Decoder (in Google) │
├──────────────────────────────────────────────────────────────┤
│ 1. Extract noise residual (wavelet denoising) │
│ 2. FFT → check phase at known carrier frequencies │
│ 3. If phases match expected values → Watermarked │
└──────────────────────────────────────────────────────────────┘
```
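A decoder-style check can be sketched in a few lines. This is an illustration of the flow above, not Google's detector: a circular 3×3 box-blur residual stands in for wavelet denoising, and the carrier list and phase template are assumed inputs.

```python
import numpy as np

def detect_carriers(img, carriers, phase_template, tol=0.5):
    """Sketch of decoder-style detection (hypothetical carrier/phase inputs).

    carriers       : list of (fy, fx) FFT bins to check
    phase_template : dict mapping (fy, fx) -> expected phase in radians
    tol            : maximum absolute phase error counted as a match (radians)
    """
    img = img.astype(np.float64)
    # Crude noise residual: subtract a circular 3x3 box blur
    # (a stand-in for the wavelet denoising described above)
    blur = sum(np.roll(np.roll(img, dy, axis=0), dx, axis=1)
               for dy in (-1, 0, 1) for dx in (-1, 0, 1)) / 9.0
    F = np.fft.fft2(img - blur)
    hits = 0
    for fy, fx in carriers:
        err = np.angle(F[fy, fx]) - phase_template[(fy, fx)]
        err = np.angle(np.exp(1j * err))  # wrap to [-pi, pi]
        hits += abs(err) < tol
    return hits / len(carriers)  # fraction of carriers whose phase matches
```

A high match fraction at the known carriers is the "watermarked" decision; content-only images produce essentially random phases at those bins.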
### SpectralCodebook Extraction
The codebook captures the watermark's full frequency fingerprint:
- **50 reference images** (25 pure black + 25 pure white, all from Gemini)
- Extracts **magnitude envelope** and **phase template** per channel
- Computes **phase consistency score** per frequency bin
- Content-adaptive profiles for dark vs. light image regions
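A stripped-down version of this extraction step might look like the following (a sketch under the assumptions above, not the repo's `SpectralCodebook`; it profiles one channel from flat reference images):

```python
import numpy as np

def build_codebook(refs):
    """Sketch: profile the watermark spectrum from flat reference images.

    refs : list of same-shaped 2D float arrays (e.g. pure-black / pure-white
           Gemini outputs). Returns (magnitude envelope, phase template,
           per-bin phase-consistency score).
    """
    # Remove each reference's flat content so only the embedded signal remains
    residuals = [r - np.mean(r) for r in refs]
    specs = [np.fft.fft2(r) for r in residuals]
    mag = np.mean([np.abs(s) for s in specs], axis=0)  # magnitude envelope
    # Mean unit phasor per bin: its angle is the phase template,
    # its length is the phase-consistency score in [0, 1]
    phasors = np.mean([s / (np.abs(s) + 1e-12) for s in specs], axis=0)
    return mag, np.angle(phasors), np.abs(phasors)
```

Flat references make this easy: with no content to mask it, whatever structure survives the mean-subtraction is the embedded signal itself.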
### Selective Notch Filter
The V3 bypass doesn't subtract blindly - it targets only bins where:
1. **Magnitude** exceeds the 97th percentile (strong watermark energy)
2. **Phase consistency** ≥ 0.95 across reference images (confirmed watermark, not noise)
3. **Subtraction** is capped at 30% of the image's energy at each bin
This surgical precision is why V3 achieves 40+ dB PSNR while still reducing watermark energy.
### Noise Correlation Signature
| Metric | Value | Significance |
|:-------|:-----:|:-------------|
| Mean pairwise noise correlation | **0.218** | Identical watermark in all images |
| Noise structure ratio | **1.32** | Neural encoder byproduct |
| Phase coherence (top carriers) | **>99.9%** | Fixed model-level key |
| Green channel phase std | **<0.007 rad** | Strongest consistency channel |
### Bit Plane Analysis
| Bit Plane | Consistency | Role |
|:---------:|:-----------:|:-----|
| Bit 0 (LSB) | 0.049 | Watermark signal |
| Bit 1 | 0.074 | Watermark signal |
| Bit 2 | 0.125 | Partially watermarked |
| Bit 3 | 0.513 | Mixed |
| Bits 4-7 | 0.635-1.000 | Image structure |
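For illustration, one plausible way to compute a per-bit-plane consistency score across an image stack (the exact metric behind the table may differ) is to measure agreement with the per-pixel majority:

```python
import numpy as np

def bit_plane_consistency(images):
    """Sketch: cross-image agreement per bit plane (hypothetical metric).

    For each bit 0-7, scores how strongly pixels agree across the stack,
    rescaled so purely random planes score near 0 and perfectly stable
    planes score 1.0.
    """
    stack = np.stack([img.astype(np.uint8) for img in images])  # (N, H, W)
    scores = []
    for bit in range(8):
        planes = (stack >> bit) & 1            # (N, H, W) of 0/1 values
        mean = planes.mean(axis=0)             # fraction of 1s per pixel
        # Agreement with the per-pixel majority, rescaled from [0.5, 1] to [0, 1]
        agreement = np.maximum(mean, 1 - mean)
        scores.append(float((2 * agreement - 1).mean()))
    return scores
```

Under this scoring, near-zero values on the low bit planes are exactly what a spread, noise-like watermark signal would produce.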
---
## 🛠️ Core Modules
### `robust_extractor.py` - Detection
Multi-scale, multi-denoiser watermark detector achieving 90% detection rate.
```python
from robust_extractor import RobustSynthIDExtractor
extractor = RobustSynthIDExtractor()
extractor.load_codebook('artifacts/codebook/robust_codebook.pkl')
result = extractor.detect_array(image)
print(f"Watermarked: {result.is_watermarked}")
print(f"Confidence: {result.confidence:.4f}")
print(f"Phase Match: {result.phase_match:.4f}")
```
**Features:**
- Multi-scale analysis (256, 512, 1024px)
- Wavelet + bilateral + NLM denoising fusion
- ICA-based watermark/content separation
- Ensemble carrier detection across scales
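The multi-scale ensemble idea reduces sensitivity to resampling: run the same single-scale detector at several sizes and pool the scores. A minimal sketch (plain subsampling stands in for proper area resampling; `detect_fn` is any single-scale detector, not a repo API):

```python
import numpy as np

def multiscale_confidence(img, detect_fn, scales=(256, 512, 1024)):
    """Sketch of the multi-scale ensemble: average a single-scale detector's
    confidence over several working resolutions."""
    h, w = img.shape[:2]
    scores = []
    for s in scales:
        if min(h, w) < s:
            continue  # skip scales larger than the image
        f = min(h, w) // s  # integer subsampling factor
        scores.append(detect_fn(img[::f, ::f][:s, :s]))
    return float(np.mean(scores)) if scores else 0.0
```

Averaging across scales means a watermark attenuated at one resolution can still be caught at another, which is part of why the ensemble detector is more robust than any single pass.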
### `synthid_bypass.py` - Bypass (V1/V2/V3)
Three generations of watermark bypass:
```python
from synthid_bypass import SynthIDBypass, SpectralCodebook
bypass = SynthIDBypass()
# V1: Simple JPEG compression
result = bypass.bypass_simple(image, jpeg_quality=50)
# V2: Multi-stage transform pipeline
result = bypass.bypass_v2(image, strength='moderate')
# V3: Spectral codebook subtraction (best)
codebook = SpectralCodebook()
codebook.load('artifacts/spectral_codebook.npz')
result = bypass.bypass_v3(image, codebook, strength='aggressive')
```
### `watermark_remover.py` - Removal
Quality-preserving frequency-domain removal:
```python
from watermark_remover import WatermarkRemover
remover = WatermarkRemover(extractor)
result = remover.remove(image, mode='balanced')
```
---
## 📚 References
- [SynthID: Identifying AI-generated images](https://deepmind.google/technologies/synthid/)
- [SynthID Paper (arXiv:2510.09263)](https://arxiv.org/abs/2510.09263)
- [Synthid-Bypass ComfyUI Workflow](https://github.com/aloshdenny/Synthid-Bypass) - img2img re-generation approach
---
## ⚠️ Disclaimer
This project is for **research and educational purposes only**. SynthID is proprietary technology owned by Google DeepMind. These tools are intended for:
- 🎓 Academic research on watermarking robustness
- 🔒 Security analysis of AI-generated content identification
- 📡 Understanding spread-spectrum encoding methods
**Do not use these tools to misrepresent AI-generated content as human-created.**
---
## 📄 License
@@ -378,5 +397,5 @@ Research and educational use only. See [LICENSE](LICENSE) for details.
---
<p align="center">
Made with 🔬 by watermark reverse engineering researchers
</p>
+28 -9
@@ -146,27 +146,45 @@ Based on our analysis, SynthID likely works as follows:
2. **Image modifications may break detection**: Heavy JPEG compression, cropping, or resizing may degrade the watermark
3. **Binary watermark bits unknown**: We discovered the carrier frequencies but not the actual message encoded
## V3 Spectral Bypass — Building on These Findings
The carrier frequencies and phase consistency discovered here became the foundation for the **V3 Spectral Bypass** (`synthid_bypass.py`). Using a `SpectralCodebook` extracted from 50 reference images (25 black + 25 white), V3 can surgically subtract the watermark in the frequency domain:
- **40+ dB PSNR** — visually indistinguishable from original
- **1-7% detection confidence reduction** per pass
- **Selective notch filter** — targets only confirmed watermark bins (P97+ magnitude, ≥95% phase consistency)
See the main [README](README.md) for full V3 documentation and results.
## Files Generated
| File | Description |
|------|-------------|
| `artifacts/codebook/synthid_codebook.pkl` | Detection codebook with numpy arrays |
| `artifacts/codebook/synthid_codebook_meta.json` | Human-readable metadata |
| `artifacts/spectral_codebook.npz` | V3 spectral fingerprint (119 MB) |
| `artifacts/visualizations/` | FFT, phase, carrier visualizations |
## Usage
### To detect SynthID watermark:
```bash
python src/extraction/robust_extractor.py detect image.png \
--codebook artifacts/codebook/robust_codebook.pkl
```
### To run V3 spectral bypass:
```python
from synthid_bypass import SynthIDBypass, SpectralCodebook
codebook = SpectralCodebook()
codebook.load('artifacts/spectral_codebook.npz')
bypass = SynthIDBypass()
result = bypass.bypass_v3(image_rgb, codebook, strength='aggressive')
print(f"PSNR: {result.psnr:.1f} dB")
```
## Conclusion
@@ -176,5 +194,6 @@ SynthID uses a sophisticated spread-spectrum watermarking technique that:
- Uses **specific carrier frequencies** (FFT bin indices 14, 98, 126, 128, 210, 238 and their conjugates)
- Creates a **consistent noise signature** detectable via correlation analysis
- Is **imperceptible** to human observers but **robust** enough to survive common image operations
- Uses a **fixed model-level key** (identical phase template across all images)
These findings enabled both detection (90% accuracy) and spectral bypass (40+ dB PSNR) without access to Google's proprietary encoder/decoder.
+6
@@ -8,6 +8,12 @@ opencv-python>=4.5.0
# Wavelet analysis
PyWavelets>=1.1.1
# Machine learning (for ICA)
scikit-learn>=0.24.0
# Image processing
Pillow>=8.0.0
# Visualization
matplotlib>=3.4.0
+465
@@ -0,0 +1,465 @@
"""
SynthID Watermark Extraction Benchmark Suite
Comprehensive benchmarking for watermark extraction and removal:
1. Detection accuracy across image types
2. Removal quality (PSNR, SSIM)
3. Re-detection test (verify watermark is removed)
4. Performance metrics
Usage:
python benchmark_extraction.py --input-dir /path/to/images --codebook codebook.pkl
"""
import os
import sys
import json
import time
import argparse
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import numpy as np
import cv2
from collections import defaultdict
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from robust_extractor import RobustSynthIDExtractor, DetectionResult
from watermark_remover import WatermarkRemover, RemovalResult
@dataclass
class BenchmarkResults:
"""Results from benchmarking run."""
n_images: int
detection_rate: float
avg_confidence: float
avg_correlation: float
avg_phase_match: float
removal_success_rate: float
avg_psnr: float
avg_ssim: float
avg_confidence_drop: float
re_detection_rate: float
total_time: float
avg_time_per_image: float
details: Dict
class BenchmarkSuite:
"""
Comprehensive benchmark suite for SynthID extraction and removal.
"""
def __init__(
self,
codebook_path: Optional[str] = None,
verbose: bool = True
):
"""
Initialize benchmark suite.
Args:
codebook_path: Path to pre-extracted codebook
verbose: Print progress during benchmarking
"""
self.verbose = verbose
self.extractor = RobustSynthIDExtractor()
self.remover = None
if codebook_path and os.path.exists(codebook_path):
self.extractor.load_codebook(codebook_path)
self.remover = WatermarkRemover(extractor=self.extractor)
def log(self, message: str):
"""Print message if verbose."""
if self.verbose:
print(message)
def load_images(
self,
image_dir: str,
sample_size: Optional[int] = None,
extensions: set = {'.png', '.jpg', '.jpeg', '.webp'}
) -> List[Tuple[str, np.ndarray]]:
"""Load images from directory."""
images = []
for fname in sorted(os.listdir(image_dir)):
if os.path.splitext(fname)[1].lower() in extensions:
path = os.path.join(image_dir, fname)
img = cv2.imread(path)
if img is not None:
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
images.append((path, img_rgb))
if sample_size and len(images) >= sample_size:
break
return images
def benchmark_detection(
self,
images: List[Tuple[str, np.ndarray]]
) -> Dict:
"""
Benchmark detection accuracy.
Returns:
Dict with detection metrics
"""
self.log(f"\n{'='*60}")
self.log("DETECTION BENCHMARK")
self.log(f"{'='*60}")
results = []
start_time = time.time()
for i, (path, img) in enumerate(images):
try:
result = self.extractor.detect_array(img)
results.append({
'path': path,
'is_watermarked': result.is_watermarked,
'confidence': result.confidence,
'correlation': result.correlation,
'phase_match': result.phase_match,
'structure_ratio': result.structure_ratio,
'carrier_strength': result.carrier_strength,
})
except Exception as e:
self.log(f" Error processing {path}: {e}")
results.append({
'path': path,
'error': str(e)
})
if (i + 1) % 10 == 0:
self.log(f" Processed {i+1}/{len(images)} images...")
elapsed = time.time() - start_time
# Compute statistics
valid_results = [r for r in results if 'error' not in r]
detected = [r for r in valid_results if r['is_watermarked']]
detection_rate = len(detected) / len(valid_results) if valid_results else 0
avg_confidence = float(np.mean([r['confidence'] for r in valid_results])) if valid_results else 0
avg_correlation = float(np.mean([r['correlation'] for r in valid_results])) if valid_results else 0
avg_phase_match = float(np.mean([r['phase_match'] for r in valid_results])) if valid_results else 0
self.log(f"\n Detection Rate: {detection_rate:.1%}")
self.log(f" Avg Confidence: {avg_confidence:.4f}")
self.log(f" Avg Correlation: {avg_correlation:.4f}")
self.log(f" Avg Phase Match: {avg_phase_match:.4f}")
self.log(f" Time: {elapsed:.2f}s ({elapsed/len(images):.3f}s per image)")
return {
'n_images': len(images),
'n_valid': len(valid_results),
'n_detected': len(detected),
'detection_rate': detection_rate,
'avg_confidence': avg_confidence,
'avg_correlation': avg_correlation,
'avg_phase_match': avg_phase_match,
'elapsed_seconds': elapsed,
'results': results
}
def benchmark_removal(
self,
images: List[Tuple[str, np.ndarray]],
output_dir: Optional[str] = None,
save_samples: int = 5
) -> Dict:
"""
Benchmark removal quality.
Args:
images: List of (path, image) tuples
output_dir: Directory to save sample cleaned images
save_samples: Number of sample images to save
Returns:
Dict with removal metrics
"""
if self.remover is None:
return {'error': 'No remover initialized (need codebook)'}
self.log(f"\n{'='*60}")
self.log("REMOVAL BENCHMARK")
self.log(f"{'='*60}")
results = []
start_time = time.time()
if output_dir:
os.makedirs(output_dir, exist_ok=True)
for i, (path, img) in enumerate(images):
try:
result = self.remover.remove(img, verify=True)
entry = {
'path': path,
'success': result.success,
'psnr': result.psnr,
'ssim': result.ssim,
'original_watermarked': result.detection_before['is_watermarked'] if result.detection_before else None,
'original_confidence': result.detection_before['confidence'] if result.detection_before else None,
'cleaned_watermarked': result.detection_after['is_watermarked'] if result.detection_after else None,
'cleaned_confidence': result.detection_after['confidence'] if result.detection_after else None,
}
results.append(entry)
# Save sample outputs
if output_dir and i < save_samples:
fname = os.path.basename(path)
out_path = os.path.join(output_dir, f"cleaned_{fname}")
cv2.imwrite(out_path, cv2.cvtColor(result.cleaned_image, cv2.COLOR_RGB2BGR))
except Exception as e:
self.log(f" Error processing {path}: {e}")
results.append({
'path': path,
'error': str(e)
})
if (i + 1) % 10 == 0:
self.log(f" Processed {i+1}/{len(images)} images...")
elapsed = time.time() - start_time
# Compute statistics
valid_results = [r for r in results if 'error' not in r]
successful = [r for r in valid_results if r['success']]
re_detected = [r for r in valid_results if r.get('cleaned_watermarked', True)]
removal_success_rate = len(successful) / len(valid_results) if valid_results else 0
avg_psnr = float(np.mean([r['psnr'] for r in valid_results])) if valid_results else 0
avg_ssim = float(np.mean([r['ssim'] for r in valid_results])) if valid_results else 0
# Confidence drop
conf_drops = []
for r in valid_results:
if r.get('cleaned_confidence') is not None:
drop = r['original_confidence'] - r['cleaned_confidence']
conf_drops.append(drop)
avg_conf_drop = float(np.mean(conf_drops)) if conf_drops else 0
re_detection_rate = len(re_detected) / len(valid_results) if valid_results else 0
self.log(f"\n Removal Success Rate: {removal_success_rate:.1%}")
self.log(f" Avg PSNR: {avg_psnr:.2f} dB")
self.log(f" Avg SSIM: {avg_ssim:.4f}")
self.log(f" Avg Confidence Drop: {avg_conf_drop:.4f}")
self.log(f" Re-detection Rate: {re_detection_rate:.1%}")
self.log(f" Time: {elapsed:.2f}s ({elapsed/len(images):.3f}s per image)")
return {
'n_images': len(images),
'n_valid': len(valid_results),
'n_successful': len(successful),
'removal_success_rate': removal_success_rate,
'avg_psnr': avg_psnr,
'avg_ssim': avg_ssim,
'avg_confidence_drop': avg_conf_drop,
're_detection_rate': re_detection_rate,
'elapsed_seconds': elapsed,
'results': results
}
def run_full_benchmark(
self,
image_dir: str,
sample_size: Optional[int] = None,
output_dir: Optional[str] = None,
save_report: Optional[str] = None
) -> BenchmarkResults:
"""
Run complete benchmark suite.
Args:
image_dir: Directory containing watermarked images
sample_size: Max images to test (None for all)
output_dir: Directory to save cleaned samples
save_report: Path to save JSON report
Returns:
BenchmarkResults
"""
self.log(f"\n{'='*60}")
self.log("SYNTHID EXTRACTION BENCHMARK SUITE")
self.log(f"{'='*60}")
self.log(f"Image directory: {image_dir}")
self.log(f"Sample size: {sample_size or 'all'}")
# Load images
self.log("\nLoading images...")
images = self.load_images(image_dir, sample_size)
self.log(f"Loaded {len(images)} images")
if not images:
raise ValueError("No images found in directory")
# Run benchmarks
total_start = time.time()
detection_results = self.benchmark_detection(images)
removal_results = self.benchmark_removal(images, output_dir)
total_time = time.time() - total_start
# Compile results
results = BenchmarkResults(
n_images=len(images),
detection_rate=detection_results['detection_rate'],
avg_confidence=detection_results['avg_confidence'],
avg_correlation=detection_results['avg_correlation'],
avg_phase_match=detection_results['avg_phase_match'],
removal_success_rate=removal_results.get('removal_success_rate', 0),
avg_psnr=removal_results.get('avg_psnr', 0),
avg_ssim=removal_results.get('avg_ssim', 0),
avg_confidence_drop=removal_results.get('avg_confidence_drop', 0),
re_detection_rate=removal_results.get('re_detection_rate', 0),
total_time=total_time,
avg_time_per_image=total_time / len(images),
details={
'detection': detection_results,
'removal': removal_results
}
)
# Print summary
self.log(f"\n{'='*60}")
self.log("BENCHMARK SUMMARY")
self.log(f"{'='*60}")
self.log(f"Images Tested: {results.n_images}")
self.log(f"")
self.log(f"Detection:")
self.log(f" Rate: {results.detection_rate:.1%}")
self.log(f" Confidence: {results.avg_confidence:.4f}")
self.log(f"")
self.log(f"Removal:")
self.log(f" Success Rate: {results.removal_success_rate:.1%}")
self.log(f" PSNR: {results.avg_psnr:.2f} dB")
self.log(f" SSIM: {results.avg_ssim:.4f}")
self.log(f" Re-detection: {results.re_detection_rate:.1%}")
self.log(f"")
self.log(f"Performance:")
self.log(f" Total Time: {results.total_time:.2f}s")
self.log(f" Per Image: {results.avg_time_per_image:.3f}s")
self.log(f"{'='*60}")
# Save report
if save_report:
report = asdict(results)
# Remove large result arrays for JSON
if 'details' in report:
for key in ['detection', 'removal']:
if key in report['details']:
report['details'][key].pop('results', None)
with open(save_report, 'w') as f:
json.dump(report, f, indent=2)
self.log(f"\nReport saved to: {save_report}")
return results
def compare_with_original(
image_dir: str,
original_codebook: str,
robust_codebook: str,
sample_size: int = 50
):
"""
Compare original vs robust extractor performance.
"""
print("\n" + "=" * 60)
print("COMPARISON: Original vs Robust Extractor")
print("=" * 60)
# Original extractor (using same interface)
from synthid_codebook_extractor import detect_synthid
# Robust extractor
robust = RobustSynthIDExtractor()
robust.load_codebook(robust_codebook)
# Load images
extensions = {'.png', '.jpg', '.jpeg', '.webp'}
images = []
for fname in sorted(os.listdir(image_dir)):
if os.path.splitext(fname)[1].lower() in extensions:
path = os.path.join(image_dir, fname)
images.append(path)
if len(images) >= sample_size:
break
print(f"Testing on {len(images)} images...")
# Compare
original_detected = 0
robust_detected = 0
for path in images:
# Original
try:
orig_result = detect_synthid(path, original_codebook)
if orig_result['is_watermarked']:
original_detected += 1
except Exception:
pass
# Robust
try:
robust_result = robust.detect(path)
if robust_result.is_watermarked:
robust_detected += 1
except Exception:
pass
print(f"\nResults:")
print(f" Original Extractor: {original_detected}/{len(images)} ({100*original_detected/len(images):.1f}%)")
print(f" Robust Extractor: {robust_detected}/{len(images)} ({100*robust_detected/len(images):.1f}%)")
print(f" Improvement: {robust_detected - original_detected} more detected")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='SynthID Extraction Benchmark Suite')
parser.add_argument('--input-dir', type=str, required=True,
help='Directory with watermarked images')
parser.add_argument('--codebook', type=str, required=True,
help='Path to codebook file')
parser.add_argument('--sample-size', type=int, default=None,
help='Number of images to test (default: all)')
parser.add_argument('--output-dir', type=str, default=None,
help='Directory to save cleaned samples')
parser.add_argument('--output-report', type=str, default='benchmark_results.json',
help='Path to save JSON report')
parser.add_argument('--quiet', action='store_true',
help='Reduce output verbosity')
args = parser.parse_args()
suite = BenchmarkSuite(
codebook_path=args.codebook,
verbose=not args.quiet
)
results = suite.run_full_benchmark(
image_dir=args.input_dir,
sample_size=args.sample_size,
output_dir=args.output_dir,
save_report=args.output_report
)
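The detection statistics the suite reports reduce to a simple aggregation over per-image result dicts: errored images are excluded, then rate and means are taken over the rest. A minimal, self-contained sketch of that reduction on toy data (not real detector output):

```python
import numpy as np

# Toy per-image results in the same shape benchmark_detection collects.
results = [
    {'path': 'a.png', 'is_watermarked': True, 'confidence': 0.92},
    {'path': 'b.png', 'is_watermarked': True, 'confidence': 0.81},
    {'path': 'c.png', 'is_watermarked': False, 'confidence': 0.12},
    {'path': 'd.png', 'error': 'unreadable'},  # errored images are excluded
]

valid = [r for r in results if 'error' not in r]
detected = [r for r in valid if r['is_watermarked']]
detection_rate = len(detected) / len(valid) if valid else 0
avg_confidence = float(np.mean([r['confidence'] for r in valid])) if valid else 0

print(f"{detection_rate:.1%}")    # → 66.7%
print(round(avg_confidence, 4))   # → 0.6167
```

Note that the detection rate is taken over valid images only, so unreadable files lower neither the rate nor the confidence average.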
File diff suppressed because it is too large
+628
@@ -0,0 +1,628 @@
"""
SynthID Watermark Remover: Signature-Based Approach
Uses watermark signatures extracted from pure black/white Gemini images
to perform targeted watermark subtraction, combined with JPEG compression
for maximum effectiveness.
Key findings from analysis:
- Pure black images reveal the exact watermark as pixel values > 0
- 24/25 black images share the same pattern (r=0.74), indicating a fixed key
- JPEG Q50 + Signature subtraction gives 15-19% phase reduction at 34-38dB PSNR
- The watermark is content-adaptive, but has a fixed structural component
"""
import os
import sys
import io
import json
import numpy as np
import cv2
from PIL import Image
from scipy.ndimage import zoom
from dataclasses import dataclass, field
from typing import Optional, Dict, Tuple
@dataclass
class RemovalResult:
"""Result of watermark removal."""
success: bool
cleaned_image: np.ndarray
psnr: float
ssim: float
detection_before: Optional[Dict] = None
detection_after: Optional[Dict] = None
method: str = ''
details: Dict = field(default_factory=dict)
class WatermarkRemover:
"""
SynthID watermark remover using extracted signatures.
Approach:
1. Load pre-extracted watermark signature from pure black/white Gemini images
2. Resize signature to match target image
3. Subtract signature from image (disrupts fixed watermark component)
4. Apply JPEG compression (disrupts remaining adaptive component)
"""
def __init__(
self,
signature_dir: Optional[str] = None,
extractor=None
):
"""
Args:
signature_dir: Path to directory containing signature .npy files
extractor: RobustSynthIDExtractor instance for verification
"""
self.extractor = extractor
self.signature = None
self.white_signature = None
self.meta = None
if signature_dir:
self.load_signature(signature_dir)
def load_signature(self, signature_dir: str):
"""Load watermark signature from pre-extracted files."""
black_path = os.path.join(signature_dir, 'synthid_black_signature.npy')
white_path = os.path.join(signature_dir, 'synthid_white_signature.npy')
meta_path = os.path.join(signature_dir, 'signature_meta.json')
if os.path.exists(black_path):
self.signature = np.load(black_path)
if os.path.exists(white_path):
self.white_signature = np.load(white_path)
if os.path.exists(meta_path):
with open(meta_path) as f:
self.meta = json.load(f)
def extract_signature_from_images(
self,
black_dir: str = None,
white_dir: str = None,
output_dir: str = None
):
"""
Extract watermark signature directly from pure black/white Gemini images.
On a pure black image, every pixel > 0 IS the watermark.
On a pure white image, every pixel < 255 IS the watermark.
"""
import glob
if black_dir:
black_files = sorted(glob.glob(os.path.join(black_dir, '*.png')))
print(f"Found {len(black_files)} black images")
# Load all and cluster by correlation to find main group
all_wms = []
for f in black_files:
img = cv2.imread(f)
if img is None:
continue
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
all_wms.append(img_rgb.astype(np.float32))
# Simple clustering: find the majority group
n = len(all_wms)
if n > 2:
# Check pairwise correlation of flattened binary masks
binary_wms = [(wm > 0).astype(np.float32).ravel() for wm in all_wms]
corr_matrix = np.zeros((n, n))
for i in range(n):
for j in range(i+1, n):
c = np.corrcoef(binary_wms[i], binary_wms[j])[0, 1]
corr_matrix[i, j] = c
corr_matrix[j, i] = c
# Find largest group with r > 0.5
groups = []
visited = set()
for i in range(n):
if i in visited:
continue
group = [i]
for j in range(i+1, n):
if j not in visited and corr_matrix[i, j] > 0.5:
group.append(j)
for g in group:
visited.add(g)
groups.append(group)
# Use the largest group
main_group = max(groups, key=len)
print(f"Main group: {len(main_group)} images (excluded {n - len(main_group)} outliers)")
else:
main_group = list(range(n))
self.signature = np.mean([all_wms[i] for i in main_group], axis=0)
print(f"Signature shape: {self.signature.shape}")
if white_dir:
white_files = sorted(glob.glob(os.path.join(white_dir, '*.png')))
print(f"Found {len(white_files)} white images")
white_wms = []
for f in white_files:
img = cv2.imread(f)
if img is None:
continue
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
white_wms.append(255.0 - img_rgb.astype(np.float32))
self.white_signature = np.mean(white_wms, axis=0)
# Save if output directory specified
if output_dir:
os.makedirs(output_dir, exist_ok=True)
if self.signature is not None:
np.save(os.path.join(output_dir, 'synthid_black_signature.npy'), self.signature)
if self.white_signature is not None:
np.save(os.path.join(output_dir, 'synthid_white_signature.npy'), self.white_signature)
meta = {
'black_shape': list(self.signature.shape) if self.signature is not None else None,
'white_shape': list(self.white_signature.shape) if self.white_signature is not None else None,
'recommended_subtraction_scale': 1.0,
'recommended_jpeg_quality': 50,
}
with open(os.path.join(output_dir, 'signature_meta.json'), 'w') as f:
json.dump(meta, f, indent=2)
print(f"Saved to {output_dir}")
def _resize_signature(self, target_h: int, target_w: int) -> np.ndarray:
"""Resize signature to match target image dimensions."""
if self.signature is None:
raise ValueError("No signature loaded. Call load_signature() first.")
sig_h, sig_w = self.signature.shape[:2]
if sig_h == target_h and sig_w == target_w:
return self.signature
scale_y = target_h / sig_h
scale_x = target_w / sig_w
return zoom(self.signature, (scale_y, scale_x, 1), order=1)
@staticmethod
def _jpeg_compress(image: np.ndarray, quality: int = 50) -> np.ndarray:
"""Apply JPEG compression/decompression."""
img_uint8 = np.clip(image, 0, 255).astype(np.uint8)
pil_img = Image.fromarray(img_uint8, mode='RGB')
buf = io.BytesIO()
pil_img.save(buf, format='JPEG', quality=quality)
buf.seek(0)
return np.array(Image.open(buf)).astype(np.float32)
@staticmethod
def compute_psnr(original: np.ndarray, modified: np.ndarray) -> float:
"""Compute Peak Signal-to-Noise Ratio."""
mse = np.mean((original.astype(float) - modified.astype(float)) ** 2)
if mse == 0:
return float('inf')
return float(10 * np.log10(255.0 ** 2 / mse))
@staticmethod
def compute_ssim(original: np.ndarray, modified: np.ndarray) -> float:
"""Compute simplified SSIM."""
from scipy import ndimage
C1, C2 = (0.01 * 255) ** 2, (0.03 * 255) ** 2
orig_f = original.astype(np.float64)
mod_f = modified.astype(np.float64)
mu1 = ndimage.uniform_filter(orig_f, size=11)
mu2 = ndimage.uniform_filter(mod_f, size=11)
sigma1_sq = ndimage.uniform_filter(orig_f ** 2, size=11) - mu1 ** 2
sigma2_sq = ndimage.uniform_filter(mod_f ** 2, size=11) - mu2 ** 2
sigma12 = ndimage.uniform_filter(orig_f * mod_f, size=11) - mu1 * mu2
ssim_map = ((2 * mu1 * mu2 + C1) * (2 * sigma12 + C2)) / \
((mu1 ** 2 + mu2 ** 2 + C1) * (sigma1_sq + sigma2_sq + C2))
return float(np.mean(ssim_map))
def remove(
self,
image: np.ndarray,
mode: str = 'balanced',
verify: bool = True,
strength: str = 'aggressive'
) -> RemovalResult:
"""
Remove SynthID watermark from image.
Args:
image: Input image (RGB, uint8)
mode: 'light', 'balanced', 'aggressive', 'maximum', or 'combined_worst'
verify: Whether to verify removal with detection
strength: Strength for 'combined_worst' mode ('moderate', 'aggressive', or 'maximum')
Returns:
RemovalResult with cleaned image and metrics
"""
# V2 combined worst-case mode — delegates to bypass_v2 pipeline
if mode == 'combined_worst':
return self._remove_combined_worst(image, verify=verify, strength=strength)
img_f = image.astype(np.float32)
h, w = img_f.shape[:2]
# Get mode parameters
params = self._get_mode_params(mode)
# Resize signature
resized_sig = self._resize_signature(h, w)
# Initial detection
detection_before = None
if verify and self.extractor is not None:
result = self.extractor.detect_array(image)
detection_before = {
'is_watermarked': result.is_watermarked,
'confidence': result.confidence,
'phase_match': result.phase_match
}
# Apply removal pipeline
current = img_f.copy()
method_parts = []
# Step 1: JPEG compression (if first)
if params.get('jpeg_first', False):
current = self._jpeg_compress(current, quality=params['jpeg_quality'])
method_parts.append(f"JPEG_Q{params['jpeg_quality']}")
# Step 2: Signature subtraction
if params['subtract_scale'] > 0:
current = current - resized_sig * params['subtract_scale']
current = np.clip(current, 0, 255)
method_parts.append(f"Sub_{params['subtract_scale']}x")
# Step 3: JPEG compression (if after subtraction)
if params.get('jpeg_after', False):
current = self._jpeg_compress(current, quality=params['jpeg_quality'])
method_parts.append(f"JPEG_Q{params['jpeg_quality']}")
# Step 4: Additional JPEG passes
for _ in range(params.get('extra_jpeg_passes', 0)):
q = params.get('extra_jpeg_quality', 60)
current = self._jpeg_compress(current, quality=q)
method_parts.append(f"JPEG_Q{q}")
# Final cleanup
cleaned = np.clip(current, 0, 255).astype(np.uint8)
# Quality metrics
psnr = self.compute_psnr(image, cleaned)
ssim = self.compute_ssim(image, cleaned)
# Final detection
detection_after = None
if verify and self.extractor is not None:
result = self.extractor.detect_array(cleaned)
detection_after = {
'is_watermarked': result.is_watermarked,
'confidence': result.confidence,
'phase_match': result.phase_match
}
# Determine success
success = psnr > 28
if detection_before and detection_after:
phase_drop = detection_before['phase_match'] - detection_after['phase_match']
success = success and (phase_drop > 0.05 or not detection_after['is_watermarked'])
method = ' + '.join(method_parts)
return RemovalResult(
success=success,
cleaned_image=cleaned,
psnr=psnr,
ssim=ssim,
detection_before=detection_before,
detection_after=detection_after,
method=method,
details={'mode': mode, 'params': params}
)
def _remove_combined_worst(
self,
image: np.ndarray,
verify: bool = True,
strength: str = 'aggressive'
) -> RemovalResult:
"""
Combined worst-case removal using bypass_v2 pipeline.
This is the v2 approach that stacks transforms from multiple
categories (spatial, quality, noise, color, overlay) to exploit
SynthID's weakness against combined transforms.
"""
from synthid_bypass import SynthIDBypass
bypass = SynthIDBypass(extractor=self.extractor)
result = bypass.bypass_v2(image, strength=strength, verify=verify)
return RemovalResult(
success=result.success,
cleaned_image=result.cleaned_image,
psnr=result.psnr,
ssim=result.ssim,
detection_before=result.detection_before,
detection_after=result.detection_after,
method=f'combined_worst_{strength}',
details={
'mode': 'combined_worst',
'strength': strength,
'stages': result.stages_applied,
'v2_details': result.details
}
)
def _get_mode_params(self, mode: str) -> Dict:
"""Get parameters for each removal mode."""
if mode == 'light':
return {
'subtract_scale': 0.5,
'jpeg_first': False,
'jpeg_after': True,
'jpeg_quality': 65,
'extra_jpeg_passes': 0,
}
elif mode == 'aggressive':
return {
'subtract_scale': 2.0,
'jpeg_first': True,
'jpeg_after': True,
'jpeg_quality': 50,
'extra_jpeg_passes': 0,
}
elif mode == 'maximum':
return {
'subtract_scale': 5.0,
'jpeg_first': True,
'jpeg_after': True,
'jpeg_quality': 50,
'extra_jpeg_passes': 1,
'extra_jpeg_quality': 55,
}
else: # balanced (default)
return {
'subtract_scale': 1.0,
'jpeg_first': True,
'jpeg_after': False,
'jpeg_quality': 50,
'extra_jpeg_passes': 0,
}
def remove_file(
self,
input_path: str,
output_path: str,
mode: str = 'balanced',
verify: bool = True,
strength: str = 'aggressive'
) -> RemovalResult:
"""Remove watermark from image file and save result."""
img = cv2.imread(input_path)
if img is None:
raise ValueError(f"Could not load image: {input_path}")
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
result = self.remove(img_rgb, mode=mode, verify=verify, strength=strength)
os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
cv2.imwrite(output_path, cv2.cvtColor(result.cleaned_image, cv2.COLOR_RGB2BGR))
return result
def batch_remove(
self,
input_dir: str,
output_dir: str,
mode: str = 'balanced',
verify: bool = True,
limit: Optional[int] = None,
strength: str = 'aggressive'
):
"""Remove watermark from all images in a directory."""
import glob
os.makedirs(output_dir, exist_ok=True)
extensions = ['*.png', '*.jpg', '*.jpeg', '*.webp']
files = []
for ext in extensions:
files.extend(glob.glob(os.path.join(input_dir, ext)))
files = sorted(files)
if limit:
files = files[:limit]
print(f"Processing {len(files)} images in {mode} mode")
if mode == 'combined_worst':
print(f"Strength: {strength}")
print("=" * 70)
results = []
for i, f in enumerate(files):
basename = os.path.basename(f)
output_path = os.path.join(output_dir, basename)
try:
result = self.remove_file(f, output_path, mode=mode, verify=verify, strength=strength)
results.append(result)
if verify and result.detection_before and result.detection_after:
before = result.detection_before['phase_match']
after = result.detection_after['phase_match']
drop = (before - after) / before * 100 if before else 0.0
det_before = 'WM' if result.detection_before['is_watermarked'] else '--'
det_after = 'WM' if result.detection_after['is_watermarked'] else '--'
print(f" [{i+1}/{len(files)}] {basename:20s} | {det_before} -> {det_after} | "
f"phase: {before:.3f} -> {after:.3f} ({drop:+5.1f}%) | PSNR: {result.psnr:.1f}dB")
else:
print(f" [{i+1}/{len(files)}] {basename:20s} | PSNR: {result.psnr:.1f}dB")
except Exception as e:
print(f" [{i+1}/{len(files)}] {basename:20s} | ERROR: {e}")
# Summary
if results and verify:
drops = []
successes = 0
for r in results:
if r.detection_before and r.detection_after:
before = r.detection_before['phase_match']
after = r.detection_after['phase_match']
if before:
drops.append((before - after) / before * 100)
if not r.detection_after['is_watermarked']:
successes += 1
print("=" * 70)
print(f"Results: {len(results)} images processed")
if drops:
print(f" Average phase drop: {np.mean(drops):.1f}%")
print(f" Best phase drop: {max(drops):.1f}%")
print(f" Undetected: {successes}/{len(results)}")
print(f" Average PSNR: {np.mean([r.psnr for r in results]):.1f}dB")
return results
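For reference, the PSNR metric used throughout this class follows the standard 8-bit definition (10 * log10(255^2 / MSE), infinite for identical images). A standalone sanity check, independent of the class:

```python
import numpy as np

def psnr(original, modified, peak=255.0):
    """Peak Signal-to-Noise Ratio in dB; returns inf for identical images."""
    mse = np.mean((original.astype(np.float64) - modified.astype(np.float64)) ** 2)
    if mse == 0:
        return float('inf')
    return float(10 * np.log10(peak ** 2 / mse))

img = np.full((8, 8), 100, dtype=np.uint8)
noisy = img + 16  # uniform offset of 16 gray levels -> MSE = 256

print(psnr(img, img))              # → inf
print(round(psnr(img, noisy), 2))  # → 24.05
```

This also explains the success thresholds used above: PSNR > 28 dB corresponds to a per-pixel RMS error under about 10 gray levels.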
# ================================================================
# CLI INTERFACE
# ================================================================
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(
description='SynthID Watermark Remover (Signature-Based)',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Remove watermark from a single image
python watermark_remover.py remove input.png output.png --signature artifacts/signature/
# Batch remove from directory
python watermark_remover.py batch /path/to/images/ /path/to/output/ --signature artifacts/signature/
# Extract signature from pure images
python watermark_remover.py extract --black assets/black/gemini/ --white assets/white/gemini/ -o artifacts/signature/
"""
)
subparsers = parser.add_subparsers(dest='command', help='Command')
# Remove command
remove_parser = subparsers.add_parser('remove', help='Remove watermark from image')
remove_parser.add_argument('input', help='Input image path')
remove_parser.add_argument('output', help='Output image path')
remove_parser.add_argument('--signature', '-s', default='artifacts/signature/',
help='Path to signature directory')
remove_parser.add_argument('--mode', '-m', default='balanced',
choices=['light', 'balanced', 'aggressive', 'maximum', 'combined_worst'],
help='Removal mode')
remove_parser.add_argument('--strength', default='aggressive',
choices=['moderate', 'aggressive', 'maximum'],
help='Strength for combined_worst mode')
remove_parser.add_argument('--codebook', '-c', default=None,
help='Codebook path for verification')
remove_parser.add_argument('--no-verify', action='store_true',
help='Skip verification')
# Batch command
batch_parser = subparsers.add_parser('batch', help='Batch remove watermarks')
batch_parser.add_argument('input_dir', help='Input directory')
batch_parser.add_argument('output_dir', help='Output directory')
batch_parser.add_argument('--signature', '-s', default='artifacts/signature/')
batch_parser.add_argument('--mode', '-m', default='balanced',
choices=['light', 'balanced', 'aggressive', 'maximum', 'combined_worst'])
batch_parser.add_argument('--strength', default='aggressive',
choices=['moderate', 'aggressive', 'maximum'])
batch_parser.add_argument('--codebook', '-c', default=None)
batch_parser.add_argument('--no-verify', action='store_true')
batch_parser.add_argument('--limit', '-n', type=int, default=None)
# Extract command
extract_parser = subparsers.add_parser('extract', help='Extract signature from pure images')
extract_parser.add_argument('--black', help='Directory of pure black Gemini images')
extract_parser.add_argument('--white', help='Directory of pure white Gemini images')
extract_parser.add_argument('-o', '--output', default='artifacts/signature/',
help='Output directory for signature')
args = parser.parse_args()
if args.command is None:
parser.print_help()
sys.exit(1)
if args.command == 'extract':
remover = WatermarkRemover()
remover.extract_signature_from_images(
black_dir=args.black,
white_dir=args.white,
output_dir=args.output
)
else:
# Load extractor for verification
extractor = None
codebook = getattr(args, 'codebook', None)
no_verify = getattr(args, 'no_verify', False)
if codebook and not no_verify:
try:
from robust_extractor import RobustSynthIDExtractor
extractor = RobustSynthIDExtractor()
extractor.load_codebook(codebook)
except Exception as e:
print(f"Warning: Could not load extractor: {e}")
sig_dir = args.signature
remover = WatermarkRemover(signature_dir=sig_dir, extractor=extractor)
strength = getattr(args, 'strength', 'aggressive')
if args.command == 'remove':
result = remover.remove_file(
args.input, args.output,
mode=args.mode, verify=not no_verify,
strength=strength
)
print("\n" + "=" * 60)
print("WATERMARK REMOVAL RESULTS")
print("=" * 60)
print(f" Mode: {args.mode}")
if args.mode == 'combined_worst':
print(f" Strength: {strength}")
print(f" Method: {result.method}")
print(f" Success: {result.success}")
print(f" PSNR: {result.psnr:.2f} dB")
print(f" SSIM: {result.ssim:.4f}")
if result.detection_before:
print(f"\n Before:")
print(f" Watermarked: {result.detection_before['is_watermarked']}")
print(f" Phase Match: {result.detection_before['phase_match']:.4f}")
if result.detection_after:
print(f"\n After:")
print(f" Watermarked: {result.detection_after['is_watermarked']}")
print(f" Phase Match: {result.detection_after['phase_match']:.4f}")
if result.detection_before and result.detection_before['phase_match']:
drop = result.detection_before['phase_match'] - result.detection_after['phase_match']
pct = 100 * drop / result.detection_before['phase_match']
print(f"\n Phase Drop: {drop:.4f} ({pct:.1f}%)")
print("=" * 60)
print(f"Saved to: {args.output}")
elif args.command == 'batch':
remover.batch_remove(
args.input_dir, args.output_dir,
mode=args.mode, verify=not no_verify,
limit=args.limit,
strength=strength
)
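End-to-end, the 'balanced' mode is JPEG Q50 followed by 1.0x signature subtraction and a clip back to uint8. A minimal sketch of that pipeline on synthetic data; the signature here is a made-up low-amplitude pattern standing in for the real extracted one:

```python
import io

import numpy as np
from PIL import Image

def jpeg_roundtrip(img_uint8, quality=50):
    """Encode and decode through JPEG at the given quality."""
    buf = io.BytesIO()
    Image.fromarray(img_uint8, mode='RGB').save(buf, format='JPEG', quality=quality)
    buf.seek(0)
    return np.asarray(Image.open(buf))

rng = np.random.default_rng(0)
image = np.full((64, 64, 3), 128, dtype=np.uint8)
# Hypothetical signature: amplitude < 4 gray levels, same shape as the image.
signature = (rng.random((64, 64, 3)) * 4).astype(np.float32)

# 'balanced' pipeline: JPEG_Q50 first, then Sub_1.0x, then clip to uint8.
step1 = jpeg_roundtrip(image, quality=50).astype(np.float32)
cleaned = np.clip(step1 - signature, 0, 255).astype(np.uint8)

mse = np.mean((image.astype(np.float64) - cleaned.astype(np.float64)) ** 2)
psnr = 10 * np.log10(255.0 ** 2 / mse)
print(cleaned.shape, psnr > 30)
```

Because the subtracted pattern is only a few gray levels deep, the distortion budget stays well above 30 dB PSNR, consistent with the quality figures reported for the signature-based modes.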