diff --git a/facefusion/apis/asset_helper.py b/facefusion/apis/asset_helper.py index 008e4e30..2cb4f300 100644 --- a/facefusion/apis/asset_helper.py +++ b/facefusion/apis/asset_helper.py @@ -1,6 +1,5 @@ import asyncio import os -import queue import uuid from typing import List, Optional @@ -9,7 +8,7 @@ from starlette.datastructures import UploadFile import facefusion.choices from facefusion import ffmpeg, process_manager, state_manager from facefusion.filesystem import create_directory, get_file_extension, get_file_format, is_audio, is_image, is_video -from facefusion.types import ImageMetadata, MediaType, UploadQueue +from facefusion.types import ImageMetadata, MediaType from facefusion.vision import detect_image_resolution @@ -60,12 +59,6 @@ def validate_asset_files(upload_files : List[UploadFile]) -> bool: return True -async def feed_upload_queue(upload_file : UploadFile, upload_queue : UploadQueue) -> None: - while file_chunk := await upload_file.read(1024): - upload_queue.put(file_chunk) - upload_queue.put(b'') - - async def save_asset_files(upload_files : List[UploadFile]) -> List[str]: asset_paths : List[str] = [] api_security_strategy = state_manager.get_item('api_security_strategy') @@ -80,23 +73,19 @@ async def save_asset_files(upload_files : List[UploadFile]) -> List[str]: asset_file_name = uuid.uuid4().hex asset_path = os.path.join(temp_path, asset_file_name + file_extension) - upload_queue : UploadQueue = queue.SimpleQueue() #todo: not sure if queue is even needed + file_content = await upload_file.read() process_manager.start() - upload_task = asyncio.create_task(feed_upload_queue(upload_file, upload_queue)) - - if media_type == 'audio' and await asyncio.to_thread(ffmpeg.sanitize_audio, upload_queue.get, asset_path, api_security_strategy): + if media_type == 'audio' and await asyncio.to_thread(ffmpeg.sanitize_audio, file_content, asset_path, api_security_strategy): asset_paths.append(asset_path) - if media_type == 'image' and await asyncio.to_thread(ffmpeg.sanitize_image, upload_queue.get, asset_path): + if media_type == 'image' and await asyncio.to_thread(ffmpeg.sanitize_image, file_content, asset_path): asset_paths.append(asset_path) - if media_type == 'video' and await asyncio.to_thread(ffmpeg.sanitize_video, upload_queue.get, asset_path, api_security_strategy): + if media_type == 'video' and await asyncio.to_thread(ffmpeg.sanitize_video, file_content, asset_path, api_security_strategy): asset_paths.append(asset_path) - await upload_task - process_manager.end() return asset_paths diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py index 6c5bb9ab..26d6d0ae 100644 --- a/facefusion/ffmpeg.py +++ b/facefusion/ffmpeg.py @@ -10,7 +10,7 @@ import facefusion.choices from facefusion import ffmpeg_builder, logger, process_manager, state_manager, translator from facefusion.filesystem import get_file_format, remove_file from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern -from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, MediaChunkReader, Resolution, SampleRate, UpdateProgress, VideoEncoder, VideoFormat +from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, Resolution, SampleRate, UpdateProgress, VideoEncoder, VideoFormat from facefusion.vision import detect_video_duration, detect_video_fps, pack_resolution, predict_video_frame_total @@ -45,29 +45,17 @@ def update_progress(progress : tqdm, frame_number : int) -> None: progress.update(frame_number - progress.n) -def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]: - commands = ffmpeg_builder.run(commands) - process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) - return complete_process(process) - - -def run_ffmpeg_with_pipe(commands : List[Command], media_chunk_reader : MediaChunkReader) -> subprocess.Popen[bytes]: +def run_ffmpeg_with_pipe(commands : List[Command], file_content : bytes) -> subprocess.Popen[bytes]: commands = ffmpeg_builder.run(commands) process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, stdout = subprocess.PIPE) - - while media_chunk := media_chunk_reader(): - if process.poll() is not None: #todo - poll not allowed - break - process.stdin.write(media_chunk) - - if process.stdin and not process.stdin.closed: - process.stdin.close() - - return complete_process(process) + process.communicate(input = file_content) + return process -def complete_process(process : subprocess.Popen[bytes]) -> subprocess.Popen[bytes]: +def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]: log_level = state_manager.get_item('log_level') + commands = ffmpeg_builder.run(commands) + process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) while process_manager.is_processing(): try: @@ -313,7 +301,7 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool: return process.returncode == 0 -def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: +def sanitize_audio(file_content : bytes, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: if security_strategy == 'strict': commands = ffmpeg_builder.chain( ffmpeg_builder.set_input('pipe:0'), @@ -321,7 +309,7 @@ def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, secu ffmpeg_builder.strip_metadata(), ffmpeg_builder.force_output(asset_path) ) - return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 + return run_ffmpeg_with_pipe(commands, file_content).returncode == 0 commands = ffmpeg_builder.chain( ffmpeg_builder.set_input('pipe:0'), @@ -329,20 +317,20 @@ def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, secu ffmpeg_builder.strip_metadata(), ffmpeg_builder.force_output(asset_path) ) - return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 + return run_ffmpeg_with_pipe(commands, file_content).returncode == 0 -def sanitize_image(media_chunk_reader : MediaChunkReader, asset_path : str) -> bool: +def sanitize_image(file_content : bytes, asset_path : str) -> bool: commands = ffmpeg_builder.chain( ffmpeg_builder.set_input('pipe:0'), ffmpeg_builder.deep_copy_image(), ffmpeg_builder.strip_metadata(), ffmpeg_builder.force_output(asset_path) ) - return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 + return run_ffmpeg_with_pipe(commands, file_content).returncode == 0 -def sanitize_video(media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: +def sanitize_video(file_content : bytes, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: if security_strategy == 'strict': available_video_encoders = get_static_available_encoder_set().get('video') commands = ffmpeg_builder.chain( @@ -355,7 +343,7 @@ def sanitize_video(media_chunk_reader : MediaChunkReader, asset_path : str, secu ffmpeg_builder.strip_metadata(), ffmpeg_builder.force_output(asset_path) ) - return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 + return run_ffmpeg_with_pipe(commands, file_content).returncode == 0 commands = ffmpeg_builder.chain( ffmpeg_builder.set_input('pipe:0'), @@ -364,7 +352,7 @@ def sanitize_video(media_chunk_reader : MediaChunkReader, asset_path : str, secu ffmpeg_builder.strip_metadata(), ffmpeg_builder.force_output(asset_path) ) - return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 + return run_ffmpeg_with_pipe(commands, file_content).returncode == 0 def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder: diff --git a/facefusion/types.py b/facefusion/types.py index fef0078b..9803ae4b 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -1,4 +1,3 @@ -import queue from collections import namedtuple from datetime import datetime from typing import Any, Callable, Dict, List, Literal, NotRequired, Optional, Tuple, TypeAlias, TypedDict, Union @@ -367,8 +366,6 @@ DownloadSet : TypeAlias = Dict[str, Download] VideoMemoryStrategy = Literal['strict', 'moderate', 'tolerant'] ApiSecurityStrategy = Literal['strict', 'moderate'] -UploadQueue : TypeAlias = queue.SimpleQueue[bytes] -MediaChunkReader : TypeAlias = Callable[[], bytes] AppContext = Literal['cli', 'api'] InferencePool : TypeAlias = Dict[str, InferenceSession] diff --git a/tests/assert_helper.py b/tests/assert_helper.py index 42756ad6..82c07a7e 100644 --- a/tests/assert_helper.py +++ b/tests/assert_helper.py @@ -1,7 +1,5 @@ -import io import os import tempfile -from functools import partial from facefusion.filesystem import are_images, create_directory, is_directory, is_file, remove_directory, resolve_file_paths from facefusion.types import JobStatus @@ -48,8 +46,3 @@ def prepare_test_output_directory() -> bool: remove_directory(test_outputs_directory) create_directory(test_outputs_directory) return is_directory(test_outputs_directory) - - -def create_media_reader(file_path : str) -> partial[bytes]: #todo: kill this - file_buffer = io.BytesIO(open(file_path, 'rb').read()) - return partial(file_buffer.read, 1024) diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index 538dd9c9..3e5aae92 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -12,7 +12,7 @@ from facefusion.ffprobe import probe_entries from facefusion.filesystem import copy_file, is_image from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths from facefusion.types import EncoderSet -from .assert_helper import create_media_reader, get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory +from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -216,37 +216,40 @@ def test_replace_audio() -> None: def test_sanitize_audio() -> None: file_path = get_test_example_file('source.wav') + file_content = open(file_path, 'rb').read() output_paths =\ [ get_test_output_path('test-sanitize-audio-strict.mp3'), get_test_output_path('test-sanitize-audio-moderate.wav') ] - assert sanitize_audio(create_media_reader(file_path), output_paths[0], 'strict') is True + assert sanitize_audio(file_content, output_paths[0], 'strict') is True assert probe_entries(output_paths[0], [ 'codec_name' ]).get('codec_name') == 'mp3' - assert sanitize_audio(create_media_reader(file_path), output_paths[1], 'moderate') is True + assert sanitize_audio(file_content, output_paths[1], 'moderate') is True assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'pcm_s16le' def test_sanitize_image() -> None: file_path = get_test_example_file('source.jpg') + file_content = open(file_path, 'rb').read() output_path = get_test_output_path('test-sanitize-image.jpg') - assert sanitize_image(create_media_reader(file_path), output_path) is True + assert sanitize_image(file_content, output_path) is True assert is_image(output_path) is True def test_sanitize_video() -> None: file_path = get_test_example_file('target-240p-h265.mp4') + file_content = open(file_path, 'rb').read() output_paths =\ [ get_test_output_path('test-sanitize-video-strict.mp4'), get_test_output_path('test-sanitize-video-moderate.mp4') ] - assert sanitize_video(create_media_reader(file_path), output_paths[0], 'strict') is True + assert sanitize_video(file_content, output_paths[0], 'strict') is True assert probe_entries(output_paths[0], [ 'codec_name' ]).get('codec_name') == 'h264' - assert sanitize_video(create_media_reader(file_path), output_paths[1], 'moderate') is True + assert sanitize_video(file_content, output_paths[1], 'moderate') is True assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'hevc'