From ba8093e844ba88e3587b325315fd286180ec941b Mon Sep 17 00:00:00 2001 From: Henry Ruhs Date: Mon, 26 Jan 2026 10:27:01 +0100 Subject: [PATCH] Revisit and cleanup ffprobe integration (#1027) * Revisit and cleanup ffprobe integration * Revisit and cleanup ffprobe integration --- facefusion/apis/asset_helper.py | 7 ++- facefusion/ffprobe.py | 85 ++++++++++++--------------------- facefusion/ffprobe_builder.py | 6 +-- facefusion/types.py | 5 +- tests/test_ffprobe.py | 16 ++----- tests/test_ffprobe_builder.py | 19 ++------ 6 files changed, 47 insertions(+), 91 deletions(-) diff --git a/facefusion/apis/asset_helper.py b/facefusion/apis/asset_helper.py index 2538ee57..d646b869 100644 --- a/facefusion/apis/asset_helper.py +++ b/facefusion/apis/asset_helper.py @@ -1,7 +1,7 @@ from typing import Optional from facefusion.audio import detect_audio_duration -from facefusion.ffprobe import detect_audio_channel_total, detect_audio_format, detect_audio_frame_total, detect_audio_sample_rate +from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate from facefusion.filesystem import is_audio, is_image, is_video from facefusion.types import AudioMetadata, ImageMetadata, MediaType, VideoMetadata from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution @@ -11,10 +11,9 @@ def extract_audio_metadata(file_path : str) -> AudioMetadata: metadata : AudioMetadata =\ { 'duration': detect_audio_duration(file_path), - 'sample_rate': detect_audio_sample_rate(file_path), 'frame_total': detect_audio_frame_total(file_path), - 'channels': detect_audio_channel_total(file_path), - 'format': detect_audio_format(file_path) + 'sample_rate': detect_audio_sample_rate(file_path), + 'channels': detect_audio_channel_total(file_path) } return metadata diff --git a/facefusion/ffprobe.py b/facefusion/ffprobe.py index d29c2bc6..bdc6477f 100644 --- a/facefusion/ffprobe.py +++ b/facefusion/ffprobe.py @@ -1,6 +1,6 @@ import os import subprocess -from typing import List, Optional +from typing import Dict, List, Optional from facefusion import ffprobe_builder from facefusion.types import Command @@ -11,73 +11,50 @@ def run_ffprobe(commands : List[Command]) -> subprocess.Popen[bytes]: return subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) -def detect_audio_sample_rate(audio_path : str) -> Optional[int]: +def get_audio_entries(audio_path : str) -> Dict[str, str]: + audio_entries = {} commands = ffprobe_builder.chain( - ffprobe_builder.select_audio_stream(0), - ffprobe_builder.show_stream_entries([ 'sample_rate' ]), - ffprobe_builder.format_to_value(), - ffprobe_builder.set_input(audio_path) - ) - process = run_ffprobe(commands) - output, _ = process.communicate() - - if process.returncode == 0 and output: - return int(output.decode().strip()) - return None - - -def detect_audio_channel_total(audio_path : str) -> Optional[int]: - commands = ffprobe_builder.chain( - ffprobe_builder.select_audio_stream(0), - ffprobe_builder.show_stream_entries([ 'channels' ]), - ffprobe_builder.format_to_value(), - ffprobe_builder.set_input(audio_path) - ) - process = run_ffprobe(commands) - output, _ = process.communicate() - - if process.returncode == 0 and output: - return int(output.decode().strip()) - return None - - -def detect_audio_frame_total(audio_path : str) -> Optional[int]: - commands = ffprobe_builder.chain( - ffprobe_builder.select_audio_stream(0), - ffprobe_builder.show_stream_entries([ 'duration', 'sample_rate' ]), + ffprobe_builder.show_entries([ 'duration', 'sample_rate', 'channels', 'nb_read_frames' ]), ffprobe_builder.format_to_key_value(), ffprobe_builder.set_input(audio_path) ) process = run_ffprobe(commands) output, _ = process.communicate() - if process.returncode == 0 and output: - duration = None - sample_rate = None + if output: lines = output.decode().strip().split(os.linesep) for line in lines: - if line.startswith('duration='): - duration = float(line.split('=')[1]) - if line.startswith('sample_rate='): - sample_rate = int(line.split('=')[1]) + if '=' in line: + key, value = line.split('=', 1) + audio_entries[key] = value - if duration and sample_rate: - return int(duration * sample_rate) + return audio_entries + +def detect_audio_sample_rate(audio_path : str) -> Optional[int]: + audio_entries = get_audio_entries(audio_path) + sample_rate = audio_entries.get('sample_rate') + + if sample_rate: + return int(sample_rate) return None -def detect_audio_format(audio_path : str) -> Optional[str]: - commands = ffprobe_builder.chain( - ffprobe_builder.select_audio_stream(0), - ffprobe_builder.show_stream_entries([ 'codec_name' ]), - ffprobe_builder.format_to_value(), - ffprobe_builder.set_input(audio_path) - ) - process = run_ffprobe(commands) - output, _ = process.communicate() +def detect_audio_channel_total(audio_path : str) -> Optional[int]: + audio_entries = get_audio_entries(audio_path) + audio_channel_total = audio_entries.get('channels') - if process.returncode == 0 and output: - return output.decode().strip() + if audio_channel_total: + return int(audio_channel_total) + return None + + +def detect_audio_frame_total(audio_path : str) -> Optional[int]: + audio_entries = get_audio_entries(audio_path) + audio_duration = audio_entries.get('duration') + audio_sample_rate = audio_entries.get('sample_rate') + + if audio_duration and audio_sample_rate: + return int(float(audio_duration) * int(audio_sample_rate)) return None diff --git a/facefusion/ffprobe_builder.py b/facefusion/ffprobe_builder.py index 7be3120e..974749cb 100644 --- a/facefusion/ffprobe_builder.py +++ b/facefusion/ffprobe_builder.py @@ -13,11 +13,7 @@ def chain(*commands : List[Command]) -> List[Command]: return list(itertools.chain(*commands)) -def select_audio_stream(index : int) -> List[Command]: - return [ '-select_streams', 'a:' + str(index) ] - - -def show_stream_entries(entries : List[str]) -> List[Command]: +def show_entries(entries : List[str]) -> List[Command]: return [ '-show_entries', 'stream=' + ','.join(entries) ] diff --git a/facefusion/types.py b/facefusion/types.py index cbedcdcc..3eb22a2f 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -175,10 +175,9 @@ MediaType = Literal['image', 'video', 'audio'] AudioMetadata = TypedDict('AudioMetadata', { 'duration' : Duration, - 'sample_rate': int, 'frame_total': int, - 'channels': int, - 'format': str + 'sample_rate': int, + 'channels': int }) ImageMetadata = TypedDict('ImageMetadata', { diff --git a/tests/test_ffprobe.py b/tests/test_ffprobe.py index 87f1a9fd..f642fdbb 100644 --- a/tests/test_ffprobe.py +++ b/tests/test_ffprobe.py @@ -4,7 +4,7 @@ import pytest from facefusion import process_manager from facefusion.download import conditional_download -from facefusion.ffprobe import detect_audio_channel_total, detect_audio_format, detect_audio_frame_total, detect_audio_sample_rate +from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate from .helper import get_test_example_file, get_test_examples_directory @@ -15,28 +15,22 @@ def before_all() -> None: [ 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3' ]) - subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-t', '1.9', '-ar', '48000', '-ac', '2', get_test_example_file('source.wav') ]) + subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-t', '1.9', '-ar', '48000', '-ac', '2', get_test_example_file('test-audio-entries.wav') ]) def test_detect_audio_sample_rate() -> None: assert detect_audio_sample_rate(get_test_example_file('source.mp3')) == 44100 - assert detect_audio_sample_rate(get_test_example_file('source.wav')) == 44100 + assert detect_audio_sample_rate(get_test_example_file('test-audio-entries.wav')) == 48000 assert detect_audio_sample_rate(get_test_example_file('invalid.mp3')) is None def test_detect_audio_channel_total() -> None: assert detect_audio_channel_total(get_test_example_file('source.mp3')) == 1 - assert detect_audio_channel_total(get_test_example_file('source.wav')) == 1 + assert detect_audio_channel_total(get_test_example_file('test-audio-entries.wav')) == 2 assert detect_audio_channel_total(get_test_example_file('invalid.mp3')) is None def test_detect_audio_frame_total() -> None: assert detect_audio_frame_total(get_test_example_file('source.mp3')) == 167039 - assert detect_audio_frame_total(get_test_example_file('source.wav')) == 167039 + assert detect_audio_frame_total(get_test_example_file('test-audio-entries.wav')) == 91200 assert detect_audio_frame_total(get_test_example_file('invalid.mp3')) is None - - -def test_detect_audio_format() -> None: - assert detect_audio_format(get_test_example_file('source.mp3')) == 'mp3' - assert detect_audio_format(get_test_example_file('source.wav')) == 'pcm_s16le' - assert detect_audio_format(get_test_example_file('invalid.mp3')) is None diff --git a/tests/test_ffprobe_builder.py b/tests/test_ffprobe_builder.py index 929b7a9e..411f9ccd 100644 --- a/tests/test_ffprobe_builder.py +++ b/tests/test_ffprobe_builder.py @@ -1,7 +1,7 @@ from shutil import which from facefusion import ffprobe_builder -from facefusion.ffprobe_builder import chain, format_to_key_value, format_to_value, run, select_audio_stream, set_input, show_stream_entries +from facefusion.ffprobe_builder import chain, format_to_key_value, format_to_value, run, set_input, show_entries def test_run() -> None: @@ -10,24 +10,15 @@ def test_run() -> None: def test_chain() -> None: assert chain( - ffprobe_builder.select_audio_stream(0) - ) == [ '-select_streams', 'a:0' ] - assert chain( - ffprobe_builder.select_audio_stream(0), - ffprobe_builder.show_stream_entries([ 'sample_rate' ]), + ffprobe_builder.show_entries([ 'sample_rate' ]), ffprobe_builder.format_to_value(), ffprobe_builder.set_input('audio.mp3') - ) == [ '-select_streams', 'a:0', '-show_entries', 'stream=sample_rate', '-of', 'default=noprint_wrappers=1:nokey=1', 'audio.mp3' ] - - -def test_select_audio_stream() -> None: - assert select_audio_stream(0) == [ '-select_streams', 'a:0' ] + ) == [ '-show_entries', 'stream=sample_rate', '-of', 'default=noprint_wrappers=1:nokey=1', 'audio.mp3' ] def test_show_entries() -> None: - assert show_stream_entries([ 'sample_rate' ]) == [ '-show_entries', 'stream=sample_rate' ] - assert show_stream_entries([ 'channels' ]) == [ '-show_entries', 'stream=channels' ] - assert show_stream_entries([ 'duration', 'sample_rate' ]) == [ '-show_entries', 'stream=duration,sample_rate' ] + assert show_entries([ 'duration' ]) == [ '-show_entries', 'stream=duration' ] + assert show_entries([ 'duration', 'sample_rate']) == [ '-show_entries', 'stream=duration,sample_rate' ] def test_format_to_value() -> None: