mass test approaches

This commit is contained in:
henryruhs
2026-03-23 14:30:17 +01:00
parent da74b85223
commit a3785ff14b
4 changed files with 48 additions and 102 deletions
+14 -19
View File
@@ -557,8 +557,8 @@ def run_h264_dc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop
for frame in pending:
if backend == 'relay' and udp_sock:
if len(frame) <= 65000:
udp_sock.sendto(frame, ('127.0.0.1', rtp_port))
if len(frame) <= 64999:
udp_sock.sendto(b'\x01' + frame, ('127.0.0.1', rtp_port))
if backend == 'rtc':
from facefusion import rtc
rtc.send_vp8_frame(stream_path, frame)
@@ -594,14 +594,15 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None:
from facefusion import rtc as rtc_audio
import socket as sock
stream_path = 'stream/' + session_id
rtp_port, audio_port = whip_relay.create_session(stream_path)
rtp_port = whip_relay.create_session(stream_path)
if not rtp_port:
logger.error('failed to create relay session', __name__)
await websocket.close()
return
audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) if audio_port else None
audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM)
relay_addr = ('127.0.0.1', rtp_port)
latest_frame_holder : list = [None]
whep_sent = False
@@ -630,30 +631,24 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None:
with lock:
latest_frame_holder[0] = frame
if data[:2] != JPEG_MAGIC and audio_sock and audio_port:
if data[:2] != JPEG_MAGIC:
encoder = rtc_audio.get_opus_encoder()
pcm = numpy.frombuffer(data, dtype = numpy.int16).reshape(1, -1)
samples = pcm.shape[1] // (2 * 960) * 960 * 2
needed = 960 * 2
if samples > 0:
for offset in range(0, samples, 960 * 2):
chunk = pcm[:, offset:offset + 960 * 2]
for offset in range(0, pcm.shape[1] - needed + 1, needed):
chunk = pcm[:, offset:offset + needed]
audio_frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo')
audio_frame.sample_rate = 48000
if chunk.shape[1] == 960 * 2:
frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo')
frame.sample_rate = 48000
for packet in encoder.encode(frame):
audio_sock.sendto(bytes(packet), ('127.0.0.1', audio_port))
for packet in encoder.encode(audio_frame):
audio_sock.sendto(b'\x02' + bytes(packet), relay_addr)
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
if audio_sock:
audio_sock.close()
audio_sock.close()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
return
+3 -7
View File
@@ -88,16 +88,12 @@ def is_session_ready(stream_path : str) -> bool:
return False
def create_session(stream_path : str) -> tuple:
def create_session(stream_path : str) -> int:
try:
response = httpx.post('http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/create', timeout = 5)
if response.status_code == 200:
parts = response.text.split(',')
if len(parts) >= 2:
return int(parts[0]), int(parts[1])
return int(parts[0]), 0
return int(response.text)
except Exception:
pass
return 0, 0
return 0
BIN
View File
Binary file not shown.
+31 -76
View File
@@ -30,8 +30,6 @@ typedef struct
char path[256];
int rtp_port;
int rtp_fd;
int audio_port;
int audio_fd;
Viewer viewers[MAX_VIEWERS];
int viewer_count;
int active;
@@ -72,7 +70,7 @@ static double get_elapsed_seconds(struct timeval *start)
return (now.tv_sec - start->tv_sec) + (now.tv_usec - start->tv_usec) / 1000000.0;
}
static void *rtp_receiver_thread(void *arg)
static void *receiver_thread(void *arg)
{
Session *session = (Session *)arg;
char buf[256 * 1024];
@@ -85,20 +83,21 @@ static void *rtp_receiver_thread(void *arg)
socklen_t fromlen = sizeof(from);
int n = recvfrom(session->rtp_fd, buf, sizeof(buf), 0, (struct sockaddr *)&from, &fromlen);
if (n <= 0)
if (n <= 1)
{
continue;
}
if (!started)
char tag = buf[0];
char *payload = buf + 1;
int payload_len = n - 1;
if (!started && tag == 0x01)
{
gettimeofday(&start_time, NULL);
started = 1;
}
double elapsed = get_elapsed_seconds(&start_time);
uint32_t timestamp = (uint32_t)(elapsed * 90000.0);
pthread_mutex_lock(&session->lock);
for (int v = 0; v < session->viewer_count; v++)
@@ -110,15 +109,31 @@ static void *rtp_receiver_thread(void *arg)
continue;
}
for (int t = 0; t < viewer->track_count; t++)
if (tag == 0x01)
{
if (!rtcIsOpen(viewer->tracks[t]))
for (int t = 0; t < viewer->track_count; t++)
{
continue;
if (!rtcIsOpen(viewer->tracks[t]))
{
continue;
}
double elapsed = started ? get_elapsed_seconds(&start_time) : 0;
uint32_t timestamp = (uint32_t)(elapsed * 90000.0);
rtcSetTrackRtpTimestamp(viewer->tracks[t], timestamp);
rtcSendMessage(viewer->tracks[t], payload, payload_len);
}
}
if (tag == 0x02 && viewer->audio_track > 0)
{
if (rtcIsOpen(viewer->audio_track))
{
rtcSetTrackRtpTimestamp(viewer->audio_track, session->audio_pts);
rtcSendMessage(viewer->audio_track, payload, payload_len);
}
rtcSetTrackRtpTimestamp(viewer->tracks[t], timestamp);
rtcSendMessage(viewer->tracks[t], buf, n);
session->audio_pts += 960;
}
}
@@ -128,51 +143,6 @@ static void *rtp_receiver_thread(void *arg)
return NULL;
}
static void *audio_receiver_thread(void *arg)
{
Session *session = (Session *)arg;
char buf[4096];
while (running && session->active)
{
struct sockaddr_in from;
socklen_t fromlen = sizeof(from);
int n = recvfrom(session->audio_fd, buf, sizeof(buf), 0, (struct sockaddr *)&from, &fromlen);
if (n <= 0)
{
continue;
}
uint32_t ts = session->audio_pts;
session->audio_pts += 960;
pthread_mutex_lock(&session->lock);
for (int v = 0; v < session->viewer_count; v++)
{
Viewer *viewer = &session->viewers[v];
if (!viewer->connected || viewer->audio_track <= 0)
{
continue;
}
if (!rtcIsOpen(viewer->audio_track))
{
continue;
}
rtcSetTrackRtpTimestamp(viewer->audio_track, ts);
rtcSendMessage(viewer->audio_track, buf, n);
}
pthread_mutex_unlock(&session->lock);
}
return NULL;
}
static Session *create_session_slot(const char *path)
{
for (int i = 0; i < MAX_SESSIONS; i++)
@@ -206,27 +176,12 @@ static Session *create_session_slot(const char *path)
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
sessions[i].rtp_fd = fd;
sessions[i].audio_port = next_rtp_port++;
int afd = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in aaddr;
memset(&aaddr, 0, sizeof(aaddr));
aaddr.sin_family = AF_INET;
aaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
aaddr.sin_port = htons(sessions[i].audio_port);
bind(afd, (struct sockaddr *)&aaddr, sizeof(aaddr));
setsockopt(afd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
sessions[i].audio_fd = afd;
sessions[i].audio_pts = 0;
pthread_t tid;
pthread_create(&tid, NULL, rtp_receiver_thread, &sessions[i]);
pthread_create(&tid, NULL, receiver_thread, &sessions[i]);
pthread_detach(tid);
pthread_t atid;
pthread_create(&atid, NULL, audio_receiver_thread, &sessions[i]);
pthread_detach(atid);
return &sessions[i];
}
}
@@ -516,8 +471,8 @@ static void handle_client(int client_fd)
if (session)
{
char port_str[64];
snprintf(port_str, sizeof(port_str), "%d,%d", session->rtp_port, session->audio_port);
char port_str[16];
snprintf(port_str, sizeof(port_str), "%d", session->rtp_port);
send_http_response(client_fd, 200, "text/plain", port_str, strlen(port_str));
}
else