combine audio and video queue

This commit is contained in:
harisreedhar
2026-05-30 17:18:45 +05:30
parent 460c65004b
commit 50fee0a14e
3 changed files with 61 additions and 34 deletions
+47 -25
View File
@@ -15,7 +15,7 @@ from facefusion import rtc, rtc_store, state_manager, streamer
from facefusion.audio import create_empty_audio_frame
from facefusion.codecs import aom_decoder, aom_encoder, opus_decoder, opus_encoder, vpx_decoder, vpx_encoder
from facefusion.libraries import datachannel as datachannel_module
from facefusion.types import AomDecoder, AomEncoder, AudioCodec, AudioFrame, BitRate, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VisionFrame, VpxDecoder, VpxEncoder
from facefusion.types import AomDecoder, AomEncoder, AudioCodec, BitRate, FramePacket, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VisionFrame, VpxDecoder, VpxEncoder
async def process_image(websocket : WebSocket) -> None:
@@ -111,29 +111,33 @@ async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFr
#TODO: needs review
#TODO: method is too complex
def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
# TODO: combine video and audio queue
video_queue : queue.Queue[VisionFrame] = queue.Queue(maxsize = 1)
audio_queue : queue.Queue[AudioFrame] = queue.Queue(maxsize = 4)
frame_queue : queue.Queue[FramePacket] = queue.Queue(maxsize = 5)
receiver_threads = []
video_codec = rtc_peer.get('video').get('codec')
video_track = rtc_peer.get('video').get('receiver_track')
video_receiver_thread = threading.Thread(target = receive_video_frames, args = (video_track, video_codec, video_queue), daemon = True)
video_receiver_thread = threading.Thread(target = receive_video_frames, args = (video_track, video_codec, frame_queue), daemon = True)
receiver_threads.append(video_receiver_thread)
if rtc_peer.get('audio'):
audio_codec : AudioCodec = 'opus'
audio_track = rtc_peer.get('audio').get('receiver_track')
audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (audio_track, audio_codec, audio_queue), daemon = True)
audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (audio_track, audio_codec, frame_queue), daemon = True)
receiver_threads.append(audio_receiver_thread)
for receiver_thread in receiver_threads:
receiver_thread.start()
temp_vision_frame = video_queue.get()
audio_frame = create_empty_audio_frame()
packet = frame_queue.get()
while packet.get('kind') == 'audio':
audio_frame = packet.get('frame')
packet = frame_queue.get()
temp_vision_frame = packet.get('frame')
if numpy.any(temp_vision_frame):
audio_frame = create_empty_audio_frame()
temp_resolution : Resolution = (temp_vision_frame.shape[1], temp_vision_frame.shape[0])
temp_bitrate : BitRate = 8000
video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate)
@@ -141,9 +145,6 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
frame_index = 0
while numpy.any(temp_vision_frame):
with contextlib.suppress(queue.Empty):
audio_frame = audio_queue.get_nowait()
output_vision_frame = streamer.process_frame(audio_frame, temp_vision_frame)
output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0])
# TODO: align buffer naming with input/output and video/audio convention
@@ -180,7 +181,13 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
rtc.send_audio(rtc_peer, output_audio_buffer, int(send_timestamp * 48000))
frame_index += 1
temp_vision_frame = video_queue.get()
packet = frame_queue.get()
while packet.get('kind') == 'audio':
audio_frame = packet.get('frame')
packet = frame_queue.get()
temp_vision_frame = packet.get('frame')
# TODO: remove unconditional destroy methods, which have no impact on control flow
destroy_video_encoder(video_codec, video_encoder)
@@ -194,15 +201,13 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
# TODO: method is too complex
def receive_video_frames(video_track : int, video_codec : VideoCodec, video_queue : queue.Queue[VisionFrame]) -> None:
def receive_video_frames(video_track : int, video_codec : VideoCodec, frame_queue : queue.Queue[FramePacket]) -> None:
datachannel_library = datachannel_module.create_static_library()
video_decoder = create_video_decoder(video_codec)
receive_buffer = ctypes.create_string_buffer(512 * 1024)
# todo - could be prepare ready event
available_event = threading.Event()
available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event))
datachannel_library.rtcSetAvailableCallback(video_track, available_callback)
# todo off
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
@@ -215,29 +220,39 @@ def receive_video_frames(video_track : int, video_codec : VideoCodec, video_queu
vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer)
if numpy.any(vision_frame):
with contextlib.suppress(queue.Empty):
video_queue.get_nowait()
video_queue.put_nowait(vision_frame)
if frame_queue.full():
with contextlib.suppress(queue.Empty):
frame_queue.get_nowait()
with contextlib.suppress(queue.Full):
frame_queue.put_nowait(
{
'kind': 'video',
'frame': vision_frame
})
if receive_status_code == -3:
available_event.wait()
available_event.clear()
video_queue.put(numpy.empty(0))
frame_queue.put(
{
'kind': 'video',
'frame': numpy.empty(0)
})
destroy_video_decoder(video_codec, video_decoder)
# TODO: audio_codec is not used but has to, even if there is just one
# TODO: method is too complex
def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_queue : queue.Queue[AudioFrame]) -> None:
def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, frame_queue : queue.Queue[FramePacket]) -> None:
datachannel_library = datachannel_module.create_static_library()
audio_decoder = opus_decoder.create(48000, 2)
receive_buffer = ctypes.create_string_buffer(8 * 1024)
#todo - could be prepare ready event
available_event = threading.Event()
available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event))
datachannel_library.rtcSetAvailableCallback(audio_track, available_callback)
#todo off
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
@@ -250,10 +265,17 @@ def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_queu
output_buffer = opus_decoder.decode(audio_decoder, opus_buffer, 960, 2)
if output_buffer:
with contextlib.suppress(queue.Empty):
audio_queue.get_nowait()
if frame_queue.full():
audio_queue.put_nowait(numpy.frombuffer(output_buffer, dtype = numpy.float32))
with contextlib.suppress(queue.Empty):
frame_queue.get_nowait()
with contextlib.suppress(queue.Full):
frame_queue.put_nowait(
{
'kind': 'audio',
'frame': numpy.frombuffer(output_buffer, dtype = numpy.float32)
})
if receive_status_code == -3:
available_event.wait()
+5
View File
@@ -114,6 +114,11 @@ VpxPointer = TypedDict('VpxPointer',
'buffer' : bytes,
'resolution' : Resolution
})
FramePacket = TypedDict('FramePacket',
{
'kind' : Literal['video', 'audio'],
'frame' : VisionFrame | AudioFrame
})
Args : TypeAlias = Dict[str, Any]
+9 -9
View File
@@ -18,7 +18,7 @@ from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download
from facefusion.hash_helper import create_hash
from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, opus as opus_module, vpx as vpx_module
from facefusion.types import AudioFrame, RtcPeer, VideoCodec, VisionFrame
from facefusion.types import FramePacket, RtcPeer, VideoCodec
from facefusion.vision import read_video_frame
@@ -177,19 +177,19 @@ def test_receive_video_frames() -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
video_queue : queue.Queue[VisionFrame] = queue.Queue(maxsize = 1)
frame_queue : queue.Queue[FramePacket] = queue.Queue(maxsize = 5)
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock), \
patch('facefusion.apis.stream_helper.decode_video_frame', return_value = vision_frame):
receiver_thread = threading.Thread(target = receive_video_frames, args = (0, 'vp8', video_queue), daemon = True)
receiver_thread = threading.Thread(target = receive_video_frames, args = (0, 'vp8', frame_queue), daemon = True)
receiver_thread.start()
receiver_thread.join(timeout = 2.0)
if is_linux() or is_windows():
assert create_hash(video_queue.get_nowait().tobytes()) == 'a17439db'
assert create_hash(frame_queue.get_nowait().get('frame').tobytes()) == 'a17439db'
if is_macos():
assert create_hash(video_queue.get_nowait().tobytes()) == '38d00e2a'
assert create_hash(frame_queue.get_nowait().get('frame').tobytes()) == '38d00e2a'
# TODO: refine test
@@ -197,18 +197,18 @@ def test_receive_audio_frames() -> None:
audio_frame = numpy.zeros(960 * 2, dtype = numpy.float32)
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
audio_queue : queue.Queue[AudioFrame] = queue.Queue(maxsize = 4)
frame_queue : queue.Queue[FramePacket] = queue.Queue(maxsize = 5)
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock), \
patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_frame.tobytes()):
receiver_thread = threading.Thread(target = receive_audio_frames, args = (0, 'opus', audio_queue), daemon = True)
receiver_thread = threading.Thread(target = receive_audio_frames, args = (0, 'opus', frame_queue), daemon = True)
receiver_thread.start()
audio_frame = audio_queue.get(timeout = 2.0)
audio_frame = frame_queue.get(timeout = 2.0).get('frame')
receiver_thread.join(timeout = 1.0)
assert audio_frame.dtype == numpy.float32
assert audio_frame.size == 960 * 2
assert audio_queue.empty()
assert frame_queue.empty()
# TODO: refine test