audio to image as frames

This commit is contained in:
harisreedhar
2025-12-18 19:56:31 +05:30
committed by henryruhs
parent c84e6102f6
commit 09d295ea31
6 changed files with 91 additions and 6 deletions
+1 -1
View File
@@ -45,7 +45,7 @@ face_mask_regions : List[FaceMaskRegion] = list(get_args(FaceMaskRegion))
voice_extractor_models : List[VoiceExtractorModel] = list(get_args(VoiceExtractorModel))
workflows : List[WorkFlow] = [ 'auto', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames' ]
workflows : List[WorkFlow] = [ 'auto', 'audio-to-image:frames', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames' ]
audio_type_set : AudioTypeSet =\
{
+6 -2
View File
@@ -20,7 +20,7 @@ from facefusion.processors.core import get_processors_modules
from facefusion.program import create_program
from facefusion.program_helper import validate_args
from facefusion.types import Args, ErrorCode, WorkFlow
from facefusion.workflows import audio_to_image, image_to_image, image_to_video, image_to_video_as_frames
from facefusion.workflows import audio_to_image, audio_to_image_as_frames, image_to_image, image_to_video, image_to_video_as_frames
def cli() -> None:
@@ -338,6 +338,8 @@ def conditional_process() -> ErrorCode:
if state_manager.get_item('workflow') == 'audio-to-image:video':
return audio_to_image.process(start_time)
if state_manager.get_item('workflow') == 'audio-to-image:frames':
return audio_to_image_as_frames.process(start_time)
if state_manager.get_item('workflow') == 'image-to-image':
return image_to_image.process(start_time)
if state_manager.get_item('workflow') == 'image-to-video':
@@ -355,6 +357,8 @@ def detect_workflow() -> WorkFlow:
return 'image-to-video:frames'
if has_audio(state_manager.get_item('source_paths')) and has_image([ state_manager.get_item('target_path') ]):
return 'audio-to-image:video'
if get_file_extension(state_manager.get_item('output_path')):
return 'audio-to-image:video'
return 'audio-to-image:frames'
return 'image-to-image'
+1 -1
View File
@@ -55,7 +55,7 @@ Language = Literal['en']
Locales : TypeAlias = Dict[Language, Dict[str, Any]]
LocalePoolSet : TypeAlias = Dict[str, Locales]
WorkFlow = Literal['auto', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames']
WorkFlow = Literal['auto', 'audio-to-image:frames', 'audio-to-image:video', 'image-to-image', 'image-to-video', 'image-to-video:frames']
VideoCaptureSet : TypeAlias = Dict[str, cv2.VideoCapture]
VideoWriterSet : TypeAlias = Dict[str, cv2.VideoWriter]
@@ -0,0 +1,74 @@
import os
from functools import partial
from facefusion import ffmpeg, logger, process_manager, state_manager, translator
from facefusion.audio import restrict_trim_audio_frame
from facefusion.common_helper import get_first
from facefusion.filesystem import are_images, resolve_file_paths
from facefusion.filesystem import copy_file, create_directory, filter_audio_paths
from facefusion.temp_helper import resolve_temp_frame_paths
from facefusion.time_helper import calculate_end_time
from facefusion.types import ErrorCode
from facefusion.vision import detect_image_resolution, restrict_image_resolution, scale_resolution
from facefusion.workflows.core import analyse_image, clear, is_process_stopping, process_frames, setup
def process(start_time : float) -> ErrorCode:
tasks =\
[
analyse_image,
clear,
setup,
create_temp_frames,
process_frames,
copy_temp_frames,
partial(finalize_frames, start_time),
clear
]
process_manager.start()
for task in tasks:
error_code = task() #type:ignore[operator]
if error_code > 0:
process_manager.end()
return error_code
process_manager.end()
return 0
def create_temp_frames() -> ErrorCode:
state_manager.set_item('output_video_fps', 25.0) # TODO: set default fps value
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
output_image_resolution = scale_resolution(detect_image_resolution(state_manager.get_item('target_path')), state_manager.get_item('output_image_scale'))
temp_image_resolution = restrict_image_resolution(state_manager.get_item('target_path'), output_image_resolution)
trim_frame_start, trim_frame_end = restrict_trim_audio_frame(source_audio_path, state_manager.get_item('output_video_fps'), state_manager.get_item('trim_frame_start'), state_manager.get_item('trim_frame_end'))
if ffmpeg.spawn_frames(state_manager.get_item('target_path'), state_manager.get_item('output_path'), temp_image_resolution, state_manager.get_item('output_video_fps'), trim_frame_start, trim_frame_end):
logger.debug(translator.get('spawning_frames_succeeded'), __name__)
else:
if is_process_stopping():
return 4
logger.error(translator.get('spawning_frames_failed'), __name__)
return 1
return 0
def copy_temp_frames() -> ErrorCode:
temp_frame_paths = resolve_temp_frame_paths(state_manager.get_temp_path(), state_manager.get_item('output_path'), state_manager.get_item('temp_frame_format'))
for temp_frame_path in temp_frame_paths:
if not create_directory(state_manager.get_item('output_path')) or not copy_file(temp_frame_path, os.path.join(state_manager.get_item('output_path'), os.path.basename(temp_frame_path))):
return 1
return 0
def finalize_frames(start_time : float) -> ErrorCode:
if are_images(resolve_file_paths(state_manager.get_item('output_path'))):
logger.info(translator.get('processing_frames_succeeded').format(seconds = calculate_end_time(start_time)), __name__)
else:
logger.error(translator.get('processing_frames_failed'), __name__)
return 1
return 0
+2 -2
View File
@@ -39,7 +39,7 @@ def analyse_image() -> ErrorCode:
def conditional_get_source_audio_frame(frame_number : int) -> AudioFrame:
if state_manager.get_item('workflow') in [ 'audio-to-image:video', 'image-to-video' ]:
if state_manager.get_item('workflow') in [ 'audio-to-image:frames', 'audio-to-image:video', 'image-to-video' ]:
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
output_video_fps = state_manager.get_item('output_video_fps')
@@ -54,7 +54,7 @@ def conditional_get_source_audio_frame(frame_number : int) -> AudioFrame:
def conditional_get_source_voice_frame(frame_number: int) -> AudioFrame:
if state_manager.get_item('workflow') in [ 'audio-to-image:video', 'image-to-video' ]:
if state_manager.get_item('workflow') in [ 'audio-to-image:frames', 'audio-to-image:video', 'image-to-video' ]:
source_audio_path = get_first(filter_audio_paths(state_manager.get_item('source_paths')))
output_video_fps = state_manager.get_item('output_video_fps')
+7
View File
@@ -33,6 +33,13 @@ def test_sync_lip_to_image() -> None:
assert is_test_output_file('test_sync_lip_to_image.mp4') is True
def test_sync_lip_to_image_as_frames() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'audio-to-image:frames', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.jpg'), '-o', get_test_output_path('test_sync_lip_to_image_as_frames') ]
assert subprocess.run(commands).returncode == 0
assert is_test_output_sequence(get_test_output_path('test_sync_lip_to_image_as_frames')) is True
def test_sync_lip_to_video() -> None:
commands = [ sys.executable, 'facefusion.py', 'run', '--workflow', 'image-to-video', '--jobs-path', get_test_jobs_directory(), '--processors', 'lip_syncer', '-s', get_test_example_file('source.mp3'), '-t', get_test_example_file('target-240p.mp4'), '-o', get_test_output_path('test_sync_lip_to_video.mp4'), '--trim-frame-end', '1' ]