From da74b85223f0bfd07889f268fd64164b1176cdf4 Mon Sep 17 00:00:00 2001 From: henryruhs Date: Mon, 23 Mar 2026 14:20:37 +0100 Subject: [PATCH] mass test approaches --- facefusion/apis/core.py | 5 +- facefusion/apis/endpoints/stream.py | 27 +- facefusion/rtc.py | 13 +- facefusion/whip_relay.py | 10 +- test_stream.html | 29 +- tools/whip_relay | Bin 0 -> 31328 bytes tools/whip_relay.c | 664 ++++++++++++++++++++++++++++ 7 files changed, 729 insertions(+), 19 deletions(-) create mode 100755 tools/whip_relay create mode 100644 tools/whip_relay.c diff --git a/facefusion/apis/core.py b/facefusion/apis/core.py index ad5a501b..e99fd624 100644 --- a/facefusion/apis/core.py +++ b/facefusion/apis/core.py @@ -14,7 +14,7 @@ from facefusion.apis.endpoints.ping import websocket_ping from facefusion.apis.endpoints.session import create_session, destroy_session, get_session, refresh_session from facefusion.apis.endpoints.state import get_state, set_state from facefusion import logger -from facefusion.apis.endpoints.stream import websocket_stream, websocket_stream_audio, websocket_stream_live, websocket_stream_mjpeg, websocket_stream_rtc, websocket_stream_whip, websocket_stream_whip_aio, websocket_stream_whip_dc, websocket_stream_whip_py +from facefusion.apis.endpoints.stream import websocket_stream, websocket_stream_audio, websocket_stream_live, websocket_stream_mjpeg, websocket_stream_rtc, websocket_stream_whip, websocket_stream_whip_dc, websocket_stream_whip_py from facefusion.apis.middlewares.session import create_session_guard @@ -87,8 +87,7 @@ def create_api() -> Starlette: WebSocketRoute('/stream/whip-py', websocket_stream_whip_py, middleware = [ session_guard ]), WebSocketRoute('/stream/whip-dc', websocket_stream_whip_dc, middleware = [ session_guard ]), WebSocketRoute('/stream/live', websocket_stream_live, middleware = [ session_guard ]), - WebSocketRoute('/stream/whip-aio', websocket_stream_whip_aio, middleware = [ session_guard ]), - WebSocketRoute('/stream/rtc', websocket_stream_rtc, middleware = [ session_guard ]), +WebSocketRoute('/stream/rtc', websocket_stream_rtc, middleware = [ session_guard ]), WebSocketRoute('/stream/mjpeg', websocket_stream_mjpeg, middleware = [ session_guard ]), WebSocketRoute('/stream/audio', websocket_stream_audio, middleware = [ session_guard ]) ] diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py index 24cca8a6..8c6b4b40 100644 --- a/facefusion/apis/endpoints/stream.py +++ b/facefusion/apis/endpoints/stream.py @@ -7,6 +7,7 @@ from collections import deque from concurrent.futures import ThreadPoolExecutor from typing import Deque, List +import av import cv2 import numpy from starlette.websockets import WebSocket @@ -590,14 +591,18 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: if source_paths: from facefusion import whip_relay + from facefusion import rtc as rtc_audio + import socket as sock stream_path = 'stream/' + session_id - rtp_port = whip_relay.create_session(stream_path) + rtp_port, audio_port = whip_relay.create_session(stream_path) if not rtp_port: logger.error('failed to create relay session', __name__) await websocket.close() return + audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) if audio_port else None + latest_frame_holder : list = [None] whep_sent = False lock = threading.Lock() @@ -625,10 +630,30 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: with lock: latest_frame_holder[0] = frame + if data[:2] != JPEG_MAGIC and audio_sock and audio_port: + encoder = rtc_audio.get_opus_encoder() + pcm = numpy.frombuffer(data, dtype = numpy.int16).reshape(1, -1) + samples = pcm.shape[1] // (2 * 960) * 960 * 2 + + if samples > 0: + for offset in range(0, samples, 960 * 2): + chunk = pcm[:, offset:offset + 960 * 2] + + if chunk.shape[1] == 960 * 2: + frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo') + frame.sample_rate = 48000 + + for packet in encoder.encode(frame): + audio_sock.sendto(bytes(packet), ('127.0.0.1', audio_port)) + except Exception as exception: logger.error(str(exception), __name__) stop_event.set() + + if audio_sock: + audio_sock.close() + loop = asyncio.get_running_loop() await loop.run_in_executor(None, worker.join, 10) return diff --git a/facefusion/rtc.py b/facefusion/rtc.py index 7379046d..a93f65cc 100644 --- a/facefusion/rtc.py +++ b/facefusion/rtc.py @@ -270,6 +270,7 @@ send_start_time : float = 0 opus_encoder : Optional[av.CodecContext] = None audio_buffer : bytearray = bytearray() audio_lock : threading.Lock = threading.Lock() +audio_pts : int = 0 OPUS_FRAME_SAMPLES : int = 960 @@ -320,6 +321,8 @@ def get_opus_encoder() -> av.CodecContext: def send_audio(stream_path : str, pcm_data : bytes) -> None: + global audio_pts + session = sessions.get(stream_path) if not session: @@ -342,10 +345,11 @@ def send_audio(stream_path : str, pcm_data : bytes) -> None: pcm = numpy.frombuffer(chunk, dtype = numpy.int16).reshape(1, -1) frame = av.AudioFrame.from_ndarray(pcm, format = 's16', layout = 'stereo') frame.sample_rate = 48000 - frame.pts = None + frame.pts = audio_pts for packet in encoder.encode(frame): opus_data = bytes(packet) + buf = ctypes.create_string_buffer(opus_data) for viewer in viewers: if not viewer.get('connected'): @@ -359,12 +363,11 @@ def send_audio(stream_path : str, pcm_data : bytes) -> None: if not lib.rtcIsOpen(audio_track_id): continue - elapsed = _time.monotonic() - send_start_time if send_start_time > 0 else 0 - timestamp = int(elapsed * 48000) & 0xFFFFFFFF - buf = ctypes.create_string_buffer(opus_data) - lib.rtcSetTrackRtpTimestamp(audio_track_id, timestamp) + lib.rtcSetTrackRtpTimestamp(audio_track_id, audio_pts & 0xFFFFFFFF) lib.rtcSendMessage(audio_track_id, buf, len(opus_data)) + audio_pts += OPUS_FRAME_SAMPLES + h264_au_buffer : Dict[str, bytes] = {} diff --git a/facefusion/whip_relay.py b/facefusion/whip_relay.py index 230ba726..1fe4da88 100644 --- a/facefusion/whip_relay.py +++ b/facefusion/whip_relay.py @@ -88,12 +88,16 @@ def is_session_ready(stream_path : str) -> bool: return False -def create_session(stream_path : str) -> int: +def create_session(stream_path : str) -> tuple: try: response = httpx.post('http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/create', timeout = 5) if response.status_code == 200: - return int(response.text) + parts = response.text.split(',') + + if len(parts) >= 2: + return int(parts[0]), int(parts[1]) + return int(parts[0]), 0 except Exception: pass - return 0 + return 0, 0 diff --git a/test_stream.html b/test_stream.html index 653e3531..879ed8da 100644 --- a/test_stream.html +++ b/test_stream.html @@ -3,7 +3,7 @@ -WHIP Stream Monitor +Video Stream