From e53cb63577f63c5abe92a4ec23e5465f2a2f03a2 Mon Sep 17 00:00:00 2001 From: Henry Ruhs Date: Tue, 12 May 2026 08:23:27 +0200 Subject: [PATCH] QA - Encoder Testing (#1101) * testing for audio and video encoders, minor cleanups * fix lint * finish create_vpx_encoder, adjust unrelated order of width vs height args --- facefusion/apis/stream_helper.py | 16 ++++++------ facefusion/audio_encoder.py | 15 ++++-------- facefusion/face_detector.py | 6 ++--- facefusion/face_helper.py | 2 +- facefusion/types.py | 4 +++ facefusion/video_encoder.py | 42 ++++++++++++++++---------------- tests/test_audio_encoder.py | 14 +++++++---- tests/test_video_encoder.py | 16 ++++++++---- 8 files changed, 62 insertions(+), 53 deletions(-) diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index 962c3995..cce1af18 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -44,7 +44,7 @@ async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFr # TODO: move to facefusion/vpx_encoder.py, throttle loop to avoid spinning on same frame def run_video_encode_loop(vision_frame_deque : deque[VisionFrame], session_id : SessionId, initial_resolution : Resolution, keyframe_interval : int) -> None: - codec_context = create_vpx_encoder(initial_resolution[0], initial_resolution[1], 4500) + vpx_encoder = create_vpx_encoder(initial_resolution[0], initial_resolution[1], 4500) current_resolution = initial_resolution pts = 0 @@ -55,29 +55,29 @@ def run_video_encode_loop(vision_frame_deque : deque[VisionFrame], session_id : frame_resolution = (width, height) if frame_resolution[0] != current_resolution[0] or frame_resolution[1] != current_resolution[1]: - if codec_context: - destroy_vpx_encoder(codec_context) + if vpx_encoder: + destroy_vpx_encoder(vpx_encoder) current_resolution = frame_resolution - codec_context = create_vpx_encoder(current_resolution[0], current_resolution[1], 4500) + vpx_encoder = create_vpx_encoder(current_resolution[0], current_resolution[1], 4500) pts = 0 - if codec_context: + if vpx_encoder: yuv_frame = cv2.cvtColor(output_frame, cv2.COLOR_BGR2YUV_I420) vpx_flags = 0 if pts % keyframe_interval == 0: vpx_flags = 1 - frame_buffer = encode_vpx(codec_context, yuv_frame.tobytes(), width, height, pts, vpx_flags) + frame_buffer = encode_vpx(vpx_encoder, yuv_frame.tobytes(), width, height, pts, vpx_flags) if frame_buffer: rtc_store.send_rtc_video(session_id, frame_buffer) pts += 1 - if codec_context: - destroy_vpx_encoder(codec_context) + if vpx_encoder: + destroy_vpx_encoder(vpx_encoder) # TODO: extract shared session setup from handle_image_stream and handle_video_stream, guard session_id like handle_video_stream diff --git a/facefusion/audio_encoder.py b/facefusion/audio_encoder.py index ba3c76bb..d6700c31 100644 --- a/facefusion/audio_encoder.py +++ b/facefusion/audio_encoder.py @@ -2,24 +2,20 @@ import ctypes from typing import Optional from facefusion.libraries import opus as opus_module +from facefusion.types import OpusEncoder -# TODO this method needs refinement -def create_opus_encoder(sample_rate : int, channels : int) -> Optional[ctypes.c_void_p]: +def create_opus_encoder(sample_rate : int, channel_total : int) -> Optional[OpusEncoder]: opus_library = opus_module.create_static_library() if opus_library: - error = ctypes.c_int(0) - encoder = opus_library.opus_encoder_create(sample_rate, channels, 2049, ctypes.byref(error)) - - if error.value == 0: - return encoder + return opus_library.opus_encoder_create(sample_rate, channel_total, 2049, ctypes.byref(ctypes.c_int(0))) return None # TODO this method needs refinement - rename to encode_opus_buffer -def encode_opus(opus_encoder : ctypes.c_void_p, pcm_pointer : ctypes.c_void_p, frame_size : int) -> bytes: +def encode_opus(opus_encoder : OpusEncoder, pcm_pointer : ctypes.c_void_p, frame_size : int) -> bytes: opus_library = opus_module.create_static_library() audio_buffer = b'' @@ -33,8 +29,7 @@ def encode_opus(opus_encoder : ctypes.c_void_p, pcm_pointer : ctypes.c_void_p, f return audio_buffer -# TODO not 100 sure this makes full sense. should we not run clear on the lru-cache instead? -def destroy_opus_encoder(opus_encoder : ctypes.c_void_p) -> None: +def destroy_opus_encoder(opus_encoder : OpusEncoder) -> None: opus_library = opus_module.create_static_library() if opus_library: diff --git a/facefusion/face_detector.py b/facefusion/face_detector.py index 6b5cb351..1ee9dd3c 100644 --- a/facefusion/face_detector.py +++ b/facefusion/face_detector.py @@ -228,7 +228,7 @@ def detect_with_retinaface(vision_frame : VisionFrame, face_detector_size : str) if numpy.any(keep_indices): stride_height = face_detector_height // feature_stride stride_width = face_detector_width // feature_stride - anchors = create_static_anchors(feature_stride, anchor_total, stride_height, stride_width) + anchors = create_static_anchors(feature_stride, anchor_total, stride_width, stride_height) bounding_boxes_raw = detection[index + feature_map_channel] * feature_stride face_landmarks_5_raw = detection[index + feature_map_channel * 2] * feature_stride @@ -273,7 +273,7 @@ def detect_with_scrfd(vision_frame : VisionFrame, face_detector_size : str) -> T if numpy.any(keep_indices): stride_height = face_detector_height // feature_stride stride_width = face_detector_width // feature_stride - anchors = create_static_anchors(feature_stride, anchor_total, stride_height, stride_width) + anchors = create_static_anchors(feature_stride, anchor_total, stride_width, stride_height) bounding_boxes_raw = detection[index + feature_map_channel] * feature_stride face_landmarks_5_raw = detection[index + feature_map_channel * 2] * feature_stride @@ -356,7 +356,7 @@ def detect_with_yunet(vision_frame : VisionFrame, face_detector_size : str) -> T if numpy.any(keep_indices): stride_height = face_detector_height // feature_stride stride_width = face_detector_width // feature_stride - anchors = create_static_anchors(feature_stride, anchor_total, stride_height, stride_width) + anchors = create_static_anchors(feature_stride, anchor_total, stride_width, stride_height) bounding_boxes_center = detection[index + feature_map_channel * 2].squeeze(0)[:, :2] * feature_stride + anchors bounding_boxes_size = numpy.exp(detection[index + feature_map_channel * 2].squeeze(0)[:, 2:4]) * feature_stride face_landmarks_5_raw = detection[index + feature_map_channel * 3].squeeze(0) diff --git a/facefusion/face_helper.py b/facefusion/face_helper.py index 553b3e65..2dfe222a 100644 --- a/facefusion/face_helper.py +++ b/facefusion/face_helper.py @@ -131,7 +131,7 @@ def calculate_paste_area(temp_vision_frame : VisionFrame, crop_vision_frame : Vi @lru_cache() -def create_static_anchors(feature_stride : int, anchor_total : int, stride_height : int, stride_width : int) -> Anchors: +def create_static_anchors(feature_stride : int, anchor_total : int, stride_width : int, stride_height : int) -> Anchors: x, y = numpy.mgrid[:stride_width, :stride_height] anchors = numpy.stack((y, x), axis = -1) anchors = (anchors * feature_stride).reshape((-1, 2)) diff --git a/facefusion/types.py b/facefusion/types.py index 58394eec..a28f508f 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -1,3 +1,4 @@ +import ctypes from collections import namedtuple from datetime import datetime from typing import Any, Callable, Dict, List, Literal, NotRequired, Optional, Tuple, TypeAlias, TypedDict, Union @@ -89,6 +90,9 @@ MelFilterBank : TypeAlias = NDArray[Any] Voice : TypeAlias = NDArray[Any] VoiceChunk : TypeAlias = NDArray[Any] +OpusEncoder : TypeAlias = ctypes.c_void_p +VpxEncoder : TypeAlias = ctypes.Array[ctypes.c_char] + BitRate : TypeAlias = int SampleRate : TypeAlias = int Fps : TypeAlias = float diff --git a/facefusion/video_encoder.py b/facefusion/video_encoder.py index 812bdb3f..1d4fa9ce 100644 --- a/facefusion/video_encoder.py +++ b/facefusion/video_encoder.py @@ -1,43 +1,44 @@ import ctypes -import multiprocessing import struct from typing import Optional from facefusion.libraries import vpx as vpx_module +from facefusion.types import BitRate, VpxEncoder -# TODO this method needs refinement -def create_vpx_encoder(width : int, height : int, bitrate : int) -> Optional[ctypes.Array[ctypes.c_char]]: +def create_vpx_encoder(width : int, height : int, bitrate : BitRate) -> Optional[VpxEncoder]: vpx_library = vpx_module.create_static_library() if vpx_library: - vp8_descriptor = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_cx_algo') + vpx_encoder = ctypes.create_string_buffer(512) + vp8_codec = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_cx_algo') + config_buffer = ctypes.create_string_buffer(4096) - if vpx_library.vpx_codec_enc_config_default(ctypes.byref(vp8_descriptor), config_buffer, 0) == 0: - thread_count = min(multiprocessing.cpu_count(), 8) - struct.pack_into('I', config_buffer, 4, thread_count) + if vpx_library.vpx_codec_enc_config_default(ctypes.byref(vp8_codec), config_buffer, 0) == 0: + struct.pack_into('I', config_buffer, 4, 8) struct.pack_into('I', config_buffer, 12, width) struct.pack_into('I', config_buffer, 16, height) - struct.pack_into('I', config_buffer, 72, 2) + struct.pack_into('I', config_buffer, 28, 1) + struct.pack_into('I', config_buffer, 36, 0) + struct.pack_into('I', config_buffer, 72, 0) struct.pack_into('I', config_buffer, 112, bitrate) struct.pack_into('I', config_buffer, 116, 2) struct.pack_into('I', config_buffer, 120, 50) struct.pack_into('I', config_buffer, 124, 50) struct.pack_into('I', config_buffer, 128, 50) - context_buffer = ctypes.create_string_buffer(512) - if vpx_library.vpx_codec_enc_init_ver(context_buffer, ctypes.byref(vp8_descriptor), config_buffer, 0, 39) == 0: - vpx_library.vpx_codec_control_(context_buffer, 13, ctypes.c_int(16)) - vpx_library.vpx_codec_control_(context_buffer, 12, ctypes.c_int(3)) - vpx_library.vpx_codec_control_(context_buffer, 27, ctypes.c_int(10)) - return context_buffer + if vpx_library.vpx_codec_enc_init_ver(vpx_encoder, ctypes.byref(vp8_codec), config_buffer, 0, 39) == 0: + vpx_library.vpx_codec_control_(vpx_encoder, 13, ctypes.c_int(16)) + vpx_library.vpx_codec_control_(vpx_encoder, 12, ctypes.c_int(3)) + vpx_library.vpx_codec_control_(vpx_encoder, 27, ctypes.c_int(10)) + return vpx_encoder return None # TODO this method needs refinement - rename to encode_vpx_buffer -def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes, width : int, height : int, presentation_timestamp : int, flags : int) -> bytes: +def encode_vpx(vpx_encoder : VpxEncoder, yuv_buffer : bytes, width : int, height : int, presentation_timestamp : int, flags : int) -> bytes: vpx_library = vpx_module.create_static_library() frame_buffer = b'' @@ -46,9 +47,9 @@ def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes, yuv_string_buffer = ctypes.create_string_buffer(yuv_buffer) if vpx_library.vpx_img_wrap(image_buffer, 0x102, width, height, 1, yuv_string_buffer): - if vpx_library.vpx_codec_encode(codec_context, image_buffer, presentation_timestamp, 1, flags, 1) == 0: + if vpx_library.vpx_codec_encode(vpx_encoder, image_buffer, presentation_timestamp, 1, flags, 1) == 0: iterator = ctypes.c_void_p(0) - packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator)) + packet = vpx_library.vpx_codec_get_cx_data(vpx_encoder, ctypes.byref(iterator)) while packet: if ctypes.c_int.from_address(packet).value == 0: @@ -56,14 +57,13 @@ def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes, buffer_size = ctypes.c_size_t.from_address(packet + 16).value frame_buffer += ctypes.string_at(buffer_pointer, buffer_size) - packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator)) + packet = vpx_library.vpx_codec_get_cx_data(vpx_encoder, ctypes.byref(iterator)) return frame_buffer -# TODO not 100 sure this makes full sense. should we not run clear on the lru-cache instead? -def destroy_vpx_encoder(codec_context : ctypes.Array[ctypes.c_char]) -> None: +def destroy_vpx_encoder(vpx_encoder : VpxEncoder) -> None: vpx_library = vpx_module.create_static_library() if vpx_library: - vpx_library.vpx_codec_destroy(codec_context) + vpx_library.vpx_codec_destroy(vpx_encoder) diff --git a/tests/test_audio_encoder.py b/tests/test_audio_encoder.py index 452eab69..d294d33c 100644 --- a/tests/test_audio_encoder.py +++ b/tests/test_audio_encoder.py @@ -1,11 +1,12 @@ import ctypes +from unittest.mock import patch import numpy import pytest from tests.assert_helper import get_test_example_file, get_test_examples_directory from facefusion import state_manager -from facefusion.audio_encoder import create_opus_encoder, encode_opus +from facefusion.audio_encoder import create_opus_encoder, destroy_opus_encoder, encode_opus from facefusion.download import conditional_download from facefusion.ffmpeg import read_audio_buffer from facefusion.libraries import opus as opus_module @@ -20,9 +21,9 @@ def before_all() -> None: opus_module.pre_check() -# TODO: implement def test_create_opus_encoder() -> None: - pass + assert create_opus_encoder(48000, 2) + assert create_opus_encoder(0, 0) is None #TODO: rename to test_encode_opus_buffer @@ -36,6 +37,9 @@ def test_encode_opus() -> None: assert encode_opus(opus_encoder, pcm_pointer, 0) == b'' -# TODO: implement def test_destroy_opus_encoder() -> None: - pass + opus_encoder = create_opus_encoder(48000, 2) + + with patch.object(opus_module.create_static_library(), 'opus_encoder_destroy') as mock: + destroy_opus_encoder(opus_encoder) + mock.assert_called_once_with(opus_encoder) diff --git a/tests/test_video_encoder.py b/tests/test_video_encoder.py index 264d7690..da3a3e66 100644 --- a/tests/test_video_encoder.py +++ b/tests/test_video_encoder.py @@ -1,3 +1,5 @@ +from unittest.mock import patch + import cv2 import pytest from tests.assert_helper import get_test_example_file, get_test_examples_directory @@ -5,21 +7,22 @@ from tests.assert_helper import get_test_example_file, get_test_examples_directo from facefusion import state_manager from facefusion.download import conditional_download from facefusion.libraries import vpx as vpx_module -from facefusion.video_encoder import create_vpx_encoder, encode_vpx +from facefusion.video_encoder import create_vpx_encoder, destroy_vpx_encoder, encode_vpx from facefusion.vision import read_video_frame @pytest.fixture(scope = 'module', autouse = True) def before_all() -> None: state_manager.init_item('download_providers', [ 'github', 'huggingface' ]) + conditional_download(get_test_examples_directory(), [ 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4' ]) vpx_module.pre_check() -# TODO: implement def test_create_vpx_encoder() -> None: - pass + assert create_vpx_encoder(320, 240, 1000) + assert create_vpx_encoder(0, 0, 0) is None # TODO: rename to test_encode_vpx_buffer @@ -35,6 +38,9 @@ def test_encode_vpx() -> None: assert encode_vpx(vpx_encoder, buffer_invalid, width, height, 0, 0) == b'' -# TODO: implement def test_destroy_vpx_encoder() -> None: - pass + vpx_encoder = create_vpx_encoder(320, 240, 1000) + + with patch.object(vpx_module.create_static_library(), 'vpx_codec_destroy') as mock: + destroy_vpx_encoder(vpx_encoder) + mock.assert_called_once_with(vpx_encoder)