face swap node is added

This commit is contained in:
Sam Khoze
2024-06-15 12:31:25 +05:30
committed by GitHub
parent ba29788c66
commit 29d4c6841b
+367
View File
@@ -612,6 +612,371 @@ def load_video_cv(video: str, force_rate: int, force_size: str,
class DeepFuzeFaceSwap:
@classmethod
def INPUT_TYPES(s):
ffmpeg_formats = get_video_formats()
return {
"required": {
"source_images": ("IMAGE",),
"images": ("IMAGE",),
"enhancer": ("None,codeformer,gfpgan_1.2,gfpgan_1.3,gfpgan_1.4,gpen_bfr_256,gpen_bfr_512,gpen_bfr_1024,gpen_bfr_2048,restoreformer_plus_plus".split(","),{"default":'None'}),
"faceswap_model":("blendswap_256,inswapper_128,inswapper_128_fp16,simswap_256,simswap_512_unofficial,uniface_256".split(","),),
"face_mask_padding_left": ("INT",{"default":0,"min":0,"max":30,"step":1}),
"face_mask_padding_right": ("INT",{"default":0,"min":0,"max":30,"step":1}),
"face_mask_padding_bottom": ("INT",{"default":0,"min":0,"max":30,"step":1}),
"face_mask_padding_top": ("INT",{"default":0,"min":0,"max":30,"step":1}),
"device" : (["cpu","gpu"],{"default":"cpu"}),
"frame_rate": (
"FLOAT",
{"default": 25, "min": 1, "step": 1},
),
},
"optional": {
"meta_batch": ("VHS_BatchManager",),
"loop_count": ("INT", {"default": 0, "min": 0, "max": 100, "step": 1}),
"filename_prefix": ("STRING", {"default": "deepfuze"}),
"pingpong": ("BOOLEAN", {"default": False}),
"save_output": ("BOOLEAN", {"default": True}),
},
"hidden": {
"prompt": "PROMPT",
"format": (["image/gif", "image/webp"] + ffmpeg_formats,{"default":"video/h265-mp4"}),
"extra_pnginfo": "EXTRA_PNGINFO",
"unique_id": "UNIQUE_ID"
},
}
RETURN_TYPES = ("IMAGE", "INT", "VHS_AUDIO", "VHS_VIDEOINFO",)
RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info",)
# RETURN_TYPES = ("VHS_FILENAMES",)
# RETURN_NAMES = ("Filenames",)
# OUTPUT_NODE = True
CATEGORY = "DeepFuze"
FUNCTION = "faceswampgenerate"
def faceswampgenerate(
self,
source_images,
images,
enhancer,
faceswap_model,
face_mask_padding_left,
face_mask_padding_right,
face_mask_padding_bottom,
face_mask_padding_top,
device,
frame_rate: int,
loop_count: int,
filename_prefix="deepfuze",
format="video/h265-mp4",
pingpong=False,
save_output=True,
prompt=None,
extra_pnginfo=None,
unique_id=None,
manual_format_widgets=None,
meta_batch=None
):
print(len(source_images),len(images))
if isinstance(images, torch.Tensor) and images.size(0) == 0:
return ("",)
pbar = ProgressBar(len(images))
first_image = images[0]
# get output information
output_dir = (
folder_paths.get_output_directory()
if save_output
else folder_paths.get_temp_directory()
)
(
full_output_folder,
filename,
_,
subfolder,
_,
) = folder_paths.get_save_image_path(filename_prefix, output_dir)
output_files = []
metadata = PngInfo()
video_metadata = {}
if prompt is not None:
metadata.add_text("prompt", json.dumps(prompt))
video_metadata["prompt"] = prompt
if extra_pnginfo is not None:
for x in extra_pnginfo:
metadata.add_text(x, json.dumps(extra_pnginfo[x]))
video_metadata[x] = extra_pnginfo[x]
metadata.add_text("CreationTime", datetime.datetime.now().isoformat(" ")[:19])
if meta_batch is not None and unique_id in meta_batch.outputs:
(counter, output_process) = meta_batch.outputs[unique_id]
else:
# comfy counter workaround
max_counter = 0
# Loop through the existing files
matcher = re.compile(f"{re.escape(filename)}_(\\d+)\\D*\\..+", re.IGNORECASE)
for existing_file in os.listdir(full_output_folder):
# Check if the file matches the expected format
match = matcher.fullmatch(existing_file)
if match:
# Extract the numeric portion of the filename
file_counter = int(match.group(1))
# Update the maximum counter value if necessary
if file_counter > max_counter:
max_counter = file_counter
# Increment the counter by 1 to get the next available value
counter = max_counter + 1
output_process = None
# save first frame as png to keep metadata
file = f"{filename}_{counter:05}.png"
file_path = os.path.join(full_output_folder, file)
Image.fromarray(tensor_to_bytes(first_image)).save(
file_path,
pnginfo=metadata,
compress_level=4,
)
output_files.append(file_path)
format_type, format_ext = format.split("/")
print(format_type, format_ext)
if format_type == "image":
if meta_batch is not None:
raise Exception("Pillow('image/') formats are not compatible with batched output")
image_kwargs = {}
if format_ext == "gif":
image_kwargs['disposal'] = 2
if format_ext == "webp":
#Save timestamp information
exif = Image.Exif()
exif[ExifTags.IFD.Exif] = {36867: datetime.datetime.now().isoformat(" ")[:19]}
image_kwargs['exif'] = exif
file = f"{filename}_{counter:05}.{format_ext}"
file_path = os.path.join(full_output_folder, file)
if pingpong:
images = to_pingpong(images)
frames = map(lambda x : Image.fromarray(tensor_to_bytes(x)), images)
# Use pillow directly to save an animated image
next(frames).save(
file_path,
format=format_ext.upper(),
save_all=True,
append_images=frames,
duration=round(1000 / frame_rate),
loop=loop_count,
compress_level=4,
**image_kwargs
)
output_files.append(file_path)
else:
# Use ffmpeg to save a video
if ffmpeg_path is None:
raise ProcessLookupError(f"ffmpeg is required for video outputs and could not be found.\nIn order to use video outputs, you must either:\n- Install imageio-ffmpeg with pip,\n- Place a ffmpeg executable in {os.path.abspath('')}, or\n- Install ffmpeg and add it to the system path.")
#Acquire additional format_widget values
kwargs = None
if manual_format_widgets is None:
if prompt is not None:
kwargs = prompt[unique_id]['inputs']
else:
manual_format_widgets = {}
if kwargs is None:
kwargs = get_format_widget_defaults(format_ext)
missing = {}
for k in kwargs.keys():
if k in manual_format_widgets:
kwargs[k] = manual_format_widgets[k]
else:
missing[k] = kwargs[k]
if len(missing) > 0:
print("Extra format values were not provided, the following defaults will be used: " + str(kwargs) + "\nThis is likely due to usage of ComfyUI-to-python. These values can be manually set by supplying a manual_format_widgets argument")
kwargs["format"] = format
kwargs['pix_fmt'] = 'yuv420p10le'
kwargs['crf'] = 22
kwargs["save_metadata"] = ["save_metadata", "BOOLEAN", {"default": True}]
print(kwargs)
video_format = apply_format_widgets(format_ext, kwargs)
has_alpha = first_image.shape[-1] == 4
dim_alignment = video_format.get("dim_alignment", 8)
if (first_image.shape[1] % dim_alignment) or (first_image.shape[0] % dim_alignment):
#output frames must be padded
to_pad = (-first_image.shape[1] % dim_alignment,
-first_image.shape[0] % dim_alignment)
padding = (to_pad[0]//2, to_pad[0] - to_pad[0]//2,
to_pad[1]//2, to_pad[1] - to_pad[1]//2)
padfunc = torch.nn.ReplicationPad2d(padding)
def pad(image):
image = image.permute((2,0,1))#HWC to CHW
padded = padfunc(image.to(dtype=torch.float32))
return padded.permute((1,2,0))
images = map(pad, images)
new_dims = (-first_image.shape[1] % dim_alignment + first_image.shape[1],
-first_image.shape[0] % dim_alignment + first_image.shape[0])
dimensions = f"{new_dims[0]}x{new_dims[1]}"
print("Output images were not of valid resolution and have had padding applied")
else:
dimensions = f"{first_image.shape[1]}x{first_image.shape[0]}"
if loop_count > 0:
loop_args = ["-vf", "loop=loop=" + str(loop_count)+":size=" + str(len(images))]
else:
loop_args = []
if pingpong:
if meta_batch is not None:
print("pingpong is incompatible with batched output")
images = to_pingpong(images)
if video_format.get('input_color_depth', '8bit') == '16bit':
images = map(tensor_to_shorts, images)
if has_alpha:
i_pix_fmt = 'rgba64'
else:
i_pix_fmt = 'rgb48'
else:
images = map(tensor_to_bytes, images)
if has_alpha:
i_pix_fmt = 'rgba'
else:
i_pix_fmt = 'rgb24'
file = f"{filename}_{counter:05}.{video_format['extension']}"
file_path = os.path.join(full_output_folder, file)
if loop_count > 0:
loop_args = ["-vf", "loop=loop=" + str(loop_count)+":size=" + str(len(images))]
else:
loop_args = []
bitrate_arg = []
bitrate = video_format.get('bitrate')
if bitrate is not None:
bitrate_arg = ["-b:v", str(bitrate) + "M" if video_format.get('megabit') == 'True' else str(bitrate) + "K"]
args = [ffmpeg_path, "-v", "error", "-f", "rawvideo", "-pix_fmt", i_pix_fmt,
"-s", dimensions, "-r", str(frame_rate), "-i", "-"] \
+ loop_args
images = map(lambda x: x.tobytes(), images)
env=os.environ.copy()
if "environment" in video_format:
env.update(video_format["environment"])
if "pre_pass" in video_format:
if meta_batch is not None:
#Performing a prepass requires keeping access to all frames.
#Potential solutions include keeping just output frames in
#memory or using 3 passes with intermediate file, but
#very long gifs probably shouldn't be encouraged
raise Exception("Formats which require a pre_pass are incompatible with Batch Manager.")
images = [b''.join(images)]
os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
pre_pass_args = args[:13] + video_format['pre_pass']
try:
subprocess.run(pre_pass_args, input=images[0], env=env,
capture_output=True, check=True)
except subprocess.CalledProcessError as e:
raise Exception("An error occurred in the ffmpeg prepass:\n" \
+ e.stderr.decode("utf-8"))
if "inputs_main_pass" in video_format:
args = args[:13] + video_format['inputs_main_pass'] + args[13:]
if output_process is None:
if 'gifski_pass' in video_format:
output_process = gifski_process(args, video_format, file_path, env)
else:
args += video_format['main_pass'] + bitrate_arg
output_process = ffmpeg_process(args, video_format, video_metadata, file_path, env)
#Proceed to first yield
output_process.send(None)
if meta_batch is not None:
meta_batch.outputs[unique_id] = (counter, output_process)
for image in images:
pbar.update(1)
output_process.send(image)
if meta_batch is not None:
requeue_workflow((meta_batch.unique_id, not meta_batch.has_closed_inputs))
if meta_batch is None or meta_batch.has_closed_inputs:
#Close pipe and wait for termination.
try:
total_frames_output = output_process.send(None)
output_process.send(None)
except StopIteration:
pass
if meta_batch is not None:
meta_batch.outputs.pop(unique_id)
if len(meta_batch.outputs) == 0:
meta_batch.reset()
else:
#batch is unfinished
#TODO: Check if empty output breaks other custom nodes
return {"ui": {"unfinished_batch": [True]}, "result": ((save_output, []),)}
output_files.append(file_path)
i = 255. * source_images[0].cpu().numpy()
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
metadata = None
filename_with_batch_num = filename.replace("%batch_num%", "1")
file = f"{filename_with_batch_num}_{counter:05}_.png"
img.save(os.path.join(full_output_folder, file), pnginfo=metadata)
faceswap_filename = os.path.join(result_dir,f"faceswap_{str(time.time()).replace('.','')}.mp4")
command = [
'python',
'./run.py', # Script to run
'--frame-processors',
"face_swapper",
"-s",
os.path.join(full_output_folder, file),
'-t', # Argument: segmentation path
output_files[-1],
'-o',
faceswap_filename,
"--face-swapper-model",
faceswap_model,
'--face-mask-padding',
str(face_mask_padding_top),
str(face_mask_padding_bottom),
str(face_mask_padding_left),
str(face_mask_padding_right),
'--headless'
]
if device=="gpu":
command.extend(['--execution-providers',"coreml"])
print(command)
result = subprocess.run(command,cwd="custom_nodes/ComfyUI-DeepFuze",stdout=subprocess.PIPE)
# print(result.stdout.splitlines()[-1])
if enhancer!="None":
command = [
'python',
'./run.py', # Script to run
'--frame-processors',
"face_enhancer",
"--face-enhancer-model",
enhancer,
"-t",
faceswap_filename,
'-o',
enhanced_filename,
'--headless'
]
if device=="gpu":
command.extend(['--execution-providers',"coreml"])
print(command)
result = subprocess.run(command,cwd="custom_nodes/ComfyUI-DeepFuze",stdout=subprocess.PIPE)
faceswap_filename = enhanced_filename
print(result.stderr)
return load_video_cv(faceswap_filename,0,'Disabled',512,512,0,0,1)
class DeepFuzeAdavance:
@@ -1168,6 +1533,7 @@ class DeepfuzePreview:
NODE_CLASS_MAPPINGS = {
"DeepFuzeAdavance": DeepFuzeAdavance,
"DeepFuzeFaceSwap": DeepFuzeFaceSwap,
"TTS_generation":TTS_generation,
"LLM_node": LLM_node,
"PlayBackAudio": PlayBackAudio,
@@ -1175,6 +1541,7 @@ NODE_CLASS_MAPPINGS = {
}
NODE_DISPLAY_NAME_MAPPINGS = {
"DeepFuzeAdavance": "DeepFuze Lipsync",
"DeepFuzeFaceSwap": "DeepFuze FaceSwap",
"TTS_generation":"DeepFuze TTS",
"LLM_node": "Openai LLM",
"PlayBackAudio": "Play Audio",