From 083a5eec6a542847dda16990bc9ed3bc5e8c2bd0 Mon Sep 17 00:00:00 2001 From: Alosh Denny Date: Fri, 24 Apr 2026 02:08:56 +0530 Subject: [PATCH] feat(scripts): add V4 codebook build, batch dissolve, and calibration scripts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit build_codebook_v4.py — builds SpectralCodebookV4 from the hierarchical reverse-synthid-dataset (model × color × resolution). dissolve_batch.py — runs all bypass presets (gentle … nuke) over an input directory. Supports Round-06 'final' and 'nuke' strengths. calibrate_from_feedback.py — updates carrier_weights from detection feedback, closing the human-in-the-loop calibration loop. Made-with: Cursor --- scripts/build_codebook_v4.py | 161 +++++++++++++++ scripts/calibrate_from_feedback.py | 308 +++++++++++++++++++++++++++++ scripts/dissolve_batch.py | 255 ++++++++++++++++++++++++ 3 files changed, 724 insertions(+) create mode 100644 scripts/build_codebook_v4.py create mode 100644 scripts/calibrate_from_feedback.py create mode 100644 scripts/dissolve_batch.py diff --git a/scripts/build_codebook_v4.py b/scripts/build_codebook_v4.py new file mode 100644 index 0000000..802eb73 --- /dev/null +++ b/scripts/build_codebook_v4.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +""" +Build the reverse-SynthID V4 codebook from a hierarchical dataset. + +Expected layout:: + + / + / + black/ HxW/*.png + white/ HxW/*.png + blue/ HxW/*.png + green/ HxW/*.png + red/ HxW/*.png + gray/ HxW/*.png + gradient/ HxW/*.png + diverse/ HxW/*.png + +The script produces one ``ProfileV4`` per ``(model, H, W)`` that has at least +``--min-consensus-colors`` consensus colours (``black``, ``white``, ``blue``, +``green``, ``red``, ``gray``) with enough reference images. ``gradient/`` and +``diverse/`` are used as content-baseline only, never as carrier sources. + +Usage:: + + python scripts/build_codebook_v4.py \\ + --root /Users/aoxo/vscode/reverse-synthid-data \\ + --output artifacts/spectral_codebook_v4.npz + + # Restrict to a single model: + python scripts/build_codebook_v4.py --root --models nano-banana-pro-preview + + # Also emit a 'union' pseudo-model that averages profiles across models: + python scripts/build_codebook_v4.py --root --add-union +""" + +from __future__ import annotations + +import argparse +import os +import sys +from typing import List, Optional + + +REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, os.path.join(REPO_ROOT, "src", "extraction")) + +from synthid_bypass_v4 import ( # noqa: E402 + ALL_COLORS, + SpectralCodebookV4, +) + + +DEFAULT_DATASET_ROOT = "/Users/aoxo/vscode/reverse-synthid-data" +DEFAULT_OUTPUT = os.path.join(REPO_ROOT, "artifacts", "spectral_codebook_v4.npz") + + +def build( + root: str, + output: str, + models: Optional[List[str]] = None, + colors: Optional[List[str]] = None, + min_refs_per_color: int = 3, + min_consensus_colors: int = 3, + max_per_bucket: Optional[int] = None, + add_union: bool = False, +) -> None: + if not os.path.isdir(root): + raise FileNotFoundError(f"Dataset root not found: {root}") + + codebook = SpectralCodebookV4() + codebook._bind_root(root) # type: ignore[attr-defined] + codebook.build_from_hierarchical_dataset( + root=root, + models=models, + colors=colors, + min_refs_per_color=min_refs_per_color, + min_consensus_colors=min_consensus_colors, + max_per_bucket=max_per_bucket, + verbose=True, + ) + + if not codebook.profiles: + print("\nNo profiles built. Check that --root points at a directory " + "containing ///*.png") + sys.exit(2) + + if add_union: + codebook.add_union_profiles(verbose=True) + + os.makedirs(os.path.dirname(output) if os.path.dirname(output) else ".", + exist_ok=True) + codebook.save(output) + + print("\nProfiles:") + for key in sorted(codebook.profiles): + model, h, w = key + prof = codebook.profiles[key] + refs = ", ".join( + f"{c}={n}" for c, n in sorted(prof.n_refs_per_color.items()) + ) + print(f" {model}/{h}x{w}: {refs} (content={prof.n_content_refs})") + + +def main() -> None: + p = argparse.ArgumentParser( + description="Build the reverse-SynthID V4 codebook.", + ) + p.add_argument( + "--root", default=DEFAULT_DATASET_ROOT, + help=( + "Hierarchical dataset root (default: " + f"{DEFAULT_DATASET_ROOT}). Should contain ///*." + ), + ) + p.add_argument( + "--output", default=DEFAULT_OUTPUT, + help=f"Output .npz path (default: {DEFAULT_OUTPUT}).", + ) + p.add_argument( + "--models", nargs="*", default=None, + help="Restrict to these model subdirectories (default: auto-detect).", + ) + p.add_argument( + "--colors", nargs="*", default=None, choices=list(ALL_COLORS), + help="Colours to include (default: all known).", + ) + p.add_argument( + "--min-refs-per-color", type=int, default=3, + help="Drop (color, resolution) buckets with fewer images than this.", + ) + p.add_argument( + "--min-consensus-colors", type=int, default=3, + help=( + "Require at least this many consensus colours per (model, HxW) " + "or the profile is skipped." + ), + ) + p.add_argument( + "--max-per-bucket", type=int, default=None, + help="Cap images per (color, resolution) bucket (default: unlimited).", + ) + p.add_argument( + "--add-union", action="store_true", + help="Also emit a 'union' pseudo-model averaging across real models.", + ) + args = p.parse_args() + + build( + root=args.root, + output=args.output, + models=args.models, + colors=args.colors, + min_refs_per_color=args.min_refs_per_color, + min_consensus_colors=args.min_consensus_colors, + max_per_bucket=args.max_per_bucket, + add_union=args.add_union, + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/calibrate_from_feedback.py b/scripts/calibrate_from_feedback.py new file mode 100644 index 0000000..93500f5 --- /dev/null +++ b/scripts/calibrate_from_feedback.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python3 +""" +Close the manual-validation loop for reverse-SynthID V4. + +Reads the ``manifest.csv`` from ``dissolve_batch.py`` plus a ``tally.csv`` +you filled by hand after checking each variant in the Gemini app. Updates +``carrier_weights`` in the V4 codebook in place: + +- Bins that the **failed** variants (``still_watermarked=y``) tried to subtract + get their weights **bumped up**, so subsequent dissolves attack those bins + harder. +- Bins that the **succeeded** variants (``still_watermarked=n``) already + subtracted get their weights **damped slightly**, to recover fidelity + without giving up detector immunity. + +The tally CSV accepts ``y``/``n``/``yes``/``no``/``1``/``0`` (case-insensitive) +in ``still_watermarked``. Rows with a blank value are ignored. + +Usage:: + + python scripts/calibrate_from_feedback.py \\ + --manifest runs/round_01/manifest.csv \\ + --tally runs/round_01/tally.csv \\ + --codebook artifacts/spectral_codebook_v4.npz \\ + --step 0.25 + +The codebook is rewritten in place; a timestamped backup is made next to it +unless ``--no-backup`` is passed. +""" + +from __future__ import annotations + +import argparse +import csv +import datetime +import os +import shutil +import sys +from typing import Dict, List, Optional, Tuple + + +REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, os.path.join(REPO_ROOT, "src", "extraction")) + +import numpy as np # noqa: E402 + +from synthid_bypass_v4 import SpectralCodebookV4 # noqa: E402 + + +TRUE_TOKENS = {"y", "yes", "1", "true", "t"} +FALSE_TOKENS = {"n", "no", "0", "false", "f"} + + +# --------------------------------------------------------------------------- +# CSV loading +# --------------------------------------------------------------------------- + +def _read_csv_dicts(path: str) -> List[Dict[str, str]]: + with open(path, newline="") as f: + return list(csv.DictReader(f)) + + +def _parse_still_watermarked(value: str) -> Optional[bool]: + """``y/n`` → ``True/False``; empty/unknown → ``None``.""" + if value is None: + return None + v = value.strip().lower() + if v == "": + return None + if v in TRUE_TOKENS: + return True + if v in FALSE_TOKENS: + return False + return None + + +def load_feedback( + manifest_path: str, tally_path: str, +) -> List[Dict]: + """Join manifest + tally on ``(source, variant)``; return labelled rows. + + Only rows whose tally has a parseable ``still_watermarked`` are returned. + """ + manifest = _read_csv_dicts(manifest_path) + + # Tally may be the same file as the manifest (user filled in place) or a + # separate file with at least (source, variant, still_watermarked). + tally_raw = _read_csv_dicts(tally_path) + tally: Dict[Tuple[str, str], bool] = {} + for row in tally_raw: + still = _parse_still_watermarked(row.get("still_watermarked", "")) + if still is None: + continue + key = (row["source"], row["variant"]) + tally[key] = still + + joined: List[Dict] = [] + for row in manifest: + key = (row["source"], row["variant"]) + if key not in tally: + continue + merged = dict(row) + merged["still_watermarked"] = tally[key] + joined.append(merged) + return joined + + +# --------------------------------------------------------------------------- +# Calibration logic +# --------------------------------------------------------------------------- + +def _parse_profile_key(profile_key: str) -> Optional[Tuple[str, int, int]]: + """Parse ``'model_name/HxW'`` → ``(model, H, W)``.""" + if not profile_key or "/" not in profile_key: + return None + model, res = profile_key.rsplit("/", 1) + if "x" not in res: + return None + try: + h, w = (int(p) for p in res.lower().split("x")) + except ValueError: + return None + return (model, h, w) + + +def calibrate( + codebook: SpectralCodebookV4, + feedback: List[Dict], + step: float, + damp_factor: float, + consensus_floor: float, + verbose: bool, +) -> Dict[Tuple[str, int, int], Dict[str, float]]: + """Update ``carrier_weights`` in-place. Returns per-profile summary stats. + + The update rule, per profile ``P``: + + Let ``F`` = number of feedback rows against ``P`` with + ``still_watermarked=True`` (failed dissolves). + Let ``S`` = number with ``still_watermarked=False`` (cleared dissolves). + + If ``F > 0``: scale ``carrier_weights`` by ``1 + step * (F / (F + S))`` + but only on bins with ``consensus_coherence >= consensus_floor``. Non- + carrier bins are never touched — we don't want to amplify noise. + + If ``F == 0 and S > 0``: scale ``carrier_weights`` by + ``1 - damp_factor * step`` on carrier bins (gentle fidelity recovery + once we're clearing the detector). + """ + groups: Dict[Tuple[str, int, int], Dict[str, List[Dict]]] = {} + for row in feedback: + pkey = _parse_profile_key(row.get("profile_key", "")) + if pkey is None: + continue + bucket = groups.setdefault(pkey, {"fail": [], "pass": []}) + target = "fail" if row["still_watermarked"] else "pass" + bucket[target].append(row) + + summary: Dict[Tuple[str, int, int], Dict[str, float]] = {} + + for pkey, bucket in groups.items(): + if pkey not in codebook.profiles: + if verbose: + print(f" skip {pkey}: no matching profile in codebook") + continue + prof = codebook.profiles[pkey] + F = len(bucket["fail"]) + S = len(bucket["pass"]) + + carrier_mask = (prof.consensus_coherence >= consensus_floor).astype(np.float32) + + if F > 0: + fail_ratio = F / max(F + S, 1) + scale = 1.0 + step * fail_ratio + delta = 1.0 + (scale - 1.0) * carrier_mask + action = f"bump ×{scale:.3f}" + elif S > 0: + scale = max(1.0 - damp_factor * step, 0.2) + delta = 1.0 + (scale - 1.0) * carrier_mask + action = f"damp ×{scale:.3f}" + else: + continue + + before_mean = float(np.mean(prof.carrier_weights[..., 1])) + codebook.update_carrier_weights(pkey, delta) + after_mean = float(np.mean(prof.carrier_weights[..., 1])) + + summary[pkey] = { + "fail": F, + "pass": S, + "before_mean_g": before_mean, + "after_mean_g": after_mean, + "action": action, + } + if verbose: + print(f" {pkey[0]}/{pkey[1]}x{pkey[2]}: {action} " + f"fail={F} pass={S} " + f"mean(G) {before_mean:.4f} → {after_mean:.4f}") + + return summary + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def run( + manifest_path: str, + tally_path: str, + codebook_path: str, + step: float, + damp_factor: float, + consensus_floor: float, + backup: bool, +) -> None: + if not os.path.isfile(manifest_path): + raise FileNotFoundError(f"Manifest not found: {manifest_path}") + if not os.path.isfile(tally_path): + raise FileNotFoundError(f"Tally not found: {tally_path}") + if not os.path.isfile(codebook_path): + raise FileNotFoundError(f"Codebook not found: {codebook_path}") + + feedback = load_feedback(manifest_path, tally_path) + if not feedback: + print("No usable feedback rows (empty still_watermarked?). Nothing " + "to do.") + return + + print(f"Loaded {len(feedback)} labelled rows from tally.") + + codebook = SpectralCodebookV4() + codebook.load(codebook_path) + + if backup: + ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + backup_path = codebook_path + f".bak-{ts}.npz" + shutil.copyfile(codebook_path, backup_path) + print(f"Backup → {backup_path}") + + summary = calibrate( + codebook=codebook, + feedback=feedback, + step=step, + damp_factor=damp_factor, + consensus_floor=consensus_floor, + verbose=True, + ) + + if not summary: + print("No profiles updated.") + return + + codebook.save(codebook_path) + + n_fail = sum(s["fail"] for s in summary.values()) + n_pass = sum(s["pass"] for s in summary.values()) + print(f"\nCalibration complete. Profiles updated: {len(summary)}") + print(f"Feedback: {n_pass} cleared / {n_fail} still watermarked " + f"({n_pass * 100.0 / max(n_pass + n_fail, 1):.1f}% success).") + if n_fail > 0: + print("Next: re-run dissolve_batch.py on a fresh batch; weights " + "are now stronger at persistent carriers.") + else: + print("100% cleared — consider lowering strength for better " + "fidelity on the next batch.") + + +def main() -> None: + p = argparse.ArgumentParser( + description=( + "Update V4 carrier_weights from manual Gemini detection tallies." + ), + ) + p.add_argument("--manifest", required=True, + help="Path to manifest.csv produced by dissolve_batch.py.") + p.add_argument("--tally", required=True, + help=( + "Path to tally.csv with (source, variant, " + "still_watermarked) columns. May be the manifest file " + "itself if you filled it in place." + )) + p.add_argument("--codebook", required=True, + help="V4 codebook .npz to update (in place).") + p.add_argument("--step", type=float, default=0.25, + help="Base scale step; 0.25 = up to +25%% per round.") + p.add_argument("--damp-factor", type=float, default=0.15, + help="Damping multiplier applied when all variants " + "cleared (fidelity recovery).") + p.add_argument("--consensus-floor", type=float, default=0.50, + help="Only update bins with consensus_coherence >= this.") + p.add_argument("--no-backup", dest="backup", action="store_false", + help="Skip the timestamped backup of the codebook.") + p.set_defaults(backup=True) + args = p.parse_args() + + run( + manifest_path=args.manifest, + tally_path=args.tally, + codebook_path=args.codebook, + step=args.step, + damp_factor=args.damp_factor, + consensus_floor=args.consensus_floor, + backup=args.backup, + ) + + +if __name__ == "__main__": + main() diff --git a/scripts/dissolve_batch.py b/scripts/dissolve_batch.py new file mode 100644 index 0000000..ea58862 --- /dev/null +++ b/scripts/dissolve_batch.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 +""" +Phase-2 driver for the reverse-SynthID V4 manual-validation loop. + +Takes an input folder of watermarked images and emits one or more strength +variants per image (``A``, ``B``, ``C``, ... by default). Writes a +``manifest.csv`` that pairs each variant with: + +- source image path +- output path +- strength preset +- profile key used +- PSNR / SSIM achieved + +You then paste the variants into the Gemini app, run SynthID detection, and +fill in a small ``tally.csv`` (columns: ``source,variant,still_watermarked``, +values ``y/n``). Feed both files into ``calibrate_from_feedback.py`` to +update the codebook's per-carrier weights and iterate. + +Usage:: + + python scripts/dissolve_batch.py \\ + --input /path/to/input_images \\ + --output /path/to/out_dir \\ + --codebook artifacts/spectral_codebook_v4.npz \\ + --model nano-banana-pro-preview \\ + --strengths gentle moderate aggressive + +Strengths map to filesystem-safe single-letter variants (A,B,C,D) in +manifest order, which makes the tally CSV trivial to fill by hand. +""" + +from __future__ import annotations + +import argparse +import csv +import glob +import os +import sys +import time +from typing import List, Optional + + +REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, os.path.join(REPO_ROOT, "src", "extraction")) + +import cv2 # noqa: E402 +import numpy as np # noqa: E402 + +from synthid_bypass_v4 import SpectralCodebookV4, SynthIDBypassV4 # noqa: E402 + + +IMAGE_EXTS = (".png", ".jpg", ".jpeg", ".webp") +DEFAULT_STRENGTHS = ("gentle", "moderate", "aggressive") +VARIANT_LETTERS = "ABCDEFGH" + + +def iter_input_images(input_path: str) -> List[str]: + """Resolve ``--input`` (file, directory, or glob) to a sorted list.""" + if os.path.isdir(input_path): + out: List[str] = [] + for ext in IMAGE_EXTS: + out.extend(glob.glob(os.path.join(input_path, f"*{ext}"))) + out.extend(glob.glob(os.path.join(input_path, f"*{ext.upper()}"))) + return sorted(set(out)) + if os.path.isfile(input_path): + return [input_path] + # Treat as a glob pattern. + return sorted(glob.glob(input_path)) + + +def dissolve_one( + bypass: SynthIDBypassV4, + codebook: SpectralCodebookV4, + src: str, + out_dir: str, + variant_letter: str, + strength: str, + model: Optional[str], +) -> dict: + """Dissolve one image at one strength; return a manifest row.""" + base = os.path.splitext(os.path.basename(src))[0] + out_name = f"{base}__{variant_letter}_{strength}.png" + out_path = os.path.join(out_dir, out_name) + + t0 = time.time() + try: + result = bypass.bypass_v4_file( + src, out_path, codebook, + strength=strength, model=model, verify=False, + ) + row = { + "source": os.path.abspath(src), + "variant": variant_letter, + "strength": strength, + "output": os.path.abspath(out_path), + "profile_key": result.details["profile_key"], + "exact_match": int(bool(result.details["exact_match"])), + "psnr": round(result.psnr, 3), + "ssim": round(result.ssim, 5), + "n_passes_applied": result.details["n_passes_applied"], + "n_passes_rolled_back": result.details["n_passes_rolled_back"], + "elapsed_sec": round(time.time() - t0, 3), + "still_watermarked": "", # filled by you during validation + "notes": "", + } + except Exception as e: + row = { + "source": os.path.abspath(src), + "variant": variant_letter, + "strength": strength, + "output": "", + "profile_key": "", + "exact_match": 0, + "psnr": "", + "ssim": "", + "n_passes_applied": 0, + "n_passes_rolled_back": 0, + "elapsed_sec": round(time.time() - t0, 3), + "still_watermarked": "", + "notes": f"ERROR: {e}", + } + return row + + +def run( + input_path: str, + out_dir: str, + codebook_path: str, + strengths: List[str], + model: Optional[str] = None, + limit: Optional[int] = None, + manifest_name: str = "manifest.csv", +) -> str: + sources = iter_input_images(input_path) + if limit is not None: + sources = sources[:limit] + if not sources: + print(f"No images found in {input_path}") + sys.exit(2) + + os.makedirs(out_dir, exist_ok=True) + + codebook = SpectralCodebookV4() + codebook.load(codebook_path) + + if model is not None and model not in codebook.models: + print(f"WARNING: --model {model} not found in codebook. " + f"Available: {codebook.models}. Proceeding anyway " + "(best-effort fallback across models).") + + bypass = SynthIDBypassV4() + + if len(strengths) > len(VARIANT_LETTERS): + raise ValueError( + f"Too many strengths ({len(strengths)}); " + f"max supported: {len(VARIANT_LETTERS)}" + ) + letters = list(VARIANT_LETTERS[:len(strengths)]) + + manifest_path = os.path.join(out_dir, manifest_name) + fieldnames = [ + "source", "variant", "strength", "output", "profile_key", + "exact_match", "psnr", "ssim", + "n_passes_applied", "n_passes_rolled_back", + "elapsed_sec", "still_watermarked", "notes", + ] + + print(f"Dissolving {len(sources)} image(s) × {len(strengths)} variant(s) " + f"→ {out_dir}") + if model: + print(f"Model hint: {model}") + + rows = [] + for i, src in enumerate(sources): + print(f"[{i + 1}/{len(sources)}] {os.path.basename(src)}") + for letter, strength in zip(letters, strengths): + row = dissolve_one( + bypass=bypass, + codebook=codebook, + src=src, + out_dir=out_dir, + variant_letter=letter, + strength=strength, + model=model, + ) + rows.append(row) + if row["notes"].startswith("ERROR"): + print(f" {letter}/{strength:12s} {row['notes']}") + else: + print(f" {letter}/{strength:12s} " + f"psnr={row['psnr']:>6} ssim={row['ssim']:>7} " + f"profile={row['profile_key']} " + f"exact={row['exact_match']}") + + with open(manifest_path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + print(f"\nManifest: {manifest_path}") + print("\nNext steps:") + print(" 1. Upload each ABS-path output to the Gemini app and run " + "SynthID detection.") + print(" 2. For each row, fill the `still_watermarked` column with " + "`y` or `n` (leave blank to skip).") + print(f" 3. Save the filled file as tally.csv and run:") + print(f" python scripts/calibrate_from_feedback.py " + f"--manifest {manifest_path} --tally " + f"--codebook {codebook_path}") + return manifest_path + + +def main() -> None: + p = argparse.ArgumentParser( + description="Emit bypass variants for manual Gemini validation.", + ) + p.add_argument("--input", required=True, + help="Path to an image, a directory, or a glob pattern.") + p.add_argument("--output", required=True, + help="Directory to write variants and manifest.csv into.") + p.add_argument("--codebook", required=True, + help="Path to the V4 codebook .npz.") + p.add_argument("--strengths", nargs="+", default=list(DEFAULT_STRENGTHS), + choices=["gentle", "moderate", "aggressive", "maximum", + "demolish", "annihilate", "combo", + "blog_pure", "blog_plus", "blog_combo", + "residual_pure", "residual_plus", "residual_combo", + "regen_pure", "regen_plus", "regen_combo", + "final", "nuke"], + help=f"Strengths to emit (default: {DEFAULT_STRENGTHS}).") + p.add_argument("--model", default=None, + help=( + "Optional model hint (e.g. nano-banana-pro-preview). " + "Omit to let the codebook auto-select by resolution." + )) + p.add_argument("--limit", type=int, default=None, + help="Stop after this many input images (for quick tests).") + p.add_argument("--manifest-name", default="manifest.csv", + help="Manifest filename inside --output (default: manifest.csv).") + args = p.parse_args() + + run( + input_path=args.input, + out_dir=args.output, + codebook_path=args.codebook, + strengths=args.strengths, + model=args.model, + limit=args.limit, + manifest_name=args.manifest_name, + ) + + +if __name__ == "__main__": + main()