From 6bb62cf64e34c692b7599a0b38ae478071fab5e1 Mon Sep 17 00:00:00 2001 From: Harisreedhar <46858047+harisreedhar@users.noreply.github.com> Date: Tue, 31 Mar 2026 20:01:02 +0530 Subject: [PATCH] Assets ffmpeg stream upload (#1069) * ffmpeg sanitize * fix type * fix type * add config * ChunkQueue -> UploadQueue * revert assets.py * move resolve methods to ffmpeg_builder.py * Refactor ffmpeg.py * Remove partial import * improve test * remove put(None) * cleanup * without poll() not working * ChunkReader -> MediaChunkReader * improve assert by replacing generic is_file * naming and cleanup --- facefusion.ini | 1 + facefusion/apis/asset_helper.py | 59 ++++++++++++++------------ facefusion/args_helper.py | 1 + facefusion/choices.py | 3 +- facefusion/ffmpeg.py | 73 +++++++++++++++++++++++++++------ facefusion/ffmpeg_builder.py | 32 +++++++++++++++ facefusion/locales.py | 1 + facefusion/program.py | 11 +++++ facefusion/types.py | 6 +++ tests/assert_helper.py | 7 ++++ tests/test_api_assets.py | 34 +++++++++++++++ tests/test_ffmpeg.py | 40 ++++++++++++++++-- 12 files changed, 225 insertions(+), 43 deletions(-) diff --git a/facefusion.ini b/facefusion.ini index 8cc9d8e0..17f3a2e2 100644 --- a/facefusion.ini +++ b/facefusion.ini @@ -119,6 +119,7 @@ benchmark_cycle_count = [api] api_host = api_port = +api_security_strategy = [execution] execution_device_ids = diff --git a/facefusion/apis/asset_helper.py b/facefusion/apis/asset_helper.py index 6d53d99c..56102233 100644 --- a/facefusion/apis/asset_helper.py +++ b/facefusion/apis/asset_helper.py @@ -1,5 +1,7 @@ +import asyncio import os -import tempfile +import queue +import uuid from typing import List, Optional from starlette.datastructures import UploadFile @@ -8,8 +10,8 @@ import facefusion.choices from facefusion import ffmpeg, process_manager, state_manager from facefusion.audio import detect_audio_duration from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate -from facefusion.filesystem import create_directory, get_file_extension, get_file_format, get_file_name, is_audio, is_image, is_video, remove_file -from facefusion.types import AudioMetadata, ImageMetadata, MediaType, VideoMetadata +from facefusion.filesystem import create_directory, get_file_extension, get_file_format, is_audio, is_image, is_video +from facefusion.types import AudioMetadata, ImageMetadata, MediaType, UploadQueue, VideoMetadata from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution @@ -82,40 +84,45 @@ def validate_asset_files(upload_files : List[UploadFile]) -> bool: return True +async def feed_upload_queue(upload_file : UploadFile, upload_queue : UploadQueue) -> None: + while file_chunk := await upload_file.read(1024): + upload_queue.put(file_chunk) + upload_queue.put(b'') + + async def save_asset_files(upload_files : List[UploadFile]) -> List[str]: asset_paths : List[str] = [] + api_security_strategy = state_manager.get_item('api_security_strategy') for upload_file in upload_files: - upload_file_extension = get_file_extension(upload_file.filename) + file_format = get_file_format(upload_file.filename) + file_extension = get_file_extension(upload_file.filename) + media_type = detect_media_type_by_format(file_format) + temp_path = state_manager.get_temp_path() - with tempfile.NamedTemporaryFile(suffix = upload_file_extension, delete = False) as temp_file: + create_directory(temp_path) - while upload_chunk := await upload_file.read(1024): - temp_file.write(upload_chunk) + asset_file_name = uuid.uuid4().hex + asset_path = os.path.join(temp_path, asset_file_name + file_extension) + upload_queue : UploadQueue = queue.SimpleQueue() - temp_file.flush() + process_manager.start() - media_type = detect_media_type_by_path(temp_file.name) - temp_path = state_manager.get_temp_path() + upload_task = asyncio.create_task(feed_upload_queue(upload_file, upload_queue)) + has_file_sanitized = False - create_directory(temp_path) + if media_type == 'audio': + has_file_sanitized = await asyncio.to_thread(ffmpeg.sanitize_audio, file_format, upload_queue.get, asset_path, api_security_strategy) + if media_type == 'image': + has_file_sanitized = await asyncio.to_thread(ffmpeg.sanitize_image, file_format, upload_queue.get, asset_path) + if media_type == 'video': + has_file_sanitized = await asyncio.to_thread(ffmpeg.sanitize_video, file_format, upload_queue.get, asset_path, api_security_strategy) - asset_file_name = get_file_name(temp_file.name) - asset_path = os.path.join(temp_path, asset_file_name + upload_file_extension) + await upload_task - process_manager.start() + if has_file_sanitized: + asset_paths.append(asset_path) - if media_type == 'audio' and ffmpeg.sanitize_audio(temp_file.name, asset_path): - asset_paths.append(asset_path) - - if media_type == 'image' and ffmpeg.sanitize_image(temp_file.name, asset_path): - asset_paths.append(asset_path) - - if media_type == 'video' and ffmpeg.sanitize_video(temp_file.name, asset_path): - asset_paths.append(asset_path) - - process_manager.end() - - remove_file(temp_file.name) + process_manager.end() return asset_paths diff --git a/facefusion/args_helper.py b/facefusion/args_helper.py index a93176a1..f89055aa 100644 --- a/facefusion/args_helper.py +++ b/facefusion/args_helper.py @@ -78,6 +78,7 @@ def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None: apply_state_item('benchmark_cycle_count', args.get('benchmark_cycle_count')) apply_state_item('api_host', args.get('api_host')) apply_state_item('api_port', args.get('api_port')) + apply_state_item('api_security_strategy', args.get('api_security_strategy')) apply_state_item('video_memory_strategy', args.get('video_memory_strategy')) apply_state_item('log_level', args.get('log_level')) apply_state_item('halt_on_error', args.get('halt_on_error')) diff --git a/facefusion/choices.py b/facefusion/choices.py index 9d98d7fd..bde7dcbc 100755 --- a/facefusion/choices.py +++ b/facefusion/choices.py @@ -2,7 +2,7 @@ import logging from typing import List, Sequence, get_args from facefusion.common_helper import create_float_range, create_int_range -from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageEncoder, ImageFormat, ImageSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoSet, VoiceExtractorModel, WorkFlow +from facefusion.types import Angle, ApiSecurityStrategy, AudioEncoder, AudioFormat, AudioSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageEncoder, ImageFormat, ImageSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoSet, VoiceExtractorModel, WorkFlow face_detector_set : FaceDetectorSet =\ { @@ -136,6 +136,7 @@ download_providers : List[DownloadProvider] = list(get_args(DownloadProvider)) download_scopes : List[DownloadScope] = list(get_args(DownloadScope)) video_memory_strategies : List[VideoMemoryStrategy] = list(get_args(VideoMemoryStrategy)) +api_security_strategies : List[ApiSecurityStrategy] = list(get_args(ApiSecurityStrategy)) log_level_set : LogLevelSet =\ { diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py index e310b7a1..e70df943 100644 --- a/facefusion/ffmpeg.py +++ b/facefusion/ffmpeg.py @@ -10,7 +10,7 @@ import facefusion.choices from facefusion import ffmpeg_builder, logger, process_manager, state_manager, translator from facefusion.filesystem import get_file_format, remove_file from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern -from facefusion.types import AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, Resolution, UpdateProgress, VideoEncoder, VideoFormat +from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, MediaChunkReader, Resolution, UpdateProgress, VideoEncoder, VideoFormat from facefusion.vision import detect_video_duration, detect_video_fps, pack_resolution, predict_video_frame_total @@ -46,9 +46,28 @@ def update_progress(progress : tqdm, frame_number : int) -> None: def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]: - log_level = state_manager.get_item('log_level') commands = ffmpeg_builder.run(commands) process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) + return complete_process(process) + + +def run_ffmpeg_with_pipe(commands : List[Command], media_chunk_reader : MediaChunkReader) -> subprocess.Popen[bytes]: + commands = ffmpeg_builder.run(commands) + process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, stdout = subprocess.PIPE) + + while media_chunk := media_chunk_reader(): + if process.poll() is not None: + break + process.stdin.write(media_chunk) + + if process.stdin and not process.stdin.closed: + process.stdin.close() + + return complete_process(process) + + +def complete_process(process : subprocess.Popen[bytes]) -> subprocess.Popen[bytes]: + log_level = state_manager.get_item('log_level') while process_manager.is_processing(): try: @@ -289,34 +308,62 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool: return process.returncode == 0 -def sanitize_audio(temp_path : str, asset_path : str) -> bool: +def sanitize_audio(audio_format : str, media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: + audio_pipe_format = ffmpeg_builder.resolve_audio_pipe_format(audio_format) + + if security_strategy == 'strict': + commands = ffmpeg_builder.chain( + ffmpeg_builder.pipe_input(audio_pipe_format), + ffmpeg_builder.deep_copy_audio(), + ffmpeg_builder.strip_metadata(), + ffmpeg_builder.force_output(asset_path) + ) + return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 + commands = ffmpeg_builder.chain( - ffmpeg_builder.set_input(temp_path), - ffmpeg_builder.deep_copy_audio(), + ffmpeg_builder.pipe_input(audio_pipe_format), + ffmpeg_builder.copy_audio_encoder(), ffmpeg_builder.strip_metadata(), ffmpeg_builder.force_output(asset_path) ) - return run_ffmpeg(commands).returncode == 0 + return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 -def sanitize_image(temp_path : str, asset_path : str) -> bool: +def sanitize_image(image_format : str, media_chunk_reader : MediaChunkReader, asset_path : str) -> bool: + image_pipe_format = ffmpeg_builder.resolve_image_pipe_format(image_format) commands = ffmpeg_builder.chain( - ffmpeg_builder.set_input(temp_path), + ffmpeg_builder.pipe_image(image_pipe_format), ffmpeg_builder.deep_copy_image(), ffmpeg_builder.strip_metadata(), ffmpeg_builder.force_output(asset_path) ) - return run_ffmpeg(commands).returncode == 0 + return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 -def sanitize_video(temp_path : str, asset_path : str) -> bool: +def sanitize_video(video_format : str, media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool: + video_pipe_format = ffmpeg_builder.resolve_video_pipe_format(video_format) + + if security_strategy == 'strict': + commands = ffmpeg_builder.chain( + ffmpeg_builder.pipe_input(video_pipe_format), + ffmpeg_builder.set_video_encoder('libx264'), + ffmpeg_builder.set_video_preset('libx264', 'ultrafast'), + ffmpeg_builder.set_pixel_format('libx264'), + ffmpeg_builder.deep_copy_video(), + ffmpeg_builder.deep_copy_audio(), + ffmpeg_builder.strip_metadata(), + ffmpeg_builder.force_output(asset_path) + ) + return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 + commands = ffmpeg_builder.chain( - ffmpeg_builder.set_input(temp_path), - ffmpeg_builder.deep_copy_video(), + ffmpeg_builder.pipe_input(video_pipe_format), + ffmpeg_builder.copy_video_encoder(), + ffmpeg_builder.copy_audio_encoder(), ffmpeg_builder.strip_metadata(), ffmpeg_builder.force_output(asset_path) ) - return run_ffmpeg(commands).returncode == 0 + return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0 def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder: diff --git a/facefusion/ffmpeg_builder.py b/facefusion/ffmpeg_builder.py index f434a787..a4e24f1a 100644 --- a/facefusion/ffmpeg_builder.py +++ b/facefusion/ffmpeg_builder.py @@ -47,6 +47,14 @@ def set_input(input_path : str) -> List[Command]: return [ '-i', input_path ] +def pipe_input(pipe_format : str) -> List[Command]: + return [ '-f', pipe_format, '-i', 'pipe:0' ] + + +def pipe_image(image_format : str) -> List[Command]: + return [ '-f', 'image2pipe', '-c:v', image_format, '-i', 'pipe:0' ] + + def set_input_fps(input_fps : Fps) -> List[Command]: return [ '-r', str(input_fps) ] @@ -289,3 +297,27 @@ def map_qsv_preset(video_preset : VideoPreset) -> Optional[str]: if video_preset in [ 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]: return video_preset return None + + +def resolve_audio_pipe_format(audio_format : str) -> str: + if audio_format == 'm4a': + return 'mp4' + if audio_format == 'opus': + return 'ogg' + return audio_format + + +def resolve_image_pipe_format(image_format : str) -> str: + if image_format == 'jpeg': + return 'mjpeg' + return image_format + + +def resolve_video_pipe_format(video_format : str) -> str: + if video_format == 'mkv': + return 'matroska' + if video_format == 'm4v': + return 'mp4' + if video_format == 'wmv': + return 'asf' + return video_format diff --git a/facefusion/locales.py b/facefusion/locales.py index f025f65c..19af72d8 100644 --- a/facefusion/locales.py +++ b/facefusion/locales.py @@ -161,6 +161,7 @@ LOCALES : Locales =\ 'benchmark_cycle_count': 'specify the amount of cycles per benchmark', 'api_host': 'specify the API host', 'api_port': 'specify the API port', + 'api_security_strategy': 'specify the API security strategy used for sanitizing uploaded assets', 'execution_device_ids': 'specify the devices used for processing', 'execution_providers': 'inference using different providers (choices: {choices}, ...)', 'execution_thread_count': 'specify the amount of parallel threads while processing', diff --git a/facefusion/program.py b/facefusion/program.py index 54340083..964fda79 100755 --- a/facefusion/program.py +++ b/facefusion/program.py @@ -822,6 +822,17 @@ def create_api_program() -> ArgumentParser: type = int, default = config.get_int_value('api', 'api_port', '8000') ) + capability_store.register_capability_set( + [ + group_api.add_argument( + '--api-security-strategy', + help = translator.get('help.api_security_strategy'), + default = config.get_str_value('api', 'api_security_strategy', 'strict'), + choices = facefusion.choices.api_security_strategies + ) + ], + scopes = [ 'api' ] + ) return program diff --git a/facefusion/types.py b/facefusion/types.py index 21215d5e..f3ac9b14 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -1,3 +1,4 @@ +import queue from collections import namedtuple from datetime import datetime from typing import Any, Callable, Dict, List, Literal, NotRequired, Optional, Tuple, TypeAlias, TypedDict, Union @@ -361,6 +362,9 @@ Download = TypedDict('Download', DownloadSet : TypeAlias = Dict[str, Download] VideoMemoryStrategy = Literal['strict', 'moderate', 'tolerant'] +ApiSecurityStrategy = Literal['strict', 'moderate'] +UploadQueue : TypeAlias = queue.SimpleQueue[bytes] +MediaChunkReader : TypeAlias = Callable[[], bytes] AppContext = Literal['cli', 'api'] InferencePool : TypeAlias = Dict[str, InferenceSession] @@ -451,6 +455,7 @@ StateKey = Literal\ 'api_host', 'api_port', 'api_key', + 'api_security_strategy', 'job_id', 'job_status', 'step_index' @@ -522,6 +527,7 @@ State = TypedDict('State', 'api_host' : str, 'api_port' : int, 'api_key' : str, + 'api_security_strategy' : ApiSecurityStrategy, 'job_id' : str, 'job_status' : JobStatus, 'step_index' : int diff --git a/tests/assert_helper.py b/tests/assert_helper.py index 82c07a7e..679fe6f2 100644 --- a/tests/assert_helper.py +++ b/tests/assert_helper.py @@ -1,5 +1,7 @@ +import io import os import tempfile +from functools import partial from facefusion.filesystem import are_images, create_directory, is_directory, is_file, remove_directory, resolve_file_paths from facefusion.types import JobStatus @@ -46,3 +48,8 @@ def prepare_test_output_directory() -> bool: remove_directory(test_outputs_directory) create_directory(test_outputs_directory) return is_directory(test_outputs_directory) + + +def create_media_reader(file_path : str) -> partial[bytes]: + file_buffer = io.BytesIO(open(file_path, 'rb').read()) + return partial(file_buffer.read, 1024) diff --git a/tests/test_api_assets.py b/tests/test_api_assets.py index 656ec727..78bd411a 100644 --- a/tests/test_api_assets.py +++ b/tests/test_api_assets.py @@ -289,3 +289,37 @@ def test_delete_assets(test_client : TestClient) -> None: }) assert delete_response.status_code == 404 + + +def test_upload_asset_security_strategies(test_client : TestClient) -> None: + source_path = get_test_example_file('source.jpg') + target_path = get_test_example_file('target-240p.mp4') + + for strategy in [ 'strict', 'moderate' ]: + state_manager.init_item('api_security_strategy', strategy) + + create_session_response = test_client.post('/session', json = + { + 'client_version': metadata.get('version') + }) + access_token = create_session_response.json().get('access_token') + session_id = session_manager.find_session_id(access_token) + + with open(source_path, 'rb') as source_file, open(target_path, 'rb') as target_file: + upload_response = test_client.post('/assets?type=source', headers = + { + 'Authorization': 'Bearer ' + access_token + }, files = + [ + ('file', ('source.jpg', source_file.read(), 'image/jpeg')), + ('file', ('target.mp4', target_file.read(), 'video/mp4')) + ]) + + assert upload_response.status_code == 201 + + asset_ids = upload_response.json().get('asset_ids') + + assert asset_store.get_asset(session_id, asset_ids[0]).get('media') == 'image' + assert asset_store.get_asset(session_id, asset_ids[1]).get('media') == 'video' + + state_manager.init_item('api_security_strategy', 'strict') diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index dc48da54..81a28cd9 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -7,11 +7,11 @@ import pytest import facefusion.ffmpeg from facefusion import process_manager, state_manager from facefusion.download import conditional_download -from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, spawn_frames -from facefusion.filesystem import copy_file +from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames +from facefusion.filesystem import copy_file, is_audio, is_image, is_video from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths from facefusion.types import EncoderSet -from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory +from .assert_helper import create_media_reader, get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -210,3 +210,37 @@ def test_replace_audio() -> None: clear_temp_directory(state_manager.get_temp_path(), output_path) state_manager.init_item('output_audio_encoder', 'aac') + + +def test_sanitize_audio() -> None: + audio_path = get_test_example_file('source.mp3') + + output_strict_path = get_test_output_path('output-strict.mp3') + output_moderate_path = get_test_output_path('output-moderate.mp3') + + assert sanitize_audio('mp3', create_media_reader(audio_path), output_strict_path, 'strict') is True + assert is_audio(output_strict_path) is True + + assert sanitize_audio('mp3', create_media_reader(audio_path), output_moderate_path, 'moderate') is True + assert is_audio(output_moderate_path) is True + + +def test_sanitize_image() -> None: + source_path = get_test_example_file('source.jpg') + output_path = get_test_output_path('output.jpg') + + assert sanitize_image('jpeg', create_media_reader(source_path), output_path) is True + assert is_image(output_path) is True + + +def test_sanitize_video() -> None: + video_path = get_test_example_file('target-240p.mp4') + + output_strict_path = get_test_output_path('output-strict.mp4') + output_moderate_path = get_test_output_path('output-moderate.mp4') + + assert sanitize_video('mp4', create_media_reader(video_path), output_strict_path, 'strict') is True + assert is_video(output_strict_path) is True + + assert sanitize_video('mp4', create_media_reader(video_path), output_moderate_path, 'moderate') is True + assert is_video(output_moderate_path) is True