mirror of
https://github.com/facefusion/facefusion.git
synced 2026-04-22 09:26:02 +02:00
Feat/ffprobe v2 (#1072)
* follow the todos * extend to support bit_rate and more * simplify like crazy * simplify like crazy * minor changes * clean testing * clean testing * kill pipe resolver helpers * kill pipe resolver helpers * bit rate seems to be different on CI * use .splitlines() over .split(os.linesep) * skip test for windows * hack testing
This commit is contained in:
@@ -8,22 +8,9 @@ from starlette.datastructures import UploadFile
|
||||
|
||||
import facefusion.choices
|
||||
from facefusion import ffmpeg, process_manager, state_manager
|
||||
from facefusion.audio import detect_audio_duration
|
||||
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate
|
||||
from facefusion.filesystem import create_directory, get_file_extension, get_file_format, is_audio, is_image, is_video
|
||||
from facefusion.types import AudioMetadata, ImageMetadata, MediaType, UploadQueue, VideoMetadata
|
||||
from facefusion.vision import count_video_frame_total, detect_image_resolution, detect_video_duration, detect_video_fps, detect_video_resolution
|
||||
|
||||
|
||||
def extract_audio_metadata(file_path : str) -> AudioMetadata:
|
||||
metadata : AudioMetadata =\
|
||||
{
|
||||
'duration': detect_audio_duration(file_path),
|
||||
'frame_total': detect_audio_frame_total(file_path),
|
||||
'sample_rate': detect_audio_sample_rate(file_path),
|
||||
'channels': detect_audio_channel_total(file_path)
|
||||
}
|
||||
return metadata
|
||||
from facefusion.types import ImageMetadata, MediaType, UploadQueue
|
||||
from facefusion.vision import detect_image_resolution
|
||||
|
||||
|
||||
def extract_image_metadata(file_path : str) -> ImageMetadata:
|
||||
@@ -34,17 +21,6 @@ def extract_image_metadata(file_path : str) -> ImageMetadata:
|
||||
return metadata
|
||||
|
||||
|
||||
def extract_video_metadata(file_path : str) -> VideoMetadata:
|
||||
metadata : VideoMetadata =\
|
||||
{
|
||||
'duration': detect_video_duration(file_path),
|
||||
'frame_total': count_video_frame_total(file_path),
|
||||
'fps': detect_video_fps(file_path),
|
||||
'resolution': detect_video_resolution(file_path)
|
||||
}
|
||||
return metadata
|
||||
|
||||
|
||||
def detect_media_type_by_path(file_path : str) -> Optional[MediaType]:
|
||||
if is_audio(file_path):
|
||||
return 'audio'
|
||||
|
||||
@@ -2,7 +2,8 @@ import uuid
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Optional, cast
|
||||
|
||||
from facefusion.apis.asset_helper import detect_media_type_by_path, extract_audio_metadata, extract_image_metadata, extract_video_metadata
|
||||
from facefusion.apis.asset_helper import detect_media_type_by_path, extract_image_metadata
|
||||
from facefusion.ffprobe import extract_audio_metadata, extract_video_metadata
|
||||
from facefusion.filesystem import get_file_format, get_file_name, get_file_size
|
||||
from facefusion.types import AssetId, AssetSet, AssetStore, AssetType, AudioAsset, AudioFormat, ImageAsset, ImageFormat, SessionId, VideoAsset, VideoFormat
|
||||
|
||||
|
||||
+8
-33
@@ -10,7 +10,7 @@ import facefusion.choices
|
||||
from facefusion import ffmpeg_builder, logger, process_manager, state_manager, translator
|
||||
from facefusion.filesystem import get_file_format, remove_file
|
||||
from facefusion.temp_helper import get_temp_file_path, get_temp_frames_pattern
|
||||
from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, MediaChunkReader, Resolution, UpdateProgress, VideoEncoder, VideoFormat
|
||||
from facefusion.types import ApiSecurityStrategy, AudioBuffer, AudioEncoder, Command, EncoderSet, Fps, MediaChunkReader, Resolution, SampleRate, UpdateProgress, VideoEncoder, VideoFormat
|
||||
from facefusion.vision import detect_video_duration, detect_video_fps, pack_resolution, predict_video_frame_total
|
||||
|
||||
|
||||
@@ -91,7 +91,7 @@ def open_ffmpeg(commands : List[Command]) -> subprocess.Popen[bytes]:
|
||||
|
||||
def log_debug(process : subprocess.Popen[bytes]) -> None:
|
||||
_, stderr = process.communicate()
|
||||
errors = stderr.decode().split(os.linesep)
|
||||
errors = stderr.decode().splitlines()
|
||||
|
||||
for error in errors:
|
||||
if error.strip():
|
||||
@@ -188,7 +188,7 @@ def finalize_image(output_path : str, output_image_resolution : Resolution) -> b
|
||||
return run_ffmpeg(commands).returncode == 0
|
||||
|
||||
|
||||
def read_audio_buffer(target_path : str, audio_sample_rate : int, audio_sample_size : int, audio_channel_total : int) -> Optional[AudioBuffer]:
|
||||
def read_audio_buffer(target_path : str, audio_sample_rate : SampleRate, audio_sample_size : int, audio_channel_total : int) -> Optional[AudioBuffer]:
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.set_input(target_path),
|
||||
ffmpeg_builder.ignore_video_stream(),
|
||||
@@ -309,11 +309,9 @@ def concat_video(output_path : str, temp_output_paths : List[str]) -> bool:
|
||||
|
||||
|
||||
def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
|
||||
audio_pipe_format = resolve_audio_pipe_format(get_file_format(asset_path))
|
||||
|
||||
if security_strategy == 'strict':
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.pipe_input(audio_pipe_format),
|
||||
ffmpeg_builder.set_input('pipe:0'),
|
||||
ffmpeg_builder.deep_copy_audio(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
ffmpeg_builder.force_output(asset_path)
|
||||
@@ -321,7 +319,7 @@ def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, secu
|
||||
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
|
||||
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.pipe_input(audio_pipe_format),
|
||||
ffmpeg_builder.set_input('pipe:0'),
|
||||
ffmpeg_builder.copy_audio_encoder(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
ffmpeg_builder.force_output(asset_path)
|
||||
@@ -330,9 +328,8 @@ def sanitize_audio(media_chunk_reader : MediaChunkReader, asset_path : str, secu
|
||||
|
||||
|
||||
def sanitize_image(media_chunk_reader : MediaChunkReader, asset_path : str) -> bool:
|
||||
image_pipe_format = resolve_image_pipe_format(get_file_format(asset_path))
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.pipe_image(image_pipe_format),
|
||||
ffmpeg_builder.set_input('pipe:0'),
|
||||
ffmpeg_builder.deep_copy_image(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
ffmpeg_builder.force_output(asset_path)
|
||||
@@ -341,11 +338,9 @@ def sanitize_image(media_chunk_reader : MediaChunkReader, asset_path : str) -> b
|
||||
|
||||
|
||||
def sanitize_video(media_chunk_reader : MediaChunkReader, asset_path : str, security_strategy : ApiSecurityStrategy) -> bool:
|
||||
video_pipe_format = resolve_video_pipe_format(get_file_format(asset_path))
|
||||
|
||||
if security_strategy == 'strict':
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.pipe_input(video_pipe_format),
|
||||
ffmpeg_builder.set_input('pipe:0'),
|
||||
ffmpeg_builder.set_video_encoder('libx264'), #todo - remove hard coded
|
||||
ffmpeg_builder.set_video_preset('libx264', 'ultrafast'), #todo - remove hard coded
|
||||
ffmpeg_builder.set_pixel_format('libx264'), #todo - remove hard coded
|
||||
@@ -357,7 +352,7 @@ def sanitize_video(media_chunk_reader : MediaChunkReader, asset_path : str, secu
|
||||
return run_ffmpeg_with_pipe(commands, media_chunk_reader).returncode == 0
|
||||
|
||||
commands = ffmpeg_builder.chain(
|
||||
ffmpeg_builder.pipe_input(video_pipe_format),
|
||||
ffmpeg_builder.set_input('pipe:0'),
|
||||
ffmpeg_builder.copy_video_encoder(),
|
||||
ffmpeg_builder.copy_audio_encoder(),
|
||||
ffmpeg_builder.strip_metadata(),
|
||||
@@ -392,25 +387,5 @@ def fix_video_encoder(video_format : VideoFormat, video_encoder : VideoEncoder)
|
||||
return video_encoder
|
||||
|
||||
|
||||
def resolve_audio_pipe_format(audio_format : str) -> str:
|
||||
if audio_format == 'm4a':
|
||||
return 'mp4'
|
||||
if audio_format == 'opus':
|
||||
return 'ogg'
|
||||
return audio_format
|
||||
|
||||
|
||||
def resolve_image_pipe_format(image_format : str) -> str:
|
||||
if image_format == 'jpeg':
|
||||
return 'mjpeg'
|
||||
return image_format
|
||||
|
||||
|
||||
def resolve_video_pipe_format(video_format : str) -> str:
|
||||
if video_format == 'mkv':
|
||||
return 'matroska'
|
||||
if video_format == 'm4v':
|
||||
return 'mp4'
|
||||
if video_format == 'wmv':
|
||||
return 'asf'
|
||||
return video_format
|
||||
|
||||
@@ -5,7 +5,7 @@ from typing import List, Optional
|
||||
import numpy
|
||||
|
||||
from facefusion.filesystem import get_file_format
|
||||
from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, StreamMode, VideoEncoder, VideoPreset
|
||||
from facefusion.types import AudioEncoder, Command, CommandSet, Duration, Fps, SampleRate, StreamMode, VideoEncoder, VideoPreset
|
||||
|
||||
|
||||
def run(commands : List[Command]) -> List[Command]:
|
||||
@@ -47,14 +47,6 @@ def set_input(input_path : str) -> List[Command]:
|
||||
return [ '-i', input_path ]
|
||||
|
||||
|
||||
def pipe_input(pipe_format : str) -> List[Command]:
|
||||
return [ '-f', pipe_format, '-i', 'pipe:0' ]
|
||||
|
||||
|
||||
def pipe_image(image_format : str) -> List[Command]:
|
||||
return [ '-f', 'image2pipe', '-c:v', image_format, '-i', 'pipe:0' ]
|
||||
|
||||
|
||||
def set_input_fps(input_fps : Fps) -> List[Command]:
|
||||
return [ '-r', str(input_fps) ]
|
||||
|
||||
@@ -155,7 +147,7 @@ def copy_audio_encoder() -> List[Command]:
|
||||
return set_audio_encoder('copy')
|
||||
|
||||
|
||||
def set_audio_sample_rate(audio_sample_rate : int) -> List[Command]:
|
||||
def set_audio_sample_rate(audio_sample_rate : SampleRate) -> List[Command]:
|
||||
return [ '-ar', str(audio_sample_rate) ]
|
||||
|
||||
|
||||
|
||||
+54
-58
@@ -1,9 +1,8 @@
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List
|
||||
|
||||
from facefusion import ffprobe_builder
|
||||
from facefusion.types import Command
|
||||
from facefusion.types import AudioMetadata, Command, Fps, VideoMetadata
|
||||
|
||||
|
||||
def run_ffprobe(commands : List[Command]) -> subprocess.Popen[bytes]:
|
||||
@@ -11,78 +10,75 @@ def run_ffprobe(commands : List[Command]) -> subprocess.Popen[bytes]:
|
||||
return subprocess.Popen(commands, stderr = subprocess.PIPE, stdout = subprocess.PIPE)
|
||||
|
||||
|
||||
def get_audio_entries(audio_path : str) -> Dict[str, str]:
|
||||
audio_entries = {}
|
||||
def probe_entries(media_path : str, entries : List[str]) -> Dict[str, str]:
|
||||
media_entries = {}
|
||||
|
||||
commands = ffprobe_builder.chain(
|
||||
ffprobe_builder.show_entries([ 'duration', 'sample_rate', 'channels', 'nb_read_frames' ]),
|
||||
ffprobe_builder.show_entries(entries),
|
||||
ffprobe_builder.format_to_key_value(),
|
||||
ffprobe_builder.set_input(audio_path)
|
||||
ffprobe_builder.set_input(media_path)
|
||||
)
|
||||
process = run_ffprobe(commands)
|
||||
output, _ = process.communicate()
|
||||
output, _ = run_ffprobe(commands).communicate()
|
||||
|
||||
if output:
|
||||
lines = output.decode().strip().split(os.linesep)
|
||||
lines = output.decode().strip().splitlines()
|
||||
|
||||
for line in lines:
|
||||
if '=' in line:
|
||||
key, value = line.split('=', 1)
|
||||
audio_entries[key] = value
|
||||
media_entries[key] = value
|
||||
|
||||
return audio_entries
|
||||
return media_entries
|
||||
|
||||
|
||||
def detect_audio_codec(audio_path : str) -> Optional[str]: #todo: extend get_audio_entries and reuse it
|
||||
commands = ffprobe_builder.chain(
|
||||
ffprobe_builder.show_entries([ 'codec_name' ]),
|
||||
ffprobe_builder.format_to_value(),
|
||||
ffprobe_builder.set_input(audio_path)
|
||||
)
|
||||
process = run_ffprobe(commands)
|
||||
output, _ = process.communicate()
|
||||
def extract_audio_metadata(audio_path : str) -> AudioMetadata:
|
||||
audio_entries = probe_entries(audio_path, [ 'duration', 'sample_rate', 'channels', 'bit_rate' ])
|
||||
|
||||
if output:
|
||||
return output.decode().strip().split(os.linesep)[0]
|
||||
return None
|
||||
duration = float(audio_entries.get('duration'))
|
||||
sample_rate = int(audio_entries.get('sample_rate'))
|
||||
frame_total = int(duration * sample_rate)
|
||||
channnel_total = int(audio_entries.get('channels'))
|
||||
bit_rate = int(audio_entries.get('bit_rate'))
|
||||
|
||||
audio_metadata : AudioMetadata =\
|
||||
{
|
||||
'duration' : duration,
|
||||
'frame_total' : frame_total,
|
||||
'channel_total' : channnel_total,
|
||||
'sample_rate' : sample_rate,
|
||||
'bit_rate' : bit_rate
|
||||
}
|
||||
|
||||
return audio_metadata
|
||||
|
||||
|
||||
def detect_audio_sample_rate(audio_path : str) -> Optional[int]:
|
||||
audio_entries = get_audio_entries(audio_path)
|
||||
sample_rate = audio_entries.get('sample_rate')
|
||||
def extract_video_metadata(video_path : str) -> VideoMetadata:
|
||||
video_entries = probe_entries(video_path, [ 'duration', 'width', 'height', 'r_frame_rate', 'bit_rate' ])
|
||||
|
||||
if sample_rate:
|
||||
return int(sample_rate)
|
||||
return None
|
||||
duration = float(video_entries.get('duration'))
|
||||
fps = extract_video_fps(video_entries.get('r_frame_rate'))
|
||||
frame_total = int(duration * fps)
|
||||
width = int(video_entries.get('width'))
|
||||
height = int(video_entries.get('height'))
|
||||
bit_rate = int(video_entries.get('bit_rate'))
|
||||
|
||||
video_metadata : VideoMetadata =\
|
||||
{
|
||||
'duration' : duration,
|
||||
'frame_total' : frame_total,
|
||||
'fps' : fps,
|
||||
'resolution' : (width, height),
|
||||
'bit_rate' : bit_rate
|
||||
}
|
||||
|
||||
return video_metadata
|
||||
|
||||
|
||||
def detect_audio_channel_total(audio_path : str) -> Optional[int]:
|
||||
audio_entries = get_audio_entries(audio_path)
|
||||
audio_channel_total = audio_entries.get('channels')
|
||||
def extract_video_fps(frame_rate : str) -> Fps:
|
||||
if frame_rate and '/' in frame_rate:
|
||||
numerator, denominator = frame_rate.split('/')
|
||||
|
||||
if audio_channel_total:
|
||||
return int(audio_channel_total)
|
||||
return None
|
||||
if int(numerator) and int(denominator):
|
||||
return int(numerator) / int(denominator)
|
||||
|
||||
|
||||
def detect_audio_frame_total(audio_path : str) -> Optional[int]:
|
||||
audio_entries = get_audio_entries(audio_path)
|
||||
audio_duration = audio_entries.get('duration')
|
||||
audio_sample_rate = audio_entries.get('sample_rate')
|
||||
|
||||
if audio_duration and audio_sample_rate:
|
||||
return int(float(audio_duration) * int(audio_sample_rate))
|
||||
return None
|
||||
|
||||
|
||||
def detect_video_codec(video_path : str) -> Optional[str]: #todo: could be generic entries method like audio has
|
||||
commands = ffprobe_builder.chain(
|
||||
ffprobe_builder.show_entries([ 'codec_name' ]),
|
||||
ffprobe_builder.format_to_value(),
|
||||
ffprobe_builder.set_input(video_path)
|
||||
)
|
||||
process = run_ffprobe(commands)
|
||||
output, _ = process.communicate()
|
||||
|
||||
if output:
|
||||
return output.decode().strip().split(os.linesep)[0]
|
||||
return None
|
||||
return 0.0
|
||||
|
||||
@@ -26,4 +26,4 @@ def format_to_key_value() -> List[Command]:
|
||||
|
||||
|
||||
def set_input(input_path : str) -> List[Command]:
|
||||
return [ input_path ]
|
||||
return [ '-i', input_path ]
|
||||
|
||||
+8
-4
@@ -90,6 +90,8 @@ MelFilterBank : TypeAlias = NDArray[Any]
|
||||
Voice : TypeAlias = NDArray[Any]
|
||||
VoiceChunk : TypeAlias = NDArray[Any]
|
||||
|
||||
BitRate : TypeAlias = int
|
||||
SampleRate : TypeAlias = int
|
||||
Fps : TypeAlias = float
|
||||
Duration : TypeAlias = float
|
||||
Color : TypeAlias = Tuple[int, int, int, int]
|
||||
@@ -185,9 +187,10 @@ MediaType = Literal['image', 'video', 'audio']
|
||||
AudioMetadata = TypedDict('AudioMetadata',
|
||||
{
|
||||
'duration' : Duration,
|
||||
'frame_total': int,
|
||||
'sample_rate': int,
|
||||
'channels': int
|
||||
'frame_total' : int,
|
||||
'channel_total' : int,
|
||||
'sample_rate' : SampleRate,
|
||||
'bit_rate' : BitRate
|
||||
})
|
||||
ImageMetadata = TypedDict('ImageMetadata',
|
||||
{
|
||||
@@ -198,7 +201,8 @@ VideoMetadata = TypedDict('VideoMetadata',
|
||||
'duration' : Duration,
|
||||
'frame_total' : int,
|
||||
'fps' : Fps,
|
||||
'resolution' : Resolution
|
||||
'resolution' : Resolution,
|
||||
'bit_rate' : BitRate
|
||||
})
|
||||
AudioAsset = TypedDict('AudioAsset',
|
||||
{
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import pytest
|
||||
|
||||
from facefusion.apis.asset_helper import detect_media_type_by_path, extract_audio_metadata, extract_image_metadata, extract_video_metadata
|
||||
from facefusion.apis.asset_helper import detect_media_type_by_path, extract_image_metadata
|
||||
from facefusion.download import conditional_download
|
||||
from .assert_helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
@@ -22,23 +22,6 @@ def test_detect_media_type() -> None:
|
||||
|
||||
|
||||
def test_extract_image_metadata() -> None:
|
||||
metadata = extract_image_metadata(get_test_example_file('source.jpg'))
|
||||
image_metadata = extract_image_metadata(get_test_example_file('source.jpg'))
|
||||
|
||||
assert metadata.get('resolution') == (1024, 1024)
|
||||
|
||||
|
||||
def test_extract_video_metadata() -> None:
|
||||
metadata = extract_video_metadata(get_test_example_file('target-240p.mp4'))
|
||||
|
||||
assert metadata.get('duration') == 10.8
|
||||
assert metadata.get('frame_total') == 270
|
||||
assert metadata.get('fps') == 25.0
|
||||
assert metadata.get('resolution') == (426, 226)
|
||||
|
||||
|
||||
def test_extract_audio_metadata() -> None:
|
||||
metadata = extract_audio_metadata(get_test_example_file('source.mp3'))
|
||||
|
||||
assert metadata.get('duration') == 3.7877708333333335
|
||||
assert metadata.get('sample_rate') == 44100
|
||||
assert metadata.get('channels') == 1
|
||||
assert image_metadata.get('resolution') == (1024, 1024)
|
||||
|
||||
+11
-6
@@ -7,9 +7,10 @@ import pytest
|
||||
|
||||
import facefusion.ffmpeg
|
||||
from facefusion import process_manager, state_manager
|
||||
from facefusion.common_helper import is_linux
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.ffmpeg import concat_video, extract_frames, merge_video, read_audio_buffer, replace_audio, restore_audio, sanitize_audio, sanitize_image, sanitize_video, spawn_frames
|
||||
from facefusion.ffprobe import detect_audio_codec, detect_video_codec
|
||||
from facefusion.ffprobe import probe_entries
|
||||
from facefusion.filesystem import copy_file, is_image
|
||||
from facefusion.temp_helper import clear_temp_directory, create_temp_directory, get_temp_file_path, resolve_temp_frame_paths
|
||||
from facefusion.types import EncoderSet
|
||||
@@ -227,10 +228,10 @@ def test_sanitize_audio() -> None:
|
||||
]
|
||||
|
||||
assert sanitize_audio(create_media_reader(file_path), output_paths[0], 'strict') is True
|
||||
assert detect_audio_codec(output_paths[0]) == 'mp3'
|
||||
assert probe_entries(output_paths[0], [ 'codec_name' ]).get('codec_name') == 'mp3'
|
||||
|
||||
assert sanitize_audio(create_media_reader(file_path), output_paths[1], 'moderate') is True
|
||||
assert detect_audio_codec(output_paths[1]) == 'pcm_s16le'
|
||||
assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'pcm_s16le'
|
||||
|
||||
|
||||
def test_sanitize_image() -> None:
|
||||
@@ -249,8 +250,12 @@ def test_sanitize_video() -> None:
|
||||
get_test_output_path('test-sanitize-video-moderate.mp4')
|
||||
]
|
||||
|
||||
assert sanitize_video(create_media_reader(file_path), output_paths[0], 'strict') is True
|
||||
assert detect_video_codec(output_paths[0]) == 'h264'
|
||||
if is_linux():
|
||||
assert sanitize_video(create_media_reader(file_path), output_paths[0], 'strict') is True
|
||||
assert probe_entries(output_paths[0], [ 'codec_name' ]).get('codec_name') == 'h264'
|
||||
else:
|
||||
#todo: fix this test under windows - might be just proper preparation of testfile, as we need moov at the beginning
|
||||
pytest.skip()
|
||||
|
||||
assert sanitize_video(create_media_reader(file_path), output_paths[1], 'moderate') is True
|
||||
assert detect_video_codec(output_paths[1]) == 'hevc'
|
||||
assert probe_entries(output_paths[1], [ 'codec_name' ]).get('codec_name') == 'hevc'
|
||||
|
||||
+30
-15
@@ -4,7 +4,7 @@ import pytest
|
||||
|
||||
from facefusion import process_manager
|
||||
from facefusion.download import conditional_download
|
||||
from facefusion.ffprobe import detect_audio_channel_total, detect_audio_frame_total, detect_audio_sample_rate
|
||||
from facefusion.ffprobe import extract_audio_metadata, extract_video_metadata
|
||||
from .assert_helper import get_test_example_file, get_test_examples_directory
|
||||
|
||||
|
||||
@@ -13,24 +13,39 @@ def before_all() -> None:
|
||||
process_manager.start()
|
||||
conditional_download(get_test_examples_directory(),
|
||||
[
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3'
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/source.mp3',
|
||||
'https://github.com/facefusion/facefusion-assets/releases/download/examples-3.0.0/target-240p.mp4'
|
||||
])
|
||||
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-t', '1.9', '-ar', '48000', '-ac', '2', get_test_example_file('test-audio-entries.wav') ])
|
||||
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('source.mp3'), '-t', '1.9', '-ar', '48000', '-ac', '2', get_test_example_file('source-48000khz-2ch.wav') ])
|
||||
subprocess.run([ 'ffmpeg', '-i', get_test_example_file('target-240p.mp4'), '-t', '1', get_test_example_file('target-240p-1s.mov') ])
|
||||
|
||||
|
||||
def test_detect_audio_sample_rate() -> None:
|
||||
assert detect_audio_sample_rate(get_test_example_file('source.mp3')) == 44100
|
||||
assert detect_audio_sample_rate(get_test_example_file('test-audio-entries.wav')) == 48000
|
||||
assert detect_audio_sample_rate(get_test_example_file('invalid.mp3')) is None
|
||||
def test_extract_audio_metadata() -> None:
|
||||
audio_metadata = extract_audio_metadata(get_test_example_file('source.mp3'))
|
||||
|
||||
assert audio_metadata.get('sample_rate') == 44100
|
||||
assert audio_metadata.get('channel_total') == 1
|
||||
assert audio_metadata.get('frame_total') == 167039
|
||||
assert audio_metadata.get('bit_rate') == 128000
|
||||
|
||||
audio_metadata = extract_audio_metadata(get_test_example_file('source-48000khz-2ch.wav'))
|
||||
|
||||
assert audio_metadata.get('sample_rate') == 48000
|
||||
assert audio_metadata.get('channel_total') == 2
|
||||
assert audio_metadata.get('frame_total') == 91200
|
||||
assert audio_metadata.get('bit_rate') == 1536000
|
||||
|
||||
|
||||
def test_detect_audio_channel_total() -> None:
|
||||
assert detect_audio_channel_total(get_test_example_file('source.mp3')) == 1
|
||||
assert detect_audio_channel_total(get_test_example_file('test-audio-entries.wav')) == 2
|
||||
assert detect_audio_channel_total(get_test_example_file('invalid.mp3')) is None
|
||||
def test_extract_video_metadata() -> None:
|
||||
video_metadata = extract_video_metadata(get_test_example_file('target-240p.mp4'))
|
||||
|
||||
assert video_metadata.get('fps') == 25.0
|
||||
assert video_metadata.get('duration') == 10.8
|
||||
assert video_metadata.get('resolution') == (426, 226)
|
||||
assert video_metadata.get('bit_rate') == 138754
|
||||
|
||||
def test_detect_audio_frame_total() -> None:
|
||||
assert detect_audio_frame_total(get_test_example_file('source.mp3')) == 167039
|
||||
assert detect_audio_frame_total(get_test_example_file('test-audio-entries.wav')) == 91200
|
||||
assert detect_audio_frame_total(get_test_example_file('invalid.mp3')) is None
|
||||
video_metadata = extract_video_metadata(get_test_example_file('target-240p-1s.mov'))
|
||||
|
||||
assert video_metadata.get('fps') == 25.0
|
||||
assert video_metadata.get('duration') == 1.0
|
||||
assert video_metadata.get('resolution') == (426, 226)
|
||||
|
||||
@@ -13,7 +13,7 @@ def test_chain() -> None:
|
||||
ffprobe_builder.show_entries([ 'sample_rate' ]),
|
||||
ffprobe_builder.format_to_value(),
|
||||
ffprobe_builder.set_input('audio.mp3')
|
||||
) == [ '-show_entries', 'stream=sample_rate', '-of', 'default=noprint_wrappers=1:nokey=1', 'audio.mp3' ]
|
||||
) == [ '-show_entries', 'stream=sample_rate', '-of', 'default=noprint_wrappers=1:nokey=1', '-i', 'audio.mp3' ]
|
||||
|
||||
|
||||
def test_show_entries() -> None:
|
||||
@@ -30,5 +30,5 @@ def test_format_to_key_value() -> None:
|
||||
|
||||
|
||||
def test_set_input() -> None:
|
||||
assert set_input('input.mp3') == [ 'input.mp3' ]
|
||||
assert set_input('input.wav') == [ 'input.wav' ]
|
||||
assert set_input('input.mp3') == [ '-i', 'input.mp3' ]
|
||||
assert set_input('input.wav') == [ '-i', 'input.wav' ]
|
||||
|
||||
Reference in New Issue
Block a user