Files
remove-ai-watermarks/src/remove_ai_watermarks/cli.py
T
Victor Kuznetsov b1189549b8 feat(invisible): controlnet default, unified strength, retire --auto, add --model/--guidance-scale
Overhaul the diffusion-removal surface around a single robust default and a
complete, consistent CLI.

Pipeline + strength:
- controlnet is now the DEFAULT pipeline (CLI --pipeline + both engine ctors).
  With the certified higher strength it clears both photoreal and flat-graphic
  content, whereas plain SDXL left SynthID on flat graphics.
- Rename the plain-SDXL profile default -> sdxl; "default" stays as a back-compat
  alias (normalize_profile + a click callback that warns).
- Unify the strength ladder: resolve_strength applies ONE vendor-adaptive ladder
  (the certified controlnet floors OpenAI 0.20 / Google 0.30 / unknown 0.30) to
  both pipelines. sdxl is the weaker remover on its own hard case (flat fills),
  so the certified floor is the right floor for it too.

CLI completeness:
- Add --model (HF model id) to invisible + batch (was only on all) and
  --guidance-scale (CFG) to all three diffusion commands; both were library
  knobs the CLI did not expose.
- Flip --adaptive-polish to ON by default (it self-gates to a no-op where there
  is no detail deficit, so default-on is safe).
- Share --pipeline / --strength / --model / --guidance-scale as single
  decorators so invisible/all/batch keep an identical surface; the --strength
  help is derived from the strength constants (strength_default_help) so it can
  never drift from the ladder.

Removals:
- Delete the auto_config content-detection planner + its YuNet/DBNet assets
  (~2.6 MB): with controlnet always the pipeline and the polish self-gating, the
  face/text/edge detection no longer changed behavior. --auto is now a deprecated
  no-op that only warns (the polish it enabled is the default).

Docs (README, CLAUDE.md, docs/synthid.md) updated throughout; added an
InvisibleEngine Python API example. Tests cover the alias warnings, the
polish default, and the --model/--guidance-scale wiring.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-09 12:40:45 -07:00

1276 lines
45 KiB
Python

"""Unified CLI for remove-ai-watermarks.
Provides commands for:
- Visible watermark removal (Gemini sparkle) - works offline, fast
- Invisible watermark removal (SynthID etc.) - requires GPU/diffusion models
- AI metadata stripping - lightweight, no ML deps needed
"""
from __future__ import annotations
import contextlib
import json
import logging
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
import click
from remove_ai_watermarks import __version__, watermark_registry
from remove_ai_watermarks.noai.watermark_profiles import (
resolve_strength,
strength_default_help,
vendor_for_strength,
)
if TYPE_CHECKING:
from collections.abc import Generator
from numpy.typing import NDArray
from remove_ai_watermarks.gemini_engine import DetectionResult
# --- plain-text output layer (replaces rich: no colors, no markup, no boxes) ---
class _Table:
"""Plain-text stand-in for rich.Table."""
def __init__(self, *args: Any, title: str | None = None, **kwargs: Any) -> None:
self._title = title
self._headers: list[str] = []
self._rows: list[list[str]] = []
def add_column(self, header: str = "", *args: Any, **kwargs: Any) -> None:
self._headers.append(str(header))
def add_row(self, *cells: Any) -> None:
self._rows.append([str(c) for c in cells])
def render(self) -> str:
lines: list[str] = []
if self._title:
lines.append(self._title)
if any(self._headers):
lines.append(" ".join(self._headers))
lines.extend(" ".join(row) for row in self._rows)
return "\n".join(f" {line}" for line in lines)
class _Progress:
"""No-op stand-in for rich.Progress; results are printed directly instead."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
pass
def __enter__(self) -> _Progress:
return self
def __exit__(self, *exc: object) -> bool:
return False
def add_task(self, *args: Any, **kwargs: Any) -> int:
return 0
def advance(self, *args: Any, **kwargs: Any) -> None:
pass
def update(self, *args: Any, **kwargs: Any) -> None:
pass
class _Console:
"""Minimal plain-text replacement for rich.Console."""
def print(self, *objects: Any, **kwargs: Any) -> None:
click.echo(" ".join(o.render() if isinstance(o, _Table) else str(o) for o in objects))
@contextlib.contextmanager
def status(self, message: str = "", **kwargs: Any) -> Generator[None, None, None]:
if message:
click.echo(message)
yield
def _panel(text: str = "", *args: Any, **kwargs: Any) -> str:
return text
def _column(*args: Any, **kwargs: Any) -> None:
return None
Panel = _panel
Table = _Table
Progress = _Progress
SpinnerColumn = BarColumn = TextColumn = TimeElapsedColumn = _column
console = _Console()
SUPPORTED_FORMATS = {".png", ".jpg", ".jpeg", ".webp"}
def _setup_logging(verbose: bool) -> None:
level = logging.DEBUG if verbose else logging.WARNING
logging.basicConfig(
level=level,
format="%(name)s | %(message)s",
handlers=[logging.StreamHandler()],
)
def _banner() -> None:
console.print(
Panel(
f"Remove-AI-Watermarks v{__version__}\nVisible & invisible watermark removal",
border_style="cyan",
padding=(0, 2),
)
)
def _validate_image(path: Path) -> Path:
if not path.exists():
console.print(f"Error: File not found: {path}")
raise SystemExit(1)
if path.suffix.lower() not in SUPPORTED_FORMATS:
console.print(f"Warning: {path.suffix} may not be supported (expected: {', '.join(SUPPORTED_FORMATS)})")
return path
_ALPHA_FORMATS = {".png", ".webp"}
# Shared option decorator for commands that run the invisible-watermark pipeline.
# Both cmd_invisible and cmd_all expose this flag; defining it once avoids
# copy-paste drift.
_controlnet_scale_option = click.option(
"--controlnet-scale",
type=float,
default=1.0,
help="ControlNet conditioning scale (structure/text preservation strength); "
"applies to the controlnet pipeline (the default). Higher = closer to original structure.",
)
_min_resolution_option = click.option(
"--min-resolution",
type=int,
default=1024,
help="Upscale long side UP to this (px) before diffusion when the input is smaller, so SDXL runs "
"near 1024 (small inputs distort at native); output is restored to the input size. 0 = off. Default 1024.",
)
_unsharp_option = click.option(
"--unsharp", type=float, default=0.0, help="Unsharp-mask sharpening strength (0 = off, typical: 0.3-0.8)."
)
_upscaler_option = click.option(
"--upscaler",
type=click.Choice(["lanczos", "esrgan"]),
default="lanczos",
help="How to upscale a small input to the --min-resolution floor: lanczos (default, cv2, no deps) or "
"esrgan (Real-ESRGAN via the 'esrgan' extra; better detail, slower on CPU). Best for photo/texture "
"content -- as a generic GAN with no face/glyph prior it can degrade faces (diffusion mitigates) and "
"thin text, so lanczos stays the default. Falls back to lanczos if the extra is absent. Only when upscaling.",
)
_auto_option = click.option(
"--auto",
is_flag=True,
default=False,
help="DEPRECATED: controlnet is already the default pipeline, so --auto now only "
"enables --adaptive-polish (the content detectors were removed). Use "
"--adaptive-polish instead.",
)
_adaptive_polish_option = click.option(
"--adaptive-polish/--no-adaptive-polish",
default=True,
help="Restore the input's detail level after removal (capped unsharp + edge-masked grain "
"targeting the input's sharpness, sparing text), countering the over-smoothed look. ON by "
"default; it self-limits where there is no detail deficit (text/flat graphics), so it is a "
"no-op there. Pass --no-adaptive-polish to disable. Independent of --unsharp/--humanize.",
)
# HuggingFace model + CFG knobs, shared by the diffusion commands (invisible/all/batch)
# so the surface stays identical across them.
_model_option = click.option(
"--model",
type=str,
default=None,
help="HuggingFace model ID for the diffusion pipeline. Default: the SDXL base checkpoint.",
)
_guidance_scale_option = click.option(
"--guidance-scale",
type=float,
default=None,
help="Classifier-free guidance scale (CFG). Default: 7.5 (the library default). "
"Lower = follow the prompt less / stay closer to the input.",
)
def _normalize_pipeline(ctx: click.Context, param: click.Parameter, value: str | None) -> str | None:
"""Resolve the legacy ``default`` profile name to ``sdxl`` (click option callback).
Emits a one-line deprecation notice when the user explicitly passes the outdated
``default`` value, pointing at the two current choices (``sdxl`` / ``controlnet``).
"""
if value is None:
return None
from remove_ai_watermarks.noai.watermark_profiles import normalize_profile
normalized = normalize_profile(value)
if value.strip().lower() == "default":
click.echo(
"Warning: --pipeline default is deprecated and maps to 'sdxl'. "
"Use --pipeline sdxl (plain SDXL) or --pipeline controlnet (the default).",
err=True,
)
return normalized
# ``controlnet`` (the default-SELECTED value) and ``sdxl`` (plain SDXL img2img) are the
# two current profiles; ``default`` is an OUTDATED back-compat alias for ``sdxl``
# (warned + normalized away by _normalize_pipeline).
_PIPELINE_CHOICES = ["sdxl", "controlnet", "default"]
_PIPELINE_HELP = (
"Pipeline profile. controlnet (DEFAULT) = SDXL + canny ControlNet that preserves "
"text/faces via edge conditioning while removing SynthID; sdxl = plain SDXL img2img "
"(lighter, no extra model download, but leaves SynthID on flat-graphic content). "
"('default' is an OUTDATED alias for 'sdxl' -- use sdxl or controlnet.)"
)
# Shared --pipeline / --strength decorators so the three diffusion commands
# (invisible/all/batch) keep an identical surface and the strength help can never
# drift from the watermark_profiles constants (strength_default_help derives it).
_pipeline_option = click.option(
"--pipeline",
type=click.Choice(_PIPELINE_CHOICES),
default="controlnet",
callback=_normalize_pipeline,
help=_PIPELINE_HELP,
)
_strength_option = click.option(
"--strength",
type=float,
default=None,
help=f"Denoising strength (0.0-1.0). Default: {strength_default_help()}.",
)
def _resolve_auto_polish(auto: bool, adaptive_polish: bool) -> bool:
"""Warn on the retired ``--auto`` flag, returning ``adaptive_polish`` unchanged.
``--auto`` used to plan the pipeline + polish from content detection, but the
pipeline is now always controlnet (the default) and the adaptive polish is ON by
default (it self-gates by detail level), so the content detectors were removed and
``--auto`` is now a no-op alias: the polish it used to enable is already the default,
and an explicit ``--no-adaptive-polish`` still wins. So it only emits a deprecation
warning and passes ``adaptive_polish`` through.
"""
if auto:
click.echo(
"Warning: --auto is deprecated and now does nothing (the adaptive polish it "
"enabled is ON by default). Use --no-adaptive-polish to turn the polish off.",
err=True,
)
return adaptive_polish
def _warn_if_esrgan_unavailable(upscaler: str) -> None:
"""Tell the user once if ``--upscaler esrgan`` will silently fall back to Lanczos.
The engine downgrades to Lanczos when the ``esrgan`` extra is absent (fail-safe, so
a batch never breaks mid-run) -- but without this notice the user would believe
Real-ESRGAN ran. Surfaced at the CLI layer, once per invocation (not per image).
"""
if upscaler != "esrgan":
return
from remove_ai_watermarks import upscaler as _upscaler
if not _upscaler.is_available():
console.print(" Note: --upscaler esrgan needs the 'esrgan' extra; falling back to Lanczos.")
def _watermark_region(det: DetectionResult, width: int, height: int) -> tuple[int, int, int, int]:
"""Pick a watermark bbox: detector's region if confident, else the default config slot."""
if det.confidence > 0.15:
return det.region
from remove_ai_watermarks.gemini_engine import get_watermark_config
config = get_watermark_config(width, height)
px, py = config.get_position(width, height)
return (px, py, config.logo_size, config.logo_size)
def _read_bgr_and_alpha(path: Path) -> tuple[NDArray[Any] | None, NDArray[Any] | None]:
"""Read an image preserving its alpha channel separately.
Returns ``(bgr, alpha)`` where ``alpha`` is a single-channel ndarray when the
source has transparency, else ``None``. Greyscale inputs are promoted to BGR.
Returns ``(None, None)`` if the image cannot be decoded.
"""
import cv2
from remove_ai_watermarks import image_io
image = image_io.imread(path, cv2.IMREAD_UNCHANGED)
if image is None:
return None, None
if image.ndim == 2:
return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR), None
if image.shape[2] == 4:
return image[:, :, :3].copy(), image[:, :, 3].copy()
return image, None
def _write_bgr_with_alpha(
path: Path,
bgr: NDArray[Any],
alpha: NDArray[Any] | None,
) -> None:
"""Write BGR (with optional alpha) to ``path``.
When ``alpha`` is provided and the output extension supports it, the original
alpha plane is rejoined unchanged. The watermark region is NOT made
transparent: reverse-alpha (and inpaint) recover real pixels there, so
zeroing alpha would punch a transparent hole that renders as a white box on
any non-transparent viewer (issue #30). Preserving the input alpha keeps
genuinely transparent backgrounds intact without inventing new holes.
"""
import numpy as np
from remove_ai_watermarks import image_io
if alpha is None or path.suffix.lower() not in _ALPHA_FORMATS:
image_io.imwrite(path, bgr)
return
bgra = np.dstack([bgr, alpha])
image_io.imwrite(path, bgra)
# -- Main group -------------------------------------------------------
@click.group(invoke_without_command=True)
@click.version_option(__version__, prog_name="remove-ai-watermarks")
@click.option("-v", "--verbose", is_flag=True, help="Enable verbose logging.")
@click.pass_context
def main(ctx: click.Context, verbose: bool) -> None:
"""Remove visible and invisible AI watermarks from images."""
from dotenv import load_dotenv
load_dotenv() # Load .env (e.g. HF_TOKEN)
ctx.ensure_object(dict)
ctx.obj["verbose"] = verbose
_setup_logging(verbose)
if ctx.invoked_subcommand is None:
_banner()
click.echo(ctx.get_help())
# -- Visible (Gemini) watermark removal -------------------------------
@main.command("visible")
@click.argument("source", type=click.Path(exists=True, path_type=Path))
@click.option(
"-o", "--output", type=click.Path(path_type=Path), default=None, help="Output path (default: <source>_clean.<ext>)."
)
@click.option("--inpaint/--no-inpaint", default=True, help="Apply inpainting cleanup after removal.")
@click.option(
"--inpaint-method", type=click.Choice(["ns", "telea", "gaussian"]), default="ns", help="Inpainting method."
)
@click.option("--inpaint-strength", type=float, default=0.85, help="Inpainting blend strength (0.0-1.0).")
@click.option("--detect/--no-detect", default=True, help="Detect watermark before removal.")
@click.option(
"--mark",
type=click.Choice(["auto", *watermark_registry.mark_keys()]),
default="auto",
help="Which known visible mark to target (auto picks the strongest detected). "
"All marks are removed by exact reverse-alpha against a captured alpha map.",
)
@click.option("--strip-metadata/--keep-metadata", default=True, help="Strip AI metadata from output.")
@click.pass_context
def cmd_visible(
ctx: click.Context,
source: Path,
output: Path | None,
inpaint: bool,
inpaint_method: Literal["ns", "telea", "gaussian"],
inpaint_strength: float,
detect: bool,
mark: str,
strip_metadata: bool,
) -> None:
"""Remove a known visible AI watermark from an image.
Finds a known mark in its usual place (Gemini sparkle / Doubao text) via the
watermark registry and removes it by exact reverse-alpha against a captured
alpha map -- recovering the true pixels, not an inpaint guess. ``--mark auto``
picks the strongest detected mark. For arbitrary logos/objects, use ``erase``.
"""
from remove_ai_watermarks import watermark_registry as registry
_banner()
source = _validate_image(source)
if output is None:
output = source.with_stem(source.stem + "_clean")
# Load image (preserving any alpha channel separately)
image, alpha = _read_bgr_and_alpha(source)
if image is None:
console.print(f"Error: Failed to read image: {source}")
raise SystemExit(1)
h, w = image.shape[:2]
console.print(f" Input: {source.name} ({w}x{h})")
# Resolve the target mark from the known-watermark registry. ``auto`` scans
# every in-auto mark in its usual place and picks the strongest; an explicit
# ``--mark <key>`` targets that one (the user asserts its presence).
if mark == "auto":
best = registry.best_auto_mark(image)
if best is None:
console.print(" Warning: No known visible mark detected (gemini / doubao).")
if detect:
console.print(" Skipping. Use --mark <name> --no-detect to force.")
raise SystemExit(0)
target = "gemini" # forced (no-detect): fall back to the default mark
else:
target = best.key
console.print(f" Mark auto: {best.label} ({best.location}, conf {best.confidence:.2f})")
else:
target = mark
chosen = registry.get_mark(target)
det = chosen.detect(image)
if detect and not det.detected:
console.print(f" Warning: {chosen.label} not detected (conf {det.confidence:.2f}). Use --no-detect to force.")
raise SystemExit(0)
if det.detected:
console.print(f" {chosen.label} detected ({chosen.location}, conf {det.confidence:.2f})")
method: Literal["telea", "ns"] = "ns" if inpaint_method == "ns" else "telea"
t0 = time.monotonic()
with console.status(f"Removing {chosen.label}... ({chosen.recovery})"):
result, _ = chosen.remove(
image,
inpaint_method=method,
inpaint=inpaint,
inpaint_strength=inpaint_strength,
force=not detect,
)
elapsed = time.monotonic() - t0
# Save (rejoins the original alpha plane unchanged)
output.parent.mkdir(parents=True, exist_ok=True)
_write_bgr_with_alpha(output, result, alpha)
# Strip metadata
if strip_metadata:
try:
from remove_ai_watermarks.metadata import remove_ai_metadata
remove_ai_metadata(output, output)
except Exception as e:
if ctx.obj.get("verbose"):
console.print(f" Warning: Failed to strip metadata: {e}")
size_kb = output.stat().st_size / 1024
console.print(f" Saved: {output} ({size_kb:.0f} KB, {elapsed:.2f}s)")
# -- Universal region eraser -----------------------------------------
def _parse_region(spec: str) -> tuple[int, int, int, int]:
"""Parse an ``x,y,w,h`` region string into a 4-int tuple."""
parts = spec.replace(" ", "").split(",")
if len(parts) != 4:
raise click.BadParameter(f"region must be 'x,y,w,h', got: {spec!r}")
try:
x, y, w, h = (int(p) for p in parts)
except ValueError as e:
raise click.BadParameter(f"region values must be integers: {spec!r}") from e
if w <= 0 or h <= 0:
raise click.BadParameter(f"region width/height must be positive: {spec!r}")
return x, y, w, h
@main.command("erase")
@click.argument("source", type=click.Path(exists=True, path_type=Path))
@click.option("--region", "regions", multiple=True, required=True, help="x,y,w,h box to erase (repeatable).")
@click.option(
"-o", "--output", type=click.Path(path_type=Path), default=None, help="Output path (default: <source>_clean.<ext>)."
)
@click.option(
"--backend",
type=click.Choice(["cv2", "lama"]),
default="cv2",
help="Inpaint backend. cv2: instant, no deps. lama: onnxruntime big-LaMa, better quality (extra 'lama').",
)
@click.option("--inpaint-method", type=click.Choice(["telea", "ns"]), default="telea", help="cv2 inpaint method.")
@click.option("--dilate", type=int, default=3, help="Grow the box by this many px before inpainting.")
@click.option("--strip-metadata/--keep-metadata", default=True, help="Strip AI metadata from output.")
@click.pass_context
def cmd_erase(
ctx: click.Context,
source: Path,
regions: tuple[str, ...],
output: Path | None,
backend: Literal["cv2", "lama"],
inpaint_method: str,
dilate: int,
strip_metadata: bool,
) -> None:
"""Erase arbitrary region(s) from an image via inpainting.
Universal and position-agnostic: removes any logo / watermark / object inside
the boxes you pass, regardless of colour or location. Runs on CPU. Use this
for marks the dedicated ``visible`` engines (Gemini, Doubao) do not cover.
"""
from remove_ai_watermarks.region_eraser import erase
_banner()
source = _validate_image(source)
if output is None:
output = source.with_stem(source.stem + "_clean")
boxes = [_parse_region(r) for r in regions]
image, alpha = _read_bgr_and_alpha(source)
if image is None:
console.print(f"Error: Failed to read image: {source}")
raise SystemExit(1)
h, w = image.shape[:2]
console.print(f" Input: {source.name} ({w}x{h}) {len(boxes)} region(s), backend={backend}")
t0 = time.monotonic()
method: Literal["telea", "ns"] = "ns" if inpaint_method == "ns" else "telea"
try:
with console.status(f"Erasing ({backend})..."):
result = erase(image, boxes=boxes, backend=backend, dilate=dilate, cv2_method=method)
except RuntimeError as e:
console.print(f" Error: {e}")
raise SystemExit(1) from e
elapsed = time.monotonic() - t0
output.parent.mkdir(parents=True, exist_ok=True)
_write_bgr_with_alpha(output, result, alpha)
if strip_metadata:
try:
from remove_ai_watermarks.metadata import remove_ai_metadata
remove_ai_metadata(output, output)
except Exception as e:
if ctx.obj.get("verbose"):
console.print(f" Warning: Failed to strip metadata: {e}")
size_kb = output.stat().st_size / 1024
console.print(f" Erased {len(boxes)} region(s) -> {output} ({size_kb:.0f} KB, {elapsed:.2f}s)")
# -- Invisible watermark removal -------------------------------------
@main.command("invisible")
@click.argument("source", type=click.Path(exists=True, path_type=Path))
@click.option(
"-o", "--output", type=click.Path(path_type=Path), default=None, help="Output path (default: <source>_clean.<ext>)."
)
@_strength_option
@click.option("--steps", type=int, default=50, help="Number of denoising steps. Default: 50.")
@_pipeline_option
@click.option(
"--device",
type=click.Choice(["auto", "cpu", "mps", "cuda", "xpu"]),
default="auto",
help="Inference device.",
)
@click.option("--seed", type=int, default=None, help="Random seed for reproducibility.")
@click.option("--hf-token", type=str, default=None, help="HuggingFace API token.")
@click.option(
"--humanize", type=float, default=0.0, help="Analog Humanizer film grain intensity (0 = off, typical: 2.0-6.0)."
)
@click.option(
"--max-resolution",
type=int,
default=0,
help="Cap long side (px) before diffusion; 0 = native (best quality, like raiw.cc). Raise only on GPU/MPS OOM.",
)
@_controlnet_scale_option
@_min_resolution_option
@_unsharp_option
@_upscaler_option
@_model_option
@_guidance_scale_option
@_auto_option
@_adaptive_polish_option
@click.pass_context
def cmd_invisible(
ctx: click.Context,
source: Path,
output: Path | None,
strength: float | None,
steps: int,
pipeline: str,
device: str,
seed: int | None,
hf_token: str | None,
humanize: float,
unsharp: float,
max_resolution: int,
min_resolution: int,
controlnet_scale: float,
upscaler: str,
model: str | None,
guidance_scale: float | None,
auto: bool,
adaptive_polish: bool,
) -> None:
"""Remove invisible AI watermarks (SynthID, StableSignature, TreeRing).
Uses diffusion-based regeneration. Requires GPU for reasonable speed.
Requires the [gpu] extra: pip install 'remove-ai-watermarks[gpu]'
"""
from remove_ai_watermarks.invisible_engine import is_available as invisible_available
if not invisible_available():
console.print(
"Error: GPU dependencies not installed.\n Install them with: pip install 'remove-ai-watermarks[gpu]'"
)
raise SystemExit(1)
from remove_ai_watermarks.invisible_engine import InvisibleEngine
source = _validate_image(source)
_warn_if_esrgan_unavailable(upscaler)
adaptive_polish = _resolve_auto_polish(auto, adaptive_polish)
if output is None:
output = source.with_stem(source.stem + "_clean")
device_str = None if device == "auto" else device
def progress_cb(msg: str) -> None:
console.print(f" {msg}")
engine = InvisibleEngine(
model_id=model,
device=device_str,
pipeline=pipeline,
hf_token=hf_token,
progress_callback=progress_cb,
controlnet_conditioning_scale=controlnet_scale,
)
# Detect the SynthID vendor from the ORIGINAL (before processing strips C2PA) so the
# displayed and executed strength agree on the vendor-adaptive default.
vendor = vendor_for_strength(source)
console.print(f" Input: {source.name}")
console.print(f" Pipeline: {pipeline}")
console.print(f" Strength: {resolve_strength(strength, vendor)} Steps: {steps}")
t0 = time.monotonic()
result_path = engine.remove_watermark(
image_path=source,
output_path=output,
strength=strength,
num_inference_steps=steps,
guidance_scale=guidance_scale,
seed=seed,
humanize=humanize,
unsharp=unsharp,
adaptive_polish=adaptive_polish,
max_resolution=max_resolution,
min_resolution=min_resolution,
upscaler=upscaler,
vendor=vendor,
)
elapsed = time.monotonic() - t0
size_kb = result_path.stat().st_size / 1024
console.print(f"\n Saved: {result_path} ({size_kb:.0f} KB, {elapsed:.1f}s)")
# -- Metadata operations ---------------------------------------------
@main.command("metadata")
@click.argument("source", type=click.Path(exists=True, path_type=Path))
@click.option("--check", is_flag=True, help="Check for AI metadata (don't modify).")
@click.option("--remove", is_flag=True, help="Remove AI metadata.")
@click.option(
"-o", "--output", type=click.Path(path_type=Path), default=None, help="Output path (default: overwrite source)."
)
@click.option("--keep-standard/--remove-all", default=True, help="Keep standard metadata (Author, Title, etc.).")
@click.pass_context
def cmd_metadata(
ctx: click.Context,
source: Path,
check: bool,
remove: bool,
output: Path | None,
keep_standard: bool,
) -> None:
"""Check or remove AI-generation metadata (images, video, and audio).
Strips EXIF AI tags, PNG text chunks, C2PA provenance manifests, and the
China TC260 AIGC label. Beyond images (PNG/JPEG/WebP/AVIF/HEIF/JXL) it also
strips provenance metadata from MP4/MOV/M4V/M4A containers and, via ffmpeg,
from WebM/MP3/WAV/FLAC/OGG. The coded image, audio, and video data are left
untouched.
"""
from remove_ai_watermarks.metadata import get_ai_metadata, has_ai_metadata, remove_ai_metadata
# No _validate_image() here: unlike the image-only commands, metadata also
# accepts video/audio containers, so the image-format warning would misfire.
# click's `exists=True` on the argument already enforces the file exists.
_banner()
if check or (not remove):
has_ai = has_ai_metadata(source)
if has_ai:
console.print(f" Warning: AI metadata detected in {source.name}:")
meta = get_ai_metadata(source)
if synthid := meta.get("synthid_watermark"):
console.print(f" Warning: SynthID watermark (inferred from C2PA metadata) {synthid}")
table = Table(show_header=True, header_style="bold")
table.add_column("Key", style="cyan")
table.add_column("Value")
for k, v in meta.items():
table.add_row(k, str(v)[:80])
console.print(table)
else:
console.print(f" No AI metadata found in {source.name}")
if not remove:
return
# Remove
out = remove_ai_metadata(source, output, keep_standard=keep_standard)
console.print(f" AI metadata stripped -> {out}")
# -- Provenance identification ---------------------------------------
@main.command("identify")
@click.argument("source", type=click.Path(exists=True, path_type=Path))
@click.option(
"--no-visible",
is_flag=True,
help="Skip pixel-domain detectors (visible sparkle + invisible watermark); metadata-only.",
)
@click.option("--json", "as_json", is_flag=True, help="Emit the report as JSON instead of a table.")
@click.pass_context
def cmd_identify(ctx: click.Context, source: Path, no_visible: bool, as_json: bool) -> None:
"""Identify where an image was made and what watermarks it carries.
Aggregates C2PA Content Credentials, IPTC "Made with AI" tags, embedded
generation parameters, the SynthID metadata proxy, and the visible Gemini
sparkle into a single provenance verdict. Absence of signals is reported as
"unknown", never as "clean" (stripped metadata leaves no local proof).
"""
from dataclasses import asdict
from remove_ai_watermarks.identify import identify
source = _validate_image(source)
report = identify(source, check_visible=not no_visible, check_invisible=not no_visible)
if as_json:
click.echo(json.dumps(asdict(report), default=str, indent=2))
return
_banner()
verdict = {True: "AI-generated", False: "not AI", None: "unknown"}[report.is_ai_generated]
console.print(f"\n Verdict: {verdict} (confidence: {report.confidence})")
console.print(f" Platform: {report.platform or 'undetermined'}")
if report.is_ai_generated is None:
console.print(
" No locally-readable AI signal found. This is not the same as 'clean': "
"metadata is often stripped by re-encoding, screenshots, or upload, and SynthID-class "
"pixel watermarks (Gemini / Nano Banana / gpt-image) have no local detector. "
"See caveats below."
)
if report.integrity_clashes:
console.print("\n Warning: Integrity clash (provenance signals contradict each other)")
for clash in report.integrity_clashes:
console.print(f" - {clash}")
if report.watermarks:
table = Table(show_header=True, header_style="bold", title="Watermarks / provenance markers")
table.add_column("Marker", style="cyan")
for wm in report.watermarks:
table.add_row(wm)
console.print(table)
else:
console.print(" No watermarks or provenance markers found.")
if report.caveats:
console.print("\n Caveats:")
for c in report.caveats:
console.print(f" - {c}")
# -- Combined "all" mode ----------------------------------------------
@main.command("all")
@click.argument("source", type=click.Path(exists=True, path_type=Path))
@click.option(
"-o", "--output", type=click.Path(path_type=Path), default=None, help="Output path (default: <source>_clean.<ext>)."
)
@click.option("--inpaint/--no-inpaint", default=True, help="Apply inpainting cleanup after visible removal.")
@click.option(
"--inpaint-method", type=click.Choice(["ns", "telea", "gaussian"]), default="ns", help="Inpainting method."
)
@_strength_option
@click.option("--steps", type=int, default=50, help="Number of denoising steps for invisible removal.")
@_pipeline_option
@_model_option
@click.option(
"--device",
type=click.Choice(["auto", "cpu", "mps", "cuda", "xpu"]),
default="auto",
help="Inference device.",
)
@click.option("--seed", type=int, default=None, help="Random seed for reproducibility.")
@click.option("--hf-token", type=str, default=None, help="HuggingFace API token.")
@click.option(
"--humanize", type=float, default=0.0, help="Analog Humanizer film grain intensity (0 = off, typical: 2.0-6.0)."
)
@click.option(
"--max-resolution",
type=int,
default=0,
help="Cap long side (px) before diffusion; 0 = native (best quality, like raiw.cc). Raise only on GPU/MPS OOM.",
)
@_controlnet_scale_option
@_min_resolution_option
@_unsharp_option
@_upscaler_option
@_guidance_scale_option
@_auto_option
@_adaptive_polish_option
@click.pass_context
def cmd_all(
ctx: click.Context,
source: Path,
output: Path | None,
inpaint: bool,
inpaint_method: Literal["ns", "telea", "gaussian"],
strength: float | None,
steps: int,
pipeline: str,
model: str | None,
device: str,
seed: int | None,
hf_token: str | None,
humanize: float,
unsharp: float,
max_resolution: int,
min_resolution: int,
controlnet_scale: float,
upscaler: str,
guidance_scale: float | None,
auto: bool,
adaptive_polish: bool,
) -> None:
"""Remove ALL watermarks: visible + invisible + metadata.
Runs the full pipeline in order:
1. Visible watermark removal (Gemini sparkle, reverse alpha blending)
2. Invisible watermark removal (SynthID etc., diffusion regeneration)
3. AI metadata stripping (EXIF, PNG text, C2PA)
If invisible watermark deps are not installed, skips step 2 with a warning.
"""
from remove_ai_watermarks.gemini_engine import GeminiEngine
_banner()
source = _validate_image(source)
_warn_if_esrgan_unavailable(upscaler)
adaptive_polish = _resolve_auto_polish(auto, adaptive_polish)
if output is None:
output = source.with_stem(source.stem + "_clean")
t0 = time.monotonic()
# Use a temp file for intermediate results so the user doesn't see
# a partial output file during long model downloads.
import tempfile
tmp_fd, tmp_path_str = tempfile.mkstemp(suffix=source.suffix)
tmp_path = Path(tmp_path_str)
try:
import os
os.close(tmp_fd)
# -- Step 1: Visible watermark --------------------------------
console.print("\n 1) Visible watermark removal")
engine = GeminiEngine()
image, alpha = _read_bgr_and_alpha(source)
if image is None:
console.print(f"Error: Failed to read image: {source}")
raise SystemExit(1)
h, w = image.shape[:2]
console.print(f" Input: {source.name} ({w}x{h})")
with console.status("Removing visible watermark..."):
det = engine.detect_watermark(image)
if det.detected:
result = engine.remove_watermark(image)
if inpaint:
region = _watermark_region(det, w, h)
result = engine.inpaint_residual(result, region, method=inpaint_method)
console.print(" Visible watermark removed")
else:
result = image.copy()
console.print(" Skipped (no visible watermark detected)")
# Save to temp file for invisible engine input (preserve alpha if present)
_write_bgr_with_alpha(tmp_path, result, alpha)
# -- Step 2: Invisible watermark ------------------------------
console.print("\n 2) Invisible watermark removal")
from remove_ai_watermarks.invisible_engine import is_available as invisible_available
if not invisible_available():
console.print(
" Warning: Skipped - GPU dependencies not installed.\n"
" Install them with: pip install 'remove-ai-watermarks[gpu]'"
)
else:
from remove_ai_watermarks.invisible_engine import InvisibleEngine
device_str = None if device == "auto" else device
def progress_cb(msg: str) -> None:
console.print(f" {msg}")
inv_engine = InvisibleEngine(
model_id=model,
device=device_str,
pipeline=pipeline,
hf_token=hf_token,
progress_callback=progress_cb,
controlnet_conditioning_scale=controlnet_scale,
)
# Detect the vendor from the pristine ORIGINAL (`source`); `tmp_path` has
# already lost its C2PA to the visible-removal pass, so reading it would
# always resolve to the unknown-vendor default.
vendor = vendor_for_strength(source)
console.print(f" Strength: {resolve_strength(strength, vendor)} Steps: {steps}")
inv_engine.remove_watermark(
image_path=tmp_path,
output_path=tmp_path,
strength=strength,
num_inference_steps=steps,
guidance_scale=guidance_scale,
seed=seed,
humanize=humanize,
unsharp=unsharp,
adaptive_polish=adaptive_polish,
max_resolution=max_resolution,
min_resolution=min_resolution,
upscaler=upscaler,
vendor=vendor,
)
console.print(" Invisible watermark removed")
# -- Step 3: Metadata -----------------------------------------
console.print("\n 3) AI metadata stripping")
try:
from remove_ai_watermarks.metadata import remove_ai_metadata
remove_ai_metadata(tmp_path, tmp_path)
console.print(" AI metadata stripped")
except Exception as e:
console.print(f" Warning: Metadata strip failed: {e}")
# -- Write final result ----------------------------------------
# The invisible step (and downstream cv2.IMREAD_COLOR paths) drops alpha,
# so re-attach the original alpha plane unchanged when writing the final
# output for transparent formats.
output.parent.mkdir(parents=True, exist_ok=True)
final_bgr, _ = _read_bgr_and_alpha(tmp_path)
if final_bgr is None:
console.print(f"Error: Failed to read intermediate file: {tmp_path}")
raise SystemExit(1)
_write_bgr_with_alpha(output, final_bgr, alpha)
finally:
# Clean up temp file if it still exists
if tmp_path.exists():
tmp_path.unlink()
# -- Done -----------------------------------------------------
elapsed = time.monotonic() - t0
size_kb = output.stat().st_size / 1024
console.print(f"\n Done: {output} ({size_kb:.0f} KB, {elapsed:.1f}s total)")
# -- Batch command ----------------------------------------------------
def _process_batch_image(
ctx: click.Context,
img_path: Path,
out_path: Path,
mode: str,
inpaint: bool,
strength: float | None,
steps: int,
pipeline: str,
device: str,
seed: int | None,
hf_token: str | None,
humanize: float,
unsharp: float = 0.0,
max_resolution: int = 0,
min_resolution: int = 1024,
controlnet_scale: float = 1.0,
upscaler: str = "lanczos",
model: str | None = None,
guidance_scale: float | None = None,
adaptive_polish: bool = False,
) -> None:
"""Process a single image for batch mode.
Applies the requested watermark removal steps (visible, invisible,
metadata) to *img_path* and writes the result to *out_path*.
Raises:
ValueError: If the image cannot be opened.
"""
saved_alpha: NDArray[Any] | None = None
if mode in ("visible", "all"):
from remove_ai_watermarks.gemini_engine import GeminiEngine
if "_vis_engine" not in ctx.obj:
ctx.obj["_vis_engine"] = GeminiEngine()
engine = ctx.obj["_vis_engine"]
read_path = img_path
if mode == "all" and out_path.exists():
read_path = out_path
image, alpha = _read_bgr_and_alpha(read_path)
if image is None:
raise ValueError("Failed to read image")
det = engine.detect_watermark(image)
if det.detected:
result = engine.remove_watermark(image)
if inpaint:
h, w = image.shape[:2]
region = _watermark_region(det, w, h)
result = engine.inpaint_residual(result, region)
else:
result = image.copy()
_write_bgr_with_alpha(out_path, result, alpha)
saved_alpha = alpha
if mode in ("invisible", "all"):
from remove_ai_watermarks.invisible_engine import (
is_available as invisible_available,
)
if invisible_available():
from remove_ai_watermarks.invisible_engine import InvisibleEngine
# Cache the engine in ctx.obj so the batch builds it once (pipeline is a
# single CLI value, constant across the run).
engines = ctx.obj.setdefault("_inv_engines", {})
if pipeline not in engines:
engines[pipeline] = InvisibleEngine(
model_id=model,
device=None if device == "auto" else device,
pipeline=pipeline,
hf_token=hf_token,
controlnet_conditioning_scale=controlnet_scale,
)
engine_inv = engines[pipeline]
engine_inv.remove_watermark(
img_path if mode == "invisible" else out_path,
out_path,
strength=strength,
num_inference_steps=steps,
guidance_scale=guidance_scale,
seed=seed,
humanize=humanize,
unsharp=unsharp,
adaptive_polish=adaptive_polish,
max_resolution=max_resolution,
min_resolution=min_resolution,
upscaler=upscaler,
# Detect the vendor from the pristine original (`img_path`), not the
# visible-processed `out_path` whose C2PA is already gone.
vendor=vendor_for_strength(img_path),
)
if mode in ("metadata", "all"):
from remove_ai_watermarks.metadata import remove_ai_metadata
remove_ai_metadata(img_path if mode == "metadata" else out_path, out_path)
# In "all" mode, the invisible step (color-only OpenCV paths) drops alpha,
# so re-attach the cached alpha when the input had transparency.
if mode == "all" and saved_alpha is not None:
final_bgr, _ = _read_bgr_and_alpha(out_path)
if final_bgr is not None:
_write_bgr_with_alpha(out_path, final_bgr, saved_alpha)
@main.command("batch")
@click.argument("directory", type=click.Path(exists=True, file_okay=False, path_type=Path))
@click.option(
"-o",
"--output-dir",
type=click.Path(path_type=Path),
default=None,
help="Output directory (default: <dir>_clean/).",
)
@click.option(
"--mode", type=click.Choice(["visible", "invisible", "metadata", "all"]), default="visible", help="Processing mode."
)
@_strength_option
@click.option("--steps", type=int, default=50, help="Number of denoising steps (invisible mode).")
@click.option("--inpaint/--no-inpaint", default=True, help="Apply inpainting (visible mode).")
@click.option(
"--humanize", type=float, default=0.0, help="Analog Humanizer film grain intensity (0 = off, typical: 2.0-6.0)."
)
@_pipeline_option
@click.option(
"--device",
type=click.Choice(["auto", "cpu", "mps", "cuda", "xpu"]),
default="auto",
help="Inference device.",
)
@click.option("--seed", type=int, default=None, help="Random seed for reproducibility.")
@click.option("--hf-token", type=str, default=None, help="HuggingFace API token.")
@click.option(
"--max-resolution",
type=int,
default=0,
help="Cap long side (px) before diffusion; 0 = native (best quality, like raiw.cc). Raise only on GPU/MPS OOM.",
)
@_min_resolution_option
@_unsharp_option
@_upscaler_option
@_controlnet_scale_option
@_model_option
@_guidance_scale_option
@_auto_option
@_adaptive_polish_option
@click.pass_context
def cmd_batch(
ctx: click.Context,
directory: Path,
mode: str,
output_dir: Path | None,
strength: float | None,
steps: int,
pipeline: str,
device: str,
seed: int | None,
hf_token: str | None,
inpaint: bool,
humanize: float,
unsharp: float,
max_resolution: int,
min_resolution: int,
controlnet_scale: float,
upscaler: str,
model: str | None,
guidance_scale: float | None,
auto: bool,
adaptive_polish: bool,
) -> None:
"""Process all images in a directory."""
_banner()
if output_dir is None:
output_dir = directory.parent / (directory.name + "_clean")
output_dir.mkdir(parents=True, exist_ok=True)
images = sorted(p for p in directory.iterdir() if p.suffix.lower() in SUPPORTED_FORMATS)
if not images:
console.print(f"No supported images found in {directory}")
return
console.print(f" Found {len(images)} images in {directory}")
console.print(f" Output -> {output_dir}")
console.print(f" Mode: {mode}")
if mode in ("invisible", "all"):
_warn_if_esrgan_unavailable(upscaler)
adaptive_polish = _resolve_auto_polish(auto, adaptive_polish)
processed = 0
errors = 0
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task("Processing...", total=len(images))
for img_path in images:
out_path = output_dir / img_path.name
progress.update(task, description=f"{img_path.name}")
try:
_process_batch_image(
ctx=ctx,
img_path=img_path,
out_path=out_path,
mode=mode,
inpaint=inpaint,
strength=strength,
steps=steps,
pipeline=pipeline,
device=device,
seed=seed,
hf_token=hf_token,
humanize=humanize,
unsharp=unsharp,
max_resolution=max_resolution,
min_resolution=min_resolution,
controlnet_scale=controlnet_scale,
upscaler=upscaler,
model=model,
guidance_scale=guidance_scale,
adaptive_polish=adaptive_polish,
)
processed += 1
except Exception as e:
errors += 1
if ctx.obj.get("verbose"):
console.print(f" {img_path.name}: {e}")
progress.advance(task)
console.print(f"\n {processed} processed" + (f" {errors} errors" if errors else ""))
if __name__ == "__main__":
main()