Files
remove-ai-watermarks/scripts/visible_removal_audit.py
T
Victor Kuznetsov e7fb64dca1 fix(gemini): remove more-opaque sparkles via per-image alpha gain
The captured sparkle alpha peaks ~0.51, but some real Gemini sparkles are
rendered more opaque. The fixed-alpha reverse blend then UNDER-subtracts and
leaves a bright residual the detector still fires on. A visible-removal audit
through the registry path on the spaces corpus showed this as a meaningful
fraction of marks -- all under-removals, not a background-brightness class
(failures and successes had the same input confidence and background luma; the
discriminator was the removal delta itself).

remove_watermark now estimates a per-image alpha gain (_estimate_alpha_gain:
effective sparkle opacity at the bright core vs the local background ring,
a_eff/a_cap, clamped [1.0, 1.94]) and scales the alpha to match before the
over-sub/blend branch. A 1.05 deadband keeps a sparkle that already matches the
capture byte-identical to the pre-fix output, so the fix is purely additive
(0 regressions on the audit set; failures dropped substantially). The over-sub
guard still runs on the scaled alpha as the safety net for an over-shoot.

- _estimate_alpha_gain + _ALPHA_GAIN_MAX/_DEADBAND/_CORE_FRAC in gemini_engine.
- TestUnderSubtractionGain asserts on footprint pixels, NOT the detector (its
  NCC is degenerate on a flat synthetic bg; the real corpus removal drops the
  detector ~0.80 -> ~0.27).
- scripts/visible_removal_audit.py: the detect -> remove -> re-detect audit tool
  that found and validated this (operates on gitignored data/spaces only).
- CLAUDE.md + README: document the under-subtraction gain.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 19:48:40 -07:00

139 lines
5.4 KiB
Python

"""Audit visible-watermark removal over a local image corpus.
For every image the registry detects a known visible mark in, run that mark's
removal and re-detect on the output, recording before/after confidence and
whether the detector still fires. Also bucket the detected-positive originals
into per-mark dataset dirs so the visible-mark corpora are reproducible.
Detector-clean after removal is necessary but, for the Doubao/Jimeng text marks,
NOT sufficient (their NCC detector is fooled by a thin residual outline -- see
CLAUDE.md). Treat a detector-clean Doubao/Jimeng as "detector passes"; visual
residual is a separate check.
Operates on gitignored data only (data/spaces/...); writes nothing tracked.
Usage:
    uv run python scripts/visible_removal_audit.py \
        --corpus data/spaces/originals --out data/spaces/_visible_audit.csv \
        --dataset-root data/spaces/_visible_datasets
"""
from __future__ import annotations
import csv
import logging
import shutil
from pathlib import Path
import click
from remove_ai_watermarks import image_io
from remove_ai_watermarks.watermark_registry import detect_marks, get_mark
# Module-level logger; main() configures logging at WARNING level before use.
log = logging.getLogger(__name__)
# File suffixes (compared lower-cased) that the corpus scan accepts as images.
_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".avif", ".heic"}
def _rel(p: Path, corpus: Path) -> str:
try:
return str(p.relative_to(corpus))
except ValueError:
return p.name
@click.command()
@click.option(
    "--corpus", type=click.Path(exists=True, file_okay=False, path_type=Path), default=Path("data/spaces/originals")
)
@click.option("--out", type=click.Path(path_type=Path), default=Path("data/spaces/_visible_audit.csv"))
@click.option("--dataset-root", type=click.Path(path_type=Path), default=Path("data/spaces/_visible_datasets"))
@click.option(
    "--paths-file",
    type=click.Path(exists=True, path_type=Path),
    default=None,
    help="Audit only these paths (one per line), skipping the full rglob.",
)
@click.option("--limit", type=int, default=0, help="Scan at most N files (0 = all).")
def main(corpus: Path, out: Path, dataset_root: Path, paths_file: Path | None, limit: int) -> None:
    """Audit visible-watermark removal: detect, remove, re-detect, and write a CSV report."""
    # Parameters (kept out of the docstring so the --help text stays short):
    #   corpus:       directory scanned recursively for images when no paths file is given.
    #   out:          CSV report path; parent directories are created as needed.
    #   dataset_root: root under which detected-positive originals are copied, one subdir per mark key.
    #   paths_file:   optional text file listing the exact files to audit, one per line.
    #   limit:        maximum number of files to scan; zero (or less) means no limit.
    logging.basicConfig(level=logging.WARNING, format="%(message)s")

    if paths_file is not None:
        # Explicit list: keep only non-blank lines that name an existing file.
        files = [
            Path(s)
            for line in paths_file.read_text(encoding="utf-8").splitlines()
            if (s := line.strip()) and Path(s).is_file()
        ]
    else:
        files = sorted(p for p in corpus.rglob("*") if p.is_file() and p.suffix.lower() in _EXTS)
    # Only a positive limit truncates; a negative value would otherwise slice files off the END of the list.
    if limit > 0:
        files = files[:limit]
    click.echo(f"Scanning {len(files)} files under {corpus} ...")

    rows: list[dict[str, str]] = []
    n_detected = 0
    n_clean_after = 0
    fails: list[tuple[str, str, float]] = []
    with click.progressbar(files, label="audit") as bar:
        for p in bar:
            img = image_io.imread(p)
            if img is None:
                # Nothing was loaded for this file; skip it.
                continue
            # The display path is the same for every detection in this image, so compute it once.
            rel_path = _rel(p, corpus)
            for det in detect_marks(img, include_explicit=False):
                if not det.detected:
                    continue
                n_detected += 1
                mark = get_mark(det.key)
                # Bucket the positive original into the per-mark dataset.
                # NOTE: bucketing is by basename only, so same-named files from different
                # subdirectories collapse onto whichever one was copied first.
                ddir = dataset_root / det.key
                ddir.mkdir(parents=True, exist_ok=True)
                if not (ddir / p.name).exists():
                    shutil.copy2(p, ddir / p.name)
                # Remove, then re-detect with the SAME mark's detector.
                try:
                    cleaned, _ = mark.remove(img)
                    after = mark.detect(cleaned)
                except Exception as exc:
                    # Best-effort audit: record the failure as an "error" row and keep going.
                    log.warning("remove failed on %s (%s): %s", p.name, det.key, exc)
                    rows.append(
                        {
                            "path": rel_path,
                            "mark": det.key,
                            "conf_before": f"{det.confidence:.3f}",
                            "conf_after": "",
                            "removed": "error",
                        }
                    )
                    continue
                removed = not after.detected
                n_clean_after += int(removed)
                if not removed:
                    fails.append((rel_path, det.key, after.confidence))
                rows.append(
                    {
                        "path": rel_path,
                        "mark": det.key,
                        "conf_before": f"{det.confidence:.3f}",
                        "conf_after": f"{after.confidence:.3f}",
                        "removed": str(removed),
                    }
                )

    out.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8 so a non-ASCII path cannot abort the report on a narrow default locale encoding.
    with out.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["path", "mark", "conf_before", "conf_after", "removed"])
        w.writeheader()
        w.writerows(rows)

    # Per-mark success tally; "error" rows are excluded from the ratio.
    by_mark: dict[str, list[bool]] = {}
    for r in rows:
        if r["removed"] in ("True", "False"):
            by_mark.setdefault(r["mark"], []).append(r["removed"] == "True")
    click.echo(f"\nDetected positives: {n_detected}; detector-clean after removal: {n_clean_after}")
    for k, v in sorted(by_mark.items()):
        click.echo(f" {k:8} removed {sum(v)}/{len(v)} ({100 * sum(v) // max(1, len(v))}%)")
    if fails:
        click.echo(f"\nDetector still fires after removal ({len(fails)}):")
        # Cap the console listing at 30 entries; the CSV holds every row.
        for path, key, conf in fails[:30]:
            click.echo(f" {key:8} {conf:.3f} {path}")
    click.echo(f"\nReport: {out} | Datasets: {dataset_root}/<mark>/")
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()