Files
facefusion/tests/test_api_stream_audio.py
T
Henry Ruhs 7181b41f2d Feat/finalize stream (#1144)
* break stream helper into pieces

* remove todos
2026-06-02 16:05:21 +02:00

117 lines
4.2 KiB
Python

import ctypes
import threading
from collections import deque
from unittest.mock import MagicMock, patch
import numpy
import pytest
from facefusion import rtc, rtc_store, state_manager
from facefusion.apis.stream_audio import fill_audio_deque, receive_audio_frames, run_audio_encode_loop
from facefusion.download import conditional_download
from facefusion.ffmpeg import read_audio_buffer
from facefusion.hash_helper import create_hash
from facefusion.libraries import datachannel as datachannel_module, opus as opus_module
from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
    """Module-scoped, auto-used setup executed before the tests in this file.

    Registers the 'download_providers' and 'processors' state items, runs the
    pre-checks of the opus and datachannel library modules and requests a
    conditional download of the ``source.mp3`` example into the test examples
    directory.
    """
    example_urls =\
    [
        'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3'
    ]

    state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
    state_manager.init_item('processors', [])
    opus_module.pre_check()
    datachannel_module.pre_check()
    conditional_download(get_test_examples_directory(), example_urls)
@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
    """Function-scoped, auto-used setup: clear ``rtc_store`` before each test."""
    rtc_store.clear()
def test_run_audio_encode_loop() -> None:
    """Run ``run_audio_encode_loop`` on a thread and check it encodes and sends.

    The opus encoder's ``encode`` and ``rtc.send_audio`` are both patched inside
    ``facefusion.apis.stream_audio``, so the test only observes that the loop
    called them at least once.
    """
    # Decode the example file to 48 kHz / 16 bit / 2 channel PCM and scale the
    # int16 samples by 1 / 32768 into float32.
    pcm_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
    pcm_samples = numpy.frombuffer(pcm_buffer, dtype = numpy.int16)
    audio_frame = pcm_samples.astype(numpy.float32) / 32768.0

    rtc_peer : RtcPeer =\
    {
        'peer_connection': rtc.create_peer_connection(),
        'video':
        {
            'sender_track': 0,
            'receiver_track': 0,
            'codec': 'vp8'
        },
        'sender_bitrate': ctypes.c_uint(0),
        'receiver_bitrate': ctypes.c_uint(0)
    }

    # Queue one real frame and raise the event before the loop thread exists.
    pending_audio : deque[AudioPack] = deque()
    audio_ready = threading.Event()
    pending_audio.append((audio_frame, 0.100))
    audio_ready.set()

    opus_encoder_mock = MagicMock()
    opus_encoder_mock.encode.return_value = b'\x01' * 32
    loop_arguments = (rtc_peer, pending_audio, audio_ready)

    with patch('facefusion.apis.stream_audio.opus_encoder.encode', opus_encoder_mock.encode), patch('facefusion.apis.stream_audio.rtc.send_audio') as send_audio_mock:
        encode_thread = threading.Thread(target = run_audio_encode_loop, args = loop_arguments, daemon = True)
        encode_thread.start()
        # An empty frame is queued after the real one - presumably the stop
        # signal for the loop; confirm against run_audio_encode_loop.
        pending_audio.append((numpy.empty(0), 0.0))
        audio_ready.set()
        # NOTE(review): join() returns after 5 seconds even if the thread is
        # still alive; the test does not check for that.
        encode_thread.join(timeout = 5.0)

    assert opus_encoder_mock.encode.called is True
    assert send_audio_mock.called is True
def test_fill_audio_deque() -> None:
    """Check that ``fill_audio_deque`` queues the decoded frame and sets the event.

    ``decode_audio_frame`` is patched to return the raw bytes of the example
    frame, and the frame taken from the deque is compared to it by hash.
    """
    pcm_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
    pcm_samples = numpy.frombuffer(pcm_buffer, dtype = numpy.int16)
    audio_frame = pcm_samples.astype(numpy.float32) / 32768.0
    frame_bytes = audio_frame.tobytes()

    decoder_mock = MagicMock()
    pending_audio : deque[AudioPack] = deque()
    audio_ready = threading.Event()

    with patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = frame_bytes):
        fill_audio_deque('opus', decoder_mock, frame_bytes, pending_audio, audio_ready)

    # Only the frame of the queued pack is inspected; its second element is ignored.
    queued_pack = pending_audio.popleft()
    queued_frame = queued_pack[0]

    assert audio_ready.is_set()
    assert create_hash(queued_frame.tobytes()) == create_hash(frame_bytes)
@pytest.mark.parametrize('audio_codec', [ 'opus' ])
def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
    """Run ``receive_audio_frames`` on a thread and check the frame it queues.

    The datachannel static library is replaced by a mock whose
    ``rtcReceiveMessage`` returns 0 and then -1, and ``decode_audio_frame`` is
    patched to return the raw bytes of the example frame. The first queued frame
    is compared to the example frame by hash.
    """
    pcm_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
    pcm_samples = numpy.frombuffer(pcm_buffer, dtype = numpy.int16)
    audio_frame = pcm_samples.astype(numpy.float32) / 32768.0

    pending_audio : deque[AudioPack] = deque()
    audio_ready = threading.Event()

    # Presumably 0 reports one received message and -1 ends the receive loop -
    # confirm against receive_audio_frames.
    library_mock = MagicMock()
    library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]

    rtc_peer_audio : RtcPeerAudio =\
    {
        'sender_track': 0,
        'receiver_track': 0,
        'codec': audio_codec
    }
    receiver_arguments = (rtc_peer_audio, pending_audio, audio_ready)

    with patch('facefusion.apis.stream_audio.datachannel_module.create_static_library', return_value = library_mock), patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = audio_frame.tobytes()):
        receiver_thread = threading.Thread(target = receive_audio_frames, args = receiver_arguments, daemon = True)
        receiver_thread.start()
        # NOTE(review): join() returns after 5 seconds even if the thread is
        # still alive; popleft() below raises IndexError if nothing was queued.
        receiver_thread.join(timeout = 5.0)

    queued_pack = pending_audio.popleft()
    queued_frame = queued_pack[0]

    assert create_hash(queued_frame.tobytes()) == create_hash(audio_frame.tobytes())