diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py index 8c6b4b40..e8ef4ba2 100644 --- a/facefusion/apis/endpoints/stream.py +++ b/facefusion/apis/endpoints/stream.py @@ -557,8 +557,8 @@ def run_h264_dc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop for frame in pending: if backend == 'relay' and udp_sock: - if len(frame) <= 65000: - udp_sock.sendto(frame, ('127.0.0.1', rtp_port)) + if len(frame) <= 64999: + udp_sock.sendto(b'\x01' + frame, ('127.0.0.1', rtp_port)) if backend == 'rtc': from facefusion import rtc rtc.send_vp8_frame(stream_path, frame) @@ -594,14 +594,15 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: from facefusion import rtc as rtc_audio import socket as sock stream_path = 'stream/' + session_id - rtp_port, audio_port = whip_relay.create_session(stream_path) + rtp_port = whip_relay.create_session(stream_path) if not rtp_port: logger.error('failed to create relay session', __name__) await websocket.close() return - audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) if audio_port else None + audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) + relay_addr = ('127.0.0.1', rtp_port) latest_frame_holder : list = [None] whep_sent = False @@ -630,30 +631,24 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: with lock: latest_frame_holder[0] = frame - if data[:2] != JPEG_MAGIC and audio_sock and audio_port: + if data[:2] != JPEG_MAGIC: encoder = rtc_audio.get_opus_encoder() pcm = numpy.frombuffer(data, dtype = numpy.int16).reshape(1, -1) - samples = pcm.shape[1] // (2 * 960) * 960 * 2 + needed = 960 * 2 - if samples > 0: - for offset in range(0, samples, 960 * 2): - chunk = pcm[:, offset:offset + 960 * 2] + for offset in range(0, pcm.shape[1] - needed + 1, needed): + chunk = pcm[:, offset:offset + needed] + audio_frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo') + audio_frame.sample_rate = 48000 - if chunk.shape[1] == 960 * 2: - frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo') - frame.sample_rate = 48000 - - for packet in encoder.encode(frame): - audio_sock.sendto(bytes(packet), ('127.0.0.1', audio_port)) + for packet in encoder.encode(audio_frame): + audio_sock.sendto(b'\x02' + bytes(packet), relay_addr) except Exception as exception: logger.error(str(exception), __name__) stop_event.set() - - if audio_sock: - audio_sock.close() - + audio_sock.close() loop = asyncio.get_running_loop() await loop.run_in_executor(None, worker.join, 10) return diff --git a/facefusion/whip_relay.py b/facefusion/whip_relay.py index 1fe4da88..230ba726 100644 --- a/facefusion/whip_relay.py +++ b/facefusion/whip_relay.py @@ -88,16 +88,12 @@ def is_session_ready(stream_path : str) -> bool: return False -def create_session(stream_path : str) -> tuple: +def create_session(stream_path : str) -> int: try: response = httpx.post('http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/create', timeout = 5) if response.status_code == 200: - parts = response.text.split(',') - - if len(parts) >= 2: - return int(parts[0]), int(parts[1]) - return int(parts[0]), 0 + return int(response.text) except Exception: pass - return 0, 0 + return 0 diff --git a/tools/whip_relay b/tools/whip_relay index 5b943d3f..0e34c730 100755 Binary files a/tools/whip_relay and b/tools/whip_relay differ diff --git a/tools/whip_relay.c b/tools/whip_relay.c index d6c9a15e..b989da85 100644 --- a/tools/whip_relay.c +++ b/tools/whip_relay.c @@ -30,8 +30,6 @@ typedef struct char path[256]; int rtp_port; int rtp_fd; - int audio_port; - int audio_fd; Viewer viewers[MAX_VIEWERS]; int viewer_count; int active; @@ -72,7 +70,7 @@ static double get_elapsed_seconds(struct timeval *start) return (now.tv_sec - start->tv_sec) + (now.tv_usec - start->tv_usec) / 1000000.0; } -static void *rtp_receiver_thread(void *arg) +static void *receiver_thread(void *arg) { Session *session = (Session *)arg; char buf[256 * 1024]; @@ -85,20 +83,21 @@ static void *rtp_receiver_thread(void *arg) socklen_t fromlen = sizeof(from); int n = recvfrom(session->rtp_fd, buf, sizeof(buf), 0, (struct sockaddr *)&from, &fromlen); - if (n <= 0) + if (n <= 1) { continue; } - if (!started) + char tag = buf[0]; + char *payload = buf + 1; + int payload_len = n - 1; + + if (!started && tag == 0x01) { gettimeofday(&start_time, NULL); started = 1; } - double elapsed = get_elapsed_seconds(&start_time); - uint32_t timestamp = (uint32_t)(elapsed * 90000.0); - pthread_mutex_lock(&session->lock); for (int v = 0; v < session->viewer_count; v++) @@ -110,15 +109,31 @@ static void *rtp_receiver_thread(void *arg) continue; } - for (int t = 0; t < viewer->track_count; t++) + if (tag == 0x01) { - if (!rtcIsOpen(viewer->tracks[t])) + for (int t = 0; t < viewer->track_count; t++) { - continue; + if (!rtcIsOpen(viewer->tracks[t])) + { + continue; + } + + double elapsed = started ? get_elapsed_seconds(&start_time) : 0; + uint32_t timestamp = (uint32_t)(elapsed * 90000.0); + rtcSetTrackRtpTimestamp(viewer->tracks[t], timestamp); + rtcSendMessage(viewer->tracks[t], payload, payload_len); + } + } + + if (tag == 0x02 && viewer->audio_track > 0) + { + if (rtcIsOpen(viewer->audio_track)) + { + rtcSetTrackRtpTimestamp(viewer->audio_track, session->audio_pts); + rtcSendMessage(viewer->audio_track, payload, payload_len); } - rtcSetTrackRtpTimestamp(viewer->tracks[t], timestamp); - rtcSendMessage(viewer->tracks[t], buf, n); + session->audio_pts += 960; } } @@ -128,51 +143,6 @@ static void *rtp_receiver_thread(void *arg) return NULL; } -static void *audio_receiver_thread(void *arg) -{ - Session *session = (Session *)arg; - char buf[4096]; - - while (running && session->active) - { - struct sockaddr_in from; - socklen_t fromlen = sizeof(from); - int n = recvfrom(session->audio_fd, buf, sizeof(buf), 0, (struct sockaddr *)&from, &fromlen); - - if (n <= 0) - { - continue; - } - - uint32_t ts = session->audio_pts; - session->audio_pts += 960; - - pthread_mutex_lock(&session->lock); - - for (int v = 0; v < session->viewer_count; v++) - { - Viewer *viewer = &session->viewers[v]; - - if (!viewer->connected || viewer->audio_track <= 0) - { - continue; - } - - if (!rtcIsOpen(viewer->audio_track)) - { - continue; - } - - rtcSetTrackRtpTimestamp(viewer->audio_track, ts); - rtcSendMessage(viewer->audio_track, buf, n); - } - - pthread_mutex_unlock(&session->lock); - } - - return NULL; -} - static Session *create_session_slot(const char *path) { for (int i = 0; i < MAX_SESSIONS; i++) @@ -206,27 +176,12 @@ static Session *create_session_slot(const char *path) setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); sessions[i].rtp_fd = fd; - - sessions[i].audio_port = next_rtp_port++; - int afd = socket(AF_INET, SOCK_DGRAM, 0); - struct sockaddr_in aaddr; - memset(&aaddr, 0, sizeof(aaddr)); - aaddr.sin_family = AF_INET; - aaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); - aaddr.sin_port = htons(sessions[i].audio_port); - bind(afd, (struct sockaddr *)&aaddr, sizeof(aaddr)); - setsockopt(afd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - sessions[i].audio_fd = afd; sessions[i].audio_pts = 0; pthread_t tid; - pthread_create(&tid, NULL, rtp_receiver_thread, &sessions[i]); + pthread_create(&tid, NULL, receiver_thread, &sessions[i]); pthread_detach(tid); - pthread_t atid; - pthread_create(&atid, NULL, audio_receiver_thread, &sessions[i]); - pthread_detach(atid); - return &sessions[i]; } } @@ -516,8 +471,8 @@ static void handle_client(int client_fd) if (session) { - char port_str[64]; - snprintf(port_str, sizeof(port_str), "%d,%d", session->rtp_port, session->audio_port); + char port_str[16]; + snprintf(port_str, sizeof(port_str), "%d", session->rtp_port); send_http_response(client_fd, 200, "text/plain", port_str, strlen(port_str)); } else