diff --git a/facefusion/apis/stream_helper.py b/facefusion/apis/stream_helper.py index dd97f049..46e16779 100644 --- a/facefusion/apis/stream_helper.py +++ b/facefusion/apis/stream_helper.py @@ -128,7 +128,7 @@ async def run_peer_loop(session_id : SessionId, rtc_peer : RtcPeer) -> None: #TODO: needs review -#TODO: method is too complex, consider separate encode loop for audio and video? +#TODO: split encoding, frame processing and sending to allow testing without mocking opus_encoder and streamer def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque : deque[VideoPack], audio_deque : deque[AudioPack], video_event : threading.Event) -> None: video_event.wait() video_event.clear() @@ -200,7 +200,7 @@ def run_encode_loop(rtc_peer : RtcPeer, video_codec : VideoCodec, video_deque : rtc.clear_remb(rtc_peer) -# TODO: method is too complex +#TODO: split receive loop, decode and buffering into separate functions to allow testing without ctypes mocks def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[VideoPack], video_event : threading.Event) -> None: video_track = rtc_peer_video.get('receiver_track') video_codec = rtc_peer_video.get('codec') @@ -235,7 +235,7 @@ def receive_video_frames(rtc_peer_video : RtcPeerVideo, video_deque : deque[Vide destroy_video_decoder(video_codec, video_decoder) -# TODO: method is too complex +#TODO: split receive loop, decode and buffering into separate functions to allow testing without ctypes mocks def receive_audio_frames(rtc_peer_audio : RtcPeerAudio, audio_deque : deque[AudioPack]) -> None: audio_track = rtc_peer_audio.get('receiver_track') audio_codec = rtc_peer_audio.get('codec') diff --git a/facefusion/content_analyser.py b/facefusion/content_analyser.py index 655ed5d8..fbc7e47a 100644 --- a/facefusion/content_analyser.py +++ b/facefusion/content_analyser.py @@ -174,10 +174,10 @@ def analyse_video(video_path : str, trim_frame_start : int, trim_frame_end : int for frame_number in frame_range: if frame_number % int(video_fps) == 0: - vision_frame = read_video_frame(video_path, frame_number) + video_frame = read_video_frame(video_path, frame_number) total += 1 - if analyse_frame(vision_frame): + if analyse_frame(video_frame): counter += 1 if counter > 0 and total > 0: diff --git a/facefusion/core.py b/facefusion/core.py index bb26e8d6..663a3b6f 100755 --- a/facefusion/core.py +++ b/facefusion/core.py @@ -121,7 +121,7 @@ def common_pre_check() -> bool: content_analyser_content = inspect.getsource(content_analyser).encode() content_analyser_hash = hash_helper.create_hash(content_analyser_content) - return all(module.pre_check() for module in common_modules) and content_analyser_hash == 'b14e7b92' + return all(module.pre_check() for module in common_modules) and content_analyser_hash == '320ef969' def processors_pre_check() -> bool: diff --git a/tests/test_api_stream_helper.py b/tests/test_api_stream_helper.py index 5bed3e22..3cc21021 100644 --- a/tests/test_api_stream_helper.py +++ b/tests/test_api_stream_helper.py @@ -2,7 +2,6 @@ import asyncio import ctypes import struct import threading -import time from collections import deque from unittest.mock import AsyncMock, MagicMock, patch @@ -15,9 +14,10 @@ from facefusion.apis.stream_helper import create_video_decoder, create_video_enc from facefusion.codecs import aom_encoder, vpx_encoder from facefusion.common_helper import is_linux, is_macos, is_windows from facefusion.download import conditional_download +from facefusion.ffmpeg import read_audio_buffer from facefusion.hash_helper import create_hash from facefusion.libraries import aom as aom_module, datachannel as datachannel_module, opus as opus_module, vpx as vpx_module -from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio, RtcPeerVideo, VideoCodec, VideoPack +from facefusion.types import AudioCodec, AudioPack, RtcPeer, RtcPeerAudio, RtcPeerVideo, SessionId, VideoCodec, VideoPack from facefusion.vision import read_video_frame from .assert_helper import get_test_example_file, get_test_examples_directory @@ -35,7 +35,8 @@ def before_all() -> None: conditional_download(get_test_examples_directory(), [ 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4', - 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg' + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg', + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3' ]) @@ -46,14 +47,13 @@ def before_each() -> None: @pytest.mark.anyio async def test_process_image() -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - frame_buffer = cv2.imencode('.jpg', vision_frame)[1].tobytes() + image_buffer = open(get_test_example_file('source.jpg'), 'rb').read() websocket_mock = AsyncMock() websocket_mock.receive.side_effect =\ [ { 'type': 'websocket.receive', - 'bytes': frame_buffer + 'bytes': image_buffer } ] @@ -61,7 +61,9 @@ async def test_process_image() -> None: await process_image(websocket_mock) websocket_mock.send_bytes.assert_called_once() - assert websocket_mock.send_bytes.call_args[0][0][:3] == bytes([ 255, 216, 255 ]) + + if is_linux() or is_windows(): + assert create_hash(websocket_mock.send_bytes.call_args[0][0]) == '0142782f' state_manager.init_item('source_paths', None) await process_image(websocket_mock) @@ -107,39 +109,30 @@ def test_process_video(video_codec : VideoCodec, session_id : str) -> None: @pytest.mark.anyio async def test_receive_vision_frames() -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - frame_buffer = cv2.imencode('.jpg', vision_frame)[1].tobytes() + image_buffer = open(get_test_example_file('source.jpg'), 'rb').read() websocket_mock = AsyncMock() websocket_mock.receive.side_effect =\ [ { 'type': 'websocket.receive', - 'bytes': frame_buffer + 'bytes': image_buffer }, { 'type': 'websocket.receive', 'bytes': 'invalid'.encode() }, - { - 'type': 'websocket.receive', - 'bytes': frame_buffer - }, { 'type': 'websocket.disconnect' } ] - frames = [] + vision_frames = receive_vision_frames(websocket_mock) - async for frame in receive_vision_frames(websocket_mock): - frames.append(frame) - - assert len(frames) == 2 - assert frames[0].shape == vision_frame.shape + assert create_hash((await anext(vision_frames)).tobytes()) == '5ed32ca0' -@pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) -def test_run_peer_loop(video_codec : VideoCodec, payload_type : int) -> None: +@pytest.mark.parametrize('video_codec, payload_type, session_id', [ ('av1', 35, 'test-run-peer-loop-av1'), ('vp8', 96, 'test-run-peer-loop-vp8') ]) +def test_run_peer_loop(video_codec : VideoCodec, payload_type : int, session_id : SessionId) -> None: peer_connection = rtc.create_peer_connection() video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) @@ -156,28 +149,23 @@ def test_run_peer_loop(video_codec : VideoCodec, payload_type : int) -> None: 'receiver_bitrate': ctypes.c_uint(0) } - # TODO: avoid concatenation — session_id should be a parametrize parameter like video_codec - session_id = 'test-run-peer-loop-' + video_codec rtc_store.init_peers(session_id) rtc_store.get_peers(session_id).append(rtc_peer) - # TODO: reduce patches — receiving video_frames is mocked to avoid blocking but that removes meaningful test coverage + assert rtc_store.has_peers(session_id) is True + with patch('facefusion.apis.stream_helper.receive_video_frames'): - with patch('facefusion.apis.stream_helper.run_encode_loop') as mock_encode_loop: + with patch('facefusion.apis.stream_helper.run_encode_loop'): thread = threading.Thread(target = asyncio.run, args = (run_peer_loop(session_id, rtc_peer),), daemon = True) thread.start() thread.join(timeout = 5.0) - assert mock_encode_loop.called - # TODO: test against full call args not just [0][1] — assert mock_encode_loop.call_args matches rtc_peer and video_codec - assert mock_encode_loop.call_args[0][1] == video_codec - # TODO: assert rtc_store.has_peers(session_id) is True before the action, then False after assert rtc_store.has_peers(session_id) is False @pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: - source_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) peer_connection = rtc.create_peer_connection() video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) @@ -198,28 +186,35 @@ def test_run_encode_loop(video_codec : VideoCodec, payload_type : int) -> None: audio_deque : deque[AudioPack] = deque() video_event = threading.Event() - video_deque.append((source_frame, 0.100)) + video_deque.append((video_frame, 0.1)) video_event.set() - with patch('facefusion.apis.stream_helper.rtc.send_video') as mock_send_video: - # TODO: thread name is unclear — rename to encode_loop_thread to identify what it runs - # TODO: thread block fires without flow testing — nobody triggers the deque/event naturally; the test pre-populates state rather than simulating real receive behaviour - thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True) - thread.start() - time.sleep(0.1) + with patch('facefusion.apis.stream_helper.rtc.send_video') as send_video_mock: + encode_loop_thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True) + encode_loop_thread.start() empty_vision_frame = numpy.empty(0) video_deque.append((empty_vision_frame, 0.0)) video_event.set() - thread.join(timeout = 5.0) + encode_loop_thread.join(timeout = 5.0) - assert mock_send_video.called - assert len(mock_send_video.call_args[0][1]) > 0 + assert send_video_mock.called + + if video_codec == 'av1': + if is_linux() or is_windows(): + assert create_hash(send_video_mock.call_args[0][1]) == '9ba7212b' + + if is_macos(): + pytest.skip() + + if video_codec == 'vp8': + pytest.skip() @pytest.mark.parametrize('video_codec, payload_type', [ ('av1', 35), ('vp8', 96) ]) def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int) -> None: - source_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - audio_frame = numpy.zeros(960 * 2, dtype = numpy.float32) + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) + audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 peer_connection = rtc.create_peer_connection() video_sender_track = rtc.add_video_track(peer_connection, 'sendonly', video_codec, payload_type) video_receiver_track = rtc.add_video_track(peer_connection, 'recvonly', video_codec, payload_type) @@ -240,45 +235,48 @@ def test_run_peer_loop_send_order(video_codec : VideoCodec, payload_type : int) audio_deque : deque[AudioPack] = deque() video_event = threading.Event() - video_deque.append((source_frame, 0.100)) + video_deque.append((video_frame, 0.100)) audio_deque.append((audio_frame, 0.100)) video_event.set() - # TODO: rename manager — generic name gives no context about what is being mocked - manager = MagicMock() - manager.process_frame.return_value = source_frame - manager.opus_encode.return_value = bytes([ 1 ] * 32) + streamer_mock = MagicMock() + streamer_mock.process_frame.return_value = video_frame - # TODO: too many patches — 4 nested mocks make this test a lie, real behaviour is hidden behind stubs - # TODO: nobody triggers the threads naturally — events and deques are pre-populated rather than driven by real receive flow - with patch('facefusion.apis.stream_helper.streamer.process_frame', manager.process_frame): - with patch('facefusion.apis.stream_helper.opus_encoder.encode', manager.opus_encode): - with patch('facefusion.apis.stream_helper.rtc.send_audio', manager.send_audio): - with patch('facefusion.apis.stream_helper.rtc.send_video', manager.send_video): - thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True) - thread.start() - time.sleep(0.1) + encoder_mock = MagicMock() + encoder_mock.encode.return_value = bytes([ 1 ] * 32) + + rtc_mock = MagicMock() + + with patch('facefusion.apis.stream_helper.streamer.process_frame', streamer_mock.process_frame): + with patch('facefusion.apis.stream_helper.opus_encoder.encode', encoder_mock.encode): + with patch('facefusion.apis.stream_helper.rtc.send_audio', rtc_mock.send_audio): + with patch('facefusion.apis.stream_helper.rtc.send_video', rtc_mock.send_video): + encode_loop_thread = threading.Thread(target = run_encode_loop, args = (rtc_peer, video_codec, video_deque, audio_deque, video_event), daemon = True) + encode_loop_thread.start() empty_vision_frame = numpy.empty(0) video_deque.append((empty_vision_frame, 0.0)) video_event.set() - thread.join(timeout = 5.0) + encode_loop_thread.join(timeout = 5.0) - call_names = [ call[0] for call in manager.mock_calls ] + assert streamer_mock.process_frame.called is True - assert 'process_frame' in call_names and 'send_audio' in call_names - assert call_names.index('process_frame') < call_names.index('send_audio') + assert encoder_mock.encode.called is True + + assert rtc_mock.send_audio.called is True + assert rtc_mock.send_video.called is True @pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) def test_receive_video_frames(video_codec : VideoCodec) -> None: - datachannel_library_mock = MagicMock() - datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) video_deque : deque[VideoPack] = deque() video_event = threading.Event() + datachannel_library_mock = MagicMock() + datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] + with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock): - with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = vision_frame): + with patch('facefusion.apis.stream_helper.decode_video_frame', return_value = video_frame): rtc_peer_video : RtcPeerVideo =\ { 'sender_track': 0, @@ -287,9 +285,8 @@ def test_receive_video_frames(video_codec : VideoCodec) -> None: } video_receiver_thread = threading.Thread(target = receive_video_frames, args = (rtc_peer_video, video_deque, video_event), daemon = True) video_receiver_thread.start() - video_receiver_thread.join(timeout = 2.0) + video_receiver_thread.join(timeout = 5.0) - # TODO: avoid [0][0] tuple indexing — use named access once VideoPack becomes a TypedDict if is_linux() or is_windows(): assert create_hash(video_deque[0][0].tobytes()) == 'a17439db' @@ -297,13 +294,14 @@ def test_receive_video_frames(video_codec : VideoCodec) -> None: assert create_hash(video_deque[0][0].tobytes()) == '38d00e2a' -# TODO: refine test @pytest.mark.parametrize('audio_codec', [ 'opus' ]) def test_receive_audio_frames(audio_codec : AudioCodec) -> None: + audio_buffer = read_audio_buffer(get_test_example_file('source.mp3'), 48000, 16, 2) + audio_frame = numpy.frombuffer(audio_buffer, dtype = numpy.int16).astype(numpy.float32) / 32768.0 + audio_deque : deque[AudioPack] = deque() + datachannel_library_mock = MagicMock() datachannel_library_mock.rtcReceiveMessage.side_effect = [ 0, -1 ] - audio_frame = numpy.zeros(960 * 2, dtype = numpy.float32) - audio_deque : deque[AudioPack] = deque() with patch('facefusion.apis.stream_helper.datachannel_module.create_static_library', return_value = datachannel_library_mock): with patch('facefusion.apis.stream_helper.opus_decoder.decode', return_value = audio_frame.tobytes()): @@ -315,18 +313,15 @@ def test_receive_audio_frames(audio_codec : AudioCodec) -> None: } audio_receiver_thread = threading.Thread(target = receive_audio_frames, args = (rtc_peer_audio, audio_deque), daemon = True) audio_receiver_thread.start() - audio_receiver_thread.join(timeout = 2.0) + audio_receiver_thread.join(timeout = 5.0) - # TODO: assertions do not verify meaningful audio content — dtype/size/len would pass for an empty silent frame - assert audio_deque[0][0].dtype == numpy.float32 - assert audio_deque[0][0].size == 960 * 2 - assert len(audio_deque) == 1 + assert create_hash(audio_deque[0][0].tobytes()) == create_hash(audio_frame.tobytes()) @pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) def test_encode_and_decode_video_frame(video_codec : VideoCodec) -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - input_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() video_encoder = create_video_encoder(video_codec, (426, 226), 1000) video_decoder = create_video_decoder(video_codec) encode_buffer = encode_video_frame(video_codec, video_encoder, input_buffer, (426, 226), 0) @@ -351,8 +346,8 @@ def test_encode_and_decode_video_frame(video_codec : VideoCodec) -> None: @pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) def test_create_and_destroy_video_decoder(video_codec : VideoCodec) -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - input_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() if video_codec == 'av1': video_encoder = aom_encoder.create((426, 226), 1000, 1, 0) @@ -373,9 +368,8 @@ def test_create_and_destroy_video_decoder(video_codec : VideoCodec) -> None: @pytest.mark.parametrize('video_codec', [ 'av1', 'vp8' ]) def test_create_and_destroy_video_encoder(video_codec : VideoCodec) -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - input_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + input_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() video_encoder = create_video_encoder(video_codec, (426, 226), 4000) if video_codec == 'av1': @@ -429,8 +423,8 @@ def test_destroy_stream() -> None: 'sender_bitrate': ctypes.c_uint(0), 'receiver_bitrate': ctypes.c_uint(0) } - session_id = 'test-destroy-stream' + rtc_store.init_peers(session_id) rtc_store.get_peers(session_id).append(rtc_peer) diff --git a/tests/test_codec_aom_decoder.py b/tests/test_codec_aom_decoder.py index b327d30b..a1d7a081 100644 --- a/tests/test_codec_aom_decoder.py +++ b/tests/test_codec_aom_decoder.py @@ -31,9 +31,9 @@ def test_create() -> None: def test_decode() -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_resolution = (vision_frame.shape[1], vision_frame.shape[0]) + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + video_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_resolution = (video_frame.shape[1], video_frame.shape[0]) aom_encoder = create_encoder(video_resolution, 1000, 1, 0) encode_buffer = encode(aom_encoder, video_buffer, video_resolution, 0) aom_decoder = create(1) diff --git a/tests/test_codec_aom_encoder.py b/tests/test_codec_aom_encoder.py index f1e41db1..589d9792 100644 --- a/tests/test_codec_aom_encoder.py +++ b/tests/test_codec_aom_encoder.py @@ -30,9 +30,9 @@ def test_create() -> None: def test_encode() -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_resolution = (vision_frame.shape[1], vision_frame.shape[0]) + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + video_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_resolution = (video_frame.shape[1], video_frame.shape[0]) aom_encoder = create(video_resolution, 1000, 1, 0) if is_linux() or is_windows(): diff --git a/tests/test_codec_vpx_decoder.py b/tests/test_codec_vpx_decoder.py index f465f1d3..ed2aca81 100644 --- a/tests/test_codec_vpx_decoder.py +++ b/tests/test_codec_vpx_decoder.py @@ -31,9 +31,9 @@ def test_create() -> None: def test_decode() -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_resolution = (vision_frame.shape[1], vision_frame.shape[0]) + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + video_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_resolution = (video_frame.shape[1], video_frame.shape[0]) vpx_encoder = create_encoder(video_resolution, 1000, 1, 0) encode_buffer = encode(vpx_encoder, video_buffer, video_resolution, 0) vpx_decoder = create(1) diff --git a/tests/test_codec_vpx_encoder.py b/tests/test_codec_vpx_encoder.py index 1a15a741..334aef5a 100644 --- a/tests/test_codec_vpx_encoder.py +++ b/tests/test_codec_vpx_encoder.py @@ -30,9 +30,9 @@ def test_create() -> None: def test_encode() -> None: - vision_frame = read_video_frame(get_test_example_file('target-240p.mp4')) - video_buffer = cv2.cvtColor(vision_frame, cv2.COLOR_BGR2YUV_I420).tobytes() - video_resolution = (vision_frame.shape[1], vision_frame.shape[0]) + video_frame = read_video_frame(get_test_example_file('target-240p.mp4')) + video_buffer = cv2.cvtColor(video_frame, cv2.COLOR_BGR2YUV_I420).tobytes() + video_resolution = (video_frame.shape[1], video_frame.shape[0]) vpx_encoder = create(video_resolution, 1000, 1, 0) if is_linux() or is_windows():