mirror of
https://github.com/hacksider/Deep-Live-Cam.git
synced 2026-05-25 15:07:48 +02:00
1562 lines
71 KiB
Python
1562 lines
71 KiB
Python
from typing import Any, List, Optional, Tuple
|
||
import cv2
|
||
import insightface
|
||
import logging
|
||
import threading
|
||
import numpy as np
|
||
import platform
|
||
import modules.globals
|
||
import modules.processors.frame.core
|
||
from modules.core import update_status
|
||
from modules.face_analyser import get_one_face, get_many_faces, default_source_face
|
||
from modules.typing import Face, Frame
|
||
from modules.utilities import (
|
||
conditional_download,
|
||
is_image,
|
||
is_video,
|
||
)
|
||
from modules.cluster_analysis import find_closest_centroid
|
||
from modules.gpu_processing import gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted, gpu_resize, gpu_cvt_color
|
||
import os
|
||
from collections import deque
|
||
import time
|
||
|
||
# Swap model instance; created on first use by get_face_swapper().
FACE_SWAPPER = None
# Held by get_face_swapper() while the model is being created.
THREAD_LOCK = threading.Lock()
# Tag passed to update_status() and printed in log lines from this module.
NAME = "DLC.FACE-SWAPPER"

# --- START: Added for Interpolation ---
# Final processed frame from the previous call to apply_post_processing();
# reset to None whenever interpolation is off or no swap is performed.
PREVIOUS_FRAME_RESULT = None
# --- END: Added for Interpolation ---
|
||
|
||
# --- Poisson blend (ported from deep-live-cam-gumroad-edition) ---
# Root-cause fix for the "wobble": the blend mask is NOT built from the
# independently-detected 106-pt landmarks (they jitter sub-pixel every frame
# and seamlessClone is hyper-sensitive to its mask boundary). Instead it is
# derived from the swap's OWN affine transform (M) + the swapped pixels
# (bgr_fake), so the mask is locked exactly to where the swapped face was
# placed — no independent jitter source, no EMA, no lag.
# In the bbox-ellipse fallback path the full-frame mask is additionally cached
# and reused while its centre/radii/frame size are unchanged (face nearly
# still), so an identical array is fed to seamlessClone (zero wobble).

# (h, w) -> blurred float32 ellipse mask, filled by _create_elliptical_mask().
_ELLIPTICAL_MASK_CACHE: dict = {}
# Last full-frame uint8 ellipse mask built by the bbox fallback.
_poisson_cached_mask: Optional[np.ndarray] = None
# (center_x, center_y, radius_x, radius_y, h, w) that _poisson_cached_mask
# was built for.
_poisson_cached_key: Optional[tuple] = None
|
||
|
||
|
||
def _create_elliptical_mask(size: Tuple[int, int]) -> np.ndarray:
    """Return a soft elliptical float32 mask for an aligned-face crop.

    The mask depends only on ``size`` (``(height, width)``), never on image
    content, and is memoised in ``_ELLIPTICAL_MASK_CACHE`` so repeated calls
    with the same size hand back the very same array.

    Args:
        size: ``(height, width)`` of the aligned-face space.

    Returns:
        A ``(height, width)`` float32 array: a filled ellipse with axes at
        44% of each dimension, smoothed by a 31x31 Gaussian (sigma 12).
    """
    try:
        return _ELLIPTICAL_MASK_CACHE[size]
    except KeyError:
        pass

    rows, cols = size
    canvas = np.zeros((rows, cols), dtype=np.float32)
    cv2.ellipse(
        canvas,
        (cols // 2, rows // 2),
        (int(cols * 0.44), int(rows * 0.44)),
        0, 0, 360, 1, -1,
    )
    # Small masks are blurred with plain OpenCV; larger ones go through the
    # project's gpu_gaussian_blur helper.
    blur = cv2.GaussianBlur if rows * cols < 65536 else gpu_gaussian_blur
    canvas = blur(canvas, (31, 31), 12)

    _ELLIPTICAL_MASK_CACHE[size] = canvas
    return canvas
|
||
|
||
|
||
def _poisson_blend_via_affine(swapped_frame, original_frame, affine_matrix, bgr_fake) -> bool:
    """Poisson-blend using a mask warped through the swap's inverse affine.

    Returns True when ``swapped_frame`` was updated in place. Returns False
    when a precondition fails (region too small, empty eroded mask, or mask
    touching the frame border) so the caller can try the bbox fallback.
    """
    frame_h, frame_w = swapped_frame.shape[:2]
    fake_h, fake_w = bgr_fake.shape[:2]
    inverse = cv2.invertAffineTransform(affine_matrix)

    # Project the aligned-face rectangle into frame coordinates.
    quad = np.array(
        [[0, 0, 1], [fake_w, 0, 1], [fake_w, fake_h, 1], [0, fake_h, 1]],
        dtype=np.float32,
    )
    projected = quad @ inverse.T
    xs, ys = projected[:, 0], projected[:, 1]
    left = max(0, int(np.floor(xs.min())))
    top = max(0, int(np.floor(ys.min())))
    right = min(frame_w, int(np.ceil(xs.max())))
    bottom = min(frame_h, int(np.ceil(ys.max())))
    roi_w, roi_h = right - left, bottom - top
    if not (roi_w > 8 and roi_h > 8):
        return False

    # Same inverse affine, re-based so the ROI's top-left is the origin.
    roi_affine = inverse.copy()
    roi_affine[0, 2] -= left
    roi_affine[1, 2] -= top
    aligned_mask = _create_elliptical_mask((fake_h, fake_w))
    warped = cv2.warpAffine(
        aligned_mask, roi_affine, (roi_w, roi_h),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_CONSTANT, borderValue=0,
    )
    binary = np.where(warped > 0.5, np.uint8(255), np.uint8(0))

    # Erode with an odd-sized kernel (>= 3) so the seam sits on pixels that
    # are solidly inside the swapped region.
    kernel_size = max(3, (min(roi_w, roi_h) // 20) | 1)
    binary = cv2.erode(
        binary,
        cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size)),
    )
    box_x, box_y, box_w, box_h = cv2.boundingRect(binary)
    if not (box_w > 0 and box_h > 0):
        return False

    x_lo, y_lo = left + box_x, top + box_y
    x_hi, y_hi = x_lo + box_w - 1, y_lo + box_h - 1
    # seamlessClone needs the cloned region off the border
    if not (x_lo > 0 and y_lo > 0 and x_hi < frame_w - 1 and y_hi < frame_h - 1):
        return False

    full_mask = np.zeros((frame_h, frame_w), dtype=np.uint8)
    full_mask[top:bottom, left:right] = binary
    clone_center = (x_lo + box_w // 2, y_lo + box_h // 2)
    blended = cv2.seamlessClone(swapped_frame, original_frame,
                                full_mask, clone_center, cv2.NORMAL_CLONE)
    window = (slice(y_lo, y_hi + 1), slice(x_lo, x_hi + 1))
    np.copyto(swapped_frame[window], blended[window],
              where=full_mask[window][:, :, None].astype(bool))
    return True


def _apply_poisson_blend(swapped_frame: Frame, original_frame: Frame,
                         target_face: Face, affine_matrix: np.ndarray = None,
                         bgr_fake: np.ndarray = None) -> Frame:
    """Seamlessly clone the swapped face region onto the original frame.

    When both ``affine_matrix`` and ``bgr_fake`` are supplied, the blend mask
    is the aligned-space ellipse warped through the inverse of that affine.
    If that path is unavailable, declines, or raises, a filled ellipse around
    ``target_face.bbox`` is used instead; that mask is cached while its
    geometry is unchanged between calls.

    Only pixels under the mask are written back into ``swapped_frame``, so
    anything blended earlier elsewhere in the frame is left untouched.

    Args:
        swapped_frame: Frame already containing the pasted swap; modified in
            place.
        original_frame: Frame used as the clone destination.
        target_face: Face whose ``bbox`` drives the fallback path.
        affine_matrix: Affine used by the swap (frame -> aligned space), or
            None.
        bgr_fake: Swapped aligned-face pixels, or None. Only its shape is
            used here.

    Returns:
        ``swapped_frame`` (blended when possible, otherwise unchanged). Any
        exception is swallowed and the frame is returned as-is.
    """
    global _poisson_cached_mask, _poisson_cached_key
    try:
        # ---- Preferred: mask locked to the swap's own transform ----
        if affine_matrix is not None and bgr_fake is not None:
            try:
                if _poisson_blend_via_affine(swapped_frame, original_frame,
                                             affine_matrix, bgr_fake):
                    return swapped_frame
            except Exception:
                pass  # fall through to the robust bbox-ellipse path below

        # ---- Fallback: bbox-ellipse (defensive, cached when still) ----
        bbox = getattr(target_face, 'bbox', None)
        if bbox is None:
            return swapped_frame

        x1, y1, x2, y2 = bbox.astype(int)
        h, w = swapped_frame.shape[:2]
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)
        box_w, box_h = x2 - x1, y2 - y1
        # Rejects inverted/empty boxes as well as tiny ones.
        if box_w <= 10 or box_h <= 10:
            return swapped_frame

        padding = int(min(box_w, box_h) * 0.1)
        padded_left = max(0, x1 - padding)
        padded_top = max(0, y1 - padding)
        padded_right = min(w, x2 + padding)
        padded_bottom = min(h, y2 + padding)

        center_x = int(round((x1 + x2) / 2.0))
        center_y = int(round((y1 + y2) / 2.0))
        radius_x = max(1, int(round((padded_right - padded_left) / 2.0)))
        radius_y = max(1, int(round((padded_bottom - padded_top) / 2.0)))

        if not (0 <= center_x < w and 0 <= center_y < h):
            return swapped_frame
        center = (center_x, center_y)

        # The whole ellipse must lie inside the frame.
        ellipse_inside = (
            center_x - radius_x >= 0 and center_x + radius_x < w
            and center_y - radius_y >= 0 and center_y + radius_y < h
        )
        if not ellipse_inside:
            return swapped_frame

        # Reuse the cached mask when the geometry is identical to the last
        # call: skips the allocation + draw and feeds seamlessClone the exact
        # same array.
        mask_key = (center_x, center_y, radius_x, radius_y, h, w)
        if _poisson_cached_key == mask_key and _poisson_cached_mask is not None:
            mask = _poisson_cached_mask
        else:
            mask = np.zeros((h, w), dtype=np.uint8)
            cv2.ellipse(mask, center, (radius_x, radius_y), 0, 0, 360, 255, -1)
            if np.sum(mask) == 0:
                return swapped_frame
            _poisson_cached_mask = mask
            _poisson_cached_key = mask_key

        blended = cv2.seamlessClone(swapped_frame, original_frame, mask,
                                    center, cv2.NORMAL_CLONE)

        # Write back only this face's ellipse, limited to its bounding window.
        col_start = max(0, center_x - radius_x)
        col_stop = min(w, center_x + radius_x + 1)
        row_start = max(0, center_y - radius_y)
        row_stop = min(h, center_y + radius_y + 1)
        window = (slice(row_start, row_stop), slice(col_start, col_stop))
        np.copyto(swapped_frame[window], blended[window],
                  where=mask[window][:, :, None].astype(bool))
        return swapped_frame
    except Exception:
        return swapped_frame
|
||
|
||
# --- START: Mac M1-M5 Optimizations ---
# True only on macOS running on an arm64 machine.
IS_APPLE_SILICON = platform.system() == 'Darwin' and platform.machine() == 'arm64'
FRAME_CACHE = deque(maxlen=3)  # Cache for frame reuse
# Written by get_faces_optimized(): 'faces' (last result) and 'timestamp'.
FACE_DETECTION_CACHE = {}  # Cache face detections
# time.time() value of the last real detection in get_faces_optimized().
LAST_DETECTION_TIME = 0
# Minimum seconds between detections before the cached result is reused.
DETECTION_INTERVAL = 0.033  # ~30 FPS detection rate for live mode
FRAME_SKIP_COUNTER = 0
ADAPTIVE_QUALITY = True
# --- END: Mac M1-M5 Optimizations ---

# Directory containing this file.
abs_dir = os.path.dirname(os.path.abspath(__file__))
# "models" directory located three levels above abs_dir.
models_dir = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models"
)
|
||
|
||
def pre_check() -> bool:
    """Ensure the models directory exists and request the swap model download.

    Returns:
        False if the models directory could not be created (the error is
        logged); True once ``conditional_download`` has been invoked.
    """
    # Models live in the shared models_dir, not next to this file.
    target_dir = models_dir

    try:
        os.makedirs(target_dir, exist_ok=True)
    except OSError as e:
        logging.error(f"Failed to create directory {target_dir} due to permission error: {e}")
        return False

    # Direct Hugging Face URL of the FP32 model (broad GPU compatibility).
    model_urls = [
        "https://huggingface.co/hacksider/deep-live-cam/resolve/main/inswapper_128.onnx"
    ]
    conditional_download(target_dir, model_urls)
    return True
|
||
|
||
|
||
def pre_start() -> bool:
    """Verify that a swap model file is present and that it loads.

    Returns:
        False when neither the FP16 nor the FP32 model file exists in
        ``models_dir`` (a status message is emitted), or when
        ``get_face_swapper()`` returns None; True otherwise.
    """
    # Either variant is acceptable; FP16 is probed first.
    candidates = ("inswapper_128_fp16.onnx", "inswapper_128.onnx")
    have_model = any(
        os.path.exists(os.path.join(models_dir, filename))
        for filename in candidates
    )
    if not have_model:
        update_status(f"Model not found in {models_dir}. Please download inswapper_128.onnx.", NAME)
        return False

    # Loading the swapper proves the file is usable; it reports its own errors.
    return get_face_swapper() is not None
|
||
|
||
|
||
def get_face_swapper() -> Any:
    """Return the shared face-swapper model, loading it on first use.

    Creation is serialised by ``THREAD_LOCK``. The FP16 model is chosen when
    torch reports CUDA and the FP16 file exists; otherwise the FP32 file is
    used. On Apple Silicon the chosen model is first passed through
    ``optimize_for_coreml``. When CUDA is in use, a CUDA-graph session is
    additionally set up via ``_init_cuda_graph_session``.

    Returns:
        The loaded model, or None when no model file is found or any step of
        preparation/loading fails. In both failure cases a status message is
        emitted and nothing is cached, so a later call will retry.
    """
    global FACE_SWAPPER

    with THREAD_LOCK:
        if FACE_SWAPPER is not None:
            return FACE_SWAPPER

        # Prefer FP16 on GPUs with Tensor Cores (Turing+) — half the
        # memory bandwidth, faster inference. Fall back to FP32 for
        # older GPUs (e.g. GTX 16xx) where FP16 can produce NaN.
        fp32_path = os.path.join(models_dir, "inswapper_128.onnx")
        fp16_path = os.path.join(models_dir, "inswapper_128_fp16.onnx")
        if _HAS_TORCH_CUDA and os.path.exists(fp16_path):
            model_path = fp16_path
        elif os.path.exists(fp32_path):
            model_path = fp32_path
        else:
            update_status(f"No inswapper model found in {models_dir}.", NAME)
            return None

        try:
            # On Apple Silicon, rewrite Pad(reflect) → Slice+Concat so
            # CoreML can run the entire model in a single partition on
            # the Neural Engine instead of bouncing between CPU and ANE.
            # Done inside the try so a failure here is reported like any
            # other load error instead of escaping to the caller.
            if IS_APPLE_SILICON:
                from modules.onnx_optimize import optimize_for_coreml
                model_path = optimize_for_coreml(model_path)

            update_status(f"Loading face swapper model from: {model_path}", NAME)

            providers_config = []
            for p in modules.globals.execution_providers:
                if p == "CoreMLExecutionProvider" and IS_APPLE_SILICON:
                    # Enhanced CoreML configuration for M1-M5
                    providers_config.append((
                        "CoreMLExecutionProvider",
                        {
                            "ModelFormat": "MLProgram",
                            "MLComputeUnits": "ALL",  # Use Neural Engine + GPU + CPU
                            "SpecializationStrategy": "FastPrediction",
                            "AllowLowPrecisionAccumulationOnGPU": 1,
                            "EnableOnSubgraphs": 1,
                        },
                    ))
                else:
                    # Every other provider (including CUDA) is passed bare
                    # so ONNX Runtime uses its own defaults.
                    providers_config.append(p)

            FACE_SWAPPER = insightface.model_zoo.get_model(
                model_path,
                providers=providers_config,
            )

            # Set up CUDA graph session for faster inference
            uses_cuda = any(
                p == "CUDAExecutionProvider"
                or (isinstance(p, tuple) and p[0] == "CUDAExecutionProvider")
                for p in providers_config
            )
            if _HAS_TORCH_CUDA and uses_cuda:
                _init_cuda_graph_session(model_path, FACE_SWAPPER)

            update_status("Face swapper model loaded successfully.", NAME)
        except Exception as e:
            update_status(f"Error loading face swapper model: {e}", NAME)
            FACE_SWAPPER = None
            return None

        return FACE_SWAPPER
|
||
|
||
|
||
# True only when torch imports cleanly AND reports a usable CUDA device.
# Every torch-dependent code path in this module is gated on this flag.
_HAS_TORCH_CUDA = False
try:
    import torch
    if torch.cuda.is_available():
        _HAS_TORCH_CUDA = True
except (ImportError, OSError):
    # ImportError: torch is not installed.
    # OSError: torch is installed but its native libraries could not be
    # loaded. torch is optional here, so a broken install is treated the
    # same as a missing one instead of aborting import of this module.
    pass
|
||
|
||
# Cache for paste-back. Holds the single most recent template built by
# _get_soft_alpha(); it is rebuilt whenever a different size is requested.
_paste_cache = {
    'soft_alpha': None,  # feathered alpha mask in aligned-face space
    'alpha_size': 0,     # side length the cached 'soft_alpha' was built for
}
|
||
|
||
|
||
def _get_soft_alpha(size: int) -> np.ndarray:
    """Return the cached feathered alpha template for a ``size``x``size`` face.

    The feather is computed once in aligned-face space and the *soft* mask is
    then warped per frame by the caller. The legacy approach eroded and
    blurred the warped mask in output coordinates with kernels scaled to the
    face size, which is far more expensive for large faces.

    The template is an ellipse rather than a square: a square leaves the
    aligned crop's corners near-opaque, which shows up as a visible box on
    the face. An ellipse with axes at 0.44 * size zeroes the corners, and the
    heavy blur feathers it smoothly into the original.

    Args:
        size: Side length of the square aligned-face space.

    Returns:
        A ``(size, size)`` uint8 array in [0, 255]. Only one template is
        cached, so the same array is returned until ``size`` changes.
    """
    if _paste_cache['alpha_size'] == size:
        return _paste_cache['soft_alpha']

    half = size // 2
    radius = int(size * 0.44)
    template = np.zeros((size, size), dtype=np.uint8)
    cv2.ellipse(template, (half, half), (radius, radius), 0, 0, 360, 255, -1)
    # Kept as uint8 so the CPU blend can use cv2's integer ops directly.
    template = cv2.GaussianBlur(template, (31, 31), 12)

    _paste_cache['soft_alpha'] = template
    _paste_cache['alpha_size'] = size
    return template
|
||
|
||
# CUDA graph swap session cache. All fields are filled in by
# _init_cuda_graph_session() and read by _cuda_graph_swap_inference().
_cuda_graph_session = {
    'session': None,     # ONNX Runtime session created with enable_cuda_graph
    'io_binding': None,  # io_binding of that session (inputs + output bound)
    'ort_input': None,   # device-side OrtValue bound to the image input
    'ort_latent': None,  # device-side OrtValue bound to the latent input
    'recorded': False,   # True once the first (recording) run has succeeded
}
# Serializes CUDA-graph replay. The io_binding + ort_input/ort_latent are
# shared across threads and run_with_iobinding mutates GPU-side buffers;
# concurrent calls would produce wrong output.
_cuda_graph_lock = threading.Lock()
|
||
|
||
|
||
class _CudaGraphSessionAdapter:
    """Drop-in wrapper around an ONNX Runtime session.

    Routes ``.run()`` through CUDA graph replay when a recorded graph is
    available, and transparently proxies every other attribute read and
    write to the underlying session so insightface's INSwapper sees an
    unchanged API.
    """

    def __init__(self, underlying):
        # Use object.__setattr__ to bypass our own __setattr__, which would
        # otherwise try to forward the write to a session we don't have yet.
        object.__setattr__(self, "_underlying", underlying)

    def run(self, output_names, input_dict, **kwargs):
        """Run inference, preferring CUDA graph replay.

        When a graph has been recorded, the first two values of
        ``input_dict`` (in insertion order) are fed to
        ``_cuda_graph_swap_inference`` and its result is returned wrapped in
        a one-element list. If replay raises for any reason, the call falls
        back to the wrapped session's own ``run`` with the original
        arguments.
        """
        if _cuda_graph_session['recorded']:
            try:
                keys = list(input_dict.keys())
                blob = input_dict[keys[0]]
                latent = input_dict[keys[1]]
                return [_cuda_graph_swap_inference(blob, latent)]
            except Exception:
                # Best effort: fall back to the standard session below.
                pass
        return self._underlying.run(output_names, input_dict, **kwargs)

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If "_underlying"
        # itself is missing (instance created without __init__, e.g. while
        # being copied or unpickled), reading self._underlying here would
        # re-enter __getattr__ forever. Fail with a normal AttributeError.
        if name == "_underlying":
            raise AttributeError(name)
        return getattr(self._underlying, name)

    def __setattr__(self, name, value):
        # All attribute writes are forwarded to the wrapped session.
        setattr(self._underlying, name, value)
|
||
|
||
|
||
def _init_cuda_graph_session(model_path: str, swapper):
    """Build a CUDA-graph-enabled ONNX Runtime session for the swap model.

    CUDA graphs capture the GPU kernel launch sequence on the first run and
    replay it afterwards with very little CPU overhead. This needs fixed
    input shapes; the buffers allocated here are
    ``(1, 3, input_size[1], input_size[0])`` and ``(1, 512)``, both float32.

    On success the session, its io_binding and the two device buffers are
    stored in ``_cuda_graph_session`` (with ``'recorded'`` set to True) and
    ``swapper.session`` is wrapped in ``_CudaGraphSessionAdapter``. On any
    failure a message is printed and ``'recorded'`` is set to False so the
    standard session keeps being used.

    Args:
        model_path: Path of the ONNX model to load.
        swapper: Loaded swapper exposing ``input_size``, ``input_names``,
            ``output_names`` and ``session``.
    """
    import onnxruntime as ort

    try:
        session = ort.InferenceSession(
            model_path,
            providers=[('CUDAExecutionProvider', {'enable_cuda_graph': '1'})],
        )

        # Device buffers with the model's fixed shapes; their contents are
        # overwritten in place before every replay.
        image_zeros = np.zeros(
            (1, 3, swapper.input_size[1], swapper.input_size[0]),
            dtype=np.float32,
        )
        latent_zeros = np.zeros((1, 512), dtype=np.float32)
        gpu_input = ort.OrtValue.ortvalue_from_numpy(image_zeros, 'cuda', 0)
        gpu_latent = ort.OrtValue.ortvalue_from_numpy(latent_zeros, 'cuda', 0)

        binding = session.io_binding()
        binding.bind_ortvalue_input(swapper.input_names[0], gpu_input)
        binding.bind_ortvalue_input(swapper.input_names[1], gpu_latent)
        binding.bind_output(swapper.output_names[0], 'cuda', 0)

        # First run records the CUDA graph
        session.run_with_iobinding(binding)

        # 'recorded' is written last, after everything it guards is in place.
        _cuda_graph_session.update({
            'session': session,
            'io_binding': binding,
            'ort_input': gpu_input,
            'ort_latent': gpu_latent,
            'recorded': True,
        })

        # Wrap swapper.session in an adapter rather than rebinding
        # session.run, which keeps the original session object untouched.
        # Guard against wrapping twice.
        if not isinstance(swapper.session, _CudaGraphSessionAdapter):
            swapper.session = _CudaGraphSessionAdapter(swapper.session)

        print(f"[{NAME}] CUDA graph session initialized (swap model)", flush=True)
    except Exception as e:
        print(f"[{NAME}] CUDA graph init failed, using standard session: {e}")
        _cuda_graph_session['recorded'] = False
|
||
|
||
|
||
def _cuda_graph_swap_inference(blob: np.ndarray, latent: np.ndarray) -> np.ndarray:
    """Replay the recorded CUDA graph for one swap and return its output.

    The shared device buffers are overwritten with ``blob`` and ``latent``,
    the bound session is run, and the first bound output is converted to a
    NumPy array. The whole sequence holds ``_cuda_graph_lock`` because the
    buffers and io_binding are shared between threads.
    """
    state = _cuda_graph_session
    with _cuda_graph_lock:
        state['ort_input'].update_inplace(blob)
        state['ort_latent'].update_inplace(latent)
        binding = state['io_binding']
        state['session'].run_with_iobinding(binding)
        first_output = binding.get_outputs()[0]
        return first_output.numpy()
|
||
|
||
|
||
def _fast_paste_back(target_img: Frame, bgr_fake: np.ndarray, aimg: np.ndarray, M: np.ndarray) -> Frame:
    """Paste bgr_fake back onto target_img via the inverse affine of M.

    Restricts work to the face bbox in output coordinates and warps a
    precomputed feathered alpha template per-frame instead of running a
    size-scaled erode+blur on the warped mask. Cost is O(crop_area) regardless
    of how much of the frame the face occupies.

    Args:
        target_img: Destination frame; the blended crop is written into it
            in place.
        bgr_fake: Swapped face in aligned-face space.
        aimg: Array whose first two dimensions give the aligned-face size;
            only its shape is read.
        M: Affine that maps the frame into aligned-face space.

    Returns:
        ``target_img``. It is returned unmodified when the projected face
        region is degenerate or does not overlap the frame at all.

    Raises:
        ValueError: If the aligned-face space is not square.
    """
    h, w = target_img.shape[:2]
    face_h, face_w = aimg.shape[:2]
    # inswapper's aligned-face space is square (128x128). _get_soft_alpha
    # caches a single NxN template keyed by N, so fail loudly if that ever
    # stops being true rather than silently mis-warping the alpha mask.
    # An explicit raise is used because `assert` is stripped under -O.
    if face_h != face_w:
        raise ValueError(f"Expected square aligned face, got {face_h}x{face_w}")
    IM = cv2.invertAffineTransform(M)

    # Bbox in output coords from the affine corners of the aligned-face square.
    corners = np.array(
        [[0, 0], [face_w, 0], [face_w, face_h], [0, face_h]], dtype=np.float32
    )
    transformed = (IM[:, :2] @ corners.T).T + IM[:, 2]
    x1 = int(np.floor(transformed[:, 0].min()))
    x2 = int(np.ceil(transformed[:, 0].max()))
    y1 = int(np.floor(transformed[:, 1].min()))
    y2 = int(np.ceil(transformed[:, 1].max()))
    if x1 >= x2 or y1 >= y2:
        return target_img

    # Small interpolation margin only — the feather is baked into the template.
    pad = 2
    y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
    x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
    crop_w, crop_h = x2p - x1p, y2p - y1p
    # After clamping to the frame the crop can be empty (or inverted) when
    # the projected face lies entirely outside the image. warpAffine rejects
    # a non-positive output size, and there is nothing to paste anyway.
    if crop_w <= 0 or crop_h <= 0:
        return target_img

    # Inverse affine re-based to the crop's top-left corner.
    IM_crop = IM.copy()
    IM_crop[0, 2] -= x1p
    IM_crop[1, 2] -= y1p

    soft_alpha = _get_soft_alpha(face_h)
    bgr_fake_crop = cv2.warpAffine(bgr_fake, IM_crop, (crop_w, crop_h), borderMode=cv2.BORDER_REPLICATE)
    alpha_crop = cv2.warpAffine(soft_alpha, IM_crop, (crop_w, crop_h), borderValue=0)

    target_crop = target_img[y1p:y2p, x1p:x2p]

    if _HAS_TORCH_CUDA:
        # Scale alpha to [0, 1] on device — cheaper to upload uint8 than float.
        mask_t = torch.from_numpy(alpha_crop).cuda().float().mul_(1.0 / 255.0).unsqueeze(2)
        fake_t = torch.from_numpy(bgr_fake_crop).float().cuda()
        tgt_t = torch.from_numpy(target_crop).float().cuda()
        blended = (mask_t * fake_t + (1.0 - mask_t) * tgt_t).to(torch.uint8).cpu().numpy()
        target_img[y1p:y2p, x1p:x2p] = blended
    else:
        # Fused uint8 blend via cv2 SIMD — no float32 round-trip.
        # Measured ~7-8× faster than the old numpy float32 path on a 1000×1000 crop.
        alpha_3c = cv2.merge([alpha_crop, alpha_crop, alpha_crop])
        inv_alpha = 255 - alpha_3c
        a_fake = cv2.multiply(bgr_fake_crop, alpha_3c, scale=1.0 / 255.0)
        a_tgt = cv2.multiply(target_crop, inv_alpha, scale=1.0 / 255.0)
        target_img[y1p:y2p, x1p:x2p] = cv2.add(a_fake, a_tgt)

    return target_img
|
||
|
||
|
||
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
    """Swap ``source_face`` onto ``target_face`` within ``temp_frame``.

    Runs the swap model with ``paste_back=False``, pastes the result with
    ``_fast_paste_back``, then applies the optional mouth mask, Poisson blend
    and opacity mix, all driven by settings on ``modules.globals``.

    The paste-back writes into the frame it is given. When ``temp_frame`` is
    already a C-contiguous uint8 array that is the caller's own array, so the
    input may be modified in place.

    Args:
        source_face: Face providing the identity; must have a non-None
            ``normed_embedding``.
        target_face: Face in ``temp_frame`` to be replaced.
        temp_frame: Frame to process.

    Returns:
        The processed uint8 frame. If the model is unavailable, a face is
        missing, or the swap raises, the frame is returned without a
        (completed) swap.
    """
    face_swapper = get_face_swapper()
    if face_swapper is None:
        update_status("Face swapper model not loaded or failed to load. Skipping swap.", NAME)
        return temp_frame

    # Safety check for faces
    if source_face is None or target_face is None:
        return temp_frame
    if not hasattr(source_face, 'normed_embedding') or source_face.normed_embedding is None:
        return temp_frame

    # Settings are read exactly once so every later decision in this call
    # uses the same snapshot, even if another thread changes them meanwhile.
    opacity = getattr(modules.globals, "opacity", 1.0)
    opacity = max(0.0, min(1.0, opacity))
    mouth_mask_enabled = getattr(modules.globals, "mouth_mask", False)
    poisson_blend_enabled = getattr(modules.globals, "poisson_blend", False)

    # _fast_paste_back writes in place, so an untouched copy is only taken
    # when a later stage needs the genuine pre-swap frame. Poisson blend
    # needs it as seamlessClone's destination: without the copy,
    # original_frame would alias the already-swapped frame and the blend
    # would have no visible effect.
    needs_original = opacity < 1.0 or mouth_mask_enabled or poisson_blend_enabled
    if needs_original:
        original_frame = temp_frame.copy()
    else:
        original_frame = temp_frame

    if temp_frame.dtype != np.uint8:
        temp_frame = np.clip(temp_frame, 0, 255).astype(np.uint8)

    try:
        if not temp_frame.flags['C_CONTIGUOUS']:
            temp_frame = np.ascontiguousarray(temp_frame)

        # Use paste_back=False and our optimized paste-back. Model calls are
        # serialised through dml_lock when a DirectML provider is configured.
        if any("DmlExecutionProvider" in p for p in modules.globals.execution_providers):
            with modules.globals.dml_lock:
                bgr_fake, M = face_swapper.get(
                    temp_frame, target_face, source_face, paste_back=False
                )
        else:
            bgr_fake, M = face_swapper.get(
                temp_frame, target_face, source_face, paste_back=False
            )

        # Also covers bgr_fake being None.
        if not isinstance(bgr_fake, np.ndarray):
            return original_frame

        # _fast_paste_back only reads aimg.shape (to size the alpha
        # template), so an uninitialised array of the model's input size is
        # enough. Avoids a redundant norm_crop2 (~0.6ms).
        _face_size = face_swapper.input_size[0]
        _aimg_dummy = np.empty((_face_size, _face_size, 3), dtype=np.uint8)

        swapped_frame = _fast_paste_back(temp_frame, bgr_fake, _aimg_dummy, M)

    except Exception as e:
        print(f"Error during face swap: {e}")
        return original_frame

    # --- Post-swap Processing (Masking, Opacity, etc.) ---
    # Now, work with the guaranteed uint8 'swapped_frame'

    if mouth_mask_enabled:
        # Mask geometry is computed on the pre-swap frame.
        face_mask = create_face_mask(target_face, original_frame)

        # The mouth cutout is taken from the ORIGINAL frame so the real
        # mouth is what gets restored.
        mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon = (
            create_lower_mouth_mask(target_face, original_frame)
        )

        # Apply the mouth area only if mouth_cutout exists
        if mouth_cutout is not None and mouth_box != (0,0,0,0):
            swapped_frame = apply_mouth_area(
                swapped_frame, mouth_cutout, mouth_box, face_mask, lower_lip_polygon
            )

            # Draw bounding box only while slider is being dragged
            if getattr(modules.globals, "show_mouth_mask_box", False):
                mouth_mask_data = (mouth_mask, mouth_cutout, mouth_box, lower_lip_polygon)
                swapped_frame = draw_mouth_mask_visualization(
                    swapped_frame, target_face, mouth_mask_data
                )

    # --- Poisson Blending ---
    # Mask derived from the swap's own affine (M) + swapped pixels (bgr_fake),
    # so it tracks the swapped face exactly per-frame — no landmark jitter,
    # no EMA, no lag. See _apply_poisson_blend.
    # Uses the flag captured above: it is the one that decided whether
    # original_frame is a real copy.
    if poisson_blend_enabled:
        swapped_frame = _apply_poisson_blend(
            swapped_frame, original_frame, target_face, M, bgr_fake
        )

    # Apply opacity blend between the original frame and the swapped frame
    if opacity >= 1.0:
        return swapped_frame.astype(np.uint8)

    # Blend the original_frame with the (potentially mouth-masked) swapped_frame
    final_swapped_frame = gpu_add_weighted(original_frame.astype(np.uint8), 1 - opacity, swapped_frame.astype(np.uint8), opacity, 0)
    return final_swapped_frame.astype(np.uint8)
|
||
|
||
|
||
# --- START: Mac M1-M5 Optimized Face Detection ---
|
||
def get_faces_optimized(frame: Frame, use_cache: bool = True) -> Optional[List[Face]]:
    """Detect faces, reusing a recent result on Apple Silicon.

    When caching is disabled or the machine is not Apple Silicon, detection
    runs on every call. Otherwise the previous result is returned while less
    than ``DETECTION_INTERVAL`` seconds have passed since the last detection
    and the cache is non-empty.

    Args:
        frame: Frame to run detection on.
        use_cache: Set to False to force a fresh detection.

    Returns:
        The result of ``get_many_faces`` in many-faces mode; otherwise a
        one-element list with the detected face, or None if none was found.
    """
    global LAST_DETECTION_TIME, FACE_DETECTION_CACHE

    def _detect():
        # Mode-dependent detection shared by both code paths.
        if modules.globals.many_faces:
            return get_many_faces(frame)
        single = get_one_face(frame)
        return [single] if single else None

    if not (use_cache and IS_APPLE_SILICON):
        return _detect()

    # Throttle: hand back the cached faces if we detected very recently.
    now = time.time()
    if now - LAST_DETECTION_TIME < DETECTION_INTERVAL and FACE_DETECTION_CACHE:
        return FACE_DETECTION_CACHE.get('faces')

    LAST_DETECTION_TIME = now
    detected = _detect()

    FACE_DETECTION_CACHE['faces'] = detected
    FACE_DETECTION_CACHE['timestamp'] = now
    return detected
|
||
# --- END: Mac M1-M5 Optimized Face Detection ---
|
||
|
||
# --- START: Helper function for interpolation and sharpening ---
|
||
def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.ndarray]) -> Frame:
    """Sharpen swapped face regions and temporally smooth the frame.

    Both steps are controlled by ``modules.globals``: ``sharpness`` (> 0
    enables sharpening of each bbox region), ``enable_interpolation`` and
    ``interpolation_weight`` (must be strictly between 0 and 1 for blending
    with the previous result to happen). The settings are read once per call
    so the whole call works from one consistent snapshot.

    Args:
        current_frame: Frame after swapping. It is not modified; a copy is
            processed when any step is active.
        swapped_face_bboxes: ``(x1, y1, x2, y2)`` boxes of swapped faces.
            Malformed boxes are skipped.

    Returns:
        ``current_frame`` itself when nothing is enabled, otherwise the
        processed frame. ``PREVIOUS_FRAME_RESULT`` is updated with a copy of
        the result while interpolation is active and cleared otherwise.
    """
    global PREVIOUS_FRAME_RESULT

    sharpness_value = getattr(modules.globals, "sharpness", 0.0)
    enable_interpolation = getattr(modules.globals, "enable_interpolation", False)
    interpolation_weight = getattr(modules.globals, "interpolation_weight", 0.2)

    # Skip copy when no post-processing is active
    if sharpness_value <= 0.0 and not enable_interpolation:
        PREVIOUS_FRAME_RESULT = None
        return current_frame

    processed_frame = current_frame.copy()

    # 1. Apply Sharpening (if enabled)
    if sharpness_value > 0.0 and swapped_face_bboxes:
        height, width = processed_frame.shape[:2]
        sigma = 2 if IS_APPLE_SILICON else 3
        for bbox in swapped_face_bboxes:
            # Ensure bbox is iterable and has 4 elements
            if not hasattr(bbox, '__iter__') or len(bbox) != 4:
                continue
            x1, y1, x2, y2 = bbox
            # Clamp to the frame. int() raises ValueError for NaN,
            # OverflowError for infinity and TypeError for None; any of
            # these means the box is unusable, so it is skipped.
            try:
                x1, y1 = max(0, int(x1)), max(0, int(y1))
                x2, y2 = min(width, int(x2)), min(height, int(y2))
            except (ValueError, OverflowError, TypeError):
                continue

            if x2 <= x1 or y2 <= y1:
                continue

            face_region = processed_frame[y1:y2, x1:x2]
            if face_region.size == 0:
                continue

            # Best effort: a failed sharpen leaves this region as it was.
            try:
                processed_frame[y1:y2, x1:x2] = gpu_sharpen(
                    face_region, strength=sharpness_value, sigma=sigma
                )
            except cv2.error:
                pass

    # 2. Apply Interpolation (if enabled)
    if not (enable_interpolation and 0 < interpolation_weight < 1):
        # Interpolation is off or weight is invalid — no need to cache
        PREVIOUS_FRAME_RESULT = None
        return processed_frame

    previous = PREVIOUS_FRAME_RESULT
    final_frame = processed_frame  # used as-is when there is nothing to blend with
    compatible = (
        previous is not None
        and previous.shape == processed_frame.shape
        and previous.dtype == processed_frame.dtype
    )
    if compatible:
        try:
            final_frame = gpu_add_weighted(
                previous, 1.0 - interpolation_weight,
                processed_frame, interpolation_weight,
                0,
            )
            # Ensure final frame is uint8
            final_frame = np.clip(final_frame, 0, 255).astype(np.uint8)
        except cv2.error:
            # Use the current frame if interpolation fails.
            final_frame = processed_frame

    # The next call blends against whatever is returned now: the
    # interpolated result, or the current frame when no blend happened.
    PREVIOUS_FRAME_RESULT = final_frame.copy()
    return final_frame
|
||
# --- END: Helper function for interpolation and sharpening ---
|
||
|
||
|
||
def process_frame(source_face: Face, temp_frame: Frame, target_face: Face = None) -> Frame:
    """Swap ``source_face`` onto the target face(s) of one frame.

    Args:
        source_face: Face to swap in.
        temp_frame: Frame to process.
        target_face: Pre-detected target face. When provided, skips the
            internal face detection call (saves ~30-40ms per frame).
            Ignored when many_faces mode is active.

    Returns:
        ``temp_frame`` unchanged when opacity is 0 (the interpolation state
        is also cleared); otherwise the swapped frame after
        ``apply_post_processing``.
    """
    global PREVIOUS_FRAME_RESULT

    if getattr(modules.globals, "opacity", 1.0) == 0:
        PREVIOUS_FRAME_RESULT = None
        return temp_frame

    frame = temp_frame
    bboxes = []

    def _remember_bbox(face):
        # Record the integer bbox of a swapped face, if it has one.
        bbox = getattr(face, "bbox", None)
        if bbox is not None:
            bboxes.append(bbox.astype(int))

    if modules.globals.many_faces:
        detected = get_many_faces(frame)
        if detected:
            # Swap every face in turn on a private copy of the frame.
            working = frame.copy()
            for face in detected:
                working = swap_face(source_face, face, working)
                _remember_bbox(face)
            frame = working
    else:
        if target_face is None:
            target_face = get_one_face(frame)
        if target_face:
            frame = swap_face(source_face, target_face, frame)
            _remember_bbox(target_face)

    return apply_post_processing(frame, bboxes)
|
||
|
||
|
||
def _mapped_target_faces(map_data: dict, temp_frame_path: str, target_is_image: bool) -> list:
    """Return the pre-analysed target faces one map entry contributes to this frame."""
    if target_is_image:
        target_info = map_data.get("target", {})
        target_face = target_info.get("face") if target_info else None
        return [target_face] if target_face else []

    # Video target: the entry lists faces per extracted frame file; keep
    # only those recorded for the frame currently being processed.
    faces = []
    for frame_data in map_data.get("target_faces_in_frame", []) or []:
        if frame_data and frame_data.get("location") == temp_frame_path:
            faces.extend(frame_data.get("faces", []) or [])
    return faces


def _collect_mapped_pairs(source_target_map, temp_frame_path: str, target_is_image: bool) -> list:
    """Build (source, target) pairs for a file target from the pre-analysed map."""
    if not source_target_map:
        return []

    pairs = []
    if modules.globals.many_faces:
        # One default source is applied to every mapped target.
        source_face = default_source_face()
        if source_face:
            for map_data in source_target_map:
                pairs.extend(
                    (source_face, target_face)
                    for target_face in _mapped_target_faces(map_data, temp_frame_path, target_is_image)
                )
        return pairs

    # Specific mapping: each entry carries its own source face.
    for map_data in source_target_map:
        source_info = map_data.get("source", {})
        source_face = source_info.get("face") if source_info else None
        if not source_face:
            continue
        pairs.extend(
            (source_face, target_face)
            for target_face in _mapped_target_faces(map_data, temp_frame_path, target_is_image)
        )
    return pairs


def _collect_live_pairs(frame: Frame, simple_map) -> list:
    """Detect faces in a live frame and pair each with the source that should replace it."""
    detected_faces = get_many_faces(frame)
    if not detected_faces:
        return []

    if modules.globals.many_faces:
        source_face = default_source_face()
        if not source_face:
            return []
        return [(source_face, target_face) for target_face in detected_faces]

    if not simple_map:
        # No map configured: default source onto the single chosen face,
        # reusing the detections we already have.
        source_face = default_source_face()
        target_face = get_one_face(frame, detected_faces)
        return [(source_face, target_face)] if source_face and target_face else []

    source_faces = simple_map.get("source_faces", [])
    target_embeddings = simple_map.get("target_embeddings", [])
    if not (source_faces and target_embeddings and len(source_faces) == len(target_embeddings)):
        return []

    pairs = []
    if len(detected_faces) <= len(target_embeddings):
        # At least as many mapped targets as detections: look up the
        # closest mapped embedding for every detected face.
        for detected_face in detected_faces:
            if detected_face.normed_embedding is None:
                continue
            closest_idx, _ = find_closest_centroid(target_embeddings, detected_face.normed_embedding)
            if 0 <= closest_idx < len(source_faces):
                pairs.append((source_faces[closest_idx], detected_face))
        return pairs

    # More detections than mapped targets: for every mapped embedding pick
    # the closest detected face instead.
    faces_with_embedding = [f for f in detected_faces if f.normed_embedding is not None]
    detected_embeddings = [f.normed_embedding for f in faces_with_embedding]
    if not detected_embeddings:
        # Nothing to match. Return no pairs rather than bailing out of the
        # caller, so post-processing still runs for this frame.
        return []
    for source_face, target_embedding in zip(source_faces, target_embeddings):
        closest_idx, _ = find_closest_centroid(detected_embeddings, target_embedding)
        if 0 <= closest_idx < len(faces_with_embedding):
            pairs.append((source_face, faces_with_embedding[closest_idx]))
    return pairs


def process_frame_v2(temp_frame: Frame, temp_frame_path: str = "") -> Frame:
    """Swap faces in one frame using the face maps (``map_faces`` mode) or live detection.

    When ``modules.globals.target_path`` is an image or video, the
    (source, target) pairs come from ``modules.globals.source_target_map``.
    Otherwise faces are detected in the frame and paired through
    ``modules.globals.simple_map`` or the default source face.

    Args:
        temp_frame: Frame to process. It is not modified; swaps are applied
            to a copy.
        temp_frame_path: Path of the frame file, used to look up the faces
            recorded for this frame when the target is a video.

    Returns:
        The post-processed frame, or ``temp_frame`` itself when the
        configured opacity is 0.
    """
    global PREVIOUS_FRAME_RESULT

    # Opacity 0: no swap is visible, so skip the work and reset the
    # interpolation cache.
    if getattr(modules.globals, "opacity", 1.0) == 0:
        PREVIOUS_FRAME_RESULT = None
        return temp_frame

    # The maps may not exist yet, hence getattr with a default.
    source_target_map = getattr(modules.globals, "source_target_map", None)
    simple_map = getattr(modules.globals, "simple_map", None)

    # Classify the target once per frame instead of once per map entry.
    target_path = modules.globals.target_path
    target_is_image = bool(target_path) and is_image(target_path)
    target_is_video = bool(target_path) and not target_is_image and is_video(target_path)

    if target_is_image or target_is_video:
        source_target_pairs = _collect_mapped_pairs(source_target_map, temp_frame_path, target_is_image)
    else:
        source_target_pairs = _collect_live_pairs(temp_frame, simple_map)

    # Apply the swaps one after another on a private copy.
    current_swap_target = temp_frame.copy()
    swapped_face_bboxes = []
    for source_face, target_face in source_target_pairs:
        if source_face and target_face:
            current_swap_target = swap_face(source_face, target_face, current_swap_target)
            if hasattr(target_face, "bbox") and target_face.bbox is not None:
                swapped_face_bboxes.append(target_face.bbox.astype(int))

    # Always go through post-processing, even when nothing was swapped, so
    # the interpolation state is updated for every frame.
    return apply_post_processing(current_swap_target, swapped_face_bboxes)
|
||
|
||
|
||
def process_frames(
    source_path: str, temp_frame_paths: List[str], progress: Any = None
) -> None:
    """Swap faces in every frame file of ``temp_frame_paths``, in place.

    The processing function is chosen from ``modules.globals.map_faces``:
    ``process_frame_v2`` when it is set, otherwise ``process_frame`` with a
    source face loaded once from ``source_path``. Each result is written
    back to the path it was read from. A frame that fails to process is
    written back unchanged; a frame that cannot be read is skipped.

    Args:
        source_path: Source image path; only read in simple mode.
        temp_frame_paths: Frame files to process.
        progress: Optional progress object; ``update(n)`` is called on it.
    """
    mapped_mode = getattr(modules.globals, "map_faces", False)
    source_face = None

    # Simple mode needs the source face up front; mapped mode reads its
    # sources from the global maps instead.
    if not mapped_mode:
        if source_path and os.path.exists(source_path):
            try:
                source_img = cv2.imread(source_path)
                if source_img is not None:
                    source_face = get_one_face(source_img)
                    if source_face is None:
                        update_status(f"Warning: Successfully read source image {source_path}, but no face was detected. Swaps will be skipped.", NAME)
                    # The decoded image is no longer needed once the face is extracted.
                    del source_img
                else:
                    update_status(f"Error reading source image file {source_path}. Please check the path and file integrity.", NAME)
            except Exception as e:
                import traceback
                print(f"{NAME}: Caught exception during source image processing for {source_path}:")
                traceback.print_exc()
                update_status(f"Error during source image reading or analysis {source_path}: {e}", NAME)
        else:
            # Only reported here; the check below stops the run.
            update_status(f"Error: Source path invalid or not provided for simple mode: {source_path}", NAME)

    total_frames = len(temp_frame_paths)

    # Without a usable source face simple mode cannot do anything.
    if not mapped_mode and source_face is None:
        update_status("Halting video processing: Invalid or no face detected in source image for simple mode.", NAME)
        if progress:
            # Push the bar to the end so it does not appear stuck.
            outstanding = total_frames - getattr(progress, "n", 0)
            if outstanding > 0:
                progress.update(outstanding)
        return

    def advance() -> None:
        """Tick the progress object by one frame, if there is one."""
        if progress:
            progress.update(1)

    for frame_path in temp_frame_paths:
        # --- read ---
        frame_in = None
        try:
            frame_in = cv2.imread(frame_path)
            if frame_in is None:
                print(f"{NAME}: Error: Could not read frame: {frame_path}, skipping.")
                advance()
                continue
        except Exception as read_e:
            print(f"{NAME}: Error reading frame {frame_path}: {read_e}, skipping.")
            advance()
            continue

        # --- swap ---
        frame_out = None
        try:
            frame_out = (
                process_frame_v2(frame_in, frame_path)
                if mapped_mode
                else process_frame(source_face, frame_in)
            )
            if frame_out is None:
                print(f"{NAME}: Warning: Processing returned None for frame {frame_path}. Using original.")
                frame_out = frame_in
        except Exception as proc_e:
            print(f"{NAME}: Error processing frame {frame_path}: {proc_e}")
            frame_out = frame_in  # fall back to the untouched frame

        # --- write back (PNG compression level 3) ---
        try:
            if not cv2.imwrite(frame_path, frame_out, [cv2.IMWRITE_PNG_COMPRESSION, 3]):
                print(f"{NAME}: Error: Failed to write processed frame to {frame_path}")
        except Exception as write_e:
            print(f"{NAME}: Error writing frame {frame_path}: {write_e}")

        # Drop the frame references before the next iteration.
        del frame_in
        if frame_out is not None:
            del frame_out

        advance()
|
||
|
||
|
||
def process_image(source_path: str, target_path: str, output_path: str) -> None:
    """Swap faces in a single target image and write it to ``output_path``.

    Uses ``process_frame_v2`` when ``modules.globals.map_faces`` is set,
    otherwise loads one source face from ``source_path`` and uses
    ``process_frame``. All failures are reported through ``update_status``;
    nothing is raised to the caller.
    """
    global PREVIOUS_FRAME_RESULT
    # A still image has no previous frame to interpolate with.
    PREVIOUS_FRAME_RESULT = None

    mapped_mode = getattr(modules.globals, "map_faces", False)

    # Load the target first; nothing else matters if this fails.
    try:
        target_frame = cv2.imread(target_path)
        if target_frame is None:
            update_status(f"Error: Could not read target image: {target_path}", NAME)
            return
    except Exception as read_e:
        update_status(f"Error reading target image {target_path}: {read_e}", NAME)
        return

    try:
        if mapped_mode:
            if getattr(modules.globals, "many_faces", False):
                update_status("Processing image with 'map_faces' and 'many_faces'. Using pre-analysis map.", NAME)
            # The pairs come from the global maps; target_path is passed
            # through as the frame path argument.
            swapped = process_frame_v2(target_frame, target_path)
        else:
            try:
                source_img = cv2.imread(source_path)
                if source_img is None:
                    update_status(f"Error: Could not read source image: {source_path}", NAME)
                    return
                source_face = get_one_face(source_img)
                if not source_face:
                    update_status(f"Error: No face found in source image: {source_path}", NAME)
                    return
            except Exception as src_e:
                update_status(f"Error reading or analyzing source image {source_path}: {src_e}", NAME)
                return

            swapped = process_frame(source_face, target_frame)

        if swapped is None:
            update_status("Image processing failed (result was None).", NAME)
        elif cv2.imwrite(output_path, swapped):
            update_status(f"Output image saved to: {output_path}", NAME)
        else:
            update_status(f"Error: Failed to write output image to {output_path}", NAME)

    except Exception as proc_e:
        update_status(f"Error during image processing: {proc_e}", NAME)
|
||
|
||
|
||
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
    """Reset interpolation state and hand the frame list to the core video runner.

    Args:
        source_path: Source image path, forwarded for simple mode.
        temp_frame_paths: Extracted frame files to process.
    """
    global PREVIOUS_FRAME_RESULT
    # A new video must not interpolate against the last frame of a previous run.
    PREVIOUS_FRAME_RESULT = None

    map_faces = getattr(modules.globals, "map_faces", False)
    many_faces = getattr(modules.globals, "many_faces", False)

    # Build the status line as complete sentences. Previously the combined
    # case spliced a full sentence in front of the trailing " mode." and
    # produced "... Using pre-analysis map. mode.".
    if map_faces and many_faces:
        message = "Processing video with 'map_faces' and 'many_faces' mode. Using pre-analysis map."
    elif map_faces:
        message = "Processing video with 'map_faces' mode."
    else:
        message = "Processing video with 'simple' mode."
    update_status(message, NAME)

    # process_frames picks the per-frame function itself.
    modules.processors.frame.core.process_video(
        source_path, temp_frame_paths, process_frames
    )
|
||
|
||
# ==========================
|
||
# MASKING FUNCTIONS (Mostly unchanged, added safety checks and minor improvements)
|
||
# ==========================
|
||
|
||
def create_lower_mouth_mask(
    face: Face, frame: Frame
) -> Tuple[np.ndarray, Optional[np.ndarray], tuple, Optional[np.ndarray]]:
    """Build a feathered mouth mask and cut the mouth region out of ``frame``.

    The polygon is the lip outline (landmarks 52-63 of ``landmark_2d_106``)
    scaled about its centroid according to ``modules.globals.mouth_mask_size``
    (0-100), with an extra downward stretch for points below the centroid.

    Args:
        face: Face carrying a ``landmark_2d_106`` array with at least 106 rows.
        frame: Frame the mask is sized for and the cut-out is copied from.

    Returns:
        ``(mask, mouth_cutout, mouth_box, lower_lip_polygon)``:
        ``mask`` is a uint8 array with the frame's height and width,
        ``mouth_cutout`` a copy of the frame inside ``mouth_box``,
        ``mouth_box`` is ``(min_x, min_y, max_x, max_y)`` as Python ints, and
        ``lower_lip_polygon`` the expanded int32 polygon in frame coordinates.
        On any failure the mask is all zeros, the cut-out and polygon are
        ``None`` and the box is ``(0, 0, 0, 0)``. If ``frame`` itself is
        unusable the mask is an empty ``(0, 0)`` array.
    """
    mouth_cutout = None
    lower_lip_polygon = None
    mouth_box = (0, 0, 0, 0)

    # Same frame guard as create_face_mask: without it frame.shape below
    # raises before any other validation runs.
    if frame is None or not hasattr(frame, "shape") or len(frame.shape) < 2:
        return np.zeros((0, 0), dtype=np.uint8), mouth_cutout, mouth_box, lower_lip_polygon

    mask = np.zeros(frame.shape[:2], dtype=np.uint8)

    if face is None or not hasattr(face, 'landmark_2d_106'):
        return mask, mouth_cutout, mouth_box, lower_lip_polygon

    landmarks = face.landmark_2d_106
    if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106:
        return mask, mouth_cutout, mouth_box, lower_lip_polygon

    try:
        # Lip outline only (indices 52-63). The guard above guarantees at
        # least 106 rows, so this slice is always in range.
        lower_lip_landmarks = landmarks[52:64].astype(np.float32)
        if not np.all(np.isfinite(lower_lip_landmarks)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        center = np.mean(lower_lip_landmarks, axis=0)
        if not np.all(np.isfinite(center)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        # s: 0.0 (slider at 0, tight lip outline) -> 1.0 (slider at 100).
        mouth_mask_size = getattr(modules.globals, "mouth_mask_size", 0.0)
        s = max(0.0, min(1.0, mouth_mask_size / 100.0))

        # x scales by expansion_factor. y scales by the same factor above the
        # centroid and by expansion_factor * chin_bias below it, so large
        # slider values extend the polygon downward more than upward.
        expansion_factor = 1.0 + s * 2.0
        chin_bias = 1.0 + s * 2.0
        offsets = lower_lip_landmarks - center
        scale_y = np.where(offsets[:, 1] > 0,
                           expansion_factor * chin_bias, expansion_factor)
        expanded_landmarks = lower_lip_landmarks.copy()
        expanded_landmarks[:, 0] = center[0] + offsets[:, 0] * expansion_factor
        expanded_landmarks[:, 1] = center[1] + offsets[:, 1] * scale_y

        if not np.all(np.isfinite(expanded_landmarks)):
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        expanded_landmarks = expanded_landmarks.astype(np.int32)

        # Work in plain Python ints from here on so the box and the shifted
        # polygon have one predictable type whether or not clamping happened.
        min_x, min_y = (int(v) for v in expanded_landmarks.min(axis=0))
        max_x, max_y = (int(v) for v in expanded_landmarks.max(axis=0))

        # Pad by 10% of the polygon extent, then clamp to the frame.
        padding_ratio = 0.1
        padding_x = int((max_x - min_x) * padding_ratio)
        padding_y = int((max_y - min_y) * padding_ratio)

        frame_h, frame_w = frame.shape[:2]
        min_x = max(0, min_x - padding_x)
        min_y = max(0, min_y - padding_y)
        max_x = min(frame_w, max_x + padding_x)
        max_y = min(frame_h, max_y + padding_y)

        if max_x <= min_x or max_y <= min_y:
            # Box collapsed after clamping (polygon outside the frame).
            return mask, mouth_cutout, mouth_box, lower_lip_polygon

        # Rasterise the polygon inside the box only.
        mask_roi = np.zeros((max_y - min_y, max_x - min_x), dtype=np.uint8)
        polygon_relative_to_roi = expanded_landmarks - np.array([min_x, min_y], dtype=np.int32)
        cv2.fillPoly(mask_roi, [polygon_relative_to_roi], 255)

        # Feather the edge; the kernel size is forced to a positive odd int.
        blur_k_size = int(getattr(modules.globals, "mask_blur_kernel", 15))
        blur_k_size = max(1, blur_k_size // 2 * 2 + 1)
        mask_roi = gpu_gaussian_blur(mask_roi, (blur_k_size, blur_k_size), 0)

        mask[min_y:max_y, min_x:max_x] = mask_roi

        # The cut-out is copied from the frame passed in.
        mouth_cutout = frame[min_y:max_y, min_x:max_x].copy()
        lower_lip_polygon = expanded_landmarks
        mouth_box = (min_x, min_y, max_x, max_y)

    except IndexError:
        # Malformed landmark array: fall through and return what we have.
        pass
    except Exception as e:
        print(f"Error in create_lower_mouth_mask: {e}")

    return mask, mouth_cutout, mouth_box, lower_lip_polygon
|
||
|
||
|
||
def draw_mouth_mask_visualization(
    frame: Frame, face: Face, mouth_mask_data: tuple
) -> Frame:
    """Return a copy of ``frame`` with the mouth polygon and box drawn on it.

    Args:
        frame: Frame to annotate; it is not modified.
        face: Face the mask belongs to; only checked for ``None``.
        mouth_mask_data: 4-tuple ``(mask, mouth_cutout, box, polygon)``.

    Returns:
        The annotated copy, or ``frame`` itself when the inputs, polygon or
        box are unusable.
    """
    if frame is None or face is None or mouth_mask_data is None or len(mouth_mask_data) != 4:
        return frame

    # Only the box and the polygon are drawn; the mask and cut-out are unused.
    _, _, box, polygon = mouth_mask_data
    left, top, right, bottom = box

    if polygon is None or not isinstance(polygon, np.ndarray) or len(polygon) < 3:
        return frame

    canvas = frame.copy()
    height, width = canvas.shape[:2]

    # Force the box to integers inside the frame.
    try:
        left = max(0, int(left))
        top = max(0, int(top))
        right = min(width, int(right))
        bottom = min(height, int(bottom))
    except ValueError:
        return frame

    if right <= left or bottom <= top:
        return frame

    # Green outline: the polygon, clipped so every vertex lies in the frame.
    try:
        clipped = polygon.copy()
        clipped[:, 0] = np.clip(clipped[:, 0], 0, width - 1)
        clipped[:, 1] = np.clip(clipped[:, 1], 0, height - 1)
        cv2.polylines(canvas, [clipped.astype(np.int32)], isClosed=True, color=(0, 255, 0), thickness=2)
    except Exception as e:
        print(f"Error drawing polygon for visualization: {e}")

    # Red rectangle: the bounding box.
    cv2.rectangle(canvas, (left, top), (right, bottom), (0, 0, 255), 2)

    # Label above the box when there is room, otherwise below it.
    if top > 20:
        label_y = top - 10
    else:
        label_y = bottom + 15
    try:
        cv2.putText(canvas, "Mouth Mask", (left, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
    except Exception:
        # The label is decorative; ignore drawing failures.
        pass

    return canvas
|
||
|
||
|
||
def apply_mouth_area(
    frame: np.ndarray,
    mouth_cutout: np.ndarray,
    mouth_box: tuple,
    face_mask: np.ndarray,
    mouth_polygon: np.ndarray,
) -> np.ndarray:
    """Blend ``mouth_cutout`` back into ``frame`` inside ``mouth_box``.

    The blend weight is ``mouth_polygon`` rasterised in the box, feathered
    with a Gaussian blur and normalised to [0, 1]. ``frame`` is modified in
    place and also returned. Only 3-channel frames are blended.

    Args:
        frame: Frame to paste into.
        mouth_cutout: Pixels to paste; resized to the box if the sizes differ.
        mouth_box: ``(min_x, min_y, max_x, max_y)`` in frame coordinates.
        face_mask: Must be a non-empty array; it is not otherwise read here.
        mouth_polygon: Polygon vertices in frame coordinates (at least 3).

    Returns:
        ``frame``, unchanged if any input is unusable or an error occurs.
    """
    required = (frame, mouth_cutout, mouth_box, face_mask, mouth_polygon)
    if any(item is None for item in required):
        return frame
    if mouth_cutout.size == 0 or face_mask.size == 0 or len(mouth_polygon) < 3:
        return frame

    try:
        left, top, right, bottom = map(int, mouth_box)
        if right - left <= 0 or bottom - top <= 0:
            return frame

        # Clamp the box to the frame and re-check that it is still non-empty.
        frame_h, frame_w = frame.shape[:2]
        top, bottom = max(0, top), min(frame_h, bottom)
        left, right = max(0, left), min(frame_w, right)
        roi_w = right - left
        roi_h = bottom - top
        if roi_w <= 0 or roi_h <= 0:
            return frame

        roi = frame[top:bottom, left:right]
        if roi.size == 0:
            return frame

        # Make the cut-out the same size as the region it is pasted into.
        if roi.shape[:2] == mouth_cutout.shape[:2]:
            patch = mouth_cutout
        elif mouth_cutout.shape[0] > 0 and mouth_cutout.shape[1] > 0:
            patch = gpu_resize(mouth_cutout, (roi_w, roi_h), interpolation=cv2.INTER_LINEAR)
        else:
            return frame
        if patch is None or patch.size == 0:
            return frame

        # Rasterise the polygon in box-local coordinates.
        lip_mask = np.zeros(roi.shape[:2], dtype=np.uint8)
        local_polygon = (mouth_polygon - [left, top]).astype(np.int32)
        cv2.fillPoly(lip_mask, [local_polygon], 255)

        # The feather radius scales with the box, capped to 1..30 pixels.
        feather = max(1, min(30, min(roi_w, roi_h) // 8))
        ksize = 2 * feather + 1
        alpha = cv2.GaussianBlur(lip_mask.astype(np.float32), (ksize, ksize), 0)

        # Normalise so the strongest weight is exactly 1.0.
        peak = alpha.max()
        if peak > 1e-6:
            alpha = alpha / peak
        else:
            alpha.fill(0.0)

        if frame.ndim == 3 and frame.shape[2] == 3:
            weight = alpha[:, :, np.newaxis].astype(np.float32)
            # cut-out * weight + current pixels * (1 - weight)
            mixed = (patch.astype(np.float32) * weight +
                     roi.astype(np.float32) * (1.0 - weight))
            frame[top:bottom, left:right] = np.clip(mixed, 0, 255).astype(np.uint8)

    except Exception as e:
        # Best effort: report and hand the frame back as it is.
        print(f"Error applying mouth area: {e}")

    return frame
|
||
|
||
|
||
def create_face_mask(face: Face, frame: Frame) -> np.ndarray:
    """Build a feathered uint8 mask covering the face, forehead included.

    The mask is the filled convex hull of landmarks 0-32 of
    ``landmark_2d_106`` plus forehead points derived from landmarks 33-42,
    blurred with a kernel taken from ``modules.globals.face_mask_blur``.

    Args:
        face: Face carrying a ``landmark_2d_106`` array with at least 106 rows.
        frame: Frame whose height and width the mask takes.

    Returns:
        A uint8 mask with the frame's height and width. It is all zeros when
        the face or landmarks are unusable, and an empty ``(0, 0)`` array
        when ``frame`` is unusable.
    """
    if frame is None or not hasattr(frame, "shape") or len(frame.shape) < 2:
        return np.zeros((0, 0), dtype=np.uint8)

    mask = np.zeros(frame.shape[:2], dtype=np.uint8)

    if face is None or not hasattr(face, 'landmark_2d_106'):
        return mask

    landmarks = face.landmark_2d_106
    if landmarks is None or not isinstance(landmarks, np.ndarray) or landmarks.shape[0] < 106:
        return mask

    try:
        if not np.all(np.isfinite(landmarks)):
            return mask

        points = landmarks.astype(np.int32)

        # Landmarks 0-32 are used as the face outline.
        outline = points[0:33]

        # Landmarks 33-42 are used as the brow line. They are shifted away
        # from landmark 16 (treated as the chin) to get forehead points, so
        # the hull reaches above the brows.
        brows = points[33:43]
        if brows.shape[0] > 0:
            chin_point = points[16]
            brow_center = np.mean(brows, axis=0)

            upward = brow_center - chin_point
            distance = np.linalg.norm(upward)
            if distance > 0:
                upward /= distance

                # Shift by the full chin-to-brow distance.
                lift = upward * (distance * 1.0)
                forehead = brows + lift

                # Widen the lifted points by 20% about their own centre.
                pivot = np.mean(forehead, axis=0)
                forehead = (forehead - pivot) * 1.2 + pivot

                outline = np.concatenate((outline, forehead.astype(np.int32)), axis=0)

        # Hull and fill are guarded separately so a failure is reported
        # with its own message.
        try:
            hull = cv2.convexHull(outline.astype(np.float32))
            if hull is None or len(hull) < 3:
                return mask
            cv2.fillConvexPoly(mask, hull.astype(np.int32), 255)
        except Exception as hull_e:
            print(f"Error creating convex hull for face mask: {hull_e}")
            return mask

        # Feather the edge; the kernel size is forced to be odd and positive.
        kernel = getattr(modules.globals, "face_mask_blur", 31)
        kernel = max(1, kernel // 2 * 2 + 1)
        mask = gpu_gaussian_blur(mask, (kernel, kernel), 0)

    except IndexError:
        # Malformed landmark array: return the mask as built so far.
        pass
    except Exception as e:
        print(f"Error creating face mask: {e}")

    return mask
|
||
|
||
|
||
def _ct_as_bgr_uint8(image):
    """Best-effort cast of *image* to uint8, expanding 2-D grayscale to BGR.

    Values are clipped to [0, 255] before the cast. The caller must still
    check that the result really has three channels.
    """
    if len(image.shape) == 2:  # Grayscale
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    return np.clip(image, 0, 255).astype(np.uint8)


def _ct_is_three_channel(image):
    """Return True when *image* is an H x W x 3 array."""
    return len(image.shape) == 3 and image.shape[2] == 3


def apply_color_transfer(
    source: Optional[np.ndarray], target: Optional[np.ndarray]
) -> Optional[np.ndarray]:
    """Match the colour statistics of *source* to those of *target* in LAB space.

    Each LAB channel of *source* is shifted and scaled so that its mean and
    standard deviation equal those of the same channel in *target*.

    Args:
        source: Image whose colours are adjusted. Expected to be 3-channel
            BGR uint8; grayscale or non-uint8 input is converted on a
            best-effort basis.
        target: Image providing the reference colour statistics. Same
            expectations as *source*.

    Returns:
        A new uint8 BGR image with the shape of *source* on success.
        This is a best-effort operation: if either input is None, empty, or
        cannot be brought to 3-channel form, or if any step of the transfer
        fails, *source* is returned instead (possibly already cast to uint8).
    """
    # Input validation
    if source is None or target is None or source.size == 0 or target.size == 0:
        return source

    # Ensure both images are 3-channel BGR uint8, converting when possible.
    if not (_ct_is_three_channel(source) and source.dtype == np.uint8):
        try:
            source = _ct_as_bgr_uint8(source)
        except Exception:
            return source
        if not _ct_is_three_channel(source):
            return source

    if not (_ct_is_three_channel(target) and target.dtype == np.uint8):
        try:
            target = _ct_as_bgr_uint8(target)
        except Exception:
            return source
        if not _ct_is_three_channel(target):
            return source

    try:
        # Convert to float32 [0, 1] range for LAB conversion
        source_lab = cv2.cvtColor(
            source.astype(np.float32) / 255.0, cv2.COLOR_BGR2LAB
        )
        target_lab = cv2.cvtColor(
            target.astype(np.float32) / 255.0, cv2.COLOR_BGR2LAB
        )

        source_mean, source_std = cv2.meanStdDev(source_lab)
        target_mean, target_std = cv2.meanStdDev(target_lab)

        # meanStdDev yields float64 vectors. Left as-is they would promote
        # the whole LAB image to float64, which cvtColor rejects (it accepts
        # 8-bit, 16-bit and 32-bit float only) - the transfer would then
        # always fail and silently fall back to the unmodified source.
        # Reshape for broadcasting and keep everything in float32.
        source_mean = source_mean.reshape((1, 1, 3)).astype(np.float32)
        source_std = source_std.reshape((1, 1, 3)).astype(np.float32)
        target_mean = target_mean.reshape((1, 1, 3)).astype(np.float32)
        target_std = target_std.reshape((1, 1, 3)).astype(np.float32)

        # Avoid division by zero for flat (constant-colour) source channels.
        # The target std is only a multiplier, so it may legitimately be 0.
        source_std = np.maximum(source_std, np.float32(1e-6))

        # Perform color transfer in LAB space
        result_lab = (source_lab - source_mean) * (target_std / source_std) + target_mean
        result_lab = result_lab.astype(np.float32, copy=False)

        # Convert back to BGR float, clip to [0, 1], then scale to uint8.
        result_bgr_float = cv2.cvtColor(result_lab, cv2.COLOR_LAB2BGR)
        result_bgr_float = np.clip(result_bgr_float, 0.0, 1.0)
        return (result_bgr_float * 255.0).astype(np.uint8)
    except Exception:
        # Deliberate best-effort: any OpenCV (cv2.error is an Exception
        # subclass) or NumPy failure leaves the frame untouched rather than
        # interrupting frame processing.
        return source
|