From 09d295ea31fb7410a57c19eae4f6064b52921ee8 Mon Sep 17 00:00:00 2001
From: harisreedhar
Date: Thu, 18 Dec 2025 19:56:31 +0530
Subject: [PATCH] audio to image as frames

---
Notes (this section is ignored by git am and is not part of the commit message)

What the patch does:
  * choices.py, types.py: register the 'audio-to-image:frames' workflow name.
  * core.py: conditional_process() dispatches that workflow to
    audio_to_image_as_frames.process(); detect_workflow() returns
    'audio-to-image:video' for an audio source plus image target when
    get_file_extension(output_path) is truthy, otherwise 'audio-to-image:frames'.
  * workflows/audio_to_image_as_frames.py (new): runs analyse_image, clear, setup,
    create_temp_frames, process_frames, copy_temp_frames, finalize_frames, clear
    in order and stops at the first task that returns an error code above 0.
  * workflows/core.py: the source audio frame and voice frame lookups also apply
    to the new workflow.
  * tests/test_cli_lip_syncer.py: adds test_sync_lip_to_image_as_frames.

Review notes:
  * create_temp_frames() overwrites the 'output_video_fps' state item with 25.0
    (marked TODO in the code).
  * copy_temp_frames() calls create_directory() on every loop iteration; the call
    does not depend on the loop variable.
  * facefusion.filesystem is imported on two separate lines in the new module.
  * NOTE(review): this file was recovered from a copy whose line breaks and
    indentation had been collapsed. Line breaks were rebuilt from the hunk line
    counts; indentation is assumed to be tabs - confirm against the repository
    before applying.

 facefusion/choices.py                         |  2 +-
 facefusion/core.py                            |  8 +-
 facefusion/types.py                           |  2 +-
 .../workflows/audio_to_image_as_frames.py     | 74 +++++++++++++++++++
 facefusion/workflows/core.py                  |  4 +-
 tests/test_cli_lip_syncer.py                  |  7 ++
 6 files changed, 91 insertions(+), 6 deletions(-)
 create mode 100644 facefusion/workflows/audio_to_image_as_frames.py

diff --git a/facefusion/choices.py b/facefusion/choices.py
index 3166c80e..c4279d42 100755
--- a/facefusion/choices.py
+++ b/facefusion/choices.py
@@ -45,7 +45,7 @@ face_mask_regions : List[FaceMaskRegion] = list(get_args(FaceMaskRegion))
 
 voice_extractor_models : List[VoiceExtractorModel] = list(get_args(VoiceExtractorModel))
 
-workflows : List[WorkFlow] = [ 'auto', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames' ]
+workflows : List[WorkFlow] = [ 'auto', 'audio-to-image:frames', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames' ]
 
 audio_type_set : AudioTypeSet =\
 {
diff --git a/facefusion/core.py b/facefusion/core.py
index 014c2210..8d0801ff 100755
--- a/facefusion/core.py
+++ b/facefusion/core.py
@@ -20,7 +20,7 @@ from facefusion.processors.core import get_processors_modules
 from facefusion.program import create_program
 from facefusion.program_helper import validate_args
 from facefusion.types import Args, ErrorCode, WorkFlow
-from facefusion.workflows import audio_to_image, image_to_image, image_to_video, image_to_video_as_frames
+from facefusion.workflows import audio_to_image, audio_to_image_as_frames, image_to_image, image_to_video, image_to_video_as_frames
 
 
 def cli() -> None:
@@ -338,6 +338,8 @@ def conditional_process() -> ErrorCode:
 
 	if state_manager.get_item('workflow') == 'audio-to-image:video':
 		return audio_to_image.process(start_time)
+	if state_manager.get_item('workflow') == 'audio-to-image:frames':
+		return audio_to_image_as_frames.process(start_time)
 	if state_manager.get_item('workflow') == 'image-to-image':
 		return image_to_image.process(start_time)
 	if state_manager.get_item('workflow') == 'image-to-video':
@@ -355,6 +357,8 @@ def detect_workflow() -> WorkFlow:
 		return 'image-to-video:frames'
 
 	if has_audio(state_manager.get_item('source_paths')) and has_image([ state_manager.get_item('target_path') ]):
-		return 'audio-to-image:video'
+		if get_file_extension(state_manager.get_item('output_path')):
+			return 'audio-to-image:video'
+		return 'audio-to-image:frames'
 
 	return 'image-to-image'
diff --git a/facefusion/types.py b/facefusion/types.py
index 21a3d9d0..c3371f83 100755
--- a/facefusion/types.py
+++ b/facefusion/types.py
@@ -55,7 +55,7 @@ Language = Literal['en']
 Locales : TypeAlias = Dict[Language, Dict[str, Any]]
 LocalePoolSet : TypeAlias = Dict[str, Locales]
 
-WorkFlow = Literal['auto', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames']
+WorkFlow = Literal['auto', 'audio-to-image:frames', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames']
 
 VideoCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture]
 VideoWriterSet : TypeAlias = Dict[str, cv2.VideoWriter]
diff --git a/facefusion/workflows/audio_to_image_as_frames.py b/facefusion/workflows/audio_to_image_as_frames.py
new file mode 100644
index 00000000..3db08087
--- /dev/null
+++ b/facefusion/workflows/audio_to_image_as_frames.py
@@ -0,0 +1,74 @@
+import os
+from functools import partial
+
+from facefusion import ffmpeg, logger, process_manager, state_manager, translator
+from facefusion.audio import restrict_trim_audio_frame
+from facefusion.common_helper import get_first
+from facefusion.filesystem import are_images, resolve_file_paths
+from facefusion.filesystem import copy_file, create_directory, filter_audio_paths
+from facefusion.temp_helper import resolve_temp_frame_paths
+from facefusion.time_helper import calculate_end_time
+from facefusion.types import ErrorCode
+from facefusion.vision import detect_image_resolution, restrict_image_resolution, scale_resolution
+from facefusion.workflows.core import analyse_image, clear, is_process_stopping, process_frames, setup
+
+
+def process(start_time : float) -> ErrorCode:
+	tasks =\
+	[
+		analyse_image,
+		clear,
+		setup,
+		create_temp_frames,
+		process_frames,
+		copy_temp_frames,
+		partial(finalize_frames, start_time),
+		clear
+	]
+
+	process_manager.start()
+
+	for task in tasks:
+		error_code = task() #type:ignore[operator]
+
+		if error_code > 0:
+			process_manager.end()
+			return error_code
+
+	process_manager.end()
+	return 0
+
+
+def create_temp_frames() -> ErrorCode:
+	state_manager.set_item('output_video_fps', 25.0) # TODO: set default fps value
+	source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
+	output_image_resolution = scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_image_scale'))
+	temp_image_resolution = restrict_image_resolution(state_manager.get_item('target_path'), output_image_resolution)
+	trim_frame_start, trim_frame_end = restrict_trim_audio_frame(source_audio_path, state_manager.get_item('output_video_fps'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
+
+	if ffmpeg.spawn_frames(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_image_resolution, state_manager.get_item('output_video_fps'), trim_frame_start, trim_frame_end):
+		logger.debug(translator.get('spawning_frames_succeeded'), __name__)
+	else:
+		if is_process_stopping():
+			return 4
+		logger.error(translator.get('spawning_frames_failed'), __name__)
+		return 1
+	return 0
+
+
+def copy_temp_frames() -> ErrorCode:
+	temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format'))
+
+	for temp_frame_path in temp_frame_paths:
+		if not create_directory(state_manager.get_item('output_path')) or not copy_file(temp_frame_path, os.path.join(state_manager.get_item('output_path'), os.path.basename(temp_frame_path))):
+			return 1
+	return 0
+
+
+def finalize_frames(start_time : float) -> ErrorCode:
+	if are_images(resolve_file_paths(state_manager.get_item('output_path'))):
+		logger.info(translator.get('processing_frames_succeeded').format(seconds = calculate_end_time(start_time)), __name__)
+	else:
+		logger.error(translator.get('processing_frames_failed'), __name__)
+		return 1
+	return 0
diff --git a/facefusion/workflows/core.py b/facefusion/workflows/core.py
index f173cdce..1ba61053 100644
--- a/facefusion/workflows/core.py
+++ b/facefusion/workflows/core.py
@@ -39,7 +39,7 @@ def analyse_image() -> ErrorCode:
 
 
 def conditional_get_source_audio_frame(frame_number : int) -> AudioFrame:
-	if state_manager.get_item('workflow') in [ 'audio-to-image:video', 'image-to-video' ]:
+	if state_manager.get_item('workflow') in [ 'audio-to-image:frames', 'audio-to-image:video', 'image-to-video' ]:
 		source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
 		output_video_fps = state_manager.get_item('output_video_fps')
 
@@ -54,7 +54,7 @@ def conditional_get_source_audio_frame(frame_number : int) -> AudioFrame:
 
 
 def conditional_get_source_voice_frame(frame_number: int) -> AudioFrame:
-	if state_manager.get_item('workflow') in [ 'audio-to-image:video', 'image-to-video' ]:
+	if state_manager.get_item('workflow') in [ 'audio-to-image:frames', 'audio-to-image:video', 'image-to-video' ]:
 		source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
 		output_video_fps = state_manager.get_item('output_video_fps')
 
diff --git a/tests/test_cli_lip_syncer.py b/tests/test_cli_lip_syncer.py
index e1d27adb..a5f524c1 100644
--- a/tests/test_cli_lip_syncer.py
+++ b/tests/test_cli_lip_syncer.py
@@ -33,6 +33,13 @@ def test_sync_lip_to_image() -> None:
 	assert is_test_output_file('test_sync_lip_to_image.mp4') is True
 
 
+def test_sync_lip_to_image_as_frames() -> None:
+	commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'audio-to-image:frames', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test_sync_lip_to_image_as_frames') ]
+
+	assert subprocess.run(commands).returncode == 0
+	assert is_test_output_sequence(get_test_output_path('test_sync_lip_to_image_as_frames')) is True
+
+
 def test_sync_lip_to_video() -> None:
 	commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ]
 