From 3a2127eb6337dc1b5c1c46d3ae578f607deb9291 Mon Sep 17 00:00:00 2001
From: Harisreedhar <46858047+harisreedhar@users.noreply.github.com>
Date: Sun, 28 Jan 2024 16:55:49 +0530
Subject: [PATCH] Audio functions (#345)

* Update ffmpeg.py

* Create audio.py

* Update ffmpeg.py

* Update audio.py

* Update audio.py

* Update typing.py

* Update ffmpeg.py

* Update audio.py
---
Review notes (patch commentary below the three-dash line, not part of the
commit message):

- The line structure of this patch was rebuilt from a whitespace-collapsed
  copy. Hunk line counts were used to place blank lines; indentation is
  assumed to be tabs and must be confirmed against the target tree.
- audio.py, read_static_audio: return an empty list when read_audio_buffer
  yields no data instead of handing None to numpy.frombuffer.
- audio.py, normalize_audio: skip the peak division for silent input, which
  previously produced NaN samples.
- audio.py, extract_audio_frames: use int64 frame indices; int16 wraps once
  the spectrogram exceeds 32767 columns.
- ffmpeg.py, read_audio_buffer: open_ffmpeg pipes stdin and stdout only, so
  communicate() returns None for stderr and error.decode() raised
  AttributeError on every ffmpeg failure. The call is removed; ffmpeg still
  reports errors on the inherited stderr.
- The commit id above and the blob ids on the index lines are those of the
  original commit and do not describe the amended content.

 facefusion/audio.py  | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 facefusion/ffmpeg.py | 20 ++++++++++--
 facefusion/typing.py |  5 +++
 3 files changed, 102 insertions(+), 2 deletions(-)
 create mode 100644 facefusion/audio.py

diff --git a/facefusion/audio.py b/facefusion/audio.py
new file mode 100644
index 00000000..c529236e
--- /dev/null
+++ b/facefusion/audio.py
@@ -0,0 +1,79 @@
+from typing import Optional, Any, List
+
+import numpy
+import scipy
+from functools import lru_cache
+from facefusion.ffmpeg import read_audio_buffer
+from facefusion.typing import Fps, Audio, Spectrogram, AudioFrame
+
+
+def get_audio_frame(audio_path : str, fps : Fps, frame_number : int = 0) -> Optional[AudioFrame]:
+	if audio_path:
+		audio_frames = read_static_audio(audio_path, fps)
+		if frame_number < len(audio_frames):
+			return audio_frames[frame_number]
+	return None
+
+
+@lru_cache(maxsize = None)
+def read_static_audio(audio_path : str, fps : Fps) -> List[AudioFrame]:
+	audio_buffer = read_audio_buffer(audio_path, 16000, 2)
+	# read_audio_buffer returns None when ffmpeg fails, so there is nothing to decode
+	if not audio_buffer:
+		return []
+	audio = numpy.frombuffer(audio_buffer, dtype = numpy.int16).reshape(-1, 2)
+	audio = normalize_audio(audio)
+	audio = filter_audio(audio, -0.97)
+	spectrogram = create_spectrogram(audio, 16000, 80, 800, 55.0, 7600.0)
+	audio_frames = extract_audio_frames(spectrogram, 80, 16, fps)
+	return audio_frames
+
+
+def normalize_audio(audio : numpy.ndarray[Any, Any]) -> Audio:
+	if audio.ndim > 1:
+		audio = numpy.mean(audio, axis = 1)
+	audio_peak = numpy.max(numpy.abs(audio), axis = 0)
+	# silent audio has a zero peak, dividing by it would fill the signal with NaN
+	if audio_peak > 0:
+		audio = audio / audio_peak
+	return audio
+
+
+def filter_audio(audio : Audio, filter_coefficient: float) -> Audio:
+	audio = scipy.signal.lfilter([1.0, filter_coefficient], [1.0], audio)
+	return audio
+
+
+def convert_hertz_to_mel(hertz : float) -> float:
+	return 2595 * numpy.log10(1 + hertz / 700)
+
+
+def convert_mel_to_hertz(mel : numpy.ndarray[Any, Any]) -> numpy.ndarray[Any, Any]:
+	return 700 * (10 ** (mel / 2595) - 1)
+
+
+@lru_cache(maxsize = None)
+def create_static_mel_filter(sample_rate : int, filter_total : int, filter_size : int, frequency_minimum : float, frequency_maximum : float) -> numpy.ndarray[Any, Any]:
+	mel_filter = numpy.zeros((filter_total, filter_size // 2 + 1))
+	mel_bins = numpy.linspace(convert_hertz_to_mel(frequency_minimum), convert_hertz_to_mel(frequency_maximum), filter_total + 2)
+	indices = numpy.floor((filter_size + 1) * convert_mel_to_hertz(mel_bins) / sample_rate).astype(numpy.int16)
+	for index in range(filter_total):
+		mel_filter[index, indices[index]: indices[index + 1]] = scipy.signal.windows.triang(indices[index + 1] - indices[index])
+	return mel_filter
+
+
+def create_spectrogram(audio : Audio, sample_rate : int, filter_total : int, filter_size : int, frequency_minimum : float, frequency_maximum : float) -> Spectrogram:
+	mel_filter = create_static_mel_filter(sample_rate, filter_total, filter_size, frequency_minimum, frequency_maximum)
+	spectrogram = scipy.signal.stft(audio, nperseg = filter_size, noverlap = 600, nfft = filter_size)[2]
+	spectrogram = numpy.dot(mel_filter, numpy.abs(spectrogram))
+	return spectrogram
+
+
+def extract_audio_frames(spectrogram: Spectrogram, filter_total: int, audio_frame_step: int, fps: Fps) -> List[AudioFrame]:
+	# int64 indices, int16 would wrap once the spectrogram exceeds 32767 columns
+	indices = numpy.arange(0, spectrogram.shape[1], filter_total / fps).astype(numpy.int64)
+	indices = indices[indices >= audio_frame_step]
+	audio_frames = []
+	for index in indices:
+		audio_frames.append(spectrogram[:, max(0, index - audio_frame_step) : index])
+	return audio_frames
diff --git a/facefusion/ffmpeg.py b/facefusion/ffmpeg.py
index 9da029d3..68cafcbe 100644
--- a/facefusion/ffmpeg.py
+++ b/facefusion/ffmpeg.py
@@ -3,7 +3,7 @@ import subprocess
 
 import facefusion.globals
 from facefusion import logger
-from facefusion.typing import OutputVideoPreset, Fps
+from facefusion.typing import OutputVideoPreset, Fps, AudioBuffer
 from facefusion.filesystem import get_temp_frames_pattern, get_temp_output_video_path
 
 
@@ -21,7 +21,7 @@ def run_ffmpeg(args : List[str]) -> bool:
 def open_ffmpeg(args : List[str]) -> subprocess.Popen[bytes]:
 	commands = [ 'ffmpeg', '-hide_banner', '-loglevel', 'error' ]
 	commands.extend(args)
-	return subprocess.Popen(commands, stdin = subprocess.PIPE)
+	return subprocess.Popen(commands, stdin = subprocess.PIPE, stdout = subprocess.PIPE)
 
 
 def extract_frames(target_path : str, video_resolution : str, video_fps : Fps) -> bool:
@@ -80,6 +80,22 @@ def restore_audio(target_path : str, output_path : str, video_fps : Fps) -> bool
 	return run_ffmpeg(commands)
 
 
+def read_audio_buffer(target_path : str, sample_rate : int, channel_total : int) -> Optional[AudioBuffer]:
+	commands = [ '-i', target_path, '-vn', '-f', 's16le', '-acodec', 'pcm_s16le', '-ar', str(sample_rate), '-ac', str(channel_total), '-' ]
+	process = open_ffmpeg(commands)
+	# open_ffmpeg does not pipe stderr, so communicate() only yields the stdout buffer
+	audio_buffer, _ = process.communicate()
+	if process.returncode == 0:
+		return audio_buffer
+	return None
+
+
+def replace_audio(target_path : str, audio_path : str, output_path : str) -> bool:
+	temp_output_path = get_temp_output_video_path(target_path)
+	commands = [ '-i', temp_output_path, '-i', audio_path, '-c:v', 'copy', '-af', 'apad', '-shortest', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path ]
+	return run_ffmpeg(commands)
+
+
 def map_nvenc_preset(output_video_preset : OutputVideoPreset) -> Optional[str]:
 	if output_video_preset in [ 'ultrafast', 'superfast', 'veryfast' ]:
 		return 'p1'
diff --git a/facefusion/typing.py b/facefusion/typing.py
index 2b6a054a..e0f0c58b 100755
--- a/facefusion/typing.py
+++ b/facefusion/typing.py
@@ -26,6 +26,11 @@ Frame = numpy.ndarray[Any, Any]
 Mask = numpy.ndarray[Any, Any]
 Matrix = numpy.ndarray[Any, Any]
 
+AudioBuffer = bytes
+Audio = numpy.ndarray[Any, Any]
+AudioFrame = numpy.ndarray[Any, Any]
+Spectrogram = numpy.ndarray[Any, Any]
+
 Fps = float
 Padding = Tuple[int, int, int, int]
 Resolution = Tuple[int, int]