Feat/audio metadata (#1019)

* audio metadata

* audio metadata

* audio metadata

* audio metadata

* audio metadata

* audio metadata

* audio metadata
This commit is contained in:
Harisreedhar
2026-01-15 13:09:38 +05:30
committed by henryruhs
parent 771ffe3081
commit d5fb30ba65
6 changed files with 232 additions and 2 deletions
+6 -1
View File
@@ -4,6 +4,7 @@ from starlette.datastructures import Headers
from starlette.types import Scope
from facefusion.audio import detect_audio_duration
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_format, detect_audio_frame_total, detect_audio_sample_rate
from facefusion.types import AudioMetadata, ImageMetadata, VideoMetadata
from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution
@@ -21,7 +22,11 @@ def get_sec_websocket_protocol(scope : Scope) -> Optional[str]:
def extract_audio_metadata(file_path : str) -> AudioMetadata:
    """
    Collect the metadata of an audio file.

    The duration comes from detect_audio_duration, the remaining fields come
    from the ffprobe based detectors.

    NOTE(review): the ffprobe detectors are annotated as Optional, so single
    fields can be None when probing fails - confirm that consumers cope.
    """
    # the stale single-entry 'duration' line is dropped so the key appears once
    metadata : AudioMetadata =\
    {
        'duration' : detect_audio_duration(file_path),
        'sample_rate' : detect_audio_sample_rate(file_path),
        'frame_total' : detect_audio_frame_total(file_path),
        'channels' : detect_audio_channel_total(file_path),
        'format' : detect_audio_format(file_path)
    }
    return metadata
+83
View File
@@ -0,0 +1,83 @@
import os
import subprocess
from typing import List, Optional
from facefusion import ffprobe_builder
from facefusion.types import Command
def run_ffprobe(commands : List[Command]) -> subprocess.Popen[bytes]:
    """
    Start ffprobe with the given arguments.

    Both stdout and stderr are piped; the started process is returned as is,
    so the caller has to collect the output itself.
    """
    ffprobe_commands = ffprobe_builder.run(commands)
    process = subprocess.Popen(ffprobe_commands, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    return process
def detect_audio_sample_rate(audio_path : str) -> Optional[int]:
    """
    Detect the sample rate of the first audio stream via ffprobe.

    Returns the sample rate as integer. None is returned when ffprobe exits
    with a non-zero code, prints nothing or prints a value that is not an
    integer.
    """
    commands = ffprobe_builder.chain(
        ffprobe_builder.select_audio_stream(0),
        ffprobe_builder.show_stream_entries([ 'sample_rate' ]),
        ffprobe_builder.format_to_value(),
        ffprobe_builder.set_input(audio_path)
    )
    process = run_ffprobe(commands)
    output, _ = process.communicate()

    if process.returncode == 0 and output:
        # the output is not guaranteed to be one clean integer (placeholder
        # text or a repeated entry), so only the first token is parsed
        values = output.decode().split()

        if values:
            try:
                return int(values[0])
            except ValueError:
                return None
    return None
def detect_audio_channel_total(audio_path : str) -> Optional[int]:
    """
    Detect the channel count of the first audio stream via ffprobe.

    Returns the channel count as integer. None is returned when ffprobe exits
    with a non-zero code, prints nothing or prints a value that is not an
    integer.
    """
    commands = ffprobe_builder.chain(
        ffprobe_builder.select_audio_stream(0),
        ffprobe_builder.show_stream_entries([ 'channels' ]),
        ffprobe_builder.format_to_value(),
        ffprobe_builder.set_input(audio_path)
    )
    process = run_ffprobe(commands)
    output, _ = process.communicate()

    if process.returncode == 0 and output:
        # the output is not guaranteed to be one clean integer (placeholder
        # text or a repeated entry), so only the first token is parsed
        values = output.decode().split()

        if values:
            try:
                return int(values[0])
            except ValueError:
                return None
    return None
def detect_audio_frame_total(audio_path : str) -> Optional[int]:
    """
    Estimate the total sample frames of the first audio stream via ffprobe.

    The stream duration and sample rate are read as key=value lines and the
    result is int(duration * sample_rate). None is returned when ffprobe exits
    with a non-zero code, prints nothing, or either value is missing, zero or
    not numeric.
    """
    commands = ffprobe_builder.chain(
        ffprobe_builder.select_audio_stream(0),
        ffprobe_builder.show_stream_entries([ 'duration', 'sample_rate' ]),
        ffprobe_builder.format_to_key_value(),
        ffprobe_builder.set_input(audio_path)
    )
    process = run_ffprobe(commands)
    output, _ = process.communicate()

    if process.returncode == 0 and output:
        duration = None
        sample_rate = None

        # splitlines() handles \n as well as \r\n, independent of the host's os.linesep
        for line in output.decode().splitlines():
            key, _separator, value = line.strip().partition('=')

            try:
                if key == 'duration':
                    duration = float(value)
                if key == 'sample_rate':
                    sample_rate = int(value)
            except ValueError:
                # non-numeric placeholder values are skipped instead of raising
                continue

        if duration and sample_rate:
            return int(duration * sample_rate)
    return None
def detect_audio_format(audio_path : str) -> Optional[str]:
    """
    Detect the codec name of the first audio stream via ffprobe.

    Returns the codec name as reported by ffprobe. None is returned when
    ffprobe exits with a non-zero code or prints no value.
    """
    commands = ffprobe_builder.chain(
        ffprobe_builder.select_audio_stream(0),
        ffprobe_builder.show_stream_entries([ 'codec_name' ]),
        ffprobe_builder.format_to_value(),
        ffprobe_builder.set_input(audio_path)
    )
    process = run_ffprobe(commands)
    output, _ = process.communicate()

    if process.returncode == 0 and output:
        # only the first token is used, so blank output resolves to None and
        # a repeated entry does not leak a multi-line string
        values = output.decode().split()

        if values:
            return values[0]
    return None
+33
View File
@@ -0,0 +1,33 @@
import itertools
import shutil
from typing import List
from facefusion.types import Command
def run(commands : List[Command]) -> List[Command]:
    """
    Build the full ffprobe invocation.

    The arguments are prefixed with the ffprobe path resolved by shutil.which
    and a log level of error.
    """
    ffprobe_path = shutil.which('ffprobe')
    base_commands = [ ffprobe_path, '-loglevel', 'error' ]
    return base_commands + commands
def chain(*commands : List[Command]) -> List[Command]:
    """Flatten the given argument lists into a single list, keeping their order."""
    flat_commands = itertools.chain.from_iterable(commands)
    return list(flat_commands)
def select_audio_stream(index : int) -> List[Command]:
    """Build the -select_streams argument for the audio stream at the given index."""
    stream_specifier = f'a:{index!s}'
    return [ '-select_streams', stream_specifier ]
def show_stream_entries(entries : List[str]) -> List[Command]:
    """Build the -show_entries argument from the comma joined stream entries."""
    joined_entries = ','.join(entries)
    return [ '-show_entries', f'stream={joined_entries}' ]
def format_to_value() -> List[Command]:
    """Build the -of argument using the default writer with noprint_wrappers=1 and nokey=1."""
    output_format = 'default=noprint_wrappers=1:nokey=1'
    return [ '-of', output_format ]
def format_to_key_value() -> List[Command]:
    """Build the -of argument using the default writer with noprint_wrappers=1."""
    output_format = 'default=noprint_wrappers=1'
    return [ '-of', output_format ]
def set_input(input_path : str) -> List[Command]:
    """Wrap the input path into a single-item argument list."""
    commands : List[Command] = [ input_path ]
    return commands
+5 -1
View File
@@ -174,7 +174,11 @@ AssetType = Literal['source', 'target']
MediaType = Literal['image', 'video', 'audio']
# Metadata of an audio file: duration, sample rate, frame total, channel count and format.
# The stale single-entry 'duration' line is dropped so each key is declared exactly once.
# NOTE(review): the ffprobe detectors that fill these fields are annotated Optional,
# so the int / str fields may need to become Optional - confirm against the callers.
AudioMetadata = TypedDict('AudioMetadata',
{
    'duration' : Duration,
    'sample_rate' : int,
    'frame_total' : int,
    'channels' : int,
    'format' : str
})
ImageMetadata = TypedDict('ImageMetadata',
{
+62
View File
@@ -0,0 +1,62 @@
import subprocess
import pytest
from facefusion import process_manager
from facefusion.download import conditional_download
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_format, detect_audio_frame_total, detect_audio_sample_rate
from .helper import get_test_example_file, get_test_examples_directory
@pytest.fixture(scope = 'module', autouse = True)
def before_all() -> None:
    """
    Prepare the audio examples for this module.

    Starts the process manager, downloads source.mp3 into the examples
    directory and runs ffmpeg to create source.wav from it.
    """
    process_manager.start()
    examples_directory = get_test_examples_directory()
    example_urls =\
    [
        'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3'
    ]
    conditional_download(examples_directory, example_urls)
    mp3_path = get_test_example_file('source.mp3')
    wav_path = get_test_example_file('source.wav')
    subprocess.run([ 'ffmpeg', '-i', mp3_path, wav_path ])
def test_detect_audio_sample_rate() -> None:
    """The sample rate is 44100 for source.mp3 and source.wav and None for invalid.mp3."""
    for file_name in [ 'source.mp3', 'source.wav' ]:
        assert detect_audio_sample_rate(get_test_example_file(file_name)) == 44100

    assert detect_audio_sample_rate(get_test_example_file('invalid.mp3')) is None
def test_detect_audio_channel_total() -> None:
    """The channel total is 1 for source.mp3 and source.wav and None for invalid.mp3."""
    for file_name in [ 'source.mp3', 'source.wav' ]:
        assert detect_audio_channel_total(get_test_example_file(file_name)) == 1

    assert detect_audio_channel_total(get_test_example_file('invalid.mp3')) is None
def test_detect_audio_frame_total() -> None:
    """The frame total is 167039 for source.mp3 and source.wav and None for invalid.mp3."""
    for file_name in [ 'source.mp3', 'source.wav' ]:
        assert detect_audio_frame_total(get_test_example_file(file_name)) == 167039

    assert detect_audio_frame_total(get_test_example_file('invalid.mp3')) is None
def test_detect_audio_format() -> None:
    """The format is mp3 for source.mp3, pcm_s16le for source.wav and None for invalid.mp3."""
    expected_formats =\
    [
        ('source.mp3', 'mp3'),
        ('source.wav', 'pcm_s16le')
    ]

    for file_name, expected_format in expected_formats:
        assert detect_audio_format(get_test_example_file(file_name)) == expected_format

    assert detect_audio_format(get_test_example_file('invalid.mp3')) is None
+43
View File
@@ -0,0 +1,43 @@
from shutil import which
from facefusion import ffprobe_builder
from facefusion.ffprobe_builder import chain, format_to_key_value, format_to_value, run, select_audio_stream, set_input, show_stream_entries
def test_run() -> None:
    """run() prefixes the arguments with the resolved ffprobe path and the error log level."""
    expected_commands = [ which('ffprobe'), '-loglevel', 'error', '-v', 'error' ]
    assert run([ '-v', 'error' ]) == expected_commands
def test_chain() -> None:
    """chain() flattens one or several argument lists in the given order."""
    single_commands = chain(ffprobe_builder.select_audio_stream(0))
    assert single_commands == [ '-select_streams', 'a:0' ]

    combined_commands = chain(
        ffprobe_builder.select_audio_stream(0),
        ffprobe_builder.show_stream_entries([ 'sample_rate' ]),
        ffprobe_builder.format_to_value(),
        ffprobe_builder.set_input('audio.mp3')
    )
    assert combined_commands == [ '-select_streams', 'a:0', '-show_entries', 'stream=sample_rate', '-of', 'default=noprint_wrappers=1:nokey=1', 'audio.mp3' ]
def test_select_audio_stream() -> None:
    """select_audio_stream(0) yields the a:0 stream specifier."""
    commands = select_audio_stream(0)
    assert commands == [ '-select_streams', 'a:0' ]
def test_show_entries() -> None:
    """show_stream_entries() joins the entries with commas behind the stream= prefix."""
    cases =\
    [
        ([ 'sample_rate' ], 'stream=sample_rate'),
        ([ 'channels' ], 'stream=channels'),
        ([ 'duration', 'sample_rate' ], 'stream=duration,sample_rate')
    ]

    for entries, expected_value in cases:
        assert show_stream_entries(entries) == [ '-show_entries', expected_value ]
def test_format_to_value() -> None:
    """format_to_value() selects the default writer without wrappers and keys."""
    commands = format_to_value()
    assert commands == [ '-of', 'default=noprint_wrappers=1:nokey=1' ]
def test_format_to_key_value() -> None:
    """format_to_key_value() selects the default writer without wrappers."""
    commands = format_to_key_value()
    assert commands == [ '-of', 'default=noprint_wrappers=1' ]
def test_set_input() -> None:
    """set_input() wraps the given path into a single-item list."""
    for input_path in [ 'input.mp3', 'input.wav' ]:
        assert set_input(input_path) == [ input_path ]