mirror of
https://github.com/facefusion/facefusion.git
synced 2026-04-29 21:07:50 +02:00
add todo
add test cleanup remove -w move --workflow position fix test add --worflow, audio-to-image, image-to-image, image-to-video
This commit is contained in:
@@ -8,6 +8,8 @@ from facefusion.vision import detect_video_fps
|
||||
def apply_args(args : Args, apply_state_item : ApplyStateItem) -> None:
|
||||
# general
|
||||
apply_state_item('command', args.get('command'))
|
||||
# workflow
|
||||
apply_state_item('workflow', args.get('workflow'))
|
||||
# paths
|
||||
apply_state_item('temp_path', args.get('temp_path'))
|
||||
apply_state_item('jobs_path', args.get('jobs_path'))
|
||||
|
||||
+30
-2
@@ -1,5 +1,6 @@
|
||||
import math
|
||||
from functools import lru_cache
|
||||
from typing import Any, List, Optional
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
||||
import numpy
|
||||
import scipy
|
||||
@@ -7,7 +8,8 @@ from numpy.typing import NDArray
|
||||
|
||||
from facefusion.ffmpeg import read_audio_buffer
|
||||
from facefusion.filesystem import is_audio
|
||||
from facefusion.types import Audio, AudioFrame, Fps, Mel, MelFilterBank, Spectrogram
|
||||
from facefusion.media_helper import restrict_trim_frame
|
||||
from facefusion.types import Audio, AudioFrame, Duration, Fps, Mel, MelFilterBank, Spectrogram
|
||||
from facefusion.voice_extractor import batch_extract_voice
|
||||
|
||||
|
||||
@@ -141,3 +143,29 @@ def create_spectrogram(audio : Audio) -> Spectrogram:
|
||||
spectrogram = scipy.signal.stft(audio, nperseg = mel_bin_total, nfft = mel_bin_total, noverlap = mel_bin_overlap)[2]
|
||||
spectrogram = numpy.dot(mel_filter_bank, numpy.abs(spectrogram))
|
||||
return spectrogram
|
||||
|
||||
|
||||
def count_audio_frame_total(audio_path : str, fps : Fps) -> int:
|
||||
audio_duration = detect_audio_duration(audio_path)
|
||||
if audio_duration > 0:
|
||||
return math.ceil(audio_duration * fps)
|
||||
return 0
|
||||
|
||||
|
||||
def detect_audio_duration(audio_path : str) -> Duration:
|
||||
audio_sample_rate = 48000
|
||||
audio_sample_size = 16
|
||||
audio_channel_total = 2
|
||||
|
||||
if is_audio(audio_path):
|
||||
audio_buffer = read_audio_buffer(audio_path, audio_sample_rate, audio_sample_size, audio_channel_total)
|
||||
if audio_buffer:
|
||||
audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, audio_channel_total)
|
||||
audio_duration = len(audio) / audio_sample_rate
|
||||
return audio_duration
|
||||
return 0
|
||||
|
||||
|
||||
def restrict_trim_audio_frame(audio_path : str, fps : Fps, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
|
||||
audio_frame_total = count_audio_frame_total(audio_path, fps)
|
||||
return restrict_trim_frame(audio_frame_total, trim_frame_start, trim_frame_end)
|
||||
|
||||
@@ -2,7 +2,7 @@ import logging
|
||||
from typing import List, Sequence
|
||||
|
||||
from facefusion.common_helper import create_float_range, create_int_range
|
||||
from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioTypeSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, EncoderSet, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoTypeSet, VoiceExtractorModel
|
||||
from facefusion.types import Angle, AudioEncoder, AudioFormat, AudioTypeSet, BenchmarkMode, BenchmarkResolution, BenchmarkSet, DownloadProvider, DownloadProviderSet, DownloadScope, EncoderSet, ExecutionProvider, ExecutionProviderSet, FaceDetectorModel, FaceDetectorSet, FaceLandmarkerModel, FaceMaskArea, FaceMaskAreaSet, FaceMaskRegion, FaceMaskRegionSet, FaceMaskType, FaceOccluderModel, FaceParserModel, FaceSelectorMode, FaceSelectorOrder, Gender, ImageFormat, ImageTypeSet, JobStatus, LogLevel, LogLevelSet, Race, Score, TempFrameFormat, VideoEncoder, VideoFormat, VideoMemoryStrategy, VideoPreset, VideoTypeSet, VoiceExtractorModel, WorkFlow
|
||||
|
||||
face_detector_set : FaceDetectorSet =\
|
||||
{
|
||||
@@ -45,6 +45,8 @@ face_mask_regions : List[FaceMaskRegion] = list(face_mask_region_set.keys())
|
||||
|
||||
voice_extractor_models : List[VoiceExtractorModel] = [ 'kim_vocal_1', 'kim_vocal_2', 'uvr_mdxnet' ]
|
||||
|
||||
workflows : List[WorkFlow] = [ 'audio-to-image', 'image-to-image', 'image-to-video' ]
|
||||
|
||||
audio_type_set : AudioTypeSet =\
|
||||
{
|
||||
'flac': 'audio/flac',
|
||||
|
||||
+6
-4
@@ -12,14 +12,14 @@ from facefusion.apis.core import create_api
|
||||
from facefusion.args_helper import apply_args
|
||||
from facefusion.download import conditional_download_hashes, conditional_download_sources
|
||||
from facefusion.exit_helper import hard_exit, signal_exit
|
||||
from facefusion.filesystem import get_file_extension, get_file_name, is_image, is_video, resolve_file_paths, resolve_file_pattern
|
||||
from facefusion.filesystem import get_file_extension, get_file_name, resolve_file_paths, resolve_file_pattern
|
||||
from facefusion.jobs import job_helper, job_manager, job_runner
|
||||
from facefusion.jobs.job_list import compose_job_list
|
||||
from facefusion.processors.core import get_processors_modules
|
||||
from facefusion.program import create_program
|
||||
from facefusion.program_helper import validate_args
|
||||
from facefusion.types import Args, ErrorCode
|
||||
from facefusion.workflows import image_to_image, image_to_video
|
||||
from facefusion.workflows import audio_to_image, image_to_image, image_to_video
|
||||
|
||||
|
||||
def cli() -> None:
|
||||
@@ -332,9 +332,11 @@ def conditional_process() -> ErrorCode:
|
||||
if not processor_module.pre_process('output'):
|
||||
return 2
|
||||
|
||||
if is_image(state_manager.get_item('target_path')):
|
||||
if state_manager.get_item('workflow') == 'audio-to-image':
|
||||
return audio_to_image.process(start_time)
|
||||
if state_manager.get_item('workflow') == 'image-to-image':
|
||||
return image_to_image.process(start_time)
|
||||
if is_video(state_manager.get_item('target_path')):
|
||||
if state_manager.get_item('workflow') == 'image-to-video':
|
||||
return image_to_video.process(start_time)
|
||||
|
||||
return 0
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
def restrict_trim_frame(frame_total : int, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
|
||||
if isinstance(trim_frame_start, int):
|
||||
trim_frame_start = max(0, min(trim_frame_start, frame_total))
|
||||
if isinstance(trim_frame_end, int):
|
||||
trim_frame_end = max(0, min(trim_frame_end, frame_total))
|
||||
|
||||
if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int):
|
||||
return trim_frame_start, trim_frame_end
|
||||
if isinstance(trim_frame_start, int):
|
||||
return trim_frame_start, frame_total
|
||||
if isinstance(trim_frame_end, int):
|
||||
return 0, trim_frame_end
|
||||
|
||||
return 0, frame_total
|
||||
+12
-4
@@ -29,6 +29,14 @@ def create_config_path_program() -> ArgumentParser:
|
||||
return program
|
||||
|
||||
|
||||
def create_workflow_program() -> ArgumentParser:
|
||||
program = ArgumentParser(add_help = False)
|
||||
group_paths = program.add_argument_group('paths')
|
||||
group_paths.add_argument('--workflow', help = translator.get('help.workflow'), choices = facefusion.choices.workflows)
|
||||
args_store.register_args([ 'workflow' ], scopes = [ 'api', 'cli' ])
|
||||
return program
|
||||
|
||||
|
||||
def create_temp_path_program() -> ArgumentParser:
|
||||
program = ArgumentParser(add_help = False)
|
||||
group_paths = program.add_argument_group('paths')
|
||||
@@ -285,7 +293,7 @@ def collect_step_program() -> ArgumentParser:
|
||||
|
||||
|
||||
def collect_job_program() -> ArgumentParser:
|
||||
return ArgumentParser(parents = [ create_execution_program(), create_download_providers_program(), create_memory_program(), create_log_level_program() ], add_help = False)
|
||||
return ArgumentParser(parents = [ create_workflow_program(), create_execution_program(), create_download_providers_program(), create_memory_program(), create_log_level_program() ], add_help = False)
|
||||
|
||||
|
||||
def create_program() -> ArgumentParser:
|
||||
@@ -306,9 +314,9 @@ def create_program() -> ArgumentParser:
|
||||
sub_program.add_parser('job-submit-all', help = translator.get('help.job_submit_all'), parents = [ create_jobs_path_program(), create_log_level_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-delete', help = translator.get('help.job_delete'), parents = [ create_job_id_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-delete-all', help = translator.get('help.job_delete_all'), parents = [ create_jobs_path_program(), create_log_level_program(), create_halt_on_error_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-add-step', help = translator.get('help.job_add_step'), parents = [ create_job_id_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-remix-step', help = translator.get('help.job_remix_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-insert-step', help = translator.get('help.job_insert_step'), parents = [ create_job_id_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-add-step', help = translator.get('help.job_add_step'), parents = [ create_job_id_program(), create_workflow_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-remix-step', help = translator.get('help.job_remix_step'), parents = [ create_job_id_program(), create_workflow_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-insert-step', help = translator.get('help.job_insert_step'), parents = [ create_job_id_program(), create_workflow_program(), create_step_index_program(), create_config_path_program(), create_jobs_path_program(), create_source_paths_program(), create_target_path_program(), create_output_path_program(), collect_step_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
|
||||
sub_program.add_parser('job-remove-step', help = translator.get('help.job_remove_step'), parents = [ create_job_id_program(), create_step_index_program(), create_jobs_path_program(), create_log_level_program() ], formatter_class = create_help_formatter_large)
|
||||
# job runner
|
||||
sub_program.add_parser('job-run', help = translator.get('help.job_run'), parents = [ create_job_id_program(), create_config_path_program(), create_temp_path_program(), create_jobs_path_program(), collect_job_program() ], formatter_class = create_help_formatter_large)
|
||||
|
||||
@@ -23,12 +23,12 @@ def resolve_temp_frame_paths(target_path : str) -> List[str]:
|
||||
|
||||
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
|
||||
temp_directory_path = get_temp_directory_path(target_path)
|
||||
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format'))
|
||||
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) # TODO: remove state_manager.get
|
||||
|
||||
|
||||
def get_temp_directory_path(file_path : str) -> str:
|
||||
temp_file_name = get_file_name(file_path)
|
||||
return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name)
|
||||
return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name) # TODO: remove state_manager.get
|
||||
|
||||
|
||||
def create_temp_directory(file_path : str) -> bool:
|
||||
@@ -37,7 +37,18 @@ def create_temp_directory(file_path : str) -> bool:
|
||||
|
||||
|
||||
def clear_temp_directory(file_path : str) -> bool:
|
||||
if not state_manager.get_item('keep_temp'):
|
||||
if not state_manager.get_item('keep_temp'): # TODO: remove state_manager.get
|
||||
temp_directory_path = get_temp_directory_path(file_path)
|
||||
return remove_directory(temp_directory_path)
|
||||
return True
|
||||
|
||||
|
||||
def get_temp_sequence_paths(file_path : str, frame_total : int, temp_frame_prefix : str, temp_frame_format : str) -> List[str]:
|
||||
temp_directory_path = get_temp_directory_path(file_path)
|
||||
temp_frame_paths = []
|
||||
|
||||
for frame_number in range(frame_total):
|
||||
temp_file_name = temp_frame_prefix % (frame_number + 1) + '.' + temp_frame_format
|
||||
temp_frame_path = os.path.join(temp_directory_path, temp_file_name)
|
||||
temp_frame_paths.append(temp_frame_path)
|
||||
return temp_frame_paths
|
||||
|
||||
@@ -55,6 +55,8 @@ Language = Literal['en']
|
||||
Locales : TypeAlias = Dict[Language, Dict[str, Any]]
|
||||
LocalePoolSet : TypeAlias = Dict[str, Locales]
|
||||
|
||||
WorkFlow = Literal['audio-to-image', 'image-to-image', 'image-to-video']
|
||||
|
||||
VideoCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture]
|
||||
VideoWriterSet : TypeAlias = Dict[str, cv2.VideoWriter]
|
||||
CameraCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture]
|
||||
|
||||
+5
-22
@@ -8,6 +8,7 @@ from cv2.typing import Size
|
||||
|
||||
from facefusion.common_helper import is_windows
|
||||
from facefusion.filesystem import get_file_extension, is_image, is_video
|
||||
from facefusion.media_helper import restrict_trim_frame
|
||||
from facefusion.thread_helper import thread_semaphore
|
||||
from facefusion.types import ColorMode, Duration, Fps, Mask, Orientation, Resolution, Scale, VisionFrame
|
||||
from facefusion.video_manager import get_video_capture
|
||||
@@ -107,7 +108,8 @@ def count_video_frame_total(video_path : str) -> int:
|
||||
def predict_video_frame_total(video_path : str, fps : Fps, trim_frame_start : int, trim_frame_end : int) -> int:
|
||||
if is_video(video_path):
|
||||
video_fps = detect_video_fps(video_path)
|
||||
extract_frame_total = count_trim_frame_total(video_path, trim_frame_start, trim_frame_end) * fps / video_fps
|
||||
trim_frame_start, trim_frame_end = restrict_trim_video_frame(video_path, trim_frame_start, trim_frame_end)
|
||||
extract_frame_total = (trim_frame_end - trim_frame_start) * fps / video_fps
|
||||
return math.floor(extract_frame_total)
|
||||
return 0
|
||||
|
||||
@@ -141,28 +143,9 @@ def detect_video_duration(video_path : str) -> Duration:
|
||||
return 0
|
||||
|
||||
|
||||
def count_trim_frame_total(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> int:
|
||||
trim_frame_start, trim_frame_end = restrict_trim_frame(video_path, trim_frame_start, trim_frame_end)
|
||||
|
||||
return trim_frame_end - trim_frame_start
|
||||
|
||||
|
||||
def restrict_trim_frame(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
|
||||
def restrict_trim_video_frame(video_path : str, trim_frame_start : Optional[int], trim_frame_end : Optional[int]) -> Tuple[int, int]:
|
||||
video_frame_total = count_video_frame_total(video_path)
|
||||
|
||||
if isinstance(trim_frame_start, int):
|
||||
trim_frame_start = max(0, min(trim_frame_start, video_frame_total))
|
||||
if isinstance(trim_frame_end, int):
|
||||
trim_frame_end = max(0, min(trim_frame_end, video_frame_total))
|
||||
|
||||
if isinstance(trim_frame_start, int) and isinstance(trim_frame_end, int):
|
||||
return trim_frame_start, trim_frame_end
|
||||
if isinstance(trim_frame_start, int):
|
||||
return trim_frame_start, video_frame_total
|
||||
if isinstance(trim_frame_end, int):
|
||||
return 0, trim_frame_end
|
||||
|
||||
return 0, video_frame_total
|
||||
return restrict_trim_frame(video_frame_total, trim_frame_start, trim_frame_end)
|
||||
|
||||
|
||||
def detect_video_resolution(video_path : str) -> Optional[Resolution]:
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from functools import partial
|
||||
|
||||
import numpy
|
||||
from tqdm import tqdm
|
||||
|
||||
from facefusion import content_analyser, ffmpeg, logger, process_manager, state_manager, translator
|
||||
from facefusion.audio import create_empty_audio_frame, get_audio_frame, get_voice_frame, restrict_trim_audio_frame
|
||||
from facefusion.common_helper import get_first
|
||||
from facefusion.filesystem import filter_audio_paths, is_video
|
||||
from facefusion.processors.core import get_processors_modules
|
||||
from facefusion.temp_helper import get_temp_sequence_paths, move_temp_file
|
||||
from facefusion.time_helper import calculate_end_time
|
||||
from facefusion.types import ErrorCode
|
||||
from facefusion.vision import conditional_merge_vision_mask, detect_image_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, restrict_trim_video_frame, scale_resolution, write_image
|
||||
from facefusion.workflows.core import clear, is_process_stopping, setup
|
||||
|
||||
|
||||
def process(start_time : float) -> ErrorCode:
|
||||
tasks =\
|
||||
[
|
||||
analyse_image,
|
||||
clear,
|
||||
setup,
|
||||
process_image,
|
||||
merge_frames,
|
||||
restore_audio,
|
||||
partial(finalize_video, start_time),
|
||||
clear
|
||||
]
|
||||
|
||||
process_manager.start()
|
||||
|
||||
for task in tasks:
|
||||
error_code = task() # type:ignore[operator]
|
||||
|
||||
if error_code > 0:
|
||||
process_manager.end()
|
||||
return error_code
|
||||
|
||||
process_manager.end()
|
||||
return 0
|
||||
|
||||
|
||||
def analyse_image() -> ErrorCode: # TODO: reusable block
|
||||
if content_analyser.analyse_image(state_manager.get_item('target_path')):
|
||||
return 3
|
||||
return 0
|
||||
|
||||
|
||||
def process_image() -> ErrorCode:
|
||||
state_manager.set_item('output_video_fps', 25.0) # TODO: set default fps value
|
||||
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
|
||||
trim_frame_start, trim_frame_end = restrict_trim_audio_frame(source_audio_path, state_manager.get_item('output_video_fps'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
audio_frame_total = trim_frame_end - trim_frame_start
|
||||
temp_frame_paths = get_temp_sequence_paths(state_manager.get_item('output_path'), audio_frame_total, '%08d', state_manager.get_item('temp_frame_format'))
|
||||
|
||||
if temp_frame_paths:
|
||||
with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
||||
progress.set_postfix(execution_providers = state_manager.get_item('execution_providers'))
|
||||
|
||||
with ThreadPoolExecutor(max_workers = state_manager.get_item('execution_thread_count')) as executor:
|
||||
futures = []
|
||||
|
||||
for frame_number, temp_frame_path in enumerate(temp_frame_paths):
|
||||
future = executor.submit(process_temp_frame, temp_frame_path, frame_number)
|
||||
futures.append(future)
|
||||
|
||||
for future in as_completed(futures):
|
||||
if is_process_stopping():
|
||||
for __future__ in futures:
|
||||
__future__.cancel()
|
||||
|
||||
if not future.cancelled():
|
||||
future.result()
|
||||
progress.update()
|
||||
|
||||
for processor_module in get_processors_modules(state_manager.get_item('processors')):
|
||||
processor_module.post_process()
|
||||
|
||||
if is_process_stopping():
|
||||
return 4
|
||||
else:
|
||||
logger.error(translator.get('temp_frames_not_found'), __name__)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
def process_temp_frame(temp_frame_path : str, frame_number : int) -> bool: # TODO refinement like to_video.py file.
|
||||
output_video_fps = state_manager.get_item('output_video_fps')
|
||||
reference_vision_frame = read_static_image(state_manager.get_item('target_path'))
|
||||
source_vision_frames = read_static_images(state_manager.get_item('source_paths'))
|
||||
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
|
||||
target_vision_frame = read_static_image(state_manager.get_item('target_path'), 'rgba')
|
||||
temp_vision_frame = target_vision_frame.copy()
|
||||
temp_vision_mask = extract_vision_mask(temp_vision_frame)
|
||||
|
||||
source_audio_frame = get_audio_frame(source_audio_path, output_video_fps, frame_number)
|
||||
source_voice_frame = get_voice_frame(source_audio_path, output_video_fps, frame_number)
|
||||
|
||||
if not numpy.any(source_audio_frame):
|
||||
source_audio_frame = create_empty_audio_frame()
|
||||
if not numpy.any(source_voice_frame):
|
||||
source_voice_frame = create_empty_audio_frame()
|
||||
|
||||
for processor_module in get_processors_modules(state_manager.get_item('processors')):
|
||||
temp_vision_frame, temp_vision_mask = processor_module.process_frame(
|
||||
{
|
||||
'reference_vision_frame': reference_vision_frame,
|
||||
'source_vision_frames': source_vision_frames,
|
||||
'source_audio_frame': source_audio_frame,
|
||||
'source_voice_frame': source_voice_frame,
|
||||
'target_vision_frame': target_vision_frame[:, :, :3],
|
||||
'temp_vision_frame': temp_vision_frame[:, :, :3],
|
||||
'temp_vision_mask': temp_vision_mask
|
||||
})
|
||||
|
||||
temp_vision_frame = conditional_merge_vision_mask(temp_vision_frame, temp_vision_mask)
|
||||
return write_image(temp_frame_path, temp_vision_frame)
|
||||
|
||||
|
||||
def merge_frames() -> ErrorCode:
|
||||
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
output_video_resolution = scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_image_scale'))
|
||||
|
||||
logger.info(translator.get('merging_video').format(resolution = pack_resolution(output_video_resolution), fps = state_manager.get_item('output_video_fps')), __name__)
|
||||
if ffmpeg.merge_video(state_manager.get_item('target_path'), state_manager.get_item('output_path'), state_manager.get_item('output_video_fps'), output_video_resolution, trim_frame_start, trim_frame_end):
|
||||
logger.debug(translator.get('merging_video_succeeded'), __name__)
|
||||
else:
|
||||
if is_process_stopping():
|
||||
return 4
|
||||
logger.error(translator.get('merging_video_failed'), __name__)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
def restore_audio() -> ErrorCode:
|
||||
if state_manager.get_item('output_audio_volume') == 0:
|
||||
logger.info(translator.get('skipping_audio'), __name__)
|
||||
move_temp_file(state_manager.get_item('output_path'))
|
||||
else:
|
||||
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
|
||||
if source_audio_path:
|
||||
if ffmpeg.replace_audio(source_audio_path, state_manager.get_item('output_path')):
|
||||
logger.debug(translator.get('replacing_audio_succeeded'), __name__)
|
||||
else:
|
||||
if is_process_stopping():
|
||||
return 4
|
||||
logger.warn(translator.get('replacing_audio_skipped'), __name__)
|
||||
move_temp_file(state_manager.get_item('output_path'))
|
||||
return 0
|
||||
|
||||
|
||||
def finalize_video(start_time : float) -> ErrorCode:
|
||||
if is_video(state_manager.get_item('output_path')):
|
||||
logger.info(translator.get('processing_video_succeeded').format(seconds = calculate_end_time(start_time)), __name__)
|
||||
else:
|
||||
logger.error(translator.get('processing_video_failed'), __name__)
|
||||
return 1
|
||||
return 0
|
||||
@@ -12,7 +12,7 @@ from facefusion.processors.core import get_processors_modules
|
||||
from facefusion.temp_helper import move_temp_file, resolve_temp_frame_paths
|
||||
from facefusion.time_helper import calculate_end_time
|
||||
from facefusion.types import ErrorCode
|
||||
from facefusion.vision import conditional_merge_vision_mask, detect_video_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, read_static_video_frame, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, write_image
|
||||
from facefusion.vision import conditional_merge_vision_mask, detect_video_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, read_static_video_frame, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, write_image
|
||||
from facefusion.workflows.core import clear, is_process_stopping, setup
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ def process(start_time : float) -> ErrorCode:
|
||||
|
||||
|
||||
def analyse_video() -> ErrorCode:
|
||||
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
|
||||
if content_analyser.analyse_video(state_manager.get_item('target_path'), trim_frame_start, trim_frame_end):
|
||||
return 3
|
||||
@@ -52,7 +52,7 @@ def analyse_video() -> ErrorCode:
|
||||
|
||||
|
||||
def extract_frames() -> ErrorCode:
|
||||
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale'))
|
||||
temp_video_resolution = restrict_video_resolution(state_manager.get_item('target_path'), output_video_resolution)
|
||||
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
|
||||
@@ -103,7 +103,7 @@ def process_video() -> ErrorCode:
|
||||
|
||||
|
||||
def merge_frames() -> ErrorCode:
|
||||
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale'))
|
||||
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
|
||||
|
||||
@@ -119,7 +119,7 @@ def merge_frames() -> ErrorCode:
|
||||
|
||||
|
||||
def restore_audio() -> ErrorCode:
|
||||
trim_frame_start, trim_frame_end = restrict_trim_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
|
||||
|
||||
if state_manager.get_item('output_audio_volume') == 0:
|
||||
logger.info(translator.get('skipping_audio'), __name__)
|
||||
|
||||
+18
-1
@@ -1,8 +1,9 @@
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
from pytest import approx
|
||||
|
||||
from facefusion.audio import get_audio_frame, read_static_audio
|
||||
from facefusion.audio import detect_audio_duration, get_audio_frame, read_static_audio, restrict_trim_audio_frame
|
||||
from facefusion.download import conditional_download
|
||||
from .helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
@@ -26,3 +27,19 @@ def test_read_static_audio() -> None:
|
||||
assert len(read_static_audio(get_test_example_file('source.mp3'), 25)) == 280
|
||||
assert len(read_static_audio(get_test_example_file('source.wav'), 25)) == 280
|
||||
assert read_static_audio('invalid', 25) is None
|
||||
|
||||
|
||||
def test_detect_audio_duration() -> None:
|
||||
assert detect_audio_duration(get_test_example_file('source.mp3')) == approx(3.788, rel = 1e-3)
|
||||
assert detect_audio_duration(get_test_example_file('source.wav')) == approx(3.788, rel = 1e-3)
|
||||
assert detect_audio_duration('invalid') == 0
|
||||
|
||||
|
||||
def test_restrict_trim_audio_frame() -> None:
|
||||
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 0, 50) == (0, 50)
|
||||
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 20, 95) == (20, 95)
|
||||
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, -10, None) == (0, 95)
|
||||
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, -10) == (0, 0)
|
||||
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, 100, None) == (95, 95)
|
||||
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, 100) == (0, 95)
|
||||
assert restrict_trim_audio_frame(get_test_example_file('source.mp3'), 25, None, None) == (0, 95)
|
||||
|
||||
@@ -25,14 +25,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_modify_age_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-age-face-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-age-face-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-age-face-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_modify_age_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-age-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'age_modifier', '--age-modifier-direction', '100', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-age-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-age-face-to-video.mp4') is True
|
||||
|
||||
@@ -26,14 +26,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_remove_background_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-remove-background-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'background_remover', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-remove-background-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-remove-background-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_remove_background_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-remove-background-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'background_remover', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-remove-background-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-remove-background-to-video.mp4') is True
|
||||
|
||||
@@ -26,7 +26,7 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_batch_run_targets() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-targets-{index}.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-targets-{index}.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-batch-run-targets-0.jpg') is True
|
||||
@@ -35,7 +35,7 @@ def test_batch_run_targets() -> None:
|
||||
|
||||
|
||||
def test_batch_run_sources_to_targets() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('target-240p-batch-*.jpg'), '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-sources-to-targets-{index}.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'batch-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('target-240p-batch-*.jpg'), '-t', get_test_example_file('target-240p-batch-*.jpg'), '-o', get_test_output_file('test-batch-run-sources-to-targets-{index}.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-batch-run-sources-to-targets-0.jpg') is True
|
||||
|
||||
@@ -25,14 +25,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_restore_expression_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-restore-expression-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-restore-expression-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-restore-expression-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_restore_expression_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-restore-expression-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'expression_restorer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-restore-expression-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-restore-expression-to-video.mp4') is True
|
||||
|
||||
@@ -26,14 +26,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_debug_face_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-debug-face-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-debug-face-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-debug-face-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_debug_face_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-debug-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-debug-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-debug-face-to-video.mp4') is True
|
||||
|
||||
@@ -26,14 +26,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_edit_face_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-edit-face-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_editor', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-edit-face-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-edit-face-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_edit_face_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-edit-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_editor', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-edit-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-edit-face-to-video.mp4') is True
|
||||
|
||||
@@ -26,14 +26,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_enhance_face_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-face-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-face-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-enhance-face-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_enhance_face_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-enhance-face-to-video.mp4') is True
|
||||
|
||||
@@ -26,14 +26,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_swap_face_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-swap-face-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-swap-face-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-swap-face-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_swap_face_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-swap-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_swapper', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-swap-face-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-swap-face-to-video.mp4') is True
|
||||
|
||||
@@ -27,14 +27,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_colorize_frame_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.jpg'), '-o', get_test_output_file('test_colorize-frame-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.jpg'), '-o', get_test_output_file('test_colorize-frame-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test_colorize-frame-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_colorize_frame_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_file('test-colorize-frame-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_colorizer', '-t', get_test_example_file('target-240p-0sat.mp4'), '-o', get_test_output_file('test-colorize-frame-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-colorize-frame-to-video.mp4') is True
|
||||
|
||||
@@ -26,14 +26,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_enhance_frame_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-frame-to-image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-enhance-frame-to-image.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-enhance-frame-to-image.jpg') is True
|
||||
|
||||
|
||||
def test_enhance_frame_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-frame-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-enhance-frame-to-video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test-enhance-frame-to-video.mp4') is True
|
||||
|
||||
@@ -50,7 +50,7 @@ def test_job_submit() -> None:
|
||||
|
||||
assert subprocess.run(commands).returncode == 1
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-submit', 'test-job-submit', '--jobs-path', get_test_jobs_directory() ]
|
||||
@@ -73,10 +73,10 @@ def test_submit_all() -> None:
|
||||
|
||||
assert subprocess.run(commands).returncode == 1
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-2', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-submit-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-submit-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ]
|
||||
@@ -122,7 +122,7 @@ def test_job_delete_all() -> None:
|
||||
|
||||
|
||||
def test_job_add_step() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 1
|
||||
assert count_step_total('test-job-add-step') == 0
|
||||
@@ -130,14 +130,14 @@ def test_job_add_step() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-add-step', '--jobs-path', get_test_jobs_directory() ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-add-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert count_step_total('test-job-add-step') == 1
|
||||
|
||||
|
||||
def test_job_remix() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 1
|
||||
assert count_step_total('test-job-remix-step') == 0
|
||||
@@ -145,23 +145,23 @@ def test_job_remix() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory() ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remix-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
|
||||
assert count_step_total('test-job-remix-step') == 1
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert count_step_total('test-job-remix-step') == 2
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-remix-step', 'test-job-remix-step', '-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert count_step_total('test-job-remix-step') == 3
|
||||
|
||||
|
||||
def test_job_insert_step() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 1
|
||||
assert count_step_total('test-job-insert-step') == 0
|
||||
@@ -169,16 +169,16 @@ def test_job_insert_step() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory() ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-insert-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '0', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
|
||||
assert count_step_total('test-job-insert-step') == 1
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert count_step_total('test-job-insert-step') == 2
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '-1', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-insert-step', 'test-job-insert-step', '-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert count_step_total('test-job-insert-step') == 3
|
||||
@@ -192,7 +192,7 @@ def test_job_remove_step() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory() ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory(), '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-remove-step', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '-s', get_test_example_file('source.jpg'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-remix-step.jpg') ]
|
||||
subprocess.run(commands)
|
||||
subprocess.run(commands)
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ def test_job_run() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-run', '--jobs-path', get_test_jobs_directory() ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-run', 'test-job-run', '--jobs-path', get_test_jobs_directory() ]
|
||||
@@ -61,13 +61,13 @@ def test_job_run_all() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory() ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-1', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run-all-1.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-run-all-1.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-end', '1' ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-run-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-run-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-run-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ]
|
||||
@@ -93,7 +93,7 @@ def test_job_retry() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-retry', '--jobs-path', get_test_jobs_directory() ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-retry', 'test-job-retry', '--jobs-path', get_test_jobs_directory() ]
|
||||
@@ -121,13 +121,13 @@ def test_job_retry_all() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-create', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory() ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-1', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry-all-1.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-1', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test-job-retry-all-1.jpg') ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-end', '1' ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-add-step', 'test-job-retry-all-2', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'face_debugger', '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test-job-retry-all-2.mp4'), '--trim-frame-start', '0', '--trim-frame-end', '1' ]
|
||||
subprocess.run(commands)
|
||||
|
||||
commands = [ sys.executable, 'facefusion.py', 'job-retry-all', '--jobs-path', get_test_jobs_directory(), '--halt-on-error' ]
|
||||
|
||||
@@ -27,14 +27,14 @@ def before_each() -> None:
|
||||
|
||||
|
||||
def test_sync_lip_to_image() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test_sync_lip_to_image.jpg') ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'audio-to-image', '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_file('test_sync_lip_to_image.mp4') ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test_sync_lip_to_image.jpg') is True
|
||||
assert is_test_output_file('test_sync_lip_to_image.mp4') is True
|
||||
|
||||
|
||||
def test_sync_lip_to_video() -> None:
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_file('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert is_test_output_file('test_sync_lip_to_video.mp4') is True
|
||||
|
||||
@@ -36,7 +36,7 @@ def before_each() -> None:
|
||||
])
|
||||
def test_output_image_scale(output_image_scale : Scale, output_image_resolution : Resolution) -> None:
|
||||
output_file_path = get_test_output_file('test-output-image-scale-' + str(output_image_scale) + '.jpg')
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', output_file_path, '--output-image-scale', str(output_image_scale) ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-image', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.jpg'), '-o', output_file_path, '--output-image-scale', str(output_image_scale) ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert detect_image_resolution(output_file_path) == output_image_resolution
|
||||
@@ -51,7 +51,7 @@ def test_output_image_scale(output_image_scale : Scale, output_image_resolution
|
||||
])
|
||||
def test_output_video_scale(output_video_scale : Scale, output_video_resolution : Resolution) -> None:
|
||||
output_file_path = get_test_output_file('test-output-video-scale-' + str(output_video_scale) + '.mp4')
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', output_file_path, '--trim-frame-end', '1', '--output-video-scale', str(output_video_scale) ]
|
||||
commands = [ sys.executable, 'facefusion.py', 'run', '--jobs-path', get_test_jobs_directory(), '--workflow', 'image-to-video', '--processors', 'frame_enhancer', '-t', get_test_example_file('target-240p.mp4'), '-o', output_file_path, '--trim-frame-end', '1', '--output-video-scale', str(output_video_scale) ]
|
||||
|
||||
assert subprocess.run(commands).returncode == 0
|
||||
assert detect_video_resolution(output_file_path) == output_video_resolution
|
||||
|
||||
+8
-18
@@ -3,7 +3,7 @@ import subprocess
|
||||
import pytest
|
||||
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.vision import calculate_histogram_difference, count_trim_frame_total, count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, match_frame_color, normalize_resolution, pack_resolution, predict_video_frame_total, read_image, read_video_frame, restrict_image_resolution, restrict_trim_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, unpack_resolution, write_image
|
||||
from facefusion.vision import calculate_histogram_difference, count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution, match_frame_color, normalize_resolution, pack_resolution, predict_video_frame_total, read_image, read_video_frame, restrict_image_resolution, restrict_trim_video_frame, restrict_video_fps, restrict_video_resolution, scale_resolution, unpack_resolution, write_image
|
||||
from .helper import get_test_example_file, get_test_examples_directory, get_test_output_file, prepare_test_output_directory
|
||||
|
||||
|
||||
@@ -97,24 +97,14 @@ def test_detect_video_duration() -> None:
|
||||
assert detect_video_duration('invalid') == 0
|
||||
|
||||
|
||||
def test_count_trim_frame_total() -> None:
|
||||
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 0, 200) == 200
|
||||
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 70, 270) == 200
|
||||
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), -10, None) == 270
|
||||
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, -10) == 0
|
||||
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), 280, None) == 0
|
||||
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, 280) == 270
|
||||
assert count_trim_frame_total(get_test_example_file('target-240p.mp4'), None, None) == 270
|
||||
|
||||
|
||||
def test_restrict_trim_frame() -> None:
|
||||
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 0, 200) == (0, 200)
|
||||
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 70, 270) == (70, 270)
|
||||
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), -10, None) == (0, 270)
|
||||
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, -10) == (0, 0)
|
||||
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), 280, None) == (270, 270)
|
||||
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, 280) == (0, 270)
|
||||
assert restrict_trim_frame(get_test_example_file('target-240p.mp4'), None, None) == (0, 270)
|
||||
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 0, 200) == (0, 200)
|
||||
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 70, 270) == (70, 270)
|
||||
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), -10, None) == (0, 270)
|
||||
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, -10) == (0, 0)
|
||||
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), 280, None) == (270, 270)
|
||||
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, 280) == (0, 270)
|
||||
assert restrict_trim_video_frame(get_test_example_file('target-240p.mp4'), None, None) == (0, 270)
|
||||
|
||||
|
||||
def test_detect_video_resolution() -> None:
|
||||
|
||||
Reference in New Issue
Block a user