Add video_encoder.py (#1094)

* add video_encoder.py

* rename pts to presentation_timestamp

* improve test

* improve test

* cleanup

* cleanup

* fix lint
This commit is contained in:
Harisreedhar
2026-05-11 15:37:25 +05:30
committed by henryruhs
parent 6eace4ce29
commit bb1b8ccf13
3 changed files with 99 additions and 66 deletions
+2 -66
View File
@@ -1,7 +1,5 @@
import asyncio
import ctypes
import multiprocessing
import struct
from collections import deque
from collections.abc import AsyncIterator
from typing import Optional, Tuple
@@ -13,9 +11,10 @@ from starlette.websockets import WebSocket, WebSocketState
from facefusion import rtc_store, session_context, session_manager, state_manager
from facefusion.apis.api_helper import get_sec_websocket_protocol
from facefusion.apis.session_helper import extract_access_token
from facefusion.libraries import opus as opus_module, vpx as vpx_module
from facefusion.libraries import opus as opus_module
from facefusion.streamer import process_vision_frame
from facefusion.types import Resolution, SessionId, VisionFrame
from facefusion.video_encoder import create_vpx_encoder, destroy_vpx_encoder, encode_vpx
async def receive_stream_frames(websocket : WebSocket) -> AsyncIterator[Tuple[int, bytes]]:
@@ -43,69 +42,6 @@ async def receive_vision_frames(websocket : WebSocket) -> AsyncIterator[VisionFr
websocket_event = await websocket.receive()
# TODO: move to facefusion/vpx_encoder.py
def create_vpx_encoder(width : int, height : int, bitrate : int) -> Optional[ctypes.Array[ctypes.c_char]]:
vpx_library = vpx_module.create_static_library()
if vpx_library:
vp8_iface = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_cx_algo')
config_buffer = ctypes.create_string_buffer(4096)
if vpx_library.vpx_codec_enc_config_default(ctypes.byref(vp8_iface), config_buffer, 0) == 0:
thread_count = min(multiprocessing.cpu_count(), 8)
struct.pack_into('I', config_buffer, 4, thread_count)
struct.pack_into('I', config_buffer, 12, width)
struct.pack_into('I', config_buffer, 16, height)
struct.pack_into('I', config_buffer, 72, 2)
struct.pack_into('I', config_buffer, 112, bitrate)
struct.pack_into('I', config_buffer, 116, 2)
struct.pack_into('I', config_buffer, 120, 50)
struct.pack_into('I', config_buffer, 124, 50)
struct.pack_into('I', config_buffer, 128, 50)
context_buffer = ctypes.create_string_buffer(512)
if vpx_library.vpx_codec_enc_init_ver(context_buffer, ctypes.byref(vp8_iface), config_buffer, 0, 39) == 0:
vpx_library.vpx_codec_control_(context_buffer, 13, ctypes.c_int(16))
vpx_library.vpx_codec_control_(context_buffer, 12, ctypes.c_int(3))
vpx_library.vpx_codec_control_(context_buffer, 27, ctypes.c_int(10))
return context_buffer
return None
# TODO: move to facefusion/vpx_encoder.py
def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes, width : int, height : int, pts : int, flags : int) -> bytes:
vpx_library = vpx_module.create_static_library()
frame_buffer = b''
if vpx_library:
image_buffer = ctypes.create_string_buffer(512)
yuv_string_buffer = ctypes.create_string_buffer(yuv_buffer)
if vpx_library.vpx_img_wrap(image_buffer, 0x102, width, height, 1, yuv_string_buffer):
if vpx_library.vpx_codec_encode(codec_context, image_buffer, pts, 1, flags, 1) == 0:
iterator = ctypes.c_void_p(0)
packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator))
while packet:
if ctypes.c_int.from_address(packet).value == 0:
buffer_pointer = ctypes.c_void_p.from_address(packet + 8).value
buffer_size = ctypes.c_size_t.from_address(packet + 16).value
frame_buffer += ctypes.string_at(buffer_pointer, buffer_size)
packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator))
return frame_buffer
# TODO: move to facefusion/vpx_encoder.py
def destroy_vpx_encoder(codec_context : ctypes.Array[ctypes.c_char]) -> None:
vpx_library = vpx_module.create_static_library()
if vpx_library:
vpx_library.vpx_codec_destroy(codec_context)
# TODO: move to facefusion/opus_encoder.py
def create_opus_encoder(sample_rate : int, channels : int) -> Optional[ctypes.c_void_p]:
opus_library = opus_module.create_static_library()
+69
View File
@@ -0,0 +1,69 @@
import ctypes
import multiprocessing
import struct
from typing import Optional
from facefusion.libraries import vpx as vpx_module
# TODO this method needs refinement
def create_vpx_encoder(width : int, height : int, bitrate : int) -> Optional[ctypes.Array[ctypes.c_char]]:
vpx_library = vpx_module.create_static_library()
if vpx_library:
vp8_descriptor = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_cx_algo')
config_buffer = ctypes.create_string_buffer(4096)
if vpx_library.vpx_codec_enc_config_default(ctypes.byref(vp8_descriptor), config_buffer, 0) == 0:
thread_count = min(multiprocessing.cpu_count(), 8)
struct.pack_into('I', config_buffer, 4, thread_count)
struct.pack_into('I', config_buffer, 12, width)
struct.pack_into('I', config_buffer, 16, height)
struct.pack_into('I', config_buffer, 72, 2)
struct.pack_into('I', config_buffer, 112, bitrate)
struct.pack_into('I', config_buffer, 116, 2)
struct.pack_into('I', config_buffer, 120, 50)
struct.pack_into('I', config_buffer, 124, 50)
struct.pack_into('I', config_buffer, 128, 50)
context_buffer = ctypes.create_string_buffer(512)
if vpx_library.vpx_codec_enc_init_ver(context_buffer, ctypes.byref(vp8_descriptor), config_buffer, 0, 39) == 0:
vpx_library.vpx_codec_control_(context_buffer, 13, ctypes.c_int(16))
vpx_library.vpx_codec_control_(context_buffer, 12, ctypes.c_int(3))
vpx_library.vpx_codec_control_(context_buffer, 27, ctypes.c_int(10))
return context_buffer
return None
# TODO this method needs refinement
def encode_vpx(codec_context : ctypes.Array[ctypes.c_char], yuv_buffer : bytes, width : int, height : int, presentation_timestamp : int, flags : int) -> bytes:
vpx_library = vpx_module.create_static_library()
frame_buffer = b''
if vpx_library:
image_buffer = ctypes.create_string_buffer(512)
yuv_string_buffer = ctypes.create_string_buffer(yuv_buffer)
if vpx_library.vpx_img_wrap(image_buffer, 0x102, width, height, 1, yuv_string_buffer):
if vpx_library.vpx_codec_encode(codec_context, image_buffer, presentation_timestamp, 1, flags, 1) == 0:
iterator = ctypes.c_void_p(0)
packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator))
while packet:
if ctypes.c_int.from_address(packet).value == 0:
buffer_pointer = ctypes.c_void_p.from_address(packet + 8).value
buffer_size = ctypes.c_size_t.from_address(packet + 16).value
frame_buffer += ctypes.string_at(buffer_pointer, buffer_size)
packet = vpx_library.vpx_codec_get_cx_data(codec_context, ctypes.byref(iterator))
return frame_buffer
# TODO not 100 sure this makes full sense. should we not run clear on the lru-cache instead?
def destroy_vpx_encoder(codec_context : ctypes.Array[ctypes.c_char]) -> None:
vpx_library = vpx_module.create_static_library()
if vpx_library:
vpx_library.vpx_codec_destroy(codec_context)
+28
View File
@@ -0,0 +1,28 @@
import cv2
import pytest
from tests.assert_helper import get_test_example_file, get_test_examples_directory
from facefusion import state_manager
from facefusion.download import conditional_download
from facefusion.libraries import vpx as vpx_module
from facefusion.video_encoder import create_vpx_encoder, encode_vpx
from facefusion.vision import read_video_frame
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
conditional_download(get_test_examples_directory(), [ 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4' ])
vpx_module.pre_check()
def test_encode_vpx() -> None:
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
height, width = vision_frame.shape[:2]
buffer_valid = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
buffer_invalid = bytes(0)
vpx_encoder = create_vpx_encoder(width, height, 1000)
assert encode_vpx(vpx_encoder, buffer_valid, width, height, 3, 1)
assert encode_vpx(vpx_encoder, buffer_invalid, width, height, 0, 0) == b''