diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index acb4d1d2..de9a6cf8 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -1,8 +1,8 @@ -import contextlib +import asyncio import ctypes -import queue import threading import time +from collections import deque from collections.abc import AsyncIterator from functools import partial from typing import Optional @@ -15,7 +15,7 @@ from facefusion import rtc, rtc_store, state_manager, streamer from facefusion.audio import create_empty_audio_frame from facefusion.codecs import aom_decoder, aom_encoder, opus_decoder, opus_encoder, vpx_decoder, vpx_encoder from facefusion.libraries import datachannel as datachannel_module -from facefusion.types import AomDecoder, AomEncoder, AudioCodec, AudioFrame, BitRate, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VisionFrame, VpxDecoder, VpxEncoder +from facefusion.types import AomDecoder, AomEncoder, AudioCodec, AudioPacket, BitRate, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VisionFrame, VisionPacket, VpxDecoder, VpxEncoder #TODO: remove source_paths guard, process_image should work independent of source_paths since processors decide if they need sources @@ -88,7 +88,7 @@ def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpA rtc_store.init_peers(session_id) rtc_store.get_peers(session_id).append(rtc_peer) - threading.Thread(target = run_peer_loop, args = (session_id, rtc_peer), daemon = True).start() + threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True).start() return local_sdp datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection) @@ -110,49 +110,63 @@ async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFr #TODO: needs review -#TODO: method is too complex -def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: - # TODO: combine video and audio queue - # TODO: update test_receive_video_frames, test_receive_audio_frames with the same approach (deque) - video_queue : queue.Queue[VisionFrame] = queue.Queue(maxsize = 1) - audio_queue : queue.Queue[AudioFrame] = queue.Queue(maxsize = 4) - receiver_threads = [] - +async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: + video_deque : deque[VisionPacket] = deque(maxlen = 1) + audio_deque : deque[AudioPacket] = deque(maxlen = 10) + video_event = threading.Event() video_codec = rtc_peer.get('video').get('codec') video_track = rtc_peer.get('video').get('receiver_track') - video_receiver_thread = threading.Thread(target = receive_video_frames, args = (video_track, video_codec, video_queue), daemon = True) - receiver_threads.append(video_receiver_thread) + + video_receive = asyncio.to_thread(receive_video_frames, video_track, video_codec, video_deque, video_event) + encode_loop = asyncio.to_thread(run_encode_loop, rtc_peer, video_codec, video_deque, audio_deque, video_event) + coroutines = [ video_receive, encode_loop ] if rtc_peer.get('audio'): audio_codec : AudioCodec = 'opus' audio_track = rtc_peer.get('audio').get('receiver_track') - audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (audio_track, audio_codec, audio_queue), daemon = True) - receiver_threads.append(audio_receiver_thread) + coroutines.append(asyncio.to_thread(receive_audio_frames, audio_track, audio_codec, audio_deque)) - for receiver_thread in receiver_threads: - receiver_thread.start() + await asyncio.gather(*coroutines) + rtc_store.delete_peers(session_id) - temp_vision_frame = video_queue.get() + +#TODO: needs review +#TODO: method is too complex +def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque : deque[VisionPacket], audio_deque : deque[AudioPacket], video_event : threading.Event) -> None: + video_event.wait() + video_event.clear() + temp_vision_frame, video_receive_time = video_deque.popleft() if numpy.any(temp_vision_frame): - audio_frame = create_empty_audio_frame() temp_resolution : Resolution = (temp_vision_frame.shape[1], temp_vision_frame.shape[0]) temp_bitrate : BitRate = 8000 video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) audio_encoder = opus_encoder.create(48000, 2) frame_index = 0 + temp_video_receive_time = 0.0 while numpy.any(temp_vision_frame): - with contextlib.suppress(queue.Empty): - audio_frame = audio_queue.get_nowait() + frame_duration = 1.0 / 30 - output_vision_frame = streamer.process_frame(audio_frame, temp_vision_frame) + if temp_video_receive_time: + frame_duration = video_receive_time - temp_video_receive_time + + temp_video_receive_time = video_receive_time + + audio_packets : list[tuple[bytes, float]] = [] + + while audio_deque and audio_deque[0][1] < video_receive_time + frame_duration: + audio_frame, audio_time = audio_deque.popleft() + output_audio_buffer = opus_encoder.encode(audio_encoder, audio_frame.tobytes(), 960) + + if output_audio_buffer: + audio_packets.append((output_audio_buffer, audio_time)) + + output_vision_frame = streamer.process_frame(create_empty_audio_frame(), temp_vision_frame) output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0]) # TODO: align buffer naming with input/output and video/audio convention output_vision_buffer = cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - send_timestamp = time.monotonic() - peer_bitrate = rtc_peer.get('sender_bitrate').value # TODO: avoid != in condition @@ -172,39 +186,31 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: output_video_buffer = encode_video_frame(video_codec, video_encoder, output_vision_buffer, temp_resolution, frame_index) + for audio_buffer, audio_time in audio_packets: + rtc.send_audio(rtc_peer, audio_buffer, int(audio_time * 48000)) + if output_video_buffer: - rtc.send_video(rtc_peer, output_video_buffer, int(send_timestamp * 90000)) - - if audio_encoder and audio_frame.dtype == numpy.float32: - output_audio_buffer = opus_encoder.encode(audio_encoder, audio_frame.tobytes(), 960) - - if output_audio_buffer: - rtc.send_audio(rtc_peer, output_audio_buffer, int(send_timestamp * 48000)) + rtc.send_video(rtc_peer, output_video_buffer, int(video_receive_time * 90000)) frame_index += 1 - temp_vision_frame = video_queue.get() + video_event.wait() + video_event.clear() + temp_vision_frame, video_receive_time = video_deque.popleft() # TODO: remove unconditional destroy methods, which have no impact on control flow destroy_video_encoder(video_codec, video_encoder) opus_encoder.destroy(audio_encoder) rtc.clear_remb(rtc_peer) - for receiver_thread in receiver_threads: - receiver_thread.join() - - rtc_store.delete_peers(session_id) - # TODO: method is too complex -def receive_video_frames(video_track : int, video_codec : VideoCodec, video_queue : queue.Queue[VisionFrame]) -> None: +def receive_video_frames(video_track : int, video_codec : VideoCodec, video_deque : deque[VisionPacket], video_event : threading.Event) -> None: datachannel_library = datachannel_module.create_static_library() video_decoder = create_video_decoder(video_codec) receive_buffer = ctypes.create_string_buffer(512 * 1024) - # todo - could be prepare ready event available_event = threading.Event() available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event)) datachannel_library.rtcSetAvailableCallback(video_track, available_callback) - # todo off receive_status_code = -3 while receive_status_code == 0 or receive_status_code == -3: @@ -212,34 +218,33 @@ def receive_video_frames(video_track : int, video_codec : VideoCodec, video_queu receive_status_code = datachannel_library.rtcReceiveMessage(video_track, receive_buffer, ctypes.byref(buffer_size)) if receive_status_code == 0 and buffer_size.value > 0: + receive_time = time.monotonic() # TODO: align buffer naming with input/output and video/audio convention frame_buffer = receive_buffer.raw[:buffer_size.value] vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer) if numpy.any(vision_frame): - with contextlib.suppress(queue.Empty): - video_queue.get_nowait() - video_queue.put_nowait(vision_frame) + video_deque.append((vision_frame, receive_time)) + video_event.set() if receive_status_code == -3: available_event.wait() available_event.clear() - video_queue.put(numpy.empty(0)) + video_deque.append((numpy.empty(0), 0.0)) + video_event.set() destroy_video_decoder(video_codec, video_decoder) # TODO: audio_codec is not used but has to, even if there is just one # TODO: method is too complex -def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_queue : queue.Queue[AudioFrame]) -> None: +def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_deque : deque[AudioPacket]) -> None: datachannel_library = datachannel_module.create_static_library() audio_decoder = opus_decoder.create(48000, 2) receive_buffer = ctypes.create_string_buffer(8 * 1024) - #todo - could be prepare ready event available_event = threading.Event() available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event)) datachannel_library.rtcSetAvailableCallback(audio_track, available_callback) - #todo off receive_status_code = -3 while receive_status_code == 0 or receive_status_code == -3: @@ -247,15 +252,13 @@ def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_queu receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size)) if receive_status_code == 0 and buffer_size.value > 0: + receive_time = time.monotonic() # TODO: rename opus_buffer and output_buffer to audio convention opus_buffer = receive_buffer.raw[:buffer_size.value] output_buffer = opus_decoder.decode(audio_decoder, opus_buffer, 960, 2) if output_buffer: - with contextlib.suppress(queue.Empty): - audio_queue.get_nowait() - - audio_queue.put_nowait(numpy.frombuffer(output_buffer, dtype = numpy.float32)) + audio_deque.append((numpy.frombuffer(output_buffer, dtype = numpy.float32), receive_time)) if receive_status_code == -3: available_event.wait() diff --git a/facefusion/types.py b/facefusion/types.py index 9a1a2aaf..1c0636d9 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -313,6 +313,9 @@ RtcPeer = TypedDict('RtcPeer', }) RtcStore : TypeAlias = Dict[SessionId, List[RtcPeer]] +VisionPacket : TypeAlias = tuple[VisionFrame, float] +AudioPacket : TypeAlias = tuple[AudioFrame, float] + SdpAudioMedia = TypedDict('SdpAudioMedia', { 'codec': AudioCodec, diff --git a/tests/test_api_stream_helper.py b/tests/test_api_stream_helper.py index 7a1f1821..eb2a608f 100644 --- a/tests/test_api_stream_helper.py +++ b/tests/test_api_stream_helper.py @@ -1,7 +1,9 @@ +import asyncio import ctypes -import queue import struct import threading +import time +from collections import deque from unittest.mock import AsyncMock, MagicMock, patch import cv2 @@ -9,13 +11,13 @@ import numpy import pytest from facefusion import rtc, rtc_store, state_manager -from facefusion.apis.stream_helper import create_video_decoder, create_video_encoder, decode_video_frame, destroy_stream, destroy_video_decoder, destroy_video_encoder, encode_video_frame, process_image, process_video, receive_audio_frames, receive_video_frames, receive_vision_frames, run_peer_loop, update_video_encoder_bitrate +from facefusion.apis.stream_helper import create_video_decoder, create_video_encoder, decode_video_frame, destroy_stream, destroy_video_decoder, destroy_video_encoder, encode_video_frame, process_image, process_video, receive_audio_frames, receive_video_frames, receive_vision_frames, run_encode_loop, run_peer_loop, update_video_encoder_bitrate from facefusion.codecs import aom_encoder, vpx_encoder from facefusion.common_helper import is_linux, is_macos, is_windows from facefusion.download import conditional_download from facefusion.hash_helper import create_hash from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, opus as opus_module, vpx as vpx_module -from facefusion.types import AudioFrame, RtcPeer, VideoCodec, VisionFrame +from facefusion.types import AudioPacket, RtcPeer, VideoCodec, VisionPacket from facefusion.vision import read_video_frame from .assert_helper import get_test_example_file, get_test_examples_directory @@ -55,14 +57,12 @@ async def test_process_image() -> None: } ] - #TODO: remove init_item once source_paths guard is removed from process_image state_manager.init_item('source_paths', [ get_test_example_file('source.jpg') ]) await process_image(websocket_mock) websocket_mock.send_bytes.assert_called_once() assert websocket_mock.send_bytes.call_args[0][0][:3] == bytes([ 255, 216, 255 ]) - #TODO: remove this block once source_paths guard is removed from process_image state_manager.init_item('source_paths', None) await process_image(websocket_mock) @@ -75,6 +75,7 @@ def test_process_video(video_codec : VideoCodec, session_id : str) -> None: if video_codec == 'av1': rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 35) + if video_codec == 'vp8': rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 96) @@ -137,11 +138,11 @@ async def test_receive_vision_frames() -> None: assert frames[0].shape == vision_frame.shape -def test_run_peer_loop() -> None: - source_frame = read_video_frame(get_test_example_file('target-240p.mp4')) +@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) +def test_run_peer_loop(video_codec : VideoCodec, payload_type : int) -> None: peer_connection = rtc.create_peer_connection() - video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', 'vp8', 96) - video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', 'vp8', 96) + video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) + video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) rtc_peer : RtcPeer =\ { 'peer_connection': peer_connection, @@ -149,65 +150,149 @@ def test_run_peer_loop() -> None: { 'sender_track': video_sender_track, 'receiver_track': video_receiver_track, - 'codec': 'vp8' + 'codec': video_codec }, 'sender_bitrate': ctypes.c_uint(0), 'receiver_bitrate': ctypes.c_uint(0) } - session_id = 'test-run-peer-loop' + session_id = 'test-run-peer-loop-' + video_codec rtc_store.init_peers(session_id) rtc_store.get_peers(session_id).append(rtc_peer) - datachannel_library_mock = MagicMock() - datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] + with patch('facefusion.apis.stream_helper.receive_video_frames'): + with patch('facefusion.apis.stream_helper.run_encode_loop') as mock_encode_loop: + thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True) + thread.start() + thread.join(timeout = 5.0) - with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock): - with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = source_frame): - with patch('facefusion.apis.stream_helper.rtc.send_video') as mock_send_video: - thread = threading.Thread(target = run_peer_loop, args = (session_id, rtc_peer), daemon = True) - thread.start() - thread.join(timeout = 5.0) + assert mock_encode_loop.called + assert mock_encode_loop.call_args[0][1] == video_codec + assert rtc_store.has_peers(session_id) is False + + +@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) +def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: + source_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + peer_connection = rtc.create_peer_connection() + video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) + video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) + rtc_peer : RtcPeer =\ + { + 'peer_connection': peer_connection, + 'video': + { + 'sender_track': video_sender_track, + 'receiver_track': video_receiver_track, + 'codec': video_codec + }, + 'sender_bitrate': ctypes.c_uint(0), + 'receiver_bitrate': ctypes.c_uint(0) + } + + video_deque : deque[VisionPacket] = deque() + audio_deque : deque[AudioPacket] = deque() + video_event = threading.Event() + + video_deque.append((source_frame, 0.100)) + video_event.set() + + with patch('facefusion.apis.stream_helper.rtc.send_video') as mock_send_video: + thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True) + thread.start() + time.sleep(0.1) + video_deque.append((numpy.empty(0), 0.0)) + video_event.set() + thread.join(timeout = 5.0) assert mock_send_video.called assert len(mock_send_video.call_args[0][1]) > 0 +@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) +def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int) -> None: + source_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + audio_frame = numpy.zeros(960 * 2, dtype = numpy.float32) + peer_connection = rtc.create_peer_connection() + video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) + video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) + rtc_peer : RtcPeer =\ + { + 'peer_connection': peer_connection, + 'video': + { + 'sender_track': video_sender_track, + 'receiver_track': video_receiver_track, + 'codec': video_codec + }, + 'sender_bitrate': ctypes.c_uint(0), + 'receiver_bitrate': ctypes.c_uint(0) + } + + video_deque : deque[VisionPacket] = deque() + audio_deque : deque[AudioPacket] = deque() + video_event = threading.Event() + + video_deque.append((source_frame, 0.100)) + audio_deque.append((audio_frame, 0.100)) + video_event.set() + + manager = MagicMock() + manager.process_frame.return_value = source_frame + manager.opus_encode.return_value = bytes([ 1 ] * 32) + + with patch('facefusion.apis.stream_helper.streamer.process_frame', manager.process_frame): + with patch('facefusion.apis.stream_helper.opus_encoder.encode', manager.opus_encode): + with patch('facefusion.apis.stream_helper.rtc.send_audio', manager.send_audio): + with patch('facefusion.apis.stream_helper.rtc.send_video', manager.send_video): + thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True) + thread.start() + time.sleep(0.1) + video_deque.append((numpy.empty(0), 0.0)) + video_event.set() + thread.join(timeout = 5.0) + + call_names = [ call[0] for call in manager.mock_calls ] + + assert 'process_frame' in call_names and 'send_audio' in call_names + assert call_names.index('process_frame') < call_names.index('send_audio') + + def test_receive_video_frames() -> None: datachannel_library_mock = MagicMock() datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_queue : queue.Queue[VisionFrame] = queue.Queue(maxsize = 1) + video_deque : deque[VisionPacket] = deque() + video_event = threading.Event() with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock): with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = vision_frame): - receiver_thread = threading.Thread(target = receive_video_frames, args = (0, 'vp8', video_queue), daemon = True) + receiver_thread = threading.Thread(target = receive_video_frames, args = (0, 'vp8', video_deque, video_event), daemon = True) receiver_thread.start() receiver_thread.join(timeout = 2.0) if is_linux() or is_windows(): - assert create_hash(video_queue.get_nowait().tobytes()) == 'a17439db' + assert create_hash(video_deque[0][0].tobytes()) == 'a17439db' if is_macos(): - assert create_hash(video_queue.get_nowait().tobytes()) == '38d00e2a' + assert create_hash(video_deque[0][0].tobytes()) == '38d00e2a' def test_receive_audio_frames() -> None: datachannel_library_mock = MagicMock() datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] - audio_frame = numpy.zeros(960 * 2).astype(numpy.float32) - audio_queue : queue.Queue[AudioFrame] = queue.Queue(maxsize = 4) + audio_data = numpy.zeros(960 * 2, dtype = numpy.float32) + audio_deque : deque[AudioPacket] = deque() with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock): - with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_frame.tobytes()): - receiver_thread = threading.Thread(target = receive_audio_frames, args = (0, 'opus', audio_queue), daemon = True) + with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_data.tobytes()): + receiver_thread = threading.Thread(target = receive_audio_frames, args = (0, 'opus', audio_deque), daemon = True) receiver_thread.start() - audio_frame = audio_queue.get(timeout = 2.0) - receiver_thread.join(timeout = 1.0) + receiver_thread.join(timeout = 2.0) - assert audio_frame.dtype == numpy.float32 - assert audio_frame.size == 960 * 2 - assert audio_queue.empty() + assert audio_deque[0][0].dtype == numpy.float32 + assert audio_deque[0][0].size == 960 * 2 + assert len(audio_deque) == 1 @pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])