diff --git a/facefusion/args_helper.py b/facefusion/args_helper.py index 04dff311..59988c3b 100644 --- a/facefusion/args_helper.py +++ b/facefusion/args_helper.py @@ -8,6 +8,8 @@ from facefusion.vision import detect_video_fps def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None: # general apply_state_item('command', args.get('command')) + # workflow + apply_state_item('workflow', args.get('workflow')) # paths apply_state_item('temp_path', args.get('temp_path')) apply_state_item('jobs_path', args.get('jobs_path')) diff --git a/facefusion/audio.py b/facefusion/audio.py index 739fc848..86244263 100644 --- a/facefusion/audio.py +++ b/facefusion/audio.py @@ -1,5 +1,6 @@ +import math from functools import lru_cache -from typing import Any, List, Optional +from typing import Any, List, Optional, Tuple import numpy import scipy @@ -7,7 +8,8 @@ from numpy.typing import NDArray from facefusion.ffmpeg import read_audio_buffer from facefusion.filesystem import is_audio -from facefusion.types import Audio, AudioFrame, Fps, Mel, MelFilterBank, Spectrogram +from facefusion.media_helper import restrict_trim_frame +from facefusion.types import Audio, AudioFrame, Duration, Fps, Mel, MelFilterBank, Spectrogram from facefusion.voice_extractor import batch_extract_voice @@ -141,3 +143,29 @@ def create_spectrogram(audio : Audio) -> Spectrogram: spectrogram = scipy.signal.stft(audio, nperseg = mel_bin_total, nfft = mel_bin_total, noverlap = mel_bin_overlap)[2] spectrogram = numpy.dot(mel_filter_bank, numpy.abs(spectrogram)) return spectrogram + + +def count_audio_frame_total(audio_path : str, fps : Fps) -> int: + audio_duration = detect_audio_duration(audio_path) + if audio_duration > 0: + return math.ceil(audio_duration * fps) + return 0 + + +def detect_audio_duration(audio_path : str) -> Duration: + audio_sample_rate = 48000 + audio_sample_size = 16 + audio_channel_total = 2 + + if is_audio(audio_path): + audio_buffer = read_audio_buffer(audio_path, audio_sample_rate, audio_sample_size, audio_channel_total) + if audio_buffer: + audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, audio_channel_total) + audio_duration = len(audio) / audio_sample_rate + return audio_duration + return 0 + + +def restrict_trim_audio_frame(audio_path : str, fps : Fps, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]: + audio_frame_total = count_audio_frame_total(audio_path, fps) + return restrict_trim_frame(audio_frame_total, trim_frame_start, trim_frame_end) diff --git a/facefusion/choices.py b/facefusion/choices.py index d85993c2..934ded8b 100755 --- a/facefusion/choices.py +++ b/facefusion/choices.py @@ -2,7 +2,7 @@ import logging from typing import List, Sequence from facefusion.common_helper import create_float_range, create_int_range -from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioTypeSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, EncoderSet, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoTypeSet, VoiceExtractorModel +from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioTypeSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, EncoderSet, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoTypeSet, VoiceExtractorModel, WorkFlow face_detector_set : FaceDetectorSet =\ { @@ -45,6 +45,8 @@ face_mask_regions : List[FaceMaskRegion] = list(face_mask_region_set.keys()) voice_extractor_models : List[VoiceExtractorModel] = [ 'kim_vocal_1', 'kim_vocal_2', 'uvr_mdxnet' ] +workflows : List[WorkFlow] = [ 'audio-to-image', 'image-to-image', 'image-to-video' ] + audio_type_set : AudioTypeSet =\ { 'flac': 'audio/flac', diff --git a/facefusion/core.py b/facefusion/core.py index 276ecc8b..3cd290db 100755 --- a/facefusion/core.py +++ b/facefusion/core.py @@ -12,14 +12,14 @@ from facefusion.apis.core import create_api from facefusion.args_helper import apply_args from facefusion.download import conditional_download_hashes, conditional_download_sources from facefusion.exit_helper import hard_exit, signal_exit -from facefusion.filesystem import get_file_extension, get_file_name, is_image, is_video, resolve_file_paths, resolve_file_pattern +from facefusion.filesystem import get_file_extension, get_file_name, resolve_file_paths, resolve_file_pattern from facefusion.jobs import job_helper, job_manager, job_runner from facefusion.jobs.job_list import compose_job_list from facefusion.processors.core import get_processors_modules from facefusion.program import create_program from facefusion.program_helper import validate_args from facefusion.types import Args, ErrorCode -from facefusion.workflows import image_to_image, image_to_video +from facefusion.workflows import audio_to_image, image_to_image, image_to_video def cli() -> None: @@ -332,9 +332,11 @@ def conditional_process() -> ErrorCode: if not processor_module.pre_process('output'): return 2 - if is_image(state_manager.get_item('target_path')): + if state_manager.get_item('workflow') == 'audio-to-image': + return audio_to_image.process(start_time) + if state_manager.get_item('workflow') == 'image-to-image': return image_to_image.process(start_time) - if is_video(state_manager.get_item('target_path')): + if state_manager.get_item('workflow') == 'image-to-video': return image_to_video.process(start_time) return 0 diff --git a/facefusion/media_helper.py b/facefusion/media_helper.py new file mode 100644 index 00000000..60bfa317 --- /dev/null +++ b/facefusion/media_helper.py @@ -0,0 +1,17 @@ +from typing import Optional, Tuple + + +def restrict_trim_frame(frame_total : int, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]: + if isinstance(trim_frame_start, int): + trim_frame_start = max(0, min(trim_frame_start, frame_total)) + if isinstance(trim_frame_end, int): + trim_frame_end = max(0, min(trim_frame_end, frame_total)) + + if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int): + return trim_frame_start, trim_frame_end + if isinstance(trim_frame_start, int): + return trim_frame_start, frame_total + if isinstance(trim_frame_end, int): + return 0, trim_frame_end + + return 0, frame_total diff --git a/facefusion/program.py b/facefusion/program.py index 33474e8a..c2542124 100755 --- a/facefusion/program.py +++ b/facefusion/program.py @@ -29,6 +29,14 @@ def create_config_path_program() -> ArgumentParser: return program +def create_workflow_program() -> ArgumentParser: + program = ArgumentParser(add_help = False) + group_paths = program.add_argument_group('paths') + group_paths.add_argument('--workflow', help = translator.get('help.workflow'), choices = facefusion.choices.workflows) + args_store.register_args([ 'workflow' ], scopes = [ 'api', 'cli' ]) + return program + + def create_temp_path_program() -> ArgumentParser: program = ArgumentParser(add_help = False) group_paths = program.add_argument_group('paths') @@ -285,7 +293,7 @@ def collect_step_program() -> ArgumentParser: def collect_job_program() -> ArgumentParser: - return ArgumentParser(parents = [ create_execution_program(), create_download_providers_program(), create_memory_program(), create_log_level_program() ], add_help = False) + return ArgumentParser(parents = [ create_workflow_program(), create_execution_program(), create_download_providers_program(), create_memory_program(), create_log_level_program() ], add_help = False) def create_program() -> ArgumentParser: @@ -306,9 +314,9 @@ def create_program() -> ArgumentParser: sub_program.add_parser('job-submit-all', help = translator.get('help.job_submit_all'), parents = [ create_jobs_path_program(), create_log_level_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large) sub_program.add_parser('job-delete', help = translator.get('help.job_delete'), parents = [ create_job_id_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) sub_program.add_parser('job-delete-all', help = translator.get('help.job_delete_all'), parents = [ create_jobs_path_program(), create_log_level_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large) - sub_program.add_parser('job-add-step', help = translator.get('help.job_add_step'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) - sub_program.add_parser('job-remix-step', help = translator.get('help.job_remix_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) - sub_program.add_parser('job-insert-step', help = translator.get('help.job_insert_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('job-add-step', help = translator.get('help.job_add_step'), parents = [ create_job_id_program(), create_workflow_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('job-remix-step', help = translator.get('help.job_remix_step'), parents = [ create_job_id_program(), create_workflow_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) + sub_program.add_parser('job-insert-step', help = translator.get('help.job_insert_step'), parents = [ create_job_id_program(), create_workflow_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) sub_program.add_parser('job-remove-step', help = translator.get('help.job_remove_step'), parents = [ create_job_id_program(), create_step_index_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large) # job runner sub_program.add_parser('job-run', help = translator.get('help.job_run'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large) diff --git a/facefusion/temp_helper.py b/facefusion/temp_helper.py index e190e071..9a1b6d7f 100644 --- a/facefusion/temp_helper.py +++ b/facefusion/temp_helper.py @@ -23,12 +23,12 @@ def resolve_temp_frame_paths(target_path : str) -> List[str]: def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str: temp_directory_path = get_temp_directory_path(target_path) - return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) + return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) # TODO: remove state_manager.get def get_temp_directory_path(file_path : str) -> str: temp_file_name = get_file_name(file_path) - return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name) + return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name) # TODO: remove state_manager.get def create_temp_directory(file_path : str) -> bool: @@ -37,7 +37,18 @@ def create_temp_directory(file_path : str) -> bool: def clear_temp_directory(file_path : str) -> bool: - if not state_manager.get_item('keep_temp'): + if not state_manager.get_item('keep_temp'): # TODO: remove state_manager.get temp_directory_path = get_temp_directory_path(file_path) return remove_directory(temp_directory_path) return True + + +def get_temp_sequence_paths(file_path : str, frame_total : int, temp_frame_prefix : str, temp_frame_format : str) -> List[str]: + temp_directory_path = get_temp_directory_path(file_path) + temp_frame_paths = [] + + for frame_number in range(frame_total): + temp_file_name = temp_frame_prefix % (frame_number + 1) + '.' + temp_frame_format + temp_frame_path = os.path.join(temp_directory_path, temp_file_name) + temp_frame_paths.append(temp_frame_path) + return temp_frame_paths diff --git a/facefusion/types.py b/facefusion/types.py index c020a657..701229f7 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -55,6 +55,8 @@ Language = Literal['en'] Locales : TypeAlias = Dict[Language, Dict[str, Any]] LocalePoolSet : TypeAlias = Dict[str, Locales] +WorkFlow = Literal['audio-to-image', 'image-to-image', 'image-to-video'] + VideoCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture] VideoWriterSet : TypeAlias = Dict[str, cv2.VideoWriter] CameraCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture] diff --git a/facefusion/vision.py b/facefusion/vision.py index 561e374c..3a427c3a 100644 --- a/facefusion/vision.py +++ b/facefusion/vision.py @@ -8,6 +8,7 @@ from cv2.typing import Size from facefusion.common_helper import is_windows from facefusion.filesystem import get_file_extension, is_image, is_video +from facefusion.media_helper import restrict_trim_frame from facefusion.thread_helper import thread_semaphore from facefusion.types import ColorMode, Duration, Fps, Mask, Orientation, Resolution, Scale, VisionFrame from facefusion.video_manager import get_video_capture @@ -107,7 +108,8 @@ def count_video_frame_total(video_path : str) -> int: def predict_video_frame_total(video_path : str, fps : Fps, trim_frame_start : int, trim_frame_end : int) -> int: if is_video(video_path): video_fps = detect_video_fps(video_path) - extract_frame_total = count_trim_frame_total(video_path, trim_frame_start, trim_frame_end) * fps / video_fps + trim_frame_start, trim_frame_end = restrict_trim_video_frame(video_path, trim_frame_start, trim_frame_end) + extract_frame_total = (trim_frame_end - trim_frame_start) * fps / video_fps return math.floor(extract_frame_total) return 0 @@ -141,28 +143,9 @@ def detect_video_duration(video_path : str) -> Duration: return 0 -def count_trim_frame_total(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> int: - trim_frame_start, trim_frame_end = restrict_trim_frame(video_path, trim_frame_start, trim_frame_end) - - return trim_frame_end - trim_frame_start - - -def restrict_trim_frame(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]: +def restrict_trim_video_frame(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]: video_frame_total = count_video_frame_total(video_path) - - if isinstance(trim_frame_start, int): - trim_frame_start = max(0, min(trim_frame_start, video_frame_total)) - if isinstance(trim_frame_end, int): - trim_frame_end = max(0, min(trim_frame_end, video_frame_total)) - - if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int): - return trim_frame_start, trim_frame_end - if isinstance(trim_frame_start, int): - return trim_frame_start, video_frame_total - if isinstance(trim_frame_end, int): - return 0, trim_frame_end - - return 0, video_frame_total + return restrict_trim_frame(video_frame_total, trim_frame_start, trim_frame_end) def detect_video_resolution(video_path : str) -> Optional[Resolution]: diff --git a/facefusion/workflows/audio_to_image.py b/facefusion/workflows/audio_to_image.py new file mode 100644 index 00000000..303832a4 --- /dev/null +++ b/facefusion/workflows/audio_to_image.py @@ -0,0 +1,160 @@ +from concurrent.futures import ThreadPoolExecutor, as_completed +from functools import partial + +import numpy +from tqdm import tqdm + +from facefusion import content_analyser, ffmpeg, logger, process_manager, state_manager, translator +from facefusion.audio import create_empty_audio_frame, get_audio_frame, get_voice_frame, restrict_trim_audio_frame +from facefusion.common_helper import get_first +from facefusion.filesystem import filter_audio_paths, is_video +from facefusion.processors.core import get_processors_modules +from facefusion.temp_helper import get_temp_sequence_paths, move_temp_file +from facefusion.time_helper import calculate_end_time +from facefusion.types import ErrorCode +from facefusion.vision import conditional_merge_vision_mask, detect_image_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, restrict_trim_video_frame, scale_resolution, write_image +from facefusion.workflows.core import clear, is_process_stopping, setup + + +def process(start_time : float) -> ErrorCode: + tasks =\ + [ + analyse_image, + clear, + setup, + process_image, + merge_frames, + restore_audio, + partial(finalize_video, start_time), + clear + ] + + process_manager.start() + + for task in tasks: + error_code = task() # type:ignore[operator] + + if error_code > 0: + process_manager.end() + return error_code + + process_manager.end() + return 0 + + +def analyse_image() -> ErrorCode: # TODO: reusable block + if content_analyser.analyse_image(state_manager.get_item('target_path')): + return 3 + return 0 + + +def process_image() -> ErrorCode: + state_manager.set_item('output_video_fps', 25.0) # TODO: set default fps value + source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths'))) + trim_frame_start, trim_frame_end = restrict_trim_audio_frame(source_audio_path, state_manager.get_item('output_video_fps'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + audio_frame_total = trim_frame_end - trim_frame_start + temp_frame_paths = get_temp_sequence_paths(state_manager.get_item('output_path'), audio_frame_total, '%08d', state_manager.get_item('temp_frame_format')) + + if temp_frame_paths: + with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress: + progress.set_postfix(execution_providers = state_manager.get_item('execution_providers')) + + with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor: + futures = [] + + for frame_number, temp_frame_path in enumerate(temp_frame_paths): + future = executor.submit(process_temp_frame, temp_frame_path, frame_number) + futures.append(future) + + for future in as_completed(futures): + if is_process_stopping(): + for __future__ in futures: + __future__.cancel() + + if not future.cancelled(): + future.result() + progress.update() + + for processor_module in get_processors_modules(state_manager.get_item('processors')): + processor_module.post_process() + + if is_process_stopping(): + return 4 + else: + logger.error(translator.get('temp_frames_not_found'), __name__) + return 1 + return 0 + + +def process_temp_frame(temp_frame_path : str, frame_number : int) -> bool: # TODO refinement like to_video.py file. + output_video_fps = state_manager.get_item('output_video_fps') + reference_vision_frame = read_static_image(state_manager.get_item('target_path')) + source_vision_frames = read_static_images(state_manager.get_item('source_paths')) + source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths'))) + target_vision_frame = read_static_image(state_manager.get_item('target_path'), 'rgba') + temp_vision_frame = target_vision_frame.copy() + temp_vision_mask = extract_vision_mask(temp_vision_frame) + + source_audio_frame = get_audio_frame(source_audio_path, output_video_fps, frame_number) + source_voice_frame = get_voice_frame(source_audio_path, output_video_fps, frame_number) + + if not numpy.any(source_audio_frame): + source_audio_frame = create_empty_audio_frame() + if not numpy.any(source_voice_frame): + source_voice_frame = create_empty_audio_frame() + + for processor_module in get_processors_modules(state_manager.get_item('processors')): + temp_vision_frame, temp_vision_mask = processor_module.process_frame( + { + 'reference_vision_frame': reference_vision_frame, + 'source_vision_frames': source_vision_frames, + 'source_audio_frame': source_audio_frame, + 'source_voice_frame': source_voice_frame, + 'target_vision_frame': target_vision_frame[:, :, :3], + 'temp_vision_frame': temp_vision_frame[:, :, :3], + 'temp_vision_mask': temp_vision_mask + }) + + temp_vision_frame = conditional_merge_vision_mask(temp_vision_frame, temp_vision_mask) + return write_image(temp_frame_path, temp_vision_frame) + + +def merge_frames() -> ErrorCode: + trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + output_video_resolution = scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_image_scale')) + + logger.info(translator.get('merging_video').format(resolution = pack_resolution(output_video_resolution), fps = state_manager.get_item('output_video_fps')), __name__) + if ffmpeg.merge_video(state_manager.get_item('target_path'), state_manager.get_item('output_path'), state_manager.get_item('output_video_fps'), output_video_resolution, trim_frame_start, trim_frame_end): + logger.debug(translator.get('merging_video_succeeded'), __name__) + else: + if is_process_stopping(): + return 4 + logger.error(translator.get('merging_video_failed'), __name__) + return 1 + return 0 + + +def restore_audio() -> ErrorCode: + if state_manager.get_item('output_audio_volume') == 0: + logger.info(translator.get('skipping_audio'), __name__) + move_temp_file(state_manager.get_item('output_path')) + else: + source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths'))) + if source_audio_path: + if ffmpeg.replace_audio(source_audio_path, state_manager.get_item('output_path')): + logger.debug(translator.get('replacing_audio_succeeded'), __name__) + else: + if is_process_stopping(): + return 4 + logger.warn(translator.get('replacing_audio_skipped'), __name__) + move_temp_file(state_manager.get_item('output_path')) + return 0 + + +def finalize_video(start_time : float) -> ErrorCode: + if is_video(state_manager.get_item('output_path')): + logger.info(translator.get('processing_video_succeeded').format(seconds = calculate_end_time(start_time)), __name__) + else: + logger.error(translator.get('processing_video_failed'), __name__) + return 1 + return 0 diff --git a/facefusion/workflows/image_to_video.py b/facefusion/workflows/image_to_video.py index 1414dc2a..c227ba9d 100644 --- a/facefusion/workflows/image_to_video.py +++ b/facefusion/workflows/image_to_video.py @@ -12,7 +12,7 @@ from facefusion.processors.core import get_processors_modules from facefusion.temp_helper import move_temp_file, resolve_temp_frame_paths from facefusion.time_helper import calculate_end_time from facefusion.types import ErrorCode -from facefusion.vision import conditional_merge_vision_mask, detect_video_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, read_static_video_frame, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, write_image +from facefusion.vision import conditional_merge_vision_mask, detect_video_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, read_static_video_frame, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, write_image from facefusion.workflows.core import clear, is_process_stopping, setup @@ -44,7 +44,7 @@ def process(start_time : float) -> ErrorCode: def analyse_video() -> ErrorCode: - trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) if content_analyser.analyse_video(state_manager.get_item('target_path'), trim_frame_start, trim_frame_end): return 3 @@ -52,7 +52,7 @@ def analyse_video() -> ErrorCode: def extract_frames() -> ErrorCode: - trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale')) temp_video_resolution = restrict_video_resolution(state_manager.get_item('target_path'), output_video_resolution) temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps')) @@ -103,7 +103,7 @@ def process_video() -> ErrorCode: def merge_frames() -> ErrorCode: - trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale')) temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps')) @@ -119,7 +119,7 @@ def merge_frames() -> ErrorCode: def restore_audio() -> ErrorCode: - trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) if state_manager.get_item('output_audio_volume') == 0: logger.info(translator.get('skipping_audio'), __name__) diff --git a/tests/test_audio.py b/tests/test_audio.py index 36faf9b0..202a649c 100644 --- a/tests/test_audio.py +++ b/tests/test_audio.py @@ -1,8 +1,9 @@ import subprocess import pytest +from pytest import approx -from facefusion.audio import get_audio_frame, read_static_audio +from facefusion.audio import detect_audio_duration, get_audio_frame, read_static_audio, restrict_trim_audio_frame from facefusion.download import conditional_download from .helper import get_test_example_file, get_test_examples_directory @@ -26,3 +27,19 @@ def test_read_static_audio() -> None: assert len(read_static_audio(get_test_example_file('source.mp3'), 25)) == 280 assert len(read_static_audio(get_test_example_file('source.wav'), 25)) == 280 assert read_static_audio('invalid', 25) is None + + +def test_detect_audio_duration() -> None: + assert detect_audio_duration(get_test_example_file('source.mp3')) == approx(3.788, rel = 1e-3) + assert detect_audio_duration(get_test_example_file('source.wav')) == approx(3.788, rel = 1e-3) + assert detect_audio_duration('invalid') == 0 + + +def test_restrict_trim_audio_frame() -> None: + assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 0, 50) == (0, 50) + assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 20, 95) == (20, 95) + assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, -10, None) == (0, 95) + assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, -10) == (0, 0) + assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 100, None) == (95, 95) + assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, 100) == (0, 95) + assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, None) == (0, 95) diff --git a/tests/test_cli_age_modifier.py b/tests/test_cli_age_modifier.py index 13a23f56..bb0ec920 100644 --- a/tests/test_cli_age_modifier.py +++ b/tests/test_cli_age_modifier.py @@ -25,14 +25,14 @@ def before_each() -> None: def test_modify_age_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-age-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-age-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-age-face-to-image.jpg') is True def test_modify_age_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-age-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-age-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-age-face-to-video.mp4') is True diff --git a/tests/test_cli_background_remover.py b/tests/test_cli_background_remover.py index 4d854468..9a0bf856 100644 --- a/tests/test_cli_background_remover.py +++ b/tests/test_cli_background_remover.py @@ -26,14 +26,14 @@ def before_each() -> None: def test_remove_background_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-remove-background-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'background_remover', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-remove-background-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-remove-background-to-image.jpg') is True def test_remove_background_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-remove-background-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-remove-background-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-remove-background-to-video.mp4') is True diff --git a/tests/test_cli_batch_runner.py b/tests/test_cli_batch_runner.py index 963e5a95..968aa431 100644 --- a/tests/test_cli_batch_runner.py +++ b/tests/test_cli_batch_runner.py @@ -26,7 +26,7 @@ def before_each() -> None: def test_batch_run_targets() -> None: - commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-targets-{index}.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-targets-{index}.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-batch-run-targets-0.jpg') is True @@ -35,7 +35,7 @@ def test_batch_run_targets() -> None: def test_batch_run_sources_to_targets() -> None: - commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('target-240p-batch-*.jpg'), '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-sources-to-targets-{index}.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('target-240p-batch-*.jpg'), '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-sources-to-targets-{index}.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-batch-run-sources-to-targets-0.jpg') is True diff --git a/tests/test_cli_expression_restorer.py b/tests/test_cli_expression_restorer.py index 12258c07..b782b861 100644 --- a/tests/test_cli_expression_restorer.py +++ b/tests/test_cli_expression_restorer.py @@ -25,14 +25,14 @@ def before_each() -> None: def test_restore_expression_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-restore-expression-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-restore-expression-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-restore-expression-to-image.jpg') is True def test_restore_expression_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-restore-expression-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-restore-expression-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-restore-expression-to-video.mp4') is True diff --git a/tests/test_cli_face_debugger.py b/tests/test_cli_face_debugger.py index 48fd1ec7..12b610c1 100644 --- a/tests/test_cli_face_debugger.py +++ b/tests/test_cli_face_debugger.py @@ -26,14 +26,14 @@ def before_each() -> None: def test_debug_face_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-debug-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-debug-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-debug-face-to-image.jpg') is True def test_debug_face_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-debug-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-debug-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-debug-face-to-video.mp4') is True diff --git a/tests/test_cli_face_editor.py b/tests/test_cli_face_editor.py index 75b4b537..044cdd26 100644 --- a/tests/test_cli_face_editor.py +++ b/tests/test_cli_face_editor.py @@ -26,14 +26,14 @@ def before_each() -> None: def test_edit_face_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-edit-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_editor', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-edit-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-edit-face-to-image.jpg') is True def test_edit_face_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-edit-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-edit-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-edit-face-to-video.mp4') is True diff --git a/tests/test_cli_face_enhancer.py b/tests/test_cli_face_enhancer.py index 344f6daf..b5e80399 100644 --- a/tests/test_cli_face_enhancer.py +++ b/tests/test_cli_face_enhancer.py @@ -26,14 +26,14 @@ def before_each() -> None: def test_enhance_face_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-enhance-face-to-image.jpg') is True def test_enhance_face_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-enhance-face-to-video.mp4') is True diff --git a/tests/test_cli_face_swapper.py b/tests/test_cli_face_swapper.py index de59055d..36f5a154 100644 --- a/tests/test_cli_face_swapper.py +++ b/tests/test_cli_face_swapper.py @@ -26,14 +26,14 @@ def before_each() -> None: def test_swap_face_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-swap-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-swap-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-swap-face-to-image.jpg') is True def test_swap_face_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-swap-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-swap-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-swap-face-to-video.mp4') is True diff --git a/tests/test_cli_frame_colorizer.py b/tests/test_cli_frame_colorizer.py index 675dbb84..8cbe5ce4 100644 --- a/tests/test_cli_frame_colorizer.py +++ b/tests/test_cli_frame_colorizer.py @@ -27,14 +27,14 @@ def before_each() -> None: def test_colorize_frame_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.jpg'), '-o', get_test_output_file('test_colorize-frame-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.jpg'), '-o', get_test_output_file('test_colorize-frame-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test_colorize-frame-to-image.jpg') is True def test_colorize_frame_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_file('test-colorize-frame-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_file('test-colorize-frame-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-colorize-frame-to-video.mp4') is True diff --git a/tests/test_cli_frame_enhancer.py b/tests/test_cli_frame_enhancer.py index 1d5ba501..b5673f03 100644 --- a/tests/test_cli_frame_enhancer.py +++ b/tests/test_cli_frame_enhancer.py @@ -26,14 +26,14 @@ def before_each() -> None: def test_enhance_frame_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-frame-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-frame-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-enhance-frame-to-image.jpg') is True def test_enhance_frame_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-frame-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-frame-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-enhance-frame-to-video.mp4') is True diff --git a/tests/test_cli_job_manager.py b/tests/test_cli_job_manager.py index a20a2728..1ca3326a 100644 --- a/tests/test_cli_job_manager.py +++ b/tests/test_cli_job_manager.py @@ -50,7 +50,7 @@ def test_job_submit() -> None: assert subprocess.run(commands).returncode == 1 - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-submit', 'test-job-submit', '--jobs-path', get_test_jobs_directory() ] @@ -73,10 +73,10 @@ def test_submit_all() -> None: assert subprocess.run(commands).returncode == 1 - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-2', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-submit-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ] @@ -122,7 +122,7 @@ def test_job_delete_all() -> None: def test_job_add_step() -> None: - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 1 assert count_step_total('test-job-add-step') == 0 @@ -130,14 +130,14 @@ def test_job_add_step() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-add-step', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-add-step') == 1 def test_job_remix() -> None: - commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 1 assert count_step_total('test-job-remix-step') == 0 @@ -145,23 +145,23 @@ def test_job_remix() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] assert count_step_total('test-job-remix-step') == 1 assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-remix-step') == 2 - commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-remix-step') == 3 def test_job_insert_step() -> None: - commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 1 assert count_step_total('test-job-insert-step') == 0 @@ -169,16 +169,16 @@ def test_job_insert_step() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] assert count_step_total('test-job-insert-step') == 1 assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-insert-step') == 2 - commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-insert-step') == 3 @@ -192,7 +192,7 @@ def test_job_remove_step() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] subprocess.run(commands) subprocess.run(commands) diff --git a/tests/test_cli_job_runner.py b/tests/test_cli_job_runner.py index 23d23cba..e88588f6 100644 --- a/tests/test_cli_job_runner.py +++ b/tests/test_cli_job_runner.py @@ -33,7 +33,7 @@ def test_job_run() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-run', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run.jpg') ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-run', 'test-job-run', '--jobs-path', get_test_jobs_directory() ] @@ -61,13 +61,13 @@ def test_job_run_all() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-1', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run-all-1.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run-all-1.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-end', '1' ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-run-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ] @@ -93,7 +93,7 @@ def test_job_retry() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-retry', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry.jpg') ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-retry', 'test-job-retry', '--jobs-path', get_test_jobs_directory() ] @@ -121,13 +121,13 @@ def test_job_retry_all() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-1', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry-all-1.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry-all-1.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-end', '1' ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-retry-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ] diff --git a/tests/test_cli_lip_syncer.py b/tests/test_cli_lip_syncer.py index b61a2559..77a27717 100644 --- a/tests/test_cli_lip_syncer.py +++ b/tests/test_cli_lip_syncer.py @@ -27,14 +27,14 @@ def before_each() -> None: def test_sync_lip_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test_sync_lip_to_image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'audio-to-image', '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test_sync_lip_to_image.mp4') ] assert subprocess.run(commands).returncode == 0 - assert is_test_output_file('test_sync_lip_to_image.jpg') is True + assert is_test_output_file('test_sync_lip_to_image.mp4') is True def test_sync_lip_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test_sync_lip_to_video.mp4') is True diff --git a/tests/test_cli_output_scale.py b/tests/test_cli_output_scale.py index 2a6ddb9d..8ae1af7e 100644 --- a/tests/test_cli_output_scale.py +++ b/tests/test_cli_output_scale.py @@ -36,7 +36,7 @@ def before_each() -> None: ]) def test_output_image_scale(output_image_scale : Scale, output_image_resolution : Resolution) -> None: output_file_path = get_test_output_file('test-output-image-scale-' + str(output_image_scale) + '.jpg') - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', output_file_path, '--output-image-scale', str(output_image_scale) ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', output_file_path, '--output-image-scale', str(output_image_scale) ] assert subprocess.run(commands).returncode == 0 assert detect_image_resolution(output_file_path) == output_image_resolution @@ -51,7 +51,7 @@ def test_output_image_scale(output_image_scale : Scale, output_image_resolution ]) def test_output_video_scale(output_video_scale : Scale, output_video_resolution : Resolution) -> None: output_file_path = get_test_output_file('test-output-video-scale-' + str(output_video_scale) + '.mp4') - commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', output_file_path, '--trim-frame-end', '1', '--output-video-scale', str(output_video_scale) ] + commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', output_file_path, '--trim-frame-end', '1', '--output-video-scale', str(output_video_scale) ] assert subprocess.run(commands).returncode == 0 assert detect_video_resolution(output_file_path) == output_video_resolution diff --git a/tests/test_vision.py b/tests/test_vision.py index a440bab7..525f0f80 100644 --- a/tests/test_vision.py +++ b/tests/test_vision.py @@ -3,7 +3,7 @@ import subprocess import pytest from facefusion.download import conditional_download -from facefusion.vision import calculate_histogram_difference, count_trim_frame_total, count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, match_frame_color, normalize_resolution, pack_resolution, predict_video_frame_total, read_image, read_video_frame, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, unpack_resolution, write_image +from facefusion.vision import calculate_histogram_difference, count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, match_frame_color, normalize_resolution, pack_resolution, predict_video_frame_total, read_image, read_video_frame, restrict_image_resolution, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, unpack_resolution, write_image from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory @@ -97,24 +97,14 @@ def test_detect_video_duration() -> None: assert detect_video_duration('invalid') == 0 -def test_count_trim_frame_total() -> None: - assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 0, 200) == 200 - assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 70, 270) == 200 - assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), -10, None) == 270 - assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, -10) == 0 - assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 280, None) == 0 - assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, 280) == 270 - assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, None) == 270 - - def test_restrict_trim_frame() -> None: - assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 0, 200) == (0, 200) - assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 70, 270) == (70, 270) - assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), -10, None) == (0, 270) - assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, -10) == (0, 0) - assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 280, None) == (270, 270) - assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, 280) == (0, 270) - assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, None) == (0, 270) + assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 0, 200) == (0, 200) + assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 70, 270) == (70, 270) + assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), -10, None) == (0, 270) + assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, -10) == (0, 0) + assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 280, None) == (270, 270) + assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, 280) == (0, 270) + assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, None) == (0, 270) def test_detect_video_resolution() -> None: