From e57116de681c45e14db91bbeb5ee495f0850d03f Mon Sep 17 00:00:00 2001 From: Lauri Gates Date: Sun, 22 Feb 2026 19:39:12 +0200 Subject: [PATCH] feat: add GPEN-BFR 256 and 512 ONNX face enhancers Add two new face enhancement processors using GPEN-BFR ONNX models at 256x256 and 512x512 resolutions. Models auto-download on first use from GitHub releases. Integrates into existing frame processor pipeline alongside GFPGAN enhancer with UI toggle switches. - modules/paths.py: Shared path constants module - modules/processors/frame/_onnx_enhancer.py: ONNX enhancement utilities - modules/processors/frame/face_enhancer_gpen256.py: GPEN-BFR 256 processor - modules/processors/frame/face_enhancer_gpen512.py: GPEN-BFR 512 processor - modules/core.py: Add GPEN choices to --frame-processor CLI arg - modules/globals.py: Add GPEN entries to fp_ui toggle dict - modules/ui.py: Add GPEN toggle switches and processing integration Closes #1663 Co-Authored-By: Claude Opus 4.6 --- modules/core.py | 10 +- modules/globals.py | 2 +- modules/paths.py | 6 + modules/processors/frame/_onnx_enhancer.py | 145 ++++++++++++++++++ .../processors/frame/face_enhancer_gpen256.py | 125 +++++++++++++++ .../processors/frame/face_enhancer_gpen512.py | 125 +++++++++++++++ modules/ui.py | 40 ++++- 7 files changed, 444 insertions(+), 9 deletions(-) create mode 100644 modules/paths.py create mode 100644 modules/processors/frame/_onnx_enhancer.py create mode 100644 modules/processors/frame/face_enhancer_gpen256.py create mode 100644 modules/processors/frame/face_enhancer_gpen512.py diff --git a/modules/core.py b/modules/core.py index a85007a..663d742 100644 --- a/modules/core.py +++ b/modules/core.py @@ -39,7 +39,7 @@ def parse_args() -> None: program.add_argument('-s', '--source', help='select an source image', dest='source_path') program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') program.add_argument('-o', '--output', help='select output file or directory', dest='output_path') - program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+') + program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer', 'face_enhancer_gpen256', 'face_enhancer_gpen512'], nargs='+') program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False) program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) @@ -86,11 +86,9 @@ def parse_args() -> None: modules.globals.execution_threads = args.execution_threads modules.globals.lang = args.lang - #for ENHANCER tumbler: - if 'face_enhancer' in args.frame_processor: - modules.globals.fp_ui['face_enhancer'] = True - else: - modules.globals.fp_ui['face_enhancer'] = False + #for ENHANCER tumblers: + for enhancer_key in ('face_enhancer', 'face_enhancer_gpen256', 'face_enhancer_gpen512'): + modules.globals.fp_ui[enhancer_key] = enhancer_key in args.frame_processor # translate deprecated args if args.source_path_deprecated: diff --git a/modules/globals.py b/modules/globals.py index 1bccdc1..65e3fdd 100644 --- a/modules/globals.py +++ b/modules/globals.py @@ -50,7 +50,7 @@ headless: bool | None = None # Run without UI? log_level: str = "error" # Logging level (e.g., 'debug', 'info', 'warning', 'error') # Face Processor UI Toggles (Example) -fp_ui: Dict[str, bool] = {"face_enhancer": False} +fp_ui: Dict[str, bool] = {"face_enhancer": False, "face_enhancer_gpen256": False, "face_enhancer_gpen512": False} # Face Swapper Specific Options face_swapper_enabled: bool = True # General toggle for the swapper processor diff --git a/modules/paths.py b/modules/paths.py new file mode 100644 index 0000000..106605d --- /dev/null +++ b/modules/paths.py @@ -0,0 +1,6 @@ +"""Shared path constants for the Deep-Live-Cam project.""" + +import os + +ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +MODELS_DIR = os.path.join(ROOT_DIR, "models") diff --git a/modules/processors/frame/_onnx_enhancer.py b/modules/processors/frame/_onnx_enhancer.py new file mode 100644 index 0000000..7b9b610 --- /dev/null +++ b/modules/processors/frame/_onnx_enhancer.py @@ -0,0 +1,145 @@ +"""Shared ONNX-based face enhancement utilities for GPEN-BFR models. + +Provides session creation, pre/post processing, and the core +enhance-face-via-ONNX pipeline. +""" + +import os +import platform +import threading +from typing import Any + +import cv2 +import numpy as np +import onnxruntime + +import modules.globals + +IS_APPLE_SILICON = platform.system() == "Darwin" and platform.machine() == "arm64" + +# Limit concurrent ONNX calls to avoid VRAM exhaustion on multi-face frames +THREAD_SEMAPHORE = threading.Semaphore(min(max(1, (os.cpu_count() or 1)), 8)) + + +def create_onnx_session(model_path: str) -> onnxruntime.InferenceSession: + """Create an ONNX Runtime session using the configured execution providers.""" + providers = modules.globals.execution_providers + session = onnxruntime.InferenceSession(model_path, providers=providers) + return session + + +def warmup_session(session: onnxruntime.InferenceSession) -> None: + """Run a dummy inference pass to trigger JIT / compile caching.""" + try: + input_feed = { + inp.name: np.zeros( + [d if isinstance(d, int) and d > 0 else 1 for d in inp.shape], + dtype=np.float32, + ) + for inp in session.get_inputs() + } + session.run(None, input_feed) + except Exception as e: + print(f"ONNX enhancer warmup skipped (non-fatal): {e}") + + +def preprocess_face(face_img: np.ndarray, input_size: int) -> np.ndarray: + """Resize, normalize, and convert a BGR face crop to ONNX input blob. + + GPEN-BFR expects [1, 3, H, W] float32 in RGB, normalized to [-1, 1]. + """ + resized = cv2.resize(face_img, (input_size, input_size), interpolation=cv2.INTER_LINEAR) + rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) + blob = rgb.astype(np.float32) / 255.0 * 2.0 - 1.0 + blob = np.transpose(blob, (2, 0, 1))[np.newaxis, ...] + return blob + + +def postprocess_face(output: np.ndarray) -> np.ndarray: + """Convert ONNX output [1, 3, H, W] float32 back to BGR uint8 image.""" + img = output[0].transpose(1, 2, 0) + img = ((img + 1.0) / 2.0 * 255.0) + img = np.clip(img, 0, 255).astype(np.uint8) + img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) + return img + + +def _get_face_affine(face: Any, input_size: int): + """Compute affine transform to align a face to GPEN input space. + + Returns (M, inv_M) — forward and inverse affine matrices. + """ + template = np.array([ + [0.31556875, 0.4615741], + [0.68262291, 0.4615741], + [0.50009375, 0.6405054], + [0.34947187, 0.8246919], + [0.65343645, 0.8246919], + ], dtype=np.float32) * input_size + + landmarks = None + if hasattr(face, "kps") and face.kps is not None: + landmarks = face.kps.astype(np.float32) + elif hasattr(face, "landmark_2d_106") and face.landmark_2d_106 is not None: + lm106 = face.landmark_2d_106 + landmarks = np.array([ + lm106[38], # left eye + lm106[88], # right eye + lm106[86], # nose tip + lm106[52], # left mouth + lm106[61], # right mouth + ], dtype=np.float32) + + if landmarks is None or len(landmarks) < 5: + return None, None + + M = cv2.estimateAffinePartial2D(landmarks, template, method=cv2.LMEDS)[0] + if M is None: + return None, None + inv_M = cv2.invertAffineTransform(M) + return M, inv_M + + +def enhance_face_onnx( + frame: np.ndarray, + face: Any, + session: onnxruntime.InferenceSession, + input_size: int, +) -> np.ndarray: + """Enhance a single face in the frame using an ONNX face restoration model.""" + M, inv_M = _get_face_affine(face, input_size) + if M is None: + return frame + + face_crop = cv2.warpAffine( + frame, M, (input_size, input_size), + flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE, + ) + + blob = preprocess_face(face_crop, input_size) + with THREAD_SEMAPHORE: + output = session.run(None, {session.get_inputs()[0].name: blob})[0] + enhanced = postprocess_face(output) + + # Create mask for blending (feathered edges) + mask = np.ones((input_size, input_size), dtype=np.float32) + border = max(1, input_size // 16) + mask[:border, :] = np.linspace(0, 1, border)[:, np.newaxis] + mask[-border:, :] = np.linspace(1, 0, border)[:, np.newaxis] + mask[:, :border] = np.minimum(mask[:, :border], np.linspace(0, 1, border)[np.newaxis, :]) + mask[:, -border:] = np.minimum(mask[:, -border:], np.linspace(1, 0, border)[np.newaxis, :]) + + h, w = frame.shape[:2] + warped_enhanced = cv2.warpAffine( + enhanced, inv_M, (w, h), + flags=cv2.INTER_LINEAR, borderValue=(0, 0, 0), + ) + warped_mask = cv2.warpAffine( + mask, inv_M, (w, h), + flags=cv2.INTER_LINEAR, borderValue=0, + ) + + mask_3ch = warped_mask[:, :, np.newaxis] + result = (warped_enhanced.astype(np.float32) * mask_3ch + + frame.astype(np.float32) * (1.0 - mask_3ch)) + return np.clip(result, 0, 255).astype(np.uint8) diff --git a/modules/processors/frame/face_enhancer_gpen256.py b/modules/processors/frame/face_enhancer_gpen256.py new file mode 100644 index 0000000..57c9536 --- /dev/null +++ b/modules/processors/frame/face_enhancer_gpen256.py @@ -0,0 +1,125 @@ +"""GPEN-BFR-256 face enhancer — ONNX-based face restoration at 256x256.""" + +from typing import Any, List +import os +import threading + +import cv2 +import numpy as np + +import modules.globals +import modules.processors.frame.core +from modules.core import update_status +from modules.face_analyser import get_one_face +from modules.typing import Frame, Face +from modules.utilities import ( + is_image, + is_video, +) +from modules.processors.frame._onnx_enhancer import ( + create_onnx_session, + warmup_session, + enhance_face_onnx, +) + +NAME = "DLC.FACE-ENHANCER-GPEN256" +INPUT_SIZE = 256 +MODEL_URL = "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR/GPEN-BFR-256.onnx" +MODEL_FILE = "GPEN-BFR-256.onnx" + +ENHANCER = None +THREAD_LOCK = threading.Lock() + +abs_dir = os.path.dirname(os.path.abspath(__file__)) +models_dir = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models" +) + + +def pre_check() -> bool: + model_path = os.path.join(models_dir, MODEL_FILE) + if not os.path.exists(model_path): + update_status(f"Downloading {MODEL_FILE}...", NAME) + from modules.utilities import conditional_download + conditional_download(models_dir, [MODEL_URL]) + return True + + +def pre_start() -> bool: + if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path): + update_status("Select an image or video for target path.", NAME) + return False + return True + + +def get_enhancer() -> Any: + global ENHANCER + with THREAD_LOCK: + if ENHANCER is None: + model_path = os.path.join(models_dir, MODEL_FILE) + if not os.path.exists(model_path): + from modules.utilities import conditional_download + conditional_download(models_dir, [MODEL_URL]) + if not os.path.exists(model_path): + raise FileNotFoundError(f"Model file not found: {model_path}") + print(f"{NAME}: Loading ONNX model from {model_path}") + ENHANCER = create_onnx_session(model_path) + warmup_session(ENHANCER) + print(f"{NAME}: Model loaded successfully.") + return ENHANCER + + +def enhance_face(temp_frame: Frame, face: Face) -> Frame: + try: + session = get_enhancer() + except Exception as e: + print(f"{NAME}: {e}") + return temp_frame + try: + return enhance_face_onnx(temp_frame, face, session, INPUT_SIZE) + except Exception as e: + print(f"{NAME}: Error during face enhancement: {e}") + return temp_frame + + +def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame: + target_face = get_one_face(temp_frame) + if target_face is None: + return temp_frame + return enhance_face(temp_frame, target_face) + + +def process_frame_v2(temp_frame: Frame) -> Frame: + target_face = get_one_face(temp_frame) + if target_face: + temp_frame = enhance_face(temp_frame, target_face) + return temp_frame + + +def process_frames( + source_path: str | None, temp_frame_paths: List[str], progress: Any = None +) -> None: + for temp_frame_path in temp_frame_paths: + temp_frame = cv2.imread(temp_frame_path) + if temp_frame is None: + if progress: + progress.update(1) + continue + result = process_frame(None, temp_frame) + cv2.imwrite(temp_frame_path, result) + if progress: + progress.update(1) + + +def process_image(source_path: str | None, target_path: str, output_path: str) -> None: + target_frame = cv2.imread(target_path) + if target_frame is None: + print(f"{NAME}: Error: Failed to read target image {target_path}") + return + result_frame = process_frame(None, target_frame) + cv2.imwrite(output_path, result_frame) + print(f"{NAME}: Enhanced image saved to {output_path}") + + +def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None: + modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames) diff --git a/modules/processors/frame/face_enhancer_gpen512.py b/modules/processors/frame/face_enhancer_gpen512.py new file mode 100644 index 0000000..520181c --- /dev/null +++ b/modules/processors/frame/face_enhancer_gpen512.py @@ -0,0 +1,125 @@ +"""GPEN-BFR-512 face enhancer — ONNX-based face restoration at 512x512.""" + +from typing import Any, List +import os +import threading + +import cv2 +import numpy as np + +import modules.globals +import modules.processors.frame.core +from modules.core import update_status +from modules.face_analyser import get_one_face +from modules.typing import Frame, Face +from modules.utilities import ( + is_image, + is_video, +) +from modules.processors.frame._onnx_enhancer import ( + create_onnx_session, + warmup_session, + enhance_face_onnx, +) + +NAME = "DLC.FACE-ENHANCER-GPEN512" +INPUT_SIZE = 512 +MODEL_URL = "https://github.com/harisreedhar/Face-Upscalers-ONNX/releases/download/GPEN-BFR/GPEN-BFR-512.onnx" +MODEL_FILE = "GPEN-BFR-512.onnx" + +ENHANCER = None +THREAD_LOCK = threading.Lock() + +abs_dir = os.path.dirname(os.path.abspath(__file__)) +models_dir = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(abs_dir))), "models" +) + + +def pre_check() -> bool: + model_path = os.path.join(models_dir, MODEL_FILE) + if not os.path.exists(model_path): + update_status(f"Downloading {MODEL_FILE}...", NAME) + from modules.utilities import conditional_download + conditional_download(models_dir, [MODEL_URL]) + return True + + +def pre_start() -> bool: + if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path): + update_status("Select an image or video for target path.", NAME) + return False + return True + + +def get_enhancer() -> Any: + global ENHANCER + with THREAD_LOCK: + if ENHANCER is None: + model_path = os.path.join(models_dir, MODEL_FILE) + if not os.path.exists(model_path): + from modules.utilities import conditional_download + conditional_download(models_dir, [MODEL_URL]) + if not os.path.exists(model_path): + raise FileNotFoundError(f"Model file not found: {model_path}") + print(f"{NAME}: Loading ONNX model from {model_path}") + ENHANCER = create_onnx_session(model_path) + warmup_session(ENHANCER) + print(f"{NAME}: Model loaded successfully.") + return ENHANCER + + +def enhance_face(temp_frame: Frame, face: Face) -> Frame: + try: + session = get_enhancer() + except Exception as e: + print(f"{NAME}: {e}") + return temp_frame + try: + return enhance_face_onnx(temp_frame, face, session, INPUT_SIZE) + except Exception as e: + print(f"{NAME}: Error during face enhancement: {e}") + return temp_frame + + +def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame: + target_face = get_one_face(temp_frame) + if target_face is None: + return temp_frame + return enhance_face(temp_frame, target_face) + + +def process_frame_v2(temp_frame: Frame) -> Frame: + target_face = get_one_face(temp_frame) + if target_face: + temp_frame = enhance_face(temp_frame, target_face) + return temp_frame + + +def process_frames( + source_path: str | None, temp_frame_paths: List[str], progress: Any = None +) -> None: + for temp_frame_path in temp_frame_paths: + temp_frame = cv2.imread(temp_frame_path) + if temp_frame is None: + if progress: + progress.update(1) + continue + result = process_frame(None, temp_frame) + cv2.imwrite(temp_frame_path, result) + if progress: + progress.update(1) + + +def process_image(source_path: str | None, target_path: str, output_path: str) -> None: + target_frame = cv2.imread(target_path) + if target_frame is None: + print(f"{NAME}: Error: Failed to read target image {target_path}") + return + result_frame = process_frame(None, target_frame) + cv2.imwrite(output_path, result_frame) + print(f"{NAME}: Enhanced image saved to {output_path}") + + +def process_video(source_path: str | None, temp_frame_paths: List[str]) -> None: + modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames) diff --git a/modules/ui.py b/modules/ui.py index e776608..de99731 100644 --- a/modules/ui.py +++ b/modules/ui.py @@ -245,6 +245,32 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C ) enhancer_switch.place(relx=0.1, rely=0.6) + gpen256_value = ctk.BooleanVar(value=modules.globals.fp_ui.get("face_enhancer_gpen256", False)) + gpen256_switch = ctk.CTkSwitch( + root, + text=_("GPEN Enhancer 256"), + variable=gpen256_value, + cursor="hand2", + command=lambda: ( + update_tumbler("face_enhancer_gpen256", gpen256_value.get()), + save_switch_states(), + ), + ) + gpen256_switch.place(relx=0.1, rely=0.65) + + gpen512_value = ctk.BooleanVar(value=modules.globals.fp_ui.get("face_enhancer_gpen512", False)) + gpen512_switch = ctk.CTkSwitch( + root, + text=_("GPEN Enhancer 512"), + variable=gpen512_value, + cursor="hand2", + command=lambda: ( + update_tumbler("face_enhancer_gpen512", gpen512_value.get()), + save_switch_states(), + ), + ) + gpen512_switch.place(relx=0.1, rely=0.7) + keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio) keep_audio_switch = ctk.CTkSwitch( root, @@ -300,7 +326,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C close_mapper_window() if not map_faces.get() else None ), ) - map_faces_switch.place(relx=0.1, rely=0.65) + map_faces_switch.place(relx=0.1, rely=0.75) poisson_blend_value = ctk.BooleanVar(value=modules.globals.poisson_blend) poisson_blend_switch = ctk.CTkSwitch( @@ -313,7 +339,7 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C save_switch_states(), ), ) - poisson_blend_switch.place(relx=0.1, rely=0.7) + poisson_blend_switch.place(relx=0.1, rely=0.8) show_fps_value = ctk.BooleanVar(value=modules.globals.show_fps) show_fps_switch = ctk.CTkSwitch( @@ -1069,6 +1095,12 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event, if frame_processor.NAME == "DLC.FACE-ENHANCER": if modules.globals.fp_ui["face_enhancer"]: temp_frame = frame_processor.process_frame(None, temp_frame) + elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN256": + if modules.globals.fp_ui.get("face_enhancer_gpen256", False): + temp_frame = frame_processor.process_frame(None, temp_frame) + elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN512": + if modules.globals.fp_ui.get("face_enhancer_gpen512", False): + temp_frame = frame_processor.process_frame(None, temp_frame) elif frame_processor.NAME == "DLC.FACE-SWAPPER": # Use cached face positions from detection thread swapped_bboxes = [] @@ -1093,6 +1125,10 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event, if frame_processor.NAME == "DLC.FACE-ENHANCER": if modules.globals.fp_ui["face_enhancer"]: temp_frame = frame_processor.process_frame_v2(temp_frame) + elif frame_processor.NAME in ("DLC.FACE-ENHANCER-GPEN256", "DLC.FACE-ENHANCER-GPEN512"): + fp_key = frame_processor.NAME.split(".")[-1].lower().replace("-", "_") + if modules.globals.fp_ui.get(fp_key, False): + temp_frame = frame_processor.process_frame_v2(temp_frame) else: temp_frame = frame_processor.process_frame_v2(temp_frame)