mass test approaches

This commit is contained in:
henryruhs
2026-03-23 14:59:09 +01:00
parent a3785ff14b
commit 3404200c56
2 changed files with 62 additions and 42 deletions
+11 -10
View File
@@ -7,7 +7,6 @@ from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Deque, List
import av
import cv2
import numpy
from starlette.websockets import WebSocket
@@ -632,17 +631,19 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None:
latest_frame_holder[0] = frame
if data[:2] != JPEG_MAGIC:
encoder = rtc_audio.get_opus_encoder()
pcm = numpy.frombuffer(data, dtype = numpy.int16).reshape(1, -1)
needed = 960 * 2
rtc_audio.init_opus_encoder()
for offset in range(0, pcm.shape[1] - needed + 1, needed):
chunk = pcm[:, offset:offset + needed]
audio_frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo')
audio_frame.sample_rate = 48000
with rtc_audio.audio_lock:
rtc_audio.audio_buffer.extend(data)
needed = rtc_audio.OPUS_FRAME_SAMPLES * 2 * 2
for packet in encoder.encode(audio_frame):
audio_sock.sendto(b'\x02' + bytes(packet), relay_addr)
while len(rtc_audio.audio_buffer) >= needed:
chunk = bytes(rtc_audio.audio_buffer[:needed])
del rtc_audio.audio_buffer[:needed]
opus_pkt = rtc_audio.encode_opus_frame(chunk)
if opus_pkt:
audio_sock.sendto(b'\x02' + opus_pkt, relay_addr)
except Exception as exception:
logger.error(str(exception), __name__)
+51 -32
View File
@@ -6,9 +6,6 @@ import time as _time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict, List, Optional, TypeAlias
import av
import numpy
from facefusion import logger
RtcLib : TypeAlias = ctypes.CDLL
@@ -267,10 +264,10 @@ def run_rtp_forwarder(stream_path : str) -> None:
send_start_time : float = 0
opus_encoder : Optional[av.CodecContext] = None
audio_pts : int = 0
opus_enc = None
audio_buffer : bytearray = bytearray()
audio_lock : threading.Lock = threading.Lock()
audio_pts : int = 0
OPUS_FRAME_SAMPLES : int = 960
@@ -307,17 +304,40 @@ def send_to_viewers(stream_path : str, data : bytes) -> None:
lib.rtcSendMessage(track_id, buf, data_len)
def get_opus_encoder() -> av.CodecContext:
global opus_encoder
libopus_handle = None
if not opus_encoder:
opus_encoder = av.CodecContext.create('libopus', 'w')
opus_encoder.sample_rate = 48000
opus_encoder.layout = 'stereo'
opus_encoder.format = av.AudioFormat('s16')
opus_encoder.open()
return opus_encoder
def init_opus_encoder() -> None:
global opus_enc, libopus_handle
if opus_enc:
return
libopus_handle = ctypes.CDLL(ctypes.util.find_library('opus'))
libopus_handle.opus_encoder_create.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int)]
libopus_handle.opus_encoder_create.restype = ctypes.c_void_p
libopus_handle.opus_encode.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_int, ctypes.POINTER(ctypes.c_ubyte), ctypes.c_int32]
libopus_handle.opus_encode.restype = ctypes.c_int32
error = ctypes.c_int(0)
opus_enc = libopus_handle.opus_encoder_create(48000, 2, 2049, ctypes.byref(error))
def encode_opus_frame(pcm_data : bytes) -> Optional[bytes]:
if not opus_enc or not libopus_handle:
return None
max_packet = 4000
output = (ctypes.c_ubyte * max_packet)()
result = libopus_handle.opus_encode(opus_enc, pcm_data, OPUS_FRAME_SAMPLES, output, max_packet)
if result > 0:
return bytes(output[:result])
return None
def get_opus_encoder() -> None:
init_opus_encoder()
def send_audio(stream_path : str, pcm_data : bytes) -> None:
@@ -333,6 +353,8 @@ def send_audio(stream_path : str, pcm_data : bytes) -> None:
if not viewers:
return
init_opus_encoder()
with audio_lock:
audio_buffer.extend(pcm_data)
needed = OPUS_FRAME_SAMPLES * 2 * 2
@@ -341,30 +363,27 @@ def send_audio(stream_path : str, pcm_data : bytes) -> None:
chunk = bytes(audio_buffer[:needed])
del audio_buffer[:needed]
encoder = get_opus_encoder()
pcm = numpy.frombuffer(chunk, dtype = numpy.int16).reshape(1, -1)
frame = av.AudioFrame.from_ndarray(pcm, format = 's16', layout = 'stereo')
frame.sample_rate = 48000
frame.pts = audio_pts
opus_data = encode_opus_frame(chunk)
for packet in encoder.encode(frame):
opus_data = bytes(packet)
buf = ctypes.create_string_buffer(opus_data)
if not opus_data:
continue
for viewer in viewers:
if not viewer.get('connected'):
continue
buf = ctypes.create_string_buffer(opus_data)
audio_track_id = viewer.get('audio_track')
for viewer in viewers:
if not viewer.get('connected'):
continue
if not audio_track_id:
continue
audio_track_id = viewer.get('audio_track')
if not lib.rtcIsOpen(audio_track_id):
continue
if not audio_track_id:
continue
lib.rtcSetTrackRtpTimestamp(audio_track_id, audio_pts & 0xFFFFFFFF)
lib.rtcSendMessage(audio_track_id, buf, len(opus_data))
if not lib.rtcIsOpen(audio_track_id):
continue
lib.rtcSetTrackRtpTimestamp(audio_track_id, audio_pts & 0xFFFFFFFF)
lib.rtcSendMessage(audio_track_id, buf, len(opus_data))
audio_pts += OPUS_FRAME_SAMPLES