From e7d22e84bf1f8100b091171499eb893ce1e64d04 Mon Sep 17 00:00:00 2001 From: Henry Ruhs Date: Thu, 11 Jun 2026 13:03:29 +0200 Subject: [PATCH] use Buffer and BufferPack everywhere (#1154) --- facefusion/apis/stream_audio.py | 11 +++++------ facefusion/apis/stream_manager.py | 7 +++---- facefusion/apis/stream_video.py | 30 +++++++++++++++++------------- facefusion/codecs/aom_decoder.py | 8 ++++---- facefusion/codecs/aom_encoder.py | 6 +++--- facefusion/codecs/opus_decoder.py | 4 ++-- facefusion/codecs/opus_encoder.py | 4 ++-- facefusion/codecs/vpx_decoder.py | 8 ++++---- facefusion/codecs/vpx_encoder.py | 6 +++--- facefusion/download.py | 4 ++-- facefusion/ffmpeg.py | 20 ++++++++++---------- facefusion/ffprobe.py | 4 ++-- facefusion/hash_helper.py | 5 +++-- facefusion/rtc.py | 6 +++--- facefusion/streamer.py | 4 ++-- facefusion/types.py | 12 ++++-------- tests/test_api_stream_video.py | 20 ++++++++++---------- 17 files changed, 79 insertions(+), 80 deletions(-) diff --git a/facefusion/apis/stream_audio.py b/facefusion/apis/stream_audio.py index 54e32d7e..4fc27694 100644 --- a/facefusion/apis/stream_audio.py +++ b/facefusion/apis/stream_audio.py @@ -9,10 +9,10 @@ import numpy from facefusion import rtc from facefusion.apis.stream_event import create_receive_event from facefusion.codecs import opus_decoder, opus_encoder -from facefusion.types import AudioCodec, AudioFrame, OpusDecoder, RtcPeer, RtcPeerAudio +from facefusion.types import AudioCodec, AudioFrame, Buffer, OpusDecoder, RtcPeer, RtcPeerAudio, Time -def run_audio_encode_loop(rtc_peer : RtcPeer, audio_queue : Queue[Tuple[float, AudioFrame]]) -> None: +def run_audio_encode_loop(rtc_peer : RtcPeer, audio_queue : Queue[Tuple[Time, AudioFrame]]) -> None: temp_audio_time, temp_audio_frame = audio_queue.get() audio_encoder = opus_encoder.create(48000, 2) @@ -28,7 +28,7 @@ def run_audio_encode_loop(rtc_peer : RtcPeer, audio_queue : Queue[Tuple[float, A opus_encoder.destroy(audio_encoder) -def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_queue : Queue[Tuple[float, AudioFrame]]) -> None: +def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_queue : Queue[Tuple[Time, AudioFrame]]) -> None: audio_track = rtc_peer_audio.get('receiver_track') audio_codec = rtc_peer_audio.get('codec') audio_decoder = create_audio_decoder(audio_codec) @@ -42,7 +42,7 @@ def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_queue : Queue[Tupl destroy_audio_decoder(audio_codec, audio_decoder) -def decode_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, input_buffer : bytes) -> Optional[bytes]: +def decode_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, input_buffer : Buffer) -> Optional[Buffer]: if audio_codec == 'opus': return opus_decoder.decode(audio_decoder, input_buffer, 960, 2) return None @@ -59,8 +59,7 @@ def destroy_audio_decoder(audio_codec : AudioCodec, audio_decoder : OpusDecoder) opus_decoder.destroy(audio_decoder) -#todo: Alias Time for float -def handle_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, audio_queue : Queue[Tuple[float, AudioFrame]], track : int, data : ctypes.c_void_p, size : int, info : ctypes.c_void_p, pointer : ctypes.c_void_p) -> None: +def handle_audio_frame(audio_codec : AudioCodec, audio_decoder : OpusDecoder, audio_queue : Queue[Tuple[Time, AudioFrame]], track : int, data : ctypes.c_void_p, size : int, info : ctypes.c_void_p, pointer : ctypes.c_void_p) -> None: audio_buffer = ctypes.string_at(data, size) audio_frame = decode_audio_frame(audio_codec, audio_decoder, audio_buffer) diff --git a/facefusion/apis/stream_manager.py b/facefusion/apis/stream_manager.py index 5f387da9..a4dd32d2 100644 --- a/facefusion/apis/stream_manager.py +++ b/facefusion/apis/stream_manager.py @@ -14,7 +14,7 @@ from facefusion.apis.stream_audio import receive_audio_frames, run_audio_encode_ from facefusion.apis.stream_video import receive_video_frames, run_video_encode_loop from facefusion.audio import create_empty_audio_frame from facefusion.libraries import datachannel as datachannel_module -from facefusion.types import AudioCodec, AudioFrame, PeerConnection, Resolution, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, VideoCodec, VisionFrame +from facefusion.types import AudioCodec, AudioFrame, BufferPack, PeerConnection, RtcPeer, RtcPeerAudio, SdpAnswer, SdpOffer, SessionId, Time, VideoCodec, VisionFrame async def process_image(websocket : WebSocket) -> None: @@ -106,9 +106,8 @@ def process_video(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpA def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: execution_thread_count = state_manager.get_item('execution_thread_count') - #todo: is bytes, Resolution not a XXXPointer type - video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]] = Queue(maxsize = execution_thread_count) - audio_queue : Queue[Tuple[float, AudioFrame]] = Queue(maxsize = execution_thread_count * 10) + video_queue : Queue[Tuple[Time, Future[BufferPack]]] = Queue(maxsize = execution_thread_count) + audio_queue : Queue[Tuple[Time, AudioFrame]] = Queue(maxsize = execution_thread_count * 10) video_executor = ThreadPoolExecutor(max_workers = execution_thread_count) video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer.get('video'), video_queue, video_executor), daemon = True) diff --git a/facefusion/apis/stream_video.py b/facefusion/apis/stream_video.py index dfe2a6e6..451ecf67 100644 --- a/facefusion/apis/stream_video.py +++ b/facefusion/apis/stream_video.py @@ -12,13 +12,15 @@ from facefusion import rtc, streamer from facefusion.apis.stream_event import create_receive_event from facefusion.audio import create_empty_audio_frame from facefusion.codecs import aom_decoder, aom_encoder, vpx_decoder, vpx_encoder -from facefusion.types import AomDecoder, AomEncoder, AomPointer, BitRate, Resolution, RtcPeer, RtcPeerVideo, VideoCodec, VisionFrame, VpxDecoder, VpxEncoder, VpxPointer +from facefusion.types import AomDecoder, AomEncoder, BitRate, Buffer, BufferPack, Resolution, RtcPeer, RtcPeerVideo, Time, VideoCodec, VisionFrame, VpxDecoder, VpxEncoder -def run_video_encode_loop(rtc_peer : RtcPeer, video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]]) -> None: +def run_video_encode_loop(rtc_peer : RtcPeer, video_queue : Queue[Tuple[Time, Future[BufferPack]]]) -> None: video_codec = rtc_peer.get('video').get('codec') video_time, video_future = video_queue.get() - video_buffer, video_resolution = video_future.result() + video_pack = video_future.result() + video_buffer = video_pack.get('buffer') + video_resolution = video_pack.get('resolution') if video_buffer: temp_resolution : Resolution = video_resolution @@ -54,13 +56,15 @@ def run_video_encode_loop(rtc_peer : RtcPeer, video_queue : Queue[Tuple[float, F frame_index += 1 video_time, video_future = video_queue.get() - video_buffer, video_resolution = video_future.result() + video_pack = video_future.result() + video_buffer = video_pack.get('buffer') + video_resolution = video_pack.get('resolution') destroy_video_encoder(video_codec, video_encoder) rtc.clear_bitrate(rtc_peer) -def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]], video_executor : ThreadPoolExecutor) -> None: +def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_queue : Queue[Tuple[Time, Future[BufferPack]]], video_executor : ThreadPoolExecutor) -> None: video_track = rtc_peer_video.get('receiver_track') video_codec = rtc_peer_video.get('codec') video_decoder = create_video_decoder(video_codec) @@ -69,17 +73,17 @@ def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_queue : Queue[Tupl receive_event = create_receive_event(video_track, video_frame_handler) receive_event.wait() - empty_future : Future[Tuple[bytes, Resolution]] = Future() - empty_future.set_result((bytes(), (0, 0))) + empty_future : Future[BufferPack] = Future() + empty_future.set_result(BufferPack(buffer = bytes(), resolution = (0, 0))) video_queue.put((0.0, empty_future)) destroy_video_decoder(video_codec, video_decoder) -def process_video_frame(input_vision_frame : VisionFrame) -> Tuple[bytes, Resolution]: +def process_video_frame(input_vision_frame : VisionFrame) -> BufferPack: output_vision_frame = streamer.process_frame(create_empty_audio_frame(), input_vision_frame) output_resolution : Resolution = (output_vision_frame.shape[1], output_vision_frame.shape[0]) output_buffer = cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - return output_buffer, output_resolution + return BufferPack(buffer = output_buffer, resolution = output_resolution) def calculate_receiver_bitrate(rtc_peer : RtcPeer, encode_time : float, frame_interval : float) -> BitRate: @@ -107,7 +111,7 @@ def calculate_sender_bitrate(rtc_peer : RtcPeer, bitrate : BitRate) -> BitRate: return bitrate -def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, input_buffer : bytes) -> Optional[VisionFrame]: +def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, input_buffer : Buffer) -> Optional[VisionFrame]: if video_codec == 'av1': aom_pointer = aom_decoder.decode(video_decoder, input_buffer) @@ -123,7 +127,7 @@ def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | Ao return None -def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes: +def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | AomEncoder, input_buffer : Buffer, frame_resolution : Resolution, frame_index : int) -> Buffer: if video_codec == 'av1': return aom_encoder.encode(video_encoder, input_buffer, frame_resolution, frame_index) @@ -133,7 +137,7 @@ def encode_video_frame(video_codec : VideoCodec, video_encoder : VpxEncoder | Ao return bytes() -def normalize_vision_frame(frame_pointer : AomPointer | VpxPointer) -> VisionFrame: +def normalize_vision_frame(frame_pointer : BufferPack) -> VisionFrame: frame_width, frame_height = frame_pointer.get('resolution') vision_frame = numpy.frombuffer(frame_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width)) return cv2.cvtColor(vision_frame, cv2.COLOR_YUV2BGR_I420) @@ -196,7 +200,7 @@ def update_video_encoder_bitrate(video_codec : VideoCodec, video_encoder : VpxEn #todo: we can remove the dead args or pass audio buffer -def handle_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]], video_executor : ThreadPoolExecutor, track : int, data : ctypes.c_void_p, size : int, info : ctypes.c_void_p, pointer : ctypes.c_void_p) -> None: +def handle_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, video_queue : Queue[Tuple[Time, Future[BufferPack]]], video_executor : ThreadPoolExecutor, track : int, data : ctypes.c_void_p, size : int, info : ctypes.c_void_p, pointer : ctypes.c_void_p) -> None: video_buffer = ctypes.string_at(data, size) vision_frame = decode_video_frame(video_codec, video_decoder, video_buffer) diff --git a/facefusion/codecs/aom_decoder.py b/facefusion/codecs/aom_decoder.py index 1c5ea56f..113f6010 100644 --- a/facefusion/codecs/aom_decoder.py +++ b/facefusion/codecs/aom_decoder.py @@ -3,7 +3,7 @@ import struct from typing import Optional from facefusion.libraries import aom as aom_module -from facefusion.types import AomDecoder, AomPointer +from facefusion.types import AomDecoder, Buffer, BufferPack def create(thread_count : int) -> Optional[AomDecoder]: @@ -23,7 +23,7 @@ def create(thread_count : int) -> Optional[AomDecoder]: return None -def decode(aom_decoder : AomDecoder, input_buffer : bytes) -> Optional[AomPointer]: +def decode(aom_decoder : AomDecoder, input_buffer : Buffer) -> Optional[BufferPack]: aom_library = aom_module.create_static_library() if aom_library and input_buffer: @@ -37,7 +37,7 @@ def decode(aom_decoder : AomDecoder, input_buffer : bytes) -> Optional[AomPointe frame_width = ctypes.c_uint.from_address(address + 28).value & ~1 frame_height = ctypes.c_uint.from_address(address + 32).value & ~1 - return AomPointer( + return BufferPack( buffer = collect(address, frame_width, frame_height), resolution = (frame_width, frame_height) ) @@ -45,7 +45,7 @@ def decode(aom_decoder : AomDecoder, input_buffer : bytes) -> Optional[AomPointe return None -def collect(address : int, frame_width : int, frame_height : int) -> bytes: +def collect(address : int, frame_width : int, frame_height : int) -> Buffer: output_parts = [] for index in range(3): diff --git a/facefusion/codecs/aom_encoder.py b/facefusion/codecs/aom_encoder.py index e696bf66..09e42ac2 100644 --- a/facefusion/codecs/aom_encoder.py +++ b/facefusion/codecs/aom_encoder.py @@ -3,7 +3,7 @@ import struct from typing import Optional from facefusion.libraries import aom as aom_module -from facefusion.types import AomEncoder, BitRate, Resolution +from facefusion.types import AomEncoder, BitRate, Buffer, Resolution def create(frame_resolution : Resolution, bitrate : BitRate, thread_count : int, cpu_count : int) -> Optional[AomEncoder]: @@ -34,7 +34,7 @@ def create(frame_resolution : Resolution, bitrate : BitRate, thread_count : int, return None -def encode(aom_encoder : AomEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes: +def encode(aom_encoder : AomEncoder, input_buffer : Buffer, frame_resolution : Resolution, frame_index : int) -> Buffer: aom_library = aom_module.create_static_library() output_buffer = bytes() @@ -48,7 +48,7 @@ def encode(aom_encoder : AomEncoder, input_buffer : bytes, frame_resolution : Re return output_buffer -def collect(aom_encoder : AomEncoder) -> bytes: +def collect(aom_encoder : AomEncoder) -> Buffer: aom_library = aom_module.create_static_library() output_parts = [] diff --git a/facefusion/codecs/opus_decoder.py b/facefusion/codecs/opus_decoder.py index 3691f436..f5b19197 100644 --- a/facefusion/codecs/opus_decoder.py +++ b/facefusion/codecs/opus_decoder.py @@ -2,7 +2,7 @@ import ctypes from typing import Optional from facefusion.libraries import opus as opus_module -from facefusion.types import OpusDecoder +from facefusion.types import Buffer, OpusDecoder def create(sample_rate : int, channel_total : int) -> Optional[OpusDecoder]: @@ -14,7 +14,7 @@ def create(sample_rate : int, channel_total : int) -> Optional[OpusDecoder]: return None -def decode(opus_decoder : OpusDecoder, input_buffer : bytes, frame_size : int, channel_total : int) -> bytes: +def decode(opus_decoder : OpusDecoder, input_buffer : Buffer, frame_size : int, channel_total : int) -> Buffer: opus_library = opus_module.create_static_library() output_buffer = bytes() diff --git a/facefusion/codecs/opus_encoder.py b/facefusion/codecs/opus_encoder.py index 3e431db7..6af7cb73 100644 --- a/facefusion/codecs/opus_encoder.py +++ b/facefusion/codecs/opus_encoder.py @@ -2,7 +2,7 @@ import ctypes from typing import Optional from facefusion.libraries import opus as opus_module -from facefusion.types import OpusEncoder +from facefusion.types import Buffer, OpusEncoder def create(sample_rate : int, channel_total : int) -> Optional[OpusEncoder]: @@ -14,7 +14,7 @@ def create(sample_rate : int, channel_total : int) -> Optional[OpusEncoder]: return None -def encode(opus_encoder : OpusEncoder, input_buffer : bytes, frame_size : int) -> bytes: +def encode(opus_encoder : OpusEncoder, input_buffer : Buffer, frame_size : int) -> Buffer: opus_library = opus_module.create_static_library() output_buffer = bytes() diff --git a/facefusion/codecs/vpx_decoder.py b/facefusion/codecs/vpx_decoder.py index c6305f71..1bcdb499 100644 --- a/facefusion/codecs/vpx_decoder.py +++ b/facefusion/codecs/vpx_decoder.py @@ -3,7 +3,7 @@ import struct from typing import Optional from facefusion.libraries import vpx as vpx_module -from facefusion.types import VpxDecoder, VpxPointer, VxpVideoCodec +from facefusion.types import Buffer, BufferPack, VpxDecoder, VxpVideoCodec def create(video_codec : VxpVideoCodec, thread_count : int) -> Optional[VpxDecoder]: @@ -27,7 +27,7 @@ def create(video_codec : VxpVideoCodec, thread_count : int) -> Optional[VpxDecod return None -def decode(vpx_decoder : VpxDecoder, input_buffer : bytes) -> Optional[VpxPointer]: +def decode(vpx_decoder : VpxDecoder, input_buffer : Buffer) -> Optional[BufferPack]: vpx_library = vpx_module.create_static_library() if vpx_library and input_buffer: @@ -41,7 +41,7 @@ def decode(vpx_decoder : VpxDecoder, input_buffer : bytes) -> Optional[VpxPointe frame_width = ctypes.c_uint.from_address(address + 24).value & ~1 frame_height = ctypes.c_uint.from_address(address + 28).value & ~1 - return VpxPointer( + return BufferPack( buffer = collect(address, frame_width, frame_height), resolution = (frame_width, frame_height) ) @@ -49,7 +49,7 @@ def decode(vpx_decoder : VpxDecoder, input_buffer : bytes) -> Optional[VpxPointe return None -def collect(address : int, frame_width : int, frame_height : int) -> bytes: +def collect(address : int, frame_width : int, frame_height : int) -> Buffer: output_parts = [] for index in range(3): diff --git a/facefusion/codecs/vpx_encoder.py b/facefusion/codecs/vpx_encoder.py index ef3da620..8deab8d8 100644 --- a/facefusion/codecs/vpx_encoder.py +++ b/facefusion/codecs/vpx_encoder.py @@ -3,7 +3,7 @@ import struct from typing import Optional from facefusion.libraries import vpx as vpx_module -from facefusion.types import BitRate, Resolution, VpxEncoder, VxpVideoCodec +from facefusion.types import BitRate, Buffer, Resolution, VpxEncoder, VxpVideoCodec def create(video_codec : VxpVideoCodec, frame_resolution : Resolution, bitrate : BitRate, thread_count : int, cpu_count : int) -> Optional[VpxEncoder]: @@ -44,7 +44,7 @@ def create(video_codec : VxpVideoCodec, frame_resolution : Resolution, bitrate : return None -def encode(vpx_encoder : VpxEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes: +def encode(vpx_encoder : VpxEncoder, input_buffer : Buffer, frame_resolution : Resolution, frame_index : int) -> Buffer: vpx_library = vpx_module.create_static_library() output_buffer = bytes() @@ -58,7 +58,7 @@ def encode(vpx_encoder : VpxEncoder, input_buffer : bytes, frame_resolution : Re return output_buffer -def collect(vpx_encoder : VpxEncoder) -> bytes: +def collect(vpx_encoder : VpxEncoder) -> Buffer: vpx_library = vpx_module.create_static_library() output_parts = [] diff --git a/facefusion/download.py b/facefusion/download.py index 8ba0c1e7..5f4c7152 100644 --- a/facefusion/download.py +++ b/facefusion/download.py @@ -10,10 +10,10 @@ import facefusion.choices from facefusion import curl_builder, logger, process_manager, state_manager, translator from facefusion.filesystem import get_file_name, get_file_size, is_file, remove_file from facefusion.hash_helper import validate_hash -from facefusion.types import Command, DownloadProvider, DownloadSet +from facefusion.types import Buffer, Command, DownloadProvider, DownloadSet -def open_curl(commands : List[Command]) -> subprocess.Popen[bytes]: +def open_curl(commands : List[Command]) -> subprocess.Popen[Buffer]: commands = curl_builder.run(commands) return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE) diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py index e0769d65..886c0fd7 100644 --- a/facefusion/ffmpeg.py +++ b/facefusion/ffmpeg.py @@ -10,11 +10,11 @@ import facefusion.choices from facefusion import ffmpeg_builder, logger, process_manager, state_manager, translator from facefusion.filesystem import get_file_format, remove_file from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern -from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, Resolution, SampleRate, UpdateProgress, VideoEncoder, VideoFormat +from facefusion.types import ApiSecurityStrategy, AudioEncoder, Buffer, Command, EncoderSet, Fps, Resolution, SampleRate, UpdateProgress, VideoEncoder, VideoFormat from facefusion.vision import detect_video_duration, detect_video_fps, pack_resolution, predict_video_frame_total -def run_ffmpeg_with_progress(commands : List[Command], update_progress : UpdateProgress) -> subprocess.Popen[bytes]: +def run_ffmpeg_with_progress(commands : List[Command], update_progress : UpdateProgress) -> subprocess.Popen[Buffer]: log_level = state_manager.get_item('log_level') commands.extend(ffmpeg_builder.set_progress()) commands.extend(ffmpeg_builder.cast_stream()) @@ -45,14 +45,14 @@ def update_progress(progress : tqdm, frame_number : int) -> None: progress.update(frame_number - progress.n) -def run_ffmpeg_with_pipe(commands : List[Command], file_content : bytes) -> subprocess.Popen[bytes]: +def run_ffmpeg_with_pipe(commands : List[Command], file_content : Buffer) -> subprocess.Popen[Buffer]: commands = ffmpeg_builder.run(commands) process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, stdout = subprocess.PIPE) process.communicate(input = file_content) return process -def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]: +def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[Buffer]: log_level = state_manager.get_item('log_level') commands = ffmpeg_builder.run(commands) process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) @@ -72,12 +72,12 @@ def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]: return process -def open_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]: +def open_ffmpeg(commands : List[Command]) -> subprocess.Popen[Buffer]: commands = ffmpeg_builder.run(commands) return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE) -def log_debug(process : subprocess.Popen[bytes]) -> None: +def log_debug(process : subprocess.Popen[Buffer]) -> None: _, stderr = process.communicate() errors = stderr.decode().splitlines() @@ -181,7 +181,7 @@ def finalize_image(output_path : str, output_image_resolution : Resolution) -> b return run_ffmpeg(commands).returncode == 0 -def read_audio_buffer(target_path : str, audio_sample_rate : SampleRate, audio_sample_size : int, audio_channel_total : int) -> Optional[AudioBuffer]: +def read_audio_buffer(target_path : str, audio_sample_rate : SampleRate, audio_sample_size : int, audio_channel_total : int) -> Optional[Buffer]: commands = ffmpeg_builder.chain( ffmpeg_builder.set_input(target_path), ffmpeg_builder.ignore_video_stream(), @@ -301,7 +301,7 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool: return process.returncode == 0 -def sanitize_audio(file_content : bytes, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: +def sanitize_audio(file_content : Buffer, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: if security_strategy == 'strict': commands = ffmpeg_builder.chain( ffmpeg_builder.set_input('pipe:0'), @@ -320,7 +320,7 @@ def sanitize_audio(file_content : bytes, asset_path : str, security_strategy : A return run_ffmpeg_with_pipe(commands, file_content).returncode == 0 -def sanitize_image(file_content : bytes, asset_path : str) -> bool: +def sanitize_image(file_content : Buffer, asset_path : str) -> bool: commands = ffmpeg_builder.chain( ffmpeg_builder.set_input('pipe:0'), ffmpeg_builder.deep_copy_image(), @@ -330,7 +330,7 @@ def sanitize_image(file_content : bytes, asset_path : str) -> bool: return run_ffmpeg_with_pipe(commands, file_content).returncode == 0 -def sanitize_video(file_content : bytes, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: +def sanitize_video(file_content : Buffer, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: if security_strategy == 'strict': available_video_encoders = get_static_available_encoder_set().get('video') commands = ffmpeg_builder.chain( diff --git a/facefusion/ffprobe.py b/facefusion/ffprobe.py index f41805a1..3d01365c 100644 --- a/facefusion/ffprobe.py +++ b/facefusion/ffprobe.py @@ -2,10 +2,10 @@ import subprocess from typing import Dict, List from facefusion import ffprobe_builder -from facefusion.types import AudioMetadata, Command, Fps, VideoMetadata +from facefusion.types import AudioMetadata, Buffer, Command, Fps, VideoMetadata -def run_ffprobe(commands : List[Command]) -> subprocess.Popen[bytes]: +def run_ffprobe(commands : List[Command]) -> subprocess.Popen[Buffer]: commands = ffprobe_builder.run(commands) return subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) diff --git a/facefusion/hash_helper.py b/facefusion/hash_helper.py index d3d84d81..7a866e9b 100644 --- a/facefusion/hash_helper.py +++ b/facefusion/hash_helper.py @@ -3,10 +3,11 @@ import zlib from typing import Optional from facefusion.filesystem import get_file_name, is_file +from facefusion.types import Buffer -def create_hash(content : bytes) -> str: - return format(zlib.crc32(content), '08x') +def create_hash(buffer : Buffer) -> str: + return format(zlib.crc32(buffer), '08x') def validate_hash(validate_path : str) -> bool: diff --git a/facefusion/rtc.py b/facefusion/rtc.py index 0097c46e..743d1138 100644 --- a/facefusion/rtc.py +++ b/facefusion/rtc.py @@ -2,7 +2,7 @@ import ctypes from typing import List, Optional from facefusion.libraries import datachannel as datachannel_module -from facefusion.types import AudioCodec, BitRate, MediaDirection, PeerConnection, RtcAudioTrack, RtcPeer, RtcTrackInit, RtcVideoTrack, SdpAnswer, SdpOffer, VideoCodec +from facefusion.types import AudioCodec, BitRate, Buffer, MediaDirection, PeerConnection, RtcAudioTrack, RtcPeer, RtcTrackInit, RtcVideoTrack, SdpAnswer, SdpOffer, VideoCodec def create_peer_connection() -> PeerConnection: @@ -47,7 +47,7 @@ def set_remote_description(peer_connection : PeerConnection, sdp_offer : SdpOffe return None -def send_video(rtc_peer : RtcPeer, video_buffer : bytes, video_timestamp : int) -> None: +def send_video(rtc_peer : RtcPeer, video_buffer : Buffer, video_timestamp : int) -> None: datachannel_library = datachannel_module.create_static_library() if rtc_peer.get('video'): @@ -61,7 +61,7 @@ def send_video(rtc_peer : RtcPeer, video_buffer : bytes, video_timestamp : int) return None -def send_audio(rtc_peer : RtcPeer, audio_buffer : bytes, audio_timestamp : int) -> None: +def send_audio(rtc_peer : RtcPeer, audio_buffer : Buffer, audio_timestamp : int) -> None: datachannel_library = datachannel_module.create_static_library() if rtc_peer.get('audio'): diff --git a/facefusion/streamer.py b/facefusion/streamer.py index e8cf1b75..97357dce 100644 --- a/facefusion/streamer.py +++ b/facefusion/streamer.py @@ -14,7 +14,7 @@ from facefusion.content_analyser import analyse_stream from facefusion.ffmpeg import open_ffmpeg from facefusion.filesystem import is_directory from facefusion.processors.core import get_processors_modules -from facefusion.types import AudioFrame, Fps, StreamMode, VisionFrame +from facefusion.types import AudioFrame, Buffer, Fps, StreamMode, VisionFrame from facefusion.vision import extract_vision_mask, read_static_images @@ -69,7 +69,7 @@ def process_frame(stream_audio_frame : AudioFrame, stream_vision_frame : VisionF return temp_vision_frame -def open_stream(stream_mode : StreamMode, stream_resolution : str, stream_fps : Fps) -> subprocess.Popen[bytes]: +def open_stream(stream_mode : StreamMode, stream_resolution : str, stream_fps : Fps) -> subprocess.Popen[Buffer]: commands = ffmpeg_builder.chain( ffmpeg_builder.capture_video(), ffmpeg_builder.set_media_resolution(stream_resolution), diff --git a/facefusion/types.py b/facefusion/types.py index 0296ef93..6b7fc0c5 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -76,7 +76,6 @@ Matrix : TypeAlias = NDArray[Any] Anchors : TypeAlias = NDArray[Any] Translation : TypeAlias = NDArray[Any] -AudioBuffer : TypeAlias = bytes Audio : TypeAlias = NDArray[Any] AudioChunk : TypeAlias = NDArray[Any] AudioFrame : TypeAlias = NDArray[Any] @@ -90,6 +89,7 @@ BitRate : TypeAlias = int SampleRate : TypeAlias = int Fps : TypeAlias = float Duration : TypeAlias = float +Time : TypeAlias = float Color : TypeAlias = Tuple[int, int, int, int] Padding : TypeAlias = Tuple[int, int, int, int] Margin : TypeAlias = Tuple[int, int, int, int] @@ -111,14 +111,10 @@ OpusDecoder : TypeAlias = ctypes.c_void_p VpxEncoder : TypeAlias = ctypes.Array[ctypes.c_char] VpxDecoder : TypeAlias = ctypes.Array[ctypes.c_char] -AomPointer = TypedDict('AomPointer', +Buffer : TypeAlias = bytes +BufferPack = TypedDict('BufferPack', { - 'buffer' : bytes, - 'resolution' : Resolution -}) -VpxPointer = TypedDict('VpxPointer', -{ - 'buffer' : bytes, + 'buffer' : Buffer, 'resolution' : Resolution }) diff --git a/tests/test_api_stream_video.py b/tests/test_api_stream_video.py index 28eea0d3..d741a324 100644 --- a/tests/test_api_stream_video.py +++ b/tests/test_api_stream_video.py @@ -18,7 +18,7 @@ from facefusion.common_helper import is_linux, is_macos, is_windows from facefusion.download import conditional_download from facefusion.hash_helper import create_hash from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, vpx as vpx_module -from facefusion.types import FrameHandler, Resolution, RtcPeer, RtcPeerVideo, VideoCodec +from facefusion.types import BufferPack, FrameHandler, RtcPeer, RtcPeerVideo, Time, VideoCodec from facefusion.vision import read_video_frame from .assert_helper import get_test_example_file, get_test_examples_directory @@ -67,7 +67,7 @@ def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) -> 'receiver_bitrate': ctypes.c_uint(8000) } - video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]] = Queue(maxsize = 30) + video_queue : Queue[Tuple[Time, Future[BufferPack]]] = Queue(maxsize = 30) with ThreadPoolExecutor(max_workers = 1) as executor: video_queue.put((0.1, executor.submit(process_video_frame, video_frame))) @@ -75,8 +75,8 @@ def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) -> with patch('facefusion.apis.stream_video.rtc.send_video') as send_video_mock: encode_loop_thread = threading.Thread(target = run_video_encode_loop, args = (rtc_peer, video_queue), daemon = True) encode_loop_thread.start() - empty_future : Future[Tuple[bytes, Resolution]] = Future() - empty_future.set_result((bytes(), (0, 0))) + empty_future : Future[BufferPack] = Future() + empty_future.set_result(BufferPack(buffer = bytes(), resolution = (0, 0))) video_queue.put((0.0, empty_future)) encode_loop_thread.join(timeout = 5.0) @@ -99,7 +99,7 @@ def test_run_video_encode_loop(video_codec : VideoCodec, payload_type : int) -> @pytest.mark.parametrize('video_codec', [ 'av1', 'vp8', 'vp9' ]) def test_receive_video_frames(video_codec : VideoCodec) -> None: video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]] = Queue(maxsize = 30) + video_queue : Queue[Tuple[Time, Future[BufferPack]]] = Queue(maxsize = 30) datachannel_mock = MagicMock() ready_event = threading.Event() @@ -108,7 +108,7 @@ def test_receive_video_frames(video_codec : VideoCodec) -> None: with ThreadPoolExecutor(max_workers = 1) as executor: with patch('facefusion.libraries.datachannel.create_static_library', return_value = datachannel_mock): with patch('facefusion.apis.stream_video.decode_video_frame', return_value = video_frame): - with patch('facefusion.apis.stream_video.process_video_frame', return_value = (video_frame.tobytes(), (426, 226))): + with patch('facefusion.apis.stream_video.process_video_frame', return_value = BufferPack(buffer = video_frame.tobytes(), resolution = (426, 226))): rtc_peer_video : RtcPeerVideo =\ { 'sender_track': 0, @@ -123,7 +123,7 @@ def test_receive_video_frames(video_codec : VideoCodec) -> None: video_receiver_thread.join(timeout = 5.0) _, video_future = video_queue.get_nowait() - video_buffer, _ = video_future.result() + video_buffer = video_future.result().get('buffer') if is_linux() or is_windows(): assert create_hash(video_buffer) == 'a17439db' @@ -262,15 +262,15 @@ def test_update_video_encoder_bitrate(video_codec : VideoCodec) -> None: def test_handle_video_frame(video_codec : VideoCodec) -> None: video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) video_decoder = create_video_decoder(video_codec) - video_queue : Queue[Tuple[float, Future[Tuple[bytes, Resolution]]]] = Queue(maxsize = 30) + video_queue : Queue[Tuple[Time, Future[BufferPack]]] = Queue(maxsize = 30) with ThreadPoolExecutor(max_workers = 1) as executor: with patch('facefusion.apis.stream_video.decode_video_frame', return_value = video_frame): - with patch('facefusion.apis.stream_video.process_video_frame', return_value = (video_frame.tobytes(), (426, 226))): + with patch('facefusion.apis.stream_video.process_video_frame', return_value = BufferPack(buffer = video_frame.tobytes(), resolution = (426, 226))): handle_video_frame(video_codec, video_decoder, video_queue, executor, 0, ctypes.c_void_p(), 1, ctypes.c_void_p(), ctypes.c_void_p()) _, video_future = video_queue.get_nowait() - video_buffer, _ = video_future.result() + video_buffer = video_future.result().get('buffer') if is_linux() or is_windows(): assert create_hash(video_buffer) == 'a17439db'