From 3404200c566ef7d0f393ef5d0441e6fdc872483a Mon Sep 17 00:00:00 2001 From: henryruhs Date: Mon, 23 Mar 2026 14:59:09 +0100 Subject: [PATCH] mass test approaches --- facefusion/apis/endpoints/stream.py | 21 ++++---- facefusion/rtc.py | 83 ++++++++++++++++++----------- 2 files changed, 62 insertions(+), 42 deletions(-) diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py index e8ef4ba2..3fa3b01f 100644 --- a/facefusion/apis/endpoints/stream.py +++ b/facefusion/apis/endpoints/stream.py @@ -7,7 +7,6 @@ from collections import deque from concurrent.futures import ThreadPoolExecutor from typing import Deque, List -import av import cv2 import numpy from starlette.websockets import WebSocket @@ -632,17 +631,19 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: latest_frame_holder[0] = frame if data[:2] != JPEG_MAGIC: - encoder = rtc_audio.get_opus_encoder() - pcm = numpy.frombuffer(data, dtype = numpy.int16).reshape(1, -1) - needed = 960 * 2 + rtc_audio.init_opus_encoder() - for offset in range(0, pcm.shape[1] - needed + 1, needed): - chunk = pcm[:, offset:offset + needed] - audio_frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo') - audio_frame.sample_rate = 48000 + with rtc_audio.audio_lock: + rtc_audio.audio_buffer.extend(data) + needed = rtc_audio.OPUS_FRAME_SAMPLES * 2 * 2 - for packet in encoder.encode(audio_frame): - audio_sock.sendto(b'\x02' + bytes(packet), relay_addr) + while len(rtc_audio.audio_buffer) >= needed: + chunk = bytes(rtc_audio.audio_buffer[:needed]) + del rtc_audio.audio_buffer[:needed] + opus_pkt = rtc_audio.encode_opus_frame(chunk) + + if opus_pkt: + audio_sock.sendto(b'\x02' + opus_pkt, relay_addr) except Exception as exception: logger.error(str(exception), __name__) diff --git a/facefusion/rtc.py b/facefusion/rtc.py index a93f65cc..2d888fd2 100644 --- a/facefusion/rtc.py +++ b/facefusion/rtc.py @@ -6,9 +6,6 @@ import time as _time from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Dict, List, Optional, TypeAlias -import av -import numpy - from facefusion import logger RtcLib : TypeAlias = ctypes.CDLL @@ -267,10 +264,10 @@ def run_rtp_forwarder(stream_path : str) -> None: send_start_time : float = 0 -opus_encoder : Optional[av.CodecContext] = None +audio_pts : int = 0 +opus_enc = None audio_buffer : bytearray = bytearray() audio_lock : threading.Lock = threading.Lock() -audio_pts : int = 0 OPUS_FRAME_SAMPLES : int = 960 @@ -307,17 +304,40 @@ def send_to_viewers(stream_path : str, data : bytes) -> None: lib.rtcSendMessage(track_id, buf, data_len) -def get_opus_encoder() -> av.CodecContext: - global opus_encoder +libopus_handle = None - if not opus_encoder: - opus_encoder = av.CodecContext.create('libopus', 'w') - opus_encoder.sample_rate = 48000 - opus_encoder.layout = 'stereo' - opus_encoder.format = av.AudioFormat('s16') - opus_encoder.open() - return opus_encoder +def init_opus_encoder() -> None: + global opus_enc, libopus_handle + + if opus_enc: + return + + libopus_handle = ctypes.CDLL(ctypes.util.find_library('opus')) + libopus_handle.opus_encoder_create.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int)] + libopus_handle.opus_encoder_create.restype = ctypes.c_void_p + libopus_handle.opus_encode.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.POINTER(ctypes.c_ubyte), ctypes.c_int32] + libopus_handle.opus_encode.restype = ctypes.c_int32 + + error = ctypes.c_int(0) + opus_enc = libopus_handle.opus_encoder_create(48000, 2, 2049, ctypes.byref(error)) + + +def encode_opus_frame(pcm_data : bytes) -> Optional[bytes]: + if not opus_enc or not libopus_handle: + return None + + max_packet = 4000 + output = (ctypes.c_ubyte * max_packet)() + result = libopus_handle.opus_encode(opus_enc, pcm_data, OPUS_FRAME_SAMPLES, output, max_packet) + + if result > 0: + return bytes(output[:result]) + return None + + +def get_opus_encoder() -> None: + init_opus_encoder() def send_audio(stream_path : str, pcm_data : bytes) -> None: @@ -333,6 +353,8 @@ def send_audio(stream_path : str, pcm_data : bytes) -> None: if not viewers: return + init_opus_encoder() + with audio_lock: audio_buffer.extend(pcm_data) needed = OPUS_FRAME_SAMPLES * 2 * 2 @@ -341,30 +363,27 @@ def send_audio(stream_path : str, pcm_data : bytes) -> None: chunk = bytes(audio_buffer[:needed]) del audio_buffer[:needed] - encoder = get_opus_encoder() - pcm = numpy.frombuffer(chunk, dtype = numpy.int16).reshape(1, -1) - frame = av.AudioFrame.from_ndarray(pcm, format = 's16', layout = 'stereo') - frame.sample_rate = 48000 - frame.pts = audio_pts + opus_data = encode_opus_frame(chunk) - for packet in encoder.encode(frame): - opus_data = bytes(packet) - buf = ctypes.create_string_buffer(opus_data) + if not opus_data: + continue - for viewer in viewers: - if not viewer.get('connected'): - continue + buf = ctypes.create_string_buffer(opus_data) - audio_track_id = viewer.get('audio_track') + for viewer in viewers: + if not viewer.get('connected'): + continue - if not audio_track_id: - continue + audio_track_id = viewer.get('audio_track') - if not lib.rtcIsOpen(audio_track_id): - continue + if not audio_track_id: + continue - lib.rtcSetTrackRtpTimestamp(audio_track_id, audio_pts & 0xFFFFFFFF) - lib.rtcSendMessage(audio_track_id, buf, len(opus_data)) + if not lib.rtcIsOpen(audio_track_id): + continue + + lib.rtcSetTrackRtpTimestamp(audio_track_id, audio_pts & 0xFFFFFFFF) + lib.rtcSendMessage(audio_track_id, buf, len(opus_data)) audio_pts += OPUS_FRAME_SAMPLES