diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py
index 80c561ac..7c986ae3 100644
--- a/facefusion/apis/endpoints/stream.py
+++ b/facefusion/apis/endpoints/stream.py
@@ -12,10 +12,12 @@ from starlette.websockets import WebSocket
 from facefusion import logger, session_context, session_manager, state_manager
 from facefusion.apis.api_helper import get_sec_websocket_protocol
 from facefusion.apis.session_helper import extract_access_token
-from facefusion.apis.stream_helper import STREAM_FPS, STREAM_QUALITY, close_whip_encoder, create_whip_encoder, feed_whip_frame, process_stream_frame, start_mediamtx, stop_mediamtx, wait_for_mediamtx
+from facefusion.apis.stream_helper import STREAM_FPS, STREAM_QUALITY, close_whip_encoder, create_whip_encoder, feed_whip_audio, feed_whip_frame, process_stream_frame, start_mediamtx, stop_mediamtx, wait_for_mediamtx
 from facefusion.streamer import process_vision_frame
 from facefusion.types import VisionFrame
 
+JPEG_MAGIC : bytes = b'\xff\xd8'
+
 
 async def websocket_stream(websocket : WebSocket) -> None:
 	subprotocol = get_sec_websocket_protocol(websocket.scope)
@@ -46,8 +48,9 @@
 	await websocket.close()
 
 
-def run_whip_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event) -> None:
+def run_whip_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, audio_write_fd_holder : list) -> None:
 	encoder = None
+	audio_write_fd = -1
 	output_deque : Deque[VisionFrame] = deque()
 
 	with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
@@ -71,7 +74,8 @@
 
 			if not encoder:
 				height, width = temp_vision_frame.shape[:2]
-				encoder = create_whip_encoder(width, height, STREAM_FPS, STREAM_QUALITY)
+				encoder, audio_write_fd = create_whip_encoder(width, height, STREAM_FPS, STREAM_QUALITY)
+				audio_write_fd_holder[0] = audio_write_fd
 				logger.info('whip encoder started ' + str(width) + 'x' + str(height), __name__)
 
 			feed_whip_frame(encoder, temp_vision_frame)
@@ -85,7 +89,7 @@
 		if stderr_output:
 			logger.error('ffmpeg: ' + stderr_output.decode(), __name__)
 
-	close_whip_encoder(encoder)
+	close_whip_encoder(encoder, audio_write_fd)
 
 
 async def websocket_stream_whip(websocket : WebSocket) -> None:
@@ -111,19 +115,28 @@
 	logger.info('mediamtx ready', __name__)
 
 	latest_frame_holder : list = [None]
+	audio_write_fd_holder : list = [-1]
 	lock = threading.Lock()
 	stop_event = threading.Event()
-	worker = threading.Thread(target = run_whip_pipeline, args = (latest_frame_holder, lock, stop_event), daemon = True)
+	worker = threading.Thread(target = run_whip_pipeline, args = (latest_frame_holder, lock, stop_event, audio_write_fd_holder), daemon = True)
 	worker.start()
 
 	try:
 		while True:
-			image_buffer = await websocket.receive_bytes()
-			frame = cv2.imdecode(numpy.frombuffer(image_buffer, numpy.uint8), cv2.IMREAD_COLOR)
+			message = await websocket.receive()
 
-			if numpy.any(frame):
-				with lock:
-					latest_frame_holder[0] = frame
+			if message.get('bytes'):
+				data = message.get('bytes')
+
+				if data[:2] == JPEG_MAGIC:
+					frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR)
+
+					if numpy.any(frame):
+						with lock:
+							latest_frame_holder[0] = frame
+
+				if data[:2] != JPEG_MAGIC and audio_write_fd_holder[0] > 0:
+					feed_whip_audio(audio_write_fd_holder[0], data)
 	except Exception as exception:
 		logger.error(str(exception), __name__)
 
diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py
index 821982f2..ddd459e1 100644
--- a/facefusion/apis/stream_helper.py
+++ b/facefusion/apis/stream_helper.py
@@ -3,7 +3,7 @@ import shutil
 import subprocess
 import tempfile
 import time
-from typing import Optional
+from typing import Optional, Tuple
 
 import cv2
 import requests
@@ -14,6 +14,7 @@ from facefusion.types import VisionFrame
 
 STREAM_FPS : int = 30
 STREAM_QUALITY : int = 45
+STREAM_AUDIO_RATE : int = 48000
 MEDIAMTX_WHIP_PORT : int = 8889
 MEDIAMTX_PATH : str = 'stream'
 MEDIAMTX_CONFIG : str = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'mediamtx.yml')
@@ -32,15 +33,17 @@
 	], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
 
 
-def create_whip_encoder(width : int, height : int, stream_fps : int, stream_quality : int) -> subprocess.Popen[bytes]:
+def create_whip_encoder(width : int, height : int, stream_fps : int, stream_quality : int) -> Tuple[subprocess.Popen[bytes], int]:
 	create_dtls_certificate()
+	audio_read_fd, audio_write_fd = os.pipe()
 	whip_url = 'http://localhost:' + str(MEDIAMTX_WHIP_PORT) + '/' + MEDIAMTX_PATH + '/whip'
 	commands = ffmpeg_builder.chain(
 		[ '-use_wallclock_as_timestamps', '1' ],
 		ffmpeg_builder.capture_video(),
 		ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)),
 		ffmpeg_builder.set_input('-'),
-		[ '-f', 'lavfi', '-i', 'anullsrc=r=48000:cl=stereo' ],
+		[ '-use_wallclock_as_timestamps', '1' ],
+		[ '-f', 's16le', '-ar', str(STREAM_AUDIO_RATE), '-ac', '2', '-i', 'pipe:' + str(audio_read_fd) ],
 		ffmpeg_builder.set_video_encoder('libx264'),
 		ffmpeg_builder.set_video_quality('libx264', stream_quality),
 		ffmpeg_builder.set_video_preset('libx264', 'ultrafast'),
@@ -57,7 +60,9 @@
 		ffmpeg_builder.set_output(whip_url)
 	)
 	commands = ffmpeg_builder.run(commands)
-	return subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE)
+	process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,))
+	os.close(audio_read_fd)
+	return process, audio_write_fd
 
 
 def start_mediamtx() -> Optional[subprocess.Popen[bytes]]:
@@ -105,7 +110,12 @@ def feed_whip_frame(process : subprocess.Popen[bytes], vision_frame : VisionFram
 	process.stdin.flush()
 
 
-def close_whip_encoder(process : subprocess.Popen[bytes]) -> None:
+def feed_whip_audio(audio_write_fd : int, audio_data : bytes) -> None:
+	os.write(audio_write_fd, audio_data)
+
+
+def close_whip_encoder(process : subprocess.Popen[bytes], audio_write_fd : int) -> None:
+	os.close(audio_write_fd)
 	process.stdin.close()
 	process.terminate()
 	process.wait(timeout = 5)