diff --git a/facefusion/face_helper.py b/facefusion/face_helper.py index 96e4e88d..9a91279b 100644 --- a/facefusion/face_helper.py +++ b/facefusion/face_helper.py @@ -1,13 +1,13 @@ -from typing import Tuple, Dict +from typing import Tuple, Dict, Sequence import cv2 import numpy from facefusion.typing import Face, Frame, Matrix, Template -TEMPLATES : Dict[str, numpy.array] =\ +TEMPLATES : Dict[Template, numpy.array] =\ { - 'arface': numpy.array( + 'arcface': numpy.array( [ [ 38.2946, 51.6963 ], [ 73.5318, 51.5014 ], @@ -26,9 +26,9 @@ TEMPLATES : Dict[str, numpy.array] =\ } -def warp_face(target_face : Face, temp_frame : Frame, template : Template) -> Tuple[Frame, Matrix]: +def warp_face(target_face : Face, temp_frame : Frame, template : Template, size = cv2.typing.Size) -> Tuple[Frame, Matrix]: affine_matrix = cv2.estimateAffinePartial2D(target_face.kps, TEMPLATES[template], method = cv2.LMEDS)[0] - crop_frame = cv2.warpAffine(temp_frame, affine_matrix, (512, 512)) + crop_frame = cv2.warpAffine(temp_frame, affine_matrix, size) return crop_frame, affine_matrix diff --git a/facefusion/processors/frame/modules/face_enhancer.py b/facefusion/processors/frame/modules/face_enhancer.py index fba27300..fe6f28d6 100644 --- a/facefusion/processors/frame/modules/face_enhancer.py +++ b/facefusion/processors/frame/modules/face_enhancer.py @@ -129,7 +129,7 @@ def post_process() -> None: def enhance_face(target_face: Face, temp_frame: Frame) -> Frame: frame_processor = get_frame_processor() - crop_frame, affine_matrix = warp_face(target_face, temp_frame, 'ffhq') + crop_frame, affine_matrix = warp_face(target_face, temp_frame, 'ffhq', (512, 512)) crop_frame = prepare_crop_frame(crop_frame) frame_processor_inputs = {} for frame_processor_input in frame_processor.get_inputs(): diff --git a/facefusion/processors/frame/modules/face_swapper.py b/facefusion/processors/frame/modules/face_swapper.py index e8513d80..5d81ee5a 100644 --- a/facefusion/processors/frame/modules/face_swapper.py +++ b/facefusion/processors/frame/modules/face_swapper.py @@ -1,13 +1,18 @@ from typing import Any, List, Dict, Literal, Optional from argparse import ArgumentParser -import insightface import threading +import numpy +import onnx +import onnxruntime +from onnx import numpy_helper + import facefusion.globals import facefusion.processors.frame.core as frame_processors from facefusion import wording from facefusion.core import update_status from facefusion.face_analyser import get_one_face, get_many_faces, find_similar_faces, clear_face_analyser +from facefusion.face_helper import warp_face, paste_back from facefusion.face_reference import get_face_reference, set_face_reference from facefusion.typing import Face, Frame, Update_Process, ProcessMode, ModelValue, OptionsWithModel from facefusion.utilities import conditional_download, resolve_relative_path, is_image, is_video, is_file, is_download_done @@ -16,6 +21,8 @@ from facefusion.processors.frame import globals as frame_processors_globals from facefusion.processors.frame import choices as frame_processors_choices FRAME_PROCESSOR = None +MODEL_MATRIX = None +THREAD_SEMAPHORE : threading.Semaphore = threading.Semaphore() THREAD_LOCK : threading.Lock = threading.Lock() NAME = 'FACEFUSION.FRAME_PROCESSOR.FACE_SWAPPER' MODELS : Dict[str, ModelValue] =\ @@ -40,7 +47,7 @@ def get_frame_processor() -> Any: with THREAD_LOCK: if FRAME_PROCESSOR is None: model_path = get_options('model').get('path') - FRAME_PROCESSOR = insightface.model_zoo.get_model(model_path, providers = facefusion.globals.execution_providers) + FRAME_PROCESSOR = onnxruntime.InferenceSession(model_path, providers = facefusion.globals.execution_providers) return FRAME_PROCESSOR @@ -50,11 +57,28 @@ def clear_frame_processor() -> None: FRAME_PROCESSOR = None +def get_model_matrix() -> Any: + global MODEL_MATRIX + + with THREAD_LOCK: + if MODEL_MATRIX is None: + model_path = get_options('model').get('path') + model = onnx.load(model_path) + MODEL_MATRIX = numpy_helper.to_array(model.graph.initializer[-1]) + return MODEL_MATRIX + + +def clear_model_matrix() -> None: + global MODEL_MATRIX + + MODEL_MATRIX = None + + def get_options(key : Literal[ 'model' ]) -> Any: global OPTIONS if OPTIONS is None: - OPTIONS = \ + OPTIONS =\ { 'model': MODELS[frame_processors_globals.face_swapper_model] } @@ -110,12 +134,47 @@ def pre_process(mode : ProcessMode) -> bool: def post_process() -> None: clear_frame_processor() + clear_model_matrix() clear_face_analyser() read_static_image.cache_clear() def swap_face(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame: - return get_frame_processor().get(temp_frame, target_face, source_face, paste_back = True) + frame_processor = get_frame_processor() + crop_frame, affine_matrix = warp_face(target_face, temp_frame, 'arcface', (128, 128)) + crop_frame = prepare_crop_frame(crop_frame) + frame_processor_inputs = {} + for frame_processor_input in frame_processor.get_inputs(): + if frame_processor_input.name == 'target': + frame_processor_inputs[frame_processor_input.name] = crop_frame + if frame_processor_input.name == 'source': + frame_processor_inputs[frame_processor_input.name] = prepare_source_face(source_face) + with THREAD_SEMAPHORE: + crop_frame = frame_processor.run(None, frame_processor_inputs)[0][0] + crop_frame = normalize_crop_frame(crop_frame) + temp_frame = paste_back(temp_frame, crop_frame, affine_matrix) + return temp_frame + + +def prepare_source_face(source_face : Face) -> Face: + model_matrix = get_model_matrix() + source_face = source_face.embedding.reshape((1, -1)) + source_face = numpy.dot(source_face, model_matrix) / numpy.linalg.norm(source_face) + return source_face + + +def prepare_crop_frame(crop_frame : Frame) -> Frame: + crop_frame = crop_frame / 255.0 + crop_frame = crop_frame[:, :, ::-1] + crop_frame = numpy.expand_dims(crop_frame, axis = 0).transpose(0, 3, 1, 2).astype(numpy.float32) + return crop_frame + + +def normalize_crop_frame(crop_frame : Frame) -> Frame: + crop_frame = crop_frame.transpose(1, 2, 0) + crop_frame = (crop_frame * 255.0).round() + crop_frame = crop_frame.astype(numpy.uint8)[:, :, ::-1] + return crop_frame def process_frame(source_face : Face, reference_face : Face, temp_frame : Frame) -> Frame: diff --git a/facefusion/processors/frame/modules/frame_enhancer.py b/facefusion/processors/frame/modules/frame_enhancer.py index 1fe9be71..5b7b7733 100644 --- a/facefusion/processors/frame/modules/frame_enhancer.py +++ b/facefusion/processors/frame/modules/frame_enhancer.py @@ -74,7 +74,7 @@ def get_options(key : Literal[ 'model' ]) -> Any: global OPTIONS if OPTIONS is None: - OPTIONS = \ + OPTIONS =\ { 'model': MODELS[frame_processors_globals.frame_enhancer_model] }