This commit is contained in:
harisreedhar
2025-11-25 16:46:56 +05:30
committed by henryruhs
parent 549de72bef
commit 35c2022700
9 changed files with 48 additions and 51 deletions
+1 -1
View File
@@ -29,6 +29,6 @@ def graceful_exit(error_code : ErrorCode) -> None:
sleep(0.5)
if state_manager.get_item('output_path'):
clear_temp_directory(state_manager.get_item('output_path'))
clear_temp_directory(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
hard_exit(error_code)
+7 -7
View File
@@ -109,7 +109,7 @@ def get_available_encoder_set() -> EncoderSet:
def extract_frames(target_path : str, output_path : str, temp_video_resolution : Resolution, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
extract_frame_total = predict_video_frame_total(target_path, temp_video_fps, trim_frame_start, trim_frame_end)
temp_frames_pattern = get_temp_frames_pattern(output_path, '%08d')
temp_frames_pattern = get_temp_frames_pattern(output_path, '%08d', state_manager.get_item('temp_path'))
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.set_media_resolution(pack_resolution(temp_video_resolution)),
@@ -125,7 +125,7 @@ def extract_frames(target_path : str, output_path : str, temp_video_resolution :
def copy_image(target_path : str, output_path : str, temp_image_resolution : Resolution) -> bool:
temp_image_path = get_temp_file_path(output_path)
temp_image_path = get_temp_file_path(output_path, state_manager.get_item('temp_path'))
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.set_media_resolution(pack_resolution(temp_image_resolution)),
@@ -137,7 +137,7 @@ def copy_image(target_path : str, output_path : str, temp_image_resolution : Res
def finalize_image(output_path : str, output_image_resolution : Resolution) -> bool:
output_image_quality = state_manager.get_item('output_image_quality')
temp_image_path = get_temp_file_path(output_path)
temp_image_path = get_temp_file_path(output_path, state_manager.get_item('temp_path'))
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(temp_image_path),
ffmpeg_builder.set_media_resolution(pack_resolution(output_image_resolution)),
@@ -169,7 +169,7 @@ def restore_audio(target_path : str, output_path : str, trim_frame_start : int,
output_audio_quality = state_manager.get_item('output_audio_quality')
output_audio_volume = state_manager.get_item('output_audio_volume')
target_video_fps = detect_video_fps(target_path)
temp_video_path = get_temp_file_path(output_path)
temp_video_path = get_temp_file_path(output_path, state_manager.get_item('temp_path'))
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_video_duration = detect_video_duration(temp_video_path)
@@ -194,7 +194,7 @@ def replace_audio(audio_path : str, output_path : str) -> bool:
output_audio_encoder = state_manager.get_item('output_audio_encoder')
output_audio_quality = state_manager.get_item('output_audio_quality')
output_audio_volume = state_manager.get_item('output_audio_volume')
temp_video_path = get_temp_file_path(output_path)
temp_video_path = get_temp_file_path(output_path, state_manager.get_item('temp_path'))
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_video_duration = detect_video_duration(temp_video_path)
@@ -218,9 +218,9 @@ def merge_video(target_path : str, output_path : str, temp_video_fps : Fps, outp
output_video_quality = state_manager.get_item('output_video_quality')
output_video_preset = state_manager.get_item('output_video_preset')
merge_frame_total = predict_video_frame_total(target_path, output_video_fps, trim_frame_start, trim_frame_end)
temp_video_path = get_temp_file_path(output_path)
temp_video_path = get_temp_file_path(output_path, state_manager.get_item('temp_path'))
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_frames_pattern = get_temp_frames_pattern(output_path, '%08d')
temp_frames_pattern = get_temp_frames_pattern(output_path, '%08d', state_manager.get_item('temp_path'))
output_video_encoder = fix_video_encoder(temp_video_format, output_video_encoder)
commands = ffmpeg_builder.chain(
+16 -16
View File
@@ -5,46 +5,46 @@ from facefusion import state_manager
from facefusion.filesystem import create_directory, get_file_extension, get_file_name, move_file, remove_directory, resolve_file_pattern
def get_temp_file_path(file_path : str) -> str:
temp_directory_path = get_temp_directory_path(file_path)
def get_temp_file_path(file_path : str, temp_path : str) -> str:
temp_directory_path = get_temp_directory_path(file_path, temp_path)
temp_file_extension = get_file_extension(file_path)
return os.path.join(temp_directory_path, 'temp' + temp_file_extension)
def move_temp_file(move_path : str) -> bool:
temp_file_path = get_temp_file_path(move_path)
def move_temp_file(move_path : str, temp_path : str) -> bool:
temp_file_path = get_temp_file_path(move_path, temp_path)
return move_file(temp_file_path, move_path)
def resolve_temp_frame_paths(target_path : str) -> List[str]:
temp_frames_pattern = get_temp_frames_pattern(target_path, '*')
def resolve_temp_frame_paths(target_path : str, temp_path : str) -> List[str]:
temp_frames_pattern = get_temp_frames_pattern(target_path, '*', temp_path)
return resolve_file_pattern(temp_frames_pattern)
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
def get_temp_frames_pattern(target_path : str, temp_frame_prefix : str, temp_path : str) -> str:
temp_directory_path = get_temp_directory_path(target_path, temp_path)
return os.path.join(temp_directory_path, temp_frame_prefix + '.' + state_manager.get_item('temp_frame_format')) # TODO: remove state_manager.get
def get_temp_directory_path(file_path : str) -> str:
def get_temp_directory_path(file_path : str, temp_path : str) -> str:
temp_file_name = get_file_name(file_path)
return os.path.join(state_manager.get_item('temp_path'), 'facefusion', temp_file_name) # TODO: remove state_manager.get
return os.path.join(temp_path, 'facefusion', temp_file_name)
def create_temp_directory(file_path : str) -> bool:
temp_directory_path = get_temp_directory_path(file_path)
def create_temp_directory(file_path : str, temp_path : str) -> bool:
temp_directory_path = get_temp_directory_path(file_path, temp_path)
return create_directory(temp_directory_path)
def clear_temp_directory(file_path : str) -> bool:
def clear_temp_directory(file_path : str, temp_path : str) -> bool:
if not state_manager.get_item('keep_temp'): # TODO: remove state_manager.get
temp_directory_path = get_temp_directory_path(file_path)
temp_directory_path = get_temp_directory_path(file_path, temp_path)
return remove_directory(temp_directory_path)
return True
def get_temp_sequence_paths(file_path : str, frame_total : int, temp_frame_prefix : str, temp_frame_format : str) -> List[str]:
temp_directory_path = get_temp_directory_path(file_path)
def get_temp_sequence_paths(file_path : str, frame_total : int, temp_frame_prefix : str, temp_frame_format : str, temp_path : str) -> List[str]:
temp_directory_path = get_temp_directory_path(file_path, temp_path)
temp_frame_paths = []
for frame_number in range(frame_total):
+3 -3
View File
@@ -53,7 +53,7 @@ def process_image() -> ErrorCode:
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
trim_frame_start, trim_frame_end = restrict_trim_audio_frame(source_audio_path, state_manager.get_item('output_video_fps'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
audio_frame_total = trim_frame_end - trim_frame_start
temp_frame_paths = get_temp_sequence_paths(state_manager.get_item('output_path'), audio_frame_total, '%08d', state_manager.get_item('temp_frame_format'))
temp_frame_paths = get_temp_sequence_paths(state_manager.get_item('output_path'), audio_frame_total, '%08d', state_manager.get_item('temp_frame_format'), state_manager.get_item('temp_path'))
if temp_frame_paths:
with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
@@ -137,7 +137,7 @@ def merge_frames() -> ErrorCode:
def restore_audio() -> ErrorCode:
if state_manager.get_item('output_audio_volume') == 0:
logger.info(translator.get('skipping_audio'), __name__)
move_temp_file(state_manager.get_item('output_path'))
move_temp_file(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
else:
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
if source_audio_path:
@@ -147,7 +147,7 @@ def restore_audio() -> ErrorCode:
if is_process_stopping():
return 4
logger.warn(translator.get('replacing_audio_skipped'), __name__)
move_temp_file(state_manager.get_item('output_path'))
move_temp_file(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
return 0
+2 -2
View File
@@ -11,12 +11,12 @@ def is_process_stopping() -> bool:
def setup() -> ErrorCode:
create_temp_directory(state_manager.get_item('output_path'))
create_temp_directory(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
logger.debug(translator.get('creating_temp'), __name__)
return 0
def clear() -> ErrorCode:
clear_temp_directory(state_manager.get_item('output_path'))
clear_temp_directory(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
logger.debug(translator.get('clearing_temp'), __name__)
return 0
+1 -1
View File
@@ -57,7 +57,7 @@ def prepare_image() -> ErrorCode:
def process_image() -> ErrorCode:
temp_image_path = get_temp_file_path(state_manager.get_item('output_path'))
temp_image_path = get_temp_file_path(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
reference_vision_frame = read_static_image(state_manager.get_item('target_path'))
source_vision_frames = read_static_images(state_manager.get_item('source_paths'))
source_audio_frame = create_empty_audio_frame()
+4 -4
View File
@@ -69,7 +69,7 @@ def extract_frames() -> ErrorCode:
def process_video() -> ErrorCode:
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('output_path'))
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
if temp_frame_paths:
with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
@@ -123,7 +123,7 @@ def restore_audio() -> ErrorCode:
if state_manager.get_item('output_audio_volume') == 0:
logger.info(translator.get('skipping_audio'), __name__)
move_temp_file(state_manager.get_item('output_path'))
move_temp_file(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
else:
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
if source_audio_path:
@@ -135,7 +135,7 @@ def restore_audio() -> ErrorCode:
if is_process_stopping():
return 4
logger.warn(translator.get('replacing_audio_skipped'), __name__)
move_temp_file(state_manager.get_item('output_path'))
move_temp_file(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
else:
if ffmpeg.restore_audio(state_manager.get_item('target_path'), state_manager.get_item('output_path'), trim_frame_start, trim_frame_end):
video_manager.clear_video_pool()
@@ -145,7 +145,7 @@ def restore_audio() -> ErrorCode:
if is_process_stopping():
return 4
logger.warn(translator.get('restoring_audio_skipped'), __name__)
move_temp_file(state_manager.get_item('output_path'))
move_temp_file(state_manager.get_item('output_path'), state_manager.get_item('temp_path'))
return 0
+11 -11
View File
@@ -82,12 +82,12 @@ def test_extract_frames() -> None:
]
for target_path, output_path, trim_frame_start, trim_frame_end, frame_total in test_set:
create_temp_directory(output_path)
create_temp_directory(output_path, state_manager.get_item('temp_path'))
assert extract_frames(target_path, output_path, (452, 240), 30.0, trim_frame_start, trim_frame_end) is True
assert len(resolve_temp_frame_paths(output_path)) == frame_total
assert len(resolve_temp_frame_paths(output_path, state_manager.get_item('temp_path'))) == frame_total
clear_temp_directory(output_path)
clear_temp_directory(output_path, state_manager.get_item('temp_path'))
def test_merge_video() -> None:
@@ -108,12 +108,12 @@ def test_merge_video() -> None:
state_manager.init_item('output_path', target_path)
state_manager.init_item('output_video_fps', 25.0)
state_manager.init_item('output_video_encoder', output_video_encoder)
create_temp_directory(output_path)
create_temp_directory(output_path, state_manager.get_item('temp_path'))
extract_frames(target_path, output_path, (452, 240), 25.0, 0, 1)
assert merge_video(target_path, output_path, 25.0, (452, 240), 0, 1) is True
clear_temp_directory(output_path)
clear_temp_directory(output_path, state_manager.get_item('temp_path'))
state_manager.init_item('output_video_encoder', 'libx264')
@@ -150,15 +150,15 @@ def test_restore_audio() -> None:
output_audio_encoders = get_available_encoder_set().get('audio')
for target_path, output_path in test_set:
create_temp_directory(output_path)
create_temp_directory(output_path, state_manager.get_item('temp_path'))
for output_audio_encoder in output_audio_encoders:
state_manager.init_item('output_audio_encoder', output_audio_encoder)
copy_file(target_path, get_temp_file_path(output_path))
copy_file(target_path, get_temp_file_path(output_path, state_manager.get_item('temp_path')))
assert restore_audio(target_path, output_path, 0, 270) is True
clear_temp_directory(output_path)
clear_temp_directory(output_path, state_manager.get_item('temp_path'))
state_manager.init_item('output_audio_encoder', 'aac')
@@ -177,15 +177,15 @@ def test_replace_audio() -> None:
output_audio_encoders = get_available_encoder_set().get('audio')
for target_path, output_path in test_set:
create_temp_directory(output_path)
create_temp_directory(output_path, state_manager.get_item('temp_path'))
for output_audio_encoder in output_audio_encoders:
state_manager.init_item('output_audio_encoder', output_audio_encoder)
copy_file(target_path, get_temp_file_path(output_path))
copy_file(target_path, get_temp_file_path(output_path, state_manager.get_item('temp_path')))
assert replace_audio(get_test_example_file('source.mp3'), output_path) is True
assert replace_audio(get_test_example_file('source.wav'), output_path) is True
clear_temp_directory(output_path)
clear_temp_directory(output_path, state_manager.get_item('temp_path'))
state_manager.init_item('output_audio_encoder', 'aac')
+3 -6
View File
@@ -21,15 +21,12 @@ def before_all() -> None:
def test_get_temp_file_path() -> None:
state_manager.init_item('output_path', 'temp.mp4')
temp_directory = tempfile.gettempdir()
assert get_temp_file_path(get_test_example_file('target-240p.mp4')) == os.path.join(temp_directory, 'facefusion', 'target-240p', 'temp.mp4')
assert get_temp_file_path(get_test_example_file('target-240p.mp4'), state_manager.get_item('temp_path')) == os.path.join(temp_directory, 'facefusion', 'target-240p', 'temp.mp4')
def test_get_temp_directory_path() -> None:
temp_directory = tempfile.gettempdir()
assert get_temp_directory_path(get_test_example_file('target-240p.mp4')) == os.path.join(temp_directory, 'facefusion', 'target-240p')
assert get_temp_directory_path(get_test_example_file('target-240p.mp4'), state_manager.get_item('temp_path')) == os.path.join(temp_directory, 'facefusion', 'target-240p')
def test_get_temp_frames_pattern() -> None:
temp_directory = tempfile.gettempdir()
assert get_temp_frames_pattern(get_test_example_file('target-240p.mp4'), '%04d') == os.path.join(temp_directory, 'facefusion', 'target-240p', '%04d.png')
assert get_temp_frames_pattern(get_test_example_file('target-240p.mp4'), '%04d', state_manager.get_item('temp_path')) == os.path.join(temp_directory, 'facefusion', 'target-240p', '%04d.png')