From a2aedc8814af1f7231f99706eee0d57d39ca6e59 Mon Sep 17 00:00:00 2001 From: Harisreedhar <46858047+harisreedhar@users.noreply.github.com> Date: Tue, 28 Apr 2026 23:04:27 +0530 Subject: [PATCH] Add stream helper utilities and IVF frame iterator (#1082) * Add stream helper utilities and IVF frame iterator * fix lint * some cosmetics * fix lint * changes * improve test * improve types and test * add todo for better bitrate calculation --- facefusion/apis/stream_helper.py | 68 ++++++++++++++++++++++++++++++- facefusion/types.py | 2 + tests/test_stream_helper.py | 69 ++++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 tests/test_stream_helper.py diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index 14b9fdb4..e235084d 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -1,11 +1,18 @@ import asyncio -from typing import Tuple +import math +import os +import subprocess +from typing import Iterator, Optional, Tuple, cast from aiortc import MediaStreamTrack, QueuedVideoStreamTrack, RTCPeerConnection, RTCRtpSender from aiortc.mediastreams import MediaStreamError from av import VideoFrame +from starlette.datastructures import Headers +from starlette.types import Scope +from facefusion.common_helper import is_linux, is_macos from facefusion.streamer import process_vision_frame +from facefusion.types import Resolution, StreamBuffer, WebSocketStreamMode def process_stream_frame(target_stream_frame : VideoFrame) -> VideoFrame: @@ -42,3 +49,62 @@ def on_video_track(rtc_connection : RTCPeerConnection, output_track : QueuedVide if target_track.kind == 'video': asyncio.create_task(process_and_enqueue(target_track, output_track)) + + +def calculate_bitrate(resolution : Resolution) -> int: # TODO : improve the bitrate calculation + pixel_total = resolution[0] * resolution[1] + bitrate_factor = 3500 / math.sqrt(1920 * 1080) + return max(400, round(math.sqrt(pixel_total) 
* bitrate_factor)) + + +def calculate_buffer_size(resolution : Resolution) -> int: + return calculate_bitrate(resolution) * 2 + + +def get_websocket_stream_mode(scope : Scope) -> Optional[WebSocketStreamMode]: + protocol_header = Headers(scope = scope).get('Sec-WebSocket-Protocol') + + if protocol_header: + for protocol in protocol_header.split(','): + websocket_stream_mode = protocol.strip() + + if websocket_stream_mode in [ 'image', 'video' ]: + return cast(WebSocketStreamMode, websocket_stream_mode) + + return None + + +def read_pipe_buffer(pipe_handle : int, size : int) -> Optional[bytes]: + byte_buffer = bytearray() + frame_data = os.read(pipe_handle, size - len(byte_buffer)) + + while frame_data: + byte_buffer += frame_data + + if len(byte_buffer) == size: + return bytes(byte_buffer) + + frame_data = os.read(pipe_handle, size - len(byte_buffer)) + + return None + + +def forward_stream_frame(process : subprocess.Popen[bytes]) -> Iterator[StreamBuffer]: + pipe_handle = process.stdout.fileno() + + if is_linux() or is_macos(): + os.set_blocking(pipe_handle, True) + + header = read_pipe_buffer(pipe_handle, 32) + + if header: + frame_header = read_pipe_buffer(pipe_handle, 12) + + while frame_header: + frame_size = int.from_bytes(frame_header[0:4], 'little') + frame_data = read_pipe_buffer(pipe_handle, frame_size) + + if frame_data: + yield frame_data + + frame_header = read_pipe_buffer(pipe_handle, 12) diff --git a/facefusion/types.py b/facefusion/types.py index fb24d50a..f75b02c6 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -264,6 +264,8 @@ BenchmarkCycleSet = TypedDict('BenchmarkCycleSet', WebcamMode = Literal['inline', 'udp', 'v4l2'] StreamMode = Literal['udp', 'v4l2'] +WebSocketStreamMode = Literal['image', 'video'] +StreamBuffer : TypeAlias = bytes RtcOfferSet = TypedDict('RtcOfferSet', { diff --git a/tests/test_stream_helper.py b/tests/test_stream_helper.py new file mode 100644 index 00000000..efb0be28 --- /dev/null +++ 
# Tests for the stream helper utilities (tests/test_stream_helper.py).
import os
import subprocess

from facefusion import ffmpeg_builder
from facefusion.apis.stream_helper import calculate_bitrate, calculate_buffer_size, forward_stream_frame, get_websocket_stream_mode, read_pipe_buffer
from facefusion.vision import pack_resolution


def make_scope(protocol : str) -> dict[str, object]:
    """Build a minimal websocket scope that offers the given subprotocol header value."""
    return\
    {
        'type': 'websocket',
        'headers': [ (b'sec-websocket-protocol', protocol.encode()) ]
    }


def test_calculate_bitrate() -> None:
    assert calculate_bitrate((320, 240)) == 674
    assert calculate_bitrate((640, 480)) == 1347
    assert calculate_bitrate((1280, 720)) == 2333
    assert calculate_bitrate((1920, 1080)) == 3500
    assert calculate_bitrate((3840, 2160)) == 7000


def test_calculate_buffer_size() -> None:
    assert calculate_buffer_size((320, 240)) == 1348
    assert calculate_buffer_size((640, 480)) == 2694
    assert calculate_buffer_size((1280, 720)) == 4666
    assert calculate_buffer_size((1920, 1080)) == 7000
    assert calculate_buffer_size((3840, 2160)) == 14000


def test_get_websocket_stream_mode() -> None:
    assert get_websocket_stream_mode(make_scope('image')) == 'image'
    assert get_websocket_stream_mode(make_scope('video')) == 'video'
    # the first supported entry of a comma separated offer wins
    assert get_websocket_stream_mode(make_scope('binary, video')) == 'video'
    assert get_websocket_stream_mode(make_scope('video, image')) == 'video'
    # unsupported, empty and missing headers resolve to None
    assert get_websocket_stream_mode(make_scope('binary')) is None
    assert get_websocket_stream_mode(make_scope('')) is None
    assert get_websocket_stream_mode({ 'type': 'websocket', 'headers': [] }) is None


def test_read_pipe_buffer() -> None:
    read_fd, write_fd = os.pipe()
    os.write(write_fd, b'abcdefghijk')
    os.close(write_fd)

    # close the descriptor even when an assertion fails
    try:
        assert read_pipe_buffer(read_fd, 4) == b'abcd'
        assert read_pipe_buffer(read_fd, 4) == b'efgh'
        # only 3 bytes are left, so a 4 byte request hits the end of the stream
        assert read_pipe_buffer(read_fd, 4) is None
        assert read_pipe_buffer(read_fd, 1) is None
    finally:
        os.close(read_fd)


def test_forward_frames() -> None:
    resolution = (320, 240)
    frame_size = resolution[0] * resolution[1] * 3
    commands = ffmpeg_builder.run(ffmpeg_builder.chain(
        ffmpeg_builder.capture_video(),
        ffmpeg_builder.set_media_resolution(pack_resolution(resolution)),
        ffmpeg_builder.set_input_fps(30),
        ffmpeg_builder.set_input('-'),
        ffmpeg_builder.set_video_encoder('libvpx'),
        ffmpeg_builder.set_encoder_deadline('realtime'),
        ffmpeg_builder.set_stream_quality(400),
        ffmpeg_builder.set_muxer('ivf'),
        ffmpeg_builder.set_output('-')
    ))

    # stderr is never read, so it is discarded instead of piped to keep the encoder from blocking on it;
    # the context manager closes the pipes and waits for the encoder to exit
    with subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.DEVNULL) as encoder:
        encoder.stdin.write(bytes(frame_size))
        encoder.stdin.close()
        stream_buffers = list(forward_stream_frame(encoder))

    # an encoder that produced nothing must fail the test instead of passing silently
    assert stream_buffers

    for stream_buffer in stream_buffers:
        assert 0 < len(stream_buffer) < frame_size