# NOTE: imports are kept in their original order; some of the project modules
# may rely on import-time side effects.
import os
import folder_paths
import comfy.diffusers_load
import comfy.samplers
import comfy.sample
import comfy.sd
import comfy.utils
import comfy.controlnet
import comfy.clip_vision
import comfy.model_management
from comfy.cli_args import args
import torch
import torch.nn as nn
import numpy as np
import latent_preview
from PIL import Image
from einops import rearrange
import scipy.ndimage
import sys
import cv2
from magic_utils import HWC3, apply_color, common_input_validate, resize_image_with_pad
from pidi import pidinet

# File extensions accepted for model weight files.
supported_pt_extensions = {'.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl'}

# Maps a model category name to ([search directories], accepted extensions).
folder_names_and_paths = {}

base_path = os.path.dirname(os.path.realpath(__file__))
models_dir = os.path.join(base_path, "../models")

folder_names_and_paths["checkpoints"] = ([os.path.join(models_dir, "checkpoints")], supported_pt_extensions)
folder_names_and_paths["configs"] = ([os.path.join(models_dir, "configs")], [".yaml"])
folder_names_and_paths["loras"] = ([os.path.join(models_dir, "loras")], supported_pt_extensions)
folder_names_and_paths["vae"] = ([os.path.join(models_dir, "vae")], supported_pt_extensions)
folder_names_and_paths["clip"] = ([os.path.join(models_dir, "clip")], supported_pt_extensions)
folder_names_and_paths["unet"] = ([os.path.join(models_dir, "unet")], supported_pt_extensions)
folder_names_and_paths["clip_vision"] = ([os.path.join(models_dir, "clip_vision")], supported_pt_extensions)
folder_names_and_paths["style_models"] = ([os.path.join(models_dir, "style_models")], supported_pt_extensions)
folder_names_and_paths["embeddings"] = ([os.path.join(models_dir, "embeddings")], supported_pt_extensions)
folder_names_and_paths["diffusers"] = ([os.path.join(models_dir, "diffusers")], ["folder"])
folder_names_and_paths["vae_approx"] = ([os.path.join(models_dir, "vae_approx")], supported_pt_extensions)
folder_names_and_paths["controlnet"] = (
    [os.path.join(models_dir, "controlnet"), os.path.join(models_dir, "t2i_adapter")],
    supported_pt_extensions,
)
folder_names_and_paths["gligen"] = ([os.path.join(models_dir, "gligen")], supported_pt_extensions)
folder_names_and_paths["upscale_models"] = ([os.path.join(models_dir, "upscale_models")], supported_pt_extensions)
folder_names_and_paths["hypernetworks"] = ([os.path.join(models_dir, "hypernetworks")], supported_pt_extensions)
folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")], supported_pt_extensions)
folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""})


def common_annotator_call(model, tensor_image, input_batch=False, show_pbar=True, **kwargs):
    """Run an annotator ``model`` over a batch of images and return a float tensor.

    Args:
        model: Callable invoked as
            ``model(np_image, output_type="np", detect_resolution=..., **kwargs)``.
        tensor_image: Batch of images; iterated along its first dimension.
            Values are scaled by 255 and cast to uint8 before being handed to
            the model (presumably floats in [0, 1] -- confirm against callers).
        input_batch: When true the whole batch is passed to ``model`` in one call.
        show_pbar: When true a ``comfy.utils.ProgressBar`` is advanced once per
            image (per-image path only).
        **kwargs: Forwarded to ``model``. ``resolution`` is consumed here and
            becomes ``detect_resolution``; any ``detect_resolution`` entry is
            discarded.

    Returns:
        A float32 tensor holding the model output divided by 255. In the
        per-image path the result is ``None`` when the batch is empty.
    """
    # A caller-supplied detect_resolution would collide with the explicit
    # keyword passed to the model below, so it is dropped.
    kwargs.pop("detect_resolution", None)

    # Only integer resolutions of at least 64 are honoured; anything else
    # (missing, wrong type, too small) falls back to 512.
    resolution = kwargs.pop("resolution", None)
    if isinstance(resolution, int) and resolution >= 64:
        detect_resolution = resolution
    else:
        detect_resolution = 512

    if input_batch:
        # Move to host memory first, as the per-image path does; numpy cannot
        # convert a tensor that lives on an accelerator device.
        if isinstance(tensor_image, torch.Tensor):
            tensor_image = tensor_image.cpu()
        np_images = np.asarray(tensor_image * 255., dtype=np.uint8)
        np_results = model(np_images, output_type="np", detect_resolution=detect_resolution, **kwargs)
        return torch.from_numpy(np_results.astype(np.float32) / 255.0)

    batch_size = tensor_image.shape[0]
    pbar = comfy.utils.ProgressBar(batch_size) if show_pbar else None
    out_tensor = None
    for i, image in enumerate(tensor_image):
        np_image = np.asarray(image.cpu() * 255., dtype=np.uint8)
        np_result = model(np_image, output_type="np", detect_resolution=detect_resolution, **kwargs)
        out = torch.from_numpy(np_result.astype(np.float32) / 255.0)
        if out_tensor is None:
            # The output shape is only known after the first image is processed.
            out_tensor = torch.zeros(batch_size, *out.shape, dtype=torch.float32)
        out_tensor[i] = out
        if pbar is not None:
            pbar.update(1)
    return out_tensor


class CheckpointLoaderSimple:
    """Loads a checkpoint from the "checkpoints" model folder."""

    def load_checkpoint(self, ckpt_name):
        """Resolve ``ckpt_name`` and load it with VAE and CLIP outputs enabled.

        Returns the first three elements of what
        ``comfy.sd.load_checkpoint_guess_config`` yields.
        """
        ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
        print("Loading checkpoint from:", ckpt_path)
        out = comfy.sd.load_checkpoint_guess_config(
            ckpt_path,
            output_vae=True,
            output_clip=True,
            embedding_directory=folder_paths.get_folder_paths("embeddings"),
        )
        return out[:3]


class ControlNetLoader:
    """Loads a ControlNet from the "controlnet" model folder."""

    def load_controlnet(self, control_net_name):
        """Return a 1-tuple holding the loaded ControlNet."""
        controlnet_path = folder_paths.get_full_path("controlnet", control_net_name)
        controlnet = comfy.controlnet.load_controlnet(controlnet_path)
        return (controlnet, )


class ControlNetApplyAdvanced:
    """Attaches a ControlNet to positive and negative conditioning."""

    def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent):
        """Return ``(positive, negative)`` with ``control_net`` attached to each entry.

        Each conditioning entry is read as ``[value, options_dict]``; the
        options dict is copied before it is modified. When ``strength`` is 0
        the inputs are returned untouched.
        """
        if strength == 0:
            return (positive, negative)

        # Move the last axis to position 1 before handing the hint to the
        # ControlNet (presumably channels-last -> channels-first; confirm).
        control_hint = image.movedim(-1, 1)

        # One ControlNet copy is created per distinct previous ControlNet and
        # shared between the positive and negative conditioning.
        cnets = {}

        out = []
        for conditioning in [positive, negative]:
            c = []
            for t in conditioning:
                d = t[1].copy()

                prev_cnet = d.get('control', None)
                if prev_cnet in cnets:
                    c_net = cnets[prev_cnet]
                else:
                    c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent))
                    c_net.set_previous_controlnet(prev_cnet)
                    cnets[prev_cnet] = c_net

                d['control'] = c_net
                d['control_apply_to_uncond'] = False
                n = [t[0], d]
                c.append(n)
            out.append(c)
        return (out[0], out[1])


class CLIPTextEncode:
    """Encodes a text prompt with a CLIP model."""

    def encode(self, clip, text):
        """Tokenize and encode ``text``; return a 1-tuple of conditioning.

        The conditioning is ``[[cond, {"pooled_output": pooled}]]``.
        """
        tokens = clip.tokenize(text)
        cond, pooled = clip.encode_from_tokens(tokens, return_pooled=True)
        return ([[cond, {"pooled_output": pooled}]], )


class KSampler:
    """Thin wrapper around ``comfy.sample.sample``."""

    def common_ksampler(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent,
                        denoise, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False):
        """Sample from ``model`` starting at ``latent`` and return the new latent.

        Args:
            latent: Dict with a ``"samples"`` entry and optional
                ``"batch_index"`` and ``"noise_mask"`` entries.
            disable_noise: When true, all-zero noise is used instead of noise
                prepared from ``seed``.

        The remaining arguments are forwarded to ``comfy.sample.sample``.

        Returns:
            A 1-tuple holding a shallow copy of ``latent`` whose ``"samples"``
            entry is replaced by the sampling result.
        """
        latent_image = latent["samples"]
        latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image)

        if disable_noise:
            noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout,
                                device="cpu")
        else:
            batch_inds = latent.get("batch_index")
            noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds)

        noise_mask = latent.get("noise_mask")

        callback = latent_preview.prepare_callback(model, steps)
        disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED
        samples = comfy.sample.sample(
            model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
            denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step,
            force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback,
            disable_pbar=disable_pbar, seed=seed,
        )
        out = latent.copy()
        out["samples"] = samples
        return (out, )

    def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
               denoise=1.0):
        """Run ``common_ksampler`` with its default noise and step settings."""
        return self.common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative,
                                    latent_image, denoise=denoise)


class VAEDecode:
    """Decodes latent samples with a VAE."""

    def decode(self, vae, samples):
        """Return a 1-tuple holding ``vae.decode`` applied to ``samples["samples"]``."""
        return (vae.decode(samples["samples"]), )


class ColorDetector:
    """Annotator that applies ``apply_color`` to an image."""

    def __call__(self, input_image=None, detect_resolution=2048, output_type=None, **kwargs):
        """Return the colour map of ``input_image``.

        The result is a PIL image when the validated ``output_type`` is
        ``"pil"``; otherwise it is whatever ``HWC3`` returns.
        """
        input_image, output_type = common_input_validate(input_image, output_type, **kwargs)
        input_image = HWC3(input_image)
        detected_map = HWC3(apply_color(input_image, detect_resolution))

        if output_type == "pil":
            detected_map = Image.fromarray(detected_map)

        return detected_map


class Color_Preprocessor:
    """Node wrapper that runs ``ColorDetector`` over an image batch."""

    def execute(self, image, resolution=512, **kwargs):
        """Return a 1-tuple holding the colour maps for ``image``.

        Extra keyword arguments are accepted but not used.
        """
        return (common_annotator_call(ColorDetector(), image, resolution=resolution), )


# Normalisation layer used throughout the line-art generator.
norm_layer = nn.InstanceNorm2d


class ResidualBlock(nn.Module):
    """Two reflection-padded 3x3 convolutions with a skip connection."""

    def __init__(self, in_features):
        super().__init__()

        conv_block = [
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_features, in_features, 3),
            norm_layer(in_features),
            nn.ReLU(inplace=True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_features, in_features, 3),
            norm_layer(in_features),
        ]

        self.conv_block = nn.Sequential(*conv_block)

    def forward(self, x):
        """Return ``x`` plus the output of the convolution block."""
        return x + self.conv_block(x)


class Generator(nn.Module):
    """Convolutional encoder / residual / decoder network.

    Layout: a 7x7 input convolution (``model0``), two stride-2 convolutions
    (``model1``), ``n_residual_blocks`` residual blocks (``model2``), two
    stride-2 transposed convolutions (``model3``) and a 7x7 output convolution
    with an optional sigmoid (``model4``).

    The attribute names and layer order determine the state-dict keys, so they
    must not change if existing checkpoints are to keep loading.
    """

    def __init__(self, input_nc, output_nc, n_residual_blocks=9, sigmoid=True):
        super().__init__()

        # Initial convolution block
        model0 = [
            nn.ReflectionPad2d(3),
            nn.Conv2d(input_nc, 64, 7),
            norm_layer(64),
            nn.ReLU(inplace=True),
        ]
        self.model0 = nn.Sequential(*model0)

        # Downsampling: the channel count doubles at each of the two steps.
        model1 = []
        in_features = 64
        out_features = in_features * 2
        for _ in range(2):
            model1 += [
                nn.Conv2d(in_features, out_features, 3, stride=2, padding=1),
                norm_layer(out_features),
                nn.ReLU(inplace=True),
            ]
            in_features = out_features
            out_features = in_features * 2
        self.model1 = nn.Sequential(*model1)

        # Residual blocks
        model2 = []
        for _ in range(n_residual_blocks):
            model2 += [ResidualBlock(in_features)]
        self.model2 = nn.Sequential(*model2)

        # Upsampling: the channel count halves at each of the two steps.
        model3 = []
        out_features = in_features // 2
        for _ in range(2):
            model3 += [
                nn.ConvTranspose2d(in_features, out_features, 3, stride=2, padding=1, output_padding=1),
                norm_layer(out_features),
                nn.ReLU(inplace=True),
            ]
            in_features = out_features
            out_features = in_features // 2
        self.model3 = nn.Sequential(*model3)

        # Output layer
        model4 = [
            nn.ReflectionPad2d(3),
            nn.Conv2d(64, output_nc, 7),
        ]
        if sigmoid:
            model4 += [nn.Sigmoid()]

        self.model4 = nn.Sequential(*model4)

    def forward(self, x, cond=None):
        """Run ``x`` through the five stages in order. ``cond`` is not used."""
        out = self.model0(x)
        out = self.model1(out)
        out = self.model2(out)
        out = self.model3(out)
        out = self.model4(out)

        return out


class LineartDetector:
    """Line-art annotator backed by two ``Generator`` networks (fine and coarse)."""

    def __init__(self, model, coarse_model):
        self.model = model
        self.model_coarse = coarse_model
        self.device = "cpu"

    @classmethod
    def from_pretrained(cls):
        """Build a detector from the weights under ``../models/preprocessor``.

        Both networks are loaded onto the CPU and put into eval mode.
        """
        current_dir = os.path.dirname(os.path.abspath(__file__))
        model_path = os.path.join(current_dir, "../models/preprocessor/sk_model.pth")
        coarse_model_path = os.path.join(current_dir, "../models/preprocessor/sk_model2.pth")
        # print("model_path:", model_path)

        # NOTE(security): torch.load unpickles the file; only load trusted checkpoints.
        model = Generator(3, 1, 3)
        model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
        model.eval()

        coarse_model = Generator(3, 1, 3)
        coarse_model.load_state_dict(torch.load(coarse_model_path, map_location=torch.device('cpu')))
        coarse_model.eval()

        return cls(model, coarse_model)

    def to(self, device):
        """Move both networks to ``device`` and return ``self``."""
        self.model.to(device)
        self.model_coarse.to(device)
        self.device = device
        return self

    def __call__(self, input_image, coarse=False, detect_resolution=512, output_type="pil",
                 upscale_method="INTER_CUBIC", **kwargs):
        """Return the line-art map of ``input_image``.

        Args:
            coarse: Selects the coarse network instead of the fine one.
            detect_resolution, upscale_method: Passed to ``resize_image_with_pad``.
            output_type: ``"pil"`` yields a PIL image; otherwise the array
                produced by the post-processing is returned.
            **kwargs: Forwarded to ``common_input_validate``.
        """
        input_image, output_type = common_input_validate(input_image, output_type, **kwargs)
        detected_map, remove_pad = resize_image_with_pad(input_image, detect_resolution, upscale_method)

        model = self.model_coarse if coarse else self.model
        assert detected_map.ndim == 3
        with torch.no_grad():
            image = torch.from_numpy(detected_map).float().to(self.device)
            image = image / 255.0
            image = rearrange(image, 'h w c -> 1 c h w')
            # First batch element, first output channel.
            line = model(image)[0][0]

            line = line.cpu().numpy()
            line = (line * 255.0).clip(0, 255).astype(np.uint8)

        detected_map = HWC3(line)
        # Invert the map before the padding is removed.
        detected_map = remove_pad(255 - detected_map)

        if output_type == "pil":
            detected_map = Image.fromarray(detected_map)

        return detected_map


class LineArt_Preprocessor:
    """Node wrapper that runs ``LineartDetector`` over an image batch."""

    def execute(self, image, resolution=512, **kwargs):
        """Return a 1-tuple holding the line-art maps for ``image``.

        The coarse network is used only when ``kwargs["coarse"] == "enable"``;
        a missing ``coarse`` entry selects the fine network.
        """
        model = LineartDetector.from_pretrained().to(comfy.model_management.get_torch_device())
        print("model.device:", model.device)
        out = common_annotator_call(model, image, resolution=resolution, apply_filter=False,
                                    coarse=kwargs.get("coarse") == "enable")
        del model
        return (out, )


def nms(x, t, s):
    """Thin a response map to directional local maxima and binarise it.

    ``x`` is blurred with a Gaussian of sigma ``s``. A pixel is kept when it
    equals its dilation under at least one of four 3-pixel line kernels
    (horizontal, vertical and the two diagonals). Kept values above ``t``
    become 255 in the returned uint8 array; everything else is 0.
    """
    x = cv2.GaussianBlur(x.astype(np.float32), (0, 0), s)

    f1 = np.array([[0, 0, 0], [1, 1, 1], [0, 0, 0]], dtype=np.uint8)
    f2 = np.array([[0, 1, 0], [0, 1, 0], [0, 1, 0]], dtype=np.uint8)
    f3 = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]], dtype=np.uint8)
    f4 = np.array([[0, 0, 1], [0, 1, 0], [1, 0, 0]], dtype=np.uint8)

    y = np.zeros_like(x)

    for f in [f1, f2, f3, f4]:
        np.putmask(y, cv2.dilate(x, kernel=f) == x, x)

    z = np.zeros_like(y, dtype=np.uint8)
    z[y > t] = 255
    return z


class PidiNetDetector:
    """Edge annotator backed by a ``pidinet`` network."""

    def __init__(self, netNetwork):
        self.netNetwork = netNetwork
        self.device = "cpu"

    @classmethod
    def from_pretrained(cls, filename="table5_pidinet.pth"):
        """Build a detector from ``../models/preprocessor/<filename>``.

        The checkpoint's ``'state_dict'`` entry is loaded after ``'module.'``
        is removed from its keys, and the network is put into eval mode.
        """
        current_dir = os.path.dirname(os.path.abspath(__file__))
        model_path = os.path.join(current_dir, f"../models/preprocessor/{filename}")

        netNetwork = pidinet()
        # Load onto the CPU so a checkpoint saved from an accelerator also
        # loads on hosts without one; ``to()`` moves the network afterwards.
        # NOTE(security): torch.load unpickles the file; only load trusted checkpoints.
        checkpoint = torch.load(model_path, map_location=torch.device('cpu'))
        netNetwork.load_state_dict({k.replace('module.', ''): v for k, v in checkpoint['state_dict'].items()})
        netNetwork.eval()

        return cls(netNetwork)

    def to(self, device):
        """Move the network to ``device`` and return ``self``."""
        self.netNetwork.to(device)
        self.device = device
        return self

    def __call__(self, input_image, detect_resolution=512, safe=False, output_type="pil", scribble=False,
                 apply_filter=True, upscale_method="INTER_CUBIC", **kwargs):
        """Return the edge map of ``input_image``.

        Args:
            safe: Accepted but not used by this implementation.
            scribble: When true the edges are thinned with ``nms``, blurred and
                re-binarised.
            apply_filter: When true the network output is thresholded at 0.5
                before being scaled to 0-255.
            detect_resolution, upscale_method: Passed to ``resize_image_with_pad``.
            output_type: ``"pil"`` yields a PIL image; otherwise the result of
                ``HWC3`` is returned.
            **kwargs: Forwarded to ``common_input_validate``.
        """
        input_image, output_type = common_input_validate(input_image, output_type, **kwargs)
        detected_map, remove_pad = resize_image_with_pad(input_image, detect_resolution, upscale_method)

        # Reverse the channel axis (presumably RGB <-> BGR; confirm).
        detected_map = detected_map[:, :, ::-1].copy()
        with torch.no_grad():
            image_pidi = torch.from_numpy(detected_map).float().to(self.device)
            image_pidi = image_pidi / 255.0
            image_pidi = rearrange(image_pidi, 'h w c -> 1 c h w')
            # The network returns a sequence; only its last element is used.
            edge = self.netNetwork(image_pidi)[-1]
            edge = edge.cpu().numpy()
            if apply_filter:
                edge = edge > 0.5
            edge = (edge * 255.0).clip(0, 255).astype(np.uint8)

        detected_map = edge[0, 0]

        if scribble:
            detected_map = nms(detected_map, 127, 3.0)
            detected_map = cv2.GaussianBlur(detected_map, (0, 0), 3.0)
            detected_map[detected_map > 4] = 255
            detected_map[detected_map < 255] = 0

        detected_map = HWC3(remove_pad(detected_map))

        if output_type == "pil":
            detected_map = Image.fromarray(detected_map)

        return detected_map


class GrowMask:
    """Grows or shrinks masks by repeated 3x3 grey dilation or erosion."""

    def expand_mask(self, mask, expand, tapered_corners):
        """Return a 1-tuple holding the resized masks stacked along dim 0.

        Args:
            mask: Tensor that is reshaped to ``(-1, H, W)`` using its last two
                dimensions.
            expand: Number of iterations; negative values erode, positive
                values dilate, 0 leaves the masks unchanged.
            tapered_corners: When true the corners of the 3x3 footprint are
                excluded.
        """
        c = 0 if tapered_corners else 1
        kernel = np.array([[c, 1, c],
                           [1, 1, 1],
                           [c, 1, c]])
        mask = mask.reshape((-1, mask.shape[-2], mask.shape[-1]))

        # The direction is fixed for the whole call, so pick the filter once.
        morph = scipy.ndimage.grey_erosion if expand < 0 else scipy.ndimage.grey_dilation

        out = []
        for m in mask:
            # numpy conversion needs host memory; cpu() is a no-op for CPU tensors.
            output = m.cpu().numpy()
            for _ in range(abs(expand)):
                output = morph(output, footprint=kernel)
            out.append(torch.from_numpy(output))
        return (torch.stack(out, dim=0),)


class PIDINET_Preprocessor:
    """Node wrapper that runs ``PidiNetDetector`` over an image batch."""

    def execute(self, image, resolution=512, **kwargs):
        """Return a 1-tuple holding the edge maps for ``image``.

        Extra keyword arguments are accepted but not used.
        """
        model = PidiNetDetector.from_pretrained().to(comfy.model_management.get_torch_device())
        out = common_annotator_call(model, image, resolution=resolution, safe=True)
        del model
        return (out, )