Feat/finalize stream (#1144)

* break stream helper into pieces

* remove todos
This commit is contained in:
Henry Ruhs
2026-06-02 16:05:21 +02:00
committed by GitHub
parent 314ee61826
commit 7181b41f2d
10 changed files with 930 additions and 839 deletions
+1 -1
View File
@@ -6,7 +6,7 @@ from starlette.websockets import WebSocket, WebSocketState
from facefusion import session_context, session_manager
from facefusion.apis.api_helper import get_sec_websocket_protocol
from facefusion.apis.session_helper import extract_access_token
from facefusion.apis.stream_helper import destroy_stream, process_image, process_video
from facefusion.apis.stream_manager import destroy_stream, process_image, process_video
async def websocket_stream(websocket : WebSocket) -> None:
+88
View File
@@ -0,0 +1,88 @@
import ctypes
import threading
import time
from collections import deque
from typing import Optional
import numpy
from facefusion import rtc
from facefusion.apis.stream_event import create_event
from facefusion.codecs import opus_decoder, opus_encoder
from facefusion.libraries import datachannel as datachannel_module
from facefusion.types import AudioCodec, AudioPack, OpusDecoder, RtcPeer, RtcPeerAudio
def run_audio_encode_loop(rtc_peer : RtcPeer, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None:
audio_event.wait()
audio_event.clear()
temp_audio_frame, temp_audio_time = audio_deque.popleft()
audio_encoder = opus_encoder.create(48000, 2)
while numpy.any(temp_audio_frame):
output_audio_buffer = opus_encoder.encode(audio_encoder, temp_audio_frame.tobytes(), 960)
if output_audio_buffer:
rtc.send_audio(rtc_peer, output_audio_buffer, int(temp_audio_time * 48000))
if len(audio_deque) == 0:
audio_event.wait()
audio_event.clear()
temp_audio_frame, temp_audio_time = audio_deque.popleft()
opus_encoder.destroy(audio_encoder)
def fill_audio_deque(audio_codec : AudioCodec, audio_decoder : OpusDecoder, audio_buffer : bytes, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None:
audio_frame = decode_audio_frame(audio_codec, audio_decoder, audio_buffer)
if audio_frame:
audio_deque.append((numpy.frombuffer(audio_frame, dtype = numpy.float32), time.monotonic()))
audio_event.set()
def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None:
audio_track = rtc_peer_audio.get('receiver_track')
audio_codec = rtc_peer_audio.get('codec')
datachannel_library = datachannel_module.create_static_library()
audio_decoder = create_audio_decoder(audio_codec)
receive_buffer = ctypes.create_string_buffer(8 * 1024)
available_event = create_event(audio_track, datachannel_library)
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
buffer_size = ctypes.c_int(8 * 1024)
receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size))
if receive_status_code == 0 and buffer_size.value > 0:
audio_buffer = receive_buffer.raw[:buffer_size.value]
fill_audio_deque(audio_codec, audio_decoder, audio_buffer, audio_deque, audio_event)
if receive_status_code == -3:
available_event.wait()
available_event.clear()
empty_audio_frame = numpy.empty(0)
audio_deque.append((empty_audio_frame, 0.0))
audio_event.set()
destroy_audio_decoder(audio_codec, audio_decoder)
def decode_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, input_buffer : bytes) -> Optional[bytes]:
if audio_codec == 'opus':
return opus_decoder.decode(audio_decoder, input_buffer, 960, 2)
return None
def create_audio_decoder(audio_codec : AudioCodec) -> Optional[OpusDecoder]:
if audio_codec == 'opus':
return opus_decoder.create(48000, 2)
return None
def destroy_audio_decoder(audio_codec : AudioCodec, audio_decoder : OpusDecoder) -> None:
if audio_codec == 'opus':
opus_decoder.destroy(audio_decoder)
+15
View File
@@ -0,0 +1,15 @@
import ctypes
import threading
from functools import partial
def create_event(track : int, datachannel_library : ctypes.CDLL) -> threading.Event:
available_event = threading.Event()
available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event))
datachannel_library.rtcSetAvailableCallback(track, available_callback)
available_event.callback = available_callback # type: ignore[attr-defined]
return available_event
def dispatch_event(event : threading.Event, track : int, pointer : ctypes.c_void_p) -> None:
event.set()
-377
View File
@@ -1,377 +0,0 @@
import asyncio
import ctypes
import threading
import time
from collections import deque
from collections.abc import AsyncIterator
from functools import partial
from typing import Optional
import cv2
import numpy
from starlette.websockets import WebSocket
from facefusion import rtc, rtc_store, streamer
from facefusion.audio import create_empty_audio_frame
from facefusion.codecs import aom_decoder, aom_encoder, opus_decoder, opus_encoder, vpx_decoder, vpx_encoder
from facefusion.libraries import datachannel as datachannel_module
from facefusion.types import AomDecoder, AomEncoder, AomPointer, AudioCodec, AudioPack, BitRate, OpusDecoder, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, RtcPeerVideo, SdpAnswer, SdpOffer, SessionId, VideoCodec, VideoPack, VisionFrame, VpxDecoder, VpxEncoder, VpxPointer
async def process_image(websocket : WebSocket) -> None:
capture_vision_frame = await anext(receive_vision_frames(websocket), None)
if numpy.any(capture_vision_frame):
output_vision_frame = streamer.process_frame(create_empty_audio_frame(), capture_vision_frame)
is_success, output_frame_buffer = cv2.imencode('.jpg', output_vision_frame)
if is_success:
await websocket.send_bytes(output_frame_buffer.tobytes())
#TODO: needs review
def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpAnswer]:
video_codec : VideoCodec = 'vp8'
if rtc.get_payload_type(sdp_offer, 'av1'):
video_codec = 'av1'
video_payload_type = rtc.get_payload_type(sdp_offer, video_codec)
if video_payload_type:
peer_connection : PeerConnection = rtc.create_peer_connection()
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, video_payload_type)
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, video_payload_type)
sender_bitrate = ctypes.c_uint(0)
rtc.wire_remb(video_sender_track, sender_bitrate)
receiver_bitrate = ctypes.c_uint(0)
rtc.wire_remb(video_receiver_track, receiver_bitrate)
audio_codec : AudioCodec = 'opus'
audio_payload_type = rtc.get_payload_type(sdp_offer, audio_codec)
if audio_payload_type:
audio_receiver_track = rtc.add_audio_track(peer_connection, 'recvonly', audio_codec, audio_payload_type)
audio_sender_track = rtc.add_audio_track(peer_connection, 'sendonly', audio_codec, audio_payload_type)
rtc.set_remote_description(peer_connection, sdp_offer)
local_sdp = rtc.create_sdp_answer(peer_connection)
if local_sdp:
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': video_sender_track,
'receiver_track': video_receiver_track,
'codec': video_codec
},
'sender_bitrate': sender_bitrate,
'receiver_bitrate': receiver_bitrate
}
if audio_payload_type:
rtc_peer['audio'] = RtcPeerAudio(
sender_track = audio_sender_track,
receiver_track = audio_receiver_track,
codec = audio_codec
)
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True).start()
return local_sdp
datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection)
return None
async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFrame]:
websocket_event = await websocket.receive()
while websocket_event.get('type') == 'websocket.receive':
frame_buffer = websocket_event.get('bytes') or bytes()
vision_frame = cv2.imdecode(numpy.frombuffer(frame_buffer, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(vision_frame):
yield vision_frame
websocket_event = await websocket.receive()
#TODO: needs review
async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
video_deque : deque[VideoPack] = deque(maxlen = 1)
audio_deque : deque[AudioPack] = deque(maxlen = 10)
video_event = threading.Event()
video_receiver_thread = asyncio.to_thread(receive_video_frames, rtc_peer.get('video'), video_deque, video_event)
video_encoder_thread = asyncio.to_thread(run_video_encode_loop, rtc_peer, video_deque, video_event)
coroutines = [ video_receiver_thread, video_encoder_thread ]
if rtc_peer.get('audio'):
audio_event = threading.Event()
coroutines.append(asyncio.to_thread(receive_audio_frames, rtc_peer.get('audio'), audio_deque, audio_event))
coroutines.append(asyncio.to_thread(run_audio_encode_loop, rtc_peer, audio_deque, audio_event))
await asyncio.gather(*coroutines)
rtc_store.delete_peers(session_id)
#TODO: needs review
def run_video_encode_loop(rtc_peer : RtcPeer, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
video_event.wait()
video_event.clear()
video_codec = rtc_peer.get('video').get('codec')
temp_vision_frame, temp_video_time = video_deque.popleft()
if numpy.any(temp_vision_frame):
temp_resolution : Resolution = (temp_vision_frame.shape[1], temp_vision_frame.shape[0])
temp_bitrate : BitRate = 8000
video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate)
frame_index = 0
while numpy.any(temp_vision_frame):
output_vision_frame = streamer.process_frame(create_empty_audio_frame(), temp_vision_frame)
output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0])
output_vision_buffer = cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
peer_bitrate = rtc_peer.get('sender_bitrate').value
if output_resolution[0] - temp_resolution[0] or output_resolution[1] - temp_resolution[1]:
destroy_video_encoder(video_codec, video_encoder)
temp_resolution = output_resolution
video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate)
frame_index = 0
if peer_bitrate and peer_bitrate - temp_bitrate:
temp_bitrate = peer_bitrate
if not update_video_encoder_bitrate(video_codec, video_encoder, temp_bitrate):
destroy_video_encoder(video_codec, video_encoder)
video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate)
frame_index = 0
output_video_buffer = encode_video_frame(video_codec, video_encoder, output_vision_buffer, temp_resolution, frame_index)
if output_video_buffer:
rtc.send_video(rtc_peer, output_video_buffer, int(temp_video_time * 90000))
frame_index += 1
video_event.wait()
video_event.clear()
temp_vision_frame, temp_video_time = video_deque.popleft()
destroy_video_encoder(video_codec, video_encoder)
rtc.clear_remb(rtc_peer)
def run_audio_encode_loop(rtc_peer : RtcPeer, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None:
audio_event.wait()
audio_event.clear()
temp_audio_frame, temp_audio_time = audio_deque.popleft()
audio_encoder = opus_encoder.create(48000, 2)
while numpy.any(temp_audio_frame):
output_audio_buffer = opus_encoder.encode(audio_encoder, temp_audio_frame.tobytes(), 960)
if output_audio_buffer:
rtc.send_audio(rtc_peer, output_audio_buffer, int(temp_audio_time * 48000))
if len(audio_deque) == 0:
audio_event.wait()
audio_event.clear()
temp_audio_frame, temp_audio_time = audio_deque.popleft()
opus_encoder.destroy(audio_encoder)
def fill_video_deque(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, video_buffer : bytes, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
vision_frame = decode_video_frame(video_codec, video_decoder, video_buffer)
if numpy.any(vision_frame):
video_deque.append((vision_frame, time.monotonic()))
video_event.set()
def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
video_track = rtc_peer_video.get('receiver_track')
video_codec = rtc_peer_video.get('codec')
datachannel_library = datachannel_module.create_static_library()
video_decoder = create_video_decoder(video_codec)
receive_buffer = ctypes.create_string_buffer(512 * 1024)
available_event = create_event(video_track, datachannel_library)
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
buffer_size = ctypes.c_int(512 * 1024)
receive_status_code = datachannel_library.rtcReceiveMessage(video_track, receive_buffer, ctypes.byref(buffer_size))
if receive_status_code == 0 and buffer_size.value > 0:
video_buffer = receive_buffer.raw[:buffer_size.value]
fill_video_deque(video_codec, video_decoder, video_buffer, video_deque, video_event)
if receive_status_code == -3:
available_event.wait()
available_event.clear()
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
destroy_video_decoder(video_codec, video_decoder)
def fill_audio_deque(audio_codec : AudioCodec, audio_decoder : OpusDecoder, audio_buffer : bytes, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None:
audio_frame = decode_audio_frame(audio_codec, audio_decoder, audio_buffer)
if audio_frame:
audio_deque.append((numpy.frombuffer(audio_frame, dtype = numpy.float32), time.monotonic()))
audio_event.set()
def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack], audio_event : threading.Event) -> None:
audio_track = rtc_peer_audio.get('receiver_track')
audio_codec = rtc_peer_audio.get('codec')
datachannel_library = datachannel_module.create_static_library()
audio_decoder = create_audio_decoder(audio_codec)
receive_buffer = ctypes.create_string_buffer(8 * 1024)
available_event = create_event(audio_track, datachannel_library)
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
buffer_size = ctypes.c_int(8 * 1024)
receive_status_code = datachannel_library.rtcReceiveMessage(audio_track, receive_buffer, ctypes.byref(buffer_size))
if receive_status_code == 0 and buffer_size.value > 0:
audio_buffer = receive_buffer.raw[:buffer_size.value]
fill_audio_deque(audio_codec, audio_decoder, audio_buffer, audio_deque, audio_event)
if receive_status_code == -3:
available_event.wait()
available_event.clear()
empty_audio_frame = numpy.empty(0)
audio_deque.append((empty_audio_frame, 0.0))
audio_event.set()
destroy_audio_decoder(audio_codec, audio_decoder)
def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, input_buffer : bytes) -> Optional[VisionFrame]:
if video_codec == 'av1':
aom_pointer = aom_decoder.decode(video_decoder, input_buffer)
if aom_pointer:
return normalize_vision_frame(aom_pointer)
if video_codec == 'vp8':
vpx_pointer = vpx_decoder.decode(video_decoder, input_buffer)
if vpx_pointer:
return normalize_vision_frame(vpx_pointer)
return None
def decode_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, input_buffer : bytes) -> Optional[bytes]:
if audio_codec == 'opus':
return opus_decoder.decode(audio_decoder, input_buffer, 960, 2)
return None
def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes:
if video_codec == 'av1':
return aom_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index)
if video_codec == 'vp8':
return vpx_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index)
return bytes()
def normalize_vision_frame(frame_pointer : AomPointer | VpxPointer) -> VisionFrame:
frame_width, frame_height = frame_pointer.get('resolution')
vision_frame = numpy.frombuffer(frame_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width))
return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420)
def create_audio_decoder(audio_codec : AudioCodec) -> Optional[OpusDecoder]:
if audio_codec == 'opus':
return opus_decoder.create(48000, 2)
return None
def create_video_decoder(video_codec : VideoCodec) -> Optional[VpxDecoder | AomDecoder]:
if video_codec == 'av1':
return aom_decoder.create(8)
if video_codec == 'vp8':
return vpx_decoder.create(8)
return None
def create_video_encoder(video_codec : VideoCodec, frame_resolution : Resolution, bitrate : BitRate) -> Optional[VpxEncoder | AomEncoder]:
if video_codec == 'av1':
return aom_encoder.create(frame_resolution, bitrate, 8, 10)
if video_codec == 'vp8':
return vpx_encoder.create(frame_resolution, bitrate, 8, 10)
return None
def destroy_audio_decoder(audio_codec : AudioCodec, audio_decoder : OpusDecoder) -> None:
if audio_codec == 'opus':
opus_decoder.destroy(audio_decoder)
def destroy_video_decoder(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder) -> None:
if video_codec == 'av1':
aom_decoder.destroy(video_decoder)
if video_codec == 'vp8':
vpx_decoder.destroy(video_decoder)
def update_video_encoder_bitrate(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, bitrate : BitRate) -> bool:
if video_codec == 'av1':
return aom_encoder.update_bitrate(video_encoder, bitrate)
if video_codec == 'vp8':
return vpx_encoder.update_bitrate(video_encoder, bitrate)
return False
def destroy_video_encoder(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder) -> None:
if video_codec == 'av1':
aom_encoder.destroy(video_encoder)
if video_codec == 'vp8':
vpx_encoder.destroy(video_encoder)
def destroy_stream(session_id : SessionId) -> bool:
if rtc_store.has_peers(session_id):
rtc_store.delete_peers(session_id)
return not rtc_store.has_peers(session_id)
return False
def create_event(track : int, datachannel_library : ctypes.CDLL) -> threading.Event:
available_event = threading.Event()
available_callback = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(partial(dispatch_event, available_event))
datachannel_library.rtcSetAvailableCallback(track, available_callback)
available_event.callback = available_callback # type: ignore[attr-defined]
return available_event
def dispatch_event(event : threading.Event, track : int, pointer : ctypes.c_void_p) -> None:
event.set()
+127
View File
@@ -0,0 +1,127 @@
import asyncio
import ctypes
import threading
from collections import deque
from collections.abc import AsyncIterator
from typing import Optional
import cv2
import numpy
from starlette.websockets import WebSocket
from facefusion import rtc, rtc_store, streamer
from facefusion.apis.stream_audio import receive_audio_frames, run_audio_encode_loop
from facefusion.apis.stream_video import receive_video_frames, run_video_encode_loop
from facefusion.audio import create_empty_audio_frame
from facefusion.libraries import datachannel as datachannel_module
from facefusion.types import AudioCodec, AudioPack, PeerConnection, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VideoPack, VisionFrame
async def process_image(websocket : WebSocket) -> None:
capture_vision_frame = await anext(receive_vision_frames(websocket), None)
if numpy.any(capture_vision_frame):
output_vision_frame = streamer.process_frame(create_empty_audio_frame(), capture_vision_frame)
is_success, output_frame_buffer = cv2.imencode('.jpg', output_vision_frame)
if is_success:
await websocket.send_bytes(output_frame_buffer.tobytes())
def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpAnswer]:
video_codec : VideoCodec = 'vp8'
if rtc.get_payload_type(sdp_offer, 'av1'):
video_codec = 'av1'
video_payload_type = rtc.get_payload_type(sdp_offer, video_codec)
if video_payload_type:
peer_connection : PeerConnection = rtc.create_peer_connection()
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, video_payload_type)
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, video_payload_type)
sender_bitrate = ctypes.c_uint(0)
receiver_bitrate = ctypes.c_uint(0)
rtc.wire_remb(video_sender_track, sender_bitrate)
rtc.wire_remb(video_receiver_track, receiver_bitrate)
audio_codec : AudioCodec = 'opus'
audio_payload_type = rtc.get_payload_type(sdp_offer, audio_codec)
if audio_payload_type:
audio_receiver_track = rtc.add_audio_track(peer_connection, 'recvonly', audio_codec, audio_payload_type)
audio_sender_track = rtc.add_audio_track(peer_connection, 'sendonly', audio_codec, audio_payload_type)
rtc.set_remote_description(peer_connection, sdp_offer)
local_sdp = rtc.create_sdp_answer(peer_connection)
if local_sdp:
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': video_sender_track,
'receiver_track': video_receiver_track,
'codec': video_codec
},
'sender_bitrate': sender_bitrate,
'receiver_bitrate': receiver_bitrate
}
if audio_payload_type:
rtc_peer['audio'] = RtcPeerAudio(
sender_track = audio_sender_track,
receiver_track = audio_receiver_track,
codec = audio_codec
)
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True).start()
return local_sdp
datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection)
return None
async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFrame]:
websocket_event = await websocket.receive()
while websocket_event.get('type') == 'websocket.receive':
frame_buffer = websocket_event.get('bytes') or bytes()
vision_frame = cv2.imdecode(numpy.frombuffer(frame_buffer, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(vision_frame):
yield vision_frame
websocket_event = await websocket.receive()
async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
video_deque : deque[VideoPack] = deque(maxlen = 1)
audio_deque : deque[AudioPack] = deque(maxlen = 10)
video_event = threading.Event()
video_receiver_thread = asyncio.to_thread(receive_video_frames, rtc_peer.get('video'), video_deque, video_event)
video_encoder_thread = asyncio.to_thread(run_video_encode_loop, rtc_peer, video_deque, video_event)
coroutines = [ video_receiver_thread, video_encoder_thread ]
if rtc_peer.get('audio'):
audio_event = threading.Event()
coroutines.append(asyncio.to_thread(receive_audio_frames, rtc_peer.get('audio'), audio_deque, audio_event))
coroutines.append(asyncio.to_thread(run_audio_encode_loop, rtc_peer, audio_deque, audio_event))
await asyncio.gather(*coroutines)
rtc_store.delete_peers(session_id)
def destroy_stream(session_id : SessionId) -> bool:
if rtc_store.has_peers(session_id):
rtc_store.delete_peers(session_id)
return not rtc_store.has_peers(session_id)
return False
+175
View File
@@ -0,0 +1,175 @@
import ctypes
import threading
import time
from collections import deque
from typing import Optional
import cv2
import numpy
from facefusion import rtc, streamer
from facefusion.apis.stream_event import create_event
from facefusion.audio import create_empty_audio_frame
from facefusion.codecs import aom_decoder, aom_encoder, vpx_decoder, vpx_encoder
from facefusion.libraries import datachannel as datachannel_module
from facefusion.types import AomDecoder, AomEncoder, AomPointer, BitRate, Resolution, RtcPeer, RtcPeerVideo, VideoCodec, VideoPack, VisionFrame, VpxDecoder, VpxEncoder, VpxPointer
def run_video_encode_loop(rtc_peer : RtcPeer, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
video_event.wait()
video_event.clear()
video_codec = rtc_peer.get('video').get('codec')
temp_vision_frame, temp_video_time = video_deque.popleft()
if numpy.any(temp_vision_frame):
temp_resolution : Resolution = (temp_vision_frame.shape[1], temp_vision_frame.shape[0])
temp_bitrate : BitRate = 8000
video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate)
frame_index = 0
while numpy.any(temp_vision_frame):
output_vision_frame = streamer.process_frame(create_empty_audio_frame(), temp_vision_frame)
output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0])
output_vision_buffer = cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
peer_bitrate = rtc_peer.get('sender_bitrate').value
if output_resolution[0] - temp_resolution[0] or output_resolution[1] - temp_resolution[1]:
destroy_video_encoder(video_codec, video_encoder)
temp_resolution = output_resolution
video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate)
frame_index = 0
if peer_bitrate and peer_bitrate - temp_bitrate:
temp_bitrate = peer_bitrate
if not update_video_encoder_bitrate(video_codec, video_encoder, temp_bitrate):
destroy_video_encoder(video_codec, video_encoder)
video_encoder = create_video_encoder(video_codec, temp_resolution, temp_bitrate)
frame_index = 0
output_video_buffer = encode_video_frame(video_codec, video_encoder, output_vision_buffer, temp_resolution, frame_index)
if output_video_buffer:
rtc.send_video(rtc_peer, output_video_buffer, int(temp_video_time * 90000))
frame_index += 1
video_event.wait()
video_event.clear()
temp_vision_frame, temp_video_time = video_deque.popleft()
destroy_video_encoder(video_codec, video_encoder)
rtc.clear_remb(rtc_peer)
def fill_video_deque(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, video_buffer : bytes, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
vision_frame = decode_video_frame(video_codec, video_decoder, video_buffer)
if numpy.any(vision_frame):
video_deque.append((vision_frame, time.monotonic()))
video_event.set()
def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[VideoPack], video_event : threading.Event) -> None:
video_track = rtc_peer_video.get('receiver_track')
video_codec = rtc_peer_video.get('codec')
datachannel_library = datachannel_module.create_static_library()
video_decoder = create_video_decoder(video_codec)
receive_buffer = ctypes.create_string_buffer(512 * 1024)
available_event = create_event(video_track, datachannel_library)
receive_status_code = -3
while receive_status_code == 0 or receive_status_code == -3:
buffer_size = ctypes.c_int(512 * 1024)
receive_status_code = datachannel_library.rtcReceiveMessage(video_track, receive_buffer, ctypes.byref(buffer_size))
if receive_status_code == 0 and buffer_size.value > 0:
video_buffer = receive_buffer.raw[:buffer_size.value]
fill_video_deque(video_codec, video_decoder, video_buffer, video_deque, video_event)
if receive_status_code == -3:
available_event.wait()
available_event.clear()
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
destroy_video_decoder(video_codec, video_decoder)
def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, input_buffer : bytes) -> Optional[VisionFrame]:
if video_codec == 'av1':
aom_pointer = aom_decoder.decode(video_decoder, input_buffer)
if aom_pointer:
return normalize_vision_frame(aom_pointer)
if video_codec == 'vp8':
vpx_pointer = vpx_decoder.decode(video_decoder, input_buffer)
if vpx_pointer:
return normalize_vision_frame(vpx_pointer)
return None
def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes:
if video_codec == 'av1':
return aom_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index)
if video_codec == 'vp8':
return vpx_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index)
return bytes()
def normalize_vision_frame(frame_pointer : AomPointer | VpxPointer) -> VisionFrame:
frame_width, frame_height = frame_pointer.get('resolution')
vision_frame = numpy.frombuffer(frame_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width))
return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420)
def create_video_decoder(video_codec : VideoCodec) -> Optional[VpxDecoder | AomDecoder]:
if video_codec == 'av1':
return aom_decoder.create(8)
if video_codec == 'vp8':
return vpx_decoder.create(8)
return None
def create_video_encoder(video_codec : VideoCodec, frame_resolution : Resolution, bitrate : BitRate) -> Optional[VpxEncoder | AomEncoder]:
if video_codec == 'av1':
return aom_encoder.create(frame_resolution, bitrate, 8, 10)
if video_codec == 'vp8':
return vpx_encoder.create(frame_resolution, bitrate, 8, 10)
return None
def destroy_video_decoder(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder) -> None:
if video_codec == 'av1':
aom_decoder.destroy(video_decoder)
if video_codec == 'vp8':
vpx_decoder.destroy(video_decoder)
def destroy_video_encoder(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder) -> None:
if video_codec == 'av1':
aom_encoder.destroy(video_encoder)
if video_codec == 'vp8':
vpx_encoder.destroy(video_encoder)
def update_video_encoder_bitrate(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, bitrate : BitRate) -> bool:
if video_codec == 'av1':
return aom_encoder.update_bitrate(video_encoder, bitrate)
if video_codec == 'vp8':
return vpx_encoder.update_bitrate(video_encoder, bitrate)
return False
+116
View File
@@ -0,0 +1,116 @@
import ctypes
import threading
from collections import deque
from unittest.mock import MagicMock, patch
import numpy
import pytest
from facefusion import rtc, rtc_store, state_manager
from facefusion.apis.stream_audio import fill_audio_deque, receive_audio_frames, run_audio_encode_loop
from facefusion.download import conditional_download
from facefusion.ffmpeg import read_audio_buffer
from facefusion.hash_helper import create_hash
from facefusion.libraries import datachannel as datachannel_module, opus as opus_module
from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
state_manager.init_item('processors', [])
opus_module.pre_check()
datachannel_module.pre_check()
conditional_download(get_test_examples_directory(),
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3'
])
@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
rtc_store.clear()
def test_run_audio_encode_loop() -> None:
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
peer_connection = rtc.create_peer_connection()
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': 0,
'receiver_track': 0,
'codec': 'vp8'
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
audio_deque : deque[AudioPack] = deque()
audio_event = threading.Event()
audio_deque.append((audio_frame, 0.100))
audio_event.set()
encoder_mock = MagicMock()
encoder_mock.encode.return_value = bytes([ 1 ] * 32)
with patch('facefusion.apis.stream_audio.opus_encoder.encode', encoder_mock.encode):
with patch('facefusion.apis.stream_audio.rtc.send_audio') as send_audio_mock:
audio_loop_thread = threading.Thread(target = run_audio_encode_loop, args = (rtc_peer, audio_deque, audio_event), daemon = True)
audio_loop_thread.start()
audio_deque.append((numpy.empty(0), 0.0))
audio_event.set()
audio_loop_thread.join(timeout = 5.0)
assert encoder_mock.encode.called is True
assert send_audio_mock.called is True
def test_fill_audio_deque() -> None:
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
audio_decoder_mock = MagicMock()
audio_deque : deque[AudioPack] = deque()
audio_event = threading.Event()
with patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = audio_frame.tobytes()):
fill_audio_deque('opus', audio_decoder_mock, audio_frame.tobytes(), audio_deque, audio_event)
buffer_frame, _ = audio_deque.popleft()
assert audio_event.is_set()
assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes())
@pytest.mark.parametrize('audio_codec', [ 'opus' ])
def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
audio_deque : deque[AudioPack] = deque()
audio_event = threading.Event()
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
with patch('facefusion.apis.stream_audio.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_audio.decode_audio_frame', return_value = audio_frame.tobytes()):
rtc_peer_audio : RtcPeerAudio =\
{
'sender_track': 0,
'receiver_track': 0,
'codec': audio_codec
}
audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque, audio_event), daemon = True)
audio_receiver_thread.start()
audio_receiver_thread.join(timeout = 5.0)
buffer_frame, _ = audio_deque.popleft()
assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes())
-461
View File
@@ -1,461 +0,0 @@
import asyncio
import ctypes
import struct
import threading
from collections import deque
from unittest.mock import AsyncMock, MagicMock, patch
import cv2
import numpy
import pytest
from facefusion import rtc, rtc_store, state_manager
from facefusion.apis.stream_helper import create_video_decoder, create_video_encoder, decode_video_frame, destroy_stream, destroy_video_decoder, destroy_video_encoder, encode_video_frame, fill_audio_deque, fill_video_deque, process_image, process_video, receive_audio_frames, receive_video_frames, receive_vision_frames, run_audio_encode_loop, run_peer_loop, run_video_encode_loop, update_video_encoder_bitrate
from facefusion.codecs import aom_encoder, vpx_encoder
from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download
from facefusion.ffmpeg import read_audio_buffer
from facefusion.hash_helper import create_hash
from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, opus as opus_module, vpx as vpx_module
from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio, RtcPeerVideo, SessionId, VideoCodec, VideoPack
from facefusion.vision import read_video_frame
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
state_manager.init_item('processors', [])
aom_module.pre_check()
vpx_module.pre_check()
opus_module.pre_check()
datachannel_module.pre_check()
conditional_download(get_test_examples_directory(),
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4',
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg',
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3'
])
@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
rtc_store.clear()
@pytest.mark.anyio
async def test_process_image() -> None:
image_buffer = open(get_test_example_file('source.jpg'), 'rb').read()
websocket_mock = AsyncMock()
websocket_mock.receive.side_effect =\
[
{
'type': 'websocket.receive',
'bytes': image_buffer
}
]
await process_image(websocket_mock)
websocket_mock.send_bytes.assert_called_once()
if is_linux() or is_windows():
assert create_hash(websocket_mock.send_bytes.call_args[0][0]) == '0142782f'
@pytest.mark.parametrize('video_codec, session_id', [ ('av1', 'test-process-video-av1'), ('vp8', 'test-process-video-vp8') ])
def test_process_video(video_codec : VideoCodec, session_id : str) -> None:
peer_connection = rtc.create_peer_connection()
if video_codec == 'av1':
rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 35)
if video_codec == 'vp8':
rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 96)
rtc.add_audio_track(peer_connection, 'sendrecv', 'opus', 111)
sdp_offer = rtc.create_sdp_offer(peer_connection)
datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection)
with patch('facefusion.apis.stream_helper.threading.Thread'):
sdp_answer = process_video(session_id, sdp_offer)
assert sdp_answer
assert 'm=video' in sdp_answer
assert 'a=recvonly' in sdp_answer
assert 'a=sendonly' in sdp_answer
for peer in rtc_store.get_peers(session_id):
sender_bitrate = peer.get('sender_bitrate')
receiver_bitrate = peer.get('receiver_bitrate')
assert sender_bitrate.value == 0
assert receiver_bitrate.value == 0
rtc.handle_remb(0, 8000000, ctypes.addressof(sender_bitrate))
assert sender_bitrate.value == 8000
rtc.handle_remb(0, 4000000, ctypes.addressof(receiver_bitrate))
assert receiver_bitrate.value == 4000
@pytest.mark.anyio
async def test_receive_vision_frames() -> None:
image_buffer = open(get_test_example_file('source.jpg'), 'rb').read()
websocket_mock = AsyncMock()
websocket_mock.receive.side_effect =\
[
{
'type': 'websocket.receive',
'bytes': image_buffer
},
{
'type': 'websocket.receive',
'bytes': 'invalid'.encode()
},
{
'type': 'websocket.disconnect'
}
]
vision_frames = receive_vision_frames(websocket_mock)
assert create_hash((await anext(vision_frames)).tobytes()) == '5ed32ca0'
@pytest.mark.parametrize('video_codec, payload_type, session_id', [ ('av1', 35, 'test-run-peer-loop-av1'), ('vp8', 96, 'test-run-peer-loop-vp8') ])
def test_run_peer_loop(video_codec : VideoCodec, payload_type : int, session_id : SessionId) -> None:
peer_connection = rtc.create_peer_connection()
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': video_sender_track,
'receiver_track': video_receiver_track,
'codec': video_codec
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
assert rtc_store.has_peers(session_id) is True
with patch('facefusion.apis.stream_helper.receive_video_frames'):
with patch('facefusion.apis.stream_helper.run_video_encode_loop'):
thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True)
thread.start()
thread.join(timeout = 5.0)
assert rtc_store.has_peers(session_id) is False
@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ])
def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
peer_connection = rtc.create_peer_connection()
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': video_sender_track,
'receiver_track': video_receiver_track,
'codec': video_codec
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
video_deque : deque[VideoPack] = deque()
video_event = threading.Event()
video_deque.append((video_frame, 0.1))
video_event.set()
with patch('facefusion.apis.stream_helper.rtc.send_video') as send_video_mock:
encode_loop_thread = threading.Thread(target = run_video_encode_loop, args = (rtc_peer, video_deque, video_event), daemon = True)
encode_loop_thread.start()
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
encode_loop_thread.join(timeout = 5.0)
assert send_video_mock.called
if video_codec == 'av1':
if is_linux() or is_windows():
assert create_hash(send_video_mock.call_args[0][1]) == '9ba7212b'
if is_macos():
pytest.skip()
if video_codec == 'vp8':
pytest.skip()
def test_run_audio_encode_loop() -> None:
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
peer_connection = rtc.create_peer_connection()
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': 0,
'receiver_track': 0,
'codec': 'vp8'
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
audio_deque : deque[AudioPack] = deque()
audio_event = threading.Event()
audio_deque.append((audio_frame, 0.100))
audio_event.set()
encoder_mock = MagicMock()
encoder_mock.encode.return_value = bytes([ 1 ] * 32)
with patch('facefusion.apis.stream_helper.opus_encoder.encode', encoder_mock.encode):
with patch('facefusion.apis.stream_helper.rtc.send_audio') as send_audio_mock:
audio_loop_thread = threading.Thread(target = run_audio_encode_loop, args = (rtc_peer, audio_deque, audio_event), daemon = True)
audio_loop_thread.start()
audio_deque.append((numpy.empty(0), 0.0))
audio_event.set()
audio_loop_thread.join(timeout = 5.0)
assert encoder_mock.encode.called is True
assert send_audio_mock.called is True
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_fill_video_deque(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_encoder = create_video_encoder(video_codec, (426, 226), 1000)
video_decoder = create_video_decoder(video_codec)
encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0)
video_deque : deque[VideoPack] = deque()
video_event = threading.Event()
fill_video_deque(video_codec, video_decoder, encode_buffer, video_deque, video_event)
vision_frame, _ = video_deque.popleft()
assert video_event.is_set()
if is_linux() or is_windows():
if video_codec == 'av1':
assert create_hash(vision_frame.tobytes()) == 'c97d6d29'
if video_codec == 'vp8':
assert create_hash(vision_frame.tobytes()) == '99ef2c25'
if is_macos():
if video_codec == 'av1':
assert create_hash(vision_frame.tobytes()) == 'eafd1fab'
if video_codec == 'vp8':
assert create_hash(vision_frame.tobytes()) == 'ff3ecb43'
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_receive_video_frames(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_deque : deque[VideoPack] = deque()
video_event = threading.Event()
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = video_frame):
rtc_peer_video : RtcPeerVideo =\
{
'sender_track': 0,
'receiver_track': 0,
'codec': video_codec
}
video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer_video, video_deque, video_event), daemon = True)
video_receiver_thread.start()
video_receiver_thread.join(timeout = 5.0)
vision_frame, _ = video_deque.popleft()
if is_linux() or is_windows():
assert create_hash(vision_frame.tobytes()) == 'a17439db'
if is_macos():
assert create_hash(vision_frame.tobytes()) == '38d00e2a'
def test_fill_audio_deque() -> None:
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
audio_decoder_mock = MagicMock()
audio_deque : deque[AudioPack] = deque()
audio_event = threading.Event()
with patch('facefusion.apis.stream_helper.decode_audio_frame', return_value = audio_frame.tobytes()):
fill_audio_deque('opus', audio_decoder_mock, audio_frame.tobytes(), audio_deque, audio_event)
buffer_frame, _ = audio_deque.popleft()
assert audio_event.is_set()
assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes())
@pytest.mark.parametrize('audio_codec', [ 'opus' ])
def test_receive_audio_frames(audio_codec : AudioCodec) -> None:
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
audio_deque : deque[AudioPack] = deque()
audio_event = threading.Event()
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_helper.decode_audio_frame', return_value = audio_frame.tobytes()):
rtc_peer_audio : RtcPeerAudio =\
{
'sender_track': 0,
'receiver_track': 0,
'codec': audio_codec
}
audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque, audio_event), daemon = True)
audio_receiver_thread.start()
audio_receiver_thread.join(timeout = 5.0)
buffer_frame, _ = audio_deque.popleft()
assert create_hash(buffer_frame.tobytes()) == create_hash(audio_frame.tobytes())
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_encode_and_decode_video_frame(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_encoder = create_video_encoder(video_codec, (426, 226), 1000)
video_decoder = create_video_decoder(video_codec)
encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0)
decode_buffer = decode_video_frame(video_codec, video_decoder, encode_buffer).tobytes()
if is_linux() or is_windows():
if video_codec == 'av1':
assert create_hash(decode_buffer) == 'c97d6d29'
if video_codec == 'vp8':
assert create_hash(decode_buffer) == '99ef2c25'
if is_macos():
if video_codec == 'av1':
assert create_hash(decode_buffer) == 'eafd1fab'
if video_codec == 'vp8':
assert create_hash(decode_buffer) == 'ff3ecb43'
assert decode_video_frame(video_codec, video_decoder, bytes()) is None
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_create_and_destroy_video_decoder(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
if video_codec == 'av1':
video_encoder = aom_encoder.create((426, 226), 1000, 1, 0)
encode_buffer = aom_encoder.encode(video_encoder, input_buffer, (426, 226), 0)
if video_codec == 'vp8':
video_encoder = vpx_encoder.create((426, 226), 1000, 1, 0)
encode_buffer = vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 0)
video_decoder = create_video_decoder(video_codec)
assert numpy.any(decode_video_frame(video_codec, video_decoder, encode_buffer))
destroy_video_decoder(video_codec, video_decoder)
assert decode_video_frame(video_codec, video_decoder, encode_buffer) is None
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_create_and_destroy_video_encoder(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_encoder = create_video_encoder(video_codec, (426, 226), 4000)
if video_codec == 'av1':
assert aom_encoder.encode(video_encoder, input_buffer, (426, 226), 0)
if video_codec == 'vp8':
assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 0)
destroy_video_encoder(video_codec, video_encoder)
if video_codec == 'av1':
assert aom_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes()
if video_codec == 'vp8':
assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes()
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_update_video_encoder_bitrate(video_codec : VideoCodec) -> None:
video_encoder = create_video_encoder(video_codec, (426, 226), 4000)
if video_codec == 'av1':
assert struct.unpack_from('I', video_encoder, 128 + 136)[0] == 4000
if video_codec == 'vp8':
assert struct.unpack_from('I', video_encoder, 64 + 112)[0] == 4000
assert update_video_encoder_bitrate(video_codec, video_encoder, 6000)
if video_codec == 'av1':
assert struct.unpack_from('I', video_encoder, 128 + 136)[0] == 6000
if video_codec == 'vp8':
assert struct.unpack_from('I', video_encoder, 64 + 112)[0] == 6000
destroy_video_encoder(video_codec, video_encoder)
def test_destroy_stream() -> None:
peer_connection = rtc.create_peer_connection()
rtc.add_video_track(peer_connection, 'sendonly', 'vp8', 96)
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': 0,
'receiver_track': 0,
'codec': 'vp8'
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
session_id = 'test-destroy-stream'
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
assert destroy_stream(session_id) is True
assert rtc_store.get_peers(session_id) is None
assert destroy_stream(session_id) is False
+171
View File
@@ -0,0 +1,171 @@
import asyncio
import ctypes
import threading
from unittest.mock import AsyncMock, patch
import pytest
from facefusion import rtc, rtc_store, state_manager
from facefusion.apis.stream_manager import destroy_stream, process_image, process_video, receive_vision_frames, run_peer_loop
from facefusion.common_helper import is_linux, is_windows
from facefusion.download import conditional_download
from facefusion.hash_helper import create_hash
from facefusion.libraries import datachannel as datachannel_module
from facefusion.types import RtcPeer, SessionId, VideoCodec
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
state_manager.init_item('processors', [])
datachannel_module.pre_check()
conditional_download(get_test_examples_directory(),
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg'
])
@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
rtc_store.clear()
@pytest.mark.anyio
async def test_process_image() -> None:
image_buffer = open(get_test_example_file('source.jpg'), 'rb').read()
websocket_mock = AsyncMock()
websocket_mock.receive.side_effect =\
[
{
'type': 'websocket.receive',
'bytes': image_buffer
}
]
await process_image(websocket_mock)
websocket_mock.send_bytes.assert_called_once()
if is_linux() or is_windows():
assert create_hash(websocket_mock.send_bytes.call_args[0][0]) == '0142782f'
@pytest.mark.parametrize('video_codec, session_id', [ ('av1', 'test-process-video-av1'), ('vp8', 'test-process-video-vp8') ])
def test_process_video(video_codec : VideoCodec, session_id : str) -> None:
peer_connection = rtc.create_peer_connection()
if video_codec == 'av1':
rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 35)
if video_codec == 'vp8':
rtc.add_video_track(peer_connection, 'sendrecv', video_codec, 96)
rtc.add_audio_track(peer_connection, 'sendrecv', 'opus', 111)
sdp_offer = rtc.create_sdp_offer(peer_connection)
datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection)
with patch('facefusion.apis.stream_manager.threading.Thread'):
sdp_answer = process_video(session_id, sdp_offer)
assert sdp_answer
assert 'm=video' in sdp_answer
assert 'a=recvonly' in sdp_answer
assert 'a=sendonly' in sdp_answer
for peer in rtc_store.get_peers(session_id):
sender_bitrate = peer.get('sender_bitrate')
receiver_bitrate = peer.get('receiver_bitrate')
assert sender_bitrate.value == 0
assert receiver_bitrate.value == 0
rtc.handle_remb(0, 8000000, ctypes.addressof(sender_bitrate))
assert sender_bitrate.value == 8000
rtc.handle_remb(0, 4000000, ctypes.addressof(receiver_bitrate))
assert receiver_bitrate.value == 4000
@pytest.mark.anyio
async def test_receive_vision_frames() -> None:
image_buffer = open(get_test_example_file('source.jpg'), 'rb').read()
websocket_mock = AsyncMock()
websocket_mock.receive.side_effect =\
[
{
'type': 'websocket.receive',
'bytes': image_buffer
},
{
'type': 'websocket.receive',
'bytes': 'invalid'.encode()
},
{
'type': 'websocket.disconnect'
}
]
vision_frames = receive_vision_frames(websocket_mock)
assert create_hash((await anext(vision_frames)).tobytes()) == '5ed32ca0'
@pytest.mark.parametrize('video_codec, payload_type, session_id', [ ('av1', 35, 'test-run-peer-loop-av1'), ('vp8', 96, 'test-run-peer-loop-vp8') ])
def test_run_peer_loop(video_codec : VideoCodec, payload_type : int, session_id : SessionId) -> None:
peer_connection = rtc.create_peer_connection()
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': video_sender_track,
'receiver_track': video_receiver_track,
'codec': video_codec
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
assert rtc_store.has_peers(session_id) is True
with patch('facefusion.apis.stream_manager.receive_video_frames'):
with patch('facefusion.apis.stream_manager.run_video_encode_loop'):
thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True)
thread.start()
thread.join(timeout = 5.0)
assert rtc_store.has_peers(session_id) is False
def test_destroy_stream() -> None:
peer_connection = rtc.create_peer_connection()
rtc.add_video_track(peer_connection, 'sendonly', 'vp8', 96)
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': 0,
'receiver_track': 0,
'codec': 'vp8'
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
session_id = 'test-destroy-stream'
rtc_store.init_peers(session_id)
rtc_store.get_peers(session_id).append(rtc_peer)
assert destroy_stream(session_id) is True
assert rtc_store.get_peers(session_id) is None
assert destroy_stream(session_id) is False
+237
View File
@@ -0,0 +1,237 @@
import ctypes
import struct
import threading
from collections import deque
from unittest.mock import MagicMock, patch
import cv2
import numpy
import pytest
from facefusion import rtc, rtc_store, state_manager
from facefusion.apis.stream_video import create_video_decoder, create_video_encoder, decode_video_frame, destroy_video_decoder, destroy_video_encoder, encode_video_frame, fill_video_deque, receive_video_frames, run_video_encode_loop, update_video_encoder_bitrate
from facefusion.codecs import aom_encoder, vpx_encoder
from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download
from facefusion.hash_helper import create_hash
from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, vpx as vpx_module
from facefusion.types import RtcPeer, RtcPeerVideo, VideoCodec, VideoPack
from facefusion.vision import read_video_frame
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
state_manager.init_item('processors', [])
aom_module.pre_check()
vpx_module.pre_check()
datachannel_module.pre_check()
conditional_download(get_test_examples_directory(),
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4'
])
@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
rtc_store.clear()
@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ])
def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
peer_connection = rtc.create_peer_connection()
video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type)
video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type)
rtc_peer : RtcPeer =\
{
'peer_connection': peer_connection,
'video':
{
'sender_track': video_sender_track,
'receiver_track': video_receiver_track,
'codec': video_codec
},
'sender_bitrate': ctypes.c_uint(0),
'receiver_bitrate': ctypes.c_uint(0)
}
video_deque : deque[VideoPack] = deque()
video_event = threading.Event()
video_deque.append((video_frame, 0.1))
video_event.set()
with patch('facefusion.apis.stream_video.rtc.send_video') as send_video_mock:
encode_loop_thread = threading.Thread(target = run_video_encode_loop, args = (rtc_peer, video_deque, video_event), daemon = True)
encode_loop_thread.start()
empty_vision_frame = numpy.empty(0)
video_deque.append((empty_vision_frame, 0.0))
video_event.set()
encode_loop_thread.join(timeout = 5.0)
assert send_video_mock.called
if video_codec == 'av1':
if is_linux() or is_windows():
assert create_hash(send_video_mock.call_args[0][1]) == '9ba7212b'
if is_macos():
pytest.skip()
if video_codec == 'vp8':
pytest.skip()
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_fill_video_deque(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_encoder = create_video_encoder(video_codec, (426, 226), 1000)
video_decoder = create_video_decoder(video_codec)
encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0)
video_deque : deque[VideoPack] = deque()
video_event = threading.Event()
fill_video_deque(video_codec, video_decoder, encode_buffer, video_deque, video_event)
vision_frame, _ = video_deque.popleft()
assert video_event.is_set()
if is_linux() or is_windows():
if video_codec == 'av1':
assert create_hash(vision_frame.tobytes()) == 'c97d6d29'
if video_codec == 'vp8':
assert create_hash(vision_frame.tobytes()) == '99ef2c25'
if is_macos():
if video_codec == 'av1':
assert create_hash(vision_frame.tobytes()) == 'eafd1fab'
if video_codec == 'vp8':
assert create_hash(vision_frame.tobytes()) == 'ff3ecb43'
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_receive_video_frames(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
video_deque : deque[VideoPack] = deque()
video_event = threading.Event()
datachannel_library_mock = MagicMock()
datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ]
with patch('facefusion.apis.stream_video.datachannel_module.create_static_library', return_value = datachannel_library_mock):
with patch('facefusion.apis.stream_video.decode_video_frame', return_value = video_frame):
rtc_peer_video : RtcPeerVideo =\
{
'sender_track': 0,
'receiver_track': 0,
'codec': video_codec
}
video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer_video, video_deque, video_event), daemon = True)
video_receiver_thread.start()
video_receiver_thread.join(timeout = 5.0)
vision_frame, _ = video_deque.popleft()
if is_linux() or is_windows():
assert create_hash(vision_frame.tobytes()) == 'a17439db'
if is_macos():
assert create_hash(vision_frame.tobytes()) == '38d00e2a'
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_encode_and_decode_video_frame(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_encoder = create_video_encoder(video_codec, (426, 226), 1000)
video_decoder = create_video_decoder(video_codec)
encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0)
decode_buffer = decode_video_frame(video_codec, video_decoder, encode_buffer).tobytes()
if is_linux() or is_windows():
if video_codec == 'av1':
assert create_hash(decode_buffer) == 'c97d6d29'
if video_codec == 'vp8':
assert create_hash(decode_buffer) == '99ef2c25'
if is_macos():
if video_codec == 'av1':
assert create_hash(decode_buffer) == 'eafd1fab'
if video_codec == 'vp8':
assert create_hash(decode_buffer) == 'ff3ecb43'
assert decode_video_frame(video_codec, video_decoder, bytes()) is None
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_create_and_destroy_video_decoder(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
if video_codec == 'av1':
video_encoder = aom_encoder.create((426, 226), 1000, 1, 0)
encode_buffer = aom_encoder.encode(video_encoder, input_buffer, (426, 226), 0)
if video_codec == 'vp8':
video_encoder = vpx_encoder.create((426, 226), 1000, 1, 0)
encode_buffer = vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 0)
video_decoder = create_video_decoder(video_codec)
assert numpy.any(decode_video_frame(video_codec, video_decoder, encode_buffer))
destroy_video_decoder(video_codec, video_decoder)
assert decode_video_frame(video_codec, video_decoder, encode_buffer) is None
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_create_and_destroy_video_encoder(video_codec : VideoCodec) -> None:
video_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
video_encoder = create_video_encoder(video_codec, (426, 226), 4000)
if video_codec == 'av1':
assert aom_encoder.encode(video_encoder, input_buffer, (426, 226), 0)
if video_codec == 'vp8':
assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 0)
destroy_video_encoder(video_codec, video_encoder)
if video_codec == 'av1':
assert aom_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes()
if video_codec == 'vp8':
assert vpx_encoder.encode(video_encoder, input_buffer, (426, 226), 1) == bytes()
@pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ])
def test_update_video_encoder_bitrate(video_codec : VideoCodec) -> None:
video_encoder = create_video_encoder(video_codec, (426, 226), 4000)
if video_codec == 'av1':
assert struct.unpack_from('I', video_encoder, 128 + 136)[0] == 4000
if video_codec == 'vp8':
assert struct.unpack_from('I', video_encoder, 64 + 112)[0] == 4000
assert update_video_encoder_bitrate(video_codec, video_encoder, 6000)
if video_codec == 'av1':
assert struct.unpack_from('I', video_encoder, 128 + 136)[0] == 6000
if video_codec == 'vp8':
assert struct.unpack_from('I', video_encoder, 64 + 112)[0] == 6000
destroy_video_encoder(video_codec, video_encoder)