finally working under windows

This commit is contained in:
henryruhs
2026-03-24 17:07:02 +01:00
parent 14816f787c
commit 452347c032
8 changed files with 393 additions and 187 deletions
+110
View File
@@ -0,0 +1,110 @@
# Compiling libdatachannel
Prebuilt DLLs from OBS or pip lack VP8 support. We compile from source to get all codecs (H264, VP8, AV1, Opus).
## Source
```
git clone --depth 1 --recurse-submodules https://github.com/paullouisageneau/libdatachannel.git
cd libdatachannel
```
## Windows
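Requirements: Visual Studio Build Tools 2019+ with C++ workload, cmake, ninja (available via conda). The `vcvarsall.bat` path below assumes the default Build Tools 2019 install location; adjust it to match your installed Visual Studio version and edition.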
```cmd
call "C:\Program Files (x86)\Microsoft Visual Studio\2019\BuildTools\VC\Auxiliary\Build\vcvarsall.bat" x64
cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DNO_WEBSOCKET=ON -DNO_EXAMPLES=ON -DNO_TESTS=ON -DUSE_NICE=OFF
cmake --build build --config Release
```
Output: `build/datachannel.dll`
Rename to: `windows-x64-openssl-h264-vp8-av1-opus-datachannel-<version>.dll`
Place in: `bin/`
## Linux
Requirements: gcc/g++, cmake, ninja-build, libssl-dev.
```bash
sudo apt install build-essential cmake ninja-build libssl-dev
cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DNO_WEBSOCKET=ON -DNO_EXAMPLES=ON -DNO_TESTS=ON -DUSE_NICE=OFF
cmake --build build --config Release
```
Output: `build/libdatachannel.so`
Rename to: `linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-<version>.so`
Install to: `/usr/local/lib/` or project `bin/`
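After installing to `/usr/local/lib/`, run `sudo ldconfig` to refresh the linker cache; for any other directory, add that directory to `LD_LIBRARY_PATH`. Keep a copy or symlink named `libdatachannel.so` next to the renamed file, because the Linux search paths used by the loader reference that exact file name.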
## macOS
Requirements: Xcode Command Line Tools, cmake, ninja.
```bash
xcode-select --install
brew install cmake ninja
cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DNO_WEBSOCKET=ON -DNO_EXAMPLES=ON -DNO_TESTS=ON -DUSE_NICE=OFF
cmake --build build --config Release
```
For universal binary (arm64 + x86_64):
```bash
cmake -B build -G Ninja -DCMAKE_BUILD_TYPE=Release -DNO_WEBSOCKET=ON -DNO_EXAMPLES=ON -DNO_TESTS=ON -DUSE_NICE=OFF -DCMAKE_OSX_ARCHITECTURES="arm64;x86_64"
cmake --build build --config Release
```
Output: `build/libdatachannel.dylib`
Rename to: `macos-universal-openssl-h264-vp8-av1-opus-libdatachannel-<version>.dylib`
Install to: `/usr/local/lib/` or project `bin/`
## Naming convention
```
<os>-<arch>-<tls>-<codecs>-<name>-<version>.<ext>
```
- os: `windows`, `linux`, `macos`
- arch: `x64`, `arm64`, `universal`
- tls: `openssl` (default), `gnutls`, `mbedtls`
- codecs: supported packetizers, e.g. `h264-vp8-av1-opus`
- name: `datachannel` on Windows, `libdatachannel` on Linux and macOS
- version: libdatachannel version, e.g. `0.24.1`
- ext: `dll` (Windows), `so` (Linux), `dylib` (macOS)
## Verifying the build
```python
import ctypes

# Adjust the path to the library you built (.dll, .so or .dylib).
lib = ctypes.CDLL('path/to/datachannel.dll')

# Each packetizer symbol is only exported when the matching codec was compiled in.
for fn in ['rtcSetH264Packetizer', 'rtcSetVP8Packetizer', 'rtcSetAV1Packetizer', 'rtcSetOpusPacketizer']:
    try:
        # Attribute lookup on a CDLL raises AttributeError for a symbol that is not exported.
        getattr(lib, fn)
        print(f'{fn}: OK')
    except AttributeError:
        print(f'{fn}: MISSING')
```
## CMake flags reference
| Flag | Default | Purpose |
|---|---|---|
| `NO_WEBSOCKET` | OFF | Disable WebSocket support (not needed) |
| `NO_MEDIA` | OFF | Disable media transport (must be OFF for codecs) |
| `NO_EXAMPLES` | OFF | Skip building examples |
| `NO_TESTS` | OFF | Skip building tests |
| `USE_NICE` | OFF | Use libnice instead of libjuice for ICE |
| `USE_GNUTLS` | OFF | Use GnuTLS instead of OpenSSL |
| `USE_MBEDTLS` | OFF | Use Mbed TLS instead of OpenSSL |
## Runtime dependencies
- **libopus**: Required for audio encoding. Install via `conda install -c conda-forge libopus` (Windows) or `apt install libopus-dev` (Linux) or `brew install opus` (macOS).
- **OpenSSL**: Usually bundled or system-provided. On Windows, conda provides it.
+77 -18
View File
@@ -1,4 +1,5 @@
import os
import platform
import signal
import subprocess
import sys
@@ -10,7 +11,21 @@ from playwright.sync_api import sync_playwright
API_PORT : int = 8400
HTML_FILE : str = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_stream.html')
SOURCE_FILE : str = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.assets', 'examples', 'source.jpg')
VIDEO_FILE : str = '/home/henry/Documents/examples/download.mp4'
def is_windows() -> bool:
	"""
	Report whether the script is running on Windows.

	Returns True when platform.system() names the host 'windows',
	compared case-insensitively, and False otherwise.
	"""
	return platform.system().lower() == 'windows'
if is_windows():
VIDEO_FILE : str = 'C:\\Users\\info\\Downloads\\face8k.mp4'
else:
VIDEO_FILE : str = '/home/henry/Documents/examples/download.mp4'
def safe_print(text : str) -> None:
	"""
	Print text without aborting when the output stream cannot encode it.

	The text is printed unchanged when possible. If printing raises
	UnicodeEncodeError, an ASCII rendering is printed instead in which
	every non-ASCII character is replaced by '?'.
	"""
	try:
		print(text)
	except UnicodeEncodeError:
		# degrade to plain ASCII rather than letting a diagnostic message crash the run
		print(text.encode('ascii', errors = 'replace').decode('ascii'))
MODES =\
[
@@ -26,9 +41,13 @@ MODES =\
def start_api() -> subprocess.Popen:
env = os.environ.copy()
env['LD_LIBRARY_PATH'] = '/home/henry/local/lib:' + env.get('LD_LIBRARY_PATH', '')
python_cmd = 'python' if is_windows() else 'python3'
if not is_windows():
env['LD_LIBRARY_PATH'] = '/home/henry/local/lib:' + env.get('LD_LIBRARY_PATH', '')
proc = subprocess.Popen(
[ 'python3', 'facefusion.py', 'api', '--api-port', str(API_PORT), '--execution-providers', 'cpu' ],
[ python_cmd, 'facefusion.py', 'api', '--api-port', str(API_PORT), '--execution-providers', 'cpu' ],
env = env,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE
@@ -55,7 +74,10 @@ def wait_for_api(timeout : int = 60) -> bool:
def stop_api(proc : subprocess.Popen) -> None:
proc.send_signal(signal.SIGTERM)
if is_windows():
proc.terminate()
else:
proc.send_signal(signal.SIGTERM)
try:
proc.wait(timeout = 10)
@@ -66,14 +88,36 @@ def stop_api(proc : subprocess.Popen) -> None:
time.sleep(1)
def kill_port_windows(port : int) -> None:
	"""
	Force-kill every process that has the given local port open on Windows.

	Runs `netstat -ano`, picks the rows whose local address ends in the port
	and passes each owning PID to `taskkill /F`. TCP rows are considered when
	their state is LISTENING or ESTABLISHED; UDP rows carry no state column
	and are matched on the local address alone. Failures of `taskkill` are
	ignored, as its output is discarded.
	"""
	result = subprocess.run(
		[ 'netstat', '-ano' ],
		capture_output = True, text = True
	)
	port_suffix = ':' + str(port)
	pids = set()

	for line in result.stdout.splitlines():
		# TCP rows: proto, local, foreign, state, pid - UDP rows: proto, local, foreign, pid
		parts = line.split()

		if len(parts) < 4 or parts[0] not in ('TCP', 'UDP'):
			continue
		# compare the local address only, so clients connected to a remote host on this port are spared
		if not parts[1].endswith(port_suffix):
			continue
		if parts[0] == 'TCP' and parts[3] not in ('LISTENING', 'ESTABLISHED'):
			continue

		pid = parts[-1]

		# pid 0 is the system idle process and must never be targeted
		if pid.isdigit() and int(pid) > 0:
			pids.add(pid)

	# a process usually owns several rows (IPv4, IPv6, multiple connections): kill it once
	for pid in sorted(pids):
		subprocess.run([ 'taskkill', '/F', '/PID', pid ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
def kill_stale() -> None:
subprocess.run([ 'fuser', '-k', str(API_PORT) + '/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8889/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8189/udp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '9997/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8890/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8891/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8892/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
ports = [ API_PORT, 8889, 8189, 9997, 8890, 8891, 8892 ]
if is_windows():
for port in ports:
kill_port_windows(port)
else:
subprocess.run([ 'fuser', '-k', str(API_PORT) + '/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8889/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8189/udp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '9997/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8890/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8891/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
subprocess.run([ 'fuser', '-k', '8892/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
time.sleep(2)
@@ -258,33 +302,48 @@ def test_mode(mode : str) -> dict:
print(' [' + str(i) + 's] frames=' + str(frames_val) + ' fps=' + fps_stat + ' rtc=' + rtc_stat)
break
print(' [' + str(i) + 's] ws=' + ws_stat + ' rtc=' + rtc_stat + ' frames=' + frames_stat)
try:
rtc_stats = page.evaluate('''() => {
if (!window.pc) return '';
return pc.getStats().then(stats => {
var r = '';
stats.forEach(report => {
if (report.type === 'inbound-rtp' && report.kind === 'video') {
r = 'pkts=' + (report.packetsReceived||0) + ' bytes=' + (report.bytesReceived||0) + ' lost=' + (report.packetsLost||0) + ' dropped=' + (report.framesDropped||0) + ' dec=' + (report.decoderImplementation||'?') + ' kf=' + (report.keyFramesDecoded||0) + ' nacks=' + (report.nackCount||0) + ' plis=' + (report.pliCount||0);
}
});
return r;
});
}''')
except Exception:
rtc_stats = ''
print(' [' + str(i) + 's] ws=' + ws_stat + ' rtc=' + rtc_stat + ' frames=' + frames_stat + ' ' + str(rtc_stats))
if not result.get('playback'):
log_text = page.locator('#log').text_content()
result['error'] = 'no playback after 45s'
print(' FAIL: no playback')
print(' LOG (last 500): ' + log_text[-500:])
safe_print(' FAIL: no playback')
safe_print(' LOG (last 500): ' + log_text[-500:])
for line in logs[-20:]:
print(' [console] ' + line)
safe_print(' [console] ' + line)
browser.close()
except Exception as exception:
result['error'] = str(exception)
print(' EXCEPTION: ' + str(exception))
safe_print(' EXCEPTION: ' + str(exception))
stderr_out = ''
try:
stop_api(api_proc)
stderr_out = api_proc.stderr.read().decode()[-500:]
stderr_out = api_proc.stderr.read().decode('utf-8', errors='ignore')[-5000:]
except Exception:
pass
if stderr_out.strip():
print(' API stderr: ' + stderr_out)
safe_print(' API stderr: ' + stderr_out)
return result
+8 -5
View File
@@ -6,22 +6,23 @@ from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.routing import Route, WebSocketRoute
from facefusion import mediamtx
from facefusion import logger, mediamtx
from facefusion.apis.endpoints.assets import delete_assets, get_asset, get_assets, upload_asset
from facefusion.apis.endpoints.capabilities import get_capabilities
from facefusion.apis.endpoints.metrics import get_metrics, websocket_metrics
from facefusion.apis.endpoints.ping import websocket_ping
from facefusion.apis.endpoints.session import create_session, destroy_session, get_session, refresh_session
from facefusion.apis.endpoints.state import get_state, set_state
from facefusion import logger
from facefusion.common_helper import is_windows
from facefusion.apis.endpoints.stream import websocket_stream, websocket_stream_audio, websocket_stream_live, websocket_stream_mjpeg, websocket_stream_rtc, websocket_stream_rtc_relay, websocket_stream_whip, websocket_stream_whip_dc, websocket_stream_whip_py
from facefusion.apis.middlewares.session import create_session_guard
@asynccontextmanager
async def lifespan(app : Starlette) -> AsyncGenerator[None, None]:
mediamtx.start()
mediamtx.wait_for_ready()
if not is_windows():
mediamtx.start()
mediamtx.wait_for_ready()
try:
from facefusion import webrtc_sfu
@@ -43,7 +44,9 @@ async def lifespan(app : Starlette) -> AsyncGenerator[None, None]:
logger.warn('rtc: ' + str(exception), __name__)
yield
mediamtx.stop()
if not is_windows():
mediamtx.stop()
try:
from facefusion import webrtc_sfu
+30 -54
View File
@@ -1,5 +1,4 @@
import asyncio
import fcntl
import os as _os
import threading
import time
@@ -12,6 +11,7 @@ import numpy
from starlette.websockets import WebSocket
from facefusion import logger, session_context, session_manager, state_manager
from facefusion.common_helper import is_windows
from facefusion.apis.api_helper import get_sec_websocket_protocol
from facefusion.apis.session_helper import extract_access_token
from facefusion import mediamtx
@@ -450,8 +450,11 @@ def run_aiortc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_
def read_h264_output(process, h264_chunks : List[bytes], h264_lock : threading.Lock) -> None:
fd = process.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK)
if not is_windows():
import fcntl
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK)
while True:
chunk = _os.read(fd, 4096)
@@ -465,8 +468,11 @@ def read_h264_output(process, h264_chunks : List[bytes], h264_lock : threading.L
def read_ivf_frames(process, frame_list : List[bytes], frame_lock : threading.Lock) -> None:
fd = process.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK)
if not is_windows():
import fcntl
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK)
header = b''
@@ -589,26 +595,18 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None:
await websocket.accept(subprotocol = subprotocol)
if source_paths:
from facefusion import whip_relay
from facefusion import rtc as rtc_audio
import socket as sock
stream_path = 'stream/' + session_id
rtp_port = whip_relay.create_session(stream_path)
from facefusion.aiortc_bridge import AiortcBridge
if not rtp_port:
logger.error('failed to create relay session', __name__)
await websocket.close()
return
audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM)
relay_addr = ('127.0.0.1', rtp_port)
bridge = AiortcBridge()
await bridge.start()
whep_url = 'http://localhost:' + str(bridge.port) + '/whep'
latest_frame_holder : list = [None]
whep_sent = False
lock = threading.Lock()
stop_event = threading.Event()
ready_event = threading.Event()
worker = threading.Thread(target = run_h264_dc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, 'relay', stream_path, rtp_port), daemon = True)
worker = threading.Thread(target = run_aiortc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, bridge), daemon = True)
worker.start()
try:
@@ -616,7 +614,6 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None:
message = await websocket.receive()
if not whep_sent and ready_event.is_set():
whep_url = whip_relay.get_whep_url(stream_path)
await websocket.send_text(whep_url)
whep_sent = True
@@ -631,27 +628,15 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None:
latest_frame_holder[0] = frame
if data[:2] != JPEG_MAGIC:
rtc_audio.init_opus_encoder()
with rtc_audio.audio_lock:
rtc_audio.audio_buffer.extend(data)
needed = rtc_audio.OPUS_FRAME_SAMPLES * 2 * 2
while len(rtc_audio.audio_buffer) >= needed:
chunk = bytes(rtc_audio.audio_buffer[:needed])
del rtc_audio.audio_buffer[:needed]
opus_pkt = rtc_audio.encode_opus_frame(chunk)
if opus_pkt:
audio_sock.sendto(b'\x02' + opus_pkt, relay_addr)
bridge.push_audio(data)
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
audio_sock.close()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
await bridge.stop()
return
await websocket.close()
@@ -792,24 +777,28 @@ async def websocket_stream_rtc_relay(websocket : WebSocket) -> None:
session_context.set_session_id(session_id)
source_paths = state_manager.get_item('source_paths')
logger.info('rtc-relay: session_id=' + str(session_id) + ' source_paths=' + str(bool(source_paths)), __name__)
await websocket.accept(subprotocol = subprotocol)
if source_paths:
from facefusion import rtc
import socket as sock
stream_path = 'stream/' + session_id
rtp_port = rtc.create_rtp_session(stream_path)
whep_url = 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whep'
audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM)
relay_addr = ('127.0.0.1', rtp_port)
if not rtc.lib:
logger.error('rtc-relay: libdatachannel not loaded', __name__)
await websocket.close()
return
stream_path = 'stream/' + session_id
rtc.create_session(stream_path)
whep_url = 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whep'
latest_frame_holder : list = [None]
whep_sent = False
lock = threading.Lock()
stop_event = threading.Event()
ready_event = threading.Event()
worker = threading.Thread(target = run_h264_dc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, 'relay', stream_path, rtp_port), daemon = True)
worker = threading.Thread(target = run_rtc_direct_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, stream_path), daemon = True)
worker.start()
try:
@@ -831,25 +820,12 @@ async def websocket_stream_rtc_relay(websocket : WebSocket) -> None:
latest_frame_holder[0] = frame
if data[:2] != JPEG_MAGIC:
rtc.init_opus_encoder()
with rtc.audio_lock:
rtc.audio_buffer.extend(data)
needed = rtc.OPUS_FRAME_SAMPLES * 2 * 2
while len(rtc.audio_buffer) >= needed:
chunk = bytes(rtc.audio_buffer[:needed])
del rtc.audio_buffer[:needed]
opus_pkt = rtc.encode_opus_frame(chunk)
if opus_pkt:
audio_sock.sendto(b'\x02' + opus_pkt, relay_addr)
rtc.send_audio(stream_path, data)
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
audio_sock.close()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
rtc.destroy_session(stream_path)
+15 -2
View File
@@ -7,6 +7,7 @@ from typing import List, Optional, Tuple
import cv2
from facefusion import ffmpeg_builder
from facefusion.common_helper import is_windows
from facefusion.streamer import process_vision_frame
from facefusion.types import VisionFrame
@@ -82,7 +83,13 @@ def create_whip_encoder(width : int, height : int, stream_fps : int, stream_qual
ffmpeg_builder.set_output(whip_url)
)
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,))
if is_windows():
os.set_inheritable(audio_read_fd, True)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, close_fds = False)
else:
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,))
os.close(audio_read_fd)
return process, audio_write_fd
@@ -130,7 +137,13 @@ def create_fmp4_encoder(width : int, height : int, stream_fps : int, stream_qual
ffmpeg_builder.set_output('-')
)
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,))
if is_windows():
os.set_inheritable(audio_read_fd, True)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, close_fds = False)
else:
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,))
os.close(audio_read_fd)
return process, audio_write_fd
+122 -40
View File
@@ -7,6 +7,7 @@ from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict, List, Optional, TypeAlias
from facefusion import logger
from facefusion.common_helper import is_windows
RtcLib : TypeAlias = ctypes.CDLL
WHEP_PORT : int = 8892
@@ -76,30 +77,34 @@ class RtcPacketizerInit(ctypes.Structure):
('obuPacketization', ctypes.c_int),
('playoutDelayId', ctypes.c_uint8),
('playoutDelayMin', ctypes.c_uint16),
('playoutDelayMax', ctypes.c_uint16),
('colorSpaceId', ctypes.c_uint8),
('colorChromaSitingHorz', ctypes.c_uint8),
('colorChromaSitingVert', ctypes.c_uint8),
('colorRange', ctypes.c_uint8),
('colorPrimaries', ctypes.c_uint8),
('colorTransfer', ctypes.c_uint8),
('colorMatrix', ctypes.c_uint8)
('playoutDelayMax', ctypes.c_uint16)
]
def find_library() -> Optional[str]:
lib_path = ctypes.util.find_library('datachannel')
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if lib_path:
return lib_path
if is_windows():
bin_dir = os.path.join(project_root, 'bin')
search_paths =\
[
'/home/henry/local/lib/libdatachannel.so',
'/usr/local/lib/libdatachannel.so',
'/usr/lib/libdatachannel.so',
'/usr/lib/x86_64-linux-gnu/libdatachannel.so'
]
if os.path.isdir(bin_dir):
for entry in sorted(os.listdir(bin_dir), reverse = True):
if 'datachannel' in entry and entry.endswith('.dll'):
return os.path.join(bin_dir, entry)
search_paths =\
[
os.path.join(os.environ.get('CONDA_PREFIX', ''), 'Library', 'bin', 'datachannel.dll'),
os.path.join(os.environ.get('CONDA_PREFIX', ''), 'Library', 'lib', 'datachannel.dll')
]
else:
search_paths =\
[
'/home/henry/local/lib/libdatachannel.so',
'/usr/local/lib/libdatachannel.so',
'/usr/lib/libdatachannel.so',
'/usr/lib/x86_64-linux-gnu/libdatachannel.so'
]
for path in search_paths:
if os.path.isfile(path):
@@ -111,10 +116,13 @@ def find_library() -> Optional[str]:
def load_library() -> bool:
global lib
if lib:
return True
lib_path = find_library()
if not lib_path:
logger.warn('libdatachannel.so not found', __name__)
logger.warn('libdatachannel not found', __name__)
return False
lib = ctypes.CDLL(lib_path)
@@ -352,7 +360,15 @@ def init_opus_encoder() -> None:
if opus_enc:
return
libopus_handle = ctypes.CDLL(ctypes.util.find_library('opus'))
opus_path = ctypes.util.find_library('opus')
if not opus_path:
if not hasattr(init_opus_encoder, '_warned'):
logger.warn('libopus not found, audio encoding disabled', __name__)
init_opus_encoder._warned = True
return
libopus_handle = ctypes.CDLL(opus_path)
libopus_handle.opus_encoder_create.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int)]
libopus_handle.opus_encoder_create.restype = ctypes.c_void_p
libopus_handle.opus_encode.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.POINTER(ctypes.c_ubyte), ctypes.c_int32]
@@ -434,41 +450,97 @@ def send_vp8_frame(stream_path : str, frame_data : bytes) -> None:
send_h264_frame(stream_path, frame_data)
def find_nal_starts(data : bytes) -> List:
	"""
	Locate the NAL unit start codes in a byte stream.

	Returns a list of (offset, length) tuples in stream order. The offset is
	the index of the first start code byte and the length is 3 for the
	sequence 00 00 01 or 4 for the sequence 00 00 00 01. A start code is only
	reported when at least one byte follows it, so callers can safely read
	the byte at offset + length.
	"""
	starts = []
	# the three byte pattern must begin before this index to leave room for one trailing byte
	limit = len(data) - 3
	cursor = 0

	while True:
		position = data.find(b'\x00\x00\x01', cursor)

		if position < 0 or position >= limit:
			break
		# a zero right before the pattern makes it the four byte form; that byte can never
		# belong to the previous start code, because every start code ends with 01
		if position > 0 and data[position - 1] == 0:
			starts.append((position - 1, 4))
		else:
			starts.append((position, 3))
		cursor = position + 3

	return starts
def send_h264_frame(stream_path : str, frame_data : bytes) -> None:
global send_start_time
session = sessions.get(stream_path)
if not session:
return
viewers = session.get('viewers')
if not viewers:
return
prev = h264_au_buffer.get(stream_path, b'')
buf = prev + frame_data
nal_starts = find_nal_starts(buf)
au_starts = []
i = 0
while i < len(buf) - 4:
if buf[i] == 0 and buf[i + 1] == 0 and buf[i + 2] == 0 and buf[i + 3] == 1 and i + 4 < len(buf):
nal_type = buf[i + 4] & 0x1f
if nal_type == 7 or nal_type == 5:
au_starts.append(i)
i += 1
if len(au_starts) < 2:
if len(nal_starts) < 2:
h264_au_buffer[stream_path] = buf
return
for j in range(len(au_starts) - 1):
au = buf[au_starts[j]:au_starts[j + 1]]
au_boundaries = []
for viewer in session.get('viewers', []):
tracks = viewer.get('tracks', [])
for idx, (pos, sc_len) in enumerate(nal_starts):
nal_type = buf[pos + sc_len] & 0x1f
if tracks:
lib.rtcSendMessage(tracks[0], au, len(au))
if nal_type == 7:
au_boundaries.append(idx)
h264_au_buffer[stream_path] = buf[au_starts[-1]:]
if len(au_boundaries) < 2:
h264_au_buffer[stream_path] = buf
return
if send_start_time == 0:
send_start_time = _time.monotonic()
elapsed = _time.monotonic() - send_start_time
frame_duration = 1.0 / 30.0
for k in range(len(au_boundaries) - 1):
start_nal = au_boundaries[k]
end_nal = au_boundaries[k + 1]
timestamp = int((elapsed + k * frame_duration) * 90000) & 0xFFFFFFFF
nalu_parts = []
for nal_idx in range(start_nal, end_nal):
nal_pos = nal_starts[nal_idx][0]
nal_sc_len = nal_starts[nal_idx][1]
if nal_idx + 1 < len(nal_starts):
nal_end = nal_starts[nal_idx + 1][0]
else:
nal_end = len(buf)
nalu = buf[nal_pos + nal_sc_len:nal_end]
if len(nalu) > 0:
nalu_parts.append(len(nalu).to_bytes(4, 'big') + nalu)
if nalu_parts:
frame_msg = b''.join(nalu_parts)
for viewer in viewers:
tracks = viewer.get('tracks', [])
if tracks:
lib.rtcSendMessage(tracks[0], frame_msg, len(frame_msg))
last_boundary = au_boundaries[-1]
h264_au_buffer[stream_path] = buf[nal_starts[last_boundary][0]:]
def destroy_session(stream_path : str) -> None:
@@ -560,6 +632,13 @@ def handle_whep_offer(stream_path : str, sdp_offer : str) -> Optional[str]:
lib.rtcSetOpusPacketizer(audio_track, ctypes.byref(audio_packetizer))
lib.rtcChainRtcpSrReporter(audio_track)
def on_track_open(track_id, user_ptr):
logger.info('track ' + str(track_id) + ' opened', __name__)
track_open_cb = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_void_p)(on_track_open)
callback_refs.append(track_open_cb)
lib.rtcSetOpenCallback(video_track, track_open_cb)
viewer['tracks'] = [video_track]
viewer['audio_track'] = audio_track
session['viewers'].append(viewer)
@@ -586,6 +665,9 @@ def handle_whep_offer(stream_path : str, sdp_offer : str) -> Optional[str]:
def start() -> None:
global running, http_thread
if running:
return
if not load_library():
return
+31 -68
View File
@@ -1,99 +1,62 @@
import os
import shutil
import subprocess
import time
import threading
from typing import Optional
import httpx
from facefusion import logger
RELAY_PORT : int = 8891
RELAY_BINARY : str = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tools', 'whip_relay')
RELAY_PROCESS : Optional[subprocess.Popen[bytes]] = None
_started : bool = False
_lock : threading.Lock = threading.Lock()
def get_whip_url(stream_path : str) -> str:
return 'http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/whip'
from facefusion import rtc
return 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whip'
def get_whep_url(stream_path : str) -> str:
return 'http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/whep'
def resolve_binary() -> str:
relay_path = shutil.which('whip_relay')
if relay_path:
return relay_path
if os.path.isfile(RELAY_BINARY):
return RELAY_BINARY
return RELAY_BINARY
from facefusion import rtc
return 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whep'
def start() -> None:
global RELAY_PROCESS
global _started
subprocess.run([ 'fuser', '-k', str(RELAY_PORT) + '/tcp' ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
time.sleep(0.5)
from facefusion import rtc
relay_binary = resolve_binary()
if not rtc.lib:
if not rtc.load_library():
logger.warn('whip relay: libdatachannel not available', __name__)
return
if not os.path.isfile(relay_binary):
logger.warn('whip_relay binary not found at ' + relay_binary + ', skipping', __name__)
return
if not rtc.running:
rtc.start()
env = os.environ.copy()
env['LD_LIBRARY_PATH'] = '/home/henry/local/lib:' + env.get('LD_LIBRARY_PATH', '')
RELAY_PROCESS = subprocess.Popen(
[ relay_binary, str(RELAY_PORT) ],
env = env,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE
)
logger.info('whip relay started on port ' + str(RELAY_PORT), __name__)
_started = True
logger.info('whip relay (python) ready on port ' + str(rtc.WHEP_PORT), __name__)
def stop() -> None:
global RELAY_PROCESS
if RELAY_PROCESS:
RELAY_PROCESS.terminate()
RELAY_PROCESS.wait()
RELAY_PROCESS = None
global _started
_started = False
def wait_for_ready() -> bool:
for _ in range(10):
try:
response = httpx.get('http://localhost:' + str(RELAY_PORT) + '/health', timeout = 1)
if response.status_code == 200:
return True
except Exception:
pass
time.sleep(0.5)
return False
return _started
def is_session_ready(stream_path : str) -> bool:
try:
response = httpx.get('http://localhost:' + str(RELAY_PORT) + '/session/' + stream_path, timeout = 1)
if response.status_code == 200:
return True
except Exception:
pass
return False
from facefusion import rtc
return stream_path in rtc.sessions
def create_session(stream_path : str) -> int:
try:
response = httpx.post('http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/create', timeout = 5)
from facefusion import rtc
if response.status_code == 200:
return int(response.text)
except Exception:
pass
return 0
if not _started:
start()
if not rtc.lib:
return 0
rtp_port = rtc.create_rtp_session(stream_path)
return rtp_port