feat(scripts): add V4 codebook build, batch dissolve, and calibration scripts

build_codebook_v4.py  — builds SpectralCodebookV4 from the hierarchical
  reverse-synthid-dataset (model × color × resolution).
dissolve_batch.py     — runs all bypass presets (gentle … nuke) over an
  input directory. Supports Round-06 'final' and 'nuke' strengths.
calibrate_from_feedback.py — updates carrier_weights from detection
  feedback, closing the human-in-the-loop calibration loop.

Made-with: Cursor
This commit is contained in:
Alosh Denny
2026-04-24 02:08:56 +05:30
parent 736d746f5a
commit 083a5eec6a
3 changed files with 724 additions and 0 deletions
+161
View File
@@ -0,0 +1,161 @@
#!/usr/bin/env python3
"""
Build the reverse-SynthID V4 codebook from a hierarchical dataset.
Expected layout::
<root>/
<model>/
black/ HxW/*.png
white/ HxW/*.png
blue/ HxW/*.png
green/ HxW/*.png
red/ HxW/*.png
gray/ HxW/*.png
gradient/ HxW/*.png
diverse/ HxW/*.png
The script produces one ``ProfileV4`` per ``(model, H, W)`` that has at least
``--min-consensus-colors`` consensus colours (``black``, ``white``, ``blue``,
``green``, ``red``, ``gray``) with enough reference images. ``gradient/`` and
``diverse/`` are used as content-baseline only, never as carrier sources.
Usage::
python scripts/build_codebook_v4.py \\
--root /Users/aoxo/vscode/reverse-synthid-data \\
--output artifacts/spectral_codebook_v4.npz
# Restrict to a single model:
python scripts/build_codebook_v4.py --root <root> --models nano-banana-pro-preview
# Also emit a 'union' pseudo-model that averages profiles across models:
python scripts/build_codebook_v4.py --root <root> --add-union
"""
from __future__ import annotations
import argparse
import os
import sys
from typing import List, Optional
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(REPO_ROOT, "src", "extraction"))
from synthid_bypass_v4 import ( # noqa: E402
ALL_COLORS,
SpectralCodebookV4,
)
# Fallback paths used when --root / --output flags are omitted.
DEFAULT_DATASET_ROOT = "/Users/aoxo/vscode/reverse-synthid-data"
DEFAULT_OUTPUT = os.path.join(REPO_ROOT, "artifacts", "spectral_codebook_v4.npz")
def build(
    root: str,
    output: str,
    models: Optional[List[str]] = None,
    colors: Optional[List[str]] = None,
    min_refs_per_color: int = 3,
    min_consensus_colors: int = 3,
    max_per_bucket: Optional[int] = None,
    add_union: bool = False,
) -> None:
    """Build a V4 codebook from the hierarchical dataset and save it.

    Raises ``FileNotFoundError`` when *root* is not a directory and exits
    with status 2 when no profiles could be built at all. On success the
    codebook is written to *output* and a per-profile summary is printed.
    """
    if not os.path.isdir(root):
        raise FileNotFoundError(f"Dataset root not found: {root}")

    book = SpectralCodebookV4()
    book._bind_root(root)  # type: ignore[attr-defined]
    book.build_from_hierarchical_dataset(
        root=root,
        models=models,
        colors=colors,
        min_refs_per_color=min_refs_per_color,
        min_consensus_colors=min_consensus_colors,
        max_per_bucket=max_per_bucket,
        verbose=True,
    )

    if not book.profiles:
        print("\nNo profiles built. Check that --root points at a directory "
              "containing <model>/<color>/<HxW>/*.png")
        sys.exit(2)

    if add_union:
        # Optional 'union' pseudo-model averaged across real models.
        book.add_union_profiles(verbose=True)

    parent = os.path.dirname(output)
    os.makedirs(parent if parent else ".", exist_ok=True)
    book.save(output)

    print("\nProfiles:")
    for model, h, w in sorted(book.profiles):
        prof = book.profiles[(model, h, w)]
        per_color = sorted(prof.n_refs_per_color.items())
        refs = ", ".join(f"{c}={n}" for c, n in per_color)
        print(f" {model}/{h}x{w}: {refs} (content={prof.n_content_refs})")
def main() -> None:
    """Parse CLI flags and forward them verbatim to :func:`build`."""
    parser = argparse.ArgumentParser(
        description="Build the reverse-SynthID V4 codebook.",
    )
    parser.add_argument(
        "--root", default=DEFAULT_DATASET_ROOT,
        help=(
            f"Hierarchical dataset root (default: {DEFAULT_DATASET_ROOT}). "
            "Should contain <model>/<color>/<HxW>/*."
        ),
    )
    parser.add_argument(
        "--output", default=DEFAULT_OUTPUT,
        help=f"Output .npz path (default: {DEFAULT_OUTPUT}).",
    )
    parser.add_argument(
        "--models", nargs="*", default=None,
        help="Restrict to these model subdirectories (default: auto-detect).",
    )
    parser.add_argument(
        "--colors", nargs="*", default=None, choices=list(ALL_COLORS),
        help="Colours to include (default: all known).",
    )
    parser.add_argument(
        "--min-refs-per-color", type=int, default=3,
        help="Drop (color, resolution) buckets with fewer images than this.",
    )
    parser.add_argument(
        "--min-consensus-colors", type=int, default=3,
        help=(
            "Require at least this many consensus colours per (model, HxW) "
            "or the profile is skipped."
        ),
    )
    parser.add_argument(
        "--max-per-bucket", type=int, default=None,
        help="Cap images per (color, resolution) bucket (default: unlimited).",
    )
    parser.add_argument(
        "--add-union", action="store_true",
        help="Also emit a 'union' pseudo-model averaging across real models.",
    )
    # argparse dest names match build()'s parameter names exactly.
    build(**vars(parser.parse_args()))


if __name__ == "__main__":
    main()
+308
View File
@@ -0,0 +1,308 @@
#!/usr/bin/env python3
"""
Close the manual-validation loop for reverse-SynthID V4.
Reads the ``manifest.csv`` from ``dissolve_batch.py`` plus a ``tally.csv``
you filled by hand after checking each variant in the Gemini app. Updates
``carrier_weights`` in the V4 codebook in place:
- Bins that the **failed** variants (``still_watermarked=y``) tried to subtract
get their weights **bumped up**, so subsequent dissolves attack those bins
harder.
- Bins that the **succeeded** variants (``still_watermarked=n``) already
subtracted get their weights **damped slightly**, to recover fidelity
without giving up detector immunity.
The tally CSV accepts ``y``/``n``/``yes``/``no``/``1``/``0`` (case-insensitive)
in ``still_watermarked``. Rows with a blank value are ignored.
Usage::
python scripts/calibrate_from_feedback.py \\
--manifest runs/round_01/manifest.csv \\
--tally runs/round_01/tally.csv \\
--codebook artifacts/spectral_codebook_v4.npz \\
--step 0.25
The codebook is rewritten in place; a timestamped backup is made next to it
unless ``--no-backup`` is passed.
"""
from __future__ import annotations
import argparse
import csv
import datetime
import os
import shutil
import sys
from typing import Dict, List, Optional, Tuple
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(REPO_ROOT, "src", "extraction"))
import numpy as np # noqa: E402
from synthid_bypass_v4 import SpectralCodebookV4 # noqa: E402
# Accepted spellings for the `still_watermarked` tally cell, compared
# case-insensitively after stripping whitespace.
TRUE_TOKENS = {"y", "yes", "1", "true", "t"}
FALSE_TOKENS = {"n", "no", "0", "false", "f"}
# ---------------------------------------------------------------------------
# CSV loading
# ---------------------------------------------------------------------------
def _read_csv_dicts(path: str) -> List[Dict[str, str]]:
with open(path, newline="") as f:
return list(csv.DictReader(f))
def _parse_still_watermarked(value: str) -> Optional[bool]:
    """``y/n`` → ``True/False``; empty/unknown → ``None``."""
    if value is None:
        return None
    token = value.strip().lower()
    # Blank strings fall through both sets and yield None.
    for tokens, verdict in ((TRUE_TOKENS, True), (FALSE_TOKENS, False)):
        if token in tokens:
            return verdict
    return None
def load_feedback(
    manifest_path: str, tally_path: str,
) -> List[Dict]:
    """Join manifest + tally on ``(source, variant)``; return labelled rows.

    Only manifest rows whose tally entry has a parseable
    ``still_watermarked`` value are returned; everything else is dropped.
    """
    manifest_rows = _read_csv_dicts(manifest_path)
    # The tally may be a separate file with at least
    # (source, variant, still_watermarked) — or the manifest itself,
    # filled in place by the user.
    labels: Dict[Tuple[str, str], bool] = {}
    for entry in _read_csv_dicts(tally_path):
        verdict = _parse_still_watermarked(entry.get("still_watermarked", ""))
        if verdict is None:
            continue
        labels[(entry["source"], entry["variant"])] = verdict

    labelled: List[Dict] = []
    for entry in manifest_rows:
        key = (entry["source"], entry["variant"])
        if key in labels:
            merged = dict(entry)
            merged["still_watermarked"] = labels[key]
            labelled.append(merged)
    return labelled
# ---------------------------------------------------------------------------
# Calibration logic
# ---------------------------------------------------------------------------
def _parse_profile_key(profile_key: str) -> Optional[Tuple[str, int, int]]:
"""Parse ``'model_name/HxW'`` → ``(model, H, W)``."""
if not profile_key or "/" not in profile_key:
return None
model, res = profile_key.rsplit("/", 1)
if "x" not in res:
return None
try:
h, w = (int(p) for p in res.lower().split("x"))
except ValueError:
return None
return (model, h, w)
def calibrate(
    codebook: SpectralCodebookV4,
    feedback: List[Dict],
    step: float,
    damp_factor: float,
    consensus_floor: float,
    verbose: bool,
) -> Dict[Tuple[str, int, int], Dict[str, float]]:
    """Update ``carrier_weights`` in-place. Returns per-profile summary stats.

    The update rule, per profile ``P``:
    Let ``F`` = number of feedback rows against ``P`` with
    ``still_watermarked=True`` (failed dissolves).
    Let ``S`` = number with ``still_watermarked=False`` (cleared dissolves).
    If ``F > 0``: scale ``carrier_weights`` by ``1 + step * (F / (F + S))``
    but only on bins with ``consensus_coherence >= consensus_floor``. Non-
    carrier bins are never touched — we don't want to amplify noise.
    If ``F == 0 and S > 0``: scale ``carrier_weights`` by
    ``1 - damp_factor * step`` on carrier bins (gentle fidelity recovery
    once we're clearing the detector).
    """
    # Group labelled rows by (model, H, W); rows whose profile_key cannot
    # be parsed (e.g. errored dissolves left it blank) are skipped.
    groups: Dict[Tuple[str, int, int], Dict[str, List[Dict]]] = {}
    for row in feedback:
        pkey = _parse_profile_key(row.get("profile_key", ""))
        if pkey is None:
            continue
        bucket = groups.setdefault(pkey, {"fail": [], "pass": []})
        target = "fail" if row["still_watermarked"] else "pass"
        bucket[target].append(row)
    summary: Dict[Tuple[str, int, int], Dict[str, float]] = {}
    for pkey, bucket in groups.items():
        if pkey not in codebook.profiles:
            # Feedback for a profile this codebook does not contain
            # (e.g. codebook rebuilt since the batch ran).
            if verbose:
                print(f" skip {pkey}: no matching profile in codebook")
            continue
        prof = codebook.profiles[pkey]
        F = len(bucket["fail"])
        S = len(bucket["pass"])
        # 1.0 on carrier bins (coherent consensus), 0.0 elsewhere — used to
        # restrict the scale to carrier bins only.
        carrier_mask = (prof.consensus_coherence >= consensus_floor).astype(np.float32)
        if F > 0:
            fail_ratio = F / max(F + S, 1)
            scale = 1.0 + step * fail_ratio
            # delta equals `scale` on carrier bins and exactly 1.0 elsewhere.
            delta = 1.0 + (scale - 1.0) * carrier_mask
            action = f"bump ×{scale:.3f}"
        elif S > 0:
            # 0.2 floor keeps repeated damping from driving weights to zero.
            scale = max(1.0 - damp_factor * step, 0.2)
            delta = 1.0 + (scale - 1.0) * carrier_mask
            action = f"damp ×{scale:.3f}"
        else:
            continue
        # NOTE(review): assumes carrier_weights[..., 1] is the green channel
        # ("mean_g" below) and that update_carrier_weights mutates the
        # profile's array in place — confirm against SpectralCodebookV4.
        before_mean = float(np.mean(prof.carrier_weights[..., 1]))
        codebook.update_carrier_weights(pkey, delta)
        after_mean = float(np.mean(prof.carrier_weights[..., 1]))
        summary[pkey] = {
            # NOTE(review): values mix ints and a str despite the
            # Dict[str, float] annotation.
            "fail": F,
            "pass": S,
            "before_mean_g": before_mean,
            "after_mean_g": after_mean,
            "action": action,
        }
        if verbose:
            print(f" {pkey[0]}/{pkey[1]}x{pkey[2]}: {action} "
                  f"fail={F} pass={S} "
                  f"mean(G) {before_mean:.4f}{after_mean:.4f}")
    return summary
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def run(
    manifest_path: str,
    tally_path: str,
    codebook_path: str,
    step: float,
    damp_factor: float,
    consensus_floor: float,
    backup: bool,
) -> None:
    """Load feedback, calibrate the codebook, and rewrite it in place.

    Optionally snapshots the codebook first (``backup=True``), then prints
    an overall cleared/still-watermarked summary with a next-step hint.
    """
    for label, path in (
        ("Manifest", manifest_path),
        ("Tally", tally_path),
        ("Codebook", codebook_path),
    ):
        if not os.path.isfile(path):
            raise FileNotFoundError(f"{label} not found: {path}")

    feedback = load_feedback(manifest_path, tally_path)
    if not feedback:
        print("No usable feedback rows (empty still_watermarked?). Nothing "
              "to do.")
        return
    print(f"Loaded {len(feedback)} labelled rows from tally.")

    codebook = SpectralCodebookV4()
    codebook.load(codebook_path)

    if backup:
        # Timestamped sibling copy so a bad calibration can be rolled back.
        stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        backup_path = f"{codebook_path}.bak-{stamp}.npz"
        shutil.copyfile(codebook_path, backup_path)
        print(f"Backup → {backup_path}")

    summary = calibrate(
        codebook=codebook,
        feedback=feedback,
        step=step,
        damp_factor=damp_factor,
        consensus_floor=consensus_floor,
        verbose=True,
    )
    if not summary:
        print("No profiles updated.")
        return

    codebook.save(codebook_path)

    fails = sum(stats["fail"] for stats in summary.values())
    passes = sum(stats["pass"] for stats in summary.values())
    pct = passes * 100.0 / max(passes + fails, 1)
    print(f"\nCalibration complete. Profiles updated: {len(summary)}")
    print(f"Feedback: {passes} cleared / {fails} still watermarked "
          f"({pct:.1f}% success).")
    if fails > 0:
        print("Next: re-run dissolve_batch.py on a fresh batch; weights "
              "are now stronger at persistent carriers.")
    else:
        print("100% cleared — consider lowering strength for better "
              "fidelity on the next batch.")
def main() -> None:
    """Parse CLI flags and hand off to :func:`run`."""
    parser = argparse.ArgumentParser(
        description=(
            "Update V4 carrier_weights from manual Gemini detection tallies."
        ),
    )
    # Required file paths, registered in display order.
    path_flags = [
        ("--manifest", "Path to manifest.csv produced by dissolve_batch.py."),
        ("--tally", "Path to tally.csv with (source, variant, "
                    "still_watermarked) columns. May be the manifest file "
                    "itself if you filled it in place."),
        ("--codebook", "V4 codebook .npz to update (in place)."),
    ]
    for flag, text in path_flags:
        parser.add_argument(flag, required=True, help=text)
    parser.add_argument("--step", type=float, default=0.25,
                        help="Base scale step; 0.25 = up to +25%% per round.")
    parser.add_argument("--damp-factor", type=float, default=0.15,
                        help="Damping multiplier applied when all variants "
                             "cleared (fidelity recovery).")
    parser.add_argument("--consensus-floor", type=float, default=0.50,
                        help="Only update bins with consensus_coherence >= this.")
    parser.add_argument("--no-backup", dest="backup", action="store_false",
                        help="Skip the timestamped backup of the codebook.")
    parser.set_defaults(backup=True)
    opts = parser.parse_args()
    run(
        manifest_path=opts.manifest,
        tally_path=opts.tally,
        codebook_path=opts.codebook,
        step=opts.step,
        damp_factor=opts.damp_factor,
        consensus_floor=opts.consensus_floor,
        backup=opts.backup,
    )


if __name__ == "__main__":
    main()
+255
View File
@@ -0,0 +1,255 @@
#!/usr/bin/env python3
"""
Phase-2 driver for the reverse-SynthID V4 manual-validation loop.
Takes an input folder of watermarked images and emits one or more strength
variants per image (``A``, ``B``, ``C``, ... by default). Writes a
``manifest.csv`` that pairs each variant with:
- source image path
- output path
- strength preset
- profile key used
- PSNR / SSIM achieved
You then paste the variants into the Gemini app, run SynthID detection, and
fill in a small ``tally.csv`` (columns: ``source,variant,still_watermarked``,
values ``y/n``). Feed both files into ``calibrate_from_feedback.py`` to
update the codebook's per-carrier weights and iterate.
Usage::
python scripts/dissolve_batch.py \\
--input /path/to/input_images \\
--output /path/to/out_dir \\
--codebook artifacts/spectral_codebook_v4.npz \\
--model nano-banana-pro-preview \\
--strengths gentle moderate aggressive
Strengths map to filesystem-safe single-letter variants (A,B,C,D) in
manifest order, which makes the tally CSV trivial to fill by hand.
"""
from __future__ import annotations
import argparse
import csv
import glob
import os
import sys
import time
from typing import List, Optional
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(REPO_ROOT, "src", "extraction"))
import cv2 # noqa: E402
import numpy as np # noqa: E402
from synthid_bypass_v4 import SpectralCodebookV4, SynthIDBypassV4 # noqa: E402
# Extensions scanned when --input is a directory (upper-case twins too).
IMAGE_EXTS = (".png", ".jpg", ".jpeg", ".webp")
# Presets emitted when --strengths is omitted.
DEFAULT_STRENGTHS = ("gentle", "moderate", "aggressive")
# One filesystem-safe letter per strength variant, assigned in order.
VARIANT_LETTERS = "ABCDEFGH"
def iter_input_images(input_path: str) -> List[str]:
    """Resolve ``--input`` (file, directory, or glob) to a sorted list."""
    if os.path.isfile(input_path):
        return [input_path]
    if os.path.isdir(input_path):
        # Non-recursive scan; try each extension in both cases and dedupe
        # (case-insensitive filesystems match a file under both patterns).
        found = set()
        for ext in IMAGE_EXTS:
            for suffix in (ext, ext.upper()):
                found.update(glob.glob(os.path.join(input_path, f"*{suffix}")))
        return sorted(found)
    # Anything else is treated as a glob pattern.
    return sorted(glob.glob(input_path))
def dissolve_one(
    bypass: SynthIDBypassV4,
    codebook: SpectralCodebookV4,
    src: str,
    out_dir: str,
    variant_letter: str,
    strength: str,
    model: Optional[str],
) -> dict:
    """Dissolve one image at one strength; return a manifest row.

    Never raises: any failure is recorded in the row's ``notes`` column so
    the batch keeps going and the manifest stays complete.
    """
    stem = os.path.splitext(os.path.basename(src))[0]
    out_path = os.path.join(out_dir, f"{stem}__{variant_letter}_{strength}.png")
    started = time.time()
    row = {
        "source": os.path.abspath(src),
        "variant": variant_letter,
        "strength": strength,
        "still_watermarked": "",  # filled by you during validation
    }
    try:
        result = bypass.bypass_v4_file(
            src, out_path, codebook,
            strength=strength, model=model, verify=False,
        )
        details = result.details
        row.update(
            output=os.path.abspath(out_path),
            profile_key=details["profile_key"],
            exact_match=int(bool(details["exact_match"])),
            psnr=round(result.psnr, 3),
            ssim=round(result.ssim, 5),
            n_passes_applied=details["n_passes_applied"],
            n_passes_rolled_back=details["n_passes_rolled_back"],
            notes="",
        )
    except Exception as e:
        # Deliberate best-effort: surface the error in the manifest rather
        # than abort the whole batch.
        row.update(
            output="",
            profile_key="",
            exact_match=0,
            psnr="",
            ssim="",
            n_passes_applied=0,
            n_passes_rolled_back=0,
            notes=f"ERROR: {e}",
        )
    row["elapsed_sec"] = round(time.time() - started, 3)
    return row
def run(
    input_path: str,
    out_dir: str,
    codebook_path: str,
    strengths: List[str],
    model: Optional[str] = None,
    limit: Optional[int] = None,
    manifest_name: str = "manifest.csv",
) -> str:
    """Dissolve every input image at every strength; write a manifest CSV.

    Returns the manifest path. Exits with status 2 when *input_path*
    matches no images. Individual image failures do not abort the batch
    (``dissolve_one`` records them in the ``notes`` column instead).
    """
    sources = iter_input_images(input_path)
    if limit is not None:
        sources = sources[:limit]
    if not sources:
        print(f"No images found in {input_path}")
        sys.exit(2)
    os.makedirs(out_dir, exist_ok=True)
    codebook = SpectralCodebookV4()
    codebook.load(codebook_path)
    # A bad --model is a warning, not an error: downstream selection falls
    # back across models (per the message below).
    if model is not None and model not in codebook.models:
        print(f"WARNING: --model {model} not found in codebook. "
              f"Available: {codebook.models}. Proceeding anyway "
              "(best-effort fallback across models).")
    bypass = SynthIDBypassV4()
    # One letter per strength; more strengths than letters cannot be named.
    if len(strengths) > len(VARIANT_LETTERS):
        raise ValueError(
            f"Too many strengths ({len(strengths)}); "
            f"max supported: {len(VARIANT_LETTERS)}"
        )
    letters = list(VARIANT_LETTERS[:len(strengths)])
    manifest_path = os.path.join(out_dir, manifest_name)
    # Column order of the manifest CSV (rows are dicts keyed the same way).
    fieldnames = [
        "source", "variant", "strength", "output", "profile_key",
        "exact_match", "psnr", "ssim",
        "n_passes_applied", "n_passes_rolled_back",
        "elapsed_sec", "still_watermarked", "notes",
    ]
    print(f"Dissolving {len(sources)} image(s) × {len(strengths)} variant(s) "
          f"{out_dir}")
    if model:
        print(f"Model hint: {model}")
    rows = []
    for i, src in enumerate(sources):
        print(f"[{i + 1}/{len(sources)}] {os.path.basename(src)}")
        for letter, strength in zip(letters, strengths):
            row = dissolve_one(
                bypass=bypass,
                codebook=codebook,
                src=src,
                out_dir=out_dir,
                variant_letter=letter,
                strength=strength,
                model=model,
            )
            rows.append(row)
            # "ERROR: ..." in notes marks a failed dissolve (see dissolve_one).
            if row["notes"].startswith("ERROR"):
                print(f" {letter}/{strength:12s} {row['notes']}")
            else:
                print(f" {letter}/{strength:12s} "
                      f"psnr={row['psnr']:>6} ssim={row['ssim']:>7} "
                      f"profile={row['profile_key']} "
                      f"exact={row['exact_match']}")
    # NOTE(review): the manifest is only written after ALL images finish;
    # an interrupted run leaves no partial manifest — confirm intended.
    with open(manifest_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
    print(f"\nManifest: {manifest_path}")
    print("\nNext steps:")
    print(" 1. Upload each ABS-path output to the Gemini app and run "
          "SynthID detection.")
    print(" 2. For each row, fill the `still_watermarked` column with "
          "`y` or `n` (leave blank to skip).")
    print(f" 3. Save the filled file as tally.csv and run:")
    print(f" python scripts/calibrate_from_feedback.py "
          f"--manifest {manifest_path} --tally <your_tally.csv> "
          f"--codebook {codebook_path}")
    return manifest_path
def main() -> None:
    """Parse CLI flags and hand off to :func:`run`."""
    parser = argparse.ArgumentParser(
        description="Emit bypass variants for manual Gemini validation.",
    )
    parser.add_argument("--input", required=True,
                        help="Path to an image, a directory, or a glob pattern.")
    parser.add_argument("--output", required=True,
                        help="Directory to write variants and manifest.csv into.")
    parser.add_argument("--codebook", required=True,
                        help="Path to the V4 codebook .npz.")
    strength_choices = [
        "gentle", "moderate", "aggressive", "maximum",
        "demolish", "annihilate", "combo",
        "blog_pure", "blog_plus", "blog_combo",
        "residual_pure", "residual_plus", "residual_combo",
        "regen_pure", "regen_plus", "regen_combo",
        "final", "nuke",
    ]
    parser.add_argument("--strengths", nargs="+",
                        default=list(DEFAULT_STRENGTHS),
                        choices=strength_choices,
                        help=f"Strengths to emit (default: {DEFAULT_STRENGTHS}).")
    parser.add_argument("--model", default=None,
                        help=(
                            "Optional model hint (e.g. nano-banana-pro-preview). "
                            "Omit to let the codebook auto-select by resolution."
                        ))
    parser.add_argument("--limit", type=int, default=None,
                        help="Stop after this many input images (for quick tests).")
    parser.add_argument("--manifest-name", default="manifest.csv",
                        help="Manifest filename inside --output (default: manifest.csv).")
    opts = parser.parse_args()
    run(
        input_path=opts.input,
        out_dir=opts.output,
        codebook_path=opts.codebook,
        strengths=opts.strengths,
        model=opts.model,
        limit=opts.limit,
        manifest_name=opts.manifest_name,
    )


if __name__ == "__main__":
    main()