mirror of
https://github.com/facefusion/facefusion.git
synced 2026-06-02 10:51:39 +02:00
AV1 codec support (#1111)
* restructure xxx_encoders, introduce av1 codec * get rid of strip_temporal_delimiters * improve testing * fix test for macos * improve testing
This commit is contained in:
@@ -10,10 +10,10 @@ from starlette.websockets import WebSocket, WebSocketState
|
||||
from facefusion import rtc_store, session_context, session_manager, state_manager
|
||||
from facefusion.apis.api_helper import get_sec_websocket_protocol
|
||||
from facefusion.apis.session_helper import extract_access_token
|
||||
from facefusion.audio_encoder import create_opus_encoder, destroy_opus_encoder, encode_opus_buffer
|
||||
from facefusion.codecs.opus import create_opus_encoder, destroy_opus_encoder, encode_opus_buffer
|
||||
from facefusion.codecs.vpx import create_vpx_encoder, destroy_vpx_encoder, encode_vpx_buffer
|
||||
from facefusion.streamer import process_vision_frame
|
||||
from facefusion.types import Resolution, SessionId, VisionFrame
|
||||
from facefusion.video_encoder import create_vpx_encoder, destroy_vpx_encoder, encode_vpx_buffer
|
||||
|
||||
|
||||
async def receive_stream_frames(websocket : WebSocket) -> AsyncIterator[Tuple[int, bytes]]:
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
import ctypes
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
from facefusion.libraries import aom as aom_module
|
||||
from facefusion.types import AomEncoder, BitRate, Resolution
|
||||
|
||||
|
||||
def create_aom_encoder(frame_resolution : Resolution, bitrate : BitRate, thread_count : int, cpu_count : int) -> Optional[AomEncoder]:
    """
    Build and initialise an AV1 encoder context through the aom library.

    The library's default encoder configuration is fetched into a raw byte
    buffer, four fields are overwritten at fixed byte offsets, the encoder is
    initialised from that configuration and finally tuned through a series of
    codec controls.

    :param frame_resolution: resolution pair; index 0 is written at config
        offset 12 and index 1 at config offset 16
    :param bitrate: value written at config offset 136
    :param thread_count: value written at config offset 4
    :param cpu_count: value passed to codec control 13
    :return: the 1024 byte encoder context buffer, or None when the library
        is unavailable or either the default-config or the init call
        returns a non-zero status
    """
    library = aom_module.create_static_library()

    if not library:
        return None

    encoder_context = ctypes.create_string_buffer(1024)
    codec_interface = ctypes.c_void_p.in_dll(library, 'aom_codec_av1_cx_algo')
    encoder_config = ctypes.create_string_buffer(4096)

    if library.aom_codec_enc_config_default(ctypes.byref(codec_interface), encoder_config, 1) != 0:
        return None

    # The config is patched as raw bytes instead of through a ctypes.Structure,
    # NOTE(review): the offsets presumably follow the aom_codec_enc_cfg layout - confirm against the bundled libaom build
    config_fields =\
    (
        (4, thread_count),
        (12, frame_resolution[0]),
        (16, frame_resolution[1]),
        (136, bitrate)
    )

    for field_offset, field_value in config_fields:
        struct.pack_into('I', encoder_config, field_offset, field_value)

    # NOTE(review): the trailing 25 is presumably the encoder ABI version expected by the library - confirm
    if library.aom_codec_enc_init_ver(encoder_context, ctypes.byref(codec_interface), encoder_config, 0, 25) != 0:
        return None

    # NOTE(review): control ids are raw integers, look them up in the libaom control enum before changing any of them
    codec_controls =\
    (
        (13, cpu_count),
        (75, 2),
        (106, 1),
        (122, 0),
        (123, 0)
    )

    for control_id, control_value in codec_controls:
        library.aom_codec_control(encoder_context, control_id, ctypes.c_int(control_value))

    return encoder_context
|
||||
|
||||
|
||||
def encode_aom_buffer(aom_encoder : AomEncoder, input_buffer : bytes, frame_resolution : Resolution, frame_index : int) -> bytes:
    """
    Encode a single raw frame and return the bytes produced by the encoder.

    :param aom_encoder: encoder context as returned by create_aom_encoder
    :param input_buffer: raw frame data, wrapped with image format 0x102
        (AOM_IMG_FMT_I420 in libaom's aom_image.h)
    :param frame_resolution: resolution pair; index 0 and index 1 are passed
        to aom_img_wrap as the image dimensions
    :param frame_index: value passed to aom_codec_encode as the third argument
    :return: the collected encoder output with a leading 0x12 0x00 byte pair
        removed, or b'' when the library is unavailable, the input is too
        small for the resolution, or wrapping / encoding fails
    """
    aom_library = aom_module.create_static_library()

    if not aom_library:
        return b''

    frame_width = frame_resolution[0]
    frame_height = frame_resolution[1]
    chroma_size = ((frame_width + 1) // 2) * ((frame_height + 1) // 2)
    frame_size = frame_width * frame_height + 2 * chroma_size

    # aom_img_wrap does not copy or validate the data, so an undersized input
    # would make the native encoder read past the end of the python buffer
    if len(input_buffer) < frame_size:
        return b''

    output_buffer = b''
    temp_buffer = ctypes.create_string_buffer(512)
    encode_buffer = ctypes.create_string_buffer(input_buffer)

    if aom_library.aom_img_wrap(temp_buffer, 0x102, frame_width, frame_height, 1, encode_buffer) and aom_library.aom_codec_encode(aom_encoder, temp_buffer, frame_index, 1, 0, 1) == 0:
        output_buffer = collect_aom_buffer(aom_encoder)

    # NOTE(review): 0x12 0x00 is presumably an empty AV1 temporal delimiter OBU that consumers do not want - confirm
    if output_buffer.startswith(bytes([ 0x12, 0x00 ])):
        output_buffer = output_buffer[2:]

    return output_buffer
|
||||
|
||||
|
||||
def collect_aom_buffer(aom_encoder : AomEncoder) -> bytes:
    """
    Drain every pending output packet from the encoder into one bytes object.

    Packets are pulled through aom_codec_get_cx_data until it returns a falsy
    value. Only packets whose leading integer equals 0 contribute data, all
    other packets are skipped.

    :param aom_encoder: encoder context to read the packets from
    :return: the concatenated packet payloads, b'' when there are none or
        the library is unavailable
    """
    aom_library = aom_module.create_static_library()

    # Same guard as the sibling encoder functions, avoids an AttributeError on None
    if not aom_library:
        return b''

    packet_buffers = []
    packet_cursor = ctypes.c_void_p(0)
    packet = aom_library.aom_codec_get_cx_data(aom_encoder, ctypes.byref(packet_cursor))

    while packet:
        # NOTE(review): kind 0 is presumably AOM_CODEC_CX_FRAME_PKT and the
        # offsets 8 / 16 presumably match the 64 bit layout of aom_codec_cx_pkt_t - confirm
        if ctypes.c_int.from_address(packet).value == 0:
            buffer_pointer = ctypes.c_void_p.from_address(packet + 8).value
            buffer_size = ctypes.c_size_t.from_address(packet + 16).value
            packet_buffers.append(ctypes.string_at(buffer_pointer, buffer_size))

        # Advance outside of the kind check so skipped packets can not stall the loop
        packet = aom_library.aom_codec_get_cx_data(aom_encoder, ctypes.byref(packet_cursor))

    # Join once instead of growing an immutable bytes object per packet
    return b''.join(packet_buffers)
|
||||
|
||||
|
||||
def destroy_aom_encoder(aom_encoder : AomEncoder) -> None:
    """
    Hand the encoder context to the library's aom_codec_destroy.

    Does nothing when the aom library is unavailable.

    :param aom_encoder: encoder context to destroy
    """
    library = aom_module.create_static_library()

    if not library:
        return

    library.aom_codec_destroy(aom_encoder)
|
||||
@@ -90,6 +90,7 @@ MelFilterBank : TypeAlias = NDArray[Any]
|
||||
Voice : TypeAlias = NDArray[Any]
|
||||
VoiceChunk : TypeAlias = NDArray[Any]
|
||||
|
||||
AomEncoder : TypeAlias = ctypes.Array[ctypes.c_char]
|
||||
OpusEncoder : TypeAlias = ctypes.c_void_p
|
||||
VpxEncoder : TypeAlias = ctypes.Array[ctypes.c_char]
|
||||
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
import cv2
|
||||
import pytest
|
||||
from tests.assert_helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
from facefusion import state_manager
|
||||
from facefusion.codecs.aom import create_aom_encoder, destroy_aom_encoder, encode_aom_buffer
|
||||
from facefusion.common_helper import is_linux, is_macos, is_windows
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.hash_helper import create_hash
|
||||
from facefusion.libraries import aom as aom_module
|
||||
from facefusion.vision import read_video_frame
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
    """
    Module wide setup: register the download providers, fetch the example
    video used by the encode test and run the aom library pre check.
    """
    example_urls =\
    [
        'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4'
    ]

    state_manager.init_item('download_providers', [ 'github', 'huggingface' ])
    examples_directory = get_test_examples_directory()
    conditional_download(examples_directory, example_urls)
    aom_module.pre_check()
|
||||
|
||||
|
||||
def test_create_aom_encoder() -> None:
    """
    A sane configuration yields a truthy encoder, an all zero configuration yields None.
    """
    valid_encoder = create_aom_encoder((320, 240), 1000, 8, 16)
    assert valid_encoder

    invalid_encoder = create_aom_encoder((0, 0), 0, 0, 0)
    assert invalid_encoder is None
|
||||
|
||||
|
||||
def test_encode_aom_buffer() -> None:
    """
    Encode one I420 converted frame of the example video and compare the hash
    of the output against the value pinned for the current platform.
    """
    vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
    yuv_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
    frame_height, frame_width = vision_frame.shape[:2]
    resolution = (frame_width, frame_height)
    encoder = create_aom_encoder(resolution, 1000, 1, 0)

    def hash_encoded_frame():
        # Encoding happens lazily so it only runs on a platform with a pinned hash
        return create_hash(encode_aom_buffer(encoder, yuv_buffer, resolution, 3))

    # The expected hash differs per platform
    if is_linux() or is_windows():
        assert hash_encoded_frame() == '4b621fb8'

    if is_macos():
        assert hash_encoded_frame() == '64c12977'
|
||||
|
||||
|
||||
def test_destroy_aom_encoder() -> None:
    """
    Destroying an encoder forwards exactly that encoder to aom_codec_destroy.
    """
    encoder = create_aom_encoder((320, 240), 1000, 8, 16)
    # NOTE(review): the patch only takes effect if create_static_library hands
    # out the same object on every call - presumably it is cached, verify
    library = aom_module.create_static_library()

    with patch.object(library, 'aom_codec_destroy') as destroy_mock:
        destroy_aom_encoder(encoder)
        destroy_mock.assert_called_once_with(encoder)
|
||||
@@ -5,7 +5,7 @@ import pytest
|
||||
from tests.assert_helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
from facefusion import state_manager
|
||||
from facefusion.audio_encoder import create_opus_encoder, destroy_opus_encoder, encode_opus_buffer
|
||||
from facefusion.codecs.opus import create_opus_encoder, destroy_opus_encoder, encode_opus_buffer
|
||||
from facefusion.common_helper import is_linux, is_macos, is_windows
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.ffmpeg import read_audio_buffer
|
||||
@@ -29,15 +29,14 @@ def test_create_opus_encoder() -> None:
|
||||
|
||||
def test_encode_opus_buffer() -> None:
|
||||
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
|
||||
input_samples = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
|
||||
input_buffer = input_samples[:1920].tobytes()
|
||||
audio_sample = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
|
||||
opus_encoder = create_opus_encoder(48000, 2)
|
||||
|
||||
if is_linux() or is_windows():
|
||||
assert create_hash(encode_opus_buffer(opus_encoder, input_buffer, 960)) == '8abe71cf'
|
||||
assert create_hash(encode_opus_buffer(opus_encoder, audio_sample.tobytes(), 960)) == '8abe71cf'
|
||||
|
||||
if is_macos():
|
||||
assert create_hash(encode_opus_buffer(opus_encoder, input_buffer, 960)) == '8ecd1108'
|
||||
assert create_hash(encode_opus_buffer(opus_encoder, audio_sample.tobytes(), 960)) == '8ecd1108'
|
||||
|
||||
|
||||
def test_destroy_opus_encoder() -> None:
|
||||
@@ -5,11 +5,11 @@ import pytest
|
||||
from tests.assert_helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
from facefusion import state_manager
|
||||
from facefusion.codecs.vpx import create_vpx_encoder, destroy_vpx_encoder, encode_vpx_buffer
|
||||
from facefusion.common_helper import is_linux, is_macos, is_windows
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.hash_helper import create_hash
|
||||
from facefusion.libraries import vpx as vpx_module
|
||||
from facefusion.video_encoder import create_vpx_encoder, destroy_vpx_encoder, encode_vpx_buffer
|
||||
from facefusion.vision import read_video_frame
|
||||
|
||||
|
||||
@@ -29,19 +29,15 @@ def test_create_vpx_encoder() -> None:
|
||||
|
||||
def test_encode_vpx_buffer() -> None:
|
||||
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
|
||||
frame_resolution = (vision_frame.shape[1], vision_frame.shape[0])
|
||||
vpx_encoder = create_vpx_encoder(frame_resolution, 1000, 1, 0)
|
||||
|
||||
buffer_valid = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
|
||||
buffer_invalid = bytes(0)
|
||||
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
|
||||
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
|
||||
vpx_encoder = create_vpx_encoder(video_resolution, 1000, 1, 0)
|
||||
|
||||
if is_linux() or is_windows():
|
||||
assert create_hash(encode_vpx_buffer(vpx_encoder, buffer_valid, frame_resolution, 3)) == 'ce133a1f'
|
||||
assert create_hash(encode_vpx_buffer(vpx_encoder, video_buffer, video_resolution, 3)) == 'ce133a1f'
|
||||
|
||||
if is_macos():
|
||||
assert create_hash(encode_vpx_buffer(vpx_encoder, buffer_valid, frame_resolution, 3)) == '21c36925'
|
||||
|
||||
assert encode_vpx_buffer(vpx_encoder, buffer_invalid, frame_resolution, 0) == b''
|
||||
assert create_hash(encode_vpx_buffer(vpx_encoder, video_buffer, video_resolution, 3)) == '21c36925'
|
||||
|
||||
|
||||
def test_destroy_vpx_encoder() -> None:
|
||||
Reference in New Issue
Block a user