Replace queues with timestamped deques and fix Audio Video sync (#1139)

* change to deque

* remove single line condition

* change threading to asyncio

* bring back todos
This commit is contained in:
Harisreedhar
2026-06-01 15:03:33 +05:30
committed by GitHub
parent 0f5f75ba51
commit aea36a9e55
3 changed files with 175 additions and 84 deletions
+55 -52
View File
@@ -1,8 +1,8 @@
import contextlib
import asyncio
import ctypes
import queue
import threading
import time
from collections import deque
from collections.abc import AsyncIterator
from functools import partial
from typing import Optional
@@ -15,7 +15,7 @@ from facefusion import rtc, rtc_store, state_manager, streamer
from facefusion.audio import create_empty_audio_frame
from facefusion.codecs import aom_decoder, aom_encoder, opus_decoder, opus_encoder, vpx_decoder, vpx_encoder
from facefusion.libraries import datachannel as datachannel_module
from facefusion.types import AomDecoder, AomEncoder, AudioCodec, AudioFrame, BitRate, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VisionFrame, VpxDecoder, VpxEncoder
from facefusion.types import AomDecoder, AomEncoder, AudioCodec, AudioPacket, BitRate, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VisionFrame, VisionPacket, VpxDecoder, VpxEncoder
#TODO: remove source_paths guard; process_image should work independently of source_paths since processors decide if they need sources
@@ -88,7 +88,7 @@ def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpA
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
threading.Thread(target = run_peer_loop, args = (session_id, rtc_peer), daemon = True).start()
threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True).start()
return local_sdp
datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection)
@@ -110,49 +110,63 @@ async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFr
#TODO: needs review
#TODO: method is too complex
def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
# TODO: combine video and audio queue
# TODO: update test_receive_video_frames, test_receive_audio_frames with the same approach (deque)
video_queue : queue.Queue[VisionFrame] = queue.Queue(maxsize = 1)
audio_queue : queue.Queue[AudioFrame] = queue.Queue(maxsize = 4)
receiver_threads = []
async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
video_deque : deque[VisionPacket] = deque(maxlen = 1)
audio_deque : deque[AudioPacket] = deque(maxlen = 10)
video_event = threading.Event()
video_codec = rtc_peer.get('video').get('codec')
video_track = rtc_peer.get('video').get('receiver_track')
video_receiver_thread = threading.Thread(target = receive_video_frames, args = (video_track, video_codec, video_queue), daemon = True)
receiver_threads.append(video_receiver_thread)
video_receive = asyncio.to_thread(receive_video_frames, video_track, video_codec, video_deque, video_event)
encode_loop = asyncio.to_thread(run_encode_loop, rtc_peer, video_codec, video_deque, audio_deque, video_event)
coroutines = [ video_receive, encode_loop ]
if rtc_peer.get('audio'):
audio_codec : AudioCodec = 'opus'
audio_track = rtc_peer.get('audio').get('receiver_track')
audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (audio_track, audio_codec, audio_queue), daemon = True)
receiver_threads.append(audio_receiver_thread)
coroutines.append(asyncio.to_thread(receive_audio_frames, audio_track, audio_codec, audio_deque))
for receiver_thread in receiver_threads:
receiver_thread.start()
await asyncio.gather(*coroutines)
rtc_store.delete_peers(session_id)
temp_vision_frame = video_queue.get()
#TODO: needs review
#TODO: method is too complex
def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque : deque[VisionPacket], audio_deque : deque[AudioPacket], video_event : threading.Event) -> None:
video_event.wait()
video_event.clear()
temp_vision_frame, video_receive_time = video_deque.popleft()
if numpy.any(temp_vision_frame):
audio_frame = create_empty_audio_frame()
temp_resolution : Resolution = (temp_vision_frame.shape[1], temp_vision_frame.shape[0])
temp_bitrate : BitRate = 8000
video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate)
audio_encoder = opus_encoder.create(48000, 2)
frame_index = 0
temp_video_receive_time = 0.0
while numpy.any(temp_vision_frame):
with contextlib.suppress(queue.Empty):
audio_frame = audio_queue.get_nowait()
frame_duration = 1.0 / 30
output_vision_frame = streamer.process_frame(audio_frame, temp_vision_frame)
if temp_video_receive_time:
frame_duration = video_receive_time - temp_video_receive_time
temp_video_receive_time = video_receive_time
audio_packets : list[tuple[bytes, float]] = []
while audio_deque and audio_deque[0][1] < video_receive_time + frame_duration:
audio_frame, audio_time = audio_deque.popleft()
output_audio_buffer = opus_encoder.encode(audio_encoder, audio_frame.tobytes(), 960)
if output_audio_buffer:
audio_packets.append((output_audio_buffer, audio_time))
output_vision_frame = streamer.process_frame(create_empty_audio_frame(), temp_vision_frame)
output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0])
# TODO: align buffer naming with input/output and video/audio convention
output_vision_buffer = cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
send_timestamp = time.monotonic()
peer_bitrate = rtc_peer.get('sender_bitrate').value
# TODO: avoid != in condition
@@ -172,39 +186,31 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
output_video_buffer = encode_video_frame(video_codec, video_encoder, output_vision_buffer, temp_resolution, frame_index)
for audio_buffer, audio_time in audio_packets:
rtc.send_audio(rtc_peer, audio_buffer, int(audio_time * 48000))
if output_video_buffer:
rtc.send_video(rtc_peer, output_video_buffer, int(send_timestamp * 90000))
if audio_encoder and audio_frame.dtype == numpy.float32:
output_audio_buffer = opus_encoder.encode(audio_encoder, audio_frame.tobytes(), 960)
if output_audio_buffer:
rtc.send_audio(rtc_peer, output_audio_buffer, int(send_timestamp * 48000))
rtc.send_video(rtc_peer, output_video_buffer, int(video_receive_time * 90000))
frame_index += 1
temp_vision_frame = video_queue.get()
video_event.wait()
video_event.clear()
temp_vision_frame, video_receive_time = video_deque.popleft()
# TODO: remove unconditional destroy methods, which have no impact on control flow
destroy_video_encoder(video_codec, video_encoder)
opus_encoder.destroy(audio_encoder)
rtc.clear_remb(rtc_peer)
for receiver_thread in receiver_threads:
receiver_thread.join()
rtc_store.delete_peers(session_id)
# TODO: method is too complex
def receive_video_frames(video_track : int, video_codec : VideoCodec, video_queue : queue.Queue[VisionFrame]) -> None:
def receive_video_frames(video_track : int, video_codec : VideoCodec, video_deque : deque[VisionPacket], video_event : threading.Event) -> None:
datachannel_library = datachannel_module.create_static_library()
video_decoder = create_video_decoder(video_codec)
receive_buffer = ctypes.create_string_buffer(512 * 1024)
# todo - could be prepare ready event
available_event = threading.Event()
available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event))
datachannel_library.rtcSetAvailableCallback(video_track, available_callback)
# todo off
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
@@ -212,34 +218,33 @@ def receive_video_frames(video_track : int, video_codec : VideoCodec, video_queu
receive_status_code = datachannel_library.rtcReceiveMessage(video_track, receive_buffer, ctypes.byref(buffer_size))
if receive_status_code == 0 and buffer_size.value > 0:
receive_time = time.monotonic()
# TODO: align buffer naming with input/output and video/audio convention
frame_buffer = receive_buffer.raw[:buffer_size.value]
vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer)
if numpy.any(vision_frame):
with contextlib.suppress(queue.Empty):
video_queue.get_nowait()
video_queue.put_nowait(vision_frame)
video_deque.append((vision_frame, receive_time))
video_event.set()
if receive_status_code == -3:
available_event.wait()
available_event.clear()
video_queue.put(numpy.empty(0))
video_deque.append((numpy.empty(0), 0.0))
video_event.set()
destroy_video_decoder(video_codec, video_decoder)
# TODO: audio_codec is not used but has to be, even if there is just one codec
# TODO: method is too complex
def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_queue : queue.Queue[AudioFrame]) -> None:
def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_deque : deque[AudioPacket]) -> None:
datachannel_library = datachannel_module.create_static_library()
audio_decoder = opus_decoder.create(48000, 2)
receive_buffer = ctypes.create_string_buffer(8 * 1024)
#todo - could be prepare ready event
available_event = threading.Event()
available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event))
datachannel_library.rtcSetAvailableCallback(audio_track, available_callback)
#todo off
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
@@ -247,15 +252,13 @@ def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_queu
receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size))
if receive_status_code == 0 and buffer_size.value > 0:
receive_time = time.monotonic()
# TODO: rename opus_buffer and output_buffer to audio convention
opus_buffer = receive_buffer.raw[:buffer_size.value]
output_buffer = opus_decoder.decode(audio_decoder, opus_buffer, 960, 2)
if output_buffer:
with contextlib.suppress(queue.Empty):
audio_queue.get_nowait()
audio_queue.put_nowait(numpy.frombuffer(output_buffer, dtype = numpy.float32))
audio_deque.append((numpy.frombuffer(output_buffer, dtype = numpy.float32), receive_time))
if receive_status_code == -3:
available_event.wait()
+3
View File
@@ -313,6 +313,9 @@ RtcPeer = TypedDict('RtcPeer',
})
RtcStore : TypeAlias = Dict[SessionId, List[RtcPeer]]
VisionPacket : TypeAlias = tuple[VisionFrame, float]
AudioPacket : TypeAlias = tuple[AudioFrame, float]
SdpAudioMedia = TypedDict('SdpAudioMedia',
{
'codec': AudioCodec,
+117 -32
View File
@@ -1,7 +1,9 @@
import asyncio
import ctypes
import queue
import struct
import threading
import time
from collections import deque
from unittest.mock import AsyncMock, MagicMock, patch
import cv2
@@ -9,13 +11,13 @@ import numpy
import pytest
from facefusion import rtc, rtc_store, state_manager
from facefusion.apis.stream_helper import create_video_decoder, create_video_encoder, decode_video_frame, destroy_stream, destroy_video_decoder, destroy_video_encoder, encode_video_frame, process_image, process_video, receive_audio_frames, receive_video_frames, receive_vision_frames, run_peer_loop, update_video_encoder_bitrate
from facefusion.apis.stream_helper import create_video_decoder, create_video_encoder, decode_video_frame, destroy_stream, destroy_video_decoder, destroy_video_encoder, encode_video_frame, process_image, process_video, receive_audio_frames, receive_video_frames, receive_vision_frames, run_encode_loop, run_peer_loop, update_video_encoder_bitrate
from facefusion.codecs import aom_encoder, vpx_encoder
from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download
from facefusion.hash_helper import create_hash
from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, opus as opus_module, vpx as vpx_module
from facefusion.types import AudioFrame, RtcPeer, VideoCodec, VisionFrame
from facefusion.types import AudioPacket, RtcPeer, VideoCodec, VisionPacket
from facefusion.vision import read_video_frame
from .assert_helper import get_test_example_file, get_test_examples_directory
@@ -55,14 +57,12 @@ async def test_process_image() -> None:
}
]
#TODO: remove init_item once source_paths guard is removed from process_image
state_manager.init_item('source_paths', [ get_test_example_file('source.jpg') ])
await process_image(websocket_mock)
websocket_mock.send_bytes.assert_called_once()
assert websocket_mock.send_bytes.call_args[0][0][:3] == bytes([ 255, 216, 255 ])
#TODO: remove this block once source_paths guard is removed from process_image
state_manager.init_item('source_paths', None)
await process_image(websocket_mock)
@@ -75,6 +75,7 @@ def test_process_video(video_codec : VideoCodec, session_id : str) -> None:
if video_codec == 'av1':
rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 35)
if video_codec == 'vp8':
rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 96)
@@ -137,11 +138,11 @@ async def test_receive_vision_frames() -> None:
assert frames[0].shape == vision_frame.shape
def test_run_peer_loop() -> None:
source_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ])
def test_run_peer_loop(video_codec : VideoCodec, payload_type : int) -> None:
peer_connection = rtc.create_peer_connection()
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', 'vp8', 96)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', 'vp8', 96)
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
@@ -149,65 +150,149 @@ def test_run_peer_loop() -> None:
{
'sender_track': video_sender_track,
'receiver_track': video_receiver_track,
'codec': 'vp8'
'codec': video_codec
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
session_id = 'test-run-peer-loop'
session_id = 'test-run-peer-loop-' + video_codec
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
with patch('facefusion.apis.stream_helper.receive_video_frames'):
with patch('facefusion.apis.stream_helper.run_encode_loop') as mock_encode_loop:
thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True)
thread.start()
thread.join(timeout = 5.0)
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = source_frame):
with patch('facefusion.apis.stream_helper.rtc.send_video') as mock_send_video:
thread = threading.Thread(target = run_peer_loop, args = (session_id, rtc_peer), daemon = True)
thread.start()
thread.join(timeout = 5.0)
assert mock_encode_loop.called
assert mock_encode_loop.call_args[0][1] == video_codec
assert rtc_store.has_peers(session_id) is False
@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ])
def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None:
	# feeds one real frame followed by an empty frame into run_encode_loop and
	# checks that a non-empty encoded buffer was handed to rtc.send_video
	source_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
	peer_connection = rtc.create_peer_connection()
	video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
	video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
	rtc_peer : RtcPeer =\
	{
		'peer_connection': peer_connection,
		'video':
		{
			'sender_track': video_sender_track,
			'receiver_track': video_receiver_track,
			'codec': video_codec
		},
		'sender_bitrate': ctypes.c_uint(0),
		'receiver_bitrate': ctypes.c_uint(0)
	}
	video_deque : deque[VisionPacket] = deque()
	audio_deque : deque[AudioPacket] = deque()
	video_event = threading.Event()

	video_deque.append((source_frame, 0.100))
	video_event.set()

	with patch('facefusion.apis.stream_helper.rtc.send_video') as mock_send_video:
		thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True)
		thread.start()

		# run_encode_loop clears the event before it pops a packet, so an empty
		# deque proves the first wake-up was consumed; setting the event earlier
		# could merge both signals into one and leave the worker waiting forever
		drain_deadline = time.monotonic() + 5.0

		while video_deque and time.monotonic() < drain_deadline:
			time.sleep(0.01)

		# an empty frame ends the encode loop
		video_deque.append((numpy.empty(0), 0.0))
		video_event.set()
		thread.join(timeout = 5.0)

	assert mock_send_video.called
	assert len(mock_send_video.call_args[0][1]) > 0
@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ])
def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int) -> None:
	# records the calls made by run_encode_loop on one shared mock and checks
	# that the frame is processed before the audio packet is sent
	source_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
	audio_frame = numpy.zeros(960 * 2, dtype = numpy.float32)
	peer_connection = rtc.create_peer_connection()
	video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
	video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
	rtc_peer : RtcPeer =\
	{
		'peer_connection': peer_connection,
		'video':
		{
			'sender_track': video_sender_track,
			'receiver_track': video_receiver_track,
			'codec': video_codec
		},
		'sender_bitrate': ctypes.c_uint(0),
		'receiver_bitrate': ctypes.c_uint(0)
	}
	video_deque : deque[VisionPacket] = deque()
	audio_deque : deque[AudioPacket] = deque()
	video_event = threading.Event()

	# audio shares the video timestamp so it falls inside the first frame window
	video_deque.append((source_frame, 0.100))
	audio_deque.append((audio_frame, 0.100))
	video_event.set()

	# a single parent mock keeps the relative order of all patched calls
	manager = MagicMock()
	manager.process_frame.return_value = source_frame
	manager.opus_encode.return_value = bytes([ 1 ] * 32)

	with patch('facefusion.apis.stream_helper.streamer.process_frame', manager.process_frame):
		with patch('facefusion.apis.stream_helper.opus_encoder.encode', manager.opus_encode):
			with patch('facefusion.apis.stream_helper.rtc.send_audio', manager.send_audio):
				with patch('facefusion.apis.stream_helper.rtc.send_video', manager.send_video):
					thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True)
					thread.start()

					# run_encode_loop clears the event before it pops a packet, so an
					# empty deque proves the first wake-up was consumed; setting the
					# event earlier could merge both signals and stall the worker
					drain_deadline = time.monotonic() + 5.0

					while video_deque and time.monotonic() < drain_deadline:
						time.sleep(0.01)

					# an empty frame ends the encode loop
					video_deque.append((numpy.empty(0), 0.0))
					video_event.set()
					thread.join(timeout = 5.0)

	call_names = [ call[0] for call in manager.mock_calls ]

	assert 'process_frame' in call_names and 'send_audio' in call_names
	assert call_names.index('process_frame') < call_names.index('send_audio')
def test_receive_video_frames() -> None:
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_queue : queue.Queue[VisionFrame] = queue.Queue(maxsize = 1)
video_deque : deque[VisionPacket] = deque()
video_event = threading.Event()
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = vision_frame):
receiver_thread = threading.Thread(target = receive_video_frames, args = (0, 'vp8', video_queue), daemon = True)
receiver_thread = threading.Thread(target = receive_video_frames, args = (0, 'vp8', video_deque, video_event), daemon = True)
receiver_thread.start()
receiver_thread.join(timeout = 2.0)
if is_linux() or is_windows():
assert create_hash(video_queue.get_nowait().tobytes()) == 'a17439db'
assert create_hash(video_deque[0][0].tobytes()) == 'a17439db'
if is_macos():
assert create_hash(video_queue.get_nowait().tobytes()) == '38d00e2a'
assert create_hash(video_deque[0][0].tobytes()) == '38d00e2a'
def test_receive_audio_frames() -> None:
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
audio_frame = numpy.zeros(960 * 2).astype(numpy.float32)
audio_queue : queue.Queue[AudioFrame] = queue.Queue(maxsize = 4)
audio_data = numpy.zeros(960 * 2, dtype = numpy.float32)
audio_deque : deque[AudioPacket] = deque()
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_frame.tobytes()):
receiver_thread = threading.Thread(target = receive_audio_frames, args = (0, 'opus', audio_queue), daemon = True)
with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_data.tobytes()):
receiver_thread = threading.Thread(target = receive_audio_frames, args = (0, 'opus', audio_deque), daemon = True)
receiver_thread.start()
audio_frame = audio_queue.get(timeout = 2.0)
receiver_thread.join(timeout = 1.0)
receiver_thread.join(timeout = 2.0)
assert audio_frame.dtype == numpy.float32
assert audio_frame.size == 960 * 2
assert audio_queue.empty()
assert audio_deque[0][0].dtype == numpy.float32
assert audio_deque[0][0].size == 960 * 2
assert len(audio_deque) == 1
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])