mirror of
https://github.com/facefusion/facefusion.git
synced 2026-04-29 13:05:59 +02:00
Assets ffmpeg stream upload (#1069)
* ffmpeg sanitize * fix type * fix type * add config * ChunkQueue -> UploadQueue * revert assets.py * move resolve methods to ffmpeg_builder.py * Refactor ffmpeg.py * Remove partial import * improve test * remove put(None) * cleanup * without poll() not working * ChunkReader -> MediaChunkReader * improve assert by replacing generic is_file * naming and cleanup
This commit is contained in:
@@ -119,6 +119,7 @@ benchmark_cycle_count =
|
||||
[api]
|
||||
api_host =
|
||||
api_port =
|
||||
api_security_strategy =
|
||||
|
||||
[execution]
|
||||
execution_device_ids =
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import asyncio
|
||||
import os
|
||||
import tempfile
|
||||
import queue
|
||||
import uuid
|
||||
from typing import List, Optional
|
||||
|
||||
from starlette.datastructures import UploadFile
|
||||
@@ -8,8 +10,8 @@ import facefusion.choices
|
||||
from facefusion import ffmpeg, process_manager, state_manager
|
||||
from facefusion.audio import detect_audio_duration
|
||||
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate
|
||||
from facefusion.filesystem import create_directory, get_file_extension, get_file_format, get_file_name, is_audio, is_image, is_video, remove_file
|
||||
from facefusion.types import AudioMetadata, ImageMetadata, MediaType, VideoMetadata
|
||||
from facefusion.filesystem import create_directory, get_file_extension, get_file_format, is_audio, is_image, is_video
|
||||
from facefusion.types import AudioMetadata, ImageMetadata, MediaType, UploadQueue, VideoMetadata
|
||||
from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution
|
||||
|
||||
|
||||
@@ -82,40 +84,45 @@ def validate_asset_files(upload_files : List[UploadFile]) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
async def feed_upload_queue(upload_file : UploadFile, upload_queue : UploadQueue) -> None:
|
||||
while file_chunk := await upload_file.read(1024):
|
||||
upload_queue.put(file_chunk)
|
||||
upload_queue.put(b'')
|
||||
|
||||
|
||||
async def save_asset_files(upload_files : List[UploadFile]) -> List[str]:
|
||||
asset_paths : List[str] = []
|
||||
api_security_strategy = state_manager.get_item('api_security_strategy')
|
||||
|
||||
for upload_file in upload_files:
|
||||
upload_file_extension = get_file_extension(upload_file.filename)
|
||||
file_format = get_file_format(upload_file.filename)
|
||||
file_extension = get_file_extension(upload_file.filename)
|
||||
media_type = detect_media_type_by_format(file_format)
|
||||
temp_path = state_manager.get_temp_path()
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix = upload_file_extension, delete = False) as temp_file:
|
||||
create_directory(temp_path)
|
||||
|
||||
while upload_chunk := await upload_file.read(1024):
|
||||
temp_file.write(upload_chunk)
|
||||
asset_file_name = uuid.uuid4().hex
|
||||
asset_path = os.path.join(temp_path, asset_file_name + file_extension)
|
||||
upload_queue : UploadQueue = queue.SimpleQueue()
|
||||
|
||||
temp_file.flush()
|
||||
process_manager.start()
|
||||
|
||||
media_type = detect_media_type_by_path(temp_file.name)
|
||||
temp_path = state_manager.get_temp_path()
|
||||
upload_task = asyncio.create_task(feed_upload_queue(upload_file, upload_queue))
|
||||
has_file_sanitized = False
|
||||
|
||||
create_directory(temp_path)
|
||||
if media_type == 'audio':
|
||||
has_file_sanitized = await asyncio.to_thread(ffmpeg.sanitize_audio, file_format, upload_queue.get, asset_path, api_security_strategy)
|
||||
if media_type == 'image':
|
||||
has_file_sanitized = await asyncio.to_thread(ffmpeg.sanitize_image, file_format, upload_queue.get, asset_path)
|
||||
if media_type == 'video':
|
||||
has_file_sanitized = await asyncio.to_thread(ffmpeg.sanitize_video, file_format, upload_queue.get, asset_path, api_security_strategy)
|
||||
|
||||
asset_file_name = get_file_name(temp_file.name)
|
||||
asset_path = os.path.join(temp_path, asset_file_name + upload_file_extension)
|
||||
await upload_task
|
||||
|
||||
process_manager.start()
|
||||
if has_file_sanitized:
|
||||
asset_paths.append(asset_path)
|
||||
|
||||
if media_type == 'audio' and ffmpeg.sanitize_audio(temp_file.name, asset_path):
|
||||
asset_paths.append(asset_path)
|
||||
|
||||
if media_type == 'image' and ffmpeg.sanitize_image(temp_file.name, asset_path):
|
||||
asset_paths.append(asset_path)
|
||||
|
||||
if media_type == 'video' and ffmpeg.sanitize_video(temp_file.name, asset_path):
|
||||
asset_paths.append(asset_path)
|
||||
|
||||
process_manager.end()
|
||||
|
||||
remove_file(temp_file.name)
|
||||
process_manager.end()
|
||||
|
||||
return asset_paths
|
||||
|
||||
@@ -78,6 +78,7 @@ def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
|
||||
apply_state_item('benchmark_cycle_count', args.get('benchmark_cycle_count'))
|
||||
apply_state_item('api_host', args.get('api_host'))
|
||||
apply_state_item('api_port', args.get('api_port'))
|
||||
apply_state_item('api_security_strategy', args.get('api_security_strategy'))
|
||||
apply_state_item('video_memory_strategy', args.get('video_memory_strategy'))
|
||||
apply_state_item('log_level', args.get('log_level'))
|
||||
apply_state_item('halt_on_error', args.get('halt_on_error'))
|
||||
|
||||
@@ -2,7 +2,7 @@ import logging
|
||||
from typing import List, Sequence, get_args
|
||||
|
||||
from facefusion.common_helper import create_float_range, create_int_range
|
||||
from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageEncoder, ImageFormat, ImageSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoSet, VoiceExtractorModel, WorkFlow
|
||||
from facefusion.types import Angle, ApiSecurityStrategy, AudioEncoder, AudioFormat, AudioSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageEncoder, ImageFormat, ImageSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoSet, VoiceExtractorModel, WorkFlow
|
||||
|
||||
face_detector_set : FaceDetectorSet =\
|
||||
{
|
||||
@@ -136,6 +136,7 @@ download_providers : List[DownloadProvider] = list(get_args(DownloadProvider))
|
||||
download_scopes : List[DownloadScope] = list(get_args(DownloadScope))
|
||||
|
||||
video_memory_strategies : List[VideoMemoryStrategy] = list(get_args(VideoMemoryStrategy))
|
||||
api_security_strategies : List[ApiSecurityStrategy] = list(get_args(ApiSecurityStrategy))
|
||||
|
||||
log_level_set : LogLevelSet =\
|
||||
{
|
||||
|
||||
+60
-13
@@ -10,7 +10,7 @@ import facefusion.choices
|
||||
from facefusion import ffmpeg_builder, logger, process_manager, state_manager, translator
|
||||
from facefusion.filesystem import get_file_format, remove_file
|
||||
from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern
|
||||
from facefusion.types import AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, Resolution, UpdateProgress, VideoEncoder, VideoFormat
|
||||
from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, MediaChunkReader, Resolution, UpdateProgress, VideoEncoder, VideoFormat
|
||||
from facefusion.vision import detect_video_duration, detect_video_fps, pack_resolution, predict_video_frame_total
|
||||
|
||||
|
||||
@@ -46,9 +46,28 @@ def update_progress(progress : tqdm, frame_number : int) -> None:
|
||||
|
||||
|
||||
def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]:
|
||||
log_level = state_manager.get_item('log_level')
|
||||
commands = ffmpeg_builder.run(commands)
|
||||
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
return complete_process(process)
|
||||
|
||||
|
||||
def run_ffmpeg_with_pipe(commands : List[Command], media_chunk_reader : MediaChunkReader) -> subprocess.Popen[bytes]:
|
||||
commands = ffmpeg_builder.run(commands)
|
||||
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
|
||||
while media_chunk := media_chunk_reader():
|
||||
if process.poll() is not None:
|
||||
break
|
||||
process.stdin.write(media_chunk)
|
||||
|
||||
if process.stdin and not process.stdin.closed:
|
||||
process.stdin.close()
|
||||
|
||||
return complete_process(process)
|
||||
|
||||
|
||||
def complete_process(process : subprocess.Popen[bytes]) -> subprocess.Popen[bytes]:
|
||||
log_level = state_manager.get_item('log_level')
|
||||
|
||||
while process_manager.is_processing():
|
||||
try:
|
||||
@@ -289,34 +308,62 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool:
|
||||
return process.returncode == 0
|
||||
|
||||
|
||||
def sanitize_audio(temp_path : str, asset_path : str) -> bool:
|
||||
def sanitize_audio(audio_format : str, media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
|
||||
audio_pipe_format = ffmpeg_builder.resolve_audio_pipe_format(audio_format)
|
||||
|
||||
if security_strategy == 'strict':
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.pipe_input(audio_pipe_format),
|
||||
ffmpeg_builder.deep_copy_audio(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
ffmpeg_builder.force_output(asset_path)
|
||||
)
|
||||
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
|
||||
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.set_input(temp_path),
|
||||
ffmpeg_builder.deep_copy_audio(),
|
||||
ffmpeg_builder.pipe_input(audio_pipe_format),
|
||||
ffmpeg_builder.copy_audio_encoder(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
ffmpeg_builder.force_output(asset_path)
|
||||
)
|
||||
return run_ffmpeg(commands).returncode == 0
|
||||
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
|
||||
|
||||
|
||||
def sanitize_image(temp_path : str, asset_path : str) -> bool:
|
||||
def sanitize_image(image_format : str, media_chunk_reader : MediaChunkReader, asset_path : str) -> bool:
|
||||
image_pipe_format = ffmpeg_builder.resolve_image_pipe_format(image_format)
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.set_input(temp_path),
|
||||
ffmpeg_builder.pipe_image(image_pipe_format),
|
||||
ffmpeg_builder.deep_copy_image(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
ffmpeg_builder.force_output(asset_path)
|
||||
)
|
||||
return run_ffmpeg(commands).returncode == 0
|
||||
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
|
||||
|
||||
|
||||
def sanitize_video(temp_path : str, asset_path : str) -> bool:
|
||||
def sanitize_video(video_format : str, media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
|
||||
video_pipe_format = ffmpeg_builder.resolve_video_pipe_format(video_format)
|
||||
|
||||
if security_strategy == 'strict':
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.pipe_input(video_pipe_format),
|
||||
ffmpeg_builder.set_video_encoder('libx264'),
|
||||
ffmpeg_builder.set_video_preset('libx264', 'ultrafast'),
|
||||
ffmpeg_builder.set_pixel_format('libx264'),
|
||||
ffmpeg_builder.deep_copy_video(),
|
||||
ffmpeg_builder.deep_copy_audio(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
ffmpeg_builder.force_output(asset_path)
|
||||
)
|
||||
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
|
||||
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.set_input(temp_path),
|
||||
ffmpeg_builder.deep_copy_video(),
|
||||
ffmpeg_builder.pipe_input(video_pipe_format),
|
||||
ffmpeg_builder.copy_video_encoder(),
|
||||
ffmpeg_builder.copy_audio_encoder(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
ffmpeg_builder.force_output(asset_path)
|
||||
)
|
||||
return run_ffmpeg(commands).returncode == 0
|
||||
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
|
||||
|
||||
|
||||
def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder:
|
||||
|
||||
@@ -47,6 +47,14 @@ def set_input(input_path : str) -> List[Command]:
|
||||
return [ '-i', input_path ]
|
||||
|
||||
|
||||
def pipe_input(pipe_format : str) -> List[Command]:
|
||||
return [ '-f', pipe_format, '-i', 'pipe:0' ]
|
||||
|
||||
|
||||
def pipe_image(image_format : str) -> List[Command]:
|
||||
return [ '-f', 'image2pipe', '-c:v', image_format, '-i', 'pipe:0' ]
|
||||
|
||||
|
||||
def set_input_fps(input_fps : Fps) -> List[Command]:
|
||||
return [ '-r', str(input_fps) ]
|
||||
|
||||
@@ -289,3 +297,27 @@ def map_qsv_preset(video_preset : VideoPreset) -> Optional[str]:
|
||||
if video_preset in [ 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow' ]:
|
||||
return video_preset
|
||||
return None
|
||||
|
||||
|
||||
def resolve_audio_pipe_format(audio_format : str) -> str:
|
||||
if audio_format == 'm4a':
|
||||
return 'mp4'
|
||||
if audio_format == 'opus':
|
||||
return 'ogg'
|
||||
return audio_format
|
||||
|
||||
|
||||
def resolve_image_pipe_format(image_format : str) -> str:
|
||||
if image_format == 'jpeg':
|
||||
return 'mjpeg'
|
||||
return image_format
|
||||
|
||||
|
||||
def resolve_video_pipe_format(video_format : str) -> str:
|
||||
if video_format == 'mkv':
|
||||
return 'matroska'
|
||||
if video_format == 'm4v':
|
||||
return 'mp4'
|
||||
if video_format == 'wmv':
|
||||
return 'asf'
|
||||
return video_format
|
||||
|
||||
@@ -161,6 +161,7 @@ LOCALES : Locales =\
|
||||
'benchmark_cycle_count': 'specify the amount of cycles per benchmark',
|
||||
'api_host': 'specify the API host',
|
||||
'api_port': 'specify the API port',
|
||||
'api_security_strategy': 'specify the API security strategy used for sanitizing uploaded assets',
|
||||
'execution_device_ids': 'specify the devices used for processing',
|
||||
'execution_providers': 'inference using different providers (choices: {choices}, ...)',
|
||||
'execution_thread_count': 'specify the amount of parallel threads while processing',
|
||||
|
||||
@@ -822,6 +822,17 @@ def create_api_program() -> ArgumentParser:
|
||||
type = int,
|
||||
default = config.get_int_value('api', 'api_port', '8000')
|
||||
)
|
||||
capability_store.register_capability_set(
|
||||
[
|
||||
group_api.add_argument(
|
||||
'--api-security-strategy',
|
||||
help = translator.get('help.api_security_strategy'),
|
||||
default = config.get_str_value('api', 'api_security_strategy', 'strict'),
|
||||
choices = facefusion.choices.api_security_strategies
|
||||
)
|
||||
],
|
||||
scopes = [ 'api' ]
|
||||
)
|
||||
|
||||
return program
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import queue
|
||||
from collections import namedtuple
|
||||
from datetime import datetime
|
||||
from typing import Any, Callable, Dict, List, Literal, NotRequired, Optional, Tuple, TypeAlias, TypedDict, Union
|
||||
@@ -361,6 +362,9 @@ Download = TypedDict('Download',
|
||||
DownloadSet : TypeAlias = Dict[str, Download]
|
||||
|
||||
VideoMemoryStrategy = Literal['strict', 'moderate', 'tolerant']
|
||||
ApiSecurityStrategy = Literal['strict', 'moderate']
|
||||
UploadQueue : TypeAlias = queue.SimpleQueue[bytes]
|
||||
MediaChunkReader : TypeAlias = Callable[[], bytes]
|
||||
AppContext = Literal['cli', 'api']
|
||||
|
||||
InferencePool : TypeAlias = Dict[str, InferenceSession]
|
||||
@@ -451,6 +455,7 @@ StateKey = Literal\
|
||||
'api_host',
|
||||
'api_port',
|
||||
'api_key',
|
||||
'api_security_strategy',
|
||||
'job_id',
|
||||
'job_status',
|
||||
'step_index'
|
||||
@@ -522,6 +527,7 @@ State = TypedDict('State',
|
||||
'api_host' : str,
|
||||
'api_port' : int,
|
||||
'api_key' : str,
|
||||
'api_security_strategy' : ApiSecurityStrategy,
|
||||
'job_id' : str,
|
||||
'job_status' : JobStatus,
|
||||
'step_index' : int
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import io
|
||||
import os
|
||||
import tempfile
|
||||
from functools import partial
|
||||
|
||||
from facefusion.filesystem import are_images, create_directory, is_directory, is_file, remove_directory, resolve_file_paths
|
||||
from facefusion.types import JobStatus
|
||||
@@ -46,3 +48,8 @@ def prepare_test_output_directory() -> bool:
|
||||
remove_directory(test_outputs_directory)
|
||||
create_directory(test_outputs_directory)
|
||||
return is_directory(test_outputs_directory)
|
||||
|
||||
|
||||
def create_media_reader(file_path : str) -> partial[bytes]:
|
||||
file_buffer = io.BytesIO(open(file_path, 'rb').read())
|
||||
return partial(file_buffer.read, 1024)
|
||||
|
||||
@@ -289,3 +289,37 @@ def test_delete_assets(test_client : TestClient) -> None:
|
||||
})
|
||||
|
||||
assert delete_response.status_code == 404
|
||||
|
||||
|
||||
def test_upload_asset_security_strategies(test_client : TestClient) -> None:
|
||||
source_path = get_test_example_file('source.jpg')
|
||||
target_path = get_test_example_file('target-240p.mp4')
|
||||
|
||||
for strategy in [ 'strict', 'moderate' ]:
|
||||
state_manager.init_item('api_security_strategy', strategy)
|
||||
|
||||
create_session_response = test_client.post('/session', json =
|
||||
{
|
||||
'client_version': metadata.get('version')
|
||||
})
|
||||
access_token = create_session_response.json().get('access_token')
|
||||
session_id = session_manager.find_session_id(access_token)
|
||||
|
||||
with open(source_path, 'rb') as source_file, open(target_path, 'rb') as target_file:
|
||||
upload_response = test_client.post('/assets?type=source', headers =
|
||||
{
|
||||
'Authorization': 'Bearer ' + access_token
|
||||
}, files =
|
||||
[
|
||||
('file', ('source.jpg', source_file.read(), 'image/jpeg')),
|
||||
('file', ('target.mp4', target_file.read(), 'video/mp4'))
|
||||
])
|
||||
|
||||
assert upload_response.status_code == 201
|
||||
|
||||
asset_ids = upload_response.json().get('asset_ids')
|
||||
|
||||
assert asset_store.get_asset(session_id, asset_ids[0]).get('media') == 'image'
|
||||
assert asset_store.get_asset(session_id, asset_ids[1]).get('media') == 'video'
|
||||
|
||||
state_manager.init_item('api_security_strategy', 'strict')
|
||||
|
||||
+37
-3
@@ -7,11 +7,11 @@ import pytest
|
||||
import facefusion.ffmpeg
|
||||
from facefusion import process_manager, state_manager
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, spawn_frames
|
||||
from facefusion.filesystem import copy_file
|
||||
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames
|
||||
from facefusion.filesystem import copy_file, is_audio, is_image, is_video
|
||||
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
|
||||
from facefusion.types import EncoderSet
|
||||
from .assert_helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory
|
||||
from .assert_helper import create_media_reader, get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory
|
||||
|
||||
|
||||
@pytest.fixture(scope = 'module', autouse = True)
|
||||
@@ -210,3 +210,37 @@ def test_replace_audio() -> None:
|
||||
clear_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
state_manager.init_item('output_audio_encoder', 'aac')
|
||||
|
||||
|
||||
def test_sanitize_audio() -> None:
|
||||
audio_path = get_test_example_file('source.mp3')
|
||||
|
||||
output_strict_path = get_test_output_path('output-strict.mp3')
|
||||
output_moderate_path = get_test_output_path('output-moderate.mp3')
|
||||
|
||||
assert sanitize_audio('mp3', create_media_reader(audio_path), output_strict_path, 'strict') is True
|
||||
assert is_audio(output_strict_path) is True
|
||||
|
||||
assert sanitize_audio('mp3', create_media_reader(audio_path), output_moderate_path, 'moderate') is True
|
||||
assert is_audio(output_moderate_path) is True
|
||||
|
||||
|
||||
def test_sanitize_image() -> None:
|
||||
source_path = get_test_example_file('source.jpg')
|
||||
output_path = get_test_output_path('output.jpg')
|
||||
|
||||
assert sanitize_image('jpeg', create_media_reader(source_path), output_path) is True
|
||||
assert is_image(output_path) is True
|
||||
|
||||
|
||||
def test_sanitize_video() -> None:
|
||||
video_path = get_test_example_file('target-240p.mp4')
|
||||
|
||||
output_strict_path = get_test_output_path('output-strict.mp4')
|
||||
output_moderate_path = get_test_output_path('output-moderate.mp4')
|
||||
|
||||
assert sanitize_video('mp4', create_media_reader(video_path), output_strict_path, 'strict') is True
|
||||
assert is_video(output_strict_path) is True
|
||||
|
||||
assert sanitize_video('mp4', create_media_reader(video_path), output_moderate_path, 'moderate') is True
|
||||
assert is_video(output_moderate_path) is True
|
||||
|
||||
Reference in New Issue
Block a user