Revisit and cleanup ffprobe integration (#1027)

* Revisit and cleanup ffprobe integration

* Revisit and cleanup ffprobe integration
This commit is contained in:
Henry Ruhs
2026-01-26 10:27:01 +01:00
committed by henryruhs
parent 1b8e3b00b0
commit ba8093e844
6 changed files with 47 additions and 91 deletions
+3 -4
View File
@@ -1,7 +1,7 @@
from typing import Optional
from facefusion.audio import detect_audio_duration
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_format, detect_audio_frame_total, detect_audio_sample_rate
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate
from facefusion.filesystem import is_audio, is_image, is_video
from facefusion.types import AudioMetadata, ImageMetadata, MediaType, VideoMetadata
from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution
@@ -11,10 +11,9 @@ def extract_audio_metadata(file_path : str) -> AudioMetadata:
metadata : AudioMetadata =\
{
'duration': detect_audio_duration(file_path),
'sample_rate': detect_audio_sample_rate(file_path),
'frame_total': detect_audio_frame_total(file_path),
'channels': detect_audio_channel_total(file_path),
'format': detect_audio_format(file_path)
'sample_rate': detect_audio_sample_rate(file_path),
'channels': detect_audio_channel_total(file_path)
}
return metadata
+31 -54
View File
@@ -1,6 +1,6 @@
import os
import subprocess
from typing import List, Optional
from typing import Dict, List, Optional
from facefusion import ffprobe_builder
from facefusion.types import Command
@@ -11,73 +11,50 @@ def run_ffprobe(commands : List[Command]) -> subprocess.Popen[bytes]:
return subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
def detect_audio_sample_rate(audio_path : str) -> Optional[int]:
def get_audio_entries(audio_path : str) -> Dict[str, str]:
audio_entries = {}
commands = ffprobe_builder.chain(
ffprobe_builder.select_audio_stream(0),
ffprobe_builder.show_stream_entries([ 'sample_rate' ]),
ffprobe_builder.format_to_value(),
ffprobe_builder.set_input(audio_path)
)
process = run_ffprobe(commands)
output, _ = process.communicate()
if process.returncode == 0 and output:
return int(output.decode().strip())
return None
def detect_audio_channel_total(audio_path : str) -> Optional[int]:
commands = ffprobe_builder.chain(
ffprobe_builder.select_audio_stream(0),
ffprobe_builder.show_stream_entries([ 'channels' ]),
ffprobe_builder.format_to_value(),
ffprobe_builder.set_input(audio_path)
)
process = run_ffprobe(commands)
output, _ = process.communicate()
if process.returncode == 0 and output:
return int(output.decode().strip())
return None
def detect_audio_frame_total(audio_path : str) -> Optional[int]:
commands = ffprobe_builder.chain(
ffprobe_builder.select_audio_stream(0),
ffprobe_builder.show_stream_entries([ 'duration', 'sample_rate' ]),
ffprobe_builder.show_entries([ 'duration', 'sample_rate', 'channels', 'nb_read_frames' ]),
ffprobe_builder.format_to_key_value(),
ffprobe_builder.set_input(audio_path)
)
process = run_ffprobe(commands)
output, _ = process.communicate()
if process.returncode == 0 and output:
duration = None
sample_rate = None
if output:
lines = output.decode().strip().split(os.linesep)
for line in lines:
if line.startswith('duration='):
duration = float(line.split('=')[1])
if line.startswith('sample_rate='):
sample_rate = int(line.split('=')[1])
if '=' in line:
key, value = line.split('=', 1)
audio_entries[key] = value
if duration and sample_rate:
return int(duration * sample_rate)
return audio_entries
def detect_audio_sample_rate(audio_path : str) -> Optional[int]:
audio_entries = get_audio_entries(audio_path)
sample_rate = audio_entries.get('sample_rate')
if sample_rate:
return int(sample_rate)
return None
def detect_audio_format(audio_path : str) -> Optional[str]:
commands = ffprobe_builder.chain(
ffprobe_builder.select_audio_stream(0),
ffprobe_builder.show_stream_entries([ 'codec_name' ]),
ffprobe_builder.format_to_value(),
ffprobe_builder.set_input(audio_path)
)
process = run_ffprobe(commands)
output, _ = process.communicate()
def detect_audio_channel_total(audio_path : str) -> Optional[int]:
audio_entries = get_audio_entries(audio_path)
audio_channel_total = audio_entries.get('channels')
if process.returncode == 0 and output:
return output.decode().strip()
if audio_channel_total:
return int(audio_channel_total)
return None
def detect_audio_frame_total(audio_path : str) -> Optional[int]:
audio_entries = get_audio_entries(audio_path)
audio_duration = audio_entries.get('duration')
audio_sample_rate = audio_entries.get('sample_rate')
if audio_duration and audio_sample_rate:
return int(float(audio_duration) * int(audio_sample_rate))
return None
+1 -5
View File
@@ -13,11 +13,7 @@ def chain(*commands : List[Command]) -> List[Command]:
return list(itertools.chain(*commands))
def select_audio_stream(index : int) -> List[Command]:
return [ '-select_streams', 'a:' + str(index) ]
def show_stream_entries(entries : List[str]) -> List[Command]:
def show_entries(entries : List[str]) -> List[Command]:
return [ '-show_entries', 'stream=' + ','.join(entries) ]
+2 -3
View File
@@ -175,10 +175,9 @@ MediaType = Literal['image', 'video', 'audio']
AudioMetadata = TypedDict('AudioMetadata',
{
'duration' : Duration,
'sample_rate': int,
'frame_total': int,
'channels': int,
'format': str
'sample_rate': int,
'channels': int
})
ImageMetadata = TypedDict('ImageMetadata',
{
+5 -11
View File
@@ -4,7 +4,7 @@ import pytest
from facefusion import process_manager
from facefusion.download import conditional_download
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_format, detect_audio_frame_total, detect_audio_sample_rate
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate
from .helper import get_test_example_file, get_test_examples_directory
@@ -15,28 +15,22 @@ def before_all() -> None:
[
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3'
])
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-t', '1.9', '-ar', '48000', '-ac', '2', get_test_example_file('source.wav') ])
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-t', '1.9', '-ar', '48000', '-ac', '2', get_test_example_file('test-audio-entries.wav') ])
def test_detect_audio_sample_rate() -> None:
assert detect_audio_sample_rate(get_test_example_file('source.mp3')) == 44100
assert detect_audio_sample_rate(get_test_example_file('source.wav')) == 44100
assert detect_audio_sample_rate(get_test_example_file('test-audio-entries.wav')) == 48000
assert detect_audio_sample_rate(get_test_example_file('invalid.mp3')) is None
def test_detect_audio_channel_total() -> None:
assert detect_audio_channel_total(get_test_example_file('source.mp3')) == 1
assert detect_audio_channel_total(get_test_example_file('source.wav')) == 1
assert detect_audio_channel_total(get_test_example_file('test-audio-entries.wav')) == 2
assert detect_audio_channel_total(get_test_example_file('invalid.mp3')) is None
def test_detect_audio_frame_total() -> None:
assert detect_audio_frame_total(get_test_example_file('source.mp3')) == 167039
assert detect_audio_frame_total(get_test_example_file('source.wav')) == 167039
assert detect_audio_frame_total(get_test_example_file('test-audio-entries.wav')) == 91200
assert detect_audio_frame_total(get_test_example_file('invalid.mp3')) is None
def test_detect_audio_format() -> None:
assert detect_audio_format(get_test_example_file('source.mp3')) == 'mp3'
assert detect_audio_format(get_test_example_file('source.wav')) == 'pcm_s16le'
assert detect_audio_format(get_test_example_file('invalid.mp3')) is None
+5 -14
View File
@@ -1,7 +1,7 @@
from shutil import which
from facefusion import ffprobe_builder
from facefusion.ffprobe_builder import chain, format_to_key_value, format_to_value, run, select_audio_stream, set_input, show_stream_entries
from facefusion.ffprobe_builder import chain, format_to_key_value, format_to_value, run, set_input, show_entries
def test_run() -> None:
@@ -10,24 +10,15 @@ def test_run() -> None:
def test_chain() -> None:
assert chain(
ffprobe_builder.select_audio_stream(0)
) == [ '-select_streams', 'a:0' ]
assert chain(
ffprobe_builder.select_audio_stream(0),
ffprobe_builder.show_stream_entries([ 'sample_rate' ]),
ffprobe_builder.show_entries([ 'sample_rate' ]),
ffprobe_builder.format_to_value(),
ffprobe_builder.set_input('audio.mp3')
) == [ '-select_streams', 'a:0', '-show_entries', 'stream=sample_rate', '-of', 'default=noprint_wrappers=1:nokey=1', 'audio.mp3' ]
def test_select_audio_stream() -> None:
assert select_audio_stream(0) == [ '-select_streams', 'a:0' ]
) == [ '-show_entries', 'stream=sample_rate', '-of', 'default=noprint_wrappers=1:nokey=1', 'audio.mp3' ]
def test_show_entries() -> None:
assert show_stream_entries([ 'sample_rate' ]) == [ '-show_entries', 'stream=sample_rate' ]
assert show_stream_entries([ 'channels' ]) == [ '-show_entries', 'stream=channels' ]
assert show_stream_entries([ 'duration', 'sample_rate' ]) == [ '-show_entries', 'stream=duration,sample_rate' ]
assert show_entries([ 'duration' ]) == [ '-show_entries', 'stream=duration' ]
assert show_entries([ 'duration', 'sample_rate']) == [ '-show_entries', 'stream=duration,sample_rate' ]
def test_format_to_value() -> None: