mirror of
https://github.com/facefusion/facefusion.git
synced 2026-05-30 01:19:29 +02:00
Rename Frame to VisionFrame (#346)
This commit is contained in:
@@ -8,7 +8,7 @@ from tqdm import tqdm
|
||||
|
||||
import facefusion.globals
|
||||
from facefusion import wording
|
||||
from facefusion.typing import Frame, ModelValue, Fps
|
||||
from facefusion.typing import VisionFrame, ModelValue, Fps
|
||||
from facefusion.execution_helper import apply_execution_provider_options
|
||||
from facefusion.vision import get_video_frame, count_video_frame_total, read_image, detect_video_fps
|
||||
from facefusion.filesystem import resolve_relative_path
|
||||
@@ -53,7 +53,7 @@ def pre_check() -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def analyse_stream(frame : Frame, video_fps : Fps) -> bool:
|
||||
def analyse_stream(frame : VisionFrame, video_fps : Fps) -> bool:
|
||||
global STREAM_COUNTER
|
||||
|
||||
STREAM_COUNTER = STREAM_COUNTER + 1
|
||||
@@ -62,14 +62,14 @@ def analyse_stream(frame : Frame, video_fps : Fps) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def prepare_frame(frame : Frame) -> Frame:
|
||||
def prepare_frame(frame : VisionFrame) -> VisionFrame:
|
||||
frame = cv2.resize(frame, (224, 224)).astype(numpy.float32)
|
||||
frame -= numpy.array([ 104, 117, 123 ]).astype(numpy.float32)
|
||||
frame = numpy.expand_dims(frame, axis = 0)
|
||||
return frame
|
||||
|
||||
|
||||
def analyse_frame(frame : Frame) -> bool:
|
||||
def analyse_frame(frame : VisionFrame) -> bool:
|
||||
content_analyser = get_content_analyser()
|
||||
frame = prepare_frame(frame)
|
||||
probability = content_analyser.run(None,
|
||||
|
||||
+12
-12
@@ -10,7 +10,7 @@ from facefusion.face_store import get_static_faces, set_static_faces
|
||||
from facefusion.execution_helper import apply_execution_provider_options
|
||||
from facefusion.face_helper import warp_face_by_kps, create_static_anchors, distance_to_kps, distance_to_bbox, apply_nms
|
||||
from facefusion.filesystem import resolve_relative_path
|
||||
from facefusion.typing import Frame, Face, FaceSet, FaceAnalyserOrder, FaceAnalyserAge, FaceAnalyserGender, ModelSet, Bbox, Kps, Score, Embedding
|
||||
from facefusion.typing import VisionFrame, Face, FaceSet, FaceAnalyserOrder, FaceAnalyserAge, FaceAnalyserGender, ModelSet, Bbox, Kps, Score, Embedding
|
||||
from facefusion.vision import resize_frame_resolution, unpack_resolution
|
||||
|
||||
FACE_ANALYSER = None
|
||||
@@ -105,7 +105,7 @@ def pre_check() -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def extract_faces(frame : Frame) -> List[Face]:
|
||||
def extract_faces(frame : VisionFrame) -> List[Face]:
|
||||
face_detector_width, face_detector_height = unpack_resolution(facefusion.globals.face_detector_size)
|
||||
frame_height, frame_width, _ = frame.shape
|
||||
temp_frame = resize_frame_resolution(frame, face_detector_width, face_detector_height)
|
||||
@@ -124,7 +124,7 @@ def extract_faces(frame : Frame) -> List[Face]:
|
||||
return []
|
||||
|
||||
|
||||
def detect_with_retinaface(temp_frame : Frame, temp_frame_height : int, temp_frame_width : int, face_detector_height : int, face_detector_width : int, ratio_height : float, ratio_width : float) -> Tuple[List[Bbox], List[Kps], List[Score]]:
|
||||
def detect_with_retinaface(temp_frame : VisionFrame, temp_frame_height : int, temp_frame_width : int, face_detector_height : int, face_detector_width : int, ratio_height : float, ratio_width : float) -> Tuple[List[Bbox], List[Kps], List[Score]]:
|
||||
face_detector = get_face_analyser().get('face_detector')
|
||||
bbox_list = []
|
||||
kps_list = []
|
||||
@@ -164,7 +164,7 @@ def detect_with_retinaface(temp_frame : Frame, temp_frame_height : int, temp_fra
|
||||
return bbox_list, kps_list, score_list
|
||||
|
||||
|
||||
def detect_with_yoloface(temp_frame : Frame, temp_frame_height : int, temp_frame_width : int, face_detector_height : int, face_detector_width : int, ratio_height : float, ratio_width : float) -> Tuple[List[Bbox], List[Kps], List[Score]]:
|
||||
def detect_with_yoloface(temp_frame : VisionFrame, temp_frame_height : int, temp_frame_width : int, face_detector_height : int, face_detector_width : int, ratio_height : float, ratio_width : float) -> Tuple[List[Bbox], List[Kps], List[Score]]:
|
||||
face_detector = get_face_analyser().get('face_detector')
|
||||
bbox_list = []
|
||||
kps_list = []
|
||||
@@ -206,7 +206,7 @@ def detect_with_yoloface(temp_frame : Frame, temp_frame_height : int, temp_frame
|
||||
return bbox_list, kps_list, score_list
|
||||
|
||||
|
||||
def detect_with_yunet(temp_frame : Frame, temp_frame_height : int, temp_frame_width : int, ratio_height : float, ratio_width : float) -> Tuple[List[Bbox], List[Kps], List[Score]]:
|
||||
def detect_with_yunet(temp_frame : VisionFrame, temp_frame_height : int, temp_frame_width : int, ratio_height : float, ratio_width : float) -> Tuple[List[Bbox], List[Kps], List[Score]]:
|
||||
face_detector = get_face_analyser().get('face_detector')
|
||||
face_detector.setInputSize((temp_frame_width, temp_frame_height))
|
||||
face_detector.setScoreThreshold(facefusion.globals.face_detector_score)
|
||||
@@ -229,7 +229,7 @@ def detect_with_yunet(temp_frame : Frame, temp_frame_height : int, temp_frame_wi
|
||||
return bbox_list, kps_list, score_list
|
||||
|
||||
|
||||
def create_faces(frame : Frame, bbox_list : List[Bbox], kps_list : List[Kps], score_list : List[Score]) -> List[Face]:
|
||||
def create_faces(frame : VisionFrame, bbox_list : List[Bbox], kps_list : List[Kps], score_list : List[Score]) -> List[Face]:
|
||||
faces = []
|
||||
if facefusion.globals.face_detector_score > 0:
|
||||
sort_indices = numpy.argsort(-numpy.array(score_list))
|
||||
@@ -255,7 +255,7 @@ def create_faces(frame : Frame, bbox_list : List[Bbox], kps_list : List[Kps], sc
|
||||
return faces
|
||||
|
||||
|
||||
def calc_embedding(temp_frame : Frame, kps : Kps) -> Tuple[Embedding, Embedding]:
|
||||
def calc_embedding(temp_frame : VisionFrame, kps : Kps) -> Tuple[Embedding, Embedding]:
|
||||
face_recognizer = get_face_analyser().get('face_recognizer')
|
||||
crop_frame, matrix = warp_face_by_kps(temp_frame, kps, 'arcface_112_v2', (112, 112))
|
||||
crop_frame = crop_frame.astype(numpy.float32) / 127.5 - 1
|
||||
@@ -270,7 +270,7 @@ def calc_embedding(temp_frame : Frame, kps : Kps) -> Tuple[Embedding, Embedding]
|
||||
return embedding, normed_embedding
|
||||
|
||||
|
||||
def detect_gender_age(frame : Frame, bbox : Bbox) -> Tuple[int, int]:
|
||||
def detect_gender_age(frame : VisionFrame, bbox : Bbox) -> Tuple[int, int]:
|
||||
gender_age = get_face_analyser().get('gender_age')
|
||||
bbox = bbox.reshape(2, -1)
|
||||
scale = 64 / numpy.subtract(*bbox[::-1]).max()
|
||||
@@ -288,7 +288,7 @@ def detect_gender_age(frame : Frame, bbox : Bbox) -> Tuple[int, int]:
|
||||
return gender, age
|
||||
|
||||
|
||||
def get_one_face(frame : Frame, position : int = 0) -> Optional[Face]:
|
||||
def get_one_face(frame : VisionFrame, position : int = 0) -> Optional[Face]:
|
||||
many_faces = get_many_faces(frame)
|
||||
if many_faces:
|
||||
try:
|
||||
@@ -298,7 +298,7 @@ def get_one_face(frame : Frame, position : int = 0) -> Optional[Face]:
|
||||
return None
|
||||
|
||||
|
||||
def get_average_face(frames : List[Frame], position : int = 0) -> Optional[Face]:
|
||||
def get_average_face(frames : List[VisionFrame], position : int = 0) -> Optional[Face]:
|
||||
average_face = None
|
||||
faces = []
|
||||
embedding_list = []
|
||||
@@ -322,7 +322,7 @@ def get_average_face(frames : List[Frame], position : int = 0) -> Optional[Face]
|
||||
return average_face
|
||||
|
||||
|
||||
def get_many_faces(frame : Frame) -> List[Face]:
|
||||
def get_many_faces(frame : VisionFrame) -> List[Face]:
|
||||
try:
|
||||
faces_cache = get_static_faces(frame)
|
||||
if faces_cache:
|
||||
@@ -341,7 +341,7 @@ def get_many_faces(frame : Frame) -> List[Face]:
|
||||
return []
|
||||
|
||||
|
||||
def find_similar_faces(frame : Frame, reference_faces : FaceSet, face_distance : float) -> List[Face]:
|
||||
def find_similar_faces(frame : VisionFrame, reference_faces : FaceSet, face_distance : float) -> List[Face]:
|
||||
similar_faces : List[Face] = []
|
||||
many_faces = get_many_faces(frame)
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ from functools import lru_cache
|
||||
import cv2
|
||||
import numpy
|
||||
|
||||
from facefusion.typing import Bbox, Kps, Frame, Mask, Matrix, Template
|
||||
from facefusion.typing import Bbox, Kps, VisionFrame, Mask, Matrix, Template
|
||||
|
||||
TEMPLATES : Dict[Template, numpy.ndarray[Any, Any]] =\
|
||||
{
|
||||
@@ -43,14 +43,14 @@ TEMPLATES : Dict[Template, numpy.ndarray[Any, Any]] =\
|
||||
}
|
||||
|
||||
|
||||
def warp_face_by_kps(temp_frame : Frame, kps : Kps, template : Template, crop_size : Size) -> Tuple[Frame, Matrix]:
|
||||
def warp_face_by_kps(temp_frame : VisionFrame, kps : Kps, template : Template, crop_size : Size) -> Tuple[VisionFrame, Matrix]:
|
||||
normed_template = TEMPLATES.get(template) * crop_size
|
||||
affine_matrix = cv2.estimateAffinePartial2D(kps, normed_template, method = cv2.RANSAC, ransacReprojThreshold = 100)[0]
|
||||
crop_frame = cv2.warpAffine(temp_frame, affine_matrix, crop_size, borderMode = cv2.BORDER_REPLICATE, flags = cv2.INTER_AREA)
|
||||
return crop_frame, affine_matrix
|
||||
|
||||
|
||||
def warp_face_by_bbox(temp_frame : Frame, bbox : Bbox, crop_size : Size) -> Tuple[Frame, Matrix]:
|
||||
def warp_face_by_bbox(temp_frame : VisionFrame, bbox : Bbox, crop_size : Size) -> Tuple[VisionFrame, Matrix]:
|
||||
source_kps = numpy.array([[ bbox[0], bbox[1] ], [bbox[2], bbox[1] ], [bbox[0], bbox[3] ]], dtype = numpy.float32)
|
||||
target_kps = numpy.array([[ 0, 0 ], [ crop_size[0], 0 ], [ 0, crop_size[1] ]], dtype = numpy.float32)
|
||||
affine_matrix = cv2.getAffineTransform(source_kps, target_kps)
|
||||
@@ -62,7 +62,7 @@ def warp_face_by_bbox(temp_frame : Frame, bbox : Bbox, crop_size : Size) -> Tupl
|
||||
return crop_frame, affine_matrix
|
||||
|
||||
|
||||
def paste_back(temp_frame : Frame, crop_frame : Frame, crop_mask : Mask, affine_matrix : Matrix) -> Frame:
|
||||
def paste_back(temp_frame : VisionFrame, crop_frame : VisionFrame, crop_mask : Mask, affine_matrix : Matrix) -> VisionFrame:
|
||||
inverse_matrix = cv2.invertAffineTransform(affine_matrix)
|
||||
temp_frame_size = temp_frame.shape[:2][::-1]
|
||||
inverse_crop_mask = cv2.warpAffine(crop_mask, inverse_matrix, temp_frame_size).clip(0, 1)
|
||||
|
||||
@@ -7,7 +7,7 @@ import numpy
|
||||
import onnxruntime
|
||||
|
||||
import facefusion.globals
|
||||
from facefusion.typing import Frame, Mask, Padding, FaceMaskRegion, ModelSet
|
||||
from facefusion.typing import VisionFrame, Mask, Padding, FaceMaskRegion, ModelSet
|
||||
from facefusion.execution_helper import apply_execution_provider_options
|
||||
from facefusion.filesystem import resolve_relative_path
|
||||
from facefusion.download import conditional_download
|
||||
@@ -101,7 +101,7 @@ def create_static_box_mask(crop_size : Size, face_mask_blur : float, face_mask_p
|
||||
return box_mask
|
||||
|
||||
|
||||
def create_occlusion_mask(crop_frame : Frame) -> Mask:
|
||||
def create_occlusion_mask(crop_frame : VisionFrame) -> Mask:
|
||||
face_occluder = get_face_occluder()
|
||||
prepare_frame = cv2.resize(crop_frame, face_occluder.get_inputs()[0].shape[1:3][::-1])
|
||||
prepare_frame = numpy.expand_dims(prepare_frame, axis = 0).astype(numpy.float32) / 255
|
||||
@@ -115,7 +115,7 @@ def create_occlusion_mask(crop_frame : Frame) -> Mask:
|
||||
return occlusion_mask
|
||||
|
||||
|
||||
def create_region_mask(crop_frame : Frame, face_mask_regions : List[FaceMaskRegion]) -> Mask:
|
||||
def create_region_mask(crop_frame : VisionFrame, face_mask_regions : List[FaceMaskRegion]) -> Mask:
|
||||
face_parser = get_face_parser()
|
||||
prepare_frame = cv2.flip(cv2.resize(crop_frame, (512, 512)), 1)
|
||||
prepare_frame = numpy.expand_dims(prepare_frame, axis = 0).astype(numpy.float32)[:, :, ::-1] / 127.5 - 1
|
||||
|
||||
@@ -2,7 +2,7 @@ from typing import Optional, List
|
||||
import hashlib
|
||||
import numpy
|
||||
|
||||
from facefusion.typing import Frame, Face, FaceStore, FaceSet
|
||||
from facefusion.typing import VisionFrame, Face, FaceStore, FaceSet
|
||||
|
||||
FACE_STORE: FaceStore =\
|
||||
{
|
||||
@@ -11,14 +11,14 @@ FACE_STORE: FaceStore =\
|
||||
}
|
||||
|
||||
|
||||
def get_static_faces(frame : Frame) -> Optional[List[Face]]:
|
||||
def get_static_faces(frame : VisionFrame) -> Optional[List[Face]]:
|
||||
frame_hash = create_frame_hash(frame)
|
||||
if frame_hash in FACE_STORE['static_faces']:
|
||||
return FACE_STORE['static_faces'][frame_hash]
|
||||
return None
|
||||
|
||||
|
||||
def set_static_faces(frame : Frame, faces : List[Face]) -> None:
|
||||
def set_static_faces(frame : VisionFrame, faces : List[Face]) -> None:
|
||||
frame_hash = create_frame_hash(frame)
|
||||
if frame_hash:
|
||||
FACE_STORE['static_faces'][frame_hash] = faces
|
||||
@@ -28,7 +28,7 @@ def clear_static_faces() -> None:
|
||||
FACE_STORE['static_faces'] = {}
|
||||
|
||||
|
||||
def create_frame_hash(frame : Frame) -> Optional[str]:
|
||||
def create_frame_hash(frame : VisionFrame) -> Optional[str]:
|
||||
return hashlib.sha1(frame.tobytes()).hexdigest() if numpy.any(frame) else None
|
||||
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from facefusion import config, wording
|
||||
from facefusion.face_analyser import get_one_face, get_average_face, get_many_faces, find_similar_faces, clear_face_analyser
|
||||
from facefusion.face_store import get_reference_faces
|
||||
from facefusion.content_analyser import clear_content_analyser
|
||||
from facefusion.typing import Face, FaceSet, Frame, Update_Process, ProcessMode
|
||||
from facefusion.typing import Face, FaceSet, VisionFrame, Update_Process, ProcessMode
|
||||
from facefusion.vision import read_image, read_static_image, read_static_images, write_image
|
||||
from facefusion.face_helper import warp_face_by_kps
|
||||
from facefusion.face_masker import create_static_box_mask, create_occlusion_mask, create_region_mask, clear_face_occluder, clear_face_parser
|
||||
@@ -66,7 +66,7 @@ def post_process() -> None:
|
||||
clear_face_parser()
|
||||
|
||||
|
||||
def debug_face(source_face : Face, target_face : Face, reference_faces : FaceSet, temp_frame : Frame) -> Frame:
|
||||
def debug_face(source_face : Face, target_face : Face, reference_faces : FaceSet, temp_frame : VisionFrame) -> VisionFrame:
|
||||
primary_color = (0, 0, 255)
|
||||
secondary_color = (0, 255, 0)
|
||||
bounding_box = target_face.bbox.astype(numpy.int32)
|
||||
@@ -103,11 +103,11 @@ def debug_face(source_face : Face, target_face : Face, reference_faces : FaceSet
|
||||
return temp_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame:
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_frame : VisionFrame) -> VisionFrame:
|
||||
pass
|
||||
|
||||
|
||||
def process_frame(source_face : Face, reference_faces : FaceSet, temp_frame : Frame) -> Frame:
|
||||
def process_frame(source_face : Face, reference_faces : FaceSet, temp_frame : VisionFrame) -> VisionFrame:
|
||||
if 'reference' in facefusion.globals.face_selector_mode:
|
||||
similar_faces = find_similar_faces(temp_frame, reference_faces, facefusion.globals.reference_face_distance)
|
||||
if similar_faces:
|
||||
|
||||
@@ -13,7 +13,7 @@ from facefusion.execution_helper import apply_execution_provider_options
|
||||
from facefusion.face_helper import warp_face_by_kps, paste_back
|
||||
from facefusion.content_analyser import clear_content_analyser
|
||||
from facefusion.face_store import get_reference_faces
|
||||
from facefusion.typing import Face, FaceSet, Frame, Update_Process, ProcessMode, ModelSet, OptionsWithModel
|
||||
from facefusion.typing import Face, FaceSet, VisionFrame, Update_Process, ProcessMode, ModelSet, OptionsWithModel
|
||||
from facefusion.common_helper import create_metavar
|
||||
from facefusion.filesystem import is_file, is_image, is_video, resolve_relative_path
|
||||
from facefusion.download import conditional_download, is_download_done
|
||||
@@ -165,7 +165,7 @@ def post_process() -> None:
|
||||
clear_face_occluder()
|
||||
|
||||
|
||||
def enhance_face(target_face: Face, temp_frame : Frame) -> Frame:
|
||||
def enhance_face(target_face: Face, temp_frame : VisionFrame) -> VisionFrame:
|
||||
model_template = get_options('model').get('template')
|
||||
model_size = get_options('model').get('size')
|
||||
crop_frame, affine_matrix = warp_face_by_kps(temp_frame, target_face.kps, model_template, model_size)
|
||||
@@ -184,7 +184,7 @@ def enhance_face(target_face: Face, temp_frame : Frame) -> Frame:
|
||||
return temp_frame
|
||||
|
||||
|
||||
def apply_enhance(crop_frame : Frame) -> Frame:
|
||||
def apply_enhance(crop_frame : VisionFrame) -> VisionFrame:
|
||||
frame_processor = get_frame_processor()
|
||||
frame_processor_inputs = {}
|
||||
|
||||
@@ -199,14 +199,14 @@ def apply_enhance(crop_frame : Frame) -> Frame:
|
||||
return crop_frame
|
||||
|
||||
|
||||
def prepare_crop_frame(crop_frame : Frame) -> Frame:
|
||||
def prepare_crop_frame(crop_frame : VisionFrame) -> VisionFrame:
|
||||
crop_frame = crop_frame[:, :, ::-1] / 255.0
|
||||
crop_frame = (crop_frame - 0.5) / 0.5
|
||||
crop_frame = numpy.expand_dims(crop_frame.transpose(2, 0, 1), axis = 0).astype(numpy.float32)
|
||||
return crop_frame
|
||||
|
||||
|
||||
def normalize_crop_frame(crop_frame : Frame) -> Frame:
|
||||
def normalize_crop_frame(crop_frame : VisionFrame) -> VisionFrame:
|
||||
crop_frame = numpy.clip(crop_frame, -1, 1)
|
||||
crop_frame = (crop_frame + 1) / 2
|
||||
crop_frame = crop_frame.transpose(1, 2, 0)
|
||||
@@ -215,17 +215,17 @@ def normalize_crop_frame(crop_frame : Frame) -> Frame:
|
||||
return crop_frame
|
||||
|
||||
|
||||
def blend_frame(temp_frame : Frame, paste_frame : Frame) -> Frame:
|
||||
def blend_frame(temp_frame : VisionFrame, paste_frame : VisionFrame) -> VisionFrame:
|
||||
face_enhancer_blend = 1 - (frame_processors_globals.face_enhancer_blend / 100)
|
||||
temp_frame = cv2.addWeighted(temp_frame, face_enhancer_blend, paste_frame, 1 - face_enhancer_blend, 0)
|
||||
return temp_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame:
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_frame : VisionFrame) -> VisionFrame:
|
||||
return enhance_face(target_face, temp_frame)
|
||||
|
||||
|
||||
def process_frame(source_face : Face, reference_faces : FaceSet, temp_frame : Frame) -> Frame:
|
||||
def process_frame(source_face : Face, reference_faces : FaceSet, temp_frame : VisionFrame) -> VisionFrame:
|
||||
if 'reference' in facefusion.globals.face_selector_mode:
|
||||
similar_faces = find_similar_faces(temp_frame, reference_faces, facefusion.globals.reference_face_distance)
|
||||
if similar_faces:
|
||||
|
||||
@@ -15,7 +15,7 @@ from facefusion.face_analyser import get_one_face, get_average_face, get_many_fa
|
||||
from facefusion.face_helper import warp_face_by_kps, paste_back
|
||||
from facefusion.face_store import get_reference_faces
|
||||
from facefusion.content_analyser import clear_content_analyser
|
||||
from facefusion.typing import Face, FaceSet, Frame, Update_Process, ProcessMode, ModelSet, OptionsWithModel, Embedding
|
||||
from facefusion.typing import Face, FaceSet, VisionFrame, Update_Process, ProcessMode, ModelSet, OptionsWithModel, Embedding
|
||||
from facefusion.filesystem import is_file, is_image, are_images, is_video, resolve_relative_path
|
||||
from facefusion.download import conditional_download, is_download_done
|
||||
from facefusion.vision import read_image, read_static_image, read_static_images, write_image
|
||||
@@ -201,7 +201,7 @@ def post_process() -> None:
|
||||
clear_face_parser()
|
||||
|
||||
|
||||
def swap_face(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame:
|
||||
def swap_face(source_face : Face, target_face : Face, temp_frame : VisionFrame) -> VisionFrame:
|
||||
model_template = get_options('model').get('template')
|
||||
model_size = get_options('model').get('size')
|
||||
crop_frame, affine_matrix = warp_face_by_kps(temp_frame, target_face.kps, model_template, model_size)
|
||||
@@ -221,7 +221,7 @@ def swap_face(source_face : Face, target_face : Face, temp_frame : Frame) -> Fra
|
||||
return temp_frame
|
||||
|
||||
|
||||
def apply_swap(source_face : Face, crop_frame : Frame) -> Frame:
|
||||
def apply_swap(source_face : Face, crop_frame : VisionFrame) -> VisionFrame:
|
||||
frame_processor = get_frame_processor()
|
||||
model_type = get_options('model').get('type')
|
||||
frame_processor_inputs = {}
|
||||
@@ -238,7 +238,7 @@ def apply_swap(source_face : Face, crop_frame : Frame) -> Frame:
|
||||
return crop_frame
|
||||
|
||||
|
||||
def prepare_source_frame(source_face : Face) -> Frame:
|
||||
def prepare_source_frame(source_face : Face) -> VisionFrame:
|
||||
source_frame = read_static_image(facefusion.globals.source_paths[0])
|
||||
source_frame, _ = warp_face_by_kps(source_frame, source_face.kps, 'arcface_112_v2', (112, 112))
|
||||
source_frame = source_frame[:, :, ::-1] / 255.0
|
||||
@@ -258,7 +258,7 @@ def prepare_source_embedding(source_face : Face) -> Embedding:
|
||||
return source_embedding
|
||||
|
||||
|
||||
def prepare_crop_frame(crop_frame : Frame) -> Frame:
|
||||
def prepare_crop_frame(crop_frame : VisionFrame) -> VisionFrame:
|
||||
model_mean = get_options('model').get('mean')
|
||||
model_standard_deviation = get_options('model').get('standard_deviation')
|
||||
crop_frame = crop_frame[:, :, ::-1] / 255.0
|
||||
@@ -268,18 +268,18 @@ def prepare_crop_frame(crop_frame : Frame) -> Frame:
|
||||
return crop_frame
|
||||
|
||||
|
||||
def normalize_crop_frame(crop_frame : Frame) -> Frame:
|
||||
def normalize_crop_frame(crop_frame : VisionFrame) -> VisionFrame:
|
||||
crop_frame = crop_frame.transpose(1, 2, 0)
|
||||
crop_frame = (crop_frame * 255.0).round()
|
||||
crop_frame = crop_frame[:, :, ::-1]
|
||||
return crop_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame:
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_frame : VisionFrame) -> VisionFrame:
|
||||
return swap_face(source_face, target_face, temp_frame)
|
||||
|
||||
|
||||
def process_frame(source_face : Face, reference_faces : FaceSet, temp_frame : Frame) -> Frame:
|
||||
def process_frame(source_face : Face, reference_faces : FaceSet, temp_frame : VisionFrame) -> VisionFrame:
|
||||
if 'reference' in facefusion.globals.face_selector_mode:
|
||||
similar_faces = find_similar_faces(temp_frame, reference_faces, facefusion.globals.reference_face_distance)
|
||||
if similar_faces:
|
||||
|
||||
@@ -10,7 +10,7 @@ import facefusion.processors.frame.core as frame_processors
|
||||
from facefusion import config, logger, wording
|
||||
from facefusion.face_analyser import clear_face_analyser
|
||||
from facefusion.content_analyser import clear_content_analyser
|
||||
from facefusion.typing import Face, FaceSet, Frame, Update_Process, ProcessMode, ModelSet, OptionsWithModel
|
||||
from facefusion.typing import Face, FaceSet, VisionFrame, Update_Process, ProcessMode, ModelSet, OptionsWithModel
|
||||
from facefusion.common_helper import create_metavar
|
||||
from facefusion.execution_helper import map_torch_backend
|
||||
from facefusion.filesystem import is_file, resolve_relative_path
|
||||
@@ -137,14 +137,14 @@ def post_process() -> None:
|
||||
clear_content_analyser()
|
||||
|
||||
|
||||
def enhance_frame(temp_frame : Frame) -> Frame:
|
||||
def enhance_frame(temp_frame : VisionFrame) -> VisionFrame:
|
||||
with THREAD_SEMAPHORE:
|
||||
paste_frame, _ = get_frame_processor().enhance(temp_frame)
|
||||
temp_frame = blend_frame(temp_frame, paste_frame)
|
||||
return temp_frame
|
||||
|
||||
|
||||
def blend_frame(temp_frame : Frame, paste_frame : Frame) -> Frame:
|
||||
def blend_frame(temp_frame : VisionFrame, paste_frame : VisionFrame) -> VisionFrame:
|
||||
frame_enhancer_blend = 1 - (frame_processors_globals.frame_enhancer_blend / 100)
|
||||
paste_frame_height, paste_frame_width = paste_frame.shape[0:2]
|
||||
temp_frame = cv2.resize(temp_frame, (paste_frame_width, paste_frame_height))
|
||||
@@ -152,11 +152,11 @@ def blend_frame(temp_frame : Frame, paste_frame : Frame) -> Frame:
|
||||
return temp_frame
|
||||
|
||||
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_frame : Frame) -> Frame:
|
||||
def get_reference_frame(source_face : Face, target_face : Face, temp_frame : VisionFrame) -> VisionFrame:
|
||||
pass
|
||||
|
||||
|
||||
def process_frame(source_face : Face, reference_faces : FaceSet, temp_frame : Frame) -> Frame:
|
||||
def process_frame(source_face : Face, reference_faces : FaceSet, temp_frame : VisionFrame) -> VisionFrame:
|
||||
return enhance_frame(temp_frame)
|
||||
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ FaceStore = TypedDict('FaceStore',
|
||||
'static_faces' : FaceSet,
|
||||
'reference_faces': FaceSet
|
||||
})
|
||||
Frame = numpy.ndarray[Any, Any]
|
||||
VisionFrame = numpy.ndarray[Any, Any]
|
||||
Mask = numpy.ndarray[Any, Any]
|
||||
Matrix = numpy.ndarray[Any, Any]
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ from facefusion.face_store import clear_static_faces, clear_reference_faces
|
||||
from facefusion.vision import get_video_frame, read_static_image, normalize_frame_color
|
||||
from facefusion.filesystem import is_image, is_video
|
||||
from facefusion.face_analyser import get_many_faces
|
||||
from facefusion.typing import Frame, FaceSelectorMode
|
||||
from facefusion.typing import VisionFrame, FaceSelectorMode
|
||||
from facefusion.uis.core import get_ui_component, register_ui_component
|
||||
from facefusion.uis.typing import ComponentName
|
||||
|
||||
@@ -147,7 +147,7 @@ def update_reference_position_gallery() -> gradio.Gallery:
|
||||
return gradio.Gallery(value = None)
|
||||
|
||||
|
||||
def extract_gallery_frames(reference_frame : Frame) -> List[Frame]:
|
||||
def extract_gallery_frames(reference_frame : VisionFrame) -> List[VisionFrame]:
|
||||
crop_frames = []
|
||||
faces = get_many_faces(reference_frame)
|
||||
for face in faces:
|
||||
|
||||
@@ -7,7 +7,7 @@ import facefusion.globals
|
||||
from facefusion import wording, logger
|
||||
from facefusion.core import conditional_append_reference_faces
|
||||
from facefusion.face_store import clear_static_faces, get_reference_faces, clear_reference_faces
|
||||
from facefusion.typing import Frame, Face, FaceSet
|
||||
from facefusion.typing import VisionFrame, Face, FaceSet
|
||||
from facefusion.vision import get_video_frame, count_video_frame_total, normalize_frame_color, resize_frame_resolution, read_static_image, read_static_images
|
||||
from facefusion.filesystem import is_image, is_video
|
||||
from facefusion.face_analyser import get_average_face, clear_face_analyser
|
||||
@@ -166,7 +166,7 @@ def update_preview_frame_slider() -> gradio.Slider:
|
||||
return gradio.Slider(value = None, maximum = None, visible = False)
|
||||
|
||||
|
||||
def process_preview_frame(source_face : Face, reference_faces : FaceSet, temp_frame : Frame) -> Frame:
|
||||
def process_preview_frame(source_face : Face, reference_faces : FaceSet, temp_frame : VisionFrame) -> VisionFrame:
|
||||
temp_frame = resize_frame_resolution(temp_frame, 640, 640)
|
||||
if analyse_frame(temp_frame):
|
||||
return cv2.GaussianBlur(temp_frame, (99, 99), 0)
|
||||
|
||||
@@ -12,7 +12,7 @@ from tqdm import tqdm
|
||||
import facefusion.globals
|
||||
from facefusion import logger, wording
|
||||
from facefusion.content_analyser import analyse_stream
|
||||
from facefusion.typing import Frame, Face, Fps
|
||||
from facefusion.typing import VisionFrame, Face, Fps
|
||||
from facefusion.face_analyser import get_average_face
|
||||
from facefusion.processors.frame.core import get_frame_processors_modules, load_frame_processor_module
|
||||
from facefusion.ffmpeg import open_ffmpeg
|
||||
@@ -88,7 +88,7 @@ def listen() -> None:
|
||||
component.change(update, cancels = start_event)
|
||||
|
||||
|
||||
def start(webcam_mode : WebcamMode, webcam_resolution : str, webcam_fps : Fps) -> Generator[Frame, None, None]:
|
||||
def start(webcam_mode : WebcamMode, webcam_resolution : str, webcam_fps : Fps) -> Generator[VisionFrame, None, None]:
|
||||
facefusion.globals.face_selector_mode = 'one'
|
||||
facefusion.globals.face_analyser_order = 'large-small'
|
||||
source_frames = read_static_images(facefusion.globals.source_paths)
|
||||
@@ -114,11 +114,11 @@ def start(webcam_mode : WebcamMode, webcam_resolution : str, webcam_fps : Fps) -
|
||||
yield None
|
||||
|
||||
|
||||
def multi_process_capture(source_face : Face, webcam_capture : cv2.VideoCapture, webcam_fps : Fps) -> Generator[Frame, None, None]:
|
||||
def multi_process_capture(source_face : Face, webcam_capture : cv2.VideoCapture, webcam_fps : Fps) -> Generator[VisionFrame, None, None]:
|
||||
with tqdm(desc = wording.get('processing'), unit = 'frame', ascii = ' =', disable = facefusion.globals.log_level in [ 'warn', 'error' ]) as progress:
|
||||
with ThreadPoolExecutor(max_workers = facefusion.globals.execution_thread_count) as executor:
|
||||
futures = []
|
||||
deque_capture_frames : Deque[Frame] = deque()
|
||||
deque_capture_frames : Deque[VisionFrame] = deque()
|
||||
while webcam_capture and webcam_capture.isOpened():
|
||||
_, capture_frame = webcam_capture.read()
|
||||
if analyse_stream(capture_frame, webcam_fps):
|
||||
@@ -148,7 +148,7 @@ def stop() -> gradio.Image:
|
||||
return gradio.Image(value = None)
|
||||
|
||||
|
||||
def process_stream_frame(source_face : Face, temp_frame : Frame) -> Frame:
|
||||
def process_stream_frame(source_face : Face, temp_frame : VisionFrame) -> VisionFrame:
|
||||
for frame_processor_module in get_frame_processors_modules(facefusion.globals.frame_processors):
|
||||
logger.disable()
|
||||
if frame_processor_module.pre_process('stream'):
|
||||
|
||||
@@ -2,12 +2,12 @@ from typing import Optional, List, Tuple
|
||||
from functools import lru_cache
|
||||
import cv2
|
||||
|
||||
from facefusion.typing import Frame, Resolution
|
||||
from facefusion.typing import VisionFrame, Resolution
|
||||
from facefusion.choices import video_template_sizes
|
||||
from facefusion.filesystem import is_image, is_video
|
||||
|
||||
|
||||
def get_video_frame(video_path : str, frame_number : int = 0) -> Optional[Frame]:
|
||||
def get_video_frame(video_path : str, frame_number : int = 0) -> Optional[VisionFrame]:
|
||||
if is_video(video_path):
|
||||
video_capture = cv2.VideoCapture(video_path)
|
||||
if video_capture.isOpened():
|
||||
@@ -91,7 +91,7 @@ def unpack_resolution(resolution : str) -> Resolution:
|
||||
return width, height
|
||||
|
||||
|
||||
def resize_frame_resolution(frame : Frame, max_width : int, max_height : int) -> Frame:
|
||||
def resize_frame_resolution(frame : VisionFrame, max_width : int, max_height : int) -> VisionFrame:
|
||||
height, width = frame.shape[:2]
|
||||
|
||||
if height > max_height or width > max_width:
|
||||
@@ -102,16 +102,16 @@ def resize_frame_resolution(frame : Frame, max_width : int, max_height : int) ->
|
||||
return frame
|
||||
|
||||
|
||||
def normalize_frame_color(frame : Frame) -> Frame:
|
||||
def normalize_frame_color(frame : VisionFrame) -> VisionFrame:
|
||||
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
|
||||
@lru_cache(maxsize = 128)
|
||||
def read_static_image(image_path : str) -> Optional[Frame]:
|
||||
def read_static_image(image_path : str) -> Optional[VisionFrame]:
|
||||
return read_image(image_path)
|
||||
|
||||
|
||||
def read_static_images(image_paths : List[str]) -> Optional[List[Frame]]:
|
||||
def read_static_images(image_paths : List[str]) -> Optional[List[VisionFrame]]:
|
||||
frames = []
|
||||
if image_paths:
|
||||
for image_path in image_paths:
|
||||
@@ -119,13 +119,13 @@ def read_static_images(image_paths : List[str]) -> Optional[List[Frame]]:
|
||||
return frames
|
||||
|
||||
|
||||
def read_image(image_path : str) -> Optional[Frame]:
|
||||
def read_image(image_path : str) -> Optional[VisionFrame]:
|
||||
if is_image(image_path):
|
||||
return cv2.imread(image_path)
|
||||
return None
|
||||
|
||||
|
||||
def write_image(image_path : str, frame : Frame) -> bool:
|
||||
def write_image(image_path : str, frame : VisionFrame) -> bool:
|
||||
if image_path:
|
||||
return cv2.imwrite(image_path, frame)
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user