From 23f865640a3e3d2bc608484cd026937608382907 Mon Sep 17 00:00:00 2001 From: harisreedhar Date: Tue, 16 Dec 2025 14:30:01 +0530 Subject: [PATCH] image to video as sequence --- facefusion/choices.py | 2 +- facefusion/core.py | 12 +- facefusion/filesystem.py | 7 + facefusion/jobs/job_helper.py | 2 + facefusion/jobs/job_runner.py | 31 ++++- facefusion/locales.py | 2 + .../processors/modules/age_modifier/core.py | 7 +- .../modules/background_remover/core.py | 7 +- .../processors/modules/deep_swapper/core.py | 7 +- .../modules/expression_restorer/core.py | 7 +- .../processors/modules/face_debugger/core.py | 7 +- .../processors/modules/face_editor/core.py | 7 +- .../processors/modules/face_enhancer/core.py | 7 +- .../processors/modules/face_swapper/core.py | 7 +- .../modules/frame_colorizer/core.py | 7 +- .../processors/modules/frame_enhancer/core.py | 7 +- facefusion/types.py | 2 +- facefusion/workflows/audio_to_image.py | 5 +- facefusion/workflows/core.py | 91 +------------ facefusion/workflows/image_to_video.py | 33 +---- .../workflows/image_to_video_as_sequence.py | 54 ++++++++ facefusion/workflows/to_video.py | 109 ++++++++++++++++ tests/helper.py | 10 +- tests/test_cli_age_modifier.py | 13 +- tests/test_cli_background_remover.py | 15 ++- tests/test_cli_batch_runner.py | 6 +- tests/test_cli_expression_restorer.py | 13 +- tests/test_cli_face_debugger.py | 13 +- tests/test_cli_face_editor.py | 13 +- tests/test_cli_face_enhancer.py | 13 +- tests/test_cli_face_swapper.py | 13 +- tests/test_cli_frame_colorizer.py | 13 +- tests/test_cli_frame_enhancer.py | 13 +- tests/test_cli_job_manager.py | 30 ++--- tests/test_cli_job_runner.py | 18 +-- tests/test_cli_lip_syncer.py | 13 +- tests/test_cli_output_scale.py | 6 +- tests/test_ffmpeg.py | 48 +++---- tests/test_job_manager.py | 61 +++++++-- tests/test_job_runner.py | 122 +++++++++++++----- tests/test_vision.py | 6 +- 41 files changed, 570 insertions(+), 289 deletions(-) create mode 100644 facefusion/workflows/image_to_video_as_sequence.py create mode 100644 facefusion/workflows/to_video.py diff --git a/facefusion/choices.py b/facefusion/choices.py index 041ca20e..b492517a 100755 --- a/facefusion/choices.py +++ b/facefusion/choices.py @@ -45,7 +45,7 @@ face_mask_regions : List[FaceMaskRegion] = list(face_mask_region_set.keys()) voice_extractor_models : List[VoiceExtractorModel] = [ 'kim_vocal_1', 'kim_vocal_2', 'uvr_mdxnet' ] -workflows : List[WorkFlow] = [ 'auto', 'audio-to-image', 'image-to-image', 'image-to-video' ] +workflows : List[WorkFlow] = [ 'auto', 'audio-to-image', 'image-to-image', 'image-to-video', 'image-to-video-as-sequence' ] audio_type_set : AudioTypeSet =\ { diff --git a/facefusion/core.py b/facefusion/core.py index 10d8127b..a7115c4a 100755 --- a/facefusion/core.py +++ b/facefusion/core.py @@ -12,15 +12,15 @@ from facefusion.apis.core import create_api from facefusion.args_helper import apply_args from facefusion.download import conditional_download_hashes, conditional_download_sources from facefusion.exit_helper import hard_exit, signal_exit -from facefusion.filesystem import get_file_extension, get_file_name, resolve_file_paths, resolve_file_pattern -from facefusion.filesystem import has_audio, has_image, has_video +from facefusion.filesystem import get_file_extension, has_audio, has_image, has_video +from facefusion.filesystem import get_file_name, resolve_file_paths, resolve_file_pattern from facefusion.jobs import job_helper, job_manager, job_runner from facefusion.jobs.job_list import compose_job_list from facefusion.processors.core import get_processors_modules from facefusion.program import create_program from facefusion.program_helper import validate_args from facefusion.types import Args, ErrorCode, WorkFlow -from facefusion.workflows import audio_to_image, image_to_image, image_to_video +from facefusion.workflows import audio_to_image, image_to_image, image_to_video, image_to_video_as_sequence def cli() -> None: @@ -342,13 +342,17 @@ def conditional_process() -> ErrorCode: return image_to_image.process(start_time) if state_manager.get_item('workflow') == 'image-to-video': return image_to_video.process(start_time) + if state_manager.get_item('workflow') == 'image-to-video-as-sequence': + return image_to_video_as_sequence.process(start_time) return 0 def detect_workflow() -> WorkFlow: if has_video([ state_manager.get_item('target_path') ]): - return 'image-to-video' + if get_file_extension(state_manager.get_item('output_path')): + return 'image-to-video' + return 'image-to-video-as-sequence' if has_audio(state_manager.get_item('source_paths')) and has_image([ state_manager.get_item('target_path') ]): return 'audio-to-image' diff --git a/facefusion/filesystem.py b/facefusion/filesystem.py index 42bfe727..22d51439 100644 --- a/facefusion/filesystem.py +++ b/facefusion/filesystem.py @@ -170,6 +170,13 @@ def create_directory(directory_path : str) -> bool: return False +def move_directory(directory_path : str, move_path : str) -> bool: + if is_directory(directory_path): + shutil.move(directory_path, move_path) + return is_directory(move_path) + return False + + def remove_directory(directory_path : str) -> bool: if is_directory(directory_path): shutil.rmtree(directory_path, ignore_errors = True) diff --git a/facefusion/jobs/job_helper.py b/facefusion/jobs/job_helper.py index d7e90218..71b1c00c 100644 --- a/facefusion/jobs/job_helper.py +++ b/facefusion/jobs/job_helper.py @@ -13,6 +13,8 @@ def get_step_output_path(job_id : str, step_index : int, output_path : str) -> O if output_file_name and output_file_extension: return os.path.join(output_directory_path, output_file_name + '-' + job_id + '-' + str(step_index) + output_file_extension) + if output_file_path and output_directory_path: + return os.path.join(output_directory_path, output_file_path + '-' + job_id + '-' + str(step_index)) return None diff --git a/facefusion/jobs/job_runner.py b/facefusion/jobs/job_runner.py index 23a0e38b..8a05d38a 100644 --- a/facefusion/jobs/job_runner.py +++ b/facefusion/jobs/job_runner.py @@ -1,5 +1,7 @@ +import os + from facefusion.ffmpeg import concat_video -from facefusion.filesystem import are_images, are_videos, move_file, remove_file +from facefusion.filesystem import are_images, are_videos, copy_file, create_directory, is_directory, is_file, move_directory, move_file, remove_directory, remove_file, resolve_file_paths from facefusion.jobs import job_helper, job_manager from facefusion.types import JobOutputSet, JobStep, ProcessStep @@ -59,6 +61,8 @@ def run_step(job_id : str, step_index : int, step : JobStep, process_step : Proc output_path = step_args.get('output_path') step_output_path = job_helper.get_step_output_path(job_id, step_index, output_path) + if is_directory(output_path): + return move_directory(output_path, step_output_path) and job_manager.set_step_status(job_id, step_index, 'completed') return move_file(output_path, step_output_path) and job_manager.set_step_status(job_id, step_index, 'completed') job_manager.set_step_status(job_id, step_index, 'failed') return False @@ -79,13 +83,26 @@ def finalize_steps(job_id : str) -> bool: output_set = collect_output_set(job_id) for output_path, temp_output_paths in output_set.items(): - if are_videos(temp_output_paths): + has_videos = are_videos(temp_output_paths) + has_images = are_images(temp_output_paths) + + if has_videos: if not concat_video(output_path, temp_output_paths): return False - if are_images(temp_output_paths): + if not has_videos and has_images: for temp_output_path in temp_output_paths: if not move_file(temp_output_path, output_path): return False + if not has_videos and not has_images: + if not create_directory(output_path): + return False + + for temp_output_path in temp_output_paths: + if is_directory(temp_output_path): + temp_frame_paths = resolve_file_paths(temp_output_path) + for temp_frame_path in temp_frame_paths: + if not copy_file(temp_frame_path, os.path.join(output_path, os.path.basename(temp_frame_path))): + return False return True @@ -94,8 +111,12 @@ def clean_steps(job_id: str) -> bool: for temp_output_paths in output_set.values(): for temp_output_path in temp_output_paths: - if not remove_file(temp_output_path): - return False + if is_file(temp_output_path): + if not remove_file(temp_output_path): + return False + if is_directory(temp_output_path): + if not remove_directory(temp_output_path): + return False return True diff --git a/facefusion/locales.py b/facefusion/locales.py index cb796e1d..4000dff3 100644 --- a/facefusion/locales.py +++ b/facefusion/locales.py @@ -40,6 +40,8 @@ LOCALES : Locales =\ 'processing_stopped': 'processing stopped', 'processing_image_succeeded': 'processing to image succeeded in {seconds} seconds', 'processing_image_failed': 'processing to image failed', + 'processing_sequence_succeeded': 'processing to sequence succeeded in {seconds} seconds', + 'processing_sequence_failed': 'processing to sequence failed', 'processing_video_succeeded': 'processing to video succeeded in {seconds} seconds', 'processing_video_failed': 'processing to video failed', 'choose_image_source': 'choose an image for the source', diff --git a/facefusion/processors/modules/age_modifier/core.py b/facefusion/processors/modules/age_modifier/core.py index d6929a3d..66968350 100755 --- a/facefusion/processors/modules/age_modifier/core.py +++ b/facefusion/processors/modules/age_modifier/core.py @@ -108,9 +108,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/background_remover/core.py b/facefusion/processors/modules/background_remover/core.py index 5db96c27..318912f3 100644 --- a/facefusion/processors/modules/background_remover/core.py +++ b/facefusion/processors/modules/background_remover/core.py @@ -441,9 +441,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/deep_swapper/core.py b/facefusion/processors/modules/deep_swapper/core.py index 2d9ce0a4..e76e593f 100755 --- a/facefusion/processors/modules/deep_swapper/core.py +++ b/facefusion/processors/modules/deep_swapper/core.py @@ -299,9 +299,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/expression_restorer/core.py b/facefusion/processors/modules/expression_restorer/core.py index b53776ad..f46ff5d7 100755 --- a/facefusion/processors/modules/expression_restorer/core.py +++ b/facefusion/processors/modules/expression_restorer/core.py @@ -125,9 +125,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/face_debugger/core.py b/facefusion/processors/modules/face_debugger/core.py index fe7f123b..aef6ad35 100755 --- a/facefusion/processors/modules/face_debugger/core.py +++ b/facefusion/processors/modules/face_debugger/core.py @@ -46,9 +46,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/face_editor/core.py b/facefusion/processors/modules/face_editor/core.py index 56430edb..30504c18 100755 --- a/facefusion/processors/modules/face_editor/core.py +++ b/facefusion/processors/modules/face_editor/core.py @@ -176,9 +176,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/face_enhancer/core.py b/facefusion/processors/modules/face_enhancer/core.py index 97df33fd..332be23c 100755 --- a/facefusion/processors/modules/face_enhancer/core.py +++ b/facefusion/processors/modules/face_enhancer/core.py @@ -315,9 +315,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/face_swapper/core.py b/facefusion/processors/modules/face_swapper/core.py index a9c3e3a4..9730cf56 100755 --- a/facefusion/processors/modules/face_swapper/core.py +++ b/facefusion/processors/modules/face_swapper/core.py @@ -551,9 +551,10 @@ def pre_process(mode : ProcessMode) -> bool: logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/frame_colorizer/core.py b/facefusion/processors/modules/frame_colorizer/core.py index 8a3f3bc6..fd789f1f 100644 --- a/facefusion/processors/modules/frame_colorizer/core.py +++ b/facefusion/processors/modules/frame_colorizer/core.py @@ -207,9 +207,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/processors/modules/frame_enhancer/core.py b/facefusion/processors/modules/frame_enhancer/core.py index ea97ecf9..b48f3245 100644 --- a/facefusion/processors/modules/frame_enhancer/core.py +++ b/facefusion/processors/modules/frame_enhancer/core.py @@ -594,9 +594,10 @@ def pre_process(mode : ProcessMode) -> bool: if mode in [ 'output', 'preview' ] and not is_image(state_manager.get_item('target_path')) and not is_video(state_manager.get_item('target_path')): logger.error(translator.get('choose_image_or_video_target') + translator.get('exclamation_mark'), __name__) return False - if mode == 'output' and not in_directory(state_manager.get_item('output_path')): - logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) - return False + if state_manager.get_item('workflow') in [ 'audio-to-image', 'image-to-image', 'image-to-video' ]: + if mode == 'output' and not in_directory(state_manager.get_item('output_path')): + logger.error(translator.get('specify_image_or_video_output') + translator.get('exclamation_mark'), __name__) + return False return True diff --git a/facefusion/types.py b/facefusion/types.py index d0815a76..fb2b8212 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -55,7 +55,7 @@ Language = Literal['en'] Locales : TypeAlias = Dict[Language, Dict[str, Any]] LocalePoolSet : TypeAlias = Dict[str, Locales] -WorkFlow = Literal['auto', 'audio-to-image', 'image-to-image', 'image-to-video'] +WorkFlow = Literal['auto', 'audio-to-image', 'image-to-image', 'image-to-video', 'image-to-video-as-sequence'] VideoCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture] VideoWriterSet : TypeAlias = Dict[str, cv2.VideoWriter] diff --git a/facefusion/workflows/audio_to_image.py b/facefusion/workflows/audio_to_image.py index ccd7a506..d76f5657 100644 --- a/facefusion/workflows/audio_to_image.py +++ b/facefusion/workflows/audio_to_image.py @@ -6,7 +6,8 @@ from facefusion.common_helper import get_first from facefusion.filesystem import filter_audio_paths from facefusion.types import ErrorCode from facefusion.vision import detect_image_resolution, restrict_image_resolution, scale_resolution -from facefusion.workflows.core import analyse_image, clear, finalize_video, is_process_stopping, merge_frames, process_video, restore_audio, setup +from facefusion.workflows.core import analyse_image, clear, is_process_stopping, process_frames, setup +from facefusion.workflows.to_video import finalize_video, merge_frames, restore_audio def process(start_time : float) -> ErrorCode: @@ -16,7 +17,7 @@ def process(start_time : float) -> ErrorCode: clear, setup, create_temp_frames, - process_video, + process_frames, merge_frames, restore_audio, partial(finalize_video, start_time), diff --git a/facefusion/workflows/core.py b/facefusion/workflows/core.py index 13a58127..63db8984 100644 --- a/facefusion/workflows/core.py +++ b/facefusion/workflows/core.py @@ -3,16 +3,14 @@ from concurrent.futures import ThreadPoolExecutor, as_completed import numpy from tqdm import tqdm -from facefusion import content_analyser, ffmpeg, logger, process_manager, state_manager, translator, video_manager +from facefusion import content_analyser, logger, process_manager, state_manager, translator from facefusion.audio import create_empty_audio_frame, get_audio_frame, get_voice_frame from facefusion.common_helper import get_first -from facefusion.filesystem import filter_audio_paths, is_video -from facefusion.media_helper import restrict_trim_frame +from facefusion.filesystem import filter_audio_paths from facefusion.processors.core import get_processors_modules -from facefusion.temp_helper import clear_temp_directory, create_temp_directory, move_temp_file, resolve_temp_frame_paths -from facefusion.time_helper import calculate_end_time -from facefusion.types import AudioFrame, ErrorCode, Fps, Resolution, VisionFrame -from facefusion.vision import conditional_merge_vision_mask, detect_image_resolution, detect_video_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, read_static_video_frame, restrict_video_fps, scale_resolution, write_image +from facefusion.temp_helper import clear_temp_directory, create_temp_directory, resolve_temp_frame_paths +from facefusion.types import AudioFrame, ErrorCode, VisionFrame +from facefusion.vision import conditional_merge_vision_mask, extract_vision_mask, read_static_image, read_static_images, read_static_video_frame, restrict_video_fps, write_image def is_process_stopping() -> bool: @@ -71,28 +69,11 @@ def conditional_get_source_voice_frame(frame_number: int) -> AudioFrame: def conditional_get_reference_vision_frame() -> VisionFrame: - if state_manager.get_item('workflow') == 'image-to-video': + if state_manager.get_item('workflow') in [ 'image-to-video', 'image-to-video-as-sequence' ]: return read_static_video_frame(state_manager.get_item('target_path'), state_manager.get_item('reference_frame_number')) return read_static_image(state_manager.get_item('target_path')) -def conditional_scale_resolution() -> Resolution: - if state_manager.get_item('workflow') == 'image-to-video': - return scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale')) - return scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale')) - - -def conditional_restrict_video_fps() -> Fps: - if state_manager.get_item('workflow') == 'image-to-video': - return restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps')) - return state_manager.get_item('output_video_fps') - - -def conditional_clear_video_pool() -> None: - if state_manager.get_item('workflow') == 'image-to-video': - video_manager.clear_video_pool() - - def process_temp_frame(temp_frame_path : str, frame_number : int) -> bool: reference_vision_frame = conditional_get_reference_vision_frame() source_vision_frames = read_static_images(state_manager.get_item('source_paths')) @@ -118,7 +99,7 @@ def process_temp_frame(temp_frame_path : str, frame_number : int) -> bool: return write_image(temp_frame_path, temp_vision_frame) -def process_video() -> ErrorCode: +def process_frames() -> ErrorCode: temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format')) if temp_frame_paths: @@ -150,61 +131,3 @@ def process_video() -> ErrorCode: logger.error(translator.get('temp_frames_not_found'), __name__) return 1 return 0 - - -def merge_frames() -> ErrorCode: - temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format')) - trim_frame_start, trim_frame_end = restrict_trim_frame(len(temp_frame_paths), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) - output_video_resolution = conditional_scale_resolution() - temp_video_fps = conditional_restrict_video_fps() - - logger.info(translator.get('merging_video').format(resolution = pack_resolution(output_video_resolution), fps = state_manager.get_item('output_video_fps')), __name__) - if ffmpeg.merge_video(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_video_fps, output_video_resolution, trim_frame_start, trim_frame_end): - logger.debug(translator.get('merging_video_succeeded'), __name__) - else: - if is_process_stopping(): - return 4 - logger.error(translator.get('merging_video_failed'), __name__) - return 1 - return 0 - - -def restore_audio() -> ErrorCode: - temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format')) - trim_frame_start, trim_frame_end = restrict_trim_frame(len(temp_frame_paths), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) - - if state_manager.get_item('output_audio_volume') == 0: - logger.info(translator.get('skipping_audio'), __name__) - move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path')) - else: - source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths'))) - if source_audio_path: - if ffmpeg.replace_audio(source_audio_path, state_manager.get_item('output_path')): - conditional_clear_video_pool() - logger.debug(translator.get('replacing_audio_succeeded'), __name__) - else: - conditional_clear_video_pool() - if is_process_stopping(): - return 4 - logger.warn(translator.get('replacing_audio_skipped'), __name__) - move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path')) - else: - if ffmpeg.restore_audio(state_manager.get_item('target_path'), state_manager.get_item('output_path'), trim_frame_start, trim_frame_end): - conditional_clear_video_pool() - logger.debug(translator.get('restoring_audio_succeeded'), __name__) - else: - conditional_clear_video_pool() - if is_process_stopping(): - return 4 - logger.warn(translator.get('restoring_audio_skipped'), __name__) - move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path')) - return 0 - - -def finalize_video(start_time : float) -> ErrorCode: - if is_video(state_manager.get_item('output_path')): - logger.info(translator.get('processing_video_succeeded').format(seconds = calculate_end_time(start_time)), __name__) - else: - logger.error(translator.get('processing_video_failed'), __name__) - return 1 - return 0 diff --git a/facefusion/workflows/image_to_video.py b/facefusion/workflows/image_to_video.py index 4b982541..359be3a8 100644 --- a/facefusion/workflows/image_to_video.py +++ b/facefusion/workflows/image_to_video.py @@ -1,9 +1,9 @@ from functools import partial -from facefusion import content_analyser, ffmpeg, logger, process_manager, state_manager, translator +from facefusion import process_manager from facefusion.types import ErrorCode -from facefusion.vision import detect_video_resolution, pack_resolution, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution -from facefusion.workflows.core import clear, finalize_video, is_process_stopping, merge_frames, process_video, restore_audio, setup +from facefusion.workflows.core import clear, process_frames, setup +from facefusion.workflows.to_video import analyse_video, create_temp_frames, finalize_video, merge_frames, restore_audio def process(start_time : float) -> ErrorCode: @@ -13,7 +13,7 @@ def process(start_time : float) -> ErrorCode: clear, setup, create_temp_frames, - process_video, + process_frames, merge_frames, restore_audio, partial(finalize_video, start_time), @@ -31,28 +31,3 @@ def process(start_time : float) -> ErrorCode: process_manager.end() return 0 - - -def analyse_video() -> ErrorCode: - trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) - - if content_analyser.analyse_video(state_manager.get_item('target_path'), trim_frame_start, trim_frame_end): - return 3 - return 0 - - -def create_temp_frames() -> ErrorCode: - trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) - output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale')) - temp_video_resolution = restrict_video_resolution(state_manager.get_item('target_path'), output_video_resolution) - temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps')) - logger.info(translator.get('extracting_frames').format(resolution=pack_resolution(temp_video_resolution), fps=temp_video_fps), __name__) - - if ffmpeg.extract_frames(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_video_resolution, temp_video_fps, trim_frame_start, trim_frame_end): - logger.debug(translator.get('extracting_frames_succeeded'), __name__) - else: - if is_process_stopping(): - return 4 - logger.error(translator.get('extracting_frames_failed'), __name__) - return 1 - return 0 diff --git a/facefusion/workflows/image_to_video_as_sequence.py b/facefusion/workflows/image_to_video_as_sequence.py new file mode 100644 index 00000000..018ae603 --- /dev/null +++ b/facefusion/workflows/image_to_video_as_sequence.py @@ -0,0 +1,54 @@ +import os +from functools import partial + +from facefusion import logger, process_manager, state_manager, translator +from facefusion.filesystem import are_images, copy_file, create_directory, resolve_file_paths +from facefusion.temp_helper import resolve_temp_frame_paths +from facefusion.time_helper import calculate_end_time +from facefusion.types import ErrorCode +from facefusion.workflows.core import clear, process_frames, setup +from facefusion.workflows.to_video import analyse_video, create_temp_frames + + +def process(start_time : float) -> ErrorCode: + tasks =\ + [ + analyse_video, + clear, + setup, + create_temp_frames, + process_frames, + copy_temp_frames, + partial(finalize_sequence, start_time), + clear + ] + + process_manager.start() + + for task in tasks: + error_code = task() #type:ignore[operator] + + if error_code > 0: + process_manager.end() + return error_code + + process_manager.end() + return 0 + + +def copy_temp_frames() -> ErrorCode: + temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format')) + + for temp_frame_path in temp_frame_paths: + if not create_directory(state_manager.get_item('output_path')) or not copy_file(temp_frame_path, os.path.join(state_manager.get_item('output_path'), os.path.basename(temp_frame_path))): + return 1 + return 0 + + +def finalize_sequence(start_time : float) -> ErrorCode: + if are_images(resolve_file_paths(state_manager.get_item('output_path'))): + logger.info(translator.get('processing_sequence_succeeded').format(seconds = calculate_end_time(start_time)), __name__) + else: + logger.error(translator.get('processing_sequence_failed'), __name__) + return 1 + return 0 diff --git a/facefusion/workflows/to_video.py b/facefusion/workflows/to_video.py new file mode 100644 index 00000000..43366dd8 --- /dev/null +++ b/facefusion/workflows/to_video.py @@ -0,0 +1,109 @@ +from facefusion import content_analyser, ffmpeg, logger, state_manager, translator, video_manager +from facefusion.common_helper import get_first +from facefusion.filesystem import filter_audio_paths, is_video +from facefusion.media_helper import restrict_trim_frame +from facefusion.temp_helper import move_temp_file, resolve_temp_frame_paths +from facefusion.time_helper import calculate_end_time +from facefusion.types import ErrorCode, Fps, Resolution +from facefusion.vision import detect_image_resolution, detect_video_resolution, pack_resolution, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution +from facefusion.workflows.core import is_process_stopping + + +def analyse_video() -> ErrorCode: + trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + + if content_analyser.analyse_video(state_manager.get_item('target_path'), trim_frame_start, trim_frame_end): + return 3 + return 0 + + +def create_temp_frames() -> ErrorCode: + trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale')) + temp_video_resolution = restrict_video_resolution(state_manager.get_item('target_path'), output_video_resolution) + temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps')) + logger.info(translator.get('extracting_frames').format(resolution=pack_resolution(temp_video_resolution), fps=temp_video_fps), __name__) + + if ffmpeg.extract_frames(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_video_resolution, temp_video_fps, trim_frame_start, trim_frame_end): + logger.debug(translator.get('extracting_frames_succeeded'), __name__) + else: + if is_process_stopping(): + return 4 + logger.error(translator.get('extracting_frames_failed'), __name__) + return 1 + return 0 + + +def merge_frames() -> ErrorCode: + temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format')) + trim_frame_start, trim_frame_end = restrict_trim_frame(len(temp_frame_paths), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + output_video_resolution = conditional_scale_resolution() + temp_video_fps = conditional_restrict_video_fps() + + logger.info(translator.get('merging_video').format(resolution = pack_resolution(output_video_resolution), fps = state_manager.get_item('output_video_fps')), __name__) + if ffmpeg.merge_video(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_video_fps, output_video_resolution, trim_frame_start, trim_frame_end): + logger.debug(translator.get('merging_video_succeeded'), __name__) + else: + if is_process_stopping(): + return 4 + logger.error(translator.get('merging_video_failed'), __name__) + return 1 + return 0 + + +def restore_audio() -> ErrorCode: + temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format')) + trim_frame_start, trim_frame_end = restrict_trim_frame(len(temp_frame_paths), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end')) + + if state_manager.get_item('output_audio_volume') == 0: + logger.info(translator.get('skipping_audio'), __name__) + move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path')) + else: + source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths'))) + if source_audio_path: + if ffmpeg.replace_audio(source_audio_path, state_manager.get_item('output_path')): + conditional_clear_video_pool() + logger.debug(translator.get('replacing_audio_succeeded'), __name__) + else: + conditional_clear_video_pool() + if is_process_stopping(): + return 4 + logger.warn(translator.get('replacing_audio_skipped'), __name__) + move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path')) + else: + if ffmpeg.restore_audio(state_manager.get_item('target_path'), state_manager.get_item('output_path'), trim_frame_start, trim_frame_end): + conditional_clear_video_pool() + logger.debug(translator.get('restoring_audio_succeeded'), __name__) + else: + conditional_clear_video_pool() + if is_process_stopping(): + return 4 + logger.warn(translator.get('restoring_audio_skipped'), __name__) + move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path')) + return 0 + + +def finalize_video(start_time : float) -> ErrorCode: + if is_video(state_manager.get_item('output_path')): + logger.info(translator.get('processing_video_succeeded').format(seconds = calculate_end_time(start_time)), __name__) + else: + logger.error(translator.get('processing_video_failed'), __name__) + return 1 + return 0 + + +def conditional_clear_video_pool() -> None: + if state_manager.get_item('workflow') == 'image-to-video': + video_manager.clear_video_pool() + + +def conditional_restrict_video_fps() -> Fps: + if state_manager.get_item('workflow') == 'image-to-video': + return restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps')) + return state_manager.get_item('output_video_fps') + + +def conditional_scale_resolution() -> Resolution: + if state_manager.get_item('workflow') == 'image-to-video': + return scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale')) + return scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale')) diff --git a/tests/helper.py b/tests/helper.py index 8902643c..82c07a7e 100644 --- a/tests/helper.py +++ b/tests/helper.py @@ -1,7 +1,7 @@ import os import tempfile -from facefusion.filesystem import create_directory, is_directory, is_file, remove_directory +from facefusion.filesystem import are_images, create_directory, is_directory, is_file, remove_directory, resolve_file_paths from facefusion.types import JobStatus @@ -26,10 +26,14 @@ def get_test_examples_directory() -> str: def is_test_output_file(file_path : str) -> bool: - return is_file(get_test_output_file(file_path)) + return is_file(get_test_output_path(file_path)) -def get_test_output_file(file_path : str) -> str: +def is_test_output_sequence(directory_path : str) -> bool: + return are_images(resolve_file_paths(directory_path)) + + +def get_test_output_path(file_path : str) -> str: return os.path.join(get_test_outputs_directory(), file_path) diff --git a/tests/test_cli_age_modifier.py b/tests/test_cli_age_modifier.py index 06dba143..50db990a 100644 --- a/tests/test_cli_age_modifier.py +++ b/tests/test_cli_age_modifier.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -25,14 +25,21 @@ def before_each() -> None: def test_modify_age_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-age-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-age-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-age-face-to-image.jpg') is True def test_modify_age_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-age-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-age-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-age-face-to-video.mp4') is True + + +def test_modify_age_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-age-face-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-age-face-to-video-as-sequence')) is True diff --git a/tests/test_cli_background_remover.py b/tests/test_cli_background_remover.py index 2646a178..19048adc 100644 --- a/tests/test_cli_background_remover.py +++ b/tests/test_cli_background_remover.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -15,7 +15,7 @@ def before_all() -> None: 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.jpg', 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4' ]) - subprocess.run(['ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p.jpg')]) + subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-vframes', '1', get_test_example_file('target-240p.jpg') ]) @pytest.fixture(scope = 'function', autouse = True) @@ -26,14 +26,21 @@ def before_each() -> None: def test_remove_background_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-remove-background-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-remove-background-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-remove-background-to-image.jpg') is True def test_remove_background_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-remove-background-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-remove-background-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-remove-background-to-video.mp4') is True + + +def test_remove_background_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-remove-background-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-remove-background-to-video-as-sequence')) is True diff --git a/tests/test_cli_batch_runner.py b/tests/test_cli_batch_runner.py index 4c124c9f..adafde35 100644 --- a/tests/test_cli_batch_runner.py +++ b/tests/test_cli_batch_runner.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -26,7 +26,7 @@ def before_each() -> None: def test_batch_run_targets() -> None: - commands = [ sys.executable, 'facefusion.py', 'batch-run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-targets-{index}.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'batch-run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_path('test-batch-run-targets-{index}.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-batch-run-targets-0.jpg') is True @@ -35,7 +35,7 @@ def test_batch_run_targets() -> None: def test_batch_run_sources_to_targets() -> None: - commands = [ sys.executable, 'facefusion.py', 'batch-run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('target-240p-batch-*.jpg'), '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-sources-to-targets-{index}.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'batch-run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('target-240p-batch-*.jpg'), '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_path('test-batch-run-sources-to-targets-{index}.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-batch-run-sources-to-targets-0.jpg') is True diff --git a/tests/test_cli_expression_restorer.py b/tests/test_cli_expression_restorer.py index 9b4aa84c..699f8402 100644 --- a/tests/test_cli_expression_restorer.py +++ b/tests/test_cli_expression_restorer.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -25,14 +25,21 @@ def before_each() -> None: def test_restore_expression_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-restore-expression-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-restore-expression-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-restore-expression-to-image.jpg') is True def test_restore_expression_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-restore-expression-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-restore-expression-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-restore-expression-to-video.mp4') is True + + +def test_restore_expression_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-restore-expression-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-restore-expression-to-video-as-sequence')) is True diff --git a/tests/test_cli_face_debugger.py b/tests/test_cli_face_debugger.py index 2669a276..9fb687ca 100644 --- a/tests/test_cli_face_debugger.py +++ b/tests/test_cli_face_debugger.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -26,14 +26,21 @@ def before_each() -> None: def test_debug_face_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-debug-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-debug-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-debug-face-to-image.jpg') is True def test_debug_face_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-debug-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-debug-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-debug-face-to-video.mp4') is True + + +def test_debug_face_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-debug-face-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-debug-face-to-video-as-sequence')) is True diff --git a/tests/test_cli_face_editor.py b/tests/test_cli_face_editor.py index 8c72c5bf..e0569fbb 100644 --- a/tests/test_cli_face_editor.py +++ b/tests/test_cli_face_editor.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -26,14 +26,21 @@ def before_each() -> None: def test_edit_face_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-edit-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-edit-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-edit-face-to-image.jpg') is True def test_edit_face_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-edit-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-edit-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-edit-face-to-video.mp4') is True + + +def test_edit_face_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-edit-face-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-edit-face-to-video-as-sequence')) is True diff --git a/tests/test_cli_face_enhancer.py b/tests/test_cli_face_enhancer.py index b2e3a8ac..a315ad69 100644 --- a/tests/test_cli_face_enhancer.py +++ b/tests/test_cli_face_enhancer.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -26,14 +26,21 @@ def before_each() -> None: def test_enhance_face_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-enhance-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-enhance-face-to-image.jpg') is True def test_enhance_face_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-enhance-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-enhance-face-to-video.mp4') is True + + +def test_enhance_face_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-enhance-face-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-enhance-face-to-video-as-sequence')) is True diff --git a/tests/test_cli_face_swapper.py b/tests/test_cli_face_swapper.py index f8f067cc..1d77a82c 100644 --- a/tests/test_cli_face_swapper.py +++ b/tests/test_cli_face_swapper.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -26,14 +26,21 @@ def before_each() -> None: def test_swap_face_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-swap-face-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-swap-face-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-swap-face-to-image.jpg') is True def test_swap_face_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-swap-face-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-swap-face-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-swap-face-to-video.mp4') is True + + +def test_swap_face_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-swap-face-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-swap-face-to-video-as-sequence')) is True diff --git a/tests/test_cli_frame_colorizer.py b/tests/test_cli_frame_colorizer.py index a50e6a63..a3087b67 100644 --- a/tests/test_cli_frame_colorizer.py +++ b/tests/test_cli_frame_colorizer.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -27,14 +27,21 @@ def before_each() -> None: def test_colorize_frame_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.jpg'), '-o', get_test_output_file('test_colorize-frame-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.jpg'), '-o', get_test_output_path('test_colorize-frame-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test_colorize-frame-to-image.jpg') is True def test_colorize_frame_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_file('test-colorize-frame-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_path('test-colorize-frame-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-colorize-frame-to-video.mp4') is True + + +def test_colorize_frame_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_path('test-colorize-frame-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-colorize-frame-to-video-as-sequence')) is True diff --git a/tests/test_cli_frame_enhancer.py b/tests/test_cli_frame_enhancer.py index d0b55f5d..27f03bbb 100644 --- a/tests/test_cli_frame_enhancer.py +++ b/tests/test_cli_frame_enhancer.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -26,14 +26,21 @@ def before_each() -> None: def test_enhance_frame_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-frame-to-image.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-enhance-frame-to-image.jpg') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-enhance-frame-to-image.jpg') is True def test_enhance_frame_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-frame-to-video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-enhance-frame-to-video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test-enhance-frame-to-video.mp4') is True + + +def test_enhance_frame_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-enhance-frame-to-video-as-sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test-enhance-frame-to-video-as-sequence')) is True diff --git a/tests/test_cli_job_manager.py b/tests/test_cli_job_manager.py index 9bd76d8f..7d462d55 100644 --- a/tests/test_cli_job_manager.py +++ b/tests/test_cli_job_manager.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, count_step_total, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_job_file +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_job_file @pytest.fixture(scope = 'module', autouse = True) @@ -50,7 +50,7 @@ def test_job_submit() -> None: assert subprocess.run(commands).returncode == 1 - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-submit', 'test-job-submit', '--jobs-path', get_test_jobs_directory() ] @@ -73,10 +73,10 @@ def test_submit_all() -> None: assert subprocess.run(commands).returncode == 1 - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-2', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-2', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-submit-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ] @@ -122,7 +122,7 @@ def test_job_delete_all() -> None: def test_job_add_step() -> None: - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 1 assert count_step_total('test-job-add-step') == 0 @@ -130,14 +130,14 @@ def test_job_add_step() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-add-step', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-add-step') == 1 def test_job_remix() -> None: - commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 1 assert count_step_total('test-job-remix-step') == 0 @@ -145,23 +145,23 @@ def test_job_remix() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remix-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remix-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] assert count_step_total('test-job-remix-step') == 1 assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-remix-step') == 2 - commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-remix-step') == 3 def test_job_insert_step() -> None: - commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 1 assert count_step_total('test-job-insert-step') == 0 @@ -169,16 +169,16 @@ def test_job_insert_step() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-insert-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-insert-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] assert count_step_total('test-job-insert-step') == 1 assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-insert-step') == 2 - commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] assert subprocess.run(commands).returncode == 0 assert count_step_total('test-job-insert-step') == 3 @@ -192,7 +192,7 @@ def test_job_remove_step() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remove-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remove-step', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-remix-step.jpg') ] subprocess.run(commands) subprocess.run(commands) diff --git a/tests/test_cli_job_runner.py b/tests/test_cli_job_runner.py index d84aeaf9..e1608ed5 100644 --- a/tests/test_cli_job_runner.py +++ b/tests/test_cli_job_runner.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs, move_job_file, set_steps_status -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -33,7 +33,7 @@ def test_job_run() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-run', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-run.jpg') ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-run', 'test-job-run', '--jobs-path', get_test_jobs_directory() ] @@ -61,13 +61,13 @@ def test_job_run_all() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run-all-1.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-run-all-1.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-job-run-all-2.mp4'), '--trim-frame-end', '1' ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-job-run-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-run-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ] @@ -93,7 +93,7 @@ def test_job_retry() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-retry', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry.jpg') ] + commands = [sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-retry.jpg') ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-retry', 'test-job-retry', '--jobs-path', get_test_jobs_directory() ] @@ -121,13 +121,13 @@ def test_job_retry_all() -> None: commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory() ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry-all-1.jpg') ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-1', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test-job-retry-all-1.jpg') ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-job-retry-all-2.mp4'), '--trim-frame-end', '1' ] subprocess.run(commands) - commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test-job-retry-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ] subprocess.run(commands) commands = [ sys.executable, 'facefusion.py', 'job-retry-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ] diff --git a/tests/test_cli_lip_syncer.py b/tests/test_cli_lip_syncer.py index 1d17276c..47526a2a 100644 --- a/tests/test_cli_lip_syncer.py +++ b/tests/test_cli_lip_syncer.py @@ -5,7 +5,7 @@ import pytest from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -27,14 +27,21 @@ def before_each() -> None: def test_sync_lip_to_image() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'audio-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test_sync_lip_to_image.mp4') ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'audio-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test_sync_lip_to_image.mp4') ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test_sync_lip_to_image.mp4') is True def test_sync_lip_to_video() -> None: - commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ] + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ] assert subprocess.run(commands).returncode == 0 assert is_test_output_file('test_sync_lip_to_video.mp4') is True + + +def test_sync_lip_to_video_as_sequence() -> None: + commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video-as-sequence', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test_sync_lip_to_video_as_sequence'), '--trim-frame-end', '1' ] + + assert subprocess.run(commands).returncode == 0 + assert is_test_output_sequence(get_test_output_path('test_sync_lip_to_video_as_sequence')) is True diff --git a/tests/test_cli_output_scale.py b/tests/test_cli_output_scale.py index 9d6337d1..d970f850 100644 --- a/tests/test_cli_output_scale.py +++ b/tests/test_cli_output_scale.py @@ -7,7 +7,7 @@ from facefusion.download import conditional_download from facefusion.jobs.job_manager import clear_jobs, init_jobs from facefusion.types import Resolution, Scale from facefusion.vision import detect_image_resolution, detect_video_resolution -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -35,7 +35,7 @@ def before_each() -> None: (8.0, (3408, 1808)) ]) def test_output_image_scale(output_image_scale : Scale, output_image_resolution : Resolution) -> None: - output_file_path = get_test_output_file('test-output-image-scale-' + str(output_image_scale) + '.jpg') + output_file_path = get_test_output_path('test-output-image-scale-' + str(output_image_scale) + '.jpg') commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-image', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', output_file_path, '--output-image-scale', str(output_image_scale) ] assert subprocess.run(commands).returncode == 0 @@ -50,7 +50,7 @@ def test_output_image_scale(output_image_scale : Scale, output_image_resolution (8.0, (3408, 1808)) ]) def test_output_video_scale(output_video_scale : Scale, output_video_resolution : Resolution) -> None: - output_file_path = get_test_output_file('test-output-video-scale-' + str(output_video_scale) + '.mp4') + output_file_path = get_test_output_path('test-output-video-scale-' + str(output_video_scale) + '.mp4') commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', output_file_path, '--trim-frame-end', '1', '--output-video-scale', str(output_video_scale) ] assert subprocess.run(commands).returncode == 0 diff --git a/tests/test_ffmpeg.py b/tests/test_ffmpeg.py index 8d0e937e..560a9fd3 100644 --- a/tests/test_ffmpeg.py +++ b/tests/test_ffmpeg.py @@ -11,7 +11,7 @@ from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_au from facefusion.filesystem import copy_file from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths from facefusion.types import EncoderSet -from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -112,13 +112,13 @@ def test_spawn_frames() -> None: def test_merge_video() -> None: test_set =\ [ - (get_test_example_file('target-240p-16khz.avi'), get_test_output_file('test-merge-video-240p-16khz.avi')), - (get_test_example_file('target-240p-16khz.m4v'), get_test_output_file('test-merge-video-240p-16khz.m4v')), - (get_test_example_file('target-240p-16khz.mkv'), get_test_output_file('test-merge-video-240p-16khz.mkv')), - (get_test_example_file('target-240p-16khz.mp4'), get_test_output_file('test-merge-video-240p-16khz.mp4')), - (get_test_example_file('target-240p-16khz.mov'), get_test_output_file('test-merge-video-240p-16khz.mov')), - (get_test_example_file('target-240p-16khz.webm'), get_test_output_file('test-merge-video-240p-16khz.webm')), - (get_test_example_file('target-240p-16khz.wmv'), get_test_output_file('test-merge-video-240p-16khz.wmv')) + (get_test_example_file('target-240p-16khz.avi'), get_test_output_path('test-merge-video-240p-16khz.avi')), + (get_test_example_file('target-240p-16khz.m4v'), get_test_output_path('test-merge-video-240p-16khz.m4v')), + (get_test_example_file('target-240p-16khz.mkv'), get_test_output_path('test-merge-video-240p-16khz.mkv')), + (get_test_example_file('target-240p-16khz.mp4'), get_test_output_path('test-merge-video-240p-16khz.mp4')), + (get_test_example_file('target-240p-16khz.mov'), get_test_output_path('test-merge-video-240p-16khz.mov')), + (get_test_example_file('target-240p-16khz.webm'), get_test_output_path('test-merge-video-240p-16khz.webm')), + (get_test_example_file('target-240p-16khz.wmv'), get_test_output_path('test-merge-video-240p-16khz.wmv')) ] output_video_encoders = get_available_encoder_set().get('video') @@ -138,7 +138,7 @@ def test_merge_video() -> None: def test_concat_video() -> None: - output_path = get_test_output_file('test-concat-video.mp4') + output_path = get_test_output_path('test-concat-video.mp4') temp_output_paths =\ [ get_test_example_file('target-240p-16khz.mp4'), @@ -157,14 +157,14 @@ def test_read_audio_buffer() -> None: def test_restore_audio() -> None: test_set =\ [ - (get_test_example_file('target-240p-16khz.avi'), get_test_output_file('target-240p-16khz.avi')), - (get_test_example_file('target-240p-16khz.m4v'), get_test_output_file('target-240p-16khz.m4v')), - (get_test_example_file('target-240p-16khz.mkv'), get_test_output_file('target-240p-16khz.mkv')), - (get_test_example_file('target-240p-16khz.mov'), get_test_output_file('target-240p-16khz.mov')), - (get_test_example_file('target-240p-16khz.mp4'), get_test_output_file('target-240p-16khz.mp4')), - (get_test_example_file('target-240p-48khz.mp4'), get_test_output_file('target-240p-48khz.mp4')), - (get_test_example_file('target-240p-16khz.webm'), get_test_output_file('target-240p-16khz.webm')), - (get_test_example_file('target-240p-16khz.wmv'), get_test_output_file('target-240p-16khz.wmv')) + (get_test_example_file('target-240p-16khz.avi'), get_test_output_path('target-240p-16khz.avi')), + (get_test_example_file('target-240p-16khz.m4v'), get_test_output_path('target-240p-16khz.m4v')), + (get_test_example_file('target-240p-16khz.mkv'), get_test_output_path('target-240p-16khz.mkv')), + (get_test_example_file('target-240p-16khz.mov'), get_test_output_path('target-240p-16khz.mov')), + (get_test_example_file('target-240p-16khz.mp4'), get_test_output_path('target-240p-16khz.mp4')), + (get_test_example_file('target-240p-48khz.mp4'), get_test_output_path('target-240p-48khz.mp4')), + (get_test_example_file('target-240p-16khz.webm'), get_test_output_path('target-240p-16khz.webm')), + (get_test_example_file('target-240p-16khz.wmv'), get_test_output_path('target-240p-16khz.wmv')) ] output_audio_encoders = get_available_encoder_set().get('audio') @@ -185,13 +185,13 @@ def test_restore_audio() -> None: def test_replace_audio() -> None: test_set =\ [ - (get_test_example_file('target-240p-16khz.avi'), get_test_output_file('target-240p-16khz.avi')), - (get_test_example_file('target-240p-16khz.m4v'), get_test_output_file('target-240p-16khz.m4v')), - (get_test_example_file('target-240p-16khz.mkv'), get_test_output_file('target-240p-16khz.mkv')), - (get_test_example_file('target-240p-16khz.mov'), get_test_output_file('target-240p-16khz.mov')), - (get_test_example_file('target-240p-16khz.mp4'), get_test_output_file('target-240p-16khz.mp4')), - (get_test_example_file('target-240p-48khz.mp4'), get_test_output_file('target-240p-48khz.mp4')), - (get_test_example_file('target-240p-16khz.webm'), get_test_output_file('target-240p-16khz.webm')) + (get_test_example_file('target-240p-16khz.avi'), get_test_output_path('target-240p-16khz.avi')), + (get_test_example_file('target-240p-16khz.m4v'), get_test_output_path('target-240p-16khz.m4v')), + (get_test_example_file('target-240p-16khz.mkv'), get_test_output_path('target-240p-16khz.mkv')), + (get_test_example_file('target-240p-16khz.mov'), get_test_output_path('target-240p-16khz.mov')), + (get_test_example_file('target-240p-16khz.mp4'), get_test_output_path('target-240p-16khz.mp4')), + (get_test_example_file('target-240p-48khz.mp4'), get_test_output_path('target-240p-48khz.mp4')), + (get_test_example_file('target-240p-16khz.webm'), get_test_output_path('target-240p-16khz.webm')) ] output_audio_encoders = get_available_encoder_set().get('audio') diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py index 27164146..2d263bcc 100644 --- a/tests/test_job_manager.py +++ b/tests/test_job_manager.py @@ -37,6 +37,12 @@ def test_submit_job() -> None: 'target_path': 'target-1.jpg', 'output_path': 'output-1.jpg' } + args_2 =\ + { + 'source_path': 'source-2.jpg', + 'target_path': 'target-2.mp4', + 'output_path': 'output-sequence-2' + } assert submit_job('job-invalid') is False @@ -45,6 +51,7 @@ def test_submit_job() -> None: assert submit_job('job-test-submit-job') is False add_step('job-test-submit-job', args_1) + add_step('job-test-submit-job', args_2) assert submit_job('job-test-submit-job') is True assert submit_job('job-test-submit-job') is False @@ -170,6 +177,18 @@ def test_add_step() -> None: 'target_path': 'target-2.jpg', 'output_path': 'output-2.jpg' } + args_3 =\ + { + 'source_path': 'source-3.jpg', + 'target_path': 'target-3.mp4', + 'output_path': 'output-sequence-1' + } + args_4 =\ + { + 'source_path': 'source-4.jpg', + 'target_path': 'target-4.mp4', + 'output_path': 'output-sequence-1' + } assert add_step('job-invalid', args_1) is False @@ -177,12 +196,16 @@ def test_add_step() -> None: assert add_step('job-test-add-step', args_1) is True assert add_step('job-test-add-step', args_2) is True + assert add_step('job-test-add-step', args_3) is True + assert add_step('job-test-add-step', args_4) is True steps = get_steps('job-test-add-step') assert steps[0].get('args') == args_1 assert steps[1].get('args') == args_2 - assert count_step_total('job-test-add-step') == 2 + assert steps[2].get('args') == args_3 + assert steps[3].get('args') == args_4 + assert count_step_total('job-test-add-step') == 4 def test_remix_step() -> None: @@ -198,28 +221,40 @@ def test_remix_step() -> None: 'target_path': 'target-2.jpg', 'output_path': 'output-2.jpg' } + args_3 =\ + { + 'source_path': 'source-3.jpg', + 'target_path': 'target-3.mp4', + 'output_path': 'output-sequence-3' + } assert remix_step('job-invalid', 0, args_1) is False create_job('job-test-remix-step') add_step('job-test-remix-step', args_1) add_step('job-test-remix-step', args_2) + add_step('job-test-remix-step', args_3) assert remix_step('job-test-remix-step', 99, args_1) is False assert remix_step('job-test-remix-step', 0, args_2) is True assert remix_step('job-test-remix-step', -1, args_2) is True + assert remix_step('job-test-remix-step', 2, args_3) is True steps = get_steps('job-test-remix-step') assert steps[0].get('args') == args_1 assert steps[1].get('args') == args_2 - assert steps[2].get('args').get('source_path') == args_2.get('source_path') - assert steps[2].get('args').get('target_path') == get_step_output_path('job-test-remix-step', 0, args_1.get('output_path')) - assert steps[2].get('args').get('output_path') == args_2.get('output_path') + assert steps[2].get('args') == args_3 assert steps[3].get('args').get('source_path') == args_2.get('source_path') - assert steps[3].get('args').get('target_path') == get_step_output_path('job-test-remix-step', 2, args_2.get('output_path')) + assert steps[3].get('args').get('target_path') == get_step_output_path('job-test-remix-step', 0, args_1.get('output_path')) assert steps[3].get('args').get('output_path') == args_2.get('output_path') - assert count_step_total('job-test-remix-step') == 4 + assert steps[4].get('args').get('source_path') == args_2.get('source_path') + assert steps[4].get('args').get('target_path') == get_step_output_path('job-test-remix-step', 3, args_2.get('output_path')) + assert steps[4].get('args').get('output_path') == args_2.get('output_path') + assert steps[5].get('args').get('source_path') == args_3.get('source_path') + assert steps[5].get('args').get('target_path') == get_step_output_path('job-test-remix-step', 2, args_3.get('output_path')) + assert steps[5].get('args').get('output_path') == args_3.get('output_path') + assert count_step_total('job-test-remix-step') == 6 def test_insert_step() -> None: @@ -241,6 +276,12 @@ def test_insert_step() -> None: 'target_path': 'target-3.jpg', 'output_path': 'output-3.jpg' } + args_4 =\ + { + 'source_path': 'source-4.jpg', + 'target_path': 'target-4.mp4', + 'output_path': 'output-sequence-4' + } assert insert_step('job-invalid', 0, args_1) is False @@ -251,14 +292,16 @@ def test_insert_step() -> None: assert insert_step('job-test-insert-step', 99, args_1) is False assert insert_step('job-test-insert-step', 0, args_2) is True assert insert_step('job-test-insert-step', -1, args_3) is True + assert insert_step('job-test-insert-step', 2, args_4) is True steps = get_steps('job-test-insert-step') assert steps[0].get('args') == args_2 assert steps[1].get('args') == args_1 - assert steps[2].get('args') == args_3 - assert steps[3].get('args') == args_1 - assert count_step_total('job-test-insert-step') == 4 + assert steps[2].get('args') == args_4 + assert steps[3].get('args') == args_3 + assert steps[4].get('args') == args_1 + assert count_step_total('job-test-insert-step') == 5 def test_remove_step() -> None: diff --git a/tests/test_job_runner.py b/tests/test_job_runner.py index 4e0983ae..35d57060 100644 --- a/tests/test_job_runner.py +++ b/tests/test_job_runner.py @@ -1,13 +1,14 @@ +import os import subprocess import pytest from facefusion.download import conditional_download -from facefusion.filesystem import copy_file +from facefusion.filesystem import copy_file, create_directory, get_file_extension from facefusion.jobs.job_manager import add_step, clear_jobs, create_job, init_jobs, move_job_file, submit_job, submit_jobs from facefusion.jobs.job_runner import collect_output_set, finalize_steps, retry_job, retry_jobs, run_job, run_jobs, run_steps from facefusion.types import Args -from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_file, is_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_jobs_directory, get_test_output_path, is_test_output_file, is_test_output_sequence, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -28,7 +29,15 @@ def before_each() -> None: def process_step(job_id : str, step_index : int, step_args : Args) -> bool: - return copy_file(step_args.get('target_path'), step_args.get('output_path')) + output_path = step_args.get('output_path') + target_path = step_args.get('target_path') + + if output_path and not get_file_extension(output_path): + if create_directory(output_path): + return copy_file(target_path, os.path.join(output_path, os.path.basename(target_path))) + return False + + return copy_file(target_path, output_path) def test_run_job() -> None: @@ -36,19 +45,31 @@ def test_run_job() -> None: { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-1.mp4') + 'output_path': get_test_output_path('output-1.mp4') } args_2 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-2.mp4') + 'output_path': get_test_output_path('output-2.mp4') } args_3 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.jpg'), - 'output_path': get_test_output_file('output-3.jpg') + 'output_path': get_test_output_path('output-3.jpg') + } + args_4 =\ + { + 'source_path': get_test_example_file('source.jpg'), + 'target_path': get_test_example_file('target-240p.jpg'), + 'output_path': get_test_output_path('output-4') + } + args_5 =\ + { + 'source_path': get_test_example_file('source.jpg'), + 'target_path': get_test_example_file('target-240p.jpg'), + 'output_path': get_test_output_path('output-4') } assert run_job('job-invalid', process_step) is False @@ -58,6 +79,8 @@ def test_run_job() -> None: add_step('job-test-run-job', args_2) add_step('job-test-run-job', args_2) add_step('job-test-run-job', args_3) + add_step('job-test-run-job', args_4) + add_step('job-test-run-job', args_5) assert run_job('job-test-run-job', process_step) is False @@ -71,19 +94,19 @@ def test_run_jobs() -> None: { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-1.mp4') + 'output_path': get_test_output_path('output-1.mp4') } args_2 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-2.mp4') + 'output_path': get_test_output_path('output-2.mp4') } args_3 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.jpg'), - 'output_path': get_test_output_file('output-3.jpg') + 'output_path': get_test_output_path('output-3.jpg') } halt_on_error = True @@ -108,7 +131,7 @@ def test_retry_job() -> None: { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-1.mp4') + 'output_path': get_test_output_path('output-1.mp4') } assert retry_job('job-invalid', process_step) is False @@ -129,19 +152,19 @@ def test_retry_jobs() -> None: { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-1.mp4') + 'output_path': get_test_output_path('output-1.mp4') } args_2 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-2.mp4') + 'output_path': get_test_output_path('output-2.mp4') } args_3 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.jpg'), - 'output_path': get_test_output_file('output-3.jpg') + 'output_path': get_test_output_path('output-3.jpg') } halt_on_error = True @@ -167,19 +190,19 @@ def test_run_steps() -> None: { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-1.mp4') + 'output_path': get_test_output_path('output-1.mp4') } args_2 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-2.mp4') + 'output_path': get_test_output_path('output-2.mp4') } args_3 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.jpg'), - 'output_path': get_test_output_file('output-3.jpg') + 'output_path': get_test_output_path('output-3.jpg') } assert run_steps('job-invalid', process_step) is False @@ -198,19 +221,31 @@ def test_finalize_steps() -> None: { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-1.mp4') + 'output_path': get_test_output_path('output-1.mp4') } args_2 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-2.mp4') + 'output_path': get_test_output_path('output-2.mp4') } args_3 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.jpg'), - 'output_path': get_test_output_file('output-3.jpg') + 'output_path': get_test_output_path('output-3.jpg') + } + args_4 =\ + { + 'source_path': get_test_example_file('source.jpg'), + 'target_path': get_test_example_file('target-240p.jpg'), + 'output_path': get_test_output_path('output-4') + } + args_5 =\ + { + 'source_path': get_test_example_file('source.jpg'), + 'target_path': get_test_example_file('target-240p.jpg'), + 'output_path': get_test_output_path('output-4') } create_job('job-test-finalize-steps') @@ -218,16 +253,26 @@ def test_finalize_steps() -> None: add_step('job-test-finalize-steps', args_1) add_step('job-test-finalize-steps', args_2) add_step('job-test-finalize-steps', args_3) + add_step('job-test-finalize-steps', args_4) + add_step('job-test-finalize-steps', args_5) - copy_file(args_1.get('target_path'), get_test_output_file('output-1-job-test-finalize-steps-0.mp4')) - copy_file(args_1.get('target_path'), get_test_output_file('output-1-job-test-finalize-steps-1.mp4')) - copy_file(args_2.get('target_path'), get_test_output_file('output-2-job-test-finalize-steps-2.mp4')) - copy_file(args_3.get('target_path'), get_test_output_file('output-3-job-test-finalize-steps-3.jpg')) + copy_file(args_1.get('target_path'), get_test_output_path('output-1-job-test-finalize-steps-0.mp4')) + copy_file(args_1.get('target_path'), get_test_output_path('output-1-job-test-finalize-steps-1.mp4')) + copy_file(args_2.get('target_path'), get_test_output_path('output-2-job-test-finalize-steps-2.mp4')) + copy_file(args_3.get('target_path'), get_test_output_path('output-3-job-test-finalize-steps-3.jpg')) + + temp_directory_1 = get_test_output_path('output-4-job-test-finalize-steps-4') + temp_directory_2 = get_test_output_path('output-4-job-test-finalize-steps-5') + create_directory(temp_directory_1) + create_directory(temp_directory_2) + copy_file(args_4.get('target_path'), os.path.join(temp_directory_1, '00000001.jpg')) + copy_file(args_5.get('target_path'), os.path.join(temp_directory_2, '00000002.jpg')) assert finalize_steps('job-test-finalize-steps') is True assert is_test_output_file('output-1.mp4') is True assert is_test_output_file('output-2.mp4') is True assert is_test_output_file('output-3.jpg') is True + assert is_test_output_sequence(get_test_output_path('output-4')) is True def test_collect_output_set() -> None: @@ -235,19 +280,25 @@ def test_collect_output_set() -> None: { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-1.mp4') + 'output_path': get_test_output_path('output-1.mp4') } args_2 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.mp4'), - 'output_path': get_test_output_file('output-2.mp4') + 'output_path': get_test_output_path('output-2.mp4') } args_3 =\ { 'source_path': get_test_example_file('source.jpg'), 'target_path': get_test_example_file('target-240p.jpg'), - 'output_path': get_test_output_file('output-3.jpg') + 'output_path': get_test_output_path('output-3.jpg') + } + args_4 = \ + { + 'source_path': get_test_example_file('source.jpg'), + 'target_path': get_test_example_file('target-240p.mp4'), + 'output_path': get_test_output_path('output-4') } create_job('job-test-collect-output-set') @@ -255,21 +306,26 @@ def test_collect_output_set() -> None: add_step('job-test-collect-output-set', args_1) add_step('job-test-collect-output-set', args_2) add_step('job-test-collect-output-set', args_3) + add_step('job-test-collect-output-set', args_4) output_set =\ { - get_test_output_file('output-1.mp4'): + get_test_output_path('output-1.mp4'): [ - get_test_output_file('output-1-job-test-collect-output-set-0.mp4'), - get_test_output_file('output-1-job-test-collect-output-set-1.mp4') + get_test_output_path('output-1-job-test-collect-output-set-0.mp4'), + get_test_output_path('output-1-job-test-collect-output-set-1.mp4') ], - get_test_output_file('output-2.mp4'): + get_test_output_path('output-2.mp4'): [ - get_test_output_file('output-2-job-test-collect-output-set-2.mp4') + get_test_output_path('output-2-job-test-collect-output-set-2.mp4') ], - get_test_output_file('output-3.jpg'): + get_test_output_path('output-3.jpg'): [ - get_test_output_file('output-3-job-test-collect-output-set-3.jpg') + get_test_output_path('output-3-job-test-collect-output-set-3.jpg') + ], + get_test_output_path('output-4'): + [ + get_test_output_path('output-4-job-test-collect-output-set-4') ] } diff --git a/tests/test_vision.py b/tests/test_vision.py index 525f0f80..2eb2aae6 100644 --- a/tests/test_vision.py +++ b/tests/test_vision.py @@ -4,7 +4,7 @@ import pytest from facefusion.download import conditional_download from facefusion.vision import calculate_histogram_difference, count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, match_frame_color, normalize_resolution, pack_resolution, predict_video_frame_total, read_image, read_video_frame, restrict_image_resolution, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, unpack_resolution, write_image -from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory +from .helper import get_test_example_file, get_test_examples_directory, get_test_output_path, prepare_test_output_directory @pytest.fixture(scope = 'module', autouse = True) @@ -42,8 +42,8 @@ def test_read_image() -> None: def test_write_image() -> None: vision_frame = read_image(get_test_example_file('target-240p.jpg')) - assert write_image(get_test_output_file('target-240p.jpg'), vision_frame) is True - assert write_image(get_test_output_file('目标-240p.webp'), vision_frame) is True + assert write_image(get_test_output_path('target-240p.jpg'), vision_frame) is True + assert write_image(get_test_output_path('目标-240p.webp'), vision_frame) is True def test_detect_image_resolution() -> None: