From a3785ff14b9e71c4393ae6cd6c29fc095dcf50fe Mon Sep 17 00:00:00 2001 From: henryruhs Date: Mon, 23 Mar 2026 14:30:17 +0100 Subject: [PATCH] mass test approaches --- facefusion/apis/endpoints/stream.py | 33 ++++----- facefusion/whip_relay.py | 10 +-- tools/whip_relay | Bin 31328 -> 31280 bytes tools/whip_relay.c | 107 ++++++++-------------------- 4 files changed, 48 insertions(+), 102 deletions(-) diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py index 8c6b4b40..e8ef4ba2 100644 --- a/facefusion/apis/endpoints/stream.py +++ b/facefusion/apis/endpoints/stream.py @@ -557,8 +557,8 @@ def run_h264_dc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop for frame in pending: if backend == 'relay' and udp_sock: - if len(frame) <= 65000: - udp_sock.sendto(frame, ('127.0.0.1', rtp_port)) + if len(frame) <= 64999: + udp_sock.sendto(b'\x01' + frame, ('127.0.0.1', rtp_port)) if backend == 'rtc': from facefusion import rtc rtc.send_vp8_frame(stream_path, frame) @@ -594,14 +594,15 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: from facefusion import rtc as rtc_audio import socket as sock stream_path = 'stream/' + session_id - rtp_port, audio_port = whip_relay.create_session(stream_path) + rtp_port = whip_relay.create_session(stream_path) if not rtp_port: logger.error('failed to create relay session', __name__) await websocket.close() return - audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) if audio_port else None + audio_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) + relay_addr = ('127.0.0.1', rtp_port) latest_frame_holder : list = [None] whep_sent = False @@ -630,30 +631,24 @@ async def websocket_stream_whip_dc(websocket : WebSocket) -> None: with lock: latest_frame_holder[0] = frame - if data[:2] != JPEG_MAGIC and audio_sock and audio_port: + if data[:2] != JPEG_MAGIC: encoder = rtc_audio.get_opus_encoder() pcm = numpy.frombuffer(data, dtype = numpy.int16).reshape(1, -1) - samples = pcm.shape[1] // (2 * 960) * 960 * 2 + needed = 960 * 2 - if samples > 0: - for offset in range(0, samples, 960 * 2): - chunk = pcm[:, offset:offset + 960 * 2] + for offset in range(0, pcm.shape[1] - needed + 1, needed): + chunk = pcm[:, offset:offset + needed] + audio_frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo') + audio_frame.sample_rate = 48000 - if chunk.shape[1] == 960 * 2: - frame = av.AudioFrame.from_ndarray(chunk, format = 's16', layout = 'stereo') - frame.sample_rate = 48000 - - for packet in encoder.encode(frame): - audio_sock.sendto(bytes(packet), ('127.0.0.1', audio_port)) + for packet in encoder.encode(audio_frame): + audio_sock.sendto(b'\x02' + bytes(packet), relay_addr) except Exception as exception: logger.error(str(exception), __name__) stop_event.set() - - if audio_sock: - audio_sock.close() - + audio_sock.close() loop = asyncio.get_running_loop() await loop.run_in_executor(None, worker.join, 10) return diff --git a/facefusion/whip_relay.py b/facefusion/whip_relay.py index 1fe4da88..230ba726 100644 --- a/facefusion/whip_relay.py +++ b/facefusion/whip_relay.py @@ -88,16 +88,12 @@ def is_session_ready(stream_path : str) -> bool: return False -def create_session(stream_path : str) -> tuple: +def create_session(stream_path : str) -> int: try: response = httpx.post('http://localhost:' + str(RELAY_PORT) + '/' + stream_path + '/create', timeout = 5) if response.status_code == 200: - parts = response.text.split(',') - - if len(parts) >= 2: - return int(parts[0]), int(parts[1]) - return int(parts[0]), 0 + return int(response.text) except Exception: pass - return 0, 0 + return 0 diff --git a/tools/whip_relay b/tools/whip_relay index 5b943d3f4f843377da605368bcfafe9f5be2128c..0e34c7304295fd1bdaea1bdc58f613c3e4d2ef61 100755 GIT binary patch delta 5880 zcmZ`-3se->8J-z-1r@U(F1v^i@C9lViGrYFK;1I+)WjDFo}|VHKI3VWq7jd}n03|Z zun}`LG1b#(B#CW3Js8@AV~Z|oT+(QQHuaIjNHm%ys32-W5_S6hcV-8)I_L1;d;k0Y z-~YO=rT&&!aZ4;q7ME@rc%jq&1c8716IfSqXvYGpJ}h9r7Dq}Y&sV-8J|=c7)tN#m zdsDj5ZPmoJ+fL8P7@l(KHAmgBnNN)=-~3Z$X;?~M>4<|>54a>Xu3{&XV!C#amBX^n zS~^h>REy;G7fm5{uMV=$K2sJ1Sv)MOuArtCKgA`TK6!bR#DZ64Ph z(X-}`ZEXn~n><`R!wQqr#RT@}kVH>f)Ma-#V`s(S>^L;iABV~=iAD%2KIA*_p_!c-g3xzs)>rWS3OX(ec)}zXp@(L z%051jJ$?~Js=!foji9J__hRU&<~Elnl~j|wxV#rs>d-q#e#lB7Sn`Wzcn0GkS7fWx z4O=w73~mIxJE1}v4JRR`@}ayPW+UX9dMo8s4?z~|I8*WZp#UZ|YIYB=-?n};Gz~9o z-X>rLyt_iE|JF@&>21W7=ekd+6Ur=b;N(x5=4lq!`)%Hx5HlhwiZucuA3sE~=F7IV z{>E@wwPqxf=iXYtwAiA2Mr$1ic8hIBO^a-M+^=}|krl&1!z8z;Zlf9Hv>UkZwQ7%$ zq1?)t92506O<0cUs1?POpD$v8mt$Rz0xR zyZuEu;L^fT%c7_lp=>&p(-4Ym=UcW@-S&STjn)M?KnVf(%KTwwg%ys;J*L`{$GT($ zBr{RqtY0)+Y3x0o0Gh#)FjmBHJyg{?Xo4VM8`(|WNJya6-tfDu_Yj9W1A-qPxoPY5 zbDpeRmOWP|%gUOn0$H{6M=ekzS8J+2;PRs{G}J*?b@}DFb+S^2RTV~Ssixl8v<)s} z)7rdy9wq3re>fhfV%>pkh#VB$+(saH@kdR2D6-GKkraH9QP}uZ2WWxw-Tzte!Viy7 zQRoEuEu0BhB-$^k-3bWaI8m|M(BWokWu=w^aLXdKh@b{aJ&`P>oA_`?^aA^=t>btA zw1>TEF+vm1Nvp2siRQ4cKd;x zW!_i8!H6G8gR?>vk%kr!IpC_}$4v6 zSA*YXdvyTc0h04Ji+Cc*y$m3&vMR{fE%QQsb3fJG;#OMK$q~k!TEchX;Wd4A2CdIl zH9G=FE*-bn7}kDBTR`BsGq6p(9N?9;-*>Fyi2*}K8CC+>k*1c%pFyF$fY7)hk3WjW z7jCk!;kj<|!P6SPwo?_(F0G+4jOycEivEEN)XH3QU*FWUW3C2cor8(XxISclsGe)jPB+=(kULY_ z+6(@2>G)gh=7^-ICt7erx+(_o^=s|fiw#OkO6&}#>YWa=NX4fdmIem}Bo&^XxhCB_i<{xS&sBk= z>k!n0;6PxYoD~QbS2J2OVZ6G3Y-UC2{iRDc*pBo8z2l*+ZW}@AQGFR`j5ztFHueUq zPfr{ZI{3rLlIm^CGU{iHl`>i_#-Q!0*xf`IfmGYXo*9|fzjF_YHo&)ku>3mrGKl=& zi|1LOMboR9)s37arCevdGYY$b8wgHIMe9)Yb<(H?=EWrio%i` z`>(OAQ9Zj9v~f0GV+%(0NxXEG7WJkz_w=V1HTHgONE5EHzlnN=W%%^KDHQ~-?ebRE+zD8>q_P<~x$dtp!ykRtopfB|xTjhMFB6E*y2;?#h<6e^ zemNe(VibZcNW}r6AH2Yba+I2?=V%e+^tUDQZM-cBPjS_H*3=5cagfc7(PgC}ms#St zVeY}-l08@5{SVLE;NszlSA4+rIa$HuOn*-P6L@$|p2BkyZ@b!d&&erp3(v{EFtWd^ zZsbpZfe;ea4ZSMzs^MLZR?9cB5|D$cu9g6`5<_jFsrL0HL#^@m_$eq1gClu(CwH02 zC250Ik$1t@FEQu%9;usnu*W@XjteUzRo8JC?uQpGeiy0MohkHdX8BRwpWgvExaeDc zpH*C{Ts!`i$jY*qdg=YiBP2F*>U35%HI068@b9PF*AW7?60~PAKKzZQ zwW59NBJ{Bs>;X8LfzI1FCGyc8K)aI7n3g0avvt!PVi|iA{k^Panj>@9XPP#QmWT1- zau&$p-Ee9b@m!BZZIH%y=^(uk86F`BG`|mji7a}0Z}$ecyo9tvQekt$NQ|Ve;_uWM zO-s0M-eJzu1N77v2nlkgzO>o2@;K%veyM45?wg-s&i_WlZ@W?mJcA3$1PGgC2|2KY za&tk`&ff>{xd~8;d3U|0C8KTA?=6P(_kq6=x&1P{{rsu$(EM=`dubdFKxcacPjh@d z0{|G+P+6GwdIwe(lguKX&zH)xE9X2PYmKITNMAMnM4`Ko&6L?syLS-_-*x;+STywC z3i`ZyBhar+;{VOy^W0;utHJO7xGui0Wd;u(?k4PeM}j|=N57epix-$ z^zq`Leb(TgGWfhG;J;|_M_`N5$7_LhwZZ>`VUX7Vd^*u7fCXtJEeJgKXy4}i9#O*P zbj}xe($TFj6c&$Vo#qbccnCvL=qAi!nRAoG6KwWeN9rZca|q5Qx@w4C1z!+igxaYg zj^1i)ip3G5tq_gvH6~ElzPP`vy2N zHcXU&SK|X#^VqU^4skZgh$}eV#%TpnRyWTnp6BCcPAx2TzC*NgI+W8KqCID!)=^jz zn~c(je4Pir9!(LK-pJ`|oK|ppgwxZUHW7`p;6X~BKA4}*Y{~*hTp!Fv!%~6~M}%Q2 zjb_DMwtqp2xQ_Q3r{$amSk6KRDoZl;oQAR8;KXv;kJB+k<0c|c2@lEBB$~C(!GleY zXEjS*~#w{{c((b1(n^ delta 6672 zcma)A3tUuH8ozgB6cn047!iB`DuPdZgK373(W~Z$rmeMY3gWBXN)0PY9CEy_x9cc$ zB*`|{Hfm{?%3|GE5**8Q$jq?RwTFe8>mZnDW@zZ$|99@4J8J#3`}@s#eDCxBzI)EO zv+aUde?crv5w{lg`KEnQPk}#0Jy}O_VAwjVK3&Hqi)j|&_Z6$e+r+SqI#JljUa))< zT|erDh^5Y}`?J5c%bVKg9XKgfrdOS~LpwXI9i=f(tk{Q>W~QXe6vQiiK|OUkoT^M33YI+qxylE$5d(h4_`4 zgvbl*Kwu2d$Z!sfn$U#lhE=QJ6a>_x!;nx5qM?d)%a>Sdzs|1PAot-1-`l!h zo+T+KrDvK1Bwlid_d?T@iR!65tNlXt5EW?Fp=-zo#y;7Lp;0qLRSUuidXIv<-cZJng& znMY~nITFsHr&>eDn|an%19{H5%C7W_adm>=HHmi9){tLv!@l5imU55^LTIxqm2SG( z)lM)rJDYshE#y58*Pq5AH)36tgdCY$!#h~`&HQ*AnWW|kw_~R)siF+>?eY_U*$+3@ zRA@U+Muff)3kpeasS#g{MZt^}73%>EZswKi9)V*e+;THHPa)rkJl1j;#M;bd^~bWR z2*?^NR@h06%Rd`L#aSkKoOM<7qL|WeVJQoI&N>SgYVDgyOZ55wj&v!`gEw=}!yJMz zapKtC^+L%B7Yh{Ur;=w{o$4&(ZuEWTOBYzOH}7t^_%uOxk-{M?gWi@4zXHso@B}as zxll<>!y_0=KBrg21br}`PWZXhL&TW!%0N6i7ET%2*H~<7=;Q9Qe zw=|Z`jqzJfDQ-EXxaBe~ z^nDCg6gd-YcQv&_o}st}imG*HF}Qv~u{ zs9{;@rs-;(PN#{E$&HsKe$I$>Y|~)pqlv7SSc?b$fRe9mos2`P_tS{HWzkmSTsAgc0A1Ot$+a&I)L5dah@t!LR^hYSxt z6c@0iHnS5$dfH$6j@#;|+qxLXIt@*-ORM$qUDkiFkw@tV{sTGv zFqPA9&OA4Knq^KSyLJL&I;69)Nj`fV0`OHnoZM9Fby) zJj14qNRIJ0p{A)ybEJOy=RvO3|NUvEj#y#&Pa_LU>lbmR6GxtWMl`aqY4H(n#+a02 zr`f}4-6A#yDesqDMxCa*U7LRuh|A3yb&i=qOSz}n7il?`^`}`z`gYgIAfu^) ze*}$EADZEaJ_|k*JdhXZpgToATqi%OL zMWZvnGMO&_6E}E8hG53=J{KXcFJlz&x(i=3rq_qbYdHaIdfi|e_Q!L(Hd(>u5cK?5 zP~bs7a%-iJkcF&eWMY>O{)ah2+{pn;4GPBj*|6IaVoQEYEAsj6a2t@rbPCdNdw17S zyh@THm+=K!?=fUwB%AJPq6)cBqVi~@t_`J_j7cHwcw}!SFjs2%Nw5Y7cTENX5h%n} zhEoVp3{jXWIIoLIivVGlZ|~shL25>R#^LL%u}>hCZFNuwLO?$sevE?X1$c(B7>Jsn zesB_?TJ)XKNabP3VlN@T%i)(kU+81c0JSQNk0U;h>Of-qNo>POwqaC!>cD%6;`6@- zn^>-wu)(yRfXDZS4po>TH2owyI;x9nB6mi4PofZw)i~*c%3O_6BI8Wa%N-%QQD03Q zM9tOg`4+3O(o1iah##`XYMw-P@YPS>%tm3JUmWDVM^S@qz7P6B9f!CGMY#GJx25>d zG2Ld8+YLruX_;=X2>Ifm@dr<``q7gnzIuYRN=T$+&uS}W6QNqS%+!JF1Up7)5OA4P z1d4n_2QJSL+I50Gd`Fk0=19_^u!=DXps^iy3Nx&%6YPyUlEfM8vpW*|%r)ivki1a~ zlOf-jMn35^fX`zmShq28F#!Pax zLRxX|#uEQbo4K-wU(fwkD1f%spxNNZe-5Jj*xZR?6#N|lMO8sBM1dJ$G<~3skpemW z(h0ov>pr)qIBVVcwL)GjWFNqVm=N^iKhw^B{RqoxAm` z{g=csulBXqTrig+a@_2D!pT8|O-^cHTy@rBGz_CkimKr?ZmqV4F6N3)^8Z6uOMu!d zhT2+F?P|TD)`)ACHDLe(nFYuq)^JIB!T8qf|5rUr&*(b148bYReeV2y!js{ua~~W> z;}*@oOLz3A@6zSF^?269uwOaGPrJJMiY*zd!r#dpW)b_dAv5kJl-ZgYX|78@WQbkb zjM|7fk}k~tSkve&EUDA9X7pv~Y5zL%nWk;SeAhuuOTaw;kfvpzw;a~Ax#(9M(X;~e zxnF47R`e6T(KIjmuhG||&pl0&c$FAZU_bFH>2Omrr(ns zn*f)+;Zm5;j(`+U4Z<+|*0LgSw|=7k2SOZrqVdbcuY?`ACnoU)=BIB=kRho1$18aY!iz$5FOKp*);-)qo0gy3~D=o9Fw zh=)UmzP~{qjoPM%Lxw&Bbn+?`bF!XY`h3j{FlY&LPKeuR)(OR;S|* z$e$>dH8&>gb8tlz=316GFPYgxLmZ;eSx9-r5Hh+eENfm&Y+EWrAxfy7aXmMdC5FVr z4hFX)5YIOJByed#7(ratJTHXx3W*l8**RERgjbtnxAJ#K5R6WEgeB(0h|hA3f*fnK zB~n;D{(5JBBPRaZMr+**7d_b9mMHNgr(WW1Bs&I)p1e)t?QGs=v$6AIVjqQTI})0( z6nQW1m6+klj$D9s<@~tO#1` zx#Y{dE$8iC-X7&`GjGG#+Y8D5bhJr)Jeoa&@!c4wicfROi@be8wp9>X2Cqa|Dr z5&|Ce{uGzF%-e2k>7p2M5O2ru_FmrRumhkkmRKS526SHGGTT|! z;#BcVR)jX3?OGfo##0+R9B+3d5-Mcsz60;Vgo<;EqjM_S|Dm*9#kOVjZ7j#TvgaNj*glZ$)#KUT$JdA(+4Lt;Bj?_un+^2+ sD~g_QiK3rH=D8w{&>aX*H~f0Bm-70BB|O9lLKZuamnJ^L!k$Y0KWaJfp#T5? diff --git a/tools/whip_relay.c b/tools/whip_relay.c index d6c9a15e..b989da85 100644 --- a/tools/whip_relay.c +++ b/tools/whip_relay.c @@ -30,8 +30,6 @@ typedef struct char path[256]; int rtp_port; int rtp_fd; - int audio_port; - int audio_fd; Viewer viewers[MAX_VIEWERS]; int viewer_count; int active; @@ -72,7 +70,7 @@ static double get_elapsed_seconds(struct timeval *start) return (now.tv_sec - start->tv_sec) + (now.tv_usec - start->tv_usec) / 1000000.0; } -static void *rtp_receiver_thread(void *arg) +static void *receiver_thread(void *arg) { Session *session = (Session *)arg; char buf[256 * 1024]; @@ -85,20 +83,21 @@ static void *rtp_receiver_thread(void *arg) socklen_t fromlen = sizeof(from); int n = recvfrom(session->rtp_fd, buf, sizeof(buf), 0, (struct sockaddr *)&from, &fromlen); - if (n <= 0) + if (n <= 1) { continue; } - if (!started) + char tag = buf[0]; + char *payload = buf + 1; + int payload_len = n - 1; + + if (!started && tag == 0x01) { gettimeofday(&start_time, NULL); started = 1; } - double elapsed = get_elapsed_seconds(&start_time); - uint32_t timestamp = (uint32_t)(elapsed * 90000.0); - pthread_mutex_lock(&session->lock); for (int v = 0; v < session->viewer_count; v++) @@ -110,15 +109,31 @@ static void *rtp_receiver_thread(void *arg) continue; } - for (int t = 0; t < viewer->track_count; t++) + if (tag == 0x01) { - if (!rtcIsOpen(viewer->tracks[t])) + for (int t = 0; t < viewer->track_count; t++) { - continue; + if (!rtcIsOpen(viewer->tracks[t])) + { + continue; + } + + double elapsed = started ? get_elapsed_seconds(&start_time) : 0; + uint32_t timestamp = (uint32_t)(elapsed * 90000.0); + rtcSetTrackRtpTimestamp(viewer->tracks[t], timestamp); + rtcSendMessage(viewer->tracks[t], payload, payload_len); + } + } + + if (tag == 0x02 && viewer->audio_track > 0) + { + if (rtcIsOpen(viewer->audio_track)) + { + rtcSetTrackRtpTimestamp(viewer->audio_track, session->audio_pts); + rtcSendMessage(viewer->audio_track, payload, payload_len); } - rtcSetTrackRtpTimestamp(viewer->tracks[t], timestamp); - rtcSendMessage(viewer->tracks[t], buf, n); + session->audio_pts += 960; } } @@ -128,51 +143,6 @@ static void *rtp_receiver_thread(void *arg) return NULL; } -static void *audio_receiver_thread(void *arg) -{ - Session *session = (Session *)arg; - char buf[4096]; - - while (running && session->active) - { - struct sockaddr_in from; - socklen_t fromlen = sizeof(from); - int n = recvfrom(session->audio_fd, buf, sizeof(buf), 0, (struct sockaddr *)&from, &fromlen); - - if (n <= 0) - { - continue; - } - - uint32_t ts = session->audio_pts; - session->audio_pts += 960; - - pthread_mutex_lock(&session->lock); - - for (int v = 0; v < session->viewer_count; v++) - { - Viewer *viewer = &session->viewers[v]; - - if (!viewer->connected || viewer->audio_track <= 0) - { - continue; - } - - if (!rtcIsOpen(viewer->audio_track)) - { - continue; - } - - rtcSetTrackRtpTimestamp(viewer->audio_track, ts); - rtcSendMessage(viewer->audio_track, buf, n); - } - - pthread_mutex_unlock(&session->lock); - } - - return NULL; -} - static Session *create_session_slot(const char *path) { for (int i = 0; i < MAX_SESSIONS; i++) @@ -206,27 +176,12 @@ static Session *create_session_slot(const char *path) setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); sessions[i].rtp_fd = fd; - - sessions[i].audio_port = next_rtp_port++; - int afd = socket(AF_INET, SOCK_DGRAM, 0); - struct sockaddr_in aaddr; - memset(&aaddr, 0, sizeof(aaddr)); - aaddr.sin_family = AF_INET; - aaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); - aaddr.sin_port = htons(sessions[i].audio_port); - bind(afd, (struct sockaddr *)&aaddr, sizeof(aaddr)); - setsockopt(afd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - sessions[i].audio_fd = afd; sessions[i].audio_pts = 0; pthread_t tid; - pthread_create(&tid, NULL, rtp_receiver_thread, &sessions[i]); + pthread_create(&tid, NULL, receiver_thread, &sessions[i]); pthread_detach(tid); - pthread_t atid; - pthread_create(&atid, NULL, audio_receiver_thread, &sessions[i]); - pthread_detach(atid); - return &sessions[i]; } } @@ -516,8 +471,8 @@ static void handle_client(int client_fd) if (session) { - char port_str[64]; - snprintf(port_str, sizeof(port_str), "%d,%d", session->rtp_port, session->audio_port); + char port_str[16]; + snprintf(port_str, sizeof(port_str), "%d", session->rtp_port); send_http_response(client_fd, 200, "text/plain", port_str, strlen(port_str)); } else