Apple Silicon + Windows CUDA perf: 60 FPS pipeline, cross-platform routing

Bundles CoreML graph rewrites, GPU-accelerated pipeline work, Windows CUDA
fixes, and Mac/Windows runtime routing into a single drop.

CoreML (Apple Silicon):
- Decompose Pad(reflect) → Slice+Concat in inswapper_128 so the model
  runs in one CoreML partition instead of 14 (TEMPORARY: fixed upstream
  in microsoft/onnxruntime#28073, drop when ORT >= 1.26.0).
- Fold Shape/Gather chains to constants in det_10g (21ms → 4ms).
- Decompose Split(axis=1) → Slice pairs in GFPGAN (155ms → 89ms).
- Route detection model to GPU so the ANE is free for the swap model.
- Centralize provider/config selection in create_onnx_session.

Pipeline (all platforms):
- Parallelize face landmark + recognition post-detection; skip landmark_2d_106
  when only face_swapper is active.
- Pipeline face detection with swap for ANE overlap.
- GPU-accelerated paste_back, MJPEG capture, zero-copy display path.
- Standalone pipeline benchmark script.

Windows / CUDA:
- CUDA graphs + FP16 model + all-GPU pipeline for 1080p 60 FPS.
- Auto-detect GPU provider and fix DLL discovery for Windows CUDA execution.

Cross-platform:
- platform_info helper for Mac/Windows runtime routing.
- GFPGAN 30 fps + MSMF camera 60 fps with adaptive pipeline tuning.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Max Buckley
2026-04-22 10:44:59 +02:00
parent 64d3f06089
commit f65aeae5db
13 changed files with 1420 additions and 271 deletions
+178
View File
@@ -0,0 +1,178 @@
"""Standalone pipeline benchmark — no UI required.
Captures 200 frames from the webcam and runs the full face swap pipeline,
printing per-stage timing and effective FPS.
"""
import os, sys, time, cv2, numpy as np, queue, threading
# PATH fix for cuDNN (Windows only)
if sys.platform == "win32":
_sp = os.path.join(sys.prefix, "Lib", "site-packages")
_torch_lib = os.path.join(_sp, "torch", "lib")
if os.path.isdir(_torch_lib):
os.environ["PATH"] = _torch_lib + os.pathsep + os.environ["PATH"]
import insightface
from insightface.app import FaceAnalysis
from insightface.utils import face_align
from modules.processors.frame.face_swapper import _fast_paste_back
from modules import platform_info
platform_info.print_banner()
# Pick providers based on what's actually available on this machine.
if platform_info.HAS_CUDA_PROVIDER:
_providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
elif platform_info.HAS_COREML_PROVIDER:
_providers = ["CoreMLExecutionProvider", "CPUExecutionProvider"]
else:
_providers = ["CPUExecutionProvider"]
# --- Init models (same as the app) ---
print(f"Loading models with providers={_providers}...")
fa = FaceAnalysis(
    name="buffalo_l",
    providers=_providers,
    allowed_modules=["detection", "recognition", "landmark_2d_106"],
)
fa.prepare(ctx_id=0, det_size=(640, 640))
swap_model = insightface.model_zoo.get_model(
    "models/inswapper_128.onnx",
    providers=_providers,
)
face_size = swap_model.input_size[0]
# Dummy aligned image — _fast_paste_back only needs its shape, never its pixels.
aimg_dummy = np.empty((face_size, face_size, 3), dtype=np.uint8)

# --- Camera setup ---
# Windows: DirectShow explicit for MJPEG 1080p60 support.
# macOS/Linux: default backend (AVFoundation / V4L2).
print("Opening camera at 1080p60 MJPEG...")
if sys.platform == "win32":
    cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
else:
    cap = cv2.VideoCapture(0)
# Fail fast instead of crashing later on a None frame.
if not cap.isOpened():
    print("ERROR: Could not open camera")
    sys.exit(1)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
cap.set(cv2.CAP_PROP_FPS, 60)
time.sleep(0.5)  # let the camera settle on the requested mode

# Warmup + get source face
for _ in range(15):
    cap.read()
ret, src_frame = cap.read()
# Guard the read: fa.get(None) would raise an opaque error deep in insightface.
if not ret or src_frame is None:
    print("ERROR: Could not read a frame from the camera")
    cap.release()
    sys.exit(1)
faces = fa.get(src_frame)
if not faces:
    print("ERROR: No face detected in warmup frame")
    cap.release()
    sys.exit(1)
source_face = faces[0]
print(f"Source face acquired. Frame: {src_frame.shape}")
# --- Capture thread (same as app) ---
capture_queue = queue.Queue(maxsize=2)
stop_event = threading.Event()


def capture_thread():
    """Grab frames continuously, dropping the oldest when the queue is full.

    Keeping only the freshest frames bounds end-to-end latency: the
    consumer never processes a stale frame.
    """
    while not stop_event.is_set():
        grabbed, frame = cap.read()
        if not grabbed:
            break
        try:
            capture_queue.put_nowait(frame)
        except queue.Full:
            # Evict the stale frame, then retry once with the fresh one.
            try:
                capture_queue.get_nowait()
            except queue.Empty:
                pass
            try:
                capture_queue.put_nowait(frame)
            except queue.Full:
                pass


cap_t = threading.Thread(target=capture_thread, daemon=True)
cap_t.start()
# --- Warmup processing ---
# Run the full swap path a few times so lazy initialization (CUDA graphs,
# CoreML compilation, cuDNN autotuning) happens before timing starts.
print("Warming up pipeline...")
for _ in range(20):
    try:
        frame = capture_queue.get(timeout=0.1)
    except queue.Empty:
        continue
    f = frame.copy()
    det_faces = fa.get(f)
    if det_faces:
        # Leftmost face — same selection rule as the benchmark loop.
        tgt = min(det_faces, key=lambda face: face.bbox[0])
        bgr_fake, M = swap_model.get(f, tgt, source_face, paste_back=False)
        _fast_paste_back(f, bgr_fake, aimg_dummy, M)
# --- Benchmark ---
N = 200
print(f"\nBenchmarking {N} frames...")
t_queue, t_det, t_onnx, t_paste, t_copy, t_cvt, t_total = [], [], [], [], [], [], []
det_count = 0
cached_face = None
# Hoisted out of the loop: the import inside the hot loop paid a
# sys.modules lookup on every detection frame.
from insightface.app.common import Face as _Face
for _ in range(N):
    tt = time.perf_counter()
    t0 = time.perf_counter()
    try:
        frame = capture_queue.get(timeout=0.1)
    except queue.Empty:
        continue
    t_queue.append((time.perf_counter() - t0) * 1000)
    # Detection every 3rd frame — det-only (no landmark/recognition).
    # The cached face is reused in between; at video rates it barely moves.
    det_count += 1
    if det_count % 3 == 0:
        t0 = time.perf_counter()
        bboxes, kpss = fa.det_model.detect(frame, max_num=0, metric='default')
        if bboxes.shape[0] > 0:
            # Leftmost face (smallest x of the bbox).
            idx = int(bboxes[:, 0].argmin())
            cached_face = _Face(bbox=bboxes[idx, :4], kps=kpss[idx], det_score=bboxes[idx, 4])
        t_det.append((time.perf_counter() - t0) * 1000)
    if cached_face is not None:
        # No frame.copy() — _fast_paste_back writes in-place, we own the frame
        t0 = time.perf_counter()
        bgr_fake, M = swap_model.get(frame, cached_face, source_face, paste_back=False)
        t_onnx.append((time.perf_counter() - t0) * 1000)
        t0 = time.perf_counter()
        result = _fast_paste_back(frame, bgr_fake, aimg_dummy, M)
        t_paste.append((time.perf_counter() - t0) * 1000)
        # Display prep — resize then flip (no cvtColor needed)
        t0 = time.perf_counter()
        small = cv2.resize(result, (640, 360))
        _ = small[:, :, ::-1]  # BGR→RGB zero-copy view
        t_cvt.append((time.perf_counter() - t0) * 1000)
    t_total.append((time.perf_counter() - tt) * 1000)
stop_event.set()
cap.release()
# --- Results ---
def s(name, arr):
    """Print avg/min/max/count for one timing series; no-op when empty."""
    if not arr:
        return
    avg = sum(arr) / len(arr)
    print(f" {name:25s}: avg={avg:6.1f}ms min={min(arr):5.1f}ms max={max(arr):6.1f}ms n={len(arr)}")
print(f"\n{'='*55}")
print(f" 1080p Pipeline Benchmark ({len(t_total)} frames)")
print(f"{'='*55}")
s("queue.get (wait for cam)", t_queue)
s("detection (fa.get)", t_det)
s("frame.copy()", t_copy)
s("ONNX swap", t_onnx)
s("_fast_paste_back", t_paste)
s("cvtColor BGR->RGB", t_cvt)
s("TOTAL per frame", t_total)
avg_total = sum(t_total) / len(t_total)
avg_queue = sum(t_queue) / len(t_queue)
print(f"\n Effective FPS: {1000/avg_total:.1f}")
print(f" FPS (excl. cam wait): {1000/(avg_total - avg_queue):.1f}")
print(f"{'='*55}")
+10 -1
View File
@@ -57,7 +57,7 @@ def parse_args() -> None:
program.add_argument('--live-mirror', help='The live camera display as you see it in the front-facing camera frame', dest='live_mirror', action='store_true', default=False)
program.add_argument('--live-resizable', help='The live camera frame is resizable', dest='live_resizable', action='store_true', default=False)
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=[suggest_default_execution_provider()], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
@@ -131,6 +131,15 @@ def suggest_max_memory() -> int:
return 16
def suggest_default_execution_provider() -> str:
    """Pick the best available provider: cuda > rocm > coreml > dml > cpu."""
    available = set(encode_execution_providers(onnxruntime.get_available_providers()))
    for candidate in ('cuda', 'rocm', 'coreml', 'dml'):
        if candidate in available:
            return candidate
    return 'cpu'
def suggest_execution_providers() -> List[str]:
    """Return the short names of all ONNX Runtime providers on this machine."""
    return encode_execution_providers(onnxruntime.get_available_providers())
+128 -6
View File
@@ -16,6 +16,8 @@ from pathlib import Path
FACE_ANALYSER = None
FACE_ANALYSER_LOCK = threading.Lock()
DET_SIZE = (640, 640)
def get_face_analyser() -> Any:
"""Get face analyser with thread-safe initialization."""
@@ -34,22 +36,116 @@ def get_face_analyser() -> Any:
providers=providers,
allowed_modules=['detection', 'recognition', 'landmark_2d_106']
)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
FACE_ANALYSER.prepare(ctx_id=0, det_size=DET_SIZE)
_optimize_det_model(FACE_ANALYSER, providers)
return FACE_ANALYSER
def _optimize_det_model(fa: Any, providers) -> None:
    """Swap the detection model's ONNX session for a CoreML-optimized one.

    Folds dynamic Shape→Gather chains into constants (the input size is
    fixed at det_size), eliminating CPU↔ANE partition boundaries in the
    RetinaFace FPN upsampling path. 21ms → 4ms on M3 Max.
    """
    from modules.onnx_optimize import optimize_for_coreml, IS_APPLE_SILICON

    if not IS_APPLE_SILICON:
        return
    det_model = fa.det_model
    model_path = getattr(det_model, 'model_file', None)
    if model_path is None or not os.path.exists(model_path):
        return
    input_shape = (1, 3, DET_SIZE[1], DET_SIZE[0])
    optimized_path = optimize_for_coreml(model_path, input_shape=input_shape)
    if optimized_path == model_path:
        # No rewrite was applied — keep the existing session.
        return

    import onnxruntime

    session_options = onnxruntime.SessionOptions()
    session_options.graph_optimization_level = (
        onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
    )
    # Route detection to GPU shader cores (CPUAndGPU) instead of ANE.
    # This lets detection run concurrently with the swap model on the
    # ANE, overlapping the two inference calls. Detection is fast
    # enough on GPU (~4ms) and this frees ANE for the heavier swap.
    det_providers = []
    for provider in providers:
        provider_name = provider[0] if isinstance(provider, tuple) else provider
        if provider_name == "CoreMLExecutionProvider":
            det_providers.append((
                "CoreMLExecutionProvider",
                {"ModelFormat": "MLProgram", "MLComputeUnits": "CPUAndGPU"},
            ))
        else:
            det_providers.append(provider)
    det_model.session = onnxruntime.InferenceSession(
        optimized_path, sess_options=session_options, providers=det_providers,
    )
def _needs_landmark() -> bool:
    """Check whether any active feature requires 106-point landmarks.

    Landmarks are needed by face enhancers and mouth masking, but not
    by the face swapper alone.
    """
    if getattr(modules.globals, "mouth_mask", False):
        return True
    active = getattr(modules.globals, "frame_processors", [])
    landmark_users = ("face_enhancer", "face_enhancer_gpen256", "face_enhancer_gpen512")
    return any(name in active for name in landmark_users)
def _is_dml() -> bool:
    """True when DirectML is among the configured execution providers.

    Handles both bare provider-name strings and ``(name, options)``
    tuples: the old substring test silently degraded to tuple-element
    equality for configured providers, missing any tuple entry whose
    first element was the provider name.
    """
    for provider in modules.globals.execution_providers:
        name = provider[0] if isinstance(provider, tuple) else provider
        if "DmlExecutionProvider" in name:
            return True
    return False
def _analyse_faces(frame: Frame) -> list:
    """Run face detection, then recognition (and optionally landmark).

    Replaces InsightFace's ``FaceAnalysis.get()`` to skip the
    landmark_2d_106 model when only face_swapper is active (saves ~1ms
    per face and avoids an unnecessary ONNX session call).
    """
    from insightface.app.common import Face

    fa = get_face_analyser()
    bboxes, kpss = fa.det_model.detect(frame, max_num=0, metric="default")
    if bboxes.shape[0] == 0:
        return []
    rec_model = fa.models.get("recognition")
    lmk_model = fa.models.get("landmark_2d_106") if _needs_landmark() else None
    results = []
    for row in range(bboxes.shape[0]):
        face = Face(
            bbox=bboxes[row, 0:4],
            kps=kpss[row] if kpss is not None else None,
            det_score=bboxes[row, 4],
        )
        if rec_model is not None:
            rec_model.get(frame, face)
        if lmk_model is not None:
            lmk_model.get(frame, face)
        results.append(face)
    return results
def get_one_face(frame: Frame) -> Any:
if _is_dml():
with modules.globals.dml_lock:
face = get_face_analyser().get(frame)
faces = _analyse_faces(frame)
else:
face = get_face_analyser().get(frame)
faces = _analyse_faces(frame)
try:
return min(face, key=lambda x: x.bbox[0])
return min(faces, key=lambda x: x.bbox[0])
except ValueError:
return None
@@ -58,12 +154,38 @@ def get_many_faces(frame: Frame) -> Any:
try:
if _is_dml():
with modules.globals.dml_lock:
return get_face_analyser().get(frame)
return _analyse_faces(frame)
else:
return get_face_analyser().get(frame)
return _analyse_faces(frame)
except IndexError:
return None
def detect_one_face_fast(frame: Frame) -> Any:
    """Detection-only — skips landmark and recognition models.

    Returns a Face with bbox, kps, det_score (enough for face swap),
    or None when no face is found.
    ~10ms vs ~16ms for full get_one_face() at 1080p.
    """
    from insightface.app.common import Face
    fa = get_face_analyser()
    bboxes, kpss = fa.det_model.detect(frame, max_num=0, metric='default')
    if bboxes.shape[0] == 0:
        return None
    # Leftmost face — same selection rule as get_one_face().
    idx = int(bboxes[:, 0].argmin())
    # Guard kpss like _analyse_faces does — detect() may return kpss=None.
    return Face(bbox=bboxes[idx, :4],
                kps=kpss[idx] if kpss is not None else None,
                det_score=bboxes[idx, 4])
def detect_many_faces_fast(frame: Frame) -> Any:
    """Detection-only multi-face — skips landmark and recognition.

    Returns a list of Face objects, or None when no face is found.
    """
    from insightface.app.common import Face
    fa = get_face_analyser()
    bboxes, kpss = fa.det_model.detect(frame, max_num=0, metric='default')
    if bboxes.shape[0] == 0:
        return None
    # Guard kpss like _analyse_faces does — detect() may return kpss=None.
    return [Face(bbox=bboxes[i, :4],
                 kps=kpss[i] if kpss is not None else None,
                 det_score=bboxes[i, 4])
            for i in range(bboxes.shape[0])]
def has_valid_map() -> bool:
for map in modules.globals.source_target_map:
if "source" in map and "target" in map:
+20 -14
View File
@@ -18,6 +18,7 @@ Usage
from __future__ import annotations
import os
import cv2
import numpy as np
from typing import Tuple, Optional
@@ -27,20 +28,25 @@ from typing import Tuple, Optional
# ---------------------------------------------------------------------------
CUDA_AVAILABLE: bool = False
try:
# cv2.cuda.GpuMat is only present when OpenCV is compiled with CUDA
_test_mat = cv2.cuda.GpuMat()
# Verify we have the required filter / image-processing functions
_has_gauss = hasattr(cv2.cuda, "createGaussianFilter")
_has_resize = hasattr(cv2.cuda, "resize")
_has_cvt = hasattr(cv2.cuda, "cvtColor")
if _has_gauss and _has_resize and _has_cvt:
CUDA_AVAILABLE = True
print("[gpu_processing] OpenCV CUDA support detected GPU-accelerated processing enabled.")
else:
pass # silently fall back to CPU
except Exception:
pass # silently fall back to CPU
# OpenCV CUDA per-operation acceleration is DISABLED by default.
# Each gpu_* call uploads to GPU, processes, then downloads back to CPU.
# At webcam resolution (~960x540) this upload/download overhead far exceeds
# the time saved on the actual operation, making it slower than pure CPU.
# The heavy lifting (face detection, swap, enhancement) runs on GPU via
# ONNX Runtime's CUDAExecutionProvider, which is where GPU matters.
#
# To force-enable, set OPENCV_CUDA_PROCESSING=1 in your environment.
if os.environ.get("OPENCV_CUDA_PROCESSING") == "1":
try:
_test_mat = cv2.cuda.GpuMat()
_has_gauss = hasattr(cv2.cuda, "createGaussianFilter")
_has_resize = hasattr(cv2.cuda, "resize")
_has_cvt = hasattr(cv2.cuda, "cvtColor")
if _has_gauss and _has_resize and _has_cvt:
CUDA_AVAILABLE = True
print("[gpu_processing] OpenCV CUDA processing enabled via OPENCV_CUDA_PROCESSING=1.")
except Exception:
pass
# ---------------------------------------------------------------------------
+428
View File
@@ -0,0 +1,428 @@
"""ONNX model optimizations for CoreML execution on Apple Silicon.
Two transformations that eliminate CPU↔ANE round-trips:
1. **Pad(reflect) decomposition** — CoreML doesn't support ``Pad(mode=reflect)``.
Models using reflect padding (e.g. inswapper_128) get split into many CoreML
subgraphs with CPU fallbacks between each. We rewrite each ``Pad(reflect)``
as equivalent ``Slice`` + ``Concat`` ops that CoreML handles natively.
Bit-for-bit identical output.
2. **Shape/Gather constant folding** — Dynamic ``Shape`` → ``Gather`` chains
(e.g. for FPN upsample target sizes in RetinaFace) force ops onto CPU even
when the input dimensions are known at load time. We run ONNX shape
inference with the known input size and replace these chains with constants.
Float32-noise-level differences only (max ~6e-6).
Both transformations are cached on disk with a ``_coreml`` suffix so the
rewrite cost is paid only once per model.
"""
import os
import platform
import numpy as np
IS_APPLE_SILICON = platform.system() == "Darwin" and platform.machine() == "arm64"


def optimize_for_coreml(model_path: str, input_shape: tuple = None) -> str:
    """Return path to a CoreML-optimized ONNX model.

    Applies all applicable optimizations and caches the result next to
    the original model (with ``_coreml`` suffix).

    Args:
        model_path: Path to the original ONNX model.
        input_shape: Optional fixed input shape (e.g. ``(1, 3, 640, 640)``).
            When provided, enables Shape/Gather constant folding.

    Returns the optimized path, or the original path if no optimizations
    apply or we're not on Apple Silicon.
    """
    if not IS_APPLE_SILICON:
        return model_path

    base, ext = os.path.splitext(model_path)
    optimized_path = f"{base}_coreml{ext}"
    # Reuse the cached rewrite unless the source model is newer than it.
    if os.path.exists(optimized_path) and (
        os.path.getmtime(optimized_path) >= os.path.getmtime(model_path)
    ):
        return optimized_path

    import onnx
    from onnx import numpy_helper

    model = onnx.load(model_path)
    rewritten = False
    if _fold_shape_gather(model, input_shape):
        rewritten = True
    # TODO(ort>=1.26): drop this pass. Fixed upstream by microsoft/onnxruntime#28073.
    if _decompose_reflect_pad(model):
        rewritten = True
    if _decompose_split(model):
        rewritten = True
    if not rewritten:
        return model_path

    # Preserve insightface's emap convention: the INSwapper class reads
    # graph.initializer[-1] as the embedding map. If the original model
    # had a (512, 512) matrix as its last initializer, keep it last.
    _preserve_emap_position(model, numpy_helper)
    onnx.save(model, optimized_path)
    return optimized_path
# ---------------------------------------------------------------------------
# Pass 1: Fold Shape → Gather chains into constants
# ---------------------------------------------------------------------------
def _fold_shape_gather(model, input_shape) -> bool:
    """Replace dynamic Shape→Gather chains with constants when input size is known.

    Only removes a Shape node when ALL of its consumers are Gather nodes
    that are also being folded. This prevents breaking graphs where
    a Shape output feeds into other ops as well.

    Args:
        model: In-memory ONNX ModelProto; mutated in place on success.
        input_shape: Fixed NCHW-style input shape tuple, or None to skip.

    Returns:
        True when the graph was modified.
    """
    if input_shape is None:
        return False
    from onnx import numpy_helper, shape_inference
    graph = model.graph
    # Set fixed input dimensions for shape inference
    inp = graph.input[0]
    dims = inp.type.tensor_type.shape.dim
    for i, size in enumerate(input_shape):
        if i < len(dims):
            dims[i].dim_value = size
    try:
        model_inferred = shape_inference.infer_shapes(model)
    except Exception:
        # Shape inference can fail on exotic graphs — bail out untouched.
        return False
    # Extract inferred shapes; None marks a still-dynamic dimension.
    value_shapes = {}
    for vi in list(model_inferred.graph.value_info) + list(graph.input) + list(graph.output):
        shape_dims = vi.type.tensor_type.shape.dim
        shape = []
        for d in shape_dims:
            if d.dim_value > 0:
                shape.append(d.dim_value)
            else:
                shape.append(None)
        value_shapes[vi.name] = shape
    inits = {init.name: numpy_helper.to_array(init) for init in graph.initializer}
    # Build consumer map: output_name → list of consuming nodes
    consumers = {}
    for node in graph.node:
        for i in node.input:
            consumers.setdefault(i, []).append(node)
    # Also check graph outputs — an output name consumed by the graph
    # output list must not be removed
    graph_output_names = {o.name for o in graph.output}
    # Find Shape nodes with fully-known output
    shape_constants = {}
    for node in graph.node:
        if node.op_type == "Shape":
            inp_shape = value_shapes.get(node.input[0])
            if inp_shape and all(isinstance(d, int) for d in inp_shape):
                shape_constants[node.output[0]] = np.array(inp_shape, dtype=np.int64)
    if not shape_constants:
        return False
    # Find Gather nodes consuming Shape constants (with a constant index)
    gather_constants = {}
    for node in graph.node:
        if node.op_type == "Gather" and node.input[0] in shape_constants:
            idx_name = node.input[1]
            if idx_name in inits:
                idx = int(inits[idx_name])
                val = int(shape_constants[node.input[0]][idx])
                gather_constants[node.output[0]] = np.array(val, dtype=np.int64)
    if not gather_constants:
        return False
    # Determine which Gather nodes to fold (always safe — we replace
    # the output with a constant initializer)
    gather_remove_ids = set()
    for node in graph.node:
        if node.op_type == "Gather" and node.output[0] in gather_constants:
            gather_remove_ids.add(id(node))
    # Determine which Shape nodes are safe to remove: only if ALL
    # consumers of the Shape output are Gather nodes being folded,
    # and the output isn't a graph output.
    shape_remove_ids = set()
    for node in graph.node:
        if node.op_type == "Shape" and node.output[0] in shape_constants:
            out_name = node.output[0]
            if out_name in graph_output_names:
                continue
            node_consumers = consumers.get(out_name, [])
            if all(id(c) in gather_remove_ids for c in node_consumers):
                shape_remove_ids.add(id(node))
    remove_ids = gather_remove_ids | shape_remove_ids
    # Add Gather output constants as initializers
    existing = {i.name for i in graph.initializer}
    for name, val in gather_constants.items():
        if name not in existing:
            graph.initializer.append(numpy_helper.from_array(val, name=name))
    # Rebuild the repeated protobuf field in place (clear + extend).
    new_nodes = [n for n in graph.node if id(n) not in remove_ids]
    del graph.node[:]
    graph.node.extend(new_nodes)
    return True
# ---------------------------------------------------------------------------
# Pass 2: Decompose Pad(reflect) → Slice + Concat
#
# TEMPORARY: fixed upstream in microsoft/onnxruntime#28073 (merged 2026-04-20).
# Once the ORT floor is >= 1.26.0, MLProgram handles Pad(mode=reflect) natively
# via MIL tensor_operation.pad and this entire pass can be deleted.
# ---------------------------------------------------------------------------
def _decompose_reflect_pad(model) -> bool:
    """Rewrite Pad(reflect) as Slice+Concat sequences CoreML can handle.

    Each reflect pad of ``p`` rows/columns becomes ``p`` one-wide Slice
    ops per side (mirrored strips) concatenated around the input, first
    along H (axis 2) then W (axis 3). Output is bit-identical to Pad.

    Returns:
        True when the graph was modified.
    """
    from onnx import numpy_helper, helper
    graph = model.graph
    inits = {init.name: numpy_helper.to_array(init) for init in graph.initializer}
    # Collect Pad nodes with mode=reflect whose pads input is a constant.
    reflect_pads = []
    for node in graph.node:
        if node.op_type == "Pad":
            mode = "constant"
            for attr in node.attribute:
                if attr.name == "mode":
                    mode = attr.s.decode()
            if mode == "reflect" and len(node.input) > 1 and node.input[1] in inits:
                reflect_pads.append(node)
    if not reflect_pads:
        return False
    existing_names = {i.name for i in graph.initializer}
    def ensure_const(name, value):
        # Register an int64 initializer once; duplicate names are no-ops.
        if name not in existing_names:
            graph.initializer.append(
                numpy_helper.from_array(np.array(value, dtype=np.int64), name=name)
            )
            existing_names.add(name)
    # Axis selectors for Slice (H = axis 2, W = axis 3 in NCHW).
    ensure_const("_rp_ax2", [2])
    ensure_const("_rp_ax3", [3])
    # Pre-create every start/end index constant any pad will need.
    max_pad = 0
    for node in reflect_pads:
        pads = inits[node.input[1]].tolist()
        max_pad = max(max_pad, int(pads[2]), int(pads[3]))
    for v in range(1, max_pad + 2):
        ensure_const(f"_rp_p{v}", [v])
        ensure_const(f"_rp_n{v}", [-v])
    # Monotonic counter for unique intermediate tensor names.
    _counter = [0]
    def uid():
        _counter[0] += 1
        return _counter[0]
    pad_ids = {id(n) for n in reflect_pads}
    pad_init_names = set()
    new_nodes = []
    for node in graph.node:
        if id(node) not in pad_ids:
            new_nodes.append(node)
            continue
        # NOTE(review): assumes symmetric NCHW pads ([.., h, w, .., h, w]);
        # pads[2]/pads[3] are the H/W amounts — holds for inswapper_128.
        pads = inits[node.input[1]].tolist()
        h_pad, w_pad = int(pads[2]), int(pads[3])
        # Remember the Pad's constant inputs so they can be pruned below.
        for inp in node.input[1:]:
            if inp in inits:
                pad_init_names.add(inp)
        current = node.input[0]
        if h_pad > 0:
            # Top strips: rows h_pad..1 mirrored above the image.
            top = []
            for i in range(h_pad, 0, -1):
                name = f"_rp_t{uid()}"
                new_nodes.append(helper.make_node(
                    "Slice",
                    inputs=[current, f"_rp_p{i}", f"_rp_p{i+1}", "_rp_ax2"],
                    outputs=[name],
                ))
                top.append(name)
            # Bottom strips: rows -2..-(h_pad+1) mirrored below.
            bot = []
            for i in range(1, h_pad + 1):
                name = f"_rp_b{uid()}"
                new_nodes.append(helper.make_node(
                    "Slice",
                    inputs=[current, f"_rp_n{i+1}", f"_rp_n{i}", "_rp_ax2"],
                    outputs=[name],
                ))
                bot.append(name)
            h_out = f"_rp_h{uid()}"
            new_nodes.append(helper.make_node(
                "Concat", inputs=top + [current] + bot, outputs=[h_out], axis=2
            ))
            current = h_out
        if w_pad > 0:
            # Same construction along the width axis; the final Concat
            # takes over the original Pad's output name.
            left = []
            for i in range(w_pad, 0, -1):
                name = f"_rp_l{uid()}"
                new_nodes.append(helper.make_node(
                    "Slice",
                    inputs=[current, f"_rp_p{i}", f"_rp_p{i+1}", "_rp_ax3"],
                    outputs=[name],
                ))
                left.append(name)
            right = []
            for i in range(1, w_pad + 1):
                name = f"_rp_r{uid()}"
                new_nodes.append(helper.make_node(
                    "Slice",
                    inputs=[current, f"_rp_n{i+1}", f"_rp_n{i}", "_rp_ax3"],
                    outputs=[name],
                ))
                right.append(name)
            new_nodes.append(helper.make_node(
                "Concat",
                inputs=left + [current] + right,
                outputs=[node.output[0]],
                axis=3,
            ))
        elif h_pad > 0:
            # H-only pad: alias the H concat to the Pad's output name.
            # NOTE(review): a reflect Pad with h_pad == w_pad == 0 would be
            # dropped with no replacement, leaving its output dangling —
            # such a degenerate pad does not occur in the target models.
            new_nodes.append(helper.make_node(
                "Identity", inputs=[current], outputs=[node.output[0]]
            ))
    # Remove old Pad initializers
    clean_inits = [i for i in graph.initializer if i.name not in pad_init_names]
    del graph.initializer[:]
    graph.initializer.extend(clean_inits)
    del graph.node[:]
    graph.node.extend(new_nodes)
    return True
# ---------------------------------------------------------------------------
# Pass 3: Decompose Split → Slice pairs
# ---------------------------------------------------------------------------
def _decompose_split(model) -> bool:
    """Rewrite Split(axis=1) as Slice pairs that CoreML can handle.

    CoreML's EP doesn't support the ONNX ``Split`` op, causing partition
    boundaries in models that use channel-wise splits (e.g. GFPGAN's SFT
    modulation layers). Each Split with two outputs becomes two Slice ops.

    Returns:
        True when the graph was modified.
    """
    from onnx import numpy_helper, helper
    graph = model.graph
    # Collect two-way channel splits (axis=1) with static split sizes.
    splits = []
    for node in graph.node:
        if node.op_type == "Split":
            axis = 0
            split_sizes = []
            for attr in node.attribute:
                if attr.name == "axis":
                    axis = attr.i
                if attr.name == "split":
                    split_sizes = list(attr.ints)
            if axis == 1 and len(split_sizes) == 2 and len(node.output) == 2:
                splits.append((node, split_sizes))
    if not splits:
        return False
    existing = {i.name for i in graph.initializer}
    def ensure_const(name, value):
        # Register an int64 initializer once; duplicate names are no-ops.
        if name not in existing:
            graph.initializer.append(
                numpy_helper.from_array(np.array(value, dtype=np.int64), name=name)
            )
            existing.add(name)
    ensure_const("_sp_ax1", [1])
    # Hoisted out of the loop — "_sp_s0" is invariant across all splits
    # (and the original's f-strings had no placeholders to format).
    ensure_const("_sp_s0", [0])
    # Collect all needed boundary constants
    for _, (a, b) in splits:
        ensure_const(f"_sp_s{a}", [a])
        ensure_const(f"_sp_s{a + b}", [a + b])
    split_ids = {id(node) for node, _ in splits}
    # Each Split becomes Slice[0:a] and Slice[a:a+b] along axis 1, keeping
    # the original output names so downstream consumers are untouched.
    replacements = {}
    for node, (a, b) in splits:
        slice0 = helper.make_node(
            "Slice",
            inputs=[node.input[0], "_sp_s0", f"_sp_s{a}", "_sp_ax1"],
            outputs=[node.output[0]],
        )
        slice1 = helper.make_node(
            "Slice",
            inputs=[node.input[0], f"_sp_s{a}", f"_sp_s{a + b}", "_sp_ax1"],
            outputs=[node.output[1]],
        )
        replacements[id(node)] = [slice0, slice1]
    # Rebuild the node list in place, preserving topological order.
    new_nodes = []
    for node in graph.node:
        if id(node) in split_ids:
            new_nodes.extend(replacements[id(node)])
        else:
            new_nodes.append(node)
    del graph.node[:]
    graph.node.extend(new_nodes)
    return True
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _preserve_emap_position(model, numpy_helper):
"""Keep the insightface emap (512×512 matrix) as the last initializer."""
graph = model.graph
emap_init = None
for init in graph.initializer:
if not init.name.startswith("_rp_"):
arr = numpy_helper.to_array(init)
if len(arr.shape) == 2 and arr.shape[0] == 512 and arr.shape[1] == 512:
emap_init = init
break
if emap_init is not None:
inits = [i for i in graph.initializer if i.name != emap_init.name]
del graph.initializer[:]
graph.initializer.extend(inits)
graph.initializer.append(emap_init)
+80
View File
@@ -0,0 +1,80 @@
"""Centralized platform + accelerator detection.
Imported once at startup to expose typed flags the rest of the codebase
can branch on without re-querying `platform`, `torch.cuda`, or
`onnxruntime.get_available_providers()` repeatedly.
The banner printed by :func:`print_banner` is the single user-facing
report of which code path the app will take.
"""
from __future__ import annotations
import platform as _platform
import sys
from typing import List, Tuple
IS_WINDOWS: bool = _platform.system() == "Windows"
IS_MACOS: bool = _platform.system() == "Darwin"
IS_LINUX: bool = _platform.system() == "Linux"
IS_APPLE_SILICON: bool = IS_MACOS and _platform.machine() == "arm64"
def _detect_torch_cuda() -> bool:
try:
import torch # noqa: WPS433 — local import, avoid hard dep at module load
return bool(torch.cuda.is_available())
except Exception:
return False
def _detect_onnx_providers() -> List[str]:
try:
import onnxruntime
return list(onnxruntime.get_available_providers())
except Exception:
return []
HAS_TORCH_CUDA: bool = _detect_torch_cuda()
ONNX_PROVIDERS: List[str] = _detect_onnx_providers()
HAS_CUDA_PROVIDER: bool = "CUDAExecutionProvider" in ONNX_PROVIDERS
HAS_COREML_PROVIDER: bool = "CoreMLExecutionProvider" in ONNX_PROVIDERS
HAS_DML_PROVIDER: bool = "DmlExecutionProvider" in ONNX_PROVIDERS
def camera_backends() -> List[Tuple[int, int]]:
    """Return an ordered list of ``(device_index, cv2_backend)`` attempts.

    Windows prefers MSMF (60fps capable) with DirectShow as fallback.
    macOS/Linux use the default backend (AVFoundation / V4L2).
    """
    import cv2
    if not IS_WINDOWS:
        return [(0, cv2.CAP_ANY)]
    return [(0, backend) for backend in (cv2.CAP_MSMF, cv2.CAP_DSHOW, cv2.CAP_ANY)]
def accelerator_label() -> str:
    """Human-readable name of the accelerator path the app will use."""
    candidates = (
        (HAS_TORCH_CUDA and HAS_CUDA_PROVIDER, "CUDA (NVIDIA)"),
        (IS_APPLE_SILICON and HAS_COREML_PROVIDER, "CoreML (Apple Neural Engine)"),
        (HAS_COREML_PROVIDER, "CoreML"),
        (HAS_DML_PROVIDER, "DirectML"),
    )
    for matched, label in candidates:
        if matched:
            return label
    return "CPU"
def print_banner() -> None:
    """Print a one-line summary of the platform + accelerator selection."""
    os_label = f"{_platform.system()} {_platform.machine()}"
    banner = (
        f"[platform] {os_label} | python {sys.version.split()[0]} | "
        f"accelerator: {accelerator_label()} | providers: {ONNX_PROVIDERS}"
    )
    print(banner, flush=True)
+25 -18
View File
@@ -36,23 +36,11 @@ def build_provider_config(providers=None):
# Already configured pass through
config.append(p)
elif p == "CUDAExecutionProvider":
config.append((
"CUDAExecutionProvider",
{
# Re-use freed blocks instead of growing the arena
"arena_extend_strategy": "kSameAsRequested",
# One-time exhaustive search for the fastest cuDNN
# convolution algorithm (significant speed-up after
# the first inference pass)
"cudnn_conv_algo_search": "EXHAUSTIVE",
# Allow cuDNN to use more workspace memory for faster
# convolution kernels
"cudnn_conv_use_max_workspace": "1",
# Use a separate CUDA stream for host↔device copies so
# they can overlap with compute kernels
"do_copy_in_default_stream": "0",
},
))
# Use bare provider — ONNX Runtime's defaults are fastest on
# modern GPUs (Blackwell/sm_120). Custom options like
# EXHAUSTIVE cudnn_conv_algo_search hurt performance on these
# architectures.
config.append(p)
elif p == "CoreMLExecutionProvider" and IS_APPLE_SILICON:
config.append((
"CoreMLExecutionProvider",
@@ -103,7 +91,26 @@ def run_inference(session: onnxruntime.InferenceSession,
def create_onnx_session(model_path: str) -> onnxruntime.InferenceSession:
"""Create an ONNX Runtime session with optimised provider config."""
"""Create an ONNX Runtime session with optimised provider config.
On Apple Silicon, applies CoreML graph optimizations (Pad decomposition,
Shape/Gather folding, Split decomposition) to reduce CPU↔ANE partition
boundaries.
"""
if IS_APPLE_SILICON:
from modules.onnx_optimize import optimize_for_coreml
# Infer input shape from the model for Shape/Gather folding
try:
import onnx
m = onnx.load(model_path)
inp = m.graph.input[0]
dims = inp.type.tensor_type.shape.dim
shape = tuple(d.dim_value for d in dims if d.dim_value > 0)
input_shape = shape if len(shape) == 4 else None
except Exception:
input_shape = None
model_path = optimize_for_coreml(model_path, input_shape=input_shape)
providers = build_provider_config()
session_options = onnxruntime.SessionOptions()
session_options.graph_optimization_level = (
+22 -7
View File
@@ -331,6 +331,14 @@ def _run_pipe_pipeline(
'mode': 'in-memory',
})
# Pipelined detection: while processing frame N (swap on
# ANE), start detecting the face in the next frame
# (detection on GPU). They use different hardware units
# so the work overlaps.
detect_executor = ThreadPoolExecutor(max_workers=1)
pending_detect = None
use_pipeline = not modules.globals.many_faces
while True:
raw = reader.stdout.read(frame_size)
if len(raw) != frame_size:
@@ -340,26 +348,33 @@ def _run_pipe_pipeline(
(height, width, 3)
).copy()
# Detect target face once and share across all processors.
# This eliminates the redundant detection that each
# processor would otherwise do internally.
if not modules.globals.many_faces:
target_face = get_one_face(frame)
# Get the detection result for THIS frame
if use_pipeline:
if pending_detect is not None:
target_face = pending_detect.result()
else:
target_face = get_one_face(frame)
# Start detecting on THIS frame eagerly — the result
# will be used for the next iteration. At video
# frame rates the face barely moves between frames.
pending_detect = detect_executor.submit(
get_one_face, frame)
else:
target_face = None # many_faces mode detects all internally
target_face = None
# Run frame through every active processor
for fp in frame_processors:
try:
frame = fp.process_frame(source_face, frame, target_face=target_face)
except TypeError:
# Processor doesn't accept target_face kwarg
frame = fp.process_frame(source_face, frame)
writer.stdin.write(frame.tobytes())
processed_count += 1
progress.update(1)
detect_executor.shutdown(wait=True)
# Graceful shutdown
writer.stdin.close()
writer.wait()
+174 -119
View File
@@ -1,4 +1,3 @@
# --- START OF FILE face_enhancer.py ---
# Uses ONNX Runtime for GFPGAN face enhancement (no torch/gfpgan dependency)
from typing import Any, List
@@ -82,20 +81,10 @@ def get_face_enhancer() -> onnxruntime.InferenceSession:
try:
from modules.processors.frame._onnx_enhancer import (
build_provider_config,
)
providers = build_provider_config()
session_options = onnxruntime.SessionOptions()
session_options.graph_optimization_level = (
onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
create_onnx_session,
)
FACE_ENHANCER = onnxruntime.InferenceSession(
model_path,
sess_options=session_options,
providers=providers,
)
FACE_ENHANCER = create_onnx_session(model_path)
input_info = FACE_ENHANCER.get_inputs()[0]
output_info = FACE_ENHANCER.get_outputs()[0]
@@ -161,6 +150,18 @@ def _align_face(
return aligned_face, affine_matrix
_HAS_TORCH_CUDA = False
try:
import torch
if torch.cuda.is_available():
_HAS_TORCH_CUDA = True
except ImportError:
pass
# Cache the feathered mask — it's the same for every call at a given size
_enhancer_cache: dict = {'mask': None, 'mask_size': 0}
def _paste_back(
frame: Frame,
enhanced_face: np.ndarray,
@@ -170,53 +171,76 @@ def _paste_back(
"""
Paste an enhanced (aligned) face back onto the original frame using the
inverse affine transform with feathered-edge blending.
Optimized: operates on a tight crop around the face bbox instead of the
full frame, and uses GPU for blending when available.
"""
h, w = frame.shape[:2]
# Inverse the affine warp
inv_matrix = cv2.invertAffineTransform(affine_matrix)
inv_restored = cv2.warpAffine(
enhanced_face,
inv_matrix,
(w, h),
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0),
# Build or reuse cached feathered mask
if _enhancer_cache['mask_size'] != output_size:
face_mask = np.ones((output_size, output_size), dtype=np.float32)
border = max(1, int(output_size * 0.05))
ramp_up = np.linspace(0.0, 1.0, border, dtype=np.float32)
ramp_down = np.linspace(1.0, 0.0, border, dtype=np.float32)
face_mask[:border, :] *= ramp_up[:, None]
face_mask[-border:, :] *= ramp_down[:, None]
face_mask[:, :border] *= ramp_up[None, :]
face_mask[:, -border:] *= ramp_down[None, :]
_enhancer_cache['mask'] = face_mask
_enhancer_cache['mask_size'] = output_size
# Compute tight bbox from affine corners (avoids full-frame warpAffine scan)
corners = np.array([[0, 0], [output_size, 0],
[output_size, output_size], [0, output_size]],
dtype=np.float32)
transformed = (inv_matrix[:, :2] @ corners.T).T + inv_matrix[:, 2]
x1 = max(0, int(np.floor(transformed[:, 0].min())))
x2 = min(w, int(np.ceil(transformed[:, 0].max())))
y1 = max(0, int(np.floor(transformed[:, 1].min())))
y2 = min(h, int(np.ceil(transformed[:, 1].max())))
if x1 >= x2 or y1 >= y2:
return frame
# Pad a few pixels for feathering
pad = max(1, int(output_size * 0.05)) + 2
y1p, y2p = max(0, y1 - pad), min(h, y2 + pad)
x1p, x2p = max(0, x1 - pad), min(w, x2 + pad)
crop_w, crop_h = x2p - x1p, y2p - y1p
# Warp enhanced face and mask into crop space only
inv_crop = inv_matrix.copy()
inv_crop[0, 2] -= x1p
inv_crop[1, 2] -= y1p
inv_restored_crop = cv2.warpAffine(
enhanced_face, inv_crop, (crop_w, crop_h),
borderMode=cv2.BORDER_CONSTANT, borderValue=(0, 0, 0),
)
# Build a soft feathered mask in aligned space for edge blending
face_mask = np.ones((output_size, output_size), dtype=np.float32)
# Feather the border (5 % of the size on each edge)
border = max(1, int(output_size * 0.05))
ramp_up = np.linspace(0.0, 1.0, border, dtype=np.float32)
ramp_down = np.linspace(1.0, 0.0, border, dtype=np.float32)
# Top / bottom rows
face_mask[:border, :] *= ramp_up[:, None]
face_mask[-border:, :] *= ramp_down[:, None]
# Left / right columns
face_mask[:, :border] *= ramp_up[None, :]
face_mask[:, -border:] *= ramp_down[None, :]
# Expand to 3-channel
face_mask_3c = np.stack([face_mask] * 3, axis=-1)
# Warp mask back to original frame space
inv_mask = cv2.warpAffine(
face_mask_3c,
inv_matrix,
(w, h),
borderMode=cv2.BORDER_CONSTANT,
borderValue=(0, 0, 0),
inv_mask_crop = cv2.warpAffine(
_enhancer_cache['mask'], inv_crop, (crop_w, crop_h),
borderMode=cv2.BORDER_CONSTANT, borderValue=0.0,
)
inv_mask = np.clip(inv_mask, 0.0, 1.0)
np.clip(inv_mask_crop, 0.0, 1.0, out=inv_mask_crop)
# Alpha-blend
result = (
frame.astype(np.float32) * (1.0 - inv_mask)
+ inv_restored.astype(np.float32) * inv_mask
)
return np.clip(result, 0, 255).astype(np.uint8)
if _HAS_TORCH_CUDA:
# GPU blend on crop only
mask_t = torch.from_numpy(inv_mask_crop).cuda().unsqueeze(2)
enhanced_t = torch.from_numpy(inv_restored_crop).float().cuda()
target_t = torch.from_numpy(frame[y1p:y2p, x1p:x2p]).float().cuda()
blended = (mask_t * enhanced_t + (1.0 - mask_t) * target_t
).to(torch.uint8).cpu().numpy()
frame[y1p:y2p, x1p:x2p] = blended
else:
# CPU blend on crop only
mask_3d = inv_mask_crop[:, :, np.newaxis]
target_crop = frame[y1p:y2p, x1p:x2p].astype(np.float32)
blended = (mask_3d * inv_restored_crop.astype(np.float32)
+ (1.0 - mask_3d) * target_crop)
frame[y1p:y2p, x1p:x2p] = np.clip(blended, 0, 255).astype(np.uint8)
return frame
def _preprocess_face(aligned_face: np.ndarray) -> np.ndarray:
@@ -224,14 +248,13 @@ def _preprocess_face(aligned_face: np.ndarray) -> np.ndarray:
Convert an aligned BGR uint8 face image to the ONNX model input tensor.
Format: NCHW float32, normalised to [-1, 1].
"""
# BGR -> RGB
rgb = cv2.cvtColor(aligned_face, cv2.COLOR_BGR2RGB).astype(np.float32)
# [0, 255] -> [0, 1] -> [-1, 1]
rgb = rgb / 255.0
rgb = (rgb - 0.5) / 0.5
# HWC -> CHW, add batch dim
chw = np.transpose(rgb, (2, 0, 1))
return np.expand_dims(chw, axis=0) # shape: (1, 3, H, W)
# BGR -> RGB, normalize, and transpose in one pass
# Fused: (x / 255.0 - 0.5) / 0.5 = x / 127.5 - 1.0
rgb = aligned_face[:, :, ::-1] # BGR->RGB zero-copy view
chw = np.transpose(rgb, (2, 0, 1)).astype(np.float32)
chw *= (1.0 / 127.5)
chw -= 1.0
return chw[np.newaxis, ...] # shape: (1, 3, H, W)
def _postprocess_face(output: np.ndarray) -> np.ndarray:
@@ -239,24 +262,42 @@ def _postprocess_face(output: np.ndarray) -> np.ndarray:
Convert the ONNX model output tensor back to a BGR uint8 image.
Expects input in NCHW format with values in [-1, 1].
"""
face = np.squeeze(output) # remove batch dim -> (3, H, W)
face = np.transpose(face, (1, 2, 0)) # CHW -> HWC
# [-1, 1] -> [0, 1] -> [0, 255]
face = (face + 1.0) / 2.0
face = np.clip(face * 255.0, 0, 255).astype(np.uint8)
# RGB -> BGR
return cv2.cvtColor(face, cv2.COLOR_RGB2BGR)
# Fused: ((x + 1.0) / 2.0) * 255 = (x + 1.0) * 127.5
face = output[0] # remove batch dim -> (3, H, W)
face = (face + 1.0) * 127.5
np.clip(face, 0, 255, out=face)
face = face.astype(np.uint8).transpose(1, 2, 0) # CHW -> HWC
return face[:, :, ::-1].copy() # RGB -> BGR
def enhance_face(temp_frame: Frame) -> Frame:
"""Enhances all faces in a frame using the GFPGAN ONNX model."""
# Cache for temporal enhancement skipping in live mode.
# GFPGAN output barely changes between consecutive frames (same face,
# same position), so we run inference every _ENH_INTERVAL frames and
# reuse the cached enhanced face + affine matrix in between.
_enh_live_cache: dict = {
'enhanced_bgr': None,
'affine_matrix': None,
'align_size': 0,
'frame_count': 0,
}
_ENH_INTERVAL = 2 # run inference every N frames, paste cached result otherwise
def enhance_face(temp_frame: Frame, detected_faces=None) -> Frame:
"""Enhances all faces in a frame using the GFPGAN ONNX model.
Args:
detected_faces: Pre-detected face list. When provided, skips
the internal detection call (saves ~15-20ms per frame).
Also enables temporal caching — inference runs every
_ENH_INTERVAL frames, reusing the cached result otherwise.
"""
session = get_face_enhancer()
# Determine model input resolution from the session metadata
input_info = session.get_inputs()[0]
input_name = input_info.name
input_shape = input_info.shape # e.g. [1, 3, 512, 512]
# Safely extract input size (handle dynamic / symbolic dimensions)
try:
align_size = int(input_shape[2])
if align_size <= 0:
@@ -264,15 +305,21 @@ def enhance_face(temp_frame: Frame) -> Frame:
except (ValueError, TypeError, IndexError):
align_size = 512
# Detect faces using InsightFace (already a project dependency)
faces = get_many_faces(temp_frame)
# Use pre-detected faces if available, otherwise detect
faces = detected_faces if detected_faces is not None else get_many_faces(temp_frame)
if not faces:
return temp_frame
result_frame = temp_frame.copy()
# Temporal caching: only available when faces are pre-detected (live mode)
use_cache = detected_faces is not None
if use_cache:
_enh_live_cache['frame_count'] += 1
run_inference_this_frame = (_enh_live_cache['frame_count'] % _ENH_INTERVAL == 0
or _enh_live_cache['enhanced_bgr'] is None)
else:
run_inference_this_frame = True
for face in faces:
# Need the 5-point key-points for alignment
if not hasattr(face, "kps") or face.kps is None:
continue
@@ -280,54 +327,65 @@ def enhance_face(temp_frame: Frame) -> Frame:
if landmarks_5.shape[0] < 5:
continue
# Align / crop the face at the model's INPUT resolution
aligned_face, affine_matrix = _align_face(
temp_frame, landmarks_5, output_size=align_size
)
if aligned_face is None or affine_matrix is None:
continue
try:
with THREAD_SEMAPHORE:
from modules.processors.frame._onnx_enhancer import (
run_inference,
)
input_tensor = _preprocess_face(aligned_face)
output_tensor = run_inference(session, input_name, input_tensor)
enhanced_bgr = _postprocess_face(output_tensor)
# The model may output at a different resolution than its input
# (e.g. input 512x512 → output 1024x1024). Resize the enhanced
# face back to the alignment size so the inverse affine maps
# correctly.
eh, ew = enhanced_bgr.shape[:2]
if eh != align_size or ew != align_size:
enhanced_bgr = cv2.resize(
enhanced_bgr,
(align_size, align_size),
interpolation=cv2.INTER_LANCZOS4,
)
# Paste enhanced face back onto the frame
result_frame = _paste_back(
result_frame, enhanced_bgr, affine_matrix, output_size=align_size
if run_inference_this_frame:
aligned_face, affine_matrix = _align_face(
temp_frame, landmarks_5, output_size=align_size
)
except Exception as e:
print(f"{NAME}: Error enhancing a face: {e}")
continue
if aligned_face is None or affine_matrix is None:
continue
return result_frame
try:
with THREAD_SEMAPHORE:
from modules.processors.frame._onnx_enhancer import (
run_inference,
)
input_tensor = _preprocess_face(aligned_face)
output_tensor = run_inference(session, input_name, input_tensor)
enhanced_bgr = _postprocess_face(output_tensor)
eh, ew = enhanced_bgr.shape[:2]
if eh != align_size or ew != align_size:
enhanced_bgr = cv2.resize(
enhanced_bgr,
(align_size, align_size),
interpolation=cv2.INTER_LANCZOS4,
)
# Cache for reuse on next frame
if use_cache:
_enh_live_cache['enhanced_bgr'] = enhanced_bgr
_enh_live_cache['affine_matrix'] = affine_matrix
_enh_live_cache['align_size'] = align_size
_paste_back(
temp_frame, enhanced_bgr, affine_matrix, output_size=align_size
)
except Exception as e:
print(f"{NAME}: Error enhancing a face: {e}")
continue
else:
# Reuse cached enhanced face — just paste back onto current frame
cached = _enh_live_cache
if cached['enhanced_bgr'] is not None:
_paste_back(
temp_frame, cached['enhanced_bgr'],
cached['affine_matrix'],
output_size=cached['align_size'],
)
break # single-face live mode — only process first face
def process_frame(source_face: Face | None, temp_frame: Frame) -> Frame:
"""Processes a frame: enhances face if detected."""
temp_frame = enhance_face(temp_frame)
return temp_frame
def process_frame_v2(temp_frame: Frame) -> Frame:
def process_frame(source_face: Face | None, temp_frame: Frame,
detected_faces=None) -> Frame:
"""Processes a frame: enhances face if detected."""
return enhance_face(temp_frame, detected_faces=detected_faces)
def process_frame_v2(temp_frame: Frame, detected_faces=None) -> Frame:
"""Processes a frame without source face (used by live webcam preview)."""
return enhance_face(temp_frame)
return enhance_face(temp_frame, detected_faces=detected_faces)
def process_frames(
@@ -378,6 +436,3 @@ def process_video(
modules.processors.frame.core.process_video(
source_path, temp_frame_paths, process_frames
)
# --- END OF FILE face_enhancer.py ---
+227 -64
View File
@@ -1,7 +1,7 @@
from typing import Any, List, Optional
import cv2
import insightface
from insightface.utils import face_align
import logging
import threading
import numpy as np
import platform
@@ -86,21 +86,28 @@ def get_face_swapper() -> Any:
with THREAD_LOCK:
if FACE_SWAPPER is None:
# Prefer FP32 for broad GPU compatibility (FP16 can produce NaN
# on GPUs without Tensor Cores, e.g. GTX 16xx). Fall back to
# FP16 when FP32 is not available.
# Prefer FP16 on GPUs with Tensor Cores (Turing+) — half the
# memory bandwidth, faster inference. Fall back to FP32 for
# older GPUs (e.g. GTX 16xx) where FP16 can produce NaN.
fp32_path = os.path.join(models_dir, "inswapper_128.onnx")
fp16_path = os.path.join(models_dir, "inswapper_128_fp16.onnx")
if os.path.exists(fp32_path):
model_path = fp32_path
elif os.path.exists(fp16_path):
use_fp16 = _HAS_TORCH_CUDA and os.path.exists(fp16_path)
if use_fp16:
model_path = fp16_path
elif os.path.exists(fp32_path):
model_path = fp32_path
else:
update_status(f"No inswapper model found in {models_dir}.", NAME)
return None
# On Apple Silicon, rewrite Pad(reflect) → Slice+Concat so
# CoreML can run the entire model in a single partition on
# the Neural Engine instead of bouncing between CPU and ANE.
if IS_APPLE_SILICON:
from modules.onnx_optimize import optimize_for_coreml
model_path = optimize_for_coreml(model_path)
update_status(f"Loading face swapper model from: {model_path}", NAME)
try:
# Optimized provider configuration for Apple Silicon
providers_config = []
for p in modules.globals.execution_providers:
if p == "CoreMLExecutionProvider" and IS_APPLE_SILICON:
@@ -116,21 +123,22 @@ def get_face_swapper() -> Any:
}
))
elif p == "CUDAExecutionProvider":
providers_config.append((
"CUDAExecutionProvider",
{
"arena_extend_strategy": "kSameAsRequested",
"cudnn_conv_algo_search": "EXHAUSTIVE",
"cudnn_conv_use_max_workspace": "1",
"do_copy_in_default_stream": "0",
}
))
# Use bare provider — ONNX Runtime defaults are
# fastest on modern GPUs (Blackwell/sm_120).
providers_config.append(p)
else:
providers_config.append(p)
FACE_SWAPPER = insightface.model_zoo.get_model(
model_path,
providers=providers_config,
)
# Set up CUDA graph session for faster inference
if _HAS_TORCH_CUDA and any(
p == "CUDAExecutionProvider" or
(isinstance(p, tuple) and p[0] == "CUDAExecutionProvider")
for p in providers_config
):
_init_cuda_graph_session(model_path, FACE_SWAPPER)
update_status("Face swapper model loaded successfully.", NAME)
except Exception as e:
update_status(f"Error loading face swapper model: {e}", NAME)
@@ -139,63 +147,204 @@ def get_face_swapper() -> Any:
return FACE_SWAPPER
_HAS_TORCH_CUDA = False
try:
import torch
if torch.cuda.is_available():
_HAS_TORCH_CUDA = True
except ImportError:
pass
# Cache for paste-back
_paste_cache = {
'mask_white': None, # pre-allocated white image
}
# CUDA graph swap session cache
_cuda_graph_session = {
'session': None,
'io_binding': None,
'ort_input': None,
'ort_latent': None,
'recorded': False,
}
def _init_cuda_graph_session(model_path: str, swapper):
"""Create a CUDA-graph-enabled ONNX session for the swap model.
CUDA graphs record the GPU kernel launch sequence once, then replay it
with near-zero CPU overhead on subsequent runs. Requires static input
shapes (inswapper is always 1x3x128x128 + 1x512).
"""
import onnxruntime as ort
try:
providers = [('CUDAExecutionProvider', {'enable_cuda_graph': '1'})]
sess = ort.InferenceSession(model_path, providers=providers)
# Pre-allocate GPU buffers with correct shapes
inp_shape = (1, 3, swapper.input_size[1], swapper.input_size[0])
latent_shape = (1, 512)
dummy_inp = np.zeros(inp_shape, dtype=np.float32)
dummy_lat = np.zeros(latent_shape, dtype=np.float32)
ort_input = ort.OrtValue.ortvalue_from_numpy(dummy_inp, 'cuda', 0)
ort_latent = ort.OrtValue.ortvalue_from_numpy(dummy_lat, 'cuda', 0)
io = sess.io_binding()
io.bind_ortvalue_input(swapper.input_names[0], ort_input)
io.bind_ortvalue_input(swapper.input_names[1], ort_latent)
io.bind_output(swapper.output_names[0], 'cuda', 0)
# First run records the CUDA graph
sess.run_with_iobinding(io)
_cuda_graph_session['session'] = sess
_cuda_graph_session['io_binding'] = io
_cuda_graph_session['ort_input'] = ort_input
_cuda_graph_session['ort_latent'] = ort_latent
_cuda_graph_session['recorded'] = True
# Monkey-patch the swapper's session.run to use CUDA graph replay
_original_run = swapper.session.run
def _graph_run(output_names, input_dict, **kwargs):
if _cuda_graph_session['recorded']:
try:
# input_dict has 'target' (blob) and 'source' (latent)
keys = list(input_dict.keys())
blob = input_dict[keys[0]]
latent = input_dict[keys[1]]
return [_cuda_graph_swap_inference(blob, latent)]
except Exception:
pass
return _original_run(output_names, input_dict, **kwargs)
swapper.session.run = _graph_run
import sys
print(f"[{NAME}] CUDA graph session initialized (swap model)")
sys.stdout.flush()
except Exception as e:
print(f"[{NAME}] CUDA graph init failed, using standard session: {e}")
_cuda_graph_session['recorded'] = False
def _cuda_graph_swap_inference(blob: np.ndarray, latent: np.ndarray) -> np.ndarray:
"""Run swap model via CUDA graph replay — minimal CPU overhead."""
cg = _cuda_graph_session
cg['ort_input'].update_inplace(blob)
cg['ort_latent'].update_inplace(latent)
cg['session'].run_with_iobinding(cg['io_binding'])
return cg['io_binding'].get_outputs()[0].numpy()
def _fast_paste_back(target_img: Frame, bgr_fake: np.ndarray, aimg: np.ndarray, M: np.ndarray) -> Frame:
"""Optimized paste-back that restricts blending to the face bounding box.
"""GPU-accelerated paste-back that restricts blending to the face bounding box.
Same visual output as insightface's built-in paste_back, but:
- Skips dead fake_diff code (computed but unused in insightface)
- Runs erosion, blur, and blend on the face bbox instead of the full frame
- Uses torch CUDA for warpAffine + blend when available
- Writes directly into target_img to avoid full-frame copy
"""
h, w = target_img.shape[:2]
face_h, face_w = aimg.shape[:2]
IM = cv2.invertAffineTransform(M)
# Warp swapped face and mask to full frame (fast: ~0.4ms each)
bgr_fake_full = cv2.warpAffine(bgr_fake, IM, (w, h), borderValue=0.0)
img_white = np.full((aimg.shape[0], aimg.shape[1]), 255, dtype=np.float32)
img_white_full = cv2.warpAffine(img_white, IM, (w, h), borderValue=0.0)
# Reuse pre-allocated white mask
if _paste_cache['mask_white'] is None or _paste_cache['mask_white'].shape != (face_h, face_w):
_paste_cache['mask_white'] = np.full((face_h, face_w), 255, dtype=np.float32)
# Find tight bounding box of the warped face mask
rows = np.any(img_white_full > 20, axis=1)
cols = np.any(img_white_full > 20, axis=0)
row_idx = np.where(rows)[0]
col_idx = np.where(cols)[0]
if len(row_idx) == 0 or len(col_idx) == 0:
if _HAS_TORCH_CUDA:
# GPU path: compute bbox from affine matrix (avoids warpAffine + scan on white mask)
corners = np.array([[0, 0], [face_w, 0], [face_w, face_h], [0, face_h]], dtype=np.float32)
transformed = (IM[:, :2] @ corners.T).T + IM[:, 2]
x1 = int(np.floor(transformed[:, 0].min()))
x2 = int(np.ceil(transformed[:, 0].max()))
y1 = int(np.floor(transformed[:, 1].min()))
y2 = int(np.ceil(transformed[:, 1].max()))
if x1 >= x2 or y1 >= y2:
return target_img
mask_h = y2 - y1
mask_w = x2 - x1
mask_size = int(np.sqrt(mask_h * mask_w))
k_erode = max(mask_size // 10, 10)
k_blur = max(mask_size // 20, 5)
pad = k_erode + k_blur + 2
y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
# Warp face and mask into crop region only (CPU — fast on small image)
IM_crop = IM.copy()
IM_crop[0, 2] -= x1p
IM_crop[1, 2] -= y1p
crop_w, crop_h = x2p - x1p, y2p - y1p
bgr_fake_crop = cv2.warpAffine(bgr_fake, IM_crop, (crop_w, crop_h), borderValue=0.0)
mask_crop = cv2.warpAffine(_paste_cache['mask_white'], IM_crop, (crop_w, crop_h), borderValue=0.0)
# All mask processing + blend on GPU (no CPU roundtrips)
mask_t = torch.from_numpy(mask_crop).cuda()
mask_t = torch.where(mask_t > 20, 255.0, 0.0)
orig_h, orig_w = mask_t.shape
# Erode via negative max_pool (equivalent to min_pool)
m4 = mask_t.unsqueeze(0).unsqueeze(0)
m4 = -torch.nn.functional.max_pool2d(-m4, kernel_size=k_erode, stride=1, padding=k_erode // 2)
# Gaussian blur approximation via avg_pool
bk = 2 * k_blur + 1
m4 = torch.nn.functional.avg_pool2d(m4, kernel_size=bk, stride=1, padding=bk // 2)
# Fix any padding-induced size mismatch
m4 = m4[:, :, :orig_h, :orig_w]
mask_3d = (m4.squeeze() * (1.0 / 255.0)).unsqueeze(2)
fake_t = torch.from_numpy(bgr_fake_crop).float().cuda()
tgt_t = torch.from_numpy(target_img[y1p:y2p, x1p:x2p]).float().cuda()
blended = (mask_3d * fake_t + (1.0 - mask_3d) * tgt_t).to(torch.uint8).cpu().numpy()
target_img[y1p:y2p, x1p:x2p] = blended
return target_img
y1, y2 = row_idx[0], row_idx[-1]
x1, x2 = col_idx[0], col_idx[-1]
else:
# CPU fallback
bgr_fake_full = cv2.warpAffine(bgr_fake, IM, (w, h), borderValue=0.0)
img_white_full = cv2.warpAffine(_paste_cache['mask_white'], IM, (w, h), borderValue=0.0)
# Compute mask/blur kernel sizes from the full mask extent
mask_h = y2 - y1
mask_w = x2 - x1
mask_size = int(np.sqrt(mask_h * mask_w))
k_erode = max(mask_size // 10, 10)
k_blur = max(mask_size // 20, 5)
rows = np.any(img_white_full > 20, axis=1)
cols = np.any(img_white_full > 20, axis=0)
row_idx = np.where(rows)[0]
col_idx = np.where(cols)[0]
if len(row_idx) == 0 or len(col_idx) == 0:
return target_img
y1, y2 = row_idx[0], row_idx[-1]
x1, x2 = col_idx[0], col_idx[-1]
# Add padding for erosion + blur kernels, then crop
pad = k_erode + k_blur + 2
y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
mask_h = y2 - y1
mask_w = x2 - x1
mask_size = int(np.sqrt(mask_h * mask_w))
k_erode = max(mask_size // 10, 10)
k_blur = max(mask_size // 20, 5)
# Work on cropped region only
mask_crop = img_white_full[y1p:y2p, x1p:x2p]
mask_crop[mask_crop > 20] = 255
pad = k_erode + k_blur + 2
y1p, y2p = max(0, y1 - pad), min(h, y2 + pad + 1)
x1p, x2p = max(0, x1 - pad), min(w, x2 + pad + 1)
kernel = np.ones((k_erode, k_erode), np.uint8)
mask_crop = cv2.erode(mask_crop, kernel, iterations=1)
mask_crop = img_white_full[y1p:y2p, x1p:x2p]
mask_crop[mask_crop > 20] = 255
mask_crop = cv2.erode(mask_crop, np.ones((k_erode, k_erode), np.uint8), iterations=1)
mask_crop = cv2.GaussianBlur(mask_crop, (2*k_blur+1, 2*k_blur+1), 0)
mask_crop *= (1.0 / 255.0)
blur_size = tuple(2 * i + 1 for i in (k_blur, k_blur))
mask_crop = cv2.GaussianBlur(mask_crop, blur_size, 0)
mask_crop /= 255.0
# Blend only within the crop
mask_3d = mask_crop[:, :, np.newaxis]
fake_crop = bgr_fake_full[y1p:y2p, x1p:x2p].astype(np.float32)
target_crop = target_img[y1p:y2p, x1p:x2p].astype(np.float32)
blended = mask_3d * fake_crop + (1.0 - mask_3d) * target_crop
result = target_img.copy()
result[y1p:y2p, x1p:x2p] = np.clip(blended, 0, 255).astype(np.uint8)
return result
mask_3d = mask_crop[:, :, np.newaxis]
fake_crop = bgr_fake_full[y1p:y2p, x1p:x2p].astype(np.float32)
target_crop = target_img[y1p:y2p, x1p:x2p].astype(np.float32)
blended = mask_3d * fake_crop + (1.0 - mask_3d) * target_crop
# Write in-place, consistent with the GPU path
target_img[y1p:y2p, x1p:x2p] = np.clip(blended, 0, 255).astype(np.uint8)
return target_img
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
@@ -211,11 +360,16 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
if not hasattr(source_face, 'normed_embedding') or source_face.normed_embedding is None:
return temp_frame
# Store a copy of the original frame before swapping for opacity blending and mouth mask
# _fast_paste_back writes in-place on the GPU path. Only copy when
# mouth_mask or opacity < 1 need an unmodified original.
opacity = getattr(modules.globals, "opacity", 1.0)
opacity = max(0.0, min(1.0, opacity))
mouth_mask_enabled = getattr(modules.globals, "mouth_mask", False)
original_frame = temp_frame.copy() if (opacity < 1.0 or mouth_mask_enabled) else temp_frame
needs_original = opacity < 1.0 or mouth_mask_enabled
if needs_original:
original_frame = temp_frame.copy()
else:
original_frame = temp_frame
if temp_frame.dtype != np.uint8:
temp_frame = np.clip(temp_frame, 0, 255).astype(np.uint8)
@@ -241,11 +395,12 @@ def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
if not isinstance(bgr_fake, np.ndarray):
return original_frame
# Get the aligned input crop for the mask (same as insightface does internally)
aimg, _ = face_align.norm_crop2(temp_frame, target_face.kps, face_swapper.input_size[0])
# Pass a dummy aimg with correct shape — _fast_paste_back only uses aimg.shape
# to create the white mask. Avoids redundant norm_crop2 (~0.6ms).
_face_size = face_swapper.input_size[0]
_aimg_dummy = np.empty((_face_size, _face_size, 3), dtype=np.uint8)
swapped_frame = _fast_paste_back(temp_frame, bgr_fake, aimg, M)
swapped_frame = np.clip(swapped_frame, 0, 255).astype(np.uint8)
swapped_frame = _fast_paste_back(temp_frame, bgr_fake, _aimg_dummy, M)
except Exception as e:
print(f"Error during face swap: {e}")
@@ -356,6 +511,14 @@ def apply_post_processing(current_frame: Frame, swapped_face_bboxes: List[np.nda
"""Applies sharpening and interpolation with Apple Silicon optimizations."""
global PREVIOUS_FRAME_RESULT
sharpness_value = getattr(modules.globals, "sharpness", 0.0)
enable_interpolation = getattr(modules.globals, "enable_interpolation", False)
# Skip copy when no post-processing is active
if sharpness_value <= 0.0 and not enable_interpolation:
PREVIOUS_FRAME_RESULT = None
return current_frame
processed_frame = current_frame.copy()
# 1. Apply Sharpening (if enabled) with optimized kernel for Apple Silicon
+53 -29
View File
@@ -17,6 +17,8 @@ import modules.metadata
from modules.face_analyser import (
get_one_face,
get_many_faces,
detect_one_face_fast,
detect_many_faces_fast,
get_unique_faces_from_target_image,
get_unique_faces_from_target_video,
add_blank_map,
@@ -1097,10 +1099,16 @@ def _capture_thread_func(cap, capture_queue, stop_event):
pass
def _processing_thread_func(capture_queue, processed_queue, stop_event):
def _processing_thread_func(capture_queue, processed_queue, stop_event,
camera_fps: float = 30.0):
"""Processing thread: takes raw frames from capture_queue, runs face
detection (throttled to every 3rd frame), applies face swap/enhancement,
and puts results into processed_queue."""
detection (throttled), applies face swap/enhancement, and puts results
into processed_queue.
Args:
camera_fps: Actual camera frame rate — used to compute how many
frames to skip between face detections (~80ms target).
"""
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None
last_source_path = None
@@ -1111,6 +1119,9 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event):
det_count = 0
cached_target_face = None
cached_many_faces = None
# Detect every N frames ≈ 80ms. At 60fps → every 5 frames (83ms),
# at 30fps → every 3 frames (100ms), at 15fps → every frame.
det_interval = max(1, round(camera_fps * 0.08))
while not stop_event.is_set():
try:
@@ -1128,26 +1139,37 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event):
last_source_path = modules.globals.source_path
source_image = get_one_face(cv2.imread(modules.globals.source_path))
# Run detection every 3 frames, reuse cached result otherwise
# Run detection every det_interval frames (~80ms).
# Use fast detection (det-only, no landmark/recognition) for live mode.
det_count += 1
if det_count % 3 == 0:
if det_count % det_interval == 0:
if modules.globals.many_faces:
cached_target_face = None
cached_many_faces = get_many_faces(temp_frame)
cached_many_faces = detect_many_faces_fast(temp_frame)
else:
cached_target_face = get_one_face(temp_frame)
cached_target_face = detect_one_face_fast(temp_frame)
cached_many_faces = None
# Build face list for enhancers from cached detection
_cached_faces = None
if cached_many_faces:
_cached_faces = cached_many_faces
elif cached_target_face is not None:
_cached_faces = [cached_target_face]
for frame_processor in frame_processors:
if frame_processor.NAME == "DLC.FACE-ENHANCER":
if modules.globals.fp_ui["face_enhancer"]:
temp_frame = frame_processor.process_frame(None, temp_frame)
temp_frame = frame_processor.process_frame(
None, temp_frame, detected_faces=_cached_faces)
elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN256":
if modules.globals.fp_ui.get("face_enhancer_gpen256", False):
temp_frame = frame_processor.process_frame(None, temp_frame)
temp_frame = frame_processor.process_frame(
None, temp_frame, detected_faces=_cached_faces)
elif frame_processor.NAME == "DLC.FACE-ENHANCER-GPEN512":
if modules.globals.fp_ui.get("face_enhancer_gpen512", False):
temp_frame = frame_processor.process_frame(None, temp_frame)
temp_frame = frame_processor.process_frame(
None, temp_frame, detected_faces=_cached_faces)
elif frame_processor.NAME == "DLC.FACE-SWAPPER":
# Use cached face positions from detection thread
swapped_bboxes = []
@@ -1198,6 +1220,10 @@ def _processing_thread_func(capture_queue, processed_queue, stop_event):
2,
)
# BGR→RGB in the processing thread so the display thread gets
# a contiguous RGB array (faster PIL.fromarray).
temp_frame = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB)
# Put processed frame into output queue, dropping old frames if full
try:
processed_queue.put_nowait(temp_frame)
@@ -1216,10 +1242,13 @@ def create_webcam_preview(camera_index: int):
global preview_label, PREVIEW
cap = VideoCapturer(camera_index)
if not cap.start(PREVIEW_DEFAULT_WIDTH, PREVIEW_DEFAULT_HEIGHT, 60):
if not cap.start(1920, 1080, 60):
update_status("Failed to start camera")
return
camera_fps = cap.actual_fps
print(f"[webcam] Camera running at {cap.actual_width}x{cap.actual_height}@{camera_fps:.0f}fps")
preview_label.configure(width=PREVIEW_DEFAULT_WIDTH, height=PREVIEW_DEFAULT_HEIGHT)
PREVIEW.deiconify()
@@ -1240,7 +1269,7 @@ def create_webcam_preview(camera_index: int):
# Start processing thread
proc_thread = threading.Thread(
target=_processing_thread_func,
args=(capture_queue, processed_queue, stop_event),
args=(capture_queue, processed_queue, stop_event, camera_fps),
daemon=True,
)
proc_thread.start()
@@ -1253,37 +1282,32 @@ def create_webcam_preview(camera_index: int):
cap.release()
PREVIEW.withdraw()
# Poll at ~2x camera FPS (Nyquist) so we pick up frames promptly
# without burning CPU. Clamped to [1, 16] ms.
poll_ms = max(1, min(16, int(500 / camera_fps)))
# Non-blocking display loop using ROOT.after() — avoids blocking the
# Tk event loop which could cause UI freezes or re-entrancy issues
# Tk event loop which could cause UI freezes or re-entrancy issues.
def _display_next_frame():
if stop_event.is_set() or PREVIEW.state() == "withdrawn":
_cleanup()
return
try:
temp_frame = processed_queue.get_nowait()
rgb_frame = processed_queue.get_nowait()
except queue.Empty:
ROOT.after(16, _display_next_frame)
ROOT.after(poll_ms, _display_next_frame)
return
if modules.globals.live_resizable:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
else:
temp_frame = fit_image_to_size(
temp_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
temp_frame = temp_frame.copy()
image = gpu_cvt_color(temp_frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(image)
image = ImageOps.contain(
image, (temp_frame.shape[1], temp_frame.shape[0]), Image.LANCZOS
# Frame is already RGB from processing thread; resize to preview window
rgb_frame = fit_image_to_size(
rgb_frame, PREVIEW.winfo_width(), PREVIEW.winfo_height()
)
image = Image.fromarray(rgb_frame)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
ROOT.after(16, _display_next_frame)
ROOT.after(poll_ms, _display_next_frame)
# Kick off the non-blocking display loop
ROOT.after(0, _display_next_frame)
+56 -6
View File
@@ -1,5 +1,7 @@
import cv2
import numpy as np
import sys
import time
from typing import Optional, Tuple, Callable
import platform
import threading
@@ -17,6 +19,10 @@ class VideoCapturer:
self._frame_ready = threading.Event()
self.is_running = False
self.cap = None
# Actual values reported by the camera after configuration
self.actual_width: int = 0
self.actual_height: int = 0
self.actual_fps: float = 0.0
# Initialize Windows-specific components if on Windows
if platform.system() == "Windows":
@@ -32,12 +38,14 @@ class VideoCapturer:
"""Initialize and start video capture"""
try:
if platform.system() == "Windows":
# Windows-specific capture methods
# Windows-specific capture methods.
# MSMF (Media Foundation) is preferred — DirectShow often
# caps at 30fps even when the camera supports 60fps.
capture_methods = [
(self.device_index, cv2.CAP_DSHOW), # Try DirectShow first
(self.device_index, cv2.CAP_ANY), # Then try default backend
(-1, cv2.CAP_ANY), # Try -1 as fallback
(0, cv2.CAP_ANY), # Finally try 0 without specific backend
(self.device_index, cv2.CAP_MSMF), # Media Foundation first
(self.device_index, cv2.CAP_DSHOW), # DirectShow fallback
(self.device_index, cv2.CAP_ANY),
(0, cv2.CAP_ANY),
]
for dev_id, backend in capture_methods:
@@ -55,11 +63,30 @@ class VideoCapturer:
if not self.cap or not self.cap.isOpened():
raise RuntimeError("Failed to open camera")
# Configure format
# Try MJPEG first — avoids USB bandwidth limits with
# uncompressed YUV at high resolutions. Falls back silently
# if the camera/backend doesn't support it.
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
# Request desired resolution and frame rate
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.cap.set(cv2.CAP_PROP_FPS, fps)
# Read back resolution (usually reliable)
self.actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# CAP_PROP_FPS is unreliable on DirectShow — often reports 30
# even when the camera delivers 60. Measure empirically by
# timing a burst of frames.
reported_fps = self.cap.get(cv2.CAP_PROP_FPS)
self.actual_fps = self._measure_fps(warmup=10, sample=30,
fallback=reported_fps or fps)
print(f"[VideoCapturer] {self.actual_width}x{self.actual_height} "
f"@ {self.actual_fps:.1f}fps (reported={reported_fps:.0f})",
flush=True)
self.is_running = True
return True
@@ -89,6 +116,29 @@ class VideoCapturer:
self.is_running = False
self.cap = None
def _measure_fps(self, warmup: int = 10, sample: int = 30,
                 fallback: float = 30.0) -> float:
    """Read warmup+sample frames and return the measured FPS.

    CAP_PROP_FPS is unreliable on some backends (DirectShow often
    reports 30 even when the camera delivers 60), so the frame rate is
    measured empirically by timing a burst of real frame reads. Costs
    ~0.5-1s at startup but gives a ground-truth number for adaptive
    polling/detection intervals.

    Args:
        warmup: frames to read and discard so exposure/auto-gain settle.
        sample: frames to time for the measurement.
        fallback: value returned when measurement is impossible
            (failed read, zero elapsed time, or any exception).

    Returns:
        Measured frames-per-second, or ``fallback`` on any failure.
    """
    try:
        # Discard warmup frames; bail out immediately if the camera is
        # not actually delivering frames instead of burning `warmup`
        # failed reads before the timed loop notices.
        for _ in range(warmup):
            ret, _ = self.cap.read()
            if not ret:
                return fallback
        t0 = time.perf_counter()
        for _ in range(sample):
            ret, _ = self.cap.read()
            if not ret:
                return fallback
        elapsed = time.perf_counter() - t0
        # Guard against a degenerate (or zero-resolution) clock reading.
        if elapsed <= 0:
            return fallback
        return sample / elapsed
    except Exception:
        # Any backend hiccup: fall back to the reported/requested fps.
        return fallback
def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
    """Register the function invoked with each captured frame array."""
    self.frame_callback = callback
+19 -7
View File
@@ -7,17 +7,29 @@ import sys
project_root = os.path.dirname(os.path.abspath(__file__))
os.environ["PATH"] = project_root + os.pathsep + os.environ.get("PATH", "")
# Add NVIDIA CUDA DLL directories to PATH so onnxruntime-gpu can find them
nvidia_dir = os.path.join(project_root, "venv", "Lib", "site-packages", "nvidia")
if os.path.isdir(nvidia_dir):
for pkg in os.listdir(nvidia_dir):
bin_dir = os.path.join(nvidia_dir, pkg, "bin")
if os.path.isdir(bin_dir):
os.environ["PATH"] = bin_dir + os.pathsep + os.environ["PATH"]
# On Windows, add NVIDIA CUDA DLL directories to PATH so onnxruntime-gpu can
# find cuDNN/cublas. PyTorch bundles cuDNN in its lib/ dir; pip nvidia-* pkgs
# use bin/. Skipped on macOS/Linux where loader paths handle this.
if sys.platform == "win32":
_site_packages = os.path.join(sys.prefix, "Lib", "site-packages")
_venv_site_packages = os.path.join(project_root, "venv", "Lib", "site-packages")
for _sp in (_site_packages, _venv_site_packages):
_torch_lib = os.path.join(_sp, "torch", "lib")
if os.path.isdir(_torch_lib):
os.environ["PATH"] = _torch_lib + os.pathsep + os.environ["PATH"]
_nvidia_dir = os.path.join(_sp, "nvidia")
if os.path.isdir(_nvidia_dir):
for _pkg in os.listdir(_nvidia_dir):
_bin_dir = os.path.join(_nvidia_dir, _pkg, "bin")
if os.path.isdir(_bin_dir):
os.environ["PATH"] = _bin_dir + os.pathsep + os.environ["PATH"]
# Import the tkinter fix to patch the ScreenChanged error
import tkinter_fix
from modules import platform_info
platform_info.print_banner()
from modules import core
if __name__ == '__main__':