libdatachannel implementation Part 1 (#1077)

* add libdatachannel

* move some methods to rtc_helper.py

* move some methods to rtc_helper.py

* open_vp8_encoder -> spawn_stream_encoder

* update url (linux only)

* fix lint

* remove RTC_STATE and add unit tests

* remove convert_to_raw_rgb

* remove 'utf-8' from decode

* cleanup

* cleanup

* cleanup

* move rtc types to rtc_bindings.py

* remove load_library

* cleanup

* add todos and some cleanup

* fix lint
This commit is contained in:
Harisreedhar
2026-04-21 17:33:26 +05:30
committed by GitHub
parent 5ce181b33d
commit a3e504ee17
7 changed files with 378 additions and 7 deletions
+17 -4
View File
@@ -355,6 +355,23 @@ def sanitize_video(file_content : bytes, asset_path : str, security_strategy : A
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
def spawn_stream(resolution : Resolution, stream_fps : int, stream_bitrate : int) -> subprocess.Popen[bytes]:
    """
    Launch the ffmpeg process used for streaming and return its handle.

    Input is read from '-' using the capture_video() options at the packed resolution,
    output is written to '-' as libvpx video in yuv420p inside an ivf container. The
    stream_bitrate feeds set_stream_quality() and stream_fps is used as keyframe interval.
    All three standard streams of the process are pipes.
    """
    stream_options =\
    [
        ffmpeg_builder.use_wallclock(),
        ffmpeg_builder.capture_video(),
        ffmpeg_builder.set_media_resolution(pack_resolution(resolution)),
        ffmpeg_builder.set_input('-'),
        ffmpeg_builder.set_video_encoder('libvpx'), # TODO: replace hardcoded value
        ffmpeg_builder.enforce_pixel_format('yuv420p'), # TODO: replace hardcoded value
        ffmpeg_builder.set_stream_quality(stream_bitrate),
        ffmpeg_builder.set_stream_keyframe(stream_fps),
        ffmpeg_builder.set_muxer('ivf'), # TODO: replace hardcoded value
        ffmpeg_builder.set_output('-')
    ]
    stream_commands = ffmpeg_builder.run(ffmpeg_builder.chain(*stream_options))

    return subprocess.Popen(stream_commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder:
if video_format == 'avi' and audio_encoder == 'libopus':
return 'aac'
@@ -379,7 +396,3 @@ def fix_video_encoder(video_format : VideoFormat, video_encoder : VideoEncoder)
if video_format == 'webm':
return 'libvpx-vp9'
return video_encoder
+13 -1
View File
@@ -5,7 +5,7 @@ from typing import List, Optional
import numpy
from facefusion.filesystem import get_file_format
from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, SampleRate, StreamMode, VideoEncoder, VideoPreset
from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, Muxer, SampleRate, StreamMode, VideoEncoder, VideoPreset
def run(commands : List[Command]) -> List[Command]:
@@ -289,3 +289,15 @@ def map_qsv_preset(video_preset : VideoPreset) -> Optional[str]:
if video_preset in [ 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]:
return video_preset
return None
def use_wallclock() -> List[Command]:
    """Return the ffmpeg arguments that switch on '-use_wallclock_as_timestamps'."""
    wallclock_commands = [ '-use_wallclock_as_timestamps', '1' ]
    return wallclock_commands
def set_stream_keyframe(interval : int) -> List[Command]:
    """Return ffmpeg arguments that set both '-g' and '-keyint_min' to the given interval."""
    keyframe_interval = str(interval)
    return [ '-g', keyframe_interval, '-keyint_min', keyframe_interval ]
def set_muxer(muxer : Muxer) -> List[Command]:
    """Return the ffmpeg '-f' argument pair that selects the given muxer."""
    muxer_commands : List[Command] = [ '-f' ]
    muxer_commands.append(muxer)
    return muxer_commands
+195
View File
@@ -0,0 +1,195 @@
import ctypes
import time
from functools import lru_cache
from typing import Dict, List, Optional, Tuple
from facefusion.common_helper import is_linux, is_macos, is_windows
from facefusion.download import conditional_download_hashes, conditional_download_sources
from facefusion.filesystem import resolve_relative_path
from facefusion.rtc_bindings import RTC_CONFIGURATION, RTC_PACKETIZER_INIT, init_ctypes
from facefusion.types import DownloadSet, RtcAudioTrack, RtcPeer, RtcVideoTrack
# cached handle of the loaded library, assigned by get_rtc_library() once init_ctypes() succeeded
RTC_LIBRARY: Optional[ctypes.CDLL] = None
def resolve_binary_file() -> Optional[str]:
    """
    Pick the libdatachannel binary file name for the current platform.

    The platform checks run in the order linux, macos, windows and the first match wins.
    Returns None when none of them matches.
    """
    platform_binaries =\
    [
        (is_linux, 'linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so'),
        (is_macos, 'macos-universal-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.dylib'),
        (is_windows, 'windows-x64-openssl-h264-vp8-av1-opus-datachannel-0.24.1.dll')
    ]

    for is_platform, binary_file in platform_binaries:
        if is_platform():
            return binary_file
    return None
@lru_cache
def create_static_download_set() -> Dict[str, DownloadSet]: # TODO: replace once conda package is in place
    """
    Build the download set (hash file and binary) of the libdatachannel library.

    The local paths are derived from resolve_binary_file(), while both urls are currently
    fixed to the linux build. The result is memoized through lru_cache. A binary name of
    None makes the path concatenation raise a TypeError.
    """
    binary_name = resolve_binary_file()
    hash_path = resolve_relative_path('../.assets/binaries/' + binary_name + '.hash')
    source_path = resolve_relative_path('../.assets/binaries/' + binary_name)
    hash_set =\
    {
        'datachannel':
        {
            'url': 'https://huggingface.co/bluefoxcreation/libdatachannel/resolve/main/linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so.hash', # TODO: use url with dynamic binary_name
            'path': hash_path
        }
    }
    source_set =\
    {
        'datachannel':
        {
            'url': 'https://huggingface.co/bluefoxcreation/libdatachannel/resolve/main/linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so', # TODO: use url with dynamic binary_name
            'path': source_path
        }
    }

    return\
    {
        'hashes': hash_set,
        'sources': source_set
    }
def pre_check() -> bool:
    """
    Download the library hashes and, only when that succeeded, the library sources.

    Returns False as soon as the hash download reports failure, otherwise the result of
    the source download.
    """
    download_set = create_static_download_set()
    hash_set = download_set.get('hashes')
    source_set = download_set.get('sources')

    if conditional_download_hashes(hash_set):
        return conditional_download_sources(source_set)
    return False
def get_rtc_library() -> Optional[ctypes.CDLL]:
    """
    Return the loaded libdatachannel library, loading it on first use.

    The binary path is taken from the static download set. The handle is only stored in
    RTC_LIBRARY once init_ctypes() accepted it, otherwise the current value of RTC_LIBRARY
    is returned unchanged.
    """
    global RTC_LIBRARY

    if not RTC_LIBRARY:
        source_set = create_static_download_set().get('sources')
        binary_path = source_set.get('datachannel').get('path')

        if binary_path:
            loaded_library = ctypes.CDLL(binary_path)

            if init_ctypes(loaded_library):
                RTC_LIBRARY = loaded_library
    return RTC_LIBRARY
def create_peer_connection() -> int: # TODO: change method to have arguments with default values
    """
    Create a peer connection and return the integer reported by rtcCreatePeerConnection.

    The configuration carries no ice servers, proxy or bind address, enables ice udp mux
    and forced media transport, and leaves every numeric option at zero.
    """
    rtc_library = get_rtc_library()
    config = RTC_CONFIGURATION(
        iceServers = None,
        iceServersCount = 0,
        proxyServer = None,
        bindAddress = None,
        certificateType = 0,
        iceTransportPolicy = 0,
        enableIceTcp = False,
        enableIceUdpMux = True,
        disableAutoNegotiation = False,
        forceMediaTransport = True,
        portRangeBegin = 0,
        portRangeEnd = 0,
        mtu = 0,
        maxMessageSize = 0
    )

    return rtc_library.rtcCreatePeerConnection(ctypes.byref(config))
def add_media_tracks(peer_connection : int) -> Tuple[RtcVideoTrack, RtcAudioTrack]: # TODO: split into add_audio_track and add_video_track. add arguments with default values
    """
    Add a send-only VP8 video track and a send-only opus audio track to the peer connection.

    The video track (payload type 96, mid 0) gets a VP8 packetizer, an rtcp sender reporter
    and a nack responder, the audio track (payload type 111, mid 1) gets an opus packetizer
    and an rtcp sender reporter. Returns the video track followed by the audio track.
    """
    rtc_library = get_rtc_library()
    video_sdp = b'm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP8/90000\r\na=sendonly\r\na=mid:0\r\na=rtcp-mux\r\n'
    audio_sdp = b'm=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\na=sendonly\r\na=mid:1\r\na=rtcp-mux\r\n'
    video_init = RTC_PACKETIZER_INIT(ssrc = 42, cname = b'video', payloadType = 96, clockRate = 90000, maxFragmentSize = 1200)
    audio_init = RTC_PACKETIZER_INIT(ssrc = 43, cname = b'audio', payloadType = 111, clockRate = 48000)

    # both tracks are added before any packetizer is attached
    video_track = rtc_library.rtcAddTrack(peer_connection, video_sdp)
    audio_track = rtc_library.rtcAddTrack(peer_connection, audio_sdp)

    rtc_library.rtcSetVP8Packetizer(video_track, ctypes.byref(video_init))
    rtc_library.rtcChainRtcpSrReporter(video_track)
    rtc_library.rtcChainRtcpNackResponder(video_track, 512)

    rtc_library.rtcSetOpusPacketizer(audio_track, ctypes.byref(audio_init))
    rtc_library.rtcChainRtcpSrReporter(audio_track)

    return video_track, audio_track
def negotiate_sdp(peer_connection : int, sdp_offer : str) -> Optional[str]:
    """
    Apply the remote sdp offer and wait for the local description of the peer connection.

    Returns None right away when the library rejects the offer. Otherwise the local
    description is polled every 0.05 seconds for up to 5 seconds and returned decoded as
    soon as it is available, or None once the time limit is reached.
    """
    rtc_library = get_rtc_library()

    # a negative return code signals that the offer was not accepted, polling would only run into the timeout
    if rtc_library.rtcSetRemoteDescription(peer_connection, sdp_offer.encode('utf-8'), b'offer') < 0:
        return None

    buffer_size = 16384
    buffer_string = ctypes.create_string_buffer(buffer_size)
    wait_limit = time.monotonic() + 5

    while time.monotonic() < wait_limit:
        # a positive result means the description has been written into the buffer
        if rtc_library.rtcGetLocalDescription(peer_connection, buffer_string, buffer_size) > 0:
            return buffer_string.value.decode()
        time.sleep(0.05)
    return None
def handle_whep_offer(peers : List[RtcPeer], sdp_offer : str) -> Optional[str]:
    """
    Answer a whep sdp offer with a fresh peer connection.

    A peer connection with a video and an audio track is created and negotiated against
    the offer. On success the peer is appended to peers and the local sdp is returned.
    On failure the peer connection is deleted again and the empty result is returned.
    """
    peer_connection = create_peer_connection()
    video_track, audio_track = add_media_tracks(peer_connection)
    local_sdp = negotiate_sdp(peer_connection, sdp_offer)

    if local_sdp:
        rtc_peer : RtcPeer =\
        {
            'peer_connection': peer_connection,
            'video_track': video_track,
            'audio_track': audio_track
        }
        peers.append(rtc_peer)
        return local_sdp

    # the connection is never registered in peers, so delete_peers() could not release it later
    get_rtc_library().rtcDeletePeerConnection(peer_connection)
    return local_sdp
def send_to_peers(peers : List[RtcPeer], data : bytes) -> None:
    """
    Send the data to the video track of every peer whose track is open.

    The rtp timestamp is the monotonic clock scaled by 90000 and wrapped to 32 bit. It is
    computed once per call and shared by all peers.
    """
    rtc_library = get_rtc_library()

    if not peers:
        return None

    rtp_timestamp = int(time.monotonic() * 90000) & 0xFFFFFFFF
    payload_buffer = ctypes.create_string_buffer(data)
    payload_size = len(data)

    for rtc_peer in peers:
        video_track = rtc_peer.get('video_track')

        if not video_track or not rtc_library.rtcIsOpen(video_track):
            continue
        rtc_library.rtcSetTrackRtpTimestamp(video_track, rtp_timestamp)
        rtc_library.rtcSendMessage(video_track, payload_buffer, payload_size)
    return None
def delete_peers(peers : List[RtcPeer]) -> None:
    """Delete the peer connection of every peer that carries one and empty the list in place."""
    rtc_library = get_rtc_library()
    peer_connection_ids = [ rtc_peer.get('peer_connection') for rtc_peer in peers ]

    # falsy ids are skipped
    for peer_connection_id in filter(None, peer_connection_ids):
        rtc_library.rtcDeletePeerConnection(peer_connection_id)
    peers.clear()
def is_peer_connected(peers : List[RtcPeer]) -> bool:
    """Tell whether at least one peer has a video track that the library reports as open."""
    rtc_library = get_rtc_library()
    video_track_ids = ( rtc_peer.get('video_track') for rtc_peer in peers )

    return any(video_track_id and rtc_library.rtcIsOpen(video_track_id) for video_track_id in video_track_ids)
+88
View File
@@ -0,0 +1,88 @@
import ctypes
# ctypes structure that is handed to rtcCreatePeerConnection by pointer, the field order defines the memory layout
RTC_CONFIGURATION = type(
    'RtcConfiguration',
    (ctypes.Structure,),
    dict(
        _fields_ =
        [
            ('iceServers', ctypes.POINTER(ctypes.c_char_p)),
            ('iceServersCount', ctypes.c_int),
            ('proxyServer', ctypes.c_char_p),
            ('bindAddress', ctypes.c_char_p),
            ('certificateType', ctypes.c_int),
            ('iceTransportPolicy', ctypes.c_int),
            ('enableIceTcp', ctypes.c_bool),
            ('enableIceUdpMux', ctypes.c_bool),
            ('disableAutoNegotiation', ctypes.c_bool),
            ('forceMediaTransport', ctypes.c_bool),
            ('portRangeBegin', ctypes.c_ushort),
            ('portRangeEnd', ctypes.c_ushort),
            ('mtu', ctypes.c_int),
            ('maxMessageSize', ctypes.c_int)
        ]
    )
)
# ctypes structure that is handed to rtcSetVP8Packetizer and rtcSetOpusPacketizer by pointer, the field order defines the memory layout
RTC_PACKETIZER_INIT = type(
    'RtcPacketizerInit',
    (ctypes.Structure,),
    dict(
        _fields_ =
        [
            ('ssrc', ctypes.c_uint32),
            ('cname', ctypes.c_char_p),
            ('payloadType', ctypes.c_uint8),
            ('clockRate', ctypes.c_uint32),
            ('sequenceNumber', ctypes.c_uint16),
            ('timestamp', ctypes.c_uint32),
            ('maxFragmentSize', ctypes.c_uint16),
            ('nalSeparator', ctypes.c_int),
            ('obuPacketization', ctypes.c_int),
            ('playoutDelayId', ctypes.c_uint8),
            ('playoutDelayMin', ctypes.c_uint16),
            ('playoutDelayMax', ctypes.c_uint16)
        ]
    )
)
LOG_CB_TYPE = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p)
def init_ctypes(rtc_library : ctypes.CDLL) -> bool:
if rtc_library:
rtc_library.rtcInitLogger.argtypes = [ ctypes.c_int, LOG_CB_TYPE ]
rtc_library.rtcInitLogger.restype = None
rtc_library.rtcInitLogger(4, LOG_CB_TYPE(0))
rtc_library.rtcCreatePeerConnection.argtypes = [ ctypes.POINTER(RTC_CONFIGURATION) ]
rtc_library.rtcCreatePeerConnection.restype = ctypes.c_int
rtc_library.rtcDeletePeerConnection.argtypes = [ ctypes.c_int ]
rtc_library.rtcDeletePeerConnection.restype = ctypes.c_int
rtc_library.rtcSetRemoteDescription.argtypes = [ ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p ]
rtc_library.rtcSetRemoteDescription.restype = ctypes.c_int
rtc_library.rtcAddTrack.argtypes = [ ctypes.c_int, ctypes.c_char_p ]
rtc_library.rtcAddTrack.restype = ctypes.c_int
rtc_library.rtcSendMessage.argtypes = [ ctypes.c_int, ctypes.c_void_p, ctypes.c_int ]
rtc_library.rtcSendMessage.restype = ctypes.c_int
rtc_library.rtcSetVP8Packetizer.argtypes = [ ctypes.c_int, ctypes.POINTER(RTC_PACKETIZER_INIT) ]
rtc_library.rtcSetVP8Packetizer.restype = ctypes.c_int
rtc_library.rtcChainRtcpSrReporter.argtypes = [ ctypes.c_int ]
rtc_library.rtcChainRtcpSrReporter.restype = ctypes.c_int
rtc_library.rtcSetTrackRtpTimestamp.argtypes = [ ctypes.c_int, ctypes.c_uint32 ]
rtc_library.rtcSetTrackRtpTimestamp.restype = ctypes.c_int
rtc_library.rtcIsOpen.argtypes = [ ctypes.c_int ]
rtc_library.rtcIsOpen.restype = ctypes.c_bool
rtc_library.rtcChainRtcpNackResponder.argtypes = [ ctypes.c_int, ctypes.c_uint ]
rtc_library.rtcChainRtcpNackResponder.restype = ctypes.c_int
rtc_library.rtcGetLocalDescription.argtypes = [ ctypes.c_int, ctypes.c_char_p, ctypes.c_int ]
rtc_library.rtcGetLocalDescription.restype = ctypes.c_int
rtc_library.rtcSetOpusPacketizer.argtypes = [ ctypes.c_int, ctypes.POINTER(RTC_PACKETIZER_INIT) ]
rtc_library.rtcSetOpusPacketizer.restype = ctypes.c_int
return True
return False
+11
View File
@@ -169,6 +169,7 @@ TempFrameFormat = Literal['bmp', 'jpeg', 'png', 'tiff']
AudioEncoder = Literal['flac', 'aac', 'libmp3lame', 'libopus', 'libvorbis', 'pcm_s16le', 'pcm_s32le']
ImageEncoder = Literal['bmp', 'mjpeg', 'png', 'tiff', 'libwebp']
VideoEncoder = Literal['libx264', 'libx264rgb', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox', 'rawvideo']
# name of the muxer, a plain string that is not restricted to a fixed set of values
Muxer : TypeAlias = str
AudioSet : TypeAlias = Dict[AudioFormat, str]
ImageSet : TypeAlias = Dict[ImageFormat, str]
VideoSet : TypeAlias = Dict[VideoFormat, str]
@@ -269,6 +270,16 @@ RtcOfferSet = TypedDict('RtcOfferSet',
'type': str
})
# integer handles that belong to one peer: the peer connection and its video and audio track
RtcPeer = TypedDict('RtcPeer',
{
'peer_connection': int,
'video_track': int,
'audio_track': int
})
# track handles are plain integers
RtcVideoTrack : TypeAlias = int
RtcAudioTrack : TypeAlias = int
ModelOptions : TypeAlias = Dict[str, Any]
ModelSet : TypeAlias = Dict[str, ModelOptions]
ModelInitializer : TypeAlias = NDArray[Any]
+25 -1
View File
@@ -1,4 +1,5 @@
import os
import struct
import subprocess
import tempfile
@@ -7,7 +8,7 @@ import pytest
import facefusion.ffmpeg
from facefusion import process_manager, state_manager
from facefusion.download import conditional_download
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames, spawn_stream
from facefusion.ffprobe import probe_entries
from facefusion.filesystem import copy_file, is_image
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
@@ -253,3 +254,26 @@ def test_sanitize_video() -> None:
assert sanitize_video(file_content, output_paths[1], 'moderate') is True
assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'hevc'
def test_spawn_stream() -> None: # TODO: Improve test
    """Feed one blank frame per case into the stream process and validate the header of the output."""
    stream_cases =\
    [
        ((426, 240), 25, 500),
        ((640, 360), 30, 1000),
        ((1280, 720), 30, 2000)
    ]

    for resolution, stream_fps, stream_bitrate in stream_cases:
        width, height = resolution
        process = spawn_stream(resolution, stream_fps, stream_bitrate)
        # three bytes per pixel
        blank_frame = bytes(width * height * 3)
        output, _ = process.communicate(input = blank_frame)

        assert len(output) > 32

        stream_header = output[:32]

        assert stream_header.startswith(b'DKIF')
        # width and height are read as two little endian unsigned shorts starting at offset 12
        assert struct.unpack_from('<HH', stream_header, 12) == resolution
+29 -1
View File
@@ -1,7 +1,7 @@
from shutil import which
from facefusion import ffmpeg_builder
from facefusion.ffmpeg_builder import chain, concat, keep_video_alpha, run, select_frame_range, set_audio_quality, set_audio_sample_size, set_stream_mode, set_video_encoder, set_video_fps, set_video_quality
from facefusion.ffmpeg_builder import capture_video, chain, concat, enforce_pixel_format, keep_video_alpha, run, select_frame_range, set_audio_quality, set_audio_sample_size, set_muxer, set_stream_keyframe, set_stream_mode, set_stream_quality, set_video_encoder, set_video_fps, set_video_quality, use_wallclock
def test_run() -> None:
@@ -108,3 +108,31 @@ def test_set_video_quality() -> None:
assert set_video_quality('hevc_videotoolbox', 0) == [ '-b:v', '1024k' ]
assert set_video_quality('hevc_videotoolbox', 50) == [ '-b:v', '25768k' ]
assert set_video_quality('hevc_videotoolbox', 100) == [ '-b:v', '50512k' ]
def test_use_wallclock_timestamps() -> None:
    """The wallclock helper yields the flag together with the value '1'."""
    wallclock_commands = use_wallclock()

    assert wallclock_commands == [ '-use_wallclock_as_timestamps', '1' ]
def test_capture_video() -> None:
    """The capture helper yields the rawvideo format followed by the rgb24 pixel format."""
    capture_commands = capture_video()

    assert capture_commands == [ '-f', 'rawvideo', '-pix_fmt', 'rgb24' ]
def test_enforce_pixel_format() -> None:
    """Each pixel format is emitted behind the '-pix_fmt' flag."""
    test_set =\
    [
        ('yuv420p', [ '-pix_fmt', 'yuv420p' ]),
        ('rgb24', [ '-pix_fmt', 'rgb24' ])
    ]

    for pixel_format, expected_commands in test_set:
        assert enforce_pixel_format(pixel_format) == expected_commands
def test_set_stream_quality() -> None:
    """Each bitrate is emitted behind the '-b:v' flag with a 'k' suffix."""
    test_set =\
    [
        (500, [ '-b:v', '500k' ]),
        (2000, [ '-b:v', '2000k' ])
    ]

    for stream_bitrate, expected_commands in test_set:
        assert set_stream_quality(stream_bitrate) == expected_commands
def test_set_keyframe_interval() -> None:
    """Each interval is emitted for both the '-g' and the '-keyint_min' flag."""
    test_set =\
    [
        (30, [ '-g', '30', '-keyint_min', '30' ]),
        (60, [ '-g', '60', '-keyint_min', '60' ])
    ]

    for interval, expected_commands in test_set:
        assert set_stream_keyframe(interval) == expected_commands
def test_set_output_format() -> None:
    """Each muxer is emitted behind the '-f' flag."""
    test_set =\
    [
        ('ivf', [ '-f', 'ivf' ]),
        ('mpegts', [ '-f', 'mpegts' ])
    ]

    for muxer, expected_commands in test_set:
        assert set_muxer(muxer) == expected_commands