From 7181b41f2d3fbe17324d3a91a72d519268b77421 Mon Sep 17 00:00:00 2001 From: Henry Ruhs Date: Tue, 2 Jun 2026 16:05:21 +0200 Subject: [PATCH] Feat/finalize stream (#1144) * break stream helper into pieces * remove todos --- facefusion/apis/endpoints/stream.py | 2 +- facefusion/apis/stream_audio.py | 88 ++++++ facefusion/apis/stream_event.py | 15 + facefusion/apis/stream_helper.py | 377 ----------------------- facefusion/apis/stream_manager.py | 127 ++++++++ facefusion/apis/stream_video.py | 175 +++++++++++ tests/test_api_stream_audio.py | 116 +++++++ tests/test_api_stream_helper.py | 461 ---------------------------- tests/test_api_stream_manager.py | 171 +++++++++++ tests/test_api_stream_video.py | 237 ++++++++++++++ 10 files changed, 930 insertions(+), 839 deletions(-) create mode 100644 facefusion/apis/stream_audio.py create mode 100644 facefusion/apis/stream_event.py delete mode 100644 facefusion/apis/stream_helper.py create mode 100644 facefusion/apis/stream_manager.py create mode 100644 facefusion/apis/stream_video.py create mode 100644 tests/test_api_stream_audio.py delete mode 100644 tests/test_api_stream_helper.py create mode 100644 tests/test_api_stream_manager.py create mode 100644 tests/test_api_stream_video.py diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py index a2c7e50f..6f84d001 100644 --- a/facefusion/apis/endpoints/stream.py +++ b/facefusion/apis/endpoints/stream.py @@ -6,7 +6,7 @@ from starlette.websockets import WebSocket, WebSocketState from facefusion import session_context, session_manager from facefusion.apis.api_helper import get_sec_websocket_protocol from facefusion.apis.session_helper import extract_access_token -from facefusion.apis.stream_helper import destroy_stream, process_image, process_video +from facefusion.apis.stream_manager import destroy_stream, process_image, process_video async def websocket_stream(websocket : WebSocket) -> None: diff --git a/facefusion/apis/stream_audio.py b/facefusion/apis/stream_audio.py new file mode 100644 index 00000000..82e9dc6c --- /dev/null +++ b/facefusion/apis/stream_audio.py @@ -0,0 +1,88 @@ +import ctypes +import threading +import time +from collections import deque +from typing import Optional + +import numpy + +from facefusion import rtc +from facefusion.apis.stream_event import create_event +from facefusion.codecs import opus_decoder, opus_encoder +from facefusion.libraries import datachannel as datachannel_module +from facefusion.types import AudioCodec, AudioPack, OpusDecoder, RtcPeer, RtcPeerAudio + + +def run_audio_encode_loop(rtc_peer : RtcPeer, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: + audio_event.wait() + audio_event.clear() + temp_audio_frame, temp_audio_time = audio_deque.popleft() + audio_encoder = opus_encoder.create(48000, 2) + + while numpy.any(temp_audio_frame): + output_audio_buffer = opus_encoder.encode(audio_encoder, temp_audio_frame.tobytes(), 960) + + if output_audio_buffer: + rtc.send_audio(rtc_peer, output_audio_buffer, int(temp_audio_time * 48000)) + + if len(audio_deque) == 0: + audio_event.wait() + audio_event.clear() + + temp_audio_frame, temp_audio_time = audio_deque.popleft() + + opus_encoder.destroy(audio_encoder) + + +def fill_audio_deque(audio_codec : AudioCodec, audio_decoder : OpusDecoder, audio_buffer : bytes, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: + audio_frame = decode_audio_frame(audio_codec, audio_decoder, audio_buffer) + + if audio_frame: + audio_deque.append((numpy.frombuffer(audio_frame, dtype = numpy.float32), time.monotonic())) + audio_event.set() + + +def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: + audio_track = rtc_peer_audio.get('receiver_track') + audio_codec = rtc_peer_audio.get('codec') + datachannel_library = datachannel_module.create_static_library() + audio_decoder = create_audio_decoder(audio_codec) + receive_buffer = ctypes.create_string_buffer(8 * 1024) + available_event = create_event(audio_track, datachannel_library) + receive_status_code = -3 + + while receive_status_code == 0 or receive_status_code == -3: + buffer_size = ctypes.c_int(8 * 1024) + receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size)) + + if receive_status_code == 0 and buffer_size.value > 0: + audio_buffer = receive_buffer.raw[:buffer_size.value] + fill_audio_deque(audio_codec, audio_decoder, audio_buffer, audio_deque, audio_event) + + if receive_status_code == -3: + available_event.wait() + available_event.clear() + + empty_audio_frame = numpy.empty(0) + audio_deque.append((empty_audio_frame, 0.0)) + audio_event.set() + destroy_audio_decoder(audio_codec, audio_decoder) + + +def decode_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, input_buffer : bytes) -> Optional[bytes]: + if audio_codec == 'opus': + return opus_decoder.decode(audio_decoder, input_buffer, 960, 2) + + return None + + +def create_audio_decoder(audio_codec : AudioCodec) -> Optional[OpusDecoder]: + if audio_codec == 'opus': + return opus_decoder.create(48000, 2) + + return None + + +def destroy_audio_decoder(audio_codec : AudioCodec, audio_decoder : OpusDecoder) -> None: + if audio_codec == 'opus': + opus_decoder.destroy(audio_decoder) diff --git a/facefusion/apis/stream_event.py b/facefusion/apis/stream_event.py new file mode 100644 index 00000000..17ec2c64 --- /dev/null +++ b/facefusion/apis/stream_event.py @@ -0,0 +1,15 @@ +import ctypes +import threading +from functools import partial + + +def create_event(track : int, datachannel_library : ctypes.CDLL) -> threading.Event: + available_event = threading.Event() + available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event)) + datachannel_library.rtcSetAvailableCallback(track, available_callback) + available_event.callback = available_callback # type: ignore[attr-defined] + return available_event + + +def dispatch_event(event : threading.Event, track : int, pointer : ctypes.c_void_p) -> None: + event.set() diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py deleted file mode 100644 index b2a62f06..00000000 --- a/facefusion/apis/stream_helper.py +++ /dev/null @@ -1,377 +0,0 @@ -import asyncio -import ctypes -import threading -import time -from collections import deque -from collections.abc import AsyncIterator -from functools import partial -from typing import Optional - -import cv2 -import numpy -from starlette.websockets import WebSocket - -from facefusion import rtc, rtc_store, streamer -from facefusion.audio import create_empty_audio_frame -from facefusion.codecs import aom_decoder, aom_encoder, opus_decoder, opus_encoder, vpx_decoder, vpx_encoder -from facefusion.libraries import datachannel as datachannel_module -from facefusion.types import AomDecoder, AomEncoder, AomPointer, AudioCodec, AudioPack, BitRate, OpusDecoder, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, RtcPeerVideo, SdpAnswer, SdpOffer, SessionId, VideoCodec, VideoPack, VisionFrame, VpxDecoder, VpxEncoder, VpxPointer - - -async def process_image(websocket : WebSocket) -> None: - capture_vision_frame = await anext(receive_vision_frames(websocket), None) - - if numpy.any(capture_vision_frame): - output_vision_frame = streamer.process_frame(create_empty_audio_frame(), capture_vision_frame) - is_success, output_frame_buffer = cv2.imencode('.jpg', output_vision_frame) - - if is_success: - await websocket.send_bytes(output_frame_buffer.tobytes()) - - -#TODO: needs review -def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpAnswer]: - video_codec : VideoCodec = 'vp8' - - if rtc.get_payload_type(sdp_offer, 'av1'): - video_codec = 'av1' - - video_payload_type = rtc.get_payload_type(sdp_offer, video_codec) - - if video_payload_type: - peer_connection : PeerConnection = rtc.create_peer_connection() - video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, video_payload_type) - video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, video_payload_type) - sender_bitrate = ctypes.c_uint(0) - rtc.wire_remb(video_sender_track, sender_bitrate) - receiver_bitrate = ctypes.c_uint(0) - rtc.wire_remb(video_receiver_track, receiver_bitrate) - - audio_codec : AudioCodec = 'opus' - audio_payload_type = rtc.get_payload_type(sdp_offer, audio_codec) - - if audio_payload_type: - audio_receiver_track = rtc.add_audio_track(peer_connection, 'recvonly', audio_codec, audio_payload_type) - audio_sender_track = rtc.add_audio_track(peer_connection, 'sendonly', audio_codec, audio_payload_type) - - rtc.set_remote_description(peer_connection, sdp_offer) - local_sdp = rtc.create_sdp_answer(peer_connection) - - if local_sdp: - rtc_peer : RtcPeer =\ - { - 'peer_connection': peer_connection, - 'video': - { - 'sender_track': video_sender_track, - 'receiver_track': video_receiver_track, - 'codec': video_codec - }, - 'sender_bitrate': sender_bitrate, - 'receiver_bitrate': receiver_bitrate - } - - if audio_payload_type: - rtc_peer['audio'] = RtcPeerAudio( - sender_track = audio_sender_track, - receiver_track = audio_receiver_track, - codec = audio_codec - ) - - rtc_store.init_peers(session_id) - rtc_store.get_peers(session_id).append(rtc_peer) - - threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True).start() - return local_sdp - - datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection) - - return None - - -async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFrame]: - websocket_event = await websocket.receive() - - while websocket_event.get('type') == 'websocket.receive': - frame_buffer = websocket_event.get('bytes') or bytes() - vision_frame = cv2.imdecode(numpy.frombuffer(frame_buffer, numpy.uint8), cv2.IMREAD_COLOR) - - if numpy.any(vision_frame): - yield vision_frame - - websocket_event = await websocket.receive() - - -#TODO: needs review -async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: - video_deque : deque[VideoPack] = deque(maxlen = 1) - audio_deque : deque[AudioPack] = deque(maxlen = 10) - video_event = threading.Event() - - video_receiver_thread = asyncio.to_thread(receive_video_frames, rtc_peer.get('video'), video_deque, video_event) - video_encoder_thread = asyncio.to_thread(run_video_encode_loop, rtc_peer, video_deque, video_event) - coroutines = [ video_receiver_thread, video_encoder_thread ] - - if rtc_peer.get('audio'): - audio_event = threading.Event() - coroutines.append(asyncio.to_thread(receive_audio_frames, rtc_peer.get('audio'), audio_deque, audio_event)) - coroutines.append(asyncio.to_thread(run_audio_encode_loop, rtc_peer, audio_deque, audio_event)) - - await asyncio.gather(*coroutines) - rtc_store.delete_peers(session_id) - - -#TODO: needs review -def run_video_encode_loop(rtc_peer : RtcPeer, video_deque : deque[VideoPack], video_event : threading.Event) -> None: - video_event.wait() - video_event.clear() - video_codec = rtc_peer.get('video').get('codec') - temp_vision_frame, temp_video_time = video_deque.popleft() - - if numpy.any(temp_vision_frame): - temp_resolution : Resolution = (temp_vision_frame.shape[1], temp_vision_frame.shape[0]) - temp_bitrate : BitRate = 8000 - video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) - frame_index = 0 - - while numpy.any(temp_vision_frame): - output_vision_frame = streamer.process_frame(create_empty_audio_frame(), temp_vision_frame) - output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0]) - output_vision_buffer = cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - - peer_bitrate = rtc_peer.get('sender_bitrate').value - - if output_resolution[0] - temp_resolution[0] or output_resolution[1] - temp_resolution[1]: - destroy_video_encoder(video_codec, video_encoder) - temp_resolution = output_resolution - video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) - frame_index = 0 - - if peer_bitrate and peer_bitrate - temp_bitrate: - temp_bitrate = peer_bitrate - - if not update_video_encoder_bitrate(video_codec, video_encoder, temp_bitrate): - destroy_video_encoder(video_codec, video_encoder) - video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) - frame_index = 0 - - output_video_buffer = encode_video_frame(video_codec, video_encoder, output_vision_buffer, temp_resolution, frame_index) - - if output_video_buffer: - rtc.send_video(rtc_peer, output_video_buffer, int(temp_video_time * 90000)) - - frame_index += 1 - video_event.wait() - video_event.clear() - temp_vision_frame, temp_video_time = video_deque.popleft() - - destroy_video_encoder(video_codec, video_encoder) - rtc.clear_remb(rtc_peer) - - -def run_audio_encode_loop(rtc_peer : RtcPeer, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: - audio_event.wait() - audio_event.clear() - temp_audio_frame, temp_audio_time = audio_deque.popleft() - audio_encoder = opus_encoder.create(48000, 2) - - while numpy.any(temp_audio_frame): - output_audio_buffer = opus_encoder.encode(audio_encoder, temp_audio_frame.tobytes(), 960) - - if output_audio_buffer: - rtc.send_audio(rtc_peer, output_audio_buffer, int(temp_audio_time * 48000)) - - if len(audio_deque) == 0: - audio_event.wait() - audio_event.clear() - - temp_audio_frame, temp_audio_time = audio_deque.popleft() - - opus_encoder.destroy(audio_encoder) - - -def fill_video_deque(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, video_buffer : bytes, video_deque : deque[VideoPack], video_event : threading.Event) -> None: - vision_frame = decode_video_frame(video_codec, video_decoder, video_buffer) - - if numpy.any(vision_frame): - video_deque.append((vision_frame, time.monotonic())) - video_event.set() - - -def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[VideoPack], video_event : threading.Event) -> None: - video_track = rtc_peer_video.get('receiver_track') - video_codec = rtc_peer_video.get('codec') - datachannel_library = datachannel_module.create_static_library() - video_decoder = create_video_decoder(video_codec) - receive_buffer = ctypes.create_string_buffer(512 * 1024) - available_event = create_event(video_track, datachannel_library) - receive_status_code = -3 - - while receive_status_code == 0 or receive_status_code == -3: - buffer_size = ctypes.c_int(512 * 1024) - receive_status_code = datachannel_library.rtcReceiveMessage(video_track, receive_buffer, ctypes.byref(buffer_size)) - - if receive_status_code == 0 and buffer_size.value > 0: - video_buffer = receive_buffer.raw[:buffer_size.value] - fill_video_deque(video_codec, video_decoder, video_buffer, video_deque, video_event) - - if receive_status_code == -3: - available_event.wait() - available_event.clear() - - empty_vision_frame = numpy.empty(0) - video_deque.append((empty_vision_frame, 0.0)) - video_event.set() - destroy_video_decoder(video_codec, video_decoder) - - -def fill_audio_deque(audio_codec : AudioCodec, audio_decoder : OpusDecoder, audio_buffer : bytes, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: - audio_frame = decode_audio_frame(audio_codec, audio_decoder, audio_buffer) - - if audio_frame: - audio_deque.append((numpy.frombuffer(audio_frame, dtype = numpy.float32), time.monotonic())) - audio_event.set() - - -def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None: - audio_track = rtc_peer_audio.get('receiver_track') - audio_codec = rtc_peer_audio.get('codec') - datachannel_library = datachannel_module.create_static_library() - audio_decoder = create_audio_decoder(audio_codec) - receive_buffer = ctypes.create_string_buffer(8 * 1024) - available_event = create_event(audio_track, datachannel_library) - receive_status_code = -3 - - while receive_status_code == 0 or receive_status_code == -3: - buffer_size = ctypes.c_int(8 * 1024) - receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size)) - - if receive_status_code == 0 and buffer_size.value > 0: - audio_buffer = receive_buffer.raw[:buffer_size.value] - fill_audio_deque(audio_codec, audio_decoder, audio_buffer, audio_deque, audio_event) - - if receive_status_code == -3: - available_event.wait() - available_event.clear() - - empty_audio_frame = numpy.empty(0) - audio_deque.append((empty_audio_frame, 0.0)) - audio_event.set() - destroy_audio_decoder(audio_codec, audio_decoder) - - -def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, input_buffer : bytes) -> Optional[VisionFrame]: - if video_codec == 'av1': - aom_pointer = aom_decoder.decode(video_decoder, input_buffer) - - if aom_pointer: - return normalize_vision_frame(aom_pointer) - - if video_codec == 'vp8': - vpx_pointer = vpx_decoder.decode(video_decoder, input_buffer) - - if vpx_pointer: - return normalize_vision_frame(vpx_pointer) - - return None - - -def decode_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, input_buffer : bytes) -> Optional[bytes]: - if audio_codec == 'opus': - return opus_decoder.decode(audio_decoder, input_buffer, 960, 2) - - return None - - -def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes: - if video_codec == 'av1': - return aom_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index) - - if video_codec == 'vp8': - return vpx_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index) - - return bytes() - - -def normalize_vision_frame(frame_pointer : AomPointer | VpxPointer) -> VisionFrame: - frame_width, frame_height = frame_pointer.get('resolution') - vision_frame = numpy.frombuffer(frame_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width)) - return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420) - - -def create_audio_decoder(audio_codec : AudioCodec) -> Optional[OpusDecoder]: - if audio_codec == 'opus': - return opus_decoder.create(48000, 2) - - return None - - -def create_video_decoder(video_codec : VideoCodec) -> Optional[VpxDecoder | AomDecoder]: - if video_codec == 'av1': - return aom_decoder.create(8) - - if video_codec == 'vp8': - return vpx_decoder.create(8) - - return None - - -def create_video_encoder(video_codec : VideoCodec, frame_resolution : Resolution, bitrate : BitRate) -> Optional[VpxEncoder | AomEncoder]: - if video_codec == 'av1': - return aom_encoder.create(frame_resolution, bitrate, 8, 10) - - if video_codec == 'vp8': - return vpx_encoder.create(frame_resolution, bitrate, 8, 10) - - return None - - -def destroy_audio_decoder(audio_codec : AudioCodec, audio_decoder : OpusDecoder) -> None: - if audio_codec == 'opus': - opus_decoder.destroy(audio_decoder) - - -def destroy_video_decoder(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder) -> None: - if video_codec == 'av1': - aom_decoder.destroy(video_decoder) - - if video_codec == 'vp8': - vpx_decoder.destroy(video_decoder) - - -def update_video_encoder_bitrate(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, bitrate : BitRate) -> bool: - if video_codec == 'av1': - return aom_encoder.update_bitrate(video_encoder, bitrate) - - if video_codec == 'vp8': - return vpx_encoder.update_bitrate(video_encoder, bitrate) - - return False - - -def destroy_video_encoder(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder) -> None: - if video_codec == 'av1': - aom_encoder.destroy(video_encoder) - - if video_codec == 'vp8': - vpx_encoder.destroy(video_encoder) - - -def destroy_stream(session_id : SessionId) -> bool: - if rtc_store.has_peers(session_id): - rtc_store.delete_peers(session_id) - return not rtc_store.has_peers(session_id) - - return False - - -def create_event(track : int, datachannel_library : ctypes.CDLL) -> threading.Event: - available_event = threading.Event() - available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event)) - datachannel_library.rtcSetAvailableCallback(track, available_callback) - available_event.callback = available_callback # type: ignore[attr-defined] - return available_event - - -def dispatch_event(event : threading.Event, track : int, pointer : ctypes.c_void_p) -> None: - event.set() diff --git a/facefusion/apis/stream_manager.py b/facefusion/apis/stream_manager.py new file mode 100644 index 00000000..53d5efbd --- /dev/null +++ b/facefusion/apis/stream_manager.py @@ -0,0 +1,127 @@ +import asyncio +import ctypes +import threading +from collections import deque +from collections.abc import AsyncIterator +from typing import Optional + +import cv2 +import numpy +from starlette.websockets import WebSocket + +from facefusion import rtc, rtc_store, streamer +from facefusion.apis.stream_audio import receive_audio_frames, run_audio_encode_loop +from facefusion.apis.stream_video import receive_video_frames, run_video_encode_loop +from facefusion.audio import create_empty_audio_frame +from facefusion.libraries import datachannel as datachannel_module +from facefusion.types import AudioCodec, AudioPack, PeerConnection, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VideoPack, VisionFrame + + +async def process_image(websocket : WebSocket) -> None: + capture_vision_frame = await anext(receive_vision_frames(websocket), None) + + if numpy.any(capture_vision_frame): + output_vision_frame = streamer.process_frame(create_empty_audio_frame(), capture_vision_frame) + is_success, output_frame_buffer = cv2.imencode('.jpg', output_vision_frame) + + if is_success: + await websocket.send_bytes(output_frame_buffer.tobytes()) + + +def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpAnswer]: + video_codec : VideoCodec = 'vp8' + + if rtc.get_payload_type(sdp_offer, 'av1'): + video_codec = 'av1' + + video_payload_type = rtc.get_payload_type(sdp_offer, video_codec) + + if video_payload_type: + peer_connection : PeerConnection = rtc.create_peer_connection() + video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, video_payload_type) + video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, video_payload_type) + + sender_bitrate = ctypes.c_uint(0) + receiver_bitrate = ctypes.c_uint(0) + rtc.wire_remb(video_sender_track, sender_bitrate) + rtc.wire_remb(video_receiver_track, receiver_bitrate) + + audio_codec : AudioCodec = 'opus' + audio_payload_type = rtc.get_payload_type(sdp_offer, audio_codec) + + if audio_payload_type: + audio_receiver_track = rtc.add_audio_track(peer_connection, 'recvonly', audio_codec, audio_payload_type) + audio_sender_track = rtc.add_audio_track(peer_connection, 'sendonly', audio_codec, audio_payload_type) + + rtc.set_remote_description(peer_connection, sdp_offer) + local_sdp = rtc.create_sdp_answer(peer_connection) + + if local_sdp: + rtc_peer : RtcPeer =\ + { + 'peer_connection': peer_connection, + 'video': + { + 'sender_track': video_sender_track, + 'receiver_track': video_receiver_track, + 'codec': video_codec + }, + 'sender_bitrate': sender_bitrate, + 'receiver_bitrate': receiver_bitrate + } + + if audio_payload_type: + rtc_peer['audio'] = RtcPeerAudio( + sender_track = audio_sender_track, + receiver_track = audio_receiver_track, + codec = audio_codec + ) + + rtc_store.init_peers(session_id) + rtc_store.get_peers(session_id).append(rtc_peer) + + threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True).start() + return local_sdp + + datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection) + + return None + + +async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFrame]: + websocket_event = await websocket.receive() + + while websocket_event.get('type') == 'websocket.receive': + frame_buffer = websocket_event.get('bytes') or bytes() + vision_frame = cv2.imdecode(numpy.frombuffer(frame_buffer, numpy.uint8), cv2.IMREAD_COLOR) + + if numpy.any(vision_frame): + yield vision_frame + + websocket_event = await websocket.receive() + + +async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: + video_deque : deque[VideoPack] = deque(maxlen = 1) + audio_deque : deque[AudioPack] = deque(maxlen = 10) + video_event = threading.Event() + + video_receiver_thread = asyncio.to_thread(receive_video_frames, rtc_peer.get('video'), video_deque, video_event) + video_encoder_thread = asyncio.to_thread(run_video_encode_loop, rtc_peer, video_deque, video_event) + coroutines = [ video_receiver_thread, video_encoder_thread ] + + if rtc_peer.get('audio'): + audio_event = threading.Event() + coroutines.append(asyncio.to_thread(receive_audio_frames, rtc_peer.get('audio'), audio_deque, audio_event)) + coroutines.append(asyncio.to_thread(run_audio_encode_loop, rtc_peer, audio_deque, audio_event)) + + await asyncio.gather(*coroutines) + rtc_store.delete_peers(session_id) + + +def destroy_stream(session_id : SessionId) -> bool: + if rtc_store.has_peers(session_id): + rtc_store.delete_peers(session_id) + return not rtc_store.has_peers(session_id) + + return False diff --git a/facefusion/apis/stream_video.py b/facefusion/apis/stream_video.py new file mode 100644 index 00000000..8b294eb5 --- /dev/null +++ b/facefusion/apis/stream_video.py @@ -0,0 +1,175 @@ +import ctypes +import threading +import time +from collections import deque +from typing import Optional + +import cv2 +import numpy + +from facefusion import rtc, streamer +from facefusion.apis.stream_event import create_event +from facefusion.audio import create_empty_audio_frame +from facefusion.codecs import aom_decoder, aom_encoder, vpx_decoder, vpx_encoder +from facefusion.libraries import datachannel as datachannel_module +from facefusion.types import AomDecoder, AomEncoder, AomPointer, BitRate, Resolution, RtcPeer, RtcPeerVideo, VideoCodec, VideoPack, VisionFrame, VpxDecoder, VpxEncoder, VpxPointer + + +def run_video_encode_loop(rtc_peer : RtcPeer, video_deque : deque[VideoPack], video_event : threading.Event) -> None: + video_event.wait() + video_event.clear() + video_codec = rtc_peer.get('video').get('codec') + temp_vision_frame, temp_video_time = video_deque.popleft() + + if numpy.any(temp_vision_frame): + temp_resolution : Resolution = (temp_vision_frame.shape[1], temp_vision_frame.shape[0]) + temp_bitrate : BitRate = 8000 + video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) + frame_index = 0 + + while numpy.any(temp_vision_frame): + output_vision_frame = streamer.process_frame(create_empty_audio_frame(), temp_vision_frame) + output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0]) + output_vision_buffer = cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + + peer_bitrate = rtc_peer.get('sender_bitrate').value + + if output_resolution[0] - temp_resolution[0] or output_resolution[1] - temp_resolution[1]: + destroy_video_encoder(video_codec, video_encoder) + temp_resolution = output_resolution + video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) + frame_index = 0 + + if peer_bitrate and peer_bitrate - temp_bitrate: + temp_bitrate = peer_bitrate + + if not update_video_encoder_bitrate(video_codec, video_encoder, temp_bitrate): + destroy_video_encoder(video_codec, video_encoder) + video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate) + frame_index = 0 + + output_video_buffer = encode_video_frame(video_codec, video_encoder, output_vision_buffer, temp_resolution, frame_index) + + if output_video_buffer: + rtc.send_video(rtc_peer, output_video_buffer, int(temp_video_time * 90000)) + + frame_index += 1 + video_event.wait() + video_event.clear() + temp_vision_frame, temp_video_time = video_deque.popleft() + + destroy_video_encoder(video_codec, video_encoder) + rtc.clear_remb(rtc_peer) + + +def fill_video_deque(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, video_buffer : bytes, video_deque : deque[VideoPack], video_event : threading.Event) -> None: + vision_frame = decode_video_frame(video_codec, video_decoder, video_buffer) + + if numpy.any(vision_frame): + video_deque.append((vision_frame, time.monotonic())) + video_event.set() + + +def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[VideoPack], video_event : threading.Event) -> None: + video_track = rtc_peer_video.get('receiver_track') + video_codec = rtc_peer_video.get('codec') + datachannel_library = datachannel_module.create_static_library() + video_decoder = create_video_decoder(video_codec) + receive_buffer = ctypes.create_string_buffer(512 * 1024) + available_event = create_event(video_track, datachannel_library) + receive_status_code = -3 + + while receive_status_code == 0 or receive_status_code == -3: + buffer_size = ctypes.c_int(512 * 1024) + receive_status_code = datachannel_library.rtcReceiveMessage(video_track, receive_buffer, ctypes.byref(buffer_size)) + + if receive_status_code == 0 and buffer_size.value > 0: + video_buffer = receive_buffer.raw[:buffer_size.value] + fill_video_deque(video_codec, video_decoder, video_buffer, video_deque, video_event) + + if receive_status_code == -3: + available_event.wait() + available_event.clear() + + empty_vision_frame = numpy.empty(0) + video_deque.append((empty_vision_frame, 0.0)) + video_event.set() + destroy_video_decoder(video_codec, video_decoder) + + +def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, input_buffer : bytes) -> Optional[VisionFrame]: + if video_codec == 'av1': + aom_pointer = aom_decoder.decode(video_decoder, input_buffer) + + if aom_pointer: + return normalize_vision_frame(aom_pointer) + + if video_codec == 'vp8': + vpx_pointer = vpx_decoder.decode(video_decoder, input_buffer) + + if vpx_pointer: + return normalize_vision_frame(vpx_pointer) + + return None + + +def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes: + if video_codec == 'av1': + return aom_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index) + + if video_codec == 'vp8': + return vpx_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index) + + return bytes() + + +def normalize_vision_frame(frame_pointer : AomPointer | VpxPointer) -> VisionFrame: + frame_width, frame_height = frame_pointer.get('resolution') + vision_frame = numpy.frombuffer(frame_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width)) + return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420) + + +def create_video_decoder(video_codec : VideoCodec) -> Optional[VpxDecoder | AomDecoder]: + if video_codec == 'av1': + return aom_decoder.create(8) + + if video_codec == 'vp8': + return vpx_decoder.create(8) + + return None + + +def create_video_encoder(video_codec : VideoCodec, frame_resolution : Resolution, bitrate : BitRate) -> Optional[VpxEncoder | AomEncoder]: + if video_codec == 'av1': + return aom_encoder.create(frame_resolution, bitrate, 8, 10) + + if video_codec == 'vp8': + return vpx_encoder.create(frame_resolution, bitrate, 8, 10) + + return None + + +def destroy_video_decoder(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder) -> None: + if video_codec == 'av1': + aom_decoder.destroy(video_decoder) + + if video_codec == 'vp8': + vpx_decoder.destroy(video_decoder) + + +def destroy_video_encoder(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder) -> None: + if video_codec == 'av1': + aom_encoder.destroy(video_encoder) + + if video_codec == 'vp8': + vpx_encoder.destroy(video_encoder) + + +def update_video_encoder_bitrate(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, bitrate : BitRate) -> bool: + if video_codec == 'av1': + return aom_encoder.update_bitrate(video_encoder, bitrate) + + if video_codec == 'vp8': + return vpx_encoder.update_bitrate(video_encoder, bitrate) + + return False diff --git a/tests/test_api_stream_audio.py b/tests/test_api_stream_audio.py new file mode 100644 index 00000000..e651da9e --- /dev/null +++ b/tests/test_api_stream_audio.py @@ -0,0 +1,116 @@ +import ctypes +import threading +from collections import deque +from unittest.mock import MagicMock, patch + +import numpy +import pytest + +from facefusion import rtc, rtc_store, state_manager +from facefusion.apis.stream_audio import fill_audio_deque, receive_audio_frames, run_audio_encode_loop +from facefusion.download import conditional_download +from facefusion.ffmpeg import read_audio_buffer +from facefusion.hash_helper import create_hash +from facefusion.libraries import datachannel as datachannel_module, opus as opus_module +from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio +from .assert_helper import get_test_example_file, get_test_examples_directory + + +@pytest.fixture(scope = 'module', autouse = True) +def before_all() -> None: + state_manager.init_item('download_providers', [ 'github', 'huggingface' ]) + state_manager.init_item('processors', []) + + opus_module.pre_check() + datachannel_module.pre_check() + + conditional_download(get_test_examples_directory(), + [ + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3' + ]) + + +@pytest.fixture(scope = 'function', autouse = True) +def before_each() -> None: + rtc_store.clear() + + +def test_run_audio_encode_loop() -> None: + audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) + audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 + peer_connection = rtc.create_peer_connection() + rtc_peer : RtcPeer =\ + { + 'peer_connection': peer_connection, + 'video': + { + 'sender_track': 0, + 'receiver_track': 0, + 'codec': 'vp8' + }, + 'sender_bitrate': ctypes.c_uint(0), + 'receiver_bitrate': ctypes.c_uint(0) + } + + audio_deque : deque[AudioPack] = deque() + audio_event = threading.Event() + + audio_deque.append((audio_frame, 0.100)) + audio_event.set() + + encoder_mock = MagicMock() + encoder_mock.encode.return_value = bytes([ 1 ] * 32) + + with patch('facefusion.apis.stream_audio.opus_encoder.encode', encoder_mock.encode): + with patch('facefusion.apis.stream_audio.rtc.send_audio') as send_audio_mock: + audio_loop_thread = threading.Thread(target = run_audio_encode_loop, args = (rtc_peer, audio_deque, audio_event), daemon = True) + audio_loop_thread.start() + audio_deque.append((numpy.empty(0), 0.0)) + audio_event.set() + audio_loop_thread.join(timeout = 5.0) + + assert encoder_mock.encode.called is True + assert send_audio_mock.called is True + + +def test_fill_audio_deque() -> None: + audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) + audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 + audio_decoder_mock = MagicMock() + audio_deque : deque[AudioPack] = deque() + audio_event = threading.Event() + + with patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = audio_frame.tobytes()): + fill_audio_deque('opus', audio_decoder_mock, audio_frame.tobytes(), audio_deque, audio_event) + + buffer_frame, _ = audio_deque.popleft() + + assert audio_event.is_set() + assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes()) + + +@pytest.mark.parametrize('audio_codec', [ 'opus' ]) +def test_receive_audio_frames(audio_codec : AudioCodec) -> None: + audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) + audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 + audio_deque : deque[AudioPack] = deque() + audio_event = threading.Event() + + datachannel_library_mock = MagicMock() + datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] + + with patch('facefusion.apis.stream_audio.datachannel_module.create_static_library', return_value = datachannel_library_mock): + with patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = audio_frame.tobytes()): + rtc_peer_audio : RtcPeerAudio =\ + { + 'sender_track': 0, + 'receiver_track': 0, + 'codec': audio_codec + } + audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque, audio_event), daemon = True) + audio_receiver_thread.start() + audio_receiver_thread.join(timeout = 5.0) + + buffer_frame, _ = audio_deque.popleft() + + assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes()) diff --git a/tests/test_api_stream_helper.py b/tests/test_api_stream_helper.py deleted file mode 100644 index 98169cff..00000000 --- a/tests/test_api_stream_helper.py +++ /dev/null @@ -1,461 +0,0 @@ -import asyncio -import ctypes -import struct -import threading -from collections import deque -from unittest.mock import AsyncMock, MagicMock, patch - -import cv2 -import numpy -import pytest - -from facefusion import rtc, rtc_store, state_manager -from facefusion.apis.stream_helper import create_video_decoder, create_video_encoder, decode_video_frame, destroy_stream, destroy_video_decoder, destroy_video_encoder, encode_video_frame, fill_audio_deque, fill_video_deque, process_image, process_video, receive_audio_frames, receive_video_frames, receive_vision_frames, run_audio_encode_loop, run_peer_loop, run_video_encode_loop, update_video_encoder_bitrate -from facefusion.codecs import aom_encoder, vpx_encoder -from facefusion.common_helper import is_linux, is_macos, is_windows -from facefusion.download import conditional_download -from facefusion.ffmpeg import read_audio_buffer -from facefusion.hash_helper import create_hash -from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, opus as opus_module, vpx as vpx_module -from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio, RtcPeerVideo, SessionId, VideoCodec, VideoPack -from facefusion.vision import read_video_frame -from .assert_helper import get_test_example_file, get_test_examples_directory - - -@pytest.fixture(scope = 'module', autouse = True) -def before_all() -> None: - state_manager.init_item('download_providers', [ 'github', 'huggingface' ]) - state_manager.init_item('processors', []) - - aom_module.pre_check() - vpx_module.pre_check() - opus_module.pre_check() - datachannel_module.pre_check() - - conditional_download(get_test_examples_directory(), - [ - 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4', - 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg', - 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3' - ]) - - -@pytest.fixture(scope = 'function', autouse = True) -def before_each() -> None: - rtc_store.clear() - - -@pytest.mark.anyio -async def test_process_image() -> None: - image_buffer = open(get_test_example_file('source.jpg'), 'rb').read() - websocket_mock = AsyncMock() - websocket_mock.receive.side_effect =\ - [ - { - 'type': 'websocket.receive', - 'bytes': image_buffer - } - ] - - await process_image(websocket_mock) - - websocket_mock.send_bytes.assert_called_once() - - if is_linux() or is_windows(): - assert create_hash(websocket_mock.send_bytes.call_args[0][0]) == '0142782f' - - -@pytest.mark.parametrize('video_codec, session_id', [ ('av1', 'test-process-video-av1'), ('vp8', 'test-process-video-vp8') ]) -def test_process_video(video_codec : VideoCodec, session_id : str) -> None: - peer_connection = rtc.create_peer_connection() - - if video_codec == 'av1': - rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 35) - - if video_codec == 'vp8': - rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 96) - - rtc.add_audio_track(peer_connection, 'sendrecv', 'opus', 111) - sdp_offer = rtc.create_sdp_offer(peer_connection) - datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection) - - with patch('facefusion.apis.stream_helper.threading.Thread'): - sdp_answer = process_video(session_id, sdp_offer) - - assert sdp_answer - assert 'm=video' in sdp_answer - assert 'a=recvonly' in sdp_answer - assert 'a=sendonly' in sdp_answer - - for peer in rtc_store.get_peers(session_id): - sender_bitrate = peer.get('sender_bitrate') - receiver_bitrate = peer.get('receiver_bitrate') - - assert sender_bitrate.value == 0 - assert receiver_bitrate.value == 0 - - rtc.handle_remb(0, 8000000, ctypes.addressof(sender_bitrate)) - assert sender_bitrate.value == 8000 - - rtc.handle_remb(0, 4000000, ctypes.addressof(receiver_bitrate)) - assert receiver_bitrate.value == 4000 - - -@pytest.mark.anyio -async def test_receive_vision_frames() -> None: - image_buffer = open(get_test_example_file('source.jpg'), 'rb').read() - websocket_mock = AsyncMock() - websocket_mock.receive.side_effect =\ - [ - { - 'type': 'websocket.receive', - 'bytes': image_buffer - }, - { - 'type': 'websocket.receive', - 'bytes': 'invalid'.encode() - }, - { - 'type': 'websocket.disconnect' - } - ] - - vision_frames = receive_vision_frames(websocket_mock) - - assert create_hash((await anext(vision_frames)).tobytes()) == '5ed32ca0' - - -@pytest.mark.parametrize('video_codec, payload_type, session_id', [ ('av1', 35, 'test-run-peer-loop-av1'), ('vp8', 96, 'test-run-peer-loop-vp8') ]) -def test_run_peer_loop(video_codec : VideoCodec, payload_type : int, session_id : SessionId) -> None: - peer_connection = rtc.create_peer_connection() - video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) - video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) - rtc_peer : RtcPeer =\ - { - 'peer_connection': peer_connection, - 'video': - { - 'sender_track': video_sender_track, - 'receiver_track': video_receiver_track, - 'codec': video_codec - }, - 'sender_bitrate': ctypes.c_uint(0), - 'receiver_bitrate': ctypes.c_uint(0) - } - - rtc_store.init_peers(session_id) - rtc_store.get_peers(session_id).append(rtc_peer) - - assert rtc_store.has_peers(session_id) is True - - with patch('facefusion.apis.stream_helper.receive_video_frames'): - with patch('facefusion.apis.stream_helper.run_video_encode_loop'): - thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True) - thread.start() - thread.join(timeout = 5.0) - - assert rtc_store.has_peers(session_id) is False - - -@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) -def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: - video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - peer_connection = rtc.create_peer_connection() - video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) - video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) - rtc_peer : RtcPeer =\ - { - 'peer_connection': peer_connection, - 'video': - { - 'sender_track': video_sender_track, - 'receiver_track': video_receiver_track, - 'codec': video_codec - }, - 'sender_bitrate': ctypes.c_uint(0), - 'receiver_bitrate': ctypes.c_uint(0) - } - - video_deque : deque[VideoPack] = deque() - video_event = threading.Event() - - video_deque.append((video_frame, 0.1)) - video_event.set() - - with patch('facefusion.apis.stream_helper.rtc.send_video') as send_video_mock: - encode_loop_thread = threading.Thread(target = run_video_encode_loop, args = (rtc_peer, video_deque, video_event), daemon = True) - encode_loop_thread.start() - empty_vision_frame = numpy.empty(0) - video_deque.append((empty_vision_frame, 0.0)) - video_event.set() - encode_loop_thread.join(timeout = 5.0) - - assert send_video_mock.called - - if video_codec == 'av1': - if is_linux() or is_windows(): - assert create_hash(send_video_mock.call_args[0][1]) == '9ba7212b' - - if is_macos(): - pytest.skip() - - if video_codec == 'vp8': - pytest.skip() - - -def test_run_audio_encode_loop() -> None: - audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) - audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 - peer_connection = rtc.create_peer_connection() - rtc_peer : RtcPeer =\ - { - 'peer_connection': peer_connection, - 'video': - { - 'sender_track': 0, - 'receiver_track': 0, - 'codec': 'vp8' - }, - 'sender_bitrate': ctypes.c_uint(0), - 'receiver_bitrate': ctypes.c_uint(0) - } - - audio_deque : deque[AudioPack] = deque() - audio_event = threading.Event() - - audio_deque.append((audio_frame, 0.100)) - audio_event.set() - - encoder_mock = MagicMock() - encoder_mock.encode.return_value = bytes([ 1 ] * 32) - - with patch('facefusion.apis.stream_helper.opus_encoder.encode', encoder_mock.encode): - with patch('facefusion.apis.stream_helper.rtc.send_audio') as send_audio_mock: - audio_loop_thread = threading.Thread(target = run_audio_encode_loop, args = (rtc_peer, audio_deque, audio_event), daemon = True) - audio_loop_thread.start() - audio_deque.append((numpy.empty(0), 0.0)) - audio_event.set() - audio_loop_thread.join(timeout = 5.0) - - assert encoder_mock.encode.called is True - assert send_audio_mock.called is True - - -@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) -def test_fill_video_deque(video_codec : VideoCodec) -> None: - video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_encoder = create_video_encoder(video_codec, (426, 226), 1000) - video_decoder = create_video_decoder(video_codec) - encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0) - video_deque : deque[VideoPack] = deque() - video_event = threading.Event() - - fill_video_deque(video_codec, video_decoder, encode_buffer, video_deque, video_event) - - vision_frame, _ = video_deque.popleft() - - assert video_event.is_set() - - if is_linux() or is_windows(): - if video_codec == 'av1': - assert create_hash(vision_frame.tobytes()) == 'c97d6d29' - - if video_codec == 'vp8': - assert create_hash(vision_frame.tobytes()) == '99ef2c25' - - if is_macos(): - if video_codec == 'av1': - assert create_hash(vision_frame.tobytes()) == 'eafd1fab' - - if video_codec == 'vp8': - assert create_hash(vision_frame.tobytes()) == 'ff3ecb43' - - -@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) -def test_receive_video_frames(video_codec : VideoCodec) -> None: - video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_deque : deque[VideoPack] = deque() - video_event = threading.Event() - - datachannel_library_mock = MagicMock() - datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] - - with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock): - with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = video_frame): - rtc_peer_video : RtcPeerVideo =\ - { - 'sender_track': 0, - 'receiver_track': 0, - 'codec': video_codec - } - video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer_video, video_deque, video_event), daemon = True) - video_receiver_thread.start() - video_receiver_thread.join(timeout = 5.0) - - vision_frame, _ = video_deque.popleft() - - if is_linux() or is_windows(): - assert create_hash(vision_frame.tobytes()) == 'a17439db' - - if is_macos(): - assert create_hash(vision_frame.tobytes()) == '38d00e2a' - - -def test_fill_audio_deque() -> None: - audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) - audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 - audio_decoder_mock = MagicMock() - audio_deque : deque[AudioPack] = deque() - audio_event = threading.Event() - - with patch('facefusion.apis.stream_helper.decode_audio_frame', return_value = audio_frame.tobytes()): - fill_audio_deque('opus', audio_decoder_mock, audio_frame.tobytes(), audio_deque, audio_event) - - buffer_frame, _ = audio_deque.popleft() - - assert audio_event.is_set() - assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes()) - - -@pytest.mark.parametrize('audio_codec', [ 'opus' ]) -def test_receive_audio_frames(audio_codec : AudioCodec) -> None: - audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) - audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 - audio_deque : deque[AudioPack] = deque() - audio_event = threading.Event() - - datachannel_library_mock = MagicMock() - datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] - - with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock): - with patch('facefusion.apis.stream_helper.decode_audio_frame', return_value = audio_frame.tobytes()): - rtc_peer_audio : RtcPeerAudio =\ - { - 'sender_track': 0, - 'receiver_track': 0, - 'codec': audio_codec - } - audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque, audio_event), daemon = True) - audio_receiver_thread.start() - audio_receiver_thread.join(timeout = 5.0) - - buffer_frame, _ = audio_deque.popleft() - - assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes()) - - -@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) -def test_encode_and_decode_video_frame(video_codec : VideoCodec) -> None: - video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_encoder = create_video_encoder(video_codec, (426, 226), 1000) - video_decoder = create_video_decoder(video_codec) - encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0) - decode_buffer = decode_video_frame(video_codec, video_decoder, encode_buffer).tobytes() - - if is_linux() or is_windows(): - if video_codec == 'av1': - assert create_hash(decode_buffer) == 'c97d6d29' - - if video_codec == 'vp8': - assert create_hash(decode_buffer) == '99ef2c25' - - if is_macos(): - if video_codec == 'av1': - assert create_hash(decode_buffer) == 'eafd1fab' - - if video_codec == 'vp8': - assert create_hash(decode_buffer) == 'ff3ecb43' - - assert decode_video_frame(video_codec, video_decoder, bytes()) is None - - -@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) -def test_create_and_destroy_video_decoder(video_codec : VideoCodec) -> None: - video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - - if video_codec == 'av1': - video_encoder = aom_encoder.create((426, 226), 1000, 1, 0) - encode_buffer = aom_encoder.encode(video_encoder, input_buffer, (426, 226), 0) - - if video_codec == 'vp8': - video_encoder = vpx_encoder.create((426, 226), 1000, 1, 0) - encode_buffer = vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 0) - - video_decoder = create_video_decoder(video_codec) - - assert numpy.any(decode_video_frame(video_codec, video_decoder, encode_buffer)) - - destroy_video_decoder(video_codec, video_decoder) - - assert decode_video_frame(video_codec, video_decoder, encode_buffer) is None - - -@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) -def test_create_and_destroy_video_encoder(video_codec : VideoCodec) -> None: - video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_encoder = create_video_encoder(video_codec, (426, 226), 4000) - - if video_codec == 'av1': - assert aom_encoder.encode(video_encoder, input_buffer, (426, 226), 0) - - if video_codec == 'vp8': - assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 0) - - destroy_video_encoder(video_codec, video_encoder) - - if video_codec == 'av1': - assert aom_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes() - - if video_codec == 'vp8': - assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes() - - -@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) -def test_update_video_encoder_bitrate(video_codec : VideoCodec) -> None: - video_encoder = create_video_encoder(video_codec, (426, 226), 4000) - - if video_codec == 'av1': - assert struct.unpack_from('I', video_encoder, 128 + 136)[0] == 4000 - - if video_codec == 'vp8': - assert struct.unpack_from('I', video_encoder, 64 + 112)[0] == 4000 - - assert update_video_encoder_bitrate(video_codec, video_encoder, 6000) - - if video_codec == 'av1': - assert struct.unpack_from('I', video_encoder, 128 + 136)[0] == 6000 - - if video_codec == 'vp8': - assert struct.unpack_from('I', video_encoder, 64 + 112)[0] == 6000 - - destroy_video_encoder(video_codec, video_encoder) - - -def test_destroy_stream() -> None: - peer_connection = rtc.create_peer_connection() - rtc.add_video_track(peer_connection, 'sendonly', 'vp8', 96) - rtc_peer : RtcPeer =\ - { - 'peer_connection': peer_connection, - 'video': - { - 'sender_track': 0, - 'receiver_track': 0, - 'codec': 'vp8' - }, - 'sender_bitrate': ctypes.c_uint(0), - 'receiver_bitrate': ctypes.c_uint(0) - } - session_id = 'test-destroy-stream' - - rtc_store.init_peers(session_id) - rtc_store.get_peers(session_id).append(rtc_peer) - - assert destroy_stream(session_id) is True - assert rtc_store.get_peers(session_id) is None - - assert destroy_stream(session_id) is False diff --git a/tests/test_api_stream_manager.py b/tests/test_api_stream_manager.py new file mode 100644 index 00000000..95d7dbee --- /dev/null +++ b/tests/test_api_stream_manager.py @@ -0,0 +1,171 @@ +import asyncio +import ctypes +import threading +from unittest.mock import AsyncMock, patch + +import pytest + +from facefusion import rtc, rtc_store, state_manager +from facefusion.apis.stream_manager import destroy_stream, process_image, process_video, receive_vision_frames, run_peer_loop +from facefusion.common_helper import is_linux, is_windows +from facefusion.download import conditional_download +from facefusion.hash_helper import create_hash +from facefusion.libraries import datachannel as datachannel_module +from facefusion.types import RtcPeer, SessionId, VideoCodec +from .assert_helper import get_test_example_file, get_test_examples_directory + + +@pytest.fixture(scope = 'module', autouse = True) +def before_all() -> None: + state_manager.init_item('download_providers', [ 'github', 'huggingface' ]) + state_manager.init_item('processors', []) + + datachannel_module.pre_check() + + conditional_download(get_test_examples_directory(), + [ + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg' + ]) + + +@pytest.fixture(scope = 'function', autouse = True) +def before_each() -> None: + rtc_store.clear() + + +@pytest.mark.anyio +async def test_process_image() -> None: + image_buffer = open(get_test_example_file('source.jpg'), 'rb').read() + websocket_mock = AsyncMock() + websocket_mock.receive.side_effect =\ + [ + { + 'type': 'websocket.receive', + 'bytes': image_buffer + } + ] + + await process_image(websocket_mock) + + websocket_mock.send_bytes.assert_called_once() + + if is_linux() or is_windows(): + assert create_hash(websocket_mock.send_bytes.call_args[0][0]) == '0142782f' + + +@pytest.mark.parametrize('video_codec, session_id', [ ('av1', 'test-process-video-av1'), ('vp8', 'test-process-video-vp8') ]) +def test_process_video(video_codec : VideoCodec, session_id : str) -> None: + peer_connection = rtc.create_peer_connection() + + if video_codec == 'av1': + rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 35) + + if video_codec == 'vp8': + rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 96) + + rtc.add_audio_track(peer_connection, 'sendrecv', 'opus', 111) + sdp_offer = rtc.create_sdp_offer(peer_connection) + datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection) + + with patch('facefusion.apis.stream_manager.threading.Thread'): + sdp_answer = process_video(session_id, sdp_offer) + + assert sdp_answer + assert 'm=video' in sdp_answer + assert 'a=recvonly' in sdp_answer + assert 'a=sendonly' in sdp_answer + + for peer in rtc_store.get_peers(session_id): + sender_bitrate = peer.get('sender_bitrate') + receiver_bitrate = peer.get('receiver_bitrate') + + assert sender_bitrate.value == 0 + assert receiver_bitrate.value == 0 + + rtc.handle_remb(0, 8000000, ctypes.addressof(sender_bitrate)) + assert sender_bitrate.value == 8000 + + rtc.handle_remb(0, 4000000, ctypes.addressof(receiver_bitrate)) + assert receiver_bitrate.value == 4000 + + +@pytest.mark.anyio +async def test_receive_vision_frames() -> None: + image_buffer = open(get_test_example_file('source.jpg'), 'rb').read() + websocket_mock = AsyncMock() + websocket_mock.receive.side_effect =\ + [ + { + 'type': 'websocket.receive', + 'bytes': image_buffer + }, + { + 'type': 'websocket.receive', + 'bytes': 'invalid'.encode() + }, + { + 'type': 'websocket.disconnect' + } + ] + + vision_frames = receive_vision_frames(websocket_mock) + + assert create_hash((await anext(vision_frames)).tobytes()) == '5ed32ca0' + + +@pytest.mark.parametrize('video_codec, payload_type, session_id', [ ('av1', 35, 'test-run-peer-loop-av1'), ('vp8', 96, 'test-run-peer-loop-vp8') ]) +def test_run_peer_loop(video_codec : VideoCodec, payload_type : int, session_id : SessionId) -> None: + peer_connection = rtc.create_peer_connection() + video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) + video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) + rtc_peer : RtcPeer =\ + { + 'peer_connection': peer_connection, + 'video': + { + 'sender_track': video_sender_track, + 'receiver_track': video_receiver_track, + 'codec': video_codec + }, + 'sender_bitrate': ctypes.c_uint(0), + 'receiver_bitrate': ctypes.c_uint(0) + } + + rtc_store.init_peers(session_id) + rtc_store.get_peers(session_id).append(rtc_peer) + + assert rtc_store.has_peers(session_id) is True + + with patch('facefusion.apis.stream_manager.receive_video_frames'): + with patch('facefusion.apis.stream_manager.run_video_encode_loop'): + thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True) + thread.start() + thread.join(timeout = 5.0) + + assert rtc_store.has_peers(session_id) is False + + +def test_destroy_stream() -> None: + peer_connection = rtc.create_peer_connection() + rtc.add_video_track(peer_connection, 'sendonly', 'vp8', 96) + rtc_peer : RtcPeer =\ + { + 'peer_connection': peer_connection, + 'video': + { + 'sender_track': 0, + 'receiver_track': 0, + 'codec': 'vp8' + }, + 'sender_bitrate': ctypes.c_uint(0), + 'receiver_bitrate': ctypes.c_uint(0) + } + session_id = 'test-destroy-stream' + + rtc_store.init_peers(session_id) + rtc_store.get_peers(session_id).append(rtc_peer) + + assert destroy_stream(session_id) is True + assert rtc_store.get_peers(session_id) is None + + assert destroy_stream(session_id) is False diff --git a/tests/test_api_stream_video.py b/tests/test_api_stream_video.py new file mode 100644 index 00000000..51edc01b --- /dev/null +++ b/tests/test_api_stream_video.py @@ -0,0 +1,237 @@ +import ctypes +import struct +import threading +from collections import deque +from unittest.mock import MagicMock, patch + +import cv2 +import numpy +import pytest + +from facefusion import rtc, rtc_store, state_manager +from facefusion.apis.stream_video import create_video_decoder, create_video_encoder, decode_video_frame, destroy_video_decoder, destroy_video_encoder, encode_video_frame, fill_video_deque, receive_video_frames, run_video_encode_loop, update_video_encoder_bitrate +from facefusion.codecs import aom_encoder, vpx_encoder +from facefusion.common_helper import is_linux, is_macos, is_windows +from facefusion.download import conditional_download +from facefusion.hash_helper import create_hash +from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, vpx as vpx_module +from facefusion.types import RtcPeer, RtcPeerVideo, VideoCodec, VideoPack +from facefusion.vision import read_video_frame +from .assert_helper import get_test_example_file, get_test_examples_directory + + +@pytest.fixture(scope = 'module', autouse = True) +def before_all() -> None: + state_manager.init_item('download_providers', [ 'github', 'huggingface' ]) + state_manager.init_item('processors', []) + + aom_module.pre_check() + vpx_module.pre_check() + datachannel_module.pre_check() + + conditional_download(get_test_examples_directory(), + [ + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4' + ]) + + +@pytest.fixture(scope = 'function', autouse = True) +def before_each() -> None: + rtc_store.clear() + + +@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) +def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + peer_connection = rtc.create_peer_connection() + video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) + video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) + rtc_peer : RtcPeer =\ + { + 'peer_connection': peer_connection, + 'video': + { + 'sender_track': video_sender_track, + 'receiver_track': video_receiver_track, + 'codec': video_codec + }, + 'sender_bitrate': ctypes.c_uint(0), + 'receiver_bitrate': ctypes.c_uint(0) + } + + video_deque : deque[VideoPack] = deque() + video_event = threading.Event() + + video_deque.append((video_frame, 0.1)) + video_event.set() + + with patch('facefusion.apis.stream_video.rtc.send_video') as send_video_mock: + encode_loop_thread = threading.Thread(target = run_video_encode_loop, args = (rtc_peer, video_deque, video_event), daemon = True) + encode_loop_thread.start() + empty_vision_frame = numpy.empty(0) + video_deque.append((empty_vision_frame, 0.0)) + video_event.set() + encode_loop_thread.join(timeout = 5.0) + + assert send_video_mock.called + + if video_codec == 'av1': + if is_linux() or is_windows(): + assert create_hash(send_video_mock.call_args[0][1]) == '9ba7212b' + + if is_macos(): + pytest.skip() + + if video_codec == 'vp8': + pytest.skip() + + +@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) +def test_fill_video_deque(video_codec : VideoCodec) -> None: + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_encoder = create_video_encoder(video_codec, (426, 226), 1000) + video_decoder = create_video_decoder(video_codec) + encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0) + video_deque : deque[VideoPack] = deque() + video_event = threading.Event() + + fill_video_deque(video_codec, video_decoder, encode_buffer, video_deque, video_event) + + vision_frame, _ = video_deque.popleft() + + assert video_event.is_set() + + if is_linux() or is_windows(): + if video_codec == 'av1': + assert create_hash(vision_frame.tobytes()) == 'c97d6d29' + + if video_codec == 'vp8': + assert create_hash(vision_frame.tobytes()) == '99ef2c25' + + if is_macos(): + if video_codec == 'av1': + assert create_hash(vision_frame.tobytes()) == 'eafd1fab' + + if video_codec == 'vp8': + assert create_hash(vision_frame.tobytes()) == 'ff3ecb43' + + +@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) +def test_receive_video_frames(video_codec : VideoCodec) -> None: + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + video_deque : deque[VideoPack] = deque() + video_event = threading.Event() + + datachannel_library_mock = MagicMock() + datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] + + with patch('facefusion.apis.stream_video.datachannel_module.create_static_library', return_value = datachannel_library_mock): + with patch('facefusion.apis.stream_video.decode_video_frame', return_value = video_frame): + rtc_peer_video : RtcPeerVideo =\ + { + 'sender_track': 0, + 'receiver_track': 0, + 'codec': video_codec + } + video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer_video, video_deque, video_event), daemon = True) + video_receiver_thread.start() + video_receiver_thread.join(timeout = 5.0) + + vision_frame, _ = video_deque.popleft() + + if is_linux() or is_windows(): + assert create_hash(vision_frame.tobytes()) == 'a17439db' + + if is_macos(): + assert create_hash(vision_frame.tobytes()) == '38d00e2a' + + +@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) +def test_encode_and_decode_video_frame(video_codec : VideoCodec) -> None: + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_encoder = create_video_encoder(video_codec, (426, 226), 1000) + video_decoder = create_video_decoder(video_codec) + encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0) + decode_buffer = decode_video_frame(video_codec, video_decoder, encode_buffer).tobytes() + + if is_linux() or is_windows(): + if video_codec == 'av1': + assert create_hash(decode_buffer) == 'c97d6d29' + + if video_codec == 'vp8': + assert create_hash(decode_buffer) == '99ef2c25' + + if is_macos(): + if video_codec == 'av1': + assert create_hash(decode_buffer) == 'eafd1fab' + + if video_codec == 'vp8': + assert create_hash(decode_buffer) == 'ff3ecb43' + + assert decode_video_frame(video_codec, video_decoder, bytes()) is None + + +@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) +def test_create_and_destroy_video_decoder(video_codec : VideoCodec) -> None: + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + + if video_codec == 'av1': + video_encoder = aom_encoder.create((426, 226), 1000, 1, 0) + encode_buffer = aom_encoder.encode(video_encoder, input_buffer, (426, 226), 0) + + if video_codec == 'vp8': + video_encoder = vpx_encoder.create((426, 226), 1000, 1, 0) + encode_buffer = vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 0) + + video_decoder = create_video_decoder(video_codec) + + assert numpy.any(decode_video_frame(video_codec, video_decoder, encode_buffer)) + + destroy_video_decoder(video_codec, video_decoder) + + assert decode_video_frame(video_codec, video_decoder, encode_buffer) is None + + +@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) +def test_create_and_destroy_video_encoder(video_codec : VideoCodec) -> None: + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_encoder = create_video_encoder(video_codec, (426, 226), 4000) + + if video_codec == 'av1': + assert aom_encoder.encode(video_encoder, input_buffer, (426, 226), 0) + + if video_codec == 'vp8': + assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 0) + + destroy_video_encoder(video_codec, video_encoder) + + if video_codec == 'av1': + assert aom_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes() + + if video_codec == 'vp8': + assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes() + + +@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) +def test_update_video_encoder_bitrate(video_codec : VideoCodec) -> None: + video_encoder = create_video_encoder(video_codec, (426, 226), 4000) + + if video_codec == 'av1': + assert struct.unpack_from('I', video_encoder, 128 + 136)[0] == 4000 + + if video_codec == 'vp8': + assert struct.unpack_from('I', video_encoder, 64 + 112)[0] == 4000 + + assert update_video_encoder_bitrate(video_codec, video_encoder, 6000) + + if video_codec == 'av1': + assert struct.unpack_from('I', video_encoder, 128 + 136)[0] == 6000 + + if video_codec == 'vp8': + assert struct.unpack_from('I', video_encoder, 64 + 112)[0] == 6000 + + destroy_video_encoder(video_codec, video_encoder)