Best performance to code ratio for stream (#1150)

* queue with futures, kill deque, add couple of todos

* resolve couple of todos

* add more todos

* add more todos and resolve others

* add more todos and resolve others

* fix test

* fix collapse

* adjust naming a bit
This commit is contained in:
Henry Ruhs
2026-06-09 22:16:51 +02:00
committed by GitHub
parent 67cc3de934
commit 00fb89d4f1
9 changed files with 200 additions and 130 deletions
+11 -10
View File
@@ -2,6 +2,7 @@ import ctypes
import threading
from functools import partial
from queue import Queue
from typing import Tuple
from unittest.mock import MagicMock, patch
import numpy
@@ -13,7 +14,7 @@ from facefusion.download import conditional_download
from facefusion.ffmpeg import read_audio_buffer
from facefusion.hash_helper import create_hash
from facefusion.libraries import datachannel as datachannel_module, opus as opus_module
from facefusion.types import AudioCodec, AudioPack, FrameHandler, RtcPeer, RtcPeerAudio
from facefusion.types import AudioCodec, AudioFrame, FrameHandler, RtcPeer, RtcPeerAudio
from .assert_helper import get_test_example_file, get_test_examples_directory
@@ -57,9 +58,9 @@ def test_run_audio_encode_loop() -> None:
'receiver_bitrate': ctypes.c_uint(0)
}
audio_queue : Queue[AudioPack] = Queue(maxsize = 300)
audio_queue : Queue[Tuple[float, AudioFrame]] = Queue(maxsize = 300)
audio_queue.put((audio_frame, 0.100))
audio_queue.put((0.100, audio_frame))
encoder_mock = MagicMock()
encoder_mock.encode.return_value = bytes([ 1 ] * 32)
@@ -68,7 +69,7 @@ def test_run_audio_encode_loop() -> None:
with patch('facefusion.apis.stream_audio.rtc.send_audio') as send_audio_mock:
audio_loop_thread = threading.Thread(target = run_audio_encode_loop, args = (rtc_peer, audio_queue), daemon = True)
audio_loop_thread.start()
audio_queue.put((numpy.empty(0), 0.0))
audio_queue.put((0.0, numpy.empty(0)))
audio_loop_thread.join(timeout = 5.0)
assert encoder_mock.encode.called is True
@@ -79,7 +80,7 @@ def test_run_audio_encode_loop() -> None:
def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
audio_queue : Queue[AudioPack] = Queue(maxsize = 300)
audio_queue : Queue[Tuple[float, AudioFrame]] = Queue(maxsize = 300)
datachannel_mock = MagicMock()
ready_event = threading.Event()
@@ -100,20 +101,20 @@ def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
datachannel_mock.rtcSetClosedCallback.call_args[0][1](0, None)
audio_receiver_thread.join(timeout = 5.0)
buffer_frame, _ = audio_queue.get_nowait()
_, temp_audio_frame = audio_queue.get_nowait()
assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes())
assert create_hash(temp_audio_frame.tobytes()) == create_hash(audio_frame.tobytes())
def test_handle_audio_frame() -> None:
	"""
	Check that handle_audio_frame() puts the decoded audio frame on the queue.

	decode_audio_frame() is patched to return the raw bytes of a reference frame,
	so the test only covers how the handler packs and enqueues the result.
	"""
	audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
	# scale the int16 samples into float32
	audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
	audio_decoder_mock = MagicMock()
	audio_queue : Queue[Tuple[float, AudioFrame]] = Queue(maxsize = 300)

	with patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = audio_frame.tobytes()):
		handle_audio_frame('opus', audio_decoder_mock, audio_queue, 0, ctypes.c_void_p(), 1, ctypes.c_void_p(), ctypes.c_void_p())

	# queue items are (float, audio frame) pairs, only the frame is verified
	_, temp_audio_frame = audio_queue.get_nowait()

	assert create_hash(temp_audio_frame.tobytes()) == create_hash(audio_frame.tobytes())
+1
View File
@@ -17,6 +17,7 @@ from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
state_manager.init_item('execution_thread_count', 8)
state_manager.init_item('processors', [])
datachannel_module.pre_check()
+71 -34
View File
@@ -1,8 +1,10 @@
import ctypes
import struct
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial
from queue import Queue
from typing import Tuple
from unittest.mock import MagicMock, patch
import cv2
@@ -10,13 +12,13 @@ import numpy
import pytest
from facefusion import rtc, rtc_store, state_manager
from facefusion.apis.stream_video import create_video_decoder, create_video_encoder, decode_video_frame, destroy_video_decoder, destroy_video_encoder, encode_video_frame, handle_video_frame, receive_video_frames, run_video_encode_loop, update_video_encoder_bitrate
from facefusion.apis.stream_video import create_video_decoder, create_video_encoder, decode_video_frame, destroy_video_decoder, destroy_video_encoder, encode_video_frame, handle_video_frame, process_video_frame, receive_video_frames, run_video_encode_loop, update_video_encoder_bitrate, update_video_encoder_resolution
from facefusion.codecs import aom_encoder, vpx_encoder
from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download
from facefusion.hash_helper import create_hash
from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, vpx as vpx_module
from facefusion.types import FrameHandler, RtcPeer, RtcPeerVideo, VideoCodec, VideoPack
from facefusion.types import FrameHandler, Resolution, RtcPeer, RtcPeerVideo, VideoCodec
from facefusion.vision import read_video_frame
from .assert_helper import get_test_example_file, get_test_examples_directory
@@ -65,16 +67,18 @@ def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) ->
'receiver_bitrate': ctypes.c_uint(8000)
}
video_queue : Queue[VideoPack] = Queue(maxsize = 30)
video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]] = Queue(maxsize = 30)
video_queue.put((video_frame, 0.1))
with ThreadPoolExecutor(max_workers = 1) as executor:
video_queue.put((0.1, executor.submit(process_video_frame, video_frame)))
with patch('facefusion.apis.stream_video.rtc.send_video') as send_video_mock:
encode_loop_thread = threading.Thread(target = run_video_encode_loop, args = (rtc_peer, video_queue), daemon = True)
encode_loop_thread.start()
empty_vision_frame = numpy.empty(0)
video_queue.put((empty_vision_frame, 0.0))
encode_loop_thread.join(timeout = 5.0)
with patch('facefusion.apis.stream_video.rtc.send_video') as send_video_mock:
encode_loop_thread = threading.Thread(target = run_video_encode_loop, args = (rtc_peer, video_queue), daemon = True)
encode_loop_thread.start()
empty_future : Future[Tuple[bytes, Resolution]] = Future()
empty_future.set_result((bytes(), (0, 0)))
video_queue.put((0.0, empty_future))
encode_loop_thread.join(timeout = 5.0)
assert send_video_mock.called
@@ -95,34 +99,37 @@ def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) ->
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8', 'vp9' ])
def test_receive_video_frames(video_codec : VideoCodec) -> None:
	"""
	Check that receive_video_frames() enqueues a future holding the processed frame.

	The datachannel library, decode_video_frame() and process_video_frame() are
	patched, so the test drives the registered frame and closed callbacks by hand
	and inspects what lands on the queue.
	"""
	video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
	video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]] = Queue(maxsize = 30)
	datachannel_mock = MagicMock()
	ready_event = threading.Event()
	# signal the test once the receiver has registered its closed callback
	datachannel_mock.rtcSetClosedCallback.side_effect = partial(set_ready_event, ready_event)

	with ThreadPoolExecutor(max_workers = 1) as executor:
		with patch('facefusion.libraries.datachannel.create_static_library', return_value = datachannel_mock):
			with patch('facefusion.apis.stream_video.decode_video_frame', return_value = video_frame):
				with patch('facefusion.apis.stream_video.process_video_frame', return_value = (video_frame.tobytes(), (426, 226))):
					rtc_peer_video : RtcPeerVideo =\
					{
						'sender_track': 0,
						'receiver_track': 0,
						'codec': video_codec
					}
					video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer_video, video_queue, executor), daemon = True)
					video_receiver_thread.start()
					ready_event.wait(timeout = 5.0)
					# invoke the callbacks the receiver registered: one frame, then close
					datachannel_mock.rtcSetFrameCallback.call_args[0][1](0, bytes([ 0 ]), 1, None, None)
					datachannel_mock.rtcSetClosedCallback.call_args[0][1](0, None)
					video_receiver_thread.join(timeout = 5.0)

					# resolve the future while the patches are still active
					_, video_future = video_queue.get_nowait()
					video_buffer, _ = video_future.result()

	if is_linux() or is_windows():
		assert create_hash(video_buffer) == 'a17439db'
	if is_macos():
		assert create_hash(video_buffer) == '38d00e2a'
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8', 'vp9' ])
@@ -197,6 +204,33 @@ def test_create_and_destroy_video_encoder(video_codec : VideoCodec) -> None:
assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes()
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8', 'vp9' ])
def test_update_video_encoder_resolution(video_codec : VideoCodec) -> None:
	"""
	Check that update_video_encoder_resolution() changes the value stored in the encoder buffer.

	The encoder is created at (426, 226) and updated to (320, 180). The unsigned
	int read at the codec specific offset must follow the first resolution
	component, going from 426 to 320.
	"""
	# byte offset of the unsigned int to inspect, per codec
	resolution_offsets =\
	{
		'av1': 128 + 12,
		'vp8': 64 + 12,
		'vp9': 64 + 12
	}
	resolution_offset = resolution_offsets[video_codec]
	video_encoder = create_video_encoder(video_codec, (426, 226), 4000)

	try:
		assert struct.unpack_from('I', video_encoder, resolution_offset)[0] == 426
		assert update_video_encoder_resolution(video_codec, video_encoder, (320, 180))
		assert struct.unpack_from('I', video_encoder, resolution_offset)[0] == 320
	finally:
		# release the encoder even when an assertion fails
		destroy_video_encoder(video_codec, video_encoder)
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8', 'vp9' ])
def test_update_video_encoder_bitrate(video_codec : VideoCodec) -> None:
video_encoder = create_video_encoder(video_codec, (426, 226), 4000)
@@ -228,15 +262,18 @@ def test_update_video_encoder_bitrate(video_codec : VideoCodec) -> None:
def test_handle_video_frame(video_codec : VideoCodec) -> None:
	"""
	Check that handle_video_frame() enqueues a future holding the processed frame.

	decode_video_frame() and process_video_frame() are patched, so the test only
	covers how the handler submits work to the executor and fills the queue.
	"""
	video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
	video_decoder = create_video_decoder(video_codec)
	video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]] = Queue(maxsize = 30)

	with ThreadPoolExecutor(max_workers = 1) as executor:
		with patch('facefusion.apis.stream_video.decode_video_frame', return_value = video_frame):
			with patch('facefusion.apis.stream_video.process_video_frame', return_value = (video_frame.tobytes(), (426, 226))):
				handle_video_frame(video_codec, video_decoder, video_queue, executor, 0, ctypes.c_void_p(), 1, ctypes.c_void_p(), ctypes.c_void_p())

				# resolve the future while the patches are still active
				_, video_future = video_queue.get_nowait()
				video_buffer, _ = video_future.result()

	if is_linux() or is_windows():
		assert create_hash(video_buffer) == 'a17439db'
	if is_macos():
		assert create_hash(video_buffer) == '38d00e2a'