Files
Sam Khoze fe977ce5d7 Deepfuze
update for windows installation
2024-07-21 13:54:53 +05:30

1532 lines
64 KiB
Python

import os
import sys
import json
import subprocess
import numpy as np
import re
import cv2
import time
import itertools
import numpy as np
import datetime
from typing import List
import torch
import psutil
import torchaudio
from PIL import Image, ExifTags
from PIL.PngImagePlugin import PngInfo
from pathlib import Path
from string import Template
from pydub import AudioSegment
from .utils import BIGMAX, DIMMAX, calculate_file_hash, get_sorted_dir_files_from_directory, get_audio, lazy_eval, hash_path, validate_path, strip_path
from PIL import Image, ImageOps
from comfy.utils import common_upscale, ProgressBar
from sys import platform
from scipy.io.wavfile import write
import folder_paths
from .utils import ffmpeg_path, get_audio, hash_path, validate_path, requeue_workflow, gifski_path, calculate_file_hash, strip_path
from comfy.utils import ProgressBar
from .llm_node import LLM_node
from .audio_playback import PlayBackAudio
from .audio_playback import SaveAudio
# Directory that receives DeepFuze results and directory that holds audio inputs.
result_dir = os.path.join(folder_paths.get_output_directory(), "deepfuze")
audio_dir = os.path.join(folder_paths.get_input_directory(), "audio")
for _required_dir in (result_dir, audio_dir):
    try:
        # exist_ok avoids failing when the directory is already present.
        os.makedirs(_required_dir, exist_ok=True)
    except OSError as _mkdir_error:
        # Directory creation stays best-effort so the module still imports,
        # but the failure is reported instead of being silently discarded.
        print(f"DeepFuze: could not create directory {_required_dir}: {_mkdir_error}",
              file=sys.stderr)
audio_extensions = ['mp3', 'mp4', 'wav', 'ogg']
# Working directory for the bundled run.py script; the first candidate is used
# when it exists relative to the current working directory.
path_cwd = "ComfyUI/custom_nodes/ComfyUI-DeepFuze" if os.path.isdir("ComfyUI/custom_nodes/ComfyUI-DeepFuze") else "custom_nodes/ComfyUI-DeepFuze"
video_extensions = ['webm', 'mp4', 'mkv', 'gif']
def is_gif(filename) -> bool:
    """Return True when the text after the last '.' in *filename* is exactly "gif".

    The comparison is case-sensitive and a name without any '.' is never a gif.
    """
    if "." not in filename:
        return False
    return filename.rsplit(".", 1)[-1] == "gif"
def target_size(width, height, force_size, custom_width, custom_height) -> tuple[int, int]:
    """Compute the output (width, height) for a resize request.

    force_size is one of "Disabled", "Custom", "Custom Width", "Custom Height"
    or a "WxH" string in which either side may be "?" to keep the aspect ratio.
    A side derived from the aspect ratio is rounded to a multiple of 8
    (for latent conversion).
    """
    if force_size == "Custom":
        return (custom_width, custom_height)
    if force_size == "Custom Height":
        force_size = "?x" + str(custom_height)
    elif force_size == "Custom Width":
        force_size = str(custom_width) + "x?"
    if force_size == "Disabled":
        return (width, height)

    parts = force_size.split("x")
    if parts[0] == "?":
        # Height is fixed; scale the width proportionally, then snap to 8.
        scaled = (width * int(parts[1])) // height
        return ((int(scaled) + 4) & ~7, int(parts[1]))
    if parts[1] == "?":
        # Width is fixed; scale the height proportionally, then snap to 8.
        scaled = (height * int(parts[0])) // width
        return (int(parts[0]), (int(scaled) + 4) & ~7)
    return (int(parts[0]), int(parts[1]))
def cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames,
                       select_every_nth, meta_batch=None, unique_id=None):
    """Yield video metadata followed by the selected frames of *video*.

    The first value yielded is the tuple
    (width, height, fps, duration, total_frames, target_frame_time).
    Every later value is one frame as a float32 numpy array in RGB order,
    divided by 255.

    Args:
        video: Path of the video; passed through strip_path before opening.
        force_rate: Target frame rate, or 0 to keep the source rate.
        frame_load_cap: Maximum number of frames to produce, or 0 for no cap.
        skip_first_frames: Number of (rate-adjusted) frames dropped at the start.
        select_every_nth: Keep only every n-th frame after skipping.
        meta_batch: Optional batch manager; when the video is exhausted its
            entry for unique_id is removed and has_closed_inputs is set.
        unique_id: Key of this generator inside meta_batch.inputs.

    Raises:
        ValueError: If the video cannot be opened or reports a non-positive
            frame rate.

    Sending any non-None value into the generator stops it early.
    """
    video_cap = cv2.VideoCapture(strip_path(video))
    if not video_cap.isOpened():
        raise ValueError(f"{video} could not be loaded with cv.")
    try:
        pbar = ProgressBar(frame_load_cap) if frame_load_cap > 0 else None

        # extract video metadata
        fps = video_cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Without this the divisions below fail with a bare ZeroDivisionError.
            raise ValueError(f"{video} reports an invalid frame rate ({fps}).")
        width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(video_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps

        total_frame_count = 0
        total_frames_evaluated = -1
        frames_added = 0
        base_frame_time = 1 / fps
        prev_frame = None

        if force_rate == 0:
            target_frame_time = base_frame_time
        else:
            target_frame_time = 1 / force_rate
        yield (width, height, fps, duration, total_frames, target_frame_time)

        # time_offset tracks how far the source clock is ahead of the target
        # clock, so frames are dropped or repeated to match force_rate.
        time_offset = target_frame_time - base_frame_time
        while video_cap.isOpened():
            if time_offset < target_frame_time:
                is_returned = video_cap.grab()
                # if didn't return frame, video has ended
                if not is_returned:
                    break
                time_offset += base_frame_time
            if time_offset < target_frame_time:
                continue
            time_offset -= target_frame_time

            # if not past the skipped prefix, ignore the frame
            total_frame_count += 1
            if total_frame_count <= skip_first_frames:
                continue
            total_frames_evaluated += 1

            # if should not be selected, skip doing anything with frame
            if total_frames_evaluated % select_every_nth != 0:
                continue

            # opencv loads images in BGR format, so convert to RGB for ComfyUI use
            _, frame = video_cap.retrieve()
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # convert frame to comfyui's expected format
            # TODO: frame contains no exif information. Check if opencv2 has already applied
            frame = np.array(frame, dtype=np.float32)
            # from_numpy shares memory with frame, so the in-place division
            # rescales frame itself without an extra copy.
            torch.from_numpy(frame).div_(255)

            # Frames are emitted one step late so the final frame can be
            # yielded after the batch bookkeeping below.
            if prev_frame is not None:
                inp = yield prev_frame
                if inp is not None:
                    # stop early; the finally clause releases the capture
                    return
            prev_frame = frame
            frames_added += 1
            if pbar is not None:
                pbar.update_absolute(frames_added, frame_load_cap)
            # if cap exists and we've reached it, stop processing frames
            if frame_load_cap > 0 and frames_added >= frame_load_cap:
                break

        if meta_batch is not None:
            meta_batch.inputs.pop(unique_id)
            meta_batch.has_closed_inputs = True
        if prev_frame is not None:
            yield prev_frame
    finally:
        # Runs on exhaustion, early return, error, or generator close.
        video_cap.release()
def gen_format_widgets(video_format):
    """Yield each widget definition of *video_format* inside a one-element list.

    A widget definition is any list found as an element of a key ending in
    "_pass", or as the whole value of any other key. The caller may assign to
    index 0 of the yielded list; that replacement is written back into
    video_format when the generator resumes.
    """
    for key in video_format:
        value = video_format[key]
        if key.endswith("_pass"):
            for index, entry in enumerate(value):
                if isinstance(entry, list):
                    holder = [entry]
                    yield holder
                    value[index] = holder[0]
        elif isinstance(value, list):
            holder = [value]
            yield holder
            video_format[key] = holder[0]
def get_video_formats():
    """List the video formats registered under "VHS_video_formats".

    Each entry is either the string "video/<name>" or, when the format
    declares widgets, the list ["video/<name>", widgets]. Formats that need a
    "gifski_pass" are left out when gifski_path is None.
    """
    available = []
    for listed_name in folder_paths.get_filename_list("VHS_video_formats"):
        # Drop the last five characters (presumably the ".json" suffix).
        stem = listed_name[:-5]
        spec_path = folder_paths.get_full_path("VHS_video_formats", stem + ".json")
        with open(spec_path, 'r') as handle:
            spec = json.load(handle)
        if gifski_path is None and "gifski_pass" in spec:
            # gifski is unavailable, so this format cannot be produced
            continue
        widget_defs = [holder[0] for holder in gen_format_widgets(spec)]
        label = "video/" + stem
        available.append([label, widget_defs] if widget_defs else label)
    return available
def get_format_widget_defaults(format_name):
    """Return {widget name: default value} for the widgets of a video format.

    The default comes from the widget's options dict when it has a 'default'
    key; otherwise the first choice of a list-typed widget is used, and as a
    last resort a fixed value per primitive type name.
    """
    spec_path = folder_paths.get_full_path("VHS_video_formats", format_name + ".json")
    with open(spec_path, 'r') as handle:
        spec = json.load(handle)

    # NOTE: This doesn't respect max/min, but should be good enough as a last fallback
    type_fallbacks = {"BOOLEAN": False, "INT": 0, "FLOAT": 0, "STRING": ""}
    defaults = {}
    for holder in gen_format_widgets(spec):
        widget = holder[0]
        if len(widget) > 2 and 'default' in widget[2]:
            value = widget[2]['default']
        elif type(widget[1]) is list:
            value = widget[1][0]
        else:
            value = type_fallbacks[widget[1]]
        defaults[widget[0]] = value
    return defaults
def apply_format_widgets(format_name, kwargs):
    """Load a video format and replace every widget with its concrete value.

    Each widget is looked up by name in *kwargs*. When the widget carries a
    fourth element, that element is used as a string.Template with the value
    bound to ``val``; otherwise the value is converted with str().
    Returns the resulting format dict.
    """
    spec_path = folder_paths.get_full_path("VHS_video_formats", format_name + ".json")
    print(spec_path)
    with open(spec_path, 'r') as handle:
        spec = json.load(handle)
    for holder in gen_format_widgets(spec):
        widget = holder[0]
        name = widget[0]
        print(name)
        assert(name in kwargs)
        if len(widget) > 3:
            holder[0] = Template(widget[3]).substitute(val=kwargs[name])
        else:
            holder[0] = str(kwargs[name])
    return spec
def tensor_to_int(tensor, bits):
    """Scale *tensor* by the largest *bits*-bit value and clip to that range.

    Returns a numpy array; no integer cast is performed here.
    """
    #TODO: investigate benefit of rounding by adding 0.5 before clip/cast
    max_value = 2**bits - 1
    scaled = tensor.cpu().numpy() * max_value
    return np.clip(scaled, 0, max_value)
def tensor_to_shorts(tensor):
    """Convert *tensor* to a uint16 numpy array via tensor_to_int with 16 bits."""
    scaled = tensor_to_int(tensor, 16)
    return scaled.astype(np.uint16)
def tensor_to_bytes(tensor):
    """Convert *tensor* to a uint8 numpy array via tensor_to_int with 8 bits."""
    scaled = tensor_to_int(tensor, 8)
    return scaled.astype(np.uint8)
def ffmpeg_process(args, video_format, video_metadata, file_path, env):
    """Generator that feeds raw frame bytes to an ffmpeg subprocess.

    Protocol: advance once to reach the first ``yield``, then ``send`` each
    frame's bytes; ``send(None)`` ends the stream, after which the generator
    yields the number of frames it accepted. Resuming it once more prints any
    captured ffmpeg stderr and finishes.

    Args:
        args: ffmpeg command line without the output path.
        video_format: Format dict; 'save_metadata' selects the metadata attempt.
        video_metadata: Object serialised as JSON into the file's comment tag.
        file_path: Output file path appended to the command.
        env: Environment mapping passed to the subprocess.

    Raises:
        Exception: When ffmpeg closes its input pipe and no retry is possible.
    """
    res = None
    frame_data = yield
    total_frames_output = 0
    if video_format.get('save_metadata', 'False') != 'False':
        os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
        metadata = json.dumps(video_metadata)
        metadata_path = os.path.join(folder_paths.get_temp_directory(), "metadata.txt")
        #metadata from file should escape = ; # \ and newline
        metadata = metadata.replace("\\","\\\\")
        metadata = metadata.replace(";","\\;")
        metadata = metadata.replace("#","\\#")
        metadata = metadata.replace("=","\\=")
        metadata = metadata.replace("\n","\\\n")
        metadata = "comment=" + metadata
        with open(metadata_path, "w") as f:
            f.write(";FFMETADATA1\n")
            f.write(metadata)
        # Insert the metadata file as an extra input right after the executable.
        m_args = args[:1] + ["-i", metadata_path] + args[1:] + ["-metadata", "creation_time=now"]
        with subprocess.Popen(m_args + [file_path], stderr=subprocess.PIPE,
                              stdin=subprocess.PIPE, env=env) as proc:
            try:
                while frame_data is not None:
                    proc.stdin.write(frame_data)
                    #TODO: skip flush for increased speed
                    frame_data = yield
                    total_frames_output+=1
                proc.stdin.flush()
                proc.stdin.close()
                res = proc.stderr.read()
            except BrokenPipeError as e:
                err = proc.stderr.read()
                #Check if output file exists. If it does, the re-execution
                #will also fail. This obscures the cause of the error
                #and seems to never occur concurrent to the metadata issue
                if os.path.exists(file_path):
                    raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                            + err.decode("utf-8"))
                #Res was not set
                print(err.decode("utf-8"), end="", file=sys.stderr)
                print("An error occurred when saving with metadata")
    # Run without metadata unless the attempt above finished with empty stderr.
    # res is still None when metadata was not requested or that attempt failed.
    if res != b'':
        with subprocess.Popen(args + [file_path], stderr=subprocess.PIPE,
                              stdin=subprocess.PIPE, env=env) as proc:
            try:
                while frame_data is not None:
                    proc.stdin.write(frame_data)
                    frame_data = yield
                    total_frames_output+=1
                proc.stdin.flush()
                proc.stdin.close()
                res = proc.stderr.read()
            except BrokenPipeError as e:
                res = proc.stderr.read()
                raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                        + res.decode("utf-8"))
    # Hand the frame count back to the caller that sent the terminating None.
    yield total_frames_output
    if len(res) > 0:
        print(res.decode("utf-8"), end="", file=sys.stderr)
def gifski_process(args, video_format, file_path, env):
    """Generator that pipes raw frames through ffmpeg into gifski.

    Protocol: advance once to reach the first ``yield``, then ``send`` each
    frame's bytes; ``send(None)`` ends the stream. Unlike ffmpeg_process this
    generator yields no frame count: it finishes after printing the captured
    output of both subprocesses.

    Args:
        args: ffmpeg command line up to (not including) the main-pass options.
        video_format: Format dict providing 'main_pass' and 'gifski_pass'.
        file_path: Path of the gif written by gifski.
        env: Environment mapping passed to both subprocesses.

    Raises:
        Exception: When the ffmpeg input pipe breaks; the message includes
            the stderr of both processes.
    """
    frame_data = yield
    # ffmpeg writes a yuv4mpegpipe stream to stdout, which gifski reads as stdin.
    with subprocess.Popen(args + video_format['main_pass'] + ['-f', 'yuv4mpegpipe', '-'],
                          stderr=subprocess.PIPE, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, env=env) as procff:
        with subprocess.Popen([gifski_path] + video_format['gifski_pass']
                              + ['-q', '-o', file_path, '-'], stderr=subprocess.PIPE,
                              stdin=procff.stdout, stdout=subprocess.PIPE,
                              env=env) as procgs:
            try:
                while frame_data is not None:
                    procff.stdin.write(frame_data)
                    frame_data = yield
                procff.stdin.flush()
                procff.stdin.close()
                resff = procff.stderr.read()
                resgs = procgs.stderr.read()
                outgs = procgs.stdout.read()
            except BrokenPipeError as e:
                procff.stdin.close()
                resff = procff.stderr.read()
                resgs = procgs.stderr.read()
                raise Exception("An error occurred while creating gifski output\n" \
                        + "Make sure you are using gifski --version >=1.32.0\nffmpeg: " \
                        + resff.decode("utf-8") + '\ngifski: ' + resgs.decode("utf-8"))
    if len(resff) > 0:
        print(resff.decode("utf-8"), end="", file=sys.stderr)
    if len(resgs) > 0:
        print(resgs.decode("utf-8"), end="", file=sys.stderr)
    #should always be empty as the quiet flag is passed
    if len(outgs) > 0:
        print(outgs.decode("utf-8"))
def to_pingpong(inp):
    """Yield the items of *inp* forward, then the interior items in reverse.

    The first and last items are not repeated, so [a, b, c, d] produces
    a, b, c, d, c, b. Inputs without __getitem__ are materialised into a list.
    """
    sequence = inp if hasattr(inp, "__getitem__") else list(inp)
    yield from sequence
    for index in reversed(range(1, len(sequence) - 1)):
        yield sequence[index]
# Recognised video file extensions. This repeats the identical assignment made
# earlier in this file, so the rebinding does not change the value.
video_extensions = ['webm', 'mp4', 'mkv', 'gif']
def is_gif(filename) -> bool:
    """Return True when the text after the last '.' in *filename* is exactly "gif".

    The comparison is case-sensitive and a name without any '.' is never a gif.
    """
    if "." not in filename:
        return False
    return filename.rsplit(".", 1)[-1] == "gif"
def target_size(width, height, force_size, custom_width, custom_height) -> tuple[int, int]:
    """Compute the output (width, height) for a resize request.

    force_size is one of "Disabled", "Custom", "Custom Width", "Custom Height"
    or a "WxH" string in which either side may be "?" to keep the aspect ratio.
    A side derived from the aspect ratio is rounded to a multiple of 8
    (for latent conversion).
    """
    if force_size == "Custom":
        return (custom_width, custom_height)
    if force_size == "Custom Height":
        force_size = "?x" + str(custom_height)
    elif force_size == "Custom Width":
        force_size = str(custom_width) + "x?"
    if force_size == "Disabled":
        return (width, height)

    parts = force_size.split("x")
    if parts[0] == "?":
        # Height is fixed; scale the width proportionally, then snap to 8.
        scaled = (width * int(parts[1])) // height
        return ((int(scaled) + 4) & ~7, int(parts[1]))
    if parts[1] == "?":
        # Width is fixed; scale the height proportionally, then snap to 8.
        scaled = (height * int(parts[0])) // width
        return (int(parts[0]), (int(scaled) + 4) & ~7)
    return (int(parts[0]), int(parts[1]))
def cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames,
                       select_every_nth, meta_batch=None, unique_id=None):
    """Yield video metadata followed by the selected frames of *video*.

    The first value yielded is the tuple
    (width, height, fps, duration, total_frames, target_frame_time).
    Every later value is one frame as a float32 numpy array in RGB order,
    divided by 255.

    Args:
        video: Path of the video; passed through strip_path before opening.
        force_rate: Target frame rate, or 0 to keep the source rate.
        frame_load_cap: Maximum number of frames to produce, or 0 for no cap.
        skip_first_frames: Number of (rate-adjusted) frames dropped at the start.
        select_every_nth: Keep only every n-th frame after skipping.
        meta_batch: Optional batch manager; when the video is exhausted its
            entry for unique_id is removed and has_closed_inputs is set.
        unique_id: Key of this generator inside meta_batch.inputs.

    Raises:
        ValueError: If the video cannot be opened or reports a non-positive
            frame rate.

    Sending any non-None value into the generator stops it early.
    """
    video_cap = cv2.VideoCapture(strip_path(video))
    if not video_cap.isOpened():
        raise ValueError(f"{video} could not be loaded with cv.")
    try:
        pbar = ProgressBar(frame_load_cap) if frame_load_cap > 0 else None

        # extract video metadata
        fps = video_cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            # Without this the divisions below fail with a bare ZeroDivisionError.
            raise ValueError(f"{video} reports an invalid frame rate ({fps}).")
        width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(video_cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps

        total_frame_count = 0
        total_frames_evaluated = -1
        frames_added = 0
        base_frame_time = 1 / fps
        prev_frame = None

        if force_rate == 0:
            target_frame_time = base_frame_time
        else:
            target_frame_time = 1 / force_rate
        yield (width, height, fps, duration, total_frames, target_frame_time)

        # time_offset tracks how far the source clock is ahead of the target
        # clock, so frames are dropped or repeated to match force_rate.
        time_offset = target_frame_time - base_frame_time
        while video_cap.isOpened():
            if time_offset < target_frame_time:
                is_returned = video_cap.grab()
                # if didn't return frame, video has ended
                if not is_returned:
                    break
                time_offset += base_frame_time
            if time_offset < target_frame_time:
                continue
            time_offset -= target_frame_time

            # if not past the skipped prefix, ignore the frame
            total_frame_count += 1
            if total_frame_count <= skip_first_frames:
                continue
            total_frames_evaluated += 1

            # if should not be selected, skip doing anything with frame
            if total_frames_evaluated % select_every_nth != 0:
                continue

            # opencv loads images in BGR format, so convert to RGB for ComfyUI use
            _, frame = video_cap.retrieve()
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # convert frame to comfyui's expected format
            # TODO: frame contains no exif information. Check if opencv2 has already applied
            frame = np.array(frame, dtype=np.float32)
            # from_numpy shares memory with frame, so the in-place division
            # rescales frame itself without an extra copy.
            torch.from_numpy(frame).div_(255)

            # Frames are emitted one step late so the final frame can be
            # yielded after the batch bookkeeping below.
            if prev_frame is not None:
                inp = yield prev_frame
                if inp is not None:
                    # stop early; the finally clause releases the capture
                    return
            prev_frame = frame
            frames_added += 1
            if pbar is not None:
                pbar.update_absolute(frames_added, frame_load_cap)
            # if cap exists and we've reached it, stop processing frames
            if frame_load_cap > 0 and frames_added >= frame_load_cap:
                break

        if meta_batch is not None:
            meta_batch.inputs.pop(unique_id)
            meta_batch.has_closed_inputs = True
        if prev_frame is not None:
            yield prev_frame
    finally:
        # Runs on exhaustion, early return, error, or generator close.
        video_cap.release()
def load_video_cv(video: str, force_rate: int, force_size: str,
                  custom_width: int, custom_height: int, frame_load_cap: int,
                  skip_first_frames: int, select_every_nth: int,
                  meta_batch=None, unique_id=None, memory_limit_mb=None):
    """Load frames of *video* into one tensor, with audio and video info.

    Args:
        video: Path of the video to read.
        force_rate: Target frame rate, or 0 to keep the source rate.
        force_size: Resize mode understood by target_size ("Disabled" to skip).
        custom_width, custom_height: Sizes used by the custom resize modes.
        frame_load_cap: Maximum number of frames to load, or 0 for no cap.
        skip_first_frames: Number of frames dropped at the start.
        select_every_nth: Keep only every n-th frame.
        meta_batch: Optional batch manager; the frame generator is stored in
            meta_batch.inputs[unique_id] and resumed on later calls.
        unique_id: Key of this node inside meta_batch.inputs.
        memory_limit_mb: Optional memory budget in MiB. When None, the budget
            is the available RAM plus free swap minus 128 MiB.

    Returns:
        (images, frame_count, audio, video_info), where video_info is a dict
        describing the source and the loaded clip.

    Raises:
        RuntimeError: If the batch size exceeds the memory budget, if frames
            remain after the budget is exhausted, or if no frame was produced.
    """
    if meta_batch is None or unique_id not in meta_batch.inputs:
        gen = cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames,
                                 select_every_nth, meta_batch, unique_id)
        # The generator's first item is the metadata tuple.
        (width, height, fps, duration, total_frames, target_frame_time) = next(gen)
        if meta_batch is not None:
            meta_batch.inputs[unique_id] = (gen, width, height, fps, duration, total_frames, target_frame_time)
    else:
        (gen, width, height, fps, duration, total_frames, target_frame_time) = meta_batch.inputs[unique_id]

    if memory_limit_mb is not None:
        # Convert MiB to bytes. (Previously this multiplied an unassigned
        # local in place, which raised UnboundLocalError.)
        memory_limit = memory_limit_mb * 2 ** 20
    else:
        #TODO: verify if garbage collection should be performed here.
        #leaves ~128 MB unreserved for safety
        memory_limit = (psutil.virtual_memory().available + psutil.swap_memory().free) - 2 ** 27
    #space required to load as f32, exist as latent with wiggle room, decode to f32
    max_loadable_frames = int(memory_limit//(width*height*3*(4+4+1/10)))

    if meta_batch is not None:
        if meta_batch.frames_per_batch > max_loadable_frames:
            raise RuntimeError(f"Meta Batch set to {meta_batch.frames_per_batch} frames but only {max_loadable_frames} can fit in memory")
        gen = itertools.islice(gen, meta_batch.frames_per_batch)
    else:
        original_gen = gen
        gen = itertools.islice(gen, max_loadable_frames)

    #Some minor wizardry to eliminate a copy and reduce max memory by a factor of ~2
    images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (height, width, 3)))))
    if meta_batch is None:
        # If the underlying generator still has a frame, the budget was too small.
        try:
            next(original_gen)
            raise RuntimeError(f"Memory limit hit after loading {len(images)} frames. Stopping execution.")
        except StopIteration:
            pass
    if len(images) == 0:
        raise RuntimeError("No frames generated")

    if force_size != "Disabled":
        new_size = target_size(width, height, force_size, custom_width, custom_height)
        if new_size[0] != width or new_size[1] != height:
            # common_upscale is called with the channel axis moved to position 1
            s = images.movedim(-1,1)
            s = common_upscale(s, new_size[0], new_size[1], "lanczos", "center")
            images = s.movedim(1,-1)

    # Audio for the same time window as the loaded frames
    audio = get_audio(video, skip_first_frames * target_frame_time,
                      frame_load_cap*target_frame_time*select_every_nth)
    #Adjust target_frame_time for select_every_nth
    target_frame_time *= select_every_nth
    video_info = {
        "source_fps": fps,
        "source_frame_count": total_frames,
        "source_duration": duration,
        "source_width": width,
        "source_height": height,
        "loaded_fps": 1/target_frame_time,
        "loaded_frame_count": len(images),
        "loaded_duration": len(images) * target_frame_time,
        "loaded_width": images.shape[2],
        "loaded_height": images.shape[1],
    }
    return (images, len(images), audio, video_info)
class DeepFuzeFaceSwap:
    """ComfyUI node that swaps a source face onto a sequence of target frames.

    The target frames are first encoded to a file (animated image or video),
    the bundled ``run.py`` script is then run as a subprocess for face
    swapping and the optional enhancers, and the result is loaded back with
    load_video_cv.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Describe the node's required, optional and hidden inputs."""
        ffmpeg_formats = get_video_formats()
        return {
            "required": {
                "source_images": ("IMAGE",),
                "target_images": ("IMAGE",),
                "enhancer": ("None,codeformer,gfpgan_1.2,gfpgan_1.3,gfpgan_1.4,gpen_bfr_256,gpen_bfr_512,gpen_bfr_1024,gpen_bfr_2048,restoreformer_plus_plus".split(","),{"default":'None'}),
                "faceswap_model":("blendswap_256,inswapper_128,inswapper_128_fp16,simswap_256,simswap_512_unofficial,uniface_256".split(","),{"default":"blendswap_256"}),
                "frame_enhancer": ("None,clear_reality_x4,lsdir_x4,nomos8k_sc_x4,real_esrgan_x2,real_esrgan_x2_fp16,real_esrgan_x4,real_esrgan_x4_fp16,real_hatgan_x4,span_kendata_x4,ultra_sharp_x4".split(","),{"default":'None'}),
                "face_detector_model" : ("retinaface,scrfd,yoloface,yunet".split(","),{"default":"yoloface"}),
                "reference_face_index" : ("INT",{"default":0,"min":0,"max":5,"step":1},{"default":0}),
                "face_mask_padding_left": ("INT",{"default":0,"min":0,"max":30,"step":1}),
                "face_mask_padding_right": ("INT",{"default":0,"min":0,"max":30,"step":1}),
                "face_mask_padding_bottom": ("INT",{"default":0,"min":0,"max":30,"step":1}),
                "face_mask_padding_top": ("INT",{"default":0,"min":0,"max":30,"step":1}),
                "device" : (["cpu","cuda","mps"],{"default":"cpu"}),
                "frame_rate": (
                    "FLOAT",
                    {"default": 25, "min": 1, "step": 1},
                ),
            },
            "optional": {
                "audio": ("AUDIO",),
                "meta_batch": ("VHS_BatchManager",),
                "loop_count": ("INT", {"default": 0, "min": 0, "max": 100, "step": 1}),
                "filename_prefix": ("STRING", {"default": "deepfuze"}),
                "pingpong": ("BOOLEAN", {"default": False}),
                "save_output": ("BOOLEAN", {"default": True}),
            },
            "hidden": {
                "prompt": "PROMPT",
                "format": (["image/gif", "image/webp"] + ffmpeg_formats,{"default":"video/h265-mp4"}),
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID"
            },
        }

    RETURN_TYPES = ("IMAGE", "INT", "AUDIO", "VHS_VIDEOINFO",)
    RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info",)
    # RETURN_TYPES = ("VHS_FILENAMES",)
    # RETURN_NAMES = ("Filenames",)
    # OUTPUT_NODE = True
    CATEGORY = "DeepFuze"
    FUNCTION = "faceswampgenerate"

    @staticmethod
    def _loop_args(loop_count, num_frames):
        """Return the ffmpeg loop-filter arguments, or [] when not looping."""
        if loop_count > 0:
            return ["-vf", "loop=loop=" + str(loop_count) + ":size=" + str(num_frames)]
        return []

    @staticmethod
    def _audio_mux_command(video_path, audio_path, output_path):
        """Build the ffmpeg argument list that copies video and audio into output_path.

        An explicit list keeps paths containing whitespace intact, which
        splitting a formatted command string does not.
        """
        return ["ffmpeg", "-i", video_path, "-i", audio_path, "-c", "copy", output_path, "-y"]

    @staticmethod
    def _run_processor(command, device):
        """Run one ``run.py`` invocation with the execution provider for *device*."""
        if device == "cuda":
            command.extend(['--execution-providers', "cuda"])
        elif device == "mps":
            command.extend(['--execution-providers', "coreml"])
        print(command)
        if platform == "win32":
            return subprocess.run(command, cwd=path_cwd, stdout=subprocess.PIPE)
        return subprocess.run(command, cwd="custom_nodes/ComfyUI-DeepFuze", stdout=subprocess.PIPE)

    def faceswampgenerate(
        self,
        source_images,
        target_images,
        enhancer,
        faceswap_model,
        frame_enhancer,
        face_detector_model,
        reference_face_index,
        face_mask_padding_left,
        face_mask_padding_right,
        face_mask_padding_bottom,
        face_mask_padding_top,
        device,
        frame_rate: int,
        audio="",
        loop_count: int = 0,
        filename_prefix="deepfuze",
        format="video/h265-mp4",
        pingpong=False,
        save_output=True,
        prompt=None,
        extra_pnginfo=None,
        unique_id=None,
        manual_format_widgets=None,
        meta_batch=None
    ):
        """Encode the target frames, run the face swap and load the result.

        Steps: (1) write target_images to an animated image or video in the
        output (or temp) folder, (2) save the first source image as PNG,
        (3) run ``run.py`` for face swapping, then for the face enhancer and
        frame enhancer when they are not "None", (4) mux *audio* into the
        result when given, (5) return load_video_cv() of the final file.

        Returns:
            The tuple produced by load_video_cv. Two early exits return
            something else: ("",) for an empty target tensor, and a "ui"
            dict while a meta batch is unfinished.

        Raises:
            ProcessLookupError: If a video format is requested without ffmpeg.
            Exception: For format/batch combinations that are not supported
                or when the ffmpeg pre-pass fails.
        """
        images = target_images
        print(len(source_images),len(images))
        if isinstance(images, torch.Tensor) and images.size(0) == 0:
            return ("",)
        # Taken before images is wrapped in lazy iterators, which have no len().
        num_frames = len(images)
        pbar = ProgressBar(num_frames)
        first_image = images[0]

        # get output information
        output_dir = (
            folder_paths.get_output_directory()
            if save_output
            else folder_paths.get_temp_directory()
        )
        (
            full_output_folder,
            filename,
            _,
            subfolder,
            _,
        ) = folder_paths.get_save_image_path(filename_prefix, output_dir)
        output_files = []

        metadata = PngInfo()
        video_metadata = {}
        if prompt is not None:
            metadata.add_text("prompt", json.dumps(prompt))
            video_metadata["prompt"] = prompt
        if extra_pnginfo is not None:
            for x in extra_pnginfo:
                metadata.add_text(x, json.dumps(extra_pnginfo[x]))
                video_metadata[x] = extra_pnginfo[x]
        metadata.add_text("CreationTime", datetime.datetime.now().isoformat(" ")[:19])

        if meta_batch is not None and unique_id in meta_batch.outputs:
            # Continue the encoder started by an earlier batch.
            (counter, output_process) = meta_batch.outputs[unique_id]
        else:
            # comfy counter workaround: find the highest counter already used
            max_counter = 0
            matcher = re.compile(f"{re.escape(filename)}_(\\d+)\\D*\\..+", re.IGNORECASE)
            for existing_file in os.listdir(full_output_folder):
                match = matcher.fullmatch(existing_file)
                if match:
                    file_counter = int(match.group(1))
                    if file_counter > max_counter:
                        max_counter = file_counter
            # Increment the counter by 1 to get the next available value
            counter = max_counter + 1
            output_process = None

        # save first frame as png to keep metadata
        file = f"{filename}_{counter:05}.png"
        file_path = os.path.join(full_output_folder, file)
        Image.fromarray(tensor_to_bytes(first_image)).save(
            file_path,
            pnginfo=metadata,
            compress_level=4,
        )
        output_files.append(file_path)

        format_type, format_ext = format.split("/")
        print(format_type, format_ext)
        if format_type == "image":
            if meta_batch is not None:
                raise Exception("Pillow('image/') formats are not compatible with batched output")
            image_kwargs = {}
            if format_ext == "gif":
                image_kwargs['disposal'] = 2
            if format_ext == "webp":
                #Save timestamp information
                exif = Image.Exif()
                exif[ExifTags.IFD.Exif] = {36867: datetime.datetime.now().isoformat(" ")[:19]}
                image_kwargs['exif'] = exif
            file = f"{filename}_{counter:05}.{format_ext}"
            file_path = os.path.join(full_output_folder, file)
            if pingpong:
                images = to_pingpong(images)
            frames = map(lambda x : Image.fromarray(tensor_to_bytes(x)), images)
            # Use pillow directly to save an animated image
            next(frames).save(
                file_path,
                format=format_ext.upper(),
                save_all=True,
                append_images=frames,
                duration=round(1000 / frame_rate),
                loop=loop_count,
                compress_level=4,
                **image_kwargs
            )
            output_files.append(file_path)
        else:
            # Use ffmpeg to save a video
            if ffmpeg_path is None:
                raise ProcessLookupError(f"ffmpeg is required for video outputs and could not be found.\nIn order to use video outputs, you must either:\n- Install imageio-ffmpeg with pip,\n- Place a ffmpeg executable in {os.path.abspath('')}, or\n- Install ffmpeg and add it to the system path.")

            #Acquire additional format_widget values
            kwargs = None
            if manual_format_widgets is None:
                if prompt is not None:
                    kwargs = prompt[unique_id]['inputs']
                else:
                    manual_format_widgets = {}
            if kwargs is None:
                kwargs = get_format_widget_defaults(format_ext)
                missing = {}
                for k in kwargs.keys():
                    if k in manual_format_widgets:
                        kwargs[k] = manual_format_widgets[k]
                    else:
                        missing[k] = kwargs[k]
                if len(missing) > 0:
                    print("Extra format values were not provided, the following defaults will be used: " + str(kwargs) + "\nThis is likely due to usage of ComfyUI-to-python. These values can be manually set by supplying a manual_format_widgets argument")
            # Fixed encoder settings applied on top of the widget values
            kwargs["format"] = format
            kwargs['pix_fmt'] = 'yuv420p10le'
            kwargs['crf'] = 22
            kwargs["save_metadata"] = ["save_metadata", "BOOLEAN", {"default": True}]
            print(kwargs)

            video_format = apply_format_widgets(format_ext, kwargs)
            has_alpha = first_image.shape[-1] == 4
            dim_alignment = video_format.get("dim_alignment", 8)
            if (first_image.shape[1] % dim_alignment) or (first_image.shape[0] % dim_alignment):
                #output frames must be padded
                to_pad = (-first_image.shape[1] % dim_alignment,
                          -first_image.shape[0] % dim_alignment)
                padding = (to_pad[0]//2, to_pad[0] - to_pad[0]//2,
                           to_pad[1]//2, to_pad[1] - to_pad[1]//2)
                padfunc = torch.nn.ReplicationPad2d(padding)
                def pad(image):
                    image = image.permute((2,0,1))#HWC to CHW
                    padded = padfunc(image.to(dtype=torch.float32))
                    return padded.permute((1,2,0))
                images = map(pad, images)
                new_dims = (-first_image.shape[1] % dim_alignment + first_image.shape[1],
                            -first_image.shape[0] % dim_alignment + first_image.shape[0])
                dimensions = f"{new_dims[0]}x{new_dims[1]}"
                print("Output images were not of valid resolution and have had padding applied")
            else:
                dimensions = f"{first_image.shape[1]}x{first_image.shape[0]}"

            if pingpong:
                if meta_batch is not None:
                    print("pingpong is incompatible with batched output")
                images = to_pingpong(images)
                # to_pingpong appends the interior frames in reverse order.
                if num_frames > 2:
                    num_frames += num_frames - 2

            if video_format.get('input_color_depth', '8bit') == '16bit':
                images = map(tensor_to_shorts, images)
                if has_alpha:
                    i_pix_fmt = 'rgba64'
                else:
                    i_pix_fmt = 'rgb48'
            else:
                images = map(tensor_to_bytes, images)
                if has_alpha:
                    i_pix_fmt = 'rgba'
                else:
                    i_pix_fmt = 'rgb24'

            file = f"{filename}_{counter:05}.{video_format['extension']}"
            file_path = os.path.join(full_output_folder, file)
            # images is a lazy iterator at this point, so the frame count
            # recorded earlier is used instead of len(images).
            loop_args = self._loop_args(loop_count, num_frames)
            bitrate_arg = []
            bitrate = video_format.get('bitrate')
            if bitrate is not None:
                bitrate_arg = ["-b:v", str(bitrate) + "M" if video_format.get('megabit') == 'True' else str(bitrate) + "K"]
            # The first 13 entries describe the raw input; args[:13] below relies on that.
            args = [ffmpeg_path, "-v", "error", "-f", "rawvideo", "-pix_fmt", i_pix_fmt,
                    "-s", dimensions, "-r", str(frame_rate), "-i", "-"] \
                    + loop_args

            images = map(lambda x: x.tobytes(), images)
            env=os.environ.copy()
            if "environment" in video_format:
                env.update(video_format["environment"])

            if "pre_pass" in video_format:
                if meta_batch is not None:
                    #Performing a prepass requires keeping access to all frames.
                    #Potential solutions include keeping just output frames in
                    #memory or using 3 passes with intermediate file, but
                    #very long gifs probably shouldn't be encouraged
                    raise Exception("Formats which require a pre_pass are incompatible with Batch Manager.")
                images = [b''.join(images)]
                os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
                pre_pass_args = args[:13] + video_format['pre_pass']
                try:
                    subprocess.run(pre_pass_args, input=images[0], env=env,
                                   capture_output=True, check=True)
                except subprocess.CalledProcessError as e:
                    raise Exception("An error occurred in the ffmpeg prepass:\n" \
                            + e.stderr.decode("utf-8"))
            if "inputs_main_pass" in video_format:
                args = args[:13] + video_format['inputs_main_pass'] + args[13:]

            if output_process is None:
                if 'gifski_pass' in video_format:
                    output_process = gifski_process(args, video_format, file_path, env)
                else:
                    args += video_format['main_pass'] + bitrate_arg
                    output_process = ffmpeg_process(args, video_format, video_metadata, file_path, env)
                #Proceed to first yield
                output_process.send(None)
                if meta_batch is not None:
                    meta_batch.outputs[unique_id] = (counter, output_process)

            for image in images:
                pbar.update(1)
                output_process.send(image)
            if meta_batch is not None:
                requeue_workflow((meta_batch.unique_id, not meta_batch.has_closed_inputs))
            if meta_batch is None or meta_batch.has_closed_inputs:
                #Close pipe and wait for termination.
                try:
                    total_frames_output = output_process.send(None)
                    output_process.send(None)
                except StopIteration:
                    pass
                if meta_batch is not None:
                    meta_batch.outputs.pop(unique_id)
                    if len(meta_batch.outputs) == 0:
                        meta_batch.reset()
            else:
                #batch is unfinished
                #TODO: Check if empty output breaks other custom nodes
                return {"ui": {"unfinished_batch": [True]}, "result": ((save_output, []),)}
            output_files.append(file_path)

        # Save the first source image; run.py receives it as the "-s" input.
        i = 255. * source_images[0].cpu().numpy()
        img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
        metadata = None
        filename_with_batch_num = filename.replace("%batch_num%", "1")
        file = f"{filename_with_batch_num}_{counter:05}_.png"
        img.save(os.path.join(full_output_folder, file), pnginfo=metadata)

        faceswap_filename = os.path.join(result_dir,f"faceswap_{str(time.time()).replace('.','')}.mp4")
        swap_command = [
            'python',
            './run.py',
            '--frame-processors',
            "face_swapper",
            "-s",
            os.path.join(full_output_folder, file),
            '-t',
            output_files[-1],
            '--face-detector-model',
            face_detector_model,
            '-o',
            faceswap_filename,
            "--face-swapper-model",
            faceswap_model,
            '--reference-face-position',
            str(reference_face_index),
            '--face-mask-padding',
            str(face_mask_padding_top),
            str(face_mask_padding_bottom),
            str(face_mask_padding_left),
            str(face_mask_padding_right),
            '--headless'
        ]
        self._run_processor(swap_command, device)

        # The enhancers rewrite the swapped video in place.
        if enhancer!="None":
            enhance_command = [
                'python',
                './run.py',
                '--frame-processors',
                "face_enhancer",
                "--face-enhancer-model",
                enhancer,
                "-t",
                faceswap_filename,
                '-o',
                faceswap_filename,
                '--headless'
            ]
            self._run_processor(enhance_command, device)
        if frame_enhancer!="None":
            frame_command = [
                'python',
                './run.py',
                '--frame-processors',
                "frame_enhancer",
                "--frame-enhancer-model",
                frame_enhancer,
                "-t",
                faceswap_filename,
                '-o',
                faceswap_filename,
                '--headless'
            ]
            self._run_processor(frame_command, device)

        if audio:
            audio_file = os.path.join(audio_dir,str(time.time()).replace(".","")+".wav")
            torchaudio.save(audio_file,audio["waveform"][0],audio["sample_rate"])
            muxed_filename = faceswap_filename.replace('.mp4','_.mp4')
            subprocess.run(self._audio_mux_command(faceswap_filename, audio_file, muxed_filename))
            return load_video_cv(muxed_filename,0,'Disabled',512,512,0,0,1)
        return load_video_cv(faceswap_filename,0,'Disabled',512,512,0,0,1)
class DeepFuzeAdavance:
    """ComfyUI node that lip-syncs a batch of frames to an audio clip.

    The frames are first encoded to a video (or animated image) file, then
    ``run.py`` is invoked as a subprocess with the ``lip_syncer`` frame
    processor.  Optional ``face_enhancer`` and ``frame_enhancer`` passes are
    run on the result, and the final file is loaded back with
    ``load_video_cv``.
    """

    @classmethod
    def INPUT_TYPES(s):
        """Describe the node's required, optional and hidden inputs."""
        ffmpeg_formats = get_video_formats()
        return {
            "required": {
                "images": ("IMAGE",),
                "audio": ("AUDIO",),
                "enhancer": ("None,codeformer,gfpgan_1.2,gfpgan_1.3,gfpgan_1.4,gpen_bfr_256,gpen_bfr_512,gpen_bfr_1024,gpen_bfr_2048,restoreformer_plus_plus".split(","),{"default":'None'}),
                "frame_enhancer": ("None,clear_reality_x4,lsdir_x4,nomos8k_sc_x4,real_esrgan_x2,real_esrgan_x2_fp16,real_esrgan_x4,real_esrgan_x4_fp16,real_hatgan_x4,span_kendata_x4,ultra_sharp_x4".split(","),{"default":'None'}),
                "face_mask_padding_left": ("INT",{"default":0,"min":0,"max":30,"step":1}),
                "face_mask_padding_right": ("INT",{"default":0,"min":0,"max":30,"step":1}),
                "face_mask_padding_bottom": ("INT",{"default":0,"min":0,"max":30,"step":1}),
                "face_mask_padding_top": ("INT",{"default":0,"min":0,"max":30,"step":1}),
                "trim_frame_start": ("INT",{"default":0,"max":2000},),
                "trim_frame_end": ("INT",{"default":0,"max":2000},),
                "device" : (["cpu","cuda","mps"],{"default":"cpu"}),
                "frame_rate": (
                    "FLOAT",
                    {"default": 25, "min": 1, "step": 1},
                ),
            },
            "optional": {
                "meta_batch": ("VHS_BatchManager",),
                "loop_count": ("INT", {"default": 0, "min": 0, "max": 100, "step": 1}),
                "filename_prefix": ("STRING", {"default": "deepfuze"}),
                "pingpong": ("BOOLEAN", {"default": False}),
                "save_output": ("BOOLEAN", {"default": True}),
            },
            "hidden": {
                "prompt": "PROMPT",
                "format": (["image/gif", "image/webp"] + ffmpeg_formats,{"default":"video/h265-mp4"}),
                "extra_pnginfo": "EXTRA_PNGINFO",
                "unique_id": "UNIQUE_ID"
            },
        }

    RETURN_TYPES = ("IMAGE", "INT", "AUDIO", "VHS_VIDEOINFO",)
    RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info",)
    # RETURN_TYPES = ("VHS_FILENAMES",)
    # RETURN_NAMES = ("Filenames",)
    # OUTPUT_NODE = True
    CATEGORY = "DeepFuze"
    FUNCTION = "lipsyncgenerate"

    @staticmethod
    def _execution_provider_args(device):
        """Return the ``--execution-providers`` arguments for *device* (none for cpu)."""
        if device == "cuda":
            return ['--execution-providers', "cuda"]
        if device == "mps":
            return ['--execution-providers', "coreml"]
        return []

    @staticmethod
    def _loop_filter_args(loop_count, num_frames):
        """Return the ffmpeg ``-vf loop=...`` arguments, or ``[]`` when no looping is requested."""
        if loop_count > 0:
            return ["-vf", "loop=loop=" + str(loop_count) + ":size=" + str(num_frames)]
        return []

    @staticmethod
    def _run_processor(command):
        """Print and run a ``run.py`` command from the DeepFuze node directory."""
        print(command)
        if platform == "win32":
            return subprocess.run(command, cwd=path_cwd, stdout=subprocess.PIPE)
        return subprocess.run(command, cwd="custom_nodes/ComfyUI-DeepFuze", stdout=subprocess.PIPE)

    def lipsyncgenerate(
        self,
        images,
        audio,
        enhancer,
        frame_enhancer,
        face_mask_padding_left,
        face_mask_padding_right,
        face_mask_padding_bottom,
        face_mask_padding_top,
        trim_frame_start,
        trim_frame_end,
        device,
        frame_rate: int,
        loop_count: int,
        filename_prefix="deepfuze",
        format="video/h265-mp4",
        pingpong=False,
        save_output=True,
        prompt=None,
        extra_pnginfo=None,
        unique_id=None,
        manual_format_widgets=None,
        meta_batch=None
    ):
        """Encode ``images`` to a file, lip-sync it to ``audio`` and load the result.

        Args:
            images: Batch of frames; indexed, iterated and measured with ``len``.
            audio: Mapping with ``"waveform"`` and ``"sample_rate"`` entries,
                written to a temporary wav with ``torchaudio.save``.
            enhancer: Face-enhancer model name, or ``"None"`` to skip that pass.
            frame_enhancer: Frame-enhancer model name, or ``"None"`` to skip.
            face_mask_padding_left/right/bottom/top: Values forwarded to
                ``--face-mask-padding``.
            trim_frame_start: Forwarded to ``--trim-frame-start``.
            trim_frame_end: Number subtracted from the frame count; the
                difference is forwarded to ``--trim-frame-end``.
            device: ``"cpu"``, ``"cuda"`` or ``"mps"``; selects the execution provider.
            frame_rate: Frame rate used for the intermediate encode.
            loop_count: Loop count for animated images / the ffmpeg loop filter.
            filename_prefix: Prefix handed to ``folder_paths.get_save_image_path``.
            format: ``"<type>/<ext>"``; ``image/*`` is saved with Pillow,
                anything else with ffmpeg.
            pingpong: Whether to pass the frames through ``to_pingpong``.
            save_output: Write to the output directory instead of the temp directory.
            prompt, extra_pnginfo: Metadata embedded in the first-frame PNG.
            unique_id: Node id used to look up widget values and batch state.
            manual_format_widgets: Optional overrides for format widget defaults.
            meta_batch: Optional batch manager for chunked encodes.

        Returns:
            ``("",)`` for an empty tensor batch, a ``{"ui": ..., "result": ...}``
            dict while a batched encode is unfinished, otherwise whatever
            ``load_video_cv`` returns for the final video file.

        Raises:
            ProcessLookupError: A video format was requested but ``ffmpeg_path`` is None.
            Exception: Image formats combined with ``meta_batch``, pre-pass
                formats combined with ``meta_batch``, or a failing ffmpeg pre-pass.
        """
        print(enhancer,frame_rate,format)
        if isinstance(images, torch.Tensor) and images.size(0) == 0:
            return ("",)
        # Capture the frame count now: `images` is rebound to lazy map()
        # objects further down, and those do not support len().
        num_frames = len(images)
        pbar = ProgressBar(num_frames)
        trim_frame_end = num_frames-trim_frame_end
        first_image = images[0]
        # get output information
        output_dir = (
            folder_paths.get_output_directory()
            if save_output
            else folder_paths.get_temp_directory()
        )
        (
            full_output_folder,
            filename,
            _,
            subfolder,
            _,
        ) = folder_paths.get_save_image_path(filename_prefix, output_dir)
        output_files = []
        metadata = PngInfo()
        video_metadata = {}
        if prompt is not None:
            metadata.add_text("prompt", json.dumps(prompt))
            video_metadata["prompt"] = prompt
        if extra_pnginfo is not None:
            for x in extra_pnginfo:
                metadata.add_text(x, json.dumps(extra_pnginfo[x]))
                video_metadata[x] = extra_pnginfo[x]
        metadata.add_text("CreationTime", datetime.datetime.now().isoformat(" ")[:19])
        if meta_batch is not None and unique_id in meta_batch.outputs:
            # Continue the encode started by an earlier chunk of this batch.
            (counter, output_process) = meta_batch.outputs[unique_id]
        else:
            # comfy counter workaround
            max_counter = 0
            # Loop through the existing files
            matcher = re.compile(f"{re.escape(filename)}_(\\d+)\\D*\\..+", re.IGNORECASE)
            for existing_file in os.listdir(full_output_folder):
                # Check if the file matches the expected format
                match = matcher.fullmatch(existing_file)
                if match:
                    # Extract the numeric portion of the filename
                    file_counter = int(match.group(1))
                    # Update the maximum counter value if necessary
                    if file_counter > max_counter:
                        max_counter = file_counter
            # Increment the counter by 1 to get the next available value
            counter = max_counter + 1
            output_process = None
        # save first frame as png to keep metadata
        file = f"{filename}_{counter:05}.png"
        file_path = os.path.join(full_output_folder, file)
        Image.fromarray(tensor_to_bytes(first_image)).save(
            file_path,
            pnginfo=metadata,
            compress_level=4,
        )
        output_files.append(file_path)
        format_type, format_ext = format.split("/")
        print(format_type, format_ext)
        if format_type == "image":
            if meta_batch is not None:
                raise Exception("Pillow('image/') formats are not compatible with batched output")
            image_kwargs = {}
            if format_ext == "gif":
                image_kwargs['disposal'] = 2
            if format_ext == "webp":
                #Save timestamp information
                exif = Image.Exif()
                exif[ExifTags.IFD.Exif] = {36867: datetime.datetime.now().isoformat(" ")[:19]}
                image_kwargs['exif'] = exif
            file = f"{filename}_{counter:05}.{format_ext}"
            file_path = os.path.join(full_output_folder, file)
            if pingpong:
                images = to_pingpong(images)
            frames = map(lambda x : Image.fromarray(tensor_to_bytes(x)), images)
            # Use pillow directly to save an animated image
            next(frames).save(
                file_path,
                format=format_ext.upper(),
                save_all=True,
                append_images=frames,
                duration=round(1000 / frame_rate),
                loop=loop_count,
                compress_level=4,
                **image_kwargs
            )
            output_files.append(file_path)
        else:
            # Use ffmpeg to save a video
            if ffmpeg_path is None:
                raise ProcessLookupError(f"ffmpeg is required for video outputs and could not be found.\nIn order to use video outputs, you must either:\n- Install imageio-ffmpeg with pip,\n- Place a ffmpeg executable in {os.path.abspath('')}, or\n- Install ffmpeg and add it to the system path.")
            #Acquire additional format_widget values
            kwargs = None
            if manual_format_widgets is None:
                if prompt is not None:
                    kwargs = prompt[unique_id]['inputs']
                else:
                    manual_format_widgets = {}
            if kwargs is None:
                kwargs = get_format_widget_defaults(format_ext)
                missing = {}
                for k in kwargs.keys():
                    if k in manual_format_widgets:
                        kwargs[k] = manual_format_widgets[k]
                    else:
                        missing[k] = kwargs[k]
                if len(missing) > 0:
                    print("Extra format values were not provided, the following defaults will be used: " + str(kwargs) + "\nThis is likely due to usage of ComfyUI-to-python. These values can be manually set by supplying a manual_format_widgets argument")
            # These widget values are forced regardless of what was supplied.
            kwargs["format"] = format
            kwargs['pix_fmt'] = 'yuv420p10le'
            kwargs['crf'] = 22
            kwargs["save_metadata"] = ["save_metadata", "BOOLEAN", {"default": True}]
            print(kwargs)
            video_format = apply_format_widgets(format_ext, kwargs)
            has_alpha = first_image.shape[-1] == 4
            dim_alignment = video_format.get("dim_alignment", 8)
            if (first_image.shape[1] % dim_alignment) or (first_image.shape[0] % dim_alignment):
                #output frames must be padded
                to_pad = (-first_image.shape[1] % dim_alignment,
                          -first_image.shape[0] % dim_alignment)
                padding = (to_pad[0]//2, to_pad[0] - to_pad[0]//2,
                           to_pad[1]//2, to_pad[1] - to_pad[1]//2)
                padfunc = torch.nn.ReplicationPad2d(padding)
                def pad(image):
                    image = image.permute((2,0,1))#HWC to CHW
                    padded = padfunc(image.to(dtype=torch.float32))
                    return padded.permute((1,2,0))
                images = map(pad, images)
                new_dims = (-first_image.shape[1] % dim_alignment + first_image.shape[1],
                            -first_image.shape[0] % dim_alignment + first_image.shape[0])
                dimensions = f"{new_dims[0]}x{new_dims[1]}"
                print("Output images were not of valid resolution and have had padding applied")
            else:
                dimensions = f"{first_image.shape[1]}x{first_image.shape[0]}"
            if pingpong:
                if meta_batch is not None:
                    print("pingpong is incompatible with batched output")
                images = to_pingpong(images)
            if video_format.get('input_color_depth', '8bit') == '16bit':
                images = map(tensor_to_shorts, images)
                if has_alpha:
                    i_pix_fmt = 'rgba64'
                else:
                    i_pix_fmt = 'rgb48'
            else:
                images = map(tensor_to_bytes, images)
                if has_alpha:
                    i_pix_fmt = 'rgba'
                else:
                    i_pix_fmt = 'rgb24'
            file = f"{filename}_{counter:05}.{video_format['extension']}"
            file_path = os.path.join(full_output_folder, file)
            # Built from the count captured up front; calling len() on the
            # map object held in `images` here raised TypeError.
            # NOTE(review): this is the pre-pingpong frame count; if
            # to_pingpong emits extra frames the loop size may need adjusting.
            loop_args = self._loop_filter_args(loop_count, num_frames)
            bitrate_arg = []
            bitrate = video_format.get('bitrate')
            if bitrate is not None:
                bitrate_arg = ["-b:v", str(bitrate) + "M" if video_format.get('megabit') == 'True' else str(bitrate) + "K"]
            # The first 13 entries are the raw-video input options; the
            # args[:13] slices below depend on that count.
            args = [ffmpeg_path, "-v", "error", "-f", "rawvideo", "-pix_fmt", i_pix_fmt,
                    "-s", dimensions, "-r", str(frame_rate), "-i", "-"] \
                    + loop_args
            images = map(lambda x: x.tobytes(), images)
            env=os.environ.copy()
            if "environment" in video_format:
                env.update(video_format["environment"])
            if "pre_pass" in video_format:
                if meta_batch is not None:
                    #Performing a prepass requires keeping access to all frames.
                    #Potential solutions include keeping just output frames in
                    #memory or using 3 passes with intermediate file, but
                    #very long gifs probably shouldn't be encouraged
                    raise Exception("Formats which require a pre_pass are incompatible with Batch Manager.")
                images = [b''.join(images)]
                os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
                pre_pass_args = args[:13] + video_format['pre_pass']
                try:
                    subprocess.run(pre_pass_args, input=images[0], env=env,
                                   capture_output=True, check=True)
                except subprocess.CalledProcessError as e:
                    raise Exception("An error occurred in the ffmpeg prepass:\n" \
                            + e.stderr.decode("utf-8"))
            if "inputs_main_pass" in video_format:
                args = args[:13] + video_format['inputs_main_pass'] + args[13:]
            if output_process is None:
                if 'gifski_pass' in video_format:
                    output_process = gifski_process(args, video_format, file_path, env)
                else:
                    args += video_format['main_pass'] + bitrate_arg
                    output_process = ffmpeg_process(args, video_format, video_metadata, file_path, env)
                #Proceed to first yield
                output_process.send(None)
                if meta_batch is not None:
                    meta_batch.outputs[unique_id] = (counter, output_process)
            for image in images:
                pbar.update(1)
                output_process.send(image)
            if meta_batch is not None:
                requeue_workflow((meta_batch.unique_id, not meta_batch.has_closed_inputs))
            if meta_batch is None or meta_batch.has_closed_inputs:
                #Close pipe and wait for termination.
                try:
                    output_process.send(None)
                    output_process.send(None)
                except StopIteration:
                    pass
                if meta_batch is not None:
                    meta_batch.outputs.pop(unique_id)
                    if len(meta_batch.outputs) == 0:
                        meta_batch.reset()
            else:
                #batch is unfinished
                #TODO: Check if empty output breaks other custom nodes
                return {"ui": {"unfinished_batch": [True]}, "result": ((save_output, []),)}
            output_files.append(file_path)

        # Write the driving audio to a wav the lip-sync subprocess can read.
        # NOTE: this temporary wav is not removed afterwards.
        audio_file = os.path.join(audio_dir,str(time.time()).replace(".","")+".wav")
        torchaudio.save(audio_file,audio["waveform"][0],audio["sample_rate"])
        print(audio_file)
        filename = os.path.join(result_dir,f"{str(time.time()).replace('.','')}.mp4")
        enhanced_filename = os.path.join(result_dir,f"enhanced_{str(time.time()).replace('.','')}.mp4")
        provider_args = self._execution_provider_args(device)

        # NOTE(review): padding is passed as top, bottom, left, right —
        # confirm this matches the order run.py expects.
        command = [
            'python',
            './run.py',               # Script to run
            '--frame-processors',
            "lip_syncer",
            "-s",
            audio_file,
            '-t',                     # Target: the file encoded above
            output_files[-1],
            '-o',
            filename,
            '--trim-frame-start',
            str(trim_frame_start),
            '--trim-frame-end',
            str(trim_frame_end),
            '--face-mask-padding',
            str(face_mask_padding_top),
            str(face_mask_padding_bottom),
            str(face_mask_padding_left),
            str(face_mask_padding_right),
            '--headless'
        ]
        self._run_processor(command + provider_args)

        if enhancer!="None":
            command = [
                'python',
                './run.py',               # Script to run
                '--frame-processors',
                "face_enhancer",
                "--face-enhancer-model",
                enhancer,
                "-t",
                filename,
                '-o',
                enhanced_filename,
                '--headless'
            ]
            self._run_processor(command + provider_args)
            filename = enhanced_filename

        if frame_enhancer!="None":
            command = [
                'python',
                './run.py',               # Script to run
                '--frame-processors',
                "frame_enhancer",
                "--frame-enhancer-model",
                frame_enhancer,
                "-t",
                filename,
                '-o',
                enhanced_filename,
                '--headless'
            ]
            self._run_processor(command + provider_args)
            # Mux the audio track back onto the enhanced video
            # (video stream from input 0, audio stream from input 1).
            temp_file = enhanced_filename.replace(".mp4","_.mp4")
            subprocess.run(["ffmpeg","-i",enhanced_filename,"-i",audio_file,"-c","copy","-map","0:v:0","-map","1:a:0",temp_file,'-y'])
            filename = temp_file

        return load_video_cv(filename,0,'Disabled',512,512,0,0,1)
import folder_paths
import torch
import time
import os
from pydub import AudioSegment
from scipy.io.wavfile import write
import numpy as np
from scipy.fft import fft
class AudioData:
    """Hold the samples, sample rate and channel count of a decoded audio segment.

    ``audio_file`` must expose ``frame_rate``, ``channels`` and
    ``get_array_of_samples()``.
    """

    def __init__(self, audio_file) -> None:
        # Read the rate and channel count, then convert the samples to a NumPy array.
        self.sample_rate = audio_file.frame_rate
        self.num_channels = audio_file.channels
        self.audio_data = np.array(audio_file.get_array_of_samples())

    def get_channel_audio_data(self, channel: int):
        """Return every ``num_channels``-th sample starting at index ``channel``.

        Raises:
            IndexError: ``channel`` is negative or not below ``num_channels``.
        """
        out_of_range = channel < 0 or channel >= self.num_channels
        if out_of_range:
            raise IndexError(f"Channel '{channel}' out of range. total channels is '{self.num_channels}'.")
        stride = self.num_channels
        return self.audio_data[channel::stride]

    def get_channel_fft(self, channel: int):
        """Return ``scipy.fft.fft`` of the samples selected for ``channel``."""
        return fft(self.get_channel_audio_data(channel))
# Directory under ComfyUI's models folder; passed to tts_generation.py as --model.
checkpoint_path_voice = os.path.join(folder_paths.models_dir,"deepfuze")
print(checkpoint_path_voice)
# Folder where TTS_generation writes its wav files. This is the same path as
# `audio_dir` defined near the top of the module; it is created at import time.
audio_path = os.path.join(folder_paths.get_input_directory(),"audio")
os.makedirs(audio_path,exist_ok=True)
class TTS_generation:
    """ComfyUI node that runs ``tts_generation.py`` to synthesise speech.

    The input audio is written to a wav file and passed to the script as
    ``--speaker_wav``; the script's output file is loaded back with
    ``get_audio`` and returned.
    """

    @classmethod
    def INPUT_TYPES(self):
        """Describe the node's required and optional inputs."""
        return {
            "required": {
                "audio": ("AUDIO",),
            },
            "optional": {
                "llm_response": ("NEW_STRING",{"default":""}),
                "text": ("STRING",{
                    "multiline": True,
                    "default": ""
                }),
                "device": (["cpu","cuda","mps"],),
                "supported_language": ("English (en), Spanish (es), French (fr), German (de), Italian (it), Portuguese (pt), Polish (pl), Turkish (tr), Russian (ru), Dutch (nl), Czech (cs), Arabic (ar), Chinese (zh-cn), Japanese (ja), Hungarian (hu), Korean (ko), Hindi (hi)".split(","),),
            }
        }

    RETURN_TYPES = ("AUDIO",)      # Output type(s) of the node
    FUNCTION = "generate_audio"    # Entry-point method name
    CATEGORY = "DeepFuze"          # Category for the node in the UI

    @staticmethod
    def _ensure_success(result):
        """Raise ``RuntimeError`` when the TTS subprocess exited with a non-zero status."""
        if result.returncode != 0:
            raise RuntimeError(
                f"tts_generation.py failed with exit code {result.returncode}:\n{result.stderr}"
            )

    def generate_audio(self, audio, text,device,supported_language,llm_response=""):
        """Synthesise ``text`` (or ``llm_response``) using ``audio`` as the speaker reference.

        Args:
            audio: Mapping with ``"waveform"`` and ``"sample_rate"`` entries.
            text: Text to speak; ignored when ``llm_response`` is non-empty.
            device: Value forwarded to the script as ``--device``.
            supported_language: Label such as ``"English (en)"``; the code
                between the parentheses is forwarded as ``--language``.
            llm_response: Optional text that takes precedence over ``text``.

        Returns:
            A one-element tuple holding the result of ``get_audio`` for the output file.

        Raises:
            ValueError: Neither ``llm_response`` nor ``text`` was provided.
            RuntimeError: The TTS subprocess exited with a non-zero status.
        """
        if not llm_response and not text:
            raise ValueError("Please provide LLM_response or enter text")
        if llm_response:
            text = llm_response
        # "English (en)" -> "en": take what follows "(" and drop the closing ")".
        language = supported_language.split("(")[1][:-1]
        file_path = os.path.join(audio_path,str(time.time()).replace(".","")+".wav")
        torchaudio.save(file_path,audio["waveform"][0],audio["sample_rate"])
        # NOTE: the speaker reference and the output share one path, so the
        # script overwrites the reference clip with the generated speech.
        command = [
            'python', 'tts_generation.py',
            '--model', checkpoint_path_voice,
            '--text', text,
            '--language', language,
            '--speaker_wav', file_path,
            '--output_file', file_path,
            '--device', device
        ]
        working_dir = path_cwd if platform == "win32" else "custom_nodes/ComfyUI-DeepFuze"
        # Capture both streams as text on every platform; errors="replace"
        # keeps undecodable bytes in the script output from raising here.
        result = subprocess.run(command, cwd=working_dir, capture_output=True,
                                text=True, errors="replace")
        print("stdout:", result.stdout)
        print("stderr:", result.stderr)
        # Without this check a failed run would go unnoticed: file_path still
        # holds the reference clip, which would be returned as the result.
        self._ensure_success(result)
        audio = get_audio(file_path)
        return (audio,)
class DeepfuzePreview:
    """Output node that previews the face-mask box padding on each input image.

    Every image is saved as a PNG and passed to ``run.py`` with the
    ``face_debugger`` frame processor; the files written by that run are
    reported back to the UI.
    """

    def __init__(self):
        self.type = "output"
        self.prefix_append = ""
        self.compress_level = 4
        self.output_dir = folder_paths.get_output_directory()

    @classmethod
    def INPUT_TYPES(self):
        """Describe the node's inputs: the images plus one padding value per side."""
        required = {"images": ("IMAGE",)}
        for side in ("left", "right", "bottom", "top"):
            required[f"face_mask_padding_{side}"] = ("INT",{"default":0,"min":0,"max":30,"step":1})
        return {"required": required}

    RETURN_TYPES = ()
    FUNCTION = "test"        # Entry-point method name
    OUTPUT_NODE = True
    CATEGORY = "DeepFuze"    # Category for the node in the UI

    def test(self, images, face_mask_padding_left, face_mask_padding_right,face_mask_padding_bottom,face_mask_padding_top, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
        """Save each image, run the face debugger on it and return the UI image list.

        Returns:
            ``{"ui": {"images": [...]}}`` with one entry per input image,
            each naming the ``"_"``-prefixed file requested from ``run.py``.
        """
        filename_prefix += self.prefix_append
        first = images[0]
        full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(
            filename_prefix, self.output_dir, first.shape[1], first.shape[0]
        )
        print(filename)

        # Neither depends on the individual image, so compute them once.
        working_dir = path_cwd if platform == "win32" else "custom_nodes/ComfyUI-DeepFuze"
        padding_args = [
            str(face_mask_padding_top),
            str(face_mask_padding_bottom),
            str(face_mask_padding_left),
            str(face_mask_padding_right),
        ]

        ui_images = []
        for batch_number, image in enumerate(images):
            # Scale to 0-255 and clamp before converting to 8-bit pixels.
            pixels = np.clip(255. * image.cpu().numpy(), 0, 255).astype(np.uint8)
            stem = filename.replace("%batch_num%", str(batch_number))
            file = f"{stem}_{counter:05}_.png"
            source_path = os.path.join(full_output_folder, file)
            debug_name = "_"+file
            Image.fromarray(pixels).save(source_path, pnginfo=None, compress_level=self.compress_level)

            command = [
                'python',
                'run.py',               # Script to run
                '--frame-processors',
                "face_debugger",
                "-t",
                source_path,
                '-o',
                os.path.join(full_output_folder, debug_name),
                '--face-mask-types',
                'box',
                '--face-mask-padding',
                *padding_args,
                '--headless'
            ]
            print(command)
            result = subprocess.run(command,cwd=working_dir,stdout=subprocess.PIPE)
            print(result.stdout)

            ui_images.append({
                "filename": debug_name,
                "subfolder": subfolder,
                "type": self.type
            })
            counter += 1
        return { "ui": { "images": ui_images } }
# Maps each node identifier to the class that implements it.
# Every key here also appears in NODE_DISPLAY_NAME_MAPPINGS.
# NOTE(review): the keys (including the "DeepFuzeAdavance" spelling and the
# space in "DeepFuze Save") are presumably stored in saved workflows, so
# renaming them would likely break existing graphs — confirm before changing.
NODE_CLASS_MAPPINGS = {
"DeepFuzeAdavance": DeepFuzeAdavance,
"DeepFuzeFaceSwap": DeepFuzeFaceSwap,
"TTS_generation":TTS_generation,
"LLM_node": LLM_node,
"PlayBackAudio": PlayBackAudio,
"DeepFuze Save":SaveAudio,
"DeepfuzePreview":DeepfuzePreview
}
# Human-readable title for each node identifier; the keys match those of
# NODE_CLASS_MAPPINGS one-to-one.
NODE_DISPLAY_NAME_MAPPINGS = {
"DeepFuzeAdavance": "DeepFuze Lipsync",
"DeepFuzeFaceSwap": "DeepFuze FaceSwap",
"TTS_generation":"DeepFuze TTS",
"LLM_node": "DeepFuze Openai LLM",
"PlayBackAudio": "Play Audio",
"DeepFuze Save":"DeepFuze Save Audio",
"DeepfuzePreview": "DeepFuze Padding Preview"
}