From 004f87a3fb134a8f0853b3d65ad53bd4697923e2 Mon Sep 17 00:00:00 2001
From: Sam Khoze <68170403+SamKhoze@users.noreply.github.com>
Date: Sat, 15 Jun 2024 00:48:10 +0530
Subject: [PATCH] Update nodes.py

---
 nodes.py | 3041 ++++++++++++++++++++----------------------------------
 1 file changed, 1119 insertions(+), 1922 deletions(-)

diff --git a/nodes.py b/nodes.py
index b744b53..848723b 100644
--- a/nodes.py
+++ b/nodes.py
@@ -1,1385 +1,1102 @@
-import torch
 import os
 import sys
 import json
-import hashlib
-import traceback
-import math
-import time
-import random
-import logging
-
-from PIL import Image, ImageOps, ImageSequence, ImageFile
-from PIL.PngImagePlugin import PngInfo
-
+import subprocess
 import numpy as np
-import safetensors.torch
+import re
+import cv2
+import time
+import itertools
+import datetime
+from typing import List
+import torch
+import psutil
-sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy"))
+from PIL import Image, ImageOps, ExifTags
+from PIL.PngImagePlugin import PngInfo
+from pathlib import Path
+from string import Template
+from pydub import AudioSegment
+from comfy.utils import common_upscale, ProgressBar
-import comfy.diffusers_load
-import comfy.samplers
-import comfy.sample
-import comfy.sd
-import comfy.utils
-import comfy.controlnet
+from scipy.io.wavfile import write
+import folder_paths
+from .utils import (BIGMAX, DIMMAX, calculate_file_hash, get_sorted_dir_files_from_directory,
+                    get_audio, lazy_eval, hash_path, validate_path, strip_path,
+                    ffmpeg_path, requeue_workflow, gifski_path)
+from .llm_node import LLM_node
+from .audio_playback import PlayBackAudio
-import comfy.clip_vision
-import comfy.model_management
-from comfy.cli_args import args

+# folder_paths.folder_names_and_paths["VHS_video_formats"] = (
+#     [
+#         os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "video_formats"),
+#     ],
+#     [".json"]
+# )
+
+result_dir = os.path.join(folder_paths.get_output_directory(), "deepfuze")
+audio_dir = os.path.join(folder_paths.get_input_directory(), "audio")
+
+os.makedirs(result_dir, exist_ok=True)
+os.makedirs(audio_dir, exist_ok=True)
+
+audio_extensions = ['mp3', 'mp4', 'wav', 'ogg']
+video_extensions = ['webm', 'mp4', 'mkv', 'gif']
+
+
+def is_gif(filename) -> bool:
+    file_parts = filename.split('.')
+    return len(file_parts) > 1 and file_parts[-1] == "gif"
+
+
+def target_size(width, height, force_size, custom_width, custom_height) -> tuple[int, int]:
+    if force_size == "Custom":
+        return (custom_width, custom_height)
+    elif force_size == "Custom Height":
+        force_size = "?x"+str(custom_height)
+    elif force_size == "Custom Width":
+        force_size = str(custom_width)+"x?"
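+    # Illustrative note (not part of the original change): with the "?x"/"x?" convention
+    # above, the unknown dimension is solved from the aspect ratio and then snapped to a
+    # multiple of 8 below via `+4 & ~7` (round to the nearest multiple of 8). For example,
+    # target_size(1920, 1080, "Custom Width", 512, 512) rewrites force_size to "512x?"
+    # and returns (512, 288), since (1080*512)//1920 == 288, already a multiple of 8.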
+
+    if force_size != "Disabled":
+        force_size = force_size.split("x")
+        if force_size[0] == "?":
+            width = (width*int(force_size[1]))//height
+            #Limit to a multiple of 8 for latent conversion
+            width = int(width)+4 & ~7
+            height = int(force_size[1])
+        elif force_size[1] == "?":
+            height = (height*int(force_size[0]))//width
+            height = int(height)+4 & ~7
+            width = int(force_size[0])
+        else:
+            width = int(force_size[0])
+            height = int(force_size[1])
+    return (width, height)
+
+def cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames,
+                       select_every_nth, meta_batch=None, unique_id=None):
+    video_cap = cv2.VideoCapture(strip_path(video))
+    if not video_cap.isOpened():
+        raise ValueError(f"{video} could not be loaded with cv.")
+    pbar = ProgressBar(frame_load_cap) if frame_load_cap > 0 else None
+
+    # extract video metadata
+    fps = video_cap.get(cv2.CAP_PROP_FPS)
+    width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+    height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+    total_frames = int(video_cap.get(cv2.CAP_PROP_FRAME_COUNT))
+    duration = total_frames / fps
+
+    # set video_cap to look at start_index frame
+    total_frame_count = 0
+    total_frames_evaluated = -1
+    frames_added = 0
+    base_frame_time = 1 / fps
+    prev_frame = None
+
+    if force_rate == 0:
+        target_frame_time = base_frame_time
+    else:
+        target_frame_time = 1/force_rate
+
+    yield (width, height, fps, duration, total_frames, target_frame_time)
+
+    time_offset = target_frame_time - base_frame_time
+    while video_cap.isOpened():
+        if time_offset < target_frame_time:
+            is_returned = video_cap.grab()
+            # if didn't return frame, video has ended
+            if not is_returned:
+                break
+            time_offset += base_frame_time
+        if time_offset < target_frame_time:
+            continue
+        time_offset -= target_frame_time
+        # if not at start_index, skip doing anything with frame
+        total_frame_count += 1
+        if total_frame_count <= skip_first_frames:
+            continue
+        else:
+            total_frames_evaluated += 1
+
+        # if should not be selected, skip doing anything with frame
+        if total_frames_evaluated%select_every_nth != 0:
+            continue
+
+        # opencv loads images in BGR format (yuck), so need to convert to RGB for ComfyUI use
+        # follow up: can videos ever have an alpha channel?
+        # To my testing: No. opencv has no support for alpha
+        unused, frame = video_cap.retrieve()
+        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
+        # convert frame to comfyui's expected format
+        # TODO: frame contains no exif information.
+        # Check if opencv2 has already applied it
+        frame = np.array(frame, dtype=np.float32)
+        # div_ is in-place and the tensor shares memory with the numpy array,
+        # so this normalizes `frame` to [0, 1] without an extra copy
+        torch.from_numpy(frame).div_(255)
+        if prev_frame is not None:
+            inp = yield prev_frame
+            if inp is not None:
+                #ensure the finally block is called
+                return
+        prev_frame = frame
+        frames_added += 1
+        if pbar is not None:
+            pbar.update_absolute(frames_added, frame_load_cap)
+        # if cap exists and we've reached it, stop processing frames
+        if frame_load_cap > 0 and frames_added >= frame_load_cap:
+            break
+    if meta_batch is not None:
+        meta_batch.inputs.pop(unique_id)
+        meta_batch.has_closed_inputs = True
+    if prev_frame is not None:
+        yield prev_frame
+
+def load_video_cv(video: str, force_rate: int, force_size: str,
+                  custom_width: int, custom_height: int, frame_load_cap: int,
+                  skip_first_frames: int, select_every_nth: int,
+                  meta_batch=None, unique_id=None, memory_limit_mb=None):
+    if meta_batch is None or unique_id not in meta_batch.inputs:
+        gen = cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames,
+                                 select_every_nth, meta_batch, unique_id)
+        (width, height, fps, duration, total_frames, target_frame_time) = next(gen)
+
+        if meta_batch is not None:
+            meta_batch.inputs[unique_id] = (gen, width, height, fps, duration, total_frames, target_frame_time)
+
+    else:
+        (gen, width, height, fps, duration, total_frames, target_frame_time) = meta_batch.inputs[unique_id]
+
+    if memory_limit_mb is not None:
+        memory_limit = memory_limit_mb * 2 ** 20
+    else:
+        #TODO: verify if garbage collection should be performed here.
+        #leaves ~128 MB unreserved for safety
+        memory_limit = (psutil.virtual_memory().available + psutil.swap_memory().free) - 2 ** 27
+    #space required to load as f32, exist as latent with wiggle room, decode to f32
+    max_loadable_frames = int(memory_limit//(width*height*3*(4+4+1/10)))
+    if meta_batch is not None:
+        if meta_batch.frames_per_batch > max_loadable_frames:
+            raise RuntimeError(f"Meta Batch set to {meta_batch.frames_per_batch} frames but only {max_loadable_frames} can fit in memory")
+        gen = itertools.islice(gen, meta_batch.frames_per_batch)
+    else:
+        original_gen = gen
+        gen = itertools.islice(gen, max_loadable_frames)
+
+    #Some minor wizardry to eliminate a copy and reduce max memory by a factor of ~2
+    images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (height, width, 3)))))
+    if meta_batch is None:
+        try:
+            next(original_gen)
+            raise RuntimeError(f"Memory limit hit after loading {len(images)} frames. Stopping execution.")
+        except StopIteration:
+            pass
+    if len(images) == 0:
+        raise RuntimeError("No frames generated")
+    if force_size != "Disabled":
+        new_size = target_size(width, height, force_size, custom_width, custom_height)
+        if new_size[0] != width or new_size[1] != height:
+            s = images.movedim(-1,1)
+            s = common_upscale(s, new_size[0], new_size[1], "lanczos", "center")
+            images = s.movedim(1,-1)
+
+    #Setup lambda for lazy audio capture
+    audio = lambda : get_audio(video, skip_first_frames * target_frame_time,
+                               frame_load_cap*target_frame_time*select_every_nth)
+    #Adjust target_frame_time for select_every_nth
+    target_frame_time *= select_every_nth
+    video_info = {
+        "source_fps": fps,
+        "source_frame_count": total_frames,
+        "source_duration": duration,
+        "source_width": width,
+        "source_height": height,
+        "loaded_fps": 1/target_frame_time,
+        "loaded_frame_count": len(images),
+        "loaded_duration": len(images) * target_frame_time,
+        "loaded_width": images.shape[2],
+        "loaded_height": images.shape[1],
+    }
+    return (images, len(images), lazy_eval(audio), video_info)
+
+
+class AudioData:
+    def __init__(self, audio_file) -> None:
+        # Extract the sample rate
+        sample_rate = audio_file.frame_rate
+        # Get the number of audio channels
+        num_channels = audio_file.channels
+        # Extract the audio data as a NumPy array
+        audio_data = np.array(audio_file.get_array_of_samples())
+        self.audio_data = audio_data
+        self.sample_rate = sample_rate
+        self.num_channels = num_channels
+
+    def get_channel_audio_data(self, channel: int):
+        if channel < 0 or channel >= self.num_channels:
+            raise IndexError(f"Channel '{channel}' out of range. total channels is '{self.num_channels}'.")
+        return self.audio_data[channel::self.num_channels]
+
+    def get_channel_fft(self, channel: int):
+        audio_data = self.get_channel_audio_data(channel)
+        return fft(audio_data)
+
+
+def gen_format_widgets(video_format):
+    for k in video_format:
+        if k.endswith("_pass"):
+            for i in range(len(video_format[k])):
+                if isinstance(video_format[k][i], list):
+                    item = [video_format[k][i]]
+                    yield item
+                    video_format[k][i] = item[0]
+        else:
+            if isinstance(video_format[k], list):
+                item = [video_format[k]]
+                yield item
+                video_format[k] = item[0]
+
+def get_video_formats():
+    formats = []
+    for format_name in folder_paths.get_filename_list("VHS_video_formats"):
+        format_name = format_name[:-5]
+        video_format_path = folder_paths.get_full_path("VHS_video_formats", format_name + ".json")
+        with open(video_format_path, 'r') as stream:
+            video_format = json.load(stream)
+        if "gifski_pass" in video_format and gifski_path is None:
+            #Skip format
+            continue
+        widgets = [w[0] for w in gen_format_widgets(video_format)]
+        if (len(widgets) > 0):
+            formats.append(["video/" + format_name, widgets])
+        else:
+            formats.append("video/" + format_name)
+    return formats
+
+def get_format_widget_defaults(format_name):
+    video_format_path = folder_paths.get_full_path("VHS_video_formats", format_name + ".json")
+    with open(video_format_path, 'r') as stream:
+        video_format = json.load(stream)
+    results = {}
+    for w in gen_format_widgets(video_format):
+        if len(w[0]) > 2 and 'default' in w[0][2]:
+            default = w[0][2]['default']
+        else:
+            if type(w[0][1]) is list:
+                default = w[0][1][0]
+            else:
+                #NOTE: This doesn't respect max/min, but should be good enough as a fallback to a fallback to a fallback
+                default = {"BOOLEAN": False, "INT": 0, "FLOAT": 0, "STRING": ""}[w[0][1]]
+        results[w[0][0]] = default
+    return results
+
+def apply_format_widgets(format_name, kwargs):
+    video_format_path = folder_paths.get_full_path("VHS_video_formats", format_name + ".json")
+    with open(video_format_path, 'r') as stream:
+        video_format = json.load(stream)
+    for w in gen_format_widgets(video_format):
+        assert w[0][0] in kwargs, f"Missing value for format widget '{w[0][0]}'"
+        if len(w[0]) > 3:
+            w[0] = Template(w[0][3]).substitute(val=kwargs[w[0][0]])
+        else:
+            w[0] = str(kwargs[w[0][0]])
+    return video_format
+
+def tensor_to_int(tensor, bits):
+    #TODO: investigate benefit of rounding by adding 0.5 before clip/cast
+    tensor = tensor.cpu().numpy() * (2**bits-1)
+    return np.clip(tensor, 0, (2**bits-1))
+def tensor_to_shorts(tensor):
+    return tensor_to_int(tensor, 16).astype(np.uint16)
+def tensor_to_bytes(tensor):
+    return tensor_to_int(tensor, 8).astype(np.uint8)
+
+def ffmpeg_process(args, video_format, video_metadata, file_path, env):
+    res = None
+    frame_data = yield
+    total_frames_output = 0
+    if video_format.get('save_metadata', 'False') != 'False':
+        os.makedirs(folder_paths.get_temp_directory(), exist_ok=True)
+        metadata = json.dumps(video_metadata)
+        metadata_path = os.path.join(folder_paths.get_temp_directory(), "metadata.txt")
+        #metadata from file should escape = ; # \ and newline
+        metadata = metadata.replace("\\","\\\\")
+        metadata = metadata.replace(";","\\;")
+        metadata = metadata.replace("#","\\#")
+        metadata = metadata.replace("=","\\=")
+        metadata = metadata.replace("\n","\\\n")
+        metadata = "comment=" + metadata
+        with open(metadata_path, "w") as f:
+            f.write(";FFMETADATA1\n")
+            f.write(metadata)
+        m_args = args[:1] + ["-i", metadata_path] + args[1:] + ["-metadata", "creation_time=now"]
+        with subprocess.Popen(m_args + [file_path], stderr=subprocess.PIPE,
+                              stdin=subprocess.PIPE, env=env) as proc:
+            try:
+                while frame_data is not None:
+                    proc.stdin.write(frame_data)
+                    #TODO: skip flush for increased speed
+                    frame_data = yield
+                    total_frames_output+=1
+                proc.stdin.flush()
+                proc.stdin.close()
+                res = proc.stderr.read()
+            except BrokenPipeError as e:
+                err = proc.stderr.read()
+                #Check if output file exists. If it does, the re-execution
+                #will also fail.
+                #This obscures the cause of the error
+                #and seems to never occur concurrent to the metadata issue
+                if os.path.exists(file_path):
+                    raise Exception("An error occurred in the ffmpeg subprocess:\n" \
+                            + err.decode("utf-8"))
+                #Res was not set
+                print(err.decode("utf-8"), end="", file=sys.stderr)
+                print("An error occurred when saving with metadata")
+    if res != b'':
+        with subprocess.Popen(args + [file_path], stderr=subprocess.PIPE,
+                              stdin=subprocess.PIPE, env=env) as proc:
+            try:
+                while frame_data is not None:
+                    proc.stdin.write(frame_data)
+                    frame_data = yield
+                    total_frames_output+=1
+                proc.stdin.flush()
+                proc.stdin.close()
+                res = proc.stderr.read()
+            except BrokenPipeError as e:
+                res = proc.stderr.read()
+                raise Exception("An error occurred in the ffmpeg subprocess:\n" \
+                        + res.decode("utf-8"))
+    yield total_frames_output
+    if len(res) > 0:
+        print(res.decode("utf-8"), end="", file=sys.stderr)
+
+def gifski_process(args, video_format, file_path, env):
+    frame_data = yield
+    with subprocess.Popen(args + video_format['main_pass'] + ['-f', 'yuv4mpegpipe', '-'],
+                          stderr=subprocess.PIPE, stdin=subprocess.PIPE,
+                          stdout=subprocess.PIPE, env=env) as procff:
+        with subprocess.Popen([gifski_path] + video_format['gifski_pass']
+                              + ['-q', '-o', file_path, '-'], stderr=subprocess.PIPE,
+                              stdin=procff.stdout, stdout=subprocess.PIPE,
+                              env=env) as procgs:
+            try:
+                while frame_data is not None:
+                    procff.stdin.write(frame_data)
+                    frame_data = yield
+                procff.stdin.flush()
+                procff.stdin.close()
+                resff = procff.stderr.read()
+                resgs = procgs.stderr.read()
+                outgs = procgs.stdout.read()
+            except BrokenPipeError as e:
+                procff.stdin.close()
+                resff = procff.stderr.read()
+                resgs = procgs.stderr.read()
+                raise Exception("An error occurred while creating gifski output\n" \
+                        + "Make sure you are using gifski --version >=1.32.0\nffmpeg: " \
+                        + resff.decode("utf-8") + '\ngifski: ' + resgs.decode("utf-8"))
+    if len(resff) > 0:
+        print(resff.decode("utf-8"), end="", file=sys.stderr)
+    if len(resgs) > 0:
+        print(resgs.decode("utf-8"), end="", file=sys.stderr)
+    #should always be empty as the quiet flag is passed
+    if len(outgs) > 0:
+        print(outgs.decode("utf-8"))
+
+def to_pingpong(inp):
+    if not hasattr(inp, "__getitem__"):
+        inp = list(inp)
+    yield from inp
+    for i in range(len(inp)-2,0,-1):
+        yield inp[i]
+
+
+class DeepFuzeAdavance:
+    @classmethod
+    def INPUT_TYPES(s):
+        ffmpeg_formats = get_video_formats()
+        return {
+            "required": {
+                "images": ("IMAGE",),
+                "audio": ("AUDIO",),
+                "enhancer": ("None,codeformer,gfpgan_1.2,gfpgan_1.3,gfpgan_1.4,gpen_bfr_256,gpen_bfr_512,gpen_bfr_1024,gpen_bfr_2048,restoreformer_plus_plus".split(","),{"default":'None'}),
+                "frame_enhancer": ("None,clear_reality_x4,lsdir_x4,nomos8k_sc_x4,real_esrgan_x2,real_esrgan_x2_fp16,real_esrgan_x4,real_esrgan_x4_fp16,real_hatgan_x4,span_kendata_x4,ultra_sharp_x4".split(","),{"default":'None'}),
+                "face_mask_padding_left": ("INT",{"default":0,"min":0,"max":30,"step":1}),
+                "face_mask_padding_right": ("INT",{"default":0,"min":0,"max":30,"step":1}),
+                "face_mask_padding_bottom": ("INT",{"default":0,"min":0,"max":30,"step":1}),
+                "face_mask_padding_top": ("INT",{"default":0,"min":0,"max":30,"step":1}),
+                "trim_frame_start": ("INT",{"default":0,"max":2000},),
+                "trim_frame_end": ("INT",{"default":0,"max":2000},),
+                "device": (["cpu","gpu"],{"default":"cpu"}),
+                "frame_rate": (
+                    "FLOAT",
+                    {"default": 25, "min": 1, "step": 1},
+                ),
+            },
+            "optional": {
+                "meta_batch": ("VHS_BatchManager",),
+                "loop_count": ("INT", {"default": 0, "min": 0, "max": 100, "step": 1}),
+                "filename_prefix": ("STRING", {"default": "deepfuze"}),
+                "pingpong": ("BOOLEAN", {"default": False}),
+                "save_output": ("BOOLEAN", {"default": True}),
+            },
+            "hidden": {
+                "prompt": "PROMPT",
+                "format": (["image/gif", "image/webp"] + ffmpeg_formats,{"default":"video/h265-mp4"}),
+                "extra_pnginfo": "EXTRA_PNGINFO",
+                "unique_id": "UNIQUE_ID"
+            },
+        }
+
+    RETURN_TYPES = ("IMAGE", "INT", "VHS_AUDIO", "VHS_VIDEOINFO",)
+    RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info",)
+
+    # RETURN_TYPES = ("VHS_FILENAMES",)
+    # RETURN_NAMES = ("Filenames",)
+    # OUTPUT_NODE = True
+    CATEGORY = "DeepFuze"
+    FUNCTION = "lipsyncgenerate"
+
+    def lipsyncgenerate(
+        self,
+        images,
+        audio,
+        enhancer,
+        frame_enhancer,
+        face_mask_padding_left,
+        face_mask_padding_right,
+        face_mask_padding_bottom,
+        face_mask_padding_top,
+        trim_frame_start,
+        trim_frame_end,
+        device,
+        frame_rate: int,
+        loop_count: int,
+        filename_prefix="deepfuze",
+        format="video/h265-mp4",
+        pingpong=False,
+        save_output=True,
+        prompt=None,
+        extra_pnginfo=None,
+        unique_id=None,
+        manual_format_widgets=None,
+        meta_batch=None
+    ):
+        print(enhancer,frame_rate,format)
+        if 
isinstance(images, torch.Tensor) and images.size(0) == 0: + return ("",) + pbar = ProgressBar(len(images)) + trim_frame_end = len(images)-trim_frame_end + + first_image = images[0] + # get output information + output_dir = ( + folder_paths.get_output_directory() + if save_output + else folder_paths.get_temp_directory() + ) + ( + full_output_folder, + filename, + _, + subfolder, + _, + ) = folder_paths.get_save_image_path(filename_prefix, output_dir) + output_files = [] + + metadata = PngInfo() + video_metadata = {} + if prompt is not None: + metadata.add_text("prompt", json.dumps(prompt)) + video_metadata["prompt"] = prompt + if extra_pnginfo is not None: + for x in extra_pnginfo: + metadata.add_text(x, json.dumps(extra_pnginfo[x])) + video_metadata[x] = extra_pnginfo[x] + metadata.add_text("CreationTime", datetime.datetime.now().isoformat(" ")[:19]) + + if meta_batch is not None and unique_id in meta_batch.outputs: + (counter, output_process) = meta_batch.outputs[unique_id] + else: + # comfy counter workaround + max_counter = 0 + + # Loop through the existing files + matcher = re.compile(f"{re.escape(filename)}_(\\d+)\\D*\\..+", re.IGNORECASE) + for existing_file in os.listdir(full_output_folder): + # Check if the file matches the expected format + match = matcher.fullmatch(existing_file) + if match: + # Extract the numeric portion of the filename + file_counter = int(match.group(1)) + # Update the maximum counter value if necessary + if file_counter > max_counter: + max_counter = file_counter + + # Increment the counter by 1 to get the next available value + counter = max_counter + 1 + output_process = None + + # save first frame as png to keep metadata + file = f"{filename}_{counter:05}.png" + file_path = os.path.join(full_output_folder, file) + Image.fromarray(tensor_to_bytes(first_image)).save( + file_path, + pnginfo=metadata, + compress_level=4, + ) + output_files.append(file_path) + + format_type, format_ext = format.split("/") + print(format_type, format_ext) + if format_type == "image": + if meta_batch is not None: + raise Exception("Pillow('image/') formats are not compatible with batched output") + image_kwargs = {} + if format_ext == "gif": + image_kwargs['disposal'] = 2 + if format_ext == "webp": + #Save timestamp information + exif = Image.Exif() + exif[ExifTags.IFD.Exif] = {36867: datetime.datetime.now().isoformat(" ")[:19]} + image_kwargs['exif'] = exif + file = f"{filename}_{counter:05}.{format_ext}" + file_path = os.path.join(full_output_folder, file) + if pingpong: + images = to_pingpong(images) + frames = map(lambda x : Image.fromarray(tensor_to_bytes(x)), images) + # Use pillow directly to save an animated image + next(frames).save( + file_path, + format=format_ext.upper(), + save_all=True, + append_images=frames, + duration=round(1000 / frame_rate), + loop=loop_count, + compress_level=4, + **image_kwargs + ) + output_files.append(file_path) + else: + # Use ffmpeg to save a video + if ffmpeg_path is None: + raise ProcessLookupError(f"ffmpeg is required for video outputs and could not be found.\nIn order to use video outputs, you must either:\n- Install imageio-ffmpeg with pip,\n- Place a ffmpeg executable in {os.path.abspath('')}, or\n- Install ffmpeg and add it to the system path.") + + #Acquire additional format_widget values + kwargs = None + if manual_format_widgets is None: + if prompt is not None: + kwargs = prompt[unique_id]['inputs'] + else: + manual_format_widgets = {} + if kwargs is None: + kwargs = get_format_widget_defaults(format_ext) + missing = {} + for 
k in kwargs.keys():
+                if k in manual_format_widgets:
+                    kwargs[k] = manual_format_widgets[k]
+                else:
+                    missing[k] = kwargs[k]
+            if len(missing) > 0:
+                print("Extra format values were not provided, the following defaults will be used: " + str(kwargs) + "\nThis is likely due to usage of ComfyUI-to-python. These values can be manually set by supplying a manual_format_widgets argument")
+            kwargs["format"] = format
+            kwargs['pix_fmt'] = 'yuv420p10le'
+            kwargs['crf'] = 22
+            kwargs["save_metadata"] = ["save_metadata", "BOOLEAN", {"default": True}]
+            video_format = apply_format_widgets(format_ext, kwargs)
+            has_alpha = first_image.shape[-1] == 4
+            dim_alignment = video_format.get("dim_alignment", 8)
+            if (first_image.shape[1] % dim_alignment) or (first_image.shape[0] % dim_alignment):
+                #output frames must be padded
+                to_pad = (-first_image.shape[1] % dim_alignment,
+                          -first_image.shape[0] % dim_alignment)
+                padding = (to_pad[0]//2, to_pad[0] - to_pad[0]//2,
+                           to_pad[1]//2, to_pad[1] - to_pad[1]//2)
+                padfunc = torch.nn.ReplicationPad2d(padding)
+                def pad(image):
+                    image = image.permute((2,0,1))#HWC to CHW
+                    padded = padfunc(image.to(dtype=torch.float32))
+                    return padded.permute((1,2,0))
+                images = map(pad, images)
+                new_dims = (-first_image.shape[1] % dim_alignment + first_image.shape[1],
+                            -first_image.shape[0] % dim_alignment + first_image.shape[0])
+                dimensions = f"{new_dims[0]}x{new_dims[1]}"
+                print("Output images were not of valid resolution and have had padding applied")
+            else:
+                dimensions = f"{first_image.shape[1]}x{first_image.shape[0]}"
+            if loop_count > 0:
+                loop_args = ["-vf", "loop=loop=" + str(loop_count)+":size=" + str(len(images))]
+            else:
+                loop_args = []
+            if pingpong:
+                if meta_batch is not None:
+                    print("pingpong is incompatible with batched output")
+                images = to_pingpong(images)
+            if video_format.get('input_color_depth', '8bit') == '16bit':
+                images = map(tensor_to_shorts, images)
+                if has_alpha:
+                    i_pix_fmt = 'rgba64'
+                else:
+                    i_pix_fmt = 'rgb48'
+            else:
+                images = map(tensor_to_bytes, images)
+                if has_alpha:
+                    i_pix_fmt = 'rgba'
+                else:
+                    i_pix_fmt = 'rgb24'
+            file = f"{filename}_{counter:05}.{video_format['extension']}"
+            file_path = os.path.join(full_output_folder, file)
+            bitrate_arg = []
+            bitrate = video_format.get('bitrate')
+            if bitrate is not None:
+                bitrate_arg = ["-b:v", str(bitrate) + "M" if video_format.get('megabit') == 'True' else str(bitrate) + "K"]
+            args = [ffmpeg_path, "-v", "error", "-f", "rawvideo", "-pix_fmt", i_pix_fmt,
+                    "-s", dimensions, "-r", str(frame_rate), "-i", "-"] \
+                    + loop_args
+
+            images = map(lambda x: x.tobytes(), images)
+            env=os.environ.copy()
+            if "environment" in video_format:
+                env.update(video_format["environment"])
+
+            if "pre_pass" in video_format:
+                if meta_batch is not None:
+                    #Performing a prepass requires keeping access to all frames.
+ #Potential solutions include keeping just output frames in + #memory or using 3 passes with intermediate file, but + #very long gifs probably shouldn't be encouraged + raise Exception("Formats which require a pre_pass are incompatible with Batch Manager.") + images = [b''.join(images)] + os.makedirs(folder_paths.get_temp_directory(), exist_ok=True) + pre_pass_args = args[:13] + video_format['pre_pass'] + try: + subprocess.run(pre_pass_args, input=images[0], env=env, + capture_output=True, check=True) + except subprocess.CalledProcessError as e: + raise Exception("An error occurred in the ffmpeg prepass:\n" \ + + e.stderr.decode("utf-8")) + if "inputs_main_pass" in video_format: + args = args[:13] + video_format['inputs_main_pass'] + args[13:] + + if output_process is None: + if 'gifski_pass' in video_format: + output_process = gifski_process(args, video_format, file_path, env) + else: + args += video_format['main_pass'] + bitrate_arg + output_process = ffmpeg_process(args, video_format, video_metadata, file_path, env) + #Proceed to first yield + output_process.send(None) + if meta_batch is not None: + meta_batch.outputs[unique_id] = (counter, output_process) + + for image in images: + pbar.update(1) + output_process.send(image) + if meta_batch is not None: + requeue_workflow((meta_batch.unique_id, not meta_batch.has_closed_inputs)) + if meta_batch is None or meta_batch.has_closed_inputs: + #Close pipe and wait for termination. + try: + total_frames_output = output_process.send(None) + output_process.send(None) + except StopIteration: + pass + if meta_batch is not None: + meta_batch.outputs.pop(unique_id) + if len(meta_batch.outputs) == 0: + meta_batch.reset() + else: + #batch is unfinished + #TODO: Check if empty output breaks other custom nodes + return {"ui": {"unfinished_batch": [True]}, "result": ((save_output, []),)} + + output_files.append(file_path) + + audio_file = os.path.join(audio_dir,str(time.time()).replace(".","")+".wav") + write(audio_file,audio.sample_rate,audio.audio_data) + print(audio_file) + filename = os.path.join(result_dir,f"{str(time.time()).replace('.','')}.mp4") + enhanced_filename = os.path.join(result_dir,f"enhanced_{str(time.time()).replace('.','')}.mp4") + command = [ + 'python', + './run.py', # Script to run + '--frame-processors', + "lip_syncer", + "-s", + audio_file, + '-t', # Argument: segmentation path + output_files[-1], + '-o', + filename, + '--trim-frame-start', + str(trim_frame_start), + '--trim-frame-end', + str(trim_frame_end), + '--face-mask-padding', + str(face_mask_padding_top), + str(face_mask_padding_bottom), + str(face_mask_padding_left), + str(face_mask_padding_right), + '--headless' + ] + if device=="gpu": + command.extend(['--execution-providers',"coreml"]) + print(command) + result = subprocess.run(command,cwd="custom_nodes/ComfyUI-DeepFuze",stdout=subprocess.PIPE) + # print(result.stdout.splitlines()[-1]) + if enhancer!="None": + command = [ + 'python', + './run.py', # Script to run + '--frame-processors', + "face_enhancer", + "--face-enhancer-model", + enhancer, + "-t", + filename, + '-o', + enhanced_filename, + '--headless' + ] + if device=="gpu": + command.extend(['--execution-providers',"coreml"]) + print(command) + result = subprocess.run(command,cwd="custom_nodes/ComfyUI-DeepFuze",stdout=subprocess.PIPE) + filename = enhanced_filename + + if frame_enhancer!="None": + command = [ + 'python', + './run.py', # Script to run + '--frame-processors', + "frame_enhancer", + "--frame-enhancer-model", + frame_enhancer, + "-t", + filename, + 
'-o',
+                enhanced_filename,
+                '--headless'
+            ]
+            if device=="gpu":
+                command.extend(['--execution-providers',"coreml"])
+            print(command)
+            result = subprocess.run(command,cwd="custom_nodes/ComfyUI-DeepFuze",stdout=subprocess.PIPE)
+            # build the muxed output path with os.path so it also works on Windows;
+            # any distinct sibling filename works here, it is reloaded immediately below
+            temp_file = os.path.join(os.path.dirname(enhanced_filename), "_" + os.path.basename(enhanced_filename))
+            # -y (overwrite without prompting) must precede the output file
+            subprocess.run(["ffmpeg","-y","-i",enhanced_filename,"-i",audio_file,"-c","copy","-map","0:v:0","-map","1:a:0",temp_file])
+            filename = temp_file
+
+        # try:
+        #     os.system(f"rm {audio_file}")
+        # except: pass
+        return load_video_cv(filename,0,'Disabled',512,512,0,0,1)

-import importlib
 import folder_paths
-import latent_preview
-import node_helpers

-def before_node_execution():
-    comfy.model_management.throw_exception_if_processing_interrupted()

-def interrupt_processing(value=True):
-    comfy.model_management.interrupt_current_processing(value)

-MAX_RESOLUTION=16384

-class CLIPTextEncode:
-    @classmethod
-    def INPUT_TYPES(s):
-        return {"required": {"text": ("STRING", {"multiline": True, "dynamicPrompts": True}), "clip": ("CLIP", )}}
-    RETURN_TYPES = ("CONDITIONING",)
-    FUNCTION = "encode"
-    CATEGORY = "conditioning"
+from scipy.fft import fft

-    def encode(self, clip, text):
-        tokens = clip.tokenize(text)
-        cond, pooled = clip.encode_from_tokens(tokens, return_pooled=True)
-        return ([[cond, {"pooled_output": pooled}]], )
-
-class ConditioningCombine:
-    @classmethod
-    def INPUT_TYPES(s):
-        return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}}
-    RETURN_TYPES = ("CONDITIONING",)
-    FUNCTION = "combine"
-
-    CATEGORY = "conditioning"
-
-    def combine(self, conditioning_1, conditioning_2):
-        return (conditioning_1 + conditioning_2, )
-
-class ConditioningAverage :
-    @classmethod
-    def INPUT_TYPES(s):
-        return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ),
-                             "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01})
-                             }}
-    RETURN_TYPES = ("CONDITIONING",)
-    FUNCTION = "addWeighted"
-
-    CATEGORY = "conditioning"
-
-    def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength):
-        out = []
-
-        if len(conditioning_from) > 1:
-            logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")
-
-        cond_from = conditioning_from[0][0]
-        pooled_output_from = conditioning_from[0][1].get("pooled_output", None)
-
-        for i in range(len(conditioning_to)):
-            t1 = conditioning_to[i][0]
-            pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from)
-            t0 = cond_from[:,:t1.shape[1]]
-            if t0.shape[1] < t1.shape[1]:
-                t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1)
-
-            tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength))
-            t_to = conditioning_to[i][1].copy()
-            if pooled_output_from is not None and pooled_output_to is not None:
-                t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength))
-            elif pooled_output_from is not None:
-                t_to["pooled_output"] = pooled_output_from
-
-            n = [tw, t_to]
-            out.append(n)
-        return (out, )
-
-class ConditioningConcat:
-    @classmethod
-    def INPUT_TYPES(s):
-        return {"required": {
-            "conditioning_to": 
("CONDITIONING",), - "conditioning_from": ("CONDITIONING",), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "concat" - - CATEGORY = "conditioning" - - def concat(self, conditioning_to, conditioning_from): - out = [] - - if len(conditioning_from) > 1: - logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") - - cond_from = conditioning_from[0][0] - - for i in range(len(conditioning_to)): - t1 = conditioning_to[i][0] - tw = torch.cat((t1, cond_from),1) - n = [tw, conditioning_to[i][1].copy()] - out.append(n) - - return (out, ) - -class ConditioningSetArea: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), - "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), - "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning" - - def append(self, conditioning, width, height, x, y, strength): - c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8), - "strength": strength, - "set_area_to_bounds": False}) - return (c, ) - -class ConditioningSetAreaPercentage: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), - "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), - "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), - "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning" - - def append(self, conditioning, width, height, x, y, strength): - c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x), - "strength": strength, - "set_area_to_bounds": False}) - return (c, ) - -class ConditioningSetAreaStrength: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning" - - def append(self, conditioning, strength): - c = node_helpers.conditioning_set_values(conditioning, {"strength": strength}) - return (c, ) - - -class ConditioningSetMask: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "mask": ("MASK", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - "set_cond_area": (["default", "mask bounds"],), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning" - - def append(self, conditioning, mask, set_cond_area, strength): - set_area_to_bounds = False - if set_cond_area != "default": - set_area_to_bounds = True - if len(mask.shape) < 3: - mask = mask.unsqueeze(0) - - c = node_helpers.conditioning_set_values(conditioning, {"mask": mask, - "set_area_to_bounds": set_area_to_bounds, - "mask_strength": strength}) - return (c, ) - -class 
ConditioningZeroOut: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", )}} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "zero_out" - - CATEGORY = "advanced/conditioning" - - def zero_out(self, conditioning): - c = [] - for t in conditioning: - d = t[1].copy() - if "pooled_output" in d: - d["pooled_output"] = torch.zeros_like(d["pooled_output"]) - n = [torch.zeros_like(t[0]), d] - c.append(n) - return (c, ) - -class ConditioningSetTimestepRange: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), - "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "set_range" - - CATEGORY = "advanced/conditioning" - - def set_range(self, conditioning, start, end): - c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start, - "end_percent": end}) - return (c, ) - -class VAEDecode: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT", ), "vae": ("VAE", )}} - RETURN_TYPES = ("IMAGE",) - FUNCTION = "decode" - - CATEGORY = "latent" - - def decode(self, vae, samples): - return (vae.decode(samples["samples"]), ) - -class VAEDecodeTiled: - @classmethod - def INPUT_TYPES(s): - return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ), - "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}) - }} - RETURN_TYPES = ("IMAGE",) - FUNCTION = "decode" - - CATEGORY = "_for_testing" - - def decode(self, vae, samples, tile_size): - return (vae.decode_tiled(samples["samples"], tile_x=tile_size // 8, tile_y=tile_size // 8, ), ) - -class VAEEncode: - @classmethod - def INPUT_TYPES(s): - return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}} - RETURN_TYPES = ("LATENT",) - FUNCTION = "encode" - - CATEGORY = "latent" - - def encode(self, vae, pixels): - t = vae.encode(pixels[:,:,:,:3]) - return ({"samples":t}, ) - -class VAEEncodeTiled: - @classmethod - def INPUT_TYPES(s): - return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ), - "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}) - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "encode" - - CATEGORY = "_for_testing" - - def encode(self, vae, pixels, tile_size): - t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, ) - return ({"samples":t}, ) - -class VAEEncodeForInpaint: - @classmethod - def INPUT_TYPES(s): - return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}} - RETURN_TYPES = ("LATENT",) - FUNCTION = "encode" - - CATEGORY = "latent/inpaint" - - def encode(self, vae, pixels, mask, grow_mask_by=6): - x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio - y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio - mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") - - pixels = pixels.clone() - if pixels.shape[1] != x or pixels.shape[2] != y: - x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2 - y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2 - pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] - mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] - - #grow mask by a few pixels to keep things seamless in latent space - if grow_mask_by == 0: 
- mask_erosion = mask - else: - kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by)) - padding = math.ceil((grow_mask_by - 1) / 2) - - mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1) - - m = (1.0 - mask.round()).squeeze(1) - for i in range(3): - pixels[:,:,:,i] -= 0.5 - pixels[:,:,:,i] *= m - pixels[:,:,:,i] += 0.5 - t = vae.encode(pixels) - - return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, ) - - -class InpaintModelConditioning: - @classmethod - def INPUT_TYPES(s): - return {"required": {"positive": ("CONDITIONING", ), - "negative": ("CONDITIONING", ), - "vae": ("VAE", ), - "pixels": ("IMAGE", ), - "mask": ("MASK", ), - }} - - RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT") - RETURN_NAMES = ("positive", "negative", "latent") - FUNCTION = "encode" - - CATEGORY = "conditioning/inpaint" - - def encode(self, positive, negative, pixels, vae, mask): - x = (pixels.shape[1] // 8) * 8 - y = (pixels.shape[2] // 8) * 8 - mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") - - orig_pixels = pixels - pixels = orig_pixels.clone() - if pixels.shape[1] != x or pixels.shape[2] != y: - x_offset = (pixels.shape[1] % 8) // 2 - y_offset = (pixels.shape[2] % 8) // 2 - pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] - mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] - - m = (1.0 - mask.round()).squeeze(1) - for i in range(3): - pixels[:,:,:,i] -= 0.5 - pixels[:,:,:,i] *= m - pixels[:,:,:,i] += 0.5 - concat_latent = vae.encode(pixels) - orig_latent = vae.encode(orig_pixels) - - out_latent = {} - - out_latent["samples"] = orig_latent - out_latent["noise_mask"] = mask - - out = [] - for conditioning in [positive, negative]: - c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent, - "concat_mask": mask}) - out.append(c) - return (out[0], out[1], out_latent) - - -class SaveLatent: - def __init__(self): - self.output_dir = folder_paths.get_output_directory() - - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT", ), - "filename_prefix": ("STRING", {"default": "latents/ComfyUI"})}, - "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, - } - RETURN_TYPES = () - FUNCTION = "save" - - OUTPUT_NODE = True - - CATEGORY = "_for_testing" - - def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): - full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir) - - # support save metadata for latent sharing - prompt_info = "" - if prompt is not None: - prompt_info = json.dumps(prompt) - - metadata = None - if not args.disable_metadata: - metadata = {"prompt": prompt_info} - if extra_pnginfo is not None: - for x in extra_pnginfo: - metadata[x] = json.dumps(extra_pnginfo[x]) - - file = f"{filename}_{counter:05}_.latent" - - results = list() - results.append({ - "filename": file, - "subfolder": subfolder, - "type": "output" - }) - - file = os.path.join(full_output_folder, file) - - output = {} - output["latent_tensor"] = samples["samples"] - output["latent_format_version_0"] = torch.tensor([]) - - comfy.utils.save_torch_file(output, file, metadata=metadata) - return { "ui": { "latents": results } } - - -class LoadLatent: - @classmethod - def INPUT_TYPES(s): - input_dir = folder_paths.get_input_directory() - files = [f for f 
in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")] - return {"required": {"latent": [sorted(files), ]}, } - - CATEGORY = "_for_testing" - - RETURN_TYPES = ("LATENT", ) - FUNCTION = "load" - - def load(self, latent): - latent_path = folder_paths.get_annotated_filepath(latent) - latent = safetensors.torch.load_file(latent_path, device="cpu") - multiplier = 1.0 - if "latent_format_version_0" not in latent: - multiplier = 1.0 / 0.18215 - samples = {"samples": latent["latent_tensor"].float() * multiplier} - return (samples, ) - - @classmethod - def IS_CHANGED(s, latent): - image_path = folder_paths.get_annotated_filepath(latent) - m = hashlib.sha256() - with open(image_path, 'rb') as f: - m.update(f.read()) - return m.digest().hex() - - @classmethod - def VALIDATE_INPUTS(s, latent): - if not folder_paths.exists_annotated_filepath(latent): - return "Invalid latent file: {}".format(latent) - return True - - -class CheckpointLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ), - "ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}} - RETURN_TYPES = ("MODEL", "CLIP", "VAE") - FUNCTION = "load_checkpoint" - - CATEGORY = "advanced/loaders" - - def load_checkpoint(self, config_name, ckpt_name): - config_path = folder_paths.get_full_path("configs", config_name) - ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) - return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) - -class CheckpointLoaderSimple: - @classmethod - def INPUT_TYPES(s): - return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ), - }} - RETURN_TYPES = ("MODEL", "CLIP", "VAE") - FUNCTION = "load_checkpoint" - - CATEGORY = "loaders" - - def load_checkpoint(self, ckpt_name): - ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) - out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) - return out[:3] - -class DiffusersLoader: - @classmethod - def INPUT_TYPES(cls): - paths = [] - for search_path in folder_paths.get_folder_paths("diffusers"): - if os.path.exists(search_path): - for root, subdir, files in os.walk(search_path, followlinks=True): - if "model_index.json" in files: - paths.append(os.path.relpath(root, start=search_path)) - - return {"required": {"model_path": (paths,), }} - RETURN_TYPES = ("MODEL", "CLIP", "VAE") - FUNCTION = "load_checkpoint" - - CATEGORY = "advanced/loaders/deprecated" - - def load_checkpoint(self, model_path, output_vae=True, output_clip=True): - for search_path in folder_paths.get_folder_paths("diffusers"): - if os.path.exists(search_path): - path = os.path.join(search_path, model_path) - if os.path.exists(path): - model_path = path - break - - return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings")) - - -class unCLIPCheckpointLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ), - }} - RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION") - FUNCTION = "load_checkpoint" - - CATEGORY = "loaders" - - def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True): - ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name) 
- out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) - return out - -class CLIPSetLastLayer: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip": ("CLIP", ), - "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}), - }} - RETURN_TYPES = ("CLIP",) - FUNCTION = "set_last_layer" - - CATEGORY = "conditioning" - - def set_last_layer(self, clip, stop_at_clip_layer): - clip = clip.clone() - clip.clip_layer(stop_at_clip_layer) - return (clip,) - -class LoraLoader: - def __init__(self): - self.loaded_lora = None - - @classmethod - def INPUT_TYPES(s): - return {"required": { "model": ("MODEL",), - "clip": ("CLIP", ), - "lora_name": (folder_paths.get_filename_list("loras"), ), - "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), - "strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), - }} - RETURN_TYPES = ("MODEL", "CLIP") - FUNCTION = "load_lora" - - CATEGORY = "loaders" - - def load_lora(self, model, clip, lora_name, strength_model, strength_clip): - if strength_model == 0 and strength_clip == 0: - return (model, clip) - - lora_path = folder_paths.get_full_path("loras", lora_name) - lora = None - if self.loaded_lora is not None: - if self.loaded_lora[0] == lora_path: - lora = self.loaded_lora[1] - else: - temp = self.loaded_lora - self.loaded_lora = None - del temp - - if lora is None: - lora = comfy.utils.load_torch_file(lora_path, safe_load=True) - self.loaded_lora = (lora_path, lora) - - model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip) - return (model_lora, clip_lora) - -class LoraLoaderModelOnly(LoraLoader): - @classmethod - def INPUT_TYPES(s): - return {"required": { "model": ("MODEL",), - "lora_name": (folder_paths.get_filename_list("loras"), ), - "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), - }} - RETURN_TYPES = ("MODEL",) - FUNCTION = "load_lora_model_only" - - def load_lora_model_only(self, model, lora_name, strength_model): - return (self.load_lora(model, None, lora_name, strength_model, 0)[0],) - -class VAELoader: - @staticmethod - def vae_list(): - vaes = folder_paths.get_filename_list("vae") - approx_vaes = folder_paths.get_filename_list("vae_approx") - sdxl_taesd_enc = False - sdxl_taesd_dec = False - sd1_taesd_enc = False - sd1_taesd_dec = False - - for v in approx_vaes: - if v.startswith("taesd_decoder."): - sd1_taesd_dec = True - elif v.startswith("taesd_encoder."): - sd1_taesd_enc = True - elif v.startswith("taesdxl_decoder."): - sdxl_taesd_dec = True - elif v.startswith("taesdxl_encoder."): - sdxl_taesd_enc = True - if sd1_taesd_dec and sd1_taesd_enc: - vaes.append("taesd") - if sdxl_taesd_dec and sdxl_taesd_enc: - vaes.append("taesdxl") - return vaes - - @staticmethod - def load_taesd(name): - sd = {} - approx_vaes = folder_paths.get_filename_list("vae_approx") - - encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes)) - decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes)) - - enc = comfy.utils.load_torch_file(folder_paths.get_full_path("vae_approx", encoder)) - for k in enc: - sd["taesd_encoder.{}".format(k)] = enc[k] - - dec = comfy.utils.load_torch_file(folder_paths.get_full_path("vae_approx", decoder)) - for k in dec: - sd["taesd_decoder.{}".format(k)] = dec[k] - - if 
name == "taesd": - sd["vae_scale"] = torch.tensor(0.18215) - elif name == "taesdxl": - sd["vae_scale"] = torch.tensor(0.13025) - return sd - - @classmethod - def INPUT_TYPES(s): - return {"required": { "vae_name": (s.vae_list(), )}} - RETURN_TYPES = ("VAE",) - FUNCTION = "load_vae" - - CATEGORY = "loaders" - - #TODO: scale factor? - def load_vae(self, vae_name): - if vae_name in ["taesd", "taesdxl"]: - sd = self.load_taesd(vae_name) - else: - vae_path = folder_paths.get_full_path("vae", vae_name) - sd = comfy.utils.load_torch_file(vae_path) - vae = comfy.sd.VAE(sd=sd) - return (vae,) - -class ControlNetLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} - - RETURN_TYPES = ("CONTROL_NET",) - FUNCTION = "load_controlnet" - - CATEGORY = "loaders" - - def load_controlnet(self, control_net_name): - controlnet_path = folder_paths.get_full_path("controlnet", control_net_name) - controlnet = comfy.controlnet.load_controlnet(controlnet_path) - return (controlnet,) - -class DiffControlNetLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "model": ("MODEL",), - "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} - - RETURN_TYPES = ("CONTROL_NET",) - FUNCTION = "load_controlnet" - - CATEGORY = "loaders" - - def load_controlnet(self, model, control_net_name): - controlnet_path = folder_paths.get_full_path("controlnet", control_net_name) - controlnet = comfy.controlnet.load_controlnet(controlnet_path, model) - return (controlnet,) - - -class ControlNetApply: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "control_net": ("CONTROL_NET", ), - "image": ("IMAGE", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}) - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "apply_controlnet" - - CATEGORY = "conditioning" - - def apply_controlnet(self, conditioning, control_net, image, strength): - if strength == 0: - return (conditioning, ) - - c = [] - control_hint = image.movedim(-1,1) - for t in conditioning: - n = [t[0], t[1].copy()] - c_net = control_net.copy().set_cond_hint(control_hint, strength) - if 'control' in t[1]: - c_net.set_previous_controlnet(t[1]['control']) - n[1]['control'] = c_net - n[1]['control_apply_to_uncond'] = True - c.append(n) - return (c, ) - - -class ControlNetApplyAdvanced: - @classmethod - def INPUT_TYPES(s): - return {"required": {"positive": ("CONDITIONING", ), - "negative": ("CONDITIONING", ), - "control_net": ("CONTROL_NET", ), - "image": ("IMAGE", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), - "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) - }} - - RETURN_TYPES = ("CONDITIONING","CONDITIONING") - RETURN_NAMES = ("positive", "negative") - FUNCTION = "apply_controlnet" - - CATEGORY = "conditioning" - - def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent): - if strength == 0: - return (positive, negative) - - control_hint = image.movedim(-1,1) - cnets = {} - - out = [] - for conditioning in [positive, negative]: - c = [] - for t in conditioning: - d = t[1].copy() - - prev_cnet = d.get('control', None) - if prev_cnet in cnets: - c_net = cnets[prev_cnet] - else: - c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent)) - 
c_net.set_previous_controlnet(prev_cnet) - cnets[prev_cnet] = c_net - - d['control'] = c_net - d['control_apply_to_uncond'] = False - n = [t[0], d] - c.append(n) - out.append(c) - return (out[0], out[1]) - - -class UNETLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "unet_name": (folder_paths.get_filename_list("unet"), ), - }} - RETURN_TYPES = ("MODEL",) - FUNCTION = "load_unet" - - CATEGORY = "advanced/loaders" - - def load_unet(self, unet_name): - unet_path = folder_paths.get_full_path("unet", unet_name) - model = comfy.sd.load_unet(unet_path) - return (model,) - -class CLIPLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip_name": (folder_paths.get_filename_list("clip"), ), - "type": (["stable_diffusion", "stable_cascade"], ), - }} - RETURN_TYPES = ("CLIP",) - FUNCTION = "load_clip" - - CATEGORY = "advanced/loaders" - - def load_clip(self, clip_name, type="stable_diffusion"): - clip_type = comfy.sd.CLIPType.STABLE_DIFFUSION - if type == "stable_cascade": - clip_type = comfy.sd.CLIPType.STABLE_CASCADE - - clip_path = folder_paths.get_full_path("clip", clip_name) - clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type) - return (clip,) - -class DualCLIPLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip_name1": (folder_paths.get_filename_list("clip"), ), "clip_name2": (folder_paths.get_filename_list("clip"), ), - }} - RETURN_TYPES = ("CLIP",) - FUNCTION = "load_clip" - - CATEGORY = "advanced/loaders" - - def load_clip(self, clip_name1, clip_name2): - clip_path1 = folder_paths.get_full_path("clip", clip_name1) - clip_path2 = folder_paths.get_full_path("clip", clip_name2) - clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings")) - return (clip,) - -class CLIPVisionLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ), - }} - RETURN_TYPES = ("CLIP_VISION",) - FUNCTION = "load_clip" - - CATEGORY = "loaders" - - def load_clip(self, clip_name): - clip_path = folder_paths.get_full_path("clip_vision", clip_name) - clip_vision = comfy.clip_vision.load(clip_path) - return (clip_vision,) - -class CLIPVisionEncode: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip_vision": ("CLIP_VISION",), - "image": ("IMAGE",) - }} - RETURN_TYPES = ("CLIP_VISION_OUTPUT",) - FUNCTION = "encode" - - CATEGORY = "conditioning" - - def encode(self, clip_vision, image): - output = clip_vision.encode_image(image) - return (output,) - -class StyleModelLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}} - - RETURN_TYPES = ("STYLE_MODEL",) - FUNCTION = "load_style_model" - - CATEGORY = "loaders" - - def load_style_model(self, style_model_name): - style_model_path = folder_paths.get_full_path("style_models", style_model_name) - style_model = comfy.sd.load_style_model(style_model_path) - return (style_model,) - - -class StyleModelApply: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "style_model": ("STYLE_MODEL", ), - "clip_vision_output": ("CLIP_VISION_OUTPUT", ), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "apply_stylemodel" - - CATEGORY = "conditioning/style_model" - - def apply_stylemodel(self, clip_vision_output, style_model, conditioning): - cond = 
style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0) - c = [] - for t in conditioning: - n = [torch.cat((t[0], cond), dim=1), t[1].copy()] - c.append(n) - return (c, ) - -class unCLIPConditioning: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "clip_vision_output": ("CLIP_VISION_OUTPUT", ), - "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}), - "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "apply_adm" - - CATEGORY = "conditioning" - - def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation): - if strength == 0: - return (conditioning, ) - - c = [] - for t in conditioning: - o = t[1].copy() - x = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation} - if "unclip_conditioning" in o: - o["unclip_conditioning"] = o["unclip_conditioning"][:] + [x] - else: - o["unclip_conditioning"] = [x] - n = [t[0], o] - c.append(n) - return (c, ) - -class GLIGENLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}} - - RETURN_TYPES = ("GLIGEN",) - FUNCTION = "load_gligen" - - CATEGORY = "loaders" - - def load_gligen(self, gligen_name): - gligen_path = folder_paths.get_full_path("gligen", gligen_name) - gligen = comfy.sd.load_gligen(gligen_path) - return (gligen,) - -class GLIGENTextBoxApply: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning_to": ("CONDITIONING", ), - "clip": ("CLIP", ), - "gligen_textbox_model": ("GLIGEN", ), - "text": ("STRING", {"multiline": True, "dynamicPrompts": True}), - "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), - "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), - "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning/gligen" - - def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y): - c = [] - cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected") - for t in conditioning_to: - n = [t[0], t[1].copy()] - position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)] - prev = [] - if "gligen" in n[1]: - prev = n[1]['gligen'][2] - - n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params) - c.append(n) - return (c, ) - -class EmptyLatentImage: - def __init__(self): - self.device = comfy.model_management.intermediate_device() - - @classmethod - def INPUT_TYPES(s): - return {"required": { "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}), - "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}), - "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096})}} - RETURN_TYPES = ("LATENT",) - FUNCTION = "generate" - - CATEGORY = "latent" - - def generate(self, width, height, batch_size=1): - latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device) - return ({"samples":latent}, ) - - -class LatentFromBatch: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}), - "length": ("INT", 
{"default": 1, "min": 1, "max": 64}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "frombatch" - - CATEGORY = "latent/batch" - - def frombatch(self, samples, batch_index, length): - s = samples.copy() - s_in = samples["samples"] - batch_index = min(s_in.shape[0] - 1, batch_index) - length = min(s_in.shape[0] - batch_index, length) - s["samples"] = s_in[batch_index:batch_index + length].clone() - if "noise_mask" in samples: - masks = samples["noise_mask"] - if masks.shape[0] == 1: - s["noise_mask"] = masks.clone() - else: - if masks.shape[0] < s_in.shape[0]: - masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] - s["noise_mask"] = masks[batch_index:batch_index + length].clone() - if "batch_index" not in s: - s["batch_index"] = [x for x in range(batch_index, batch_index+length)] - else: - s["batch_index"] = samples["batch_index"][batch_index:batch_index + length] - return (s,) - -class RepeatLatentBatch: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "amount": ("INT", {"default": 1, "min": 1, "max": 64}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "repeat" - - CATEGORY = "latent/batch" - - def repeat(self, samples, amount): - s = samples.copy() - s_in = samples["samples"] +class AudioData: + def __init__(self, audio_file) -> None: - s["samples"] = s_in.repeat((amount, 1,1,1)) - if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1: - masks = samples["noise_mask"] - if masks.shape[0] < s_in.shape[0]: - masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] - s["noise_mask"] = samples["noise_mask"].repeat((amount, 1,1,1)) - if "batch_index" in s: - offset = max(s["batch_index"]) - min(s["batch_index"]) + 1 - s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]] - return (s,) + # Extract the sample rate + sample_rate = audio_file.frame_rate -class LatentUpscale: - upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] - crop_methods = ["disabled", "center"] + # Get the number of audio channels + num_channels = audio_file.channels + + # Extract the audio data as a NumPy array + audio_data = np.array(audio_file.get_array_of_samples()) + self.audio_data = audio_data + self.sample_rate = sample_rate + self.num_channels = num_channels + + def get_channel_audio_data(self, channel: int): + if channel < 0 or channel >= self.num_channels: + raise IndexError(f"Channel '{channel}' out of range. 
Total channels: {self.num_channels}.")
+        return self.audio_data[channel::self.num_channels]
+
+    def get_channel_fft(self, channel: int):
+        # local import: fft is used below but is never imported at module level
+        from scipy.fft import fft
+        audio_data = self.get_channel_audio_data(channel)
+        return fft(audio_data)
+
+
+checkpoint_path_voice = os.path.join(folder_paths.models_dir, "deepfuze")
+print(checkpoint_path_voice)
+
+audio_path = os.path.join(folder_paths.get_input_directory(), "audio")
+os.makedirs(audio_path, exist_ok=True)
+
+class TTS_generation:
     @classmethod
-    def INPUT_TYPES(s):
-        return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
-                              "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
-                              "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
-                              "crop": (s.crop_methods,)}}
-    RETURN_TYPES = ("LATENT",)
-    FUNCTION = "upscale"
+    def INPUT_TYPES(self):
+        return {
+            "required": {
+                "audio": ("AUDIO",),
+                "text": ("STRING", {
+                    "multiline": True,
+                    "default": "Uploaded audio and text should be in the same language"
+                }),
+                "device": (["cpu", "cuda", "mps"],),
+                "supported_language": ("English (en), Spanish (es), French (fr), German (de), Italian (it), Portuguese (pt), Polish (pl), Turkish (tr), Russian (ru), Dutch (nl), Czech (cs), Arabic (ar), Chinese (zh-cn), Japanese (ja), Hungarian (hu), Korean (ko), Hindi (hi)".split(","),),
+            }
+        }
+
+    RETURN_TYPES = ("AUDIO",)  # Output type(s) of the node
+    FUNCTION = "generate_audio"  # Entry-point method name
 
-    CATEGORY = "latent"
+    CATEGORY = "DeepFuze"  # Category for the node in the UI
 
-    def upscale(self, samples, upscale_method, width, height, crop):
-        if width == 0 and height == 0:
-            s = samples
-        else:
-            s = samples.copy()
+    def generate_audio(self, audio, text, device, supported_language):
+        print(text)
+        # pull the ISO code out of an entry like "English (en)"
+        language = supported_language.split("(")[1][:-1]
+        file_path = os.path.join(audio_path, str(time.time()).replace(".", "") + ".wav")
+        write(file_path, audio.sample_rate, audio.audio_data)
+        command = [
+            'python', 'tts_generation.py',
+            '--model', checkpoint_path_voice,
+            '--text', text,
+            '--language', language,
+            '--speaker_wav', file_path,
+            '--output_file', file_path,
+            '--device', device
+        ]
+        result = subprocess.run(command, cwd="custom_nodes/ComfyUI-DeepFuze", capture_output=True, text=True)
 
-            if width == 0:
-                height = max(64, height)
-                width = max(64, round(samples["samples"].shape[3] * height / samples["samples"].shape[2]))
-            elif height == 0:
-                width = max(64, width)
-                height = max(64, round(samples["samples"].shape[2] * width / samples["samples"].shape[3]))
-            else:
-                width = max(64, width)
-                height = max(64, height)
+        print("stdout:", result.stdout)
+        print("stderr:", result.stderr)
+        audio_file = AudioSegment.from_file(file_path, format="wav")
+        audio_data = AudioData(audio_file)
+        return (audio_data,)
 
-            s["samples"] = comfy.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop)
-        return (s,)
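For reference, a minimal standalone sketch of how the AudioData wrapper above behaves, assuming the class is in scope; the input file name is hypothetical, and pydub is already a dependency of this module:

    from pydub import AudioSegment

    segment = AudioSegment.from_file("speaker.wav", format="wav")  # hypothetical file
    audio = AudioData(segment)
    print(audio.sample_rate, audio.num_channels)  # e.g. 44100 2

    # pydub interleaves samples across channels, so channel n is every
    # num_channels-th sample starting at offset n.
    left = audio.get_channel_audio_data(0)

-class LatentUpscaleBy:
-    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
-    @classmethod
-    def INPUT_TYPES(s):
-        return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
-                              "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}}
-    RETURN_TYPES = ("LATENT",)
-    FUNCTION = "upscale"
-
-    CATEGORY = "latent"
-
-    def upscale(self, samples, upscale_method, scale_by):
-        s = samples.copy()
-        width = round(samples["samples"].shape[3] * scale_by)
-        height = round(samples["samples"].shape[2] * scale_by)
-        s["samples"] = 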
comfy.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled") - return (s,) - -class LatentRotate: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "rotate" - - CATEGORY = "latent/transform" - - def rotate(self, samples, rotation): - s = samples.copy() - rotate_by = 0 - if rotation.startswith("90"): - rotate_by = 1 - elif rotation.startswith("180"): - rotate_by = 2 - elif rotation.startswith("270"): - rotate_by = 3 - - s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2]) - return (s,) - -class LatentFlip: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "flip_method": (["x-axis: vertically", "y-axis: horizontally"],), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "flip" - - CATEGORY = "latent/transform" - - def flip(self, samples, flip_method): - s = samples.copy() - if flip_method.startswith("x"): - s["samples"] = torch.flip(samples["samples"], dims=[2]) - elif flip_method.startswith("y"): - s["samples"] = torch.flip(samples["samples"], dims=[3]) - - return (s,) - -class LatentComposite: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples_to": ("LATENT",), - "samples_from": ("LATENT",), - "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "composite" - - CATEGORY = "latent" - - def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0): - x = x // 8 - y = y // 8 - feather = feather // 8 - samples_out = samples_to.copy() - s = samples_to["samples"].clone() - samples_to = samples_to["samples"] - samples_from = samples_from["samples"] - if feather == 0: - s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] - else: - samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] - mask = torch.ones_like(samples_from) - for t in range(feather): - if y != 0: - mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1)) - - if y + samples_from.shape[2] < samples_to.shape[2]: - mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1)) - if x != 0: - mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1)) - if x + samples_from.shape[3] < samples_to.shape[3]: - mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1)) - rev_mask = torch.ones_like(mask) - mask - s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask - samples_out["samples"] = s - return (samples_out,) - -class LatentBlend: - @classmethod - def INPUT_TYPES(s): - return {"required": { - "samples1": ("LATENT",), - "samples2": ("LATENT",), - "blend_factor": ("FLOAT", { - "default": 0.5, - "min": 0, - "max": 1, - "step": 0.01 - }), - }} - - RETURN_TYPES = ("LATENT",) - FUNCTION = "blend" - - CATEGORY = "_for_testing" - - def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"): - - samples_out = samples1.copy() - samples1 = samples1["samples"] - samples2 = samples2["samples"] - - if samples1.shape != samples2.shape: - samples2.permute(0, 
3, 1, 2) - samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center') - samples2.permute(0, 2, 3, 1) - - samples_blended = self.blend_mode(samples1, samples2, blend_mode) - samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor) - samples_out["samples"] = samples_blended - return (samples_out,) - - def blend_mode(self, img1, img2, mode): - if mode == "normal": - return img2 - else: - raise ValueError(f"Unsupported blend mode: {mode}") - -class LatentCrop: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), - "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), - "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "crop" - - CATEGORY = "latent/transform" - - def crop(self, samples, width, height, x, y): - s = samples.copy() - samples = samples['samples'] - x = x // 8 - y = y // 8 - - #enfonce minimum size of 64 - if x > (samples.shape[3] - 8): - x = samples.shape[3] - 8 - if y > (samples.shape[2] - 8): - y = samples.shape[2] - 8 - - new_height = height // 8 - new_width = width // 8 - to_x = new_width + x - to_y = new_height + y - s['samples'] = samples[:,:,y:to_y, x:to_x] - return (s,) - -class SetLatentNoiseMask: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "mask": ("MASK",), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "set_mask" - - CATEGORY = "latent/inpaint" - - def set_mask(self, samples, mask): - s = samples.copy() - s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])) - return (s,) - -def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False): - latent_image = latent["samples"] - latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image) - - if disable_noise: - noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu") - else: - batch_inds = latent["batch_index"] if "batch_index" in latent else None - noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds) - - noise_mask = None - if "noise_mask" in latent: - noise_mask = latent["noise_mask"] - - callback = latent_preview.prepare_callback(model, steps) - disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED - samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, - denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step, - force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed) - out = latent.copy() - out["samples"] = samples - return (out, ) - -class KSampler: - @classmethod - def INPUT_TYPES(s): - return {"required": - {"model": ("MODEL",), - "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), - "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), - "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), - "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), - "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ), - "positive": ("CONDITIONING", ), - "negative": ("CONDITIONING", ), - "latent_image": 
("LATENT", ), - "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), - } - } - - RETURN_TYPES = ("LATENT",) - FUNCTION = "sample" - - CATEGORY = "sampling" - - def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0): - return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) - -class KSamplerAdvanced: - @classmethod - def INPUT_TYPES(s): - return {"required": - {"model": ("MODEL",), - "add_noise": (["enable", "disable"], ), - "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}), - "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), - "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), - "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), - "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ), - "positive": ("CONDITIONING", ), - "negative": ("CONDITIONING", ), - "latent_image": ("LATENT", ), - "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), - "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), - "return_with_leftover_noise": (["disable", "enable"], ), - } - } - - RETURN_TYPES = ("LATENT",) - FUNCTION = "sample" - - CATEGORY = "sampling" - - def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0): - force_full_denoise = True - if return_with_leftover_noise == "enable": - force_full_denoise = False - disable_noise = False - if add_noise == "disable": - disable_noise = True - return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise) - -class SaveImage: +class DeepfuzePreview: def __init__(self): self.output_dir = folder_paths.get_output_directory() self.type = "output" @@ -1387,599 +1104,79 @@ class SaveImage: self.compress_level = 4 @classmethod - def INPUT_TYPES(s): - return {"required": - {"images": ("IMAGE", ), - "filename_prefix": ("STRING", {"default": "ComfyUI"})}, - "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, + def INPUT_TYPES(self): + return { + "required": { + "images": ("IMAGE",), + "face_mask_padding_left": ("INT",{"default":0,"min":0,"max":30,"step":1}), + "face_mask_padding_right": ("INT",{"default":0,"min":0,"max":30,"step":1}), + "face_mask_padding_bottom": ("INT",{"default":0,"min":0,"max":30,"step":1}), + "face_mask_padding_top": ("INT",{"default":0,"min":0,"max":30,"step":1}), } - + } RETURN_TYPES = () - FUNCTION = "save_images" - + FUNCTION = "test" # Entry-point method name OUTPUT_NODE = True + CATEGORY = "DeepFuze" # Category for the node in the UI - CATEGORY = "image" - - def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): + def test(self, images, face_mask_padding_left, face_mask_padding_right,face_mask_padding_bottom,face_mask_padding_top, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): filename_prefix += self.prefix_append full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]) + print(filename) results = list() for (batch_number, image) in enumerate(images): i = 255. 
* image.cpu().numpy()
            img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
            metadata = None
-            if not args.disable_metadata:
-                metadata = PngInfo()
-                if prompt is not None:
-                    metadata.add_text("prompt", json.dumps(prompt))
-                if extra_pnginfo is not None:
-                    for x in extra_pnginfo:
-                        metadata.add_text(x, json.dumps(extra_pnginfo[x]))
-
+
            filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
            file = f"{filename_with_batch_num}_{counter:05}_.png"
            img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level)
+            command = [
+                'python',
+                'run.py',  # script executed from the node directory set via cwd below
+                '--frame-processors',
+                'face_debugger',
+                '-t',
+                os.path.join(full_output_folder, file),
+                '-o',
+                os.path.join(full_output_folder, "_" + file),
+                '--face-mask-types',
+                'box',
+                '--face-mask-padding',  # order: top, bottom, left, right
+                str(face_mask_padding_top),
+                str(face_mask_padding_bottom),
+                str(face_mask_padding_left),
+                str(face_mask_padding_right),
+                '--headless'
+            ]
+            print(command)
+            result = subprocess.run(command, cwd="custom_nodes/ComfyUI-DeepFuze", stdout=subprocess.PIPE)
+            print(result.stdout)
            results.append({
-                "filename": file,
+                "filename": "_" + file,
                 "subfolder": subfolder,
                 "type": self.type
             })
            counter += 1
+        return { "ui": { "images": results } }
-        return { "ui": { "images": results } }
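The preview node above shells out to the bundled face_debugger frame processor and previews the "_"-prefixed debug frame it writes back. A standalone sketch of the same invocation, with hypothetical paths and padding values taken from the command list above:

    import subprocess

    cmd = [
        "python", "run.py",
        "--frame-processors", "face_debugger",
        "-t", "output/ComfyUI_00001_.png",    # frame the node just saved
        "-o", "output/_ComfyUI_00001_.png",   # debug frame shown as the preview
        "--face-mask-types", "box",
        "--face-mask-padding", "5", "5", "10", "10",  # top, bottom, left, right
        "--headless",
    ]
    subprocess.run(cmd, cwd="custom_nodes/ComfyUI-DeepFuze", check=True)

-class PreviewImage(SaveImage):
-    def __init__(self):
-        self.output_dir = folder_paths.get_temp_directory()
-        self.type = "temp"
-        self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5))
-        self.compress_level = 1
-
-    @classmethod
-    def INPUT_TYPES(s):
-        return {"required":
-                    {"images": ("IMAGE", ), },
-                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
-                }
-
-class LoadImage:
-    @classmethod
-    def INPUT_TYPES(s):
-        input_dir = folder_paths.get_input_directory()
-        files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
-        return {"required":
-                    {"image": (sorted(files), {"image_upload": True})},
-                }
-
-    CATEGORY = "image"
-
-    RETURN_TYPES = ("IMAGE", "MASK")
-    FUNCTION = "load_image"
-    def load_image(self, image):
-        image_path = folder_paths.get_annotated_filepath(image)
-
-        img = node_helpers.pillow(Image.open, image_path)
-
-        output_images = []
-        output_masks = []
-        w, h = None, None
-
-        excluded_formats = ['MPO']
-
-        for i in ImageSequence.Iterator(img):
-            i = node_helpers.pillow(ImageOps.exif_transpose, i)
-
-            if i.mode == 'I':
-                i = i.point(lambda i: i * (1 / 255))
-            image = i.convert("RGB")
-
-            if len(output_images) == 0:
-                w = image.size[0]
-                h = image.size[1]
-
-            if image.size[0] != w or image.size[1] != h:
-                continue
-
-            image = np.array(image).astype(np.float32) / 255.0
-            image = torch.from_numpy(image)[None,]
-            if 'A' in i.getbands():
-                mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
-                mask = 1. 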
- torch.from_numpy(mask) - else: - mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") - output_images.append(image) - output_masks.append(mask.unsqueeze(0)) - - if len(output_images) > 1 and img.format not in excluded_formats: - output_image = torch.cat(output_images, dim=0) - output_mask = torch.cat(output_masks, dim=0) - else: - output_image = output_images[0] - output_mask = output_masks[0] - - return (output_image, output_mask) - - @classmethod - def IS_CHANGED(s, image): - image_path = folder_paths.get_annotated_filepath(image) - m = hashlib.sha256() - with open(image_path, 'rb') as f: - m.update(f.read()) - return m.digest().hex() - - @classmethod - def VALIDATE_INPUTS(s, image): - if not folder_paths.exists_annotated_filepath(image): - return "Invalid image file: {}".format(image) - - return True - -class LoadImageMask: - _color_channels = ["alpha", "red", "green", "blue"] - @classmethod - def INPUT_TYPES(s): - input_dir = folder_paths.get_input_directory() - files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] - return {"required": - {"image": (sorted(files), {"image_upload": True}), - "channel": (s._color_channels, ), } - } - - CATEGORY = "mask" - - RETURN_TYPES = ("MASK",) - FUNCTION = "load_image" - def load_image(self, image, channel): - image_path = folder_paths.get_annotated_filepath(image) - i = node_helpers.pillow(Image.open, image_path) - i = node_helpers.pillow(ImageOps.exif_transpose, i) - if i.getbands() != ("R", "G", "B", "A"): - if i.mode == 'I': - i = i.point(lambda i: i * (1 / 255)) - i = i.convert("RGBA") - mask = None - c = channel[0].upper() - if c in i.getbands(): - mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0 - mask = torch.from_numpy(mask) - if c == 'A': - mask = 1. 
- mask - else: - mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") - return (mask.unsqueeze(0),) - - @classmethod - def IS_CHANGED(s, image, channel): - image_path = folder_paths.get_annotated_filepath(image) - m = hashlib.sha256() - with open(image_path, 'rb') as f: - m.update(f.read()) - return m.digest().hex() - - @classmethod - def VALIDATE_INPUTS(s, image): - if not folder_paths.exists_annotated_filepath(image): - return "Invalid image file: {}".format(image) - - return True - -class ImageScale: - upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] - crop_methods = ["disabled", "center"] - - @classmethod - def INPUT_TYPES(s): - return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), - "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), - "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), - "crop": (s.crop_methods,)}} - RETURN_TYPES = ("IMAGE",) - FUNCTION = "upscale" - - CATEGORY = "image/upscaling" - - def upscale(self, image, upscale_method, width, height, crop): - if width == 0 and height == 0: - s = image - else: - samples = image.movedim(-1,1) - - if width == 0: - width = max(1, round(samples.shape[3] * height / samples.shape[2])) - elif height == 0: - height = max(1, round(samples.shape[2] * width / samples.shape[3])) - - s = comfy.utils.common_upscale(samples, width, height, upscale_method, crop) - s = s.movedim(1,-1) - return (s,) - -class ImageScaleBy: - upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] - - @classmethod - def INPUT_TYPES(s): - return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), - "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}} - RETURN_TYPES = ("IMAGE",) - FUNCTION = "upscale" - - CATEGORY = "image/upscaling" - - def upscale(self, image, upscale_method, scale_by): - samples = image.movedim(-1,1) - width = round(samples.shape[3] * scale_by) - height = round(samples.shape[2] * scale_by) - s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled") - s = s.movedim(1,-1) - return (s,) - -class ImageInvert: - - @classmethod - def INPUT_TYPES(s): - return {"required": { "image": ("IMAGE",)}} - - RETURN_TYPES = ("IMAGE",) - FUNCTION = "invert" - - CATEGORY = "image" - - def invert(self, image): - s = 1.0 - image - return (s,) - -class ImageBatch: - - @classmethod - def INPUT_TYPES(s): - return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}} - - RETURN_TYPES = ("IMAGE",) - FUNCTION = "batch" - - CATEGORY = "image" - - def batch(self, image1, image2): - if image1.shape[1:] != image2.shape[1:]: - image2 = comfy.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1) - s = torch.cat((image1, image2), dim=0) - return (s,) - -class EmptyImage: - def __init__(self, device="cpu"): - self.device = device - - @classmethod - def INPUT_TYPES(s): - return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), - "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), - "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}), - "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}), - }} - RETURN_TYPES = ("IMAGE",) - FUNCTION = "generate" - - CATEGORY = "image" - - def generate(self, width, height, batch_size=1, color=0): - r = torch.full([batch_size, height, width, 1], 
((color >> 16) & 0xFF) / 0xFF) - g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF) - b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF) - return (torch.cat((r, g, b), dim=-1), ) - -class ImagePadForOutpaint: - - @classmethod - def INPUT_TYPES(s): - return { - "required": { - "image": ("IMAGE",), - "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}), - } - } - - RETURN_TYPES = ("IMAGE", "MASK") - FUNCTION = "expand_image" - - CATEGORY = "image" - - def expand_image(self, image, left, top, right, bottom, feathering): - d1, d2, d3, d4 = image.size() - - new_image = torch.ones( - (d1, d2 + top + bottom, d3 + left + right, d4), - dtype=torch.float32, - ) * 0.5 - - new_image[:, top:top + d2, left:left + d3, :] = image - - mask = torch.ones( - (d2 + top + bottom, d3 + left + right), - dtype=torch.float32, - ) - - t = torch.zeros( - (d2, d3), - dtype=torch.float32 - ) - - if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3: - - for i in range(d2): - for j in range(d3): - dt = i if top != 0 else d2 - db = d2 - i if bottom != 0 else d2 - - dl = j if left != 0 else d3 - dr = d3 - j if right != 0 else d3 - - d = min(dt, db, dl, dr) - - if d >= feathering: - continue - - v = (feathering - d) / feathering - - t[i, j] = v * v - - mask[top:top + d2, left:left + d3] = t - - return (new_image, mask) NODE_CLASS_MAPPINGS = { - "KSampler": KSampler, - "CheckpointLoaderSimple": CheckpointLoaderSimple, - "CLIPTextEncode": CLIPTextEncode, - "CLIPSetLastLayer": CLIPSetLastLayer, - "VAEDecode": VAEDecode, - "VAEEncode": VAEEncode, - "VAEEncodeForInpaint": VAEEncodeForInpaint, - "VAELoader": VAELoader, - "EmptyLatentImage": EmptyLatentImage, - "LatentUpscale": LatentUpscale, - "LatentUpscaleBy": LatentUpscaleBy, - "LatentFromBatch": LatentFromBatch, - "RepeatLatentBatch": RepeatLatentBatch, - "SaveImage": SaveImage, - "PreviewImage": PreviewImage, - "LoadImage": LoadImage, - "LoadImageMask": LoadImageMask, - "ImageScale": ImageScale, - "ImageScaleBy": ImageScaleBy, - "ImageInvert": ImageInvert, - "ImageBatch": ImageBatch, - "ImagePadForOutpaint": ImagePadForOutpaint, - "EmptyImage": EmptyImage, - "ConditioningAverage": ConditioningAverage , - "ConditioningCombine": ConditioningCombine, - "ConditioningConcat": ConditioningConcat, - "ConditioningSetArea": ConditioningSetArea, - "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage, - "ConditioningSetAreaStrength": ConditioningSetAreaStrength, - "ConditioningSetMask": ConditioningSetMask, - "KSamplerAdvanced": KSamplerAdvanced, - "SetLatentNoiseMask": SetLatentNoiseMask, - "LatentComposite": LatentComposite, - "LatentBlend": LatentBlend, - "LatentRotate": LatentRotate, - "LatentFlip": LatentFlip, - "LatentCrop": LatentCrop, - "LoraLoader": LoraLoader, - "CLIPLoader": CLIPLoader, - "UNETLoader": UNETLoader, - "DualCLIPLoader": DualCLIPLoader, - "CLIPVisionEncode": CLIPVisionEncode, - "StyleModelApply": StyleModelApply, - "unCLIPConditioning": unCLIPConditioning, - "ControlNetApply": ControlNetApply, - "ControlNetApplyAdvanced": ControlNetApplyAdvanced, - "ControlNetLoader": ControlNetLoader, - "DiffControlNetLoader": DiffControlNetLoader, - 
"StyleModelLoader": StyleModelLoader, - "CLIPVisionLoader": CLIPVisionLoader, - "VAEDecodeTiled": VAEDecodeTiled, - "VAEEncodeTiled": VAEEncodeTiled, - "unCLIPCheckpointLoader": unCLIPCheckpointLoader, - "GLIGENLoader": GLIGENLoader, - "GLIGENTextBoxApply": GLIGENTextBoxApply, - "InpaintModelConditioning": InpaintModelConditioning, - - "CheckpointLoader": CheckpointLoader, - "DiffusersLoader": DiffusersLoader, - - "LoadLatent": LoadLatent, - "SaveLatent": SaveLatent, - - "ConditioningZeroOut": ConditioningZeroOut, - "ConditioningSetTimestepRange": ConditioningSetTimestepRange, - "LoraLoaderModelOnly": LoraLoaderModelOnly, + "DeepFuzeAdavance": DeepFuzeAdavance, + "TTS_generation":TTS_generation, + "LLM_node": LLM_node, + "PlayBackAudio": PlayBackAudio, + "DeepfuzePreview":DeepfuzePreview } - NODE_DISPLAY_NAME_MAPPINGS = { - # Sampling - "KSampler": "KSampler", - "KSamplerAdvanced": "KSampler (Advanced)", - # Loaders - "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)", - "CheckpointLoaderSimple": "Load Checkpoint", - "VAELoader": "Load VAE", - "LoraLoader": "Load LoRA", - "CLIPLoader": "Load CLIP", - "ControlNetLoader": "Load ControlNet Model", - "DiffControlNetLoader": "Load ControlNet Model (diff)", - "StyleModelLoader": "Load Style Model", - "CLIPVisionLoader": "Load CLIP Vision", - "UpscaleModelLoader": "Load Upscale Model", - # Conditioning - "CLIPVisionEncode": "CLIP Vision Encode", - "StyleModelApply": "Apply Style Model", - "CLIPTextEncode": "CLIP Text Encode (Prompt)", - "CLIPSetLastLayer": "CLIP Set Last Layer", - "ConditioningCombine": "Conditioning (Combine)", - "ConditioningAverage ": "Conditioning (Average)", - "ConditioningConcat": "Conditioning (Concat)", - "ConditioningSetArea": "Conditioning (Set Area)", - "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)", - "ConditioningSetMask": "Conditioning (Set Mask)", - "ControlNetApply": "Apply ControlNet", - "ControlNetApplyAdvanced": "Apply ControlNet (Advanced)", - # Latent - "VAEEncodeForInpaint": "VAE Encode (for Inpainting)", - "SetLatentNoiseMask": "Set Latent Noise Mask", - "VAEDecode": "VAE Decode", - "VAEEncode": "VAE Encode", - "LatentRotate": "Rotate Latent", - "LatentFlip": "Flip Latent", - "LatentCrop": "Crop Latent", - "EmptyLatentImage": "Empty Latent Image", - "LatentUpscale": "Upscale Latent", - "LatentUpscaleBy": "Upscale Latent By", - "LatentComposite": "Latent Composite", - "LatentBlend": "Latent Blend", - "LatentFromBatch" : "Latent From Batch", - "RepeatLatentBatch": "Repeat Latent Batch", - # Image - "SaveImage": "Save Image", - "PreviewImage": "Preview Image", - "LoadImage": "Load Image", - "LoadImageMask": "Load Image (as Mask)", - "ImageScale": "Upscale Image", - "ImageScaleBy": "Upscale Image By", - "ImageUpscaleWithModel": "Upscale Image (using Model)", - "ImageInvert": "Invert Image", - "ImagePadForOutpaint": "Pad Image for Outpainting", - "ImageBatch": "Batch Images", - # _for_testing - "VAEDecodeTiled": "VAE Decode (Tiled)", - "VAEEncodeTiled": "VAE Encode (Tiled)", + "DeepFuzeAdavance": "DeepFuze Lipsync", + "TTS_generation":"DeepFuze TTS", + "LLM_node": "Openai LLM", + "PlayBackAudio": "Play Audio", + "DeepfuzePreview": "DeepFuze Padding Preview" } - -EXTENSION_WEB_DIRS = {} - -def load_custom_node(module_path, ignore=set()): - module_name = os.path.basename(module_path) - if os.path.isfile(module_path): - sp = os.path.splitext(module_path) - module_name = sp[0] - try: - logging.debug("Trying to load custom node {}".format(module_path)) - if 
os.path.isfile(module_path): - module_spec = importlib.util.spec_from_file_location(module_name, module_path) - module_dir = os.path.split(module_path)[0] - else: - module_spec = importlib.util.spec_from_file_location(module_name, os.path.join(module_path, "__init__.py")) - module_dir = module_path - - module = importlib.util.module_from_spec(module_spec) - sys.modules[module_name] = module - module_spec.loader.exec_module(module) - - if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None: - web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY"))) - if os.path.isdir(web_dir): - EXTENSION_WEB_DIRS[module_name] = web_dir - - if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None: - for name in module.NODE_CLASS_MAPPINGS: - if name not in ignore: - NODE_CLASS_MAPPINGS[name] = module.NODE_CLASS_MAPPINGS[name] - if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None: - NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS) - return True - else: - logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS.") - return False - except Exception as e: - logging.warning(traceback.format_exc()) - logging.warning(f"Cannot import {module_path} module for custom nodes: {e}") - return False - -def load_custom_nodes(): - base_node_names = set(NODE_CLASS_MAPPINGS.keys()) - node_paths = folder_paths.get_folder_paths("custom_nodes") - node_import_times = [] - for custom_node_path in node_paths: - possible_modules = os.listdir(os.path.realpath(custom_node_path)) - if "__pycache__" in possible_modules: - possible_modules.remove("__pycache__") - - for possible_module in possible_modules: - module_path = os.path.join(custom_node_path, possible_module) - if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue - if module_path.endswith(".disabled"): continue - time_before = time.perf_counter() - success = load_custom_node(module_path, base_node_names) - node_import_times.append((time.perf_counter() - time_before, module_path, success)) - - if len(node_import_times) > 0: - logging.info("\nImport times for custom nodes:") - for n in sorted(node_import_times): - if n[2]: - import_message = "" - else: - import_message = " (IMPORT FAILED)" - logging.info("{:6.1f} seconds{}: {}".format(n[0], import_message, n[1])) - logging.info("") - -def init_custom_nodes(): - extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras") - extras_files = [ - "nodes_latent.py", - "nodes_hypernetwork.py", - "nodes_upscale_model.py", - "nodes_post_processing.py", - "nodes_mask.py", - "nodes_compositing.py", - "nodes_rebatch.py", - "nodes_model_merging.py", - "nodes_tomesd.py", - "nodes_clip_sdxl.py", - "nodes_canny.py", - "nodes_freelunch.py", - "nodes_custom_sampler.py", - "nodes_hypertile.py", - "nodes_model_advanced.py", - "nodes_model_downscale.py", - "nodes_images.py", - "nodes_video_model.py", - "nodes_sag.py", - "nodes_perpneg.py", - "nodes_stable3d.py", - "nodes_sdupscale.py", - "nodes_photomaker.py", - "nodes_cond.py", - "nodes_morphology.py", - "nodes_stable_cascade.py", - "nodes_differential_diffusion.py", - "nodes_ip2p.py", - "nodes_model_merging_model_specific.py", - "nodes_pag.py", - "nodes_align_your_steps.py", - "nodes_attention_multiply.py", - "nodes_advanced_samplers.py", - "nodes_webcam.py", - ] - - import_failed = [] - for node_file in extras_files: - if 
not load_custom_node(os.path.join(extras_dir, node_file)): - import_failed.append(node_file) - - load_custom_nodes() - - if len(import_failed) > 0: - logging.warning("WARNING: some comfy_extras/ nodes did not import correctly. This may be because they are missing some dependencies.\n") - for node in import_failed: - logging.warning("IMPORT FAILED: {}".format(node)) - logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.") - if args.windows_standalone_build: - logging.warning("Please run the update script: update/update_comfyui.bat") - else: - logging.warning("Please do a: pip install -r requirements.txt") - logging.warning("")
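The loader removed above imported each custom-node module from its file path with importlib and then merged its mapping dictionaries into the global registries. A condensed sketch of that mechanism, with a hypothetical path:

    import importlib.util
    import sys

    path = "custom_nodes/ComfyUI-DeepFuze/nodes.py"   # hypothetical location
    spec = importlib.util.spec_from_file_location("deepfuze_nodes", path)
    module = importlib.util.module_from_spec(spec)
    sys.modules["deepfuze_nodes"] = module
    spec.loader.exec_module(module)

    # The loader then merged these into the global registries.
    print(sorted(module.NODE_CLASS_MAPPINGS.keys()))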