FFmpeg powered sanitization, Chunk based upload write

This commit is contained in:
henryruhs
2026-01-18 12:37:22 +01:00
parent 77c1682078
commit 5fa088457e
4 changed files with 79 additions and 18 deletions
+19 -9
View File
@@ -1,3 +1,4 @@
import os
import tempfile
from typing import List
@@ -6,11 +7,11 @@ from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.status import HTTP_201_CREATED, HTTP_400_BAD_REQUEST
from facefusion import session_manager
from facefusion import ffmpeg, session_manager, state_manager
from facefusion.apis import asset_store
from facefusion.apis.asset_helper import detect_media_type
from facefusion.apis.endpoints.session import extract_access_token
from facefusion.filesystem import get_file_extension, remove_file
from facefusion.filesystem import get_file_extension
async def upload_asset(request : Request) -> Response:
@@ -48,13 +49,22 @@ async def save_asset_files(upload_files : List[UploadFile]) -> List[str]:
for upload_file in upload_files:
upload_file_extension = get_file_extension(upload_file.filename)
with tempfile.NamedTemporaryFile(suffix = upload_file_extension, delete = False) as temp_file:
temp_content = await upload_file.read()
temp_file.write(temp_content)
with tempfile.NamedTemporaryFile(suffix = upload_file_extension) as temp_file:
if detect_media_type(temp_file.name):
asset_paths.append(temp_file.name)
else:
remove_file(temp_file.name)
while upload_chunk := await upload_file.read(1024):
temp_file.write(upload_chunk)
media_type = detect_media_type(temp_file.name)
temp_path = state_manager.get_temp_path()
asset_path = os.path.join(temp_path, temp_file.name + '.' + upload_file_extension)
if media_type == 'audio' and ffmpeg.sanitize_audio(temp_file.name, asset_path):
asset_paths.append(asset_path)
if media_type == 'image' and ffmpeg.sanitize_image(temp_file.name, asset_path):
asset_paths.append(asset_path)
if media_type == 'video' and ffmpeg.sanitize_video(temp_file.name, asset_path):
asset_paths.append(asset_path)
return asset_paths
+31
View File
@@ -178,6 +178,7 @@ def read_audio_buffer(target_path : str, audio_sample_rate : int, audio_sample_s
process = open_ffmpeg(commands)
audio_buffer, _ = process.communicate()
if process.returncode == 0:
return audio_buffer
return None
@@ -286,6 +287,36 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool:
return process.returncode == 0
def sanitize_audio(temp_path : str, asset_path : str) -> bool:
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(temp_path),
ffmpeg_builder.deep_copy_audio(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg(commands).returncode == 0
def sanitize_image(temp_path : str, asset_path : str) -> bool:
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(temp_path),
ffmpeg_builder.deep_copy_image(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg(commands).returncode == 0
def sanitize_video(temp_path : str, asset_path : str) -> bool:
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(temp_path),
ffmpeg_builder.deep_copy_video(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg(commands).returncode == 0
def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder:
if video_format == 'avi' and audio_encoder == 'libopus':
return 'aac'
+24 -8
View File
@@ -87,6 +87,10 @@ def enforce_pixel_format(pixel_format : str) -> List[Command]:
return [ '-pix_fmt', pixel_format ]
def strip_metadata() -> List[Command]:
return [ '-map_metadata', '-1' ]
def set_pixel_format(video_encoder : VideoEncoder) -> List[Command]:
if video_encoder == 'rawvideo':
return [ '-pix_fmt', 'rgb24' ]
@@ -131,12 +135,8 @@ def set_media_resolution(video_resolution : str) -> List[Command]:
return [ '-s', video_resolution ]
def set_image_quality(image_path : str, image_quality : int) -> List[Command]:
if get_file_format(image_path) == 'webp':
return [ '-q:v', str(image_quality) ]
image_compression = round(31 - (image_quality * 0.31))
return [ '-q:v', str(image_compression) ]
def deep_copy_audio() -> List[Command]:
return [ '-q:a', '0' ]
def set_audio_encoder(audio_codec : str) -> List[Command]:
@@ -176,13 +176,29 @@ def set_audio_quality(audio_encoder : AudioEncoder, audio_quality : int) -> List
if audio_encoder == 'libvorbis':
audio_compression = numpy.round(numpy.interp(audio_quality, [ 0, 100 ], [ -1, 10 ]), 1).astype(float).item()
return [ '-q:a', str(audio_compression) ]
return []
return [ '-q:a', '0' ]
def set_audio_volume(audio_volume : int) -> List[Command]:
return [ '-filter:a', 'volume=' + str(audio_volume / 100) ]
def deep_copy_image() -> List[Command]:
return [ '-q:v', '0' ]
def set_image_quality(image_path : str, image_quality : int) -> List[Command]:
if get_file_format(image_path) == 'webp':
return [ '-q:v', str(image_quality) ]
image_compression = round(31 - (image_quality * 0.31))
return [ '-q:v', str(image_compression) ]
def deep_copy_video() -> List[Command]:
return [ '-q:v', '0' ]
def set_video_encoder(video_encoder : str) -> List[Command]:
return [ '-c:v', video_encoder ]
@@ -210,7 +226,7 @@ def set_video_quality(video_encoder : VideoEncoder, video_quality : int) -> List
if video_encoder in [ 'h264_videotoolbox', 'hevc_videotoolbox' ]:
video_bit_rate = numpy.round(numpy.interp(video_quality, [ 0, 100 ], [ 1024, 50512 ])).astype(int).item()
return [ '-b:v', str(video_bit_rate) + 'k' ]
return []
return [ '-q:v', '0' ]
def set_video_preset(video_encoder : VideoEncoder, video_preset : VideoPreset) -> List[Command]:
+5 -1
View File
@@ -1,9 +1,10 @@
import tempfile
from typing import Iterator
import pytest
from starlette.testclient import TestClient
from facefusion import metadata, session_manager
from facefusion import metadata, process_manager, session_manager, state_manager
from facefusion.apis import asset_store
from facefusion.apis.core import create_api
from facefusion.download import conditional_download
@@ -12,6 +13,7 @@ from .helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
process_manager.start()
conditional_download(get_test_examples_directory(),
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg',
@@ -27,6 +29,8 @@ def test_client() -> Iterator[TestClient]:
@pytest.fixture(scope = 'function', autouse = True)
def before_each() -> None:
state_manager.init_item('temp_path', tempfile.gettempdir())
state_manager.init_item('temp_frame_format', 'png')
session_manager.SESSIONS.clear()
asset_store.clear()