Implement basic webrtc stream (#1054)

* implement basic webrtc_stream

* add aiortc to requirements.txt

* update aiortc version

* rename variables with rtc_ prefix

* changes

* changes

* change helper to assert_helper and stream_helper

* rename variables with rtc_ prefix

* add error handling

* return whole connection

* remove monkey patch and some cleaning

* cleanup

* tiny adjustments

* tiny adjustments

* proper typing and naming for rtc offer set

* - remove async from on_video_track method
- rename source -> target
- add audio

* audio always before video

---------

Co-authored-by: henryruhs <info@henryruhs.com>
This commit is contained in:
Harisreedhar
2026-03-04 18:44:00 +05:30
committed by henryruhs
parent 9159f45a5f
commit ab24cd3f2e
34 changed files with 161 additions and 35 deletions
+33 -7
View File
@@ -1,11 +1,17 @@
from functools import partial
import cv2
import numpy
from aiortc import RTCPeerConnection, RTCSessionDescription
from starlette.requests import Request
from starlette.websockets import WebSocket, WebSocketDisconnect
from starlette.responses import JSONResponse, Response
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
from starlette.websockets import WebSocket
from facefusion import session_context, session_manager, state_manager
from facefusion.apis.api_helper import get_sec_websocket_protocol
from facefusion.apis.endpoints.session import extract_access_token
from facefusion.apis.session_helper import extract_access_token
from facefusion.apis.stream_helper import on_video_track
from facefusion.streamer import process_stream_frame
@@ -21,8 +27,8 @@ async def websocket_stream(websocket : WebSocket) -> None:
if source_paths:
try:
image_bytes = await websocket.receive_bytes()
target_vision_frame = cv2.imdecode(numpy.frombuffer(image_bytes, numpy.uint8), cv2.IMREAD_COLOR)
image_buffer = await websocket.receive_bytes()
target_vision_frame = cv2.imdecode(numpy.frombuffer(image_buffer, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(target_vision_frame):
temp_vision_frame = process_stream_frame(target_vision_frame)
@@ -31,12 +37,32 @@ async def websocket_stream(websocket : WebSocket) -> None:
if is_success:
await websocket.send_bytes(output_vision_frame.tobytes())
except (WebSocketDisconnect, OSError):
except Exception:
pass
return
await websocket.close()
async def webrtc_stream(request : Request) -> None: # TODO: implement webrtc streaming
pass
async def webrtc_stream(request : Request) -> Response:
	"""
	Answer a WebRTC offer posted by the client.

	The session is resolved from the access token found in the request scope.
	The request body is read as JSON and its 'sdp' and 'type' entries are used
	as the remote description of a new peer connection, whose incoming tracks
	are handed to on_video_track.

	:param request: incoming request carrying the access token and the offer
	:return: JSON response holding the 'sdp' and 'type' of the local answer,
		or an empty HTTP 500 response when no session matches the token
	:raises Exception: whatever the negotiation raises, the peer connection is
		closed before the error is propagated
	"""
	access_token = extract_access_token(request.scope)
	session_id = session_manager.find_session_id(access_token)
	session_context.set_session_id(session_id)

	# without a session there is nothing to negotiate
	if not session_id:
		return Response(status_code = HTTP_500_INTERNAL_SERVER_ERROR)

	body = await request.json()
	rtc_offer = RTCSessionDescription(sdp = body.get('sdp'), type = body.get('type'))
	rtc_connection = RTCPeerConnection()
	rtc_connection.on('track', partial(on_video_track, rtc_connection))

	try:
		await rtc_connection.setRemoteDescription(rtc_offer)
		await rtc_connection.setLocalDescription(await rtc_connection.createAnswer())
	except Exception:
		# a failed negotiation must not leave a half-open connection behind
		await rtc_connection.close()
		raise

	return JSONResponse(
	{
		'sdp': rtc_connection.localDescription.sdp,
		'type': rtc_connection.localDescription.type
	})
+31
View File
@@ -0,0 +1,31 @@
import asyncio
from typing import cast
from aiortc import MediaStreamTrack, RTCPeerConnection, VideoStreamTrack
from av import VideoFrame
from facefusion.streamer import process_stream_frame
def create_output_track(target_track : MediaStreamTrack) -> VideoStreamTrack:
	"""
	Create a video track that serves the processed frames of the target track.

	:param target_track: track whose frames are received and processed
	:return: a VideoStreamTrack whose recv is replaced by the processing reader
	"""
	async def read_stream_frame() -> VideoFrame:
		# wait for the next frame of the target track
		received_frame = cast(VideoFrame, await target_track.recv())
		event_loop = asyncio.get_running_loop()
		received_vision_frame = received_frame.to_ndarray(format = 'bgr24')
		# process_stream_frame runs in the loop's default executor
		processed_vision_frame = await event_loop.run_in_executor(None, process_stream_frame, received_vision_frame)
		processed_frame = VideoFrame.from_ndarray(processed_vision_frame, format = 'bgr24')
		# carry over the timing of the received frame
		processed_frame.pts = received_frame.pts
		processed_frame.time_base = received_frame.time_base
		return processed_frame

	output_track = VideoStreamTrack()
	output_track.recv = read_stream_frame
	return output_track
def on_video_track(rtc_connection : RTCPeerConnection, target_track : MediaStreamTrack) -> None:
	"""
	Add an incoming track to the connection, depending on its kind.

	An audio track is added to the connection as it is, a video track is
	wrapped by create_output_track first. Any other kind is ignored.

	:param rtc_connection: connection that receives the added track
	:param target_track: track that has been received
	"""
	track_kind = target_track.kind

	if track_kind == 'audio':
		rtc_connection.addTrack(target_track)
	elif track_kind == 'video':
		rtc_connection.addTrack(create_output_track(target_track))
+6
View File
@@ -257,6 +257,12 @@ BenchmarkCycleSet = TypedDict('BenchmarkCycleSet',
WebcamMode = Literal['inline', 'udp', 'v4l2']
StreamMode = Literal['udp', 'v4l2']
# Plain dictionary form of a WebRTC session description: the SDP text and the description type, both as strings
RtcOfferSet = TypedDict('RtcOfferSet',
{
'sdp': str,
'type': str
})
ModelOptions : TypeAlias = Dict[str, Any]
ModelSet : TypeAlias = Dict[str, ModelOptions]
ModelInitializer : TypeAlias = NDArray[Any]
+1
View File
@@ -7,6 +7,7 @@ nvidia-ml-py==13.590.48
psutil==7.2.2
tqdm==4.67.3
scipy==1.16.3
aiortc==1.14.0
starlette==0.52.1
uvicorn==0.41.0
websockets==16.0
+21
View File
@@ -0,0 +1,21 @@
from aiortc import RTCPeerConnection, VideoStreamTrack
from facefusion.types import RtcOfferSet
async def create_rtc_offer() -> RtcOfferSet:
	"""
	Create a WebRTC offer for a connection that carries a single video track.

	A temporary peer connection is used to create the offer, it is closed
	before returning, also when creating the offer fails.

	:return: the 'sdp' and 'type' of the local description
	"""
	rtc_connection = RTCPeerConnection()

	try:
		rtc_connection.addTrack(VideoStreamTrack())
		rtc_offer = await rtc_connection.createOffer()
		await rtc_connection.setLocalDescription(rtc_offer)
		rtc_offer_set : RtcOfferSet =\
		{
			'sdp': rtc_connection.localDescription.sdp,
			'type': rtc_connection.localDescription.type
		}
	finally:
		# release the connection even when the offer could not be created
		await rtc_connection.close()

	return rtc_offer_set
+1 -1
View File
@@ -8,7 +8,7 @@ from facefusion import metadata, session_manager, state_manager
from facefusion.apis import asset_store
from facefusion.apis.core import create_api
from facefusion.download import conditional_download
from .helper import get_test_example_file, get_test_examples_directory
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -9,7 +9,7 @@ from facefusion import capability_store, metadata, session_manager, state_manage
from facefusion.apis import asset_store
from facefusion.apis.core import create_api
from facefusion.download import conditional_download
from .helper import get_test_example_file, get_test_examples_directory
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
+42 -1
View File
@@ -1,3 +1,4 @@
import asyncio
import tempfile
from typing import Iterator
@@ -11,7 +12,8 @@ from facefusion.apis import asset_store
from facefusion.apis.core import create_api
from facefusion.core import common_pre_check, processors_pre_check
from facefusion.download import conditional_download
from .helper import get_test_example_file, get_test_examples_directory
from .assert_helper import get_test_example_file, get_test_examples_directory
from .stream_helper import create_rtc_offer
@pytest.fixture(scope = 'module', autouse = True)
@@ -97,3 +99,42 @@ def test_stream_image(test_client : TestClient) -> None:
output_vision_frame = cv2.imdecode(numpy.frombuffer(output_bytes, numpy.uint8), cv2.IMREAD_COLOR)
assert output_vision_frame.shape == (1024, 1024, 3)
def test_stream_video(test_client : TestClient) -> None:
	"""
	Post a WebRTC offer to /stream after selecting a source and expect an answer.
	"""
	# open a session to obtain the access token
	session_response = test_client.post('/session', json =
	{
		'client_version': metadata.get('version')
	})
	access_token = session_response.json().get('access_token')
	authorization_headers =\
	{
		'Authorization': 'Bearer ' + access_token
	}

	# upload the source image
	with open(get_test_example_file('source.jpg'), 'rb') as source_file:
		source_content = source_file.read()

	upload_response = test_client.post('/assets?type=source', headers = authorization_headers, files =
	[
		('file', ('source.jpg', source_content, 'image/jpeg'))
	])
	asset_ids = upload_response.json().get('asset_ids')

	# select the uploaded asset as source
	test_client.put('/state?action=select&type=source', json =
	{
		'asset_ids': [ asset_ids[0] ]
	}, headers = authorization_headers)

	# negotiate the stream
	rtc_offer = asyncio.run(create_rtc_offer())
	stream_response = test_client.post('/stream', json = rtc_offer, headers = authorization_headers)
	rtc_answer = stream_response.json()

	assert stream_response.status_code == 200
	assert rtc_answer.get('type') == 'answer'
	assert rtc_answer.get('sdp')
+1 -1
View File
@@ -2,7 +2,7 @@ import pytest
from facefusion.apis.asset_helper import detect_media_type, extract_audio_metadata, extract_image_metadata, extract_video_metadata
from facefusion.download import conditional_download
from .helper import get_test_example_file, get_test_examples_directory
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ from pytest import approx
from facefusion.audio import detect_audio_duration, get_audio_frame, read_static_audio, restrict_trim_audio_frame
from facefusion.download import conditional_download
from .helper import get_test_example_file, get_test_examples_directory
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, count_step_total, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_job_file
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_job_file
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs, move_job_file, set_steps_status
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -7,7 +7,7 @@ from facefusion.download import conditional_download
from facefusion.jobs.job_manager import clear_jobs, init_jobs
from facefusion.types import Resolution, Scale
from facefusion.vision import detect_image_resolution, detect_video_resolution
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -7,7 +7,7 @@ from facefusion.download import conditional_download
from facefusion.face_analyser import get_many_faces
from facefusion.face_store import clear_static_faces
from facefusion.vision import read_static_image
from .helper import get_test_example_file, get_test_examples_directory
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -11,7 +11,7 @@ from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_au
from facefusion.filesystem import copy_file
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
from facefusion.types import EncoderSet
from .helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -5,7 +5,7 @@ import pytest
from facefusion import process_manager
from facefusion.download import conditional_download
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate
from .helper import get_test_example_file, get_test_examples_directory
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -4,7 +4,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.filesystem import create_directory, filter_audio_paths, filter_image_paths, get_file_extension, get_file_format, get_file_size, has_audio, has_image, has_video, in_directory, is_audio, is_directory, is_file, is_image, is_video, remove_directory, resolve_file_paths
from .helper import get_test_example_file, get_test_examples_directory, get_test_outputs_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_outputs_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -4,7 +4,7 @@ import pytest
from facefusion.jobs.job_list import compose_job_list
from facefusion.jobs.job_manager import clear_jobs, create_job, init_jobs
from .helper import get_test_jobs_directory
from .assert_helper import get_test_jobs_directory
@pytest.fixture(scope = 'function', autouse = True)
+1 -1
View File
@@ -4,7 +4,7 @@ import pytest
from facefusion.jobs.job_helper import get_step_output_path
from facefusion.jobs.job_manager import add_step, clear_jobs, count_step_total, create_job, delete_job, delete_jobs, find_job_ids, find_jobs, get_steps, init_jobs, insert_step, move_job_file, remix_step, remove_step, set_step_status, set_steps_status, submit_job, submit_jobs
from .helper import get_test_jobs_directory
from .assert_helper import get_test_jobs_directory
@pytest.fixture(scope = 'function', autouse = True)
+1 -1
View File
@@ -8,7 +8,7 @@ from facefusion.filesystem import copy_file, create_directory, get_file_extensio
from facefusion.jobs.job_manager import add_step, clear_jobs, create_job, init_jobs, move_job_file, submit_job, submit_jobs
from facefusion.jobs.job_runner import collect_output_set, finalize_steps, retry_job, retry_jobs, run_job, run_jobs, run_steps
from facefusion.types import Args
from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -6,7 +6,7 @@ import pytest
from facefusion import state_manager
from facefusion.download import conditional_download
from facefusion.temp_helper import get_temp_directory_path, get_temp_file_path, get_temp_frames_pattern
from .helper import get_test_example_file, get_test_examples_directory
from .assert_helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
+1 -1
View File
@@ -4,7 +4,7 @@ import pytest
from facefusion.download import conditional_download
from facefusion.vision import calculate_histogram_difference, count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, match_frame_color, normalize_resolution, pack_resolution, predict_video_frame_total, read_image, read_video_frame, restrict_image_resolution, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, unpack_resolution, write_image
from .helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)