Refactor(stream-helper): pass peer context objects to receivers, fix codec guard, move YUV conversion into decoders (#1140)

* extract numpy.empty(0) into an empty_vision_frame variable so the sentinel intent is clear

* bring back todos

* bring back todos

* rename opus_buffer to audio

* move reshape and cvtColor into decoder modules

* add audio_codec check

* add audio_codec check

* rename audio video buffer

* simplify receive methods and cleanup test

* todo

* revert decode methods to return pointer

* remove duplicate test_stream_helper — tests live in test_api_stream_helper

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Harisreedhar
2026-06-01 17:21:06 +05:30
committed by GitHub
parent 02f684e220
commit 3eeb505c86
2 changed files with 74 additions and 63 deletions
+45 -45
View File
@@ -15,7 +15,7 @@ from facefusion import rtc, rtc_store, state_manager, streamer
from facefusion.audio import create_empty_audio_frame
from facefusion.codecs import aom_decoder, aom_encoder, opus_decoder, opus_encoder, vpx_decoder, vpx_encoder
from facefusion.libraries import datachannel as datachannel_module
from facefusion.types import AomDecoder, AomEncoder, AudioCodec, AudioPack, BitRate, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VideoPack, VisionFrame, VpxDecoder, VpxEncoder
from facefusion.types import AomDecoder, AomEncoder, AomPointer, AudioCodec, AudioPack, BitRate, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, RtcPeerVideo, SdpAnswer, SdpOffer, SessionId, VideoCodec, VideoPack, VisionFrame, VpxDecoder, VpxEncoder, VpxPointer
#TODO: remove source_paths guard, process_image should work independent of source_paths since processors decide if they need sources
@@ -115,23 +115,20 @@ async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
audio_deque : deque[AudioPack] = deque(maxlen = 10)
video_event = threading.Event()
video_codec = rtc_peer.get('video').get('codec')
video_track = rtc_peer.get('video').get('receiver_track')
video_receive = asyncio.to_thread(receive_video_frames, video_track, video_codec, video_deque, video_event)
video_receive = asyncio.to_thread(receive_video_frames, rtc_peer.get('video'), video_deque, video_event)
encode_loop = asyncio.to_thread(run_encode_loop, rtc_peer, video_codec, video_deque, audio_deque, video_event)
coroutines = [ video_receive, encode_loop ]
if rtc_peer.get('audio'):
audio_codec : AudioCodec = 'opus'
audio_track = rtc_peer.get('audio').get('receiver_track')
coroutines.append(asyncio.to_thread(receive_audio_frames, audio_track, audio_codec, audio_deque))
coroutines.append(asyncio.to_thread(receive_audio_frames, rtc_peer.get('audio'), audio_deque))
await asyncio.gather(*coroutines)
rtc_store.delete_peers(session_id)
#TODO: needs review
#TODO: method is too complex
#TODO: method is too complex, consider separate encode loop for audio and video?
def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque : deque[VideoPack], audio_deque : deque[AudioPack], video_event : threading.Event) -> None:
video_event.wait()
video_event.clear()
@@ -204,8 +201,9 @@ def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque :
# TODO: method is too complex
# TODO: video_codec is passed separately — consider whether it can be derived from video_track instead
def receive_video_frames(video_track : int, video_codec : VideoCodec, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
video_track = rtc_peer_video.get('receiver_track')
video_codec = rtc_peer_video.get('codec')
datachannel_library = datachannel_module.create_static_library()
video_decoder = create_video_decoder(video_codec)
receive_buffer = ctypes.create_string_buffer(512 * 1024)
@@ -220,9 +218,8 @@ def receive_video_frames(video_track : int, video_codec : VideoCodec, video_dequ
if receive_status_code == 0 and buffer_size.value > 0:
receive_time = time.monotonic()
# TODO: align buffer naming with input/output and video/audio convention
frame_buffer = receive_buffer.raw[:buffer_size.value]
vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer)
video_buffer = receive_buffer.raw[:buffer_size.value]
vision_frame = decode_video_frame(video_codec, video_decoder, video_buffer)
if numpy.any(vision_frame):
video_deque.append((vision_frame, receive_time))
@@ -232,42 +229,45 @@ def receive_video_frames(video_track : int, video_codec : VideoCodec, video_dequ
available_event.wait()
available_event.clear()
# TODO: extract numpy.empty(0) into an empty_vision_frame variable so the sentinel intent is clear
video_deque.append((numpy.empty(0), 0.0))
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
destroy_video_decoder(video_codec, video_decoder)
# TODO: audio_codec is not used but has to, even if there is just one
# TODO: audio_codec is passed but never checked — decoder should be selected based on codec the same way video does for av1 vs vp8
# TODO: method is too complex
def receive_audio_frames(audio_track : int, audio_codec : AudioCodec, audio_deque : deque[AudioPack]) -> None:
def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack]) -> None:
audio_track = rtc_peer_audio.get('receiver_track')
audio_codec = rtc_peer_audio.get('codec')
datachannel_library = datachannel_module.create_static_library()
audio_decoder = opus_decoder.create(48000, 2)
receive_buffer = ctypes.create_string_buffer(8 * 1024)
available_event = threading.Event()
available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event))
datachannel_library.rtcSetAvailableCallback(audio_track, available_callback)
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
buffer_size = ctypes.c_int(8 * 1024)
receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size))
if audio_codec == 'opus':
audio_decoder = opus_decoder.create(48000, 2)
receive_buffer = ctypes.create_string_buffer(8 * 1024)
available_event = threading.Event()
available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event))
datachannel_library.rtcSetAvailableCallback(audio_track, available_callback)
receive_status_code = -3
if receive_status_code == 0 and buffer_size.value > 0:
receive_time = time.monotonic()
# TODO: rename opus_buffer and output_buffer to audio convention
opus_buffer = receive_buffer.raw[:buffer_size.value]
output_buffer = opus_decoder.decode(audio_decoder, opus_buffer, 960, 2)
while receive_status_code == 0 or receive_status_code == -3:
buffer_size = ctypes.c_int(8 * 1024)
receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size))
if output_buffer:
audio_deque.append((numpy.frombuffer(output_buffer, dtype = numpy.float32), receive_time))
if receive_status_code == 0 and buffer_size.value > 0:
receive_time = time.monotonic()
audio_buffer = receive_buffer.raw[:buffer_size.value]
audio_frame = opus_decoder.decode(audio_decoder, audio_buffer, 960, 2)
if receive_status_code == -3:
available_event.wait()
available_event.clear()
if audio_frame:
audio_deque.append((numpy.frombuffer(audio_frame, dtype = numpy.float32), receive_time))
opus_decoder.destroy(audio_decoder)
if receive_status_code == -3:
available_event.wait()
available_event.clear()
opus_decoder.destroy(audio_decoder)
return None
def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, input_buffer : bytes) -> Optional[VisionFrame]:
@@ -275,19 +275,13 @@ def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | Ao
aom_pointer = aom_decoder.decode(video_decoder, input_buffer)
if aom_pointer:
frame_width, frame_height = aom_pointer.get('resolution')
# TODO: move reshape and cvtColor into decoder modules
vision_frame = numpy.frombuffer(aom_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width))
return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420)
return normalize_vision_frame(aom_pointer)
if video_codec == 'vp8':
vpx_pointer = vpx_decoder.decode(video_decoder, input_buffer)
if vpx_pointer:
frame_width, frame_height = vpx_pointer.get('resolution')
# TODO: move reshape and cvtColor into decoder modules
vision_frame = numpy.frombuffer(vpx_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width))
return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420)
return normalize_vision_frame(vpx_pointer)
return None
@@ -302,6 +296,12 @@ def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | Ao
return bytes()
def normalize_vision_frame(frame_pointer : AomPointer | VpxPointer) -> VisionFrame:
frame_width, frame_height = frame_pointer.get('resolution')
vision_frame = numpy.frombuffer(frame_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width))
return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420)
def create_video_decoder(video_codec : VideoCodec) -> Optional[VpxDecoder | AomDecoder]:
if video_codec == 'av1':
return aom_decoder.create(8)
+29 -18
View File
@@ -17,7 +17,7 @@ from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download
from facefusion.hash_helper import create_hash
from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, opus as opus_module, vpx as vpx_module
from facefusion.types import AudioPack, RtcPeer, VideoCodec, VideoPack
from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio, RtcPeerVideo, VideoCodec, VideoPack
from facefusion.vision import read_video_frame
from .assert_helper import get_test_example_file, get_test_examples_directory
@@ -207,8 +207,8 @@ def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None:
thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True)
thread.start()
time.sleep(0.1)
# TODO: extract numpy.empty(0) into an empty_vision_frame variable so the sentinel intent is clear
video_deque.append((numpy.empty(0), 0.0))
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
thread.join(timeout = 5.0)
@@ -258,8 +258,8 @@ def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int)
thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True)
thread.start()
time.sleep(0.1)
# TODO: extract numpy.empty(0) into an empty_vision_frame variable so the sentinel intent is clear
video_deque.append((numpy.empty(0), 0.0))
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
thread.join(timeout = 5.0)
@@ -269,7 +269,8 @@ def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int)
assert call_names.index('process_frame') < call_names.index('send_audio')
def test_receive_video_frames() -> None:
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_receive_video_frames(video_codec : VideoCodec) -> None:
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
@@ -278,10 +279,15 @@ def test_receive_video_frames() -> None:
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = vision_frame):
# TODO: rename receiver_thread — does not identify whether it receives video or audio
receiver_thread = threading.Thread(target = receive_video_frames, args = (0, 'vp8', video_deque, video_event), daemon = True)
receiver_thread.start()
receiver_thread.join(timeout = 2.0)
rtc_peer_video : RtcPeerVideo =\
{
'sender_track': 0,
'receiver_track': 0,
'codec': video_codec
}
video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer_video, video_deque, video_event), daemon = True)
video_receiver_thread.start()
video_receiver_thread.join(timeout = 2.0)
# TODO: avoid [0][0] tuple indexing — use named access once VideoPack becomes a TypedDict
if is_linux() or is_windows():
@@ -292,19 +298,24 @@ def test_receive_video_frames() -> None:
# TODO: refine test
def test_receive_audio_frames() -> None:
@pytest.mark.parametrize('audio_codec', [ 'opus' ])
def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
# TODO: rename audio_data — not a recognised naming convention
audio_data = numpy.zeros(960 * 2, dtype = numpy.float32)
audio_frame = numpy.zeros(960 * 2, dtype = numpy.float32)
audio_deque : deque[AudioPack] = deque()
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_data.tobytes()):
# TODO: rename receiver_thread — does not identify whether it receives video or audio
receiver_thread = threading.Thread(target = receive_audio_frames, args = (0, 'opus', audio_deque), daemon = True)
receiver_thread.start()
receiver_thread.join(timeout = 2.0)
with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_frame.tobytes()):
rtc_peer_audio : RtcPeerAudio =\
{
'sender_track': 0,
'receiver_track': 0,
'codec': audio_codec
}
audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque), daemon = True)
audio_receiver_thread.start()
audio_receiver_thread.join(timeout = 2.0)
# TODO: assertions do not verify meaningful audio content — dtype/size/len would pass for an empty silent frame
assert audio_deque[0][0].dtype == numpy.float32