Mirror of https://github.com/wiltodelta/remove-ai-watermarks.git (synced 2026-06-10 12:53:56 +02:00)
refactor(face-restore): drop GFPGAN, ship PhotoMaker-V2 as the sole restore (non-commercial)
Visual review of the GFPGAN-on-cleaned output (9-face grid, 1448x1086) showed it only polished the already-drifted face without restoring identity, which is useless for the "restore who is in the photo" intent. Dropping it. The shipped restore path is now PhotoMaker-V2, which delivers true identity-from-embedding face regeneration via a CLIP+ArcFace dual encoder.

The ArcFace branch pulls InsightFace antelopev2/buffalo_l model packs at runtime, which InsightFace releases under a research-only license, so the whole extra is **NON-COMMERCIAL**. raiw.cc and any monetized deployment must NOT install the `photomaker` extra. This is called out at every entry point: CLI flag help, module docstring, pyproject extra block, CLAUDE.md extras bullet, README install snippet.

Changes:
- Deleted `src/remove_ai_watermarks/face_restore.py` and its tests.
- Deleted the `restore` extra (gfpgan/facexlib/basicsr + scipy<1.18 / numba<0.60 pins) and the basicsr setuptools<69 build pin from pyproject.toml.
- Restored `src/remove_ai_watermarks/photomaker_restore.py` (V2 this time: `TencentARC/PhotoMaker-V2`, `photomaker-v2.bin`, no `pm_version='v1'` override).
- Restored the `photomaker` extra in pyproject with all the upstream-compat pins (einops, peft, onnxruntime, insightface) and the `allow-direct-references` hatch metadata block.
- `InvisibleEngine` swapped `_restore_faces` -> `_restore_faces_photomaker`; `--restore-faces-method` removed (only one method, no choice).
- CLI flag help, CLAUDE.md, README, docs/synthid.md, and docs/controlnet-removal-pipeline-research.md all updated.
- docs/synthid-robust-identity-research.md status notice rewritten to list both abandoned commercial-safe attempts (V1 + GFPGAN-on-cleaned) and the non-commercial trade-off we accepted.

ruff + strict pyright (src/) clean; 578 tests pass (the 9 GFPGAN tests are gone, the 11 PhotoMaker tests stay green).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@@ -238,16 +238,18 @@ def _warn_if_esrgan_unavailable(upscaler: str) -> None:
 def _restore_faces_options(f: Any) -> Any:
     """Attach the face-restoration flag to an invisible-pipeline command.
 
-    The post-pass runs GFPGAN on the DIFFUSION-CLEANED image (not the original), so
-    SynthID is not re-introduced (the input pixels GFPGAN derives from are already
-    SynthID-free). See ``face_restore.py``.
+    The post-pass uses PhotoMaker-V2 to regenerate each face from a CLIP+ArcFace
+    embedding. **NON-COMMERCIAL** -- PhotoMaker-V2 pulls InsightFace antelopev2/
+    buffalo_l model packs at runtime, which are research-only. A paid service
+    (raiw.cc, any monetized SaaS) MUST NOT use this flag.
     """
     return click.option(
         "--restore-faces/--no-restore-faces",
         default=False,
-        help="EXPERIMENTAL, opt-in. Polish face detail with a GFPGAN post-pass on the "
-        "cleaned image when faces are present (needs the 'restore' extra); off by default, "
-        "auto-skips when no face is detected or the extra is absent.",
+        help="EXPERIMENTAL, opt-in, **NON-COMMERCIAL** -- needs the 'photomaker' extra "
+        "which pulls non-commercial InsightFace model packs. Restores face identity via "
+        "PhotoMaker-V2 (CLIP+ArcFace embedding -> fresh face); off by default, auto-skips "
+        "when no face is detected or the extra is absent.",
     )(f)
 
 
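The help text says the flag auto-skips when the extra is absent. Both the deleted `face_restore.is_available()` and the new `photomaker_restore.is_available()` implement that gate with `importlib.util.find_spec`, which reports whether a module could be imported without executing it. Below is a minimal stdlib sketch of the same gate, built around a hypothetical `extras_available` helper that is not part of the package. Unlike the originals, it also guards the dotted-name case: there, `find_spec` imports the parent package first and raises `ModuleNotFoundError` if that parent is missing.

```python
import importlib.util
from collections.abc import Iterable


def extras_available(module_names: Iterable[str]) -> bool:
    """Return True only if every named module could be imported.

    For a top-level name, find_spec() locates the module without running it,
    so probing a heavy optional dependency stays cheap. For a dotted name it
    imports the parent package first and raises ModuleNotFoundError when the
    parent is missing; that case is treated as "not available" as well.
    """
    for name in module_names:
        try:
            if importlib.util.find_spec(name) is None:
                return False
        except ModuleNotFoundError:
            return False
    return True


if __name__ == "__main__":
    # Stdlib modules are always present; the made-up names are not.
    print(extras_available(["json", "logging.handlers"]))  # True
    print(extras_available(["json", "surely_not_installed_pkg"]))  # False
    print(extras_available(["surely_not_installed_pkg.sub"]))  # False
```

An empty list counts as "available", which matches the `and`-chained checks in the originals when there is nothing left to check.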
@@ -1,210 +0,0 @@
-"""Optional GFPGAN face-polish post-pass for the invisible removal pipeline.
-
-The diffusion removal pass scrubs the watermark everywhere but lets faces drift in
-likeness (canny holds face *structure*, not *identity*). This module sharpens and
-re-synthesizes each face from GFPGAN's StyleGAN2 prior, running on the
-DIFFUSION-CLEANED image -- not on the original.
-
-**Why "cleaned, not original":** an earlier version of this module ran GFPGAN on the
-ORIGINAL (watermarked) image and was oracle-confirmed (2026-06-04) to re-introduce
-SynthID into the face regions, because GFPGAN at fidelity weight 0.5 blends ~half
-the input pixels with the prior, and SynthID is robust to that partial blend. The
-fix is to feed GFPGAN the already-clean image -- whatever pixels it preserves are
-already SynthID-free, so the composited face stays clean. Identity is recovered from
-the StyleGAN2 prior conditioned on the already-drifted cleaned face (not on the
-original face), so identity fidelity is somewhat lower than the would-have-been
-identity-as-embedding stack (PhotoMaker-V1), but the upstream PhotoMaker package has
-significant compatibility issues with the diffusers version we ship, so this is the
-shipping path.
-
-Both GFPGAN (Apache-2.0) and its RetinaFace detector (MIT) are commercial-safe.
-The GFPGANv1.4 weights and the RetinaFace detector download on first use and are
-never bundled. Requires the optional ``restore`` extra (gfpgan/facexlib/basicsr).
-"""
-
-# cv2/torch/gfpgan boundary: gfpgan/basicsr/facexlib ship no usable type stubs and
-# this module wraps cv2 (feather composite) and torch; relax the unknown-type rules
-# for this file only.
-# pyright: reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnknownParameterType=false, reportMissingTypeArgument=false, reportMissingTypeStubs=false, reportMissingImports=false, reportArgumentType=false, reportAssignmentType=false, reportReturnType=false, reportCallIssue=false, reportIndexIssue=false, reportOperatorIssue=false, reportOptionalMemberAccess=false, reportOptionalCall=false, reportOptionalSubscript=false, reportOptionalOperand=false, reportAttributeAccessIssue=false, reportPrivateImportUsage=false, reportPrivateUsage=false, reportInvalidTypeForm=false, reportConstantRedefinition=false, reportUnnecessaryComparison=false
-from __future__ import annotations
-
-import logging
-import sys
-import threading
-from typing import TYPE_CHECKING, Any
-
-if TYPE_CHECKING:
-    from numpy.typing import NDArray
-
-logger = logging.getLogger(__name__)
-
-# GFPGANv1.4 weights (Apache-2.0). Downloaded on first use, never bundled.
-_GFPGAN_MODEL_URL = "https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth"
-_GFPGAN_ARCH = "clean"
-_GFPGAN_CHANNEL_MULTIPLIER = 2
-
-_restorer: Any | None = None
-_restorer_lock = threading.Lock()
-
-
-def is_available() -> bool:
-    """True when the optional GFPGAN face-restoration deps are importable."""
-    import importlib.util
-
-    return importlib.util.find_spec("gfpgan") is not None and importlib.util.find_spec("facexlib") is not None
-
-
-def _apply_basicsr_shim() -> None:
-    """Install the ``torchvision.transforms.functional_tensor`` compatibility shim.
-
-    basicsr (a GFPGAN dependency) imports ``rgb_to_grayscale`` from the
-    ``torchvision.transforms.functional_tensor`` module, which newer torchvision
-    removed. Recreate that module pointing at the public functional API. Idempotent:
-    only installed when the real module is missing.
-    """
-    import importlib.util
-
-    if importlib.util.find_spec("torchvision.transforms.functional_tensor") is not None:
-        return
-    if "torchvision.transforms.functional_tensor" in sys.modules:
-        return
-
-    import types
-
-    import torchvision.transforms.functional as tv_functional
-
-    shim = types.ModuleType("torchvision.transforms.functional_tensor")
-    shim.rgb_to_grayscale = tv_functional.rgb_to_grayscale
-    sys.modules["torchvision.transforms.functional_tensor"] = shim
-
-
-def _select_device() -> str:
-    """Pick the GFPGAN device: CUDA when present, else CPU.
-
-    The pip GFPGANer has an MPS device-mismatch bug, and this is a cheap post-pass
-    on a few face crops, so MPS is deliberately avoided -- CPU is the safe default
-    on Apple silicon.
-    """
-    try:
-        import torch
-
-        if torch.cuda.is_available():
-            return "cuda"
-    except Exception as e:
-        logger.debug("face_restore: CUDA probe failed (%s); using CPU", e)
-    return "cpu"
-
-
-def _get_restorer() -> Any:
-    """Return the lazily-built GFPGANer singleton (downloads weights on first use)."""
-    global _restorer
-    if _restorer is not None:
-        return _restorer
-    with _restorer_lock:
-        if _restorer is None:
-            _apply_basicsr_shim()
-            from gfpgan import GFPGANer
-
-            _restorer = GFPGANer(
-                model_path=_GFPGAN_MODEL_URL,
-                upscale=1,
-                arch=_GFPGAN_ARCH,
-                channel_multiplier=_GFPGAN_CHANNEL_MULTIPLIER,
-                device=_select_device(),
-            )
-    return _restorer
-
-
-def _composite_faces(
-    base_bgr: NDArray[Any],
-    restored_bgr: NDArray[Any],
-    boxes: list[tuple[float, float, float, float]],
-    pad: int = 14,
-    feather_div: int = 6,
-) -> NDArray[Any]:
-    """Feather-composite restored face regions from ``restored_bgr`` into ``base_bgr``.
-
-    Pure cv2/numpy helper (no gfpgan), so it is unit-testable without the model.
-    For each ``(x1, y1, x2, y2)`` box: pad and clip to the image, build a Gaussian-
-    feathered rectangular alpha, and blend ``restored * a + base * (1 - a)``. Boxes
-    that fall fully outside the image (or an empty list) leave ``base_bgr`` unchanged.
-    """
-    import cv2
-    import numpy as np
-
-    out = base_bgr.astype(np.float32)
-    h, w = base_bgr.shape[:2]
-
-    for box in boxes:
-        x1 = int(box[0]) - pad
-        y1 = int(box[1]) - pad
-        x2 = int(box[2]) + pad
-        y2 = int(box[3]) + pad
-        x1 = max(0, min(x1, w))
-        y1 = max(0, min(y1, h))
-        x2 = max(0, min(x2, w))
-        y2 = max(0, min(y2, h))
-        bw = x2 - x1
-        bh = y2 - y1
-        if bw <= 0 or bh <= 0:
-            continue
-
-        alpha = np.zeros((h, w), dtype=np.float32)
-        alpha[y1:y2, x1:x2] = 1.0
-        k = max(3, (min(bw, bh) // feather_div) | 1)  # odd kernel >= 3
-        alpha = cv2.GaussianBlur(alpha, (k, k), 0)
-        alpha = alpha[:, :, None]
-        out = restored_bgr.astype(np.float32) * alpha + out * (1.0 - alpha)
-
-    return np.clip(out, 0, 255).astype(np.uint8)
-
-
-def restore_faces(
-    original_bgr: NDArray[Any],  # legacy positional kept for API stability; unused
-    cleaned_bgr: NDArray[Any],
-    weight: float = 0.5,
-    pad: int = 14,
-    feather_div: int = 6,
-) -> NDArray[Any]:
-    """Restore face identity in ``cleaned_bgr`` by running GFPGAN on the CLEANED image.
-
-    GFPGAN is a fidelity-restoration net: it sharpens and re-synthesizes face details
-    from its StyleGAN2 prior conditioned on the INPUT face. **Running it on the
-    diffusion-cleaned image (not the original)** is what makes this pass SynthID-safe:
-    the input pixels GFPGAN derives from are already SynthID-free, so the partial
-    pixel-blend at the default weight 0.5 cannot re-introduce the watermark.
-
-    The earlier version of this module ran GFPGAN on the ORIGINAL (watermarked) image
-    and was oracle-confirmed (2026-06-04) to re-introduce SynthID into the face
-    regions. The fix is the single-line source swap below.
-
-    The ``original_bgr`` argument is kept for positional API stability with the
-    earlier signature but is no longer used; pass it for legacy callers, ignore it
-    in new code.
-
-    Args:
-        original_bgr: UNUSED (legacy; kept for positional API stability).
-        cleaned_bgr: The diffusion-cleaned image as cv2 BGR (faces drifted from the
-            removal pass). GFPGAN runs on THIS, polishing each face without changing
-            the watermark state of the source pixels.
-        weight: GFPGAN fidelity weight (0-1); lower = more StyleGAN2 regeneration of
-            the face from the prior.
-        pad: Pixels to grow each face box before compositing.
-        feather_div: Larger = sharper composite edge (box-min // feather_div kernel).
-    """
-    restorer = _get_restorer()
-    _, _, restored_img = restorer.enhance(
-        cleaned_bgr,
-        has_aligned=False,
-        only_center_face=False,
-        paste_back=True,
-        weight=weight,
-    )
-
-    det_faces = getattr(restorer.face_helper, "det_faces", None) or []
-    boxes = [(float(b[0]), float(b[1]), float(b[2]), float(b[3])) for b in det_faces]
-    if not boxes:
-        logger.debug("face_restore: no faces detected; returning cleaned image unchanged")
-        return cleaned_bgr
-
-    return _composite_faces(cleaned_bgr, restored_img, boxes, pad=pad, feather_div=feather_div)
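The deleted `_get_restorer()`, like the new `_get_pipeline()`, builds an expensive model once and shares it through double-checked locking. The first check is a lock-free fast path. The second check happens under a `threading.Lock`, so two threads racing on first use cannot both build the model. Below is a stdlib-only sketch of that pattern. `LazySingleton` is a hypothetical name, and a counting factory stands in for the model constructor.

```python
from __future__ import annotations

import threading
from collections.abc import Callable
from typing import Generic, TypeVar

T = TypeVar("T")


class LazySingleton(Generic[T]):
    """Build a value on first use and share it (double-checked locking)."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._value: T | None = None
        self._lock = threading.Lock()

    def get(self) -> T:
        value = self._value
        if value is not None:  # fast path: already built, no lock taken
            return value
        with self._lock:
            if self._value is None:  # re-check: another thread may have won the race
                self._value = self._factory()
            return self._value


if __name__ == "__main__":
    builds = 0

    def build_model() -> str:
        global builds
        builds += 1  # stands in for an expensive weight download + model load
        return "model"

    lazy = LazySingleton(build_model)
    threads = [threading.Thread(target=lazy.get) for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(lazy.get(), builds)  # model 1
```

As in the original module-level version, a factory that returns `None` would be called again on every access, because `None` doubles as the "not built yet" marker.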
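`_apply_basicsr_shim()` works because `import` consults `sys.modules` before it searches the filesystem. Registering a synthetic `types.ModuleType` under the removed name is therefore enough for basicsr's `rgb_to_grayscale` import to resolve. Below is a stdlib-only sketch of the same trick. It uses a made-up top-level module name (`legacy_math_helpers`, hypothetical) in place of the torchvision path, and `install_compat_shim` is an illustrative helper, not package API.

One detail differs from the deleted code: the sketch checks `sys.modules` before calling `find_spec`. The Python docs state that `find_spec` raises `ValueError` when the `sys.modules` entry has a `__spec__` of `None`, and a bare `ModuleType` has exactly that.

```python
import importlib.util
import math
import sys
import types


def install_compat_shim(legacy_name: str, **attrs: object) -> bool:
    """Register a stand-in module under a top-level ``legacy_name`` if nothing provides it.

    ``import`` consults ``sys.modules`` before searching the filesystem, so the
    module object registered here satisfies ``from legacy_name import attr``.
    Returns True if a shim was installed, False if the name is already importable.
    """
    # Check sys.modules FIRST: find_spec() raises ValueError for an entry whose
    # __spec__ is None, which is what a bare types.ModuleType has.
    if legacy_name in sys.modules:
        return False
    if importlib.util.find_spec(legacy_name) is not None:
        return False  # the real module exists; no shim needed
    shim = types.ModuleType(legacy_name)
    for attr_name, value in attrs.items():
        setattr(shim, attr_name, value)
    sys.modules[legacy_name] = shim
    return True


if __name__ == "__main__":
    # "legacy_math_helpers" is a made-up module name standing in for a removed API.
    install_compat_shim("legacy_math_helpers", isqrt=math.isqrt)
    from legacy_math_helpers import isqrt  # resolved from sys.modules

    print(isqrt(17))  # 4
```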
@@ -180,11 +180,11 @@ class InvisibleEngine:
             guidance_scale: Classifier-free guidance scale.
             seed: Random seed for reproducibility.
             humanize: Intensity of Analog Humanizer film grain (0 = off).
-            restore_faces: EXPERIMENTAL, opt-in (default False). Run the GFPGAN
-                face-polish post-pass when faces are present (needs the ``restore``
-                extra). Runs on the diffusion-CLEANED image (not the original), so
-                SynthID is not re-introduced. Auto-skips with a debug log when the
-                extra is absent or no face is detected.
+            restore_faces: EXPERIMENTAL, opt-in (default False). **NON-COMMERCIAL.**
+                Run the PhotoMaker-V2 face-identity post-pass when faces are present
+                (needs the ``photomaker`` extra, which pulls non-commercial InsightFace
+                model packs). Auto-skips with a debug log when the extra is absent or no
+                face is detected. See ``photomaker_restore.py`` for the legal notice.
             unsharp: Final unsharp-mask sharpening strength (0 = off, default).
                 Applied last (after face restoration) to counter the soft,
                 over-smoothed look of the diffusion + restoration; ~0.5-0.8 is a
@@ -316,7 +316,7 @@ class InvisibleEngine:
         # GFPGAN derives from are already SynthID-free). Auto-skips when faces are
         # absent or the optional `restore` extra is not installed.
         if restore_faces:
-            self._restore_faces(out_path)
+            self._restore_faces_photomaker(out_path, image, seed)
 
         # Final sharpening, LAST so it crisps the face-restored result too (a
         # pre-restore sharpen would be smoothed back over by the face pass).
@@ -355,24 +355,29 @@ class InvisibleEngine:
             if _tmp_path.exists():
                 _tmp_path.unlink()
 
-    def _restore_faces(self, out_path: Path) -> None:
-        """Run the GFPGAN face-polish post-pass on the cleaned ``out_path``.
+    def _restore_faces_photomaker(
+        self,
+        out_path: Path,
+        original_image: Any,
+        seed: int | None,
+    ) -> None:
+        """Run the PhotoMaker-V2 face-identity post-pass on the cleaned ``out_path``.
 
-        SynthID-safe: GFPGAN is run on the diffusion-CLEANED image (not the original),
-        so the partial pixel-blend it does at fidelity weight 0.5 cannot re-introduce
-        the watermark -- the input pixels GFPGAN derives from are already SynthID-free.
-        Best-effort: any failure logs a warning and leaves the un-restored cleaned
-        output in place; a missing ``restore`` extra is logged at debug and skipped
-        (the flag must never error when the extra is absent or no face is present).
+        **NON-COMMERCIAL** (see ``photomaker_restore.py``). PhotoMaker carries identity
+        in a CLIP+ArcFace embedding and regenerates fresh face pixels conditioned on
+        it, so the watermark is not transported. Best-effort: any failure (missing
+        extra, model load, runtime error) logs a warning and leaves the un-restored
+        cleaned output in place.
         """
-        from remove_ai_watermarks import face_restore
+        from remove_ai_watermarks import photomaker_restore
 
-        if not face_restore.is_available():
-            logger.debug("restore_faces requested but the 'restore' extra is not installed; skipping")
+        if not photomaker_restore.is_available():
+            logger.debug("restore_faces requested but the 'photomaker' extra is not installed; skipping")
             return
 
         try:
             import cv2
+            import numpy as np
 
             from remove_ai_watermarks import image_io
 
@@ -381,13 +386,15 @@ class InvisibleEngine:
                 logger.warning("restore_faces: could not read cleaned output %s; skipping", out_path)
                 return
 
-            if self._progress_callback:
-                self._progress_callback("Polishing face identity (GFPGAN on cleaned image)...")
-            # original_bgr is unused (GFPGAN runs on cleaned_bgr); pass an empty array
-            # for positional API stability with the legacy signature.
-            import numpy as np
+            original_rgb = original_image.convert("RGB")
+            original_bgr = cv2.cvtColor(np.array(original_rgb), cv2.COLOR_RGB2BGR)
+            cleaned_size = (cleaned_bgr.shape[1], cleaned_bgr.shape[0])
+            if (original_bgr.shape[1], original_bgr.shape[0]) != cleaned_size:
+                original_bgr = cv2.resize(original_bgr, cleaned_size, interpolation=cv2.INTER_LANCZOS4)
 
-            restored = face_restore.restore_faces(np.empty((0, 0, 3), dtype=np.uint8), cleaned_bgr)
+            if self._progress_callback:
+                self._progress_callback("Restoring face identity (PhotoMaker-V2 post-pass)...")
+            restored = photomaker_restore.restore_faces_photomaker(original_bgr, cleaned_bgr, seed=seed)
             image_io.imwrite(out_path, restored)
         except Exception as e:
             logger.warning("restore_faces post-pass failed (%s); keeping un-restored output", e)
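`_restore_faces_photomaker()` is deliberately best-effort. A missing extra is skipped with a debug log, and any failure while restoring is logged as a warning, leaving the already-written cleaned output in place. Below is a stdlib-only sketch of that control flow. The `is_available` and `restore` callables are hypothetical stand-ins for the `photomaker_restore` functions, and a plain value stands in for the image file the engine reads and rewrites.

```python
import logging
from collections.abc import Callable
from typing import TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


def best_effort_post_pass(
    cleaned: T,
    is_available: Callable[[], bool],
    restore: Callable[[T], T],
) -> T:
    """Apply an optional post-pass; on any problem, return ``cleaned`` unchanged."""
    if not is_available():
        # Optional extra not installed: skip quietly, never error.
        logger.debug("post-pass requested but its extra is not installed; skipping")
        return cleaned
    try:
        return restore(cleaned)
    except Exception as e:  # deliberately broad: the post-pass must never fail the run
        logger.warning("post-pass failed (%s); keeping un-restored output", e)
        return cleaned


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    def boom(_: str) -> str:
        raise RuntimeError("model load failed")

    print(best_effort_post_pass("clean", lambda: False, str.upper))  # clean
    print(best_effort_post_pass("clean", lambda: True, str.upper))  # CLEAN
    print(best_effort_post_pass("clean", lambda: True, boom))  # clean
```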
@@ -0,0 +1,317 @@
+"""SynthID-robust face identity restoration via PhotoMaker-V2.
+
+**NON-COMMERCIAL.** This module uses PhotoMaker-V2, whose ID encoder
+(``PhotoMakerIDEncoder_CLIPInsightfaceExtendtoken``) requires an ArcFace embedding
+from InsightFace's pretrained ``antelopev2`` / ``buffalo_l`` model packs. Those packs
+are released by InsightFace under a **non-commercial / research-only license**:
+
+    "The pretrained models we provided with this library are available for
+    non-commercial research purposes only."
+    -- insightface PyPI README
+
+The PyPI ``insightface`` package itself is MIT-licensed code, but the model weights
+it downloads on first ``FaceAnalysis()`` are not commercial. **A paid service
+(raiw.cc, any monetized SaaS, any enterprise deployment) MUST NOT use this path.**
+The default ``--restore-faces`` method is ``gfpgan`` (commercial-safe, ships with
+the ``restore`` extra); ``--restore-faces-method photomaker`` is an explicit opt-in
+for non-commercial use only. See ``docs/synthid-robust-identity-research.md``.
+
+The diffusion removal pass scrubs the pixel watermark from the WHOLE image, including
+faces, but lets faces drift in identity. PhotoMaker-V2 carries identity in two
+semantic streams (an OpenCLIP-ViT-H/14 image embedding AND an ArcFace identity
+embedding) and uses them to CONDITION a fresh txt2img generation -- the pixels are
+new, so the watermark cannot be transported.
+
+That embeddings do not carry an invisible pixel watermark like SynthID is the
+load-bearing assumption of the whole approach; the OpenCLIP smoke test (cosine
+0.9977 invariance to SynthID-magnitude pixel noise) supports it for the CLIP
+stream, and ArcFace is even more invariant to small perceptual changes by design.
+
+Architecture: PhotoMaker-V2 is a fine-tuned OpenCLIP-ViT-H/14 + InsightFace dual ID
+encoder plus LoRA on the SDXL UNet attention layers. It ships as a single
+``photomaker-v2.bin`` checkpoint loaded into a ``PhotoMakerStableDiffusionXLPipeline``
+(txt2img). We use it as a SECOND PASS after the main controlnet/default removal:
+
+1. Main removal pass (`controlnet` at the certified strength) cleans SynthID
+   everywhere but leaves faces drifted.
+2. For each face found in the CLEANED image (YuNet), this module takes the SAME
+   face region from the ORIGINAL, computes the dual ID embedding from it, and
+   runs PhotoMaker txt2img to regenerate JUST that face crop from the embedding.
+   The freshly generated face is feather-composited back into the cleaned image.
+
+The generated face pixels are diffusion-fresh and inherit identity from the
+embedding (not the pixels), so SynthID is not re-introduced.
+
+Requires the optional ``photomaker`` extra: ``pip install
+'remove-ai-watermarks[photomaker]'`` -- this pulls the upstream PhotoMaker package
+(Apache-2.0), ``insightface`` (MIT code), ``einops``, ``peft``, ``onnxruntime``,
+and ``huggingface-hub``. Weights and InsightFace model packs download on first use;
+never bundled.
+"""
+
+# cv2/torch/diffusers boundary: relax unknown-type rules for this file only.
+# pyright: reportUnknownMemberType=false, reportUnknownArgumentType=false, reportUnknownVariableType=false, reportUnknownParameterType=false, reportMissingTypeArgument=false, reportMissingTypeStubs=false, reportMissingImports=false, reportArgumentType=false, reportAssignmentType=false, reportReturnType=false, reportCallIssue=false, reportIndexIssue=false, reportOperatorIssue=false, reportOptionalMemberAccess=false, reportOptionalCall=false, reportOptionalSubscript=false, reportOptionalOperand=false, reportAttributeAccessIssue=false, reportPrivateImportUsage=false, reportPrivateUsage=false, reportInvalidTypeForm=false, reportConstantRedefinition=false, reportUnnecessaryComparison=false
+from __future__ import annotations
+
+import importlib.util
+import logging
+import threading
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from numpy.typing import NDArray
+
+logger = logging.getLogger(__name__)
+
+# PhotoMaker-V2 weights (Apache-2.0 adapter; ID encoder pulls non-commercial
+# InsightFace model packs at runtime -- see the NON-COMMERCIAL notice in the module
+# docstring). Downloaded on first use; never bundled.
+_PHOTOMAKER_REPO = "TencentARC/PhotoMaker-V2"
+_PHOTOMAKER_FILE = "photomaker-v2.bin"
+# SDXL base shared with the main pipeline (same checkpoint as `default`/`controlnet`).
+_SDXL_MODEL_ID = "stabilityai/stable-diffusion-xl-base-1.0"
+
+# The neutral prompt PhotoMaker is designed around: a class noun + the trigger word
+# `img`, which PhotoMaker replaces with the ID embedding at inference. Keeping it
+# scene-neutral (no extra style words) maximises identity transfer from the embed and
+# minimises hallucinated background/lighting that would not match the cleaned scene.
+_PHOTOMAKER_PROMPT = "a portrait photo of a person img, natural lighting, sharp focus"
+_PHOTOMAKER_NEGATIVE = "blurry, lowres, deformed, distorted, watermark"
+
+# Square size used to feed PhotoMaker (must match a multiple of 64; 512 fits CPU/GPU
+# comfortably and gives the encoder enough pixels for a stable embedding).
+_PHOTOMAKER_FACE_SIZE = 512
+
+_pipeline: Any | None = None
+_pipeline_lock = threading.Lock()
+
+
+def is_available() -> bool:
+    """True when the optional PhotoMaker extra deps are importable."""
+    return (
+        importlib.util.find_spec("photomaker") is not None
+        and importlib.util.find_spec("diffusers") is not None
+        and importlib.util.find_spec("huggingface_hub") is not None
+    )
+
+
+def _select_device() -> str:
+    """Pick the PhotoMaker pipeline device: CUDA when present, MPS on Apple, else CPU."""
+    try:
+        import torch
+
+        if torch.cuda.is_available():
+            return "cuda"
+        if torch.backends.mps.is_available():
+            return "mps"
+    except Exception as e:
+        logger.debug("photomaker_restore: device probe failed (%s); using CPU", e)
+    return "cpu"
+
+
+def _get_pipeline() -> Any:
+    """Return the lazily-built PhotoMaker pipeline singleton (downloads weights on first use)."""
+    global _pipeline
+    if _pipeline is not None:
+        return _pipeline
+    with _pipeline_lock:
+        if _pipeline is None:
+            import torch
+            from huggingface_hub import hf_hub_download
+            from photomaker import PhotoMakerStableDiffusionXLPipeline
+
+            device = _select_device()
+            dtype = torch.float16 if device == "cuda" else torch.float32
+            logger.info("photomaker_restore: loading SDXL+PhotoMaker on %s (%s)", device, dtype)
+
+            adapter_path = hf_hub_download(repo_id=_PHOTOMAKER_REPO, filename=_PHOTOMAKER_FILE)
+            pipe = PhotoMakerStableDiffusionXLPipeline.from_pretrained(_SDXL_MODEL_ID, torch_dtype=dtype)
+            # Move SDXL submodules to the device BEFORE loading the PhotoMaker adapter:
+            # ``load_photomaker_adapter`` reads ``self.device`` / ``self.unet.dtype`` to
+            # place the new ID encoder. If we ``.to(device)`` after, the SDXL submodules
+            # move but the id_encoder stays where it was (custom attribute, not in the
+            # auto-managed module tree), and inference errors with
+            # "Input type (torch.cuda.HalfTensor) and weight type (torch.HalfTensor)
+            # should be the same" (caught empirically 2026-06-04).
+            pipe.to(device)
+            # Default ``pm_version`` is "v2"; we load the V2 weights (photomaker-v2.bin)
+            # into the V2 encoder (PhotoMakerIDEncoder_CLIPInsightfaceExtendtoken). The V2
+            # encoder takes BOTH the CLIP image features AND an InsightFace ArcFace
+            # embedding -- the latter is what makes this path non-commercial.
+            pipe.load_photomaker_adapter(
+                str(Path(adapter_path).parent),
+                subfolder="",
+                weight_name=_PHOTOMAKER_FILE,
+                trigger_word="img",
+            )
+            pipe.fuse_lora()
+            # Belt: also explicitly cast the loaded id_encoder, because some
+            # diffusers/torch combinations leave the encoder buffers untouched even
+            # though ``pipe.to(device)`` ran first.
+            if hasattr(pipe, "id_encoder") and pipe.id_encoder is not None:
+                pipe.id_encoder = pipe.id_encoder.to(device=device, dtype=dtype)
+            _pipeline = pipe
+    return _pipeline
+
+
+def _face_crop_square(
+    image_bgr: NDArray[Any],
+    box: tuple[int, int, int, int],
+    pad: float = 0.30,
+) -> tuple[NDArray[Any], tuple[int, int, int, int]]:
+    """Square crop around a face box (with padding), clipped to the image.
+
+    Returns ``(crop_bgr, (x1, y1, x2, y2))``. The crop is the image content inside the
+    returned square box -- callers use the box for the composite step. Pure numpy slicing,
+    no model.
+    """
+    h, w = image_bgr.shape[:2]
+    x, y, bw, bh = box
+    cx, cy = x + bw // 2, y + bh // 2
+    side = int(max(bw, bh) * (1.0 + 2.0 * pad))
+    half = side // 2
+    x1 = max(0, cx - half)
+    y1 = max(0, cy - half)
+    x2 = min(w, cx + half)
+    y2 = min(h, cy + half)
+    return image_bgr[y1:y2, x1:x2], (x1, y1, x2, y2)
+
+
+def _composite_faces(
+    base_bgr: NDArray[Any],
+    restored_crops: list[tuple[NDArray[Any], tuple[int, int, int, int]]],
+    feather_div: int = 6,
+) -> NDArray[Any]:
+    """Feather-composite a list of ``(restored_crop, (x1, y1, x2, y2))`` into ``base_bgr``.
+
+    Pure cv2/numpy helper (no model), unit-testable. For each ``(crop, box)``: resize
+    the crop to the box size, build a Gaussian-feathered rectangular alpha, and blend
+    ``crop * a + base * (1 - a)``. Boxes that fall fully outside the image (or an empty
+    list) leave ``base_bgr`` unchanged. Mirrors the alpha math in ``face_restore._composite_faces``.
+    """
+    import cv2
+    import numpy as np
+
+    out = base_bgr.astype(np.float32)
+    h, w = base_bgr.shape[:2]
+
+    for crop, (x1, y1, x2, y2) in restored_crops:
+        x1, y1 = max(0, x1), max(0, y1)
+        x2, y2 = min(w, x2), min(h, y2)
+        bw, bh = x2 - x1, y2 - y1
+        if bw <= 0 or bh <= 0:
+            continue
+        resized = cv2.resize(crop, (bw, bh), interpolation=cv2.INTER_LANCZOS4)
+
+        alpha = np.zeros((h, w), dtype=np.float32)
+        alpha[y1:y2, x1:x2] = 1.0
+        k = max(3, (min(bw, bh) // feather_div) | 1)
+        alpha = cv2.GaussianBlur(alpha, (k, k), 0)[:, :, None]
+
+        full_restored = np.zeros_like(out)
+        full_restored[y1:y2, x1:x2] = resized
+        out = full_restored * alpha + out * (1.0 - alpha)
+
+    return np.clip(out, 0, 255).astype(np.uint8)
+
+
+def restore_faces_photomaker(
+    original_bgr: NDArray[Any],
+    cleaned_bgr: NDArray[Any],
+    num_inference_steps: int = 30,
+    guidance_scale: float = 5.0,
+    style_strength: int = 20,
+    seed: int | None = None,
+    detect_faces_fn: Any | None = None,
+) -> NDArray[Any]:
+    """SynthID-robust face identity restoration via PhotoMaker txt2img.
+
+    Pipeline:
+        1. Detect faces in ``cleaned_bgr`` (YuNet via the package's ``auto_config`` by
+           default; override via ``detect_faces_fn`` for tests).
+        2. For each face: take the SAME box from ``original_bgr`` -> square crop -> PhotoMaker
+           txt2img with that crop as the ID image -> a fresh face generated from the
+           OpenCLIP embedding (the embedding is SynthID-invariant by ~3 orders of magnitude,
+           see docs/synthid-robust-identity-research.md).
+        3. Feather-composite each regenerated face into ``cleaned_bgr``.
+
+    Faces are taken from ``original_bgr`` (the embedding ignores the watermark) but the
+    PIXELS that land in the output are diffusion-fresh, so SynthID is not transported.
+
+    Args:
+        original_bgr: The original (watermarked) image as cv2 BGR. Source of identity.
+        cleaned_bgr: The main-pass output as cv2 BGR. Faces drifted in identity; this
+            module replaces those face regions.
+        num_inference_steps: Diffusion steps inside PhotoMaker (def 30).
+        guidance_scale: CFG scale inside PhotoMaker (def 5.0; the PhotoMaker recipe).
+        style_strength: PhotoMaker's ``start_merge_step`` knob ~ 20-30 (def 20).
+        seed: Optional seed for reproducibility.
+        detect_faces_fn: Optional callable ``(bgr) -> list[(x,y,w,h)]`` to override the
+            default YuNet detector (used by tests).
+
+    Returns:
+        ``cleaned_bgr`` with regenerated face regions composited in (or unchanged when
+        no face is detected).
+    """
+    import cv2
+    import numpy as np
+    import torch
+    from PIL import Image
+
+    if detect_faces_fn is None:
+        from remove_ai_watermarks import auto_config as _ac
+
+        def _default_detect(bgr: NDArray[Any]) -> list[tuple[int, int, int, int]]:
+            h, w = bgr.shape[:2]
+            model = Path(_ac.__file__).parent / "assets" / "face_detection_yunet_2023mar.onnx"
+            det = cv2.FaceDetectorYN.create(str(model), "", (w, h), _ac._FACE_SCORE, 0.3, 5000)
+            det.setInputSize((w, h))
+            _, faces = det.detect(bgr)
+            if faces is None:
+                return []
+            return [(int(f[0]), int(f[1]), int(f[2]), int(f[3])) for f in faces if int(f[2]) > 0 and int(f[3]) > 0]
+
+        detect_faces_fn = _default_detect
+
+    boxes = detect_faces_fn(cleaned_bgr)
+    if not boxes:
+        logger.debug("photomaker_restore: no faces detected; returning cleaned image unchanged")
+        return cleaned_bgr
+
+    pipeline = _get_pipeline()
+    generator = None
+    if seed is not None:
+        generator = torch.Generator(device=pipeline.device).manual_seed(seed)
+
+    restored: list[tuple[NDArray[Any], tuple[int, int, int, int]]] = []
+    for box in boxes:
+        id_crop_bgr, square_box = _face_crop_square(original_bgr, box)
+        if id_crop_bgr.size == 0:
+            continue
+        id_crop_rgb = cv2.cvtColor(id_crop_bgr, cv2.COLOR_BGR2RGB)
+        id_image_pil = Image.fromarray(id_crop_rgb)
+
+        # Don't pass negative_prompt: the PhotoMaker pipeline manages its own CFG by
+        # concatenating [negative_prompt_embeds, prompt_embeds]; if we pass a custom
+        # negative the upstream code splits text_only vs id-injected branches and
+        # the resulting embed batch dims can mismatch (we saw
+        # "Sizes of tensors must match except in dimension 1. Expected size 2 but got
+        # size 1" on a real run). The default empty negative is what the upstream
+        # gradio demo uses.
+        out = pipeline(
+            prompt=_PHOTOMAKER_PROMPT,
+            input_id_images=[id_image_pil],
+            num_inference_steps=num_inference_steps,
+            guidance_scale=guidance_scale,
+            start_merge_step=style_strength,
+            generator=generator,
+            height=_PHOTOMAKER_FACE_SIZE,
+            width=_PHOTOMAKER_FACE_SIZE,
+            num_images_per_prompt=1,
+        )
+        gen_rgb = out.images[0]
+        gen_bgr = cv2.cvtColor(np.array(gen_rgb), cv2.COLOR_RGB2BGR)
+        restored.append((gen_bgr, square_box))
+
+    return _composite_faces(cleaned_bgr, restored)
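`_face_crop_square()` takes a YuNet-style `(x, y, w, h)` box, unlike the deleted GFPGAN helper, which used `(x1, y1, x2, y2)`. It grows the longer side by `pad` on each side, centres a square on the box, and then clips to the image. Because of the clipping, a face near the border can come back smaller than requested, and the composite step simply resizes the generated face into whatever box it receives. Below is a numpy-free sketch of just the coordinate arithmetic, under the hypothetical name `square_box`.

```python
def square_box(
    box: tuple[int, int, int, int],
    image_w: int,
    image_h: int,
    pad: float = 0.30,
) -> tuple[int, int, int, int]:
    """Padded square (x1, y1, x2, y2) around an (x, y, w, h) face box, clipped to the image."""
    x, y, bw, bh = box
    cx, cy = x + bw // 2, y + bh // 2  # integer box centre
    side = int(max(bw, bh) * (1.0 + 2.0 * pad))  # pad added on both sides of the longer edge
    half = side // 2
    x1, y1 = max(0, cx - half), max(0, cy - half)
    x2, y2 = min(image_w, cx + half), min(image_h, cy + half)
    return x1, y1, x2, y2


if __name__ == "__main__":
    # pad=0.5 doubles the longer side: a 40x60 face gets a 120x120 square.
    print(square_box((100, 100, 40, 60), 1000, 1000, pad=0.5))  # (60, 70, 180, 190)
    # A face in the top-left corner is clipped: 60x60 instead of the requested 80x80.
    print(square_box((0, 0, 40, 40), 1000, 1000, pad=0.5))  # (0, 0, 60, 60)
```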
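Both `_composite_faces` helpers size the Gaussian feather kernel as `max(3, (min(bw, bh) // feather_div) | 1)` and blend `restored * a + base * (1 - a)`.

One thing worth checking in the new helper: `full_restored` is zero outside each box, but the Gaussian-blurred alpha is not. A thin band just outside the box would therefore be blended toward black.

The pure-Python 1-D sketch below reproduces the kernel-size rule. As a swapped-in alternative to `cv2.GaussianBlur` (not what the module does), it feathers with a linear ramp that stays inside the box, so pixels outside keep the base value exactly. All three function names are hypothetical.

```python
def feather_kernel_size(box_w: int, box_h: int, feather_div: int = 6) -> int:
    """Odd blur-kernel size used by both _composite_faces helpers (minimum 3)."""
    # `| 1` sets the lowest bit, so the size is always odd (cv2.GaussianBlur
    # requires odd kernel dimensions); a larger feather_div gives a smaller
    # kernel and therefore a sharper composite edge.
    return max(3, (min(box_w, box_h) // feather_div) | 1)


def inner_ramp_alpha(length: int, start: int, stop: int, ramp: int) -> list[float]:
    """1-D alpha: 0 outside [start, stop), rising linearly to 1 inside the box.

    Unlike blurring a hard 0/1 mask, this never puts weight outside the box,
    so pixels there keep the base value exactly.
    """
    alpha = [0.0] * length
    for i in range(max(0, start), min(length, stop)):
        dist_to_edge = min(i - start, stop - 1 - i)  # 0 on the first/last box pixel
        alpha[i] = min(1.0, (dist_to_edge + 1) / (ramp + 1))
    return alpha


def blend_row(restored: list[float], base: list[float], alpha: list[float]) -> list[float]:
    """Per-pixel composite restored * a + base * (1 - a)."""
    return [r * a + b * (1.0 - a) for r, b, a in zip(restored, base, alpha)]


if __name__ == "__main__":
    print(feather_kernel_size(120, 120))  # 21
    a = inner_ramp_alpha(10, 2, 8, ramp=2)
    print(blend_row([200.0] * 10, [100.0] * 10, a))  # ends stay at 100.0
```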
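`restore_faces_photomaker()` forwards `style_strength` unchanged as PhotoMaker's `start_merge_step`, with a default of 20 at 30 inference steps. Assume the pipeline switches from text-only to ID-merged prompt embeddings once the step index passes `start_merge_step`. Under that assumption, only the tail of the schedule sees the identity embedding; the exact boundary should be checked against the upstream pipeline. The tiny hypothetical helper below makes that trade-off concrete under the stated assumption.

```python
def id_conditioned_steps(num_inference_steps: int, start_merge_step: int) -> int:
    """Count denoising steps that would use the ID-merged embeddings.

    ASSUMPTION: step i uses the identity embedding only when i > start_merge_step;
    the exact comparison in the upstream PhotoMaker pipeline may differ by one.
    """
    return sum(1 for i in range(num_inference_steps) if i > start_merge_step)


if __name__ == "__main__":
    # Module defaults: 30 steps, style_strength=20.
    print(id_conditioned_steps(30, 20))  # 9
    print(id_conditioned_steps(30, 10))  # 19
```

Under that assumption, a lower `style_strength` hands more of the schedule to the identity embedding, and a higher one leaves more of it to the text prompt.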