mirror of
https://github.com/facefusion/facefusion.git
synced 2026-04-29 04:55:57 +02:00
Add simple path isolation (#992)
This commit is contained in:
+4
-4
@@ -58,25 +58,25 @@ def route(args : Args) -> None:
|
||||
hard_exit(1)
|
||||
|
||||
if state_manager.get_item('command') in [ 'job-list', 'job-create', 'job-submit', 'job-submit-all', 'job-delete', 'job-delete-all', 'job-add-step', 'job-remix-step', 'job-insert-step', 'job-remove-step' ]:
|
||||
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
|
||||
if not job_manager.init_jobs(state_manager.get_jobs_path()):
|
||||
hard_exit(1)
|
||||
error_code = route_job_manager(args)
|
||||
hard_exit(error_code)
|
||||
|
||||
if state_manager.get_item('command') == 'run':
|
||||
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
|
||||
if not job_manager.init_jobs(state_manager.get_jobs_path()):
|
||||
hard_exit(1)
|
||||
error_code = process_headless(args)
|
||||
hard_exit(error_code)
|
||||
|
||||
if state_manager.get_item('command') == 'batch-run':
|
||||
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
|
||||
if not job_manager.init_jobs(state_manager.get_jobs_path()):
|
||||
hard_exit(1)
|
||||
error_code = process_batch(args)
|
||||
hard_exit(error_code)
|
||||
|
||||
if state_manager.get_item('command') in [ 'job-run', 'job-run-all', 'job-retry', 'job-retry-all' ]:
|
||||
if not job_manager.init_jobs(state_manager.get_item('jobs_path')):
|
||||
if not job_manager.init_jobs(state_manager.get_jobs_path()):
|
||||
hard_exit(1)
|
||||
error_code = route_job_runner()
|
||||
hard_exit(error_code)
|
||||
|
||||
@@ -29,6 +29,6 @@ def graceful_exit(error_code : ErrorCode) -> None:
|
||||
sleep(0.5)
|
||||
|
||||
if state_manager.get_item('output_path'):
|
||||
clear_temp_directory(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
clear_temp_directory(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
|
||||
hard_exit(error_code)
|
||||
|
||||
@@ -109,7 +109,7 @@ def get_available_encoder_set() -> EncoderSet:
|
||||
|
||||
def extract_frames(target_path : str, output_path : str, temp_video_resolution : Resolution, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
|
||||
extract_frame_total = predict_video_frame_total(target_path, temp_video_fps, trim_frame_start, trim_frame_end)
|
||||
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_item('temp_path'), output_path, state_manager.get_item('temp_frame_format'), '%08d')
|
||||
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_temp_path(), output_path, state_manager.get_item('temp_frame_format'), '%08d')
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.set_input(target_path),
|
||||
ffmpeg_builder.set_media_resolution(pack_resolution(temp_video_resolution)),
|
||||
@@ -128,7 +128,7 @@ def extract_frames(target_path : str, output_path : str, temp_video_resolution :
|
||||
def spawn_frames(target_path : str, output_path : str, temp_video_resolution : Resolution, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
|
||||
spawn_frame_total = trim_frame_end - trim_frame_start
|
||||
duration = spawn_frame_total / temp_video_fps
|
||||
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_item('temp_path'), output_path, state_manager.get_item('temp_frame_format'), '%08d')
|
||||
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_temp_path(), output_path, state_manager.get_item('temp_frame_format'), '%08d')
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.set_loop(),
|
||||
ffmpeg_builder.set_input(target_path),
|
||||
@@ -144,7 +144,7 @@ def spawn_frames(target_path : str, output_path : str, temp_video_resolution : R
|
||||
|
||||
|
||||
def copy_image(target_path : str, output_path : str, temp_image_resolution : Resolution) -> bool:
|
||||
temp_image_path = get_temp_file_path(state_manager.get_item('temp_path'), output_path)
|
||||
temp_image_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.set_input(target_path),
|
||||
ffmpeg_builder.set_media_resolution(pack_resolution(temp_image_resolution)),
|
||||
@@ -156,7 +156,7 @@ def copy_image(target_path : str, output_path : str, temp_image_resolution : Res
|
||||
|
||||
def finalize_image(output_path : str, output_image_resolution : Resolution) -> bool:
|
||||
output_image_quality = state_manager.get_item('output_image_quality')
|
||||
temp_image_path = get_temp_file_path(state_manager.get_item('temp_path'), output_path)
|
||||
temp_image_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.set_input(temp_image_path),
|
||||
ffmpeg_builder.set_media_resolution(pack_resolution(output_image_resolution)),
|
||||
@@ -188,7 +188,7 @@ def restore_audio(target_path : str, output_path : str, trim_frame_start : int,
|
||||
output_audio_quality = state_manager.get_item('output_audio_quality')
|
||||
output_audio_volume = state_manager.get_item('output_audio_volume')
|
||||
target_video_fps = detect_video_fps(target_path)
|
||||
temp_video_path = get_temp_file_path(state_manager.get_item('temp_path'), output_path)
|
||||
temp_video_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
|
||||
temp_video_format = cast(VideoFormat, get_file_format(output_path))
|
||||
temp_video_duration = detect_video_duration(temp_video_path)
|
||||
|
||||
@@ -213,7 +213,7 @@ def replace_audio(audio_path : str, output_path : str) -> bool:
|
||||
output_audio_encoder = state_manager.get_item('output_audio_encoder')
|
||||
output_audio_quality = state_manager.get_item('output_audio_quality')
|
||||
output_audio_volume = state_manager.get_item('output_audio_volume')
|
||||
temp_video_path = get_temp_file_path(state_manager.get_item('temp_path'), output_path)
|
||||
temp_video_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
|
||||
temp_video_format = cast(VideoFormat, get_file_format(output_path))
|
||||
temp_video_duration = detect_video_duration(temp_video_path)
|
||||
|
||||
@@ -237,9 +237,9 @@ def merge_video(target_path : str, output_path : str, temp_video_fps : Fps, outp
|
||||
output_video_quality = state_manager.get_item('output_video_quality')
|
||||
output_video_preset = state_manager.get_item('output_video_preset')
|
||||
merge_frame_total = predict_video_frame_total(target_path, output_video_fps, trim_frame_start, trim_frame_end)
|
||||
temp_video_path = get_temp_file_path(state_manager.get_item('temp_path'), output_path)
|
||||
temp_video_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
|
||||
temp_video_format = cast(VideoFormat, get_file_format(output_path))
|
||||
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_item('temp_path'), output_path, state_manager.get_item('temp_frame_format'), '%08d')
|
||||
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_temp_path(), output_path, state_manager.get_item('temp_frame_format'), '%08d')
|
||||
|
||||
output_video_encoder = fix_video_encoder(temp_video_format, output_video_encoder)
|
||||
commands = ffmpeg_builder.chain(
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
from typing import Any, Union
|
||||
|
||||
from facefusion.app_context import detect_app_context
|
||||
@@ -40,3 +41,21 @@ def set_item(key : Union[StateKey, ProcessorStateKey], value : Any) -> None:
|
||||
|
||||
def clear_item(key : Union[StateKey, ProcessorStateKey]) -> None:
|
||||
set_item(key, None)
|
||||
|
||||
|
||||
def get_jobs_path() -> str:
|
||||
jobs_path = get_item('jobs_path')
|
||||
session_id = get_item('session_id')
|
||||
|
||||
if session_id:
|
||||
return os.path.join(jobs_path, session_id)
|
||||
return jobs_path
|
||||
|
||||
|
||||
def get_temp_path() -> str:
|
||||
temp_path = get_item('temp_path')
|
||||
session_id = get_item('session_id')
|
||||
|
||||
if session_id:
|
||||
return os.path.join(temp_path, session_id)
|
||||
return temp_path
|
||||
|
||||
@@ -67,7 +67,7 @@ def create_temp_frames() -> ErrorCode:
|
||||
|
||||
|
||||
def process_image() -> ErrorCode:
|
||||
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('temp_path'), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format'))
|
||||
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format'))
|
||||
|
||||
if temp_frame_paths:
|
||||
with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
||||
@@ -151,7 +151,7 @@ def merge_frames() -> ErrorCode:
|
||||
def restore_audio() -> ErrorCode:
|
||||
if state_manager.get_item('output_audio_volume') == 0:
|
||||
logger.info(translator.get('skipping_audio'), __name__)
|
||||
move_temp_file(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
else:
|
||||
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
|
||||
if source_audio_path:
|
||||
@@ -161,7 +161,7 @@ def restore_audio() -> ErrorCode:
|
||||
if is_process_stopping():
|
||||
return 4
|
||||
logger.warn(translator.get('replacing_audio_skipped'), __name__)
|
||||
move_temp_file(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -11,12 +11,12 @@ def is_process_stopping() -> bool:
|
||||
|
||||
|
||||
def setup() -> ErrorCode:
|
||||
create_temp_directory(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
create_temp_directory(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
logger.debug(translator.get('creating_temp'), __name__)
|
||||
return 0
|
||||
|
||||
|
||||
def clear() -> ErrorCode:
|
||||
clear_temp_directory(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
clear_temp_directory(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
logger.debug(translator.get('clearing_temp'), __name__)
|
||||
return 0
|
||||
|
||||
@@ -57,7 +57,7 @@ def prepare_image() -> ErrorCode:
|
||||
|
||||
|
||||
def process_image() -> ErrorCode:
|
||||
temp_image_path = get_temp_file_path(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
temp_image_path = get_temp_file_path(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
reference_vision_frame = read_static_image(state_manager.get_item('target_path'))
|
||||
source_vision_frames = read_static_images(state_manager.get_item('source_paths'))
|
||||
source_audio_frame = create_empty_audio_frame()
|
||||
|
||||
@@ -69,7 +69,7 @@ def create_temp_frames() -> ErrorCode:
|
||||
|
||||
|
||||
def process_video() -> ErrorCode:
|
||||
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('temp_path'), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format'))
|
||||
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format'))
|
||||
|
||||
if temp_frame_paths:
|
||||
with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
|
||||
@@ -123,7 +123,7 @@ def restore_audio() -> ErrorCode:
|
||||
|
||||
if state_manager.get_item('output_audio_volume') == 0:
|
||||
logger.info(translator.get('skipping_audio'), __name__)
|
||||
move_temp_file(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
else:
|
||||
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
|
||||
if source_audio_path:
|
||||
@@ -135,7 +135,7 @@ def restore_audio() -> ErrorCode:
|
||||
if is_process_stopping():
|
||||
return 4
|
||||
logger.warn(translator.get('replacing_audio_skipped'), __name__)
|
||||
move_temp_file(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
else:
|
||||
if ffmpeg.restore_audio(state_manager.get_item('target_path'), state_manager.get_item('output_path'), trim_frame_start, trim_frame_end):
|
||||
video_manager.clear_video_pool()
|
||||
@@ -145,7 +145,7 @@ def restore_audio() -> ErrorCode:
|
||||
if is_process_stopping():
|
||||
return 4
|
||||
logger.warn(translator.get('restoring_audio_skipped'), __name__)
|
||||
move_temp_file(state_manager.get_item('temp_path'), state_manager.get_item('output_path'))
|
||||
move_temp_file(state_manager.get_temp_path(), state_manager.get_item('output_path'))
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
+14
-14
@@ -82,12 +82,12 @@ def test_extract_frames() -> None:
|
||||
]
|
||||
|
||||
for target_path, output_path, trim_frame_start, trim_frame_end, frame_total in test_set:
|
||||
create_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
create_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
assert extract_frames(target_path, output_path, (452, 240), 30.0, trim_frame_start, trim_frame_end) is True
|
||||
assert len(resolve_temp_frame_paths(state_manager.get_item('temp_path'), output_path, state_manager.get_item('temp_frame_format'))) == frame_total
|
||||
assert len(resolve_temp_frame_paths(state_manager.get_temp_path(), output_path, state_manager.get_item('temp_frame_format'))) == frame_total
|
||||
|
||||
clear_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
clear_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
|
||||
def test_spawn_frames() -> None:
|
||||
@@ -101,12 +101,12 @@ def test_spawn_frames() -> None:
|
||||
]
|
||||
|
||||
for target_path, output_path, trim_frame_start, trim_frame_end, temp_video_fps, frame_total in test_set:
|
||||
create_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
create_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
assert spawn_frames(target_path, output_path, (452, 240), temp_video_fps, trim_frame_start, trim_frame_end) is True
|
||||
assert len(resolve_temp_frame_paths(state_manager.get_item('temp_path'), output_path, state_manager.get_item('temp_frame_format'))) == frame_total
|
||||
assert len(resolve_temp_frame_paths(state_manager.get_temp_path(), output_path, state_manager.get_item('temp_frame_format'))) == frame_total
|
||||
|
||||
clear_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
clear_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
|
||||
def test_merge_video() -> None:
|
||||
@@ -127,12 +127,12 @@ def test_merge_video() -> None:
|
||||
state_manager.init_item('output_path', target_path)
|
||||
state_manager.init_item('output_video_fps', 25.0)
|
||||
state_manager.init_item('output_video_encoder', output_video_encoder)
|
||||
create_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
create_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
extract_frames(target_path, output_path, (452, 240), 25.0, 0, 1)
|
||||
|
||||
assert merge_video(target_path, output_path, 25.0, (452, 240), 0, 1) is True
|
||||
|
||||
clear_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
clear_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
state_manager.init_item('output_video_encoder', 'libx264')
|
||||
|
||||
@@ -169,15 +169,15 @@ def test_restore_audio() -> None:
|
||||
output_audio_encoders = get_available_encoder_set().get('audio')
|
||||
|
||||
for target_path, output_path in test_set:
|
||||
create_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
create_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
for output_audio_encoder in output_audio_encoders:
|
||||
state_manager.init_item('output_audio_encoder', output_audio_encoder)
|
||||
copy_file(target_path, get_temp_file_path(state_manager.get_item('temp_path'), output_path))
|
||||
copy_file(target_path, get_temp_file_path(state_manager.get_temp_path(), output_path))
|
||||
|
||||
assert restore_audio(target_path, output_path, 0, 270) is True
|
||||
|
||||
clear_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
clear_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
state_manager.init_item('output_audio_encoder', 'aac')
|
||||
|
||||
@@ -196,15 +196,15 @@ def test_replace_audio() -> None:
|
||||
output_audio_encoders = get_available_encoder_set().get('audio')
|
||||
|
||||
for target_path, output_path in test_set:
|
||||
create_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
create_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
for output_audio_encoder in output_audio_encoders:
|
||||
state_manager.init_item('output_audio_encoder', output_audio_encoder)
|
||||
copy_file(target_path, get_temp_file_path(state_manager.get_item('temp_path'), output_path))
|
||||
copy_file(target_path, get_temp_file_path(state_manager.get_temp_path(), output_path))
|
||||
|
||||
assert replace_audio(get_test_example_file('source.mp3'), output_path) is True
|
||||
assert replace_audio(get_test_example_file('source.wav'), output_path) is True
|
||||
|
||||
clear_temp_directory(state_manager.get_item('temp_path'), output_path)
|
||||
clear_temp_directory(state_manager.get_temp_path(), output_path)
|
||||
|
||||
state_manager.init_item('output_audio_encoder', 'aac')
|
||||
|
||||
@@ -21,12 +21,12 @@ def before_all() -> None:
|
||||
|
||||
def test_get_temp_file_path() -> None:
|
||||
state_manager.init_item('output_path', 'temp.mp4')
|
||||
assert get_temp_file_path(state_manager.get_item('temp_path'), get_test_example_file('target-240p.mp4')) == os.path.join(state_manager.get_item('temp_path'), 'facefusion', 'target-240p', 'temp.mp4')
|
||||
assert get_temp_file_path(state_manager.get_temp_path(), get_test_example_file('target-240p.mp4')) == os.path.join(state_manager.get_temp_path(), 'facefusion', 'target-240p', 'temp.mp4')
|
||||
|
||||
|
||||
def test_get_temp_directory_path() -> None:
|
||||
assert get_temp_directory_path(state_manager.get_item('temp_path'), get_test_example_file('target-240p.mp4')) == os.path.join(state_manager.get_item('temp_path'), 'facefusion', 'target-240p')
|
||||
assert get_temp_directory_path(state_manager.get_temp_path(), get_test_example_file('target-240p.mp4')) == os.path.join(state_manager.get_temp_path(), 'facefusion', 'target-240p')
|
||||
|
||||
|
||||
def test_get_temp_frames_pattern() -> None:
|
||||
assert get_temp_frames_pattern(state_manager.get_item('temp_path'), get_test_example_file('target-240p.mp4'), state_manager.get_item('temp_frame_format'), '%04d') == os.path.join(state_manager.get_item('temp_path'), 'facefusion', 'target-240p', '%04d.png')
|
||||
assert get_temp_frames_pattern(state_manager.get_temp_path(), get_test_example_file('target-240p.mp4'), state_manager.get_item('temp_frame_format'), '%04d') == os.path.join(state_manager.get_temp_path(), 'facefusion', 'target-240p', '%04d.png')
|
||||
|
||||
Reference in New Issue
Block a user