add test

cleanup

remove -w

move --workflow position

fix test

add --worflow, audio-to-image, image-to-image, image-to-video
This commit is contained in:
harisreedhar
2025-11-24 21:51:52 +05:30
committed by henryruhs
parent 051aed6133
commit de2d5cfc7d
27 changed files with 328 additions and 106 deletions
+2
View File
@@ -8,6 +8,8 @@ from facefusion.vision import detect_video_fps
def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
# general
apply_state_item('command', args.get('command'))
# workflow
apply_state_item('workflow', args.get('workflow'))
# paths
apply_state_item('temp_path', args.get('temp_path'))
apply_state_item('jobs_path', args.get('jobs_path'))
+30 -2
View File
@@ -1,5 +1,6 @@
import math
from functools import lru_cache
from typing import Any, List, Optional
from typing import Any, List, Optional, Tuple
import numpy
import scipy
@@ -7,7 +8,8 @@ from numpy.typing import NDArray
from facefusion.ffmpeg import read_audio_buffer
from facefusion.filesystem import is_audio
from facefusion.types import Audio, AudioFrame, Fps, Mel, MelFilterBank, Spectrogram
from facefusion.media_helper import restrict_trim_frame
from facefusion.types import Audio, AudioFrame, Duration, Fps, Mel, MelFilterBank, Spectrogram
from facefusion.voice_extractor import batch_extract_voice
@@ -141,3 +143,29 @@ def create_spectrogram(audio : Audio) -> Spectrogram:
spectrogram = scipy.signal.stft(audio, nperseg = mel_bin_total, nfft = mel_bin_total, noverlap = mel_bin_overlap)[2]
spectrogram = numpy.dot(mel_filter_bank, numpy.abs(spectrogram))
return spectrogram
def count_audio_frame_total(audio_path : str, fps : Fps) -> int:
audio_duration = detect_audio_duration(audio_path)
if audio_duration > 0:
return math.ceil(audio_duration * fps)
return 0
def detect_audio_duration(audio_path : str) -> Duration:
audio_sample_rate = 48000
audio_sample_size = 16
audio_channel_total = 2
if is_audio(audio_path):
audio_buffer = read_audio_buffer(audio_path, audio_sample_rate, audio_sample_size, audio_channel_total)
if audio_buffer:
audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, audio_channel_total)
audio_duration = len(audio) / audio_sample_rate
return audio_duration
return 0
def restrict_trim_audio_frame(audio_path : str, fps : Fps, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
audio_frame_total = count_audio_frame_total(audio_path, fps)
return restrict_trim_frame(audio_frame_total, trim_frame_start, trim_frame_end)
+2
View File
@@ -45,6 +45,8 @@ face_mask_regions : List[FaceMaskRegion] = list(get_args(FaceMaskRegion))
voice_extractor_models : List[VoiceExtractorModel] = list(get_args(VoiceExtractorModel))
workflows : List[WorkFlow] = [ 'audio-to-image', 'image-to-image', 'image-to-video' ]
audio_type_set : AudioTypeSet =\
{
'flac': 'audio/flac',
+6 -4
View File
@@ -12,14 +12,14 @@ from facefusion.apis.core import create_api
from facefusion.args_helper import apply_args
from facefusion.download import conditional_download_hashes, conditional_download_sources
from facefusion.exit_helper import hard_exit, signal_exit
from facefusion.filesystem import get_file_extension, get_file_name, is_image, is_video, resolve_file_paths, resolve_file_pattern
from facefusion.filesystem import get_file_extension, get_file_name, resolve_file_paths, resolve_file_pattern
from facefusion.jobs import job_helper, job_manager, job_runner
from facefusion.jobs.job_list import compose_job_list
from facefusion.processors.core import get_processors_modules
from facefusion.program import create_program
from facefusion.program_helper import validate_args
from facefusion.types import Args, ErrorCode
from facefusion.workflows import image_to_image, image_to_video
from facefusion.workflows import audio_to_image, image_to_image, image_to_video
def cli() -> None:
@@ -332,9 +332,11 @@ def conditional_process() -> ErrorCode:
if not processor_module.pre_process('output'):
return 2
if is_image(state_manager.get_item('target_path')):
if state_manager.get_item('workflow') == 'audio-to-image':
return audio_to_image.process(start_time)
if state_manager.get_item('workflow') == 'image-to-image':
return image_to_image.process(start_time)
if is_video(state_manager.get_item('target_path')):
if state_manager.get_item('workflow') == 'image-to-video':
return image_to_video.process(start_time)
return 0
+17
View File
@@ -0,0 +1,17 @@
from typing import Optional, Tuple
def restrict_trim_frame(frame_total : int, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
if isinstance(trim_frame_start, int):
trim_frame_start = max(0, min(trim_frame_start, frame_total))
if isinstance(trim_frame_end, int):
trim_frame_end = max(0, min(trim_frame_end, frame_total))
if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int):
return trim_frame_start, trim_frame_end
if isinstance(trim_frame_start, int):
return trim_frame_start, frame_total
if isinstance(trim_frame_end, int):
return 0, trim_frame_end
return 0, frame_total
+12 -4
View File
@@ -29,6 +29,14 @@ def create_config_path_program() -> ArgumentParser:
return program
def create_workflow_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_paths = program.add_argument_group('paths')
group_paths.add_argument('--workflow', help = translator.get('help.workflow'), choices = facefusion.choices.workflows)
args_store.register_args([ 'workflow' ], scopes = [ 'api', 'cli' ])
return program
def create_temp_path_program() -> ArgumentParser:
program = ArgumentParser(add_help = False)
group_paths = program.add_argument_group('paths')
@@ -285,7 +293,7 @@ def collect_step_program() -> ArgumentParser:
def collect_job_program() -> ArgumentParser:
return ArgumentParser(parents = [ create_execution_program(), create_download_providers_program(), create_memory_program(), create_log_level_program() ], add_help = False)
return ArgumentParser(parents = [ create_workflow_program(), create_execution_program(), create_download_providers_program(), create_memory_program(), create_log_level_program() ], add_help = False)
def create_program() -> ArgumentParser:
@@ -304,9 +312,9 @@ def create_program() -> ArgumentParser:
sub_program.add_parser('job-submit-all', help = translator.get('help.job_submit_all'), parents = [ create_jobs_path_program(), create_log_level_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-delete', help = translator.get('help.job_delete'), parents = [ create_job_id_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-delete-all', help = translator.get('help.job_delete_all'), parents = [ create_jobs_path_program(), create_log_level_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-add-step', help = translator.get('help.job_add_step'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-remix-step', help = translator.get('help.job_remix_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-insert-step', help = translator.get('help.job_insert_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-add-step', help = translator.get('help.job_add_step'), parents = [ create_job_id_program(), create_workflow_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-remix-step', help = translator.get('help.job_remix_step'), parents = [ create_job_id_program(), create_workflow_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-insert-step', help = translator.get('help.job_insert_step'), parents = [ create_job_id_program(), create_workflow_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-remove-step', help = translator.get('help.job_remove_step'), parents = [ create_job_id_program(), create_step_index_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-run', help = translator.get('help.job_run'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
sub_program.add_parser('job-run-all', help = translator.get('help.job_run_all'), parents = [ create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
+14 -3
View File
@@ -23,12 +23,12 @@ def resolve_temp_frame_paths(target_path : str) -> List[str]:
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format'))
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) # TODO: remove state_manager.get
def get_temp_directory_path(file_path : str) -> str:
temp_file_name = get_file_name(file_path)
return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name)
return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name) # TODO: remove state_manager.get
def create_temp_directory(file_path : str) -> bool:
@@ -37,7 +37,18 @@ def create_temp_directory(file_path : str) -> bool:
def clear_temp_directory(file_path : str) -> bool:
if not state_manager.get_item('keep_temp'):
if not state_manager.get_item('keep_temp'): # TODO: remove state_manager.get
temp_directory_path = get_temp_directory_path(file_path)
return remove_directory(temp_directory_path)
return True
def get_temp_sequence_paths(file_path : str, frame_total : int, temp_frame_prefix : str, temp_frame_format : str) -> List[str]:
temp_directory_path = get_temp_directory_path(file_path)
temp_frame_paths = []
for frame_number in range(frame_total):
temp_file_name = temp_frame_prefix % (frame_number + 1) + '.' + temp_frame_format
temp_frame_path = os.path.join(temp_directory_path, temp_file_name)
temp_frame_paths.append(temp_frame_path)
return temp_frame_paths
+2
View File
@@ -55,6 +55,8 @@ Language = Literal['en']
Locales : TypeAlias = Dict[Language, Dict[str, Any]]
LocalePoolSet : TypeAlias = Dict[str, Locales]
WorkFlow = Literal['audio-to-image', 'image-to-image', 'image-to-video']
VideoCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture]
VideoWriterSet : TypeAlias = Dict[str, cv2.VideoWriter]
CameraCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture]
+5 -22
View File
@@ -8,6 +8,7 @@ from cv2.typing import Size
from facefusion.common_helper import is_windows
from facefusion.filesystem import get_file_extension, is_image, is_video
from facefusion.media_helper import restrict_trim_frame
from facefusion.thread_helper import thread_semaphore
from facefusion.types import ColorMode, Duration, Fps, Mask, Orientation, Resolution, Scale, VisionFrame
from facefusion.video_manager import get_video_capture
@@ -107,7 +108,8 @@ def count_video_frame_total(video_path : str) -> int:
def predict_video_frame_total(video_path : str, fps : Fps, trim_frame_start : int, trim_frame_end : int) -> int:
if is_video(video_path):
video_fps = detect_video_fps(video_path)
extract_frame_total = count_trim_frame_total(video_path, trim_frame_start, trim_frame_end) * fps / video_fps
trim_frame_start, trim_frame_end = restrict_trim_video_frame(video_path, trim_frame_start, trim_frame_end)
extract_frame_total = (trim_frame_end - trim_frame_start) * fps / video_fps
return math.floor(extract_frame_total)
return 0
@@ -141,28 +143,9 @@ def detect_video_duration(video_path : str) -> Duration:
return 0
def count_trim_frame_total(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> int:
trim_frame_start, trim_frame_end = restrict_trim_frame(video_path, trim_frame_start, trim_frame_end)
return trim_frame_end - trim_frame_start
def restrict_trim_frame(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
def restrict_trim_video_frame(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
video_frame_total = count_video_frame_total(video_path)
if isinstance(trim_frame_start, int):
trim_frame_start = max(0, min(trim_frame_start, video_frame_total))
if isinstance(trim_frame_end, int):
trim_frame_end = max(0, min(trim_frame_end, video_frame_total))
if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int):
return trim_frame_start, trim_frame_end
if isinstance(trim_frame_start, int):
return trim_frame_start, video_frame_total
if isinstance(trim_frame_end, int):
return 0, trim_frame_end
return 0, video_frame_total
return restrict_trim_frame(video_frame_total, trim_frame_start, trim_frame_end)
def detect_video_resolution(video_path : str) -> Optional[Resolution]:
+160
View File
@@ -0,0 +1,160 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
import numpy
from tqdm import tqdm
from facefusion import content_analyser, ffmpeg, logger, process_manager, state_manager, translator
from facefusion.audio import create_empty_audio_frame, get_audio_frame, get_voice_frame, restrict_trim_audio_frame
from facefusion.common_helper import get_first
from facefusion.filesystem import filter_audio_paths, is_video
from facefusion.processors.core import get_processors_modules
from facefusion.temp_helper import get_temp_sequence_paths, move_temp_file
from facefusion.time_helper import calculate_end_time
from facefusion.types import ErrorCode
from facefusion.vision import conditional_merge_vision_mask, detect_image_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, restrict_trim_video_frame, scale_resolution, write_image
from facefusion.workflows.core import clear, is_process_stopping, setup
def process(start_time : float) -> ErrorCode:
tasks =\
[
analyse_image,
clear,
setup,
process_image,
merge_frames,
restore_audio,
partial(finalize_video, start_time),
clear
]
process_manager.start()
for task in tasks:
error_code = task() # type:ignore[operator]
if error_code > 0:
process_manager.end()
return error_code
process_manager.end()
return 0
def analyse_image() -> ErrorCode: # TODO: reusable block
if content_analyser.analyse_image(state_manager.get_item('target_path')):
return 3
return 0
def process_image() -> ErrorCode:
state_manager.set_item('output_video_fps', 25.0) # TODO: set default fps value
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
trim_frame_start, trim_frame_end = restrict_trim_audio_frame(source_audio_path, state_manager.get_item('output_video_fps'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
audio_frame_total = trim_frame_end - trim_frame_start
temp_frame_paths = get_temp_sequence_paths(state_manager.get_item('output_path'), audio_frame_total, '%08d', state_manager.get_item('temp_frame_format'))
if temp_frame_paths:
with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
progress.set_postfix(execution_providers = state_manager.get_item('execution_providers'))
with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
futures = []
for frame_number, temp_frame_path in enumerate(temp_frame_paths):
future = executor.submit(process_temp_frame, temp_frame_path, frame_number)
futures.append(future)
for future in as_completed(futures):
if is_process_stopping():
for __future__ in futures:
__future__.cancel()
if not future.cancelled():
future.result()
progress.update()
for processor_module in get_processors_modules(state_manager.get_item('processors')):
processor_module.post_process()
if is_process_stopping():
return 4
else:
logger.error(translator.get('temp_frames_not_found'), __name__)
return 1
return 0
def process_temp_frame(temp_frame_path : str, frame_number : int) -> bool: # TODO refinement like to_video.py file.
output_video_fps = state_manager.get_item('output_video_fps')
reference_vision_frame = read_static_image(state_manager.get_item('target_path'))
source_vision_frames = read_static_images(state_manager.get_item('source_paths'))
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
target_vision_frame = read_static_image(state_manager.get_item('target_path'), 'rgba')
temp_vision_frame = target_vision_frame.copy()
temp_vision_mask = extract_vision_mask(temp_vision_frame)
source_audio_frame = get_audio_frame(source_audio_path, output_video_fps, frame_number)
source_voice_frame = get_voice_frame(source_audio_path, output_video_fps, frame_number)
if not numpy.any(source_audio_frame):
source_audio_frame = create_empty_audio_frame()
if not numpy.any(source_voice_frame):
source_voice_frame = create_empty_audio_frame()
for processor_module in get_processors_modules(state_manager.get_item('processors')):
temp_vision_frame, temp_vision_mask = processor_module.process_frame(
{
'reference_vision_frame': reference_vision_frame,
'source_vision_frames': source_vision_frames,
'source_audio_frame': source_audio_frame,
'source_voice_frame': source_voice_frame,
'target_vision_frame': target_vision_frame[:, :, :3],
'temp_vision_frame': temp_vision_frame[:, :, :3],
'temp_vision_mask': temp_vision_mask
})
temp_vision_frame = conditional_merge_vision_mask(temp_vision_frame, temp_vision_mask)
return write_image(temp_frame_path, temp_vision_frame)
def merge_frames() -> ErrorCode:
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
output_video_resolution = scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_image_scale'))
logger.info(translator.get('merging_video').format(resolution = pack_resolution(output_video_resolution), fps = state_manager.get_item('output_video_fps')), __name__)
if ffmpeg.merge_video(state_manager.get_item('target_path'), state_manager.get_item('output_path'), state_manager.get_item('output_video_fps'), output_video_resolution, trim_frame_start, trim_frame_end):
logger.debug(translator.get('merging_video_succeeded'), __name__)
else:
if is_process_stopping():
return 4
logger.error(translator.get('merging_video_failed'), __name__)
return 1
return 0
def restore_audio() -> ErrorCode:
if state_manager.get_item('output_audio_volume') == 0:
logger.info(translator.get('skipping_audio'), __name__)
move_temp_file(state_manager.get_item('output_path'))
else:
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
if source_audio_path:
if ffmpeg.replace_audio(source_audio_path, state_manager.get_item('output_path')):
logger.debug(translator.get('replacing_audio_succeeded'), __name__)
else:
if is_process_stopping():
return 4
logger.warn(translator.get('replacing_audio_skipped'), __name__)
move_temp_file(state_manager.get_item('output_path'))
return 0
def finalize_video(start_time : float) -> ErrorCode:
if is_video(state_manager.get_item('output_path')):
logger.info(translator.get('processing_video_succeeded').format(seconds = calculate_end_time(start_time)), __name__)
else:
logger.error(translator.get('processing_video_failed'), __name__)
return 1
return 0
+5 -5
View File
@@ -12,7 +12,7 @@ from facefusion.processors.core import get_processors_modules
from facefusion.temp_helper import move_temp_file, resolve_temp_frame_paths
from facefusion.time_helper import calculate_end_time
from facefusion.types import ErrorCode
from facefusion.vision import conditional_merge_vision_mask, detect_video_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, read_static_video_frame, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, write_image
from facefusion.vision import conditional_merge_vision_mask, detect_video_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, read_static_video_frame, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, write_image
from facefusion.workflows.core import clear, is_process_stopping, setup
@@ -44,7 +44,7 @@ def process(start_time : float) -> ErrorCode:
def analyse_video() -> ErrorCode:
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
if content_analyser.analyse_video(state_manager.get_item('target_path'), trim_frame_start, trim_frame_end):
return 3
@@ -52,7 +52,7 @@ def analyse_video() -> ErrorCode:
def extract_frames() -> ErrorCode:
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale'))
temp_video_resolution = restrict_video_resolution(state_manager.get_item('target_path'), output_video_resolution)
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
@@ -103,7 +103,7 @@ def process_video() -> ErrorCode:
def merge_frames() -> ErrorCode:
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale'))
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
@@ -119,7 +119,7 @@ def merge_frames() -> ErrorCode:
def restore_audio() -> ErrorCode:
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
if state_manager.get_item('output_audio_volume') == 0:
logger.info(translator.get('skipping_audio'), __name__)
+18 -1
View File
@@ -1,8 +1,9 @@
import subprocess
import pytest
from pytest import approx
from facefusion.audio import get_audio_frame, read_static_audio
from facefusion.audio import detect_audio_duration, get_audio_frame, read_static_audio, restrict_trim_audio_frame
from facefusion.download import conditional_download
from .helper import get_test_example_file, get_test_examples_directory
@@ -26,3 +27,19 @@ def test_read_static_audio() -> None:
assert len(read_static_audio(get_test_example_file('source.mp3'), 25)) == 280
assert len(read_static_audio(get_test_example_file('source.wav'), 25)) == 280
assert read_static_audio('invalid', 25) is None
def test_detect_audio_duration() -> None:
assert detect_audio_duration(get_test_example_file('source.mp3')) == approx(3.788, rel = 1e-3)
assert detect_audio_duration(get_test_example_file('source.wav')) == approx(3.788, rel = 1e-3)
assert detect_audio_duration('invalid') == 0
def test_restrict_trim_audio_frame() -> None:
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 0, 50) == (0, 50)
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 20, 95) == (20, 95)
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, -10, None) == (0, 95)
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, -10) == (0, 0)
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 100, None) == (95, 95)
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, 100) == (0, 95)
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, None) == (0, 95)
+2 -2
View File
@@ -25,14 +25,14 @@ def before_each() -> None:
def test_modify_age_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-age-face-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-age-face-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-age-face-to-image.jpg') is True
def test_modify_age_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-age-face-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-age-face-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-age-face-to-video.mp4') is True
+2 -2
View File
@@ -26,14 +26,14 @@ def before_each() -> None:
def test_remove_background_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-remove-background-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'background_remover', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-remove-background-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-remove-background-to-image.jpg') is True
def test_remove_background_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-remove-background-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-remove-background-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-remove-background-to-video.mp4') is True
+2 -2
View File
@@ -26,7 +26,7 @@ def before_each() -> None:
def test_batch_run_targets() -> None:
commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-targets-{index}.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-targets-{index}.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-batch-run-targets-0.jpg') is True
@@ -35,7 +35,7 @@ def test_batch_run_targets() -> None:
def test_batch_run_sources_to_targets() -> None:
commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('target-240p-batch-*.jpg'), '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-sources-to-targets-{index}.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('target-240p-batch-*.jpg'), '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-sources-to-targets-{index}.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-batch-run-sources-to-targets-0.jpg') is True
+2 -2
View File
@@ -25,14 +25,14 @@ def before_each() -> None:
def test_restore_expression_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-restore-expression-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-restore-expression-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-restore-expression-to-image.jpg') is True
def test_restore_expression_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-restore-expression-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-restore-expression-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-restore-expression-to-video.mp4') is True
+2 -2
View File
@@ -26,14 +26,14 @@ def before_each() -> None:
def test_debug_face_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-debug-face-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-debug-face-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-debug-face-to-image.jpg') is True
def test_debug_face_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-debug-face-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-debug-face-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-debug-face-to-video.mp4') is True
+2 -2
View File
@@ -26,14 +26,14 @@ def before_each() -> None:
def test_edit_face_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-edit-face-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_editor', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-edit-face-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-edit-face-to-image.jpg') is True
def test_edit_face_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-edit-face-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-edit-face-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-edit-face-to-video.mp4') is True
+2 -2
View File
@@ -26,14 +26,14 @@ def before_each() -> None:
def test_enhance_face_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-face-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-face-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-enhance-face-to-image.jpg') is True
def test_enhance_face_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-face-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-face-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-enhance-face-to-video.mp4') is True
+2 -2
View File
@@ -26,14 +26,14 @@ def before_each() -> None:
def test_swap_face_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-swap-face-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-swap-face-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-swap-face-to-image.jpg') is True
def test_swap_face_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-swap-face-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-swap-face-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-swap-face-to-video.mp4') is True
+2 -2
View File
@@ -27,14 +27,14 @@ def before_each() -> None:
def test_colorize_frame_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.jpg'), '-o', get_test_output_file('test_colorize-frame-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.jpg'), '-o', get_test_output_file('test_colorize-frame-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test_colorize-frame-to-image.jpg') is True
def test_colorize_frame_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_file('test-colorize-frame-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_file('test-colorize-frame-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-colorize-frame-to-video.mp4') is True
+2 -2
View File
@@ -26,14 +26,14 @@ def before_each() -> None:
def test_enhance_frame_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-frame-to-image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-frame-to-image.jpg') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-enhance-frame-to-image.jpg') is True
def test_enhance_frame_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-frame-to-video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-frame-to-video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test-enhance-frame-to-video.mp4') is True
+14 -14
View File
@@ -50,7 +50,7 @@ def test_job_submit() -> None:
assert subprocess.run(commands).returncode == 1
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-submit', 'test-job-submit', '--jobs-path', get_test_jobs_directory() ]
@@ -73,10 +73,10 @@ def test_submit_all() -> None:
assert subprocess.run(commands).returncode == 1
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-2', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-submit-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ]
@@ -122,7 +122,7 @@ def test_job_delete_all() -> None:
def test_job_add_step() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
assert subprocess.run(commands).returncode == 1
assert count_step_total('test-job-add-step') == 0
@@ -130,14 +130,14 @@ def test_job_add_step() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-add-step', '--jobs-path', get_test_jobs_directory() ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
assert subprocess.run(commands).returncode == 0
assert count_step_total('test-job-add-step') == 1
def test_job_remix() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
assert subprocess.run(commands).returncode == 1
assert count_step_total('test-job-remix-step') == 0
@@ -145,23 +145,23 @@ def test_job_remix() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory() ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
assert count_step_total('test-job-remix-step') == 1
assert subprocess.run(commands).returncode == 0
assert count_step_total('test-job-remix-step') == 2
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
assert subprocess.run(commands).returncode == 0
assert count_step_total('test-job-remix-step') == 3
def test_job_insert_step() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
assert subprocess.run(commands).returncode == 1
assert count_step_total('test-job-insert-step') == 0
@@ -169,16 +169,16 @@ def test_job_insert_step() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory() ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
assert count_step_total('test-job-insert-step') == 1
assert subprocess.run(commands).returncode == 0
assert count_step_total('test-job-insert-step') == 2
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
assert subprocess.run(commands).returncode == 0
assert count_step_total('test-job-insert-step') == 3
@@ -192,7 +192,7 @@ def test_job_remove_step() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory() ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
subprocess.run(commands)
subprocess.run(commands)
+8 -8
View File
@@ -33,7 +33,7 @@ def test_job_run() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-run', '--jobs-path', get_test_jobs_directory() ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-run', 'test-job-run', '--jobs-path', get_test_jobs_directory() ]
@@ -61,13 +61,13 @@ def test_job_run_all() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory() ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-1', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run-all-1.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run-all-1.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-end', '1' ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-run-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ]
@@ -93,7 +93,7 @@ def test_job_retry() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-retry', '--jobs-path', get_test_jobs_directory() ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-retry', 'test-job-retry', '--jobs-path', get_test_jobs_directory() ]
@@ -121,13 +121,13 @@ def test_job_retry_all() -> None:
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory() ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-1', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry-all-1.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry-all-1.jpg') ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-end', '1' ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ]
subprocess.run(commands)
commands = [ sys.executable, 'facefusion.py', 'job-retry-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ]
+3 -3
View File
@@ -27,14 +27,14 @@ def before_each() -> None:
def test_sync_lip_to_image() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test_sync_lip_to_image.jpg') ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'audio-to-image', '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test_sync_lip_to_image.mp4') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test_sync_lip_to_image.jpg') is True
assert is_test_output_file('test_sync_lip_to_image.mp4') is True
def test_sync_lip_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_file('test_sync_lip_to_video.mp4') is True
+2 -2
View File
@@ -36,7 +36,7 @@ def before_each() -> None:
])
def test_output_image_scale(output_image_scale : Scale, output_image_resolution : Resolution) -> None:
output_file_path = get_test_output_file('test-output-image-scale-' + str(output_image_scale) + '.jpg')
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', output_file_path, '--output-image-scale', str(output_image_scale) ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', output_file_path, '--output-image-scale', str(output_image_scale) ]
assert subprocess.run(commands).returncode == 0
assert detect_image_resolution(output_file_path) == output_image_resolution
@@ -51,7 +51,7 @@ def test_output_image_scale(output_image_scale : Scale, output_image_resolution
])
def test_output_video_scale(output_video_scale : Scale, output_video_resolution : Resolution) -> None:
output_file_path = get_test_output_file('test-output-video-scale-' + str(output_video_scale) + '.mp4')
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', output_file_path, '--trim-frame-end', '1', '--output-video-scale', str(output_video_scale) ]
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', output_file_path, '--trim-frame-end', '1', '--output-video-scale', str(output_video_scale) ]
assert subprocess.run(commands).returncode == 0
assert detect_video_resolution(output_file_path) == output_video_resolution
+8 -18
View File
@@ -3,7 +3,7 @@ import subprocess
import pytest
from facefusion.download import conditional_download
from facefusion.vision import calculate_histogram_difference, count_trim_frame_total, count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, match_frame_color, normalize_resolution, pack_resolution, predict_video_frame_total, read_image, read_video_frame, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, unpack_resolution, write_image
from facefusion.vision import calculate_histogram_difference, count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, match_frame_color, normalize_resolution, pack_resolution, predict_video_frame_total, read_image, read_video_frame, restrict_image_resolution, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, unpack_resolution, write_image
from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory
@@ -97,24 +97,14 @@ def test_detect_video_duration() -> None:
assert detect_video_duration('invalid') == 0
def test_count_trim_frame_total() -> None:
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 0, 200) == 200
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 70, 270) == 200
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), -10, None) == 270
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, -10) == 0
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 280, None) == 0
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, 280) == 270
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, None) == 270
def test_restrict_trim_frame() -> None:
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 0, 200) == (0, 200)
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 70, 270) == (70, 270)
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), -10, None) == (0, 270)
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, -10) == (0, 0)
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 280, None) == (270, 270)
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, 280) == (0, 270)
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, None) == (0, 270)
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 0, 200) == (0, 200)
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 70, 270) == (70, 270)
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), -10, None) == (0, 270)
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, -10) == (0, 0)
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 280, None) == (270, 270)
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, 280) == (0, 270)
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, None) == (0, 270)
def test_detect_video_resolution() -> None: