From 76c413a2c115c65016ad19be24392e203c3e20a9 Mon Sep 17 00:00:00 2001 From: Henry Ruhs Date: Sat, 9 May 2026 14:50:38 +0200 Subject: [PATCH] Refactor/ffmpeg less stream (#1092) * remove ffmpeg from stream to use opus and vpx, add bunch of todos * fix testing * improve download checkout * fix datachannel download, fix super dirty test clients - setup logic does not belong there * fix testing --- .gitignore | 2 +- REGRESSION_TODO.md | 262 ---------------------------- RTC_TODO.md | 76 -------- STREAM_API_TODO.md | 60 ------- facefusion/apis/endpoints/stream.py | 6 +- facefusion/apis/stream_helper.py | 249 +++++++++++++++++++------- facefusion/ffmpeg.py | 20 --- facefusion/ffmpeg_builder.py | 24 +-- facefusion/libraries/datachannel.py | 29 +-- facefusion/rtc.py | 71 +++++--- facefusion/rtc_store.py | 18 +- facefusion/types.py | 10 -- tests/stream_helper.py | 8 +- tests/test_api_assets.py | 12 +- tests/test_api_capabilities.py | 13 +- tests/test_api_metrics.py | 10 +- tests/test_api_ping.py | 10 +- tests/test_api_session.py | 10 +- tests/test_api_state.py | 25 +-- tests/test_api_stream.py | 42 ++--- tests/test_ffmpeg.py | 24 +-- tests/test_ffmpeg_builder.py | 30 +--- tests/test_rtc.py | 19 +- tests/test_rtc_store.py | 18 ++ tests/test_stream_helper.py | 31 ---- 25 files changed, 371 insertions(+), 708 deletions(-) delete mode 100644 REGRESSION_TODO.md delete mode 100644 RTC_TODO.md delete mode 100644 STREAM_API_TODO.md create mode 100644 tests/test_rtc_store.py delete mode 100644 tests/test_stream_helper.py diff --git a/.gitignore b/.gitignore index 5cee88dc..9fb95792 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,8 @@ __pycache__ .assets -.binaries .claude .caches .idea .jobs +.libraries .vscode diff --git a/REGRESSION_TODO.md b/REGRESSION_TODO.md deleted file mode 100644 index 309be3d8..00000000 --- a/REGRESSION_TODO.md +++ /dev/null @@ -1,262 +0,0 @@ -# Regression TODO - -## WHEP SDP negotiation blocks the async event loop - -`negotiate_sdp` polls with `time.sleep(0.05)` in a loop for up to 5 seconds. Calling it directly from the async handler freezes all requests and WebSocket traffic. - -Before: - -```python -async def post_stream(request : Request) -> Response: - sdp_answer = rtc_store.add_rtc_viewer(session_id, sdp_offer) -``` - -After: - -```python -async def post_stream(request : Request) -> Response: - event_loop = asyncio.get_running_loop() - sdp_answer = await event_loop.run_in_executor(None, rtc_store.add_rtc_viewer, session_id, sdp_offer) -``` - -## Stream data uses magic byte sniffing instead of channel metadata - -The WebSocket received both vision frames and PCM audio as binary messages. The old code checked for JPEG magic bytes (`\xff\xd8`) to distinguish them. This is wrong because it only supports JPEG, breaks when audio happens to start with `0xff 0xd8`, and fails silently for PNG or other image formats. - -Before: - -```python -JPEG_MAGIC : bytes = b'\xff\xd8' - -if data[:2] == JPEG_MAGIC: - vision_frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR) - -if data[:2] != JPEG_MAGIC: - rtc_store.send_rtc_audio(session_id, data) -``` - -After: - -```python -if data[0] == ord('v'): - vision_frame = cv2.imdecode(numpy.frombuffer(data[1:], numpy.uint8), cv2.IMREAD_COLOR) - -if data[0] == ord('a'): - rtc_store.send_rtc_audio(session_id, data[1:]) -``` - -The client prepends a single byte (`v` or `a`) to each message. The server reads the first byte to route the payload — format-agnostic and explicit. - -Before (client): - -```javascript -ws.send(buf); -ws.send(pcm.buffer); -``` - -After (client): - -```javascript -var prefixed = new Uint8Array(buf.byteLength + 1); -prefixed[0] = 118; // 'v' -prefixed.set(new Uint8Array(buf), 1); -ws.send(prefixed.buffer); - -var prefixed = new Uint8Array(pcm.buffer.byteLength + 1); -prefixed[0] = 97; // 'a' -prefixed.set(new Uint8Array(pcm.buffer), 1); -ws.send(prefixed.buffer); -``` - -## SDP negotiation polls with sleep loop instead of using callbacks - -`negotiate_sdp` polls `rtcGetLocalDescription` every 50ms for up to 5 seconds. This wastes CPU and adds latency because the answer might be ready after 5ms but we sleep the full 50ms. libdatachannel provides `rtcSetLocalDescriptionCallback` which fires exactly when the SDP answer is ready. - -Before: - -```python -def negotiate_sdp(peer_connection : int, sdp_offer : str) -> Optional[str]: - rtc_library.rtcSetRemoteDescription(peer_connection, sdp_offer.encode('utf-8'), b'offer') - buffer_size = 16384 - buffer_string = ctypes.create_string_buffer(buffer_size) - wait_limit = time.monotonic() + 5 - - while time.monotonic() < wait_limit: - if rtc_library.rtcGetLocalDescription(peer_connection, buffer_string, buffer_size) > 0: - return buffer_string.value.decode() - time.sleep(0.05) - - return None -``` - -After: - -```python -def negotiate_sdp(peer_connection : int, sdp_offer : str) -> Optional[str]: - rtc_library.rtcSetRemoteDescription(peer_connection, sdp_offer.encode('utf-8'), b'offer') - result = threading.Event() - sdp_holder = [None] - - @ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p) - def on_description(pc, sdp, sdp_type, user_ptr): - sdp_holder[0] = sdp.decode() - result.set() - - rtc_library.rtcSetLocalDescriptionCallback(peer_connection, on_description) - result.wait(timeout = 5) - return sdp_holder[0] -``` - -Uses a `threading.Event` to block until the callback fires — no polling, no wasted sleep cycles, instant response. - -## No connection state tracking per peer - -`send_to_peers` calls `rtcIsOpen` on the video track for every frame, every peer. There is no way to detect when a peer disconnects or fails — dead peers stay in the list until the stream is destroyed. The poc branch used `rtcSetStateChangeCallback` to track a `connected` flag per viewer and auto-clean dead connections. - -Before: - -```python -def send_to_peers(peers, data): - for rtc_peer in peers: - video_track_id = rtc_peer.get('video_track') - - if video_track_id and rtc_library.rtcIsOpen(video_track_id): - rtc_library.rtcSetTrackRtpTimestamp(video_track_id, timestamp) - rtc_library.rtcSendMessage(video_track_id, data_buffer, data_total) -``` - -After: - -```python -@ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_int, ctypes.c_void_p) -def on_state_change(pc, state, user_ptr): - if state == 4: # RTC_FAILED - mark_peer_disconnected(pc) - if state == 5: # RTC_CLOSED - mark_peer_disconnected(pc) - -def handle_whep_offer(peers, sdp_offer): - peer_connection = create_peer_connection() - rtc_library.rtcSetStateChangeCallback(peer_connection, on_state_change) -``` - -Avoids calling `rtcIsOpen` on every frame and allows removing dead peers immediately when the connection drops. - -## SDP line endings break Firefox - -SDP media descriptions used `os.linesep` which produces `\n` on Linux. RFC 4566 requires `\r\n` — Firefox rejected the SDP entirely while Chrome was lenient. - -Before: - -```python -media_description = b'm=video 9 UDP/TLS/RTP/SAVPF 96' + os.linesep.encode() + b'a=rtpmap:96 VP8/90000' + os.linesep.encode() -``` - -After: - -```python -media_description = ('m=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP8/90000\r\na=sendonly\r\na=mid:0\r\na=rtcp-mux\r\n').encode() -``` - -## SDP payload types are hardcoded instead of negotiated - -Payload types were hardcoded (VP8 PT 96, Opus PT 111). Chrome happens to use these, but Firefox offers VP8 PT 120 and Opus PT 109. The server answered with payload types Firefox never offered, so Firefox couldn't decode the RTP packets. - -Before: - -```python -def handle_whep_offer(peers, sdp_offer): - peer_connection = create_peer_connection() - audio_track = add_audio_track(peer_connection) - video_track = add_video_track(peer_connection) -``` - -After: - -```python -def extract_payload_type(sdp_offer, media_type, codec_name, fallback): - current_media = None - - for line in sdp_offer.splitlines(): - if line.startswith('m=' + media_type): - current_media = media_type - if line.startswith('m=') and not line.startswith('m=' + media_type): - current_media = None - if current_media == media_type and line.startswith('a=rtpmap:') and codec_name in line: - return int(line.split(':')[1].split(' ')[0]) - - return fallback - -def handle_whep_offer(peers, sdp_offer): - video_payload_type = extract_payload_type(sdp_offer, 'video', 'VP8/90000', 96) - audio_payload_type = extract_payload_type(sdp_offer, 'audio', 'opus/48000', 111) - peer_connection = create_peer_connection() - audio_track = add_audio_track(peer_connection, payload_type = audio_payload_type) - video_track = add_video_track(peer_connection, payload_type = video_payload_type) -``` - -## Worker threads bypass API context and skip inference - -`process_vision_frame` runs in a `ThreadPoolExecutor` where `detect_app_context()` walks the call stack and finds no `facefusion/apis/` frame — so it reads from the empty `cli` state and returns frames without face swap. - -Before: - -```python -future = executor.submit(process_vision_frame, capture_frame) -``` - -After: - -```python -def process_stream_frame(capture_frame : VisionFrame) -> VisionFrame: - return process_vision_frame(capture_frame) - -future = executor.submit(process_stream_frame, capture_frame) -``` - -The wrapper lives in `facefusion/apis/stream_helper.py`, so `detect_app_context()` finds it on the call stack and resolves to `api`. - -## Frame processing lacks a deque for fluent streaming - -Without a deque, processed frames are sent one at a time — if inference is slower than the capture rate, frames queue up in futures and the output stutters. A `deque` buffers completed frames so the encoder can drain them smoothly while inference continues in parallel. - -Before: - -```python -while not stop_event.is_set(): - capture_frame = latest_frame_holder[0] - output_vision_frame = process_stream_frame(capture_frame) - encoder.stdin.write(output_vision_frame.tobytes()) -``` - -After: - -```python -output_deque : deque[VisionFrame] = deque() - -with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor: - futures = [] - - while not stop_event.is_set(): - if capture_frame is not None and len(futures) < 4: - future = executor.submit(process_stream_frame, capture_frame) - futures.append(future) - - for future_done in [ future for future in futures if future.done() ]: - output_deque.append(future_done.result()) - futures.remove(future_done) - - while output_deque: - temp_vision_frame = output_deque.popleft() - encoder.stdin.write(temp_vision_frame.tobytes()) -``` - -Consider reusing `multi_process_capture` from `streamer.py` — it has the same ThreadPoolExecutor + deque pattern. If it accepted a frame iterator instead of `cv2.VideoCapture`, both pipelines could share the same processing loop. - -## Binary files belong in `.binaries/` at project root - -The libdatachannel shared library downloads to `.binaries/` in the project root. This is not an asset — binary dependencies are platform-specific build artifacts and must stay separate from `.assets/`. - -```python -'path': resolve_relative_path('../.binaries/' + binary_name) -``` diff --git a/RTC_TODO.md b/RTC_TODO.md deleted file mode 100644 index 835237ba..00000000 --- a/RTC_TODO.md +++ /dev/null @@ -1,76 +0,0 @@ -# RTC TODO - -## Approach - -1. vibe code mass testing to get broad coverage fast -2. vibe code refactoring (naming, dead code, restructure) -3. hand craft production code (libdatachannel, peer management, SDP) -4. hand craft testing — thin it out to essentials only - -## Unit Tests - -| Function | Rating | Assertions | -|---|---|---| -| `create_peer_connection` | essential | returns valid peer connection id | -| `add_audio_track` | essential | returns valid track id | -| `add_video_track` | essential | returns valid track id | -| `negotiate_sdp` | essential | returns sdp answer from valid offer, returns None on timeout | -| `delete_peers` | essential | clears peer list | -| `create_rtc_stream` | essential | initializes empty peer list | -| `destroy_rtc_stream` | essential | cleans up peers, handles missing session | -| `add_rtc_viewer` | nice to have | returns None for unknown session | -| `resolve_binary_file` | skip | covered implicitly by `create_static_rtc_library` | -| `handle_whep_offer` | skip | thin wrapper, covered by integration tests | -| `send_to_peers` | skip | needs open tracks, covered by integration tests | -| `send_rtc_frame` | skip | thin delegation to `send_to_peers` | - -## Integration Tests - -- [ ] test full SDP offer/answer roundtrip between two peer connections -- [ ] test `send_to_peers` delivers frame data to a connected peer -- [ ] test peer cleanup after `delete_peers` prevents further sends -- [ ] test `add_rtc_viewer` followed by `send_rtc_frame` end-to-end -- [ ] test multiple viewers receive frames from same stream -- [ ] test viewer disconnect does not break remaining peers - -## Dead Code - -- [ ] remove `is_peer_connected` from `rtc.py`, never called -- [ ] remove `pre_check` from `rtc.py`, never called -- [ ] remove `get_rtc_stream` from `rtc_store.py`, never called -- [ ] remove `RtcOfferSet` from `types.py`, never used - -## Naming - -- [ ] rename `rtc_bindings.py` to `libdatachannel.py` — owns C library loading, struct definitions, ctypes registration -- [ ] rename `RTC_CONFIGURATION` to a proper class definition -- [ ] rename `RTC_PACKETIZER_INIT` to a proper class definition -- [ ] rename `init_ctypes` to something more specific like `register_argtypes` - -## Unused Library Config - -- [ ] `create_peer_connection` exposes 14 params but production only uses `disable_auto_negotiation`, `enable_ice_udp_mux`, `force_media_transport` — reduce to what is needed -- [ ] `RTC_PACKETIZER_INIT` defines `nalSeparator`, `obuPacketization`, `playoutDelayId`, `playoutDelayMin`, `playoutDelayMax`, `sequenceNumber`, `timestamp` — none are set outside the struct definition, they are H264/AV1 specific and unused for VP8/Opus -- [ ] `rtcSetLocalDescription` is used in tests but not registered in `rtc_bindings.py` - -## Refactor - -- [ ] extract shared media description builder from `rtc.py` and `tests/stream_helper.py` (see TODO in `tests/stream_helper.py`) -- [ ] replace `type()` calls for ctypes structs in `rtc_bindings.py` with proper class definitions -- [ ] move `resolve_binary_file`, `create_static_download_set`, `create_static_rtc_library` from `rtc.py` into `libdatachannel.py` — library init belongs with the bindings, `rtc.py` just consumes it -- [ ] move `rtc_store.py` state into a proper store pattern consistent with other `*_store.py` files -- [ ] replace `time.sleep` polling loops in `negotiate_sdp` and `create_sdp_offer` (test helper) with `rtcSetGatheringStateChangeCallback` — signal a `threading.Event` on ICE gathering complete, then `event.wait(timeout=5)` instead of spinning -- [ ] `create_static_download_set` has hardcoded linux URLs with a TODO to use dynamic `binary_name` - -## Violations - -- `rtc.py:25` — comment on `create_static_download_set` (no comments) -- `rtc.py:34` — comment on hash url (no comments) -- `rtc.py:42` — comment on source url (no comments) -- `rtc.py:205` — `send_to_peers` returns None explicitly on a void function (redundant) -- `rtc_bindings.py:3,23` — `type()` to create structs is a class workaround (no classes, but these should be plain structs) -- `tests/stream_helper.py:45` — `sdp` temp variable before return (no need for temporary variable in simple cases) - -## Security - -- `rtc.py:157` — `negotiate_sdp` passes unsanitized `sdp_offer` string directly to C library, no SDP format validation before hitting native code diff --git a/STREAM_API_TODO.md b/STREAM_API_TODO.md deleted file mode 100644 index 141531b5..00000000 --- a/STREAM_API_TODO.md +++ /dev/null @@ -1,60 +0,0 @@ -# Stream API TODO - -## Approach - -1. vibe code mass testing to get broad coverage fast -2. vibe code refactoring (naming, boilerplate extraction, validation) -3. hand craft production code (stream pipeline, encoder loop, IVF parsing) -4. hand craft testing — thin it out to essentials only - -## Unit Tests - -| Function | Rating | Assertions | -|---|---|---| -| `get_websocket_stream_mode` | essential | returns None for missing header, returns None for unknown protocol | -| `forward_rtc_frames` | essential | reads IVF header and forwards frame data, handles broken pipe | -| `run_encode_loop` | nice to have | drains deque and closes encoder | -| `receive_vision_frames` | skip | async generator, covered by integration tests | -| `submit_encoder_frame` | skip | thin glue between cv2 and subprocess stdin | -| `websocket_stream` | skip | routing only, covered by integration tests | -| `post_stream` | skip | covered by integration tests | - -## Integration Tests - -- [ ] test stream without session — expect rejection -- [ ] test stream with expired or invalid token -- [ ] test image stream without source selected -- [ ] test video stream without source selected -- [ ] test WHEP offer without active websocket stream -- [ ] test WHEP offer with malformed SDP body -- [ ] test WHEP offer with wrong content type -- [ ] test multiple WHEP viewers on same stream -- [ ] test websocket disconnect mid-stream triggers cleanup - -## Naming - -- [ ] rename `test_get_stream_mode` to `test_get_websocket_stream_mode` in `test_stream_helper.py` — does not match function name -- [ ] rename `make_scope` in `test_stream_helper.py` to `make_websocket_scope` — more descriptive -- [ ] `stream_helper.py` mixes pure helpers (`calculate_bitrate`) with async handlers (`handle_image_stream`) — consider splitting - -## Dead Code - -- [ ] `read_pipe_buffer` has a test but the test is disconnected from how it is actually used — the test reads from a closed pipe, production reads from a live subprocess stdout - -## Refactor - -- [ ] `handle_image_stream` and `handle_video_stream` share session setup boilerplate (subprotocol, access_token, session_id, source_paths, websocket accept) — extract common setup -- [ ] `forward_rtc_frames` assumes IVF container format with hardcoded header size 32 and frame header size 12 — document or make configurable -- [ ] `post_stream` does not validate content type is `application/sdp` before parsing body -- [ ] `calculate_bitrate` has a TODO about improving the calculation -- [ ] `handle_video_stream` has a hardcoded fallback `output_video_fps or 30` with a TODO to resolve from target video fps - -## Violations - -- `stream_helper.py:24` — comment on `calculate_bitrate` (no comments) -- `stream_helper.py:143` — comment on `output_video_fps` fallback (no comments) -- `test_stream_helper.py:30` — `test_get_stream_mode` does not match function name `get_websocket_stream_mode` (naming convention) - -## Security - -- `endpoints/stream.py:27` — `request.body().decode()` is not sanitized, raw user input forwarded to C library via RTC layer diff --git a/facefusion/apis/endpoints/stream.py b/facefusion/apis/endpoints/stream.py index 667cef50..d4b2e95d 100644 --- a/facefusion/apis/endpoints/stream.py +++ b/facefusion/apis/endpoints/stream.py @@ -8,6 +8,7 @@ from facefusion.apis.session_helper import extract_access_token from facefusion.apis.stream_helper import handle_image_stream, handle_video_stream +# TODO: reject websocket with invalid or missing stream mode async def websocket_stream(websocket : WebSocket) -> None: stream_mode = websocket.query_params.get('mode') @@ -18,14 +19,15 @@ async def websocket_stream(websocket : WebSocket) -> None: await handle_video_stream(websocket) +# TODO: validate content type is application/sdp, sanitize sdp input async def post_stream(request : Request) -> Response: access_token = extract_access_token(request.scope) session_id = session_manager.find_session_id(access_token) session_context.set_session_id(session_id) if session_id: - sdp_offer = (await request.body()).decode() - sdp_answer = rtc_store.add_rtc_viewer(session_id, sdp_offer) + sdp_offer = await request.body() + sdp_answer = rtc_store.add_rtc_viewer(session_id, sdp_offer.decode()) if sdp_answer: return Response(sdp_answer, status_code = HTTP_201_CREATED, media_type = 'application/sdp') diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index a497700e..b8f719f8 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -1,10 +1,10 @@ import asyncio -import math -import os -import subprocess +import ctypes +import multiprocessing +import struct from collections import deque from collections.abc import AsyncIterator -from typing import Optional +from typing import Optional, Tuple import cv2 import numpy @@ -13,35 +13,21 @@ from starlette.websockets import WebSocket, WebSocketState from facefusion import rtc_store, session_context, session_manager, state_manager from facefusion.apis.api_helper import get_sec_websocket_protocol from facefusion.apis.session_helper import extract_access_token -from facefusion.common_helper import is_linux, is_macos -from facefusion.ffmpeg import spawn_stream +from facefusion.libraries import opus as opus_module, vpx as vpx_module from facefusion.streamer import process_vision_frame from facefusion.types import Resolution, SessionId, VisionFrame -def calculate_bitrate(resolution : Resolution) -> int: # TODO : improve the bitrate calculation - pixel_total = resolution[0] * resolution[1] - bitrate_factor = 3500 / math.sqrt(1920 * 1080) - return max(400, round(math.sqrt(pixel_total) * bitrate_factor)) +async def receive_stream_frames(websocket : WebSocket) -> AsyncIterator[Tuple[int, bytes]]: + websocket_event = await websocket.receive() + while websocket_event.get('type') == 'websocket.receive': + frame_buffer = websocket_event.get('bytes') or b'' -def calculate_buffer_size(resolution : Resolution) -> int: - return calculate_bitrate(resolution) * 2 + if len(frame_buffer) > 1: + yield frame_buffer[0], frame_buffer[1:] - -def read_pipe_buffer(pipe_handle : int, size : int) -> Optional[bytes]: - byte_buffer = bytearray() - frame_data = os.read(pipe_handle, size - len(byte_buffer)) - - while frame_data: - byte_buffer += frame_data - - if len(byte_buffer) == size: - return bytes(byte_buffer) - - frame_data = os.read(pipe_handle, size - len(byte_buffer)) - - return None + websocket_event = await websocket.receive() async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFrame]: @@ -57,41 +43,165 @@ async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFr websocket_event = await websocket.receive() -def forward_rtc_frames(encoder : subprocess.Popen[bytes], session_id : SessionId) -> None: - pipe_handle = encoder.stdout.fileno() +# TODO: move to facefusion/vpx_encoder.py +def create_vpx_encoder(width : int, height : int, bitrate : int) -> Optional[ctypes.Array[ctypes.c_char]]: + vpx_library = vpx_module.create_static_library() - if is_linux() or is_macos(): - os.set_blocking(pipe_handle, True) + if vpx_library: + vp8_iface = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_cx_algo') + config_buffer = ctypes.create_string_buffer(4096) - header = read_pipe_buffer(pipe_handle, 32) + if vpx_library.vpx_codec_enc_config_default(ctypes.byref(vp8_iface), config_buffer, 0) == 0: + thread_count = min(multiprocessing.cpu_count(), 8) + struct.pack_into('I', config_buffer, 4, thread_count) + struct.pack_into('I', config_buffer, 12, width) + struct.pack_into('I', config_buffer, 16, height) + struct.pack_into('I', config_buffer, 72, 2) + struct.pack_into('I', config_buffer, 112, bitrate) + struct.pack_into('I', config_buffer, 116, 2) + struct.pack_into('I', config_buffer, 120, 50) + struct.pack_into('I', config_buffer, 124, 50) + struct.pack_into('I', config_buffer, 128, 50) + context_buffer = ctypes.create_string_buffer(512) - if header: - frame_header = read_pipe_buffer(pipe_handle, 12) + if vpx_library.vpx_codec_enc_init_ver(context_buffer, ctypes.byref(vp8_iface), config_buffer, 0, 37) == 0: + vpx_library.vpx_codec_control_(context_buffer, 13, ctypes.c_int(16)) + vpx_library.vpx_codec_control_(context_buffer, 12, ctypes.c_int(3)) + vpx_library.vpx_codec_control_(context_buffer, 27, ctypes.c_int(10)) + return context_buffer - while frame_header: - frame_size = int.from_bytes(frame_header[0:4], 'little') - frame_data = read_pipe_buffer(pipe_handle, frame_size) - - if frame_data: - rtc_store.send_rtc_frame(session_id, frame_data) - - frame_header = read_pipe_buffer(pipe_handle, 12) + return None -def submit_encoder_frame(encoder : subprocess.Popen[bytes], vision_frame_deque : deque[VisionFrame]) -> None: - output_vision_frame = process_vision_frame(vision_frame_deque[-1]) - encoder.stdin.write(cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2RGB).tobytes()) - encoder.stdin.flush() +# TODO: move to facefusion/vpx_encoder.py +def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes, width : int, height : int, pts : int, flags : int) -> bytes: + vpx_library = vpx_module.create_static_library() + frame_buffer = b'' + + if vpx_library: + image_buffer = ctypes.create_string_buffer(512) + yuv_string_buffer = ctypes.create_string_buffer(yuv_buffer) + + if vpx_library.vpx_img_wrap(image_buffer, 0x102, width, height, 1, yuv_string_buffer): + if vpx_library.vpx_codec_encode(codec_context, image_buffer, pts, 1, flags, 1) == 0: + iterator = ctypes.c_void_p(0) + packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator)) + + while packet: + if ctypes.c_int.from_address(packet).value == 0: + buffer_pointer = ctypes.c_void_p.from_address(packet + 8).value + buffer_size = ctypes.c_size_t.from_address(packet + 16).value + frame_buffer += ctypes.string_at(buffer_pointer, buffer_size) + + packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator)) + + return frame_buffer -def run_encode_loop(encoder : subprocess.Popen[bytes], vision_frame_deque : deque[VisionFrame]) -> None: +# TODO: move to facefusion/vpx_encoder.py +def destroy_vpx_encoder(codec_context : ctypes.Array[ctypes.c_char]) -> None: + vpx_library = vpx_module.create_static_library() + + if vpx_library: + vpx_library.vpx_codec_destroy(codec_context) + + +# TODO: move to facefusion/opus_encoder.py +def create_opus_encoder(sample_rate : int, channels : int) -> Optional[ctypes.c_void_p]: + opus_library = opus_module.create_static_library() + + if opus_library: + error = ctypes.c_int(0) + encoder = opus_library.opus_encoder_create(sample_rate, channels, 2049, ctypes.byref(error)) + + if error.value == 0: + opus_library.opus_encoder_ctl(encoder, 4002, 64000) + return encoder + + return None + + +# TODO: move to facefusion/opus_encoder.py +def encode_opus(opus_encoder : ctypes.c_void_p, pcm_pointer : ctypes.c_void_p, frame_size : int) -> bytes: + opus_library = opus_module.create_static_library() + audio_buffer = b'' + + if opus_library: + output_buffer = ctypes.create_string_buffer(4000) + encoded_length = opus_library.opus_encode_float(opus_encoder, pcm_pointer, frame_size, output_buffer, 4000) + + if encoded_length > 0: + audio_buffer = output_buffer.raw[:encoded_length] + + return audio_buffer + + +# TODO: move to facefusion/opus_encoder.py +def destroy_opus_encoder(opus_encoder : ctypes.c_void_p) -> None: + opus_library = opus_module.create_static_library() + + if opus_library: + opus_library.opus_encoder_destroy(opus_encoder) + + +# TODO: move to facefusion/vpx_encoder.py, throttle loop to avoid spinning on same frame +def run_video_encode_loop(vision_frame_deque : deque[VisionFrame], session_id : SessionId, initial_resolution : Resolution, keyframe_interval : int) -> None: + codec_context = create_vpx_encoder(initial_resolution[0], initial_resolution[1], 4500) + current_resolution = initial_resolution + pts = 0 + while vision_frame_deque: - submit_encoder_frame(encoder, vision_frame_deque) + vision_frame = vision_frame_deque[-1] + output_frame = process_vision_frame(vision_frame) + height, width = output_frame.shape[:2] + frame_resolution = (width, height) - encoder.stdin.close() - encoder.wait() + if frame_resolution[0] != current_resolution[0] or frame_resolution[1] != current_resolution[1]: + if codec_context: + destroy_vpx_encoder(codec_context) + + current_resolution = frame_resolution + codec_context = create_vpx_encoder(current_resolution[0], current_resolution[1], 4500) + pts = 0 + + if codec_context: + yuv_frame = cv2.cvtColor(output_frame, cv2.COLOR_BGR2YUV_I420) + vpx_flags = 0 + + if pts % keyframe_interval == 0: + vpx_flags = 1 + + frame_buffer = encode_vpx(codec_context, yuv_frame.tobytes(), width, height, pts, vpx_flags) + + if frame_buffer: + rtc_store.send_rtc_video(session_id, frame_buffer) + + pts += 1 + + if codec_context: + destroy_vpx_encoder(codec_context) +# TODO: move to facefusion/opus_encoder.py +def encode_audio_chunk(opus_encoder : ctypes.c_void_p, session_id : SessionId, pcm_data : numpy.ndarray, audio_remainder : numpy.ndarray, audio_pts : int) -> Tuple[numpy.ndarray, int]: + pcm_buffer = numpy.concatenate([audio_remainder, pcm_data]) + frame_samples = 1920 + + while len(pcm_buffer) >= frame_samples: + chunk = pcm_buffer[:frame_samples] + pcm_buffer = pcm_buffer[frame_samples:] + pcm_pointer = ctypes.cast(chunk.ctypes.data, ctypes.c_void_p) + audio_buffer = encode_opus(opus_encoder, pcm_pointer, 960) + + if audio_buffer: + rtc_store.send_rtc_audio(session_id, audio_buffer, audio_pts) + + audio_pts += 960 + + return pcm_buffer, audio_pts + + +# TODO: extract shared session setup from handle_image_stream and handle_video_stream, guard session_id like handle_video_stream async def handle_image_stream(websocket : WebSocket) -> None: subprotocol = get_sec_websocket_protocol(websocket.scope) access_token = extract_access_token(websocket.scope) @@ -125,30 +235,45 @@ async def handle_video_stream(websocket : WebSocket) -> None: await websocket.accept(subprotocol = subprotocol) if session_id and source_paths: - output_video_fps = int(state_manager.get_item('output_video_fps') or 30) # TODO: resolve from target video fps - vision_frames = receive_vision_frames(websocket) - vision_frame = await anext(vision_frames, None) + stream_frames = receive_stream_frames(websocket) + first_frame_type, first_frame_buffer = await anext(stream_frames, (0, b'')) + first_vision_frame = None - if numpy.any(vision_frame): - resolution = (vision_frame.shape[1], vision_frame.shape[0]) - encoder = spawn_stream(resolution, output_video_fps, calculate_bitrate(resolution), calculate_buffer_size(resolution)) + if first_frame_type == 1: + first_vision_frame = cv2.imdecode(numpy.frombuffer(first_frame_buffer, numpy.uint8), cv2.IMREAD_COLOR) + if numpy.any(first_vision_frame): + resolution : Resolution = (first_vision_frame.shape[1], first_vision_frame.shape[0]) + keyframe_interval = int(state_manager.get_item('output_video_fps') or 30) # TODO: remove hardcoded via stream_video_fps vision_frame_deque : deque[VisionFrame] = deque(maxlen = 1) + opus_encoder = create_opus_encoder(48000, 2) # TODO: guard against opus_encoder being None + audio_remainder = numpy.array([], dtype = numpy.float32) + audio_pts = 0 - vision_frame_deque.append(vision_frame) + vision_frame_deque.append(first_vision_frame) rtc_store.create_rtc_stream(session_id) event_loop = asyncio.get_running_loop() - await event_loop.run_in_executor(None, submit_encoder_frame, encoder, vision_frame_deque) + video_encode_task = event_loop.run_in_executor(None, run_video_encode_loop, vision_frame_deque, session_id, resolution, keyframe_interval) await websocket.send_text('ready') - encode_task = event_loop.run_in_executor(None, run_encode_loop, encoder, vision_frame_deque) - rtc_task = event_loop.run_in_executor(None, forward_rtc_frames, encoder, session_id) - async for vision_frame in vision_frames: - vision_frame_deque.append(vision_frame) + async for frame_type, frame_buffer in stream_frames: + if frame_type == 1: + vision_frame = cv2.imdecode(numpy.frombuffer(frame_buffer, numpy.uint8), cv2.IMREAD_COLOR) + + if numpy.any(vision_frame): + vision_frame_deque.append(vision_frame) + + if frame_type == 2: + pcm_data = numpy.frombuffer(frame_buffer, dtype = numpy.float32) + audio_remainder, audio_pts = encode_audio_chunk(opus_encoder, session_id, pcm_data, audio_remainder, audio_pts) vision_frame_deque.clear() - await asyncio.gather(encode_task, rtc_task) + await video_encode_task + + if opus_encoder: + destroy_opus_encoder(opus_encoder) + rtc_store.destroy_rtc_stream(session_id) if websocket.client_state == WebSocketState.CONNECTED: diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py index 03b81253..e0769d65 100644 --- a/facefusion/ffmpeg.py +++ b/facefusion/ffmpeg.py @@ -355,26 +355,6 @@ def sanitize_video(file_content : bytes, asset_path : str, security_strategy : A return run_ffmpeg_with_pipe(commands, file_content).returncode == 0 -def spawn_stream(resolution : Resolution, stream_fps : int, stream_bitrate : int, stream_bufsize : int) -> subprocess.Popen[bytes]: - commands = ffmpeg_builder.chain( - ffmpeg_builder.use_wallclock(), - ffmpeg_builder.capture_video(), - ffmpeg_builder.set_media_resolution(pack_resolution(resolution)), - ffmpeg_builder.set_input_fps(stream_fps), - ffmpeg_builder.set_input('-'), - ffmpeg_builder.set_video_encoder('libvpx'), # TODO: replace hardcoded value - ffmpeg_builder.set_encoder_deadline('realtime'), - ffmpeg_builder.enforce_pixel_format('rgb24'), # TODO: replace hardcoded value - ffmpeg_builder.set_stream_quality(stream_bitrate), - ffmpeg_builder.set_video_bufsize(stream_bufsize), - ffmpeg_builder.set_stream_keyframe(stream_fps), - ffmpeg_builder.set_muxer('ivf'), # TODO: replace hardcoded value - ffmpeg_builder.set_output('-') - ) - commands = ffmpeg_builder.run(commands) - return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) - - def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder: if video_format == 'avi' and audio_encoder == 'libopus': return 'aac' diff --git a/facefusion/ffmpeg_builder.py b/facefusion/ffmpeg_builder.py index a8574ad2..17471d9e 100644 --- a/facefusion/ffmpeg_builder.py +++ b/facefusion/ffmpeg_builder.py @@ -5,7 +5,7 @@ from typing import List, Optional import numpy from facefusion.filesystem import get_file_format -from facefusion.types import AudioEncoder, Command, CommandSet, Duration, EncoderDeadline, Fps, Muxer, SampleRate, StreamMode, VideoEncoder, VideoPreset +from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, SampleRate, StreamMode, VideoEncoder, VideoPreset def run(commands : List[Command]) -> List[Command]: @@ -291,25 +291,3 @@ def map_qsv_preset(video_preset : VideoPreset) -> Optional[str]: return None -def use_wallclock() -> List[Command]: - return [ '-use_wallclock_as_timestamps', '1' ] - - -def set_stream_keyframe(interval : int) -> List[Command]: - return [ '-g', str(interval), '-keyint_min', str(interval) ] - - -def set_muxer(muxer : Muxer) -> List[Command]: - return [ '-f', muxer ] - - -def set_video_bufsize(video_bufsize : int) -> List[Command]: - return [ '-bufsize', str(video_bufsize) + 'k' ] - - -def set_encoder_deadline(deadline : EncoderDeadline) -> List[Command]: - return [ '-deadline', deadline ] - - -def set_lag_in_frames(count : int) -> List[Command]: - return [ '-lag-in-frames', str(count) ] diff --git a/facefusion/libraries/datachannel.py b/facefusion/libraries/datachannel.py index bd599579..be580833 100644 --- a/facefusion/libraries/datachannel.py +++ b/facefusion/libraries/datachannel.py @@ -1,25 +1,27 @@ import ctypes +import os from functools import lru_cache -from typing import Dict, Optional +from typing import Dict, Optional, Tuple from facefusion.common_helper import is_linux, is_macos, is_windows +from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url_by_provider from facefusion.filesystem import resolve_relative_path from facefusion.types import DownloadSet -def resolve_library_file() -> Optional[str]: +def resolve_library_paths() -> Optional[Tuple[str, str]]: if is_linux(): - return 'linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so' + return 'linux/libdatachannel.hash', 'linux/libdatachannel.so' if is_macos(): - return 'macos-universal-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.dylib' + return 'macos/libdatachannel.hash', 'macos/libdatachannel.dylib' if is_windows(): - return 'windows-x64-openssl-h264-vp8-av1-opus-datachannel-0.24.1.dll' + return 'windows/datachannel.hash', 'windows/datachannel.dll' return None @lru_cache def create_static_library_set() -> Dict[str, DownloadSet]: - library_file = resolve_library_file() + library_hash_path, library_source_path = resolve_library_paths() return\ { @@ -27,21 +29,28 @@ def create_static_library_set() -> Dict[str, DownloadSet]: { 'datachannel': { - 'url': 'https://huggingface.co/bluefoxcreation/libdatachannel/resolve/main/linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so.hash', - 'path': resolve_relative_path('../.binaries/' + library_file + '.hash') + 'url': resolve_download_url_by_provider('huggingface', 'libraries-4.0.0', library_hash_path), + 'path': resolve_relative_path('../.libraries/' + os.path.basename(library_hash_path)) } }, 'sources': { 'datachannel': { - 'url': 'https://huggingface.co/bluefoxcreation/libdatachannel/resolve/main/linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so', - 'path': resolve_relative_path('../.binaries/' + library_file) + 'url': resolve_download_url_by_provider('huggingface', 'libraries-4.0.0', library_source_path), + 'path': resolve_relative_path('../.libraries/' + os.path.basename(library_source_path)) } } } +def pre_check() -> bool: + library_hash_set = create_static_library_set().get('hashes') + library_source_set = create_static_library_set().get('sources') + + return conditional_download_hashes(library_hash_set) and conditional_download_sources(library_source_set) + + @lru_cache def create_static_library() -> Optional[ctypes.CDLL]: library_path = create_static_library_set().get('sources').get('datachannel').get('path') diff --git a/facefusion/rtc.py b/facefusion/rtc.py index 505e97c0..300f95c7 100644 --- a/facefusion/rtc.py +++ b/facefusion/rtc.py @@ -1,21 +1,13 @@ import ctypes import threading import time -from typing import List, Optional +from typing import Dict, List, Optional -from facefusion.download import conditional_download_hashes, conditional_download_sources from facefusion.libraries import datachannel as datachannel_module from facefusion.types import MediaDirection, PeerConnection, RtcAudioTrack, RtcPeer, RtcVideoTrack, SdpAnswer, SdpOffer -def pre_check() -> bool: - download_set = datachannel_module.create_static_library_set() - - if not conditional_download_hashes(download_set.get('hashes')): - return False - return conditional_download_sources(download_set.get('sources')) - - +# TODO: reduce to only used params def create_peer_connection( ice_servers : Optional[ctypes.Array[ctypes.c_char_p]] = None, ice_servers_count : int = 0, proxy_server : Optional[bytes] = None, @@ -52,27 +44,42 @@ def create_peer_connection( def build_media_description(media_type : str, payload_type : int, rtp_codec : str, media_direction : MediaDirection, media_id : int) -> bytes: - return '\r\n'.join( + lines =\ [ 'm=' + media_type + ' 9 UDP/TLS/RTP/SAVPF ' + str(payload_type), 'a=rtpmap:' + str(payload_type) + ' ' + rtp_codec, + 'a=rtcp-fb:' + str(payload_type) + ' nack', + 'a=rtcp-fb:' + str(payload_type) + ' nack pli', 'a=' + media_direction, 'a=mid:' + str(media_id), 'a=rtcp-mux', '' - ]).encode() + ] + return '\r\n'.join(lines).encode() -def add_audio_track(peer_connection : PeerConnection, media_direction : MediaDirection) -> RtcAudioTrack: +def parse_sdp_payload_types(sdp_offer : SdpOffer) -> Dict[str, int]: + payload_types : Dict[str, int] = {} + + for line in sdp_offer.splitlines(): + if line.startswith('a=rtpmap:') and 'VP8/90000' in line: + payload_types['vp8'] = int(line.split(':')[1].split(' ')[0]) + if line.startswith('a=rtpmap:') and 'opus/48000/2' in line: + payload_types['opus'] = int(line.split(':')[1].split(' ')[0]) + + return payload_types + + +def add_audio_track(peer_connection : PeerConnection, media_direction : MediaDirection, payload_type : int) -> RtcAudioTrack: datachannel_library = datachannel_module.create_static_library() - media_description = build_media_description('audio', 111, 'opus/48000/2', media_direction, 1) + media_description = build_media_description('audio', payload_type, 'opus/48000/2', media_direction, 1) audio_track = datachannel_library.rtcAddTrack(peer_connection, media_description) audio_packetizer = datachannel_module.define_rtc_packetizer_init() audio_packetizer.ssrc = 43 audio_packetizer.cname = b'audio' - audio_packetizer.payloadType = 111 + audio_packetizer.payloadType = payload_type audio_packetizer.clockRate = 48000 datachannel_library.rtcSetOpusPacketizer(audio_track, ctypes.byref(audio_packetizer)) @@ -81,16 +88,16 @@ def add_audio_track(peer_connection : PeerConnection, media_direction : MediaDir return audio_track -def add_video_track(peer_connection : PeerConnection, media_direction : MediaDirection) -> RtcVideoTrack: +def add_video_track(peer_connection : PeerConnection, media_direction : MediaDirection, payload_type : int) -> RtcVideoTrack: datachannel_library = datachannel_module.create_static_library() - media_description = build_media_description('video', 96, 'VP8/90000', media_direction, 0) + media_description = build_media_description('video', payload_type, 'VP8/90000', media_direction, 0) video_track = datachannel_library.rtcAddTrack(peer_connection, media_description) video_packetizer = datachannel_module.define_rtc_packetizer_init() video_packetizer.ssrc = 42 video_packetizer.cname = b'video' - video_packetizer.payloadType = 96 + video_packetizer.payloadType = payload_type video_packetizer.clockRate = 90000 video_packetizer.maxFragmentSize = 1200 @@ -118,6 +125,7 @@ def on_sdp_ready(peer_connection : int, sdp : Optional[bytes], sdp_type : int, u ctypes.cast(user_pointer, ctypes.py_object).value.set() +# TODO: unused callback, remove or wire up @ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_int, ctypes.c_void_p) def on_ice_complete(peer_connection : int, state : int, user_pointer : Optional[int]) -> None: if state == 2: @@ -125,6 +133,7 @@ def on_ice_complete(peer_connection : int, state : int, user_pointer : Optional[ context['event'].set() +# TODO: sanitize sdp_offer, wrap in run_in_executor, track peer connection state def negotiate_sdp(peer_connection : PeerConnection, sdp_offer : SdpOffer) -> Optional[SdpAnswer]: datachannel_library = datachannel_module.create_static_library() sdp_event = threading.Event() @@ -144,20 +153,38 @@ def negotiate_sdp(peer_connection : PeerConnection, sdp_offer : SdpOffer) -> Opt return None -def send_to_peers(rtc_peers : List[RtcPeer], data : bytes) -> None: +def send_video_to_peers(rtc_peers : List[RtcPeer], frame_buffer : bytes) -> None: datachannel_library = datachannel_module.create_static_library() if rtc_peers: timestamp = int(time.monotonic() * 90000) & 0xFFFFFFFF - data_buffer = ctypes.create_string_buffer(data) - data_total = len(data) + send_buffer = ctypes.create_string_buffer(frame_buffer) + send_total = len(frame_buffer) for rtc_peer in rtc_peers: video_track_id = rtc_peer.get('video_track') if video_track_id and datachannel_library.rtcIsOpen(video_track_id): datachannel_library.rtcSetTrackRtpTimestamp(video_track_id, timestamp) - datachannel_library.rtcSendMessage(video_track_id, data_buffer, data_total) + datachannel_library.rtcSendMessage(video_track_id, send_buffer, send_total) + + return None + + +def send_audio_to_peers(rtc_peers : List[RtcPeer], audio_buffer : bytes, audio_pts : int) -> None: + datachannel_library = datachannel_module.create_static_library() + + if rtc_peers: + timestamp = audio_pts & 0xFFFFFFFF + send_buffer = ctypes.create_string_buffer(audio_buffer) + send_total = len(audio_buffer) + + for rtc_peer in rtc_peers: + audio_track_id = rtc_peer.get('audio_track') + + if audio_track_id and datachannel_library.rtcIsOpen(audio_track_id): + datachannel_library.rtcSetTrackRtpTimestamp(audio_track_id, timestamp) + datachannel_library.rtcSendMessage(audio_track_id, send_buffer, send_total) return None diff --git a/facefusion/rtc_store.py b/facefusion/rtc_store.py index 6573d654..07b9ba67 100644 --- a/facefusion/rtc_store.py +++ b/facefusion/rtc_store.py @@ -21,11 +21,13 @@ def destroy_rtc_stream(session_id : SessionId) -> None: rtc.delete_peers(rtc_peers) +# TODO: clean up peer connection on failed sdp negotiation, wrap in run_in_executor to avoid blocking async event loop def add_rtc_viewer(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpAnswer]: if session_id in RTC_STREAMS: + payload_types = rtc.parse_sdp_payload_types(sdp_offer) peer_connection : PeerConnection = rtc.create_peer_connection() - audio_track : RtcAudioTrack = rtc.add_audio_track(peer_connection, 'sendonly') - video_track : RtcVideoTrack = rtc.add_video_track(peer_connection, 'sendonly') + audio_track : RtcAudioTrack = rtc.add_audio_track(peer_connection, 'sendonly', payload_types.get('opus', 111)) + video_track : RtcVideoTrack = rtc.add_video_track(peer_connection, 'sendonly', payload_types.get('vp8', 96)) local_sdp = rtc.negotiate_sdp(peer_connection, sdp_offer) if local_sdp: @@ -42,8 +44,16 @@ def add_rtc_viewer(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[Sdp return None -def send_rtc_frame(session_id : SessionId, frame_data : bytes) -> None: +# TODO: detect and remove dead peers +def send_rtc_video(session_id : SessionId, frame_buffer : bytes) -> None: rtc_peers = get_rtc_stream(session_id) if rtc_peers: - rtc.send_to_peers(rtc_peers, frame_data) + rtc.send_video_to_peers(rtc_peers, frame_buffer) + + +def send_rtc_audio(session_id : SessionId, audio_buffer : bytes, audio_pts : int) -> None: + rtc_peers = get_rtc_stream(session_id) + + if rtc_peers: + rtc.send_audio_to_peers(rtc_peers, audio_buffer, audio_pts) diff --git a/facefusion/types.py b/facefusion/types.py index 7e5b981e..a701ff6e 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -169,8 +169,6 @@ TempFrameFormat = Literal['bmp', 'jpeg', 'png', 'tiff'] AudioEncoder = Literal['flac', 'aac', 'libmp3lame', 'libopus', 'libvorbis', 'pcm_s16le', 'pcm_s32le'] ImageEncoder = Literal['bmp', 'mjpeg', 'png', 'tiff', 'libwebp'] VideoEncoder = Literal['libx264', 'libx264rgb', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox', 'rawvideo'] -EncoderDeadline = Literal['best', 'good', 'realtime'] -Muxer : TypeAlias = str AudioSet : TypeAlias = Dict[AudioFormat, str] ImageSet : TypeAlias = Dict[ImageFormat, str] VideoSet : TypeAlias = Dict[VideoFormat, str] @@ -264,14 +262,6 @@ BenchmarkCycleSet = TypedDict('BenchmarkCycleSet', WebcamMode = Literal['inline', 'udp', 'v4l2'] StreamMode = Literal['udp', 'v4l2'] -WebSocketStreamMode = Literal['image', 'video'] - -RtcOfferSet = TypedDict('RtcOfferSet', -{ - 'sdp': str, - 'type': str -}) - RtcVideoTrack : TypeAlias = int RtcAudioTrack : TypeAlias = int PeerConnection : TypeAlias = int diff --git a/tests/stream_helper.py b/tests/stream_helper.py index 04967b6c..1dbcf9d3 100644 --- a/tests/stream_helper.py +++ b/tests/stream_helper.py @@ -10,6 +10,7 @@ from facefusion.libraries import datachannel as datachannel_module from facefusion.types import SdpOffer +# TODO: remove, use rtc.create_sdp with recvonly tracks instead def create_sdp_offer() -> Optional[SdpOffer]: datachannel_library = datachannel_module.create_static_library() peer_connection = rtc.create_peer_connection(disable_auto_negotiation = True) @@ -34,9 +35,10 @@ def create_sdp_offer() -> Optional[SdpOffer]: return None +# TODO: remove, inline into test_api_stream.py def open_websocket_stream(test_client : TestClient, subprotocols : list[str], source_content : bytes, ready_event : threading.Event, stop_event : threading.Event) -> None: - with test_client.websocket_connect('/stream', subprotocols = subprotocols) as websocket: - websocket.send_bytes(source_content) + with test_client.websocket_connect('/stream?mode=video', subprotocols = subprotocols) as websocket: + websocket.send_bytes(b'\x01' + source_content) websocket.receive_text() ready_event.set() - stop_event.wait() + stop_event.wait(timeout = 15) diff --git a/tests/test_api_assets.py b/tests/test_api_assets.py index bc373082..8abfe7d0 100644 --- a/tests/test_api_assets.py +++ b/tests/test_api_assets.py @@ -23,12 +23,6 @@ def before_all() -> None: subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p.jpg') ]) -@pytest.fixture(scope = 'module') -def test_client() -> Iterator[TestClient]: - with TestClient(create_api()) as test_client: - yield test_client - - @pytest.fixture(scope = 'function', autouse = True) def before_each() -> None: state_manager.init_item('temp_path', tempfile.gettempdir()) @@ -37,6 +31,12 @@ def before_each() -> None: asset_store.clear() +@pytest.fixture(scope = 'module') +def test_client() -> Iterator[TestClient]: + with TestClient(create_api()) as test_client: + yield test_client + + def test_upload_asset(test_client : TestClient) -> None: upload_response = test_client.post('/assets?type=source') diff --git a/tests/test_api_capabilities.py b/tests/test_api_capabilities.py index 04535c4f..a6c5dbfa 100644 --- a/tests/test_api_capabilities.py +++ b/tests/test_api_capabilities.py @@ -8,8 +8,8 @@ from facefusion import capability_store, session_manager from facefusion.apis.core import create_api -@pytest.fixture(scope = 'module') -def test_client() -> Iterator[TestClient]: +@pytest.fixture(scope = 'module', autouse = True) +def before_all() -> None: program = ArgumentParser() capability_store.register_capability_set( [ @@ -31,15 +31,18 @@ def test_client() -> Iterator[TestClient]: scopes = [ 'api' ] ) - with TestClient(create_api()) as test_client: - yield test_client - @pytest.fixture(scope = 'function', autouse = True) def before_each() -> None: session_manager.SESSIONS.clear() +@pytest.fixture(scope = 'module') +def test_client() -> Iterator[TestClient]: + with TestClient(create_api()) as test_client: + yield test_client + + def test_get_capabilities(test_client : TestClient) -> None: capabilities_response = test_client.get('/capabilities') capabilities_body = capabilities_response.json() diff --git a/tests/test_api_metrics.py b/tests/test_api_metrics.py index 119838cc..ac424bde 100644 --- a/tests/test_api_metrics.py +++ b/tests/test_api_metrics.py @@ -8,17 +8,17 @@ from facefusion import metadata, session_manager from facefusion.apis.core import create_api +@pytest.fixture(scope = 'function', autouse = True) +def before_each() -> None: + session_manager.SESSIONS.clear() + + @pytest.fixture(scope = 'module') def test_client() -> Iterator[TestClient]: with TestClient(create_api()) as test_client: yield test_client -@pytest.fixture(scope = 'function', autouse = True) -def before_each() -> None: - session_manager.SESSIONS.clear() - - @pytest.fixture(scope = 'function', autouse = True) def mock_detect_execution_devices(mocker : MockerFixture) -> None: mocker.patch('facefusion.system.state_manager.get_temp_path', return_value = '/tmp') diff --git a/tests/test_api_ping.py b/tests/test_api_ping.py index 138da435..6d01ba95 100644 --- a/tests/test_api_ping.py +++ b/tests/test_api_ping.py @@ -7,17 +7,17 @@ from facefusion import metadata, session_manager from facefusion.apis.core import create_api +@pytest.fixture(scope = 'function', autouse = True) +def before_each() -> None: + session_manager.SESSIONS.clear() + + @pytest.fixture(scope = 'module') def test_client() -> Iterator[TestClient]: with TestClient(create_api()) as test_client: yield test_client -@pytest.fixture(scope = 'function', autouse = True) -def before_each() -> None: - session_manager.SESSIONS.clear() - - def test_ping(test_client : TestClient) -> None: create_session_response = test_client.post('/session', json = { diff --git a/tests/test_api_session.py b/tests/test_api_session.py index fa9c58d8..85e7178b 100644 --- a/tests/test_api_session.py +++ b/tests/test_api_session.py @@ -10,17 +10,17 @@ from facefusion.apis.core import create_api from facefusion.types import Session +@pytest.fixture(scope = 'function', autouse = True) +def before_each() -> None: + session_manager.SESSIONS.clear() + + @pytest.fixture(scope = 'module') def test_client() -> Iterator[TestClient]: with TestClient(create_api()) as test_client: yield test_client -@pytest.fixture(scope = 'function', autouse = True) -def before_each() -> None: - session_manager.SESSIONS.clear() - - def test_create_session(test_client : TestClient) -> None: create_session_response = test_client.post('/session', json = { diff --git a/tests/test_api_state.py b/tests/test_api_state.py index 5d150590..1d3ce0ca 100644 --- a/tests/test_api_state.py +++ b/tests/test_api_state.py @@ -14,16 +14,6 @@ from .assert_helper import get_test_example_file, get_test_examples_directory @pytest.fixture(scope = 'module', autouse = True) def before_all() -> None: - conditional_download(get_test_examples_directory(), - [ - 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg', - 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4' - ]) - subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p.jpg') ]) - - -@pytest.fixture(scope = 'module') -def test_client() -> Iterator[TestClient]: program = ArgumentParser() capability_store.register_capability_set( [ @@ -51,10 +41,15 @@ def test_client() -> Iterator[TestClient]: ], scopes = [ 'api' ] ) + state_manager.init_item('execution_providers', [ 'cpu' ]) - with TestClient(create_api()) as test_client: - yield test_client + conditional_download(get_test_examples_directory(), + [ + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg', + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4' + ]) + subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p.jpg') ]) @pytest.fixture(scope = 'function', autouse = True) @@ -63,6 +58,12 @@ def before_each() -> None: asset_store.clear() +@pytest.fixture(scope = 'module') +def test_client() -> Iterator[TestClient]: + with TestClient(create_api()) as test_client: + yield test_client + + def test_get_state(test_client : TestClient) -> None: get_state_response = test_client.get('/state') diff --git a/tests/test_api_stream.py b/tests/test_api_stream.py index e3ca0748..49efad3d 100644 --- a/tests/test_api_stream.py +++ b/tests/test_api_stream.py @@ -12,26 +12,18 @@ from facefusion.apis import asset_store from facefusion.apis.core import create_api from facefusion.core import common_pre_check, processors_pre_check from facefusion.download import conditional_download +from facefusion.libraries import datachannel as datachannel_module from .assert_helper import get_test_example_file, get_test_examples_directory from .stream_helper import create_sdp_offer, open_websocket_stream @pytest.fixture(scope = 'module', autouse = True) def before_all() -> None: - conditional_download(get_test_examples_directory(), - [ - 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg' - ]) - - -@pytest.fixture(scope = 'module') -def test_client() -> Iterator[TestClient]: state_manager.init_item('execution_device_ids', [ 0 ]) state_manager.init_item('execution_providers', [ 'cpu' ]) state_manager.init_item('download_providers', [ 'github', 'huggingface' ]) state_manager.init_item('temp_path', tempfile.gettempdir()) state_manager.init_item('processors', [ 'face_swapper' ]) - state_manager.init_item('face_selector_mode', 'many') state_manager.init_item('face_detector_model', 'yolo_face') state_manager.init_item('face_detector_size', '640x640') state_manager.init_item('face_detector_score', 0.5) @@ -44,24 +36,29 @@ def test_client() -> Iterator[TestClient]: state_manager.init_item('face_mask_padding', [ 0, 0, 0, 0 ]) state_manager.init_item('face_swapper_model', 'hyperswap_1a_256') state_manager.init_item('face_swapper_pixel_boost', '256x256') - state_manager.init_item('face_swapper_weight', 0.5) common_pre_check() processors_pre_check() + datachannel_module.pre_check() - with TestClient(create_api()) as test_client: - yield test_client + conditional_download(get_test_examples_directory(), + [ + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg' + ]) @pytest.fixture(scope = 'function', autouse = True) def before_each() -> None: - state_manager.init_item('source_paths', None) session_manager.SESSIONS.clear() asset_store.clear() -# TODO: enable again -@pytest.mark.skip +@pytest.fixture(scope = 'module') +def test_client() -> Iterator[TestClient]: + with TestClient(create_api()) as test_client: + yield test_client + + def test_stream_image(test_client : TestClient) -> None: create_session_response = test_client.post('/session', json = { @@ -92,10 +89,9 @@ def test_stream_image(test_client : TestClient) -> None: assert select_response.status_code == 200 - with test_client.websocket_connect('/stream', subprotocols = + with test_client.websocket_connect('/stream?mode=image', subprotocols = [ - 'access_token.' + access_token, - 'image' + 'access_token.' + access_token ]) as websocket: websocket.send_bytes(source_content) output_bytes = websocket.receive_bytes() @@ -104,8 +100,6 @@ def test_stream_image(test_client : TestClient) -> None: assert output_vision_frame.shape == (1024, 1024, 3) -# TODO: enable again -@pytest.mark.skip def test_stream_video(test_client : TestClient) -> None: create_session_response = test_client.post('/session', json = { @@ -137,9 +131,11 @@ def test_stream_video(test_client : TestClient) -> None: ready_event = threading.Event() stop_event = threading.Event() #TODO: use asyncio - stream_thread = threading.Thread(target = open_websocket_stream, args = (test_client, [ 'access_token.' + access_token, 'video' ], source_content, ready_event, stop_event)) + stream_thread = threading.Thread(target = open_websocket_stream, args = (test_client, [ 'access_token.' + access_token ], source_content, ready_event, stop_event)) stream_thread.start() - ready_event.wait() + ready_event.wait(timeout = 10) + + assert ready_event.is_set() sdp_offer = create_sdp_offer() stream_response = test_client.post('/stream', content = sdp_offer, headers = @@ -152,4 +148,4 @@ def test_stream_video(test_client : TestClient) -> None: assert stream_response.text stop_event.set() - stream_thread.join() + stream_thread.join(timeout = 10) diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index 54397ed5..8a44cfb5 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -1,5 +1,4 @@ import os -import struct import subprocess import tempfile @@ -8,7 +7,7 @@ import pytest import facefusion.ffmpeg from facefusion import process_manager, state_manager from facefusion.download import conditional_download -from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames, spawn_stream +from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames from facefusion.ffprobe import probe_entries from facefusion.filesystem import copy_file, is_image from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths @@ -256,24 +255,3 @@ def test_sanitize_video() -> None: assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'hevc' -def test_spawn_stream() -> None: # TODO: Improve test - test_set =\ - [ - ((426, 240), 25, 400, 800), - ((640, 360), 30, 1000, 2000), - ((1280, 720), 30, 2000, 4000) - ] - - for resolution, stream_fps, stream_bitrate, stream_bufsize in test_set: - encoder = spawn_stream(resolution, stream_fps, stream_bitrate, stream_bufsize) - frame_size = resolution[0] * resolution[1] * 3 - stdout, _ = encoder.communicate(input = bytes(frame_size)) - - assert len(stdout) > 32 - frame_header = stdout[:32] - - assert frame_header[:4] == b'DKIF' - output_width = struct.unpack_from(' None: @@ -110,10 +110,6 @@ def test_set_video_quality() -> None: assert set_video_quality('hevc_videotoolbox', 100) == [ '-b:v', '50512k' ] -def test_use_wallclock_timestamps() -> None: - assert use_wallclock() == [ '-use_wallclock_as_timestamps', '1' ] - - def test_capture_video() -> None: assert capture_video() == [ '-f', 'rawvideo', '-pix_fmt', 'rgb24' ] @@ -128,27 +124,3 @@ def test_set_stream_quality() -> None: assert set_stream_quality(2000) == [ '-b:v', '2000k' ] -def test_set_keyframe_interval() -> None: - assert set_stream_keyframe(30) == [ '-g', '30', '-keyint_min', '30' ] - assert set_stream_keyframe(60) == [ '-g', '60', '-keyint_min', '60' ] - - -def test_set_output_format() -> None: - assert set_muxer('ivf') == [ '-f', 'ivf' ] - assert set_muxer('mpegts') == [ '-f', 'mpegts' ] - - -def test_set_video_bufsize() -> None: - assert set_video_bufsize(800) == [ '-bufsize', '800k' ] - assert set_video_bufsize(4000) == [ '-bufsize', '4000k' ] - - -def test_set_encoder_deadline() -> None: - assert set_encoder_deadline('best') == [ '-deadline', 'best' ] - assert set_encoder_deadline('good') == [ '-deadline', 'good' ] - assert set_encoder_deadline('realtime') == [ '-deadline', 'realtime' ] - - -def test_set_lag_in_frames() -> None: - assert set_lag_in_frames(0) == [ '-lag-in-frames', '0' ] - assert set_lag_in_frames(16) == [ '-lag-in-frames', '16' ] diff --git a/tests/test_rtc.py b/tests/test_rtc.py index 0bd009f5..d20ee1c4 100644 --- a/tests/test_rtc.py +++ b/tests/test_rtc.py @@ -9,12 +9,13 @@ from facefusion.types import RtcPeer @pytest.fixture(scope = 'module') def before_all() -> None: - rtc.pre_check() + datachannel_module.pre_check() +# TODO: add test_parse_sdp_payload_types def test_build_media_description() -> None: - assert rtc.build_media_description('audio', 111, 'opus/48000/2', 'sendonly', 1) == b'm=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\na=sendonly\r\na=mid:1\r\na=rtcp-mux\r\n' - assert rtc.build_media_description('video', 96, 'VP8/90000', 'recvonly', 0) == b'm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP8/90000\r\na=recvonly\r\na=mid:0\r\na=rtcp-mux\r\n' + assert rtc.build_media_description('audio', 111, 'opus/48000/2', 'sendonly', 1) == b'm=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\na=rtcp-fb:111 nack\r\na=rtcp-fb:111 nack pli\r\na=sendonly\r\na=mid:1\r\na=rtcp-mux\r\n' + assert rtc.build_media_description('video', 96, 'VP8/90000', 'recvonly', 0) == b'm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP8/90000\r\na=rtcp-fb:96 nack\r\na=rtcp-fb:96 nack pli\r\na=recvonly\r\na=mid:0\r\na=rtcp-mux\r\n' # TODO: enable again @@ -32,7 +33,7 @@ def test_create_peer_connection() -> None: def test_add_audio_track() -> None: peer_connection = rtc.create_peer_connection() - assert rtc.add_audio_track(peer_connection, 'sendonly') > 0 + assert rtc.add_audio_track(peer_connection, 'sendonly', 111) > 0 datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection) @@ -42,7 +43,7 @@ def test_add_audio_track() -> None: def test_add_video_track() -> None: peer_connection = rtc.create_peer_connection() - assert rtc.add_video_track(peer_connection, 'sendonly') > 0 + assert rtc.add_video_track(peer_connection, 'sendonly', 96) > 0 datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection) @@ -53,13 +54,13 @@ def test_negotiate_sdp() -> None: datachannel_library = datachannel_module.create_static_library() sender_connection = rtc.create_peer_connection() - rtc.add_video_track(sender_connection, 'sendonly') - rtc.add_audio_track(sender_connection, 'sendonly') + rtc.add_video_track(sender_connection, 'sendonly', 96) + rtc.add_audio_track(sender_connection, 'sendonly', 111) sdp_offer = rtc.create_sdp(sender_connection) receiver_connection = rtc.create_peer_connection() - rtc.add_video_track(receiver_connection, 'recvonly') - rtc.add_audio_track(receiver_connection, 'recvonly') + rtc.add_video_track(receiver_connection, 'recvonly', 96) + rtc.add_audio_track(receiver_connection, 'recvonly', 111) sdp_answer = rtc.negotiate_sdp(receiver_connection, sdp_offer) assert sdp_answer diff --git a/tests/test_rtc_store.py b/tests/test_rtc_store.py new file mode 100644 index 00000000..1f8c789c --- /dev/null +++ b/tests/test_rtc_store.py @@ -0,0 +1,18 @@ +import pytest + +from facefusion.libraries import datachannel as datachannel_module + + +@pytest.fixture(scope = 'module') +def before_all() -> None: + datachannel_module.pre_check() + + +# TODO: test create_rtc_stream, get_rtc_stream, destroy_rtc_stream lifecycle +def test_rtc_stream_lifecycle() -> None: + pass + + +# TODO: test add_rtc_viewer with valid session and sdp offer +def test_add_rtc_viewer() -> None: + pass diff --git a/tests/test_stream_helper.py b/tests/test_stream_helper.py deleted file mode 100644 index 31b86506..00000000 --- a/tests/test_stream_helper.py +++ /dev/null @@ -1,31 +0,0 @@ -import os - -from facefusion.apis.stream_helper import calculate_bitrate, calculate_buffer_size, read_pipe_buffer - - -def test_calculate_bitrate() -> None: - assert calculate_bitrate((320, 240)) == 674 - assert calculate_bitrate((640, 480)) == 1347 - assert calculate_bitrate((1280, 720)) == 2333 - assert calculate_bitrate((1920, 1080)) == 3500 - assert calculate_bitrate((3840, 2160)) == 7000 - - -def test_calculate_buffer_size() -> None: - assert calculate_buffer_size((320, 240)) == 1348 - assert calculate_buffer_size((640, 480)) == 2694 - assert calculate_buffer_size((1280, 720)) == 4666 - assert calculate_buffer_size((1920, 1080)) == 7000 - assert calculate_buffer_size((3840, 2160)) == 14000 - - -def test_read_pipe_buffer() -> None: - read_pipe, write_pipe = os.pipe() - os.write(write_pipe, b'123456') - os.close(write_pipe) - - assert read_pipe_buffer(read_pipe, 3) == b'123' - assert read_pipe_buffer(read_pipe, 3) == b'456' - assert read_pipe_buffer(read_pipe, 1) is None - - os.close(read_pipe)