mirror of
https://github.com/facefusion/facefusion.git
synced 2026-05-12 18:32:18 +02:00
Refactor/ffmpeg less stream (#1092)
* remove ffmpeg from stream to use opus and vpx, add bunch of todos * fix testing * improve download checkout * fix datachannel download, fix super dirty test clients - setup logic does not belong there * fix testing
This commit is contained in:
+1
-1
@@ -1,8 +1,8 @@
|
||||
__pycache__
|
||||
.assets
|
||||
.binaries
|
||||
.claude
|
||||
.caches
|
||||
.idea
|
||||
.jobs
|
||||
.libraries
|
||||
.vscode
|
||||
|
||||
@@ -1,262 +0,0 @@
|
||||
# Regression TODO
|
||||
|
||||
## WHEP SDP negotiation blocks the async event loop
|
||||
|
||||
`negotiate_sdp` polls with `time.sleep(0.05)` in a loop for up to 5 seconds. Calling it directly from the async handler freezes all requests and WebSocket traffic.
|
||||
|
||||
Before:
|
||||
|
||||
```python
|
||||
async def post_stream(request : Request) -> Response:
|
||||
sdp_answer = rtc_store.add_rtc_viewer(session_id, sdp_offer)
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```python
|
||||
async def post_stream(request : Request) -> Response:
|
||||
event_loop = asyncio.get_running_loop()
|
||||
sdp_answer = await event_loop.run_in_executor(None, rtc_store.add_rtc_viewer, session_id, sdp_offer)
|
||||
```
|
||||
|
||||
## Stream data uses magic byte sniffing instead of channel metadata
|
||||
|
||||
The WebSocket received both vision frames and PCM audio as binary messages. The old code checked for JPEG magic bytes (`\xff\xd8`) to distinguish them. This is wrong because it only supports JPEG, breaks when audio happens to start with `0xff 0xd8`, and fails silently for PNG or other image formats.
|
||||
|
||||
Before:
|
||||
|
||||
```python
|
||||
JPEG_MAGIC : bytes = b'\xff\xd8'
|
||||
|
||||
if data[:2] == JPEG_MAGIC:
|
||||
vision_frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR)
|
||||
|
||||
if data[:2] != JPEG_MAGIC:
|
||||
rtc_store.send_rtc_audio(session_id, data)
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```python
|
||||
if data[0] == ord('v'):
|
||||
vision_frame = cv2.imdecode(numpy.frombuffer(data[1:], numpy.uint8), cv2.IMREAD_COLOR)
|
||||
|
||||
if data[0] == ord('a'):
|
||||
rtc_store.send_rtc_audio(session_id, data[1:])
|
||||
```
|
||||
|
||||
The client prepends a single byte (`v` or `a`) to each message. The server reads the first byte to route the payload — format-agnostic and explicit.
|
||||
|
||||
Before (client):
|
||||
|
||||
```javascript
|
||||
ws.send(buf);
|
||||
ws.send(pcm.buffer);
|
||||
```
|
||||
|
||||
After (client):
|
||||
|
||||
```javascript
|
||||
var prefixed = new Uint8Array(buf.byteLength + 1);
|
||||
prefixed[0] = 118; // 'v'
|
||||
prefixed.set(new Uint8Array(buf), 1);
|
||||
ws.send(prefixed.buffer);
|
||||
|
||||
var prefixed = new Uint8Array(pcm.buffer.byteLength + 1);
|
||||
prefixed[0] = 97; // 'a'
|
||||
prefixed.set(new Uint8Array(pcm.buffer), 1);
|
||||
ws.send(prefixed.buffer);
|
||||
```
|
||||
|
||||
## SDP negotiation polls with sleep loop instead of using callbacks
|
||||
|
||||
`negotiate_sdp` polls `rtcGetLocalDescription` every 50ms for up to 5 seconds. This wastes CPU and adds latency because the answer might be ready after 5ms but we sleep the full 50ms. libdatachannel provides `rtcSetLocalDescriptionCallback` which fires exactly when the SDP answer is ready.
|
||||
|
||||
Before:
|
||||
|
||||
```python
|
||||
def negotiate_sdp(peer_connection : int, sdp_offer : str) -> Optional[str]:
|
||||
rtc_library.rtcSetRemoteDescription(peer_connection, sdp_offer.encode('utf-8'), b'offer')
|
||||
buffer_size = 16384
|
||||
buffer_string = ctypes.create_string_buffer(buffer_size)
|
||||
wait_limit = time.monotonic() + 5
|
||||
|
||||
while time.monotonic() < wait_limit:
|
||||
if rtc_library.rtcGetLocalDescription(peer_connection, buffer_string, buffer_size) > 0:
|
||||
return buffer_string.value.decode()
|
||||
time.sleep(0.05)
|
||||
|
||||
return None
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```python
|
||||
def negotiate_sdp(peer_connection : int, sdp_offer : str) -> Optional[str]:
|
||||
rtc_library.rtcSetRemoteDescription(peer_connection, sdp_offer.encode('utf-8'), b'offer')
|
||||
result = threading.Event()
|
||||
sdp_holder = [None]
|
||||
|
||||
@ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p, ctypes.c_int, ctypes.c_void_p)
|
||||
def on_description(pc, sdp, sdp_type, user_ptr):
|
||||
sdp_holder[0] = sdp.decode()
|
||||
result.set()
|
||||
|
||||
rtc_library.rtcSetLocalDescriptionCallback(peer_connection, on_description)
|
||||
result.wait(timeout = 5)
|
||||
return sdp_holder[0]
|
||||
```
|
||||
|
||||
Uses a `threading.Event` to block until the callback fires — no polling, no wasted sleep cycles, instant response.
|
||||
|
||||
## No connection state tracking per peer
|
||||
|
||||
`send_to_peers` calls `rtcIsOpen` on the video track for every frame, every peer. There is no way to detect when a peer disconnects or fails — dead peers stay in the list until the stream is destroyed. The PoC branch used `rtcSetStateChangeCallback` to track a `connected` flag per viewer and auto-clean dead connections.
|
||||
|
||||
Before:
|
||||
|
||||
```python
|
||||
def send_to_peers(peers, data):
|
||||
for rtc_peer in peers:
|
||||
video_track_id = rtc_peer.get('video_track')
|
||||
|
||||
if video_track_id and rtc_library.rtcIsOpen(video_track_id):
|
||||
rtc_library.rtcSetTrackRtpTimestamp(video_track_id, timestamp)
|
||||
rtc_library.rtcSendMessage(video_track_id, data_buffer, data_total)
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```python
|
||||
@ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_int, ctypes.c_void_p)
|
||||
def on_state_change(pc, state, user_ptr):
|
||||
if state == 4: # RTC_FAILED
|
||||
mark_peer_disconnected(pc)
|
||||
if state == 5: # RTC_CLOSED
|
||||
mark_peer_disconnected(pc)
|
||||
|
||||
def handle_whep_offer(peers, sdp_offer):
|
||||
peer_connection = create_peer_connection()
|
||||
rtc_library.rtcSetStateChangeCallback(peer_connection, on_state_change)
|
||||
```
|
||||
|
||||
Avoids calling `rtcIsOpen` on every frame and allows removing dead peers immediately when the connection drops.
|
||||
|
||||
## SDP line endings break Firefox
|
||||
|
||||
SDP media descriptions used `os.linesep` which produces `\n` on Linux. RFC 4566 requires `\r\n` — Firefox rejected the SDP entirely while Chrome was lenient.
|
||||
|
||||
Before:
|
||||
|
||||
```python
|
||||
media_description = b'm=video 9 UDP/TLS/RTP/SAVPF 96' + os.linesep.encode() + b'a=rtpmap:96 VP8/90000' + os.linesep.encode()
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```python
|
||||
media_description = ('m=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP8/90000\r\na=sendonly\r\na=mid:0\r\na=rtcp-mux\r\n').encode()
|
||||
```
|
||||
|
||||
## SDP payload types are hardcoded instead of negotiated
|
||||
|
||||
Payload types were hardcoded (VP8 PT 96, Opus PT 111). Chrome happens to use these, but Firefox offers VP8 PT 120 and Opus PT 109. The server answered with payload types Firefox never offered, so Firefox couldn't decode the RTP packets.
|
||||
|
||||
Before:
|
||||
|
||||
```python
|
||||
def handle_whep_offer(peers, sdp_offer):
|
||||
peer_connection = create_peer_connection()
|
||||
audio_track = add_audio_track(peer_connection)
|
||||
video_track = add_video_track(peer_connection)
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```python
|
||||
def extract_payload_type(sdp_offer, media_type, codec_name, fallback):
|
||||
current_media = None
|
||||
|
||||
for line in sdp_offer.splitlines():
|
||||
if line.startswith('m=' + media_type):
|
||||
current_media = media_type
|
||||
if line.startswith('m=') and not line.startswith('m=' + media_type):
|
||||
current_media = None
|
||||
if current_media == media_type and line.startswith('a=rtpmap:') and codec_name in line:
|
||||
return int(line.split(':')[1].split(' ')[0])
|
||||
|
||||
return fallback
|
||||
|
||||
def handle_whep_offer(peers, sdp_offer):
|
||||
video_payload_type = extract_payload_type(sdp_offer, 'video', 'VP8/90000', 96)
|
||||
audio_payload_type = extract_payload_type(sdp_offer, 'audio', 'opus/48000', 111)
|
||||
peer_connection = create_peer_connection()
|
||||
audio_track = add_audio_track(peer_connection, payload_type = audio_payload_type)
|
||||
video_track = add_video_track(peer_connection, payload_type = video_payload_type)
|
||||
```
|
||||
|
||||
## Worker threads bypass API context and skip inference
|
||||
|
||||
`process_vision_frame` runs in a `ThreadPoolExecutor` where `detect_app_context()` walks the call stack and finds no `facefusion/apis/` frame — so it reads from the empty `cli` state and returns frames without the face swap applied.
|
||||
|
||||
Before:
|
||||
|
||||
```python
|
||||
future = executor.submit(process_vision_frame, capture_frame)
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```python
|
||||
def process_stream_frame(capture_frame : VisionFrame) -> VisionFrame:
|
||||
return process_vision_frame(capture_frame)
|
||||
|
||||
future = executor.submit(process_stream_frame, capture_frame)
|
||||
```
|
||||
|
||||
The wrapper lives in `facefusion/apis/stream_helper.py`, so `detect_app_context()` finds it on the call stack and resolves to `api`.
|
||||
|
||||
## Frame processing lacks a deque for fluent streaming
|
||||
|
||||
Without a deque, processed frames are sent one at a time — if inference is slower than the capture rate, frames queue up in futures and the output stutters. A `deque` buffers completed frames so the encoder can drain them smoothly while inference continues in parallel.
|
||||
|
||||
Before:
|
||||
|
||||
```python
|
||||
while not stop_event.is_set():
|
||||
capture_frame = latest_frame_holder[0]
|
||||
output_vision_frame = process_stream_frame(capture_frame)
|
||||
encoder.stdin.write(output_vision_frame.tobytes())
|
||||
```
|
||||
|
||||
After:
|
||||
|
||||
```python
|
||||
output_deque : deque[VisionFrame] = deque()
|
||||
|
||||
with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
|
||||
futures = []
|
||||
|
||||
while not stop_event.is_set():
|
||||
if capture_frame is not None and len(futures) < 4:
|
||||
future = executor.submit(process_stream_frame, capture_frame)
|
||||
futures.append(future)
|
||||
|
||||
for future_done in [ future for future in futures if future.done() ]:
|
||||
output_deque.append(future_done.result())
|
||||
futures.remove(future_done)
|
||||
|
||||
while output_deque:
|
||||
temp_vision_frame = output_deque.popleft()
|
||||
encoder.stdin.write(temp_vision_frame.tobytes())
|
||||
```
|
||||
|
||||
Consider reusing `multi_process_capture` from `streamer.py` — it has the same ThreadPoolExecutor + deque pattern. If it accepted a frame iterator instead of `cv2.VideoCapture`, both pipelines could share the same processing loop.
|
||||
|
||||
## Binary files belong in `.binaries/` at project root
|
||||
|
||||
The libdatachannel shared library downloads to `.binaries/` in the project root. This is not an asset — binary dependencies are platform-specific build artifacts and must stay separate from `.assets/`.
|
||||
|
||||
```python
|
||||
'path': resolve_relative_path('../.binaries/' + binary_name)
|
||||
```
|
||||
-76
@@ -1,76 +0,0 @@
|
||||
# RTC TODO
|
||||
|
||||
## Approach
|
||||
|
||||
1. vibe code mass testing to get broad coverage fast
|
||||
2. vibe code refactoring (naming, dead code, restructure)
|
||||
3. hand craft production code (libdatachannel, peer management, SDP)
|
||||
4. hand craft testing — thin it out to essentials only
|
||||
|
||||
## Unit Tests
|
||||
|
||||
| Function | Rating | Assertions |
|
||||
|---|---|---|
|
||||
| `create_peer_connection` | essential | returns valid peer connection id |
|
||||
| `add_audio_track` | essential | returns valid track id |
|
||||
| `add_video_track` | essential | returns valid track id |
|
||||
| `negotiate_sdp` | essential | returns sdp answer from valid offer, returns None on timeout |
|
||||
| `delete_peers` | essential | clears peer list |
|
||||
| `create_rtc_stream` | essential | initializes empty peer list |
|
||||
| `destroy_rtc_stream` | essential | cleans up peers, handles missing session |
|
||||
| `add_rtc_viewer` | nice to have | returns None for unknown session |
|
||||
| `resolve_binary_file` | skip | covered implicitly by `create_static_rtc_library` |
|
||||
| `handle_whep_offer` | skip | thin wrapper, covered by integration tests |
|
||||
| `send_to_peers` | skip | needs open tracks, covered by integration tests |
|
||||
| `send_rtc_frame` | skip | thin delegation to `send_to_peers` |
|
||||
|
||||
## Integration Tests
|
||||
|
||||
- [ ] test full SDP offer/answer roundtrip between two peer connections
|
||||
- [ ] test `send_to_peers` delivers frame data to a connected peer
|
||||
- [ ] test peer cleanup after `delete_peers` prevents further sends
|
||||
- [ ] test `add_rtc_viewer` followed by `send_rtc_frame` end-to-end
|
||||
- [ ] test multiple viewers receive frames from same stream
|
||||
- [ ] test viewer disconnect does not break remaining peers
|
||||
|
||||
## Dead Code
|
||||
|
||||
- [ ] remove `is_peer_connected` from `rtc.py`, never called
|
||||
- [ ] remove `pre_check` from `rtc.py`, never called
|
||||
- [ ] remove `get_rtc_stream` from `rtc_store.py`, never called
|
||||
- [ ] remove `RtcOfferSet` from `types.py`, never used
|
||||
|
||||
## Naming
|
||||
|
||||
- [ ] rename `rtc_bindings.py` to `libdatachannel.py` — owns C library loading, struct definitions, ctypes registration
|
||||
- [ ] rename `RTC_CONFIGURATION` to a proper class definition
|
||||
- [ ] rename `RTC_PACKETIZER_INIT` to a proper class definition
|
||||
- [ ] rename `init_ctypes` to something more specific like `register_argtypes`
|
||||
|
||||
## Unused Library Config
|
||||
|
||||
- [ ] `create_peer_connection` exposes 14 params but production only uses `disable_auto_negotiation`, `enable_ice_udp_mux`, `force_media_transport` — reduce to what is needed
|
||||
- [ ] `RTC_PACKETIZER_INIT` defines `nalSeparator`, `obuPacketization`, `playoutDelayId`, `playoutDelayMin`, `playoutDelayMax`, `sequenceNumber`, `timestamp` — none are set outside the struct definition, they are H264/AV1 specific and unused for VP8/Opus
|
||||
- [ ] `rtcSetLocalDescription` is used in tests but not registered in `rtc_bindings.py`
|
||||
|
||||
## Refactor
|
||||
|
||||
- [ ] extract shared media description builder from `rtc.py` and `tests/stream_helper.py` (see TODO in `tests/stream_helper.py`)
|
||||
- [ ] replace `type()` calls for ctypes structs in `rtc_bindings.py` with proper class definitions
|
||||
- [ ] move `resolve_binary_file`, `create_static_download_set`, `create_static_rtc_library` from `rtc.py` into `libdatachannel.py` — library init belongs with the bindings, `rtc.py` just consumes it
|
||||
- [ ] move `rtc_store.py` state into a proper store pattern consistent with other `*_store.py` files
|
||||
- [ ] replace `time.sleep` polling loops in `negotiate_sdp` and `create_sdp_offer` (test helper) with `rtcSetGatheringStateChangeCallback` — signal a `threading.Event` on ICE gathering complete, then `event.wait(timeout=5)` instead of spinning
|
||||
- [ ] `create_static_download_set` has hardcoded Linux URLs with a TODO to use the dynamic `binary_name`
|
||||
|
||||
## Violations
|
||||
|
||||
- `rtc.py:25` — comment on `create_static_download_set` (no comments)
|
||||
- `rtc.py:34` — comment on hash url (no comments)
|
||||
- `rtc.py:42` — comment on source url (no comments)
|
||||
- `rtc.py:205` — `send_to_peers` returns None explicitly on a void function (redundant)
|
||||
- `rtc_bindings.py:3,23` — `type()` to create structs is a class workaround (no classes, but these should be plain structs)
|
||||
- `tests/stream_helper.py:45` — `sdp` temp variable before return (no need for temporary variable in simple cases)
|
||||
|
||||
## Security
|
||||
|
||||
- `rtc.py:157` — `negotiate_sdp` passes unsanitized `sdp_offer` string directly to C library, no SDP format validation before hitting native code
|
||||
@@ -1,60 +0,0 @@
|
||||
# Stream API TODO
|
||||
|
||||
## Approach
|
||||
|
||||
1. vibe code mass testing to get broad coverage fast
|
||||
2. vibe code refactoring (naming, boilerplate extraction, validation)
|
||||
3. hand craft production code (stream pipeline, encoder loop, IVF parsing)
|
||||
4. hand craft testing — thin it out to essentials only
|
||||
|
||||
## Unit Tests
|
||||
|
||||
| Function | Rating | Assertions |
|
||||
|---|---|---|
|
||||
| `get_websocket_stream_mode` | essential | returns None for missing header, returns None for unknown protocol |
|
||||
| `forward_rtc_frames` | essential | reads IVF header and forwards frame data, handles broken pipe |
|
||||
| `run_encode_loop` | nice to have | drains deque and closes encoder |
|
||||
| `receive_vision_frames` | skip | async generator, covered by integration tests |
|
||||
| `submit_encoder_frame` | skip | thin glue between cv2 and subprocess stdin |
|
||||
| `websocket_stream` | skip | routing only, covered by integration tests |
|
||||
| `post_stream` | skip | covered by integration tests |
|
||||
|
||||
## Integration Tests
|
||||
|
||||
- [ ] test stream without session — expect rejection
|
||||
- [ ] test stream with expired or invalid token
|
||||
- [ ] test image stream without source selected
|
||||
- [ ] test video stream without source selected
|
||||
- [ ] test WHEP offer without active websocket stream
|
||||
- [ ] test WHEP offer with malformed SDP body
|
||||
- [ ] test WHEP offer with wrong content type
|
||||
- [ ] test multiple WHEP viewers on same stream
|
||||
- [ ] test websocket disconnect mid-stream triggers cleanup
|
||||
|
||||
## Naming
|
||||
|
||||
- [ ] rename `test_get_stream_mode` to `test_get_websocket_stream_mode` in `test_stream_helper.py` — does not match function name
|
||||
- [ ] rename `make_scope` in `test_stream_helper.py` to `make_websocket_scope` — more descriptive
|
||||
- [ ] `stream_helper.py` mixes pure helpers (`calculate_bitrate`) with async handlers (`handle_image_stream`) — consider splitting
|
||||
|
||||
## Dead Code
|
||||
|
||||
- [ ] `read_pipe_buffer` has a test, but the test is disconnected from how the function is actually used — the test reads from a closed pipe, while production reads from a live subprocess's stdout
|
||||
|
||||
## Refactor
|
||||
|
||||
- [ ] `handle_image_stream` and `handle_video_stream` share session setup boilerplate (subprotocol, access_token, session_id, source_paths, websocket accept) — extract common setup
|
||||
- [ ] `forward_rtc_frames` assumes the IVF container format with a hardcoded 32-byte file header and 12-byte frame header — document this or make it configurable
|
||||
- [ ] `post_stream` does not validate content type is `application/sdp` before parsing body
|
||||
- [ ] `calculate_bitrate` has a TODO about improving the calculation
|
||||
- [ ] `handle_video_stream` has a hardcoded fallback `output_video_fps or 30` with a TODO to resolve from target video fps
|
||||
|
||||
## Violations
|
||||
|
||||
- `stream_helper.py:24` — comment on `calculate_bitrate` (no comments)
|
||||
- `stream_helper.py:143` — comment on `output_video_fps` fallback (no comments)
|
||||
- `test_stream_helper.py:30` — `test_get_stream_mode` does not match function name `get_websocket_stream_mode` (naming convention)
|
||||
|
||||
## Security
|
||||
|
||||
- `endpoints/stream.py:27` — `request.body().decode()` is not sanitized, raw user input forwarded to C library via RTC layer
|
||||
@@ -8,6 +8,7 @@ from facefusion.apis.session_helper import extract_access_token
|
||||
from facefusion.apis.stream_helper import handle_image_stream, handle_video_stream
|
||||
|
||||
|
||||
# TODO: reject websocket with invalid or missing stream mode
|
||||
async def websocket_stream(websocket : WebSocket) -> None:
|
||||
stream_mode = websocket.query_params.get('mode')
|
||||
|
||||
@@ -18,14 +19,15 @@ async def websocket_stream(websocket : WebSocket) -> None:
|
||||
await handle_video_stream(websocket)
|
||||
|
||||
|
||||
# TODO: validate content type is application/sdp, sanitize sdp input
|
||||
async def post_stream(request : Request) -> Response:
|
||||
access_token = extract_access_token(request.scope)
|
||||
session_id = session_manager.find_session_id(access_token)
|
||||
session_context.set_session_id(session_id)
|
||||
|
||||
if session_id:
|
||||
sdp_offer = (await request.body()).decode()
|
||||
sdp_answer = rtc_store.add_rtc_viewer(session_id, sdp_offer)
|
||||
sdp_offer = await request.body()
|
||||
sdp_answer = rtc_store.add_rtc_viewer(session_id, sdp_offer.decode())
|
||||
|
||||
if sdp_answer:
|
||||
return Response(sdp_answer, status_code = HTTP_201_CREATED, media_type = 'application/sdp')
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import asyncio
|
||||
import math
|
||||
import os
|
||||
import subprocess
|
||||
import ctypes
|
||||
import multiprocessing
|
||||
import struct
|
||||
from collections import deque
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy
|
||||
@@ -13,35 +13,21 @@ from starlette.websockets import WebSocket, WebSocketState
|
||||
from facefusion import rtc_store, session_context, session_manager, state_manager
|
||||
from facefusion.apis.api_helper import get_sec_websocket_protocol
|
||||
from facefusion.apis.session_helper import extract_access_token
|
||||
from facefusion.common_helper import is_linux, is_macos
|
||||
from facefusion.ffmpeg import spawn_stream
|
||||
from facefusion.libraries import opus as opus_module, vpx as vpx_module
|
||||
from facefusion.streamer import process_vision_frame
|
||||
from facefusion.types import Resolution, SessionId, VisionFrame
|
||||
|
||||
|
||||
def calculate_bitrate(resolution : Resolution) -> int: # TODO : improve the bitrate calculation
|
||||
pixel_total = resolution[0] * resolution[1]
|
||||
bitrate_factor = 3500 / math.sqrt(1920 * 1080)
|
||||
return max(400, round(math.sqrt(pixel_total) * bitrate_factor))
|
||||
async def receive_stream_frames(websocket : WebSocket) -> AsyncIterator[Tuple[int, bytes]]:
|
||||
websocket_event = await websocket.receive()
|
||||
|
||||
while websocket_event.get('type') == 'websocket.receive':
|
||||
frame_buffer = websocket_event.get('bytes') or b''
|
||||
|
||||
def calculate_buffer_size(resolution : Resolution) -> int:
|
||||
return calculate_bitrate(resolution) * 2
|
||||
if len(frame_buffer) > 1:
|
||||
yield frame_buffer[0], frame_buffer[1:]
|
||||
|
||||
|
||||
def read_pipe_buffer(pipe_handle : int, size : int) -> Optional[bytes]:
|
||||
byte_buffer = bytearray()
|
||||
frame_data = os.read(pipe_handle, size - len(byte_buffer))
|
||||
|
||||
while frame_data:
|
||||
byte_buffer += frame_data
|
||||
|
||||
if len(byte_buffer) == size:
|
||||
return bytes(byte_buffer)
|
||||
|
||||
frame_data = os.read(pipe_handle, size - len(byte_buffer))
|
||||
|
||||
return None
|
||||
websocket_event = await websocket.receive()
|
||||
|
||||
|
||||
async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFrame]:
|
||||
@@ -57,41 +43,165 @@ async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFr
|
||||
websocket_event = await websocket.receive()
|
||||
|
||||
|
||||
def forward_rtc_frames(encoder : subprocess.Popen[bytes], session_id : SessionId) -> None:
|
||||
pipe_handle = encoder.stdout.fileno()
|
||||
# TODO: move to facefusion/vpx_encoder.py
|
||||
def create_vpx_encoder(width : int, height : int, bitrate : int) -> Optional[ctypes.Array[ctypes.c_char]]:
|
||||
vpx_library = vpx_module.create_static_library()
|
||||
|
||||
if is_linux() or is_macos():
|
||||
os.set_blocking(pipe_handle, True)
|
||||
if vpx_library:
|
||||
vp8_iface = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_cx_algo')
|
||||
config_buffer = ctypes.create_string_buffer(4096)
|
||||
|
||||
header = read_pipe_buffer(pipe_handle, 32)
|
||||
if vpx_library.vpx_codec_enc_config_default(ctypes.byref(vp8_iface), config_buffer, 0) == 0:
|
||||
thread_count = min(multiprocessing.cpu_count(), 8)
|
||||
struct.pack_into('I', config_buffer, 4, thread_count)
|
||||
struct.pack_into('I', config_buffer, 12, width)
|
||||
struct.pack_into('I', config_buffer, 16, height)
|
||||
struct.pack_into('I', config_buffer, 72, 2)
|
||||
struct.pack_into('I', config_buffer, 112, bitrate)
|
||||
struct.pack_into('I', config_buffer, 116, 2)
|
||||
struct.pack_into('I', config_buffer, 120, 50)
|
||||
struct.pack_into('I', config_buffer, 124, 50)
|
||||
struct.pack_into('I', config_buffer, 128, 50)
|
||||
context_buffer = ctypes.create_string_buffer(512)
|
||||
|
||||
if header:
|
||||
frame_header = read_pipe_buffer(pipe_handle, 12)
|
||||
if vpx_library.vpx_codec_enc_init_ver(context_buffer, ctypes.byref(vp8_iface), config_buffer, 0, 37) == 0:
|
||||
vpx_library.vpx_codec_control_(context_buffer, 13, ctypes.c_int(16))
|
||||
vpx_library.vpx_codec_control_(context_buffer, 12, ctypes.c_int(3))
|
||||
vpx_library.vpx_codec_control_(context_buffer, 27, ctypes.c_int(10))
|
||||
return context_buffer
|
||||
|
||||
while frame_header:
|
||||
frame_size = int.from_bytes(frame_header[0:4], 'little')
|
||||
frame_data = read_pipe_buffer(pipe_handle, frame_size)
|
||||
|
||||
if frame_data:
|
||||
rtc_store.send_rtc_frame(session_id, frame_data)
|
||||
|
||||
frame_header = read_pipe_buffer(pipe_handle, 12)
|
||||
return None
|
||||
|
||||
|
||||
def submit_encoder_frame(encoder : subprocess.Popen[bytes], vision_frame_deque : deque[VisionFrame]) -> None:
|
||||
output_vision_frame = process_vision_frame(vision_frame_deque[-1])
|
||||
encoder.stdin.write(cv2.cvtColor(output_vision_frame, cv2.COLOR_BGR2RGB).tobytes())
|
||||
encoder.stdin.flush()
|
||||
# TODO: move to facefusion/vpx_encoder.py
|
||||
def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes, width : int, height : int, pts : int, flags : int) -> bytes:
|
||||
vpx_library = vpx_module.create_static_library()
|
||||
frame_buffer = b''
|
||||
|
||||
if vpx_library:
|
||||
image_buffer = ctypes.create_string_buffer(512)
|
||||
yuv_string_buffer = ctypes.create_string_buffer(yuv_buffer)
|
||||
|
||||
if vpx_library.vpx_img_wrap(image_buffer, 0x102, width, height, 1, yuv_string_buffer):
|
||||
if vpx_library.vpx_codec_encode(codec_context, image_buffer, pts, 1, flags, 1) == 0:
|
||||
iterator = ctypes.c_void_p(0)
|
||||
packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator))
|
||||
|
||||
while packet:
|
||||
if ctypes.c_int.from_address(packet).value == 0:
|
||||
buffer_pointer = ctypes.c_void_p.from_address(packet + 8).value
|
||||
buffer_size = ctypes.c_size_t.from_address(packet + 16).value
|
||||
frame_buffer += ctypes.string_at(buffer_pointer, buffer_size)
|
||||
|
||||
packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator))
|
||||
|
||||
return frame_buffer
|
||||
|
||||
|
||||
def run_encode_loop(encoder : subprocess.Popen[bytes], vision_frame_deque : deque[VisionFrame]) -> None:
|
||||
# TODO: move to facefusion/vpx_encoder.py
|
||||
def destroy_vpx_encoder(codec_context : ctypes.Array[ctypes.c_char]) -> None:
|
||||
vpx_library = vpx_module.create_static_library()
|
||||
|
||||
if vpx_library:
|
||||
vpx_library.vpx_codec_destroy(codec_context)
|
||||
|
||||
|
||||
# TODO: move to facefusion/opus_encoder.py
|
||||
def create_opus_encoder(sample_rate : int, channels : int) -> Optional[ctypes.c_void_p]:
|
||||
opus_library = opus_module.create_static_library()
|
||||
|
||||
if opus_library:
|
||||
error = ctypes.c_int(0)
|
||||
encoder = opus_library.opus_encoder_create(sample_rate, channels, 2049, ctypes.byref(error))
|
||||
|
||||
if error.value == 0:
|
||||
opus_library.opus_encoder_ctl(encoder, 4002, 64000)
|
||||
return encoder
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# TODO: move to facefusion/opus_encoder.py
|
||||
def encode_opus(opus_encoder : ctypes.c_void_p, pcm_pointer : ctypes.c_void_p, frame_size : int) -> bytes:
|
||||
opus_library = opus_module.create_static_library()
|
||||
audio_buffer = b''
|
||||
|
||||
if opus_library:
|
||||
output_buffer = ctypes.create_string_buffer(4000)
|
||||
encoded_length = opus_library.opus_encode_float(opus_encoder, pcm_pointer, frame_size, output_buffer, 4000)
|
||||
|
||||
if encoded_length > 0:
|
||||
audio_buffer = output_buffer.raw[:encoded_length]
|
||||
|
||||
return audio_buffer
|
||||
|
||||
|
||||
# TODO: move to facefusion/opus_encoder.py
|
||||
def destroy_opus_encoder(opus_encoder : ctypes.c_void_p) -> None:
|
||||
opus_library = opus_module.create_static_library()
|
||||
|
||||
if opus_library:
|
||||
opus_library.opus_encoder_destroy(opus_encoder)
|
||||
|
||||
|
||||
# TODO: move to facefusion/vpx_encoder.py, throttle loop to avoid spinning on same frame
|
||||
def run_video_encode_loop(vision_frame_deque : deque[VisionFrame], session_id : SessionId, initial_resolution : Resolution, keyframe_interval : int) -> None:
|
||||
codec_context = create_vpx_encoder(initial_resolution[0], initial_resolution[1], 4500)
|
||||
current_resolution = initial_resolution
|
||||
pts = 0
|
||||
|
||||
while vision_frame_deque:
|
||||
submit_encoder_frame(encoder, vision_frame_deque)
|
||||
vision_frame = vision_frame_deque[-1]
|
||||
output_frame = process_vision_frame(vision_frame)
|
||||
height, width = output_frame.shape[:2]
|
||||
frame_resolution = (width, height)
|
||||
|
||||
encoder.stdin.close()
|
||||
encoder.wait()
|
||||
if frame_resolution[0] != current_resolution[0] or frame_resolution[1] != current_resolution[1]:
|
||||
if codec_context:
|
||||
destroy_vpx_encoder(codec_context)
|
||||
|
||||
current_resolution = frame_resolution
|
||||
codec_context = create_vpx_encoder(current_resolution[0], current_resolution[1], 4500)
|
||||
pts = 0
|
||||
|
||||
if codec_context:
|
||||
yuv_frame = cv2.cvtColor(output_frame, cv2.COLOR_BGR2YUV_I420)
|
||||
vpx_flags = 0
|
||||
|
||||
if pts % keyframe_interval == 0:
|
||||
vpx_flags = 1
|
||||
|
||||
frame_buffer = encode_vpx(codec_context, yuv_frame.tobytes(), width, height, pts, vpx_flags)
|
||||
|
||||
if frame_buffer:
|
||||
rtc_store.send_rtc_video(session_id, frame_buffer)
|
||||
|
||||
pts += 1
|
||||
|
||||
if codec_context:
|
||||
destroy_vpx_encoder(codec_context)
|
||||
|
||||
|
||||
# TODO: move to facefusion/opus_encoder.py
|
||||
def encode_audio_chunk(opus_encoder : ctypes.c_void_p, session_id : SessionId, pcm_data : numpy.ndarray, audio_remainder : numpy.ndarray, audio_pts : int) -> Tuple[numpy.ndarray, int]:
|
||||
pcm_buffer = numpy.concatenate([audio_remainder, pcm_data])
|
||||
frame_samples = 1920
|
||||
|
||||
while len(pcm_buffer) >= frame_samples:
|
||||
chunk = pcm_buffer[:frame_samples]
|
||||
pcm_buffer = pcm_buffer[frame_samples:]
|
||||
pcm_pointer = ctypes.cast(chunk.ctypes.data, ctypes.c_void_p)
|
||||
audio_buffer = encode_opus(opus_encoder, pcm_pointer, 960)
|
||||
|
||||
if audio_buffer:
|
||||
rtc_store.send_rtc_audio(session_id, audio_buffer, audio_pts)
|
||||
|
||||
audio_pts += 960
|
||||
|
||||
return pcm_buffer, audio_pts
|
||||
|
||||
|
||||
# TODO: extract shared session setup from handle_image_stream and handle_video_stream, guard session_id like handle_video_stream
|
||||
async def handle_image_stream(websocket : WebSocket) -> None:
|
||||
subprotocol = get_sec_websocket_protocol(websocket.scope)
|
||||
access_token = extract_access_token(websocket.scope)
|
||||
@@ -125,30 +235,45 @@ async def handle_video_stream(websocket : WebSocket) -> None:
|
||||
await websocket.accept(subprotocol = subprotocol)
|
||||
|
||||
if session_id and source_paths:
|
||||
output_video_fps = int(state_manager.get_item('output_video_fps') or 30) # TODO: resolve from target video fps
|
||||
vision_frames = receive_vision_frames(websocket)
|
||||
vision_frame = await anext(vision_frames, None)
|
||||
stream_frames = receive_stream_frames(websocket)
|
||||
first_frame_type, first_frame_buffer = await anext(stream_frames, (0, b''))
|
||||
first_vision_frame = None
|
||||
|
||||
if numpy.any(vision_frame):
|
||||
resolution = (vision_frame.shape[1], vision_frame.shape[0])
|
||||
encoder = spawn_stream(resolution, output_video_fps, calculate_bitrate(resolution), calculate_buffer_size(resolution))
|
||||
if first_frame_type == 1:
|
||||
first_vision_frame = cv2.imdecode(numpy.frombuffer(first_frame_buffer, numpy.uint8), cv2.IMREAD_COLOR)
|
||||
|
||||
if numpy.any(first_vision_frame):
|
||||
resolution : Resolution = (first_vision_frame.shape[1], first_vision_frame.shape[0])
|
||||
keyframe_interval = int(state_manager.get_item('output_video_fps') or 30) # TODO: remove hardcoded via stream_video_fps
|
||||
vision_frame_deque : deque[VisionFrame] = deque(maxlen = 1)
|
||||
opus_encoder = create_opus_encoder(48000, 2) # TODO: guard against opus_encoder being None
|
||||
audio_remainder = numpy.array([], dtype = numpy.float32)
|
||||
audio_pts = 0
|
||||
|
||||
vision_frame_deque.append(vision_frame)
|
||||
vision_frame_deque.append(first_vision_frame)
|
||||
rtc_store.create_rtc_stream(session_id)
|
||||
|
||||
event_loop = asyncio.get_running_loop()
|
||||
await event_loop.run_in_executor(None, submit_encoder_frame, encoder, vision_frame_deque)
|
||||
video_encode_task = event_loop.run_in_executor(None, run_video_encode_loop, vision_frame_deque, session_id, resolution, keyframe_interval)
|
||||
await websocket.send_text('ready')
|
||||
encode_task = event_loop.run_in_executor(None, run_encode_loop, encoder, vision_frame_deque)
|
||||
rtc_task = event_loop.run_in_executor(None, forward_rtc_frames, encoder, session_id)
|
||||
|
||||
async for vision_frame in vision_frames:
|
||||
vision_frame_deque.append(vision_frame)
|
||||
async for frame_type, frame_buffer in stream_frames:
|
||||
if frame_type == 1:
|
||||
vision_frame = cv2.imdecode(numpy.frombuffer(frame_buffer, numpy.uint8), cv2.IMREAD_COLOR)
|
||||
|
||||
if numpy.any(vision_frame):
|
||||
vision_frame_deque.append(vision_frame)
|
||||
|
||||
if frame_type == 2:
|
||||
pcm_data = numpy.frombuffer(frame_buffer, dtype = numpy.float32)
|
||||
audio_remainder, audio_pts = encode_audio_chunk(opus_encoder, session_id, pcm_data, audio_remainder, audio_pts)
|
||||
|
||||
vision_frame_deque.clear()
|
||||
await asyncio.gather(encode_task, rtc_task)
|
||||
await video_encode_task
|
||||
|
||||
if opus_encoder:
|
||||
destroy_opus_encoder(opus_encoder)
|
||||
|
||||
rtc_store.destroy_rtc_stream(session_id)
|
||||
|
||||
if websocket.client_state == WebSocketState.CONNECTED:
|
||||
|
||||
@@ -355,26 +355,6 @@ def sanitize_video(file_content : bytes, asset_path : str, security_strategy : A
|
||||
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
|
||||
|
||||
|
||||
def spawn_stream(resolution : Resolution, stream_fps : int, stream_bitrate : int, stream_bufsize : int) -> subprocess.Popen[bytes]:
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.use_wallclock(),
|
||||
ffmpeg_builder.capture_video(),
|
||||
ffmpeg_builder.set_media_resolution(pack_resolution(resolution)),
|
||||
ffmpeg_builder.set_input_fps(stream_fps),
|
||||
ffmpeg_builder.set_input('-'),
|
||||
ffmpeg_builder.set_video_encoder('libvpx'), # TODO: replace hardcoded value
|
||||
ffmpeg_builder.set_encoder_deadline('realtime'),
|
||||
ffmpeg_builder.enforce_pixel_format('rgb24'), # TODO: replace hardcoded value
|
||||
ffmpeg_builder.set_stream_quality(stream_bitrate),
|
||||
ffmpeg_builder.set_video_bufsize(stream_bufsize),
|
||||
ffmpeg_builder.set_stream_keyframe(stream_fps),
|
||||
ffmpeg_builder.set_muxer('ivf'), # TODO: replace hardcoded value
|
||||
ffmpeg_builder.set_output('-')
|
||||
)
|
||||
commands = ffmpeg_builder.run(commands)
|
||||
return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
|
||||
|
||||
|
||||
def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder:
|
||||
if video_format == 'avi' and audio_encoder == 'libopus':
|
||||
return 'aac'
|
||||
|
||||
@@ -5,7 +5,7 @@ from typing import List, Optional
|
||||
import numpy
|
||||
|
||||
from facefusion.filesystem import get_file_format
|
||||
from facefusion.types import AudioEncoder, Command, CommandSet, Duration, EncoderDeadline, Fps, Muxer, SampleRate, StreamMode, VideoEncoder, VideoPreset
|
||||
from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, SampleRate, StreamMode, VideoEncoder, VideoPreset
|
||||
|
||||
|
||||
def run(commands : List[Command]) -> List[Command]:
|
||||
@@ -291,25 +291,3 @@ def map_qsv_preset(video_preset : VideoPreset) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def use_wallclock() -> List[Command]:
|
||||
return [ '-use_wallclock_as_timestamps', '1' ]
|
||||
|
||||
|
||||
def set_stream_keyframe(interval : int) -> List[Command]:
|
||||
return [ '-g', str(interval), '-keyint_min', str(interval) ]
|
||||
|
||||
|
||||
def set_muxer(muxer : Muxer) -> List[Command]:
|
||||
return [ '-f', muxer ]
|
||||
|
||||
|
||||
def set_video_bufsize(video_bufsize : int) -> List[Command]:
|
||||
return [ '-bufsize', str(video_bufsize) + 'k' ]
|
||||
|
||||
|
||||
def set_encoder_deadline(deadline : EncoderDeadline) -> List[Command]:
|
||||
return [ '-deadline', deadline ]
|
||||
|
||||
|
||||
def set_lag_in_frames(count : int) -> List[Command]:
|
||||
return [ '-lag-in-frames', str(count) ]
|
||||
|
||||
@@ -1,25 +1,27 @@
|
||||
import ctypes
|
||||
import os
|
||||
from functools import lru_cache
|
||||
from typing import Dict, Optional
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
from facefusion.common_helper import is_linux, is_macos, is_windows
|
||||
from facefusion.download import conditional_download_hashes, conditional_download_sources, resolve_download_url_by_provider
|
||||
from facefusion.filesystem import resolve_relative_path
|
||||
from facefusion.types import DownloadSet
|
||||
|
||||
|
||||
def resolve_library_file() -> Optional[str]:
|
||||
def resolve_library_paths() -> Optional[Tuple[str, str]]:
|
||||
if is_linux():
|
||||
return 'linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so'
|
||||
return 'linux/libdatachannel.hash', 'linux/libdatachannel.so'
|
||||
if is_macos():
|
||||
return 'macos-universal-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.dylib'
|
||||
return 'macos/libdatachannel.hash', 'macos/libdatachannel.dylib'
|
||||
if is_windows():
|
||||
return 'windows-x64-openssl-h264-vp8-av1-opus-datachannel-0.24.1.dll'
|
||||
return 'windows/datachannel.hash', 'windows/datachannel.dll'
|
||||
return None
|
||||
|
||||
|
||||
@lru_cache
|
||||
def create_static_library_set() -> Dict[str, DownloadSet]:
|
||||
library_file = resolve_library_file()
|
||||
library_hash_path, library_source_path = resolve_library_paths()
|
||||
|
||||
return\
|
||||
{
|
||||
@@ -27,21 +29,28 @@ def create_static_library_set() -> Dict[str, DownloadSet]:
|
||||
{
|
||||
'datachannel':
|
||||
{
|
||||
'url': 'https://huggingface.co/bluefoxcreation/libdatachannel/resolve/main/linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so.hash',
|
||||
'path': resolve_relative_path('../.binaries/' + library_file + '.hash')
|
||||
'url': resolve_download_url_by_provider('huggingface', 'libraries-4.0.0', library_hash_path),
|
||||
'path': resolve_relative_path('../.libraries/' + os.path.basename(library_hash_path))
|
||||
}
|
||||
},
|
||||
'sources':
|
||||
{
|
||||
'datachannel':
|
||||
{
|
||||
'url': 'https://huggingface.co/bluefoxcreation/libdatachannel/resolve/main/linux-x64-openssl-h264-vp8-av1-opus-libdatachannel-0.24.1.so',
|
||||
'path': resolve_relative_path('../.binaries/' + library_file)
|
||||
'url': resolve_download_url_by_provider('huggingface', 'libraries-4.0.0', library_source_path),
|
||||
'path': resolve_relative_path('../.libraries/' + os.path.basename(library_source_path))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
library_hash_set = create_static_library_set().get('hashes')
|
||||
library_source_set = create_static_library_set().get('sources')
|
||||
|
||||
return conditional_download_hashes(library_hash_set) and conditional_download_sources(library_source_set)
|
||||
|
||||
|
||||
@lru_cache
|
||||
def create_static_library() -> Optional[ctypes.CDLL]:
|
||||
library_path = create_static_library_set().get('sources').get('datachannel').get('path')
|
||||
|
||||
+49
-22
@@ -1,21 +1,13 @@
|
||||
import ctypes
|
||||
import threading
|
||||
import time
|
||||
from typing import List, Optional
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from facefusion.download import conditional_download_hashes, conditional_download_sources
|
||||
from facefusion.libraries import datachannel as datachannel_module
|
||||
from facefusion.types import MediaDirection, PeerConnection, RtcAudioTrack, RtcPeer, RtcVideoTrack, SdpAnswer, SdpOffer
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_set = datachannel_module.create_static_library_set()
|
||||
|
||||
if not conditional_download_hashes(download_set.get('hashes')):
|
||||
return False
|
||||
return conditional_download_sources(download_set.get('sources'))
|
||||
|
||||
|
||||
# TODO: reduce to only used params
|
||||
def create_peer_connection(
|
||||
ice_servers : Optional[ctypes.Array[ctypes.c_char_p]] = None,
|
||||
ice_servers_count : int = 0, proxy_server : Optional[bytes] = None,
|
||||
@@ -52,27 +44,42 @@ def create_peer_connection(
|
||||
|
||||
|
||||
def build_media_description(media_type : str, payload_type : int, rtp_codec : str, media_direction : MediaDirection, media_id : int) -> bytes:
|
||||
return '\r\n'.join(
|
||||
lines =\
|
||||
[
|
||||
'm=' + media_type + ' 9 UDP/TLS/RTP/SAVPF ' + str(payload_type),
|
||||
'a=rtpmap:' + str(payload_type) + ' ' + rtp_codec,
|
||||
'a=rtcp-fb:' + str(payload_type) + ' nack',
|
||||
'a=rtcp-fb:' + str(payload_type) + ' nack pli',
|
||||
'a=' + media_direction,
|
||||
'a=mid:' + str(media_id),
|
||||
'a=rtcp-mux',
|
||||
''
|
||||
]).encode()
|
||||
]
|
||||
return '\r\n'.join(lines).encode()
|
||||
|
||||
|
||||
def add_audio_track(peer_connection : PeerConnection, media_direction : MediaDirection) -> RtcAudioTrack:
|
||||
def parse_sdp_payload_types(sdp_offer : SdpOffer) -> Dict[str, int]:
|
||||
payload_types : Dict[str, int] = {}
|
||||
|
||||
for line in sdp_offer.splitlines():
|
||||
if line.startswith('a=rtpmap:') and 'VP8/90000' in line:
|
||||
payload_types['vp8'] = int(line.split(':')[1].split(' ')[0])
|
||||
if line.startswith('a=rtpmap:') and 'opus/48000/2' in line:
|
||||
payload_types['opus'] = int(line.split(':')[1].split(' ')[0])
|
||||
|
||||
return payload_types
|
||||
|
||||
|
||||
def add_audio_track(peer_connection : PeerConnection, media_direction : MediaDirection, payload_type : int) -> RtcAudioTrack:
|
||||
datachannel_library = datachannel_module.create_static_library()
|
||||
media_description = build_media_description('audio', 111, 'opus/48000/2', media_direction, 1)
|
||||
media_description = build_media_description('audio', payload_type, 'opus/48000/2', media_direction, 1)
|
||||
|
||||
audio_track = datachannel_library.rtcAddTrack(peer_connection, media_description)
|
||||
|
||||
audio_packetizer = datachannel_module.define_rtc_packetizer_init()
|
||||
audio_packetizer.ssrc = 43
|
||||
audio_packetizer.cname = b'audio'
|
||||
audio_packetizer.payloadType = 111
|
||||
audio_packetizer.payloadType = payload_type
|
||||
audio_packetizer.clockRate = 48000
|
||||
|
||||
datachannel_library.rtcSetOpusPacketizer(audio_track, ctypes.byref(audio_packetizer))
|
||||
@@ -81,16 +88,16 @@ def add_audio_track(peer_connection : PeerConnection, media_direction : MediaDir
|
||||
return audio_track
|
||||
|
||||
|
||||
def add_video_track(peer_connection : PeerConnection, media_direction : MediaDirection) -> RtcVideoTrack:
|
||||
def add_video_track(peer_connection : PeerConnection, media_direction : MediaDirection, payload_type : int) -> RtcVideoTrack:
|
||||
datachannel_library = datachannel_module.create_static_library()
|
||||
media_description = build_media_description('video', 96, 'VP8/90000', media_direction, 0)
|
||||
media_description = build_media_description('video', payload_type, 'VP8/90000', media_direction, 0)
|
||||
|
||||
video_track = datachannel_library.rtcAddTrack(peer_connection, media_description)
|
||||
|
||||
video_packetizer = datachannel_module.define_rtc_packetizer_init()
|
||||
video_packetizer.ssrc = 42
|
||||
video_packetizer.cname = b'video'
|
||||
video_packetizer.payloadType = 96
|
||||
video_packetizer.payloadType = payload_type
|
||||
video_packetizer.clockRate = 90000
|
||||
video_packetizer.maxFragmentSize = 1200
|
||||
|
||||
@@ -118,6 +125,7 @@ def on_sdp_ready(peer_connection : int, sdp : Optional[bytes], sdp_type : int, u
|
||||
ctypes.cast(user_pointer, ctypes.py_object).value.set()
|
||||
|
||||
|
||||
# TODO: unused callback, remove or wire up
|
||||
@ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_int, ctypes.c_void_p)
|
||||
def on_ice_complete(peer_connection : int, state : int, user_pointer : Optional[int]) -> None:
|
||||
if state == 2:
|
||||
@@ -125,6 +133,7 @@ def on_ice_complete(peer_connection : int, state : int, user_pointer : Optional[
|
||||
context['event'].set()
|
||||
|
||||
|
||||
# TODO: sanitize sdp_offer, wrap in run_in_executor, track peer connection state
|
||||
def negotiate_sdp(peer_connection : PeerConnection, sdp_offer : SdpOffer) -> Optional[SdpAnswer]:
|
||||
datachannel_library = datachannel_module.create_static_library()
|
||||
sdp_event = threading.Event()
|
||||
@@ -144,20 +153,38 @@ def negotiate_sdp(peer_connection : PeerConnection, sdp_offer : SdpOffer) -> Opt
|
||||
return None
|
||||
|
||||
|
||||
def send_to_peers(rtc_peers : List[RtcPeer], data : bytes) -> None:
|
||||
def send_video_to_peers(rtc_peers : List[RtcPeer], frame_buffer : bytes) -> None:
|
||||
datachannel_library = datachannel_module.create_static_library()
|
||||
|
||||
if rtc_peers:
|
||||
timestamp = int(time.monotonic() * 90000) & 0xFFFFFFFF
|
||||
data_buffer = ctypes.create_string_buffer(data)
|
||||
data_total = len(data)
|
||||
send_buffer = ctypes.create_string_buffer(frame_buffer)
|
||||
send_total = len(frame_buffer)
|
||||
|
||||
for rtc_peer in rtc_peers:
|
||||
video_track_id = rtc_peer.get('video_track')
|
||||
|
||||
if video_track_id and datachannel_library.rtcIsOpen(video_track_id):
|
||||
datachannel_library.rtcSetTrackRtpTimestamp(video_track_id, timestamp)
|
||||
datachannel_library.rtcSendMessage(video_track_id, data_buffer, data_total)
|
||||
datachannel_library.rtcSendMessage(video_track_id, send_buffer, send_total)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def send_audio_to_peers(rtc_peers : List[RtcPeer], audio_buffer : bytes, audio_pts : int) -> None:
|
||||
datachannel_library = datachannel_module.create_static_library()
|
||||
|
||||
if rtc_peers:
|
||||
timestamp = audio_pts & 0xFFFFFFFF
|
||||
send_buffer = ctypes.create_string_buffer(audio_buffer)
|
||||
send_total = len(audio_buffer)
|
||||
|
||||
for rtc_peer in rtc_peers:
|
||||
audio_track_id = rtc_peer.get('audio_track')
|
||||
|
||||
if audio_track_id and datachannel_library.rtcIsOpen(audio_track_id):
|
||||
datachannel_library.rtcSetTrackRtpTimestamp(audio_track_id, timestamp)
|
||||
datachannel_library.rtcSendMessage(audio_track_id, send_buffer, send_total)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
+14
-4
@@ -21,11 +21,13 @@ def destroy_rtc_stream(session_id : SessionId) -> None:
|
||||
rtc.delete_peers(rtc_peers)
|
||||
|
||||
|
||||
# TODO: clean up peer connection on failed sdp negotiation, wrap in run_in_executor to avoid blocking async event loop
|
||||
def add_rtc_viewer(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[SdpAnswer]:
|
||||
if session_id in RTC_STREAMS:
|
||||
payload_types = rtc.parse_sdp_payload_types(sdp_offer)
|
||||
peer_connection : PeerConnection = rtc.create_peer_connection()
|
||||
audio_track : RtcAudioTrack = rtc.add_audio_track(peer_connection, 'sendonly')
|
||||
video_track : RtcVideoTrack = rtc.add_video_track(peer_connection, 'sendonly')
|
||||
audio_track : RtcAudioTrack = rtc.add_audio_track(peer_connection, 'sendonly', payload_types.get('opus', 111))
|
||||
video_track : RtcVideoTrack = rtc.add_video_track(peer_connection, 'sendonly', payload_types.get('vp8', 96))
|
||||
local_sdp = rtc.negotiate_sdp(peer_connection, sdp_offer)
|
||||
|
||||
if local_sdp:
|
||||
@@ -42,8 +44,16 @@ def add_rtc_viewer(session_id : SessionId, sdp_offer : SdpOffer) -> Optional[Sdp
|
||||
return None
|
||||
|
||||
|
||||
def send_rtc_frame(session_id : SessionId, frame_data : bytes) -> None:
|
||||
# TODO: detect and remove dead peers
|
||||
def send_rtc_video(session_id : SessionId, frame_buffer : bytes) -> None:
|
||||
rtc_peers = get_rtc_stream(session_id)
|
||||
|
||||
if rtc_peers:
|
||||
rtc.send_to_peers(rtc_peers, frame_data)
|
||||
rtc.send_video_to_peers(rtc_peers, frame_buffer)
|
||||
|
||||
|
||||
def send_rtc_audio(session_id : SessionId, audio_buffer : bytes, audio_pts : int) -> None:
|
||||
rtc_peers = get_rtc_stream(session_id)
|
||||
|
||||
if rtc_peers:
|
||||
rtc.send_audio_to_peers(rtc_peers, audio_buffer, audio_pts)
|
||||
|
||||
@@ -169,8 +169,6 @@ TempFrameFormat = Literal['bmp', 'jpeg', 'png', 'tiff']
|
||||
AudioEncoder = Literal['flac', 'aac', 'libmp3lame', 'libopus', 'libvorbis', 'pcm_s16le', 'pcm_s32le']
|
||||
ImageEncoder = Literal['bmp', 'mjpeg', 'png', 'tiff', 'libwebp']
|
||||
VideoEncoder = Literal['libx264', 'libx264rgb', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox', 'rawvideo']
|
||||
EncoderDeadline = Literal['best', 'good', 'realtime']
|
||||
Muxer : TypeAlias = str
|
||||
AudioSet : TypeAlias = Dict[AudioFormat, str]
|
||||
ImageSet : TypeAlias = Dict[ImageFormat, str]
|
||||
VideoSet : TypeAlias = Dict[VideoFormat, str]
|
||||
@@ -264,14 +262,6 @@ BenchmarkCycleSet = TypedDict('BenchmarkCycleSet',
|
||||
|
||||
WebcamMode = Literal['inline', 'udp', 'v4l2']
|
||||
StreamMode = Literal['udp', 'v4l2']
|
||||
WebSocketStreamMode = Literal['image', 'video']
|
||||
|
||||
RtcOfferSet = TypedDict('RtcOfferSet',
|
||||
{
|
||||
'sdp': str,
|
||||
'type': str
|
||||
})
|
||||
|
||||
RtcVideoTrack : TypeAlias = int
|
||||
RtcAudioTrack : TypeAlias = int
|
||||
PeerConnection : TypeAlias = int
|
||||
|
||||
@@ -10,6 +10,7 @@ from facefusion.libraries import datachannel as datachannel_module
|
||||
from facefusion.types import SdpOffer
|
||||
|
||||
|
||||
# TODO: remove, use rtc.create_sdp with recvonly tracks instead
|
||||
def create_sdp_offer() -> Optional[SdpOffer]:
|
||||
datachannel_library = datachannel_module.create_static_library()
|
||||
peer_connection = rtc.create_peer_connection(disable_auto_negotiation = True)
|
||||
@@ -34,9 +35,10 @@ def create_sdp_offer() -> Optional[SdpOffer]:
|
||||
return None
|
||||
|
||||
|
||||
# TODO: remove, inline into test_api_stream.py
|
||||
def open_websocket_stream(test_client : TestClient, subprotocols : list[str], source_content : bytes, ready_event : threading.Event, stop_event : threading.Event) -> None:
|
||||
with test_client.websocket_connect('/stream', subprotocols = subprotocols) as websocket:
|
||||
websocket.send_bytes(source_content)
|
||||
with test_client.websocket_connect('/stream?mode=video', subprotocols = subprotocols) as websocket:
|
||||
websocket.send_bytes(b'\x01' + source_content)
|
||||
websocket.receive_text()
|
||||
ready_event.set()
|
||||
stop_event.wait()
|
||||
stop_event.wait(timeout = 15)
|
||||
|
||||
@@ -23,12 +23,6 @@ def before_all() -> None:
|
||||
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p.jpg') ])
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
state_manager.init_item('temp_path', tempfile.gettempdir())
|
||||
@@ -37,6 +31,12 @@ def before_each() -> None:
|
||||
asset_store.clear()
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
def test_upload_asset(test_client : TestClient) -> None:
|
||||
upload_response = test_client.post('/assets?type=source')
|
||||
|
||||
|
||||
@@ -8,8 +8,8 @@ from facefusion import capability_store, session_manager
|
||||
from facefusion.apis.core import create_api
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
@pytest.fixture(scope = 'module', autouse = True)
|
||||
def before_all() -> None:
|
||||
program = ArgumentParser()
|
||||
capability_store.register_capability_set(
|
||||
[
|
||||
@@ -31,15 +31,18 @@ def test_client() -> Iterator[TestClient]:
|
||||
scopes = [ 'api' ]
|
||||
)
|
||||
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
session_manager.SESSIONS.clear()
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
def test_get_capabilities(test_client : TestClient) -> None:
|
||||
capabilities_response = test_client.get('/capabilities')
|
||||
capabilities_body = capabilities_response.json()
|
||||
|
||||
@@ -8,17 +8,17 @@ from facefusion import metadata, session_manager
|
||||
from facefusion.apis.core import create_api
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
session_manager.SESSIONS.clear()
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
session_manager.SESSIONS.clear()
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def mock_detect_execution_devices(mocker : MockerFixture) -> None:
|
||||
mocker.patch('facefusion.system.state_manager.get_temp_path', return_value = '/tmp')
|
||||
|
||||
@@ -7,17 +7,17 @@ from facefusion import metadata, session_manager
|
||||
from facefusion.apis.core import create_api
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
session_manager.SESSIONS.clear()
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
session_manager.SESSIONS.clear()
|
||||
|
||||
|
||||
def test_ping(test_client : TestClient) -> None:
|
||||
create_session_response = test_client.post('/session', json =
|
||||
{
|
||||
|
||||
@@ -10,17 +10,17 @@ from facefusion.apis.core import create_api
|
||||
from facefusion.types import Session
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
session_manager.SESSIONS.clear()
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
session_manager.SESSIONS.clear()
|
||||
|
||||
|
||||
def test_create_session(test_client : TestClient) -> None:
|
||||
create_session_response = test_client.post('/session', json =
|
||||
{
|
||||
|
||||
+13
-12
@@ -14,16 +14,6 @@ from .assert_helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
@pytest.fixture(scope = 'module', autouse = True)
|
||||
def before_all() -> None:
|
||||
conditional_download(get_test_examples_directory(),
|
||||
[
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg',
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4'
|
||||
])
|
||||
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p.jpg') ])
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
program = ArgumentParser()
|
||||
capability_store.register_capability_set(
|
||||
[
|
||||
@@ -51,10 +41,15 @@ def test_client() -> Iterator[TestClient]:
|
||||
],
|
||||
scopes = [ 'api' ]
|
||||
)
|
||||
|
||||
state_manager.init_item('execution_providers', [ 'cpu' ])
|
||||
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
conditional_download(get_test_examples_directory(),
|
||||
[
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg',
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4'
|
||||
])
|
||||
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p.jpg') ])
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
@@ -63,6 +58,12 @@ def before_each() -> None:
|
||||
asset_store.clear()
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
def test_get_state(test_client : TestClient) -> None:
|
||||
get_state_response = test_client.get('/state')
|
||||
|
||||
|
||||
+19
-23
@@ -12,26 +12,18 @@ from facefusion.apis import asset_store
|
||||
from facefusion.apis.core import create_api
|
||||
from facefusion.core import common_pre_check, processors_pre_check
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.libraries import datachannel as datachannel_module
|
||||
from .assert_helper import get_test_example_file, get_test_examples_directory
|
||||
from .stream_helper import create_sdp_offer, open_websocket_stream
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module', autouse = True)
|
||||
def before_all() -> None:
|
||||
conditional_download(get_test_examples_directory(),
|
||||
[
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg'
|
||||
])
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
state_manager.init_item('execution_device_ids', [ 0 ])
|
||||
state_manager.init_item('execution_providers', [ 'cpu' ])
|
||||
state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
|
||||
state_manager.init_item('temp_path', tempfile.gettempdir())
|
||||
state_manager.init_item('processors', [ 'face_swapper' ])
|
||||
state_manager.init_item('face_selector_mode', 'many')
|
||||
state_manager.init_item('face_detector_model', 'yolo_face')
|
||||
state_manager.init_item('face_detector_size', '640x640')
|
||||
state_manager.init_item('face_detector_score', 0.5)
|
||||
@@ -44,24 +36,29 @@ def test_client() -> Iterator[TestClient]:
|
||||
state_manager.init_item('face_mask_padding', [ 0, 0, 0, 0 ])
|
||||
state_manager.init_item('face_swapper_model', 'hyperswap_1a_256')
|
||||
state_manager.init_item('face_swapper_pixel_boost', '256x256')
|
||||
state_manager.init_item('face_swapper_weight', 0.5)
|
||||
|
||||
common_pre_check()
|
||||
processors_pre_check()
|
||||
datachannel_module.pre_check()
|
||||
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
conditional_download(get_test_examples_directory(),
|
||||
[
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg'
|
||||
])
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'function', autouse = True)
|
||||
def before_each() -> None:
|
||||
state_manager.init_item('source_paths', None)
|
||||
session_manager.SESSIONS.clear()
|
||||
asset_store.clear()
|
||||
|
||||
|
||||
# TODO: enable again
|
||||
@pytest.mark.skip
|
||||
@pytest.fixture(scope = 'module')
|
||||
def test_client() -> Iterator[TestClient]:
|
||||
with TestClient(create_api()) as test_client:
|
||||
yield test_client
|
||||
|
||||
|
||||
def test_stream_image(test_client : TestClient) -> None:
|
||||
create_session_response = test_client.post('/session', json =
|
||||
{
|
||||
@@ -92,10 +89,9 @@ def test_stream_image(test_client : TestClient) -> None:
|
||||
|
||||
assert select_response.status_code == 200
|
||||
|
||||
with test_client.websocket_connect('/stream', subprotocols =
|
||||
with test_client.websocket_connect('/stream?mode=image', subprotocols =
|
||||
[
|
||||
'access_token.' + access_token,
|
||||
'image'
|
||||
'access_token.' + access_token
|
||||
]) as websocket:
|
||||
websocket.send_bytes(source_content)
|
||||
output_bytes = websocket.receive_bytes()
|
||||
@@ -104,8 +100,6 @@ def test_stream_image(test_client : TestClient) -> None:
|
||||
assert output_vision_frame.shape == (1024, 1024, 3)
|
||||
|
||||
|
||||
# TODO: enable again
|
||||
@pytest.mark.skip
|
||||
def test_stream_video(test_client : TestClient) -> None:
|
||||
create_session_response = test_client.post('/session', json =
|
||||
{
|
||||
@@ -137,9 +131,11 @@ def test_stream_video(test_client : TestClient) -> None:
|
||||
ready_event = threading.Event()
|
||||
stop_event = threading.Event()
|
||||
#TODO: use asyncio
|
||||
stream_thread = threading.Thread(target = open_websocket_stream, args = (test_client, [ 'access_token.' + access_token, 'video' ], source_content, ready_event, stop_event))
|
||||
stream_thread = threading.Thread(target = open_websocket_stream, args = (test_client, [ 'access_token.' + access_token ], source_content, ready_event, stop_event))
|
||||
stream_thread.start()
|
||||
ready_event.wait()
|
||||
ready_event.wait(timeout = 10)
|
||||
|
||||
assert ready_event.is_set()
|
||||
|
||||
sdp_offer = create_sdp_offer()
|
||||
stream_response = test_client.post('/stream', content = sdp_offer, headers =
|
||||
@@ -152,4 +148,4 @@ def test_stream_video(test_client : TestClient) -> None:
|
||||
assert stream_response.text
|
||||
|
||||
stop_event.set()
|
||||
stream_thread.join()
|
||||
stream_thread.join(timeout = 10)
|
||||
|
||||
+1
-23
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
import struct
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
@@ -8,7 +7,7 @@ import pytest
|
||||
import facefusion.ffmpeg
|
||||
from facefusion import process_manager, state_manager
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames, spawn_stream
|
||||
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames
|
||||
from facefusion.ffprobe import probe_entries
|
||||
from facefusion.filesystem import copy_file, is_image
|
||||
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
|
||||
@@ -256,24 +255,3 @@ def test_sanitize_video() -> None:
|
||||
assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'hevc'
|
||||
|
||||
|
||||
def test_spawn_stream() -> None: # TODO: Improve test
|
||||
test_set =\
|
||||
[
|
||||
((426, 240), 25, 400, 800),
|
||||
((640, 360), 30, 1000, 2000),
|
||||
((1280, 720), 30, 2000, 4000)
|
||||
]
|
||||
|
||||
for resolution, stream_fps, stream_bitrate, stream_bufsize in test_set:
|
||||
encoder = spawn_stream(resolution, stream_fps, stream_bitrate, stream_bufsize)
|
||||
frame_size = resolution[0] * resolution[1] * 3
|
||||
stdout, _ = encoder.communicate(input = bytes(frame_size))
|
||||
|
||||
assert len(stdout) > 32
|
||||
frame_header = stdout[:32]
|
||||
|
||||
assert frame_header[:4] == b'DKIF'
|
||||
output_width = struct.unpack_from('<H', frame_header, 12)[0]
|
||||
output_height = struct.unpack_from('<H', frame_header, 14)[0]
|
||||
|
||||
assert (output_width, output_height) == resolution
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from shutil import which
|
||||
|
||||
from facefusion import ffmpeg_builder
|
||||
from facefusion.ffmpeg_builder import capture_video, chain, concat, enforce_pixel_format, keep_video_alpha, run, select_frame_range, set_audio_quality, set_audio_sample_size, set_encoder_deadline, set_lag_in_frames, set_muxer, set_stream_keyframe, set_stream_mode, set_stream_quality, set_video_bufsize, set_video_encoder, set_video_fps, set_video_quality, use_wallclock
|
||||
from facefusion.ffmpeg_builder import capture_video, chain, concat, enforce_pixel_format, keep_video_alpha, run, select_frame_range, set_audio_quality, set_audio_sample_size, set_stream_mode, set_stream_quality, set_video_encoder, set_video_fps, set_video_quality
|
||||
|
||||
|
||||
def test_run() -> None:
|
||||
@@ -110,10 +110,6 @@ def test_set_video_quality() -> None:
|
||||
assert set_video_quality('hevc_videotoolbox', 100) == [ '-b:v', '50512k' ]
|
||||
|
||||
|
||||
def test_use_wallclock_timestamps() -> None:
|
||||
assert use_wallclock() == [ '-use_wallclock_as_timestamps', '1' ]
|
||||
|
||||
|
||||
def test_capture_video() -> None:
|
||||
assert capture_video() == [ '-f', 'rawvideo', '-pix_fmt', 'rgb24' ]
|
||||
|
||||
@@ -128,27 +124,3 @@ def test_set_stream_quality() -> None:
|
||||
assert set_stream_quality(2000) == [ '-b:v', '2000k' ]
|
||||
|
||||
|
||||
def test_set_keyframe_interval() -> None:
|
||||
assert set_stream_keyframe(30) == [ '-g', '30', '-keyint_min', '30' ]
|
||||
assert set_stream_keyframe(60) == [ '-g', '60', '-keyint_min', '60' ]
|
||||
|
||||
|
||||
def test_set_output_format() -> None:
|
||||
assert set_muxer('ivf') == [ '-f', 'ivf' ]
|
||||
assert set_muxer('mpegts') == [ '-f', 'mpegts' ]
|
||||
|
||||
|
||||
def test_set_video_bufsize() -> None:
|
||||
assert set_video_bufsize(800) == [ '-bufsize', '800k' ]
|
||||
assert set_video_bufsize(4000) == [ '-bufsize', '4000k' ]
|
||||
|
||||
|
||||
def test_set_encoder_deadline() -> None:
|
||||
assert set_encoder_deadline('best') == [ '-deadline', 'best' ]
|
||||
assert set_encoder_deadline('good') == [ '-deadline', 'good' ]
|
||||
assert set_encoder_deadline('realtime') == [ '-deadline', 'realtime' ]
|
||||
|
||||
|
||||
def test_set_lag_in_frames() -> None:
|
||||
assert set_lag_in_frames(0) == [ '-lag-in-frames', '0' ]
|
||||
assert set_lag_in_frames(16) == [ '-lag-in-frames', '16' ]
|
||||
|
||||
+10
-9
@@ -9,12 +9,13 @@ from facefusion.types import RtcPeer
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def before_all() -> None:
|
||||
rtc.pre_check()
|
||||
datachannel_module.pre_check()
|
||||
|
||||
|
||||
# TODO: add test_parse_sdp_payload_types
|
||||
def test_build_media_description() -> None:
|
||||
assert rtc.build_media_description('audio', 111, 'opus/48000/2', 'sendonly', 1) == b'm=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\na=sendonly\r\na=mid:1\r\na=rtcp-mux\r\n'
|
||||
assert rtc.build_media_description('video', 96, 'VP8/90000', 'recvonly', 0) == b'm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP8/90000\r\na=recvonly\r\na=mid:0\r\na=rtcp-mux\r\n'
|
||||
assert rtc.build_media_description('audio', 111, 'opus/48000/2', 'sendonly', 1) == b'm=audio 9 UDP/TLS/RTP/SAVPF 111\r\na=rtpmap:111 opus/48000/2\r\na=rtcp-fb:111 nack\r\na=rtcp-fb:111 nack pli\r\na=sendonly\r\na=mid:1\r\na=rtcp-mux\r\n'
|
||||
assert rtc.build_media_description('video', 96, 'VP8/90000', 'recvonly', 0) == b'm=video 9 UDP/TLS/RTP/SAVPF 96\r\na=rtpmap:96 VP8/90000\r\na=rtcp-fb:96 nack\r\na=rtcp-fb:96 nack pli\r\na=recvonly\r\na=mid:0\r\na=rtcp-mux\r\n'
|
||||
|
||||
|
||||
# TODO: enable again
|
||||
@@ -32,7 +33,7 @@ def test_create_peer_connection() -> None:
|
||||
def test_add_audio_track() -> None:
|
||||
peer_connection = rtc.create_peer_connection()
|
||||
|
||||
assert rtc.add_audio_track(peer_connection, 'sendonly') > 0
|
||||
assert rtc.add_audio_track(peer_connection, 'sendonly', 111) > 0
|
||||
|
||||
datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection)
|
||||
|
||||
@@ -42,7 +43,7 @@ def test_add_audio_track() -> None:
|
||||
def test_add_video_track() -> None:
|
||||
peer_connection = rtc.create_peer_connection()
|
||||
|
||||
assert rtc.add_video_track(peer_connection, 'sendonly') > 0
|
||||
assert rtc.add_video_track(peer_connection, 'sendonly', 96) > 0
|
||||
|
||||
datachannel_module.create_static_library().rtcDeletePeerConnection(peer_connection)
|
||||
|
||||
@@ -53,13 +54,13 @@ def test_negotiate_sdp() -> None:
|
||||
datachannel_library = datachannel_module.create_static_library()
|
||||
|
||||
sender_connection = rtc.create_peer_connection()
|
||||
rtc.add_video_track(sender_connection, 'sendonly')
|
||||
rtc.add_audio_track(sender_connection, 'sendonly')
|
||||
rtc.add_video_track(sender_connection, 'sendonly', 96)
|
||||
rtc.add_audio_track(sender_connection, 'sendonly', 111)
|
||||
sdp_offer = rtc.create_sdp(sender_connection)
|
||||
|
||||
receiver_connection = rtc.create_peer_connection()
|
||||
rtc.add_video_track(receiver_connection, 'recvonly')
|
||||
rtc.add_audio_track(receiver_connection, 'recvonly')
|
||||
rtc.add_video_track(receiver_connection, 'recvonly', 96)
|
||||
rtc.add_audio_track(receiver_connection, 'recvonly', 111)
|
||||
sdp_answer = rtc.negotiate_sdp(receiver_connection, sdp_offer)
|
||||
|
||||
assert sdp_answer
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
import pytest
|
||||
|
||||
from facefusion.libraries import datachannel as datachannel_module
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module')
|
||||
def before_all() -> None:
|
||||
datachannel_module.pre_check()
|
||||
|
||||
|
||||
# TODO: test create_rtc_stream, get_rtc_stream, destroy_rtc_stream lifecycle
|
||||
def test_rtc_stream_lifecycle() -> None:
|
||||
pass
|
||||
|
||||
|
||||
# TODO: test add_rtc_viewer with valid session and sdp offer
|
||||
def test_add_rtc_viewer() -> None:
|
||||
pass
|
||||
@@ -1,31 +0,0 @@
|
||||
import os
|
||||
|
||||
from facefusion.apis.stream_helper import calculate_bitrate, calculate_buffer_size, read_pipe_buffer
|
||||
|
||||
|
||||
def test_calculate_bitrate() -> None:
|
||||
assert calculate_bitrate((320, 240)) == 674
|
||||
assert calculate_bitrate((640, 480)) == 1347
|
||||
assert calculate_bitrate((1280, 720)) == 2333
|
||||
assert calculate_bitrate((1920, 1080)) == 3500
|
||||
assert calculate_bitrate((3840, 2160)) == 7000
|
||||
|
||||
|
||||
def test_calculate_buffer_size() -> None:
|
||||
assert calculate_buffer_size((320, 240)) == 1348
|
||||
assert calculate_buffer_size((640, 480)) == 2694
|
||||
assert calculate_buffer_size((1280, 720)) == 4666
|
||||
assert calculate_buffer_size((1920, 1080)) == 7000
|
||||
assert calculate_buffer_size((3840, 2160)) == 14000
|
||||
|
||||
|
||||
def test_read_pipe_buffer() -> None:
|
||||
read_pipe, write_pipe = os.pipe()
|
||||
os.write(write_pipe, b'123456')
|
||||
os.close(write_pipe)
|
||||
|
||||
assert read_pipe_buffer(read_pipe, 3) == b'123'
|
||||
assert read_pipe_buffer(read_pipe, 3) == b'456'
|
||||
assert read_pipe_buffer(read_pipe, 1) is None
|
||||
|
||||
os.close(read_pipe)
|
||||
Reference in New Issue
Block a user