diff --git a/COMPILE_GUIDE.md b/COMPILE_GUIDE.md new file mode 100644 index 00000000..3cf64f9b --- /dev/null +++ b/COMPILE_GUIDE.md @@ -0,0 +1,110 @@ +# Compiling libdatachannel + +Prebuilt DLLs from OBS or pip lack VP8 support. We compile from source to get all codecs (H264, VP8, AV1, Opus). + +## Source + +``` +git clone --depth 1 --recurse-submodules https://github.com/paullouisageneau/libdatachannel.git +cd libdatachannel +``` + +## Windows + +Requirements: Visual Studio Build Tools 2019+ with C++ workload, cmake, ninja (available via conda). + +```cmd +call "C:\Program Files (x86)\Microsoft Visual Studio\2019\BuildTools\VC\Auxiliary\Build\vcvarsall.bat" x64 +cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DNO_WEBSOCKET=ON -DNO_EXAMPLES=ON -DNO_TESTS=ON -DUSE_NICE=OFF +cmake --build build --config Release +``` + +Output: `build/datachannel.dll` + +Rename to: `windows-x64-openssl-h264-vp8-av1-opus-datachannel-.dll` + +Place in: `bin/` + +## Linux + +Requirements: gcc/g++, cmake, ninja-build, libssl-dev. + +```bash +sudo apt install build-essential cmake ninja-build libssl-dev +cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DNO_WEBSOCKET=ON -DNO_EXAMPLES=ON -DNO_TESTS=ON -DUSE_NICE=OFF +cmake --build build --config Release +``` + +Output: `build/libdatachannel.so` + +Rename to: `linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-.so` + +Install to: `/usr/local/lib/` or project `bin/` + +If installed to a custom path, run `sudo ldconfig` or set `LD_LIBRARY_PATH`. + +## macOS + +Requirements: Xcode Command Line Tools, cmake, ninja. + +```bash +xcode-select --install +brew install cmake ninja +cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DNO_WEBSOCKET=ON -DNO_EXAMPLES=ON -DNO_TESTS=ON -DUSE_NICE=OFF +cmake --build build --config Release +``` + +For universal binary (arm64 + x86_64): + +```bash +cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DNO_WEBSOCKET=ON -DNO_EXAMPLES=ON -DNO_TESTS=ON -DUSE_NICE=OFF -DCMAKE_OSX_ARCHITECTURES="arm64;x86_64" +cmake --build build --config Release +``` + +Output: `build/libdatachannel.dylib` + +Rename to: `macos-universal-openssl-h264-vp8-av1-opus-libdatachannel-.dylib` + +Install to: `/usr/local/lib/` or project `bin/` + +## Naming convention + +``` +----datachannel-. +``` + +- os: `windows`, `linux`, `macos` +- arch: `x64`, `arm64`, `universal` +- tls: `openssl` (default), `gnutls`, `mbedtls` +- codecs: supported packetizers, e.g. `h264-vp8-av1-opus` +- version: libdatachannel version, e.g. `0.24.1` + +## Verifying the build + +```python +import ctypes +lib = ctypes.CDLL('path/to/datachannel.dll') +for fn in ['rtcSetH264Packetizer', 'rtcSetVP8Packetizer', 'rtcSetAV1Packetizer', 'rtcSetOpusPacketizer']: + try: + getattr(lib, fn) + print(f'{fn}: OK') + except AttributeError: + print(f'{fn}: MISSING') +``` + +## CMake flags reference + +| Flag | Default | Purpose | +|---|---|---| +| `NO_WEBSOCKET` | OFF | Disable WebSocket support (not needed) | +| `NO_MEDIA` | OFF | Disable media transport (must be OFF for codecs) | +| `NO_EXAMPLES` | OFF | Skip building examples | +| `NO_TESTS` | OFF | Skip building tests | +| `USE_NICE` | OFF | Use libnice instead of libjuice for ICE | +| `USE_GNUTLS` | OFF | Use GnuTLS instead of OpenSSL | +| `USE_MBEDTLS` | OFF | Use Mbed TLS instead of OpenSSL | + +## Runtime dependencies + +- **libopus**: Required for audio encoding. Install via `conda install -c conda-forge libopus` (Windows) or `apt install libopus-dev` (Linux) or `brew install opus` (macOS). +- **OpenSSL**: Usually bundled or system-provided. On Windows, conda provides it. diff --git a/bin/windows-x64-openssl-h264-vp8-av1-opus-datachannel-0.24.1.dll b/bin/windows-x64-openssl-h264-vp8-av1-opus-datachannel-0.24.1.dll new file mode 100644 index 00000000..6b894f0e Binary files /dev/null and b/bin/windows-x64-openssl-h264-vp8-av1-opus-datachannel-0.24.1.dll differ diff --git a/e2e_video_modes.py b/e2e_video_modes.py index eb8a2fa3..83aa8f7a 100644 --- a/e2e_video_modes.py +++ b/e2e_video_modes.py @@ -1,4 +1,5 @@ import os +import platform import signal import subprocess import sys @@ -10,7 +11,21 @@ from playwright.sync_api import sync_playwright API_PORT : int = 8400 HTML_FILE : str = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_stream.html') SOURCE_FILE : str = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.assets', 'examples', 'source.jpg') -VIDEO_FILE : str = '/home/henry/Documents/examples/download.mp4' + +def is_windows() -> bool: + return platform.system().lower() == 'windows' + +if is_windows(): + VIDEO_FILE : str = 'C:\\Users\\info\\Downloads\\face8k.mp4' +else: + VIDEO_FILE : str = '/home/henry/Documents/examples/download.mp4' + + +def safe_print(text : str) -> None: + try: + print(text) + except UnicodeEncodeError: + print(text.encode('ascii', errors='replace').decode('ascii')) MODES =\ [ @@ -26,9 +41,13 @@ MODES =\ def start_api() -> subprocess.Popen: env = os.environ.copy() - env['LD_LIBRARY_PATH'] = '/home/henry/local/lib:' + env.get('LD_LIBRARY_PATH', '') + python_cmd = 'python' if is_windows() else 'python3' + + if not is_windows(): + env['LD_LIBRARY_PATH'] = '/home/henry/local/lib:' + env.get('LD_LIBRARY_PATH', '') + proc = subprocess.Popen( - [ 'python3', 'facefusion.py', 'api', '--api-port', str(API_PORT), '--execution-providers', 'cpu' ], + [ python_cmd, 'facefusion.py', 'api', '--api-port', str(API_PORT), '--execution-providers', 'cpu' ], env = env, stdout = subprocess.PIPE, stderr = subprocess.PIPE @@ -55,7 +74,10 @@ def wait_for_api(timeout : int = 60) -> bool: def stop_api(proc : subprocess.Popen) -> None: - proc.send_signal(signal.SIGTERM) + if is_windows(): + proc.terminate() + else: + proc.send_signal(signal.SIGTERM) try: proc.wait(timeout = 10) @@ -66,14 +88,36 @@ def stop_api(proc : subprocess.Popen) -> None: time.sleep(1) +def kill_port_windows(port : int) -> None: + result = subprocess.run( + [ 'netstat', '-ano' ], + capture_output = True, text = True + ) + + for line in result.stdout.splitlines(): + if ':' + str(port) + ' ' in line and ('LISTENING' in line or 'ESTABLISHED' in line): + parts = line.split() + pid = parts[-1] + + if pid.isdigit() and int(pid) > 0: + subprocess.run([ 'taskkill', '/F', '/PID', pid ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + + def kill_stale() -> None: - subprocess.run([ 'fuser', '-k', str(API_PORT) + '/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) - subprocess.run([ 'fuser', '-k', '8889/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) - subprocess.run([ 'fuser', '-k', '8189/udp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) - subprocess.run([ 'fuser', '-k', '9997/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) - subprocess.run([ 'fuser', '-k', '8890/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) - subprocess.run([ 'fuser', '-k', '8891/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) - subprocess.run([ 'fuser', '-k', '8892/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + ports = [ API_PORT, 8889, 8189, 9997, 8890, 8891, 8892 ] + + if is_windows(): + for port in ports: + kill_port_windows(port) + else: + subprocess.run([ 'fuser', '-k', str(API_PORT) + '/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + subprocess.run([ 'fuser', '-k', '8889/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + subprocess.run([ 'fuser', '-k', '8189/udp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + subprocess.run([ 'fuser', '-k', '9997/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + subprocess.run([ 'fuser', '-k', '8890/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + subprocess.run([ 'fuser', '-k', '8891/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + subprocess.run([ 'fuser', '-k', '8892/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) + time.sleep(2) @@ -258,33 +302,48 @@ def test_mode(mode : str) -> dict: print(' [' + str(i) + 's] frames=' + str(frames_val) + ' fps=' + fps_stat + ' rtc=' + rtc_stat) break - print(' [' + str(i) + 's] ws=' + ws_stat + ' rtc=' + rtc_stat + ' frames=' + frames_stat) + try: + rtc_stats = page.evaluate('''() => { + if (!window.pc) return ''; + return pc.getStats().then(stats => { + var r = ''; + stats.forEach(report => { + if (report.type === 'inbound-rtp' && report.kind === 'video') { + r = 'pkts=' + (report.packetsReceived||0) + ' bytes=' + (report.bytesReceived||0) + ' lost=' + (report.packetsLost||0) + ' dropped=' + (report.framesDropped||0) + ' dec=' + (report.decoderImplementation||'?') + ' kf=' + (report.keyFramesDecoded||0) + ' nacks=' + (report.nackCount||0) + ' plis=' + (report.pliCount||0); + } + }); + return r; + }); + }''') + except Exception: + rtc_stats = '' + print(' [' + str(i) + 's] ws=' + ws_stat + ' rtc=' + rtc_stat + ' frames=' + frames_stat + ' ' + str(rtc_stats)) if not result.get('playback'): log_text = page.locator('#log').text_content() result['error'] = 'no playback after 45s' - print(' FAIL: no playback') - print(' LOG (last 500): ' + log_text[-500:]) + safe_print(' FAIL: no playback') + safe_print(' LOG (last 500): ' + log_text[-500:]) for line in logs[-20:]: - print(' [console] ' + line) + safe_print(' [console] ' + line) browser.close() except Exception as exception: result['error'] = str(exception) - print(' EXCEPTION: ' + str(exception)) + safe_print(' EXCEPTION: ' + str(exception)) stderr_out = '' try: stop_api(api_proc) - stderr_out = api_proc.stderr.read().decode()[-500:] + stderr_out = api_proc.stderr.read().decode('utf-8', errors='ignore')[-5000:] except Exception: pass if stderr_out.strip(): - print(' API stderr: ' + stderr_out) + safe_print(' API stderr: ' + stderr_out) return result diff --git a/facefusion/apis/core.py b/facefusion/apis/core.py index e5f0aeff..5b5afe5c 100644 --- a/facefusion/apis/core.py +++ b/facefusion/apis/core.py @@ -6,22 +6,23 @@ from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware from starlette.routing import Route, WebSocketRoute -from facefusion import mediamtx +from facefusion import logger, mediamtx from facefusion.apis.endpoints.assets import delete_assets, get_asset, get_assets, upload_asset from facefusion.apis.endpoints.capabilities import get_capabilities from facefusion.apis.endpoints.metrics import get_metrics, websocket_metrics from facefusion.apis.endpoints.ping import websocket_ping from facefusion.apis.endpoints.session import create_session, destroy_session, get_session, refresh_session from facefusion.apis.endpoints.state import get_state, set_state -from facefusion import logger +from facefusion.common_helper import is_windows from facefusion.apis.endpoints.stream import websocket_stream, websocket_stream_audio, websocket_stream_live, websocket_stream_mjpeg, websocket_stream_rtc, websocket_stream_rtc_relay, websocket_stream_whip, websocket_stream_whip_dc, websocket_stream_whip_py from facefusion.apis.middlewares.session import create_session_guard @asynccontextmanager async def lifespan(app : Starlette) -> AsyncGenerator[None, None]: - mediamtx.start() - mediamtx.wait_for_ready() + if not is_windows(): + mediamtx.start() + mediamtx.wait_for_ready() try: from facefusion import webrtc_sfu @@ -43,7 +44,9 @@ async def lifespan(app : Starlette) -> AsyncGenerator[None, None]: logger.warn('rtc: ' + str(exception), __name__) yield - mediamtx.stop() + + if not is_windows(): + mediamtx.stop() try: from facefusion import webrtc_sfu diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py index 948a4df0..7007e53d 100644 --- a/facefusion/apis/endpoints/stream.py +++ b/facefusion/apis/endpoints/stream.py @@ -1,5 +1,4 @@ import asyncio -import fcntl import os as _os import threading import time @@ -12,6 +11,7 @@ import numpy from starlette.websockets import WebSocket from facefusion import logger, session_context, session_manager, state_manager +from facefusion.common_helper import is_windows from facefusion.apis.api_helper import get_sec_websocket_protocol from facefusion.apis.session_helper import extract_access_token from facefusion import mediamtx @@ -450,8 +450,11 @@ def run_aiortc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_ def read_h264_output(process, h264_chunks : List[bytes], h264_lock : threading.Lock) -> None: fd = process.stdout.fileno() - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK) + + if not is_windows(): + import fcntl + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK) while True: chunk = _os.read(fd, 4096) @@ -465,8 +468,11 @@ def read_h264_output(process, h264_chunks : List[bytes], h264_lock : threading.L def read_ivf_frames(process, frame_list : List[bytes], frame_lock : threading.Lock) -> None: fd = process.stdout.fileno() - flags = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK) + + if not is_windows(): + import fcntl + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK) header = b'' @@ -589,26 +595,18 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: await websocket.accept(subprotocol = subprotocol) if source_paths: - from facefusion import whip_relay - from facefusion import rtc as rtc_audio - import socket as sock - stream_path = 'stream/' + session_id - rtp_port = whip_relay.create_session(stream_path) + from facefusion.aiortc_bridge import AiortcBridge - if not rtp_port: - logger.error('failed to create relay session', __name__) - await websocket.close() - return - - audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) - relay_addr = ('127.0.0.1', rtp_port) + bridge = AiortcBridge() + await bridge.start() + whep_url = 'http://localhost:' + str(bridge.port) + '/whep' latest_frame_holder : list = [None] whep_sent = False lock = threading.Lock() stop_event = threading.Event() ready_event = threading.Event() - worker = threading.Thread(target = run_h264_dc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, 'relay', stream_path, rtp_port), daemon = True) + worker = threading.Thread(target = run_aiortc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, bridge), daemon = True) worker.start() try: @@ -616,7 +614,6 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: message = await websocket.receive() if not whep_sent and ready_event.is_set(): - whep_url = whip_relay.get_whep_url(stream_path) await websocket.send_text(whep_url) whep_sent = True @@ -631,27 +628,15 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: latest_frame_holder[0] = frame if data[:2] != JPEG_MAGIC: - rtc_audio.init_opus_encoder() - - with rtc_audio.audio_lock: - rtc_audio.audio_buffer.extend(data) - needed = rtc_audio.OPUS_FRAME_SAMPLES * 2 * 2 - - while len(rtc_audio.audio_buffer) >= needed: - chunk = bytes(rtc_audio.audio_buffer[:needed]) - del rtc_audio.audio_buffer[:needed] - opus_pkt = rtc_audio.encode_opus_frame(chunk) - - if opus_pkt: - audio_sock.sendto(b'\x02' + opus_pkt, relay_addr) + bridge.push_audio(data) except Exception as exception: logger.error(str(exception), __name__) stop_event.set() - audio_sock.close() loop = asyncio.get_running_loop() await loop.run_in_executor(None, worker.join, 10) + await bridge.stop() return await websocket.close() @@ -792,24 +777,28 @@ async def websocket_stream_rtc_relay(websocket : WebSocket) -> None: session_context.set_session_id(session_id) source_paths = state_manager.get_item('source_paths') + logger.info('rtc-relay: session_id=' + str(session_id) + ' source_paths=' + str(bool(source_paths)), __name__) + await websocket.accept(subprotocol = subprotocol) if source_paths: from facefusion import rtc - import socket as sock - stream_path = 'stream/' + session_id - rtp_port = rtc.create_rtp_session(stream_path) - whep_url = 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whep' - audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) - relay_addr = ('127.0.0.1', rtp_port) + if not rtc.lib: + logger.error('rtc-relay: libdatachannel not loaded', __name__) + await websocket.close() + return + + stream_path = 'stream/' + session_id + rtc.create_session(stream_path) + whep_url = 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whep' latest_frame_holder : list = [None] whep_sent = False lock = threading.Lock() stop_event = threading.Event() ready_event = threading.Event() - worker = threading.Thread(target = run_h264_dc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, 'relay', stream_path, rtp_port), daemon = True) + worker = threading.Thread(target = run_rtc_direct_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, stream_path), daemon = True) worker.start() try: @@ -831,25 +820,12 @@ async def websocket_stream_rtc_relay(websocket : WebSocket) -> None: latest_frame_holder[0] = frame if data[:2] != JPEG_MAGIC: - rtc.init_opus_encoder() - - with rtc.audio_lock: - rtc.audio_buffer.extend(data) - needed = rtc.OPUS_FRAME_SAMPLES * 2 * 2 - - while len(rtc.audio_buffer) >= needed: - chunk = bytes(rtc.audio_buffer[:needed]) - del rtc.audio_buffer[:needed] - opus_pkt = rtc.encode_opus_frame(chunk) - - if opus_pkt: - audio_sock.sendto(b'\x02' + opus_pkt, relay_addr) + rtc.send_audio(stream_path, data) except Exception as exception: logger.error(str(exception), __name__) stop_event.set() - audio_sock.close() loop = asyncio.get_running_loop() await loop.run_in_executor(None, worker.join, 10) rtc.destroy_session(stream_path) diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index de52b9f4..c1a110b4 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -7,6 +7,7 @@ from typing import List, Optional, Tuple import cv2 from facefusion import ffmpeg_builder +from facefusion.common_helper import is_windows from facefusion.streamer import process_vision_frame from facefusion.types import VisionFrame @@ -82,7 +83,13 @@ def create_whip_encoder(width : int, height : int, stream_fps : int, stream_qual ffmpeg_builder.set_output(whip_url) ) commands = ffmpeg_builder.run(commands) - process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,)) + + if is_windows(): + os.set_inheritable(audio_read_fd, True) + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, close_fds = False) + else: + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,)) + os.close(audio_read_fd) return process, audio_write_fd @@ -130,7 +137,13 @@ def create_fmp4_encoder(width : int, height : int, stream_fps : int, stream_qual ffmpeg_builder.set_output('-') ) commands = ffmpeg_builder.run(commands) - process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,)) + + if is_windows(): + os.set_inheritable(audio_read_fd, True) + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, close_fds = False) + else: + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,)) + os.close(audio_read_fd) return process, audio_write_fd diff --git a/facefusion/rtc.py b/facefusion/rtc.py index 7c1f322d..a99785a9 100644 --- a/facefusion/rtc.py +++ b/facefusion/rtc.py @@ -7,6 +7,7 @@ from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Dict, List, Optional, TypeAlias from facefusion import logger +from facefusion.common_helper import is_windows RtcLib : TypeAlias = ctypes.CDLL WHEP_PORT : int = 8892 @@ -76,30 +77,34 @@ class RtcPacketizerInit(ctypes.Structure): ('obuPacketization', ctypes.c_int), ('playoutDelayId', ctypes.c_uint8), ('playoutDelayMin', ctypes.c_uint16), - ('playoutDelayMax', ctypes.c_uint16), - ('colorSpaceId', ctypes.c_uint8), - ('colorChromaSitingHorz', ctypes.c_uint8), - ('colorChromaSitingVert', ctypes.c_uint8), - ('colorRange', ctypes.c_uint8), - ('colorPrimaries', ctypes.c_uint8), - ('colorTransfer', ctypes.c_uint8), - ('colorMatrix', ctypes.c_uint8) + ('playoutDelayMax', ctypes.c_uint16) ] def find_library() -> Optional[str]: - lib_path = ctypes.util.find_library('datachannel') + project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) - if lib_path: - return lib_path + if is_windows(): + bin_dir = os.path.join(project_root, 'bin') - search_paths =\ - [ - '/home/henry/local/lib/libdatachannel.so', - '/usr/local/lib/libdatachannel.so', - '/usr/lib/libdatachannel.so', - '/usr/lib/x86_64-linux-gnu/libdatachannel.so' - ] + if os.path.isdir(bin_dir): + for entry in sorted(os.listdir(bin_dir), reverse = True): + if 'datachannel' in entry and entry.endswith('.dll'): + return os.path.join(bin_dir, entry) + + search_paths =\ + [ + os.path.join(os.environ.get('CONDA_PREFIX', ''), 'Library', 'bin', 'datachannel.dll'), + os.path.join(os.environ.get('CONDA_PREFIX', ''), 'Library', 'lib', 'datachannel.dll') + ] + else: + search_paths =\ + [ + '/home/henry/local/lib/libdatachannel.so', + '/usr/local/lib/libdatachannel.so', + '/usr/lib/libdatachannel.so', + '/usr/lib/x86_64-linux-gnu/libdatachannel.so' + ] for path in search_paths: if os.path.isfile(path): @@ -111,10 +116,13 @@ def find_library() -> Optional[str]: def load_library() -> bool: global lib + if lib: + return True + lib_path = find_library() if not lib_path: - logger.warn('libdatachannel.so not found', __name__) + logger.warn('libdatachannel not found', __name__) return False lib = ctypes.CDLL(lib_path) @@ -352,7 +360,15 @@ def init_opus_encoder() -> None: if opus_enc: return - libopus_handle = ctypes.CDLL(ctypes.util.find_library('opus')) + opus_path = ctypes.util.find_library('opus') + + if not opus_path: + if not hasattr(init_opus_encoder, '_warned'): + logger.warn('libopus not found, audio encoding disabled', __name__) + init_opus_encoder._warned = True + return + + libopus_handle = ctypes.CDLL(opus_path) libopus_handle.opus_encoder_create.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int)] libopus_handle.opus_encoder_create.restype = ctypes.c_void_p libopus_handle.opus_encode.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.POINTER(ctypes.c_ubyte), ctypes.c_int32] @@ -434,41 +450,97 @@ def send_vp8_frame(stream_path : str, frame_data : bytes) -> None: send_h264_frame(stream_path, frame_data) +def find_nal_starts(data : bytes) -> List: + starts = [] + i = 0 + + while i < len(data) - 3: + if data[i] == 0 and data[i + 1] == 0: + if data[i + 2] == 1: + starts.append((i, 3)) + i += 3 + continue + if i < len(data) - 4 and data[i + 2] == 0 and data[i + 3] == 1: + starts.append((i, 4)) + i += 4 + continue + + i += 1 + + return starts + + def send_h264_frame(stream_path : str, frame_data : bytes) -> None: + global send_start_time + session = sessions.get(stream_path) if not session: return + viewers = session.get('viewers') + + if not viewers: + return + prev = h264_au_buffer.get(stream_path, b'') buf = prev + frame_data + nal_starts = find_nal_starts(buf) - au_starts = [] - i = 0 - - while i < len(buf) - 4: - if buf[i] == 0 and buf[i + 1] == 0 and buf[i + 2] == 0 and buf[i + 3] == 1 and i + 4 < len(buf): - nal_type = buf[i + 4] & 0x1f - - if nal_type == 7 or nal_type == 5: - au_starts.append(i) - - i += 1 - - if len(au_starts) < 2: + if len(nal_starts) < 2: h264_au_buffer[stream_path] = buf return - for j in range(len(au_starts) - 1): - au = buf[au_starts[j]:au_starts[j + 1]] + au_boundaries = [] - for viewer in session.get('viewers', []): - tracks = viewer.get('tracks', []) + for idx, (pos, sc_len) in enumerate(nal_starts): + nal_type = buf[pos + sc_len] & 0x1f - if tracks: - lib.rtcSendMessage(tracks[0], au, len(au)) + if nal_type == 7: + au_boundaries.append(idx) - h264_au_buffer[stream_path] = buf[au_starts[-1]:] + if len(au_boundaries) < 2: + h264_au_buffer[stream_path] = buf + return + + if send_start_time == 0: + send_start_time = _time.monotonic() + + elapsed = _time.monotonic() - send_start_time + frame_duration = 1.0 / 30.0 + + for k in range(len(au_boundaries) - 1): + start_nal = au_boundaries[k] + end_nal = au_boundaries[k + 1] + timestamp = int((elapsed + k * frame_duration) * 90000) & 0xFFFFFFFF + + nalu_parts = [] + + for nal_idx in range(start_nal, end_nal): + nal_pos = nal_starts[nal_idx][0] + nal_sc_len = nal_starts[nal_idx][1] + + if nal_idx + 1 < len(nal_starts): + nal_end = nal_starts[nal_idx + 1][0] + else: + nal_end = len(buf) + + nalu = buf[nal_pos + nal_sc_len:nal_end] + + if len(nalu) > 0: + nalu_parts.append(len(nalu).to_bytes(4, 'big') + nalu) + + if nalu_parts: + frame_msg = b''.join(nalu_parts) + + for viewer in viewers: + tracks = viewer.get('tracks', []) + + if tracks: + lib.rtcSendMessage(tracks[0], frame_msg, len(frame_msg)) + + last_boundary = au_boundaries[-1] + h264_au_buffer[stream_path] = buf[nal_starts[last_boundary][0]:] def destroy_session(stream_path : str) -> None: @@ -560,6 +632,13 @@ def handle_whep_offer(stream_path : str, sdp_offer : str) -> Optional[str]: lib.rtcSetOpusPacketizer(audio_track, ctypes.byref(audio_packetizer)) lib.rtcChainRtcpSrReporter(audio_track) + def on_track_open(track_id, user_ptr): + logger.info('track ' + str(track_id) + ' opened', __name__) + + track_open_cb = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(on_track_open) + callback_refs.append(track_open_cb) + lib.rtcSetOpenCallback(video_track, track_open_cb) + viewer['tracks'] = [video_track] viewer['audio_track'] = audio_track session['viewers'].append(viewer) @@ -586,6 +665,9 @@ def handle_whep_offer(stream_path : str, sdp_offer : str) -> Optional[str]: def start() -> None: global running, http_thread + if running: + return + if not load_library(): return diff --git a/facefusion/whip_relay.py b/facefusion/whip_relay.py index 230ba726..d3d4ce95 100644 --- a/facefusion/whip_relay.py +++ b/facefusion/whip_relay.py @@ -1,99 +1,62 @@ -import os -import shutil -import subprocess -import time +import threading from typing import Optional -import httpx - from facefusion import logger RELAY_PORT : int = 8891 -RELAY_BINARY : str = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tools', 'whip_relay') -RELAY_PROCESS : Optional[subprocess.Popen[bytes]] = None +_started : bool = False +_lock : threading.Lock = threading.Lock() def get_whip_url(stream_path : str) -> str: - return 'http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/whip' + from facefusion import rtc + return 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whip' def get_whep_url(stream_path : str) -> str: - return 'http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/whep' - - -def resolve_binary() -> str: - relay_path = shutil.which('whip_relay') - - if relay_path: - return relay_path - - if os.path.isfile(RELAY_BINARY): - return RELAY_BINARY - return RELAY_BINARY + from facefusion import rtc + return 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whep' def start() -> None: - global RELAY_PROCESS + global _started - subprocess.run([ 'fuser', '-k', str(RELAY_PORT) + '/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) - time.sleep(0.5) + from facefusion import rtc - relay_binary = resolve_binary() + if not rtc.lib: + if not rtc.load_library(): + logger.warn('whip relay: libdatachannel not available', __name__) + return - if not os.path.isfile(relay_binary): - logger.warn('whip_relay binary not found at ' + relay_binary + ', skipping', __name__) - return + if not rtc.running: + rtc.start() - env = os.environ.copy() - env['LD_LIBRARY_PATH'] = '/home/henry/local/lib:' + env.get('LD_LIBRARY_PATH', '') - RELAY_PROCESS = subprocess.Popen( - [ relay_binary, str(RELAY_PORT) ], - env = env, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE - ) - logger.info('whip relay started on port ' + str(RELAY_PORT), __name__) + _started = True + logger.info('whip relay (python) ready on port ' + str(rtc.WHEP_PORT), __name__) def stop() -> None: - global RELAY_PROCESS - - if RELAY_PROCESS: - RELAY_PROCESS.terminate() - RELAY_PROCESS.wait() - RELAY_PROCESS = None + global _started + _started = False def wait_for_ready() -> bool: - for _ in range(10): - try: - response = httpx.get('http://localhost:' + str(RELAY_PORT) + '/health', timeout = 1) - - if response.status_code == 200: - return True - except Exception: - pass - time.sleep(0.5) - return False + return _started def is_session_ready(stream_path : str) -> bool: - try: - response = httpx.get('http://localhost:' + str(RELAY_PORT) + '/session/' + stream_path, timeout = 1) - - if response.status_code == 200: - return True - except Exception: - pass - return False + from facefusion import rtc + return stream_path in rtc.sessions def create_session(stream_path : str) -> int: - try: - response = httpx.post('http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/create', timeout = 5) + from facefusion import rtc - if response.status_code == 200: - return int(response.text) - except Exception: - pass - return 0 + if not _started: + start() + + if not rtc.lib: + return 0 + + rtp_port = rtc.create_rtp_session(stream_path) + return rtp_port