From 44f8f1e83bcfd334d17c5ba7b00d429f72396adc Mon Sep 17 00:00:00 2001 From: henryruhs Date: Mon, 23 Mar 2026 13:25:44 +0100 Subject: [PATCH] mass test approaches --- facefusion/apis/core.py | 50 ++- facefusion/apis/endpoints/stream.py | 673 +++++++++++++++++++++++++++- facefusion/apis/stream_helper.py | 185 +++++++- facefusion/system.py | 4 +- test_whip_stream.html | 293 +++++++++++- 5 files changed, 1166 insertions(+), 39 deletions(-) diff --git a/facefusion/apis/core.py b/facefusion/apis/core.py index 0f8a25b8..ad5a501b 100644 --- a/facefusion/apis/core.py +++ b/facefusion/apis/core.py @@ -13,7 +13,8 @@ from facefusion.apis.endpoints.metrics import get_metrics, websocket_metrics from facefusion.apis.endpoints.ping import websocket_ping from facefusion.apis.endpoints.session import create_session, destroy_session, get_session, refresh_session from facefusion.apis.endpoints.state import get_state, set_state -from facefusion.apis.endpoints.stream import websocket_stream, websocket_stream_whip +from facefusion import logger +from facefusion.apis.endpoints.stream import websocket_stream, websocket_stream_audio, websocket_stream_live, websocket_stream_mjpeg, websocket_stream_rtc, websocket_stream_whip, websocket_stream_whip_aio, websocket_stream_whip_dc, websocket_stream_whip_py from facefusion.apis.middlewares.session import create_session_guard @@ -21,9 +22,47 @@ from facefusion.apis.middlewares.session import create_session_guard async def lifespan(app : Starlette) -> AsyncGenerator[None, None]: mediamtx.start() mediamtx.wait_for_ready() + + try: + from facefusion import webrtc_sfu + webrtc_sfu.start() + except Exception as exception: + logger.warn('webrtc sfu: ' + str(exception), __name__) + + try: + from facefusion import whip_relay + whip_relay.start() + whip_relay.wait_for_ready() + except Exception as exception: + logger.warn('whip relay: ' + str(exception), __name__) + + try: + from facefusion import rtc + rtc.start() + except Exception as exception: + logger.warn('rtc: ' + str(exception), __name__) + yield mediamtx.stop() + try: + from facefusion import webrtc_sfu + webrtc_sfu.stop() + except Exception: + pass + + try: + from facefusion import whip_relay + whip_relay.stop() + except Exception: + pass + + try: + from facefusion import rtc + rtc.stop() + except Exception: + pass + def create_api() -> Starlette: session_guard = Middleware(create_session_guard) @@ -44,7 +83,14 @@ def create_api() -> Starlette: WebSocketRoute('/metrics', websocket_metrics, middleware = [ session_guard ]), WebSocketRoute('/ping', websocket_ping, middleware = [ session_guard ]), WebSocketRoute('/stream', websocket_stream, middleware = [ session_guard ]), - WebSocketRoute('/stream/whip', websocket_stream_whip, middleware = [ session_guard ]) + WebSocketRoute('/stream/whip', websocket_stream_whip, middleware = [ session_guard ]), + WebSocketRoute('/stream/whip-py', websocket_stream_whip_py, middleware = [ session_guard ]), + WebSocketRoute('/stream/whip-dc', websocket_stream_whip_dc, middleware = [ session_guard ]), + WebSocketRoute('/stream/live', websocket_stream_live, middleware = [ session_guard ]), + WebSocketRoute('/stream/whip-aio', websocket_stream_whip_aio, middleware = [ session_guard ]), + WebSocketRoute('/stream/rtc', websocket_stream_rtc, middleware = [ session_guard ]), + WebSocketRoute('/stream/mjpeg', websocket_stream_mjpeg, middleware = [ session_guard ]), + WebSocketRoute('/stream/audio', websocket_stream_audio, middleware = [ session_guard ]) ] api = Starlette(routes = routes, lifespan = lifespan) diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py index 441a89fb..da87e759 100644 --- a/facefusion/apis/endpoints/stream.py +++ b/facefusion/apis/endpoints/stream.py @@ -1,9 +1,11 @@ import asyncio +import fcntl +import os as _os import threading import time from collections import deque from concurrent.futures import ThreadPoolExecutor -from typing import Deque +from typing import Deque, List import cv2 import numpy @@ -13,7 +15,7 @@ from facefusion import logger, session_context, session_manager, state_manager from facefusion.apis.api_helper import get_sec_websocket_protocol from facefusion.apis.session_helper import extract_access_token from facefusion import mediamtx -from facefusion.apis.stream_helper import STREAM_FPS, STREAM_QUALITY, close_whip_encoder, create_whip_encoder, feed_whip_audio, feed_whip_frame, process_stream_frame +from facefusion.apis.stream_helper import STREAM_FPS, STREAM_QUALITY, close_fmp4_encoder, close_whip_encoder, collect_fmp4_chunks, create_fmp4_encoder, create_vp8_pipe_encoder, create_whip_encoder, feed_whip_audio, feed_whip_frame, process_stream_frame, read_fmp4_output from facefusion.streamer import process_vision_frame from facefusion.types import VisionFrame @@ -80,7 +82,8 @@ def run_whip_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_ev if not encoder: height, width = temp_vision_frame.shape[:2] - encoder, audio_write_fd = create_whip_encoder(width, height, STREAM_FPS, STREAM_QUALITY, stream_path) + whip_url = mediamtx.get_whip_url(stream_path) + encoder, audio_write_fd = create_whip_encoder(width, height, STREAM_FPS, STREAM_QUALITY, whip_url) audio_write_fd_holder[0] = audio_write_fd logger.info('whip encoder started ' + str(width) + 'x' + str(height), __name__) @@ -93,11 +96,6 @@ def run_whip_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_ev time.sleep(0.005) if encoder: - stderr_output = encoder.stderr.read() if encoder.stderr else b'' - - if stderr_output: - logger.error('ffmpeg: ' + stderr_output.decode(), __name__) - close_whip_encoder(encoder, audio_write_fd) @@ -158,3 +156,662 @@ async def websocket_stream_whip(websocket : WebSocket) -> None: return await websocket.close() + + +def run_fmp4_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, output_chunks : List[bytes], output_lock : threading.Lock, audio_write_fd_holder : list) -> None: + encoder = None + audio_write_fd = -1 + reader_thread = None + output_deque : Deque[VisionFrame] = deque() + + with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor: + futures = [] + + while not stop_event.is_set(): + with lock: + capture_frame = latest_frame_holder[0] + latest_frame_holder[0] = None + + if capture_frame is not None: + future = executor.submit(process_stream_frame, capture_frame) + futures.append(future) + + for future_done in [ future for future in futures if future.done() ]: + output_deque.append(future_done.result()) + futures.remove(future_done) + + if encoder and encoder.poll() is not None: + stderr_output = encoder.stderr.read() if encoder.stderr else b'' + logger.error('fmp4 encoder died with code ' + str(encoder.returncode) + ': ' + stderr_output.decode(), __name__) + break + + while output_deque: + temp_vision_frame = output_deque.popleft() + + if not encoder: + height, width = temp_vision_frame.shape[:2] + encoder, audio_write_fd = create_fmp4_encoder(width, height, STREAM_FPS, STREAM_QUALITY) + audio_write_fd_holder[0] = audio_write_fd + reader_thread = threading.Thread(target = read_fmp4_output, args = (encoder, output_chunks, output_lock), daemon = True) + reader_thread.start() + logger.info('fmp4 encoder started ' + str(width) + 'x' + str(height), __name__) + + feed_whip_frame(encoder, temp_vision_frame) + + if capture_frame is None and not output_deque: + time.sleep(0.005) + + if encoder: + close_fmp4_encoder(encoder, audio_write_fd) + + +async def websocket_stream_live(websocket : WebSocket) -> None: + subprotocol = get_sec_websocket_protocol(websocket.scope) + access_token = extract_access_token(websocket.scope) + session_id = session_manager.find_session_id(access_token) + + session_context.set_session_id(session_id) + source_paths = state_manager.get_item('source_paths') + + await websocket.accept(subprotocol = subprotocol) + + if source_paths: + latest_frame_holder : list = [None] + audio_write_fd_holder : list = [-1] + output_chunks : List[bytes] = [] + lock = threading.Lock() + output_lock = threading.Lock() + stop_event = threading.Event() + worker = threading.Thread(target = run_fmp4_pipeline, args = (latest_frame_holder, lock, stop_event, output_chunks, output_lock, audio_write_fd_holder), daemon = True) + worker.start() + + try: + while True: + message = await websocket.receive() + + chunks = collect_fmp4_chunks(output_chunks, output_lock) + + if chunks: + await websocket.send_bytes(chunks) + + if message.get('bytes'): + data = message.get('bytes') + + if data[:2] == JPEG_MAGIC: + frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR) + + if numpy.any(frame): + with lock: + latest_frame_holder[0] = frame + + if data[:2] != JPEG_MAGIC and audio_write_fd_holder[0] > 0: + feed_whip_audio(audio_write_fd_holder[0], data) + + except Exception as exception: + logger.error(str(exception), __name__) + + stop_event.set() + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, worker.join, 10) + return + + await websocket.close() + + +def run_mjpeg_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, output_holder : list, output_lock : threading.Lock) -> None: + output_deque : Deque[VisionFrame] = deque() + + with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor: + futures = [] + + while not stop_event.is_set(): + with lock: + capture_frame = latest_frame_holder[0] + latest_frame_holder[0] = None + + if capture_frame is not None: + future = executor.submit(process_stream_frame, capture_frame) + futures.append(future) + + for future_done in [ future for future in futures if future.done() ]: + output_deque.append(future_done.result()) + futures.remove(future_done) + + while output_deque: + temp_vision_frame = output_deque.popleft() + is_success, encoded = cv2.imencode('.jpg', temp_vision_frame, [cv2.IMWRITE_JPEG_QUALITY, 92]) + + if is_success: + with output_lock: + output_holder[0] = encoded.tobytes() + + if capture_frame is None and not output_deque: + time.sleep(0.005) + + +async def websocket_stream_mjpeg(websocket : WebSocket) -> None: + subprotocol = get_sec_websocket_protocol(websocket.scope) + access_token = extract_access_token(websocket.scope) + session_id = session_manager.find_session_id(access_token) + + session_context.set_session_id(session_id) + source_paths = state_manager.get_item('source_paths') + + await websocket.accept(subprotocol = subprotocol) + + if source_paths: + latest_frame_holder : list = [None] + output_holder : list = [None] + lock = threading.Lock() + output_lock = threading.Lock() + stop_event = threading.Event() + worker = threading.Thread(target = run_mjpeg_pipeline, args = (latest_frame_holder, lock, stop_event, output_holder, output_lock), daemon = True) + worker.start() + + try: + while True: + message = await websocket.receive() + + with output_lock: + jpeg_data = output_holder[0] + output_holder[0] = None + + if jpeg_data: + await websocket.send_bytes(jpeg_data) + + if message.get('bytes'): + data = message.get('bytes') + + if data[:2] == JPEG_MAGIC: + frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR) + + if numpy.any(frame): + with lock: + latest_frame_holder[0] = frame + + except Exception as exception: + logger.error(str(exception), __name__) + + stop_event.set() + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, worker.join, 10) + return + + await websocket.close() + + +async def websocket_stream_audio(websocket : WebSocket) -> None: + subprotocol = get_sec_websocket_protocol(websocket.scope) + access_token = extract_access_token(websocket.scope) + session_id = session_manager.find_session_id(access_token) + + session_context.set_session_id(session_id) + + await websocket.accept(subprotocol = subprotocol) + + try: + while True: + message = await websocket.receive() + + if message.get('bytes'): + await websocket.send_bytes(message.get('bytes')) + except Exception: + pass + + +async def websocket_stream_whip_py(websocket : WebSocket) -> None: + subprotocol = get_sec_websocket_protocol(websocket.scope) + access_token = extract_access_token(websocket.scope) + session_id = session_manager.find_session_id(access_token) + + session_context.set_session_id(session_id) + source_paths = state_manager.get_item('source_paths') + + await websocket.accept(subprotocol = subprotocol) + + if source_paths: + from facefusion.aiortc_bridge import AiortcBridge + + bridge = AiortcBridge() + await bridge.start() + whep_url = 'http://localhost:' + str(bridge.port) + '/whep' + + latest_frame_holder : list = [None] + whep_sent = False + lock = threading.Lock() + stop_event = threading.Event() + ready_event = threading.Event() + worker = threading.Thread(target = run_aiortc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, bridge), daemon = True) + worker.start() + + try: + while True: + message = await websocket.receive() + + if not whep_sent and ready_event.is_set(): + await websocket.send_text(whep_url) + whep_sent = True + + if message.get('bytes'): + data = message.get('bytes') + + if data[:2] == JPEG_MAGIC: + frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR) + + if numpy.any(frame): + with lock: + latest_frame_holder[0] = frame + + if data[:2] != JPEG_MAGIC: + bridge.push_audio(data) + + except Exception as exception: + logger.error(str(exception), __name__) + + stop_event.set() + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, worker.join, 10) + await bridge.stop() + return + + await websocket.close() + + +def run_aiortc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, ready_event : threading.Event, bridge : object) -> None: + output_deque : Deque[VisionFrame] = deque() + + with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor: + futures = [] + + while not stop_event.is_set(): + with lock: + capture_frame = latest_frame_holder[0] + latest_frame_holder[0] = None + + if capture_frame is not None: + future = executor.submit(process_stream_frame, capture_frame) + futures.append(future) + + for future_done in [ future for future in futures if future.done() ]: + output_deque.append(future_done.result()) + futures.remove(future_done) + + while output_deque: + temp_vision_frame = output_deque.popleft() + bridge.push_frame(temp_vision_frame) + + if not ready_event.is_set(): + time.sleep(2) + ready_event.set() + + if capture_frame is None and not output_deque: + time.sleep(0.005) + + +def read_h264_output(process, h264_chunks : List[bytes], h264_lock : threading.Lock) -> None: + fd = process.stdout.fileno() + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK) + + while True: + chunk = _os.read(fd, 4096) + + if not chunk: + break + + with h264_lock: + h264_chunks.append(chunk) + + +def read_ivf_frames(process, frame_list : List[bytes], frame_lock : threading.Lock) -> None: + fd = process.stdout.fileno() + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK) + + header = b'' + + while len(header) < 32: + chunk = _os.read(fd, 32 - len(header)) + + if not chunk: + return + + header += chunk + + while True: + frame_header = b'' + + while len(frame_header) < 12: + chunk = _os.read(fd, 12 - len(frame_header)) + + if not chunk: + return + + frame_header += chunk + + frame_size = int.from_bytes(frame_header[0:4], 'little') + frame_data = b'' + + while len(frame_data) < frame_size: + chunk = _os.read(fd, frame_size - len(frame_data)) + + if not chunk: + return + + frame_data += chunk + + with frame_lock: + frame_list.append(frame_data) + + +def run_h264_dc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, ready_event : threading.Event, backend : str, stream_path : str, rtp_port : int) -> None: + encoder = None + reader_thread = None + vp8_frames : List[bytes] = [] + vp8_lock = threading.Lock() + output_deque : Deque[VisionFrame] = deque() + udp_sock = None + + if backend == 'relay': + import socket as sock + udp_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM) + + with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor: + futures = [] + + while not stop_event.is_set(): + with lock: + capture_frame = latest_frame_holder[0] + latest_frame_holder[0] = None + + if capture_frame is not None: + future = executor.submit(process_stream_frame, capture_frame) + futures.append(future) + + for future_done in [ future for future in futures if future.done() ]: + output_deque.append(future_done.result()) + futures.remove(future_done) + + if encoder and encoder.poll() is not None: + stderr_output = encoder.stderr.read() if encoder.stderr else b'' + logger.error('vp8 encoder died: ' + stderr_output.decode(), __name__) + break + + while output_deque: + temp_vision_frame = output_deque.popleft() + + if not encoder: + height, width = temp_vision_frame.shape[:2] + encoder = create_vp8_pipe_encoder(width, height, STREAM_FPS, STREAM_QUALITY) + reader_thread = threading.Thread(target = read_ivf_frames, args = (encoder, vp8_frames, vp8_lock), daemon = True) + reader_thread.start() + logger.info('vp8 encoder started ' + str(width) + 'x' + str(height) + ' [' + backend + ']', __name__) + + feed_whip_frame(encoder, temp_vision_frame) + + with vp8_lock: + if vp8_frames: + pending = list(vp8_frames) + vp8_frames.clear() + + for frame in pending: + if backend == 'relay' and udp_sock: + if len(frame) <= 65000: + udp_sock.sendto(frame, ('127.0.0.1', rtp_port)) + if backend == 'rtc': + from facefusion import rtc + rtc.send_vp8_frame(stream_path, frame) + + if not ready_event.is_set() and encoder and encoder.poll() is None: + time.sleep(1) + ready_event.set() + + if capture_frame is None and not output_deque: + time.sleep(0.005) + + if encoder: + encoder.stdin.close() + encoder.terminate() + encoder.wait(timeout = 5) + + if udp_sock: + udp_sock.close() + + +async def websocket_stream_whip_dc(websocket : WebSocket) -> None: + subprotocol = get_sec_websocket_protocol(websocket.scope) + access_token = extract_access_token(websocket.scope) + session_id = session_manager.find_session_id(access_token) + + session_context.set_session_id(session_id) + source_paths = state_manager.get_item('source_paths') + + await websocket.accept(subprotocol = subprotocol) + + if source_paths: + from facefusion import whip_relay + stream_path = 'stream/' + session_id + rtp_port = whip_relay.create_session(stream_path) + + if not rtp_port: + logger.error('failed to create relay session', __name__) + await websocket.close() + return + + latest_frame_holder : list = [None] + whep_sent = False + lock = threading.Lock() + stop_event = threading.Event() + ready_event = threading.Event() + worker = threading.Thread(target = run_h264_dc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, 'relay', stream_path, rtp_port), daemon = True) + worker.start() + + try: + while True: + message = await websocket.receive() + + if not whep_sent and ready_event.is_set(): + whep_url = whip_relay.get_whep_url(stream_path) + await websocket.send_text(whep_url) + whep_sent = True + + if message.get('bytes'): + data = message.get('bytes') + + if data[:2] == JPEG_MAGIC: + frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR) + + if numpy.any(frame): + with lock: + latest_frame_holder[0] = frame + + except Exception as exception: + logger.error(str(exception), __name__) + + stop_event.set() + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, worker.join, 10) + return + + await websocket.close() + + +async def websocket_stream_whip_aio(websocket : WebSocket) -> None: + subprotocol = get_sec_websocket_protocol(websocket.scope) + access_token = extract_access_token(websocket.scope) + session_id = session_manager.find_session_id(access_token) + + session_context.set_session_id(session_id) + source_paths = state_manager.get_item('source_paths') + + await websocket.accept(subprotocol = subprotocol) + + if source_paths: + from facefusion.aiortc_bridge import AiortcBridge + + bridge = AiortcBridge() + await bridge.start() + whep_url = 'http://localhost:' + str(bridge.port) + '/whep' + + latest_frame_holder : list = [None] + whep_sent = False + lock = threading.Lock() + stop_event = threading.Event() + ready_event = threading.Event() + worker = threading.Thread(target = run_aiortc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, bridge), daemon = True) + worker.start() + + try: + while True: + message = await websocket.receive() + + if not whep_sent and ready_event.is_set(): + await websocket.send_text(whep_url) + whep_sent = True + + if message.get('bytes'): + data = message.get('bytes') + + if data[:2] == JPEG_MAGIC: + frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR) + + if numpy.any(frame): + with lock: + latest_frame_holder[0] = frame + + if data[:2] != JPEG_MAGIC: + bridge.push_audio(data) + + except Exception as exception: + logger.error(str(exception), __name__) + + stop_event.set() + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, worker.join, 10) + await bridge.stop() + return + + await websocket.close() + + +def run_rtc_direct_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, ready_event : threading.Event, stream_path : str) -> None: + from facefusion import rtc + encoder = None + reader_thread = None + vp8_frames : List[bytes] = [] + vp8_lock = threading.Lock() + output_deque : Deque[VisionFrame] = deque() + + with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor: + futures = [] + + while not stop_event.is_set(): + with lock: + capture_frame = latest_frame_holder[0] + latest_frame_holder[0] = None + + if capture_frame is not None: + h, w = capture_frame.shape[:2] + + if w > 640: + scale = 640 / w + capture_frame = cv2.resize(capture_frame, (640, int(h * scale) - int(h * scale) % 2)) + + if len(futures) < 4: + future = executor.submit(process_stream_frame, capture_frame) + futures.append(future) + + for future_done in [ future for future in futures if future.done() ]: + output_deque.append(future_done.result()) + futures.remove(future_done) + + if encoder and encoder.poll() is not None: + stderr_output = encoder.stderr.read() if encoder.stderr else b'' + logger.error('vp8 encoder died: ' + stderr_output.decode(), __name__) + break + + while output_deque: + temp_vision_frame = output_deque.popleft() + + if not encoder: + height, width = temp_vision_frame.shape[:2] + encoder = create_vp8_pipe_encoder(width, height, STREAM_FPS, STREAM_QUALITY) + reader_thread = threading.Thread(target = read_ivf_frames, args = (encoder, vp8_frames, vp8_lock), daemon = True) + reader_thread.start() + logger.info('vp8 direct encoder started ' + str(width) + 'x' + str(height), __name__) + + feed_whip_frame(encoder, temp_vision_frame) + + with vp8_lock: + if vp8_frames: + pending = list(vp8_frames) + vp8_frames.clear() + + for frame in pending: + rtc.send_to_viewers(stream_path, frame) + + if not ready_event.is_set() and encoder and encoder.poll() is None: + time.sleep(0.5) + ready_event.set() + + if capture_frame is None and not output_deque: + time.sleep(0.003) + + if encoder: + encoder.stdin.close() + encoder.terminate() + encoder.wait(timeout = 5) + + +async def websocket_stream_rtc(websocket : WebSocket) -> None: + subprotocol = get_sec_websocket_protocol(websocket.scope) + access_token = extract_access_token(websocket.scope) + session_id = session_manager.find_session_id(access_token) + + session_context.set_session_id(session_id) + source_paths = state_manager.get_item('source_paths') + + await websocket.accept(subprotocol = subprotocol) + + if source_paths: + from facefusion import rtc + stream_path = 'stream/' + session_id + rtc.create_session(stream_path) + whep_url = 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whep' + + latest_frame_holder : list = [None] + whep_sent = False + lock = threading.Lock() + stop_event = threading.Event() + ready_event = threading.Event() + worker = threading.Thread(target = run_rtc_direct_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, stream_path), daemon = True) + worker.start() + + try: + while True: + message = await websocket.receive() + + if not whep_sent and ready_event.is_set(): + await websocket.send_text(whep_url) + whep_sent = True + + if message.get('bytes'): + data = message.get('bytes') + + if data[:2] == JPEG_MAGIC: + frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR) + + if numpy.any(frame): + with lock: + latest_frame_holder[0] = frame + + except Exception as exception: + logger.error(str(exception), __name__) + + stop_event.set() + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, worker.join, 10) + rtc.destroy_session(stream_path) + return + + await websocket.close() diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index adf2d37b..978dac9e 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -1,21 +1,50 @@ import os import subprocess import tempfile -from typing import Tuple +import threading +from typing import List, Optional, Tuple import cv2 -from facefusion import ffmpeg_builder, mediamtx +from facefusion import ffmpeg_builder from facefusion.streamer import process_vision_frame from facefusion.types import VisionFrame STREAM_FPS : int = 30 -STREAM_QUALITY : int = 45 +STREAM_QUALITY : int = 80 STREAM_AUDIO_RATE : int = 48000 DTLS_CERT_FILE : str = os.path.join(tempfile.gettempdir(), 'facefusion_dtls_cert.pem') DTLS_KEY_FILE : str = os.path.join(tempfile.gettempdir(), 'facefusion_dtls_key.pem') +def compute_bitrate(width : int, height : int) -> str: + pixels = width * height + + if pixels <= 320 * 240: + return '400k' + if pixels <= 640 * 480: + return '1000k' + if pixels <= 1280 * 720: + return '2000k' + if pixels <= 1920 * 1080: + return '3500k' + return '5000k' + + +def compute_bufsize(width : int, height : int) -> str: + pixels = width * height + + if pixels <= 320 * 240: + return '800k' + if pixels <= 640 * 480: + return '2000k' + if pixels <= 1280 * 720: + return '4000k' + if pixels <= 1920 * 1080: + return '7000k' + return '10000k' + + def create_dtls_certificate() -> None: if os.path.isfile(DTLS_CERT_FILE) and os.path.isfile(DTLS_KEY_FILE): return @@ -27,10 +56,9 @@ def create_dtls_certificate() -> None: ], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL) -def create_whip_encoder(width : int, height : int, stream_fps : int, stream_quality : int, stream_path : str) -> Tuple[subprocess.Popen[bytes], int]: +def create_whip_encoder(width : int, height : int, stream_fps : int, stream_quality : int, whip_url : str) -> Tuple[subprocess.Popen[bytes], int]: create_dtls_certificate() audio_read_fd, audio_write_fd = os.pipe() - whip_url = mediamtx.get_whip_url(stream_path) commands = ffmpeg_builder.chain( [ '-use_wallclock_as_timestamps', '1' ], ffmpeg_builder.capture_video(), @@ -44,8 +72,8 @@ def create_whip_encoder(width : int, height : int, stream_fps : int, stream_qual [ '-pix_fmt', 'yuv420p' ], [ '-profile:v', 'baseline' ], [ '-tune', 'zerolatency' ], - [ '-maxrate', '1500k' ], - [ '-bufsize', '3000k' ], + [ '-maxrate', compute_bitrate(width, height) ], + [ '-bufsize', compute_bufsize(width, height) ], [ '-g', str(stream_fps) ], [ '-c:a', 'libopus' ], [ '-f', 'whip' ], @@ -76,5 +104,148 @@ def close_whip_encoder(process : subprocess.Popen[bytes], audio_write_fd : int) process.wait(timeout = 5) +def create_fmp4_encoder(width : int, height : int, stream_fps : int, stream_quality : int) -> Tuple[subprocess.Popen[bytes], int]: + audio_read_fd, audio_write_fd = os.pipe() + commands = ffmpeg_builder.chain( + [ '-use_wallclock_as_timestamps', '1' ], + ffmpeg_builder.capture_video(), + ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)), + ffmpeg_builder.set_input('-'), + [ '-use_wallclock_as_timestamps', '1' ], + [ '-f', 's16le', '-ar', str(STREAM_AUDIO_RATE), '-ac', '2', '-i', 'pipe:' + str(audio_read_fd) ], + [ '-thread_queue_size', '512' ], + ffmpeg_builder.set_video_encoder('libx264'), + ffmpeg_builder.set_video_quality('libx264', stream_quality), + ffmpeg_builder.set_video_preset('libx264', 'ultrafast'), + [ '-pix_fmt', 'yuv420p' ], + [ '-profile:v', 'baseline' ], + [ '-tune', 'zerolatency' ], + [ '-maxrate', compute_bitrate(width, height) ], + [ '-bufsize', compute_bufsize(width, height) ], + [ '-g', str(stream_fps) ], + [ '-c:a', 'aac' ], + [ '-b:a', '128k' ], + [ '-f', 'mp4' ], + [ '-movflags', 'frag_keyframe+empty_moov+default_base_moof+frag_every_frame' ], + ffmpeg_builder.set_output('-') + ) + commands = ffmpeg_builder.run(commands) + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,)) + os.close(audio_read_fd) + return process, audio_write_fd + + +def read_fmp4_output(process : subprocess.Popen[bytes], output_chunks : List[bytes], lock : threading.Lock) -> None: + while True: + chunk = process.stdout.read(4096) + + if not chunk: + break + + with lock: + output_chunks.append(chunk) + + +def collect_fmp4_chunks(output_chunks : List[bytes], lock : threading.Lock) -> Optional[bytes]: + with lock: + if output_chunks: + encoded_bytes = b''.join(output_chunks) + output_chunks.clear() + return encoded_bytes + + return None + + +def close_fmp4_encoder(process : subprocess.Popen[bytes], audio_write_fd : int) -> None: + if audio_write_fd > 0: + os.close(audio_write_fd) + process.stdin.close() + process.terminate() + process.wait(timeout = 5) + + +def create_rtp_encoder(width : int, height : int, stream_fps : int, stream_quality : int, rtp_port : int) -> subprocess.Popen[bytes]: + commands = ffmpeg_builder.chain( + [ '-use_wallclock_as_timestamps', '1' ], + ffmpeg_builder.capture_video(), + ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)), + ffmpeg_builder.set_input('-'), + ffmpeg_builder.set_video_encoder('libx264'), + ffmpeg_builder.set_video_quality('libx264', stream_quality), + ffmpeg_builder.set_video_preset('libx264', 'ultrafast'), + [ '-pix_fmt', 'yuv420p' ], + [ '-profile:v', 'baseline' ], + [ '-tune', 'zerolatency' ], + [ '-maxrate', compute_bitrate(width, height) ], + [ '-bufsize', compute_bufsize(width, height) ], + [ '-g', str(stream_fps) ], + [ '-an' ], + [ '-f', 'rtp' ], + [ '-payload_type', '96' ], + ffmpeg_builder.set_output('rtp://127.0.0.1:' + str(rtp_port) + '?pkt_size=1200') + ) + commands = ffmpeg_builder.run(commands) + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE) + return process + + +def create_h264_pipe_encoder(width : int, height : int, stream_fps : int, stream_quality : int) -> subprocess.Popen[bytes]: + commands = ffmpeg_builder.chain( + [ '-use_wallclock_as_timestamps', '1' ], + ffmpeg_builder.capture_video(), + ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)), + ffmpeg_builder.set_input('-'), + ffmpeg_builder.set_video_encoder('libx264'), + ffmpeg_builder.set_video_quality('libx264', stream_quality), + ffmpeg_builder.set_video_preset('libx264', 'ultrafast'), + [ '-pix_fmt', 'yuv420p' ], + [ '-profile:v', 'baseline' ], + [ '-tune', 'zerolatency' ], + [ '-maxrate', compute_bitrate(width, height) ], + [ '-bufsize', compute_bufsize(width, height) ], + [ '-g', '1' ], + [ '-an' ], + [ '-f', 'h264' ], + ffmpeg_builder.set_output('-') + ) + commands = ffmpeg_builder.run(commands) + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) + return process + + +def create_vp8_pipe_encoder(width : int, height : int, stream_fps : int, stream_quality : int) -> subprocess.Popen[bytes]: + if width > 1280: + height = int(height * 1280 / width) + height = height - (height % 2) + width = 1280 + + commands = ffmpeg_builder.chain( + [ '-use_wallclock_as_timestamps', '1' ], + ffmpeg_builder.capture_video(), + ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)), + ffmpeg_builder.set_input('-'), + [ '-c:v', 'libvpx' ], + [ '-deadline', 'realtime' ], + [ '-cpu-used', '15' ], + [ '-pix_fmt', 'yuv420p' ], + [ '-crf', '30' ], + [ '-b:v', '1000k' ], + [ '-maxrate', '1500k' ], + [ '-bufsize', '2000k' ], + [ '-g', '60' ], + [ '-keyint_min', '30' ], + [ '-error-resilient', '1' ], + [ '-lag-in-frames', '0' ], + [ '-rc_lookahead', '0' ], + [ '-threads', '4' ], + [ '-an' ], + [ '-f', 'ivf' ], + ffmpeg_builder.set_output('-') + ) + commands = ffmpeg_builder.run(commands) + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) + return process + + def process_stream_frame(vision_frame : VisionFrame) -> VisionFrame: return process_vision_frame(vision_frame) diff --git a/facefusion/system.py b/facefusion/system.py index 95af94ec..08ce4d89 100644 --- a/facefusion/system.py +++ b/facefusion/system.py @@ -2,7 +2,7 @@ import importlib import shutil from functools import lru_cache from pathlib import Path -from typing import List, Tuple +from typing import List, Sequence, Tuple import psutil @@ -24,7 +24,7 @@ def get_metrics_set() -> Metrics: @lru_cache() -def detect_static_graphic_devices(execution_providers : Tuple[ExecutionProvider, ...]) -> List[GraphicDevice]: +def detect_static_graphic_devices(execution_providers : Sequence[ExecutionProvider]) -> List[GraphicDevice]: return detect_graphic_devices(execution_providers) diff --git a/test_whip_stream.html b/test_whip_stream.html index 5f16327c..3622deb0 100644 --- a/test_whip_stream.html +++ b/test_whip_stream.html @@ -6,10 +6,12 @@ WHIP Stream Monitor