mirror of
https://github.com/wiltodelta/remove-ai-watermarks.git
synced 2026-06-10 12:53:56 +02:00
feat: GFPGAN face-identity restoration post-pass
Add an optional, commercial-safe face-restoration post-pass that recovers face identity the diffusion removal pass drifts (canny holds structure, not likeness) while still scrubbing the pixel watermark in the face regions. - face_restore.py: GFPGANer singleton (CPU unless CUDA), the basicsr torchvision.transforms.functional_tensor shim, and the pure feather _composite_faces helper (unit-tested without the model). GFPGAN re-synthesizes each face from a StyleGAN2 prior, so composited face pixels are GAN-generated (no watermark, no pixel-copy) -- oracle-clean at weight 0.5 with identity preserved. - InvisibleEngine.remove_watermark: restore_faces / restore_faces_weight, best-effort, auto-skips when the extra is absent or no face is detected. - CLI --restore-faces/--no-restore-faces + --restore-faces-weight on invisible/all/batch (on by default). - restore extra (gfpgan/facexlib/basicsr), numpy<2-pinned (scipy<1.18, numba<0.60) and kept out of `all`; basicsr needs Python <3.13 + setuptools<69 to build, so pin .python-version 3.12. Commercial-safe: GFPGAN Apache-2.0, RetinaFace MIT. The CodeFormer alternative is non-commercial and is not shipped. The earlier IP-Adapter FaceID layer was removed (footgun: needs high strength, corrupts faces at the low removal strength). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -147,6 +147,25 @@ _controlnet_scale_option = click.option(
|
||||
)
|
||||
|
||||
|
||||
def _restore_faces_options(f: Any) -> Any:
|
||||
"""Attach the shared GFPGAN face-restoration flags to an invisible-pipeline command."""
|
||||
restore_flag = click.option(
|
||||
"--restore-faces/--no-restore-faces",
|
||||
default=True,
|
||||
help="Restore face identity with a GFPGAN post-pass when faces are present "
|
||||
"(needs the 'restore' extra); on by default, auto-skips when no face is detected "
|
||||
"or the extra is absent.",
|
||||
)
|
||||
weight_flag = click.option(
|
||||
"--restore-faces-weight",
|
||||
type=float,
|
||||
default=0.5,
|
||||
help="GFPGAN fidelity weight (0-1); lower = more GAN regeneration (cleaner "
|
||||
"watermark scrub), higher = closer to the input.",
|
||||
)
|
||||
return restore_flag(weight_flag(f))
|
||||
|
||||
|
||||
def _watermark_region(det: DetectionResult, width: int, height: int) -> tuple[int, int, int, int]:
|
||||
"""Pick a watermark bbox: detector's region if confident, else the default config slot."""
|
||||
if det.confidence > 0.15:
|
||||
@@ -472,6 +491,7 @@ def cmd_erase(
|
||||
help="Cap long side (px) before diffusion; 0 = native (best quality, like raiw.cc). Raise only on GPU/MPS OOM.",
|
||||
)
|
||||
@_controlnet_scale_option
|
||||
@_restore_faces_options
|
||||
@click.pass_context
|
||||
def cmd_invisible(
|
||||
ctx: click.Context,
|
||||
@@ -486,6 +506,8 @@ def cmd_invisible(
|
||||
humanize: float,
|
||||
max_resolution: int,
|
||||
controlnet_scale: float,
|
||||
restore_faces: bool,
|
||||
restore_faces_weight: float,
|
||||
) -> None:
|
||||
"""Remove invisible AI watermarks (SynthID, StableSignature, TreeRing).
|
||||
|
||||
@@ -537,6 +559,8 @@ def cmd_invisible(
|
||||
humanize=humanize,
|
||||
max_resolution=max_resolution,
|
||||
vendor=vendor,
|
||||
restore_faces=restore_faces,
|
||||
restore_faces_weight=restore_faces_weight,
|
||||
)
|
||||
elapsed = time.monotonic() - t0
|
||||
|
||||
@@ -712,6 +736,7 @@ def cmd_identify(ctx: click.Context, source: Path, no_visible: bool, as_json: bo
|
||||
help="Cap long side (px) before diffusion; 0 = native (best quality, like raiw.cc). Raise only on GPU/MPS OOM.",
|
||||
)
|
||||
@_controlnet_scale_option
|
||||
@_restore_faces_options
|
||||
@click.pass_context
|
||||
def cmd_all(
|
||||
ctx: click.Context,
|
||||
@@ -729,6 +754,8 @@ def cmd_all(
|
||||
humanize: float,
|
||||
max_resolution: int,
|
||||
controlnet_scale: float,
|
||||
restore_faces: bool,
|
||||
restore_faces_weight: float,
|
||||
) -> None:
|
||||
"""Remove ALL watermarks: visible + invisible + metadata.
|
||||
|
||||
@@ -826,6 +853,8 @@ def cmd_all(
|
||||
humanize=humanize,
|
||||
max_resolution=max_resolution,
|
||||
vendor=vendor,
|
||||
restore_faces=restore_faces,
|
||||
restore_faces_weight=restore_faces_weight,
|
||||
)
|
||||
console.print(" Invisible watermark removed")
|
||||
|
||||
@@ -878,6 +907,8 @@ def _process_batch_image(
|
||||
hf_token: str | None,
|
||||
humanize: float,
|
||||
max_resolution: int = 0,
|
||||
restore_faces: bool = True,
|
||||
restore_faces_weight: float = 0.5,
|
||||
) -> None:
|
||||
"""Process a single image for batch mode.
|
||||
|
||||
@@ -938,6 +969,8 @@ def _process_batch_image(
|
||||
seed=seed,
|
||||
humanize=humanize,
|
||||
max_resolution=max_resolution,
|
||||
restore_faces=restore_faces,
|
||||
restore_faces_weight=restore_faces_weight,
|
||||
# Detect the vendor from the pristine original (`img_path`), not the
|
||||
# visible-processed `out_path` whose C2PA is already gone.
|
||||
vendor=vendor_for_strength(img_path),
|
||||
@@ -995,6 +1028,7 @@ def _process_batch_image(
|
||||
default=0,
|
||||
help="Cap long side (px) before diffusion; 0 = native (best quality, like raiw.cc). Raise only on GPU/MPS OOM.",
|
||||
)
|
||||
@_restore_faces_options
|
||||
@click.pass_context
|
||||
def cmd_batch(
|
||||
ctx: click.Context,
|
||||
@@ -1010,6 +1044,8 @@ def cmd_batch(
|
||||
inpaint: bool,
|
||||
humanize: float,
|
||||
max_resolution: int,
|
||||
restore_faces: bool,
|
||||
restore_faces_weight: float,
|
||||
) -> None:
|
||||
"""Process all images in a directory."""
|
||||
_banner()
|
||||
@@ -1060,6 +1096,8 @@ def cmd_batch(
|
||||
hf_token=hf_token,
|
||||
humanize=humanize,
|
||||
max_resolution=max_resolution,
|
||||
restore_faces=restore_faces,
|
||||
restore_faces_weight=restore_faces_weight,
|
||||
)
|
||||
processed += 1
|
||||
|
||||
|
||||
@@ -0,0 +1,191 @@
|
||||
"""Optional GFPGAN face-restoration post-pass for the invisible removal pipeline.
|
||||
|
||||
The diffusion removal pass scrubs the watermark everywhere but lets faces drift in
|
||||
likeness (canny holds face *structure*, not *identity*). This module restores each
|
||||
face's identity by running GFPGAN on the ORIGINAL (watermarked) image and
|
||||
feather-compositing the restored face REGIONS into the cleaned image.
|
||||
|
||||
GFPGAN RE-SYNTHESIZES each face from a StyleGAN2 prior -- the composited pixels are
|
||||
GAN-generated, NOT copied from the original -- so the pixel watermark is scrubbed in
|
||||
the face regions too, while identity is preserved (oracle-validated at weight 0.5).
|
||||
Both GFPGAN (Apache-2.0) and its RetinaFace detector (MIT) are commercial-safe.
|
||||
|
||||
The GFPGANv1.4 weights and the RetinaFace detector download on first use and are
|
||||
never bundled. Requires the optional ``restore`` extra (gfpgan/facexlib/basicsr).
|
||||
"""
|
||||
|
||||
# cv2/torch/gfpgan boundary: gfpgan/basicsr/facexlib ship no usable type stubs and
|
||||
# this module wraps cv2 (feather composite) and torch; relax the unknown-type rules
|
||||
# for this file only.
|
||||
# pyright: reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnknownParameterType=false, reportMissingTypeArgument=false, reportMissingTypeStubs=false, reportMissingImports=false, reportArgumentType=false, reportAssignmentType=false, reportReturnType=false, reportCallIssue=false, reportIndexIssue=false, reportOperatorIssue=false, reportOptionalMemberAccess=false, reportOptionalCall=false, reportOptionalSubscript=false, reportOptionalOperand=false, reportAttributeAccessIssue=false, reportPrivateImportUsage=false, reportPrivateUsage=false, reportInvalidTypeForm=false, reportConstantRedefinition=false, reportUnnecessaryComparison=false
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import threading
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from numpy.typing import NDArray
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# GFPGANv1.4 weights (Apache-2.0). Downloaded on first use, never bundled.
|
||||
_GFPGAN_MODEL_URL = "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth"
|
||||
_GFPGAN_ARCH = "clean"
|
||||
_GFPGAN_CHANNEL_MULTIPLIER = 2
|
||||
|
||||
_restorer: Any | None = None
|
||||
_restorer_lock = threading.Lock()
|
||||
|
||||
|
||||
def is_available() -> bool:
|
||||
"""True when the optional GFPGAN face-restoration deps are importable."""
|
||||
import importlib.util
|
||||
|
||||
return importlib.util.find_spec("gfpgan") is not None and importlib.util.find_spec("facexlib") is not None
|
||||
|
||||
|
||||
def _apply_basicsr_shim() -> None:
|
||||
"""Install the ``torchvision.transforms.functional_tensor`` compatibility shim.
|
||||
|
||||
basicsr (a GFPGAN dependency) imports ``rgb_to_grayscale`` from the
|
||||
``torchvision.transforms.functional_tensor`` module, which newer torchvision
|
||||
removed. Recreate that module pointing at the public functional API. Idempotent:
|
||||
only installed when the real module is missing.
|
||||
"""
|
||||
import importlib.util
|
||||
|
||||
if importlib.util.find_spec("torchvision.transforms.functional_tensor") is not None:
|
||||
return
|
||||
if "torchvision.transforms.functional_tensor" in sys.modules:
|
||||
return
|
||||
|
||||
import types
|
||||
|
||||
import torchvision.transforms.functional as tv_functional
|
||||
|
||||
shim = types.ModuleType("torchvision.transforms.functional_tensor")
|
||||
shim.rgb_to_grayscale = tv_functional.rgb_to_grayscale
|
||||
sys.modules["torchvision.transforms.functional_tensor"] = shim
|
||||
|
||||
|
||||
def _select_device() -> str:
|
||||
"""Pick the GFPGAN device: CUDA when present, else CPU.
|
||||
|
||||
The pip GFPGANer has an MPS device-mismatch bug, and this is a cheap post-pass
|
||||
on a few face crops, so MPS is deliberately avoided -- CPU is the safe default
|
||||
on Apple silicon.
|
||||
"""
|
||||
try:
|
||||
import torch
|
||||
|
||||
if torch.cuda.is_available():
|
||||
return "cuda"
|
||||
except Exception as e:
|
||||
logger.debug("face_restore: CUDA probe failed (%s); using CPU", e)
|
||||
return "cpu"
|
||||
|
||||
|
||||
def _get_restorer() -> Any:
|
||||
"""Return the lazily-built GFPGANer singleton (downloads weights on first use)."""
|
||||
global _restorer
|
||||
if _restorer is not None:
|
||||
return _restorer
|
||||
with _restorer_lock:
|
||||
if _restorer is None:
|
||||
_apply_basicsr_shim()
|
||||
from gfpgan import GFPGANer
|
||||
|
||||
_restorer = GFPGANer(
|
||||
model_path=_GFPGAN_MODEL_URL,
|
||||
upscale=1,
|
||||
arch=_GFPGAN_ARCH,
|
||||
channel_multiplier=_GFPGAN_CHANNEL_MULTIPLIER,
|
||||
device=_select_device(),
|
||||
)
|
||||
return _restorer
|
||||
|
||||
|
||||
def _composite_faces(
|
||||
base_bgr: NDArray[Any],
|
||||
restored_bgr: NDArray[Any],
|
||||
boxes: list[tuple[float, float, float, float]],
|
||||
pad: int = 14,
|
||||
feather_div: int = 6,
|
||||
) -> NDArray[Any]:
|
||||
"""Feather-composite restored face regions from ``restored_bgr`` into ``base_bgr``.
|
||||
|
||||
Pure cv2/numpy helper (no gfpgan), so it is unit-testable without the model.
|
||||
For each ``(x1, y1, x2, y2)`` box: pad and clip to the image, build a Gaussian-
|
||||
feathered rectangular alpha, and blend ``restored * a + base * (1 - a)``. Boxes
|
||||
that fall fully outside the image (or an empty list) leave ``base_bgr`` unchanged.
|
||||
"""
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
out = base_bgr.astype(np.float32)
|
||||
h, w = base_bgr.shape[:2]
|
||||
|
||||
for box in boxes:
|
||||
x1 = int(box[0]) - pad
|
||||
y1 = int(box[1]) - pad
|
||||
x2 = int(box[2]) + pad
|
||||
y2 = int(box[3]) + pad
|
||||
x1 = max(0, min(x1, w))
|
||||
y1 = max(0, min(y1, h))
|
||||
x2 = max(0, min(x2, w))
|
||||
y2 = max(0, min(y2, h))
|
||||
bw = x2 - x1
|
||||
bh = y2 - y1
|
||||
if bw <= 0 or bh <= 0:
|
||||
continue
|
||||
|
||||
alpha = np.zeros((h, w), dtype=np.float32)
|
||||
alpha[y1:y2, x1:x2] = 1.0
|
||||
k = max(3, (min(bw, bh) // feather_div) | 1) # odd kernel >= 3
|
||||
alpha = cv2.GaussianBlur(alpha, (k, k), 0)
|
||||
alpha = alpha[:, :, None]
|
||||
out = restored_bgr.astype(np.float32) * alpha + out * (1.0 - alpha)
|
||||
|
||||
return np.clip(out, 0, 255).astype(np.uint8)
|
||||
|
||||
|
||||
def restore_faces(
|
||||
original_bgr: NDArray[Any],
|
||||
cleaned_bgr: NDArray[Any],
|
||||
weight: float = 0.5,
|
||||
pad: int = 14,
|
||||
feather_div: int = 6,
|
||||
) -> NDArray[Any]:
|
||||
"""Restore face identity in ``cleaned_bgr`` using GFPGAN on ``original_bgr``.
|
||||
|
||||
Runs GFPGAN on the ORIGINAL (watermarked) image to recover the true-identity,
|
||||
GAN-regenerated faces plus the RetinaFace boxes, then feather-composites those
|
||||
face regions into the cleaned image. The composited pixels are GFPGAN-generated
|
||||
(not original), so no watermark and no pixel-copy. Returns ``cleaned_bgr``
|
||||
unchanged when no face is detected.
|
||||
|
||||
Args:
|
||||
original_bgr: The original (watermarked) image as cv2 BGR.
|
||||
cleaned_bgr: The diffusion-cleaned image as cv2 BGR (faces drifted).
|
||||
weight: GFPGAN fidelity weight (0-1); lower = more GAN regeneration.
|
||||
pad: Pixels to grow each face box before compositing.
|
||||
feather_div: Larger = sharper composite edge (box-min // feather_div kernel).
|
||||
"""
|
||||
restorer = _get_restorer()
|
||||
_, _, restored_img = restorer.enhance(
|
||||
original_bgr,
|
||||
has_aligned=False,
|
||||
only_center_face=False,
|
||||
paste_back=True,
|
||||
weight=weight,
|
||||
)
|
||||
|
||||
det_faces = getattr(restorer.face_helper, "det_faces", None) or []
|
||||
boxes = [(float(b[0]), float(b[1]), float(b[2]), float(b[3])) for b in det_faces]
|
||||
if not boxes:
|
||||
logger.debug("face_restore: no faces detected; returning cleaned image unchanged")
|
||||
return cleaned_bgr
|
||||
|
||||
return _composite_faces(cleaned_bgr, restored_img, boxes, pad=pad, feather_div=feather_div)
|
||||
@@ -17,7 +17,7 @@ import logging
|
||||
import os
|
||||
import warnings
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Callable
|
||||
@@ -126,6 +126,8 @@ class InvisibleEngine:
|
||||
humanize: float = 0.0,
|
||||
max_resolution: int = 0,
|
||||
vendor: str | None = None,
|
||||
restore_faces: bool = True,
|
||||
restore_faces_weight: float = 0.5,
|
||||
) -> Path:
|
||||
"""Remove invisible watermark from an image.
|
||||
|
||||
@@ -138,6 +140,11 @@ class InvisibleEngine:
|
||||
guidance_scale: Classifier-free guidance scale.
|
||||
seed: Random seed for reproducibility.
|
||||
humanize: Intensity of Analog Humanizer film grain (0 = off).
|
||||
restore_faces: Run the optional GFPGAN face-restoration post-pass when
|
||||
faces are present (needs the ``restore`` extra). Auto-skips with a
|
||||
debug log when the extra is absent or no face is detected.
|
||||
restore_faces_weight: GFPGAN fidelity weight (0-1); lower = more GAN
|
||||
regeneration (cleaner watermark scrub), higher = closer to input.
|
||||
max_resolution: Cap the long side (px) before diffusion. 0 (default)
|
||||
= native resolution, no pre-downscale -- matches the hosted
|
||||
raiw.cc backend. Set a positive value only to bound GPU/MPS
|
||||
@@ -234,12 +241,69 @@ class InvisibleEngine:
|
||||
out_cv = cv2.resize(out_cv, orig_size, interpolation=cv2.INTER_LANCZOS4)
|
||||
image_io.imwrite(out_path, out_cv)
|
||||
|
||||
# Optional GFPGAN face-restoration post-pass: restore face identity that
|
||||
# the diffusion regeneration drifted, while still scrubbing the pixel
|
||||
# watermark (GFPGAN re-synthesizes faces from a StyleGAN2 prior). Runs on
|
||||
# the cleaned output at its final resolution; auto-skips when faces are
|
||||
# absent or the optional extra is not installed.
|
||||
if restore_faces:
|
||||
self._restore_faces(out_path, image, restore_faces_weight)
|
||||
|
||||
return out_path
|
||||
finally:
|
||||
# _tmp_path is always set above (we persist the image unconditionally).
|
||||
if _tmp_path.exists():
|
||||
_tmp_path.unlink()
|
||||
|
||||
def _restore_faces(
|
||||
self,
|
||||
out_path: Path,
|
||||
original_image: Any,
|
||||
weight: float,
|
||||
) -> None:
|
||||
"""Run the GFPGAN face-restoration post-pass on the cleaned ``out_path``.
|
||||
|
||||
Composites GFPGAN-restored (identity-preserving, watermark-scrubbed) face
|
||||
regions from the ORIGINAL image into the cleaned output. Best-effort: any
|
||||
failure logs a warning and leaves the un-restored cleaned output in place;
|
||||
a missing ``restore`` extra is logged at debug and skipped (the default-on
|
||||
flag must never error when the extra is absent or no face is present).
|
||||
"""
|
||||
from remove_ai_watermarks import face_restore
|
||||
|
||||
if not face_restore.is_available():
|
||||
logger.debug("restore_faces requested but the 'restore' extra is not installed; skipping")
|
||||
return
|
||||
|
||||
try:
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from remove_ai_watermarks import image_io
|
||||
|
||||
cleaned_bgr = image_io.imread(out_path, cv2.IMREAD_COLOR)
|
||||
if cleaned_bgr is None:
|
||||
logger.warning("restore_faces: could not read cleaned output %s; skipping", out_path)
|
||||
return
|
||||
|
||||
# Original (EXIF-transposed) as BGR, aligned to the cleaned image so the
|
||||
# GFPGAN face boxes land in the cleaned image's coordinate space. The
|
||||
# cleaned output is already restored to the original resolution above, so
|
||||
# this resize is normally a no-op (it only fires if a max-resolution cap
|
||||
# left the source PIL image smaller).
|
||||
original_rgb = original_image.convert("RGB")
|
||||
original_bgr = cv2.cvtColor(np.array(original_rgb), cv2.COLOR_RGB2BGR)
|
||||
cleaned_size = (cleaned_bgr.shape[1], cleaned_bgr.shape[0])
|
||||
if (original_bgr.shape[1], original_bgr.shape[0]) != cleaned_size:
|
||||
original_bgr = cv2.resize(original_bgr, cleaned_size, interpolation=cv2.INTER_LANCZOS4)
|
||||
|
||||
if self._progress_callback:
|
||||
self._progress_callback("Restoring face identity (GFPGAN post-pass)...")
|
||||
restored = face_restore.restore_faces(original_bgr, cleaned_bgr, weight=weight)
|
||||
image_io.imwrite(out_path, restored)
|
||||
except Exception as e:
|
||||
logger.warning("restore_faces post-pass failed (%s); keeping un-restored output", e)
|
||||
|
||||
def remove_watermark_batch(
|
||||
self,
|
||||
input_dir: Path,
|
||||
|
||||
Reference in New Issue
Block a user