From d5fb30ba65cfba7f8286c9abc9f4aafa3b2f7f34 Mon Sep 17 00:00:00 2001 From: Harisreedhar <46858047+harisreedhar@users.noreply.github.com> Date: Thu, 15 Jan 2026 13:09:38 +0530 Subject: [PATCH] Feat/audio metadata (#1019) * audio metadata * audio metadata * audio metadata * audio metadata * audio metadata * audio metadata * audio metadata --- facefusion/apis/api_helper.py | 7 ++- facefusion/ffprobe.py | 83 +++++++++++++++++++++++++++++++++++ facefusion/ffprobe_builder.py | 33 ++++++++++++++ facefusion/types.py | 6 ++- tests/test_ffprobe.py | 62 ++++++++++++++++++++++++++ tests/test_ffprobe_builder.py | 43 ++++++++++++++++++ 6 files changed, 232 insertions(+), 2 deletions(-) create mode 100644 facefusion/ffprobe.py create mode 100644 facefusion/ffprobe_builder.py create mode 100644 tests/test_ffprobe.py create mode 100644 tests/test_ffprobe_builder.py diff --git a/facefusion/apis/api_helper.py b/facefusion/apis/api_helper.py index 30a29747..ed8d7e03 100644 --- a/facefusion/apis/api_helper.py +++ b/facefusion/apis/api_helper.py @@ -4,6 +4,7 @@ from starlette.datastructures import Headers from starlette.types import Scope from facefusion.audio import detect_audio_duration +from facefusion.ffprobe import detect_audio_channel_total, detect_audio_format, detect_audio_frame_total, detect_audio_sample_rate from facefusion.types import AudioMetadata, ImageMetadata, VideoMetadata from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution @@ -21,7 +22,11 @@ def get_sec_websocket_protocol(scope : Scope) -> Optional[str]: def extract_audio_metadata(file_path : str) -> AudioMetadata: metadata : AudioMetadata =\ { - 'duration' : detect_audio_duration(file_path) + 'duration' : detect_audio_duration(file_path), + 'sample_rate' : detect_audio_sample_rate(file_path), + 'frame_total' : detect_audio_frame_total(file_path), + 'channels' : detect_audio_channel_total(file_path), + 'format' : detect_audio_format(file_path) } return metadata diff --git a/facefusion/ffprobe.py b/facefusion/ffprobe.py new file mode 100644 index 00000000..d29c2bc6 --- /dev/null +++ b/facefusion/ffprobe.py @@ -0,0 +1,83 @@ +import os +import subprocess +from typing import List, Optional + +from facefusion import ffprobe_builder +from facefusion.types import Command + + +def run_ffprobe(commands : List[Command]) -> subprocess.Popen[bytes]: + commands = ffprobe_builder.run(commands) + return subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE) + + +def detect_audio_sample_rate(audio_path : str) -> Optional[int]: + commands = ffprobe_builder.chain( + ffprobe_builder.select_audio_stream(0), + ffprobe_builder.show_stream_entries([ 'sample_rate' ]), + ffprobe_builder.format_to_value(), + ffprobe_builder.set_input(audio_path) + ) + process = run_ffprobe(commands) + output, _ = process.communicate() + + if process.returncode == 0 and output: + return int(output.decode().strip()) + return None + + +def detect_audio_channel_total(audio_path : str) -> Optional[int]: + commands = ffprobe_builder.chain( + ffprobe_builder.select_audio_stream(0), + ffprobe_builder.show_stream_entries([ 'channels' ]), + ffprobe_builder.format_to_value(), + ffprobe_builder.set_input(audio_path) + ) + process = run_ffprobe(commands) + output, _ = process.communicate() + + if process.returncode == 0 and output: + return int(output.decode().strip()) + return None + + +def detect_audio_frame_total(audio_path : str) -> Optional[int]: + commands = ffprobe_builder.chain( + ffprobe_builder.select_audio_stream(0), + ffprobe_builder.show_stream_entries([ 'duration', 'sample_rate' ]), + ffprobe_builder.format_to_key_value(), + ffprobe_builder.set_input(audio_path) + ) + process = run_ffprobe(commands) + output, _ = process.communicate() + + if process.returncode == 0 and output: + duration = None + sample_rate = None + lines = output.decode().strip().split(os.linesep) + + for line in lines: + if line.startswith('duration='): + duration = float(line.split('=')[1]) + if line.startswith('sample_rate='): + sample_rate = int(line.split('=')[1]) + + if duration and sample_rate: + return int(duration * sample_rate) + + return None + + +def detect_audio_format(audio_path : str) -> Optional[str]: + commands = ffprobe_builder.chain( + ffprobe_builder.select_audio_stream(0), + ffprobe_builder.show_stream_entries([ 'codec_name' ]), + ffprobe_builder.format_to_value(), + ffprobe_builder.set_input(audio_path) + ) + process = run_ffprobe(commands) + output, _ = process.communicate() + + if process.returncode == 0 and output: + return output.decode().strip() + return None diff --git a/facefusion/ffprobe_builder.py b/facefusion/ffprobe_builder.py new file mode 100644 index 00000000..7be3120e --- /dev/null +++ b/facefusion/ffprobe_builder.py @@ -0,0 +1,33 @@ +import itertools +import shutil +from typing import List + +from facefusion.types import Command + + +def run(commands : List[Command]) -> List[Command]: + return [ shutil.which('ffprobe'), '-loglevel', 'error' ] + commands + + +def chain(*commands : List[Command]) -> List[Command]: + return list(itertools.chain(*commands)) + + +def select_audio_stream(index : int) -> List[Command]: + return [ '-select_streams', 'a:' + str(index) ] + + +def show_stream_entries(entries : List[str]) -> List[Command]: + return [ '-show_entries', 'stream=' + ','.join(entries) ] + + +def format_to_value() -> List[Command]: + return [ '-of', 'default=noprint_wrappers=1:nokey=1' ] + + +def format_to_key_value() -> List[Command]: + return [ '-of', 'default=noprint_wrappers=1' ] + + +def set_input(input_path : str) -> List[Command]: + return [ input_path ] diff --git a/facefusion/types.py b/facefusion/types.py index 3f67ac1b..bdaded68 100755 --- a/facefusion/types.py +++ b/facefusion/types.py @@ -174,7 +174,11 @@ AssetType = Literal['source', 'target'] MediaType = Literal['image', 'video', 'audio'] AudioMetadata = TypedDict('AudioMetadata', { - 'duration' : Duration + 'duration' : Duration, + 'sample_rate': int, + 'frame_total': int, + 'channels': int, + 'format': str }) ImageMetadata = TypedDict('ImageMetadata', { diff --git a/tests/test_ffprobe.py b/tests/test_ffprobe.py new file mode 100644 index 00000000..a7cc9f7b --- /dev/null +++ b/tests/test_ffprobe.py @@ -0,0 +1,62 @@ +import subprocess + +import pytest + +from facefusion import process_manager +from facefusion.download import conditional_download +from facefusion.ffprobe import detect_audio_channel_total, detect_audio_format, detect_audio_frame_total, detect_audio_sample_rate +from .helper import get_test_example_file, get_test_examples_directory + + +@pytest.fixture(scope = 'module', autouse = True) +def before_all() -> None: + process_manager.start() + conditional_download(get_test_examples_directory(), + [ + 'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3' + ]) + subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), get_test_example_file('source.wav') ]) + + +def test_detect_audio_sample_rate() -> None: + audio_sample_rate = detect_audio_sample_rate(get_test_example_file('source.mp3')) + assert audio_sample_rate == 44100 + + audio_sample_rate = detect_audio_sample_rate(get_test_example_file('source.wav')) + assert audio_sample_rate == 44100 + + audio_sample_rate = detect_audio_sample_rate(get_test_example_file('invalid.mp3')) + assert audio_sample_rate is None + + +def test_detect_audio_channel_total() -> None: + audio_channel_total = detect_audio_channel_total(get_test_example_file('source.mp3')) + assert audio_channel_total == 1 + + audio_channel_total = detect_audio_channel_total(get_test_example_file('source.wav')) + assert audio_channel_total == 1 + + audio_channel_total = detect_audio_channel_total(get_test_example_file('invalid.mp3')) + assert audio_channel_total is None + + +def test_detect_audio_frame_total() -> None: + audio_frame_total = detect_audio_frame_total(get_test_example_file('source.mp3')) + assert audio_frame_total == 167039 + + audio_frame_total = detect_audio_frame_total(get_test_example_file('source.wav')) + assert audio_frame_total == 167039 + + audio_frame_total = detect_audio_frame_total(get_test_example_file('invalid.mp3')) + assert audio_frame_total is None + + +def test_detect_audio_format() -> None: + audio_format = detect_audio_format(get_test_example_file('source.mp3')) + assert audio_format == 'mp3' + + audio_format = detect_audio_format(get_test_example_file('source.wav')) + assert audio_format == 'pcm_s16le' + + audio_format = detect_audio_format(get_test_example_file('invalid.mp3')) + assert audio_format is None diff --git a/tests/test_ffprobe_builder.py b/tests/test_ffprobe_builder.py new file mode 100644 index 00000000..929b7a9e --- /dev/null +++ b/tests/test_ffprobe_builder.py @@ -0,0 +1,43 @@ +from shutil import which + +from facefusion import ffprobe_builder +from facefusion.ffprobe_builder import chain, format_to_key_value, format_to_value, run, select_audio_stream, set_input, show_stream_entries + + +def test_run() -> None: + assert run([ '-v', 'error' ]) == [ which('ffprobe'), '-loglevel', 'error', '-v', 'error' ] + + +def test_chain() -> None: + assert chain( + ffprobe_builder.select_audio_stream(0) + ) == [ '-select_streams', 'a:0' ] + assert chain( + ffprobe_builder.select_audio_stream(0), + ffprobe_builder.show_stream_entries([ 'sample_rate' ]), + ffprobe_builder.format_to_value(), + ffprobe_builder.set_input('audio.mp3') + ) == [ '-select_streams', 'a:0', '-show_entries', 'stream=sample_rate', '-of', 'default=noprint_wrappers=1:nokey=1', 'audio.mp3' ] + + +def test_select_audio_stream() -> None: + assert select_audio_stream(0) == [ '-select_streams', 'a:0' ] + + +def test_show_entries() -> None: + assert show_stream_entries([ 'sample_rate' ]) == [ '-show_entries', 'stream=sample_rate' ] + assert show_stream_entries([ 'channels' ]) == [ '-show_entries', 'stream=channels' ] + assert show_stream_entries([ 'duration', 'sample_rate' ]) == [ '-show_entries', 'stream=duration,sample_rate' ] + + +def test_format_to_value() -> None: + assert format_to_value() == [ '-of', 'default=noprint_wrappers=1:nokey=1' ] + + +def test_format_to_key_value() -> None: + assert format_to_key_value() == [ '-of', 'default=noprint_wrappers=1' ] + + +def test_set_input() -> None: + assert set_input('input.mp3') == [ 'input.mp3' ] + assert set_input('input.wav') == [ 'input.wav' ]