From beeb1d99e9ccc7c68c119803c91e8ae201a7bacb Mon Sep 17 00:00:00 2001 From: Harisreedhar <46858047+harisreedhar@users.noreply.github.com> Date: Tue, 2 Jun 2026 01:50:40 +0530 Subject: [PATCH] Refactor(stream-helper): split encode/receive loops and unify audio/video structure (#1142) * split encode loop * unify audio and video encode loop methods * improve variable names * split receive methods * test improve * try to avoid != with even more weird approach * remove source_path check * remove empty variables * fix lint * avoid not in condition * rename * fix lint --- facefusion/apis/stream_helper.py | 183 +++++++++++++++++-------------- tests/test_api_stream_helper.py | 121 ++++++++++++-------- 2 files changed, 172 insertions(+), 132 deletions(-) diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index 46e16779..83d2f7e4 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -11,26 +11,22 @@ import cv2 import numpy from starlette.websockets import WebSocket -from facefusion import rtc, rtc_store, state_manager, streamer +from facefusion import rtc, rtc_store, streamer from facefusion.audio import create_empty_audio_frame from facefusion.codecs import aom_decoder, aom_encoder, opus_decoder, opus_encoder, vpx_decoder, vpx_encoder from facefusion.libraries import datachannel as datachannel_module -from facefusion.types import AomDecoder, AomEncoder, AomPointer, AudioCodec, AudioPack, BitRate, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, RtcPeerVideo, SdpAnswer, SdpOffer, SessionId, VideoCodec, VideoPack, VisionFrame, VpxDecoder, VpxEncoder, VpxPointer +from facefusion.types import AomDecoder, AomEncoder, AomPointer, AudioCodec, AudioPack, BitRate, OpusDecoder, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, RtcPeerVideo, SdpAnswer, SdpOffer, SessionId, VideoCodec, VideoPack, VisionFrame, VpxDecoder, VpxEncoder, VpxPointer -#TODO: remove source_paths guard, process_image should work independent of source_paths since processors decide if they need sources async def process_image(websocket : WebSocket) -> None: - source_paths = state_manager.get_item('source_paths') + capture_vision_frame = await anext(receive_vision_frames(websocket), None) - if source_paths: - capture_vision_frame = await anext(receive_vision_frames(websocket), None) + if numpy.any(capture_vision_frame): + output_vision_frame = streamer.process_frame(create_empty_audio_frame(), capture_vision_frame) + is_success, output_frame_buffer = cv2.imencode('.jpg', output_vision_frame) - if numpy.any(capture_vision_frame): - output_vision_frame = streamer.process_frame(create_empty_audio_frame(), capture_vision_frame) - is_success, output_frame_buffer = cv2.imencode('.jpg', output_vision_frame) - - if is_success: - await websocket.send_bytes(output_frame_buffer.tobytes()) + if is_success: + await websocket.send_bytes(output_frame_buffer.tobytes()) #TODO: needs review @@ -53,9 +49,6 @@ def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpA audio_codec : AudioCodec = 'opus' audio_payload_type = rtc.get_payload_type(sdp_offer, audio_codec) - #todo we try to avoid empty variables like that - audio_receiver_track = None - audio_sender_track = None if audio_payload_type: audio_receiver_track = rtc.add_audio_track(peer_connection, 'recvonly', audio_codec, audio_payload_type) @@ -78,7 +71,7 @@ def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpA 'receiver_bitrate': receiver_bitrate } - if audio_receiver_track and audio_sender_track: + if audio_payload_type: rtc_peer['audio'] = RtcPeerAudio( sender_track = audio_sender_track, receiver_track = audio_receiver_track, @@ -114,60 +107,41 @@ async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: video_deque : deque[VideoPack] = deque(maxlen = 1) audio_deque : deque[AudioPack] = deque(maxlen = 10) video_event = threading.Event() - video_codec = rtc_peer.get('video').get('codec') - video_receive = asyncio.to_thread(receive_video_frames, rtc_peer.get('video'), video_deque, video_event) - encode_loop = asyncio.to_thread(run_encode_loop, rtc_peer, video_codec, video_deque, audio_deque, video_event) - coroutines = [ video_receive, encode_loop ] + video_receiver_thread = asyncio.to_thread(receive_video_frames, rtc_peer.get('video'), video_deque, video_event) + video_encoder_thread = asyncio.to_thread(run_video_encode_loop, rtc_peer, video_deque, video_event) + coroutines = [ video_receiver_thread, video_encoder_thread ] if rtc_peer.get('audio'): - coroutines.append(asyncio.to_thread(receive_audio_frames, rtc_peer.get('audio'), audio_deque)) + audio_event = threading.Event() + coroutines.append(asyncio.to_thread(receive_audio_frames, rtc_peer.get('audio'), audio_deque, audio_event)) + coroutines.append(asyncio.to_thread(run_audio_encode_loop, rtc_peer, audio_deque, audio_event)) await asyncio.gather(*coroutines) rtc_store.delete_peers(session_id) #TODO: needs review -#TODO: split encoding, frame processing and sending to allow testing without mocking opus_encoder and streamer -def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque : deque[VideoPack], audio_deque : deque[AudioPack], video_event : threading.Event) -> None: +def run_video_encode_loop(rtc_peer : RtcPeer, video_deque : deque[VideoPack], video_event : threading.Event) -> None: video_event.wait() video_event.clear() - temp_vision_frame, video_receive_time = video_deque.popleft() + video_codec = rtc_peer.get('video').get('codec') + temp_vision_frame, temp_video_time = video_deque.popleft() if numpy.any(temp_vision_frame): temp_resolution : Resolution = (temp_vision_frame.shape[1], temp_vision_frame.shape[0]) temp_bitrate : BitRate = 8000 video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) - audio_encoder = opus_encoder.create(48000, 2) frame_index = 0 - temp_video_receive_time = 0.0 while numpy.any(temp_vision_frame): - frame_duration = 1.0 / 30 - - if temp_video_receive_time: - frame_duration = video_receive_time - temp_video_receive_time - - temp_video_receive_time = video_receive_time - - audio_packets : list[tuple[bytes, float]] = [] - - while audio_deque and audio_deque[0][1] < video_receive_time + frame_duration: - audio_frame, audio_time = audio_deque.popleft() - output_audio_buffer = opus_encoder.encode(audio_encoder, audio_frame.tobytes(), 960) - - if output_audio_buffer: - audio_packets.append((output_audio_buffer, audio_time)) - output_vision_frame = streamer.process_frame(create_empty_audio_frame(), temp_vision_frame) output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0]) - # TODO: align buffer naming with input/output and video/audio convention output_vision_buffer = cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() peer_bitrate = rtc_peer.get('sender_bitrate').value - # TODO: avoid != in condition - if output_resolution != temp_resolution: + if output_resolution[0] - temp_resolution[0] or output_resolution[1] - temp_resolution[1]: destroy_video_encoder(video_codec, video_encoder) temp_resolution = output_resolution video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) @@ -183,24 +157,47 @@ def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque : output_video_buffer = encode_video_frame(video_codec, video_encoder, output_vision_buffer, temp_resolution, frame_index) - for audio_buffer, audio_time in audio_packets: - rtc.send_audio(rtc_peer, audio_buffer, int(audio_time * 48000)) - if output_video_buffer: - rtc.send_video(rtc_peer, output_video_buffer, int(video_receive_time * 90000)) + rtc.send_video(rtc_peer, output_video_buffer, int(temp_video_time * 90000)) frame_index += 1 video_event.wait() video_event.clear() - temp_vision_frame, video_receive_time = video_deque.popleft() + temp_vision_frame, temp_video_time = video_deque.popleft() - # TODO: remove unconditional destroy methods, which have no impact on control flow destroy_video_encoder(video_codec, video_encoder) - opus_encoder.destroy(audio_encoder) rtc.clear_remb(rtc_peer) -#TODO: split receive loop, decode and buffering into separate functions to allow testing without ctypes mocks +def run_audio_encode_loop(rtc_peer : RtcPeer, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: + audio_event.wait() + audio_event.clear() + temp_audio_frame, temp_audio_time = audio_deque.popleft() + audio_encoder = opus_encoder.create(48000, 2) + + while numpy.any(temp_audio_frame): + output_audio_buffer = opus_encoder.encode(audio_encoder, temp_audio_frame.tobytes(), 960) + + if output_audio_buffer: + rtc.send_audio(rtc_peer, output_audio_buffer, int(temp_audio_time * 48000)) + + if len(audio_deque) == 0: + audio_event.wait() + audio_event.clear() + + temp_audio_frame, temp_audio_time = audio_deque.popleft() + + opus_encoder.destroy(audio_encoder) + + +def fill_video_deque(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, video_buffer : bytes, video_deque : deque[VideoPack], video_event : threading.Event) -> None: + vision_frame = decode_video_frame(video_codec, video_decoder, video_buffer) + + if numpy.any(vision_frame): + video_deque.append((vision_frame, time.monotonic())) + video_event.set() + + def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[VideoPack], video_event : threading.Event) -> None: video_track = rtc_peer_video.get('receiver_track') video_codec = rtc_peer_video.get('codec') @@ -217,13 +214,8 @@ def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[Vide receive_status_code = datachannel_library.rtcReceiveMessage(video_track, receive_buffer, ctypes.byref(buffer_size)) if receive_status_code == 0 and buffer_size.value > 0: - receive_time = time.monotonic() video_buffer = receive_buffer.raw[:buffer_size.value] - vision_frame = decode_video_frame(video_codec, video_decoder, video_buffer) - - if numpy.any(vision_frame): - video_deque.append((vision_frame, receive_time)) - video_event.set() + fill_video_deque(video_codec, video_decoder, video_buffer, video_deque, video_event) if receive_status_code == -3: available_event.wait() @@ -235,39 +227,41 @@ def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[Vide destroy_video_decoder(video_codec, video_decoder) -#TODO: split receive loop, decode and buffering into separate functions to allow testing without ctypes mocks -def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack]) -> None: +def fill_audio_deque(audio_codec : AudioCodec, audio_decoder : OpusDecoder, audio_buffer : bytes, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: + audio_frame = decode_audio_frame(audio_codec, audio_decoder, audio_buffer) + + if audio_frame: + audio_deque.append((numpy.frombuffer(audio_frame, dtype = numpy.float32), time.monotonic())) + audio_event.set() + + +def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: audio_track = rtc_peer_audio.get('receiver_track') audio_codec = rtc_peer_audio.get('codec') datachannel_library = datachannel_module.create_static_library() + audio_decoder = create_audio_decoder(audio_codec) + receive_buffer = ctypes.create_string_buffer(8 * 1024) + available_event = threading.Event() + available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event)) + datachannel_library.rtcSetAvailableCallback(audio_track, available_callback) + receive_status_code = -3 - if audio_codec == 'opus': - audio_decoder = opus_decoder.create(48000, 2) - receive_buffer = ctypes.create_string_buffer(8 * 1024) - available_event = threading.Event() - available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event)) - datachannel_library.rtcSetAvailableCallback(audio_track, available_callback) - receive_status_code = -3 + while receive_status_code == 0 or receive_status_code == -3: + buffer_size = ctypes.c_int(8 * 1024) + receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size)) - while receive_status_code == 0 or receive_status_code == -3: - buffer_size = ctypes.c_int(8 * 1024) - receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size)) + if receive_status_code == 0 and buffer_size.value > 0: + audio_buffer = receive_buffer.raw[:buffer_size.value] + fill_audio_deque(audio_codec, audio_decoder, audio_buffer, audio_deque, audio_event) - if receive_status_code == 0 and buffer_size.value > 0: - receive_time = time.monotonic() - audio_buffer = receive_buffer.raw[:buffer_size.value] - audio_frame = opus_decoder.decode(audio_decoder, audio_buffer, 960, 2) + if receive_status_code == -3: + available_event.wait() + available_event.clear() - if audio_frame: - audio_deque.append((numpy.frombuffer(audio_frame, dtype = numpy.float32), receive_time)) - - if receive_status_code == -3: - available_event.wait() - available_event.clear() - - opus_decoder.destroy(audio_decoder) - - return None + empty_audio_frame = numpy.empty(0) + audio_deque.append((empty_audio_frame, 0.0)) + audio_event.set() + destroy_audio_decoder(audio_codec, audio_decoder) def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, input_buffer : bytes) -> Optional[VisionFrame]: @@ -286,6 +280,13 @@ def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | Ao return None +def decode_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, input_buffer : bytes) -> Optional[bytes]: + if audio_codec == 'opus': + return opus_decoder.decode(audio_decoder, input_buffer, 960, 2) + + return None + + def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes: if video_codec == 'av1': return aom_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index) @@ -302,6 +303,13 @@ def normalize_vision_frame(frame_pointer : AomPointer | VpxPointer) -> VisionFra return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420) +def create_audio_decoder(audio_codec : AudioCodec) -> Optional[OpusDecoder]: + if audio_codec == 'opus': + return opus_decoder.create(48000, 2) + + return None + + def create_video_decoder(video_codec : VideoCodec) -> Optional[VpxDecoder | AomDecoder]: if video_codec == 'av1': return aom_decoder.create(8) @@ -322,6 +330,11 @@ def create_video_encoder(video_codec : VideoCodec, frame_resolution : Resolution return None +def destroy_audio_decoder(audio_codec : AudioCodec, audio_decoder : OpusDecoder) -> None: + if audio_codec == 'opus': + opus_decoder.destroy(audio_decoder) + + def destroy_video_decoder(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder) -> None: if video_codec == 'av1': aom_decoder.destroy(video_decoder) diff --git a/tests/test_api_stream_helper.py b/tests/test_api_stream_helper.py index 3cc21021..98169cff 100644 --- a/tests/test_api_stream_helper.py +++ b/tests/test_api_stream_helper.py @@ -10,7 +10,7 @@ import numpy import pytest from facefusion import rtc, rtc_store, state_manager -from facefusion.apis.stream_helper import create_video_decoder, create_video_encoder, decode_video_frame, destroy_stream, destroy_video_decoder, destroy_video_encoder, encode_video_frame, process_image, process_video, receive_audio_frames, receive_video_frames, receive_vision_frames, run_encode_loop, run_peer_loop, update_video_encoder_bitrate +from facefusion.apis.stream_helper import create_video_decoder, create_video_encoder, decode_video_frame, destroy_stream, destroy_video_decoder, destroy_video_encoder, encode_video_frame, fill_audio_deque, fill_video_deque, process_image, process_video, receive_audio_frames, receive_video_frames, receive_vision_frames, run_audio_encode_loop, run_peer_loop, run_video_encode_loop, update_video_encoder_bitrate from facefusion.codecs import aom_encoder, vpx_encoder from facefusion.common_helper import is_linux, is_macos, is_windows from facefusion.download import conditional_download @@ -57,7 +57,6 @@ async def test_process_image() -> None: } ] - state_manager.init_item('source_paths', [ get_test_example_file('source.jpg') ]) await process_image(websocket_mock) websocket_mock.send_bytes.assert_called_once() @@ -65,11 +64,6 @@ async def test_process_image() -> None: if is_linux() or is_windows(): assert create_hash(websocket_mock.send_bytes.call_args[0][0]) == '0142782f' - state_manager.init_item('source_paths', None) - await process_image(websocket_mock) - - websocket_mock.send_bytes.assert_called_once() - @pytest.mark.parametrize('video_codec, session_id', [ ('av1', 'test-process-video-av1'), ('vp8', 'test-process-video-vp8') ]) def test_process_video(video_codec : VideoCodec, session_id : str) -> None: @@ -155,7 +149,7 @@ def test_run_peer_loop(video_codec : VideoCodec, payload_type : int, session_id assert rtc_store.has_peers(session_id) is True with patch('facefusion.apis.stream_helper.receive_video_frames'): - with patch('facefusion.apis.stream_helper.run_encode_loop'): + with patch('facefusion.apis.stream_helper.run_video_encode_loop'): thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True) thread.start() thread.join(timeout = 5.0) @@ -164,7 +158,7 @@ def test_run_peer_loop(video_codec : VideoCodec, payload_type : int, session_id @pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) -def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: +def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) peer_connection = rtc.create_peer_connection() video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) @@ -183,14 +177,13 @@ def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: } video_deque : deque[VideoPack] = deque() - audio_deque : deque[AudioPack] = deque() video_event = threading.Event() video_deque.append((video_frame, 0.1)) video_event.set() with patch('facefusion.apis.stream_helper.rtc.send_video') as send_video_mock: - encode_loop_thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True) + encode_loop_thread = threading.Thread(target = run_video_encode_loop, args = (rtc_peer, video_deque, video_event), daemon = True) encode_loop_thread.start() empty_vision_frame = numpy.empty(0) video_deque.append((empty_vision_frame, 0.0)) @@ -210,60 +203,73 @@ def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: pytest.skip() -@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) -def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int) -> None: - video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) +def test_run_audio_encode_loop() -> None: audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 peer_connection = rtc.create_peer_connection() - video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) - video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) rtc_peer : RtcPeer =\ { 'peer_connection': peer_connection, 'video': { - 'sender_track': video_sender_track, - 'receiver_track': video_receiver_track, - 'codec': video_codec + 'sender_track': 0, + 'receiver_track': 0, + 'codec': 'vp8' }, 'sender_bitrate': ctypes.c_uint(0), 'receiver_bitrate': ctypes.c_uint(0) } - video_deque : deque[VideoPack] = deque() audio_deque : deque[AudioPack] = deque() - video_event = threading.Event() + audio_event = threading.Event() - video_deque.append((video_frame, 0.100)) audio_deque.append((audio_frame, 0.100)) - video_event.set() - - streamer_mock = MagicMock() - streamer_mock.process_frame.return_value = video_frame + audio_event.set() encoder_mock = MagicMock() encoder_mock.encode.return_value = bytes([ 1 ] * 32) - rtc_mock = MagicMock() - - with patch('facefusion.apis.stream_helper.streamer.process_frame', streamer_mock.process_frame): - with patch('facefusion.apis.stream_helper.opus_encoder.encode', encoder_mock.encode): - with patch('facefusion.apis.stream_helper.rtc.send_audio', rtc_mock.send_audio): - with patch('facefusion.apis.stream_helper.rtc.send_video', rtc_mock.send_video): - encode_loop_thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True) - encode_loop_thread.start() - empty_vision_frame = numpy.empty(0) - video_deque.append((empty_vision_frame, 0.0)) - video_event.set() - encode_loop_thread.join(timeout = 5.0) - - assert streamer_mock.process_frame.called is True + with patch('facefusion.apis.stream_helper.opus_encoder.encode', encoder_mock.encode): + with patch('facefusion.apis.stream_helper.rtc.send_audio') as send_audio_mock: + audio_loop_thread = threading.Thread(target = run_audio_encode_loop, args = (rtc_peer, audio_deque, audio_event), daemon = True) + audio_loop_thread.start() + audio_deque.append((numpy.empty(0), 0.0)) + audio_event.set() + audio_loop_thread.join(timeout = 5.0) assert encoder_mock.encode.called is True + assert send_audio_mock.called is True - assert rtc_mock.send_audio.called is True - assert rtc_mock.send_video.called is True + +@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) +def test_fill_video_deque(video_codec : VideoCodec) -> None: + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_encoder = create_video_encoder(video_codec, (426, 226), 1000) + video_decoder = create_video_decoder(video_codec) + encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0) + video_deque : deque[VideoPack] = deque() + video_event = threading.Event() + + fill_video_deque(video_codec, video_decoder, encode_buffer, video_deque, video_event) + + vision_frame, _ = video_deque.popleft() + + assert video_event.is_set() + + if is_linux() or is_windows(): + if video_codec == 'av1': + assert create_hash(vision_frame.tobytes()) == 'c97d6d29' + + if video_codec == 'vp8': + assert create_hash(vision_frame.tobytes()) == '99ef2c25' + + if is_macos(): + if video_codec == 'av1': + assert create_hash(vision_frame.tobytes()) == 'eafd1fab' + + if video_codec == 'vp8': + assert create_hash(vision_frame.tobytes()) == 'ff3ecb43' @pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) @@ -287,11 +293,29 @@ def test_receive_video_frames(video_codec : VideoCodec) -> None: video_receiver_thread.start() video_receiver_thread.join(timeout = 5.0) + vision_frame, _ = video_deque.popleft() + if is_linux() or is_windows(): - assert create_hash(video_deque[0][0].tobytes()) == 'a17439db' + assert create_hash(vision_frame.tobytes()) == 'a17439db' if is_macos(): - assert create_hash(video_deque[0][0].tobytes()) == '38d00e2a' + assert create_hash(vision_frame.tobytes()) == '38d00e2a' + + +def test_fill_audio_deque() -> None: + audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) + audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 + audio_decoder_mock = MagicMock() + audio_deque : deque[AudioPack] = deque() + audio_event = threading.Event() + + with patch('facefusion.apis.stream_helper.decode_audio_frame', return_value = audio_frame.tobytes()): + fill_audio_deque('opus', audio_decoder_mock, audio_frame.tobytes(), audio_deque, audio_event) + + buffer_frame, _ = audio_deque.popleft() + + assert audio_event.is_set() + assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes()) @pytest.mark.parametrize('audio_codec', [ 'opus' ]) @@ -299,23 +323,26 @@ def test_receive_audio_frames(audio_codec : AudioCodec) -> None: audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 audio_deque : deque[AudioPack] = deque() + audio_event = threading.Event() datachannel_library_mock = MagicMock() datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock): - with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_frame.tobytes()): + with patch('facefusion.apis.stream_helper.decode_audio_frame', return_value = audio_frame.tobytes()): rtc_peer_audio : RtcPeerAudio =\ { 'sender_track': 0, 'receiver_track': 0, 'codec': audio_codec } - audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque), daemon = True) + audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque, audio_event), daemon = True) audio_receiver_thread.start() audio_receiver_thread.join(timeout = 5.0) - assert create_hash(audio_deque[0][0].tobytes()) == create_hash(audio_frame.tobytes()) + buffer_frame, _ = audio_deque.popleft() + + assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes()) @pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])