diff --git a/modules/core.py b/modules/core.py index c98b7ba..1231d80 100644 --- a/modules/core.py +++ b/modules/core.py @@ -22,7 +22,7 @@ import tensorflow import modules.globals import modules.metadata import modules.ui as ui -from modules.processors.frame.core import get_frame_processors_modules +from modules.processors.frame.core import get_frame_processors_modules, process_video_in_memory from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path if HAS_TORCH and 'ROCMExecutionProvider' in modules.globals.execution_providers: @@ -222,44 +222,70 @@ def start() -> None: if modules.globals.nsfw_filter and ui.check_and_ignore_nsfw(modules.globals.target_path, destroy): return - extraction_start = time.time() - if not modules.globals.map_faces: - update_status('Creating temp resources...') - create_temp(modules.globals.target_path) - update_status('Extracting frames...') - extract_frames(modules.globals.target_path) - extraction_time = time.time() - extraction_start - update_status(f'Frame extraction completed in {extraction_time:.2f}s') - - temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) - total_frames = len(temp_frame_paths) - update_status(f'Processing {total_frames} frames with {modules.globals.execution_threads} threads...') - - processing_start = time.time() - for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - update_status('Progressing...', frame_processor.NAME) - frame_processor.process_video(modules.globals.source_path, temp_frame_paths) - release_resources() - processing_time = time.time() - processing_start - fps_processing = total_frames / processing_time if processing_time > 0 else 0 - update_status(f'Frame processing completed in {processing_time:.2f}s ({fps_processing:.2f} fps)') - - # handles fps - encoding_start = time.time() + # Detect FPS 
early (needed by both pipelines) if modules.globals.keep_fps: update_status('Detecting fps...') fps = detect_fps(modules.globals.target_path) + else: + fps = 30.0 + + video_created = False + + # --- In-memory pipeline (non-map_faces only) --- + # Reads frames from FFmpeg pipe, processes in memory, encodes directly. + # Eliminates all per-frame PNG disk I/O for a major speed-up. + if not modules.globals.map_faces: + update_status(f'Processing video in-memory at {fps} fps...') + create_temp(modules.globals.target_path) + + processing_start = time.time() + video_created = process_video_in_memory( + modules.globals.source_path, + modules.globals.target_path, + fps, + ) + processing_time = time.time() - processing_start + release_resources() + + if video_created: + update_status(f'In-memory processing + encoding completed in {processing_time:.2f}s') + + # --- Disk-based fallback (required for map_faces, or if pipe failed) --- + if not video_created: + if not modules.globals.map_faces: + update_status('Falling back to disk-based processing...') + + extraction_start = time.time() + if not modules.globals.map_faces: + create_temp(modules.globals.target_path) + update_status('Extracting frames...') + extract_frames(modules.globals.target_path) + extraction_time = time.time() - extraction_start + + temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) + total_frames = len(temp_frame_paths) + update_status(f'Processing {total_frames} frames with {modules.globals.execution_threads} threads...') + + processing_start = time.time() + for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): + update_status('Progressing...', frame_processor.NAME) + frame_processor.process_video(modules.globals.source_path, temp_frame_paths) + release_resources() + processing_time = time.time() - processing_start + fps_processing = total_frames / processing_time if processing_time > 0 else 0 + update_status(f'Frame processing completed in 
def _reset_interpolation_state(frame_processors: List[Any]) -> None:
    """Clear the cached previous-frame result on processors that keep one."""
    for fp in frame_processors:
        if hasattr(fp, 'PREVIOUS_FRAME_RESULT'):
            fp.PREVIOUS_FRAME_RESULT = None


def _software_encoder_options(encoder: str, quality: str) -> List[str]:
    """Return the FFmpeg quality options for a software encoder ([] if unknown)."""
    if encoder == 'libx264':
        return ['-preset', 'medium', '-crf', quality, '-tune', 'film']
    if encoder == 'libx265':
        return ['-preset', 'medium', '-crf', quality,
                '-x265-params', 'log-level=error']
    if encoder == 'libvpx-vp9':
        return ['-crf', quality, '-b:v', '0', '-cpu-used', '2']
    return []


def _select_encoder() -> tuple[str, List[str], bool]:
    """Pick the encoder to try first from the global settings.

    Returns ``(encoder, options, is_hw_encoder)``. A hardware encoder is
    chosen only when the configured encoder is libx264/libx265 and a matching
    execution provider (CUDA first, otherwise DirectML) is active.
    """
    requested = modules.globals.video_encoder
    quality = str(modules.globals.video_quality)
    providers = modules.globals.execution_providers

    hardware = None
    if 'CUDAExecutionProvider' in providers:
        nvenc_options = [
            '-preset', 'p4', '-tune', 'hq', '-rc', 'vbr',
            '-cq', quality, '-b:v', '0',
        ]
        hardware = {
            'libx264': ('h264_nvenc', nvenc_options),
            'libx265': ('hevc_nvenc', nvenc_options),
        }.get(requested)
    elif 'DmlExecutionProvider' in providers:
        amf_options = [
            '-quality', 'quality', '-rc', 'vbr_latency',
            '-qp_i', quality, '-qp_p', quality,
        ]
        hardware = {
            'libx264': ('h264_amf', amf_options),
            'libx265': ('hevc_amf', amf_options),
        }.get(requested)

    if hardware is not None:
        hw_encoder, hw_options = hardware
        return hw_encoder, list(hw_options), True
    return requested, _software_encoder_options(requested, quality), False


def _read_process_log(log_file: Any) -> str:
    """Return everything a child process wrote to its stderr capture file."""
    try:
        log_file.seek(0)
        return log_file.read().decode(errors='ignore').strip()
    except (OSError, ValueError):
        return ''


def _shutdown_processes(*procs: Any) -> None:
    """Kill any still-running child, close its pipes and reap it."""
    for proc in procs:
        if proc is None:
            continue
        # Kill first so that closing a pipe can never block on a live child.
        if proc.poll() is None:
            try:
                proc.kill()
            except OSError:
                pass
        for stream in (proc.stdin, proc.stdout, proc.stderr):
            if stream is None:
                continue
            try:
                stream.close()
            except (OSError, ValueError):
                # Flushing into a dead pipe raises; nothing left to salvage.
                pass
        try:
            proc.wait(timeout=5)
        except (subprocess.TimeoutExpired, OSError):
            pass


def process_video_in_memory(source_path: str, target_path: str, fps: float) -> bool:
    """Process video frames in-memory using FFmpeg pipes, avoiding frame files.

    Raw frames are read from an FFmpeg decoder pipe, passed through every
    active frame processor in order, and written straight to an FFmpeg
    encoder pipe that produces the temporary output video.

    Args:
        source_path: Image whose face is handed to the processors; when it is
            missing or contains no detectable face, ``None`` is passed instead.
        target_path: Video to decode and process.
        fps: Frame rate given to the encoder.

    Returns:
        True when the temporary output video was written, False otherwise
        (the caller is expected to fall back to the disk-based pipeline).
    """
    import cv2
    from modules.face_analyser import get_one_face
    from modules.utilities import (
        get_video_dimensions,
        estimate_frame_count,
        get_temp_output_path,
    )

    temp_output_path = get_temp_output_path(target_path)

    # --- Pre-load source face (needed by face_swapper in simple mode) ---
    source_face = None
    if source_path and os.path.exists(source_path):
        source_img = cv2.imread(source_path)
        if source_img is not None:
            source_face = get_one_face(source_img)
            del source_img
    if source_face is None:
        print("[DLC.CORE] Warning: No face detected in source image. "
              "Face swapping will be skipped.")

    # --- Collect frame processors & reset per-video state ---
    frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
    _reset_interpolation_state(frame_processors)

    # --- Video metadata ---
    try:
        width, height = get_video_dimensions(target_path)
    except Exception as e:
        print(f"[DLC.CORE] Failed to get video dimensions: {e}")
        return False
    if width <= 0 or height <= 0:
        # A zero-byte frame size would make the read loop spin forever.
        print(f"[DLC.CORE] Invalid video dimensions: {width}x{height}")
        return False

    total_frames = estimate_frame_count(target_path, fps)
    frame_size = width * height * 3  # bgr24: three bytes per pixel

    # --- Attempt pipeline (hw encoder first, then sw fallback) ---
    encoder, encoder_options, is_hw_encoder = _select_encoder()
    encoders_to_try = [(encoder, encoder_options)]
    if is_hw_encoder:
        # Fall back to the software encoder the user actually configured, so
        # a failed hardware attempt does not silently change the codec.
        requested = modules.globals.video_encoder
        encoders_to_try.append((
            requested,
            _software_encoder_options(requested, str(modules.globals.video_quality)),
        ))

    for attempt, (enc, enc_opts) in enumerate(encoders_to_try):
        # Reset interpolation state on retry
        if attempt > 0:
            _reset_interpolation_state(frame_processors)

        success = _run_pipe_pipeline(
            target_path, temp_output_path, fps,
            source_face, frame_processors,
            width, height, frame_size, total_frames,
            enc, enc_opts,
        )
        if success:
            return True

        if attempt == 0 and is_hw_encoder:
            print(f"[DLC.CORE] Hardware encoder '{enc}' failed, "
                  f"retrying with software encoder...")

    return False


def _run_pipe_pipeline(
    target_path: str,
    temp_output_path: str,
    fps: float,
    source_face: Any,
    frame_processors: List[Any],
    width: int,
    height: int,
    frame_size: int,
    total_frames: int,
    encoder: str,
    encoder_options: List[str],
) -> bool:
    """Run the FFmpeg-pipe read -> process -> encode pipeline once.

    Args:
        target_path: Video decoded by the reader process.
        temp_output_path: File written by the encoder process.
        fps: Frame rate passed to the encoder via ``-r``.
        source_face: Passed as the first argument to every ``process_frame``.
        frame_processors: Objects exposing ``process_frame(source_face, frame)``.
        width: Frame width in pixels.
        height: Frame height in pixels.
        frame_size: Bytes per raw frame; must equal ``width * height * 3``.
        total_frames: Expected frame count, used only for the progress bar.
        encoder: FFmpeg video codec name.
        encoder_options: Extra FFmpeg arguments for that codec.

    Returns:
        True when at least one frame was processed, both FFmpeg processes
        exited cleanly and the output file exists; False otherwise.
    """
    import tempfile

    if width <= 0 or height <= 0 or frame_size != width * height * 3:
        # read(0) returns b'' whose length equals a zero frame size, which
        # would otherwise loop forever.
        print(f"[DLC.CORE] Invalid frame geometry: {width}x{height} "
              f"({frame_size} bytes per frame)")
        return False

    # --- Reader: decode source video to raw BGR24 on stdout ---
    reader_cmd = [
        'ffmpeg', '-hide_banner',
        '-hwaccel', 'auto',
        '-i', target_path,
        '-f', 'rawvideo',
        '-pix_fmt', 'bgr24',
        '-v', 'error',
        '-',
    ]

    # --- Writer: encode raw BGR24 from stdin ---
    writer_cmd = [
        'ffmpeg', '-hide_banner',
        '-f', 'rawvideo',
        '-pix_fmt', 'bgr24',
        '-s', f'{width}x{height}',
        '-r', str(fps),
        '-i', '-',
        '-c:v', encoder,
    ]
    writer_cmd.extend(encoder_options)
    writer_cmd.extend([
        '-pix_fmt', 'yuv420p',
        '-movflags', '+faststart',
        '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1',
        '-v', 'error',
        '-y', temp_output_path,
    ])

    expected_shape = (height, width, 3)
    processed_count = 0
    bar_fmt = ('{l_bar}{bar}| {n_fmt}/{total_fmt} '
               '[{elapsed}<{remaining}, {rate_fmt}{postfix}]')

    reader = None
    writer = None
    # stderr goes to temp files rather than pipes: nothing drains stderr
    # while frames are streaming, so a full stderr pipe would stall FFmpeg
    # and deadlock the loop below.
    with tempfile.TemporaryFile() as reader_log, \
            tempfile.TemporaryFile() as writer_log:
        try:
            try:
                reader = subprocess.Popen(
                    reader_cmd, stdout=subprocess.PIPE, stderr=reader_log,
                )
                writer = subprocess.Popen(
                    writer_cmd, stdin=subprocess.PIPE, stderr=writer_log,
                )
            except Exception as e:
                print(f"[DLC.CORE] Failed to start FFmpeg pipes: {e}")
                return False

            with tqdm(total=total_frames, desc='Processing', unit='frame',
                      dynamic_ncols=True, bar_format=bar_fmt) as progress:
                progress.set_postfix({
                    'execution_providers': modules.globals.execution_providers,
                    'threads': modules.globals.execution_threads,
                    'mode': 'in-memory',
                })

                while True:
                    raw = reader.stdout.read(frame_size)
                    if len(raw) != frame_size:
                        # EOF, or a truncated trailing frame.
                        break

                    # frombuffer() yields a read-only view of the bytes;
                    # copy so processors may modify the frame in place.
                    frame = np.frombuffer(raw, dtype=np.uint8).reshape(
                        expected_shape
                    ).copy()

                    # Run frame through every active processor
                    for fp in frame_processors:
                        frame = fp.process_frame(source_face, frame)

                    # The encoder was told a fixed geometry; any other
                    # shape or dtype would corrupt the raw stream.
                    if (getattr(frame, 'shape', None) != expected_shape
                            or frame.dtype != np.uint8):
                        print("[DLC.CORE] Processor output does not match the "
                              f"{width}x{height} bgr24 stream; "
                              "cannot encode in-memory.")
                        return False

                    writer.stdin.write(frame.tobytes())
                    processed_count += 1
                    progress.update(1)

            # Graceful shutdown
            writer.stdin.close()
            writer.wait()
            reader.wait()

            if writer.returncode != 0:
                stderr_out = _read_process_log(writer_log)
                if stderr_out:
                    print(f"[DLC.CORE] FFmpeg encoder error: {stderr_out}")
                return False

            if reader.returncode != 0:
                # The decoder stopped early, so the output is incomplete.
                stderr_out = _read_process_log(reader_log)
                if stderr_out:
                    print(f"[DLC.CORE] FFmpeg decoder error: {stderr_out}")
                return False

            return processed_count > 0 and os.path.isfile(temp_output_path)

        except BrokenPipeError:
            print("[DLC.CORE] FFmpeg pipe broken (encoder may not be available).")
            stderr_out = _read_process_log(writer_log)
            if stderr_out:
                print(f"[DLC.CORE] FFmpeg encoder error: {stderr_out}")
            return False
        except Exception as e:
            print(f"[DLC.CORE] In-memory processing error: {e}")
            return False
        finally:
            _shutdown_processes(reader, writer)
def resolve_relative_path(path: str) -> str:
    """Return *path* resolved against the directory containing this module."""
    return os.path.abspath(os.path.join(os.path.dirname(__file__), path))


def _parse_video_dimensions(output: str) -> tuple:
    """Extract ``(width, height)`` from ffprobe ``csv=p=0:s=x`` output.

    Uses the first line that holds at least two fields and ignores empty
    fields, so a trailing separator or repeated stream lines do not break
    parsing.

    Raises:
        ValueError: If no line contains two integer fields.
    """
    for line in output.splitlines():
        fields = [field for field in line.strip().split("x") if field]
        if len(fields) >= 2:
            return int(fields[0]), int(fields[1])
    raise ValueError(f"Unexpected ffprobe dimension output: {output!r}")


def get_video_dimensions(target_path: str) -> tuple:
    """Get video width and height using ffprobe.

    Args:
        target_path: Video file to inspect; its first video stream is queried.

    Returns:
        A ``(width, height)`` tuple of ints.

    Raises:
        OSError: If ffprobe cannot be executed.
        subprocess.CalledProcessError: If ffprobe exits with an error.
        ValueError: If the ffprobe output cannot be parsed.
    """
    command = [
        "ffprobe", "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height",
        "-of", "csv=p=0:s=x",
        target_path,
    ]
    output = subprocess.check_output(command).decode().strip()
    return _parse_video_dimensions(output)


def estimate_frame_count(target_path: str, fps: float = None) -> int:
    """Estimate total frame count from video duration and fps.

    Args:
        target_path: Video file to inspect.
        fps: Frame rate to multiply the duration by; when None it is obtained
            from ``detect_fps(target_path)``.

    Returns:
        ``int(duration * fps)``, never negative. Returns 0 when ffprobe is
        unavailable, fails, or reports a duration that is not a finite number.
    """
    if fps is None:
        fps = detect_fps(target_path)
    command = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        target_path,
    ]
    try:
        output = subprocess.check_output(command).decode().strip()
        # Only the first line is the duration; ignore any extra output.
        first_line = output.splitlines()[0].strip() if output else ""
        duration = float(first_line)
        # Clamp: a negative estimate is meaningless as a progress-bar total.
        return max(0, int(duration * fps))
    except (subprocess.SubprocessError, OSError, ValueError,
            OverflowError, TypeError):
        # Best effort: the estimate only sizes a progress bar, so any failure
        # to probe or parse the duration degrades to "unknown" (0).
        return 0