QA - Encoder Testing (#1101)

* testing for audio and video encoders, minor cleanups

* fix lint

* finish create_vpx_encoder, adjust unrelated order of width vs height args
This commit is contained in:
Henry Ruhs
2026-05-12 08:23:27 +02:00
committed by GitHub
parent 717ff0aa33
commit e53cb63577
8 changed files with 62 additions and 53 deletions
+8 -8
View File
@@ -44,7 +44,7 @@ async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFr
# TODO: move to facefusion/vpx_encoder.py, throttle loop to avoid spinning on same frame
def run_video_encode_loop(vision_frame_deque : deque[VisionFrame], session_id : SessionId, initial_resolution : Resolution, keyframe_interval : int) -> None:
codec_context = create_vpx_encoder(initial_resolution[0], initial_resolution[1], 4500)
vpx_encoder = create_vpx_encoder(initial_resolution[0], initial_resolution[1], 4500)
current_resolution = initial_resolution
pts = 0
@@ -55,29 +55,29 @@ def run_video_encode_loop(vision_frame_deque : deque[VisionFrame], session_id :
frame_resolution = (width, height)
if frame_resolution[0] != current_resolution[0] or frame_resolution[1] != current_resolution[1]:
if codec_context:
destroy_vpx_encoder(codec_context)
if vpx_encoder:
destroy_vpx_encoder(vpx_encoder)
current_resolution = frame_resolution
codec_context = create_vpx_encoder(current_resolution[0], current_resolution[1], 4500)
vpx_encoder = create_vpx_encoder(current_resolution[0], current_resolution[1], 4500)
pts = 0
if codec_context:
if vpx_encoder:
yuv_frame = cv2.cvtColor(output_frame, cv2.COLOR_BGR2YUV_I420)
vpx_flags = 0
if pts % keyframe_interval == 0:
vpx_flags = 1
frame_buffer = encode_vpx(codec_context, yuv_frame.tobytes(), width, height, pts, vpx_flags)
frame_buffer = encode_vpx(vpx_encoder, yuv_frame.tobytes(), width, height, pts, vpx_flags)
if frame_buffer:
rtc_store.send_rtc_video(session_id, frame_buffer)
pts += 1
if codec_context:
destroy_vpx_encoder(codec_context)
if vpx_encoder:
destroy_vpx_encoder(vpx_encoder)
# TODO: extract shared session setup from handle_image_stream and handle_video_stream, guard session_id like handle_video_stream
+5 -10
View File
@@ -2,24 +2,20 @@ import ctypes
from typing import Optional
from facefusion.libraries import opus as opus_module
from facefusion.types import OpusEncoder
# TODO this method needs refinement
def create_opus_encoder(sample_rate : int, channels : int) -> Optional[ctypes.c_void_p]:
def create_opus_encoder(sample_rate : int, channel_total : int) -> Optional[OpusEncoder]:
opus_library = opus_module.create_static_library()
if opus_library:
error = ctypes.c_int(0)
encoder = opus_library.opus_encoder_create(sample_rate, channels, 2049, ctypes.byref(error))
if error.value == 0:
return encoder
return opus_library.opus_encoder_create(sample_rate, channel_total, 2049, ctypes.byref(ctypes.c_int(0)))
return None
# TODO this method needs refinement - rename to encode_opus_buffer
def encode_opus(opus_encoder : ctypes.c_void_p, pcm_pointer : ctypes.c_void_p, frame_size : int) -> bytes:
def encode_opus(opus_encoder : OpusEncoder, pcm_pointer : ctypes.c_void_p, frame_size : int) -> bytes:
opus_library = opus_module.create_static_library()
audio_buffer = b''
@@ -33,8 +29,7 @@ def encode_opus(opus_encoder : ctypes.c_void_p, pcm_pointer : ctypes.c_void_p, f
return audio_buffer
# TODO not 100 sure this makes full sense. should we not run clear on the lru-cache instead?
def destroy_opus_encoder(opus_encoder : ctypes.c_void_p) -> None:
def destroy_opus_encoder(opus_encoder : OpusEncoder) -> None:
opus_library = opus_module.create_static_library()
if opus_library:
+3 -3
View File
@@ -228,7 +228,7 @@ def detect_with_retinaface(vision_frame : VisionFrame, face_detector_size : str)
if numpy.any(keep_indices):
stride_height = face_detector_height // feature_stride
stride_width = face_detector_width // feature_stride
anchors = create_static_anchors(feature_stride, anchor_total, stride_height, stride_width)
anchors = create_static_anchors(feature_stride, anchor_total, stride_width, stride_height)
bounding_boxes_raw = detection[index + feature_map_channel] * feature_stride
face_landmarks_5_raw = detection[index + feature_map_channel * 2] * feature_stride
@@ -273,7 +273,7 @@ def detect_with_scrfd(vision_frame : VisionFrame, face_detector_size : str) -> T
if numpy.any(keep_indices):
stride_height = face_detector_height // feature_stride
stride_width = face_detector_width // feature_stride
anchors = create_static_anchors(feature_stride, anchor_total, stride_height, stride_width)
anchors = create_static_anchors(feature_stride, anchor_total, stride_width, stride_height)
bounding_boxes_raw = detection[index + feature_map_channel] * feature_stride
face_landmarks_5_raw = detection[index + feature_map_channel * 2] * feature_stride
@@ -356,7 +356,7 @@ def detect_with_yunet(vision_frame : VisionFrame, face_detector_size : str) -> T
if numpy.any(keep_indices):
stride_height = face_detector_height // feature_stride
stride_width = face_detector_width // feature_stride
anchors = create_static_anchors(feature_stride, anchor_total, stride_height, stride_width)
anchors = create_static_anchors(feature_stride, anchor_total, stride_width, stride_height)
bounding_boxes_center = detection[index + feature_map_channel * 2].squeeze(0)[:, :2] * feature_stride + anchors
bounding_boxes_size = numpy.exp(detection[index + feature_map_channel * 2].squeeze(0)[:, 2:4]) * feature_stride
face_landmarks_5_raw = detection[index + feature_map_channel * 3].squeeze(0)
+1 -1
View File
@@ -131,7 +131,7 @@ def calculate_paste_area(temp_vision_frame : VisionFrame, crop_vision_frame : Vi
@lru_cache()
def create_static_anchors(feature_stride : int, anchor_total : int, stride_height : int, stride_width : int) -> Anchors:
def create_static_anchors(feature_stride : int, anchor_total : int, stride_width : int, stride_height : int) -> Anchors:
x, y = numpy.mgrid[:stride_width, :stride_height]
anchors = numpy.stack((y, x), axis = -1)
anchors = (anchors * feature_stride).reshape((-1, 2))
+4
View File
@@ -1,3 +1,4 @@
import ctypes
from collections import namedtuple
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, NotRequired, Optional, Tuple, TypeAlias, TypedDict, Union
@@ -89,6 +90,9 @@ MelFilterBank : TypeAlias = NDArray[Any]
Voice : TypeAlias = NDArray[Any]
VoiceChunk : TypeAlias = NDArray[Any]
OpusEncoder : TypeAlias = ctypes.c_void_p
VpxEncoder : TypeAlias = ctypes.Array[ctypes.c_char]
BitRate : TypeAlias = int
SampleRate : TypeAlias = int
Fps : TypeAlias = float
+21 -21
View File
@@ -1,43 +1,44 @@
import ctypes
import multiprocessing
import struct
from typing import Optional
from facefusion.libraries import vpx as vpx_module
from facefusion.types import BitRate, VpxEncoder
# TODO this method needs refinement
def create_vpx_encoder(width : int, height : int, bitrate : int) -> Optional[ctypes.Array[ctypes.c_char]]:
def create_vpx_encoder(width : int, height : int, bitrate : BitRate) -> Optional[VpxEncoder]:
vpx_library = vpx_module.create_static_library()
if vpx_library:
vp8_descriptor = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_cx_algo')
vpx_encoder = ctypes.create_string_buffer(512)
vp8_codec = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_cx_algo')
config_buffer = ctypes.create_string_buffer(4096)
if vpx_library.vpx_codec_enc_config_default(ctypes.byref(vp8_descriptor), config_buffer, 0) == 0:
thread_count = min(multiprocessing.cpu_count(), 8)
struct.pack_into('I', config_buffer, 4, thread_count)
if vpx_library.vpx_codec_enc_config_default(ctypes.byref(vp8_codec), config_buffer, 0) == 0:
struct.pack_into('I', config_buffer, 4, 8)
struct.pack_into('I', config_buffer, 12, width)
struct.pack_into('I', config_buffer, 16, height)
struct.pack_into('I', config_buffer, 72, 2)
struct.pack_into('I', config_buffer, 28, 1)
struct.pack_into('I', config_buffer, 36, 0)
struct.pack_into('I', config_buffer, 72, 0)
struct.pack_into('I', config_buffer, 112, bitrate)
struct.pack_into('I', config_buffer, 116, 2)
struct.pack_into('I', config_buffer, 120, 50)
struct.pack_into('I', config_buffer, 124, 50)
struct.pack_into('I', config_buffer, 128, 50)
context_buffer = ctypes.create_string_buffer(512)
if vpx_library.vpx_codec_enc_init_ver(context_buffer, ctypes.byref(vp8_descriptor), config_buffer, 0, 39) == 0:
vpx_library.vpx_codec_control_(context_buffer, 13, ctypes.c_int(16))
vpx_library.vpx_codec_control_(context_buffer, 12, ctypes.c_int(3))
vpx_library.vpx_codec_control_(context_buffer, 27, ctypes.c_int(10))
return context_buffer
if vpx_library.vpx_codec_enc_init_ver(vpx_encoder, ctypes.byref(vp8_codec), config_buffer, 0, 39) == 0:
vpx_library.vpx_codec_control_(vpx_encoder, 13, ctypes.c_int(16))
vpx_library.vpx_codec_control_(vpx_encoder, 12, ctypes.c_int(3))
vpx_library.vpx_codec_control_(vpx_encoder, 27, ctypes.c_int(10))
return vpx_encoder
return None
# TODO this method needs refinement - rename to encode_vpx_buffer
def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes, width : int, height : int, presentation_timestamp : int, flags : int) -> bytes:
def encode_vpx(vpx_encoder : VpxEncoder, yuv_buffer : bytes, width : int, height : int, presentation_timestamp : int, flags : int) -> bytes:
vpx_library = vpx_module.create_static_library()
frame_buffer = b''
@@ -46,9 +47,9 @@ def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes,
yuv_string_buffer = ctypes.create_string_buffer(yuv_buffer)
if vpx_library.vpx_img_wrap(image_buffer, 0x102, width, height, 1, yuv_string_buffer):
if vpx_library.vpx_codec_encode(codec_context, image_buffer, presentation_timestamp, 1, flags, 1) == 0:
if vpx_library.vpx_codec_encode(vpx_encoder, image_buffer, presentation_timestamp, 1, flags, 1) == 0:
iterator = ctypes.c_void_p(0)
packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator))
packet = vpx_library.vpx_codec_get_cx_data(vpx_encoder, ctypes.byref(iterator))
while packet:
if ctypes.c_int.from_address(packet).value == 0:
@@ -56,14 +57,13 @@ def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes,
buffer_size = ctypes.c_size_t.from_address(packet + 16).value
frame_buffer += ctypes.string_at(buffer_pointer, buffer_size)
packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator))
packet = vpx_library.vpx_codec_get_cx_data(vpx_encoder, ctypes.byref(iterator))
return frame_buffer
# TODO not 100 sure this makes full sense. should we not run clear on the lru-cache instead?
def destroy_vpx_encoder(codec_context : ctypes.Array[ctypes.c_char]) -> None:
def destroy_vpx_encoder(vpx_encoder : VpxEncoder) -> None:
vpx_library = vpx_module.create_static_library()
if vpx_library:
vpx_library.vpx_codec_destroy(codec_context)
vpx_library.vpx_codec_destroy(vpx_encoder)
+9 -5
View File
@@ -1,11 +1,12 @@
import ctypes
from unittest.mock import patch
import numpy
import pytest
from tests.assert_helper import get_test_example_file, get_test_examples_directory
from facefusion import state_manager
from facefusion.audio_encoder import create_opus_encoder, encode_opus
from facefusion.audio_encoder import create_opus_encoder, destroy_opus_encoder, encode_opus
from facefusion.download import conditional_download
from facefusion.ffmpeg import read_audio_buffer
from facefusion.libraries import opus as opus_module
@@ -20,9 +21,9 @@ def before_all() -> None:
opus_module.pre_check()
# TODO: implement
def test_create_opus_encoder() -> None:
pass
assert create_opus_encoder(48000, 2)
assert create_opus_encoder(0, 0) is None
#TODO: rename to test_encode_opus_buffer
@@ -36,6 +37,9 @@ def test_encode_opus() -> None:
assert encode_opus(opus_encoder, pcm_pointer, 0) == b''
# TODO: implement
def test_destroy_opus_encoder() -> None:
pass
opus_encoder = create_opus_encoder(48000, 2)
with patch.object(opus_module.create_static_library(), 'opus_encoder_destroy') as mock:
destroy_opus_encoder(opus_encoder)
mock.assert_called_once_with(opus_encoder)
+11 -5
View File
@@ -1,3 +1,5 @@
from unittest.mock import patch
import cv2
import pytest
from tests.assert_helper import get_test_example_file, get_test_examples_directory
@@ -5,21 +7,22 @@ from tests.assert_helper import get_test_example_file, get_test_examples_directo
from facefusion import state_manager
from facefusion.download import conditional_download
from facefusion.libraries import vpx as vpx_module
from facefusion.video_encoder import create_vpx_encoder, encode_vpx
from facefusion.video_encoder import create_vpx_encoder, destroy_vpx_encoder, encode_vpx
from facefusion.vision import read_video_frame
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
conditional_download(get_test_examples_directory(), [ 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4' ])
vpx_module.pre_check()
# TODO: implement
def test_create_vpx_encoder() -> None:
pass
assert create_vpx_encoder(320, 240, 1000)
assert create_vpx_encoder(0, 0, 0) is None
# TODO: rename to test_encode_vpx_buffer
@@ -35,6 +38,9 @@ def test_encode_vpx() -> None:
assert encode_vpx(vpx_encoder, buffer_invalid, width, height, 0, 0) == b''
# TODO: implement
def test_destroy_vpx_encoder() -> None:
pass
vpx_encoder = create_vpx_encoder(320, 240, 1000)
with patch.object(vpx_module.create_static_library(), 'vpx_codec_destroy') as mock:
destroy_vpx_encoder(vpx_encoder)
mock.assert_called_once_with(vpx_encoder)