Refactor/stream helper testing (#1141)

* fix tests part1

* improve testing

* use real audio

* more renaming

* switch to hash assertions

* switch to hash assertions

* switch to hash assertions

* switch to hash assertions

* skip on macos
This commit is contained in:
Henry Ruhs
2026-06-01 18:42:37 +02:00
committed by GitHub
parent 3eeb505c86
commit e2e4e6a95b
8 changed files with 95 additions and 101 deletions
+3 -3
View File
@@ -128,7 +128,7 @@ async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
#TODO: needs review
#TODO: method is too complex, consider separate encode loop for audio and video?
#TODO: split encoding, frame processing and sending to allow testing without mocking opus_encoder and streamer
def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque : deque[VideoPack], audio_deque : deque[AudioPack], video_event : threading.Event) -> None:
video_event.wait()
video_event.clear()
@@ -200,7 +200,7 @@ def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque :
rtc.clear_remb(rtc_peer)
# TODO: method is too complex
#TODO: split receive loop, decode and buffering into separate functions to allow testing without ctypes mocks
def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
video_track = rtc_peer_video.get('receiver_track')
video_codec = rtc_peer_video.get('codec')
@@ -235,7 +235,7 @@ def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[Vide
destroy_video_decoder(video_codec, video_decoder)
# TODO: method is too complex
#TODO: split receive loop, decode and buffering into separate functions to allow testing without ctypes mocks
def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack]) -> None:
audio_track = rtc_peer_audio.get('receiver_track')
audio_codec = rtc_peer_audio.get('codec')
+2 -2
View File
@@ -174,10 +174,10 @@ def analyse_video(video_path : str, trim_frame_start : int, trim_frame_end : int
for frame_number in frame_range:
if frame_number % int(video_fps) == 0:
vision_frame = read_video_frame(video_path, frame_number)
video_frame = read_video_frame(video_path, frame_number)
total += 1
if analyse_frame(vision_frame):
if analyse_frame(video_frame):
counter += 1
if counter > 0 and total > 0:
+1 -1
View File
@@ -121,7 +121,7 @@ def common_pre_check() -> bool:
content_analyser_content = inspect.getsource(content_analyser).encode()
content_analyser_hash = hash_helper.create_hash(content_analyser_content)
return all(module.pre_check() for module in common_modules) and content_analyser_hash == 'b14e7b92'
return all(module.pre_check() for module in common_modules) and content_analyser_hash == '320ef969'
def processors_pre_check() -> bool:
+77 -83
View File
@@ -2,7 +2,6 @@ import asyncio
import ctypes
import struct
import threading
import time
from collections import deque
from unittest.mock import AsyncMock, MagicMock, patch
@@ -15,9 +14,10 @@ from facefusion.apis.stream_helper import create_video_decoder, create_video_enc
from facefusion.codecs import aom_encoder, vpx_encoder
from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download
from facefusion.ffmpeg import read_audio_buffer
from facefusion.hash_helper import create_hash
from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, opus as opus_module, vpx as vpx_module
from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio, RtcPeerVideo, VideoCodec, VideoPack
from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio, RtcPeerVideo, SessionId, VideoCodec, VideoPack
from facefusion.vision import read_video_frame
from .assert_helper import get_test_example_file, get_test_examples_directory
@@ -35,7 +35,8 @@ def before_all() -> None:
conditional_download(get_test_examples_directory(),
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4',
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg'
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg',
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3'
])
@@ -46,14 +47,13 @@ def before_each() -> None:
@pytest.mark.anyio
async def test_process_image() -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
frame_buffer = cv2.imencode('.jpg', vision_frame)[1].tobytes()
image_buffer = open(get_test_example_file('source.jpg'), 'rb').read()
websocket_mock = AsyncMock()
websocket_mock.receive.side_effect =\
[
{
'type': 'websocket.receive',
'bytes': frame_buffer
'bytes': image_buffer
}
]
@@ -61,7 +61,9 @@ async def test_process_image() -> None:
await process_image(websocket_mock)
websocket_mock.send_bytes.assert_called_once()
assert websocket_mock.send_bytes.call_args[0][0][:3] == bytes([ 255, 216, 255 ])
if is_linux() or is_windows():
assert create_hash(websocket_mock.send_bytes.call_args[0][0]) == '0142782f'
state_manager.init_item('source_paths', None)
await process_image(websocket_mock)
@@ -107,39 +109,30 @@ def test_process_video(video_codec : VideoCodec, session_id : str) -> None:
@pytest.mark.anyio
async def test_receive_vision_frames() -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
frame_buffer = cv2.imencode('.jpg', vision_frame)[1].tobytes()
image_buffer = open(get_test_example_file('source.jpg'), 'rb').read()
websocket_mock = AsyncMock()
websocket_mock.receive.side_effect =\
[
{
'type': 'websocket.receive',
'bytes': frame_buffer
'bytes': image_buffer
},
{
'type': 'websocket.receive',
'bytes': 'invalid'.encode()
},
{
'type': 'websocket.receive',
'bytes': frame_buffer
},
{
'type': 'websocket.disconnect'
}
]
frames = []
vision_frames = receive_vision_frames(websocket_mock)
async for frame in receive_vision_frames(websocket_mock):
frames.append(frame)
assert len(frames) == 2
assert frames[0].shape == vision_frame.shape
assert create_hash((await anext(vision_frames)).tobytes()) == '5ed32ca0'
@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ])
def test_run_peer_loop(video_codec : VideoCodec, payload_type : int) -> None:
@pytest.mark.parametrize('video_codec, payload_type, session_id', [ ('av1', 35, 'test-run-peer-loop-av1'), ('vp8', 96, 'test-run-peer-loop-vp8') ])
def test_run_peer_loop(video_codec : VideoCodec, payload_type : int, session_id : SessionId) -> None:
peer_connection = rtc.create_peer_connection()
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
@@ -156,28 +149,23 @@ def test_run_peer_loop(video_codec : VideoCodec, payload_type : int) -> None:
'receiver_bitrate': ctypes.c_uint(0)
}
# TODO: avoid concatenation — session_id should be a parametrize parameter like video_codec
session_id = 'test-run-peer-loop-' + video_codec
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
# TODO: reduce patches — receiving video_frames is mocked to avoid blocking but that removes meaningful test coverage
assert rtc_store.has_peers(session_id) is True
with patch('facefusion.apis.stream_helper.receive_video_frames'):
with patch('facefusion.apis.stream_helper.run_encode_loop') as mock_encode_loop:
with patch('facefusion.apis.stream_helper.run_encode_loop'):
thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True)
thread.start()
thread.join(timeout = 5.0)
assert mock_encode_loop.called
# TODO: test against full call args not just [0][1] — assert mock_encode_loop.call_args matches rtc_peer and video_codec
assert mock_encode_loop.call_args[0][1] == video_codec
# TODO: assert rtc_store.has_peers(session_id) is True before the action, then False after
assert rtc_store.has_peers(session_id) is False
@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ])
def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None:
source_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
peer_connection = rtc.create_peer_connection()
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
@@ -198,28 +186,35 @@ def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None:
audio_deque : deque[AudioPack] = deque()
video_event = threading.Event()
video_deque.append((source_frame, 0.100))
video_deque.append((video_frame, 0.1))
video_event.set()
with patch('facefusion.apis.stream_helper.rtc.send_video') as mock_send_video:
# TODO: thread name is unclear — rename to encode_loop_thread to identify what it runs
# TODO: thread block fires without flow testing — nobody triggers the deque/event naturally; the test pre-populates state rather than simulating real receive behaviour
thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True)
thread.start()
time.sleep(0.1)
with patch('facefusion.apis.stream_helper.rtc.send_video') as send_video_mock:
encode_loop_thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True)
encode_loop_thread.start()
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
thread.join(timeout = 5.0)
encode_loop_thread.join(timeout = 5.0)
assert mock_send_video.called
assert len(mock_send_video.call_args[0][1]) > 0
assert send_video_mock.called
if video_codec == 'av1':
if is_linux() or is_windows():
assert create_hash(send_video_mock.call_args[0][1]) == '9ba7212b'
if is_macos():
pytest.skip()
if video_codec == 'vp8':
pytest.skip()
@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ])
def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int) -> None:
source_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
audio_frame = numpy.zeros(960 * 2, dtype = numpy.float32)
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
peer_connection = rtc.create_peer_connection()
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
@@ -240,45 +235,48 @@ def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int)
audio_deque : deque[AudioPack] = deque()
video_event = threading.Event()
video_deque.append((source_frame, 0.100))
video_deque.append((video_frame, 0.100))
audio_deque.append((audio_frame, 0.100))
video_event.set()
# TODO: rename manager — generic name gives no context about what is being mocked
manager = MagicMock()
manager.process_frame.return_value = source_frame
manager.opus_encode.return_value = bytes([ 1 ] * 32)
streamer_mock = MagicMock()
streamer_mock.process_frame.return_value = video_frame
# TODO: too many patches — 4 nested mocks make this test a lie, real behaviour is hidden behind stubs
# TODO: nobody triggers the threads naturally — events and deques are pre-populated rather than driven by real receive flow
with patch('facefusion.apis.stream_helper.streamer.process_frame', manager.process_frame):
with patch('facefusion.apis.stream_helper.opus_encoder.encode', manager.opus_encode):
with patch('facefusion.apis.stream_helper.rtc.send_audio', manager.send_audio):
with patch('facefusion.apis.stream_helper.rtc.send_video', manager.send_video):
thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True)
thread.start()
time.sleep(0.1)
encoder_mock = MagicMock()
encoder_mock.encode.return_value = bytes([ 1 ] * 32)
rtc_mock = MagicMock()
with patch('facefusion.apis.stream_helper.streamer.process_frame', streamer_mock.process_frame):
with patch('facefusion.apis.stream_helper.opus_encoder.encode', encoder_mock.encode):
with patch('facefusion.apis.stream_helper.rtc.send_audio', rtc_mock.send_audio):
with patch('facefusion.apis.stream_helper.rtc.send_video', rtc_mock.send_video):
encode_loop_thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True)
encode_loop_thread.start()
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
thread.join(timeout = 5.0)
encode_loop_thread.join(timeout = 5.0)
call_names = [ call[0] for call in manager.mock_calls ]
assert streamer_mock.process_frame.called is True
assert 'process_frame' in call_names and 'send_audio' in call_names
assert call_names.index('process_frame') < call_names.index('send_audio')
assert encoder_mock.encode.called is True
assert rtc_mock.send_audio.called is True
assert rtc_mock.send_video.called is True
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_receive_video_frames(video_codec : VideoCodec) -> None:
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_deque : deque[VideoPack] = deque()
video_event = threading.Event()
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = vision_frame):
with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = video_frame):
rtc_peer_video : RtcPeerVideo =\
{
'sender_track': 0,
@@ -287,9 +285,8 @@ def test_receive_video_frames(video_codec : VideoCodec) -> None:
}
video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer_video, video_deque, video_event), daemon = True)
video_receiver_thread.start()
video_receiver_thread.join(timeout = 2.0)
video_receiver_thread.join(timeout = 5.0)
# TODO: avoid [0][0] tuple indexing — use named access once VideoPack becomes a TypedDict
if is_linux() or is_windows():
assert create_hash(video_deque[0][0].tobytes()) == 'a17439db'
@@ -297,13 +294,14 @@ def test_receive_video_frames(video_codec : VideoCodec) -> None:
assert create_hash(video_deque[0][0].tobytes()) == '38d00e2a'
# TODO: refine test
@pytest.mark.parametrize('audio_codec', [ 'opus' ])
def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
audio_deque : deque[AudioPack] = deque()
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
audio_frame = numpy.zeros(960 * 2, dtype = numpy.float32)
audio_deque : deque[AudioPack] = deque()
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_frame.tobytes()):
@@ -315,18 +313,15 @@ def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
}
audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque), daemon = True)
audio_receiver_thread.start()
audio_receiver_thread.join(timeout = 2.0)
audio_receiver_thread.join(timeout = 5.0)
# TODO: assertions do not verify meaningful audio content — dtype/size/len would pass for an empty silent frame
assert audio_deque[0][0].dtype == numpy.float32
assert audio_deque[0][0].size == 960 * 2
assert len(audio_deque) == 1
assert create_hash(audio_deque[0][0].tobytes()) == create_hash(audio_frame.tobytes())
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_encode_and_decode_video_frame(video_codec : VideoCodec) -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_encoder = create_video_encoder(video_codec, (426, 226), 1000)
video_decoder = create_video_decoder(video_codec)
encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0)
@@ -351,8 +346,8 @@ def test_encode_and_decode_video_frame(video_codec : VideoCodec) -> None:
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_create_and_destroy_video_decoder(video_codec : VideoCodec) -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
if video_codec == 'av1':
video_encoder = aom_encoder.create((426, 226), 1000, 1, 0)
@@ -373,9 +368,8 @@ def test_create_and_destroy_video_decoder(video_codec : VideoCodec) -> None:
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_create_and_destroy_video_encoder(video_codec : VideoCodec) -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_encoder = create_video_encoder(video_codec, (426, 226), 4000)
if video_codec == 'av1':
@@ -429,8 +423,8 @@ def test_destroy_stream() -> None:
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
session_id = 'test-destroy-stream'
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
+3 -3
View File
@@ -31,9 +31,9 @@ def test_create() -> None:
def test_decode() -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_resolution = (video_frame.shape[1], video_frame.shape[0])
aom_encoder = create_encoder(video_resolution, 1000, 1, 0)
encode_buffer = encode(aom_encoder, video_buffer, video_resolution, 0)
aom_decoder = create(1)
+3 -3
View File
@@ -30,9 +30,9 @@ def test_create() -> None:
def test_encode() -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_resolution = (video_frame.shape[1], video_frame.shape[0])
aom_encoder = create(video_resolution, 1000, 1, 0)
if is_linux() or is_windows():
+3 -3
View File
@@ -31,9 +31,9 @@ def test_create() -> None:
def test_decode() -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_resolution = (video_frame.shape[1], video_frame.shape[0])
vpx_encoder = create_encoder(video_resolution, 1000, 1, 0)
encode_buffer = encode(vpx_encoder, video_buffer, video_resolution, 0)
vpx_decoder = create(1)
+3 -3
View File
@@ -30,9 +30,9 @@ def test_create() -> None:
def test_encode() -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_resolution = (video_frame.shape[1], video_frame.shape[0])
vpx_encoder = create(video_resolution, 1000, 1, 0)
if is_linux() or is_windows():