Remove upload queue & media chunk reader (#1076)

* remove upload queue & media chunk reader

* remove macos large

---------

Co-authored-by: henryruhs <info@henryruhs.com>
This commit is contained in:
Harisreedhar
2026-04-08 15:52:04 +05:30
committed by henryruhs
parent 1f80aa735a
commit 701a1b5f9e
5 changed files with 29 additions and 59 deletions
+5 -16
View File
@@ -1,6 +1,5 @@
import asyncio
import os
import queue
import uuid
from typing import List, Optional
@@ -9,7 +8,7 @@ from starlette.datastructures import UploadFile
import facefusion.choices
from facefusion import ffmpeg, process_manager, state_manager
from facefusion.filesystem import create_directory, get_file_extension, get_file_format, is_audio, is_image, is_video
from facefusion.types import ImageMetadata, MediaType, UploadQueue
from facefusion.types import ImageMetadata, MediaType
from facefusion.vision import detect_image_resolution
@@ -60,12 +59,6 @@ def validate_asset_files(upload_files : List[UploadFile]) -> bool:
return True
async def feed_upload_queue(upload_file : UploadFile, upload_queue : UploadQueue) -> None:
while file_chunk := await upload_file.read(1024):
upload_queue.put(file_chunk)
upload_queue.put(b'')
async def save_asset_files(upload_files : List[UploadFile]) -> List[str]:
asset_paths : List[str] = []
api_security_strategy = state_manager.get_item('api_security_strategy')
@@ -80,23 +73,19 @@ async def save_asset_files(upload_files : List[UploadFile]) -> List[str]:
asset_file_name = uuid.uuid4().hex
asset_path = os.path.join(temp_path, asset_file_name + file_extension)
upload_queue : UploadQueue = queue.SimpleQueue() #todo: not sure if queue is even needed
file_content = await upload_file.read()
process_manager.start()
upload_task = asyncio.create_task(feed_upload_queue(upload_file, upload_queue))
if media_type == 'audio' and await asyncio.to_thread(ffmpeg.sanitize_audio, upload_queue.get, asset_path, api_security_strategy):
if media_type == 'audio' and await asyncio.to_thread(ffmpeg.sanitize_audio, file_content, asset_path, api_security_strategy):
asset_paths.append(asset_path)
if media_type == 'image' and await asyncio.to_thread(ffmpeg.sanitize_image, upload_queue.get, asset_path):
if media_type == 'image' and await asyncio.to_thread(ffmpeg.sanitize_image, file_content, asset_path):
asset_paths.append(asset_path)
if media_type == 'video' and await asyncio.to_thread(ffmpeg.sanitize_video, upload_queue.get, asset_path, api_security_strategy):
if media_type == 'video' and await asyncio.to_thread(ffmpeg.sanitize_video, file_content, asset_path, api_security_strategy):
asset_paths.append(asset_path)
await upload_task
process_manager.end()
return asset_paths
+15 -27
View File
@@ -10,7 +10,7 @@ import facefusion.choices
from facefusion import ffmpeg_builder, logger, process_manager, state_manager, translator
from facefusion.filesystem import get_file_format, remove_file
from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern
from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, MediaChunkReader, Resolution, SampleRate, UpdateProgress, VideoEncoder, VideoFormat
from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, Resolution, SampleRate, UpdateProgress, VideoEncoder, VideoFormat
from facefusion.vision import detect_video_duration, detect_video_fps, pack_resolution, predict_video_frame_total
@@ -45,29 +45,17 @@ def update_progress(progress : tqdm, frame_number : int) -> None:
progress.update(frame_number - progress.n)
def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]:
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
return complete_process(process)
def run_ffmpeg_with_pipe(commands : List[Command], media_chunk_reader : MediaChunkReader) -> subprocess.Popen[bytes]:
def run_ffmpeg_with_pipe(commands : List[Command], file_content : bytes) -> subprocess.Popen[bytes]:
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
while media_chunk := media_chunk_reader():
if process.poll() is not None: #todo - poll not allowed
break
process.stdin.write(media_chunk)
if process.stdin and not process.stdin.closed:
process.stdin.close()
return complete_process(process)
process.communicate(input = file_content)
return process
def complete_process(process : subprocess.Popen[bytes]) -> subprocess.Popen[bytes]:
def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]:
log_level = state_manager.get_item('log_level')
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
while process_manager.is_processing():
try:
@@ -313,7 +301,7 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool:
return process.returncode == 0
def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
def sanitize_audio(file_content : bytes, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
if security_strategy == 'strict':
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
@@ -321,7 +309,7 @@ def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, secu
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
@@ -329,20 +317,20 @@ def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, secu
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
def sanitize_image(media_chunk_reader : MediaChunkReader, asset_path : str) -> bool:
def sanitize_image(file_content : bytes, asset_path : str) -> bool:
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
ffmpeg_builder.deep_copy_image(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
def sanitize_video(media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
def sanitize_video(file_content : bytes, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
if security_strategy == 'strict':
available_video_encoders = get_static_available_encoder_set().get('video')
commands = ffmpeg_builder.chain(
@@ -355,7 +343,7 @@ def sanitize_video(media_chunk_reader : MediaChunkReader, asset_path : str, secu
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
@@ -364,7 +352,7 @@ def sanitize_video(media_chunk_reader : MediaChunkReader, asset_path : str, secu
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder:
-3
View File
@@ -1,4 +1,3 @@
import queue
from collections import namedtuple
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, NotRequired, Optional, Tuple, TypeAlias, TypedDict, Union
@@ -367,8 +366,6 @@ DownloadSet : TypeAlias = Dict[str, Download]
VideoMemoryStrategy = Literal['strict', 'moderate', 'tolerant']
ApiSecurityStrategy = Literal['strict', 'moderate']
UploadQueue : TypeAlias = queue.SimpleQueue[bytes]
MediaChunkReader : TypeAlias = Callable[[], bytes]
AppContext = Literal['cli', 'api']
InferencePool : TypeAlias = Dict[str, InferenceSession]
-7
View File
@@ -1,7 +1,5 @@
import io
import os
import tempfile
from functools import partial
from facefusion.filesystem import are_images, create_directory, is_directory, is_file, remove_directory, resolve_file_paths
from facefusion.types import JobStatus
@@ -48,8 +46,3 @@ def prepare_test_output_directory() -> bool:
remove_directory(test_outputs_directory)
create_directory(test_outputs_directory)
return is_directory(test_outputs_directory)
def create_media_reader(file_path : str) -> partial[bytes]: #todo: kill this
file_buffer = io.BytesIO(open(file_path, 'rb').read())
return partial(file_buffer.read, 1024)
+9 -6
View File
@@ -12,7 +12,7 @@ from facefusion.ffprobe import probe_entries
from facefusion.filesystem import copy_file, is_image
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
from facefusion.types import EncoderSet
from .assert_helper import create_media_reader, get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory
@pytest.fixture(scope = 'module', autouse = True)
@@ -216,37 +216,40 @@ def test_replace_audio() -> None:
def test_sanitize_audio() -> None:
file_path = get_test_example_file('source.wav')
file_content = open(file_path, 'rb').read()
output_paths =\
[
get_test_output_path('test-sanitize-audio-strict.mp3'),
get_test_output_path('test-sanitize-audio-moderate.wav')
]
assert sanitize_audio(create_media_reader(file_path), output_paths[0], 'strict') is True
assert sanitize_audio(file_content, output_paths[0], 'strict') is True
assert probe_entries(output_paths[0], [ 'codec_name' ]).get('codec_name') == 'mp3'
assert sanitize_audio(create_media_reader(file_path), output_paths[1], 'moderate') is True
assert sanitize_audio(file_content, output_paths[1], 'moderate') is True
assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'pcm_s16le'
def test_sanitize_image() -> None:
file_path = get_test_example_file('source.jpg')
file_content = open(file_path, 'rb').read()
output_path = get_test_output_path('test-sanitize-image.jpg')
assert sanitize_image(create_media_reader(file_path), output_path) is True
assert sanitize_image(file_content, output_path) is True
assert is_image(output_path) is True
def test_sanitize_video() -> None:
file_path = get_test_example_file('target-240p-h265.mp4')
file_content = open(file_path, 'rb').read()
output_paths =\
[
get_test_output_path('test-sanitize-video-strict.mp4'),
get_test_output_path('test-sanitize-video-moderate.mp4')
]
assert sanitize_video(create_media_reader(file_path), output_paths[0], 'strict') is True
assert sanitize_video(file_content, output_paths[0], 'strict') is True
assert probe_entries(output_paths[0], [ 'codec_name' ]).get('codec_name') == 'h264'
assert sanitize_video(create_media_reader(file_path), output_paths[1], 'moderate') is True
assert sanitize_video(file_content, output_paths[1], 'moderate') is True
assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'hevc'