From a3e504ee1753e0879758c65baf6367285666ee12 Mon Sep 17 00:00:00 2001
From: Harisreedhar <46858047+harisreedhar@users.noreply.github.com>
Date: Tue, 21 Apr 2026 17:33:26 +0530
Subject: [PATCH] libdatachannel implementation Part 1 (#1077)

* add libdatachannel
* move some methods to rtc_helper.py
* move some methods to rtc_helper.py
* open_vp8_encoder -> spawn_stream_encoder
* update url (linux only)
* fix lint
* remove RTC_STATE and add unit tests
* remove convert_to_raw_rgb
* remove 'utf-8' from decode
* cleanup
* cleanup
* cleanup
* move rtc types to rtc_bindings.py
* remove load_library
* cleanup
* add todos and some cleanup
* fix lint
---
 facefusion/ffmpeg.py         |  23 +++-
 facefusion/ffmpeg_builder.py |  17 ++-
 facefusion/rtc.py            | 219 +++++++++++++++++++++++++++++++++++
 facefusion/rtc_bindings.py   |  92 +++++++++++++++
 facefusion/types.py          |  11 ++
 tests/test_ffmpeg.py         |  26 ++++-
 tests/test_ffmpeg_builder.py |  30 ++++-
 7 files changed, 411 insertions(+), 7 deletions(-)
 create mode 100644 facefusion/rtc.py
 create mode 100644 facefusion/rtc_bindings.py

diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py
index 26d6d0ae..753a5c90 100644
--- a/facefusion/ffmpeg.py
+++ b/facefusion/ffmpeg.py
@@ -355,6 +355,25 @@ def sanitize_video(file_content : bytes, asset_path : str, security_strategy : A
 	return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
 
 
+def spawn_stream(resolution : Resolution, stream_fps : int, stream_bitrate : int) -> subprocess.Popen[bytes]:
+	"""Spawn ffmpeg to encode raw frames from stdin into a vp8 ivf stream on stdout with a keyframe every stream_fps frames."""
+	commands = ffmpeg_builder.chain(
+		ffmpeg_builder.use_wallclock(),
+		ffmpeg_builder.capture_video(),
+		ffmpeg_builder.set_media_resolution(pack_resolution(resolution)),
+		ffmpeg_builder.set_input('-'),
+		ffmpeg_builder.set_video_encoder('libvpx'), # TODO: replace hardcoded value
+		ffmpeg_builder.enforce_pixel_format('yuv420p'), # TODO: replace hardcoded value
+		ffmpeg_builder.set_stream_quality(stream_bitrate),
+		ffmpeg_builder.set_stream_keyframe(stream_fps),
+		ffmpeg_builder.set_muxer('ivf'), # TODO: replace hardcoded value
+		ffmpeg_builder.set_output('-')
+	)
+	commands = ffmpeg_builder.run(commands)
+	# NOTE(review): stdout and stderr are both pipes, the caller has to drain them or ffmpeg blocks once a pipe buffer is full
+	return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
+
+
 def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder:
 	if video_format == 'avi' and audio_encoder == 'libopus':
 		return 'aac'
@@ -379,7 +398,3 @@ def fix_video_encoder(video_format : VideoFormat, video_encoder : VideoEncoder)
 	if video_format == 'webm':
 		return 'libvpx-vp9'
 	return video_encoder
-
-
-
-
diff --git a/facefusion/ffmpeg_builder.py b/facefusion/ffmpeg_builder.py
index e971cfa0..ef094fa5 100644
--- a/facefusion/ffmpeg_builder.py
+++ b/facefusion/ffmpeg_builder.py
@@ -5,7 +5,7 @@ from typing import List, Optional
 import numpy
 
 from facefusion.filesystem import get_file_format
-from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, SampleRate, StreamMode, VideoEncoder, VideoPreset
+from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, Muxer, SampleRate, StreamMode, VideoEncoder, VideoPreset
 
 
 def run(commands : List[Command]) -> List[Command]:
@@ -289,3 +289,18 @@ def map_qsv_preset(video_preset : VideoPreset) -> Optional[str]:
 	if video_preset in [ 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]:
 		return video_preset
 	return None
+
+
+def use_wallclock() -> List[Command]:
+	"""Stamp incoming packets with the wall clock time instead of their own timestamps."""
+	return [ '-use_wallclock_as_timestamps', '1' ]
+
+
+def set_stream_keyframe(interval : int) -> List[Command]:
+	"""Fix the keyframe distance by setting the maximum and the minimum gop size to the same interval."""
+	return [ '-g', str(interval), '-keyint_min', str(interval) ]
+
+
+def set_muxer(muxer : Muxer) -> List[Command]:
+	"""Force the format of the input or output that follows on the command line."""
+	return [ '-f', muxer ]
diff --git a/facefusion/rtc.py b/facefusion/rtc.py
new file mode 100644
index 00000000..470b7400
--- /dev/null
+++ b/facefusion/rtc.py
@@ -0,0 +1,219 @@
+import ctypes
+import time
+from functools import lru_cache
+from typing import Dict, List, Optional, Tuple
+
+from facefusion.common_helper import is_linux, is_macos, is_windows
+from facefusion.download import conditional_download_hashes, conditional_download_sources
+from facefusion.filesystem import resolve_relative_path
+from facefusion.rtc_bindings import RTC_CONFIGURATION, RTC_PACKETIZER_INIT, init_ctypes
+from facefusion.types import DownloadSet, RtcAudioTrack, RtcPeer, RtcVideoTrack
+
+RTC_LIBRARY: Optional[ctypes.CDLL] = None
+
+
+def resolve_binary_file() -> Optional[str]:
+	"""Resolve the file name of the libdatachannel binary for the current platform, None when unsupported."""
+	if is_linux():
+		return 'linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so'
+	if is_macos():
+		return 'macos-universal-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.dylib'
+	if is_windows():
+		return 'windows-x64-openssl-h264-vp8-av1-opus-datachannel-0.24.1.dll'
+	return None
+
+
+@lru_cache
+def create_static_download_set() -> Dict[str, DownloadSet]: # TODO: replace once conda package is in place
+	"""Create the download set for the libdatachannel binary and its hash, the urls still point to the linux build."""
+	binary_name = resolve_binary_file()
+
+	return\
+	{
+		'hashes':
+		{
+			'datachannel':
+			{
+				'url': 'https://huggingface.co/bluefoxcreation/libdatachannel/resolve/main/linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so.hash', # TODO: use url with dynamic binary_name
+				'path': resolve_relative_path('../.assets/binaries/' + binary_name + '.hash')
+			}
+		},
+		'sources':
+		{
+			'datachannel':
+			{
+				'url': 'https://huggingface.co/bluefoxcreation/libdatachannel/resolve/main/linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so', # TODO: use url with dynamic binary_name
+				'path': resolve_relative_path('../.assets/binaries/' + binary_name)
+			}
+		}
+	}
+
+
+def pre_check() -> bool:
+	"""Run the conditional downloads for the hash and the binary, False when the platform is unsupported or one of them fails."""
+	if not resolve_binary_file():
+		return False
+	download_set = create_static_download_set()
+
+	if not conditional_download_hashes(download_set.get('hashes')):
+		return False
+	return conditional_download_sources(download_set.get('sources'))
+
+
+def get_rtc_library() -> Optional[ctypes.CDLL]:
+	"""Load libdatachannel once and cache the handle, None when the platform is unsupported or the bindings failed."""
+	global RTC_LIBRARY
+
+	if RTC_LIBRARY:
+		return RTC_LIBRARY
+	if not resolve_binary_file():
+		return None
+	binary_path = create_static_download_set().get('sources').get('datachannel').get('path')
+
+	if binary_path:
+		rtc_library = ctypes.CDLL(binary_path)
+		if init_ctypes(rtc_library):
+			RTC_LIBRARY = rtc_library
+
+	return RTC_LIBRARY
+
+
+def create_peer_connection() -> int: # TODO: change method to have arguments with default values
+	"""Create a peer connection without ice servers, returns its identifier or a negative error code."""
+	rtc_library = get_rtc_library()
+	config = RTC_CONFIGURATION()
+	config.iceServers = None
+	config.iceServersCount = 0
+	config.proxyServer = None
+	config.bindAddress = None
+	config.certificateType = 0
+	config.iceTransportPolicy = 0
+	config.enableIceTcp = False
+	config.enableIceUdpMux = True
+	config.disableAutoNegotiation = False
+	config.forceMediaTransport = True
+	config.portRangeBegin = 0
+	config.portRangeEnd = 0
+	config.mtu = 0
+	config.maxMessageSize = 0
+	return rtc_library.rtcCreatePeerConnection(ctypes.byref(config))
+
+
+def add_media_tracks(peer_connection : int) -> Tuple[RtcVideoTrack, RtcAudioTrack]: # TODO: split into add_audio_track and add_video_track. add arguments with default values
+	"""Add a send only vp8 video track and a send only opus audio track including their packetizers and rtcp handlers."""
+	rtc_library = get_rtc_library()
+	video_media_description = b'm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP8/90000\r\na=sendonly\r\na=mid:0\r\na=rtcp-mux\r\n'
+	audio_media_description = b'm=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\na=sendonly\r\na=mid:1\r\na=rtcp-mux\r\n'
+
+	video_track = rtc_library.rtcAddTrack(peer_connection, video_media_description)
+	audio_track = rtc_library.rtcAddTrack(peer_connection, audio_media_description)
+
+	video_packetizer = RTC_PACKETIZER_INIT()
+	video_packetizer.ssrc = 42
+	video_packetizer.cname = b'video'
+	video_packetizer.payloadType = 96
+	video_packetizer.clockRate = 90000
+	video_packetizer.maxFragmentSize = 1200
+
+	rtc_library.rtcSetVP8Packetizer(video_track, ctypes.byref(video_packetizer))
+	rtc_library.rtcChainRtcpSrReporter(video_track)
+	rtc_library.rtcChainRtcpNackResponder(video_track, 512)
+
+	audio_packetizer = RTC_PACKETIZER_INIT()
+	audio_packetizer.ssrc = 43
+	audio_packetizer.cname = b'audio'
+	audio_packetizer.payloadType = 111
+	audio_packetizer.clockRate = 48000
+
+	rtc_library.rtcSetOpusPacketizer(audio_track, ctypes.byref(audio_packetizer))
+	rtc_library.rtcChainRtcpSrReporter(audio_track)
+
+	return video_track, audio_track
+
+
+def negotiate_sdp(peer_connection : int, sdp_offer : str) -> Optional[str]:
+	"""Apply the remote offer and wait up to five seconds for the local description, None when rejected or timed out."""
+	rtc_library = get_rtc_library()
+
+	# a rejected offer never yields a local description, so fail fast instead of polling until the timeout
+	if rtc_library.rtcSetRemoteDescription(peer_connection, sdp_offer.encode('utf-8'), b'offer') < 0:
+		return None
+	buffer_size = 16384
+	buffer_string = ctypes.create_string_buffer(buffer_size)
+	wait_limit = time.monotonic() + 5
+
+	while time.monotonic() < wait_limit:
+		if rtc_library.rtcGetLocalDescription(peer_connection, buffer_string, buffer_size) > 0:
+			return buffer_string.value.decode()
+		time.sleep(0.05)
+
+	return None
+
+
+def handle_whep_offer(peers : List[RtcPeer], sdp_offer : str) -> Optional[str]:
+	"""Answer a whep offer with a fresh peer connection that is kept in peers on success and released on failure."""
+	peer_connection = create_peer_connection()
+
+	if peer_connection < 0:
+		return None
+	video_track, audio_track = add_media_tracks(peer_connection)
+	local_sdp = negotiate_sdp(peer_connection, sdp_offer)
+
+	if local_sdp:
+		rtc_peer : RtcPeer =\
+		{
+			'peer_connection': peer_connection,
+			'video_track': video_track,
+			'audio_track': audio_track
+		}
+		peers.append(rtc_peer)
+		return local_sdp
+
+	# release the half-open connection, otherwise every failed offer leaks a native handle
+	get_rtc_library().rtcDeletePeerConnection(peer_connection)
+	return None
+
+
+def send_to_peers(peers : List[RtcPeer], data : bytes) -> None:
+	"""Send the data to every open video track stamped with a shared 90 kHz rtp timestamp."""
+	rtc_library = get_rtc_library()
+
+	if peers:
+		timestamp = int(time.monotonic() * 90000) & 0xFFFFFFFF
+		data_buffer = ctypes.create_string_buffer(data)
+		data_total = len(data)
+
+		for rtc_peer in peers:
+			video_track_id = rtc_peer.get('video_track')
+
+			if video_track_id and rtc_library.rtcIsOpen(video_track_id):
+				rtc_library.rtcSetTrackRtpTimestamp(video_track_id, timestamp)
+				rtc_library.rtcSendMessage(video_track_id, data_buffer, data_total)
+
+	return None
+
+
+def delete_peers(peers : List[RtcPeer]) -> None:
+	"""Delete the peer connection of every peer and empty the list."""
+	rtc_library = get_rtc_library()
+
+	for rtc_peer in peers:
+		peer_connection_id = rtc_peer.get('peer_connection')
+
+		if peer_connection_id:
+			rtc_library.rtcDeletePeerConnection(peer_connection_id)
+
+	peers.clear()
+
+
+def is_peer_connected(peers : List[RtcPeer]) -> bool:
+	"""Tell whether at least one peer has an open video track."""
+	rtc_library = get_rtc_library()
+
+	for rtc_peer in peers:
+		video_track_id = rtc_peer.get('video_track')
+
+		if video_track_id and rtc_library.rtcIsOpen(video_track_id):
+			return True
+
+	return False
diff --git a/facefusion/rtc_bindings.py b/facefusion/rtc_bindings.py
new file mode 100644
index 00000000..eeea200a
--- /dev/null
+++ b/facefusion/rtc_bindings.py
@@ -0,0 +1,92 @@
+import ctypes
+
+# NOTE(review): meant to mirror rtcConfiguration of the libdatachannel c api, field order and types have to match the header of the bundled version
+RTC_CONFIGURATION = type('RtcConfiguration', (ctypes.Structure,),
+{
+	'_fields_':
+	[
+		('iceServers', ctypes.POINTER(ctypes.c_char_p)),
+		('iceServersCount', ctypes.c_int),
+		('proxyServer', ctypes.c_char_p),
+		('bindAddress', ctypes.c_char_p),
+		('certificateType', ctypes.c_int),
+		('iceTransportPolicy', ctypes.c_int),
+		('enableIceTcp', ctypes.c_bool),
+		('enableIceUdpMux', ctypes.c_bool),
+		('disableAutoNegotiation', ctypes.c_bool),
+		('forceMediaTransport', ctypes.c_bool),
+		('portRangeBegin', ctypes.c_ushort),
+		('portRangeEnd', ctypes.c_ushort),
+		('mtu', ctypes.c_int),
+		('maxMessageSize', ctypes.c_int)
+	]
+})
+# NOTE(review): meant to mirror rtcPacketizerInit of the libdatachannel c api, verify against the header when the bundled version changes
+RTC_PACKETIZER_INIT = type('RtcPacketizerInit', (ctypes.Structure,),
+{
+	'_fields_':
+	[
+		('ssrc', ctypes.c_uint32),
+		('cname', ctypes.c_char_p),
+		('payloadType', ctypes.c_uint8),
+		('clockRate', ctypes.c_uint32),
+		('sequenceNumber', ctypes.c_uint16),
+		('timestamp', ctypes.c_uint32),
+		('maxFragmentSize', ctypes.c_uint16),
+		('nalSeparator', ctypes.c_int),
+		('obuPacketization', ctypes.c_int),
+		('playoutDelayId', ctypes.c_uint8),
+		('playoutDelayMin', ctypes.c_uint16),
+		('playoutDelayMax', ctypes.c_uint16)
+	]
+})
+LOG_CB_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p)
+
+
+def init_ctypes(rtc_library : ctypes.CDLL) -> bool:
+	"""Declare argument and return types for every used libdatachannel function and init its logger, False without a library."""
+	if rtc_library:
+		# NOTE(review): 4 presumably is RTC_LOG_INFO, the null callback registers no python log handler - confirm against rtc.h
+		rtc_library.rtcInitLogger.argtypes = [ ctypes.c_int, LOG_CB_TYPE ]
+		rtc_library.rtcInitLogger.restype = None
+		rtc_library.rtcInitLogger(4, LOG_CB_TYPE(0))
+
+		rtc_library.rtcCreatePeerConnection.argtypes = [ ctypes.POINTER(RTC_CONFIGURATION) ]
+		rtc_library.rtcCreatePeerConnection.restype = ctypes.c_int
+
+		rtc_library.rtcDeletePeerConnection.argtypes = [ ctypes.c_int ]
+		rtc_library.rtcDeletePeerConnection.restype = ctypes.c_int
+
+		rtc_library.rtcSetRemoteDescription.argtypes = [ ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p ]
+		rtc_library.rtcSetRemoteDescription.restype = ctypes.c_int
+
+		rtc_library.rtcAddTrack.argtypes = [ ctypes.c_int, ctypes.c_char_p ]
+		rtc_library.rtcAddTrack.restype = ctypes.c_int
+
+		rtc_library.rtcSendMessage.argtypes = [ ctypes.c_int, ctypes.c_void_p, ctypes.c_int ]
+		rtc_library.rtcSendMessage.restype = ctypes.c_int
+
+		rtc_library.rtcSetVP8Packetizer.argtypes = [ ctypes.c_int, ctypes.POINTER(RTC_PACKETIZER_INIT) ]
+		rtc_library.rtcSetVP8Packetizer.restype = ctypes.c_int
+
+		rtc_library.rtcChainRtcpSrReporter.argtypes = [ ctypes.c_int ]
+		rtc_library.rtcChainRtcpSrReporter.restype = ctypes.c_int
+
+		rtc_library.rtcSetTrackRtpTimestamp.argtypes = [ ctypes.c_int, ctypes.c_uint32 ]
+		rtc_library.rtcSetTrackRtpTimestamp.restype = ctypes.c_int
+
+		rtc_library.rtcIsOpen.argtypes = [ ctypes.c_int ]
+		rtc_library.rtcIsOpen.restype = ctypes.c_bool
+
+		rtc_library.rtcChainRtcpNackResponder.argtypes = [ ctypes.c_int, ctypes.c_uint ]
+		rtc_library.rtcChainRtcpNackResponder.restype = ctypes.c_int
+
+		rtc_library.rtcGetLocalDescription.argtypes = [ ctypes.c_int, ctypes.c_char_p, ctypes.c_int ]
+		rtc_library.rtcGetLocalDescription.restype = ctypes.c_int
+
+		rtc_library.rtcSetOpusPacketizer.argtypes = [ ctypes.c_int, ctypes.POINTER(RTC_PACKETIZER_INIT) ]
+		rtc_library.rtcSetOpusPacketizer.restype = ctypes.c_int
+
+		return True
+
+	return False
diff --git a/facefusion/types.py b/facefusion/types.py
index 9803ae4b..5b73df59 100755
--- a/facefusion/types.py
+++ b/facefusion/types.py
@@ -169,6 +169,7 @@ TempFrameFormat = Literal['bmp', 'jpeg', 'png', 'tiff']
 AudioEncoder = Literal['flac', 'aac', 'libmp3lame', 'libopus', 'libvorbis', 'pcm_s16le', 'pcm_s32le']
 ImageEncoder = Literal['bmp', 'mjpeg', 'png', 'tiff', 'libwebp']
 VideoEncoder = Literal['libx264', 'libx264rgb', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox', 'rawvideo']
+Muxer : TypeAlias = str
 AudioSet : TypeAlias = Dict[AudioFormat, str]
 ImageSet : TypeAlias = Dict[ImageFormat, str]
 VideoSet : TypeAlias = Dict[VideoFormat, str]
@@ -269,6 +270,16 @@ RtcOfferSet = TypedDict('RtcOfferSet',
 	'type': str
 })
 
+RtcPeer = TypedDict('RtcPeer',
+{
+	'peer_connection': int,
+	'video_track': int,
+	'audio_track': int
+})
+
+RtcVideoTrack : TypeAlias = int
+RtcAudioTrack : TypeAlias = int
+
 ModelOptions : TypeAlias = Dict[str, Any]
 ModelSet : TypeAlias = Dict[str, ModelOptions]
 ModelInitializer : TypeAlias = NDArray[Any]
diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py
index 3e5aae92..6fb72877 100644
--- a/tests/test_ffmpeg.py
+++ b/tests/test_ffmpeg.py
@@ -1,4 +1,5 @@
 import os
+import struct
 import subprocess
 import tempfile
 
@@ -7,7 +8,7 @@ import pytest
 import facefusion.ffmpeg
 from facefusion import process_manager, state_manager
 from facefusion.download import conditional_download
-from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames
+from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames, spawn_stream
 from facefusion.ffprobe import probe_entries
 from facefusion.filesystem import copy_file, is_image
 from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
@@ -253,3 +254,26 @@ def test_sanitize_video() -> None:
 
 	assert sanitize_video(file_content, output_paths[1], 'moderate') is True
 	assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'hevc'
+
+
+def test_spawn_stream() -> None: # TODO: Improve test
+	test_set =\
+	[
+		((426, 240), 25, 500),
+		((640, 360), 30, 1000),
+		((1280, 720), 30, 2000)
+	]
+
+	for resolution, stream_fps, stream_bitrate in test_set:
+		encoder = spawn_stream(resolution, stream_fps, stream_bitrate)
+		frame_size = resolution[0] * resolution[1] * 3
+		stdout, _ = encoder.communicate(input = bytes(frame_size))
+
+		assert len(stdout) > 32
+		frame_header = stdout[:32]
+
+		assert frame_header[:4] == b'DKIF'
+		output_width = struct.unpack_from(' None:
@@ -108,3 +108,31 @@ def test_set_video_quality() -> None:
 	assert set_video_quality('hevc_videotoolbox', 0) == [ '-b:v', '1024k' ]
 	assert set_video_quality('hevc_videotoolbox', 50) == [ '-b:v', '25768k' ]
 	assert set_video_quality('hevc_videotoolbox', 100) == [ '-b:v', '50512k' ]
+
+
+def test_use_wallclock() -> None:
+	assert use_wallclock() == [ '-use_wallclock_as_timestamps', '1' ]
+
+
+def test_capture_video() -> None:
+	assert capture_video() == [ '-f', 'rawvideo', '-pix_fmt', 'rgb24' ]
+
+
+def test_enforce_pixel_format() -> None:
+	assert enforce_pixel_format('yuv420p') == [ '-pix_fmt', 'yuv420p' ]
+	assert enforce_pixel_format('rgb24') == [ '-pix_fmt', 'rgb24' ]
+
+
+def test_set_stream_quality() -> None:
+	assert set_stream_quality(500) == [ '-b:v', '500k' ]
+	assert set_stream_quality(2000) == [ '-b:v', '2000k' ]
+
+
+def test_set_stream_keyframe() -> None:
+	assert set_stream_keyframe(30) == [ '-g', '30', '-keyint_min', '30' ]
+	assert set_stream_keyframe(60) == [ '-g', '60', '-keyint_min', '60' ]
+
+
+def test_set_muxer() -> None:
+	assert set_muxer('ivf') == [ '-f', 'ivf' ]
+	assert set_muxer('mpegts') == [ '-f', 'mpegts' ]