asset validation and image encoder lookup (#1058)

* asset validation and image encoder lookup

* asset validation and image encoder lookup

* asset validation and image encoder lookup
This commit is contained in:
Henry Ruhs
2026-03-11 10:52:01 +01:00
committed by henryruhs
parent b67aaf1cd1
commit 0d0c27b117
9 changed files with 140 additions and 100 deletions
+77 -3
View File
@@ -1,8 +1,14 @@
from typing import Optional
import os
import tempfile
from typing import List, Optional
from starlette.datastructures import UploadFile
import facefusion.choices
from facefusion import ffmpeg, process_manager, state_manager
from facefusion.audio import detect_audio_duration
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate
from facefusion.filesystem import is_audio, is_image, is_video
from facefusion.filesystem import create_directory, get_file_extension, get_file_format, get_file_name, is_audio, is_image, is_video, remove_file
from facefusion.types import AudioMetadata, ImageMetadata, MediaType, VideoMetadata
from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution
@@ -37,7 +43,7 @@ def extract_video_metadata(file_path : str) -> VideoMetadata:
return metadata
def detect_media_type(file_path : str) -> Optional[MediaType]:
def detect_media_type_by_path(file_path : str) -> Optional[MediaType]:
	"""
	Classify a file on disk as audio, image or video by probing its content.

	:param file_path: path of the file to classify
	:return: the detected media type or None when the file matches no known type
	"""
	probes =\
	[
		(is_audio, 'audio'),
		(is_image, 'image'),
		(is_video, 'video')
	]

	for probe, media_type in probes:
		if probe(file_path):
			return media_type
	return None
def detect_media_type_by_format(file_format : str) -> Optional[MediaType]:
	"""
	Classify a file format string as audio, image or video via the known format sets.

	:param file_format: file format to look up
	:return: the matching media type or None when the format is unknown
	"""
	format_set_by_media_type =\
	{
		'audio': facefusion.choices.audio_set,
		'image': facefusion.choices.image_set,
		'video': facefusion.choices.video_set
	}

	for media_type, format_set in format_set_by_media_type.items():
		if file_format in format_set:
			return media_type
	return None
def validate_asset_files(upload_files : List[UploadFile]) -> bool:
	"""
	Check that every uploaded file uses a known format whose encoder is available.

	:param upload_files: starlette upload files taken from the multipart form
	:return: True when every file maps to an available encoder, False otherwise
	"""
	format_set_by_media_type =\
	{
		'audio': facefusion.choices.audio_set,
		'image': facefusion.choices.image_set,
		'video': facefusion.choices.video_set
	}
	available_encoder_set = ffmpeg.get_available_encoder_set()

	for upload_file in upload_files:
		file_format = get_file_format(upload_file.filename)
		media_type = detect_media_type_by_format(file_format)

		# reject unknown formats outright — previously media_type None skipped every
		# branch and the file passed validation despite being unprocessable
		if media_type is None:
			return False
		required_encoder = format_set_by_media_type.get(media_type).get(file_format) #type:ignore[union-attr]
		if required_encoder not in available_encoder_set.get(media_type): #type:ignore[call-overload]
			return False
	return True
async def save_asset_files(upload_files : List[UploadFile]) -> List[str]:
	"""
	Persist uploaded files into the temp directory after sanitizing them with ffmpeg.

	:param upload_files: starlette upload files taken from the multipart form
	:return: paths of the sanitized assets — uploads that fail sanitization are silently skipped
	"""
	asset_paths : List[str] = []

	for upload_file in upload_files:
		upload_file_extension = get_file_extension(upload_file.filename)

		with tempfile.NamedTemporaryFile(suffix = upload_file_extension, delete = False) as temp_file:
			# stream the upload in 1024 byte chunks instead of loading it into memory at once
			while upload_chunk := await upload_file.read(1024):
				temp_file.write(upload_chunk)
			temp_file.flush()
			# sniff the real media type from the written bytes rather than trusting the file name
			media_type = detect_media_type_by_path(temp_file.name)
			temp_path = state_manager.get_temp_path()
			create_directory(temp_path)
			asset_file_name = get_file_name(temp_file.name)
			asset_path = os.path.join(temp_path, asset_file_name + upload_file_extension)
			process_manager.start()
			# re-encode via ffmpeg so only clean, decodable media reaches the asset store;
			# a failed sanitize simply appends nothing for this upload
			if media_type == 'audio' and ffmpeg.sanitize_audio(temp_file.name, asset_path):
				asset_paths.append(asset_path)
			if media_type == 'image' and ffmpeg.sanitize_image(temp_file.name, asset_path):
				asset_paths.append(asset_path)
			if media_type == 'video' and ffmpeg.sanitize_video(temp_file.name, asset_path):
				asset_paths.append(asset_path)
			process_manager.end()
			# NOTE(review): temp_file.name is re-opened by ffmpeg while this handle is still
			# open — fine on POSIX, may fail on Windows; confirm the target platforms
			remove_file(temp_file.name)
	return asset_paths
+2 -2
View File
@@ -2,7 +2,7 @@ import uuid
from datetime import datetime, timedelta
from typing import List, Optional, cast
from facefusion.apis.asset_helper import detect_media_type, extract_audio_metadata, extract_image_metadata, extract_video_metadata
from facefusion.apis.asset_helper import detect_media_type_by_path, extract_audio_metadata, extract_image_metadata, extract_video_metadata
from facefusion.filesystem import get_file_format, get_file_name, get_file_size
from facefusion.types import AssetId, AssetSet, AssetStore, AssetType, AudioAsset, AudioFormat, ImageAsset, ImageFormat, SessionId, VideoAsset, VideoFormat
@@ -14,7 +14,7 @@ def create_asset(session_id : SessionId, asset_type : AssetType, asset_path : st
asset_name = get_file_name(asset_path)
asset_format = get_file_format(asset_path)
asset_size = get_file_size(asset_path)
media_type = detect_media_type(asset_path)
media_type = detect_media_type_by_path(asset_path)
created_at = datetime.now()
expires_at = created_at + timedelta(hours = 2)
+10 -47
View File
@@ -1,17 +1,15 @@
import os
import tempfile
from typing import List
from starlette.datastructures import UploadFile
from starlette.requests import Request
from starlette.responses import FileResponse, JSONResponse, Response
from starlette.status import HTTP_200_OK, HTTP_201_CREATED, HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND
from starlette.status import HTTP_200_OK, HTTP_201_CREATED, HTTP_400_BAD_REQUEST, HTTP_404_NOT_FOUND, HTTP_415_UNSUPPORTED_MEDIA_TYPE
from facefusion import ffmpeg, process_manager, session_context, session_manager, state_manager
from facefusion import session_context, session_manager
from facefusion.apis import asset_store
from facefusion.apis.asset_helper import detect_media_type
from facefusion.apis.asset_helper import save_asset_files, validate_asset_files
from facefusion.apis.endpoints.session import extract_access_token
from facefusion.filesystem import create_directory, get_file_extension, get_file_name, remove_file
from facefusion.filesystem import remove_file
async def upload_asset(request : Request) -> Response:
@@ -24,13 +22,17 @@ async def upload_asset(request : Request) -> Response:
form = await request.form()
upload_files = form.getlist('file')
asset_paths = await save_asset_files(upload_files) #type:ignore[arg-type]
if not validate_asset_files(upload_files):
return Response(status_code = HTTP_415_UNSUPPORTED_MEDIA_TYPE)
asset_paths = await save_asset_files(upload_files)
if asset_paths:
asset_ids : List[str] = []
for asset_path in asset_paths:
asset = asset_store.create_asset(session_id, asset_type, asset_path) #type:ignore[arg-type]
asset = asset_store.create_asset(session_id, asset_type, asset_path)
if asset:
asset_id = asset.get('id')
@@ -47,45 +49,6 @@ async def upload_asset(request : Request) -> Response:
return Response(status_code = HTTP_400_BAD_REQUEST)
async def save_asset_files(upload_files : List[UploadFile]) -> List[str]:
	"""
	Persist uploaded files into the temp directory after sanitizing them with ffmpeg.

	:param upload_files: starlette upload files taken from the multipart form
	:return: paths of the sanitized assets — uploads that fail sanitization are silently skipped
	"""
	asset_paths : List[str] = []

	for upload_file in upload_files:
		upload_file_extension = get_file_extension(upload_file.filename)

		with tempfile.NamedTemporaryFile(suffix = upload_file_extension, delete = False) as temp_file:
			# stream the upload in 1024 byte chunks instead of loading it into memory at once
			while upload_chunk := await upload_file.read(1024):
				temp_file.write(upload_chunk)
			temp_file.flush()
			# sniff the real media type from the written bytes rather than trusting the file name
			media_type = detect_media_type(temp_file.name)
			temp_path = state_manager.get_temp_path()
			create_directory(temp_path)
			asset_file_name = get_file_name(temp_file.name)
			asset_path = os.path.join(temp_path, asset_file_name + upload_file_extension)
			process_manager.start()
			# re-encode via ffmpeg so only clean, decodable media reaches the asset store;
			# a failed sanitize simply appends nothing for this upload
			if media_type == 'audio' and ffmpeg.sanitize_audio(temp_file.name, asset_path):
				asset_paths.append(asset_path)
			if media_type == 'image' and ffmpeg.sanitize_image(temp_file.name, asset_path):
				asset_paths.append(asset_path)
			if media_type == 'video' and ffmpeg.sanitize_video(temp_file.name, asset_path):
				asset_paths.append(asset_path)
			process_manager.end()
			# NOTE(review): temp_file.name is re-opened by ffmpeg while this handle is still
			# open — fine on POSIX, may fail on Windows; confirm the target platforms
			remove_file(temp_file.name)
	return asset_paths
async def get_assets(request : Request) -> Response:
access_token = extract_access_token(request.scope)
session_id = session_manager.find_session_id(access_token)
+28 -32
View File
@@ -2,7 +2,7 @@ import logging
from typing import List, Sequence, get_args
from facefusion.common_helper import create_float_range, create_int_range
from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioTypeSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, EncoderSet, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, UiWorkflow, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoTypeSet, VoiceExtractorModel
from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageEncoder, ImageFormat, ImageSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoSet, VoiceExtractorModel, WorkFlow
face_detector_set : FaceDetectorSet =\
{
@@ -47,48 +47,44 @@ voice_extractor_models : List[VoiceExtractorModel] = list(get_args(VoiceExtracto
workflows : List[WorkFlow] = [ 'auto', 'audio-to-image:frames', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames' ]
audio_type_set : AudioTypeSet =\
audio_set : AudioSet =\
{
'flac': 'audio/flac',
'm4a': 'audio/mp4',
'mp3': 'audio/mpeg',
'ogg': 'audio/ogg',
'opus': 'audio/opus',
'wav': 'audio/x-wav'
'flac': 'flac',
'm4a': 'aac',
'mp3': 'libmp3lame',
'ogg': 'flac',
'opus': 'libopus',
'wav': 'pcm_s16le'
}
image_type_set : ImageTypeSet =\
image_set : ImageSet =\
{
'bmp': 'image/bmp',
'jpeg': 'image/jpeg',
'png': 'image/png',
'tiff': 'image/tiff',
'webp': 'image/webp'
'bmp': 'bmp',
'jpeg': 'mjpeg',
'png': 'png',
'tiff': 'tiff',
'webp': 'libwebp'
}
video_type_set : VideoTypeSet =\
video_set : VideoSet =\
{
'avi': 'video/x-msvideo',
'm4v': 'video/mp4',
'mkv': 'video/x-matroska',
'mp4': 'video/mp4',
'mpeg': 'video/mpeg',
'mov': 'video/quicktime',
'mxf': 'application/mxf',
'webm': 'video/webm',
'wmv': 'video/x-ms-wmv'
'avi': 'mpeg4',
'm4v': 'libx264',
'mkv': 'libx264',
'mov': 'libx264',
'mp4': 'libx264',
'mpeg': 'mpeg1video',
'mxf': 'mpeg2video',
'webm': 'libvpx-vp9',
'wmv': 'msmpeg4'
}
audio_formats : List[AudioFormat] = list(get_args(AudioFormat))
image_formats : List[ImageFormat] = list(get_args(ImageFormat))
video_formats : List[VideoFormat] = list(get_args(VideoFormat))
temp_frame_formats : List[TempFrameFormat] = list(get_args(TempFrameFormat))
output_audio_encoders : List[AudioEncoder] = list(get_args(AudioEncoder))
output_video_encoders : List[VideoEncoder] = list(get_args(VideoEncoder))
output_encoder_set : EncoderSet =\
{
'audio': output_audio_encoders,
'video': output_video_encoders
}
output_video_presets : List[VideoPreset] = list(get_args(VideoPreset))
audio_encoders : List[AudioEncoder] = list(get_args(AudioEncoder))
image_encoders : List[ImageEncoder] = list(get_args(ImageEncoder))
video_encoders : List[VideoEncoder] = list(get_args(VideoEncoder))
video_presets : List[VideoPreset] = list(get_args(VideoPreset))
benchmark_modes : List[BenchmarkMode] = list(get_args(BenchmarkMode))
benchmark_set : BenchmarkSet =\
+11 -8
View File
@@ -83,6 +83,7 @@ def get_available_encoder_set() -> EncoderSet:
available_encoder_set : EncoderSet =\
{
'audio': [],
'image': [],
'video': []
}
commands = ffmpeg_builder.chain(
@@ -94,15 +95,17 @@ def get_available_encoder_set() -> EncoderSet:
if line.startswith(' a'):
audio_encoder = line.split()[1]
if audio_encoder in facefusion.choices.output_audio_encoders:
index = facefusion.choices.output_audio_encoders.index(audio_encoder) #type:ignore[arg-type]
available_encoder_set['audio'].insert(index, audio_encoder) #type:ignore[arg-type]
if line.startswith(' v'):
video_encoder = line.split()[1]
if audio_encoder in facefusion.choices.audio_encoders and audio_encoder not in available_encoder_set.get('audio'):
available_encoder_set['audio'].append(audio_encoder) #type:ignore[arg-type]
if video_encoder in facefusion.choices.output_video_encoders:
index = facefusion.choices.output_video_encoders.index(video_encoder) #type:ignore[arg-type]
available_encoder_set['video'].insert(index, video_encoder) #type:ignore[arg-type]
if line.startswith(' v'):
vision_encoder = line.split()[1]
if vision_encoder in facefusion.choices.image_encoders and vision_encoder not in available_encoder_set.get('image'):
available_encoder_set['image'].append(vision_encoder) #type:ignore[arg-type]
if vision_encoder in facefusion.choices.video_encoders and vision_encoder not in available_encoder_set.get('video'):
available_encoder_set['video'].append(vision_encoder) #type:ignore[arg-type]
return available_encoder_set
+1 -1
View File
@@ -659,7 +659,7 @@ def create_output_creation_program() -> ArgumentParser:
'--output-video-preset',
help = translator.get('help.output_video_preset'),
default = config.get_str_value('output_creation', 'output_video_preset', 'veryfast'),
choices = facefusion.choices.output_video_presets
choices = facefusion.choices.video_presets
)
],
scopes = [ 'api', 'cli' ]
+5 -3
View File
@@ -163,15 +163,17 @@ AudioFormat = Literal['flac', 'm4a', 'mp3', 'ogg', 'opus', 'wav']
ImageFormat = Literal['bmp', 'jpeg', 'png', 'tiff', 'webp']
VideoFormat = Literal['avi', 'm4v', 'mkv', 'mov', 'mp4', 'mpeg', 'mxf', 'webm', 'wmv']
TempFrameFormat = Literal['bmp', 'jpeg', 'png', 'tiff']
AudioTypeSet : TypeAlias = Dict[AudioFormat, str]
ImageTypeSet : TypeAlias = Dict[ImageFormat, str]
VideoTypeSet : TypeAlias = Dict[VideoFormat, str]
AudioEncoder = Literal['flac', 'aac', 'libmp3lame', 'libopus', 'libvorbis', 'pcm_s16le', 'pcm_s32le']
ImageEncoder = Literal['bmp', 'mjpeg', 'png', 'tiff', 'libwebp']
VideoEncoder = Literal['libx264', 'libx264rgb', 'libx265', 'libvpx-vp9', 'h264_nvenc', 'hevc_nvenc', 'h264_amf', 'hevc_amf', 'h264_qsv', 'hevc_qsv', 'h264_videotoolbox', 'hevc_videotoolbox', 'rawvideo']
AudioSet : TypeAlias = Dict[AudioFormat, str]
ImageSet : TypeAlias = Dict[ImageFormat, str]
VideoSet : TypeAlias = Dict[VideoFormat, str]
EncoderSet = TypedDict('EncoderSet',
{
'audio' : List[AudioEncoder],
'image' : List[ImageEncoder],
'video' : List[VideoEncoder]
})
VideoPreset = Literal['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow']
+4 -4
View File
@@ -1,6 +1,6 @@
import pytest
from facefusion.apis.asset_helper import detect_media_type, extract_audio_metadata, extract_image_metadata, extract_video_metadata
from facefusion.apis.asset_helper import detect_media_type_by_path, extract_audio_metadata, extract_image_metadata, extract_video_metadata
from facefusion.download import conditional_download
from .assert_helper import get_test_example_file, get_test_examples_directory
@@ -16,9 +16,9 @@ def before_all() -> None:
def test_detect_media_type() -> None:
assert detect_media_type(get_test_example_file('source.jpg')) == 'image'
assert detect_media_type(get_test_example_file('target-240p.mp4')) == 'video'
assert detect_media_type(get_test_example_file('source.mp3')) == 'audio'
assert detect_media_type_by_path(get_test_example_file('source.jpg')) == 'image'
assert detect_media_type_by_path(get_test_example_file('target-240p.mp4')) == 'video'
assert detect_media_type_by_path(get_test_example_file('source.mp3')) == 'audio'
def test_extract_image_metadata() -> None:
+2
View File
@@ -52,6 +52,7 @@ def get_available_encoder_set() -> EncoderSet:
return\
{
'audio': [ 'aac' ],
'image': [ 'png' ],
'video': [ 'libx264' ]
}
return facefusion.ffmpeg.get_available_encoder_set()
@@ -61,6 +62,7 @@ def test_get_available_encoder_set() -> None:
available_encoder_set = get_available_encoder_set()
assert 'aac' in available_encoder_set.get('audio')
assert 'png' in available_encoder_set.get('image')
assert 'libx264' in available_encoder_set.get('video')