mirror of
https://github.com/facefusion/facefusion.git
synced 2026-06-02 10:51:39 +02:00
Tiny refactor of codecs (#1121)
* improve performance using pointers * simplify decoder's collect * simplify decoder's collect * add threading to decoders * fix test * switch back to return bytes * fix macos
This commit is contained in:
@@ -130,19 +130,14 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
|
||||
cleanup_peer(session_id, rtc_peer, video_codec, video_decoder, audio_decoder)
|
||||
return
|
||||
|
||||
resolution = read_video_resolution(video_codec, video_decoder, frame_buffer)
|
||||
|
||||
if resolution is None:
|
||||
cleanup_peer(session_id, rtc_peer, video_codec, video_decoder, audio_decoder)
|
||||
return
|
||||
|
||||
vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer, resolution)
|
||||
vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer)
|
||||
|
||||
if vision_frame is None:
|
||||
cleanup_peer(session_id, rtc_peer, video_codec, video_decoder, audio_decoder)
|
||||
return
|
||||
|
||||
audio_frame = create_empty_audio_frame()
|
||||
resolution : Resolution = (vision_frame.shape[1], vision_frame.shape[0])
|
||||
video_encoder = create_video_encoder(video_codec, resolution)
|
||||
audio_encoder = opus_encoder.create(48000, 2)
|
||||
frame_index = 0
|
||||
@@ -179,13 +174,13 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None:
|
||||
|
||||
frame_index += 1
|
||||
|
||||
next_frame = drain_to_latest_frame(datachannel_library, video_info.get('receiver_track'), video_codec, video_decoder, video_receive_buffer, resolution)
|
||||
next_frame = drain_to_latest_frame(datachannel_library, video_info.get('receiver_track'), video_codec, video_decoder, video_receive_buffer)
|
||||
|
||||
if next_frame is not None:
|
||||
vision_frame = next_frame
|
||||
continue
|
||||
|
||||
next_frame = poll_for_frame(datachannel_library, video_info.get('receiver_track'), video_codec, video_decoder, video_receive_buffer, resolution, 30.0)
|
||||
next_frame = poll_for_frame(datachannel_library, video_info.get('receiver_track'), video_codec, video_decoder, video_receive_buffer, 30.0)
|
||||
|
||||
if next_frame is None:
|
||||
break
|
||||
@@ -214,9 +209,9 @@ def cleanup_peer(session_id : SessionId, rtc_peer : RtcPeer, video_codec : Video
|
||||
#TODO: needs review
|
||||
def create_video_decoder(video_codec : VideoCodec) -> Optional[VpxDecoder | AomDecoder]:
|
||||
if video_codec == 'av1':
|
||||
return aom_decoder.create()
|
||||
return aom_decoder.create(8)
|
||||
if video_codec == 'vp8':
|
||||
return vpx_decoder.create()
|
||||
return vpx_decoder.create(8)
|
||||
|
||||
return None
|
||||
|
||||
@@ -239,27 +234,19 @@ def destroy_video_encoder(video_codec : VideoCodec, video_encoder : Optional[Vpx
|
||||
vpx_encoder.destroy(video_encoder)
|
||||
|
||||
|
||||
def read_video_resolution(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, frame_buffer : bytes) -> Optional[Resolution]:
|
||||
def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, frame_buffer : bytes) -> Optional[VisionFrame]:
|
||||
if video_codec == 'av1':
|
||||
return aom_decoder.read_resolution(video_decoder, frame_buffer)
|
||||
aom_pointer = aom_decoder.decode(video_decoder, frame_buffer)
|
||||
if aom_pointer:
|
||||
frame_width, frame_height = aom_pointer.get('resolution')
|
||||
yuv_frame = numpy.frombuffer(aom_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width))
|
||||
return cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
|
||||
if video_codec == 'vp8':
|
||||
return vpx_decoder.read_resolution(video_decoder, frame_buffer)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, frame_buffer : bytes, frame_resolution : Resolution) -> Optional[VisionFrame]:
|
||||
output_buffer = bytes()
|
||||
|
||||
if video_codec == 'av1':
|
||||
output_buffer = aom_decoder.decode(video_decoder, frame_buffer)
|
||||
if video_codec == 'vp8':
|
||||
output_buffer = vpx_decoder.decode(video_decoder, frame_buffer)
|
||||
|
||||
if output_buffer:
|
||||
frame_width, frame_height = frame_resolution
|
||||
yuv_frame = numpy.frombuffer(output_buffer, dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width))
|
||||
return cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
|
||||
vpx_pointer = vpx_decoder.decode(video_decoder, frame_buffer)
|
||||
if vpx_pointer:
|
||||
frame_width, frame_height = vpx_pointer.get('resolution')
|
||||
yuv_frame = numpy.frombuffer(vpx_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width))
|
||||
return cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
|
||||
|
||||
return None
|
||||
|
||||
@@ -304,11 +291,11 @@ def poll_for_buffer(datachannel_library : ctypes.CDLL, video_track : int, receiv
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def poll_for_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char], frame_resolution : Resolution, timeout : float) -> Optional[VisionFrame]:
|
||||
def poll_for_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char], timeout : float) -> Optional[VisionFrame]:
|
||||
deadline = time.monotonic() + timeout
|
||||
|
||||
while time.monotonic() < deadline:
|
||||
vision_frame = try_receive_frame(datachannel_library, video_track, video_codec, video_decoder, receive_buffer, frame_resolution)
|
||||
vision_frame = try_receive_frame(datachannel_library, video_track, video_codec, video_decoder, receive_buffer)
|
||||
|
||||
if vision_frame is not None:
|
||||
return vision_frame
|
||||
@@ -319,17 +306,17 @@ def poll_for_frame(datachannel_library : ctypes.CDLL, video_track : int, video_c
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def try_receive_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char], frame_resolution : Resolution) -> Optional[VisionFrame]:
|
||||
def try_receive_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char]) -> Optional[VisionFrame]:
|
||||
frame_buffer = receive_video_buffer(datachannel_library, video_track, receive_buffer)
|
||||
|
||||
if frame_buffer:
|
||||
return decode_video_frame(video_codec, video_decoder, frame_buffer, frame_resolution)
|
||||
return decode_video_frame(video_codec, video_decoder, frame_buffer)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def drain_to_latest_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char], frame_resolution : Resolution) -> Optional[VisionFrame]:
|
||||
def drain_to_latest_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char]) -> Optional[VisionFrame]:
|
||||
last_vision_frame = numpy.empty(0)
|
||||
buffer_size = ctypes.c_int(512 * 1024)
|
||||
receive_output = 0
|
||||
@@ -340,7 +327,7 @@ def drain_to_latest_frame(datachannel_library : ctypes.CDLL, video_track : int,
|
||||
|
||||
if receive_output == 0 and buffer_size.value > 0:
|
||||
frame_buffer = receive_buffer.raw[:buffer_size.value]
|
||||
vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer, frame_resolution)
|
||||
vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer)
|
||||
|
||||
if numpy.any(vision_frame):
|
||||
last_vision_frame = vision_frame
|
||||
|
||||
@@ -1,52 +1,56 @@
|
||||
import ctypes
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
from facefusion.libraries import aom as aom_module
|
||||
from facefusion.types import AomDecoder, Resolution
|
||||
from facefusion.types import AomDecoder, AomPointer
|
||||
|
||||
|
||||
def create() -> Optional[AomDecoder]:
|
||||
def create(thread_count : int) -> Optional[AomDecoder]:
|
||||
aom_library = aom_module.create_static_library()
|
||||
|
||||
if aom_library:
|
||||
aom_decoder = ctypes.create_string_buffer(128)
|
||||
aom_codec = ctypes.c_void_p.in_dll(aom_library, 'aom_codec_av1_dx_algo')
|
||||
config_buffer = ctypes.create_string_buffer(128)
|
||||
|
||||
if aom_library.aom_codec_dec_init_ver(aom_decoder, ctypes.byref(aom_codec), None, 0, 22) == 0:
|
||||
struct.pack_into('I', config_buffer, 0, thread_count)
|
||||
struct.pack_into('I', config_buffer, 12, 1)
|
||||
|
||||
if aom_library.aom_codec_dec_init_ver(aom_decoder, ctypes.byref(aom_codec), config_buffer, 0, 22) == 0:
|
||||
return aom_decoder
|
||||
|
||||
return None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def decode(aom_decoder : AomDecoder, input_buffer : bytes) -> bytes:
|
||||
def decode(aom_decoder : AomDecoder, input_buffer : bytes) -> Optional[AomPointer]:
|
||||
aom_library = aom_module.create_static_library()
|
||||
output_buffer = bytes()
|
||||
|
||||
if aom_library and input_buffer:
|
||||
input_total = len(input_buffer)
|
||||
temp_buffer = (ctypes.c_uint8 * input_total).from_buffer_copy(input_buffer)
|
||||
temp_buffer = ctypes.create_string_buffer(input_buffer)
|
||||
|
||||
if aom_library.aom_codec_decode(aom_decoder, temp_buffer, input_total, None) == 0:
|
||||
frame_pointer = aom_library.aom_codec_get_frame(aom_decoder, ctypes.byref(ctypes.c_void_p(0)))
|
||||
address = aom_library.aom_codec_get_frame(aom_decoder, ctypes.byref(ctypes.c_void_p(0)))
|
||||
|
||||
if frame_pointer:
|
||||
output_buffer = collect(frame_pointer)
|
||||
if address:
|
||||
frame_width = ctypes.c_uint.from_address(address + 28).value & ~1
|
||||
frame_height = ctypes.c_uint.from_address(address + 32).value & ~1
|
||||
|
||||
return output_buffer
|
||||
return AomPointer(
|
||||
buffer = collect(address, frame_width, frame_height),
|
||||
resolution = (frame_width, frame_height)
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def collect(frame_pointer : int) -> bytes:
|
||||
frame_width = ctypes.c_uint.from_address(frame_pointer + 28).value & ~1
|
||||
frame_height = ctypes.c_uint.from_address(frame_pointer + 32).value & ~1
|
||||
planes_offset = frame_pointer + 64
|
||||
strides_offset = frame_pointer + 88
|
||||
def collect(address : int, frame_width : int, frame_height : int) -> bytes:
|
||||
output_buffer = bytes()
|
||||
|
||||
for index in range(3):
|
||||
plane_pointer = ctypes.c_void_p.from_address(planes_offset + index * 8).value
|
||||
stride = ctypes.c_int.from_address(strides_offset + index * 4).value
|
||||
plane_pointer = ctypes.c_void_p.from_address(address + 64 + index * 8).value
|
||||
stride = ctypes.c_int.from_address(address + 88 + index * 4).value
|
||||
plane_width = frame_width >> (index > 0)
|
||||
plane_height = frame_height >> (index > 0)
|
||||
|
||||
@@ -56,25 +60,6 @@ def collect(frame_pointer : int) -> bytes:
|
||||
return output_buffer
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def read_resolution(aom_decoder : AomDecoder, input_buffer : bytes) -> Optional[Resolution]:
|
||||
aom_library = aom_module.create_static_library()
|
||||
|
||||
if aom_library and input_buffer:
|
||||
input_total = len(input_buffer)
|
||||
temp_buffer = (ctypes.c_uint8 * input_total).from_buffer_copy(input_buffer)
|
||||
|
||||
if aom_library.aom_codec_decode(aom_decoder, temp_buffer, input_total, None) == 0:
|
||||
frame_pointer = aom_library.aom_codec_get_frame(aom_decoder, ctypes.byref(ctypes.c_void_p(0)))
|
||||
|
||||
if frame_pointer:
|
||||
frame_width = ctypes.c_uint.from_address(frame_pointer + 28).value & ~1
|
||||
frame_height = ctypes.c_uint.from_address(frame_pointer + 32).value & ~1
|
||||
return frame_width, frame_height
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def destroy(aom_decoder : AomDecoder) -> None:
|
||||
aom_library = aom_module.create_static_library()
|
||||
|
||||
|
||||
@@ -1,52 +1,55 @@
|
||||
import ctypes
|
||||
import struct
|
||||
from typing import Optional
|
||||
|
||||
from facefusion.libraries import vpx as vpx_module
|
||||
from facefusion.types import Resolution, VpxDecoder
|
||||
from facefusion.types import VpxDecoder, VpxPointer
|
||||
|
||||
|
||||
def create() -> Optional[VpxDecoder]:
|
||||
def create(thread_count : int) -> Optional[VpxDecoder]:
|
||||
vpx_library = vpx_module.create_static_library()
|
||||
|
||||
if vpx_library:
|
||||
vpx_decoder = ctypes.create_string_buffer(64)
|
||||
vpx_codec = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_dx_algo')
|
||||
config_buffer = ctypes.create_string_buffer(128)
|
||||
|
||||
if vpx_library.vpx_codec_dec_init_ver(vpx_decoder, ctypes.byref(vpx_codec), None, 0, 12) == 0:
|
||||
struct.pack_into('I', config_buffer, 0, thread_count)
|
||||
|
||||
if vpx_library.vpx_codec_dec_init_ver(vpx_decoder, ctypes.byref(vpx_codec), config_buffer, 0, 12) == 0:
|
||||
return vpx_decoder
|
||||
|
||||
return None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def decode(vpx_decoder : VpxDecoder, input_buffer : bytes) -> bytes:
|
||||
def decode(vpx_decoder : VpxDecoder, input_buffer : bytes) -> Optional[VpxPointer]:
|
||||
vpx_library = vpx_module.create_static_library()
|
||||
output_buffer = bytes()
|
||||
|
||||
if vpx_library and input_buffer:
|
||||
input_total = len(input_buffer)
|
||||
temp_buffer = (ctypes.c_uint8 * input_total).from_buffer_copy(input_buffer)
|
||||
temp_buffer = ctypes.create_string_buffer(input_buffer)
|
||||
|
||||
if vpx_library.vpx_codec_decode(vpx_decoder, temp_buffer, input_total, None, 0) == 0:
|
||||
frame_pointer = vpx_library.vpx_codec_get_frame(vpx_decoder, ctypes.byref(ctypes.c_void_p(0)))
|
||||
address = vpx_library.vpx_codec_get_frame(vpx_decoder, ctypes.byref(ctypes.c_void_p(0)))
|
||||
|
||||
if frame_pointer:
|
||||
output_buffer = collect(frame_pointer)
|
||||
if address:
|
||||
frame_width = ctypes.c_uint.from_address(address + 24).value & ~1
|
||||
frame_height = ctypes.c_uint.from_address(address + 28).value & ~1
|
||||
|
||||
return output_buffer
|
||||
return VpxPointer(
|
||||
buffer = collect(address, frame_width, frame_height),
|
||||
resolution = (frame_width, frame_height)
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
#TODO: needs review - find better name
|
||||
def collect(frame_pointer : int) -> bytes:
|
||||
frame_width = ctypes.c_uint.from_address(frame_pointer + 24).value & ~1
|
||||
frame_height = ctypes.c_uint.from_address(frame_pointer + 28).value & ~1
|
||||
planes_offset = frame_pointer + 48
|
||||
strides_offset = frame_pointer + 80
|
||||
def collect(address : int, frame_width : int, frame_height : int) -> bytes:
|
||||
output_buffer = bytes()
|
||||
|
||||
for index in range(3):
|
||||
plane_pointer = ctypes.c_void_p.from_address(planes_offset + index * 8).value
|
||||
stride = ctypes.c_int.from_address(strides_offset + index * 4).value
|
||||
plane_pointer = ctypes.c_void_p.from_address(address + 48 + index * 8).value
|
||||
stride = ctypes.c_int.from_address(address + 80 + index * 4).value
|
||||
plane_width = frame_width >> (index > 0)
|
||||
plane_height = frame_height >> (index > 0)
|
||||
|
||||
@@ -56,25 +59,6 @@ def collect(frame_pointer : int) -> bytes:
|
||||
return output_buffer
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def read_resolution(vpx_decoder : VpxDecoder, input_buffer : bytes) -> Optional[Resolution]:
|
||||
vpx_library = vpx_module.create_static_library()
|
||||
|
||||
if vpx_library and input_buffer:
|
||||
input_total = len(input_buffer)
|
||||
temp_buffer = (ctypes.c_uint8 * input_total).from_buffer_copy(input_buffer)
|
||||
|
||||
if vpx_library.vpx_codec_decode(vpx_decoder, temp_buffer, input_total, None, 0) == 0:
|
||||
frame_pointer = vpx_library.vpx_codec_get_frame(vpx_decoder, ctypes.byref(ctypes.c_void_p(0)))
|
||||
|
||||
if frame_pointer:
|
||||
frame_width = ctypes.c_uint.from_address(frame_pointer + 24).value & ~1
|
||||
frame_height = ctypes.c_uint.from_address(frame_pointer + 28).value & ~1
|
||||
return frame_width, frame_height
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def destroy(vpx_decoder : VpxDecoder) -> None:
|
||||
vpx_library = vpx_module.create_static_library()
|
||||
|
||||
|
||||
+20
-9
@@ -90,6 +90,16 @@ MelFilterBank : TypeAlias = NDArray[Any]
|
||||
Voice : TypeAlias = NDArray[Any]
|
||||
VoiceChunk : TypeAlias = NDArray[Any]
|
||||
|
||||
BitRate : TypeAlias = int
|
||||
SampleRate : TypeAlias = int
|
||||
Fps : TypeAlias = float
|
||||
Duration : TypeAlias = float
|
||||
Color : TypeAlias = Tuple[int, int, int, int]
|
||||
Padding : TypeAlias = Tuple[int, int, int, int]
|
||||
Margin : TypeAlias = Tuple[int, int, int, int]
|
||||
Orientation = Literal['landscape', 'portrait']
|
||||
Resolution : TypeAlias = Tuple[int, int]
|
||||
|
||||
AudioCodec : TypeAlias = Literal['opus']
|
||||
VideoCodec : TypeAlias = Literal['av1', 'vp8']
|
||||
|
||||
@@ -100,15 +110,16 @@ OpusDecoder : TypeAlias = ctypes.c_void_p
|
||||
VpxEncoder : TypeAlias = ctypes.Array[ctypes.c_char]
|
||||
VpxDecoder : TypeAlias = ctypes.Array[ctypes.c_char]
|
||||
|
||||
BitRate : TypeAlias = int
|
||||
SampleRate : TypeAlias = int
|
||||
Fps : TypeAlias = float
|
||||
Duration : TypeAlias = float
|
||||
Color : TypeAlias = Tuple[int, int, int, int]
|
||||
Padding : TypeAlias = Tuple[int, int, int, int]
|
||||
Margin : TypeAlias = Tuple[int, int, int, int]
|
||||
Orientation = Literal['landscape', 'portrait']
|
||||
Resolution : TypeAlias = Tuple[int, int]
|
||||
AomPointer = TypedDict('AomPointer',
|
||||
{
|
||||
'buffer' : bytes,
|
||||
'resolution' : Resolution
|
||||
})
|
||||
VpxPointer = TypedDict('VpxPointer',
|
||||
{
|
||||
'buffer' : bytes,
|
||||
'resolution' : Resolution
|
||||
})
|
||||
|
||||
Args : TypeAlias = Dict[str, Any]
|
||||
|
||||
|
||||
@@ -5,9 +5,11 @@ import pytest
|
||||
from tests.assert_helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
from facefusion import state_manager
|
||||
from facefusion.codecs.aom_decoder import create, decode, destroy, read_resolution
|
||||
from facefusion.codecs.aom_decoder import create, decode, destroy
|
||||
from facefusion.codecs.aom_encoder import create as create_encoder, encode
|
||||
from facefusion.common_helper import is_macos
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.hash_helper import create_hash
|
||||
from facefusion.libraries import aom as aom_module
|
||||
from facefusion.vision import read_video_frame
|
||||
|
||||
@@ -21,40 +23,33 @@ def before_all() -> None:
|
||||
aom_module.pre_check()
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_create() -> None:
|
||||
assert create()
|
||||
assert create(1)
|
||||
|
||||
with patch('facefusion.codecs.aom_decoder.aom_module.create_static_library', return_value = None):
|
||||
assert create(1) is None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_decode() -> None:
|
||||
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
|
||||
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
|
||||
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
|
||||
aom_encoder = create_encoder(video_resolution, 1000, 1, 0)
|
||||
encoded_buffer = encode(aom_encoder, video_buffer, video_resolution, 0)
|
||||
decode_resolution = read_resolution(create(), encoded_buffer)
|
||||
aom_pointer = decode(create(1), encoded_buffer)
|
||||
|
||||
assert len(decode(create(), encoded_buffer)) == decode_resolution[0] * decode_resolution[1] * 3 // 2
|
||||
assert decode(create(), bytes()) == bytes()
|
||||
assert aom_pointer is not None
|
||||
assert aom_pointer.get('resolution')[0] >= video_resolution[0]
|
||||
assert aom_pointer.get('resolution')[1] >= video_resolution[1]
|
||||
assert len(aom_pointer.get('buffer')) == aom_pointer.get('resolution')[0] * aom_pointer.get('resolution')[1] * 3 // 2
|
||||
assert decode(create(1), bytes()) is None
|
||||
|
||||
if is_macos():
|
||||
assert create_hash(bytes(aom_pointer.get('buffer'))) == 'c8c6fdaa'
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_read_resolution() -> None:
|
||||
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
|
||||
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
|
||||
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
|
||||
aom_encoder = create_encoder(video_resolution, 1000, 1, 0)
|
||||
encoded_buffer = encode(aom_encoder, video_buffer, video_resolution, 0)
|
||||
|
||||
assert read_resolution(create(), encoded_buffer)[0] >= video_resolution[0]
|
||||
assert read_resolution(create(), encoded_buffer)[1] >= video_resolution[1]
|
||||
assert read_resolution(create(), bytes()) is None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_destroy() -> None:
|
||||
aom_decoder = create()
|
||||
aom_decoder = create(1)
|
||||
|
||||
with patch.object(aom_module.create_static_library(), 'aom_codec_destroy') as mock:
|
||||
destroy(aom_decoder)
|
||||
|
||||
@@ -7,8 +7,10 @@ from tests.assert_helper import get_test_example_file, get_test_examples_directo
|
||||
from facefusion import state_manager
|
||||
from facefusion.codecs.opus_decoder import create, decode, destroy
|
||||
from facefusion.codecs.opus_encoder import create as create_encoder, encode
|
||||
from facefusion.common_helper import is_macos
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.ffmpeg import read_audio_buffer
|
||||
from facefusion.hash_helper import create_hash
|
||||
from facefusion.libraries import opus as opus_module
|
||||
|
||||
|
||||
@@ -21,24 +23,25 @@ def before_all() -> None:
|
||||
opus_module.pre_check()
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_create() -> None:
|
||||
assert create(48000, 2)
|
||||
assert create(0, 0) is None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_decode() -> None:
|
||||
audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2)
|
||||
audio_sample = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0
|
||||
opus_encoder = create_encoder(48000, 2)
|
||||
encoded_buffer = encode(opus_encoder, audio_sample.tobytes(), 960)
|
||||
opus_decoder = create(48000, 2)
|
||||
decoded_buffer = decode(opus_decoder, encoded_buffer, 960, 2)
|
||||
|
||||
assert len(decode(opus_decoder, encoded_buffer, 960, 2)) == 960 * 2 * 4
|
||||
assert len(decoded_buffer) == 960 * 2 * 4
|
||||
|
||||
if is_macos():
|
||||
assert create_hash(decoded_buffer) == '92f7997d'
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_destroy() -> None:
|
||||
opus_decoder = create(48000, 2)
|
||||
|
||||
|
||||
@@ -5,9 +5,11 @@ import pytest
|
||||
from tests.assert_helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
from facefusion import state_manager
|
||||
from facefusion.codecs.vpx_decoder import create, decode, destroy, read_resolution
|
||||
from facefusion.codecs.vpx_decoder import create, decode, destroy
|
||||
from facefusion.codecs.vpx_encoder import create as create_encoder, encode
|
||||
from facefusion.common_helper import is_macos
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.hash_helper import create_hash
|
||||
from facefusion.libraries import vpx as vpx_module
|
||||
from facefusion.vision import read_video_frame
|
||||
|
||||
@@ -21,40 +23,32 @@ def before_all() -> None:
|
||||
vpx_module.pre_check()
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_create() -> None:
|
||||
assert create()
|
||||
assert create(1)
|
||||
|
||||
with patch('facefusion.codecs.vpx_decoder.vpx_module.create_static_library', return_value = None):
|
||||
assert create(1) is None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_decode() -> None:
|
||||
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
|
||||
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
|
||||
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
|
||||
vpx_encoder = create_encoder(video_resolution, 1000, 1, 0)
|
||||
encoded_buffer = encode(vpx_encoder, video_buffer, video_resolution, 0)
|
||||
vpx_decoder = create()
|
||||
vpx_pointer = decode(create(1), encoded_buffer)
|
||||
|
||||
assert len(decode(vpx_decoder, encoded_buffer)) == video_resolution[0] * video_resolution[1] * 3 // 2
|
||||
assert decode(vpx_decoder, bytes()) == bytes()
|
||||
assert vpx_pointer is not None
|
||||
assert vpx_pointer.get('resolution') == video_resolution
|
||||
assert len(vpx_pointer.get('buffer')) == video_resolution[0] * video_resolution[1] * 3 // 2
|
||||
assert decode(create(1), bytes()) is None
|
||||
|
||||
if is_macos():
|
||||
assert create_hash(bytes(vpx_pointer.get('buffer'))) == '87450f70'
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_read_resolution() -> None:
|
||||
vision_frame = read_video_frame(get_test_example_file('target-240p.mp4'))
|
||||
video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes()
|
||||
video_resolution = (vision_frame.shape[1], vision_frame.shape[0])
|
||||
vpx_encoder = create_encoder(video_resolution, 1000, 1, 0)
|
||||
encoded_buffer = encode(vpx_encoder, video_buffer, video_resolution, 0)
|
||||
vpx_decoder = create()
|
||||
|
||||
assert read_resolution(vpx_decoder, encoded_buffer) == video_resolution
|
||||
assert read_resolution(vpx_decoder, bytes()) is None
|
||||
|
||||
|
||||
#TODO: needs review
|
||||
def test_destroy() -> None:
|
||||
vpx_decoder = create()
|
||||
vpx_decoder = create(1)
|
||||
|
||||
with patch.object(vpx_module.create_static_library(), 'vpx_codec_destroy') as mock:
|
||||
destroy(vpx_decoder)
|
||||
|
||||
Reference in New Issue
Block a user