Files
facefusion/facefusion/ffmpeg.py
T
Henry Ruhs 76c413a2c1 Refactor/ffmpeg less stream (#1092)
* remove ffmpeg from stream to use opus and vpx, add bunch of todos

* fix testing

* improve download checkout

* fix datachannel download, fix super dirty test clients - setup logic does not belong there

* fix testing
2026-05-11 16:36:23 +02:00

382 lines
16 KiB
Python

import os
import subprocess
import tempfile
from functools import lru_cache, partial
from typing import List, Optional, cast
from tqdm import tqdm
import facefusion.choices
from facefusion import ffmpeg_builder, logger, process_manager, state_manager, translator
from facefusion.filesystem import get_file_format, remove_file
from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern
from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, Resolution, SampleRate, UpdateProgress, VideoEncoder, VideoFormat
from facefusion.vision import detect_video_duration, detect_video_fps, pack_resolution, predict_video_frame_total
def run_ffmpeg_with_progress(commands : List[Command], update_progress : UpdateProgress) -> subprocess.Popen[bytes]:
log_level = state_manager.get_item('log_level')
commands.extend(ffmpeg_builder.set_progress())
commands.extend(ffmpeg_builder.cast_stream())
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
while process_manager.is_processing():
try:
while __line__ := process.stdout.readline().decode().lower():
if process_manager.is_stopping():
process.terminate()
if 'frame=' in __line__:
_, frame_number = __line__.split('frame=')
update_progress(int(frame_number))
if log_level == 'debug':
log_debug(process)
process.wait(timeout = 0.5)
except subprocess.TimeoutExpired:
continue
return process
return process
def update_progress(progress : tqdm, frame_number : int) -> None:
progress.update(frame_number - progress.n)
def run_ffmpeg_with_pipe(commands : List[Command], file_content : bytes) -> subprocess.Popen[bytes]:
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stdin = subprocess.PIPE, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
process.communicate(input = file_content)
return process
def run_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]:
log_level = state_manager.get_item('log_level')
commands = ffmpeg_builder.run(commands)
process = subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
while process_manager.is_processing():
try:
if log_level == 'debug':
log_debug(process)
process.wait(timeout = 0.5)
except subprocess.TimeoutExpired:
continue
return process
if process_manager.is_stopping():
process.terminate()
return process
def open_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]:
commands = ffmpeg_builder.run(commands)
return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE)
def log_debug(process : subprocess.Popen[bytes]) -> None:
_, stderr = process.communicate()
errors = stderr.decode().splitlines()
for error in errors:
if error.strip():
logger.debug(error.strip(), __name__)
def get_available_encoder_set() -> EncoderSet:
available_encoder_set : EncoderSet =\
{
'audio': [],
'image': [],
'video': []
}
commands = ffmpeg_builder.chain(
ffmpeg_builder.get_encoders()
)
process = run_ffmpeg(commands)
while line := process.stdout.readline().decode().lower():
if line.startswith(' a'):
audio_encoder = line.split()[1]
if audio_encoder in facefusion.choices.audio_encoders and audio_encoder not in available_encoder_set.get('audio'):
available_encoder_set['audio'].append(audio_encoder) #type:ignore[arg-type]
if line.startswith(' v'):
vision_encoder = line.split()[1]
if vision_encoder in facefusion.choices.image_encoders and vision_encoder not in available_encoder_set.get('image'):
available_encoder_set['image'].append(vision_encoder) #type:ignore[arg-type]
if vision_encoder in facefusion.choices.video_encoders and vision_encoder not in available_encoder_set.get('video'):
available_encoder_set['video'].append(vision_encoder) #type:ignore[arg-type]
return available_encoder_set
@lru_cache(maxsize = None)
def get_static_available_encoder_set() -> EncoderSet:
return get_available_encoder_set()
def extract_frames(target_path : str, output_path : str, temp_video_resolution : Resolution, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
extract_frame_total = predict_video_frame_total(target_path, temp_video_fps, trim_frame_start, trim_frame_end)
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_temp_path(), output_path, state_manager.get_item('temp_frame_format'), '%08d')
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.set_media_resolution(pack_resolution(temp_video_resolution)),
ffmpeg_builder.set_frame_quality(0),
ffmpeg_builder.enforce_pixel_format('rgb24'),
ffmpeg_builder.select_frame_range(trim_frame_start, trim_frame_end, temp_video_fps),
ffmpeg_builder.prevent_frame_drop(),
ffmpeg_builder.set_output(temp_frames_pattern)
)
with tqdm(total = extract_frame_total, desc = translator.get('extracting'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
process = run_ffmpeg_with_progress(commands, partial(update_progress, progress))
return process.returncode == 0
def spawn_frames(target_path : str, output_path : str, temp_video_resolution : Resolution, temp_video_fps : Fps, trim_frame_start : int, trim_frame_end : int) -> bool:
spawn_frame_total = trim_frame_end - trim_frame_start
duration = spawn_frame_total / temp_video_fps
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_temp_path(), output_path, state_manager.get_item('temp_frame_format'), '%08d')
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_loop(),
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.set_video_duration(duration),
ffmpeg_builder.set_video_fps(temp_video_fps),
ffmpeg_builder.set_media_resolution(pack_resolution(temp_video_resolution)),
ffmpeg_builder.set_output(temp_frames_pattern)
)
with tqdm(total = spawn_frame_total, desc = translator.get('spawning'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
process = run_ffmpeg_with_progress(commands, partial(update_progress, progress))
return process.returncode == 0
def copy_image(target_path : str, output_path : str, temp_image_resolution : Resolution) -> bool:
temp_image_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.set_media_resolution(pack_resolution(temp_image_resolution)),
ffmpeg_builder.set_image_quality(target_path, 100),
ffmpeg_builder.force_output(temp_image_path)
)
return run_ffmpeg(commands).returncode == 0
def finalize_image(output_path : str, output_image_resolution : Resolution) -> bool:
output_image_quality = state_manager.get_item('output_image_quality')
temp_image_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(temp_image_path),
ffmpeg_builder.set_media_resolution(pack_resolution(output_image_resolution)),
ffmpeg_builder.set_image_quality(output_path, output_image_quality),
ffmpeg_builder.force_output(output_path)
)
return run_ffmpeg(commands).returncode == 0
def read_audio_buffer(target_path : str, audio_sample_rate : SampleRate, audio_sample_size : int, audio_channel_total : int) -> Optional[AudioBuffer]:
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.ignore_video_stream(),
ffmpeg_builder.set_audio_sample_rate(audio_sample_rate),
ffmpeg_builder.set_audio_sample_size(audio_sample_size),
ffmpeg_builder.set_audio_channel_total(audio_channel_total),
ffmpeg_builder.cast_stream()
)
process = open_ffmpeg(commands)
audio_buffer, _ = process.communicate()
if process.returncode == 0:
return audio_buffer
return None
def restore_audio(target_path : str, output_path : str, trim_frame_start : int, trim_frame_end : int) -> bool:
output_audio_encoder = state_manager.get_item('output_audio_encoder')
output_audio_quality = state_manager.get_item('output_audio_quality')
output_audio_volume = state_manager.get_item('output_audio_volume')
target_video_fps = detect_video_fps(target_path)
temp_video_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_video_duration = detect_video_duration(temp_video_path)
output_audio_encoder = fix_audio_encoder(temp_video_format, output_audio_encoder)
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(temp_video_path),
ffmpeg_builder.select_media_range(trim_frame_start, trim_frame_end, target_video_fps),
ffmpeg_builder.set_input(target_path),
ffmpeg_builder.copy_video_encoder(),
ffmpeg_builder.set_audio_encoder(output_audio_encoder),
ffmpeg_builder.set_audio_quality(output_audio_encoder, output_audio_quality),
ffmpeg_builder.set_audio_volume(output_audio_volume),
ffmpeg_builder.select_media_stream('0:v:0'),
ffmpeg_builder.select_media_stream('1:a:0'),
ffmpeg_builder.set_video_duration(temp_video_duration),
ffmpeg_builder.force_output(output_path)
)
return run_ffmpeg(commands).returncode == 0
def replace_audio(audio_path : str, output_path : str) -> bool:
output_audio_encoder = state_manager.get_item('output_audio_encoder')
output_audio_quality = state_manager.get_item('output_audio_quality')
output_audio_volume = state_manager.get_item('output_audio_volume')
temp_video_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_video_duration = detect_video_duration(temp_video_path)
output_audio_encoder = fix_audio_encoder(temp_video_format, output_audio_encoder)
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input(temp_video_path),
ffmpeg_builder.set_input(audio_path),
ffmpeg_builder.copy_video_encoder(),
ffmpeg_builder.set_audio_encoder(output_audio_encoder),
ffmpeg_builder.set_audio_quality(output_audio_encoder, output_audio_quality),
ffmpeg_builder.set_audio_volume(output_audio_volume),
ffmpeg_builder.set_video_duration(temp_video_duration),
ffmpeg_builder.force_output(output_path)
)
return run_ffmpeg(commands).returncode == 0
def merge_video(target_path : str, output_path : str, temp_video_fps : Fps, output_video_fps : Fps, output_video_resolution : Resolution, trim_frame_start : int, trim_frame_end : int) -> bool:
output_video_encoder = state_manager.get_item('output_video_encoder')
output_video_quality = state_manager.get_item('output_video_quality')
output_video_preset = state_manager.get_item('output_video_preset')
merge_frame_total = predict_video_frame_total(target_path, output_video_fps, trim_frame_start, trim_frame_end)
temp_video_path = get_temp_file_path(state_manager.get_temp_path(), output_path)
temp_video_format = cast(VideoFormat, get_file_format(output_path))
temp_frames_pattern = get_temp_frames_pattern(state_manager.get_temp_path(), output_path, state_manager.get_item('temp_frame_format'), '%08d')
output_video_encoder = fix_video_encoder(temp_video_format, output_video_encoder)
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input_fps(temp_video_fps),
ffmpeg_builder.set_input(temp_frames_pattern),
ffmpeg_builder.set_media_resolution(pack_resolution(output_video_resolution)),
ffmpeg_builder.set_video_encoder(output_video_encoder),
ffmpeg_builder.set_video_quality(output_video_encoder, output_video_quality),
ffmpeg_builder.set_video_preset(output_video_encoder, output_video_preset),
ffmpeg_builder.concat(
ffmpeg_builder.set_video_fps(output_video_fps),
ffmpeg_builder.keep_video_alpha(output_video_encoder)
),
ffmpeg_builder.set_pixel_format(output_video_encoder),
ffmpeg_builder.force_output(temp_video_path)
)
with tqdm(total = merge_frame_total, desc = translator.get('merging'), unit = 'frame', ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
process = run_ffmpeg_with_progress(commands, partial(update_progress, progress))
return process.returncode == 0
def concat_video(output_path : str, temp_output_paths : List[str]) -> bool:
file_descriptor, concat_video_path = tempfile.mkstemp()
os.close(file_descriptor)
with open(concat_video_path, 'w') as concat_video_file:
for temp_output_path in temp_output_paths:
concat_video_file.write('file \'' + os.path.abspath(temp_output_path) + '\'' + os.linesep)
concat_video_file.flush()
concat_video_file.close()
output_path = os.path.abspath(output_path)
commands = ffmpeg_builder.chain(
ffmpeg_builder.unsafe_concat(),
ffmpeg_builder.set_input(concat_video_file.name),
ffmpeg_builder.copy_video_encoder(),
ffmpeg_builder.copy_audio_encoder(),
ffmpeg_builder.force_output(output_path)
)
process = run_ffmpeg(commands)
process.communicate()
remove_file(concat_video_path)
return process.returncode == 0
def sanitize_audio(file_content : bytes, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
if security_strategy == 'strict':
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
ffmpeg_builder.deep_copy_audio(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
ffmpeg_builder.copy_audio_encoder(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
def sanitize_image(file_content : bytes, asset_path : str) -> bool:
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
ffmpeg_builder.deep_copy_image(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
def sanitize_video(file_content : bytes, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
if security_strategy == 'strict':
available_video_encoders = get_static_available_encoder_set().get('video')
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
ffmpeg_builder.set_video_encoder(available_video_encoders[0]),
ffmpeg_builder.set_video_preset(available_video_encoders[0], 'ultrafast'),
ffmpeg_builder.set_pixel_format(available_video_encoders[0]),
ffmpeg_builder.deep_copy_video(),
ffmpeg_builder.deep_copy_audio(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
commands = ffmpeg_builder.chain(
ffmpeg_builder.set_input('pipe:0'),
ffmpeg_builder.copy_video_encoder(),
ffmpeg_builder.copy_audio_encoder(),
ffmpeg_builder.strip_metadata(),
ffmpeg_builder.force_output(asset_path)
)
return run_ffmpeg_with_pipe(commands, file_content).returncode == 0
def fix_audio_encoder(video_format : VideoFormat, audio_encoder : AudioEncoder) -> AudioEncoder:
if video_format == 'avi' and audio_encoder == 'libopus':
return 'aac'
if video_format in [ 'm4v', 'mpeg', 'wmv' ]:
return 'aac'
if video_format == 'mov' and audio_encoder in [ 'flac', 'libopus' ]:
return 'aac'
if video_format == 'mxf':
return 'pcm_s16le'
if video_format == 'webm':
return 'libopus'
return audio_encoder
def fix_video_encoder(video_format : VideoFormat, video_encoder : VideoEncoder) -> VideoEncoder:
if video_format in [ 'm4v', 'mpeg', 'mxf', 'wmv' ]:
return 'libx264'
if video_format in [ 'mkv', 'mp4' ] and video_encoder == 'rawvideo':
return 'libx264'
if video_format == 'mov' and video_encoder == 'libvpx-vp9':
return 'libx264'
if video_format == 'webm':
return 'libvpx-vp9'
return video_encoder