From fbacb24fcc1e71abbd279a7ceda644fa28639b70 Mon Sep 17 00:00:00 2001 From: Henry Ruhs Date: Tue, 19 May 2026 10:31:53 +0200 Subject: [PATCH] Tiny refactor of codecs (#1121) * improve performance using pointers * simplify decoder's collect * simplify decoder's collect * add threading to decoders * fix test * switch back to return bytes * fix macos --- facefusion/apis/stream_helper.py | 59 ++++++++++++------------------ facefusion/codecs/aom_decoder.py | 61 ++++++++++++-------------------- facefusion/codecs/vpx_decoder.py | 60 ++++++++++++------------------- facefusion/types.py | 29 ++++++++++----- tests/test_codec_aom_decoder.py | 39 +++++++++----------- tests/test_codec_opus_decoder.py | 11 +++--- tests/test_codec_vpx_decoder.py | 38 +++++++++----------- 7 files changed, 128 insertions(+), 169 deletions(-) diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index d83b32fe..6495defb 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -130,19 +130,14 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: cleanup_peer(session_id, rtc_peer, video_codec, video_decoder, audio_decoder) return - resolution = read_video_resolution(video_codec, video_decoder, frame_buffer) - - if resolution is None: - cleanup_peer(session_id, rtc_peer, video_codec, video_decoder, audio_decoder) - return - - vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer, resolution) + vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer) if vision_frame is None: cleanup_peer(session_id, rtc_peer, video_codec, video_decoder, audio_decoder) return audio_frame = create_empty_audio_frame() + resolution : Resolution = (vision_frame.shape[1], vision_frame.shape[0]) video_encoder = create_video_encoder(video_codec, resolution) audio_encoder = opus_encoder.create(48000, 2) frame_index = 0 @@ -179,13 +174,13 @@ def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: frame_index += 1 - next_frame = drain_to_latest_frame(datachannel_library, video_info.get('receiver_track'), video_codec, video_decoder, video_receive_buffer, resolution) + next_frame = drain_to_latest_frame(datachannel_library, video_info.get('receiver_track'), video_codec, video_decoder, video_receive_buffer) if next_frame is not None: vision_frame = next_frame continue - next_frame = poll_for_frame(datachannel_library, video_info.get('receiver_track'), video_codec, video_decoder, video_receive_buffer, resolution, 30.0) + next_frame = poll_for_frame(datachannel_library, video_info.get('receiver_track'), video_codec, video_decoder, video_receive_buffer, 30.0) if next_frame is None: break @@ -214,9 +209,9 @@ def cleanup_peer(session_id : SessionId, rtc_peer : RtcPeer, video_codec : Video #TODO: needs review def create_video_decoder(video_codec : VideoCodec) -> Optional[VpxDecoder | AomDecoder]: if video_codec == 'av1': - return aom_decoder.create() + return aom_decoder.create(8) if video_codec == 'vp8': - return vpx_decoder.create() + return vpx_decoder.create(8) return None @@ -239,27 +234,19 @@ def destroy_video_encoder(video_codec : VideoCodec, video_encoder : Optional[Vpx vpx_encoder.destroy(video_encoder) -def read_video_resolution(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, frame_buffer : bytes) -> Optional[Resolution]: +def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, frame_buffer : bytes) -> Optional[VisionFrame]: if video_codec == 'av1': - return aom_decoder.read_resolution(video_decoder, frame_buffer) + aom_pointer = aom_decoder.decode(video_decoder, frame_buffer) + if aom_pointer: + frame_width, frame_height = aom_pointer.get('resolution') + yuv_frame = numpy.frombuffer(aom_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width)) + return cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) if video_codec == 'vp8': - return vpx_decoder.read_resolution(video_decoder, frame_buffer) - - return None - - -def decode_video_frame(video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, frame_buffer : bytes, frame_resolution : Resolution) -> Optional[VisionFrame]: - output_buffer = bytes() - - if video_codec == 'av1': - output_buffer = aom_decoder.decode(video_decoder, frame_buffer) - if video_codec == 'vp8': - output_buffer = vpx_decoder.decode(video_decoder, frame_buffer) - - if output_buffer: - frame_width, frame_height = frame_resolution - yuv_frame = numpy.frombuffer(output_buffer, dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width)) - return cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) + vpx_pointer = vpx_decoder.decode(video_decoder, frame_buffer) + if vpx_pointer: + frame_width, frame_height = vpx_pointer.get('resolution') + yuv_frame = numpy.frombuffer(vpx_pointer.get('buffer'), dtype = numpy.uint8).reshape((frame_height * 3 // 2, frame_width)) + return cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) return None @@ -304,11 +291,11 @@ def poll_for_buffer(datachannel_library : ctypes.CDLL, video_track : int, receiv #TODO: needs review -def poll_for_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char], frame_resolution : Resolution, timeout : float) -> Optional[VisionFrame]: +def poll_for_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char], timeout : float) -> Optional[VisionFrame]: deadline = time.monotonic() + timeout while time.monotonic() < deadline: - vision_frame = try_receive_frame(datachannel_library, video_track, video_codec, video_decoder, receive_buffer, frame_resolution) + vision_frame = try_receive_frame(datachannel_library, video_track, video_codec, video_decoder, receive_buffer) if vision_frame is not None: return vision_frame @@ -319,17 +306,17 @@ def poll_for_frame(datachannel_library : ctypes.CDLL, video_track : int, video_c #TODO: needs review -def try_receive_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char], frame_resolution : Resolution) -> Optional[VisionFrame]: +def try_receive_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char]) -> Optional[VisionFrame]: frame_buffer = receive_video_buffer(datachannel_library, video_track, receive_buffer) if frame_buffer: - return decode_video_frame(video_codec, video_decoder, frame_buffer, frame_resolution) + return decode_video_frame(video_codec, video_decoder, frame_buffer) return None #TODO: needs review -def drain_to_latest_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char], frame_resolution : Resolution) -> Optional[VisionFrame]: +def drain_to_latest_frame(datachannel_library : ctypes.CDLL, video_track : int, video_codec : VideoCodec, video_decoder : VpxDecoder | AomDecoder, receive_buffer : ctypes.Array[ctypes.c_char]) -> Optional[VisionFrame]: last_vision_frame = numpy.empty(0) buffer_size = ctypes.c_int(512 * 1024) receive_output = 0 @@ -340,7 +327,7 @@ def drain_to_latest_frame(datachannel_library : ctypes.CDLL, video_track : int, if receive_output == 0 and buffer_size.value > 0: frame_buffer = receive_buffer.raw[:buffer_size.value] - vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer, frame_resolution) + vision_frame = decode_video_frame(video_codec, video_decoder, frame_buffer) if numpy.any(vision_frame): last_vision_frame = vision_frame diff --git a/facefusion/codecs/aom_decoder.py b/facefusion/codecs/aom_decoder.py index 46f3c909..7e341cae 100644 --- a/facefusion/codecs/aom_decoder.py +++ b/facefusion/codecs/aom_decoder.py @@ -1,52 +1,56 @@ import ctypes +import struct from typing import Optional from facefusion.libraries import aom as aom_module -from facefusion.types import AomDecoder, Resolution +from facefusion.types import AomDecoder, AomPointer -def create() -> Optional[AomDecoder]: +def create(thread_count : int) -> Optional[AomDecoder]: aom_library = aom_module.create_static_library() if aom_library: aom_decoder = ctypes.create_string_buffer(128) aom_codec = ctypes.c_void_p.in_dll(aom_library, 'aom_codec_av1_dx_algo') + config_buffer = ctypes.create_string_buffer(128) - if aom_library.aom_codec_dec_init_ver(aom_decoder, ctypes.byref(aom_codec), None, 0, 22) == 0: + struct.pack_into('I', config_buffer, 0, thread_count) + struct.pack_into('I', config_buffer, 12, 1) + + if aom_library.aom_codec_dec_init_ver(aom_decoder, ctypes.byref(aom_codec), config_buffer, 0, 22) == 0: return aom_decoder return None -#TODO: needs review -def decode(aom_decoder : AomDecoder, input_buffer : bytes) -> bytes: +def decode(aom_decoder : AomDecoder, input_buffer : bytes) -> Optional[AomPointer]: aom_library = aom_module.create_static_library() - output_buffer = bytes() if aom_library and input_buffer: input_total = len(input_buffer) - temp_buffer = (ctypes.c_uint8 * input_total).from_buffer_copy(input_buffer) + temp_buffer = ctypes.create_string_buffer(input_buffer) if aom_library.aom_codec_decode(aom_decoder, temp_buffer, input_total, None) == 0: - frame_pointer = aom_library.aom_codec_get_frame(aom_decoder, ctypes.byref(ctypes.c_void_p(0))) + address = aom_library.aom_codec_get_frame(aom_decoder, ctypes.byref(ctypes.c_void_p(0))) - if frame_pointer: - output_buffer = collect(frame_pointer) + if address: + frame_width = ctypes.c_uint.from_address(address + 28).value & ~1 + frame_height = ctypes.c_uint.from_address(address + 32).value & ~1 - return output_buffer + return AomPointer( + buffer = collect(address, frame_width, frame_height), + resolution = (frame_width, frame_height) + ) + + return None -#TODO: needs review -def collect(frame_pointer : int) -> bytes: - frame_width = ctypes.c_uint.from_address(frame_pointer + 28).value & ~1 - frame_height = ctypes.c_uint.from_address(frame_pointer + 32).value & ~1 - planes_offset = frame_pointer + 64 - strides_offset = frame_pointer + 88 +def collect(address : int, frame_width : int, frame_height : int) -> bytes: output_buffer = bytes() for index in range(3): - plane_pointer = ctypes.c_void_p.from_address(planes_offset + index * 8).value - stride = ctypes.c_int.from_address(strides_offset + index * 4).value + plane_pointer = ctypes.c_void_p.from_address(address + 64 + index * 8).value + stride = ctypes.c_int.from_address(address + 88 + index * 4).value plane_width = frame_width >> (index > 0) plane_height = frame_height >> (index > 0) @@ -56,25 +60,6 @@ def collect(frame_pointer : int) -> bytes: return output_buffer -#TODO: needs review -def read_resolution(aom_decoder : AomDecoder, input_buffer : bytes) -> Optional[Resolution]: - aom_library = aom_module.create_static_library() - - if aom_library and input_buffer: - input_total = len(input_buffer) - temp_buffer = (ctypes.c_uint8 * input_total).from_buffer_copy(input_buffer) - - if aom_library.aom_codec_decode(aom_decoder, temp_buffer, input_total, None) == 0: - frame_pointer = aom_library.aom_codec_get_frame(aom_decoder, ctypes.byref(ctypes.c_void_p(0))) - - if frame_pointer: - frame_width = ctypes.c_uint.from_address(frame_pointer + 28).value & ~1 - frame_height = ctypes.c_uint.from_address(frame_pointer + 32).value & ~1 - return frame_width, frame_height - - return None - - def destroy(aom_decoder : AomDecoder) -> None: aom_library = aom_module.create_static_library() diff --git a/facefusion/codecs/vpx_decoder.py b/facefusion/codecs/vpx_decoder.py index 51619a53..26bcbf5b 100644 --- a/facefusion/codecs/vpx_decoder.py +++ b/facefusion/codecs/vpx_decoder.py @@ -1,52 +1,55 @@ import ctypes +import struct from typing import Optional from facefusion.libraries import vpx as vpx_module -from facefusion.types import Resolution, VpxDecoder +from facefusion.types import VpxDecoder, VpxPointer -def create() -> Optional[VpxDecoder]: +def create(thread_count : int) -> Optional[VpxDecoder]: vpx_library = vpx_module.create_static_library() if vpx_library: vpx_decoder = ctypes.create_string_buffer(64) vpx_codec = ctypes.c_void_p.in_dll(vpx_library, 'vpx_codec_vp8_dx_algo') + config_buffer = ctypes.create_string_buffer(128) - if vpx_library.vpx_codec_dec_init_ver(vpx_decoder, ctypes.byref(vpx_codec), None, 0, 12) == 0: + struct.pack_into('I', config_buffer, 0, thread_count) + + if vpx_library.vpx_codec_dec_init_ver(vpx_decoder, ctypes.byref(vpx_codec), config_buffer, 0, 12) == 0: return vpx_decoder return None -#TODO: needs review -def decode(vpx_decoder : VpxDecoder, input_buffer : bytes) -> bytes: +def decode(vpx_decoder : VpxDecoder, input_buffer : bytes) -> Optional[VpxPointer]: vpx_library = vpx_module.create_static_library() - output_buffer = bytes() if vpx_library and input_buffer: input_total = len(input_buffer) - temp_buffer = (ctypes.c_uint8 * input_total).from_buffer_copy(input_buffer) + temp_buffer = ctypes.create_string_buffer(input_buffer) if vpx_library.vpx_codec_decode(vpx_decoder, temp_buffer, input_total, None, 0) == 0: - frame_pointer = vpx_library.vpx_codec_get_frame(vpx_decoder, ctypes.byref(ctypes.c_void_p(0))) + address = vpx_library.vpx_codec_get_frame(vpx_decoder, ctypes.byref(ctypes.c_void_p(0))) - if frame_pointer: - output_buffer = collect(frame_pointer) + if address: + frame_width = ctypes.c_uint.from_address(address + 24).value & ~1 + frame_height = ctypes.c_uint.from_address(address + 28).value & ~1 - return output_buffer + return VpxPointer( + buffer = collect(address, frame_width, frame_height), + resolution = (frame_width, frame_height) + ) + + return None -#TODO: needs review - find better name -def collect(frame_pointer : int) -> bytes: - frame_width = ctypes.c_uint.from_address(frame_pointer + 24).value & ~1 - frame_height = ctypes.c_uint.from_address(frame_pointer + 28).value & ~1 - planes_offset = frame_pointer + 48 - strides_offset = frame_pointer + 80 +def collect(address : int, frame_width : int, frame_height : int) -> bytes: output_buffer = bytes() for index in range(3): - plane_pointer = ctypes.c_void_p.from_address(planes_offset + index * 8).value - stride = ctypes.c_int.from_address(strides_offset + index * 4).value + plane_pointer = ctypes.c_void_p.from_address(address + 48 + index * 8).value + stride = ctypes.c_int.from_address(address + 80 + index * 4).value plane_width = frame_width >> (index > 0) plane_height = frame_height >> (index > 0) @@ -56,25 +59,6 @@ def collect(frame_pointer : int) -> bytes: return output_buffer -#TODO: needs review -def read_resolution(vpx_decoder : VpxDecoder, input_buffer : bytes) -> Optional[Resolution]: - vpx_library = vpx_module.create_static_library() - - if vpx_library and input_buffer: - input_total = len(input_buffer) - temp_buffer = (ctypes.c_uint8 * input_total).from_buffer_copy(input_buffer) - - if vpx_library.vpx_codec_decode(vpx_decoder, temp_buffer, input_total, None, 0) == 0: - frame_pointer = vpx_library.vpx_codec_get_frame(vpx_decoder, ctypes.byref(ctypes.c_void_p(0))) - - if frame_pointer: - frame_width = ctypes.c_uint.from_address(frame_pointer + 24).value & ~1 - frame_height = ctypes.c_uint.from_address(frame_pointer + 28).value & ~1 - return frame_width, frame_height - - return None - - def destroy(vpx_decoder : VpxDecoder) -> None: vpx_library = vpx_module.create_static_library() diff --git a/facefusion/types.py b/facefusion/types.py index a4af9310..8f20d785 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -90,6 +90,16 @@ MelFilterBank : TypeAlias = NDArray[Any] Voice : TypeAlias = NDArray[Any] VoiceChunk : TypeAlias = NDArray[Any] +BitRate : TypeAlias = int +SampleRate : TypeAlias = int +Fps : TypeAlias = float +Duration : TypeAlias = float +Color : TypeAlias = Tuple[int, int, int, int] +Padding : TypeAlias = Tuple[int, int, int, int] +Margin : TypeAlias = Tuple[int, int, int, int] +Orientation = Literal['landscape', 'portrait'] +Resolution : TypeAlias = Tuple[int, int] + AudioCodec : TypeAlias = Literal['opus'] VideoCodec : TypeAlias = Literal['av1', 'vp8'] @@ -100,15 +110,16 @@ OpusDecoder : TypeAlias = ctypes.c_void_p VpxEncoder : TypeAlias = ctypes.Array[ctypes.c_char] VpxDecoder : TypeAlias = ctypes.Array[ctypes.c_char] -BitRate : TypeAlias = int -SampleRate : TypeAlias = int -Fps : TypeAlias = float -Duration : TypeAlias = float -Color : TypeAlias = Tuple[int, int, int, int] -Padding : TypeAlias = Tuple[int, int, int, int] -Margin : TypeAlias = Tuple[int, int, int, int] -Orientation = Literal['landscape', 'portrait'] -Resolution : TypeAlias = Tuple[int, int] +AomPointer = TypedDict('AomPointer', +{ + 'buffer' : bytes, + 'resolution' : Resolution +}) +VpxPointer = TypedDict('VpxPointer', +{ + 'buffer' : bytes, + 'resolution' : Resolution +}) Args : TypeAlias = Dict[str, Any] diff --git a/tests/test_codec_aom_decoder.py b/tests/test_codec_aom_decoder.py index 00838293..5d6a902d 100644 --- a/tests/test_codec_aom_decoder.py +++ b/tests/test_codec_aom_decoder.py @@ -5,9 +5,11 @@ import pytest from tests.assert_helper import get_test_example_file, get_test_examples_directory from facefusion import state_manager -from facefusion.codecs.aom_decoder import create, decode, destroy, read_resolution +from facefusion.codecs.aom_decoder import create, decode, destroy from facefusion.codecs.aom_encoder import create as create_encoder, encode +from facefusion.common_helper import is_macos from facefusion.download import conditional_download +from facefusion.hash_helper import create_hash from facefusion.libraries import aom as aom_module from facefusion.vision import read_video_frame @@ -21,40 +23,33 @@ def before_all() -> None: aom_module.pre_check() -#TODO: needs review def test_create() -> None: - assert create() + assert create(1) + + with patch('facefusion.codecs.aom_decoder.aom_module.create_static_library', return_value = None): + assert create(1) is None -#TODO: needs review def test_decode() -> None: vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() video_resolution = (vision_frame.shape[1], vision_frame.shape[0]) aom_encoder = create_encoder(video_resolution, 1000, 1, 0) encoded_buffer = encode(aom_encoder, video_buffer, video_resolution, 0) - decode_resolution = read_resolution(create(), encoded_buffer) + aom_pointer = decode(create(1), encoded_buffer) - assert len(decode(create(), encoded_buffer)) == decode_resolution[0] * decode_resolution[1] * 3 // 2 - assert decode(create(), bytes()) == bytes() + assert aom_pointer is not None + assert aom_pointer.get('resolution')[0] >= video_resolution[0] + assert aom_pointer.get('resolution')[1] >= video_resolution[1] + assert len(aom_pointer.get('buffer')) == aom_pointer.get('resolution')[0] * aom_pointer.get('resolution')[1] * 3 // 2 + assert decode(create(1), bytes()) is None + + if is_macos(): + assert create_hash(bytes(aom_pointer.get('buffer'))) == 'c8c6fdaa' -#TODO: needs review -def test_read_resolution() -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_resolution = (vision_frame.shape[1], vision_frame.shape[0]) - aom_encoder = create_encoder(video_resolution, 1000, 1, 0) - encoded_buffer = encode(aom_encoder, video_buffer, video_resolution, 0) - - assert read_resolution(create(), encoded_buffer)[0] >= video_resolution[0] - assert read_resolution(create(), encoded_buffer)[1] >= video_resolution[1] - assert read_resolution(create(), bytes()) is None - - -#TODO: needs review def test_destroy() -> None: - aom_decoder = create() + aom_decoder = create(1) with patch.object(aom_module.create_static_library(), 'aom_codec_destroy') as mock: destroy(aom_decoder) diff --git a/tests/test_codec_opus_decoder.py b/tests/test_codec_opus_decoder.py index 191e238e..b7329e2a 100644 --- a/tests/test_codec_opus_decoder.py +++ b/tests/test_codec_opus_decoder.py @@ -7,8 +7,10 @@ from tests.assert_helper import get_test_example_file, get_test_examples_directo from facefusion import state_manager from facefusion.codecs.opus_decoder import create, decode, destroy from facefusion.codecs.opus_encoder import create as create_encoder, encode +from facefusion.common_helper import is_macos from facefusion.download import conditional_download from facefusion.ffmpeg import read_audio_buffer +from facefusion.hash_helper import create_hash from facefusion.libraries import opus as opus_module @@ -21,24 +23,25 @@ def before_all() -> None: opus_module.pre_check() -#TODO: needs review def test_create() -> None: assert create(48000, 2) assert create(0, 0) is None -#TODO: needs review def test_decode() -> None: audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) audio_sample = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 opus_encoder = create_encoder(48000, 2) encoded_buffer = encode(opus_encoder, audio_sample.tobytes(), 960) opus_decoder = create(48000, 2) + decoded_buffer = decode(opus_decoder, encoded_buffer, 960, 2) - assert len(decode(opus_decoder, encoded_buffer, 960, 2)) == 960 * 2 * 4 + assert len(decoded_buffer) == 960 * 2 * 4 + + if is_macos(): + assert create_hash(decoded_buffer) == '92f7997d' -#TODO: needs review def test_destroy() -> None: opus_decoder = create(48000, 2) diff --git a/tests/test_codec_vpx_decoder.py b/tests/test_codec_vpx_decoder.py index 64ef0ed0..b64e29e9 100644 --- a/tests/test_codec_vpx_decoder.py +++ b/tests/test_codec_vpx_decoder.py @@ -5,9 +5,11 @@ import pytest from tests.assert_helper import get_test_example_file, get_test_examples_directory from facefusion import state_manager -from facefusion.codecs.vpx_decoder import create, decode, destroy, read_resolution +from facefusion.codecs.vpx_decoder import create, decode, destroy from facefusion.codecs.vpx_encoder import create as create_encoder, encode +from facefusion.common_helper import is_macos from facefusion.download import conditional_download +from facefusion.hash_helper import create_hash from facefusion.libraries import vpx as vpx_module from facefusion.vision import read_video_frame @@ -21,40 +23,32 @@ def before_all() -> None: vpx_module.pre_check() -#TODO: needs review def test_create() -> None: - assert create() + assert create(1) + + with patch('facefusion.codecs.vpx_decoder.vpx_module.create_static_library', return_value = None): + assert create(1) is None -#TODO: needs review def test_decode() -> None: vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() video_resolution = (vision_frame.shape[1], vision_frame.shape[0]) vpx_encoder = create_encoder(video_resolution, 1000, 1, 0) encoded_buffer = encode(vpx_encoder, video_buffer, video_resolution, 0) - vpx_decoder = create() + vpx_pointer = decode(create(1), encoded_buffer) - assert len(decode(vpx_decoder, encoded_buffer)) == video_resolution[0] * video_resolution[1] * 3 // 2 - assert decode(vpx_decoder, bytes()) == bytes() + assert vpx_pointer is not None + assert vpx_pointer.get('resolution') == video_resolution + assert len(vpx_pointer.get('buffer')) == video_resolution[0] * video_resolution[1] * 3 // 2 + assert decode(create(1), bytes()) is None + + if is_macos(): + assert create_hash(bytes(vpx_pointer.get('buffer'))) == '87450f70' -#TODO: needs review -def test_read_resolution() -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_resolution = (vision_frame.shape[1], vision_frame.shape[0]) - vpx_encoder = create_encoder(video_resolution, 1000, 1, 0) - encoded_buffer = encode(vpx_encoder, video_buffer, video_resolution, 0) - vpx_decoder = create() - - assert read_resolution(vpx_decoder, encoded_buffer) == video_resolution - assert read_resolution(vpx_decoder, bytes()) is None - - -#TODO: needs review def test_destroy() -> None: - vpx_decoder = create() + vpx_decoder = create(1) with patch.object(vpx_module.create_static_library(), 'vpx_codec_destroy') as mock: destroy(vpx_decoder)