mass test approaches

henryruhs
2026-03-23 13:25:44 +01:00
parent 1cdf51042e
commit 44f8f1e83b
5 changed files with 1166 additions and 39 deletions
+48 -2
@@ -13,7 +13,8 @@ from facefusion.apis.endpoints.metrics import get_metrics, websocket_metrics
from facefusion.apis.endpoints.ping import websocket_ping
from facefusion.apis.endpoints.session import create_session, destroy_session, get_session, refresh_session
from facefusion.apis.endpoints.state import get_state, set_state
-from facefusion.apis.endpoints.stream import websocket_stream, websocket_stream_whip
+from facefusion import logger
+from facefusion.apis.endpoints.stream import websocket_stream, websocket_stream_audio, websocket_stream_live, websocket_stream_mjpeg, websocket_stream_rtc, websocket_stream_whip, websocket_stream_whip_aio, websocket_stream_whip_dc, websocket_stream_whip_py
from facefusion.apis.middlewares.session import create_session_guard
@@ -21,9 +22,47 @@ from facefusion.apis.middlewares.session import create_session_guard
async def lifespan(app : Starlette) -> AsyncGenerator[None, None]:
mediamtx.start()
mediamtx.wait_for_ready()
try:
from facefusion import webrtc_sfu
webrtc_sfu.start()
except Exception as exception:
logger.warn('webrtc sfu: ' + str(exception), __name__)
try:
from facefusion import whip_relay
whip_relay.start()
whip_relay.wait_for_ready()
except Exception as exception:
logger.warn('whip relay: ' + str(exception), __name__)
try:
from facefusion import rtc
rtc.start()
except Exception as exception:
logger.warn('rtc: ' + str(exception), __name__)
yield
mediamtx.stop()
try:
from facefusion import webrtc_sfu
webrtc_sfu.stop()
except Exception:
pass
try:
from facefusion import whip_relay
whip_relay.stop()
except Exception:
pass
try:
from facefusion import rtc
rtc.stop()
except Exception:
pass
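# Each experimental backend (webrtc_sfu, whip_relay, rtc) is imported lazily and
# started best-effort: a missing optional dependency only logs a warning instead of
# preventing the API from booting, and teardown swallows errors for the same reason.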
def create_api() -> Starlette:
session_guard = Middleware(create_session_guard)
@@ -44,7 +83,14 @@ def create_api() -> Starlette:
WebSocketRoute('/metrics', websocket_metrics, middleware = [ session_guard ]),
WebSocketRoute('/ping', websocket_ping, middleware = [ session_guard ]),
WebSocketRoute('/stream', websocket_stream, middleware = [ session_guard ]),
-WebSocketRoute('/stream/whip', websocket_stream_whip, middleware = [ session_guard ])
+WebSocketRoute('/stream/whip', websocket_stream_whip, middleware = [ session_guard ]),
+WebSocketRoute('/stream/whip-py', websocket_stream_whip_py, middleware = [ session_guard ]),
+WebSocketRoute('/stream/whip-dc', websocket_stream_whip_dc, middleware = [ session_guard ]),
+WebSocketRoute('/stream/live', websocket_stream_live, middleware = [ session_guard ]),
+WebSocketRoute('/stream/whip-aio', websocket_stream_whip_aio, middleware = [ session_guard ]),
+WebSocketRoute('/stream/rtc', websocket_stream_rtc, middleware = [ session_guard ]),
+WebSocketRoute('/stream/mjpeg', websocket_stream_mjpeg, middleware = [ session_guard ]),
+WebSocketRoute('/stream/audio', websocket_stream_audio, middleware = [ session_guard ])
]
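# Route map for the test approaches (mirrors MODE_CONFIG in the monitor page):
# /stream/whip    -> FFmpeg WHIP + MediaMTX         /stream/whip-py  -> aiortc in-process WebRTC
# /stream/whip-dc -> FFmpeg WHIP + libdatachannel   /stream/live     -> FFmpeg fMP4 over WebSocket (MSE)
# /stream/rtc     -> libdatachannel direct          /stream/whip-aio -> aiortc WebRTC + audio
# /stream/mjpeg   -> MJPEG over WebSocket           /stream/audio    -> PCM echo used by the MJPEG mode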
api = Starlette(routes = routes, lifespan = lifespan)
+665 -8
@@ -1,9 +1,11 @@
import asyncio
import fcntl
import os as _os
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
-from typing import Deque
+from typing import Deque, List
import cv2
import numpy
@@ -13,7 +15,7 @@ from facefusion import logger, session_context, session_manager, state_manager
from facefusion.apis.api_helper import get_sec_websocket_protocol
from facefusion.apis.session_helper import extract_access_token
from facefusion import mediamtx
-from facefusion.apis.stream_helper import STREAM_FPS, STREAM_QUALITY, close_whip_encoder, create_whip_encoder, feed_whip_audio, feed_whip_frame, process_stream_frame
+from facefusion.apis.stream_helper import STREAM_FPS, STREAM_QUALITY, close_fmp4_encoder, close_whip_encoder, collect_fmp4_chunks, create_fmp4_encoder, create_vp8_pipe_encoder, create_whip_encoder, feed_whip_audio, feed_whip_frame, process_stream_frame, read_fmp4_output
from facefusion.streamer import process_vision_frame
from facefusion.types import VisionFrame
@@ -80,7 +82,8 @@ def run_whip_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_ev
if not encoder:
height, width = temp_vision_frame.shape[:2]
-encoder, audio_write_fd = create_whip_encoder(width, height, STREAM_FPS, STREAM_QUALITY, stream_path)
+whip_url = mediamtx.get_whip_url(stream_path)
+encoder, audio_write_fd = create_whip_encoder(width, height, STREAM_FPS, STREAM_QUALITY, whip_url)
audio_write_fd_holder[0] = audio_write_fd
logger.info('whip encoder started ' + str(width) + 'x' + str(height), __name__)
@@ -93,11 +96,6 @@ def run_whip_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_ev
time.sleep(0.005)
if encoder:
-stderr_output = encoder.stderr.read() if encoder.stderr else b''
-if stderr_output:
-logger.error('ffmpeg: ' + stderr_output.decode(), __name__)
close_whip_encoder(encoder, audio_write_fd)
@@ -158,3 +156,662 @@ async def websocket_stream_whip(websocket : WebSocket) -> None:
return
await websocket.close()
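# ws-fmp4 approach: a worker thread drains the latest-frame holder, fans processing out
# to a thread pool, and pipes processed frames into an ffmpeg process that emits
# fragmented MP4. A reader thread collects the stdout chunks so the websocket handler
# can forward them to the browser for Media Source Extensions playback.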
def run_fmp4_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, output_chunks : List[bytes], output_lock : threading.Lock, audio_write_fd_holder : list) -> None:
encoder = None
audio_write_fd = -1
reader_thread = None
output_deque : Deque[VisionFrame] = deque()
with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
futures = []
while not stop_event.is_set():
with lock:
capture_frame = latest_frame_holder[0]
latest_frame_holder[0] = None
if capture_frame is not None:
future = executor.submit(process_stream_frame, capture_frame)
futures.append(future)
for future_done in [ future for future in futures if future.done() ]:
output_deque.append(future_done.result())
futures.remove(future_done)
if encoder and encoder.poll() is not None:
stderr_output = encoder.stderr.read() if encoder.stderr else b''
logger.error('fmp4 encoder died with code ' + str(encoder.returncode) + ': ' + stderr_output.decode(), __name__)
break
while output_deque:
temp_vision_frame = output_deque.popleft()
if not encoder:
height, width = temp_vision_frame.shape[:2]
encoder, audio_write_fd = create_fmp4_encoder(width, height, STREAM_FPS, STREAM_QUALITY)
audio_write_fd_holder[0] = audio_write_fd
reader_thread = threading.Thread(target = read_fmp4_output, args = (encoder, output_chunks, output_lock), daemon = True)
reader_thread.start()
logger.info('fmp4 encoder started ' + str(width) + 'x' + str(height), __name__)
feed_whip_frame(encoder, temp_vision_frame)
if capture_frame is None and not output_deque:
time.sleep(0.005)
if encoder:
close_fmp4_encoder(encoder, audio_write_fd)
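# Wire protocol for /stream/live: the client uploads JPEG frames (detected via the
# two-byte JPEG magic) and raw s16le audio on the same socket; every received message
# also flushes any pending fMP4 chunks back down the socket.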
async def websocket_stream_live(websocket : WebSocket) -> None:
subprotocol = get_sec_websocket_protocol(websocket.scope)
access_token = extract_access_token(websocket.scope)
session_id = session_manager.find_session_id(access_token)
session_context.set_session_id(session_id)
source_paths = state_manager.get_item('source_paths')
await websocket.accept(subprotocol = subprotocol)
if source_paths:
latest_frame_holder : list = [None]
audio_write_fd_holder : list = [-1]
output_chunks : List[bytes] = []
lock = threading.Lock()
output_lock = threading.Lock()
stop_event = threading.Event()
worker = threading.Thread(target = run_fmp4_pipeline, args = (latest_frame_holder, lock, stop_event, output_chunks, output_lock, audio_write_fd_holder), daemon = True)
worker.start()
try:
while True:
message = await websocket.receive()
chunks = collect_fmp4_chunks(output_chunks, output_lock)
if chunks:
await websocket.send_bytes(chunks)
if message.get('bytes'):
data = message.get('bytes')
if data[:2] == JPEG_MAGIC:
frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(frame):
with lock:
latest_frame_holder[0] = frame
if data[:2] != JPEG_MAGIC and audio_write_fd_holder[0] > 0:
feed_whip_audio(audio_write_fd_holder[0], data)
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
return
await websocket.close()
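# ws-mjpeg approach: the simplest baseline. Only the most recent encoded JPEG is kept
# (output_holder is overwritten, never queued), so a slow client drops frames instead
# of accumulating latency; audio is handled separately via /stream/audio.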
def run_mjpeg_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, output_holder : list, output_lock : threading.Lock) -> None:
output_deque : Deque[VisionFrame] = deque()
with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
futures = []
while not stop_event.is_set():
with lock:
capture_frame = latest_frame_holder[0]
latest_frame_holder[0] = None
if capture_frame is not None:
future = executor.submit(process_stream_frame, capture_frame)
futures.append(future)
for future_done in [ future for future in futures if future.done() ]:
output_deque.append(future_done.result())
futures.remove(future_done)
while output_deque:
temp_vision_frame = output_deque.popleft()
is_success, encoded = cv2.imencode('.jpg', temp_vision_frame, [cv2.IMWRITE_JPEG_QUALITY, 92])
if is_success:
with output_lock:
output_holder[0] = encoded.tobytes()
if capture_frame is None and not output_deque:
time.sleep(0.005)
async def websocket_stream_mjpeg(websocket : WebSocket) -> None:
subprotocol = get_sec_websocket_protocol(websocket.scope)
access_token = extract_access_token(websocket.scope)
session_id = session_manager.find_session_id(access_token)
session_context.set_session_id(session_id)
source_paths = state_manager.get_item('source_paths')
await websocket.accept(subprotocol = subprotocol)
if source_paths:
latest_frame_holder : list = [None]
output_holder : list = [None]
lock = threading.Lock()
output_lock = threading.Lock()
stop_event = threading.Event()
worker = threading.Thread(target = run_mjpeg_pipeline, args = (latest_frame_holder, lock, stop_event, output_holder, output_lock), daemon = True)
worker.start()
try:
while True:
message = await websocket.receive()
with output_lock:
jpeg_data = output_holder[0]
output_holder[0] = None
if jpeg_data:
await websocket.send_bytes(jpeg_data)
if message.get('bytes'):
data = message.get('bytes')
if data[:2] == JPEG_MAGIC:
frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(frame):
with lock:
latest_frame_holder[0] = frame
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
return
await websocket.close()
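# /stream/audio simply echoes PCM back to the client; the monitor page schedules the
# returned buffers on a WebAudio clock, pairing audio with the MJPEG video path.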
async def websocket_stream_audio(websocket : WebSocket) -> None:
subprotocol = get_sec_websocket_protocol(websocket.scope)
access_token = extract_access_token(websocket.scope)
session_id = session_manager.find_session_id(access_token)
session_context.set_session_id(session_id)
await websocket.accept(subprotocol = subprotocol)
try:
while True:
message = await websocket.receive()
if message.get('bytes'):
await websocket.send_bytes(message.get('bytes'))
except Exception:
pass
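# whip-py approach: AiortcBridge (facefusion.aiortc_bridge, not part of this diff) is
# assumed to run an in-process aiortc peer that exposes a local WHEP endpoint plus
# push_frame()/push_audio() feeds, based on how it is used below.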
async def websocket_stream_whip_py(websocket : WebSocket) -> None:
subprotocol = get_sec_websocket_protocol(websocket.scope)
access_token = extract_access_token(websocket.scope)
session_id = session_manager.find_session_id(access_token)
session_context.set_session_id(session_id)
source_paths = state_manager.get_item('source_paths')
await websocket.accept(subprotocol = subprotocol)
if source_paths:
from facefusion.aiortc_bridge import AiortcBridge
bridge = AiortcBridge()
await bridge.start()
whep_url = 'http://localhost:' + str(bridge.port) + '/whep'
latest_frame_holder : list = [None]
whep_sent = False
lock = threading.Lock()
stop_event = threading.Event()
ready_event = threading.Event()
worker = threading.Thread(target = run_aiortc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, bridge), daemon = True)
worker.start()
try:
while True:
message = await websocket.receive()
if not whep_sent and ready_event.is_set():
await websocket.send_text(whep_url)
whep_sent = True
if message.get('bytes'):
data = message.get('bytes')
if data[:2] == JPEG_MAGIC:
frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(frame):
with lock:
latest_frame_holder[0] = frame
if data[:2] != JPEG_MAGIC:
bridge.push_audio(data)
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
await bridge.stop()
return
await websocket.close()
def run_aiortc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, ready_event : threading.Event, bridge : object) -> None:
output_deque : Deque[VisionFrame] = deque()
with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
futures = []
while not stop_event.is_set():
with lock:
capture_frame = latest_frame_holder[0]
latest_frame_holder[0] = None
if capture_frame is not None:
future = executor.submit(process_stream_frame, capture_frame)
futures.append(future)
for future_done in [ future for future in futures if future.done() ]:
output_deque.append(future_done.result())
futures.remove(future_done)
while output_deque:
temp_vision_frame = output_deque.popleft()
bridge.push_frame(temp_vision_frame)
if not ready_event.is_set():
time.sleep(2)
ready_event.set()
if capture_frame is None and not output_deque:
time.sleep(0.005)
def read_h264_output(process, h264_chunks : List[bytes], h264_lock : threading.Lock) -> None:
fd = process.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK)
while True:
chunk = _os.read(fd, 4096)
if not chunk:
break
with h264_lock:
h264_chunks.append(chunk)
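# read_ivf_frames parses the IVF container produced by '-f ivf': a 32 byte file header
# followed by per-frame records with a 12 byte header whose first 4 bytes are the
# little-endian frame size, then the raw VP8 payload.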
def read_ivf_frames(process, frame_list : List[bytes], frame_lock : threading.Lock) -> None:
fd = process.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags & ~_os.O_NONBLOCK)
header = b''
while len(header) < 32:
chunk = _os.read(fd, 32 - len(header))
if not chunk:
return
header += chunk
while True:
frame_header = b''
while len(frame_header) < 12:
chunk = _os.read(fd, 12 - len(frame_header))
if not chunk:
return
frame_header += chunk
frame_size = int.from_bytes(frame_header[0:4], 'little')
frame_data = b''
while len(frame_data) < frame_size:
chunk = _os.read(fd, frame_size - len(frame_data))
if not chunk:
return
frame_data += chunk
with frame_lock:
frame_list.append(frame_data)
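# run_h264_dc_pipeline encodes VP8 despite its name. With the 'relay' backend the IVF
# frames are sent over local UDP to the libdatachannel relay; with 'rtc' they are
# handed to the in-process rtc module. The 65000 byte guard keeps datagrams below the
# UDP payload limit.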
def run_h264_dc_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, ready_event : threading.Event, backend : str, stream_path : str, rtp_port : int) -> None:
encoder = None
reader_thread = None
vp8_frames : List[bytes] = []
vp8_lock = threading.Lock()
output_deque : Deque[VisionFrame] = deque()
udp_sock = None
if backend == 'relay':
import socket as sock
udp_sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM)
with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
futures = []
while not stop_event.is_set():
with lock:
capture_frame = latest_frame_holder[0]
latest_frame_holder[0] = None
if capture_frame is not None:
future = executor.submit(process_stream_frame, capture_frame)
futures.append(future)
for future_done in [ future for future in futures if future.done() ]:
output_deque.append(future_done.result())
futures.remove(future_done)
if encoder and encoder.poll() is not None:
stderr_output = encoder.stderr.read() if encoder.stderr else b''
logger.error('vp8 encoder died: ' + stderr_output.decode(), __name__)
break
while output_deque:
temp_vision_frame = output_deque.popleft()
if not encoder:
height, width = temp_vision_frame.shape[:2]
encoder = create_vp8_pipe_encoder(width, height, STREAM_FPS, STREAM_QUALITY)
reader_thread = threading.Thread(target = read_ivf_frames, args = (encoder, vp8_frames, vp8_lock), daemon = True)
reader_thread.start()
logger.info('vp8 encoder started ' + str(width) + 'x' + str(height) + ' [' + backend + ']', __name__)
feed_whip_frame(encoder, temp_vision_frame)
with vp8_lock:
if vp8_frames:
pending = list(vp8_frames)
vp8_frames.clear()
for frame in pending:
if backend == 'relay' and udp_sock:
if len(frame) <= 65000:
udp_sock.sendto(frame, ('127.0.0.1', rtp_port))
if backend == 'rtc':
from facefusion import rtc
rtc.send_vp8_frame(stream_path, frame)
if not ready_event.is_set() and encoder and encoder.poll() is None:
time.sleep(1)
ready_event.set()
if capture_frame is None and not output_deque:
time.sleep(0.005)
if encoder:
encoder.stdin.close()
encoder.terminate()
encoder.wait(timeout = 5)
if udp_sock:
udp_sock.close()
async def websocket_stream_whip_dc(websocket : WebSocket) -> None:
subprotocol = get_sec_websocket_protocol(websocket.scope)
access_token = extract_access_token(websocket.scope)
session_id = session_manager.find_session_id(access_token)
session_context.set_session_id(session_id)
source_paths = state_manager.get_item('source_paths')
await websocket.accept(subprotocol = subprotocol)
if source_paths:
from facefusion import whip_relay
stream_path = 'stream/' + session_id
rtp_port = whip_relay.create_session(stream_path)
if not rtp_port:
logger.error('failed to create relay session', __name__)
await websocket.close()
return
latest_frame_holder : list = [None]
whep_sent = False
lock = threading.Lock()
stop_event = threading.Event()
ready_event = threading.Event()
worker = threading.Thread(target = run_h264_dc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, 'relay', stream_path, rtp_port), daemon = True)
worker.start()
try:
while True:
message = await websocket.receive()
if not whep_sent and ready_event.is_set():
whep_url = whip_relay.get_whep_url(stream_path)
await websocket.send_text(whep_url)
whep_sent = True
if message.get('bytes'):
data = message.get('bytes')
if data[:2] == JPEG_MAGIC:
frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(frame):
with lock:
latest_frame_holder[0] = frame
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
return
await websocket.close()
async def websocket_stream_whip_aio(websocket : WebSocket) -> None:
subprotocol = get_sec_websocket_protocol(websocket.scope)
access_token = extract_access_token(websocket.scope)
session_id = session_manager.find_session_id(access_token)
session_context.set_session_id(session_id)
source_paths = state_manager.get_item('source_paths')
await websocket.accept(subprotocol = subprotocol)
if source_paths:
from facefusion.aiortc_bridge import AiortcBridge
bridge = AiortcBridge()
await bridge.start()
whep_url = 'http://localhost:' + str(bridge.port) + '/whep'
latest_frame_holder : list = [None]
whep_sent = False
lock = threading.Lock()
stop_event = threading.Event()
ready_event = threading.Event()
worker = threading.Thread(target = run_aiortc_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, bridge), daemon = True)
worker.start()
try:
while True:
message = await websocket.receive()
if not whep_sent and ready_event.is_set():
await websocket.send_text(whep_url)
whep_sent = True
if message.get('bytes'):
data = message.get('bytes')
if data[:2] == JPEG_MAGIC:
frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(frame):
with lock:
latest_frame_holder[0] = frame
if data[:2] != JPEG_MAGIC:
bridge.push_audio(data)
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
await bridge.stop()
return
await websocket.close()
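# datachannel-direct variant: extra backpressure compared to the other pipelines —
# input is downscaled to 640 px width, at most 4 frames are in flight in the pool, and
# the idle sleeps are shorter, trading throughput for latency.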
def run_rtc_direct_pipeline(latest_frame_holder : list, lock : threading.Lock, stop_event : threading.Event, ready_event : threading.Event, stream_path : str) -> None:
from facefusion import rtc
encoder = None
reader_thread = None
vp8_frames : List[bytes] = []
vp8_lock = threading.Lock()
output_deque : Deque[VisionFrame] = deque()
with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
futures = []
while not stop_event.is_set():
with lock:
capture_frame = latest_frame_holder[0]
latest_frame_holder[0] = None
if capture_frame is not None:
h, w = capture_frame.shape[:2]
if w > 640:
scale = 640 / w
capture_frame = cv2.resize(capture_frame, (640, int(h * scale) - int(h * scale) % 2))
if len(futures) < 4:
future = executor.submit(process_stream_frame, capture_frame)
futures.append(future)
for future_done in [ future for future in futures if future.done() ]:
output_deque.append(future_done.result())
futures.remove(future_done)
if encoder and encoder.poll() is not None:
stderr_output = encoder.stderr.read() if encoder.stderr else b''
logger.error('vp8 encoder died: ' + stderr_output.decode(), __name__)
break
while output_deque:
temp_vision_frame = output_deque.popleft()
if not encoder:
height, width = temp_vision_frame.shape[:2]
encoder = create_vp8_pipe_encoder(width, height, STREAM_FPS, STREAM_QUALITY)
reader_thread = threading.Thread(target = read_ivf_frames, args = (encoder, vp8_frames, vp8_lock), daemon = True)
reader_thread.start()
logger.info('vp8 direct encoder started ' + str(width) + 'x' + str(height), __name__)
feed_whip_frame(encoder, temp_vision_frame)
with vp8_lock:
if vp8_frames:
pending = list(vp8_frames)
vp8_frames.clear()
for frame in pending:
rtc.send_to_viewers(stream_path, frame)
if not ready_event.is_set() and encoder and encoder.poll() is None:
time.sleep(0.5)
ready_event.set()
if capture_frame is None and not output_deque:
time.sleep(0.003)
if encoder:
encoder.stdin.close()
encoder.terminate()
encoder.wait(timeout = 5)
async def websocket_stream_rtc(websocket : WebSocket) -> None:
subprotocol = get_sec_websocket_protocol(websocket.scope)
access_token = extract_access_token(websocket.scope)
session_id = session_manager.find_session_id(access_token)
session_context.set_session_id(session_id)
source_paths = state_manager.get_item('source_paths')
await websocket.accept(subprotocol = subprotocol)
if source_paths:
from facefusion import rtc
stream_path = 'stream/' + session_id
rtc.create_session(stream_path)
whep_url = 'http://localhost:' + str(rtc.WHEP_PORT) + '/' + stream_path + '/whep'
latest_frame_holder : list = [None]
whep_sent = False
lock = threading.Lock()
stop_event = threading.Event()
ready_event = threading.Event()
worker = threading.Thread(target = run_rtc_direct_pipeline, args = (latest_frame_holder, lock, stop_event, ready_event, stream_path), daemon = True)
worker.start()
try:
while True:
message = await websocket.receive()
if not whep_sent and ready_event.is_set():
await websocket.send_text(whep_url)
whep_sent = True
if message.get('bytes'):
data = message.get('bytes')
if data[:2] == JPEG_MAGIC:
frame = cv2.imdecode(numpy.frombuffer(data, numpy.uint8), cv2.IMREAD_COLOR)
if numpy.any(frame):
with lock:
latest_frame_holder[0] = frame
except Exception as exception:
logger.error(str(exception), __name__)
stop_event.set()
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, worker.join, 10)
rtc.destroy_session(stream_path)
return
await websocket.close()
+178 -7
@@ -1,21 +1,50 @@
import os
import subprocess
import tempfile
-from typing import Tuple
+import threading
+from typing import List, Optional, Tuple
import cv2
-from facefusion import ffmpeg_builder, mediamtx
+from facefusion import ffmpeg_builder
from facefusion.streamer import process_vision_frame
from facefusion.types import VisionFrame
STREAM_FPS : int = 30
-STREAM_QUALITY : int = 45
+STREAM_QUALITY : int = 80
STREAM_AUDIO_RATE : int = 48000
DTLS_CERT_FILE : str = os.path.join(tempfile.gettempdir(), 'facefusion_dtls_cert.pem')
DTLS_KEY_FILE : str = os.path.join(tempfile.gettempdir(), 'facefusion_dtls_key.pem')
def compute_bitrate(width : int, height : int) -> str:
pixels = width * height
if pixels <= 320 * 240:
return '400k'
if pixels <= 640 * 480:
return '1000k'
if pixels <= 1280 * 720:
return '2000k'
if pixels <= 1920 * 1080:
return '3500k'
return '5000k'
def compute_bufsize(width : int, height : int) -> str:
pixels = width * height
if pixels <= 320 * 240:
return '800k'
if pixels <= 640 * 480:
return '2000k'
if pixels <= 1280 * 720:
return '4000k'
if pixels <= 1920 * 1080:
return '7000k'
return '10000k'
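# Simple resolution-based rate ladder; bufsize is roughly twice maxrate, a common VBV
# sizing for low-latency x264 (smaller buffers cut latency, larger ones smooth quality).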
def create_dtls_certificate() -> None:
if os.path.isfile(DTLS_CERT_FILE) and os.path.isfile(DTLS_KEY_FILE):
return
@@ -27,10 +56,9 @@ def create_dtls_certificate() -> None:
], stdout = subprocess.DEVNULL, stderr = subprocess.DEVNULL)
-def create_whip_encoder(width : int, height : int, stream_fps : int, stream_quality : int, stream_path : str) -> Tuple[subprocess.Popen[bytes], int]:
+def create_whip_encoder(width : int, height : int, stream_fps : int, stream_quality : int, whip_url : str) -> Tuple[subprocess.Popen[bytes], int]:
create_dtls_certificate()
audio_read_fd, audio_write_fd = os.pipe()
-whip_url = mediamtx.get_whip_url(stream_path)
commands = ffmpeg_builder.chain(
[ '-use_wallclock_as_timestamps', '1' ],
ffmpeg_builder.capture_video(),
@@ -44,8 +72,8 @@ def create_whip_encoder(width : int, height : int, stream_fps : int, stream_qual
[ '-pix_fmt', 'yuv420p' ],
[ '-profile:v', 'baseline' ],
[ '-tune', 'zerolatency' ],
-[ '-maxrate', '1500k' ],
-[ '-bufsize', '3000k' ],
+[ '-maxrate', compute_bitrate(width, height) ],
+[ '-bufsize', compute_bufsize(width, height) ],
[ '-g', str(stream_fps) ],
[ '-c:a', 'libopus' ],
[ '-f', 'whip' ],
@@ -76,5 +104,148 @@ def close_whip_encoder(process : subprocess.Popen[bytes], audio_write_fd : int)
process.wait(timeout = 5)
def create_fmp4_encoder(width : int, height : int, stream_fps : int, stream_quality : int) -> Tuple[subprocess.Popen[bytes], int]:
audio_read_fd, audio_write_fd = os.pipe()
commands = ffmpeg_builder.chain(
[ '-use_wallclock_as_timestamps', '1' ],
ffmpeg_builder.capture_video(),
ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)),
ffmpeg_builder.set_input('-'),
[ '-use_wallclock_as_timestamps', '1' ],
[ '-f', 's16le', '-ar', str(STREAM_AUDIO_RATE), '-ac', '2', '-i', 'pipe:' + str(audio_read_fd) ],
[ '-thread_queue_size', '512' ],
ffmpeg_builder.set_video_encoder('libx264'),
ffmpeg_builder.set_video_quality('libx264', stream_quality),
ffmpeg_builder.set_video_preset('libx264', 'ultrafast'),
[ '-pix_fmt', 'yuv420p' ],
[ '-profile:v', 'baseline' ],
[ '-tune', 'zerolatency' ],
[ '-maxrate', compute_bitrate(width, height) ],
[ '-bufsize', compute_bufsize(width, height) ],
[ '-g', str(stream_fps) ],
[ '-c:a', 'aac' ],
[ '-b:a', '128k' ],
[ '-f', 'mp4' ],
[ '-movflags', 'frag_keyframe+empty_moov+default_base_moof+frag_every_frame' ],
ffmpeg_builder.set_output('-')
)
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, pass_fds = (audio_read_fd,))
os.close(audio_read_fd)
return process, audio_write_fd
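# The movflags combination (frag_keyframe+empty_moov+default_base_moof+frag_every_frame)
# makes ffmpeg emit a streamable fragmented MP4: an empty moov up front and a
# self-contained moof/mdat pair per frame, which is what MSE SourceBuffers expect.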
def read_fmp4_output(process : subprocess.Popen[bytes], output_chunks : List[bytes], lock : threading.Lock) -> None:
while True:
chunk = process.stdout.read(4096)
if not chunk:
break
with lock:
output_chunks.append(chunk)
def collect_fmp4_chunks(output_chunks : List[bytes], lock : threading.Lock) -> Optional[bytes]:
with lock:
if output_chunks:
encoded_bytes = b''.join(output_chunks)
output_chunks.clear()
return encoded_bytes
return None
def close_fmp4_encoder(process : subprocess.Popen[bytes], audio_write_fd : int) -> None:
if audio_write_fd > 0:
os.close(audio_write_fd)
process.stdin.close()
process.terminate()
process.wait(timeout = 5)
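# create_rtp_encoder streams H264 over local RTP; pkt_size=1200 keeps each packet
# safely under the usual 1500 byte Ethernet MTU once IP/UDP/RTP headers are added.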
def create_rtp_encoder(width : int, height : int, stream_fps : int, stream_quality : int, rtp_port : int) -> subprocess.Popen[bytes]:
commands = ffmpeg_builder.chain(
[ '-use_wallclock_as_timestamps', '1' ],
ffmpeg_builder.capture_video(),
ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)),
ffmpeg_builder.set_input('-'),
ffmpeg_builder.set_video_encoder('libx264'),
ffmpeg_builder.set_video_quality('libx264', stream_quality),
ffmpeg_builder.set_video_preset('libx264', 'ultrafast'),
[ '-pix_fmt', 'yuv420p' ],
[ '-profile:v', 'baseline' ],
[ '-tune', 'zerolatency' ],
[ '-maxrate', compute_bitrate(width, height) ],
[ '-bufsize', compute_bufsize(width, height) ],
[ '-g', str(stream_fps) ],
[ '-an' ],
[ '-f', 'rtp' ],
[ '-payload_type', '96' ],
ffmpeg_builder.set_output('rtp://127.0.0.1:' + str(rtp_port) + '?pkt_size=1200')
)
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE)
return process
def create_h264_pipe_encoder(width : int, height : int, stream_fps : int, stream_quality : int) -> subprocess.Popen[bytes]:
commands = ffmpeg_builder.chain(
[ '-use_wallclock_as_timestamps', '1' ],
ffmpeg_builder.capture_video(),
ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)),
ffmpeg_builder.set_input('-'),
ffmpeg_builder.set_video_encoder('libx264'),
ffmpeg_builder.set_video_quality('libx264', stream_quality),
ffmpeg_builder.set_video_preset('libx264', 'ultrafast'),
[ '-pix_fmt', 'yuv420p' ],
[ '-profile:v', 'baseline' ],
[ '-tune', 'zerolatency' ],
[ '-maxrate', compute_bitrate(width, height) ],
[ '-bufsize', compute_bufsize(width, height) ],
[ '-g', '1' ],
[ '-an' ],
[ '-f', 'h264' ],
ffmpeg_builder.set_output('-')
)
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
return process
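# VP8 settings below favour latency over quality: deadline=realtime with cpu-used=15,
# lag-in-frames=0 and rc_lookahead=0 disable lookahead, and input wider than 1280 px is
# downscaled (height kept even for yuv420p).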
def create_vp8_pipe_encoder(width : int, height : int, stream_fps : int, stream_quality : int) -> subprocess.Popen[bytes]:
if width > 1280:
height = int(height * 1280 / width)
height = height - (height % 2)
width = 1280
commands = ffmpeg_builder.chain(
[ '-use_wallclock_as_timestamps', '1' ],
ffmpeg_builder.capture_video(),
ffmpeg_builder.set_media_resolution(str(width) + 'x' + str(height)),
ffmpeg_builder.set_input('-'),
[ '-c:v', 'libvpx' ],
[ '-deadline', 'realtime' ],
[ '-cpu-used', '15' ],
[ '-pix_fmt', 'yuv420p' ],
[ '-crf', '30' ],
[ '-b:v', '1000k' ],
[ '-maxrate', '1500k' ],
[ '-bufsize', '2000k' ],
[ '-g', '60' ],
[ '-keyint_min', '30' ],
[ '-error-resilient', '1' ],
[ '-lag-in-frames', '0' ],
[ '-rc_lookahead', '0' ],
[ '-threads', '4' ],
[ '-an' ],
[ '-f', 'ivf' ],
ffmpeg_builder.set_output('-')
)
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
return process
def process_stream_frame(vision_frame : VisionFrame) -> VisionFrame:
return process_vision_frame(vision_frame)
+2 -2
@@ -2,7 +2,7 @@ import importlib
import shutil
from functools import lru_cache
from pathlib import Path
-from typing import List, Tuple
+from typing import List, Sequence, Tuple
import psutil
@@ -24,7 +24,7 @@ def get_metrics_set() -> Metrics:
@lru_cache()
-def detect_static_graphic_devices(execution_providers : Tuple[ExecutionProvider, ...]) -> List[GraphicDevice]:
+def detect_static_graphic_devices(execution_providers : Sequence[ExecutionProvider]) -> List[GraphicDevice]:
return detect_graphic_devices(execution_providers)
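# Note: lru_cache hashes its arguments, so callers must still pass a hashable sequence
# (in practice a tuple) even though the annotation is now the looser Sequence type.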
+273 -20
@@ -6,10 +6,12 @@
<title>WHIP Stream Monitor</title>
<style>
* { box-sizing: border-box; margin: 0; padding: 0; }
-body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; background: #0a0a0f; color: #c8c8d0; min-height: 100vh; }
-.layout { display: grid; grid-template-columns: 340px 1fr; min-height: 100vh; }
+html, body { overflow: hidden; height: 100vh; }
+body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; background: #0a0a0f; color: #c8c8d0; }
+.layout { display: grid; grid-template-columns: 340px 1fr; height: 100vh; }
-.sidebar { background: #12121a; border-right: 1px solid #1e1e2e; padding: 1.2rem; overflow-y: auto; }
+.sidebar { background: #12121a; border-right: 1px solid #1e1e2e; padding: 1.2rem; overflow-y: auto; scrollbar-width: none; -ms-overflow-style: none; }
+.sidebar::-webkit-scrollbar { display: none; }
.sidebar h2 { font-size: 1rem; color: #fff; margin-bottom: 0.8rem; }
.sidebar h2 .badge { font-size: 0.65rem; padding: 0.15rem 0.4rem; border-radius: 4px; margin-left: 0.4rem; vertical-align: middle; background: linear-gradient(135deg, #6c5ce7, #a855f7); color: #fff; }
@@ -69,6 +71,12 @@
.timeline { display: none; align-items: stretch; padding: 0; background: #0e0e14; border-top: 1px solid #1e1e2e; border-bottom: 1px solid #1e1e2e; flex-shrink: 0; }
.timeline.visible { display: flex; }
.timeline .transport { display: flex; align-items: center; gap: 2px; padding: 0 0.4rem; background: #12121a; border-right: 1px solid #1e1e2e; }
.timeline .transport-btn { width: 28px; height: 28px; border: none; border-radius: 6px; cursor: pointer; display: flex; align-items: center; justify-content: center; background: transparent; color: #888; transition: all 0.15s; }
.timeline .transport-btn:hover { background: #1e1e2e; color: #fff; }
.timeline .transport-btn:disabled { opacity: 0.25; cursor: not-allowed; }
.timeline .transport-btn.active { color: #00b894; }
.timeline .transport-btn svg { width: 14px; height: 14px; fill: currentColor; }
.timeline .time { font-size: 0.75rem; color: #888; font-family: monospace; min-width: 60px; display: flex; align-items: center; justify-content: center; padding: 0 0.6rem; background: #12121a; border-right: 1px solid #1e1e2e; }
.timeline .time:last-child { border-right: none; border-left: 1px solid #1e1e2e; }
.timeline .track { flex: 1; position: relative; height: 2em; cursor: pointer; background: repeating-linear-gradient(90deg, transparent, transparent 59px, #1a1a25 59px, #1a1a25 60px); }
@@ -77,8 +85,11 @@
.timeline .track-playhead:before { content: ''; position: absolute; top: -4px; left: -5px; width: 0; height: 0; border-left: 6px solid transparent; border-right: 6px solid transparent; border-top: 6px solid #a855f7; }
.timeline input[type=range] { position: absolute; top: 0; left: 0; width: 100%; height: 100%; -webkit-appearance: none; appearance: none; background: transparent; outline: none; cursor: pointer; margin: 0; opacity: 0; z-index: 2; }
.timeline input[type=range]::-webkit-slider-thumb { -webkit-appearance: none; width: 1px; height: 100%; cursor: col-resize; }
.timeline input[type=range]:disabled { cursor: default; }
.timeline .track:has(input:disabled) { opacity: 0.4; cursor: default; }
-.log-panel { background: #08080c; border-top: 1px solid #1e1e2e; flex-shrink: 0; max-height: 300px; overflow-y: auto; padding: 0.4rem 1rem; font-family: monospace; font-size: 0.8rem; color: #444; }
+.log-panel { background: #08080c; border-top: 1px solid #1e1e2e; flex-shrink: 0; max-height: 300px; overflow-y: auto; padding: 0.4rem 1rem; font-family: monospace; font-size: 0.8rem; color: #444; scrollbar-width: none; -ms-overflow-style: none; }
+.log-panel::-webkit-scrollbar { display: none; }
.log-panel div { padding: 1px 0; white-space: nowrap; }
.log-panel .log-info { color: #555; }
.log-panel .log-ok { color: #00b894; }
@@ -128,7 +139,24 @@
</div>
<div class="section">
<div class="section-title"><span class="step-dot" id="dotOptions">4</span> Options</div>
<div class="section-title"><span class="step-dot" id="dotMode">4</span> Streaming Mode</div>
<div class="form-row">
<label>Approach
<select id="streamMode">
<option value="whip-mediamtx">FFmpeg WHIP + MediaMTX</option>
<option value="whip-python">aiortc in-process WebRTC</option>
<option value="whip-datachannel">FFmpeg WHIP + libdatachannel relay</option>
<option value="ws-fmp4">FFmpeg fMP4 + WebSocket (MSE)</option>
<option value="datachannel-direct">libdatachannel direct</option>
<option value="whip-aiortc">aiortc WebRTC + audio (no ext. deps)</option>
<option value="ws-mjpeg">MJPEG over WebSocket (no deps)</option>
</select>
</label>
</div>
</div>
<div class="section">
<div class="section-title"><span class="step-dot" id="dotOptions">5</span> Options</div>
<div class="form-row">
<label>Capture Resolution
<select id="captureRes">
@@ -152,9 +180,11 @@
</div>
<div class="section">
<div class="section-title"><span class="step-dot" id="dotStream">5</span> Stream</div>
<button class="btn btn-primary" id="btnConnect" onclick="connect()" disabled>Start Stream</button>
<button class="btn btn-danger" id="btnDisconnect" onclick="disconnect()" disabled>Disconnect</button>
<div class="section-title"><span class="step-dot" id="dotStream">6</span> Stream</div>
<div class="switch-row">
<span>Ready</span>
<span id="streamReadyHint" style="font-size:0.7rem;color:#555;">set source + video first</span>
</div>
</div>
</div>
@@ -235,11 +265,19 @@
<video id="inputVideo" autoplay muted playsinline style="display:none;"></video>
<div class="timeline" id="timeline">
<div class="transport">
<button class="transport-btn" id="btnPlay" onclick="connect()" disabled title="Play">
<svg viewBox="0 0 24 24"><polygon points="6,4 20,12 6,20"/></svg>
</button>
<button class="transport-btn" id="btnStop" onclick="disconnect()" disabled title="Stop">
<svg viewBox="0 0 24 24"><rect x="5" y="5" width="14" height="14" rx="2"/></svg>
</button>
</div>
<span class="time" id="timePosition">0:00</span>
<div class="track">
<div class="track-fill" id="trackFill"></div>
<div class="track-playhead" id="trackPlayhead"></div>
<input type="range" id="timeSlider" min="0" max="100" step="0.1" value="0" oninput="onSeekInput()" onchange="onSeekCommit()">
<input type="range" id="timeSlider" min="0" max="100" step="0.1" value="0" oninput="onSeekInput()" onchange="onSeekCommit()" disabled>
</div>
<span class="time" id="timeDuration">0:00</span>
</div>
@@ -267,11 +305,36 @@ var prevStatsTime = 0;
var prevFramesSent = 0;
var metricsWs = null;
var mediaSource = null;
var sourceBuffer = null;
var mseQueue = [];
var mseReady = false;
var captureCanvas = document.createElement('canvas');
var captureCtx = captureCanvas.getContext('2d');
var audioCtx = null;
var audioWorklet = null;
var audioEchoWs = null;
var audioPlayCtx = null;
var audioPlayNextTime = 0;
var MODE_CONFIG = {
'whip-mediamtx': { wsPath: '/stream/whip', playback: 'whep' },
'whip-python': { wsPath: '/stream/whip-py', playback: 'whep' },
'whip-datachannel': { wsPath: '/stream/whip-dc', playback: 'whep' },
'ws-fmp4': { wsPath: '/stream/live', playback: 'mse' },
'datachannel-direct': { wsPath: '/stream/rtc', playback: 'whep' },
'whip-aiortc': { wsPath: '/stream/whip-aio', playback: 'whep' },
'ws-mjpeg': { wsPath: '/stream/mjpeg', playback: 'mjpeg' }
};
function getMode() {
return document.getElementById('streamMode').value;
}
function getModeConfig() {
return MODE_CONFIG[getMode()];
}
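// Playback strategies: 'whep' pulls the processed stream back over WebRTC from a WHEP
// URL the server sends as text, 'mse' appends fMP4 chunks to a MediaSource buffer, and
// 'mjpeg' swaps JPEG blobs into an <img> element.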
function log(msg, type) {
type = type || 'info';
@@ -305,7 +368,9 @@ function authHeaders() {
function checkConnectReady() {
if (sourceReady && localStream) {
-document.getElementById('btnConnect').disabled = false;
+document.getElementById('btnPlay').disabled = false;
+document.getElementById('streamReadyHint').textContent = 'ready';
+document.getElementById('streamReadyHint').style.color = '#00b894';
}
}
@@ -582,7 +647,7 @@ function loadVideoFromFile(file) {
var blobUrl = URL.createObjectURL(file);
vid.src = blobUrl;
vid.loop = true;
-vid.muted = false;
+vid.muted = true;
vid.volume = 0;
vid.play().then(function() {
@@ -688,6 +753,12 @@ function onSeekCommit() {
timelineVideo.currentTime = t;
log('seek → ' + formatTime(t), 'info');
}
if (audioPlayCtx) {
audioPlayCtx.close();
audioPlayCtx = new AudioContext({ sampleRate: 48000 });
audioPlayNextTime = 0;
}
}
function formatTime(s) {
@@ -824,13 +895,139 @@ function stopAudioCapture() {
}
}
function startAudioEcho() {
var stream = localStream;
if (!stream || stream.getAudioTracks().length === 0) {
log('no audio track for echo', 'warn');
return;
}
audioPlayCtx = new AudioContext({ sampleRate: 48000 });
audioPlayNextTime = 0;
var captureCtxAudio = new AudioContext({ sampleRate: 48000 });
var source = captureCtxAudio.createMediaStreamSource(stream);
var processor = captureCtxAudio.createScriptProcessor(4096, 2, 2);
var echoUrl = wsBase() + '/stream/audio';
var protocols = ['access_token.' + accessToken];
audioEchoWs = new WebSocket(echoUrl, protocols);
audioEchoWs.binaryType = 'arraybuffer';
audioEchoWs.onmessage = function(event) {
if (!audioPlayCtx) return;
var pcm = new Int16Array(event.data);
var samples = pcm.length / 2;
var buffer = audioPlayCtx.createBuffer(2, samples, 48000);
var left = buffer.getChannelData(0);
var right = buffer.getChannelData(1);
for (var i = 0; i < samples; i++) {
left[i] = pcm[i * 2] / 32768;
right[i] = pcm[i * 2 + 1] / 32768;
}
var bufferSource = audioPlayCtx.createBufferSource();
bufferSource.buffer = buffer;
bufferSource.connect(audioPlayCtx.destination);
var now = audioPlayCtx.currentTime;
if (audioPlayNextTime < now) audioPlayNextTime = now + 0.05;
bufferSource.start(audioPlayNextTime);
audioPlayNextTime += buffer.duration;
};
processor.onaudioprocess = function(e) {
if (!audioEchoWs || audioEchoWs.readyState !== WebSocket.OPEN) return;
var left = e.inputBuffer.getChannelData(0);
var right = e.inputBuffer.getChannelData(1);
var pcm = new Int16Array(left.length * 2);
for (var i = 0; i < left.length; i++) {
pcm[i * 2] = Math.max(-32768, Math.min(32767, left[i] * 32768));
pcm[i * 2 + 1] = Math.max(-32768, Math.min(32767, right[i] * 32768));
}
audioEchoWs.send(pcm.buffer);
};
source.connect(processor);
processor.connect(captureCtxAudio.destination);
log('audio echo started (48kHz stereo s16le)', 'ok');
}
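// Gapless playback trick: audioPlayNextTime is a running cursor on the AudioContext
// clock; each decoded buffer is scheduled at the cursor and the cursor advances by the
// buffer duration, with a 50 ms re-sync whenever the cursor falls behind.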
function stopAudioEcho() {
if (audioEchoWs) {
audioEchoWs.close();
audioEchoWs = null;
}
if (audioPlayCtx) {
audioPlayCtx.close();
audioPlayCtx = null;
}
audioPlayNextTime = 0;
}
function initMse() {
var video = document.getElementById('outputVideo');
mediaSource = new MediaSource();
video.src = URL.createObjectURL(mediaSource);
mediaSource.addEventListener('sourceopen', function() {
sourceBuffer = mediaSource.addSourceBuffer('video/mp4; codecs="avc1.42E01E,mp4a.40.2"');
sourceBuffer.mode = 'sequence';
mseReady = true;
sourceBuffer.addEventListener('updateend', function() {
if (mseQueue.length > 0 && !sourceBuffer.updating) {
sourceBuffer.appendBuffer(mseQueue.shift());
}
});
log('MSE source buffer ready', 'ok');
});
}
function feedMse(data) {
if (!mseReady || !sourceBuffer) return;
if (sourceBuffer.updating || mseQueue.length > 0) {
mseQueue.push(data);
} else {
sourceBuffer.appendBuffer(data);
}
}
function cleanupMse() {
mseQueue = [];
mseReady = false;
sourceBuffer = null;
if (mediaSource && mediaSource.readyState === 'open') {
mediaSource.endOfStream();
}
mediaSource = null;
}
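// SourceBuffer only accepts one append at a time, so feedMse queues chunks while
// 'updating' is true and the updateend handler drains the queue; 'sequence' mode lets
// the browser ignore the fragments' own timestamps.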
async function connect() {
var config = getModeConfig();
var mode = getMode();
framesSent = 0;
-var wsUrl = wsBase() + '/stream/whip';
+var wsUrl = wsBase() + config.wsPath;
var protocols = ['access_token.' + accessToken];
var t0 = performance.now();
-log('ws → ' + wsUrl, 'info');
+log('[' + mode + '] ws → ' + wsUrl, 'info');
if (config.playback === 'mse') {
initMse();
}
ws = new WebSocket(wsUrl, protocols);
ws.binaryType = 'arraybuffer';
@@ -838,20 +1035,69 @@ async function connect() {
ws.onopen = function() {
log('websocket open (' + Math.round(performance.now() - t0) + 'ms) — sending frames', 'ok');
markDone('dotStream');
-document.getElementById('btnConnect').disabled = true;
-document.getElementById('btnDisconnect').disabled = false;
+markDone('dotMode');
+document.getElementById('btnPlay').disabled = true;
+document.getElementById('btnPlay').classList.add('active');
+document.getElementById('btnStop').disabled = false;
+document.getElementById('timeSlider').disabled = false;
streaming = true;
updatePipVisibility();
if (timelineVideo) {
timelineVideo.currentTime = 0;
}
document.getElementById('timePosition').textContent = '0:00';
document.getElementById('timeSlider').value = 0;
updateTrackVisual(0, timelineVideo ? timelineVideo.duration : 0);
captureTimer = setInterval(captureAndSend, 1000 / 30);
-startAudioCapture();
-startTimelineSync();
+if (config.playback === 'mjpeg') {
+startAudioEcho();
+} else {
+startAudioCapture();
+}
startStats();
};
var streamStarted = false;
function onFirstOutput() {
if (streamStarted) return;
streamStarted = true;
if (timelineVideo) timelineVideo.play();
startTimelineSync();
log('stream output started', 'ok');
}
ws.onmessage = function(event) {
if (config.playback === 'mse' && event.data instanceof ArrayBuffer) {
onFirstOutput();
feedMse(event.data);
return;
}
if (config.playback === 'mjpeg' && event.data instanceof ArrayBuffer) {
onFirstOutput();
var blob = new Blob([event.data], { type: 'image/jpeg' });
var url = URL.createObjectURL(blob);
var video = document.getElementById('outputVideo');
if (!video._mjpegImg) {
video.style.display = 'none';
var img = document.createElement('img');
img.id = 'mjpegOutput';
img.style.cssText = 'width:100%;height:100%;object-fit:contain;border-radius:8px;';
video.parentNode.appendChild(img);
video._mjpegImg = img;
}
if (video._mjpegImg._prevUrl) URL.revokeObjectURL(video._mjpegImg._prevUrl);
video._mjpegImg.src = url;
video._mjpegImg._prevUrl = url;
return;
}
if (typeof event.data === 'string' && !whepUrlFromServer) {
whepUrlFromServer = event.data;
onFirstOutput();
log('stream ready (' + Math.round(performance.now() - t0) + 'ms) — WHEP url: ' + whepUrlFromServer, 'ok');
if (!window.RTCPeerConnection && !window.webkitRTCPeerConnection) {
@@ -883,19 +1129,24 @@ function stopStreaming() {
streaming = false;
updatePipVisibility();
stopStats();
cleanupMse();
if (captureTimer) {
clearInterval(captureTimer);
captureTimer = null;
}
-document.getElementById('btnConnect').disabled = false;
-document.getElementById('btnDisconnect').disabled = true;
+document.getElementById('btnPlay').disabled = false;
+document.getElementById('btnPlay').classList.remove('active');
+document.getElementById('btnStop').disabled = true;
+document.getElementById('timeSlider').disabled = true;
document.getElementById('dotStream').classList.remove('done');
+document.getElementById('dotMode').classList.remove('done');
}
function disconnect() {
stopAudioCapture();
stopAudioEcho();
stopTimelineSync();
if (pc) {
@@ -910,7 +1161,9 @@ function disconnect() {
whepUrlFromServer = null;
-document.getElementById('outputVideo').srcObject = null;
+var video = document.getElementById('outputVideo');
+video.srcObject = null;
+video.src = '';
stopStreaming();
log('disconnected', 'warn');
}