Add stream helper utilities and IVF frame iterator (#1082)

* Add stream helper utilities and IVF frame iterator

* fix lint

* some cosmetics

* fix lint

* changes

* improve test

* improve types and test

* add todo for better bitrate calculation
This commit is contained in:
Harisreedhar
2026-04-28 23:04:27 +05:30
committed by henryruhs
parent cc8bfc1af4
commit a2aedc8814
3 changed files with 138 additions and 1 deletions
+67 -1
View File
@@ -1,11 +1,18 @@
import asyncio
from typing import Tuple
import math
import os
import subprocess
from typing import Iterator, Optional, Tuple, cast
from aiortc import MediaStreamTrack, QueuedVideoStreamTrack, RTCPeerConnection, RTCRtpSender
from aiortc.mediastreams import MediaStreamError
from av import VideoFrame
from starlette.datastructures import Headers
from starlette.types import Scope
from facefusion.common_helper import is_linux, is_macos
from facefusion.streamer import process_vision_frame
from facefusion.types import Resolution, StreamBuffer, WebSocketStreamMode
def process_stream_frame(target_stream_frame : VideoFrame) -> VideoFrame:
@@ -42,3 +49,62 @@ def on_video_track(rtc_connection : RTCPeerConnection, output_track : QueuedVide
if target_track.kind == 'video':
asyncio.create_task(process_and_enqueue(target_track, output_track))
def calculate_bitrate(resolution : Resolution) -> int: # TODO : improve the bitrate calculation
pixel_total = resolution[0] * resolution[1]
bitrate_factor = 3500 / math.sqrt(1920 * 1080)
return max(400, round(math.sqrt(pixel_total) * bitrate_factor))
def calculate_buffer_size(resolution : Resolution) -> int:
return calculate_bitrate(resolution) * 2
def get_websocket_stream_mode(scope : Scope) -> Optional[WebSocketStreamMode]:
protocol_header = Headers(scope = scope).get('Sec-WebSocket-Protocol')
if protocol_header:
for protocol in protocol_header.split(','):
websocket_stream_mode = protocol.strip()
if websocket_stream_mode in [ 'image', 'video' ]:
return cast(WebSocketStreamMode, websocket_stream_mode)
return None
def read_pipe_buffer(pipe_handle : int, size : int) -> Optional[bytes]:
byte_buffer = bytearray()
frame_data = os.read(pipe_handle, size - len(byte_buffer))
while frame_data:
byte_buffer += frame_data
if len(byte_buffer) == size:
return bytes(byte_buffer)
frame_data = os.read(pipe_handle, size - len(byte_buffer))
return None
def forward_stream_frame(process : subprocess.Popen[bytes]) -> Iterator[StreamBuffer]:
pipe_handle = process.stdout.fileno()
if is_linux() or is_macos():
os.set_blocking(pipe_handle, True)
header = read_pipe_buffer(pipe_handle, 32)
if header:
frame_header = read_pipe_buffer(pipe_handle, 12)
while frame_header:
frame_size = int.from_bytes(frame_header[0:4], 'little')
frame_data = read_pipe_buffer(pipe_handle, frame_size)
if frame_data:
yield frame_data
frame_header = read_pipe_buffer(pipe_handle, 12)
+2
View File
@@ -264,6 +264,8 @@ BenchmarkCycleSet = TypedDict('BenchmarkCycleSet',
WebcamMode = Literal['inline', 'udp', 'v4l2']
StreamMode = Literal['udp', 'v4l2']
WebSocketStreamMode = Literal['image', 'video']
StreamBuffer : TypeAlias = bytes
RtcOfferSet = TypedDict('RtcOfferSet',
{
+69
View File
@@ -0,0 +1,69 @@
import os
import subprocess
from facefusion import ffmpeg_builder
from facefusion.apis.stream_helper import calculate_bitrate, calculate_buffer_size, forward_stream_frame, get_websocket_stream_mode, read_pipe_buffer
from facefusion.vision import pack_resolution
def make_scope(protocol : str) -> dict[str, object]:
return\
{
'type': 'websocket',
'headers': [ (b'sec-websocket-protocol', protocol.encode()) ]
}
def test_calculate_bitrate() -> None:
assert calculate_bitrate((320, 240)) == 674
assert calculate_bitrate((640, 480)) == 1347
assert calculate_bitrate((1280, 720)) == 2333
assert calculate_bitrate((1920, 1080)) == 3500
assert calculate_bitrate((3840, 2160)) == 7000
def test_calculate_buffer_size() -> None:
assert calculate_buffer_size((320, 240)) == 1348
assert calculate_buffer_size((640, 480)) == 2694
assert calculate_buffer_size((1280, 720)) == 4666
assert calculate_buffer_size((1920, 1080)) == 7000
assert calculate_buffer_size((3840, 2160)) == 14000
def test_get_websocket_stream_mode() -> None:
assert get_websocket_stream_mode(make_scope('image')) == 'image'
assert get_websocket_stream_mode(make_scope('video')) == 'video'
def test_read_pipe_buffer() -> None:
read_fd, write_fd = os.pipe()
os.write(write_fd, b'abcdefgh')
os.close(write_fd)
assert read_pipe_buffer(read_fd, 4) == b'abcd'
assert read_pipe_buffer(read_fd, 4) == b'efgh'
assert read_pipe_buffer(read_fd, 1) is None
os.close(read_fd)
def test_forward_frames() -> None:
resolution = (320, 240)
frame_size = resolution[0] * resolution[1] * 3
commands = ffmpeg_builder.run(ffmpeg_builder.chain(
ffmpeg_builder.capture_video(),
ffmpeg_builder.set_media_resolution(pack_resolution(resolution)),
ffmpeg_builder.set_input_fps(30),
ffmpeg_builder.set_input('-'),
ffmpeg_builder.set_video_encoder('libvpx'),
ffmpeg_builder.set_encoder_deadline('realtime'),
ffmpeg_builder.set_stream_quality(400),
ffmpeg_builder.set_muxer('ivf'),
ffmpeg_builder.set_output('-')
))
encoder = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
encoder.stdin.write(bytes(frame_size))
encoder.stdin.close()
for stream_buffer in forward_stream_frame(encoder):
assert 0 < len(stream_buffer) < frame_size