Files
Kenneth Estanislao ca8e39e3bb Fix mouth mask
2026-05-18 02:11:04 +08:00

1562 lines
71 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any, List, Optional, Tuple
import cv2
import insightface
import logging
import threading
import numpy as np
import platform
import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face, get_many_faces, default_source_face
from modules.typing import Face, Frame
from modules.utilities import (
conditional_download,
is_image,
is_video,
)
from modules.cluster_analysis import find_closest_centroid
from modules.gpu_processing import gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted, gpu_resize, gpu_cvt_color
import os
from collections import deque
import time
# Lazily-created swap model instance; populated by get_face_swapper().
FACE_SWAPPER = None
# Guards the one-time creation of FACE_SWAPPER inside get_face_swapper().
THREAD_LOCK = threading.Lock()
# Tag passed to update_status() and printed in this module's console messages.
NAME = "DLC.FACE-SWAPPER"
# --- START: Added for Interpolation ---
PREVIOUS_FRAME_RESULT = None # Stores the final processed frame from the previous step
# --- END: Added for Interpolation ---
# --- Poisson blend (ported from deep-live-cam-gumroad-edition) ---
# Root-cause fix for the "wobble": the blend mask is NOT built from the
# independently-detected 106-pt landmarks (they jitter sub-pixel every frame
# and seamlessClone is hyper-sensitive to its mask boundary). Instead it is
# derived from the swap's OWN affine transform (M) + the swapped pixels
# (bgr_fake), so the mask is locked exactly to where the swapped face was
# placed — no independent jitter source, no EMA, no lag.
# Two caches back this up:
#   * _ELLIPTICAL_MASK_CACHE holds the aligned-space ellipse template, keyed
#     by its (height, width) size (see _create_elliptical_mask).
#   * _poisson_cached_mask / _poisson_cached_key hold the last full-frame mask
#     of the bbox-ellipse fallback path, reused while the ellipse centre,
#     radii and frame size are unchanged (face nearly still), so an identical
#     array is reused (zero wobble).
_ELLIPTICAL_MASK_CACHE: dict = {}
_poisson_cached_mask: Optional[np.ndarray] = None
_poisson_cached_key: Optional[tuple] = None
def _create_elliptical_mask(size: Tuple[int, int]) -> np.ndarray:
    """Return a blurred elliptical float32 mask for an aligned-face crop.

    The mask depends only on ``size`` (given as ``(height, width)``), never on
    image content, and is memoised per size so repeated calls hand back the
    very same array.

    Args:
        size: ``(height, width)`` of the mask to build.

    Returns:
        A ``float32`` array of shape ``(height, width)``: a filled ellipse
        with semi-axes of 44% of each dimension, softened by a 31x31
        Gaussian blur (sigma 12).
    """
    try:
        return _ELLIPTICAL_MASK_CACHE[size]
    except KeyError:
        pass

    height, width = size
    ellipse = np.zeros((height, width), dtype=np.float32)
    cv2.ellipse(
        ellipse,
        (width // 2, height // 2),
        (int(width * 0.44), int(height * 0.44)),
        0, 0, 360, 1, -1,
    )

    # Small masks are blurred with plain OpenCV; larger ones go through the
    # project's gpu_gaussian_blur helper.
    blur = cv2.GaussianBlur if height * width < 65536 else gpu_gaussian_blur
    ellipse = blur(ellipse, (31, 31), 12)

    _ELLIPTICAL_MASK_CACHE[size] = ellipse
    return ellipse
def _poisson_blend_affine(swapped_frame: Frame, original_frame: Frame,
                          affine_matrix: np.ndarray, bgr_fake: np.ndarray) -> bool:
    """Poisson-blend using a mask derived from the swap's own inverse affine.

    The aligned-space elliptical template is warped into frame coordinates,
    binarised and eroded so the seam sits on solidly-swapped pixels, then used
    as the seamlessClone mask. Only the masked pixels are written back into
    ``swapped_frame`` (in place).

    Returns:
        True if the blend was applied, False if the geometry was unsuitable
        (region too small, empty after erosion, or touching the frame border).
    """
    h, w = swapped_frame.shape[:2]
    fh, fw = bgr_fake.shape[:2]
    inv = cv2.invertAffineTransform(affine_matrix)

    # Frame-space bounding box of the aligned-face rectangle, clamped to the frame.
    corners = np.array([[0, 0, 1], [fw, 0, 1], [fw, fh, 1], [0, fh, 1]],
                       dtype=np.float32)
    t = corners @ inv.T
    px1 = max(0, int(np.floor(t[:, 0].min())))
    py1 = max(0, int(np.floor(t[:, 1].min())))
    px2 = min(w, int(np.ceil(t[:, 0].max())))
    py2 = min(h, int(np.ceil(t[:, 1].max())))
    rw, rh = px2 - px1, py2 - py1
    if rw <= 8 or rh <= 8:
        return False

    # Warp the template straight into the ROI (translation folded into the affine).
    roi_aff = inv.copy()
    roi_aff[0, 2] -= px1
    roi_aff[1, 2] -= py1
    fm = _create_elliptical_mask((fh, fw))
    mroi = cv2.warpAffine(fm, roi_aff, (rw, rh),
                          flags=cv2.INTER_LINEAR,
                          borderMode=cv2.BORDER_CONSTANT, borderValue=0)
    bin_roi = np.where(mroi > 0.5, np.uint8(255), np.uint8(0))

    # Odd erosion kernel, roughly 1/20th of the smaller ROI side (minimum 3).
    k = max(3, (min(rw, rh) // 20) | 1)
    bin_roi = cv2.erode(bin_roi,
                        cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (k, k)))
    bx, by, bw, bh = cv2.boundingRect(bin_roi)
    if bw <= 0 or bh <= 0:
        return False

    mx1, my1 = px1 + bx, py1 + by
    mx2, my2 = mx1 + bw - 1, my1 + bh - 1
    # seamlessClone needs the cloned region off the border
    if not (mx1 > 0 and my1 > 0 and mx2 < w - 1 and my2 < h - 1):
        return False

    mask = np.zeros((h, w), dtype=np.uint8)
    mask[py1:py2, px1:px2] = bin_roi
    center = (mx1 + bw // 2, my1 + bh // 2)
    blended = cv2.seamlessClone(swapped_frame, original_frame,
                                mask, center, cv2.NORMAL_CLONE)
    np.copyto(swapped_frame[my1:my2 + 1, mx1:mx2 + 1],
              blended[my1:my2 + 1, mx1:mx2 + 1],
              where=mask[my1:my2 + 1, mx1:mx2 + 1, None].astype(bool))
    return True


def _poisson_blend_bbox(swapped_frame: Frame, original_frame: Frame,
                        target_face: Face) -> None:
    """Fallback Poisson blend using an ellipse fitted to ``target_face.bbox``.

    Does nothing when the face has no usable bbox, the bbox is 10 px or less
    on a side, or the padded ellipse would touch the frame border. The
    full-frame mask is cached and reused while the ellipse centre, radii and
    frame size stay the same. Only the ellipse pixels are written back into
    ``swapped_frame`` (in place).
    """
    global _poisson_cached_mask, _poisson_cached_key

    if not hasattr(target_face, 'bbox') or target_face.bbox is None:
        return
    x1, y1, x2, y2 = target_face.bbox.astype(int)
    h, w = swapped_frame.shape[:2]
    x1, y1 = (max(0, x1), max(0, y1))
    x2, y2 = (min(w, x2), min(h, y2))
    if x2 <= x1 or y2 <= y1 or x2 - x1 <= 10 or (y2 - y1 <= 10):
        return

    # Pad the bbox by 10% of its smaller side; radii come from the padded box,
    # the centre from the unpadded one.
    padding = int(min(x2 - x1, y2 - y1) * 0.1)
    x1_p = max(0, x1 - padding)
    y1_p = max(0, y1 - padding)
    x2_p = min(w, x2 + padding)
    y2_p = min(h, y2 + padding)
    center_x = int(round((x1 + x2) / 2.0))
    center_y = int(round((y1 + y2) / 2.0))
    radius_x = max(1, int(round((x2_p - x1_p) / 2.0)))
    radius_y = max(1, int(round((y2_p - y1_p) / 2.0)))
    if not (0 <= center_x < w and 0 <= center_y < h):
        return
    center = (center_x, center_y)
    if center_x - radius_x < 0 or center_x + radius_x >= w or center_y - radius_y < 0 or (center_y + radius_y >= h):
        return

    # Reuse cached mask when center/radius unchanged frame-to-frame
    # (face nearly still) — saves the np.zeros + cv2.ellipse, and the
    # identical array means literally zero wobble while still.
    mask_key = (center_x, center_y, radius_x, radius_y, h, w)
    if _poisson_cached_key == mask_key and _poisson_cached_mask is not None:
        mask = _poisson_cached_mask
    else:
        mask = np.zeros((h, w), dtype=np.uint8)
        cv2.ellipse(mask, center, (radius_x, radius_y), 0, 0, 360, 255, -1)
        if np.sum(mask) == 0:
            return
        _poisson_cached_mask = mask
        _poisson_cached_key = mask_key

    blended = cv2.seamlessClone(swapped_frame, original_frame, mask, center, cv2.NORMAL_CLONE)
    # Composite ONLY this face's ellipse back (ROI-bounded) so previously
    # blended faces in multi-face mode are preserved.
    rx0 = max(0, center_x - radius_x)
    rx1 = min(w, center_x + radius_x + 1)
    ry0 = max(0, center_y - radius_y)
    ry1 = min(h, center_y + radius_y + 1)
    roi_mask = mask[ry0:ry1, rx0:rx1]
    np.copyto(swapped_frame[ry0:ry1, rx0:rx1],
              blended[ry0:ry1, rx0:rx1],
              where=roi_mask[:, :, None].astype(bool))


def _apply_poisson_blend(swapped_frame: Frame, original_frame: Frame,
                         target_face: Face, affine_matrix: Optional[np.ndarray] = None,
                         bgr_fake: Optional[np.ndarray] = None) -> Frame:
    """Poisson-blend the swapped face onto the original frame.

    The preferred path (taken when both ``affine_matrix`` and ``bgr_fake`` are
    given) derives the blend mask from the swap's inverse affine so it tracks
    the swapped face exactly per-frame. If that path is unavailable, declines
    the geometry, or raises, the bbox-ellipse fallback is tried instead.

    This is best-effort: any failure leaves ``swapped_frame`` as it was and is
    reported at DEBUG level rather than raised.

    Args:
        swapped_frame: Frame containing the pasted swap; modified in place.
        original_frame: The pre-swap frame used as the seamlessClone destination.
        target_face: Detected face; its ``bbox`` drives the fallback path.
        affine_matrix: The swap's aligned-face affine (2x3), or None.
        bgr_fake: The swapped aligned-face pixels, or None.

    Returns:
        ``swapped_frame`` (the same array that was passed in).
    """
    try:
        if affine_matrix is not None and bgr_fake is not None:
            try:
                if _poisson_blend_affine(swapped_frame, original_frame,
                                         affine_matrix, bgr_fake):
                    return swapped_frame
            except Exception:
                # Fall through to the robust bbox-ellipse path below, but
                # leave a trace so a permanently failing path is diagnosable.
                logging.debug("Poisson blend (affine path) failed; "
                              "falling back to bbox ellipse", exc_info=True)
        _poisson_blend_bbox(swapped_frame, original_frame, target_face)
    except Exception:
        logging.debug("Poisson blend (bbox fallback) failed; "
                      "returning un-blended frame", exc_info=True)
    return swapped_frame
# --- START: Mac M1-M5 Optimizations ---
# True only on macOS running on an arm64 CPU.
IS_APPLE_SILICON = platform.system() == 'Darwin' and platform.machine() == 'arm64'
FRAME_CACHE = deque(maxlen=3) # Cache for frame reuse
# Holds the most recent detection under 'faces' plus its 'timestamp'
# (written by get_faces_optimized).
FACE_DETECTION_CACHE = {} # Cache face detections
# time.time() of the last real detection run by get_faces_optimized.
LAST_DETECTION_TIME = 0
# Minimum number of seconds between two real detections in cached mode.
DETECTION_INTERVAL = 0.033 # ~30 FPS detection rate for live mode
# NOTE(review): FRAME_SKIP_COUNTER and ADAPTIVE_QUALITY are not referenced in
# the part of the file reviewed here — confirm they are still used elsewhere.
FRAME_SKIP_COUNTER = 0
ADAPTIVE_QUALITY = True
# --- END: Mac M1-M5 Optimizations ---
# Directory containing this file.
abs_dir = os.path.dirname(os.path.abspath(__file__))
# "models" directory located three directory levels above abs_dir.
models_dir = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
def pre_check() -> bool:
    """Ensure the models directory exists and request the swap model download.

    Returns:
        False if the models directory could not be created, True otherwise.
    """
    # Models are stored in models_dir, not next to this file.
    download_directory_path = models_dir
    model_urls = [
        # Direct Hugging Face URL (FP32 model for broad GPU compatibility).
        "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128.onnx"
    ]

    try:
        os.makedirs(download_directory_path, exist_ok=True)
    except OSError as e:
        logging.error(f"Failed to create directory {download_directory_path} due to permission error: {e}")
        return False

    conditional_download(download_directory_path, model_urls)
    return True
def pre_start() -> bool:
    """Verify that a swap model file exists and that it can be loaded.

    Returns:
        True when either model variant is present in ``models_dir`` and
        ``get_face_swapper()`` yields a model; False otherwise.
    """
    # Either variant is acceptable; FP16 is probed first, then FP32.
    candidate_files = ("inswapper_128_fp16.onnx", "inswapper_128.onnx")
    if not any(os.path.exists(os.path.join(models_dir, fname)) for fname in candidate_files):
        update_status(f"Model not found in {models_dir}. Please download inswapper_128.onnx.", NAME)
        return False

    # Loading doubles as a sanity check; get_face_swapper reports its own errors.
    return get_face_swapper() is not None
def get_face_swapper() -> Any:
    """Return the shared swap model, loading it on first use.

    Creation is serialised with ``THREAD_LOCK``. Status messages are emitted
    through ``update_status``.

    Returns:
        The loaded model, or None when no model file exists or loading fails.
    """
    global FACE_SWAPPER
    with THREAD_LOCK:
        if FACE_SWAPPER is not None:
            return FACE_SWAPPER

        # Prefer FP16 on GPUs with Tensor Cores (Turing+) — half the
        # memory bandwidth, faster inference. Fall back to FP32 for
        # older GPUs (e.g. GTX 16xx) where FP16 can produce NaN.
        fp32_path = os.path.join(models_dir, "inswapper_128.onnx")
        fp16_path = os.path.join(models_dir, "inswapper_128_fp16.onnx")
        if _HAS_TORCH_CUDA and os.path.exists(fp16_path):
            model_path = fp16_path
        elif os.path.exists(fp32_path):
            model_path = fp32_path
        else:
            update_status(f"No inswapper model found in {models_dir}.", NAME)
            return None

        # On Apple Silicon, rewrite Pad(reflect) → Slice+Concat so
        # CoreML can run the entire model in a single partition on
        # the Neural Engine instead of bouncing between CPU and ANE.
        if IS_APPLE_SILICON:
            from modules.onnx_optimize import optimize_for_coreml
            model_path = optimize_for_coreml(model_path)

        update_status(f"Loading face swapper model from: {model_path}", NAME)
        try:
            # Enhanced CoreML configuration for M1-M5; every other provider
            # (including CUDA) is passed through bare so ONNX Runtime's own
            # defaults apply.
            coreml_options = {
                "ModelFormat": "MLProgram",
                "MLComputeUnits": "ALL",  # Use Neural Engine + GPU + CPU
                "SpecializationStrategy": "FastPrediction",
                "AllowLowPrecisionAccumulationOnGPU": 1,
                "EnableOnSubgraphs": 1,
            }
            providers_config = [
                ("CoreMLExecutionProvider", dict(coreml_options))
                if IS_APPLE_SILICON and p == "CoreMLExecutionProvider"
                else p
                for p in modules.globals.execution_providers
            ]

            FACE_SWAPPER = insightface.model_zoo.get_model(
                model_path,
                providers=providers_config,
            )

            # Set up CUDA graph session for faster inference
            if _HAS_TORCH_CUDA and any(
                (entry[0] if isinstance(entry, tuple) else entry) == "CUDAExecutionProvider"
                for entry in providers_config
            ):
                _init_cuda_graph_session(model_path, FACE_SWAPPER)

            update_status("Face swapper model loaded successfully.", NAME)
        except Exception as e:
            update_status(f"Error loading face swapper model: {e}", NAME)
            FACE_SWAPPER = None
            return None

        return FACE_SWAPPER
# True only when torch imports cleanly and reports a usable CUDA device.
try:
    import torch
    _HAS_TORCH_CUDA = bool(torch.cuda.is_available())
except ImportError:
    _HAS_TORCH_CUDA = False

# Cache for paste-back (see _get_soft_alpha).
_paste_cache = dict(
    soft_alpha=None,  # feathered alpha mask in aligned-face space
    alpha_size=0,     # side length the cached mask was built for
)
def _get_soft_alpha(size: int) -> np.ndarray:
    """Return the cached feathered alpha template for a ``size`` x ``size`` crop.

    The legacy paste-back eroded and Gaussian-blurred the warped mask in
    output coordinates with kernels scaled to the output face size. Building
    the feather once in aligned space and warping the *soft* mask per frame
    keeps the per-frame cost proportional to the crop area, and the feather
    radius scales with the affine transform.

    Only one template is kept at a time; asking for a different ``size``
    rebuilds and replaces it.

    Returns:
        A ``uint8`` array of shape ``(size, size)`` with values in [0, 255].
    """
    if _paste_cache['alpha_size'] == size:
        return _paste_cache['soft_alpha']

    # Elliptical (not square) template: a square would leave the aligned
    # crop's corners near-opaque and show the crop's straight edges as a
    # visible box on the face. The ellipse (semi-axes 0.44 * size) zeroes the
    # corners, and the heavy blur feathers smoothly into the original.
    radius = int(size * 0.44)
    template = np.zeros((size, size), dtype=np.uint8)
    cv2.ellipse(template, (size // 2, size // 2), (radius, radius), 0, 0, 360, 255, -1)
    template = cv2.GaussianBlur(template, (31, 31), 12)

    # Kept as uint8 [0, 255] so the CPU blend can use cv2's integer ops.
    _paste_cache['soft_alpha'] = template
    _paste_cache['alpha_size'] = size
    return _paste_cache['soft_alpha']
# CUDA graph swap session cache
# Filled in by _init_cuda_graph_session(); read by
# _CudaGraphSessionAdapter.run() and _cuda_graph_swap_inference().
_cuda_graph_session = {
    'session': None,     # ONNX Runtime session created with enable_cuda_graph
    'io_binding': None,  # that session's io_binding with inputs/output bound
    'ort_input': None,   # OrtValue bound to the model's first input
    'ort_latent': None,  # OrtValue bound to the model's second input
    'recorded': False,   # True once the initial run_with_iobinding() succeeded
}
# Serializes CUDA-graph replay. The io_binding + ort_input/ort_latent are
# shared across threads and run_with_iobinding mutates GPU-side buffers;
# concurrent calls would produce wrong output.
_cuda_graph_lock = threading.Lock()
class _CudaGraphSessionAdapter:
    """Drop-in wrapper around an ONNX Runtime session.

    Routes ``.run()`` through CUDA graph replay when a recorded graph is
    available, and transparently proxies every other attribute read and write
    to the underlying session so the swapper sees an unchanged API.

    The only attribute stored on the adapter itself is ``_underlying``.
    """

    def __init__(self, underlying):
        # Use object.__setattr__ to bypass our own __setattr__, which would
        # otherwise try to forward the assignment to a not-yet-set session.
        object.__setattr__(self, "_underlying", underlying)

    def run(self, output_names, input_dict, **kwargs):
        """Run inference, preferring CUDA graph replay.

        When a graph has been recorded, the first two values of
        ``input_dict`` (in insertion order) are taken as the image blob and
        the latent and replayed through the graph; ``output_names`` and
        ``kwargs`` are not used on that path. If replay is unavailable or
        raises, the call is forwarded unchanged to the wrapped session.

        Returns:
            A one-element list with the replay output, or whatever the
            wrapped session's ``run`` returns.
        """
        if _cuda_graph_session['recorded']:
            try:
                keys = list(input_dict.keys())
                blob = input_dict[keys[0]]
                latent = input_dict[keys[1]]
                return [_cuda_graph_swap_inference(blob, latent)]
            except Exception:
                # Deliberate best-effort: any replay problem falls back to the
                # regular session below instead of failing the frame.
                pass
        return self._underlying.run(output_names, input_dict, **kwargs)

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If "_underlying"
        # itself is missing (instance made via __new__, copy or unpickling),
        # reading self._underlying below would re-enter this method forever;
        # raise AttributeError instead of recursing.
        if name == "_underlying":
            raise AttributeError(name)
        return getattr(self._underlying, name)

    def __setattr__(self, name, value):
        # All attribute writes land on the wrapped session, not the adapter.
        setattr(self._underlying, name, value)
def _init_cuda_graph_session(model_path: str, swapper):
    """Create a CUDA-graph-enabled ONNX session for the swap model.

    CUDA graphs record the GPU kernel launch sequence once, then replay it
    with near-zero CPU overhead on subsequent runs. This requires static
    input shapes; the buffers allocated here are sized from
    ``swapper.input_size`` plus a ``(1, 512)`` latent.

    On success the module-level ``_cuda_graph_session`` cache is populated and
    ``swapper.session`` is wrapped in a ``_CudaGraphSessionAdapter``. On any
    failure a message is printed and ``'recorded'`` is set to False, so the
    standard session keeps being used.

    Args:
        model_path: Path of the ONNX model to load.
        swapper: The loaded swapper; its ``input_size``, ``input_names``,
            ``output_names`` and ``session`` attributes are used.
    """
    import onnxruntime as ort

    try:
        graph_session = ort.InferenceSession(
            model_path,
            providers=[('CUDAExecutionProvider', {'enable_cuda_graph': '1'})],
        )

        # Pre-allocate GPU buffers with correct shapes
        image_shape = (1, 3, swapper.input_size[1], swapper.input_size[0])
        image_value = ort.OrtValue.ortvalue_from_numpy(
            np.zeros(image_shape, dtype=np.float32), 'cuda', 0
        )
        latent_value = ort.OrtValue.ortvalue_from_numpy(
            np.zeros((1, 512), dtype=np.float32), 'cuda', 0
        )

        binding = graph_session.io_binding()
        binding.bind_ortvalue_input(swapper.input_names[0], image_value)
        binding.bind_ortvalue_input(swapper.input_names[1], latent_value)
        binding.bind_output(swapper.output_names[0], 'cuda', 0)

        # First run records the CUDA graph
        graph_session.run_with_iobinding(binding)

        # 'recorded' is flipped last, after everything it depends on is stored.
        _cuda_graph_session['session'] = graph_session
        _cuda_graph_session['io_binding'] = binding
        _cuda_graph_session['ort_input'] = image_value
        _cuda_graph_session['ort_latent'] = latent_value
        _cuda_graph_session['recorded'] = True

        # Wrap swapper.session in an adapter instead of rebinding
        # session.run: the adapter survives any later attribute reads on the
        # session and keeps the original session object untouched.
        already_wrapped = isinstance(swapper.session, _CudaGraphSessionAdapter)
        if not already_wrapped:
            swapper.session = _CudaGraphSessionAdapter(swapper.session)

        print(f"[{NAME}] CUDA graph session initialized (swap model)", flush=True)
    except Exception as e:
        print(f"[{NAME}] CUDA graph init failed, using standard session: {e}")
        _cuda_graph_session['recorded'] = False
def _cuda_graph_swap_inference(blob: np.ndarray, latent: np.ndarray) -> np.ndarray:
    """Run the swap model by replaying the recorded CUDA graph.

    Copies ``blob`` and ``latent`` into the pre-bound input buffers, replays
    the session through its io-binding and returns the first output as a
    NumPy array. The whole sequence runs under ``_cuda_graph_lock`` because
    the buffers and binding are shared.
    """
    _cuda_graph_lock.acquire()
    try:
        _cuda_graph_session['ort_input'].update_inplace(blob)
        _cuda_graph_session['ort_latent'].update_inplace(latent)
        binding = _cuda_graph_session['io_binding']
        _cuda_graph_session['session'].run_with_iobinding(binding)
        first_output = _cuda_graph_session['io_binding'].get_outputs()[0]
        return first_output.numpy()
    finally:
        _cuda_graph_lock.release()
def _fast_paste_back(target_img: Frame, bgr_fake: np.ndarray, aimg: np.ndarray, M: np.ndarray) -> Frame:
    """Paste ``bgr_fake`` back onto ``target_img`` via the inverse affine of ``M``.

    Restricts work to the face bbox in output coordinates and warps a
    precomputed feathered alpha template per-frame instead of running a
    size-scaled erode+blur on the warped mask, so the cost follows the crop
    area rather than the frame size.

    Args:
        target_img: Destination frame; written in place.
        bgr_fake: Swapped face in aligned-face space.
        aimg: Aligned-face image; only its shape is used.
        M: 2x3 affine mapping frame coordinates to aligned-face coordinates.

    Returns:
        ``target_img`` (the same array), left untouched when the warped face
        square is degenerate or lies entirely outside the frame.

    Raises:
        AssertionError: if the aligned face is not square.
    """
    h, w = target_img.shape[:2]
    face_h, face_w = aimg.shape[:2]
    # The aligned-face space is expected to be square. _get_soft_alpha caches
    # a single NxN template keyed by N, so fail loudly if that ever stops
    # being true rather than silently mis-warping the alpha mask.
    assert face_h == face_w, f"Expected square aligned face, got {face_h}x{face_w}"

    IM = cv2.invertAffineTransform(M)

    # Bbox in output coords from the affine corners of the aligned-face square.
    corners = np.array(
        [[0, 0], [face_w, 0], [face_w, face_h], [0, face_h]], dtype=np.float32
    )
    transformed = (IM[:, :2] @ corners.T).T + IM[:, 2]
    x1 = int(np.floor(transformed[:, 0].min()))
    x2 = int(np.ceil(transformed[:, 0].max()))
    y1 = int(np.floor(transformed[:, 1].min()))
    y2 = int(np.ceil(transformed[:, 1].max()))
    if x1 >= x2 or y1 >= y2:
        return target_img

    # Small interpolation margin only — the feather is baked into the template.
    pad = 2
    y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
    x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
    crop_w, crop_h = x2p - x1p, y2p - y1p
    # Clamping can leave an empty (or negative-sized) crop when the face
    # square falls completely outside the frame; warpAffine rejects such a
    # destination size, so there is simply nothing to paste.
    if crop_w <= 0 or crop_h <= 0:
        return target_img

    # Fold the crop offset into the affine so we warp straight into the crop.
    IM_crop = IM.copy()
    IM_crop[0, 2] -= x1p
    IM_crop[1, 2] -= y1p

    soft_alpha = _get_soft_alpha(face_h)
    bgr_fake_crop = cv2.warpAffine(bgr_fake, IM_crop, (crop_w, crop_h), borderMode=cv2.BORDER_REPLICATE)
    alpha_crop = cv2.warpAffine(soft_alpha, IM_crop, (crop_w, crop_h), borderValue=0)
    target_crop = target_img[y1p:y2p, x1p:x2p]

    if _HAS_TORCH_CUDA:
        # Scale alpha to [0, 1] on device — cheaper to upload uint8 than float.
        mask_t = torch.from_numpy(alpha_crop).cuda().float().mul_(1.0 / 255.0).unsqueeze(2)
        fake_t = torch.from_numpy(bgr_fake_crop).float().cuda()
        tgt_t = torch.from_numpy(target_crop).float().cuda()
        # Round before the uint8 cast: a bare cast truncates, so float error
        # such as 199.9999 would land on 199 and darken the blend by one
        # level relative to the rounding cv2 path below.
        blended = (
            (mask_t * fake_t + (1.0 - mask_t) * tgt_t)
            .round_()
            .clamp_(0, 255)
            .to(torch.uint8)
            .cpu()
            .numpy()
        )
        target_img[y1p:y2p, x1p:x2p] = blended
    else:
        # Fused uint8 blend via cv2 — no float32 round-trip.
        alpha_3c = cv2.merge([alpha_crop, alpha_crop, alpha_crop])
        inv_alpha = 255 - alpha_3c
        a_fake = cv2.multiply(bgr_fake_crop, alpha_3c, scale=1.0 / 255.0)
        a_tgt = cv2.multiply(target_crop, inv_alpha, scale=1.0 / 255.0)
        target_img[y1p:y2p, x1p:x2p] = cv2.add(a_fake, a_tgt)
    return target_img
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
    """Swap ``source_face`` onto ``target_face`` within ``temp_frame``.

    Runs the swap model without its built-in paste-back, pastes the result
    with ``_fast_paste_back`` (which writes into ``temp_frame`` in place), then
    applies the optional mouth mask, Poisson blend and opacity mix according
    to ``modules.globals``.

    All settings are read once at the top of the call so the decision to
    snapshot the pre-swap frame and the later post-processing steps always
    agree with each other.

    Args:
        source_face: Face supplying the identity; needs ``normed_embedding``.
        target_face: Face in ``temp_frame`` to be replaced.
        temp_frame: Frame to process; may be modified in place.

    Returns:
        The processed uint8 frame. The input (or its pre-swap snapshot) is
        returned when the model is unavailable, a face is missing, or the
        swap raises.
    """
    face_swapper = get_face_swapper()
    if face_swapper is None:
        update_status("Face swapper model not loaded or failed to load. Skipping swap.", NAME)
        return temp_frame

    # Safety check for faces
    if source_face is None or target_face is None:
        return temp_frame
    if not hasattr(source_face, 'normed_embedding') or source_face.normed_embedding is None:
        return temp_frame

    # _fast_paste_back writes in-place. Only copy when mouth_mask, Poisson
    # blend or opacity < 1 need an unmodified original.
    opacity = getattr(modules.globals, "opacity", 1.0)
    opacity = max(0.0, min(1.0, opacity))
    mouth_mask_enabled = getattr(modules.globals, "mouth_mask", False)
    poisson_blend_enabled = getattr(modules.globals, "poisson_blend", False)

    # Poisson blend's seamlessClone needs the genuine pre-swap frame as its
    # destination. Without this, original_frame aliases temp_frame, which
    # _fast_paste_back mutates in place — so seamlessClone would blend the
    # swapped face onto the already-swapped frame (no visible effect).
    needs_original = opacity < 1.0 or mouth_mask_enabled or poisson_blend_enabled
    if needs_original:
        original_frame = temp_frame.copy()
    else:
        original_frame = temp_frame

    if temp_frame.dtype != np.uint8:
        temp_frame = np.clip(temp_frame, 0, 255).astype(np.uint8)

    try:
        if not temp_frame.flags['C_CONTIGUOUS']:
            temp_frame = np.ascontiguousarray(temp_frame)

        # Use paste_back=False and our optimized paste-back
        if any("DmlExecutionProvider" in p for p in modules.globals.execution_providers):
            with modules.globals.dml_lock:
                bgr_fake, M = face_swapper.get(
                    temp_frame, target_face, source_face, paste_back=False
                )
        else:
            bgr_fake, M = face_swapper.get(
                temp_frame, target_face, source_face, paste_back=False
            )

        # isinstance() already rejects None, so one check covers both cases.
        if not isinstance(bgr_fake, np.ndarray):
            return original_frame

        # Pass a dummy aimg with correct shape — _fast_paste_back only uses
        # aimg.shape, so the aligned crop itself need not be recomputed.
        _face_size = face_swapper.input_size[0]
        _aimg_dummy = np.empty((_face_size, _face_size, 3), dtype=np.uint8)
        swapped_frame = _fast_paste_back(temp_frame, bgr_fake, _aimg_dummy, M)
    except Exception as e:
        print(f"Error during face swap: {e}")
        return original_frame

    # --- Post-swap Processing (Masking, Opacity, etc.) ---
    if mouth_mask_enabled:
        # Mask geometry and the mouth cutout both come from the ORIGINAL
        # (pre-swap) frame so the real mouth is what gets restored.
        face_mask = create_face_mask(target_face, original_frame)
        mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = (
            create_lower_mouth_mask(target_face, original_frame)
        )

        # Apply the mouth area only if mouth_cutout exists
        if mouth_cutout is not None and mouth_box != (0,0,0,0):
            swapped_frame = apply_mouth_area(
                swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
            )

            # Draw bounding box only while slider is being dragged
            if getattr(modules.globals, "show_mouth_mask_box", False):
                mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
                swapped_frame = draw_mouth_mask_visualization(
                    swapped_frame, target_face, mouth_mask_data
                )

    # --- Poisson Blending ---
    # Uses the flag captured above, NOT a fresh read of modules.globals: if
    # the setting were switched on mid-call, original_frame might alias the
    # swapped frame and the blend would silently do nothing.
    if poisson_blend_enabled:
        swapped_frame = _apply_poisson_blend(
            swapped_frame, original_frame, target_face, M, bgr_fake
        )

    # Apply opacity blend between the original frame and the swapped frame
    if opacity >= 1.0:
        return swapped_frame.astype(np.uint8)

    final_swapped_frame = gpu_add_weighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0)
    return final_swapped_frame.astype(np.uint8)
# --- START: Mac M1-M5 Optimized Face Detection ---
def get_faces_optimized(frame: Frame, use_cache: bool = True) -> Optional[List[Face]]:
    """Detect faces in ``frame``, rate-limiting detection on Apple Silicon.

    When ``use_cache`` is true and the host is Apple Silicon, a detection made
    less than ``DETECTION_INTERVAL`` seconds ago is returned again instead of
    running the detector. In every other case detection runs immediately and
    the cache is not touched.

    Returns:
        A list of faces (all faces in many-faces mode, otherwise a single
        best face wrapped in a list), or None when single-face detection
        finds nothing.
    """
    global LAST_DETECTION_TIME, FACE_DETECTION_CACHE

    def _detect():
        # Many-faces mode returns the detector's result untouched; single-face
        # mode wraps the one face in a list (or yields None).
        if modules.globals.many_faces:
            return get_many_faces(frame)
        single = get_one_face(frame)
        return [single] if single else None

    caching_active = use_cache and IS_APPLE_SILICON
    if not caching_active:
        return _detect()

    now = time.time()
    cache_is_fresh = (now - LAST_DETECTION_TIME) < DETECTION_INTERVAL
    if cache_is_fresh and FACE_DETECTION_CACHE:
        return FACE_DETECTION_CACHE.get('faces')

    # The timestamp is advanced before detecting, as in the original flow.
    LAST_DETECTION_TIME = now
    faces = _detect()

    FACE_DETECTION_CACHE['faces'] = faces
    FACE_DETECTION_CACHE['timestamp'] = now
    return faces
# --- END: Mac M1-M5 Optimized Face Detection ---
# --- START: Helper function for interpolation and sharpening ---
def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.ndarray]) -> Frame:
    """Sharpen swapped face regions and temporally blend with the previous frame.

    Both steps are driven by ``modules.globals`` (``sharpness``,
    ``enable_interpolation``, ``interpolation_weight``). When neither is
    active the input frame is returned as-is, without copying. The blended
    result of each call is remembered in ``PREVIOUS_FRAME_RESULT`` for the
    next one; that state is cleared whenever interpolation is not in effect.

    Args:
        current_frame: The frame produced by the swap stage.
        swapped_face_bboxes: ``(x1, y1, x2, y2)`` boxes of the faces that were
            swapped; only these regions are sharpened.

    Returns:
        The post-processed frame.
    """
    global PREVIOUS_FRAME_RESULT

    strength = getattr(modules.globals, "sharpness", 0.0)
    interpolate = getattr(modules.globals, "enable_interpolation", False)

    # Skip copy when no post-processing is active
    if strength <= 0.0 and not interpolate:
        PREVIOUS_FRAME_RESULT = None
        return current_frame

    processed_frame = current_frame.copy()

    # 1. Sharpening, restricted to the swapped face boxes.
    if strength > 0.0 and swapped_face_bboxes:
        frame_h, frame_w = processed_frame.shape[:2]
        for box in swapped_face_bboxes:
            # Anything that is not a 4-element iterable is ignored.
            if not hasattr(box, '__iter__') or len(box) != 4:
                continue
            left, top, right, bottom = box
            try:
                left, top = max(0, int(left)), max(0, int(top))
                right, bottom = min(frame_w, int(right)), min(frame_h, int(bottom))
            except ValueError:
                continue
            if right <= left or bottom <= top:
                continue
            region = processed_frame[top:bottom, left:right]
            if region.size == 0:
                continue
            # GPU-accelerated when CUDA OpenCV is available; OpenCV errors
            # leave the region unsharpened.
            try:
                blur_sigma = 2 if IS_APPLE_SILICON else 3
                processed_frame[top:bottom, left:right] = gpu_sharpen(
                    region, strength=strength, sigma=blur_sigma
                )
            except cv2.error:
                pass

    # 2. Temporal interpolation with the previous output.
    weight = getattr(modules.globals, "interpolation_weight", 0.2)
    if not (interpolate and 0 < weight < 1):
        # Interpolation is off or weight is invalid — no need to cache
        PREVIOUS_FRAME_RESULT = None
        return processed_frame

    previous = PREVIOUS_FRAME_RESULT
    compatible = (
        previous is not None
        and previous.shape == processed_frame.shape
        and previous.dtype == processed_frame.dtype
    )
    if not compatible:
        # No usable history (first frame, or shape/dtype changed): restart
        # the history from the current frame.
        PREVIOUS_FRAME_RESULT = processed_frame.copy()
        return processed_frame

    try:
        mixed = gpu_add_weighted(previous, 1.0 - weight, processed_frame, weight, 0)
        final_frame = np.clip(mixed, 0, 255).astype(np.uint8)
    except cv2.error:
        # Use the current frame if interpolation fails.
        final_frame = processed_frame

    # Update the state for the next frame *with the interpolated result*
    PREVIOUS_FRAME_RESULT = final_frame.copy()
    return final_frame
# --- END: Helper function for interpolation and sharpening ---
def process_frame(source_face: Face, temp_frame: Frame, target_face: Face = None) -> Frame:
    """Process a single frame, swapping ``source_face`` onto detected target(s).

    Args:
        source_face: Face whose identity is applied.
        temp_frame: Frame to process.
        target_face: Pre-detected target face. When provided, the internal
            single-face detection call is skipped. Ignored when many_faces
            mode is active.

    Returns:
        The swapped and post-processed frame, or ``temp_frame`` unchanged when
        the global opacity is 0.
    """
    global PREVIOUS_FRAME_RESULT

    # Fully transparent swap: nothing to do, and interpolation history is reset.
    if getattr(modules.globals, "opacity", 1.0) == 0:
        PREVIOUS_FRAME_RESULT = None
        return temp_frame

    result = temp_frame
    bboxes = []

    def _remember_bbox(face):
        # Record where a swap happened so sharpening can target it later.
        if face is not None and hasattr(face, "bbox") and face.bbox is not None:
            bboxes.append(face.bbox.astype(int))

    if modules.globals.many_faces:
        detected = get_many_faces(result)
        if detected:
            # Swaps are applied one after another on a private copy.
            working = result.copy()
            for face in detected:
                working = swap_face(source_face, face, working)
                _remember_bbox(face)
            result = working
    else:
        if target_face is None:
            target_face = get_one_face(result)
        if target_face:
            result = swap_face(source_face, target_face, result)
            _remember_bbox(target_face)

    return apply_post_processing(result, bboxes)
def _targets_for_map_entry(map_data, temp_frame_path: str) -> list:
    """Return the target faces one source/target map entry yields for this frame.

    For an image target this is the entry's single ``target`` face (if any).
    For a video target it is every face listed under the entry's
    ``target_faces_in_frame`` records whose ``location`` equals
    ``temp_frame_path``. Anything else yields an empty list.
    """
    if is_image(modules.globals.target_path):
        target_info = map_data.get("target", {})
        if target_info:
            face = target_info.get("face")
            if face:
                return [face]
        return []

    if is_video(modules.globals.target_path):
        collected = []
        frame_records = map_data.get("target_faces_in_frame", [])
        if frame_records:
            matching = [rec for rec in frame_records if rec and rec.get("location") == temp_frame_path]
            for record in matching:
                faces_in_frame = record.get("faces", [])
                if faces_in_frame:
                    collected.extend(faces_in_frame)
        return collected

    return []


def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
    """Handle mapped-face scenarios (map_faces=True) and live streams.

    Builds a list of (source face, target face) pairs — from the pre-analysed
    ``source_target_map`` when the target is an image or video file, or from
    on-the-fly detection (optionally matched through ``simple_map``) otherwise
    — then applies the swaps one after another and runs post-processing.

    Args:
        temp_frame: Frame to process.
        temp_frame_path: Path identifying this frame inside a video map.

    Returns:
        The processed frame, or ``temp_frame`` unchanged when the global
        opacity is 0.
    """
    global PREVIOUS_FRAME_RESULT

    # If opacity is 0, no swap happens, so no post-processing is needed;
    # interpolation state is reset as well.
    if getattr(modules.globals, "opacity", 1.0) == 0:
        PREVIOUS_FRAME_RESULT = None
        return temp_frame

    processed_frame = temp_frame
    swapped_face_bboxes = []
    source_target_pairs = []

    # The maps may be absent from globals, hence getattr with a default.
    source_target_map = getattr(modules.globals, "source_target_map", None)
    simple_map = getattr(modules.globals, "simple_map", None)

    # File target (image or video) versus live stream.
    is_file_target = modules.globals.target_path and (is_image(modules.globals.target_path) or is_video(modules.globals.target_path))

    if is_file_target:
        # Specific image or video file with pre-analysed maps.
        if source_target_map:
            if modules.globals.many_faces:
                # One default source for every mapped target.
                source_face = default_source_face()
                if source_face:
                    for map_data in source_target_map:
                        for target_face in _targets_for_map_entry(map_data, temp_frame_path):
                            source_target_pairs.append((source_face, target_face))
            else:
                # Each map entry carries its own source.
                for map_data in source_target_map:
                    source_info = map_data.get("source", {})
                    if not source_info:
                        continue
                    source_face = source_info.get("face")
                    if not source_face:
                        continue
                    for target_face in _targets_for_map_entry(map_data, temp_frame_path):
                        source_target_pairs.append((source_face, target_face))
    else:
        # Live stream or webcam processing (analyze faces on the fly).
        detected_faces = get_many_faces(processed_frame)
        if detected_faces:
            if modules.globals.many_faces:
                source_face = default_source_face()
                if source_face:
                    for target_face in detected_faces:
                        source_target_pairs.append((source_face, target_face))
            elif simple_map:
                # simple_map pairs source_faces with target_embeddings by index.
                source_faces = simple_map.get("source_faces", [])
                target_embeddings = simple_map.get("target_embeddings", [])
                if source_faces and target_embeddings and len(source_faces) == len(target_embeddings):
                    if len(detected_faces) <= len(target_embeddings):
                        # No more detections than targets: match each detected
                        # face to its closest target embedding.
                        for detected_face in detected_faces:
                            if detected_face.normed_embedding is None:
                                continue
                            closest_idx, _ = find_closest_centroid(target_embeddings, detected_face.normed_embedding)
                            if 0 <= closest_idx < len(source_faces):
                                source_target_pairs.append((source_faces[closest_idx], detected_face))
                    else:
                        # More detections than targets: match each target
                        # embedding to its closest detected face.
                        detected_faces_with_embedding = [f for f in detected_faces if f.normed_embedding is not None]
                        detected_embeddings = [f.normed_embedding for f in detected_faces_with_embedding]
                        if not detected_embeddings:
                            return processed_frame  # No embeddings to match
                        for i, target_embedding in enumerate(target_embeddings):
                            if 0 <= i < len(source_faces):
                                closest_idx, _ = find_closest_centroid(detected_embeddings, target_embedding)
                                if 0 <= closest_idx < len(detected_faces_with_embedding):
                                    source_target_pairs.append((source_faces[i], detected_faces_with_embedding[closest_idx]))
            else:
                # No map: default source onto one face picked from the
                # detections already made.
                source_face = default_source_face()
                target_face = get_one_face(processed_frame, detected_faces)
                if source_face and target_face:
                    source_target_pairs.append((source_face, target_face))

    # Apply the swaps sequentially on a private copy of the frame.
    current_swap_target = processed_frame.copy()
    for source_face, target_face in source_target_pairs:
        if not (source_face and target_face):
            continue
        current_swap_target = swap_face(source_face, target_face, current_swap_target)
        if target_face is not None and hasattr(target_face, "bbox") and target_face.bbox is not None:
            swapped_face_bboxes.append(target_face.bbox.astype(int))
    processed_frame = current_swap_target

    # Sharpening and interpolation.
    return apply_post_processing(processed_frame, swapped_face_bboxes)
def process_frames(
    source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None:
    """
    Swap faces in every frame file of ``temp_frame_paths``, overwriting each file.

    The mode comes from ``modules.globals.map_faces``:

    * falsy (simple mode): one face is extracted from ``source_path`` once and
      handed to ``process_frame`` for every frame. If that face cannot be
      obtained, nothing is processed (see below).
    * truthy (mapped mode): ``process_frame_v2`` is called with the frame and
      its path; ``source_path`` is not read here.

    Args:
        source_path: Path of the source face image (simple mode only).
        temp_frame_paths: Paths of the frame images to process in place.
        progress: Optional progress object exposing ``update(n)``. It is
            advanced by one for every path handled, including frames that
            could not be read.

    Nothing is raised for per-frame problems: read, processing and write
    failures are printed, and a frame whose processing fails or yields ``None``
    is written back unmodified.
    """
    use_v2 = getattr(modules.globals, "map_faces", False)
    source_face = None

    # Simple mode needs exactly one source face, extracted once up front.
    if not use_v2:
        if not source_path or not os.path.exists(source_path):
            # Only reported here; the halt check below stops the run.
            update_status(f"Error: Source path invalid or not provided for simple mode: {source_path}", NAME)
        else:
            try:
                source_img = cv2.imread(source_path)
                if source_img is None:
                    update_status(f"Error reading source image file {source_path}. Please check the path and file integrity.", NAME)
                else:
                    source_face = get_one_face(source_img)
                    if source_face is None:
                        update_status(f"Warning: Successfully read source image {source_path}, but no face was detected. Swaps will be skipped.", NAME)
                    # The pixels are no longer needed once the face is extracted.
                    del source_img
            except Exception as e:
                import traceback
                print(f"{NAME}: Caught exception during source image processing for {source_path}:")
                traceback.print_exc()
                update_status(f"Error during source image reading or analysis {source_path}: {e}", NAME)

    total_frames = len(temp_frame_paths)

    # Without a usable source face simple mode cannot swap anything.
    if not use_v2 and source_face is None:
        update_status("Halting video processing: Invalid or no face detected in source image for simple mode.", NAME)
        # Account for exactly the frames this call was given. The progress
        # object may be shared with other calls (each typically receiving a
        # single path), so its running count ``progress.n`` must not be
        # subtracted from this call's frame count.
        if progress and total_frames > 0:
            progress.update(total_frames)
        return

    for temp_frame_path in temp_frame_paths:
        # --- Read the target frame ---
        temp_frame = None
        try:
            temp_frame = cv2.imread(temp_frame_path)
            if temp_frame is None:
                print(f"{NAME}: Error: Could not read frame: {temp_frame_path}, skipping.")
                if progress:
                    progress.update(1)
                continue
        except Exception as read_e:
            print(f"{NAME}: Error reading frame {temp_frame_path}: {read_e}, skipping.")
            if progress:
                progress.update(1)
            continue

        # --- Swap; fall back to the untouched frame on any problem ---
        result_frame = None
        try:
            if use_v2:
                # Mapped mode looks the frame up by its path.
                result_frame = process_frame_v2(temp_frame, temp_frame_path)
            else:
                # source_face is known to be non-None here (checked above).
                result_frame = process_frame(source_face, temp_frame)
            if result_frame is None:
                print(f"{NAME}: Warning: Processing returned None for frame {temp_frame_path}. Using original.")
                result_frame = temp_frame
        except Exception as proc_e:
            print(f"{NAME}: Error processing frame {temp_frame_path}: {proc_e}")
            result_frame = temp_frame

        # --- Write back over the same path using PNG compression level 3 ---
        try:
            write_success = cv2.imwrite(temp_frame_path, result_frame, [cv2.IMWRITE_PNG_COMPRESSION, 3])
            if not write_success:
                print(f"{NAME}: Error: Failed to write processed frame to {temp_frame_path}")
        except Exception as write_e:
            print(f"{NAME}: Error writing frame {temp_frame_path}: {write_e}")

        # Drop both frame references before the next (large) image is read.
        del temp_frame
        del result_frame

        if progress:
            progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
    """
    Swap faces in one target image and save the result to ``output_path``.

    With ``modules.globals.map_faces`` truthy the target is handed to
    ``process_frame_v2`` together with ``target_path``; otherwise a single face
    is taken from ``source_path`` and applied through ``process_frame``.

    Every failure (unreadable target or source, no source face, processing
    error, ``None`` result, failed write) is reported via ``update_status`` and
    ends the call without raising.
    """
    # A single image has no previous frame to interpolate with.
    global PREVIOUS_FRAME_RESULT
    PREVIOUS_FRAME_RESULT = None

    mapped_mode = getattr(modules.globals, "map_faces", False)

    # The target is loaded first; nothing else matters if it is unreadable.
    try:
        target_frame = cv2.imread(target_path)
        if target_frame is None:
            update_status(f"Error: Could not read target image: {target_path}", NAME)
            return
    except Exception as read_e:
        update_status(f"Error reading target image {target_path}: {read_e}", NAME)
        return

    swapped = None
    try:
        if mapped_mode:
            if getattr(modules.globals, "many_faces", False):
                update_status("Processing image with 'map_faces' and 'many_faces'. Using pre-analysis map.", NAME)
            # Mapped mode works from the global maps; the path is the lookup key.
            swapped = process_frame_v2(target_frame, target_path)
        else:
            try:
                source_img = cv2.imread(source_path)
                if source_img is None:
                    update_status(f"Error: Could not read source image: {source_path}", NAME)
                    return
                source_face = get_one_face(source_img)
                if not source_face:
                    update_status(f"Error: No face found in source image: {source_path}", NAME)
                    return
            except Exception as src_e:
                update_status(f"Error reading or analyzing source image {source_path}: {src_e}", NAME)
                return
            swapped = process_frame(source_face, target_frame)

        # Report the outcome: missing result, successful write, or failed write.
        if swapped is None:
            update_status("Image processing failed (result was None).", NAME)
        elif cv2.imwrite(output_path, swapped):
            update_status(f"Output image saved to: {output_path}", NAME)
        else:
            update_status(f"Error: Failed to write output image to {output_path}", NAME)
    except Exception as proc_e:
        update_status(f"Error during image processing: {proc_e}", NAME)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
    """
    Announce the active mode and run ``process_frames`` over a video's frames.

    Args:
        source_path: Source face image path; forwarded unchanged (it is needed
            by ``process_frames`` in simple mode).
        temp_frame_paths: Paths of the extracted frame images.

    The frame paths are handed to
    ``modules.processors.frame.core.process_video`` together with this
    module's ``process_frames`` as the per-frame worker.
    """
    # Start every video without a remembered previous frame.
    global PREVIOUS_FRAME_RESULT
    PREVIOUS_FRAME_RESULT = None

    map_faces = getattr(modules.globals, "map_faces", False)
    many_faces = getattr(modules.globals, "many_faces", False)

    # Build the status line so that "mode." directly follows the mode names;
    # the pre-analysis note is a separate trailing sentence.
    if map_faces and many_faces:
        status = "Processing video with 'map_faces' and 'many_faces' mode. Using pre-analysis map."
    elif map_faces:
        status = "Processing video with 'map_faces' mode."
    else:
        status = "Processing video with 'simple' mode."
    update_status(status, NAME)

    modules.processors.frame.core.process_video(
        source_path, temp_frame_paths, process_frames
    )
# ==========================
# MASKING FUNCTIONS (Mostly unchanged, added safety checks and minor improvements)
# ==========================
def create_lower_mouth_mask(
    face: Face, frame: Frame
) -> Tuple[np.ndarray, Optional[np.ndarray], tuple, Optional[np.ndarray]]:
    """
    Build a feathered mouth mask for ``face`` and cut that region out of ``frame``.

    The polygon is made from landmarks 52-63 of ``face.landmark_2d_106``,
    scaled about their mean by an amount driven by
    ``modules.globals.mouth_mask_size`` (0-100, default 0.0); points below the
    mean are stretched further downward. The filled polygon is blurred with a
    kernel derived from ``modules.globals.mask_blur_kernel`` (default 15).

    Args:
        face: Object expected to carry a ``landmark_2d_106`` NumPy array with
            at least 106 rows.
        frame: Image the mask is sized for and the cutout is copied from.

    Returns:
        ``(mask, mouth_cutout, mouth_box, lower_lip_polygon)``:

        * ``mask``: uint8 array of shape ``frame.shape[:2]``.
        * ``mouth_cutout``: copy of ``frame`` inside ``mouth_box``.
        * ``mouth_box``: ``(min_x, min_y, max_x, max_y)`` after padding and
          clamping to the frame.
        * ``lower_lip_polygon``: int32 polygon in frame coordinates.

        When the face or its landmarks are unusable, or an error occurs, the
        defaults are returned instead: an all-zero mask, ``None``,
        ``(0, 0, 0, 0)`` and ``None``. Unexpected errors are printed, not raised.
    """
    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
    mouth_cutout = None
    lower_lip_polygon = None
    mouth_box = (0, 0, 0, 0)

    if face is None or not hasattr(face, 'landmark_2d_106'):
        return mask, mouth_cutout, mouth_box, lower_lip_polygon

    landmarks = face.landmark_2d_106
    if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106:
        return mask, mouth_cutout, mouth_box, lower_lip_polygon

    try:
        # Outer lip outline (52-63). Per the original author's note these 12
        # points, taken in index order, form a simple (non-self-intersecting)
        # closed polygon in this repo's 2d106 convention, whereas adding the
        # inner-lip points made the path self-intersect. The guard above
        # already guarantees at least 106 rows, so every index is in range.
        lower_lip_order = list(range(52, 64))
        lower_lip_landmarks = landmarks[lower_lip_order].astype(np.float32)

        if not np.all(np.isfinite(lower_lip_landmarks)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        center = np.mean(lower_lip_landmarks, axis=0)
        if not np.all(np.isfinite(center)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        # s: 0.0 (slider at 0, tight lip outline) -> 1.0 (slider at 100).
        mouth_mask_size = getattr(modules.globals, "mouth_mask_size", 0.0)
        s = max(0.0, min(1.0, mouth_mask_size / 100.0))

        # Scale about the centroid; points below the centre (toward the chin)
        # get an additional downward stretch.
        expansion_factor = 1.0 + s * 2.0  # 1.0x -> 3.0x
        chin_bias = 1.0 + s * 2.0
        offsets = lower_lip_landmarks - center
        scale_y = np.where(offsets[:, 1] > 0,
                           expansion_factor * chin_bias, expansion_factor)
        expanded_landmarks = lower_lip_landmarks.copy()
        expanded_landmarks[:, 0] = center[0] + offsets[:, 0] * expansion_factor
        expanded_landmarks[:, 1] = center[1] + offsets[:, 1] * scale_y

        if not np.all(np.isfinite(expanded_landmarks)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        expanded_landmarks = expanded_landmarks.astype(np.int32)
        min_x, min_y = np.min(expanded_landmarks, axis=0)
        max_x, max_y = np.max(expanded_landmarks, axis=0)

        # Pad by 10% of each extent, then clamp to the frame.
        padding_ratio = 0.1
        padding_x = int((max_x - min_x) * padding_ratio)
        padding_y = int((max_y - min_y) * padding_ratio)
        frame_h, frame_w = frame.shape[:2]
        min_x = max(0, min_x - padding_x)
        min_y = max(0, min_y - padding_y)
        max_x = min(frame_w, max_x + padding_x)
        max_y = min(frame_h, max_y + padding_y)

        if max_x > min_x and max_y > min_y:
            mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)

            # Shift the polygon into ROI coordinates. The subtraction can
            # promote the int32 landmarks to a wider integer type (the offsets
            # may be plain Python ints after clamping), and cv2.fillPoly needs
            # int32 points, so cast back explicitly - the same cast
            # apply_mouth_area performs on its polygon.
            polygon_relative_to_roi = (
                expanded_landmarks - np.array([min_x, min_y])
            ).astype(np.int32)
            cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)

            # Feather the edge; the kernel size is forced to be odd and >= 1.
            blur_k_size = getattr(modules.globals, "mask_blur_kernel", 15)
            blur_k_size = max(1, blur_k_size // 2 * 2 + 1)
            mask_roi = gpu_gaussian_blur(mask_roi, (blur_k_size, blur_k_size), 0)

            mask[min_y:max_y, min_x:max_x] = mask_roi
            # The cutout comes from the frame passed in, copied so later edits
            # to that frame do not alter it.
            mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
            lower_lip_polygon = expanded_landmarks
            mouth_box = (min_x, min_y, max_x, max_y)
    except IndexError:
        # Landmark indexing failed; fall through and return what we have.
        pass
    except Exception as e:
        print(f"Error in create_lower_mouth_mask: {e}")

    return mask, mouth_cutout, mouth_box, lower_lip_polygon
def draw_mouth_mask_visualization(
    frame: Frame, face: Face, mouth_mask_data: tuple
) -> Frame:
    """
    Return a copy of ``frame`` annotated with the mouth polygon and its box.

    ``mouth_mask_data`` is the 4-tuple ``(mask, mouth_cutout, box, polygon)``.
    The polygon is outlined in green, the box in red, and a "Mouth Mask"
    label is placed above the box (or below it when the box is near the top).

    The input ``frame`` itself is returned, unmodified, when any argument is
    ``None``, the tuple does not have four items, the polygon is not a NumPy
    array with at least three points, or the box is empty after clamping.
    """
    inputs_missing = frame is None or face is None or mouth_mask_data is None
    if inputs_missing or len(mouth_mask_data) != 4:
        return frame

    _mask, _cutout, box, polygon = mouth_mask_data
    left, top, right, bottom = box

    polygon_usable = (
        polygon is not None and isinstance(polygon, np.ndarray) and len(polygon) >= 3
    )
    if not polygon_usable:
        return frame

    canvas = frame.copy()
    canvas_h, canvas_w = canvas.shape[:2]

    # Coerce the box to ints and keep it inside the image.
    try:
        left, top = max(0, int(left)), max(0, int(top))
        right, bottom = min(canvas_w, int(right)), min(canvas_h, int(bottom))
    except ValueError:
        return frame
    if right <= left or bottom <= top:
        return frame

    # Green polygon outline, with every vertex clipped into the image first.
    try:
        outline = polygon.copy()
        for axis, upper in enumerate((canvas_w - 1, canvas_h - 1)):
            outline[:, axis] = np.clip(outline[:, axis], 0, upper)
        cv2.polylines(canvas, [outline.astype(np.int32)], isClosed=True, color=(0, 255, 0), thickness=2)
    except Exception as e:
        print(f"Error drawing polygon for visualization: {e}")

    # Red bounding box.
    cv2.rectangle(canvas, (left, top), (right, bottom), (0, 0, 255), 2)

    # Label goes above the box unless the box sits within 20 px of the top.
    if top > 20:
        label_y = top - 10
    else:
        label_y = bottom + 15
    try:
        cv2.putText(canvas, "Mouth Mask", (left, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
    except Exception:
        # The label is cosmetic; a failure to draw it is ignored.
        pass

    return canvas
def apply_mouth_area(
    frame: np.ndarray,
    mouth_cutout: np.ndarray,
    mouth_box: tuple,
    face_mask: np.ndarray,  # Full face mask (for blending edges)
    mouth_polygon: np.ndarray,  # Specific polygon for the mouth area itself
) -> np.ndarray:
    """
    Blend a previously cut-out mouth region back into ``frame``.

    ``mouth_cutout`` is resized to the (clamped) ``mouth_box`` if needed and
    alpha-blended into that region using a Gaussian-feathered fill of
    ``mouth_polygon``. ``frame`` is modified in place and also returned.

    ``face_mask`` is only validated (it must be non-``None`` and non-empty); it
    does not take part in the blend. Blending happens only for 3-channel
    frames. On invalid input the frame is returned untouched, and any error
    raised while blending is printed rather than propagated.
    """
    required = (frame, mouth_cutout, mouth_box, face_mask, mouth_polygon)
    if any(item is None for item in required):
        return frame
    if mouth_cutout.size == 0 or face_mask.size == 0 or len(mouth_polygon) < 3:
        return frame

    try:
        x0, y0, x1, y1 = map(int, mouth_box)
        if x1 - x0 <= 0 or y1 - y0 <= 0:
            return frame

        # Clamp the box to the frame and re-derive its size.
        frame_h, frame_w = frame.shape[:2]
        y0, y1 = max(0, y0), min(frame_h, y1)
        x0, x1 = max(0, x0), min(frame_w, x1)
        roi_w = x1 - x0
        roi_h = y1 - y0
        if roi_w <= 0 or roi_h <= 0:
            return frame

        roi = frame[y0:y1, x0:x1]
        if roi.size == 0:
            return frame

        # Bring the cutout to the ROI's size when the two differ.
        if roi.shape[:2] == mouth_cutout.shape[:2]:
            patch = mouth_cutout
        elif mouth_cutout.shape[0] > 0 and mouth_cutout.shape[1] > 0:
            patch = gpu_resize(mouth_cutout, (roi_w, roi_h), interpolation=cv2.INTER_LINEAR)
        else:
            return frame
        if patch is None or patch.size == 0:
            return frame

        # Fill the polygon in ROI coordinates.
        polygon_fill = np.zeros(roi.shape[:2], dtype=np.uint8)
        local_polygon = mouth_polygon - [x0, y0]
        cv2.fillPoly(polygon_fill, [local_polygon.astype(np.int32)], 255)

        # Feather radius scales with the smaller box side, limited to 1..30.
        feather_amount = max(1, min(30, min(roi_w, roi_h) // 8))
        kernel_size = 2 * feather_amount + 1
        alpha = cv2.GaussianBlur(polygon_fill.astype(np.float32), (kernel_size, kernel_size), 0)

        # Normalise so the strongest weight is 1.0; an empty mask stays zero.
        peak = alpha.max()
        if peak > 1e-6:
            alpha = alpha / peak
        else:
            alpha.fill(0.0)

        if len(frame.shape) == 3 and frame.shape[2] == 3:
            alpha_3ch = alpha[:, :, np.newaxis].astype(np.float32)
            keep = 1.0 - alpha_3ch
            # original mouth * alpha + swapped face * (1 - alpha)
            mixed = (patch.astype(np.float32) * alpha_3ch +
                     roi.astype(np.float32) * keep)
            frame[y0:y1, x0:x1] = np.clip(mixed, 0, 255).astype(np.uint8)
    except Exception as e:
        # Best effort: report and hand back the frame as it is.
        print(f"Error applying mouth area: {e}")

    return frame
def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
    """
    Build a blurred uint8 mask covering the face, extended over the forehead.

    The mask is the filled convex hull of landmarks 0-32 of
    ``face.landmark_2d_106`` plus forehead points estimated by shifting
    landmarks 33-42 along the direction from landmark 16 to their mean. It is
    then blurred with a kernel derived from ``modules.globals.face_mask_blur``
    (default 31).

    Returns an empty ``(0, 0)`` array when ``frame`` has no usable shape, and
    an all-zero mask of ``frame.shape[:2]`` when the face or its landmarks are
    unusable. Unexpected errors are printed and the mask built so far is
    returned.
    """
    if frame is None or not hasattr(frame, "shape") or len(frame.shape) < 2:
        return np.zeros((0, 0), dtype=np.uint8)

    mask = np.zeros(frame.shape[:2], dtype=np.uint8)

    if face is None or not hasattr(face, 'landmark_2d_106'):
        return mask
    landmarks = face.landmark_2d_106
    if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106:
        return mask

    try:
        if not np.all(np.isfinite(landmarks)):
            return mask

        points = landmarks.astype(np.int32)
        outline = points[0:33]   # face contour
        brows = points[33:43]    # treated as the eyebrow points

        if brows.shape[0] > 0:
            chin = points[16]
            brow_center = np.mean(brows, axis=0)
            # Chin -> eyebrows gives the "up" direction of the face.
            upward = brow_center - chin
            length = np.linalg.norm(upward)
            if length > 0:
                upward = upward / length
                # Lift the eyebrow points by the full chin-to-eyebrow distance
                # so the mask also covers the forehead.
                forehead = brows + upward * (length * 1.0)
                # Widen the lifted points by 20% about their own centre to
                # reach the forehead corners.
                forehead_center = np.mean(forehead, axis=0)
                forehead = (forehead - forehead_center) * 1.2 + forehead_center
                outline = np.concatenate((outline, forehead.astype(np.int32)), axis=0)

        # Hull construction can fail on degenerate input, hence its own guard.
        try:
            hull = cv2.convexHull(outline.astype(np.float32))
            if hull is None or len(hull) < 3:
                return mask
            cv2.fillConvexPoly(mask, hull.astype(np.int32), 255)
        except Exception as hull_e:
            print(f"Error creating convex hull for face mask: {hull_e}")
            return mask

        # Feather the edge; the kernel size is forced to be odd and >= 1.
        kernel = getattr(modules.globals, "face_mask_blur", 31)
        kernel = max(1, kernel // 2 * 2 + 1)
        mask = gpu_gaussian_blur(mask, (kernel, kernel), 0)
    except IndexError:
        # Landmark indexing failed; return whatever mask exists.
        pass
    except Exception as e:
        print(f"Error creating face mask: {e}")

    return mask
def apply_color_transfer(source, target):
    """
    Shift the LAB colour statistics of ``source`` toward those of ``target``.

    Each LAB channel of ``source`` is re-centred and re-scaled so that its mean
    and standard deviation match the corresponding channel of ``target``.

    Args:
        source: Image to recolour. Grayscale or non-uint8 input is converted to
            3-channel uint8 BGR first when possible.
        target: Image providing the reference statistics (converted likewise).

    Returns:
        A new uint8 BGR image on success. ``source`` is returned instead (as
        given, or as far as it was converted) when an input is ``None`` or
        empty, cannot be brought to 3-channel BGR, or the transfer itself
        fails; that failure is printed, not raised.
    """
    if source is None or target is None or source.size == 0 or target.size == 0:
        return source

    def _as_bgr_uint8(image):
        """Return ``(image, ok)`` with ``image`` coerced to 3-channel uint8."""
        if len(image.shape) == 3 and image.shape[2] == 3 and image.dtype == np.uint8:
            return image, True
        try:
            if len(image.shape) == 2:  # Grayscale
                image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
            image = np.clip(image, 0, 255).astype(np.uint8)
            if len(image.shape) != 3 or image.shape[2] != 3:
                raise ValueError("Conversion failed")
        except Exception:
            return image, False
        return image, True

    source, source_ok = _as_bgr_uint8(source)
    if not source_ok:
        return source
    target, target_ok = _as_bgr_uint8(target)
    if not target_ok:
        return source

    try:
        # LAB conversion is done on float32 images scaled to [0, 1].
        source_lab = cv2.cvtColor(source.astype(np.float32) / 255.0, cv2.COLOR_BGR2LAB)
        target_lab = cv2.cvtColor(target.astype(np.float32) / 255.0, cv2.COLOR_BGR2LAB)

        # Per-channel statistics, reshaped so they broadcast over (H, W, 3).
        source_mean, source_std = (
            stat.reshape((1, 1, 3)) for stat in cv2.meanStdDev(source_lab)
        )
        target_mean, target_std = (
            stat.reshape((1, 1, 3)) for stat in cv2.meanStdDev(target_lab)
        )

        # Guard the division; a flat source channel would otherwise divide by 0.
        source_std = np.maximum(source_std, 1e-6)

        result_lab = (source_lab - source_mean) * (target_std / source_std) + target_mean
        # The statistics are float64, so the expression above is float64 too.
        # cv2.cvtColor only accepts 8-bit or float32 input for LAB conversions;
        # without this cast the conversion raises and the function silently
        # returned ``source`` unchanged.
        result_lab = result_lab.astype(np.float32)

        result_bgr_float = cv2.cvtColor(result_lab, cv2.COLOR_LAB2BGR)
        result_bgr_float = np.clip(result_bgr_float, 0.0, 1.0)
        return (result_bgr_float * 255.0).astype("uint8")
    except Exception as e:
        # Best effort: report the failure and keep the unmodified colours.
        print(f"Error in apply_color_transfer: {e}")
        return source