"""Standalone pipeline benchmark — no UI required.

Captures 200 frames from the webcam and runs the full face swap pipeline,
printing per-stage timing and effective FPS.
"""
import os
import queue
import sys
import threading
import time

import cv2
import numpy as np

# PATH fix for cuDNN (Windows only): prepend torch's ``lib`` directory to PATH
# before the inference libraries below are imported.
if sys.platform == "win32":
    _sp = os.path.join(sys.prefix, "Lib", "site-packages")
    _torch_lib = os.path.join(_sp, "torch", "lib")
    if os.path.isdir(_torch_lib):
        os.environ["PATH"] = _torch_lib + os.pathsep + os.environ.get("PATH", "")

import insightface
from insightface.app import FaceAnalysis
from insightface.app.common import Face
from insightface.utils import face_align  # noqa: F401 - kept from the original import list
from modules.processors.frame.face_swapper import _fast_paste_back
from modules import platform_info

platform_info.print_banner()

# Pick providers based on what's actually available on this machine.
if platform_info.HAS_CUDA_PROVIDER:
    _providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
elif platform_info.HAS_COREML_PROVIDER:
    _providers = ["CoreMLExecutionProvider", "CPUExecutionProvider"]
else:
    _providers = ["CPUExecutionProvider"]

# --- Init models (same as the app) ---
print(f"Loading models with providers={_providers}...")
fa = FaceAnalysis(
    name="buffalo_l",
    providers=_providers,
    allowed_modules=["detection", "recognition", "landmark_2d_106"],
)
fa.prepare(ctx_id=0, det_size=(640, 640))
swap_model = insightface.model_zoo.get_model(
    "models/inswapper_128.onnx",
    providers=_providers,
)
face_size = swap_model.input_size[0]
aimg_dummy = np.empty((face_size, face_size, 3), dtype=np.uint8)

# --- Camera setup ---
# Windows: DirectShow explicit for MJPEG 1080p60 support.
# macOS/Linux: default backend (AVFoundation / V4L2).
print("Opening camera at 1080p60 MJPEG...")
if sys.platform == "win32":
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
else:
    cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("ERROR: Could not open camera 0")
    sys.exit(1)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
cap.set(cv2.CAP_PROP_FPS, 60)
time.sleep(0.5)

# Warmup + get source face
for _ in range(15):
    cap.read()
ret, src_frame = cap.read()
if not ret or src_frame is None:
    # Without this guard a failed read hands ``None`` to the detector.
    print("ERROR: Could not read a frame from the camera")
    cap.release()
    sys.exit(1)
faces = fa.get(src_frame)
if not faces:
    print("ERROR: No face detected in warmup frame")
    cap.release()
    sys.exit(1)
source_face = faces[0]
print(f"Source face acquired. Frame: {src_frame.shape}")

# --- Capture thread (same as app) ---
capture_queue = queue.Queue(maxsize=2)
stop_event = threading.Event()


def capture_thread():
    """Keep ``capture_queue`` filled with the most recent camera frames.

    When the queue is full the oldest frame is dropped so the consumer
    always sees fresh frames. The loop ends when ``stop_event`` is set or
    the camera stops delivering frames.
    """
    while not stop_event.is_set():
        ok, frame = cap.read()
        if not ok:
            break
        try:
            capture_queue.put_nowait(frame)
        except queue.Full:
            # Drop the oldest frame, then retry once; losing the race to the
            # consumer on either step is harmless.
            try:
                capture_queue.get_nowait()
            except queue.Empty:
                pass
            try:
                capture_queue.put_nowait(frame)
            except queue.Full:
                pass


cap_t = threading.Thread(target=capture_thread, daemon=True)
cap_t.start()

# --- Warmup processing ---
print("Warming up pipeline...")
for _ in range(20):
    try:
        frame = capture_queue.get(timeout=0.1)
    except queue.Empty:
        continue
    f = frame.copy()
    det_faces = fa.get(f)
    if det_faces:
        tgt = min(det_faces, key=lambda x: x.bbox[0])
        bgr_fake, M = swap_model.get(f, tgt, source_face, paste_back=False)
        _fast_paste_back(f, bgr_fake, aimg_dummy, M)

# --- Benchmark ---
N = 200
print(f"\nBenchmarking {N} frames...")

t_queue, t_det, t_onnx, t_paste, t_cvt, t_total = [], [], [], [], [], []
det_count = 0
cached_face = None

for _ in range(N):
    tt = time.perf_counter()

    t0 = time.perf_counter()
    try:
        frame = capture_queue.get(timeout=0.1)
    except queue.Empty:
        # No frame arrived in time; this iteration is not counted.
        continue
    t_queue.append((time.perf_counter() - t0) * 1000)

    # Detection every 3rd frame — det-only (no landmark/recognition)
    det_count += 1
    if det_count % 3 == 0:
        t0 = time.perf_counter()
        bboxes, kpss = fa.det_model.detect(frame, max_num=0, metric='default')
        if bboxes.shape[0] > 0:
            idx = int(bboxes[:, 0].argmin())
            cached_face = Face(
                bbox=bboxes[idx, :4],
                kps=kpss[idx] if kpss is not None else None,
                det_score=bboxes[idx, 4],
            )
        t_det.append((time.perf_counter() - t0) * 1000)

    if cached_face is not None:
        # No frame.copy() — _fast_paste_back writes in-place, we own the frame
        t0 = time.perf_counter()
        bgr_fake, M = swap_model.get(frame, cached_face, source_face, paste_back=False)
        t_onnx.append((time.perf_counter() - t0) * 1000)

        t0 = time.perf_counter()
        result = _fast_paste_back(frame, bgr_fake, aimg_dummy, M)
        t_paste.append((time.perf_counter() - t0) * 1000)

        # Display prep — resize, then reverse the channel axis as a view
        # (no cvtColor call).
        t0 = time.perf_counter()
        small = cv2.resize(result, (640, 360))
        _ = small[:, :, ::-1]
        t_cvt.append((time.perf_counter() - t0) * 1000)

    t_total.append((time.perf_counter() - tt) * 1000)

# Stop the producer before releasing the device it is reading from.
stop_event.set()
cap_t.join(timeout=1.0)
cap.release()


# --- Results ---
def s(name, arr):
    """Print avg/min/max/count for one stage; stages with no samples are skipped."""
    if not arr:
        return
    avg = sum(arr) / len(arr)
    print(f"  {name:25s}: avg={avg:6.1f}ms  min={min(arr):5.1f}ms  max={max(arr):6.1f}ms  n={len(arr)}")


if not t_total:
    print("ERROR: No frames were captured during the benchmark")
    sys.exit(1)

print(f"\n{'='*55}")
print(f"  1080p Pipeline Benchmark ({len(t_total)} frames)")
print(f"{'='*55}")
s("queue.get (wait for cam)", t_queue)
s("detection (det-only)", t_det)
s("ONNX swap", t_onnx)
s("_fast_paste_back", t_paste)
s("resize + BGR->RGB view", t_cvt)
s("TOTAL per frame", t_total)

avg_total = sum(t_total) / len(t_total)
avg_queue = sum(t_queue) / len(t_queue)
print(f"\n  Effective FPS: {1000/avg_total:.1f}")
busy_ms = avg_total - avg_queue
if busy_ms > 0:
    print(f"  FPS (excl. cam wait): {1000/busy_ms:.1f}")
print(f"{'='*55}")
def _optimize_det_model(fa: Any, providers) -> None:
    """Swap the detection model's ONNX session for a CoreML-optimized one.

    Only acts on Apple Silicon. The detector's model file is rewritten by
    ``optimize_for_coreml`` with the input pinned to ``DET_SIZE`` and a new
    ``InferenceSession`` is installed on ``fa.det_model.session``. Any
    ``CoreMLExecutionProvider`` entry in *providers* is replaced by one
    configured with ``MLProgram`` / ``CPUAndGPU``; other entries are passed
    through unchanged.

    The step is best-effort: if the rewrite or the session creation fails,
    a message is printed and the session created by ``prepare()`` is kept.

    Args:
        fa: Prepared ``FaceAnalysis`` instance whose ``det_model`` is patched.
        providers: Provider names or ``(name, options)`` tuples.
    """
    from modules.onnx_optimize import optimize_for_coreml, IS_APPLE_SILICON
    if not IS_APPLE_SILICON:
        return

    det_model = fa.det_model
    model_path = getattr(det_model, 'model_file', None)
    if model_path is None or not os.path.exists(model_path):
        return

    # ONNX input layout is NCHW while DET_SIZE is (width, height).
    input_shape = (1, 3, DET_SIZE[1], DET_SIZE[0])

    try:
        optimized_path = optimize_for_coreml(model_path, input_shape=input_shape)
        if optimized_path == model_path:
            return

        import onnxruntime
        session_options = onnxruntime.SessionOptions()
        session_options.graph_optimization_level = (
            onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
        )

        # Route detection to GPU shader cores (CPUAndGPU) instead of ANE so
        # it can overlap with the swap model, which stays on the ANE.
        det_providers = []
        for p in providers:
            name = p[0] if isinstance(p, tuple) else p
            if name == "CoreMLExecutionProvider":
                det_providers.append((
                    "CoreMLExecutionProvider",
                    {"ModelFormat": "MLProgram", "MLComputeUnits": "CPUAndGPU"},
                ))
            else:
                det_providers.append(p)

        session = onnxruntime.InferenceSession(
            optimized_path, sess_options=session_options, providers=det_providers,
        )
    except Exception as exc:
        # Optional speed-up only: never let it break analyser initialisation.
        print(f"[face_analyser] CoreML detection optimization skipped: {exc}")
        return

    # Install the new session only after it has been created successfully.
    det_model.session = session
def _detected_face(bboxes, kpss, index):
    """Build a ``Face`` from row *index* of the detector output.

    ``kpss`` may be ``None`` when the detector returns no keypoints; the
    face then gets ``kps=None``, as ``_analyse_faces`` does.
    """
    from insightface.app.common import Face
    kps = kpss[index] if kpss is not None else None
    return Face(bbox=bboxes[index, :4], kps=kps, det_score=bboxes[index, 4])


def detect_one_face_fast(frame: Frame) -> Any:
    """Detection-only — skips landmark and recognition models.

    Returns a Face with bbox, kps and det_score for the detection with the
    smallest left bbox coordinate, or ``None`` when nothing is detected.

    NOTE(review): unlike ``get_one_face`` this path does not take
    ``modules.globals.dml_lock`` under DirectML — confirm callers serialize
    access before relying on it from multiple threads.
    """
    fa = get_face_analyser()
    bboxes, kpss = fa.det_model.detect(frame, max_num=0, metric='default')
    if bboxes.shape[0] == 0:
        return None
    leftmost = int(bboxes[:, 0].argmin())
    return _detected_face(bboxes, kpss, leftmost)


def detect_many_faces_fast(frame: Frame) -> Any:
    """Detection-only multi-face — skips landmark and recognition.

    Returns one Face per detection, or ``None`` when nothing is detected.
    """
    fa = get_face_analyser()
    bboxes, kpss = fa.det_model.detect(frame, max_num=0, metric='default')
    if bboxes.shape[0] == 0:
        return None
    return [_detected_face(bboxes, kpss, i) for i in range(bboxes.shape[0])]
+# Each gpu_* call uploads to GPU, processes, then downloads back to CPU. +# At webcam resolution (~960x540) this upload/download overhead far exceeds +# the time saved on the actual operation, making it slower than pure CPU. +# The heavy lifting (face detection, swap, enhancement) runs on GPU via +# ONNX Runtime's CUDAExecutionProvider, which is where GPU matters. +# +# To force-enable, set OPENCV_CUDA_PROCESSING=1 in your environment. +if os.environ.get("OPENCV_CUDA_PROCESSING") == "1": + try: + _test_mat = cv2.cuda.GpuMat() + _has_gauss = hasattr(cv2.cuda, "createGaussianFilter") + _has_resize = hasattr(cv2.cuda, "resize") + _has_cvt = hasattr(cv2.cuda, "cvtColor") + if _has_gauss and _has_resize and _has_cvt: + CUDA_AVAILABLE = True + print("[gpu_processing] OpenCV CUDA processing enabled via OPENCV_CUDA_PROCESSING=1.") + except Exception: + pass # --------------------------------------------------------------------------- diff --git a/modules/onnx_optimize.py b/modules/onnx_optimize.py new file mode 100644 index 0000000..6c28f53 --- /dev/null +++ b/modules/onnx_optimize.py @@ -0,0 +1,428 @@ +"""ONNX model optimizations for CoreML execution on Apple Silicon. + +Two transformations that eliminate CPU↔ANE round-trips: + +1. **Pad(reflect) decomposition** — CoreML doesn't support ``Pad(mode=reflect)``. + Models using reflect padding (e.g. inswapper_128) get split into many CoreML + subgraphs with CPU fallbacks between each. We rewrite each ``Pad(reflect)`` + as equivalent ``Slice`` + ``Concat`` ops that CoreML handles natively. + Bit-for-bit identical output. + +2. **Shape/Gather constant folding** — Dynamic ``Shape`` → ``Gather`` chains + (e.g. for FPN upsample target sizes in RetinaFace) force ops onto CPU even + when the input dimensions are known at load time. We run ONNX shape + inference with the known input size and replace these chains with constants. + Float32-noise-level differences only (max ~6e-6). 
+ +Both transformations are cached on disk with a ``_coreml`` suffix so the +rewrite cost is paid only once per model. +""" + +import os +import platform + +import numpy as np + +IS_APPLE_SILICON = platform.system() == "Darwin" and platform.machine() == "arm64" + + +def optimize_for_coreml(model_path: str, input_shape: tuple = None) -> str: + """Return path to a CoreML-optimized ONNX model. + + Applies all applicable optimizations and caches the result next to + the original model (with ``_coreml`` suffix). + + Args: + model_path: Path to the original ONNX model. + input_shape: Optional fixed input shape (e.g. ``(1, 3, 640, 640)``). + When provided, enables Shape/Gather constant folding. + + Returns the optimized path, or the original path if no optimizations + apply or we're not on Apple Silicon. + """ + if not IS_APPLE_SILICON: + return model_path + + base, ext = os.path.splitext(model_path) + optimized_path = f"{base}_coreml{ext}" + if os.path.exists(optimized_path): + if os.path.getmtime(optimized_path) >= os.path.getmtime(model_path): + return optimized_path + + import onnx + from onnx import numpy_helper + + model = onnx.load(model_path) + changed = False + + if _fold_shape_gather(model, input_shape): + changed = True + + # TODO(ort>=1.26): drop this pass. Fixed upstream by microsoft/onnxruntime#28073. + if _decompose_reflect_pad(model): + changed = True + + if _decompose_split(model): + changed = True + + if not changed: + return model_path + + # Preserve insightface's emap convention: the INSwapper class reads + # graph.initializer[-1] as the embedding map. If the original model + # had a (512, 512) matrix as its last initializer, keep it last. 
def _fold_shape_gather(model, input_shape) -> bool:
    """Replace dynamic Shape→Gather chains with constants when input size is known.

    The first graph input is pinned to *input_shape* (a side effect on
    *model*), ONNX shape inference is run, and every ``Gather`` that reads a
    fully-known ``Shape`` output through constant indices is replaced by an
    int64 initializer carrying the gathered value(s).

    A ``Shape`` node is removed only when ALL of its consumers are Gather
    nodes being folded and its output is not a graph output. A ``Gather``
    whose output is a graph output is left in place.

    Args:
        model: ``onnx.ModelProto`` to rewrite in place.
        input_shape: Fixed shape for the first graph input, or ``None``.

    Returns:
        ``True`` when at least one Gather was folded, else ``False``.
    """
    if input_shape is None:
        return False

    from onnx import numpy_helper, shape_inference

    graph = model.graph
    if not graph.input:
        return False

    # Set fixed input dimensions for shape inference.
    dims = graph.input[0].type.tensor_type.shape.dim
    for dim, size in zip(dims, input_shape):
        dim.dim_value = size

    try:
        model_inferred = shape_inference.infer_shapes(model)
    except Exception:
        # Inference failure just means this optional pass does not apply.
        return False

    # Extract inferred shapes; unknown dimensions are recorded as None.
    value_shapes = {}
    for vi in list(model_inferred.graph.value_info) + list(graph.input) + list(graph.output):
        value_shapes[vi.name] = [
            d.dim_value if d.dim_value > 0 else None
            for d in vi.type.tensor_type.shape.dim
        ]

    init_by_name = {init.name: init for init in graph.initializer}

    # Build consumer map: tensor name → list of consuming nodes.
    consumers = {}
    for node in graph.node:
        for name in node.input:
            consumers.setdefault(name, []).append(node)

    graph_output_names = {o.name for o in graph.output}

    # Shape nodes whose input shape is fully known.
    shape_constants = {}
    for node in graph.node:
        if node.op_type != "Shape":
            continue
        inp_shape = value_shapes.get(node.input[0])
        if inp_shape and all(isinstance(d, int) for d in inp_shape):
            shape_constants[node.output[0]] = np.array(inp_shape, dtype=np.int64)

    if not shape_constants:
        return False

    # Gather nodes reading those Shape outputs through constant indices.
    gather_constants = {}
    gather_remove_ids = set()
    for node in graph.node:
        if node.op_type != "Gather" or node.input[0] not in shape_constants:
            continue
        if node.output[0] in graph_output_names:
            continue
        # A Shape output is 1-D, so only axis 0 (or -1) is meaningful.
        if any(a.name == "axis" and a.i not in (0, -1) for a in node.attribute):
            continue
        idx_init = init_by_name.get(node.input[1])
        if idx_init is None:
            continue
        indices = numpy_helper.to_array(idx_init)
        try:
            # np.take keeps the rank of `indices`, matching Gather semantics:
            # scalar indices give a scalar, 1-D indices give a 1-D tensor.
            folded = np.take(shape_constants[node.input[0]], indices, axis=0)
        except IndexError:
            continue
        gather_constants[node.output[0]] = np.asarray(folded, dtype=np.int64)
        gather_remove_ids.add(id(node))

    if not gather_constants:
        return False

    # Shape nodes are removable only if every consumer is a folded Gather
    # and the Shape output is not itself a graph output.
    shape_remove_ids = set()
    for node in graph.node:
        if node.op_type != "Shape" or node.output[0] not in shape_constants:
            continue
        out_name = node.output[0]
        if out_name in graph_output_names:
            continue
        if all(id(c) in gather_remove_ids for c in consumers.get(out_name, [])):
            shape_remove_ids.add(id(node))

    remove_ids = gather_remove_ids | shape_remove_ids

    # Publish the folded values under the removed Gather outputs' names.
    for name, val in gather_constants.items():
        if name not in init_by_name:
            graph.initializer.append(numpy_helper.from_array(val, name=name))

    new_nodes = [n for n in graph.node if id(n) not in remove_ids]
    del graph.node[:]
    graph.node.extend(new_nodes)
    return True
def _reflect_hw_pads(node, init_by_name, numpy_helper):
    """Return ``(h_pad, w_pad)`` if *node* is a reflect Pad this pass can rewrite.

    Supported: pads given as an initializer with 8 values (4-D input), zero
    padding on the first two axes, and equal non-negative begin/end padding
    on axes 2 and 3. Anything else returns ``None`` so the node is kept.
    """
    if len(node.input) < 2 or node.input[1] not in init_by_name:
        return None
    if len(node.input) > 3 and node.input[3]:
        # An explicit `axes` input changes what the pads refer to.
        return None
    pads = [int(p) for p in numpy_helper.to_array(init_by_name[node.input[1]]).ravel()]
    if len(pads) != 8:
        return None
    n_begin, c_begin, top, left, n_end, c_end, bottom, right = pads
    if n_begin or c_begin or n_end or c_end:
        return None
    if top != bottom or left != right or top < 0 or left < 0:
        return None
    return top, left


def _decompose_reflect_pad(model) -> bool:
    """Rewrite Pad(reflect) as Slice+Concat sequences CoreML can handle.

    Each supported reflect Pad (see ``_reflect_hw_pads``) is replaced by
    single-row/column ``Slice`` nodes concatenated around the input, first
    along axis 2 and then along axis 3. A Pad with zero padding on both axes
    becomes an ``Identity``. Unsupported Pads are left untouched.

    Initializers that only fed rewritten Pads are dropped; ones still read by
    a remaining node or exposed as a graph output are kept.

    Returns:
        ``True`` when at least one Pad was rewritten, else ``False``.
    """
    graph = model.graph

    # Cheap scan first: bail out before touching onnx helpers when the graph
    # has no reflect Pad at all.
    candidates = []
    for node in graph.node:
        if node.op_type != "Pad":
            continue
        mode = "constant"
        for attr in node.attribute:
            if attr.name == "mode":
                mode = attr.s.decode()
        if mode == "reflect":
            candidates.append(node)
    if not candidates:
        return False

    from onnx import numpy_helper, helper

    init_by_name = {init.name: init for init in graph.initializer}

    plans = {}  # id(node) -> (h_pad, w_pad)
    for node in candidates:
        hw = _reflect_hw_pads(node, init_by_name, numpy_helper)
        if hw is not None:
            plans[id(node)] = hw
    if not plans:
        return False

    existing_names = set(init_by_name)

    def ensure_const(name, value):
        if name not in existing_names:
            graph.initializer.append(
                numpy_helper.from_array(np.array(value, dtype=np.int64), name=name)
            )
            existing_names.add(name)

    ensure_const("_rp_ax2", [2])
    ensure_const("_rp_ax3", [3])

    # Slice bounds +1..+(max_pad+1) and -1..-(max_pad+1) are shared by all Pads.
    max_pad = max(max(hw) for hw in plans.values())
    for v in range(1, max_pad + 2):
        ensure_const(f"_rp_p{v}", [v])
        ensure_const(f"_rp_n{v}", [-v])

    counter = [0]

    def fresh(tag):
        counter[0] += 1
        return f"_rp_{tag}{counter[0]}"

    new_nodes = []

    def reflected_pieces(source, pad, axis_const, lead_tag, trail_tag):
        """Emit the Slice nodes for one axis; return Concat inputs in order."""
        lead = []
        for i in range(pad, 0, -1):  # source[pad], ..., source[1]
            name = fresh(lead_tag)
            new_nodes.append(helper.make_node(
                "Slice",
                inputs=[source, f"_rp_p{i}", f"_rp_p{i+1}", axis_const],
                outputs=[name],
            ))
            lead.append(name)
        trail = []
        for i in range(1, pad + 1):  # source[-2], ..., source[-(pad+1)]
            name = fresh(trail_tag)
            new_nodes.append(helper.make_node(
                "Slice",
                inputs=[source, f"_rp_n{i+1}", f"_rp_n{i}", axis_const],
                outputs=[name],
            ))
            trail.append(name)
        return lead + [source] + trail

    pad_init_names = set()
    for node in graph.node:
        plan = plans.get(id(node))
        if plan is None:
            new_nodes.append(node)
            continue

        h_pad, w_pad = plan
        pad_init_names.update(n for n in node.input[1:] if n in init_by_name)
        current = node.input[0]

        if h_pad > 0:
            pieces = reflected_pieces(current, h_pad, "_rp_ax2", "t", "b")
            h_out = fresh("h")
            new_nodes.append(helper.make_node(
                "Concat", inputs=pieces, outputs=[h_out], axis=2
            ))
            current = h_out

        if w_pad > 0:
            pieces = reflected_pieces(current, w_pad, "_rp_ax3", "l", "r")
            new_nodes.append(helper.make_node(
                "Concat", inputs=pieces, outputs=[node.output[0]], axis=3
            ))
        else:
            # Nothing to add on the last axis (possibly nothing at all):
            # forward the tensor so the Pad's output name stays defined.
            new_nodes.append(helper.make_node(
                "Identity", inputs=[current], outputs=[node.output[0]]
            ))

    # Drop old Pad initializers unless something still refers to them.
    still_used = {name for n in new_nodes for name in n.input}
    still_used.update(o.name for o in graph.output)
    stale = pad_init_names - still_used
    clean_inits = [i for i in graph.initializer if i.name not in stale]
    del graph.initializer[:]
    graph.initializer.extend(clean_inits)

    del graph.node[:]
    graph.node.extend(new_nodes)
    return True
def _decompose_split(model) -> bool:
    """Rewrite two-way Split(axis=1) nodes as Slice pairs.

    Only Split nodes that carry a two-element ``split`` attribute, use
    ``axis == 1`` and have exactly two outputs are rewritten; each becomes
    two ``Slice`` nodes writing to the original output names. All other
    nodes are kept as they are.

    Returns:
        ``True`` when at least one Split was rewritten, else ``False``.
    """
    graph = model.graph

    splits = []
    for node in graph.node:
        if node.op_type != "Split":
            continue
        axis = 0
        split_sizes = []
        for attr in node.attribute:
            if attr.name == "axis":
                axis = attr.i
            elif attr.name == "split":
                split_sizes = list(attr.ints)
        if axis == 1 and len(split_sizes) == 2 and len(node.output) == 2:
            splits.append((node, split_sizes))

    if not splits:
        return False

    # onnx helpers are only needed once there is something to rewrite.
    from onnx import numpy_helper, helper

    existing = {i.name for i in graph.initializer}

    def ensure_const(name, value):
        if name not in existing:
            graph.initializer.append(
                numpy_helper.from_array(np.array(value, dtype=np.int64), name=name)
            )
            existing.add(name)

    ensure_const("_sp_ax1", [1])
    ensure_const("_sp_s0", [0])

    # Boundary constants: end of the first part and end of the second part.
    for _, (a, b) in splits:
        ensure_const(f"_sp_s{a}", [a])
        ensure_const(f"_sp_s{a + b}", [a + b])

    replacements = {}
    for node, (a, b) in splits:
        first = helper.make_node(
            "Slice",
            inputs=[node.input[0], "_sp_s0", f"_sp_s{a}", "_sp_ax1"],
            outputs=[node.output[0]],
        )
        second = helper.make_node(
            "Slice",
            inputs=[node.input[0], f"_sp_s{a}", f"_sp_s{a + b}", "_sp_ax1"],
            outputs=[node.output[1]],
        )
        replacements[id(node)] = [first, second]

    new_nodes = []
    for node in graph.node:
        new_nodes.extend(replacements.get(id(node), [node]))

    del graph.node[:]
    graph.node.extend(new_nodes)
    return True


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _preserve_emap_position(model, numpy_helper):
    """Keep the insightface emap (512×512 matrix) as the last initializer.

    The optimisation passes append their helper constants after the model's
    original initializers, so the original trailing 512×512 matrix is the
    LAST initializer with that shape. It is searched for from the end and
    moved back to the final position. Searching from the front could pick an
    unrelated 512×512 weight and push it behind the real emap.

    Args:
        model: ``onnx.ModelProto`` whose initializer list is reordered in place.
        numpy_helper: Unused; kept so existing callers keep working. Shapes
            are read from each initializer's ``dims``.
    """
    graph = model.graph
    ordered = list(graph.initializer)

    emap_index = None
    for index in range(len(ordered) - 1, -1, -1):
        if list(ordered[index].dims) == [512, 512]:
            emap_index = index
            break

    # Nothing to do when there is no candidate or it is already last.
    if emap_index is None or emap_index == len(ordered) - 1:
        return

    ordered.append(ordered.pop(emap_index))
    del graph.initializer[:]
    graph.initializer.extend(ordered)
+ + Windows prefers MSMF (60fps capable) with DirectShow as fallback. + macOS/Linux use the default backend (AVFoundation / V4L2). + """ + import cv2 + if IS_WINDOWS: + return [ + (0, cv2.CAP_MSMF), + (0, cv2.CAP_DSHOW), + (0, cv2.CAP_ANY), + ] + return [(0, cv2.CAP_ANY)] + + +def accelerator_label() -> str: + if HAS_TORCH_CUDA and HAS_CUDA_PROVIDER: + return "CUDA (NVIDIA)" + if IS_APPLE_SILICON and HAS_COREML_PROVIDER: + return "CoreML (Apple Neural Engine)" + if HAS_COREML_PROVIDER: + return "CoreML" + if HAS_DML_PROVIDER: + return "DirectML" + return "CPU" + + +def print_banner() -> None: + """Print a one-line summary of the platform + accelerator selection.""" + os_label = f"{_platform.system()} {_platform.machine()}" + print( + f"[platform] {os_label} | python {sys.version.split()[0]} | " + f"accelerator: {accelerator_label()} | providers: {ONNX_PROVIDERS}", + flush=True, + ) diff --git a/modules/processors/frame/_onnx_enhancer.py b/modules/processors/frame/_onnx_enhancer.py index 1e63c69..4eeee98 100644 --- a/modules/processors/frame/_onnx_enhancer.py +++ b/modules/processors/frame/_onnx_enhancer.py @@ -36,23 +36,11 @@ def build_provider_config(providers=None): # Already configured – pass through config.append(p) elif p == "CUDAExecutionProvider": - config.append(( - "CUDAExecutionProvider", - { - # Re-use freed blocks instead of growing the arena - "arena_extend_strategy": "kSameAsRequested", - # One-time exhaustive search for the fastest cuDNN - # convolution algorithm (significant speed-up after - # the first inference pass) - "cudnn_conv_algo_search": "EXHAUSTIVE", - # Allow cuDNN to use more workspace memory for faster - # convolution kernels - "cudnn_conv_use_max_workspace": "1", - # Use a separate CUDA stream for host↔device copies so - # they can overlap with compute kernels - "do_copy_in_default_stream": "0", - }, - )) + # Use bare provider — ONNX Runtime's defaults are fastest on + # modern GPUs (Blackwell/sm_120). 
Custom options like + # EXHAUSTIVE cudnn_conv_algo_search hurt performance on these + # architectures. + config.append(p) elif p == "CoreMLExecutionProvider" and IS_APPLE_SILICON: config.append(( "CoreMLExecutionProvider", @@ -103,7 +91,26 @@ def run_inference(session: onnxruntime.InferenceSession, def create_onnx_session(model_path: str) -> onnxruntime.InferenceSession: - """Create an ONNX Runtime session with optimised provider config.""" + """Create an ONNX Runtime session with optimised provider config. + + On Apple Silicon, applies CoreML graph optimizations (Pad decomposition, + Shape/Gather folding, Split decomposition) to reduce CPU↔ANE partition + boundaries. + """ + if IS_APPLE_SILICON: + from modules.onnx_optimize import optimize_for_coreml + # Infer input shape from the model for Shape/Gather folding + try: + import onnx + m = onnx.load(model_path) + inp = m.graph.input[0] + dims = inp.type.tensor_type.shape.dim + shape = tuple(d.dim_value for d in dims if d.dim_value > 0) + input_shape = shape if len(shape) == 4 else None + except Exception: + input_shape = None + model_path = optimize_for_coreml(model_path, input_shape=input_shape) + providers = build_provider_config() session_options = onnxruntime.SessionOptions() session_options.graph_optimization_level = ( diff --git a/modules/processors/frame/core.py b/modules/processors/frame/core.py index 628a2a6..48b660d 100644 --- a/modules/processors/frame/core.py +++ b/modules/processors/frame/core.py @@ -331,6 +331,14 @@ def _run_pipe_pipeline( 'mode': 'in-memory', }) + # Pipelined detection: while processing frame N (swap on + # ANE), start detecting the face in the next frame + # (detection on GPU). They use different hardware units + # so the work overlaps. 
+ detect_executor = ThreadPoolExecutor(max_workers=1) + pending_detect = None + use_pipeline = not modules.globals.many_faces + while True: raw = reader.stdout.read(frame_size) if len(raw) != frame_size: @@ -340,26 +348,33 @@ def _run_pipe_pipeline( (height, width, 3) ).copy() - # Detect target face once and share across all processors. - # This eliminates the redundant detection that each - # processor would otherwise do internally. - if not modules.globals.many_faces: - target_face = get_one_face(frame) + # Get the detection result for THIS frame + if use_pipeline: + if pending_detect is not None: + target_face = pending_detect.result() + else: + target_face = get_one_face(frame) + # Start detecting on THIS frame eagerly — the result + # will be used for the next iteration. At video + # frame rates the face barely moves between frames. + pending_detect = detect_executor.submit( + get_one_face, frame) else: - target_face = None # many_faces mode detects all internally + target_face = None # Run frame through every active processor for fp in frame_processors: try: frame = fp.process_frame(source_face, frame, target_face=target_face) except TypeError: - # Processor doesn't accept target_face kwarg frame = fp.process_frame(source_face, frame) writer.stdin.write(frame.tobytes()) processed_count += 1 progress.update(1) + detect_executor.shutdown(wait=True) + # Graceful shutdown writer.stdin.close() writer.wait() diff --git a/modules/processors/frame/face_enhancer.py b/modules/processors/frame/face_enhancer.py index 5a2a5a7..7c9dde9 100644 --- a/modules/processors/frame/face_enhancer.py +++ b/modules/processors/frame/face_enhancer.py @@ -1,4 +1,3 @@ -# --- START OF FILE face_enhancer.py --- # Uses ONNX Runtime for GFPGAN face enhancement (no torch/gfpgan dependency) from typing import Any, List @@ -82,20 +81,10 @@ def get_face_enhancer() -> onnxruntime.InferenceSession: try: from modules.processors.frame._onnx_enhancer import ( - build_provider_config, - ) - providers = 
build_provider_config() - - session_options = onnxruntime.SessionOptions() - session_options.graph_optimization_level = ( - onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL + create_onnx_session, ) - FACE_ENHANCER = onnxruntime.InferenceSession( - model_path, - sess_options=session_options, - providers=providers, - ) + FACE_ENHANCER = create_onnx_session(model_path) input_info = FACE_ENHANCER.get_inputs()[0] output_info = FACE_ENHANCER.get_outputs()[0] @@ -161,6 +150,18 @@ def _align_face( return aligned_face, affine_matrix +_HAS_TORCH_CUDA = False +try: + import torch + if torch.cuda.is_available(): + _HAS_TORCH_CUDA = True +except ImportError: + pass + +# Cache the feathered mask — it's the same for every call at a given size +_enhancer_cache: dict = {'mask': None, 'mask_size': 0} + + def _paste_back( frame: Frame, enhanced_face: np.ndarray, @@ -170,53 +171,76 @@ def _paste_back( """ Paste an enhanced (aligned) face back onto the original frame using the inverse affine transform with feathered-edge blending. + + Optimized: operates on a tight crop around the face bbox instead of the + full frame, and uses GPU for blending when available. 
""" h, w = frame.shape[:2] - - # Inverse the affine warp inv_matrix = cv2.invertAffineTransform(affine_matrix) - inv_restored = cv2.warpAffine( - enhanced_face, - inv_matrix, - (w, h), - borderMode=cv2.BORDER_CONSTANT, - borderValue=(0, 0, 0), + + # Build or reuse cached feathered mask + if _enhancer_cache['mask_size'] != output_size: + face_mask = np.ones((output_size, output_size), dtype=np.float32) + border = max(1, int(output_size * 0.05)) + ramp_up = np.linspace(0.0, 1.0, border, dtype=np.float32) + ramp_down = np.linspace(1.0, 0.0, border, dtype=np.float32) + face_mask[:border, :] *= ramp_up[:, None] + face_mask[-border:, :] *= ramp_down[:, None] + face_mask[:, :border] *= ramp_up[None, :] + face_mask[:, -border:] *= ramp_down[None, :] + _enhancer_cache['mask'] = face_mask + _enhancer_cache['mask_size'] = output_size + + # Compute tight bbox from affine corners (avoids full-frame warpAffine scan) + corners = np.array([[0, 0], [output_size, 0], + [output_size, output_size], [0, output_size]], + dtype=np.float32) + transformed = (inv_matrix[:, :2] @ corners.T).T + inv_matrix[:, 2] + x1 = max(0, int(np.floor(transformed[:, 0].min()))) + x2 = min(w, int(np.ceil(transformed[:, 0].max()))) + y1 = max(0, int(np.floor(transformed[:, 1].min()))) + y2 = min(h, int(np.ceil(transformed[:, 1].max()))) + if x1 >= x2 or y1 >= y2: + return frame + + # Pad a few pixels for feathering + pad = max(1, int(output_size * 0.05)) + 2 + y1p, y2p = max(0, y1 - pad), min(h, y2 + pad) + x1p, x2p = max(0, x1 - pad), min(w, x2 + pad) + crop_w, crop_h = x2p - x1p, y2p - y1p + + # Warp enhanced face and mask into crop space only + inv_crop = inv_matrix.copy() + inv_crop[0, 2] -= x1p + inv_crop[1, 2] -= y1p + + inv_restored_crop = cv2.warpAffine( + enhanced_face, inv_crop, (crop_w, crop_h), + borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0), ) - - # Build a soft feathered mask in aligned space for edge blending - face_mask = np.ones((output_size, output_size), dtype=np.float32) - - # 
Feather the border (5 % of the size on each edge) - border = max(1, int(output_size * 0.05)) - ramp_up = np.linspace(0.0, 1.0, border, dtype=np.float32) - ramp_down = np.linspace(1.0, 0.0, border, dtype=np.float32) - - # Top / bottom rows - face_mask[:border, :] *= ramp_up[:, None] - face_mask[-border:, :] *= ramp_down[:, None] - # Left / right columns - face_mask[:, :border] *= ramp_up[None, :] - face_mask[:, -border:] *= ramp_down[None, :] - - # Expand to 3-channel - face_mask_3c = np.stack([face_mask] * 3, axis=-1) - - # Warp mask back to original frame space - inv_mask = cv2.warpAffine( - face_mask_3c, - inv_matrix, - (w, h), - borderMode=cv2.BORDER_CONSTANT, - borderValue=(0, 0, 0), + inv_mask_crop = cv2.warpAffine( + _enhancer_cache['mask'], inv_crop, (crop_w, crop_h), + borderMode=cv2.BORDER_CONSTANT, borderValue=0.0, ) - inv_mask = np.clip(inv_mask, 0.0, 1.0) + np.clip(inv_mask_crop, 0.0, 1.0, out=inv_mask_crop) - # Alpha-blend - result = ( - frame.astype(np.float32) * (1.0 - inv_mask) - + inv_restored.astype(np.float32) * inv_mask - ) - return np.clip(result, 0, 255).astype(np.uint8) + if _HAS_TORCH_CUDA: + # GPU blend on crop only + mask_t = torch.from_numpy(inv_mask_crop).cuda().unsqueeze(2) + enhanced_t = torch.from_numpy(inv_restored_crop).float().cuda() + target_t = torch.from_numpy(frame[y1p:y2p, x1p:x2p]).float().cuda() + blended = (mask_t * enhanced_t + (1.0 - mask_t) * target_t + ).to(torch.uint8).cpu().numpy() + frame[y1p:y2p, x1p:x2p] = blended + else: + # CPU blend on crop only + mask_3d = inv_mask_crop[:, :, np.newaxis] + target_crop = frame[y1p:y2p, x1p:x2p].astype(np.float32) + blended = (mask_3d * inv_restored_crop.astype(np.float32) + + (1.0 - mask_3d) * target_crop) + frame[y1p:y2p, x1p:x2p] = np.clip(blended, 0, 255).astype(np.uint8) + + return frame def _preprocess_face(aligned_face: np.ndarray) -> np.ndarray: @@ -224,14 +248,13 @@ def _preprocess_face(aligned_face: np.ndarray) -> np.ndarray: Convert an aligned BGR uint8 face image to 
def _preprocess_face(aligned_face: np.ndarray) -> np.ndarray:
    """
    Convert an aligned BGR uint8 face image to the ONNX model input tensor.

    Args:
        aligned_face: HxWx3 image in BGR channel order.

    Returns:
        A new C-contiguous float32 array of shape (1, 3, H, W), RGB channel
        order, normalised to [-1, 1].  The input image is not modified.
    """
    # BGR -> RGB and HWC -> CHW are both expressed as views; no data moves yet.
    chw_view = np.transpose(aligned_face[:, :, ::-1], (2, 0, 1))
    # order="C" matters: the default order="K" would keep the view's permuted
    # strides, yielding a non-contiguous tensor that has to be re-packed
    # before inference.  With "C" the cast and the re-layout are one copy.
    chw = chw_view.astype(np.float32, order="C")
    # Fused: (x / 255.0 - 0.5) / 0.5 = x / 127.5 - 1.0
    chw *= (1.0 / 127.5)
    chw -= 1.0
    return chw[np.newaxis, ...]  # shape: (1, 3, H, W)


def _postprocess_face(output: np.ndarray) -> np.ndarray:
    """
    Convert the ONNX model output tensor back to a BGR uint8 image.

    Args:
        output: Model output with values nominally in [-1, 1], either
            batched NCHW (the first batch item is used) or unbatched CHW.

    Returns:
        A new contiguous HxWx3 uint8 image in BGR channel order.  Values
        outside [-1, 1] are clipped; the input tensor is not modified.

    Raises:
        ValueError: if ``output`` is neither 3-D nor 4-D.
    """
    face = np.asarray(output)
    if face.ndim == 4:
        face = face[0]  # remove batch dim -> (3, H, W)
    if face.ndim != 3:
        raise ValueError(
            f"expected a CHW or NCHW tensor, got shape {np.shape(output)}"
        )
    # Fused: ((x + 1.0) / 2.0) * 255 = (x + 1.0) * 127.5
    # (the arithmetic allocates a fresh array, so the in-place clip is safe)
    face = (face + 1.0) * 127.5
    np.clip(face, 0, 255, out=face)
    face = face.astype(np.uint8).transpose(1, 2, 0)  # CHW -> HWC
    return face[:, :, ::-1].copy()  # RGB -> BGR, materialised contiguously


# Cache for temporal enhancement skipping in live mode.
# GFPGAN output barely changes between consecutive frames (same face,
# same position), so we run inference every _ENH_INTERVAL frames and
# reuse the cached enhanced face + affine matrix in between.
+_enh_live_cache: dict = { + 'enhanced_bgr': None, + 'affine_matrix': None, + 'align_size': 0, + 'frame_count': 0, +} +_ENH_INTERVAL = 2 # run inference every N frames, paste cached result otherwise + + +def enhance_face(temp_frame: Frame, detected_faces=None) -> Frame: + """Enhances all faces in a frame using the GFPGAN ONNX model. + + Args: + detected_faces: Pre-detected face list. When provided, skips + the internal detection call (saves ~15-20ms per frame). + Also enables temporal caching — inference runs every + _ENH_INTERVAL frames, reusing the cached result otherwise. + """ session = get_face_enhancer() # Determine model input resolution from the session metadata input_info = session.get_inputs()[0] input_name = input_info.name input_shape = input_info.shape # e.g. [1, 3, 512, 512] - # Safely extract input size (handle dynamic / symbolic dimensions) try: align_size = int(input_shape[2]) if align_size <= 0: @@ -264,15 +305,21 @@ def enhance_face(temp_frame: Frame) -> Frame: except (ValueError, TypeError, IndexError): align_size = 512 - # Detect faces using InsightFace (already a project dependency) - faces = get_many_faces(temp_frame) + # Use pre-detected faces if available, otherwise detect + faces = detected_faces if detected_faces is not None else get_many_faces(temp_frame) if not faces: return temp_frame - result_frame = temp_frame.copy() + # Temporal caching: only available when faces are pre-detected (live mode) + use_cache = detected_faces is not None + if use_cache: + _enh_live_cache['frame_count'] += 1 + run_inference_this_frame = (_enh_live_cache['frame_count'] % _ENH_INTERVAL == 0 + or _enh_live_cache['enhanced_bgr'] is None) + else: + run_inference_this_frame = True for face in faces: - # Need the 5-point key-points for alignment if not hasattr(face, "kps") or face.kps is None: continue @@ -280,54 +327,65 @@ def enhance_face(temp_frame: Frame) -> Frame: if landmarks_5.shape[0] < 5: continue - # Align / crop the face at the model's INPUT resolution 
- aligned_face, affine_matrix = _align_face( - temp_frame, landmarks_5, output_size=align_size - ) - if aligned_face is None or affine_matrix is None: - continue - - try: - with THREAD_SEMAPHORE: - from modules.processors.frame._onnx_enhancer import ( - run_inference, - ) - input_tensor = _preprocess_face(aligned_face) - output_tensor = run_inference(session, input_name, input_tensor) - enhanced_bgr = _postprocess_face(output_tensor) - - # The model may output at a different resolution than its input - # (e.g. input 512x512 → output 1024x1024). Resize the enhanced - # face back to the alignment size so the inverse affine maps - # correctly. - eh, ew = enhanced_bgr.shape[:2] - if eh != align_size or ew != align_size: - enhanced_bgr = cv2.resize( - enhanced_bgr, - (align_size, align_size), - interpolation=cv2.INTER_LANCZOS4, - ) - - # Paste enhanced face back onto the frame - result_frame = _paste_back( - result_frame, enhanced_bgr, affine_matrix, output_size=align_size + if run_inference_this_frame: + aligned_face, affine_matrix = _align_face( + temp_frame, landmarks_5, output_size=align_size ) - except Exception as e: - print(f"{NAME}: Error enhancing a face: {e}") - continue + if aligned_face is None or affine_matrix is None: + continue - return result_frame + try: + with THREAD_SEMAPHORE: + from modules.processors.frame._onnx_enhancer import ( + run_inference, + ) + input_tensor = _preprocess_face(aligned_face) + output_tensor = run_inference(session, input_name, input_tensor) + enhanced_bgr = _postprocess_face(output_tensor) + eh, ew = enhanced_bgr.shape[:2] + if eh != align_size or ew != align_size: + enhanced_bgr = cv2.resize( + enhanced_bgr, + (align_size, align_size), + interpolation=cv2.INTER_LANCZOS4, + ) + + # Cache for reuse on next frame + if use_cache: + _enh_live_cache['enhanced_bgr'] = enhanced_bgr + _enh_live_cache['affine_matrix'] = affine_matrix + _enh_live_cache['align_size'] = align_size + + _paste_back( + temp_frame, enhanced_bgr, 
affine_matrix, output_size=align_size + ) + except Exception as e: + print(f"{NAME}: Error enhancing a face: {e}") + continue + else: + # Reuse cached enhanced face — just paste back onto current frame + cached = _enh_live_cache + if cached['enhanced_bgr'] is not None: + _paste_back( + temp_frame, cached['enhanced_bgr'], + cached['affine_matrix'], + output_size=cached['align_size'], + ) + break # single-face live mode — only process first face -def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame: - """Processes a frame: enhances face if detected.""" - temp_frame = enhance_face(temp_frame) return temp_frame -def process_frame_v2(temp_frame: Frame) -> Frame: +def process_frame(source_face: Face | None, temp_frame: Frame, + detected_faces=None) -> Frame: + """Processes a frame: enhances face if detected.""" + return enhance_face(temp_frame, detected_faces=detected_faces) + + +def process_frame_v2(temp_frame: Frame, detected_faces=None) -> Frame: """Processes a frame without source face (used by live webcam preview).""" - return enhance_face(temp_frame) + return enhance_face(temp_frame, detected_faces=detected_faces) def process_frames( @@ -378,6 +436,3 @@ def process_video( modules.processors.frame.core.process_video( source_path, temp_frame_paths, process_frames ) - - -# --- END OF FILE face_enhancer.py --- diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index 90fdf87..91b403e 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -1,7 +1,7 @@ from typing import Any, List, Optional import cv2 import insightface -from insightface.utils import face_align +import logging import threading import numpy as np import platform @@ -86,21 +86,28 @@ def get_face_swapper() -> Any: with THREAD_LOCK: if FACE_SWAPPER is None: - # Prefer FP32 for broad GPU compatibility (FP16 can produce NaN - # on GPUs without Tensor Cores, e.g. GTX 16xx). 
Fall back to - # FP16 when FP32 is not available. + # Prefer FP16 on GPUs with Tensor Cores (Turing+) — half the + # memory bandwidth, faster inference. Fall back to FP32 for + # older GPUs (e.g. GTX 16xx) where FP16 can produce NaN. fp32_path = os.path.join(models_dir, "inswapper_128.onnx") fp16_path = os.path.join(models_dir, "inswapper_128_fp16.onnx") - if os.path.exists(fp32_path): - model_path = fp32_path - elif os.path.exists(fp16_path): + use_fp16 = _HAS_TORCH_CUDA and os.path.exists(fp16_path) + if use_fp16: model_path = fp16_path + elif os.path.exists(fp32_path): + model_path = fp32_path else: update_status(f"No inswapper model found in {models_dir}.", NAME) return None + # On Apple Silicon, rewrite Pad(reflect) → Slice+Concat so + # CoreML can run the entire model in a single partition on + # the Neural Engine instead of bouncing between CPU and ANE. + if IS_APPLE_SILICON: + from modules.onnx_optimize import optimize_for_coreml + model_path = optimize_for_coreml(model_path) + update_status(f"Loading face swapper model from: {model_path}", NAME) try: - # Optimized provider configuration for Apple Silicon providers_config = [] for p in modules.globals.execution_providers: if p == "CoreMLExecutionProvider" and IS_APPLE_SILICON: @@ -116,21 +123,22 @@ def get_face_swapper() -> Any: } )) elif p == "CUDAExecutionProvider": - providers_config.append(( - "CUDAExecutionProvider", - { - "arena_extend_strategy": "kSameAsRequested", - "cudnn_conv_algo_search": "EXHAUSTIVE", - "cudnn_conv_use_max_workspace": "1", - "do_copy_in_default_stream": "0", - } - )) + # Use bare provider — ONNX Runtime defaults are + # fastest on modern GPUs (Blackwell/sm_120). 
_HAS_TORCH_CUDA = False
try:
    import torch
    if torch.cuda.is_available():
        _HAS_TORCH_CUDA = True
except ImportError:
    pass

# Cache for paste-back
_paste_cache = {
    'mask_white': None,  # pre-allocated white image
}

# CUDA graph swap session cache
_cuda_graph_session = {
    'session': None,
    'io_binding': None,
    'ort_input': None,
    'ort_latent': None,
    'recorded': False,
}


def _init_cuda_graph_session(model_path: str, swapper):
    """Create a CUDA-graph-enabled ONNX session for the swap model.

    CUDA graphs record the GPU kernel launch sequence once, then replay it
    with near-zero CPU overhead on subsequent runs.  Requires static input
    shapes (inswapper is always 1x3x128x128 + 1x512).

    On success ``swapper.session.run`` is replaced by a wrapper that routes
    inference through the graph session.  This is strictly best-effort: any
    failure (onnxruntime missing, CUDA provider unavailable, binding error)
    is reported on stdout and the swapper keeps its standard session.

    Args:
        model_path: Path to the swap model ONNX file.
        swapper: Loaded insightface swap model; its ``input_size``,
            ``input_names``, ``output_names`` and ``session`` are used.
    """
    try:
        # Imported inside the try so a missing/broken onnxruntime degrades to
        # the standard session like every other init failure.
        import onnxruntime as ort

        providers = [('CUDAExecutionProvider', {'enable_cuda_graph': '1'})]
        sess = ort.InferenceSession(model_path, providers=providers)

        target_name = swapper.input_names[0]   # aligned face blob
        source_name = swapper.input_names[1]   # source identity latent

        # Pre-allocate GPU buffers with correct shapes
        inp_shape = (1, 3, swapper.input_size[1], swapper.input_size[0])
        latent_shape = (1, 512)
        dummy_inp = np.zeros(inp_shape, dtype=np.float32)
        dummy_lat = np.zeros(latent_shape, dtype=np.float32)

        ort_input = ort.OrtValue.ortvalue_from_numpy(dummy_inp, 'cuda', 0)
        ort_latent = ort.OrtValue.ortvalue_from_numpy(dummy_lat, 'cuda', 0)

        io = sess.io_binding()
        io.bind_ortvalue_input(target_name, ort_input)
        io.bind_ortvalue_input(source_name, ort_latent)
        io.bind_output(swapper.output_names[0], 'cuda', 0)

        # First run records the CUDA graph
        sess.run_with_iobinding(io)

        _cuda_graph_session['session'] = sess
        _cuda_graph_session['io_binding'] = io
        _cuda_graph_session['ort_input'] = ort_input
        _cuda_graph_session['ort_latent'] = ort_latent
        _cuda_graph_session['recorded'] = True

        # Monkey-patch the swapper's session.run to use CUDA graph replay
        _original_run = swapper.session.run

        def _graph_run(output_names, input_dict, **kwargs):
            if _cuda_graph_session['recorded']:
                try:
                    # Look inputs up by their bound names rather than relying
                    # on the caller's dict insertion order.
                    return [_cuda_graph_swap_inference(
                        input_dict[target_name], input_dict[source_name])]
                except Exception as exc:
                    # Report once and stop replaying, instead of silently
                    # paying for the failing path on every frame.
                    _cuda_graph_session['recorded'] = False
                    print(f"[{NAME}] CUDA graph replay failed, "
                          f"using standard session: {exc}", flush=True)
            return _original_run(output_names, input_dict, **kwargs)

        swapper.session.run = _graph_run
        print(f"[{NAME}] CUDA graph session initialized (swap model)",
              flush=True)
    except Exception as e:
        print(f"[{NAME}] CUDA graph init failed, using standard session: {e}")
        _cuda_graph_session['recorded'] = False


def _cuda_graph_swap_inference(blob: np.ndarray, latent: np.ndarray) -> np.ndarray:
    """Run swap model via CUDA graph replay — minimal CPU overhead.

    Copies ``blob`` and ``latent`` into the pre-bound device buffers, replays
    the session through its IO binding, and returns the first output as a
    host array.  The buffers live in the module-level ``_cuda_graph_session``.
    NOTE(review): those buffers are shared, so concurrent callers would
    presumably overwrite each other's inputs — confirm calls are serialised.
    """
    cg = _cuda_graph_session
    cg['ort_input'].update_inplace(blob)
    cg['ort_latent'].update_inplace(latent)
    cg['session'].run_with_iobinding(cg['io_binding'])
    return cg['io_binding'].get_outputs()[0].numpy()
def _fast_paste_back(target_img: Frame, bgr_fake: np.ndarray, aimg: np.ndarray, M: np.ndarray) -> Frame:
    """Paste the swapped face back, blending only inside the face bounding box.

    Modelled on insightface's built-in paste_back, but:
    - Skips dead fake_diff code (computed but unused in insightface)
    - Runs erosion, blur, and blend on the face bbox instead of the full frame
    - When torch CUDA is available, does the mask erode/blur and the blend
      on the GPU (the warps still run through cv2 on the CPU); the blur is
      an average-pool approximation of the CPU path's Gaussian
    - Writes directly into target_img to avoid a full-frame copy

    Args:
        target_img: Frame to paste into; modified in place.
        bgr_fake: Swapped face in aligned-crop space.
        aimg: Only ``aimg.shape[:2]`` is read, to size the white mask.
        M: 2x3 affine matrix mapping the frame into aligned-crop space.

    Returns:
        ``target_img`` (the same object), blended where the face lands.  It is
        returned untouched when the face does not intersect the frame.
    """
    h, w = target_img.shape[:2]
    face_h, face_w = aimg.shape[:2]
    IM = cv2.invertAffineTransform(M)

    # Reuse pre-allocated white mask
    if _paste_cache['mask_white'] is None or _paste_cache['mask_white'].shape != (face_h, face_w):
        _paste_cache['mask_white'] = np.full((face_h, face_w), 255, dtype=np.float32)

    if _HAS_TORCH_CUDA:
        # GPU path: compute bbox from affine matrix (avoids warpAffine + scan on white mask)
        corners = np.array([[0, 0], [face_w, 0], [face_w, face_h], [0, face_h]], dtype=np.float32)
        transformed = (IM[:, :2] @ corners.T).T + IM[:, 2]
        x1 = int(np.floor(transformed[:, 0].min()))
        x2 = int(np.ceil(transformed[:, 0].max()))
        y1 = int(np.floor(transformed[:, 1].min()))
        y2 = int(np.ceil(transformed[:, 1].max()))
        if x1 >= x2 or y1 >= y2:
            return target_img

        mask_h = y2 - y1
        mask_w = x2 - x1
        mask_size = int(np.sqrt(mask_h * mask_w))
        k_erode = max(mask_size // 10, 10)
        k_blur = max(mask_size // 20, 5)

        pad = k_erode + k_blur + 2
        y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
        x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
        # The bbox above is NOT clamped to the frame, so a face lying fully
        # outside it produces an empty crop; warpAffine rejects a
        # non-positive dsize, so bail out before calling it.
        if x1p >= x2p or y1p >= y2p:
            return target_img

        # Warp face and mask into crop region only (CPU — fast on small image)
        IM_crop = IM.copy()
        IM_crop[0, 2] -= x1p
        IM_crop[1, 2] -= y1p
        crop_w, crop_h = x2p - x1p, y2p - y1p

        bgr_fake_crop = cv2.warpAffine(bgr_fake, IM_crop, (crop_w, crop_h), borderValue=0.0)
        mask_crop = cv2.warpAffine(_paste_cache['mask_white'], IM_crop, (crop_w, crop_h), borderValue=0.0)

        # All mask processing + blend on GPU (no CPU roundtrips)
        mask_t = torch.from_numpy(mask_crop).cuda()
        mask_t = torch.where(mask_t > 20, 255.0, 0.0)
        orig_h, orig_w = mask_t.shape

        # Erode via negative max_pool (equivalent to min_pool)
        m4 = mask_t.unsqueeze(0).unsqueeze(0)
        m4 = -torch.nn.functional.max_pool2d(-m4, kernel_size=k_erode, stride=1, padding=k_erode // 2)

        # Gaussian blur approximation via avg_pool
        bk = 2 * k_blur + 1
        m4 = torch.nn.functional.avg_pool2d(m4, kernel_size=bk, stride=1, padding=bk // 2)

        # An even erode kernel makes the pooled map one pixel larger; trim it
        m4 = m4[:, :, :orig_h, :orig_w]

        mask_3d = (m4.squeeze() * (1.0 / 255.0)).unsqueeze(2)
        fake_t = torch.from_numpy(bgr_fake_crop).float().cuda()
        tgt_t = torch.from_numpy(target_img[y1p:y2p, x1p:x2p]).float().cuda()
        blended = mask_3d * fake_t + (1.0 - mask_3d) * tgt_t
        # Clamp before the cast, matching the CPU path's np.clip: converting
        # an out-of-range float to uint8 is not well defined.
        blended = blended.clamp_(0, 255).to(torch.uint8).cpu().numpy()

        target_img[y1p:y2p, x1p:x2p] = blended
        return target_img
    else:
        # CPU fallback
        bgr_fake_full = cv2.warpAffine(bgr_fake, IM, (w, h), borderValue=0.0)
        img_white_full = cv2.warpAffine(_paste_cache['mask_white'], IM, (w, h), borderValue=0.0)

        # Tight bbox of the warped mask; empty when the face is off-frame
        rows = np.any(img_white_full > 20, axis=1)
        cols = np.any(img_white_full > 20, axis=0)
        row_idx = np.where(rows)[0]
        col_idx = np.where(cols)[0]
        if len(row_idx) == 0 or len(col_idx) == 0:
            return target_img
        y1, y2 = row_idx[0], row_idx[-1]
        x1, x2 = col_idx[0], col_idx[-1]

        mask_h = y2 - y1
        mask_w = x2 - x1
        mask_size = int(np.sqrt(mask_h * mask_w))
        k_erode = max(mask_size // 10, 10)
        k_blur = max(mask_size // 20, 5)

        # Pad for the erosion + blur kernels, then crop
        pad = k_erode + k_blur + 2
        y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
        x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)

        mask_crop = img_white_full[y1p:y2p, x1p:x2p]
        mask_crop[mask_crop > 20] = 255
        mask_crop = cv2.erode(mask_crop, np.ones((k_erode, k_erode), np.uint8), iterations=1)
        mask_crop = cv2.GaussianBlur(mask_crop, (2 * k_blur + 1, 2 * k_blur + 1), 0)
        mask_crop *= (1.0 / 255.0)

        mask_3d = mask_crop[:, :, np.newaxis]
        fake_crop = bgr_fake_full[y1p:y2p, x1p:x2p].astype(np.float32)
        target_crop = target_img[y1p:y2p, x1p:x2p].astype(np.float32)
        blended = mask_3d * fake_crop + (1.0 - mask_3d) * target_crop
        # Write in-place, consistent with the GPU path
        target_img[y1p:y2p, x1p:x2p] = np.clip(blended, 0, 255).astype(np.uint8)
        return target_img
+ _face_size = face_swapper.input_size[0] + _aimg_dummy = np.empty((_face_size, _face_size, 3), dtype=np.uint8) - swapped_frame = _fast_paste_back(temp_frame, bgr_fake, aimg, M) - swapped_frame = np.clip(swapped_frame, 0, 255).astype(np.uint8) + swapped_frame = _fast_paste_back(temp_frame, bgr_fake, _aimg_dummy, M) except Exception as e: print(f"Error during face swap: {e}") @@ -356,6 +511,14 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda """Applies sharpening and interpolation with Apple Silicon optimizations.""" global PREVIOUS_FRAME_RESULT + sharpness_value = getattr(modules.globals, "sharpness", 0.0) + enable_interpolation = getattr(modules.globals, "enable_interpolation", False) + + # Skip copy when no post-processing is active + if sharpness_value <= 0.0 and not enable_interpolation: + PREVIOUS_FRAME_RESULT = None + return current_frame + processed_frame = current_frame.copy() # 1. Apply Sharpening (if enabled) with optimized kernel for Apple Silicon diff --git a/modules/ui.py b/modules/ui.py index 5fa00ca..0fe5990 100644 --- a/modules/ui.py +++ b/modules/ui.py @@ -17,6 +17,8 @@ import modules.metadata from modules.face_analyser import ( get_one_face, get_many_faces, + detect_one_face_fast, + detect_many_faces_fast, get_unique_faces_from_target_image, get_unique_faces_from_target_video, add_blank_map, @@ -1097,10 +1099,16 @@ def _capture_thread_func(cap, capture_queue, stop_event): pass -def _processing_thread_func(capture_queue, processed_queue, stop_event): +def _processing_thread_func(capture_queue, processed_queue, stop_event, + camera_fps: float = 30.0): """Processing thread: takes raw frames from capture_queue, runs face - detection (throttled to every 3rd frame), applies face swap/enhancement, - and puts results into processed_queue.""" + detection (throttled), applies face swap/enhancement, and puts results + into processed_queue. 
+ + Args: + camera_fps: Actual camera frame rate — used to compute how many + frames to skip between face detections (~80ms target). + """ frame_processors = get_frame_processors_modules(modules.globals.frame_processors) source_image = None last_source_path = None @@ -1111,6 +1119,9 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event): det_count = 0 cached_target_face = None cached_many_faces = None + # Detect every N frames ≈ 80ms. At 60fps → every 5 frames (83ms), + # at 30fps → every 3 frames (100ms), at 15fps → every frame. + det_interval = max(1, round(camera_fps * 0.08)) while not stop_event.is_set(): try: @@ -1128,26 +1139,37 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event): last_source_path = modules.globals.source_path source_image = get_one_face(cv2.imread(modules.globals.source_path)) - # Run detection every 3 frames, reuse cached result otherwise + # Run detection every det_interval frames (~80ms). + # Use fast detection (det-only, no landmark/recognition) for live mode. 
det_count += 1 - if det_count % 3 == 0: + if det_count % det_interval == 0: if modules.globals.many_faces: cached_target_face = None - cached_many_faces = get_many_faces(temp_frame) + cached_many_faces = detect_many_faces_fast(temp_frame) else: - cached_target_face = get_one_face(temp_frame) + cached_target_face = detect_one_face_fast(temp_frame) cached_many_faces = None + # Build face list for enhancers from cached detection + _cached_faces = None + if cached_many_faces: + _cached_faces = cached_many_faces + elif cached_target_face is not None: + _cached_faces = [cached_target_face] + for frame_processor in frame_processors: if frame_processor.NAME == "DLC.FACE-ENHANCER": if modules.globals.fp_ui["face_enhancer"]: - temp_frame = frame_processor.process_frame(None, temp_frame) + temp_frame = frame_processor.process_frame( + None, temp_frame, detected_faces=_cached_faces) elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN256": if modules.globals.fp_ui.get("face_enhancer_gpen256", False): - temp_frame = frame_processor.process_frame(None, temp_frame) + temp_frame = frame_processor.process_frame( + None, temp_frame, detected_faces=_cached_faces) elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN512": if modules.globals.fp_ui.get("face_enhancer_gpen512", False): - temp_frame = frame_processor.process_frame(None, temp_frame) + temp_frame = frame_processor.process_frame( + None, temp_frame, detected_faces=_cached_faces) elif frame_processor.NAME == "DLC.FACE-SWAPPER": # Use cached face positions from detection thread swapped_bboxes = [] @@ -1198,6 +1220,10 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event): 2, ) + # BGR→RGB in the processing thread so the display thread gets + # a contiguous RGB array (faster PIL.fromarray). 
+ temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) + # Put processed frame into output queue, dropping old frames if full try: processed_queue.put_nowait(temp_frame) @@ -1216,10 +1242,13 @@ def create_webcam_preview(camera_index: int): global preview_label, PREVIEW cap = VideoCapturer(camera_index) - if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60): + if not cap.start(1920, 1080, 60): update_status("Failed to start camera") return + camera_fps = cap.actual_fps + print(f"[webcam] Camera running at {cap.actual_width}x{cap.actual_height}@{camera_fps:.0f}fps") + preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT) PREVIEW.deiconify() @@ -1240,7 +1269,7 @@ def create_webcam_preview(camera_index: int): # Start processing thread proc_thread = threading.Thread( target=_processing_thread_func, - args=(capture_queue, processed_queue, stop_event), + args=(capture_queue, processed_queue, stop_event, camera_fps), daemon=True, ) proc_thread.start() @@ -1253,37 +1282,32 @@ def create_webcam_preview(camera_index: int): cap.release() PREVIEW.withdraw() + # Poll at ~2x camera FPS (Nyquist) so we pick up frames promptly + # without burning CPU. Clamped to [1, 16] ms. + poll_ms = max(1, min(16, int(500 / camera_fps))) + # Non-blocking display loop using ROOT.after() — avoids blocking the - # Tk event loop which could cause UI freezes or re-entrancy issues + # Tk event loop which could cause UI freezes or re-entrancy issues. 
def _display_next_frame(): if stop_event.is_set() or PREVIEW.state() == "withdrawn": _cleanup() return try: - temp_frame = processed_queue.get_nowait() + rgb_frame = processed_queue.get_nowait() except queue.Empty: - ROOT.after(16, _display_next_frame) + ROOT.after(poll_ms, _display_next_frame) return - if modules.globals.live_resizable: - temp_frame = fit_image_to_size( - temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() - ) - else: - temp_frame = fit_image_to_size( - temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() - ) - temp_frame = temp_frame.copy() - image = gpu_cvt_color(temp_frame, cv2.COLOR_BGR2RGB) - image = Image.fromarray(image) - image = ImageOps.contain( - image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS + # Frame is already RGB from processing thread; resize to preview window + rgb_frame = fit_image_to_size( + rgb_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height() ) + image = Image.fromarray(rgb_frame) image = ctk.CTkImage(image, size=image.size) preview_label.configure(image=image) - ROOT.after(16, _display_next_frame) + ROOT.after(poll_ms, _display_next_frame) # Kick off the non-blocking display loop ROOT.after(0, _display_next_frame) diff --git a/modules/video_capture.py b/modules/video_capture.py index cab223d..e0e830d 100644 --- a/modules/video_capture.py +++ b/modules/video_capture.py @@ -1,5 +1,7 @@ import cv2 import numpy as np +import sys +import time from typing import Optional, Tuple, Callable import platform import threading @@ -17,6 +19,10 @@ class VideoCapturer: self._frame_ready = threading.Event() self.is_running = False self.cap = None + # Actual values reported by the camera after configuration + self.actual_width: int = 0 + self.actual_height: int = 0 + self.actual_fps: float = 0.0 # Initialize Windows-specific components if on Windows if platform.system() == "Windows": @@ -32,12 +38,14 @@ class VideoCapturer: """Initialize and start video capture""" try: if platform.system() == "Windows": - # 
Windows-specific capture methods + # Windows-specific capture methods. + # MSMF (Media Foundation) is preferred — DirectShow often + # caps at 30fps even when the camera supports 60fps. capture_methods = [ - (self.device_index, cv2.CAP_DSHOW), # Try DirectShow first - (self.device_index, cv2.CAP_ANY), # Then try default backend - (-1, cv2.CAP_ANY), # Try -1 as fallback - (0, cv2.CAP_ANY), # Finally try 0 without specific backend + (self.device_index, cv2.CAP_MSMF), # Media Foundation first + (self.device_index, cv2.CAP_DSHOW), # DirectShow fallback + (self.device_index, cv2.CAP_ANY), + (0, cv2.CAP_ANY), ] for dev_id, backend in capture_methods: @@ -55,11 +63,30 @@ class VideoCapturer: if not self.cap or not self.cap.isOpened(): raise RuntimeError("Failed to open camera") - # Configure format + # Try MJPEG first — avoids USB bandwidth limits with + # uncompressed YUV at high resolutions. Falls back silently + # if the camera/backend doesn't support it. + self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) + # Request desired resolution and frame rate self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) self.cap.set(cv2.CAP_PROP_FPS, fps) + # Read back resolution (usually reliable) + self.actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + # CAP_PROP_FPS is unreliable on DirectShow — often reports 30 + # even when the camera delivers 60. Measure empirically by + # timing a burst of frames. 
+ reported_fps = self.cap.get(cv2.CAP_PROP_FPS) + self.actual_fps = self._measure_fps(warmup=10, sample=30, + fallback=reported_fps or fps) + + print(f"[VideoCapturer] {self.actual_width}x{self.actual_height} " + f"@ {self.actual_fps:.1f}fps (reported={reported_fps:.0f})", + flush=True) + self.is_running = True return True @@ -89,6 +116,29 @@ class VideoCapturer: self.is_running = False self.cap = None + def _measure_fps(self, warmup: int = 10, sample: int = 30, + fallback: float = 30.0) -> float: + """Read warmup+sample frames and return measured FPS. + + This is more reliable than CAP_PROP_FPS which often lies on + DirectShow. Takes ~0.5-1s at startup but gives a ground-truth + number for adaptive polling/detection intervals. + """ + try: + for _ in range(warmup): + self.cap.read() + t0 = time.perf_counter() + for _ in range(sample): + ret, _ = self.cap.read() + if not ret: + return fallback + elapsed = time.perf_counter() - t0 + if elapsed <= 0: + return fallback + return sample / elapsed + except Exception: + return fallback + def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None: """Set callback for frame processing""" self.frame_callback = callback diff --git a/run.py b/run.py index f01b4ec..4ec59d2 100644 --- a/run.py +++ b/run.py @@ -7,17 +7,29 @@ import sys project_root = os.path.dirname(os.path.abspath(__file__)) os.environ["PATH"] = project_root + os.pathsep + os.environ.get("PATH", "") -# Add NVIDIA CUDA DLL directories to PATH so onnxruntime-gpu can find them -nvidia_dir = os.path.join(project_root, "venv", "Lib", "site-packages", "nvidia") -if os.path.isdir(nvidia_dir): - for pkg in os.listdir(nvidia_dir): - bin_dir = os.path.join(nvidia_dir, pkg, "bin") - if os.path.isdir(bin_dir): - os.environ["PATH"] = bin_dir + os.pathsep + os.environ["PATH"] +# On Windows, add NVIDIA CUDA DLL directories to PATH so onnxruntime-gpu can +# find cuDNN/cublas. PyTorch bundles cuDNN in its lib/ dir; pip nvidia-* pkgs +# use bin/. 
Skipped on macOS/Linux where loader paths handle this. +if sys.platform == "win32": + _site_packages = os.path.join(sys.prefix, "Lib", "site-packages") + _venv_site_packages = os.path.join(project_root, "venv", "Lib", "site-packages") + for _sp in (_site_packages, _venv_site_packages): + _torch_lib = os.path.join(_sp, "torch", "lib") + if os.path.isdir(_torch_lib): + os.environ["PATH"] = _torch_lib + os.pathsep + os.environ["PATH"] + _nvidia_dir = os.path.join(_sp, "nvidia") + if os.path.isdir(_nvidia_dir): + for _pkg in os.listdir(_nvidia_dir): + _bin_dir = os.path.join(_nvidia_dir, _pkg, "bin") + if os.path.isdir(_bin_dir): + os.environ["PATH"] = _bin_dir + os.pathsep + os.environ["PATH"] # Import the tkinter fix to patch the ScreenChanged error import tkinter_fix +from modules import platform_info +platform_info.print_banner() + from modules import core if __name__ == '__main__':