update ffmpeg.set_loop

add test

introduce spawn_frames
This commit is contained in:
harisreedhar
2025-11-26 22:25:13 +05:30
committed by henryruhs
parent 38e8610523
commit 678f4edd4c
7 changed files with 67 additions and 20 deletions
+18
View File
@@ -125,6 +125,24 @@ def extract_frames(target_path : str, output_path : str, temp_video_resolution :
return process.returncode == 0
def spawn_frames(target_path : str, output_path : str, temp_video_resolution : Resolution, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
spawn_frame_total = trim_frame_end - trim_frame_start
duration = spawn_frame_total / temp_video_fps
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_item('temp_path'), output_path, state_manager.get_item('temp_frame_format'), '%08d')
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_loop(),
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.set_video_duration(duration),
ffmpeg_builder.set_video_fps(temp_video_fps),
ffmpeg_builder.set_media_resolution(pack_resolution(temp_video_resolution)),
ffmpeg_builder.set_output(temp_frames_pattern)
)
with tqdm(total = spawn_frame_total, desc = translator.get('spawning'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
process = run_ffmpeg_with_progress(commands, partial(update_progress, progress))
return process.returncode == 0
def copy_image(target_path : str, output_path : str, temp_image_resolution : Resolution) -> bool:
temp_image_path = get_temp_file_path(state_manager.get_item('temp_path'), output_path)
commands = ffmpeg_builder.chain(
+4
View File
@@ -59,6 +59,10 @@ def force_output(output_path : str) -> List[Command]:
return [ '-y', output_path ]
def set_loop() -> List[Command]:
return [ '-loop', '1' ]
def cast_stream() -> List[Command]:
return [ '-' ]
+3
View File
@@ -12,8 +12,11 @@ LOCALES : Locales =\
'extracting_frames': 'extracting frames with a resolution of {resolution} and {fps} frames per second',
'extracting_frames_succeeded': 'extracting frames succeeded',
'extracting_frames_failed': 'extracting frames failed',
'spawning_frames_succeeded': 'spawning frames succeeded',
'spawning_frames_failed': 'spawning frames failed',
'analysing': 'analysing',
'extracting': 'extracting',
'spawning': 'spawning',
'streaming': 'streaming',
'processing': 'processing',
'merging': 'merging',
-11
View File
@@ -38,14 +38,3 @@ def create_temp_directory(temp_path : str, output_path : str) -> bool:
def clear_temp_directory(temp_path : str, output_path : str) -> bool:
temp_directory_path = get_temp_directory_path(temp_path, output_path)
return remove_directory(temp_directory_path)
def get_temp_sequence_paths(temp_path : str, output_path : str, temp_frame_format : str, temp_frame_prefix : str, frame_total : int) -> List[str]:
temp_directory_path = get_temp_directory_path(temp_path, output_path)
temp_frame_paths = []
for frame_number in range(frame_total):
temp_file_name = temp_frame_prefix % (frame_number + 1) + '.' + temp_frame_format
temp_frame_path = os.path.join(temp_directory_path, temp_file_name)
temp_frame_paths.append(temp_frame_path)
return temp_frame_paths
+20 -6
View File
@@ -9,10 +9,10 @@ from facefusion.audio import create_empty_audio_frame, get_audio_frame, get_voic
from facefusion.common_helper import get_first
from facefusion.filesystem import filter_audio_paths, is_video
from facefusion.processors.core import get_processors_modules
from facefusion.temp_helper import get_temp_sequence_paths, move_temp_file
from facefusion.temp_helper import move_temp_file, resolve_temp_frame_paths
from facefusion.time_helper import calculate_end_time
from facefusion.types import ErrorCode
from facefusion.vision import conditional_merge_vision_mask, detect_image_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, restrict_trim_video_frame, scale_resolution, write_image
from facefusion.vision import conditional_merge_vision_mask, detect_image_resolution, extract_vision_mask, pack_resolution, read_static_image, read_static_images, restrict_image_resolution, restrict_trim_video_frame, scale_resolution, write_image
from facefusion.workflows.core import clear, is_process_stopping, setup
@@ -22,6 +22,7 @@ def process(start_time : float) -> ErrorCode:
analyse_image,
clear,
setup,
create_temp_frames,
process_image,
merge_frames,
restore_audio,
@@ -48,12 +49,25 @@ def analyse_image() -> ErrorCode: # TODO: reusable block
return 0
def process_image() -> ErrorCode:
state_manager.set_item('output_video_fps', 25.0) # TODO: set default fps value
def create_temp_frames() -> ErrorCode:
state_manager.set_item('output_video_fps', 25.0) # TODO: set default fps value
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
output_image_resolution = scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_image_scale'))
temp_image_resolution = restrict_image_resolution(state_manager.get_item('target_path'), output_image_resolution)
trim_frame_start, trim_frame_end = restrict_trim_audio_frame(source_audio_path, state_manager.get_item('output_video_fps'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
audio_frame_total = trim_frame_end - trim_frame_start
temp_frame_paths = get_temp_sequence_paths(state_manager.get_item('temp_path'), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format'), '%08d', audio_frame_total)
if ffmpeg.spawn_frames(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_image_resolution, state_manager.get_item('output_video_fps'), trim_frame_start, trim_frame_end):
logger.debug(translator.get('spawning_frames_succeeded'), __name__)
else:
if is_process_stopping():
return 4
logger.error(translator.get('spawning_frames_failed'), __name__)
return 1
return 0
def process_image() -> ErrorCode:
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item('temp_path'), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format'))
if temp_frame_paths:
with tqdm(total = len(temp_frame_paths), desc = translator.get('processing'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
+2 -2
View File
@@ -22,7 +22,7 @@ def process(start_time : float) -> ErrorCode:
analyse_video,
clear,
setup,
extract_frames,
create_temp_frames,
process_video,
merge_frames,
restore_audio,
@@ -51,7 +51,7 @@ def analyse_video() -> ErrorCode:
return 0
def extract_frames() -> ErrorCode:
def create_temp_frames() -> ErrorCode:
trim_frame_start, trim_frame_end = restrict_trim_video_frame(state_manager.get_item('target_path'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
output_video_resolution = scale_resolution(detect_video_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_video_scale'))
temp_video_resolution = restrict_video_resolution(state_manager.get_item('target_path'), output_video_resolution)
+20 -1
View File
@@ -7,7 +7,7 @@ import pytest
import facefusion.ffmpeg
from facefusion import process_manager, state_manager
from facefusion.download import conditional_download
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, spawn_frames
from facefusion.filesystem import copy_file
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
from facefusion.types import EncoderSet
@@ -90,6 +90,25 @@ def test_extract_frames() -> None:
clear_temp_directory(state_manager.get_item('temp_path'), output_path)
def test_spawn_frames() -> None:
test_set =\
[
(get_test_example_file('source.jpg'), get_test_example_file('test-spawn-frames-0-100.mp4'), 0, 100, 30.0, 100),
(get_test_example_file('source.jpg'), get_test_example_file('test-spawn-frames-0-150.mp4'), 0, 150, 30.0, 150),
(get_test_example_file('source.jpg'), get_test_example_file('test-spawn-frames-50-100.mp4'), 50, 100, 25.0, 50),
(get_test_example_file('source.jpg'), get_test_example_file('test-spawn-frames-0-300.mp4'), 0, 300, 60.0, 300),
(get_test_example_file('source.jpg'), get_test_example_file('test-spawn-frames-100-200.mp4'), 100, 200, 30.0, 100)
]
for target_path, output_path, trim_frame_start, trim_frame_end, temp_video_fps, frame_total in test_set:
create_temp_directory(state_manager.get_item('temp_path'), output_path)
assert spawn_frames(target_path, output_path, (452, 240), temp_video_fps, trim_frame_start, trim_frame_end) is True
assert len(resolve_temp_frame_paths(state_manager.get_item('temp_path'), output_path, state_manager.get_item('temp_frame_format'))) == frame_total
clear_temp_directory(state_manager.get_item('temp_path'), output_path)
def test_merge_video() -> None:
test_set =\
[