Files
hacksider-Deep-Live-Cam/modules/onnx_optimize.py
T
Max Buckley f65aeae5db Apple Silicon + Windows CUDA perf: 60 FPS pipeline, cross-platform routing
Bundles CoreML graph rewrites, GPU-accelerated pipeline work, Windows CUDA
fixes, and Mac/Windows runtime routing into a single drop.

CoreML (Apple Silicon):
- Decompose Pad(reflect) → Slice+Concat in inswapper_128 so the model
  runs in one CoreML partition instead of 14 (TEMPORARY: fixed upstream
  in microsoft/onnxruntime#28073, drop when ORT >= 1.26.0).
- Fold Shape/Gather chains to constants in det_10g (21ms → 4ms).
- Decompose Split(axis=1) → Slice pairs in GFPGAN (155ms → 89ms).
- Route detection model to GPU so the ANE is free for the swap model.
- Centralize provider/config selection in create_onnx_session.

Pipeline (all platforms):
- Parallelize face landmark + recognition post-detection; skip landmark_2d_106
  when only face_swapper is active.
- Pipeline face detection with swap for ANE overlap.
- GPU-accelerated paste_back, MJPEG capture, zero-copy display path.
- Standalone pipeline benchmark script.

Windows / CUDA:
- CUDA graphs + FP16 model + all-GPU pipeline for 1080p 60 FPS.
- Auto-detect GPU provider and fix DLL discovery for Windows CUDA execution.

Cross-platform:
- platform_info helper for Mac/Windows runtime routing.
- GFPGAN 30 fps + MSMF camera 60 fps with adaptive pipeline tuning.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-22 10:44:59 +02:00

429 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ONNX model optimizations for CoreML execution on Apple Silicon.

Three transformations that eliminate CPU↔ANE round-trips:

1. **Pad(reflect) decomposition** — CoreML doesn't support ``Pad(mode=reflect)``.
   Models using reflect padding (e.g. inswapper_128) get split into many CoreML
   subgraphs with CPU fallbacks between each. We rewrite each ``Pad(reflect)``
   as equivalent ``Slice`` + ``Concat`` ops that CoreML handles natively.
   Bit-for-bit identical output.

2. **Shape/Gather constant folding** — Dynamic ``Shape`` → ``Gather`` chains
   (e.g. for FPN upsample target sizes in RetinaFace) force ops onto CPU even
   when the input dimensions are known at load time. We run ONNX shape
   inference with the known input size and replace these chains with constants.
   Float32-noise-level differences only (max ~6e-6).

3. **Split decomposition** — CoreML's EP doesn't support the ONNX ``Split`` op,
   causing partition boundaries in models that use channel-wise splits
   (e.g. GFPGAN's SFT modulation layers). Each two-output ``Split(axis=1)``
   is rewritten as a pair of ``Slice`` ops.

All transformations are cached on disk with a ``_coreml`` suffix so the
rewrite cost is paid only once per model.
"""
import os
import platform
import numpy as np
# True only when the interpreter reports macOS ("Darwin") on an arm64 machine.
# optimize_for_coreml() returns the input path unchanged when this is False.
IS_APPLE_SILICON = platform.system() == "Darwin" and platform.machine() == "arm64"
def optimize_for_coreml(model_path: str, input_shape: tuple = None) -> str:
    """Return the path to a CoreML-optimized copy of an ONNX model.

    Applies all applicable graph rewrites and caches the result next to
    the original model (with a ``_coreml`` suffix before the extension).

    Args:
        model_path: Path to the original ONNX model.
        input_shape: Optional fixed input shape (e.g. ``(1, 3, 640, 640)``).
            When provided, enables Shape/Gather constant folding.

    Returns:
        The optimized path, or the original path if no optimizations
        apply or we're not on Apple Silicon.
    """
    if not IS_APPLE_SILICON:
        return model_path

    base, ext = os.path.splitext(model_path)
    optimized_path = f"{base}_coreml{ext}"

    # Reuse the cached rewrite unless the source model has been modified since.
    # NOTE(review): the cache key is the model path only; a cached file built
    # with a different ``input_shape`` is reused as-is — confirm callers never
    # vary the shape for the same model file.
    if os.path.exists(optimized_path) and (
        os.path.getmtime(optimized_path) >= os.path.getmtime(model_path)
    ):
        return optimized_path

    # Imported lazily so non-Apple platforms never pay for (or require) onnx.
    import onnx
    from onnx import numpy_helper

    model = onnx.load(model_path)

    # Every pass must run (no short-circuiting): each mutates ``model`` in
    # place and reports whether it changed anything.
    changed = False
    if _fold_shape_gather(model, input_shape):
        changed = True
    # TODO(ort>=1.26): drop this pass. Fixed upstream by microsoft/onnxruntime#28073.
    if _decompose_reflect_pad(model):
        changed = True
    if _decompose_split(model):
        changed = True

    if not changed:
        return model_path

    # Preserve insightface's emap convention: the INSwapper class reads
    # graph.initializer[-1] as the embedding map. If the original model
    # had a (512, 512) matrix as its last initializer, keep it last.
    _preserve_emap_position(model, numpy_helper)

    # Write to a temporary sibling and rename into place. Saving directly to
    # ``optimized_path`` would leave a truncated file behind if the process
    # is interrupted, and that file would then pass the mtime check above
    # and be loaded as a valid cache on every later run.
    tmp_path = f"{optimized_path}.{os.getpid()}.tmp"
    try:
        onnx.save(model, tmp_path)
        os.replace(tmp_path, optimized_path)
    finally:
        # Only still present when save/replace failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return optimized_path
# ---------------------------------------------------------------------------
# Pass 1: Fold Shape → Gather chains into constants
# ---------------------------------------------------------------------------
def _fold_shape_gather(model, input_shape) -> bool:
    """Replace dynamic Shape→Gather chains with constants when input size is known.

    The first graph input is pinned to ``input_shape``, ONNX shape inference
    is run, and every ``Gather`` that indexes a fully-known ``Shape`` output
    with constant indices is replaced by an int64 initializer carrying the
    Gather's output name.

    Only removes a Shape node when ALL of its consumers are Gather nodes
    that are also being folded. This prevents breaking graphs where
    a Shape output feeds into other ops as well.

    Args:
        model: ONNX ``ModelProto``; mutated in place.
        input_shape: Fixed shape for ``graph.input[0]``, or ``None`` to skip
            the pass entirely.

    Returns:
        ``True`` if at least one Gather was folded, ``False`` otherwise.
        Note that the input dimensions are pinned on ``model`` even when the
        pass ends up folding nothing.
    """
    if input_shape is None:
        return False

    from onnx import numpy_helper, shape_inference

    graph = model.graph
    if len(graph.input) == 0:
        return False

    # Set fixed input dimensions for shape inference.
    dims = graph.input[0].type.tensor_type.shape.dim
    for dim, size in zip(dims, input_shape):
        dim.dim_value = size

    # Best effort: if inference fails the graph is simply left dynamic.
    try:
        model_inferred = shape_inference.infer_shapes(model)
    except Exception:
        return False

    # name → list of dims, with None for any dimension that is not a
    # positive static value.
    value_shapes = {}
    typed_values = (
        list(model_inferred.graph.value_info)
        + list(graph.input)
        + list(graph.output)
    )
    for vi in typed_values:
        value_shapes[vi.name] = [
            d.dim_value if d.dim_value > 0 else None
            for d in vi.type.tensor_type.shape.dim
        ]

    # Keep the protos and decode lazily: only Gather index tensors are ever
    # read, so there is no reason to materialize every weight as an array.
    init_protos = {init.name: init for init in graph.initializer}

    # Snapshot the node list once and address nodes by position from here on.
    nodes = list(graph.node)

    # tensor name → positions of the nodes consuming it
    consumers = {}
    for pos, node in enumerate(nodes):
        for name in node.input:
            consumers.setdefault(name, []).append(pos)

    # A tensor that is a graph output must keep its producing node.
    graph_output_names = {o.name for o in graph.output}

    # Shape nodes whose output is fully known.
    shape_constants = {}
    for node in nodes:
        if node.op_type != "Shape":
            continue
        # Opset >= 15 lets Shape return a slice of the shape via start/end;
        # folding those as the full shape would be wrong, so leave them alone.
        if any(attr.name in ("start", "end") for attr in node.attribute):
            continue
        inp_shape = value_shapes.get(node.input[0])
        if inp_shape and all(isinstance(d, int) for d in inp_shape):
            shape_constants[node.output[0]] = np.array(inp_shape, dtype=np.int64)
    if not shape_constants:
        return False

    # Gather nodes that index a known shape with constant indices.
    gather_constants = {}
    gather_remove = set()
    for pos, node in enumerate(nodes):
        if node.op_type != "Gather" or node.input[0] not in shape_constants:
            continue
        # Keep the node if its result is a graph output.
        if node.output[0] in graph_output_names:
            continue
        idx_proto = init_protos.get(node.input[1])
        if idx_proto is None:
            continue
        indices = np.asarray(numpy_helper.to_array(idx_proto))
        if not np.issubdtype(indices.dtype, np.integer):
            continue
        shape_vec = shape_constants[node.input[0]]
        rank = shape_vec.shape[0]
        if np.any(indices < -rank) or np.any(indices >= rank):
            continue
        # np.take keeps the rank of the index tensor (scalar index → scalar,
        # [k] → 1-element vector), matching Gather's output shape.
        gather_constants[node.output[0]] = np.asarray(
            np.take(shape_vec, indices), dtype=np.int64
        )
        gather_remove.add(pos)
    if not gather_constants:
        return False

    # Shape nodes are safe to remove only if ALL consumers of the Shape
    # output are Gather nodes being folded, and it isn't a graph output.
    shape_remove = set()
    for pos, node in enumerate(nodes):
        if node.op_type != "Shape":
            continue
        out_name = node.output[0]
        if out_name not in shape_constants or out_name in graph_output_names:
            continue
        if all(c in gather_remove for c in consumers.get(out_name, [])):
            shape_remove.add(pos)

    # Add Gather output constants as initializers.
    existing = set(init_protos)
    for name, val in gather_constants.items():
        if name not in existing:
            graph.initializer.append(numpy_helper.from_array(val, name=name))

    removed = gather_remove | shape_remove
    new_nodes = [node for pos, node in enumerate(nodes) if pos not in removed]
    del graph.node[:]
    graph.node.extend(new_nodes)
    return True
# ---------------------------------------------------------------------------
# Pass 2: Decompose Pad(reflect) → Slice + Concat
#
# TEMPORARY: fixed upstream in microsoft/onnxruntime#28073 (merged 2026-04-20).
# Once the ORT floor is >= 1.26.0, MLProgram handles Pad(mode=reflect) natively
# via MIL tensor_operation.pad and this entire pass can be deleted.
# ---------------------------------------------------------------------------
def _decompose_reflect_pad(model) -> bool:
    """Rewrite Pad(reflect) as Slice+Concat sequences CoreML can handle.

    Only ``Pad`` nodes that meet all of the following are rewritten; anything
    else is left untouched:

    * ``mode == "reflect"`` and the ``pads`` input is an initializer,
    * no ``axes`` input (opset >= 18), which would change the pads layout,
    * exactly 8 pad values (a 4-D tensor) with zero padding on axes 0 and 1,
    * non-negative padding on axes 2 and 3.

    Begin and end padding on axes 2/3 are handled independently, so
    asymmetric padding is supported. A qualifying Pad whose H/W padding is
    all zero becomes an ``Identity`` so its output stays defined.

    NOTE(review): reflect padding is only meaningful when each pad amount is
    smaller than the padded dimension; that cannot be checked here because
    tensor shapes are not inspected.

    Args:
        model: ONNX ``ModelProto``; mutated in place.

    Returns:
        ``True`` if at least one Pad node was rewritten.
    """
    graph = model.graph
    nodes = list(graph.node)
    init_protos = {init.name: init for init in graph.initializer}

    def is_reflect(node):
        mode = "constant"  # ONNX default when the attribute is absent
        for attr in node.attribute:
            if attr.name == "mode":
                mode = attr.s.decode()
        return mode == "reflect"

    candidates = [
        pos
        for pos, node in enumerate(nodes)
        if node.op_type == "Pad"
        and len(node.input) > 1
        and node.input[1] in init_protos
        and is_reflect(node)
    ]
    if not candidates:
        return False

    from onnx import numpy_helper, helper

    # node position → (top, bottom, left, right) pad amounts
    plans = {}
    for pos in candidates:
        node = nodes[pos]
        if len(node.input) > 3 and node.input[3]:
            continue  # explicit `axes`: pads no longer cover all 4 axes
        raw = np.asarray(numpy_helper.to_array(init_protos[node.input[1]]))
        pads = [int(p) for p in raw.reshape(-1).tolist()]
        # ONNX layout for 4-D: [N_b, C_b, H_b, W_b, N_e, C_e, H_e, W_e]
        if len(pads) != 8:
            continue
        if any(pads[i] for i in (0, 1, 4, 5)):
            continue
        top, left, bottom, right = pads[2], pads[3], pads[6], pads[7]
        if min(top, left, bottom, right) < 0:
            continue
        plans[pos] = (top, bottom, left, right)
    if not plans:
        return False

    existing_names = set(init_protos)

    def ensure_const(name, value):
        if name not in existing_names:
            graph.initializer.append(
                numpy_helper.from_array(np.array(value, dtype=np.int64), name=name)
            )
            existing_names.add(name)

    # Shared Slice operands: axes, and +v / -v bounds up to max_pad + 1.
    ensure_const("_rp_ax2", [2])
    ensure_const("_rp_ax3", [3])
    max_pad = max(max(plan) for plan in plans.values())
    for v in range(1, max_pad + 2):
        ensure_const(f"_rp_p{v}", [v])
        ensure_const(f"_rp_n{v}", [-v])

    serial = 0
    new_nodes = []
    pad_init_names = set()

    def fresh(tag):
        nonlocal serial
        serial += 1
        return f"_rp_{tag}{serial}"

    def mirror(src, axis, before, after, tags, out_name=None):
        """Emit Slice/Concat nodes reflecting ``src`` along ``axis``."""
        axis_const = f"_rp_ax{axis}"
        lead = []
        # Leading edge: rows before, before-1, ..., 1 (row 0 is the mirror line).
        for i in range(before, 0, -1):
            name = fresh(tags[0])
            new_nodes.append(helper.make_node(
                "Slice",
                inputs=[src, f"_rp_p{i}", f"_rp_p{i + 1}", axis_const],
                outputs=[name],
            ))
            lead.append(name)
        trail = []
        # Trailing edge: rows -2, -3, ..., -(after + 1).
        for i in range(1, after + 1):
            name = fresh(tags[1])
            new_nodes.append(helper.make_node(
                "Slice",
                inputs=[src, f"_rp_n{i + 1}", f"_rp_n{i}", axis_const],
                outputs=[name],
            ))
            trail.append(name)
        result = out_name if out_name is not None else fresh(tags[2])
        new_nodes.append(helper.make_node(
            "Concat", inputs=lead + [src] + trail, outputs=[result], axis=axis
        ))
        return result

    for pos, node in enumerate(nodes):
        plan = plans.get(pos)
        if plan is None:
            new_nodes.append(node)
            continue

        top, bottom, left, right = plan
        for name in node.input[1:]:
            if name in init_protos:
                pad_init_names.add(name)

        current = node.input[0]
        if top or bottom:
            current = mirror(current, 2, top, bottom, "tbh")
        if left or right:
            mirror(current, 3, left, right, "lrw", out_name=node.output[0])
        else:
            # No width padding (possibly no padding at all): the Pad's output
            # name must still be produced by something.
            new_nodes.append(helper.make_node(
                "Identity", inputs=[current], outputs=[node.output[0]]
            ))

    # Drop the old Pad operands, but only those nothing else still reads.
    still_used = {name for n in new_nodes for name in n.input}
    droppable = pad_init_names - still_used
    if droppable:
        kept = [i for i in graph.initializer if i.name not in droppable]
        del graph.initializer[:]
        graph.initializer.extend(kept)

    del graph.node[:]
    graph.node.extend(new_nodes)
    return True
# ---------------------------------------------------------------------------
# Pass 3: Decompose Split → Slice pairs
# ---------------------------------------------------------------------------
def _decompose_split(model) -> bool:
    """Rewrite Split(axis=1) as Slice pairs that CoreML can handle.

    CoreML's EP doesn't support the ONNX ``Split`` op, causing partition
    boundaries in models that use channel-wise splits (e.g. GFPGAN's SFT
    modulation layers). Each Split with two outputs becomes two Slice ops.

    The two split sizes are taken from the ``split`` attribute or, when that
    is absent, from a ``split`` input (the opset >= 13 form) that is an
    initializer. Splits with any other axis, output count or size count, or
    with no statically known sizes, are left untouched.

    Args:
        model: ONNX ``ModelProto``; mutated in place.

    Returns:
        ``True`` if at least one Split node was rewritten.
    """
    graph = model.graph
    nodes = list(graph.node)
    init_protos = {init.name: init for init in graph.initializer}

    # (node position, sizes from attribute or None, name of split initializer or None)
    candidates = []
    for pos, node in enumerate(nodes):
        if node.op_type != "Split" or len(node.output) != 2:
            continue
        axis = 0
        attr_sizes = None
        for attr in node.attribute:
            if attr.name == "axis":
                axis = attr.i
            if attr.name == "split":
                attr_sizes = list(attr.ints)
        if axis != 1:
            continue
        if attr_sizes is not None:
            if len(attr_sizes) == 2:
                candidates.append((pos, attr_sizes, None))
        elif len(node.input) > 1 and node.input[1] in init_protos:
            candidates.append((pos, None, node.input[1]))
    if not candidates:
        return False

    from onnx import numpy_helper, helper

    # node position → (first size, second size)
    splits = {}
    for pos, sizes, init_name in candidates:
        if sizes is None:
            raw = np.asarray(numpy_helper.to_array(init_protos[init_name]))
            sizes = [int(v) for v in raw.reshape(-1).tolist()]
        if len(sizes) == 2:
            splits[pos] = (sizes[0], sizes[1])
    if not splits:
        return False

    existing = set(init_protos)

    def ensure_const(name, value):
        if name not in existing:
            graph.initializer.append(
                numpy_helper.from_array(np.array(value, dtype=np.int64), name=name)
            )
            existing.add(name)

    # Shared Slice operands: the channel axis and every boundary offset.
    ensure_const("_sp_ax1", [1])
    ensure_const("_sp_s0", [0])
    for a, b in splits.values():
        ensure_const(f"_sp_s{a}", [a])
        ensure_const(f"_sp_s{a + b}", [a + b])

    new_nodes = []
    for pos, node in enumerate(nodes):
        sizes = splits.get(pos)
        if sizes is None:
            new_nodes.append(node)
            continue
        a, b = sizes
        # Channels [0, a) → first output, [a, a + b) → second output.
        new_nodes.append(helper.make_node(
            "Slice",
            inputs=[node.input[0], "_sp_s0", f"_sp_s{a}", "_sp_ax1"],
            outputs=[node.output[0]],
        ))
        new_nodes.append(helper.make_node(
            "Slice",
            inputs=[node.input[0], f"_sp_s{a}", f"_sp_s{a + b}", "_sp_ax1"],
            outputs=[node.output[1]],
        ))

    del graph.node[:]
    graph.node.extend(new_nodes)
    return True
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _preserve_emap_position(model, numpy_helper):
    """Keep the insightface emap (512×512 matrix) as the last initializer.

    The rewrite passes append helper constants after the original
    initializers, so a model whose last initializer was the emap no longer
    ends with it. This moves the LAST 512×512 initializer (ignoring the
    ``_rp_``/``_sp_`` helper constants) back to the end; that is the one
    closest to the original tail, so an unrelated 512×512 weight earlier in
    the list is never mistaken for the emap.

    Args:
        model: ONNX ``ModelProto``; ``graph.initializer`` is reordered in place.
        numpy_helper: Unused; kept so existing callers keep working. Shapes
            are read from each initializer's ``dims`` field, which avoids
            decoding every weight tensor just to inspect its shape.
    """
    graph = model.graph
    inits = list(graph.initializer)

    emap_pos = None
    for pos in range(len(inits) - 1, -1, -1):
        init = inits[pos]
        if init.name.startswith(("_rp_", "_sp_")):
            continue
        if list(init.dims) == [512, 512]:
            emap_pos = pos
            break

    # Nothing to move: no candidate, or it is already last.
    if emap_pos is None or emap_pos == len(inits) - 1:
        return

    emap_init = inits.pop(emap_pos)
    del graph.initializer[:]
    graph.initializer.extend(inits)
    graph.initializer.append(emap_init)