From 0d0c27b117a5f278e0b03712cdba67ee98ab9ee1 Mon Sep 17 00:00:00 2001 From: Henry Ruhs Date: Wed, 11 Mar 2026 10:52:01 +0100 Subject: [PATCH] asset validation and image encoder lookup (#1058) * asset validation and image encoder lookup * asset validation and image encoder lookup * asset validation and image encoder lookup --- facefusion/apis/asset_helper.py | 80 +++++++++++++++++++++++++++-- facefusion/apis/asset_store.py | 4 +- facefusion/apis/endpoints/assets.py | 57 ++++---------------- facefusion/choices.py | 60 ++++++++++------------ facefusion/ffmpeg.py | 19 ++++--- facefusion/program.py | 2 +- facefusion/types.py | 8 +-- tests/test_asset_helper.py | 8 +-- tests/test_ffmpeg.py | 2 + 9 files changed, 140 insertions(+), 100 deletions(-) diff --git a/facefusion/apis/asset_helper.py b/facefusion/apis/asset_helper.py index d646b869..6d53d99c 100644 --- a/facefusion/apis/asset_helper.py +++ b/facefusion/apis/asset_helper.py @@ -1,8 +1,14 @@ -from typing import Optional +import os +import tempfile +from typing import List, Optional +from starlette.datastructures import UploadFile + +import facefusion.choices +from facefusion import ffmpeg, process_manager, state_manager from facefusion.audio import detect_audio_duration from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate -from facefusion.filesystem import is_audio, is_image, is_video +from facefusion.filesystem import create_directory, get_file_extension, get_file_format, get_file_name, is_audio, is_image, is_video, remove_file from facefusion.types import AudioMetadata, ImageMetadata, MediaType, VideoMetadata from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution @@ -37,7 +43,7 @@ def extract_video_metadata(file_path : str) -> VideoMetadata: return metadata -def detect_media_type(file_path : str) -> Optional[MediaType]: +def detect_media_type_by_path(file_path : str) -> Optional[MediaType]: if is_audio(file_path): return 'audio' if is_image(file_path): @@ -45,3 +51,71 @@ def detect_media_type(file_path : str) -> Optional[MediaType]: if is_video(file_path): return 'video' return None + + +def detect_media_type_by_format(file_format : str) -> Optional[MediaType]: + if file_format in facefusion.choices.audio_set: + return 'audio' + if file_format in facefusion.choices.image_set: + return 'image' + if file_format in facefusion.choices.video_set: + return 'video' + return None + + +def validate_asset_files(upload_files : List[UploadFile]) -> bool: + available_encoder_set = ffmpeg.get_available_encoder_set() + + for upload_file in upload_files: + file_format = get_file_format(upload_file.filename) + media_type = detect_media_type_by_format(file_format) + + if media_type == 'audio' and facefusion.choices.audio_set.get(file_format) not in available_encoder_set.get('audio'): #type:ignore[call-overload] + return False + + if media_type == 'image' and facefusion.choices.image_set.get(file_format) not in available_encoder_set.get('image'): #type:ignore[call-overload] + return False + + if media_type == 'video' and facefusion.choices.video_set.get(file_format) not in available_encoder_set.get('video'): #type:ignore[call-overload] + return False + + return True + + +async def save_asset_files(upload_files : List[UploadFile]) -> List[str]: + asset_paths : List[str] = [] + + for upload_file in upload_files: + upload_file_extension = get_file_extension(upload_file.filename) + + with tempfile.NamedTemporaryFile(suffix = upload_file_extension, delete = False) as temp_file: + + while upload_chunk := await upload_file.read(1024): + temp_file.write(upload_chunk) + + temp_file.flush() + + media_type = detect_media_type_by_path(temp_file.name) + temp_path = state_manager.get_temp_path() + + create_directory(temp_path) + + asset_file_name = get_file_name(temp_file.name) + asset_path = os.path.join(temp_path, asset_file_name + upload_file_extension) + + process_manager.start() + + if media_type == 'audio' and ffmpeg.sanitize_audio(temp_file.name, asset_path): + asset_paths.append(asset_path) + + if media_type == 'image' and ffmpeg.sanitize_image(temp_file.name, asset_path): + asset_paths.append(asset_path) + + if media_type == 'video' and ffmpeg.sanitize_video(temp_file.name, asset_path): + asset_paths.append(asset_path) + + process_manager.end() + + remove_file(temp_file.name) + + return asset_paths diff --git a/facefusion/apis/asset_store.py b/facefusion/apis/asset_store.py index cc52afe7..cd040a9e 100644 --- a/facefusion/apis/asset_store.py +++ b/facefusion/apis/asset_store.py @@ -2,7 +2,7 @@ import uuid from datetime import datetime, timedelta from typing import List, Optional, cast -from facefusion.apis.asset_helper import detect_media_type, extract_audio_metadata, extract_image_metadata, extract_video_metadata +from facefusion.apis.asset_helper import detect_media_type_by_path, extract_audio_metadata, extract_image_metadata, extract_video_metadata from facefusion.filesystem import get_file_format, get_file_name, get_file_size from facefusion.types import AssetId, AssetSet, AssetStore, AssetType, AudioAsset, AudioFormat, ImageAsset, ImageFormat, SessionId, VideoAsset, VideoFormat @@ -14,7 +14,7 @@ def create_asset(session_id : SessionId, asset_type : AssetType, asset_path : st asset_name = get_file_name(asset_path) asset_format = get_file_format(asset_path) asset_size = get_file_size(asset_path) - media_type = detect_media_type(asset_path) + media_type = detect_media_type_by_path(asset_path) created_at = datetime.now() expires_at = created_at + timedelta(hours = 2) diff --git a/facefusion/apis/endpoints/assets.py b/facefusion/apis/endpoints/assets.py index dad9d1db..fe10e3d1 100644 --- a/facefusion/apis/endpoints/assets.py +++ b/facefusion/apis/endpoints/assets.py @@ -1,17 +1,15 @@ import os -import tempfile from typing import List -from starlette.datastructures import UploadFile from starlette.requests import Request from starlette.responses import FileResponse, JSONResponse, Response -from starlette.status import HTTP_200_OK, HTTP_201_CREATED, HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND +from starlette.status import HTTP_200_OK, HTTP_201_CREATED, HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND, HTTP_415_UNSUPPORTED_MEDIA_TYPE -from facefusion import ffmpeg, process_manager, session_context, session_manager, state_manager +from facefusion import session_context, session_manager from facefusion.apis import asset_store -from facefusion.apis.asset_helper import detect_media_type +from facefusion.apis.asset_helper import save_asset_files, validate_asset_files from facefusion.apis.endpoints.session import extract_access_token -from facefusion.filesystem import create_directory, get_file_extension, get_file_name, remove_file +from facefusion.filesystem import remove_file async def upload_asset(request : Request) -> Response: @@ -24,13 +22,17 @@ async def upload_asset(request : Request) -> Response: form = await request.form() upload_files = form.getlist('file') - asset_paths = await save_asset_files(upload_files) #type:ignore[arg-type] + + if not validate_asset_files(upload_files): + return Response(status_code = HTTP_415_UNSUPPORTED_MEDIA_TYPE) + + asset_paths = await save_asset_files(upload_files) if asset_paths: asset_ids : List[str] = [] for asset_path in asset_paths: - asset = asset_store.create_asset(session_id, asset_type, asset_path) #type:ignore[arg-type] + asset = asset_store.create_asset(session_id, asset_type, asset_path) if asset: asset_id = asset.get('id') @@ -47,45 +49,6 @@ async def upload_asset(request : Request) -> Response: return Response(status_code = HTTP_400_BAD_REQUEST) -async def save_asset_files(upload_files : List[UploadFile]) -> List[str]: - asset_paths : List[str] = [] - - for upload_file in upload_files: - upload_file_extension = get_file_extension(upload_file.filename) - - with tempfile.NamedTemporaryFile(suffix = upload_file_extension, delete = False) as temp_file: - - while upload_chunk := await upload_file.read(1024): - temp_file.write(upload_chunk) - - temp_file.flush() - - media_type = detect_media_type(temp_file.name) - temp_path = state_manager.get_temp_path() - - create_directory(temp_path) - - asset_file_name = get_file_name(temp_file.name) - asset_path = os.path.join(temp_path, asset_file_name + upload_file_extension) - - process_manager.start() - - if media_type == 'audio' and ffmpeg.sanitize_audio(temp_file.name, asset_path): - asset_paths.append(asset_path) - - if media_type == 'image' and ffmpeg.sanitize_image(temp_file.name, asset_path): - asset_paths.append(asset_path) - - if media_type == 'video' and ffmpeg.sanitize_video(temp_file.name, asset_path): - asset_paths.append(asset_path) - - process_manager.end() - - remove_file(temp_file.name) - - return asset_paths - - async def get_assets(request : Request) -> Response: access_token = extract_access_token(request.scope) session_id = session_manager.find_session_id(access_token) diff --git a/facefusion/choices.py b/facefusion/choices.py index c4279d42..9d98d7fd 100755 --- a/facefusion/choices.py +++ b/facefusion/choices.py @@ -2,7 +2,7 @@ import logging from typing import List, Sequence, get_args from facefusion.common_helper import create_float_range, create_int_range -from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioTypeSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, EncoderSet, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, UiWorkflow, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoTypeSet, VoiceExtractorModel +from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageEncoder, ImageFormat, ImageSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoSet, VoiceExtractorModel, WorkFlow face_detector_set : FaceDetectorSet =\ { @@ -47,48 +47,44 @@ voice_extractor_models : List[VoiceExtractorModel] = list(get_args(VoiceExtracto workflows : List[WorkFlow] = [ 'auto', 'audio-to-image:frames', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames' ] -audio_type_set : AudioTypeSet =\ +audio_set : AudioSet =\ { - 'flac': 'audio/flac', - 'm4a': 'audio/mp4', - 'mp3': 'audio/mpeg', - 'ogg': 'audio/ogg', - 'opus': 'audio/opus', - 'wav': 'audio/x-wav' + 'flac': 'flac', + 'm4a': 'aac', + 'mp3': 'libmp3lame', + 'ogg': 'flac', + 'opus': 'libopus', + 'wav': 'pcm_s16le' } -image_type_set : ImageTypeSet =\ +image_set : ImageSet =\ { - 'bmp': 'image/bmp', - 'jpeg': 'image/jpeg', - 'png': 'image/png', - 'tiff': 'image/tiff', - 'webp': 'image/webp' + 'bmp': 'bmp', + 'jpeg': 'mjpeg', + 'png': 'png', + 'tiff': 'tiff', + 'webp': 'libwebp' } -video_type_set : VideoTypeSet =\ +video_set : VideoSet =\ { - 'avi': 'video/x-msvideo', - 'm4v': 'video/mp4', - 'mkv': 'video/x-matroska', - 'mp4': 'video/mp4', - 'mpeg': 'video/mpeg', - 'mov': 'video/quicktime', - 'mxf': 'application/mxf', - 'webm': 'video/webm', - 'wmv': 'video/x-ms-wmv' + 'avi': 'mpeg4', + 'm4v': 'libx264', + 'mkv': 'libx264', + 'mov': 'libx264', + 'mp4': 'libx264', + 'mpeg': 'mpeg1video', + 'mxf': 'mpeg2video', + 'webm': 'libvpx-vp9', + 'wmv': 'msmpeg4' } audio_formats : List[AudioFormat] = list(get_args(AudioFormat)) image_formats : List[ImageFormat] = list(get_args(ImageFormat)) video_formats : List[VideoFormat] = list(get_args(VideoFormat)) temp_frame_formats : List[TempFrameFormat] = list(get_args(TempFrameFormat)) -output_audio_encoders : List[AudioEncoder] = list(get_args(AudioEncoder)) -output_video_encoders : List[VideoEncoder] = list(get_args(VideoEncoder)) -output_encoder_set : EncoderSet =\ -{ - 'audio': output_audio_encoders, - 'video': output_video_encoders -} -output_video_presets : List[VideoPreset] = list(get_args(VideoPreset)) +audio_encoders : List[AudioEncoder] = list(get_args(AudioEncoder)) +image_encoders : List[ImageEncoder] = list(get_args(ImageEncoder)) +video_encoders : List[VideoEncoder] = list(get_args(VideoEncoder)) +video_presets : List[VideoPreset] = list(get_args(VideoPreset)) benchmark_modes : List[BenchmarkMode] = list(get_args(BenchmarkMode)) benchmark_set : BenchmarkSet =\ diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py index 436ec129..b05e8d43 100644 --- a/facefusion/ffmpeg.py +++ b/facefusion/ffmpeg.py @@ -83,6 +83,7 @@ def get_available_encoder_set() -> EncoderSet: available_encoder_set : EncoderSet =\ { 'audio': [], + 'image': [], 'video': [] } commands = ffmpeg_builder.chain( @@ -94,15 +95,17 @@ def get_available_encoder_set() -> EncoderSet: if line.startswith(' a'): audio_encoder = line.split()[1] - if audio_encoder in facefusion.choices.output_audio_encoders: - index = facefusion.choices.output_audio_encoders.index(audio_encoder) #type:ignore[arg-type] - available_encoder_set['audio'].insert(index, audio_encoder) #type:ignore[arg-type] - if line.startswith(' v'): - video_encoder = line.split()[1] + if audio_encoder in facefusion.choices.audio_encoders and audio_encoder not in available_encoder_set.get('audio'): + available_encoder_set['audio'].append(audio_encoder) #type:ignore[arg-type] - if video_encoder in facefusion.choices.output_video_encoders: - index = facefusion.choices.output_video_encoders.index(video_encoder) #type:ignore[arg-type] - available_encoder_set['video'].insert(index, video_encoder) #type:ignore[arg-type] + if line.startswith(' v'): + vision_encoder = line.split()[1] + + if vision_encoder in facefusion.choices.image_encoders and vision_encoder not in available_encoder_set.get('image'): + available_encoder_set['image'].append(vision_encoder) #type:ignore[arg-type] + + if vision_encoder in facefusion.choices.video_encoders and vision_encoder not in available_encoder_set.get('video'): + available_encoder_set['video'].append(vision_encoder) #type:ignore[arg-type] return available_encoder_set diff --git a/facefusion/program.py b/facefusion/program.py index 09f44fc1..4ecd1cdb 100755 --- a/facefusion/program.py +++ b/facefusion/program.py @@ -659,7 +659,7 @@ def create_output_creation_program() -> ArgumentParser: '--output-video-preset', help = translator.get('help.output_video_preset'), default = config.get_str_value('output_creation', 'output_video_preset', 'veryfast'), - choices = facefusion.choices.output_video_presets + choices = facefusion.choices.video_presets ) ], scopes = [ 'api', 'cli' ] diff --git a/facefusion/types.py b/facefusion/types.py index 1a6aff7d..f9137325 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -163,15 +163,17 @@ AudioFormat = Literal['flac', 'm4a', 'mp3', 'ogg', 'opus', 'wav'] ImageFormat = Literal['bmp', 'jpeg', 'png', 'tiff', 'webp'] VideoFormat = Literal['avi', 'm4v', 'mkv', 'mov', 'mp4', 'mpeg', 'mxf', 'webm', 'wmv'] TempFrameFormat = Literal['bmp', 'jpeg', 'png', 'tiff'] -AudioTypeSet : TypeAlias = Dict[AudioFormat, str] -ImageTypeSet : TypeAlias = Dict[ImageFormat, str] -VideoTypeSet : TypeAlias = Dict[VideoFormat, str] AudioEncoder = Literal['flac', 'aac', 'libmp3lame', 'libopus', 'libvorbis', 'pcm_s16le', 'pcm_s32le'] +ImageEncoder = Literal['bmp', 'mjpeg', 'png', 'tiff', 'libwebp'] VideoEncoder = Literal['libx264', 'libx264rgb', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox', 'rawvideo'] +AudioSet : TypeAlias = Dict[AudioFormat, str] +ImageSet : TypeAlias = Dict[ImageFormat, str] +VideoSet : TypeAlias = Dict[VideoFormat, str] EncoderSet = TypedDict('EncoderSet', { 'audio' : List[AudioEncoder], + 'image' : List[ImageEncoder], 'video' : List[VideoEncoder] }) VideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow'] diff --git a/tests/test_asset_helper.py b/tests/test_asset_helper.py index 4c314f9e..40852629 100644 --- a/tests/test_asset_helper.py +++ b/tests/test_asset_helper.py @@ -1,6 +1,6 @@ import pytest -from facefusion.apis.asset_helper import detect_media_type, extract_audio_metadata, extract_image_metadata, extract_video_metadata +from facefusion.apis.asset_helper import detect_media_type_by_path, extract_audio_metadata, extract_image_metadata, extract_video_metadata from facefusion.download import conditional_download from .assert_helper import get_test_example_file, get_test_examples_directory @@ -16,9 +16,9 @@ def before_all() -> None: def test_detect_media_type() -> None: - assert detect_media_type(get_test_example_file('source.jpg')) == 'image' - assert detect_media_type(get_test_example_file('target-240p.mp4')) == 'video' - assert detect_media_type(get_test_example_file('source.mp3')) == 'audio' + assert detect_media_type_by_path(get_test_example_file('source.jpg')) == 'image' + assert detect_media_type_by_path(get_test_example_file('target-240p.mp4')) == 'video' + assert detect_media_type_by_path(get_test_example_file('source.mp3')) == 'audio' def test_extract_image_metadata() -> None: diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index 9612b73c..192d96fb 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -52,6 +52,7 @@ def get_available_encoder_set() -> EncoderSet: return\ { 'audio': [ 'aac' ], + 'image': [ 'png' ], 'video': [ 'libx264' ] } return facefusion.ffmpeg.get_available_encoder_set() @@ -61,6 +62,7 @@ def test_get_available_encoder_set() -> None: available_encoder_set = get_available_encoder_set() assert 'aac' in available_encoder_set.get('audio') + assert 'png' in available_encoder_set.get('image') assert 'libx264' in available_encoder_set.get('video')