refactor temp handling from target-path to output-path

This commit is contained in:
harisreedhar
2025-11-23 08:14:51 +05:30
committed by henryruhs
parent d548353cf6
commit 5ca7f3d0dc
6 changed files with 48 additions and 46 deletions
+12 -13
View File
@@ -107,9 +107,9 @@ def get_available_encoder_set() -> EncoderSet:
return available_encoder_set
def extract_frames(target_path : str, temp_video_resolution : Resolution, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
def extract_frames(target_path : str, output_path : str, temp_video_resolution : Resolution, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
extract_frame_total = predict_video_frame_total(target_path, temp_video_fps, trim_frame_start, trim_frame_end)
temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d')
temp_frames_pattern = get_temp_frames_pattern(output_path, '%08d')
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.set_media_resolution(pack_resolution(temp_video_resolution)),
@@ -125,8 +125,8 @@ def extract_frames(target_path : str, temp_video_resolution : Resolution, temp_v
return process.returncode == 0
def copy_image(target_path : str, temp_image_resolution : Resolution) -> bool:
temp_image_path = get_temp_file_path(target_path)
def copy_image(target_path : str, output_path : str, temp_image_resolution : Resolution) -> bool:
temp_image_path = get_temp_file_path(output_path)
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.set_media_resolution(pack_resolution(temp_image_resolution)),
@@ -136,9 +136,9 @@ def copy_image(target_path : str, temp_image_resolution : Resolution) -> bool:
return run_ffmpeg(commands).returncode == 0
def finalize_image(target_path : str, output_path : str, output_image_resolution : Resolution) -> bool:
def finalize_image(output_path : str, output_image_resolution : Resolution) -> bool:
output_image_quality = state_manager.get_item('output_image_quality')
temp_image_path = get_temp_file_path(target_path)
temp_image_path = get_temp_file_path(output_path)
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(temp_image_path),
ffmpeg_builder.set_media_resolution(pack_resolution(output_image_resolution)),
@@ -170,7 +170,7 @@ def restore_audio(target_path : str, output_path : str, trim_frame_start : int,
output_audio_quality = state_manager.get_item('output_audio_quality')
output_audio_volume = state_manager.get_item('output_audio_volume')
target_video_fps = detect_video_fps(target_path)
temp_video_path = get_temp_file_path(target_path)
temp_video_path = get_temp_file_path(output_path)
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_video_duration = detect_video_duration(temp_video_path)
@@ -191,11 +191,11 @@ def restore_audio(target_path : str, output_path : str, trim_frame_start : int,
return run_ffmpeg(commands).returncode == 0
def replace_audio(target_path : str, audio_path : str, output_path : str) -> bool:
def replace_audio(audio_path : str, output_path : str) -> bool:
output_audio_encoder = state_manager.get_item('output_audio_encoder')
output_audio_quality = state_manager.get_item('output_audio_quality')
output_audio_volume = state_manager.get_item('output_audio_volume')
temp_video_path = get_temp_file_path(target_path)
temp_video_path = get_temp_file_path(output_path)
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_video_duration = detect_video_duration(temp_video_path)
@@ -213,16 +213,15 @@ def replace_audio(target_path : str, audio_path : str, output_path : str) -> boo
return run_ffmpeg(commands).returncode == 0
def merge_video(target_path : str, temp_video_fps : Fps, output_video_resolution : Resolution, trim_frame_start : int, trim_frame_end : int) -> bool:
output_path = state_manager.get_item('output_path')
def merge_video(target_path : str, output_path : str, temp_video_fps : Fps, output_video_resolution : Resolution, trim_frame_start : int, trim_frame_end : int) -> bool:
output_video_fps = state_manager.get_item('output_video_fps')
output_video_encoder = state_manager.get_item('output_video_encoder')
output_video_quality = state_manager.get_item('output_video_quality')
output_video_preset = state_manager.get_item('output_video_preset')
merge_frame_total = predict_video_frame_total(target_path, output_video_fps, trim_frame_start, trim_frame_end)
temp_video_path = get_temp_file_path(target_path)
temp_video_path = get_temp_file_path(output_path)
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_frames_pattern = get_temp_frames_pattern(target_path, '%08d')
temp_frames_pattern = get_temp_frames_pattern(output_path, '%08d')
output_video_encoder = fix_video_encoder(temp_video_format, output_video_encoder)
commands = ffmpeg_builder.chain(
+3 -3
View File
@@ -7,12 +7,12 @@ from facefusion.filesystem import create_directory, get_file_extension, get_file
def get_temp_file_path(file_path : str) -> str:
temp_directory_path = get_temp_directory_path(file_path)
temp_file_extension = get_file_extension(state_manager.get_item('output_path'))
temp_file_extension = get_file_extension(file_path)
return os.path.join(temp_directory_path, 'temp' + temp_file_extension)
def move_temp_file(file_path : str, move_path : str) -> bool:
temp_file_path = get_temp_file_path(file_path)
def move_temp_file(move_path : str) -> bool:
temp_file_path = get_temp_file_path(move_path)
return move_file(temp_file_path, move_path)
+2 -2
View File
@@ -11,12 +11,12 @@ def is_process_stopping() -> bool:
def setup() -> ErrorCode:
create_temp_directory(state_manager.get_item('target_path'))
create_temp_directory(state_manager.get_item('output_path'))
logger.debug(translator.get('creating_temp'), __name__)
return 0
def clear() -> ErrorCode:
clear_temp_directory(state_manager.get_item('target_path'))
clear_temp_directory(state_manager.get_item('output_path'))
logger.debug(translator.get('clearing_temp'), __name__)
return 0
+6 -5
View File
@@ -47,7 +47,7 @@ def prepare_image() -> ErrorCode:
temp_image_resolution = restrict_image_resolution(state_manager.get_item('target_path'), output_image_resolution)
logger.info(translator.get('copying_image').format(resolution = pack_resolution(temp_image_resolution)), __name__)
if ffmpeg.copy_image(state_manager.get_item('target_path'), temp_image_resolution):
if ffmpeg.copy_image(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_image_resolution):
logger.debug(translator.get('copying_image_succeeded'), __name__)
else:
logger.error(translator.get('copying_image_failed'), __name__)
@@ -57,12 +57,13 @@ def prepare_image() -> ErrorCode:
def process_image() -> ErrorCode:
temp_image_path = get_temp_file_path(state_manager.get_item('target_path'))
reference_vision_frame = read_static_image(temp_image_path)
target_image_path = state_manager.get_item('target_path')
temp_image_path = get_temp_file_path(state_manager.get_item('output_path'))
reference_vision_frame = read_static_image(target_image_path)
source_vision_frames = read_static_images(state_manager.get_item('source_paths'))
source_audio_frame = create_empty_audio_frame()
source_voice_frame = create_empty_audio_frame()
target_vision_frame = read_static_image(temp_image_path, 'rgba')
target_vision_frame = read_static_image(target_image_path, 'rgba')
temp_vision_frame = target_vision_frame.copy()
temp_vision_mask = extract_vision_mask(temp_vision_frame)
@@ -94,7 +95,7 @@ def finalize_image(start_time : float) -> ErrorCode:
output_image_resolution = scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_image_scale'))
logger.info(translator.get('finalizing_image').format(resolution = pack_resolution(output_image_resolution)), __name__)
if ffmpeg.finalize_image(state_manager.get_item('target_path'), state_manager.get_item('output_path'), output_image_resolution):
if ffmpeg.finalize_image(state_manager.get_item('output_path'), output_image_resolution):
logger.debug(translator.get('finalizing_image_succeeded'), __name__)
else:
logger.warn(translator.get('finalizing_image_skipped'), __name__)
+7 -7
View File
@@ -58,7 +58,7 @@ def extract_frames() -> ErrorCode:
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
logger.info(translator.get('extracting_frames').format(resolution=pack_resolution(temp_video_resolution), fps=temp_video_fps), __name__)
if ffmpeg.extract_frames(state_manager.get_item('target_path'), temp_video_resolution, temp_video_fps, trim_frame_start, trim_frame_end):
if ffmpeg.extract_frames(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_video_resolution, temp_video_fps, trim_frame_start, trim_frame_end):
logger.debug(translator.get('extracting_frames_succeeded'), __name__)
else:
if is_process_stopping():
@@ -69,7 +69,7 @@ def extract_frames() -> ErrorCode:
def process_video() -> ErrorCode:
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('target_path'))
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('output_path'))
if temp_frame_paths:
with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
@@ -108,7 +108,7 @@ def merge_frames() -> ErrorCode:
temp_video_fps = restrict_video_fps(state_manager.get_item('target_path'), state_manager.get_item('output_video_fps'))
logger.info(translator.get('merging_video').format(resolution = pack_resolution(output_video_resolution), fps = state_manager.get_item('output_video_fps')), __name__)
if ffmpeg.merge_video(state_manager.get_item('target_path'), temp_video_fps, output_video_resolution, trim_frame_start, trim_frame_end):
if ffmpeg.merge_video(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_video_fps, output_video_resolution, trim_frame_start, trim_frame_end):
logger.debug(translator.get('merging_video_succeeded'), __name__)
else:
if is_process_stopping():
@@ -123,11 +123,11 @@ def restore_audio() -> ErrorCode:
if state_manager.get_item('output_audio_volume') == 0:
logger.info(translator.get('skipping_audio'), __name__)
move_temp_file(state_manager.get_item('target_path'), state_manager.get_item('output_path'))
move_temp_file(state_manager.get_item('output_path'))
else:
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
if source_audio_path:
if ffmpeg.replace_audio(state_manager.get_item('target_path'), source_audio_path, state_manager.get_item('output_path')):
if ffmpeg.replace_audio(source_audio_path, state_manager.get_item('output_path')):
video_manager.clear_video_pool()
logger.debug(translator.get('replacing_audio_succeeded'), __name__)
else:
@@ -135,7 +135,7 @@ def restore_audio() -> ErrorCode:
if is_process_stopping():
return 4
logger.warn(translator.get('replacing_audio_skipped'), __name__)
move_temp_file(state_manager.get_item('target_path'), state_manager.get_item('output_path'))
move_temp_file(state_manager.get_item('output_path'))
else:
if ffmpeg.restore_audio(state_manager.get_item('target_path'), state_manager.get_item('output_path'), trim_frame_start, trim_frame_end):
video_manager.clear_video_pool()
@@ -145,7 +145,7 @@ def restore_audio() -> ErrorCode:
if is_process_stopping():
return 4
logger.warn(translator.get('restoring_audio_skipped'), __name__)
move_temp_file(state_manager.get_item('target_path'), state_manager.get_item('output_path'))
move_temp_file(state_manager.get_item('output_path'))
return 0
+18 -16
View File
@@ -82,12 +82,13 @@ def test_extract_frames() -> None:
]
for target_path, trim_frame_start, trim_frame_end, frame_total in test_set:
create_temp_directory(target_path)
output_path = get_test_output_file(f'test-extract-frames-{trim_frame_start}-{trim_frame_end}.mp4')
create_temp_directory(output_path)
assert extract_frames(target_path, (452, 240), 30.0, trim_frame_start, trim_frame_end) is True
assert len(resolve_temp_frame_paths(target_path)) == frame_total
assert extract_frames(target_path, output_path, (452, 240), 30.0, trim_frame_start, trim_frame_end) is True
assert len(resolve_temp_frame_paths(output_path)) == frame_total
clear_temp_directory(target_path)
clear_temp_directory(output_path)
def test_merge_video() -> None:
@@ -105,15 +106,16 @@ def test_merge_video() -> None:
for target_path in target_paths:
for output_video_encoder in output_video_encoders:
output_path = get_test_output_file(f'test-merge-video-{output_video_encoder}.mp4')
state_manager.init_item('output_path', target_path)
state_manager.init_item('output_video_fps', 25.0)
state_manager.init_item('output_video_encoder', output_video_encoder)
create_temp_directory(target_path)
extract_frames(target_path, (452, 240), 25.0, 0, 1)
create_temp_directory(output_path)
extract_frames(target_path, output_path, (452, 240), 25.0, 0, 1)
assert merge_video(target_path, 25.0, (452, 240), 0, 1) is True
assert merge_video(target_path, output_path, 25.0, (452, 240), 0, 1) is True
clear_temp_directory(target_path)
clear_temp_directory(output_path)
state_manager.init_item('output_video_encoder', 'libx264')
@@ -150,15 +152,15 @@ def test_restore_audio() -> None:
output_audio_encoders = get_available_encoder_set().get('audio')
for target_path, output_path in test_set:
create_temp_directory(target_path)
create_temp_directory(output_path)
for output_audio_encoder in output_audio_encoders:
state_manager.init_item('output_audio_encoder', output_audio_encoder)
copy_file(target_path, get_temp_file_path(target_path))
copy_file(target_path, get_temp_file_path(output_path))
assert restore_audio(target_path, output_path, 0, 270) is True
clear_temp_directory(target_path)
clear_temp_directory(output_path)
state_manager.init_item('output_audio_encoder', 'aac')
@@ -177,15 +179,15 @@ def test_replace_audio() -> None:
output_audio_encoders = get_available_encoder_set().get('audio')
for target_path, output_path in test_set:
create_temp_directory(target_path)
create_temp_directory(output_path)
for output_audio_encoder in output_audio_encoders:
state_manager.init_item('output_audio_encoder', output_audio_encoder)
copy_file(target_path, get_temp_file_path(target_path))
copy_file(target_path, get_temp_file_path(output_path))
assert replace_audio(target_path, get_test_example_file('source.mp3'), output_path) is True
assert replace_audio(target_path, get_test_example_file('source.wav'), output_path) is True
assert replace_audio(get_test_example_file('source.mp3'), output_path) is True
assert replace_audio(get_test_example_file('source.wav'), output_path) is True
clear_temp_directory(target_path)
clear_temp_directory(output_path)
state_manager.init_item('output_audio_encoder', 'aac')