deepfuze
This commit is contained in:
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Executable
+16
@@ -0,0 +1,16 @@
|
||||
from typing import List
|
||||
|
||||
from deepfuze.common_helper import create_int_range
|
||||
from deepfuze.processors.frame.typings import FaceDebuggerItem, FaceEnhancerModel, FaceSwapperModel, FrameColorizerModel, FrameEnhancerModel, LipSyncerModel
|
||||
|
||||
face_debugger_items : List[FaceDebuggerItem] = [ 'bounding-box', 'face-landmark-5', 'face-landmark-5/68', 'face-landmark-68', 'face-landmark-68/5', 'face-mask', 'face-detector-score', 'face-landmarker-score', 'age', 'gender' ]
|
||||
face_enhancer_models : List[FaceEnhancerModel] = [ 'codeformer', 'gfpgan_1.2', 'gfpgan_1.3', 'gfpgan_1.4', 'gpen_bfr_256', 'gpen_bfr_512', 'gpen_bfr_1024', 'gpen_bfr_2048', 'restoreformer_plus_plus' ]
|
||||
face_swapper_models : List[FaceSwapperModel] = [ 'blendswap_256', 'inswapper_128', 'inswapper_128_fp16', 'simswap_256', 'simswap_512_unofficial', 'uniface_256' ]
|
||||
frame_colorizer_models : List[FrameColorizerModel] = [ 'ddcolor', 'ddcolor_artistic', 'deoldify', 'deoldify_artistic', 'deoldify_stable' ]
|
||||
frame_colorizer_sizes : List[str] = [ '192x192', '256x256', '384x384', '512x512' ]
|
||||
frame_enhancer_models : List[FrameEnhancerModel] = [ 'clear_reality_x4', 'lsdir_x4', 'nomos8k_sc_x4', 'real_esrgan_x2', 'real_esrgan_x2_fp16', 'real_esrgan_x4', 'real_esrgan_x4_fp16', 'real_hatgan_x4', 'span_kendata_x4', 'ultra_sharp_x4' ]
|
||||
lip_syncer_models : List[LipSyncerModel] = [ 'wav2lip_gan' ]
|
||||
|
||||
face_enhancer_blend_range : List[int] = create_int_range(0, 100, 1)
|
||||
frame_colorizer_blend_range : List[int] = create_int_range(0, 100, 1)
|
||||
frame_enhancer_blend_range : List[int] = create_int_range(0, 100, 1)
|
||||
@@ -0,0 +1,116 @@
|
||||
import os
|
||||
import sys
|
||||
import importlib
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from queue import Queue
|
||||
from types import ModuleType
|
||||
from typing import Any, List
|
||||
from tqdm import tqdm
|
||||
|
||||
import deepfuze.globals
|
||||
from deepfuze.typing import ProcessFrames, QueuePayload
|
||||
from deepfuze.execution import encode_execution_providers
|
||||
from deepfuze import logger, wording
|
||||
|
||||
FRAME_PROCESSORS_MODULES : List[ModuleType] = []
|
||||
FRAME_PROCESSORS_METHODS =\
|
||||
[
|
||||
'get_frame_processor',
|
||||
'clear_frame_processor',
|
||||
'get_options',
|
||||
'set_options',
|
||||
'register_args',
|
||||
'apply_args',
|
||||
'pre_check',
|
||||
'post_check',
|
||||
'pre_process',
|
||||
'post_process',
|
||||
'get_reference_frame',
|
||||
'process_frame',
|
||||
'process_frames',
|
||||
'process_image',
|
||||
'process_video'
|
||||
]
|
||||
|
||||
|
||||
def load_frame_processor_module(frame_processor : str) -> Any:
|
||||
try:
|
||||
frame_processor_module = importlib.import_module('deepfuze.processors.frame.modules.' + frame_processor)
|
||||
for method_name in FRAME_PROCESSORS_METHODS:
|
||||
if not hasattr(frame_processor_module, method_name):
|
||||
raise NotImplementedError
|
||||
except ModuleNotFoundError as exception:
|
||||
logger.error(wording.get('frame_processor_not_loaded').format(frame_processor = frame_processor), __name__.upper())
|
||||
logger.debug(exception.msg, __name__.upper())
|
||||
sys.exit(1)
|
||||
except NotImplementedError:
|
||||
logger.error(wording.get('frame_processor_not_implemented').format(frame_processor = frame_processor), __name__.upper())
|
||||
sys.exit(1)
|
||||
return frame_processor_module
|
||||
|
||||
|
||||
def get_frame_processors_modules(frame_processors : List[str]) -> List[ModuleType]:
|
||||
global FRAME_PROCESSORS_MODULES
|
||||
|
||||
if not FRAME_PROCESSORS_MODULES:
|
||||
for frame_processor in frame_processors:
|
||||
frame_processor_module = load_frame_processor_module(frame_processor)
|
||||
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
|
||||
return FRAME_PROCESSORS_MODULES
|
||||
|
||||
|
||||
def clear_frame_processors_modules() -> None:
|
||||
global FRAME_PROCESSORS_MODULES
|
||||
|
||||
for frame_processor_module in get_frame_processors_modules(deepfuze.globals.frame_processors):
|
||||
frame_processor_module.clear_frame_processor()
|
||||
FRAME_PROCESSORS_MODULES = []
|
||||
|
||||
|
||||
def multi_process_frames(source_paths : List[str], temp_frame_paths : List[str], process_frames : ProcessFrames) -> None:
|
||||
queue_payloads = create_queue_payloads(temp_frame_paths)
|
||||
with tqdm(total = len(queue_payloads), desc = wording.get('processing'), unit = 'frame', ascii = ' =', disable = deepfuze.globals.log_level in [ 'warn', 'error' ]) as progress:
|
||||
progress.set_postfix(
|
||||
{
|
||||
'execution_providers': encode_execution_providers(deepfuze.globals.execution_providers),
|
||||
'execution_thread_count': deepfuze.globals.execution_thread_count,
|
||||
'execution_queue_count': deepfuze.globals.execution_queue_count
|
||||
})
|
||||
with ThreadPoolExecutor(max_workers = deepfuze.globals.execution_thread_count) as executor:
|
||||
futures = []
|
||||
queue : Queue[QueuePayload] = create_queue(queue_payloads)
|
||||
queue_per_future = max(len(queue_payloads) // deepfuze.globals.execution_thread_count * deepfuze.globals.execution_queue_count, 1)
|
||||
while not queue.empty():
|
||||
future = executor.submit(process_frames, source_paths, pick_queue(queue, queue_per_future), progress.update)
|
||||
futures.append(future)
|
||||
for future_done in as_completed(futures):
|
||||
future_done.result()
|
||||
|
||||
|
||||
def create_queue(queue_payloads : List[QueuePayload]) -> Queue[QueuePayload]:
|
||||
queue : Queue[QueuePayload] = Queue()
|
||||
for queue_payload in queue_payloads:
|
||||
queue.put(queue_payload)
|
||||
return queue
|
||||
|
||||
|
||||
def pick_queue(queue : Queue[QueuePayload], queue_per_future : int) -> List[QueuePayload]:
|
||||
queues = []
|
||||
for _ in range(queue_per_future):
|
||||
if not queue.empty():
|
||||
queues.append(queue.get())
|
||||
return queues
|
||||
|
||||
|
||||
def create_queue_payloads(temp_frame_paths : List[str]) -> List[QueuePayload]:
|
||||
queue_payloads = []
|
||||
temp_frame_paths = sorted(temp_frame_paths, key = os.path.basename)
|
||||
|
||||
for frame_number, frame_path in enumerate(temp_frame_paths):
|
||||
frame_payload : QueuePayload =\
|
||||
{
|
||||
'frame_number': frame_number,
|
||||
'frame_path': frame_path
|
||||
}
|
||||
queue_payloads.append(frame_payload)
|
||||
return queue_payloads
|
||||
Executable
+14
@@ -0,0 +1,14 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from deepfuze.processors.frame.typings import FaceDebuggerItem, FaceEnhancerModel, FaceSwapperModel, FrameColorizerModel, FrameEnhancerModel, LipSyncerModel
|
||||
|
||||
face_debugger_items : Optional[List[FaceDebuggerItem]] = None
|
||||
face_enhancer_model : Optional[FaceEnhancerModel] = None
|
||||
face_enhancer_blend : Optional[int] = None
|
||||
face_swapper_model : Optional[FaceSwapperModel] = None
|
||||
frame_colorizer_model : Optional[FrameColorizerModel] = None
|
||||
frame_colorizer_blend : Optional[int] = None
|
||||
frame_colorizer_size : Optional[str] = None
|
||||
frame_enhancer_model : Optional[FrameEnhancerModel] = None
|
||||
frame_enhancer_blend : Optional[int] = None
|
||||
lip_syncer_model : Optional[LipSyncerModel] = None
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+192
@@ -0,0 +1,192 @@
|
||||
from typing import Any, List, Literal
|
||||
from argparse import ArgumentParser
|
||||
import cv2
|
||||
import numpy
|
||||
|
||||
import deepfuze.globals
|
||||
import deepfuze.processors.frame.core as frame_processors
|
||||
from deepfuze import config, process_manager, wording
|
||||
from deepfuze.face_analyser import get_one_face, get_many_faces, find_similar_faces, clear_face_analyser
|
||||
from deepfuze.face_masker import create_static_box_mask, create_occlusion_mask, create_region_mask, clear_face_occluder, clear_face_parser
|
||||
from deepfuze.face_helper import warp_face_by_face_landmark_5, categorize_age, categorize_gender
|
||||
from deepfuze.face_store import get_reference_faces
|
||||
from deepfuze.content_analyser import clear_content_analyser
|
||||
from deepfuze.typing import Face, VisionFrame, UpdateProgress, ProcessMode, QueuePayload
|
||||
from deepfuze.vision import read_image, read_static_image, write_image
|
||||
from deepfuze.processors.frame.typings import FaceDebuggerInputs
|
||||
from deepfuze.processors.frame import globals as frame_processors_globals, choices as frame_processors_choices
|
||||
|
||||
NAME = __name__.upper()
|
||||
|
||||
|
||||
def get_frame_processor() -> None:
|
||||
pass
|
||||
|
||||
|
||||
def clear_frame_processor() -> None:
|
||||
pass
|
||||
|
||||
|
||||
def get_options(key : Literal['model']) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def set_options(key : Literal['model'], value : Any) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def register_args(program : ArgumentParser) -> None:
|
||||
program.add_argument('--face-debugger-items', help = wording.get('help.face_debugger_items').format(choices = ', '.join(frame_processors_choices.face_debugger_items)), default = config.get_str_list('frame_processors.face_debugger_items', 'face-landmark-5/68 face-mask'), choices = frame_processors_choices.face_debugger_items, nargs = '+', metavar = 'FACE_DEBUGGER_ITEMS')
|
||||
|
||||
|
||||
def apply_args(program : ArgumentParser) -> None:
|
||||
args = program.parse_args()
|
||||
frame_processors_globals.face_debugger_items = args.face_debugger_items
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def post_check() -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def pre_process(mode : ProcessMode) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def post_process() -> None:
|
||||
read_static_image.cache_clear()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict' or deepfuze.globals.video_memory_strategy == 'moderate':
|
||||
clear_frame_processor()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict':
|
||||
clear_face_analyser()
|
||||
clear_content_analyser()
|
||||
clear_face_occluder()
|
||||
clear_face_parser()
|
||||
|
||||
|
||||
def debug_face(target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
primary_color = (0, 0, 255)
|
||||
secondary_color = (0, 255, 0)
|
||||
tertiary_color = (255, 255, 0)
|
||||
bounding_box = target_face.bounding_box.astype(numpy.int32)
|
||||
temp_vision_frame = temp_vision_frame.copy()
|
||||
has_face_landmark_5_fallback = numpy.array_equal(target_face.landmarks.get('5'), target_face.landmarks.get('5/68'))
|
||||
has_face_landmark_68_fallback = numpy.array_equal(target_face.landmarks.get('68'), target_face.landmarks.get('68/5'))
|
||||
|
||||
if 'bounding-box' in frame_processors_globals.face_debugger_items:
|
||||
cv2.rectangle(temp_vision_frame, (bounding_box[0], bounding_box[1]), (bounding_box[2], bounding_box[3]), primary_color, 2)
|
||||
if 'face-mask' in frame_processors_globals.face_debugger_items:
|
||||
crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, target_face.landmarks.get('5/68'), 'arcface_128_v2', (512, 512))
|
||||
inverse_matrix = cv2.invertAffineTransform(affine_matrix)
|
||||
temp_size = temp_vision_frame.shape[:2][::-1]
|
||||
crop_mask_list = []
|
||||
if 'box' in deepfuze.globals.face_mask_types:
|
||||
box_mask = create_static_box_mask(crop_vision_frame.shape[:2][::-1], 0, deepfuze.globals.face_mask_padding)
|
||||
crop_mask_list.append(box_mask)
|
||||
if 'occlusion' in deepfuze.globals.face_mask_types:
|
||||
occlusion_mask = create_occlusion_mask(crop_vision_frame)
|
||||
crop_mask_list.append(occlusion_mask)
|
||||
if 'region' in deepfuze.globals.face_mask_types:
|
||||
region_mask = create_region_mask(crop_vision_frame, deepfuze.globals.face_mask_regions)
|
||||
crop_mask_list.append(region_mask)
|
||||
crop_mask = numpy.minimum.reduce(crop_mask_list).clip(0, 1)
|
||||
crop_mask = (crop_mask * 255).astype(numpy.uint8)
|
||||
inverse_vision_frame = cv2.warpAffine(crop_mask, inverse_matrix, temp_size)
|
||||
inverse_vision_frame = cv2.threshold(inverse_vision_frame, 100, 255, cv2.THRESH_BINARY)[1]
|
||||
inverse_vision_frame[inverse_vision_frame > 0] = 255
|
||||
inverse_contours = cv2.findContours(inverse_vision_frame, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)[0]
|
||||
cv2.drawContours(temp_vision_frame, inverse_contours, -1, tertiary_color if has_face_landmark_5_fallback else secondary_color, 2)
|
||||
if 'face-landmark-5' in frame_processors_globals.face_debugger_items and numpy.any(target_face.landmarks.get('5')):
|
||||
face_landmark_5 = target_face.landmarks.get('5').astype(numpy.int32)
|
||||
for index in range(face_landmark_5.shape[0]):
|
||||
cv2.circle(temp_vision_frame, (face_landmark_5[index][0], face_landmark_5[index][1]), 3, primary_color, -1)
|
||||
if 'face-landmark-5/68' in frame_processors_globals.face_debugger_items and numpy.any(target_face.landmarks.get('5/68')):
|
||||
face_landmark_5_68 = target_face.landmarks.get('5/68').astype(numpy.int32)
|
||||
for index in range(face_landmark_5_68.shape[0]):
|
||||
cv2.circle(temp_vision_frame, (face_landmark_5_68[index][0], face_landmark_5_68[index][1]), 3, tertiary_color if has_face_landmark_5_fallback else secondary_color, -1)
|
||||
if 'face-landmark-68' in frame_processors_globals.face_debugger_items and numpy.any(target_face.landmarks.get('68')):
|
||||
face_landmark_68 = target_face.landmarks.get('68').astype(numpy.int32)
|
||||
for index in range(face_landmark_68.shape[0]):
|
||||
cv2.circle(temp_vision_frame, (face_landmark_68[index][0], face_landmark_68[index][1]), 3, tertiary_color if has_face_landmark_68_fallback else secondary_color, -1)
|
||||
if 'face-landmark-68/5' in frame_processors_globals.face_debugger_items and numpy.any(target_face.landmarks.get('68')):
|
||||
face_landmark_68 = target_face.landmarks.get('68/5').astype(numpy.int32)
|
||||
for index in range(face_landmark_68.shape[0]):
|
||||
cv2.circle(temp_vision_frame, (face_landmark_68[index][0], face_landmark_68[index][1]), 3, primary_color, -1)
|
||||
if bounding_box[3] - bounding_box[1] > 50 and bounding_box[2] - bounding_box[0] > 50:
|
||||
top = bounding_box[1]
|
||||
left = bounding_box[0] - 20
|
||||
if 'face-detector-score' in frame_processors_globals.face_debugger_items:
|
||||
face_score_text = str(round(target_face.scores.get('detector'), 2))
|
||||
top = top + 20
|
||||
cv2.putText(temp_vision_frame, face_score_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, primary_color, 2)
|
||||
if 'face-landmarker-score' in frame_processors_globals.face_debugger_items:
|
||||
face_score_text = str(round(target_face.scores.get('landmarker'), 2))
|
||||
top = top + 20
|
||||
cv2.putText(temp_vision_frame, face_score_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, tertiary_color if has_face_landmark_5_fallback else secondary_color, 2)
|
||||
if 'age' in frame_processors_globals.face_debugger_items:
|
||||
face_age_text = categorize_age(target_face.age)
|
||||
top = top + 20
|
||||
cv2.putText(temp_vision_frame, face_age_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, primary_color, 2)
|
||||
if 'gender' in frame_processors_globals.face_debugger_items:
|
||||
face_gender_text = categorize_gender(target_face.gender)
|
||||
top = top + 20
|
||||
cv2.putText(temp_vision_frame, face_gender_text, (left, top), cv2.FONT_HERSHEY_SIMPLEX, 0.5, primary_color, 2)
|
||||
return temp_vision_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
pass
|
||||
|
||||
|
||||
def process_frame(inputs : FaceDebuggerInputs) -> VisionFrame:
|
||||
reference_faces = inputs.get('reference_faces')
|
||||
target_vision_frame = inputs.get('target_vision_frame')
|
||||
|
||||
if deepfuze.globals.face_selector_mode == 'many':
|
||||
many_faces = get_many_faces(target_vision_frame)
|
||||
if many_faces:
|
||||
for target_face in many_faces:
|
||||
target_vision_frame = debug_face(target_face, target_vision_frame)
|
||||
if deepfuze.globals.face_selector_mode == 'one':
|
||||
target_face = get_one_face(target_vision_frame)
|
||||
if target_face:
|
||||
target_vision_frame = debug_face(target_face, target_vision_frame)
|
||||
if deepfuze.globals.face_selector_mode == 'reference':
|
||||
similar_faces = find_similar_faces(reference_faces, target_vision_frame, deepfuze.globals.reference_face_distance)
|
||||
if similar_faces:
|
||||
for similar_face in similar_faces:
|
||||
target_vision_frame = debug_face(similar_face, target_vision_frame)
|
||||
return target_vision_frame
|
||||
|
||||
|
||||
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
|
||||
reference_faces = get_reference_faces() if 'reference' in deepfuze.globals.face_selector_mode else None
|
||||
|
||||
for queue_payload in process_manager.manage(queue_payloads):
|
||||
target_vision_path = queue_payload['frame_path']
|
||||
target_vision_frame = read_image(target_vision_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'reference_faces': reference_faces,
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(target_vision_path, output_vision_frame)
|
||||
update_progress(1)
|
||||
|
||||
|
||||
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
|
||||
reference_faces = get_reference_faces() if 'reference' in deepfuze.globals.face_selector_mode else None
|
||||
target_vision_frame = read_static_image(target_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'reference_faces': reference_faces,
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(output_path, output_vision_frame)
|
||||
|
||||
|
||||
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
|
||||
frame_processors.multi_process_frames(source_paths, temp_frame_paths, process_frames)
|
||||
+301
@@ -0,0 +1,301 @@
|
||||
from typing import Any, List, Literal, Optional
|
||||
from argparse import ArgumentParser
|
||||
from time import sleep
|
||||
import cv2
|
||||
import numpy
|
||||
import onnxruntime
|
||||
|
||||
import deepfuze.globals
|
||||
import deepfuze.processors.frame.core as frame_processors
|
||||
from deepfuze import config, process_manager, logger, wording
|
||||
from deepfuze.face_analyser import get_many_faces, clear_face_analyser, find_similar_faces, get_one_face
|
||||
from deepfuze.face_masker import create_static_box_mask, create_occlusion_mask, clear_face_occluder
|
||||
from deepfuze.face_helper import warp_face_by_face_landmark_5, paste_back
|
||||
from deepfuze.execution import apply_execution_provider_options
|
||||
from deepfuze.content_analyser import clear_content_analyser
|
||||
from deepfuze.face_store import get_reference_faces
|
||||
from deepfuze.normalizer import normalize_output_path
|
||||
from deepfuze.thread_helper import thread_lock, thread_semaphore
|
||||
from deepfuze.typing import Face, VisionFrame, UpdateProgress, ProcessMode, ModelSet, OptionsWithModel, QueuePayload
|
||||
from deepfuze.common_helper import create_metavar
|
||||
from deepfuze.filesystem import is_file, is_image, is_video, resolve_relative_path
|
||||
from deepfuze.download import conditional_download, is_download_done
|
||||
from deepfuze.vision import read_image, read_static_image, write_image
|
||||
from deepfuze.processors.frame.typings import FaceEnhancerInputs
|
||||
from deepfuze.processors.frame import globals as frame_processors_globals
|
||||
from deepfuze.processors.frame import choices as frame_processors_choices
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
NAME = __name__.upper()
|
||||
MODELS : ModelSet =\
|
||||
{
|
||||
'codeformer':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/codeformer.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/codeformer.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (512, 512)
|
||||
},
|
||||
'gfpgan_1.2':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/gfpgan_1.2.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/gfpgan_1.2.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (512, 512)
|
||||
},
|
||||
'gfpgan_1.3':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/gfpgan_1.3.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/gfpgan_1.3.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (512, 512)
|
||||
},
|
||||
'gfpgan_1.4':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/gfpgan_1.4.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/gfpgan_1.4.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (512, 512)
|
||||
},
|
||||
'gpen_bfr_256':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/gpen_bfr_256.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/gpen_bfr_256.onnx'),
|
||||
'template': 'arcface_128_v2',
|
||||
'size': (256, 256)
|
||||
},
|
||||
'gpen_bfr_512':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/gpen_bfr_512.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/gpen_bfr_512.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (512, 512)
|
||||
},
|
||||
'gpen_bfr_1024':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/gpen_bfr_1024.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/gpen_bfr_1024.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (1024, 1024)
|
||||
},
|
||||
'gpen_bfr_2048':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/gpen_bfr_2048.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/gpen_bfr_2048.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (2048, 2048)
|
||||
},
|
||||
'restoreformer_plus_plus':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/restoreformer_plus_plus.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/restoreformer_plus_plus.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (512, 512)
|
||||
}
|
||||
}
|
||||
OPTIONS : Optional[OptionsWithModel] = None
|
||||
|
||||
|
||||
def get_frame_processor() -> Any:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
with thread_lock():
|
||||
while process_manager.is_checking():
|
||||
sleep(0.5)
|
||||
if FRAME_PROCESSOR is None:
|
||||
model_path = get_options('model').get('path')
|
||||
FRAME_PROCESSOR = onnxruntime.InferenceSession(model_path, providers = apply_execution_provider_options(deepfuze.globals.execution_device_id, deepfuze.globals.execution_providers))
|
||||
return FRAME_PROCESSOR
|
||||
|
||||
|
||||
def clear_frame_processor() -> None:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
|
||||
|
||||
def get_options(key : Literal['model']) -> Any:
|
||||
global OPTIONS
|
||||
|
||||
if OPTIONS is None:
|
||||
OPTIONS =\
|
||||
{
|
||||
'model': MODELS[frame_processors_globals.face_enhancer_model]
|
||||
}
|
||||
return OPTIONS.get(key)
|
||||
|
||||
|
||||
def set_options(key : Literal['model'], value : Any) -> None:
|
||||
global OPTIONS
|
||||
|
||||
OPTIONS[key] = value
|
||||
|
||||
|
||||
def register_args(program : ArgumentParser) -> None:
|
||||
program.add_argument('--face-enhancer-model', help = wording.get('help.face_enhancer_model'), default = config.get_str_value('frame_processors.face_enhancer_model', 'gfpgan_1.4'), choices = frame_processors_choices.face_enhancer_models)
|
||||
program.add_argument('--face-enhancer-blend', help = wording.get('help.face_enhancer_blend'), type = int, default = config.get_int_value('frame_processors.face_enhancer_blend', '80'), choices = frame_processors_choices.face_enhancer_blend_range, metavar = create_metavar(frame_processors_choices.face_enhancer_blend_range))
|
||||
|
||||
|
||||
def apply_args(program : ArgumentParser) -> None:
|
||||
args = program.parse_args()
|
||||
frame_processors_globals.face_enhancer_model = args.face_enhancer_model
|
||||
frame_processors_globals.face_enhancer_blend = args.face_enhancer_blend
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = resolve_relative_path('../../../models/deepfuze')
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download:
|
||||
process_manager.check()
|
||||
conditional_download(download_directory_path, [ model_url ])
|
||||
process_manager.end()
|
||||
return is_file(model_path)
|
||||
|
||||
|
||||
def post_check() -> bool:
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download and not is_download_done(model_url, model_path):
|
||||
logger.error(wording.get('model_download_not_done') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if not is_file(model_path):
|
||||
logger.error(wording.get('model_file_not_present') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def pre_process(mode : ProcessMode) -> bool:
|
||||
if mode in [ 'output', 'preview' ] and not is_image(deepfuze.globals.target_path) and not is_video(deepfuze.globals.target_path):
|
||||
logger.error(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if mode == 'output' and not normalize_output_path(deepfuze.globals.target_path, deepfuze.globals.output_path):
|
||||
logger.error(wording.get('select_file_or_directory_output') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def post_process() -> None:
|
||||
read_static_image.cache_clear()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict' or deepfuze.globals.video_memory_strategy == 'moderate':
|
||||
clear_frame_processor()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict':
|
||||
clear_face_analyser()
|
||||
clear_content_analyser()
|
||||
clear_face_occluder()
|
||||
|
||||
|
||||
def enhance_face(target_face: Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
model_template = get_options('model').get('template')
|
||||
model_size = get_options('model').get('size')
|
||||
crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, target_face.landmarks.get('5/68'), model_template, model_size)
|
||||
box_mask = create_static_box_mask(crop_vision_frame.shape[:2][::-1], deepfuze.globals.face_mask_blur, (0, 0, 0, 0))
|
||||
crop_mask_list =\
|
||||
[
|
||||
box_mask
|
||||
]
|
||||
|
||||
if 'occlusion' in deepfuze.globals.face_mask_types:
|
||||
occlusion_mask = create_occlusion_mask(crop_vision_frame)
|
||||
crop_mask_list.append(occlusion_mask)
|
||||
crop_vision_frame = prepare_crop_frame(crop_vision_frame)
|
||||
crop_vision_frame = apply_enhance(crop_vision_frame)
|
||||
crop_vision_frame = normalize_crop_frame(crop_vision_frame)
|
||||
crop_mask = numpy.minimum.reduce(crop_mask_list).clip(0, 1)
|
||||
paste_vision_frame = paste_back(temp_vision_frame, crop_vision_frame, crop_mask, affine_matrix)
|
||||
temp_vision_frame = blend_frame(temp_vision_frame, paste_vision_frame)
|
||||
return temp_vision_frame
|
||||
|
||||
|
||||
def apply_enhance(crop_vision_frame : VisionFrame) -> VisionFrame:
|
||||
frame_processor = get_frame_processor()
|
||||
frame_processor_inputs = {}
|
||||
|
||||
for frame_processor_input in frame_processor.get_inputs():
|
||||
if frame_processor_input.name == 'input':
|
||||
frame_processor_inputs[frame_processor_input.name] = crop_vision_frame
|
||||
if frame_processor_input.name == 'weight':
|
||||
weight = numpy.array([ 1 ]).astype(numpy.double)
|
||||
frame_processor_inputs[frame_processor_input.name] = weight
|
||||
with thread_semaphore():
|
||||
crop_vision_frame = frame_processor.run(None, frame_processor_inputs)[0][0]
|
||||
return crop_vision_frame
|
||||
|
||||
|
||||
def prepare_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
|
||||
crop_vision_frame = crop_vision_frame[:, :, ::-1] / 255.0
|
||||
crop_vision_frame = (crop_vision_frame - 0.5) / 0.5
|
||||
crop_vision_frame = numpy.expand_dims(crop_vision_frame.transpose(2, 0, 1), axis = 0).astype(numpy.float32)
|
||||
return crop_vision_frame
|
||||
|
||||
|
||||
def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
|
||||
crop_vision_frame = numpy.clip(crop_vision_frame, -1, 1)
|
||||
crop_vision_frame = (crop_vision_frame + 1) / 2
|
||||
crop_vision_frame = crop_vision_frame.transpose(1, 2, 0)
|
||||
crop_vision_frame = (crop_vision_frame * 255.0).round()
|
||||
crop_vision_frame = crop_vision_frame.astype(numpy.uint8)[:, :, ::-1]
|
||||
return crop_vision_frame
|
||||
|
||||
|
||||
def blend_frame(temp_vision_frame : VisionFrame, paste_vision_frame : VisionFrame) -> VisionFrame:
|
||||
face_enhancer_blend = 1 - (frame_processors_globals.face_enhancer_blend / 100)
|
||||
temp_vision_frame = cv2.addWeighted(temp_vision_frame, face_enhancer_blend, paste_vision_frame, 1 - face_enhancer_blend, 0)
|
||||
return temp_vision_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
return enhance_face(target_face, temp_vision_frame)
|
||||
|
||||
|
||||
def process_frame(inputs : FaceEnhancerInputs) -> VisionFrame:
|
||||
reference_faces = inputs.get('reference_faces')
|
||||
target_vision_frame = inputs.get('target_vision_frame')
|
||||
|
||||
if deepfuze.globals.face_selector_mode == 'many':
|
||||
many_faces = get_many_faces(target_vision_frame)
|
||||
if many_faces:
|
||||
for target_face in many_faces:
|
||||
target_vision_frame = enhance_face(target_face, target_vision_frame)
|
||||
if deepfuze.globals.face_selector_mode == 'one':
|
||||
target_face = get_one_face(target_vision_frame)
|
||||
if target_face:
|
||||
target_vision_frame = enhance_face(target_face, target_vision_frame)
|
||||
if deepfuze.globals.face_selector_mode == 'reference':
|
||||
similar_faces = find_similar_faces(reference_faces, target_vision_frame, deepfuze.globals.reference_face_distance)
|
||||
if similar_faces:
|
||||
for similar_face in similar_faces:
|
||||
target_vision_frame = enhance_face(similar_face, target_vision_frame)
|
||||
return target_vision_frame
|
||||
|
||||
|
||||
def process_frames(source_path : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
|
||||
reference_faces = get_reference_faces() if 'reference' in deepfuze.globals.face_selector_mode else None
|
||||
|
||||
for queue_payload in process_manager.manage(queue_payloads):
|
||||
target_vision_path = queue_payload['frame_path']
|
||||
target_vision_frame = read_image(target_vision_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'reference_faces': reference_faces,
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(target_vision_path, output_vision_frame)
|
||||
update_progress(1)
|
||||
|
||||
|
||||
def process_image(source_path : str, target_path : str, output_path : str) -> None:
|
||||
reference_faces = get_reference_faces() if 'reference' in deepfuze.globals.face_selector_mode else None
|
||||
target_vision_frame = read_static_image(target_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'reference_faces': reference_faces,
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(output_path, output_vision_frame)
|
||||
|
||||
|
||||
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
|
||||
frame_processors.multi_process_frames(None, temp_frame_paths, process_frames)
|
||||
+369
@@ -0,0 +1,369 @@
|
||||
from typing import Any, List, Literal, Optional
|
||||
from argparse import ArgumentParser
|
||||
from time import sleep
|
||||
import numpy
|
||||
import onnx
|
||||
import onnxruntime
|
||||
from onnx import numpy_helper
|
||||
|
||||
import deepfuze.globals
|
||||
import deepfuze.processors.frame.core as frame_processors
|
||||
from deepfuze import config, process_manager, logger, wording
|
||||
from deepfuze.execution import has_execution_provider, apply_execution_provider_options
|
||||
from deepfuze.face_analyser import get_one_face, get_average_face, get_many_faces, find_similar_faces, clear_face_analyser
|
||||
from deepfuze.face_masker import create_static_box_mask, create_occlusion_mask, create_region_mask, clear_face_occluder, clear_face_parser
|
||||
from deepfuze.face_helper import warp_face_by_face_landmark_5, paste_back
|
||||
from deepfuze.face_store import get_reference_faces
|
||||
from deepfuze.content_analyser import clear_content_analyser
|
||||
from deepfuze.normalizer import normalize_output_path
|
||||
from deepfuze.thread_helper import thread_lock, conditional_thread_semaphore
|
||||
from deepfuze.typing import Face, Embedding, VisionFrame, UpdateProgress, ProcessMode, ModelSet, OptionsWithModel, QueuePayload
|
||||
from deepfuze.filesystem import is_file, is_image, has_image, is_video, filter_image_paths, resolve_relative_path
|
||||
from deepfuze.download import conditional_download, is_download_done
|
||||
from deepfuze.vision import read_image, read_static_image, read_static_images, write_image
|
||||
from deepfuze.processors.frame.typings import FaceSwapperInputs
|
||||
from deepfuze.processors.frame import globals as frame_processors_globals
|
||||
from deepfuze.processors.frame import choices as frame_processors_choices
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
MODEL_INITIALIZER = None
|
||||
NAME = __name__.upper()
|
||||
MODELS : ModelSet =\
|
||||
{
|
||||
'blendswap_256':
|
||||
{
|
||||
'type': 'blendswap',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/blendswap_256.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/blendswap_256.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (256, 256),
|
||||
'mean': [ 0.0, 0.0, 0.0 ],
|
||||
'standard_deviation': [ 1.0, 1.0, 1.0 ]
|
||||
},
|
||||
'inswapper_128':
|
||||
{
|
||||
'type': 'inswapper',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/inswapper_128.onnx'),
|
||||
'template': 'arcface_128_v2',
|
||||
'size': (128, 128),
|
||||
'mean': [ 0.0, 0.0, 0.0 ],
|
||||
'standard_deviation': [ 1.0, 1.0, 1.0 ]
|
||||
},
|
||||
'inswapper_128_fp16':
|
||||
{
|
||||
'type': 'inswapper',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/inswapper_128_fp16.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/inswapper_128_fp16.onnx'),
|
||||
'template': 'arcface_128_v2',
|
||||
'size': (128, 128),
|
||||
'mean': [ 0.0, 0.0, 0.0 ],
|
||||
'standard_deviation': [ 1.0, 1.0, 1.0 ]
|
||||
},
|
||||
'simswap_256':
|
||||
{
|
||||
'type': 'simswap',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/simswap_256.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/simswap_256.onnx'),
|
||||
'template': 'arcface_112_v1',
|
||||
'size': (256, 256),
|
||||
'mean': [ 0.485, 0.456, 0.406 ],
|
||||
'standard_deviation': [ 0.229, 0.224, 0.225 ]
|
||||
},
|
||||
'simswap_512_unofficial':
|
||||
{
|
||||
'type': 'simswap',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/simswap_512_unofficial.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/simswap_512_unofficial.onnx'),
|
||||
'template': 'arcface_112_v1',
|
||||
'size': (512, 512),
|
||||
'mean': [ 0.0, 0.0, 0.0 ],
|
||||
'standard_deviation': [ 1.0, 1.0, 1.0 ]
|
||||
},
|
||||
'uniface_256':
|
||||
{
|
||||
'type': 'uniface',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/uniface_256.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/uniface_256.onnx'),
|
||||
'template': 'ffhq_512',
|
||||
'size': (256, 256),
|
||||
'mean': [ 0.0, 0.0, 0.0 ],
|
||||
'standard_deviation': [ 1.0, 1.0, 1.0 ]
|
||||
}
|
||||
}
|
||||
OPTIONS : Optional[OptionsWithModel] = None
|
||||
|
||||
|
||||
def get_frame_processor() -> Any:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
with thread_lock():
|
||||
while process_manager.is_checking():
|
||||
sleep(0.5)
|
||||
if FRAME_PROCESSOR is None:
|
||||
model_path = get_options('model').get('path')
|
||||
FRAME_PROCESSOR = onnxruntime.InferenceSession(model_path, providers = apply_execution_provider_options(deepfuze.globals.execution_device_id, deepfuze.globals.execution_providers))
|
||||
return FRAME_PROCESSOR
|
||||
|
||||
|
||||
def clear_frame_processor() -> None:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
|
||||
|
||||
def get_model_initializer() -> Any:
|
||||
global MODEL_INITIALIZER
|
||||
|
||||
with thread_lock():
|
||||
while process_manager.is_checking():
|
||||
sleep(0.5)
|
||||
if MODEL_INITIALIZER is None:
|
||||
model_path = get_options('model').get('path')
|
||||
model = onnx.load(model_path)
|
||||
MODEL_INITIALIZER = numpy_helper.to_array(model.graph.initializer[-1])
|
||||
return MODEL_INITIALIZER
|
||||
|
||||
|
||||
def clear_model_initializer() -> None:
|
||||
global MODEL_INITIALIZER
|
||||
|
||||
MODEL_INITIALIZER = None
|
||||
|
||||
|
||||
def get_options(key : Literal['model']) -> Any:
|
||||
global OPTIONS
|
||||
|
||||
if OPTIONS is None:
|
||||
OPTIONS =\
|
||||
{
|
||||
'model': MODELS[frame_processors_globals.face_swapper_model]
|
||||
}
|
||||
return OPTIONS.get(key)
|
||||
|
||||
|
||||
def set_options(key : Literal['model'], value : Any) -> None:
|
||||
global OPTIONS
|
||||
|
||||
OPTIONS[key] = value
|
||||
|
||||
|
||||
def register_args(program : ArgumentParser) -> None:
|
||||
if has_execution_provider('CoreMLExecutionProvider') or has_execution_provider('OpenVINOExecutionProvider'):
|
||||
face_swapper_model_fallback = 'inswapper_128'
|
||||
else:
|
||||
face_swapper_model_fallback = 'inswapper_128_fp16'
|
||||
program.add_argument('--face-swapper-model', help = wording.get('help.face_swapper_model'), default = config.get_str_value('frame_processors.face_swapper_model', face_swapper_model_fallback), choices = frame_processors_choices.face_swapper_models)
|
||||
|
||||
|
||||
def apply_args(program : ArgumentParser) -> None:
|
||||
args = program.parse_args()
|
||||
frame_processors_globals.face_swapper_model = args.face_swapper_model
|
||||
if args.face_swapper_model == 'blendswap_256':
|
||||
deepfuze.globals.face_recognizer_model = 'arcface_blendswap'
|
||||
if args.face_swapper_model == 'inswapper_128' or args.face_swapper_model == 'inswapper_128_fp16':
|
||||
deepfuze.globals.face_recognizer_model = 'arcface_inswapper'
|
||||
if args.face_swapper_model == 'simswap_256' or args.face_swapper_model == 'simswap_512_unofficial':
|
||||
deepfuze.globals.face_recognizer_model = 'arcface_simswap'
|
||||
if args.face_swapper_model == 'uniface_256':
|
||||
deepfuze.globals.face_recognizer_model = 'arcface_uniface'
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = resolve_relative_path('../../../models/deepfuze')
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download:
|
||||
process_manager.check()
|
||||
conditional_download(download_directory_path, [ model_url ])
|
||||
process_manager.end()
|
||||
return is_file(model_path)
|
||||
|
||||
|
||||
def post_check() -> bool:
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download and not is_download_done(model_url, model_path):
|
||||
logger.error(wording.get('model_download_not_done') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if not is_file(model_path):
|
||||
logger.error(wording.get('model_file_not_present') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def pre_process(mode : ProcessMode) -> bool:
|
||||
if not has_image(deepfuze.globals.source_paths):
|
||||
logger.error(wording.get('select_image_source') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
source_image_paths = filter_image_paths(deepfuze.globals.source_paths)
|
||||
source_frames = read_static_images(source_image_paths)
|
||||
for source_frame in source_frames:
|
||||
if not get_one_face(source_frame):
|
||||
logger.error(wording.get('no_source_face_detected') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if mode in [ 'output', 'preview' ] and not is_image(deepfuze.globals.target_path) and not is_video(deepfuze.globals.target_path):
|
||||
logger.error(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if mode == 'output' and not normalize_output_path(deepfuze.globals.target_path, deepfuze.globals.output_path):
|
||||
logger.error(wording.get('select_file_or_directory_output') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def post_process() -> None:
|
||||
read_static_image.cache_clear()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict' or deepfuze.globals.video_memory_strategy == 'moderate':
|
||||
clear_model_initializer()
|
||||
clear_frame_processor()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict':
|
||||
clear_face_analyser()
|
||||
clear_content_analyser()
|
||||
clear_face_occluder()
|
||||
clear_face_parser()
|
||||
|
||||
|
||||
def swap_face(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
model_template = get_options('model').get('template')
|
||||
model_size = get_options('model').get('size')
|
||||
crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, target_face.landmarks.get('5/68'), model_template, model_size)
|
||||
crop_mask_list = []
|
||||
|
||||
if 'box' in deepfuze.globals.face_mask_types:
|
||||
box_mask = create_static_box_mask(crop_vision_frame.shape[:2][::-1], deepfuze.globals.face_mask_blur, deepfuze.globals.face_mask_padding)
|
||||
crop_mask_list.append(box_mask)
|
||||
if 'occlusion' in deepfuze.globals.face_mask_types:
|
||||
occlusion_mask = create_occlusion_mask(crop_vision_frame)
|
||||
crop_mask_list.append(occlusion_mask)
|
||||
crop_vision_frame = prepare_crop_frame(crop_vision_frame)
|
||||
crop_vision_frame = apply_swap(source_face, crop_vision_frame)
|
||||
crop_vision_frame = normalize_crop_frame(crop_vision_frame)
|
||||
if 'region' in deepfuze.globals.face_mask_types:
|
||||
region_mask = create_region_mask(crop_vision_frame, deepfuze.globals.face_mask_regions)
|
||||
crop_mask_list.append(region_mask)
|
||||
crop_mask = numpy.minimum.reduce(crop_mask_list).clip(0, 1)
|
||||
temp_vision_frame = paste_back(temp_vision_frame, crop_vision_frame, crop_mask, affine_matrix)
|
||||
return temp_vision_frame
|
||||
|
||||
|
||||
def apply_swap(source_face : Face, crop_vision_frame : VisionFrame) -> VisionFrame:
|
||||
frame_processor = get_frame_processor()
|
||||
model_type = get_options('model').get('type')
|
||||
frame_processor_inputs = {}
|
||||
|
||||
for frame_processor_input in frame_processor.get_inputs():
|
||||
if frame_processor_input.name == 'source':
|
||||
if model_type == 'blendswap' or model_type == 'uniface':
|
||||
frame_processor_inputs[frame_processor_input.name] = prepare_source_frame(source_face)
|
||||
else:
|
||||
frame_processor_inputs[frame_processor_input.name] = prepare_source_embedding(source_face)
|
||||
if frame_processor_input.name == 'target':
|
||||
frame_processor_inputs[frame_processor_input.name] = crop_vision_frame
|
||||
with conditional_thread_semaphore(deepfuze.globals.execution_providers):
|
||||
crop_vision_frame = frame_processor.run(None, frame_processor_inputs)[0][0]
|
||||
return crop_vision_frame
|
||||
|
||||
|
||||
def prepare_source_frame(source_face : Face) -> VisionFrame:
|
||||
model_type = get_options('model').get('type')
|
||||
source_vision_frame = read_static_image(deepfuze.globals.source_paths[0])
|
||||
if model_type == 'blendswap':
|
||||
source_vision_frame, _ = warp_face_by_face_landmark_5(source_vision_frame, source_face.landmarks.get('5/68'), 'arcface_112_v2', (112, 112))
|
||||
if model_type == 'uniface':
|
||||
source_vision_frame, _ = warp_face_by_face_landmark_5(source_vision_frame, source_face.landmarks.get('5/68'), 'ffhq_512', (256, 256))
|
||||
source_vision_frame = source_vision_frame[:, :, ::-1] / 255.0
|
||||
source_vision_frame = source_vision_frame.transpose(2, 0, 1)
|
||||
source_vision_frame = numpy.expand_dims(source_vision_frame, axis = 0).astype(numpy.float32)
|
||||
return source_vision_frame
|
||||
|
||||
|
||||
def prepare_source_embedding(source_face : Face) -> Embedding:
|
||||
model_type = get_options('model').get('type')
|
||||
if model_type == 'inswapper':
|
||||
model_initializer = get_model_initializer()
|
||||
source_embedding = source_face.embedding.reshape((1, -1))
|
||||
source_embedding = numpy.dot(source_embedding, model_initializer) / numpy.linalg.norm(source_embedding)
|
||||
else:
|
||||
source_embedding = source_face.normed_embedding.reshape(1, -1)
|
||||
return source_embedding
|
||||
|
||||
|
||||
def prepare_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
|
||||
model_mean = get_options('model').get('mean')
|
||||
model_standard_deviation = get_options('model').get('standard_deviation')
|
||||
crop_vision_frame = crop_vision_frame[:, :, ::-1] / 255.0
|
||||
crop_vision_frame = (crop_vision_frame - model_mean) / model_standard_deviation
|
||||
crop_vision_frame = crop_vision_frame.transpose(2, 0, 1)
|
||||
crop_vision_frame = numpy.expand_dims(crop_vision_frame, axis = 0).astype(numpy.float32)
|
||||
return crop_vision_frame
|
||||
|
||||
|
||||
def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
|
||||
crop_vision_frame = crop_vision_frame.transpose(1, 2, 0)
|
||||
crop_vision_frame = (crop_vision_frame * 255.0).round()
|
||||
crop_vision_frame = crop_vision_frame[:, :, ::-1]
|
||||
return crop_vision_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
return swap_face(source_face, target_face, temp_vision_frame)
|
||||
|
||||
|
||||
def process_frame(inputs : FaceSwapperInputs) -> VisionFrame:
|
||||
reference_faces = inputs.get('reference_faces')
|
||||
source_face = inputs.get('source_face')
|
||||
target_vision_frame = inputs.get('target_vision_frame')
|
||||
|
||||
if deepfuze.globals.face_selector_mode == 'many':
|
||||
many_faces = get_many_faces(target_vision_frame)
|
||||
if many_faces:
|
||||
for target_face in many_faces:
|
||||
target_vision_frame = swap_face(source_face, target_face, target_vision_frame)
|
||||
if deepfuze.globals.face_selector_mode == 'one':
|
||||
target_face = get_one_face(target_vision_frame)
|
||||
if target_face:
|
||||
target_vision_frame = swap_face(source_face, target_face, target_vision_frame)
|
||||
if deepfuze.globals.face_selector_mode == 'reference':
|
||||
similar_faces = find_similar_faces(reference_faces, target_vision_frame, deepfuze.globals.reference_face_distance)
|
||||
if similar_faces:
|
||||
for similar_face in similar_faces:
|
||||
target_vision_frame = swap_face(source_face, similar_face, target_vision_frame)
|
||||
return target_vision_frame
|
||||
|
||||
|
||||
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
|
||||
reference_faces = get_reference_faces() if 'reference' in deepfuze.globals.face_selector_mode else None
|
||||
source_frames = read_static_images(source_paths)
|
||||
source_face = get_average_face(source_frames)
|
||||
|
||||
for queue_payload in process_manager.manage(queue_payloads):
|
||||
target_vision_path = queue_payload['frame_path']
|
||||
target_vision_frame = read_image(target_vision_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'reference_faces': reference_faces,
|
||||
'source_face': source_face,
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(target_vision_path, output_vision_frame)
|
||||
update_progress(1)
|
||||
|
||||
|
||||
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
|
||||
reference_faces = get_reference_faces() if 'reference' in deepfuze.globals.face_selector_mode else None
|
||||
source_frames = read_static_images(source_paths)
|
||||
source_face = get_average_face(source_frames)
|
||||
target_vision_frame = read_static_image(target_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'reference_faces': reference_faces,
|
||||
'source_face': source_face,
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(output_path, output_vision_frame)
|
||||
|
||||
|
||||
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
|
||||
frame_processors.multi_process_frames(source_paths, temp_frame_paths, process_frames)
|
||||
@@ -0,0 +1,241 @@
|
||||
from typing import Any, List, Literal, Optional
|
||||
from argparse import ArgumentParser
|
||||
from time import sleep
|
||||
import cv2
|
||||
import numpy
|
||||
import onnxruntime
|
||||
|
||||
import deepfuze.globals
|
||||
import deepfuze.processors.frame.core as frame_processors
|
||||
from deepfuze import config, process_manager, logger, wording
|
||||
from deepfuze.face_analyser import clear_face_analyser
|
||||
from deepfuze.content_analyser import clear_content_analyser
|
||||
from deepfuze.execution import apply_execution_provider_options
|
||||
from deepfuze.normalizer import normalize_output_path
|
||||
from deepfuze.thread_helper import thread_lock, thread_semaphore
|
||||
from deepfuze.typing import Face, VisionFrame, UpdateProgress, ProcessMode, ModelSet, OptionsWithModel, QueuePayload
|
||||
from deepfuze.common_helper import create_metavar
|
||||
from deepfuze.filesystem import is_file, resolve_relative_path, is_image, is_video
|
||||
from deepfuze.download import conditional_download, is_download_done
|
||||
from deepfuze.vision import read_image, read_static_image, write_image, unpack_resolution
|
||||
from deepfuze.processors.frame.typings import FrameColorizerInputs
|
||||
from deepfuze.processors.frame import globals as frame_processors_globals
|
||||
from deepfuze.processors.frame import choices as frame_processors_choices
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
NAME = __name__.upper()
|
||||
MODELS : ModelSet =\
|
||||
{
|
||||
'ddcolor':
|
||||
{
|
||||
'type': 'ddcolor',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/ddcolor.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/ddcolor.onnx')
|
||||
},
|
||||
'ddcolor_artistic':
|
||||
{
|
||||
'type': 'ddcolor',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/ddcolor_artistic.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/ddcolor_artistic.onnx')
|
||||
},
|
||||
'deoldify':
|
||||
{
|
||||
'type': 'deoldify',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/deoldify.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/deoldify.onnx')
|
||||
},
|
||||
'deoldify_artistic':
|
||||
{
|
||||
'type': 'deoldify',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/deoldify_artistic.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/deoldify_artistic.onnx')
|
||||
},
|
||||
'deoldify_stable':
|
||||
{
|
||||
'type': 'deoldify',
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/deoldify_stable.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/deoldify_stable.onnx')
|
||||
}
|
||||
}
|
||||
OPTIONS : Optional[OptionsWithModel] = None
|
||||
|
||||
|
||||
def get_frame_processor() -> Any:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
with thread_lock():
|
||||
while process_manager.is_checking():
|
||||
sleep(0.5)
|
||||
if FRAME_PROCESSOR is None:
|
||||
model_path = get_options('model').get('path')
|
||||
FRAME_PROCESSOR = onnxruntime.InferenceSession(model_path, providers = apply_execution_provider_options(deepfuze.globals.execution_device_id, deepfuze.globals.execution_providers))
|
||||
return FRAME_PROCESSOR
|
||||
|
||||
|
||||
def clear_frame_processor() -> None:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
|
||||
|
||||
def get_options(key : Literal['model']) -> Any:
|
||||
global OPTIONS
|
||||
|
||||
if OPTIONS is None:
|
||||
OPTIONS =\
|
||||
{
|
||||
'model': MODELS[frame_processors_globals.frame_colorizer_model]
|
||||
}
|
||||
return OPTIONS.get(key)
|
||||
|
||||
|
||||
def set_options(key : Literal['model'], value : Any) -> None:
|
||||
global OPTIONS
|
||||
|
||||
OPTIONS[key] = value
|
||||
|
||||
|
||||
def register_args(program : ArgumentParser) -> None:
|
||||
program.add_argument('--frame-colorizer-model', help = wording.get('help.frame_colorizer_model'), default = config.get_str_value('frame_processors.frame_colorizer_model', 'ddcolor'), choices = frame_processors_choices.frame_colorizer_models)
|
||||
program.add_argument('--frame-colorizer-blend', help = wording.get('help.frame_colorizer_blend'), type = int, default = config.get_int_value('frame_processors.frame_colorizer_blend', '100'), choices = frame_processors_choices.frame_colorizer_blend_range, metavar = create_metavar(frame_processors_choices.frame_colorizer_blend_range))
|
||||
program.add_argument('--frame-colorizer-size', help = wording.get('help.frame_colorizer_size'), type = str, default = config.get_str_value('frame_processors.frame_colorizer_size', '256x256'), choices = frame_processors_choices.frame_colorizer_sizes)
|
||||
|
||||
|
||||
def apply_args(program : ArgumentParser) -> None:
|
||||
args = program.parse_args()
|
||||
frame_processors_globals.frame_colorizer_model = args.frame_colorizer_model
|
||||
frame_processors_globals.frame_colorizer_blend = args.frame_colorizer_blend
|
||||
frame_processors_globals.frame_colorizer_size = args.frame_colorizer_size
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = resolve_relative_path('../../../models/deepfuze')
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download:
|
||||
process_manager.check()
|
||||
conditional_download(download_directory_path, [ model_url ])
|
||||
process_manager.end()
|
||||
return is_file(model_path)
|
||||
|
||||
|
||||
def post_check() -> bool:
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download and not is_download_done(model_url, model_path):
|
||||
logger.error(wording.get('model_download_not_done') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if not is_file(model_path):
|
||||
logger.error(wording.get('model_file_not_present') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def pre_process(mode : ProcessMode) -> bool:
|
||||
if mode in [ 'output', 'preview' ] and not is_image(deepfuze.globals.target_path) and not is_video(deepfuze.globals.target_path):
|
||||
logger.error(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if mode == 'output' and not normalize_output_path(deepfuze.globals.target_path, deepfuze.globals.output_path):
|
||||
logger.error(wording.get('select_file_or_directory_output') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def post_process() -> None:
|
||||
read_static_image.cache_clear()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict' or deepfuze.globals.video_memory_strategy == 'moderate':
|
||||
clear_frame_processor()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict':
|
||||
clear_face_analyser()
|
||||
clear_content_analyser()
|
||||
|
||||
|
||||
def colorize_frame(temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
frame_processor = get_frame_processor()
|
||||
prepare_vision_frame = prepare_temp_frame(temp_vision_frame)
|
||||
with thread_semaphore():
|
||||
color_vision_frame = frame_processor.run(None,
|
||||
{
|
||||
frame_processor.get_inputs()[0].name: prepare_vision_frame
|
||||
})[0][0]
|
||||
color_vision_frame = merge_color_frame(temp_vision_frame, color_vision_frame)
|
||||
color_vision_frame = blend_frame(temp_vision_frame, color_vision_frame)
|
||||
return color_vision_frame
|
||||
|
||||
|
||||
def prepare_temp_frame(temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
model_size = unpack_resolution(frame_processors_globals.frame_colorizer_size)
|
||||
model_type = get_options('model').get('type')
|
||||
temp_vision_frame = cv2.cvtColor(temp_vision_frame, cv2.COLOR_BGR2GRAY)
|
||||
temp_vision_frame = cv2.cvtColor(temp_vision_frame, cv2.COLOR_GRAY2RGB)
|
||||
if model_type == 'ddcolor':
|
||||
temp_vision_frame = (temp_vision_frame / 255.0).astype(numpy.float32)
|
||||
temp_vision_frame = cv2.cvtColor(temp_vision_frame, cv2.COLOR_RGB2LAB)[:, :, :1]
|
||||
temp_vision_frame = numpy.concatenate((temp_vision_frame, numpy.zeros_like(temp_vision_frame), numpy.zeros_like(temp_vision_frame)), axis = -1)
|
||||
temp_vision_frame = cv2.cvtColor(temp_vision_frame, cv2.COLOR_LAB2RGB)
|
||||
temp_vision_frame = cv2.resize(temp_vision_frame, model_size)
|
||||
temp_vision_frame = temp_vision_frame.transpose((2, 0, 1))
|
||||
temp_vision_frame = numpy.expand_dims(temp_vision_frame, axis = 0).astype(numpy.float32)
|
||||
return temp_vision_frame
|
||||
|
||||
|
||||
def merge_color_frame(temp_vision_frame : VisionFrame, color_vision_frame : VisionFrame) -> VisionFrame:
|
||||
model_type = get_options('model').get('type')
|
||||
color_vision_frame = color_vision_frame.transpose(1, 2, 0)
|
||||
color_vision_frame = cv2.resize(color_vision_frame, (temp_vision_frame.shape[1], temp_vision_frame.shape[0]))
|
||||
if model_type == 'ddcolor':
|
||||
temp_vision_frame = (temp_vision_frame / 255.0).astype(numpy.float32)
|
||||
temp_vision_frame = cv2.cvtColor(temp_vision_frame, cv2.COLOR_BGR2LAB)[:, :, :1]
|
||||
color_vision_frame = numpy.concatenate((temp_vision_frame, color_vision_frame), axis = -1)
|
||||
color_vision_frame = cv2.cvtColor(color_vision_frame, cv2.COLOR_LAB2BGR)
|
||||
color_vision_frame = (color_vision_frame * 255.0).round().astype(numpy.uint8)
|
||||
if model_type == 'deoldify':
|
||||
temp_blue_channel, _, _ = cv2.split(temp_vision_frame)
|
||||
color_vision_frame = cv2.cvtColor(color_vision_frame, cv2.COLOR_BGR2RGB).astype(numpy.uint8)
|
||||
color_vision_frame = cv2.cvtColor(color_vision_frame, cv2.COLOR_BGR2LAB)
|
||||
_, color_green_channel, color_red_channel = cv2.split(color_vision_frame)
|
||||
color_vision_frame = cv2.merge((temp_blue_channel, color_green_channel, color_red_channel))
|
||||
color_vision_frame = cv2.cvtColor(color_vision_frame, cv2.COLOR_LAB2BGR)
|
||||
return color_vision_frame
|
||||
|
||||
|
||||
def blend_frame(temp_vision_frame : VisionFrame, paste_vision_frame : VisionFrame) -> VisionFrame:
|
||||
frame_colorizer_blend = 1 - (frame_processors_globals.frame_colorizer_blend / 100)
|
||||
temp_vision_frame = cv2.addWeighted(temp_vision_frame, frame_colorizer_blend, paste_vision_frame, 1 - frame_colorizer_blend, 0)
|
||||
return temp_vision_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
pass
|
||||
|
||||
|
||||
def process_frame(inputs : FrameColorizerInputs) -> VisionFrame:
|
||||
target_vision_frame = inputs.get('target_vision_frame')
|
||||
return colorize_frame(target_vision_frame)
|
||||
|
||||
|
||||
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
|
||||
for queue_payload in process_manager.manage(queue_payloads):
|
||||
target_vision_path = queue_payload['frame_path']
|
||||
target_vision_frame = read_image(target_vision_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(target_vision_path, output_vision_frame)
|
||||
update_progress(1)
|
||||
|
||||
|
||||
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
|
||||
target_vision_frame = read_static_image(target_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(output_path, output_vision_frame)
|
||||
|
||||
|
||||
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
|
||||
frame_processors.multi_process_frames(None, temp_frame_paths, process_frames)
|
||||
@@ -0,0 +1,263 @@
|
||||
from typing import Any, List, Literal, Optional
|
||||
from argparse import ArgumentParser
|
||||
from time import sleep
|
||||
import cv2
|
||||
import numpy
|
||||
import onnxruntime
|
||||
|
||||
import deepfuze.globals
|
||||
import deepfuze.processors.frame.core as frame_processors
|
||||
from deepfuze import config, process_manager, logger, wording
|
||||
from deepfuze.face_analyser import clear_face_analyser
|
||||
from deepfuze.content_analyser import clear_content_analyser
|
||||
from deepfuze.execution import apply_execution_provider_options
|
||||
from deepfuze.normalizer import normalize_output_path
|
||||
from deepfuze.thread_helper import thread_lock, conditional_thread_semaphore
|
||||
from deepfuze.typing import Face, VisionFrame, UpdateProgress, ProcessMode, ModelSet, OptionsWithModel, QueuePayload
|
||||
from deepfuze.common_helper import create_metavar
|
||||
from deepfuze.filesystem import is_file, resolve_relative_path, is_image, is_video
|
||||
from deepfuze.download import conditional_download, is_download_done
|
||||
from deepfuze.vision import read_image, read_static_image, write_image, merge_tile_frames, create_tile_frames
|
||||
from deepfuze.processors.frame.typings import FrameEnhancerInputs
|
||||
from deepfuze.processors.frame import globals as frame_processors_globals
|
||||
from deepfuze.processors.frame import choices as frame_processors_choices
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
NAME = __name__.upper()
|
||||
MODELS : ModelSet =\
|
||||
{
|
||||
'clear_reality_x4':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/clear_reality_x4.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/clear_reality_x4.onnx'),
|
||||
'size': (128, 8, 4),
|
||||
'scale': 4
|
||||
},
|
||||
'lsdir_x4':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/lsdir_x4.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/lsdir_x4.onnx'),
|
||||
'size': (128, 8, 4),
|
||||
'scale': 4
|
||||
},
|
||||
'nomos8k_sc_x4':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/nomos8k_sc_x4.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/nomos8k_sc_x4.onnx'),
|
||||
'size': (128, 8, 4),
|
||||
'scale': 4
|
||||
},
|
||||
'real_esrgan_x2':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/real_esrgan_x2.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/real_esrgan_x2.onnx'),
|
||||
'size': (256, 16, 8),
|
||||
'scale': 2
|
||||
},
|
||||
'real_esrgan_x2_fp16':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/real_esrgan_x2_fp16.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/real_esrgan_x2_fp16.onnx'),
|
||||
'size': (256, 16, 8),
|
||||
'scale': 2
|
||||
},
|
||||
'real_esrgan_x4':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/real_esrgan_x4.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/real_esrgan_x4.onnx'),
|
||||
'size': (256, 16, 8),
|
||||
'scale': 4
|
||||
},
|
||||
'real_esrgan_x4_fp16':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/real_esrgan_x4_fp16.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/real_esrgan_x4_fp16.onnx'),
|
||||
'size': (256, 16, 8),
|
||||
'scale': 4
|
||||
},
|
||||
'real_hatgan_x4':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/real_hatgan_x4.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/real_hatgan_x4.onnx'),
|
||||
'size': (256, 16, 8),
|
||||
'scale': 4
|
||||
},
|
||||
'span_kendata_x4':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/span_kendata_x4.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/span_kendata_x4.onnx'),
|
||||
'size': (128, 8, 4),
|
||||
'scale': 4
|
||||
},
|
||||
'ultra_sharp_x4':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/ultra_sharp_x4.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/ultra_sharp_x4.onnx'),
|
||||
'size': (128, 8, 4),
|
||||
'scale': 4
|
||||
}
|
||||
}
|
||||
OPTIONS : Optional[OptionsWithModel] = None
|
||||
|
||||
|
||||
def get_frame_processor() -> Any:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
with thread_lock():
|
||||
while process_manager.is_checking():
|
||||
sleep(0.5)
|
||||
if FRAME_PROCESSOR is None:
|
||||
model_path = get_options('model').get('path')
|
||||
FRAME_PROCESSOR = onnxruntime.InferenceSession(model_path, providers = apply_execution_provider_options(deepfuze.globals.execution_device_id, deepfuze.globals.execution_providers))
|
||||
return FRAME_PROCESSOR
|
||||
|
||||
|
||||
def clear_frame_processor() -> None:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
|
||||
|
||||
def get_options(key : Literal['model']) -> Any:
|
||||
global OPTIONS
|
||||
|
||||
if OPTIONS is None:
|
||||
OPTIONS =\
|
||||
{
|
||||
'model': MODELS[frame_processors_globals.frame_enhancer_model]
|
||||
}
|
||||
return OPTIONS.get(key)
|
||||
|
||||
|
||||
def set_options(key : Literal['model'], value : Any) -> None:
|
||||
global OPTIONS
|
||||
|
||||
OPTIONS[key] = value
|
||||
|
||||
|
||||
def register_args(program : ArgumentParser) -> None:
|
||||
program.add_argument('--frame-enhancer-model', help = wording.get('help.frame_enhancer_model'), default = config.get_str_value('frame_processors.frame_enhancer_model', 'span_kendata_x4'), choices = frame_processors_choices.frame_enhancer_models)
|
||||
program.add_argument('--frame-enhancer-blend', help = wording.get('help.frame_enhancer_blend'), type = int, default = config.get_int_value('frame_processors.frame_enhancer_blend', '80'), choices = frame_processors_choices.frame_enhancer_blend_range, metavar = create_metavar(frame_processors_choices.frame_enhancer_blend_range))
|
||||
|
||||
|
||||
def apply_args(program : ArgumentParser) -> None:
|
||||
args = program.parse_args()
|
||||
frame_processors_globals.frame_enhancer_model = args.frame_enhancer_model
|
||||
frame_processors_globals.frame_enhancer_blend = args.frame_enhancer_blend
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = resolve_relative_path('../../../models/deepfuze')
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download:
|
||||
process_manager.check()
|
||||
conditional_download(download_directory_path, [ model_url ])
|
||||
process_manager.end()
|
||||
return is_file(model_path)
|
||||
|
||||
|
||||
def post_check() -> bool:
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download and not is_download_done(model_url, model_path):
|
||||
logger.error(wording.get('model_download_not_done') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if not is_file(model_path):
|
||||
logger.error(wording.get('model_file_not_present') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def pre_process(mode : ProcessMode) -> bool:
|
||||
if mode in [ 'output', 'preview' ] and not is_image(deepfuze.globals.target_path) and not is_video(deepfuze.globals.target_path):
|
||||
logger.error(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if mode == 'output' and not normalize_output_path(deepfuze.globals.target_path, deepfuze.globals.output_path):
|
||||
logger.error(wording.get('select_file_or_directory_output') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def post_process() -> None:
|
||||
read_static_image.cache_clear()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict' or deepfuze.globals.video_memory_strategy == 'moderate':
|
||||
clear_frame_processor()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict':
|
||||
clear_face_analyser()
|
||||
clear_content_analyser()
|
||||
|
||||
|
||||
def enhance_frame(temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
frame_processor = get_frame_processor()
|
||||
size = get_options('model').get('size')
|
||||
scale = get_options('model').get('scale')
|
||||
temp_height, temp_width = temp_vision_frame.shape[:2]
|
||||
tile_vision_frames, pad_width, pad_height = create_tile_frames(temp_vision_frame, size)
|
||||
|
||||
for index, tile_vision_frame in enumerate(tile_vision_frames):
|
||||
with conditional_thread_semaphore(deepfuze.globals.execution_providers):
|
||||
tile_vision_frame = frame_processor.run(None,
|
||||
{
|
||||
frame_processor.get_inputs()[0].name : prepare_tile_frame(tile_vision_frame)
|
||||
})[0]
|
||||
tile_vision_frames[index] = normalize_tile_frame(tile_vision_frame)
|
||||
merge_vision_frame = merge_tile_frames(tile_vision_frames, temp_width * scale, temp_height * scale, pad_width * scale, pad_height * scale, (size[0] * scale, size[1] * scale, size[2] * scale))
|
||||
temp_vision_frame = blend_frame(temp_vision_frame, merge_vision_frame)
|
||||
return temp_vision_frame
|
||||
|
||||
|
||||
def prepare_tile_frame(vision_tile_frame : VisionFrame) -> VisionFrame:
|
||||
vision_tile_frame = numpy.expand_dims(vision_tile_frame[:, :, ::-1], axis = 0)
|
||||
vision_tile_frame = vision_tile_frame.transpose(0, 3, 1, 2)
|
||||
vision_tile_frame = vision_tile_frame.astype(numpy.float32) / 255
|
||||
return vision_tile_frame
|
||||
|
||||
|
||||
def normalize_tile_frame(vision_tile_frame : VisionFrame) -> VisionFrame:
|
||||
vision_tile_frame = vision_tile_frame.transpose(0, 2, 3, 1).squeeze(0) * 255
|
||||
vision_tile_frame = vision_tile_frame.clip(0, 255).astype(numpy.uint8)[:, :, ::-1]
|
||||
return vision_tile_frame
|
||||
|
||||
|
||||
def blend_frame(temp_vision_frame : VisionFrame, merge_vision_frame : VisionFrame) -> VisionFrame:
|
||||
frame_enhancer_blend = 1 - (frame_processors_globals.frame_enhancer_blend / 100)
|
||||
temp_vision_frame = cv2.resize(temp_vision_frame, (merge_vision_frame.shape[1], merge_vision_frame.shape[0]))
|
||||
temp_vision_frame = cv2.addWeighted(temp_vision_frame, frame_enhancer_blend, merge_vision_frame, 1 - frame_enhancer_blend, 0)
|
||||
return temp_vision_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
pass
|
||||
|
||||
|
||||
def process_frame(inputs : FrameEnhancerInputs) -> VisionFrame:
|
||||
target_vision_frame = inputs.get('target_vision_frame')
|
||||
return enhance_frame(target_vision_frame)
|
||||
|
||||
|
||||
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
|
||||
for queue_payload in process_manager.manage(queue_payloads):
|
||||
target_vision_path = queue_payload['frame_path']
|
||||
target_vision_frame = read_image(target_vision_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(target_vision_path, output_vision_frame)
|
||||
update_progress(1)
|
||||
|
||||
|
||||
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
|
||||
target_vision_frame = read_static_image(target_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(output_path, output_vision_frame)
|
||||
|
||||
|
||||
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
|
||||
frame_processors.multi_process_frames(None, temp_frame_paths, process_frames)
|
||||
+260
@@ -0,0 +1,260 @@
|
||||
from typing import Any, List, Literal, Optional
|
||||
from argparse import ArgumentParser
|
||||
from time import sleep
|
||||
import cv2
|
||||
import numpy
|
||||
import onnxruntime
|
||||
|
||||
import deepfuze.globals
|
||||
import deepfuze.processors.frame.core as frame_processors
|
||||
from deepfuze import config, process_manager, logger, wording
|
||||
from deepfuze.execution import apply_execution_provider_options
|
||||
from deepfuze.face_analyser import get_one_face, get_many_faces, find_similar_faces, clear_face_analyser
|
||||
from deepfuze.face_masker import create_static_box_mask, create_occlusion_mask, create_mouth_mask, clear_face_occluder, clear_face_parser
|
||||
from deepfuze.face_helper import warp_face_by_face_landmark_5, warp_face_by_bounding_box, paste_back, create_bounding_box_from_face_landmark_68
|
||||
from deepfuze.face_store import get_reference_faces
|
||||
from deepfuze.content_analyser import clear_content_analyser
|
||||
from deepfuze.normalizer import normalize_output_path
|
||||
from deepfuze.thread_helper import thread_lock, conditional_thread_semaphore
|
||||
from deepfuze.typing import Face, VisionFrame, UpdateProgress, ProcessMode, ModelSet, OptionsWithModel, AudioFrame, QueuePayload
|
||||
from deepfuze.filesystem import is_file, has_audio, resolve_relative_path
|
||||
from deepfuze.download import conditional_download, is_download_done
|
||||
from deepfuze.audio import read_static_voice, get_voice_frame, create_empty_audio_frame
|
||||
from deepfuze.filesystem import is_image, is_video, filter_audio_paths
|
||||
from deepfuze.common_helper import get_first
|
||||
from deepfuze.vision import read_image, read_static_image, write_image, restrict_video_fps
|
||||
from deepfuze.processors.frame.typings import LipSyncerInputs
|
||||
from deepfuze.voice_extractor import clear_voice_extractor
|
||||
from deepfuze.processors.frame import globals as frame_processors_globals
|
||||
from deepfuze.processors.frame import choices as frame_processors_choices
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
NAME = __name__.upper()
|
||||
MODELS : ModelSet =\
|
||||
{
|
||||
'wav2lip_gan':
|
||||
{
|
||||
'url': 'https://github.com/facefusion/facefusion-assets/releases/download/models/wav2lip_gan.onnx',
|
||||
'path': resolve_relative_path('../../../models/deepfuze/wav2lip_gan.onnx')
|
||||
}
|
||||
}
|
||||
OPTIONS : Optional[OptionsWithModel] = None
|
||||
|
||||
|
||||
def get_frame_processor() -> Any:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
with thread_lock():
|
||||
while process_manager.is_checking():
|
||||
sleep(0.5)
|
||||
if FRAME_PROCESSOR is None:
|
||||
model_path = get_options('model').get('path')
|
||||
FRAME_PROCESSOR = onnxruntime.InferenceSession(model_path, providers = apply_execution_provider_options(deepfuze.globals.execution_device_id, deepfuze.globals.execution_providers))
|
||||
return FRAME_PROCESSOR
|
||||
|
||||
|
||||
def clear_frame_processor() -> None:
|
||||
global FRAME_PROCESSOR
|
||||
|
||||
FRAME_PROCESSOR = None
|
||||
|
||||
|
||||
def get_options(key : Literal['model']) -> Any:
|
||||
global OPTIONS
|
||||
|
||||
if OPTIONS is None:
|
||||
OPTIONS =\
|
||||
{
|
||||
'model': MODELS[frame_processors_globals.lip_syncer_model]
|
||||
}
|
||||
return OPTIONS.get(key)
|
||||
|
||||
|
||||
def set_options(key : Literal['model'], value : Any) -> None:
|
||||
global OPTIONS
|
||||
|
||||
OPTIONS[key] = value
|
||||
|
||||
|
||||
def register_args(program : ArgumentParser) -> None:
|
||||
program.add_argument('--lip-syncer-model', help = wording.get('help.lip_syncer_model'), default = config.get_str_value('frame_processors.lip_syncer_model', 'wav2lip_gan'), choices = frame_processors_choices.lip_syncer_models)
|
||||
|
||||
|
||||
def apply_args(program : ArgumentParser) -> None:
|
||||
args = program.parse_args()
|
||||
frame_processors_globals.lip_syncer_model = args.lip_syncer_model
|
||||
|
||||
|
||||
def pre_check() -> bool:
|
||||
download_directory_path = resolve_relative_path('../../../models/deepfuze')
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download:
|
||||
process_manager.check()
|
||||
conditional_download(download_directory_path, [ model_url ])
|
||||
process_manager.end()
|
||||
return is_file(model_path)
|
||||
|
||||
|
||||
def post_check() -> bool:
|
||||
model_url = get_options('model').get('url')
|
||||
model_path = get_options('model').get('path')
|
||||
|
||||
if not deepfuze.globals.skip_download and not is_download_done(model_url, model_path):
|
||||
logger.error(wording.get('model_download_not_done') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if not is_file(model_path):
|
||||
logger.error(wording.get('model_file_not_present') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def pre_process(mode : ProcessMode) -> bool:
|
||||
if not has_audio(deepfuze.globals.source_paths):
|
||||
logger.error(wording.get('select_audio_source') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if mode in [ 'output', 'preview' ] and not is_image(deepfuze.globals.target_path) and not is_video(deepfuze.globals.target_path):
|
||||
logger.error(wording.get('select_image_or_video_target') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
if mode == 'output' and not normalize_output_path(deepfuze.globals.target_path, deepfuze.globals.output_path):
|
||||
logger.error(wording.get('select_file_or_directory_output') + wording.get('exclamation_mark'), NAME)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def post_process() -> None:
|
||||
read_static_image.cache_clear()
|
||||
read_static_voice.cache_clear()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict' or deepfuze.globals.video_memory_strategy == 'moderate':
|
||||
clear_frame_processor()
|
||||
if deepfuze.globals.video_memory_strategy == 'strict':
|
||||
clear_face_analyser()
|
||||
clear_content_analyser()
|
||||
clear_face_occluder()
|
||||
clear_face_parser()
|
||||
clear_voice_extractor()
|
||||
|
||||
|
||||
def sync_lip(target_face : Face, temp_audio_frame : AudioFrame, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
frame_processor = get_frame_processor()
|
||||
crop_mask_list = []
|
||||
temp_audio_frame = prepare_audio_frame(temp_audio_frame)
|
||||
crop_vision_frame, affine_matrix = warp_face_by_face_landmark_5(temp_vision_frame, target_face.landmarks.get('5/68'), 'ffhq_512', (512, 512))
|
||||
face_landmark_68 = cv2.transform(target_face.landmarks.get('68').reshape(1, -1, 2), affine_matrix).reshape(-1, 2)
|
||||
bounding_box = create_bounding_box_from_face_landmark_68(face_landmark_68)
|
||||
bounding_box[1] -= numpy.abs(bounding_box[3] - bounding_box[1]) * 0.125
|
||||
mouth_mask = create_mouth_mask(face_landmark_68)
|
||||
crop_mask_list.append(mouth_mask)
|
||||
box_mask = create_static_box_mask(crop_vision_frame.shape[:2][::-1], deepfuze.globals.face_mask_blur, deepfuze.globals.face_mask_padding)
|
||||
crop_mask_list.append(box_mask)
|
||||
|
||||
if 'occlusion' in deepfuze.globals.face_mask_types:
|
||||
occlusion_mask = create_occlusion_mask(crop_vision_frame)
|
||||
crop_mask_list.append(occlusion_mask)
|
||||
close_vision_frame, close_matrix = warp_face_by_bounding_box(crop_vision_frame, bounding_box, (96, 96))
|
||||
close_vision_frame = prepare_crop_frame(close_vision_frame)
|
||||
with conditional_thread_semaphore(deepfuze.globals.execution_providers):
|
||||
close_vision_frame = frame_processor.run(None,
|
||||
{
|
||||
'source': temp_audio_frame,
|
||||
'target': close_vision_frame
|
||||
})[0]
|
||||
crop_vision_frame = normalize_crop_frame(close_vision_frame)
|
||||
crop_vision_frame = cv2.warpAffine(crop_vision_frame, cv2.invertAffineTransform(close_matrix), (512, 512), borderMode = cv2.BORDER_REPLICATE)
|
||||
crop_mask = numpy.minimum.reduce(crop_mask_list)
|
||||
paste_vision_frame = paste_back(temp_vision_frame, crop_vision_frame, crop_mask, affine_matrix)
|
||||
return paste_vision_frame
|
||||
|
||||
|
||||
def prepare_audio_frame(temp_audio_frame : AudioFrame) -> AudioFrame:
|
||||
temp_audio_frame = numpy.maximum(numpy.exp(-5 * numpy.log(10)), temp_audio_frame)
|
||||
temp_audio_frame = numpy.log10(temp_audio_frame) * 1.6 + 3.2
|
||||
temp_audio_frame = temp_audio_frame.clip(-4, 4).astype(numpy.float32)
|
||||
temp_audio_frame = numpy.expand_dims(temp_audio_frame, axis = (0, 1))
|
||||
return temp_audio_frame
|
||||
|
||||
|
||||
def prepare_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
|
||||
crop_vision_frame = numpy.expand_dims(crop_vision_frame, axis = 0)
|
||||
prepare_vision_frame = crop_vision_frame.copy()
|
||||
prepare_vision_frame[:, 48:] = 0
|
||||
crop_vision_frame = numpy.concatenate((prepare_vision_frame, crop_vision_frame), axis = 3)
|
||||
crop_vision_frame = crop_vision_frame.transpose(0, 3, 1, 2).astype('float32') / 255.0
|
||||
return crop_vision_frame
|
||||
|
||||
|
||||
def normalize_crop_frame(crop_vision_frame : VisionFrame) -> VisionFrame:
|
||||
crop_vision_frame = crop_vision_frame[0].transpose(1, 2, 0)
|
||||
crop_vision_frame = crop_vision_frame.clip(0, 1) * 255
|
||||
crop_vision_frame = crop_vision_frame.astype(numpy.uint8)
|
||||
return crop_vision_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_vision_frame : VisionFrame) -> VisionFrame:
|
||||
pass
|
||||
|
||||
|
||||
def process_frame(inputs : LipSyncerInputs) -> VisionFrame:
|
||||
reference_faces = inputs.get('reference_faces')
|
||||
source_audio_frame = inputs.get('source_audio_frame')
|
||||
target_vision_frame = inputs.get('target_vision_frame')
|
||||
|
||||
if deepfuze.globals.face_selector_mode == 'many':
|
||||
many_faces = get_many_faces(target_vision_frame)
|
||||
if many_faces:
|
||||
for target_face in many_faces:
|
||||
target_vision_frame = sync_lip(target_face, source_audio_frame, target_vision_frame)
|
||||
if deepfuze.globals.face_selector_mode == 'one':
|
||||
target_face = get_one_face(target_vision_frame)
|
||||
if target_face:
|
||||
target_vision_frame = sync_lip(target_face, source_audio_frame, target_vision_frame)
|
||||
if deepfuze.globals.face_selector_mode == 'reference':
|
||||
similar_faces = find_similar_faces(reference_faces, target_vision_frame, deepfuze.globals.reference_face_distance)
|
||||
if similar_faces:
|
||||
for similar_face in similar_faces:
|
||||
target_vision_frame = sync_lip(similar_face, source_audio_frame, target_vision_frame)
|
||||
return target_vision_frame
|
||||
|
||||
|
||||
def process_frames(source_paths : List[str], queue_payloads : List[QueuePayload], update_progress : UpdateProgress) -> None:
|
||||
reference_faces = get_reference_faces() if 'reference' in deepfuze.globals.face_selector_mode else None
|
||||
source_audio_path = get_first(filter_audio_paths(source_paths))
|
||||
temp_video_fps = restrict_video_fps(deepfuze.globals.target_path, deepfuze.globals.output_video_fps)
|
||||
|
||||
for queue_payload in process_manager.manage(queue_payloads):
|
||||
frame_number = queue_payload['frame_number']
|
||||
target_vision_path = queue_payload['frame_path']
|
||||
source_audio_frame = get_voice_frame(source_audio_path, temp_video_fps, frame_number)
|
||||
if not numpy.any(source_audio_frame):
|
||||
source_audio_frame = create_empty_audio_frame()
|
||||
target_vision_frame = read_image(target_vision_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'reference_faces': reference_faces,
|
||||
'source_audio_frame': source_audio_frame,
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(target_vision_path, output_vision_frame)
|
||||
update_progress(1)
|
||||
|
||||
|
||||
def process_image(source_paths : List[str], target_path : str, output_path : str) -> None:
|
||||
reference_faces = get_reference_faces() if 'reference' in deepfuze.globals.face_selector_mode else None
|
||||
source_audio_frame = create_empty_audio_frame()
|
||||
target_vision_frame = read_static_image(target_path)
|
||||
output_vision_frame = process_frame(
|
||||
{
|
||||
'reference_faces': reference_faces,
|
||||
'source_audio_frame': source_audio_frame,
|
||||
'target_vision_frame': target_vision_frame
|
||||
})
|
||||
write_image(output_path, output_vision_frame)
|
||||
|
||||
|
||||
def process_video(source_paths : List[str], temp_frame_paths : List[str]) -> None:
|
||||
source_audio_paths = filter_audio_paths(deepfuze.globals.source_paths)
|
||||
temp_video_fps = restrict_video_fps(deepfuze.globals.target_path, deepfuze.globals.output_video_fps)
|
||||
for source_audio_path in source_audio_paths:
|
||||
read_static_voice(source_audio_path, temp_video_fps)
|
||||
frame_processors.multi_process_frames(source_paths, temp_frame_paths, process_frames)
|
||||
@@ -0,0 +1,41 @@
|
||||
from typing import Literal, TypedDict
|
||||
|
||||
from deepfuze.typing import Face, FaceSet, AudioFrame, VisionFrame
|
||||
|
||||
FaceDebuggerItem = Literal['bounding-box', 'face-landmark-5', 'face-landmark-5/68', 'face-landmark-68', 'face-landmark-68/5', 'face-mask', 'face-detector-score', 'face-landmarker-score', 'age', 'gender']
|
||||
FaceEnhancerModel = Literal['codeformer', 'gfpgan_1.2', 'gfpgan_1.3', 'gfpgan_1.4', 'gpen_bfr_256', 'gpen_bfr_512', 'gpen_bfr_1024', 'gpen_bfr_2048', 'restoreformer_plus_plus']
|
||||
FaceSwapperModel = Literal['blendswap_256', 'inswapper_128', 'inswapper_128_fp16', 'simswap_256', 'simswap_512_unofficial', 'uniface_256']
|
||||
FrameColorizerModel = Literal['ddcolor', 'ddcolor_artistic', 'deoldify', 'deoldify_artistic', 'deoldify_stable']
|
||||
FrameEnhancerModel = Literal['clear_reality_x4', 'lsdir_x4', 'nomos8k_sc_x4', 'real_esrgan_x2', 'real_esrgan_x2_fp16', 'real_esrgan_x4', 'real_esrgan_x4_fp16', 'real_hatgan_x4', 'span_kendata_x4', 'ultra_sharp_x4']
|
||||
LipSyncerModel = Literal['wav2lip_gan']
|
||||
|
||||
FaceDebuggerInputs = TypedDict('FaceDebuggerInputs',
|
||||
{
|
||||
'reference_faces' : FaceSet,
|
||||
'target_vision_frame' : VisionFrame
|
||||
})
|
||||
FaceEnhancerInputs = TypedDict('FaceEnhancerInputs',
|
||||
{
|
||||
'reference_faces' : FaceSet,
|
||||
'target_vision_frame' : VisionFrame
|
||||
})
|
||||
FaceSwapperInputs = TypedDict('FaceSwapperInputs',
|
||||
{
|
||||
'reference_faces' : FaceSet,
|
||||
'source_face' : Face,
|
||||
'target_vision_frame' : VisionFrame
|
||||
})
|
||||
FrameColorizerInputs = TypedDict('FrameColorizerInputs',
|
||||
{
|
||||
'target_vision_frame' : VisionFrame
|
||||
})
|
||||
FrameEnhancerInputs = TypedDict('FrameEnhancerInputs',
|
||||
{
|
||||
'target_vision_frame' : VisionFrame
|
||||
})
|
||||
LipSyncerInputs = TypedDict('LipSyncerInputs',
|
||||
{
|
||||
'reference_faces' : FaceSet,
|
||||
'source_audio_frame' : AudioFrame,
|
||||
'target_vision_frame' : VisionFrame
|
||||
})
|
||||
Reference in New Issue
Block a user