import ctypes
import threading
from collections import deque
from unittest.mock import MagicMock, patch

import numpy
import pytest

from facefusion import rtc, rtc_store, state_manager
from facefusion.apis.stream_audio import fill_audio_deque, receive_audio_frames, run_audio_encode_loop
from facefusion.download import conditional_download
from facefusion.ffmpeg import read_audio_buffer
from facefusion.hash_helper import create_hash
from facefusion.libraries import datachannel as datachannel_module, opus as opus_module
from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio
from .assert_helper import get_test_example_file, get_test_examples_directory


@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
	"""Initialise shared state, run library pre-checks and fetch the example audio once per module."""
	state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
	state_manager.init_item('processors', [])
	opus_module.pre_check()
	datachannel_module.pre_check()
	conditional_download(get_test_examples_directory(),
	[
		'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3'
	])


@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
	"""Clear the rtc store before every test function."""
	rtc_store.clear()


def test_run_audio_encode_loop() -> None:
	"""Run the encode loop on a thread and check that both the encoder and the audio sender were called."""
	audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
	# int16 samples scaled by 1 / 32768 into float32
	audio_samples = numpy.frombuffer(audio_buffer, dtype = numpy.int16)
	audio_frame = audio_samples.astype(numpy.float32) / 32768.0
	rtc_peer : RtcPeer =\
	{
		'peer_connection': rtc.create_peer_connection(),
		'video':
		{
			'sender_track': 0,
			'receiver_track': 0,
			'codec': 'vp8'
		},
		'sender_bitrate': ctypes.c_uint(0),
		'receiver_bitrate': ctypes.c_uint(0)
	}
	# queue one real frame and raise the event before the loop thread exists
	audio_deque : deque[AudioPack] = deque([ (audio_frame, 0.100) ])
	audio_event = threading.Event()
	audio_event.set()
	opus_encoder_mock = MagicMock()
	opus_encoder_mock.encode.return_value = b'\x01' * 32

	with patch('facefusion.apis.stream_audio.opus_encoder.encode', opus_encoder_mock.encode), patch('facefusion.apis.stream_audio.rtc.send_audio') as send_audio_mock:
		encode_thread = threading.Thread(target = run_audio_encode_loop, args = (rtc_peer, audio_deque, audio_event), daemon = True)
		encode_thread.start()
		# NOTE(review): the empty frame presumably tells the loop to stop - confirm against run_audio_encode_loop
		audio_deque.append((numpy.empty(0), 0.0))
		audio_event.set()
		encode_thread.join(timeout = 5.0)

	assert opus_encoder_mock.encode.called is True
	assert send_audio_mock.called is True


def test_fill_audio_deque() -> None:
	"""Check that fill_audio_deque queues the decoded frame and sets the event."""
	audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
	audio_samples = numpy.frombuffer(audio_buffer, dtype = numpy.int16)
	audio_frame = audio_samples.astype(numpy.float32) / 32768.0
	audio_bytes = audio_frame.tobytes()
	decoder_mock = MagicMock()
	audio_deque : deque[AudioPack] = deque()
	audio_event = threading.Event()

	# the decoder is bypassed: decode_audio_frame is patched to hand back the reference bytes
	with patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = audio_bytes):
		fill_audio_deque('opus', decoder_mock, audio_bytes, audio_deque, audio_event)

	queued_frame, _ = audio_deque.popleft()

	assert audio_event.is_set()
	assert create_hash(queued_frame.tobytes()) == create_hash(audio_bytes)


@pytest.mark.parametrize('audio_codec', [ 'opus' ])
def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
	"""Run the receiver on a thread against a mocked datachannel library and compare the queued frame."""
	audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
	audio_samples = numpy.frombuffer(audio_buffer, dtype = numpy.int16)
	audio_frame = audio_samples.astype(numpy.float32) / 32768.0
	audio_bytes = audio_frame.tobytes()
	audio_deque : deque[AudioPack] = deque()
	audio_event = threading.Event()
	library_mock = MagicMock()
	# first rtcReceiveMessage call returns 0, the second returns -1
	# NOTE(review): presumably 0 means a message arrived and -1 ends the receive loop - confirm
	library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
	rtc_peer_audio : RtcPeerAudio =\
	{
		'sender_track': 0,
		'receiver_track': 0,
		'codec': audio_codec
	}

	with patch('facefusion.apis.stream_audio.datachannel_module.create_static_library', return_value = library_mock), patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = audio_bytes):
		receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque, audio_event), daemon = True)
		receiver_thread.start()
		receiver_thread.join(timeout = 5.0)

	queued_frame, _ = audio_deque.popleft()

	assert create_hash(queued_frame.tobytes()) == create_hash(audio_bytes)