Implement RTC store (#1081)

* implement RTC store

* fix ffmpeg_builder

* add RtcSdpOffer type
This commit is contained in:
Harisreedhar
2026-04-28 15:07:12 +05:30
committed by GitHub
parent 7d0cd9388a
commit da5e21793d
7 changed files with 84 additions and 12 deletions
+5 -2
View File
@@ -355,15 +355,18 @@ def sanitize_video(file_content : bytes, asset_path : str, security_strategy : A
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
def spawn_stream(resolution : Resolution, stream_fps : int, stream_bitrate : int) -> subprocess.Popen[bytes]:
def spawn_stream(resolution : Resolution, stream_fps : int, stream_bitrate : int, stream_bufsize : int) -> subprocess.Popen[bytes]:
commands = ffmpeg_builder.chain(
ffmpeg_builder.use_wallclock(),
ffmpeg_builder.capture_video(),
ffmpeg_builder.set_media_resolution(pack_resolution(resolution)),
ffmpeg_builder.set_input_fps(stream_fps),
ffmpeg_builder.set_input('-'),
ffmpeg_builder.set_video_encoder('libvpx'), # TODO: replace hardcoded value
ffmpeg_builder.enforce_pixel_format('yuv420p'), # TODO: replace hardcoded value
ffmpeg_builder.set_encoder_deadline('realtime'),
ffmpeg_builder.enforce_pixel_format('rgb24'), # TODO: replace hardcoded value
ffmpeg_builder.set_stream_quality(stream_bitrate),
ffmpeg_builder.set_video_bufsize(stream_bufsize),
ffmpeg_builder.set_stream_keyframe(stream_fps),
ffmpeg_builder.set_muxer('ivf'), # TODO: replace hardcoded value
ffmpeg_builder.set_output('-')
+14 -2
View File
@@ -5,7 +5,7 @@ from typing import List, Optional
import numpy
from facefusion.filesystem import get_file_format
from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, Muxer, SampleRate, StreamMode, VideoEncoder, VideoPreset
from facefusion.types import AudioEncoder, Command, CommandSet, Duration, EncoderDeadline, Fps, Muxer, SampleRate, StreamMode, VideoEncoder, VideoPreset
def run(commands : List[Command]) -> List[Command]:
@@ -300,4 +300,16 @@ def set_stream_keyframe(interval : int) -> List[Command]:
def set_muxer(muxer : Muxer) -> List[Command]:
    """
    Build the ffmpeg arguments that force the output container format.

    :param muxer: muxer name handed to ffmpeg's '-f' option, e.g. 'ivf'
    :return: the two-element command list [ '-f', muxer ]
    """
    # a single return only: the block previously carried the same statement twice, leaving the second one unreachable
    return [ '-f', muxer ]
def set_video_bufsize(video_bufsize : int) -> List[Command]:
    """
    Build the ffmpeg '-bufsize' arguments.

    :param video_bufsize: buffer size; the value is emitted with a 'k' suffix
    :return: the command list [ '-bufsize', '<video_bufsize>k' ]
    """
    bufsize_value = f'{video_bufsize}k'
    return [ '-bufsize', bufsize_value ]
def set_encoder_deadline(deadline : EncoderDeadline) -> List[Command]:
    """
    Build the ffmpeg '-deadline' arguments.

    :param deadline: deadline name, passed through unchanged
    :return: the command list [ '-deadline', deadline ]
    """
    commands : List[Command] = [ '-deadline' ]
    commands.append(deadline)
    return commands
def set_lag_in_frames(count : int) -> List[Command]:
    """
    Build the ffmpeg '-lag-in-frames' arguments.

    :param count: number of frames, rendered as a decimal string
    :return: the command list [ '-lag-in-frames', '<count>' ]
    """
    frame_count = str(count)
    return [ '-lag-in-frames', frame_count ]
+2 -2
View File
@@ -8,7 +8,7 @@ from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download_hashes, conditional_download_sources
from facefusion.filesystem import resolve_relative_path
from facefusion.rtc_bindings import RTC_CONFIGURATION, RTC_PACKETIZER_INIT, init_ctypes
from facefusion.types import DownloadSet, RtcAudioTrack, RtcPeer, RtcVideoTrack
from facefusion.types import DownloadSet, RtcAudioTrack, RtcPeer, RtcSdpAnswer, RtcSdpOffer, RtcVideoTrack
def resolve_binary_file() -> Optional[str]:
@@ -169,7 +169,7 @@ def negotiate_sdp(peer_connection : int, sdp_offer : str) -> Optional[str]:
return None
def handle_whep_offer(peers : List[RtcPeer], sdp_offer : str) -> Optional[str]:
def handle_whep_offer(peers : List[RtcPeer], sdp_offer : RtcSdpOffer) -> Optional[RtcSdpAnswer]:
peer_connection = create_peer_connection()
audio_track = add_audio_track(peer_connection)
video_track = add_video_track(peer_connection)
+37
View File
@@ -0,0 +1,37 @@
from typing import List, Optional
from facefusion import rtc
from facefusion.types import RtcPeer, RtcSdpAnswer, RtcSdpOffer, RtcStreamStore
# Module-level registry: maps a stream path to the list of peers attached to that stream.
RTC_STREAMS : RtcStreamStore = {} # TODO: tie lifetime to session_id so streams are cleaned up on session expiry
def get_rtc_stream(stream_path : str) -> Optional[List[RtcPeer]]:
    """
    Look up the peer list registered for a stream path.

    :param stream_path: key of the stream in RTC_STREAMS
    :return: the stored peer list (possibly empty), or None when the path is not registered
    """
    if stream_path in RTC_STREAMS:
        return RTC_STREAMS[stream_path]
    return None
def create_rtc_stream(stream_path : str) -> None:
    """
    Register a stream path with a fresh, empty peer list.

    :param stream_path: key of the stream in RTC_STREAMS
    """
    # NOTE(review): an already registered path is overwritten and its previous peer list is dropped without being deleted
    RTC_STREAMS.update({ stream_path : [] })
def destroy_rtc_stream(stream_path : str) -> None:
    """
    Unregister a stream path and delete the peers that were attached to it.

    :param stream_path: key of the stream in RTC_STREAMS; unknown paths are ignored
    """
    removed_peers = RTC_STREAMS.pop(stream_path, None)

    # unknown path or no peers attached: nothing to delete
    if not removed_peers:
        return

    rtc.delete_peers(removed_peers)
def add_rtc_viewer(stream_path : str, sdp_offer : RtcSdpOffer) -> Optional[RtcSdpAnswer]:
    """
    Attach a viewer to a registered stream by handling its SDP offer.

    :param stream_path: key of the stream in RTC_STREAMS
    :param sdp_offer: SDP offer sent by the viewer
    :return: the result of rtc.handle_whep_offer, or None when the stream path is not registered
    """
    peers = get_rtc_stream(stream_path)

    # Compare against None instead of relying on truthiness: a registered stream starts
    # with an empty (falsy) peer list, and a truthiness check would reject its first viewer.
    if peers is not None:
        return rtc.handle_whep_offer(peers, sdp_offer)
    return None
def send_rtc_frame(stream_path : str, frame_data : bytes) -> None:
    """
    Forward frame data to every peer attached to a stream.

    :param stream_path: key of the stream in RTC_STREAMS
    :param frame_data: raw bytes handed to rtc.send_to_peers
    """
    stream_peers = get_rtc_stream(stream_path)

    # unknown path or no peers attached: nothing to send
    if not stream_peers:
        return

    rtc.send_to_peers(stream_peers, frame_data)
+4
View File
@@ -169,6 +169,7 @@ TempFrameFormat = Literal['bmp', 'jpeg', 'png', 'tiff']
# Encoder names accepted for audio, image and video output.
AudioEncoder = Literal['flac', 'aac', 'libmp3lame', 'libopus', 'libvorbis', 'pcm_s16le', 'pcm_s32le']
ImageEncoder = Literal['bmp', 'mjpeg', 'png', 'tiff', 'libwebp']
VideoEncoder = Literal['libx264', 'libx264rgb', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox', 'rawvideo']
# Names accepted as an encoder deadline.
EncoderDeadline = Literal['best', 'good', 'realtime']
# Muxer name kept as a plain string rather than a closed Literal set.
Muxer : TypeAlias = str
# Lookup tables keyed by audio / image format.
AudioSet : TypeAlias = Dict[AudioFormat, str]
ImageSet : TypeAlias = Dict[ImageFormat, str]
@@ -279,6 +280,9 @@ RtcPeer = TypedDict('RtcPeer',
# NOTE(review): tracks are plain ints - presumably handles returned by the RTC bindings; confirm.
RtcVideoTrack : TypeAlias = int
RtcAudioTrack : TypeAlias = int
# SDP offer and answer are carried as plain strings.
RtcSdpOffer : TypeAlias = str
RtcSdpAnswer : TypeAlias = str
# Registry shape: string key mapped to the list of peers stored under it.
RtcStreamStore : TypeAlias = Dict[str, List[RtcPeer]]
ModelOptions : TypeAlias = Dict[str, Any]
ModelSet : TypeAlias = Dict[str, ModelOptions]
+5 -5
View File
@@ -259,13 +259,13 @@ def test_sanitize_video() -> None:
def test_spawn_stream() -> None: # TODO: Improve test
test_set =\
[
((426, 240), 25, 500),
((640, 360), 30, 1000),
((1280, 720), 30, 2000)
((426, 240), 25, 400, 800),
((640, 360), 30, 1000, 2000),
((1280, 720), 30, 2000, 4000)
]
for resolution, stream_fps, stream_bitrate in test_set:
encoder = spawn_stream(resolution, stream_fps, stream_bitrate)
for resolution, stream_fps, stream_bitrate, stream_bufsize in test_set:
encoder = spawn_stream(resolution, stream_fps, stream_bitrate, stream_bufsize)
frame_size = resolution[0] * resolution[1] * 3
stdout, _ = encoder.communicate(input = bytes(frame_size))
+17 -1
View File
@@ -1,7 +1,7 @@
from shutil import which
from facefusion import ffmpeg_builder
from facefusion.ffmpeg_builder import capture_video, chain, concat, enforce_pixel_format, keep_video_alpha, run, select_frame_range, set_audio_quality, set_audio_sample_size, set_muxer, set_stream_keyframe, set_stream_mode, set_stream_quality, set_video_encoder, set_video_fps, set_video_quality, use_wallclock
from facefusion.ffmpeg_builder import capture_video, chain, concat, enforce_pixel_format, keep_video_alpha, run, select_frame_range, set_audio_quality, set_audio_sample_size, set_encoder_deadline, set_lag_in_frames, set_muxer, set_stream_keyframe, set_stream_mode, set_stream_quality, set_video_bufsize, set_video_encoder, set_video_fps, set_video_quality, use_wallclock
def test_run() -> None:
@@ -136,3 +136,19 @@ def test_set_keyframe_interval() -> None:
def test_set_output_format() -> None:
    # every muxer name must be forwarded unchanged behind the '-f' flag
    for muxer in [ 'ivf', 'mpegts' ]:
        assert set_muxer(muxer) == [ '-f', muxer ]
def test_set_video_bufsize() -> None:
    # pairs of input buffer size and the expected 'k'-suffixed argument
    test_set =\
    [
        (800, '800k'),
        (4000, '4000k')
    ]

    for video_bufsize, expected_value in test_set:
        assert set_video_bufsize(video_bufsize) == [ '-bufsize', expected_value ]
def test_set_encoder_deadline() -> None:
    # literal arguments are kept inline so each call stays checkable against the EncoderDeadline type
    deadline_flag = '-deadline'

    assert set_encoder_deadline('best') == [ deadline_flag, 'best' ]
    assert set_encoder_deadline('good') == [ deadline_flag, 'good' ]
    assert set_encoder_deadline('realtime') == [ deadline_flag, 'realtime' ]
def test_set_lag_in_frames() -> None:
    # the frame count must be rendered as a decimal string behind the flag
    for count in [ 0, 16 ]:
        assert set_lag_in_frames(count) == [ '-lag-in-frames', str(count) ]