diff --git "a/stablepy_model.py" "b/stablepy_model.py" new file mode 100644--- /dev/null +++ "b/stablepy_model.py" @@ -0,0 +1,2593 @@ +import gc, time +import numpy as np +import PIL.Image +from diffusers import ( + ControlNetModel, + DiffusionPipeline, + StableDiffusionControlNetPipeline, + StableDiffusionControlNetInpaintPipeline, + StableDiffusionPipeline, + AutoencoderKL, + StableDiffusionXLInpaintPipeline, + StableDiffusionXLAdapterPipeline, + T2IAdapter, + StableDiffusionXLPipeline, + AutoPipelineForImage2Image +) +from huggingface_hub import hf_hub_download +import torch, random, json +from controlnet_aux import ( + CannyDetector, + ContentShuffleDetector, + HEDdetector, + LineartAnimeDetector, + LineartDetector, + MidasDetector, + MLSDdetector, + NormalBaeDetector, + OpenposeDetector, + PidiNetDetector, +) +from transformers import pipeline +from controlnet_aux.util import HWC3, ade_palette +from transformers import AutoImageProcessor, UperNetForSemanticSegmentation +import cv2 +from diffusers import ( + DPMSolverMultistepScheduler, + DPMSolverSinglestepScheduler, + KDPM2DiscreteScheduler, + EulerDiscreteScheduler, + EulerAncestralDiscreteScheduler, + HeunDiscreteScheduler, + LMSDiscreteScheduler, + DDIMScheduler, + DEISMultistepScheduler, + UniPCMultistepScheduler, + LCMScheduler, + PNDMScheduler, + KDPM2AncestralDiscreteScheduler, + EDMDPMSolverMultistepScheduler, + EDMEulerScheduler, +) +from .prompt_weights import get_embed_new, add_comma_after_pattern_ti +from .utils import save_pil_image_with_metadata +from .lora_loader import lora_mix_load +from .inpainting_canvas import draw, make_inpaint_condition +from .adetailer import ad_model_process +from ..upscalers.esrgan import UpscalerESRGAN, UpscalerLanczos, UpscalerNearest +from ..logging.logging_setup import logger +from .extra_model_loaders import custom_task_model_loader +from .high_resolution import process_images_high_resolution +from .style_prompt_config import styles_data, STYLE_NAMES, get_json_content, apply_style +import os +from compel import Compel, ReturnedEmbeddingsType +import ipywidgets as widgets, mediapy +from IPython.display import display +from PIL import Image +from typing import Union, Optional, List, Tuple, Dict, Any, Callable +import logging, diffusers, copy, warnings +logging.getLogger("diffusers").setLevel(logging.ERROR) +#logging.getLogger("transformers").setLevel(logging.ERROR) +diffusers.utils.logging.set_verbosity(40) +warnings.filterwarnings(action="ignore", category=FutureWarning, module="diffusers") +warnings.filterwarnings(action="ignore", category=FutureWarning, module="transformers") + +# ===================================== +# Utils preprocessor +# ===================================== +def resize_image(input_image, resolution, interpolation=None): + H, W, C = input_image.shape + H = float(H) + W = float(W) + k = float(resolution) / max(H, W) + H *= k + W *= k + H = int(np.round(H / 64.0)) * 64 + W = int(np.round(W / 64.0)) * 64 + if interpolation is None: + interpolation = cv2.INTER_LANCZOS4 if k > 1 else cv2.INTER_AREA + img = cv2.resize(input_image, (W, H), interpolation=interpolation) + return img + + +class DepthEstimator: + def __init__(self): + self.model = pipeline("depth-estimation") + + def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image: + detect_resolution = kwargs.pop("detect_resolution", 512) + image_resolution = kwargs.pop("image_resolution", 512) + image = np.array(image) + image = HWC3(image) + image = resize_image(image, resolution=detect_resolution) + image = 
PIL.Image.fromarray(image) + image = self.model(image) + image = image["depth"] + image = np.array(image) + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + return PIL.Image.fromarray(image) + + +class ImageSegmentor: + def __init__(self): + self.image_processor = AutoImageProcessor.from_pretrained( + "openmmlab/upernet-convnext-small" + ) + self.image_segmentor = UperNetForSemanticSegmentation.from_pretrained( + "openmmlab/upernet-convnext-small" + ) + + @torch.inference_mode() + def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image: + detect_resolution = kwargs.pop("detect_resolution", 512) + image_resolution = kwargs.pop("image_resolution", 512) + image = HWC3(image) + image = resize_image(image, resolution=detect_resolution) + image = PIL.Image.fromarray(image) + + pixel_values = self.image_processor(image, return_tensors="pt").pixel_values + outputs = self.image_segmentor(pixel_values) + seg = self.image_processor.post_process_semantic_segmentation( + outputs, target_sizes=[image.size[::-1]] + )[0] + color_seg = np.zeros((seg.shape[0], seg.shape[1], 3), dtype=np.uint8) + for label, color in enumerate(ade_palette()): + color_seg[seg == label, :] = color + color_seg = color_seg.astype(np.uint8) + + color_seg = resize_image( + color_seg, resolution=image_resolution, interpolation=cv2.INTER_NEAREST + ) + return PIL.Image.fromarray(color_seg) + + +class Preprocessor: + MODEL_ID = "lllyasviel/Annotators" + + def __init__(self): + self.model = None + self.name = "" + + def load(self, name: str) -> None: + if name == self.name: + return + if name == "HED": + self.model = HEDdetector.from_pretrained(self.MODEL_ID) + elif name == "Midas": + self.model = MidasDetector.from_pretrained(self.MODEL_ID) + elif name == "MLSD": + self.model = MLSDdetector.from_pretrained(self.MODEL_ID) + elif name == "Openpose": + self.model = OpenposeDetector.from_pretrained(self.MODEL_ID) + elif name == "PidiNet": + self.model = PidiNetDetector.from_pretrained(self.MODEL_ID) + elif name == "NormalBae": + self.model = NormalBaeDetector.from_pretrained(self.MODEL_ID) + elif name == "Lineart": + self.model = LineartDetector.from_pretrained(self.MODEL_ID) + elif name == "LineartAnime": + self.model = LineartAnimeDetector.from_pretrained(self.MODEL_ID) + elif name == "Canny": + self.model = CannyDetector() + elif name == "ContentShuffle": + self.model = ContentShuffleDetector() + elif name == "DPT": + self.model = DepthEstimator() + elif name == "UPerNet": + self.model = ImageSegmentor() + else: + raise ValueError + torch.cuda.empty_cache() + gc.collect() + self.name = name + + def __call__(self, image: PIL.Image.Image, **kwargs) -> PIL.Image.Image: + if self.name == "Canny": + if "detect_resolution" in kwargs: + detect_resolution = kwargs.pop("detect_resolution") + image = np.array(image) + image = HWC3(image) + image = resize_image(image, resolution=detect_resolution) + image = self.model(image, **kwargs) + return PIL.Image.fromarray(image) + elif self.name == "Midas": + detect_resolution = kwargs.pop("detect_resolution", 512) + image_resolution = kwargs.pop("image_resolution", 512) + image = np.array(image) + image = HWC3(image) + image = resize_image(image, resolution=detect_resolution) + image = self.model(image, **kwargs) + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + return PIL.Image.fromarray(image) + else: + return self.model(image, **kwargs) + + +# ===================================== +# Base Model +# 
===================================== + +CONTROLNET_MODEL_IDS = { + "openpose": "lllyasviel/control_v11p_sd15_openpose", + "canny": "lllyasviel/control_v11p_sd15_canny", + "mlsd": "lllyasviel/control_v11p_sd15_mlsd", + "scribble": "lllyasviel/control_v11p_sd15_scribble", + "softedge": "lllyasviel/control_v11p_sd15_softedge", + "segmentation": "lllyasviel/control_v11p_sd15_seg", + "depth": "lllyasviel/control_v11f1p_sd15_depth", + "normalbae": "lllyasviel/control_v11p_sd15_normalbae", + "lineart": "lllyasviel/control_v11p_sd15_lineart", + "lineart_anime": "lllyasviel/control_v11p_sd15s2_lineart_anime", + "shuffle": "lllyasviel/control_v11e_sd15_shuffle", + "ip2p": "lllyasviel/control_v11e_sd15_ip2p", + "inpaint": "lllyasviel/control_v11p_sd15_inpaint", + "txt2img": "Nothinghere", + "sdxl_canny": "TencentARC/t2i-adapter-canny-sdxl-1.0", + "sdxl_sketch": "TencentARC/t2i-adapter-sketch-sdxl-1.0", + "sdxl_lineart": "TencentARC/t2i-adapter-lineart-sdxl-1.0", + "sdxl_depth-midas": "TencentARC/t2i-adapter-depth-midas-sdxl-1.0", + "sdxl_openpose": "TencentARC/t2i-adapter-openpose-sdxl-1.0", + #"sdxl_depth-zoe": "TencentARC/t2i-adapter-depth-zoe-sdxl-1.0", + #"sdxl_recolor": "TencentARC/t2i-adapter-recolor-sdxl-1.0", + "img2img": "Nothinghere", +} + + +# def download_all_controlnet_weights() -> None: +# for model_id in CONTROLNET_MODEL_IDS.values(): +# ControlNetModel.from_pretrained(model_id) + + +SCHEDULER_CONFIG_MAP = { + "DPM++ 2M": (DPMSolverMultistepScheduler, {}), + "DPM++ 2M Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True}), + "DPM++ 2M SDE": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++"}), + "DPM++ 2M SDE Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True, "algorithm_type": "sde-dpmsolver++"}), + "DPM++ SDE": (DPMSolverSinglestepScheduler, {}), + "DPM++ SDE Karras": (DPMSolverSinglestepScheduler, {"use_karras_sigmas": True}), + "DPM2": (KDPM2DiscreteScheduler, {}), + "DPM2 Karras": (KDPM2DiscreteScheduler, {"use_karras_sigmas": True}), + "DPM2 a" : (KDPM2AncestralDiscreteScheduler, {}), + "DPM2 a Karras" : (KDPM2AncestralDiscreteScheduler, {"use_karras_sigmas": True}), + "Euler": (EulerDiscreteScheduler, {}), + "Euler a": (EulerAncestralDiscreteScheduler, {}), + "Heun": (HeunDiscreteScheduler, {}), + "LMS": (LMSDiscreteScheduler, {}), + "LMS Karras": (LMSDiscreteScheduler, {"use_karras_sigmas": True}), + "DDIM": (DDIMScheduler, {}), + "DEIS": (DEISMultistepScheduler, {}), + "UniPC": (UniPCMultistepScheduler, {}), + "PNDM" : (PNDMScheduler, {}), + + "DPM++ 2M Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True}), + "DPM++ 2M Ef": (DPMSolverMultistepScheduler, {"euler_at_final": True}), + "DPM++ 2M SDE Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True, "algorithm_type": "sde-dpmsolver++"}), + "DPM++ 2M SDE Ef": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++", "euler_at_final": True}), + + "EDMDPM": (EDMDPMSolverMultistepScheduler, {}), + "EDMEuler": (EDMEulerScheduler, {}), + + "LCM" : (LCMScheduler, {}), +} + +scheduler_names = list(SCHEDULER_CONFIG_MAP.keys()) + +def process_prompts_valid(specific_prompt, specific_negative_prompt, prompt, negative_prompt): + specific_prompt_empty = (specific_prompt in [None, ""]) + specific_negative_prompt_empty = (specific_negative_prompt in [None, ""]) + + prompt_valid = prompt if specific_prompt_empty else specific_prompt + negative_prompt_valid = negative_prompt if specific_negative_prompt_empty else specific_negative_prompt + + return specific_prompt_empty, 
specific_negative_prompt_empty, prompt_valid, negative_prompt_valid + +class Model_Diffusers: + def __init__( + self, + base_model_id: str = "runwayml/stable-diffusion-v1-5", + task_name: str = "txt2img", + vae_model=None, + type_model_precision=torch.float16, + sdxl_safetensors = False, + ): + self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + self.base_model_id = "" + self.task_name = "" + self.vae_model = None + self.type_model_precision = ( + type_model_precision if torch.cuda.is_available() else torch.float32 + ) # For SD 1.5 + + self.load_pipe( + base_model_id, task_name, vae_model, type_model_precision, sdxl_safetensors = sdxl_safetensors + ) + self.preprocessor = Preprocessor() + + self.styles_data = styles_data + self.STYLE_NAMES = STYLE_NAMES + self.style_json_file = "" + + + def load_pipe( + self, + base_model_id: str, + task_name="txt2img", + vae_model=None, + type_model_precision=torch.float16, + reload=False, + sdxl_safetensors = False, + retain_model_in_memory = True, + ) -> DiffusionPipeline: + if ( + base_model_id == self.base_model_id + and task_name == self.task_name + and hasattr(self, "pipe") + and self.vae_model == vae_model + and self.pipe is not None + and reload == False + ): + if self.type_model_precision == type_model_precision or self.device.type == "cpu": + return + + if hasattr(self, "pipe") and os.path.isfile(base_model_id): + unload_model = False + if self.pipe == None: + unload_model = True + elif type_model_precision != self.type_model_precision and self.device.type != "cpu": + unload_model = True + else: + if hasattr(self, "pipe"): + unload_model = False + if self.pipe == None: + unload_model = True + else: + unload_model = True + self.type_model_precision = ( + type_model_precision if torch.cuda.is_available() else torch.float32 + ) + + if self.type_model_precision == torch.float32 and os.path.isfile(base_model_id): + logger.info(f"Working with full precision {str(self.type_model_precision)}") + + # Load model + if self.base_model_id == base_model_id and self.pipe is not None and reload == False and self.vae_model == vae_model and unload_model == False: + #logger.info("Previous loaded base model") # not return + class_name = self.class_name + else: + # Unload previous model and stuffs + self.pipe = None + self.model_memory = {} + self.lora_memory = [None, None, None, None, None] + self.lora_scale_memory = [1.0, 1.0, 1.0, 1.0, 1.0] + self.LCMconfig = None + self.embed_loaded = [] + self.FreeU = False + torch.cuda.empty_cache() + gc.collect() + + # Load new model + if os.path.isfile(base_model_id): # exists or not same # if os.path.exists(base_model_id): + + if sdxl_safetensors: + logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix") + self.pipe = StableDiffusionXLPipeline.from_single_file( + base_model_id, + vae=AutoencoderKL.from_pretrained( + "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16 + ), + torch_dtype=self.type_model_precision, + ) + class_name = "StableDiffusionXLPipeline" + else: + self.pipe = StableDiffusionPipeline.from_single_file( + base_model_id, + # vae=None + # if vae_model == None + # else AutoencoderKL.from_single_file( + # vae_model + # ), + torch_dtype=self.type_model_precision, + ) + class_name = "StableDiffusionPipeline" + else: + file_config = hf_hub_download(repo_id=base_model_id, filename="model_index.json") + + # Reading data from the JSON file + with open(file_config, 'r') as json_config: + data_config = json.load(json_config) + + # Searching for the value of the "_class_name" key + 
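+            # [Editor's note: illustrative annotation, not part of the original file]
+            # Every diffusers repo on the Hub ships a model_index.json whose "_class_name"
+            # entry names the pipeline class, e.g.:
+            #     {"_class_name": "StableDiffusionPipeline", "_diffusers_version": "...", ...}
+            # The check below dispatches on that value so SD 1.5 and SDXL repos are routed
+            # to the matching loader.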
if '_class_name' in data_config: + class_name = data_config['_class_name'] + + match class_name: + case "StableDiffusionPipeline": + self.pipe = StableDiffusionPipeline.from_pretrained( + base_model_id, + torch_dtype=self.type_model_precision, + ) + + case "StableDiffusionXLPipeline": + logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix") + try: + self.pipe = DiffusionPipeline.from_pretrained( + base_model_id, + vae=AutoencoderKL.from_pretrained( + "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16 + ), + torch_dtype=torch.float16, + use_safetensors=True, + variant="fp16", + add_watermarker=False, + ) + except Exception as e: + logger.debug(e) + logger.debug("Loading model without parameter variant=fp16") + self.pipe = DiffusionPipeline.from_pretrained( + base_model_id, + vae=AutoencoderKL.from_pretrained( + "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16 + ), + torch_dtype=torch.float16, + use_safetensors=True, + add_watermarker=False, + ) + self.base_model_id = base_model_id + self.class_name = class_name + + # Load VAE after loaded model + if vae_model is None : + logger.debug("Default VAE") + pass + else: + if os.path.isfile(vae_model): + self.pipe.vae = AutoencoderKL.from_single_file( + vae_model + ) + else: + self.pipe.vae = AutoencoderKL.from_pretrained( + vae_model, + subfolder = "vae", + ) + try: + self.pipe.vae.to(self.type_model_precision) + except: + logger.warning(f"VAE: not in {self.type_model_precision}") + self.vae_model = vae_model + + # Define base scheduler + self.default_scheduler = copy.deepcopy(self.pipe.scheduler) + logger.debug(f"Base sampler: {self.default_scheduler}") + + if task_name in self.model_memory: + self.pipe = self.model_memory[task_name] + # Create new base values + #self.pipe.to(self.device) + # torch.cuda.empty_cache() + # gc.collect() + self.base_model_id = base_model_id + self.task_name = task_name + self.vae_model = vae_model + self.class_name = class_name + self.pipe.watermark = None + return + + # Load task + model_id = CONTROLNET_MODEL_IDS[task_name] + + if task_name == "inpaint": + match class_name: + case "StableDiffusionPipeline": + + controlnet = ControlNetModel.from_pretrained( + model_id, torch_dtype=self.type_model_precision + ) + + self.pipe = StableDiffusionControlNetInpaintPipeline( + vae=self.pipe.vae, + text_encoder=self.pipe.text_encoder, + tokenizer=self.pipe.tokenizer, + unet=self.pipe.unet, + controlnet=controlnet, + scheduler=self.pipe.scheduler, + safety_checker=self.pipe.safety_checker, + feature_extractor=self.pipe.feature_extractor, + requires_safety_checker=self.pipe.config.requires_safety_checker, + ) + case "StableDiffusionXLPipeline": + + self.pipe = StableDiffusionXLInpaintPipeline( + vae=self.pipe.vae, + text_encoder=self.pipe.text_encoder, + text_encoder_2=self.pipe.text_encoder_2, + tokenizer=self.pipe.tokenizer, + tokenizer_2=self.pipe.tokenizer_2, + unet=self.pipe.unet, + # controlnet=self.controlnet, + scheduler=self.pipe.scheduler, + ) + + + if task_name not in ["txt2img", "inpaint", "img2img"]: + match class_name: + case "StableDiffusionPipeline": + + controlnet = ControlNetModel.from_pretrained( + model_id, torch_dtype=self.type_model_precision + ) + + self.pipe = StableDiffusionControlNetPipeline( + vae=self.pipe.vae, + text_encoder=self.pipe.text_encoder, + tokenizer=self.pipe.tokenizer, + unet=self.pipe.unet, + controlnet=controlnet, + scheduler=self.pipe.scheduler, + safety_checker=self.pipe.safety_checker, + feature_extractor=self.pipe.feature_extractor, + 
requires_safety_checker=self.pipe.config.requires_safety_checker, + ) + self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config) + + case "StableDiffusionXLPipeline": + + adapter = T2IAdapter.from_pretrained( + model_id, + torch_dtype=torch.float16, + variant="fp16", + ).to(self.device) + + self.pipe = StableDiffusionXLAdapterPipeline( + vae=self.pipe.vae, + text_encoder=self.pipe.text_encoder, + text_encoder_2=self.pipe.text_encoder_2, + tokenizer=self.pipe.tokenizer, + tokenizer_2=self.pipe.tokenizer_2, + unet=self.pipe.unet, + adapter=adapter, + scheduler=self.pipe.scheduler, + ).to(self.device) + + + if task_name in ["txt2img", "img2img"]: + match class_name: + + case "StableDiffusionPipeline": + self.pipe = StableDiffusionPipeline( + vae=self.pipe.vae, + text_encoder=self.pipe.text_encoder, + tokenizer=self.pipe.tokenizer, + unet=self.pipe.unet, + scheduler=self.pipe.scheduler, + safety_checker=self.pipe.safety_checker, + feature_extractor=self.pipe.feature_extractor, + requires_safety_checker=self.pipe.config.requires_safety_checker, + ) + + case "StableDiffusionXLPipeline": + self.pipe = StableDiffusionXLPipeline( + vae=self.pipe.vae, + text_encoder=self.pipe.text_encoder, + text_encoder_2=self.pipe.text_encoder_2, + tokenizer=self.pipe.tokenizer, + tokenizer_2=self.pipe.tokenizer_2, + unet=self.pipe.unet, + scheduler=self.pipe.scheduler, + ) + + if task_name == "img2img": + self.pipe = AutoPipelineForImage2Image.from_pipe(self.pipe) + + # Create new base values + self.pipe.to(self.device) + torch.cuda.empty_cache() + gc.collect() + + self.base_model_id = base_model_id + self.task_name = task_name + self.vae_model = vae_model + self.class_name = class_name + + if self.class_name == "StableDiffusionXLPipeline": + self.pipe.enable_vae_slicing() + self.pipe.enable_vae_tiling() + self.pipe.watermark = None + + if retain_model_in_memory and task_name not in self.model_memory: + self.model_memory[task_name] = self.pipe + + return + + def load_controlnet_weight(self, task_name: str) -> None: + torch.cuda.empty_cache() + gc.collect() + model_id = CONTROLNET_MODEL_IDS[task_name] + controlnet = ControlNetModel.from_pretrained( + model_id, torch_dtype=self.type_model_precision + ) + controlnet.to(self.device) + torch.cuda.empty_cache() + gc.collect() + self.pipe.controlnet = controlnet + #self.task_name = task_name + + @torch.autocast("cuda") + def run_pipe( + self, + prompt: str, + negative_prompt: str, + prompt_embeds, + negative_prompt_embeds, + control_image: PIL.Image.Image, + num_images: int, + num_steps: int, + guidance_scale: float, + clip_skip: int, + generator, + controlnet_conditioning_scale, + control_guidance_start, + control_guidance_end, + ) -> list[PIL.Image.Image]: + # Return PIL images + # generator = torch.Generator().manual_seed(seed) + return self.pipe( + prompt=prompt, + negative_prompt=negative_prompt, + prompt_embeds=prompt_embeds, + negative_prompt_embeds=negative_prompt_embeds, + guidance_scale=guidance_scale, + clip_skip=clip_skip, + num_images_per_prompt=num_images, + num_inference_steps=num_steps, + generator=generator, + controlnet_conditioning_scale=controlnet_conditioning_scale, + control_guidance_start=control_guidance_start, + control_guidance_end=control_guidance_end, + image=control_image, + ).images + + @torch.autocast("cuda") + def run_pipe_SD( + self, + prompt: str, + negative_prompt: str, + prompt_embeds, + negative_prompt_embeds, + num_images: int, + num_steps: int, + guidance_scale: float, + clip_skip: int, +
height: int, + width: int, + generator, + ) -> list[PIL.Image.Image]: + # Return PIL images + # generator = torch.Generator().manual_seed(seed) + self.preview_handle = None + return self.pipe( + prompt=prompt, + negative_prompt=negative_prompt, + prompt_embeds=prompt_embeds, + negative_prompt_embeds=negative_prompt_embeds, + guidance_scale=guidance_scale, + clip_skip=clip_skip, + num_images_per_prompt=num_images, + num_inference_steps=num_steps, + generator=generator, + height=height, + width=width, + callback=self.callback_pipe if self.image_previews else None, + callback_steps=10 if self.image_previews else 100, + ).images + + # @torch.autocast('cuda') + # def run_pipe_SDXL( + # self, + # prompt: str, + # negative_prompt: str, + # prompt_embeds, + # negative_prompt_embeds, + # num_images: int, + # num_steps: int, + # guidance_scale: float, + # clip_skip: int, + # height : int, + # width : int, + # generator, + # seddd, + # conditioning, + # pooled, + # ) -> list[PIL.Image.Image]: + # # Return PIL images + # #generator = torch.Generator("cuda").manual_seed(seddd) # generator = torch.Generator("cuda").manual_seed(seed), + # return self.pipe( + # prompt = None, + # negative_prompt = None, + # prompt_embeds=conditioning[0:1], + # pooled_prompt_embeds=pooled[0:1], + # negative_prompt_embeds=conditioning[1:2], + # negative_pooled_prompt_embeds=pooled[1:2], + # height = height, + # width = width, + # num_inference_steps = num_steps, + # guidance_scale = guidance_scale, + # clip_skip = clip_skip, + # num_images_per_prompt = num_images, + # generator = generator, + # ).images + + @torch.autocast("cuda") + def run_pipe_inpaint( + self, + prompt: str, + negative_prompt: str, + prompt_embeds, + negative_prompt_embeds, + control_image: PIL.Image.Image, + num_images: int, + num_steps: int, + guidance_scale: float, + clip_skip: int, + strength: float, + init_image, + control_mask, + controlnet_conditioning_scale, + control_guidance_start, + control_guidance_end, + generator, + ) -> list[PIL.Image.Image]: + # Return PIL images + # generator = torch.Generator().manual_seed(seed) + return self.pipe( + prompt=None, + negative_prompt=None, + prompt_embeds=prompt_embeds, + negative_prompt_embeds=negative_prompt_embeds, + eta=1.0, + strength=strength, + image=init_image, # original image + mask_image=control_mask, # mask, values of 0 to 255 + control_image=control_image, # tensor control image + num_images_per_prompt=num_images, + num_inference_steps=num_steps, + guidance_scale=guidance_scale, + clip_skip=clip_skip, + generator=generator, + controlnet_conditioning_scale=controlnet_conditioning_scale, + control_guidance_start=control_guidance_start, + control_guidance_end=control_guidance_end, + ).images + + @torch.autocast("cuda") + def run_pipe_img2img( + self, + prompt: str, + negative_prompt: str, + prompt_embeds, + negative_prompt_embeds, + num_images: int, + num_steps: int, + guidance_scale: float, + clip_skip: int, + strength: float, + init_image, + generator, + ) -> list[PIL.Image.Image]: + # Return PIL images + # generator = torch.Generator().manual_seed(seed) + return self.pipe( + prompt=None, + negative_prompt=None, + prompt_embeds=prompt_embeds, + negative_prompt_embeds=negative_prompt_embeds, + eta=1.0, + strength=strength, + image=init_image, # original image + num_images_per_prompt=num_images, + num_inference_steps=num_steps, + guidance_scale=guidance_scale, + clip_skip=clip_skip, + generator=generator, + ).images + + ### self.x_process return image_preprocessor### + @torch.inference_mode() + 
def process_canny( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + low_threshold: int, + high_threshold: int, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + self.preprocessor.load("Canny") + control_image = self.preprocessor( + image=image, + low_threshold=low_threshold, + high_threshold=high_threshold, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + ) + + return control_image + + @torch.inference_mode() + def process_mlsd( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + value_threshold: float, + distance_threshold: float, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + self.preprocessor.load("MLSD") + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + thr_v=value_threshold, + thr_d=distance_threshold, + ) + + return control_image + + @torch.inference_mode() + def process_scribble( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + preprocessor_name: str, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + if preprocessor_name == "None": + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + elif preprocessor_name == "HED": + self.preprocessor.load(preprocessor_name) + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + scribble=False, + ) + elif preprocessor_name == "PidiNet": + self.preprocessor.load(preprocessor_name) + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + safe=False, + ) + + return control_image + + @torch.inference_mode() + def process_scribble_interactive( + self, + image_and_mask: dict[str, np.ndarray], + image_resolution: int, + ) -> list[PIL.Image.Image]: + if image_and_mask is None: + raise ValueError + + image = image_and_mask["mask"] + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + + return control_image + + @torch.inference_mode() + def process_softedge( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + preprocessor_name: str, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + if preprocessor_name == "None": + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + elif preprocessor_name in ["HED", "HED safe"]: + safe = "safe" in preprocessor_name + self.preprocessor.load("HED") + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + scribble=safe, + ) + elif preprocessor_name in ["PidiNet", "PidiNet safe"]: + safe = "safe" in preprocessor_name + self.preprocessor.load("PidiNet") + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + safe=safe, + ) + else: + raise ValueError + + return control_image + + @torch.inference_mode() + def process_openpose( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + preprocessor_name: str, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + if preprocessor_name == "None": + image = HWC3(image) + 
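+            # [Editor's note: illustrative annotation, not part of the original file]
+            # resize_image (defined at the top of this module) scales the longer side to
+            # `resolution` and snaps both dimensions to multiples of 64; e.g. a 750x1000
+            # (HxW) input with resolution=512 comes back as 384x512. This rounding keeps
+            # control images at sizes the pipelines accept without further cropping.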
image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + else: + self.preprocessor.load("Openpose") + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + hand_and_face=True, + ) + + return control_image + + @torch.inference_mode() + def process_segmentation( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + preprocessor_name: str, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + if preprocessor_name == "None": + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + else: + self.preprocessor.load(preprocessor_name) + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + ) + + return control_image + + @torch.inference_mode() + def process_depth( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + preprocessor_name: str, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + if preprocessor_name == "None": + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + else: + self.preprocessor.load(preprocessor_name) + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + ) + + return control_image + + @torch.inference_mode() + def process_normal( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + preprocessor_name: str, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + if preprocessor_name == "None": + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + else: + self.preprocessor.load("NormalBae") + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + ) + + return control_image + + @torch.inference_mode() + def process_lineart( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + preprocessor_name: str, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + if preprocessor_name in ["None", "None (anime)"]: + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + elif preprocessor_name in ["Lineart", "Lineart coarse"]: + coarse = "coarse" in preprocessor_name + self.preprocessor.load("Lineart") + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + coarse=coarse, + ) + elif preprocessor_name == "Lineart (anime)": + self.preprocessor.load("LineartAnime") + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + detect_resolution=preprocess_resolution, + ) + + if self.class_name == "StableDiffusionPipeline": + if "anime" in preprocessor_name: + self.load_controlnet_weight("lineart_anime") + logger.info("Lineart anime") + else: + self.load_controlnet_weight("lineart") + + return control_image + + @torch.inference_mode() + def process_shuffle( + self, + image: np.ndarray, + image_resolution: int, + preprocessor_name: str, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + if preprocessor_name == "None": + image = 
HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + else: + self.preprocessor.load(preprocessor_name) + control_image = self.preprocessor( + image=image, + image_resolution=image_resolution, + ) + + return control_image + + @torch.inference_mode() + def process_ip2p( + self, + image: np.ndarray, + image_resolution: int, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + control_image = PIL.Image.fromarray(image) + + return control_image + + @torch.inference_mode() + def process_inpaint( + self, + image: np.ndarray, + image_resolution: int, + preprocess_resolution: int, + image_mask: str, ### + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + init_image = PIL.Image.fromarray(image) + + image_mask = HWC3(image_mask) + image_mask = resize_image(image_mask, resolution=image_resolution) + control_mask = PIL.Image.fromarray(image_mask) + + control_image = make_inpaint_condition(init_image, control_mask) + + return init_image, control_mask, control_image + + @torch.inference_mode() + def process_img2img( + self, + image: np.ndarray, + image_resolution: int, + ) -> list[PIL.Image.Image]: + if image is None: + raise ValueError + + image = HWC3(image) + image = resize_image(image, resolution=image_resolution) + init_image = PIL.Image.fromarray(image) + + return init_image + + def get_scheduler(self, name): + if name in SCHEDULER_CONFIG_MAP: + scheduler_class, config = SCHEDULER_CONFIG_MAP[name] + #return scheduler_class.from_config(self.pipe.scheduler.config, **config) + # beta self.default_scheduler + return scheduler_class.from_config(self.default_scheduler.config, **config) + else: + raise ValueError(f"Scheduler with name {name} not found. 
Valid schedulers: {', '.join(scheduler_names)}") + + def create_prompt_embeds( + self, + prompt, + negative_prompt, + textual_inversion, + clip_skip, + syntax_weights, + ): + if self.class_name == "StableDiffusionPipeline": + if self.embed_loaded != textual_inversion and textual_inversion != []: + # Textual Inversion + for name, directory_name in textual_inversion: + try: + if directory_name.endswith(".pt"): + model = torch.load(directory_name, map_location=self.device) + model_tensors = model.get("string_to_param").get("*") + s_model = {"emb_params": model_tensors} + # save_file(s_model, directory_name[:-3] + '.safetensors') + self.pipe.load_textual_inversion(s_model, token=name) + + else: + # self.pipe.text_encoder.resize_token_embeddings(len(self.pipe.tokenizer),pad_to_multiple_of=128) + # self.pipe.load_textual_inversion("./bad_prompt.pt", token="baddd") + self.pipe.load_textual_inversion(directory_name, token=name) + if not self.gui_active: + logger.info(f"Applied : {name}") + + except Exception as e: + exception = str(e) + if name in exception: + logger.debug(f"Previous loaded embed {name}") + else: + logger.error(exception) + logger.error(f"Can't apply embed {name}") + self.embed_loaded = textual_inversion + + # Clip skip + # clip_skip_diffusers = None #clip_skip - 1 # future update + if not hasattr(self, "compel"): + self.compel = Compel( + tokenizer=self.pipe.tokenizer, + text_encoder=self.pipe.text_encoder, + truncate_long_prompts=False, + returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NORMALIZED if clip_skip else ReturnedEmbeddingsType.LAST_HIDDEN_STATES_NORMALIZED, + ) + + # Prompt weights for textual inversion + prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer) + negative_prompt_ti = self.pipe.maybe_convert_prompt( + negative_prompt, self.pipe.tokenizer + ) + + # separate the multi-vector textual inversion by comma + if self.embed_loaded != []: + prompt_ti = add_comma_after_pattern_ti(prompt_ti) + negative_prompt_ti = add_comma_after_pattern_ti(negative_prompt_ti) + + # Syntax weights + self.pipe.to(self.device) + if syntax_weights == "Classic": + prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel) + negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel) + else: + prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel, compel_process_sd=True) + negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel, compel_process_sd=True) + + # Fix error shape + if prompt_emb.shape != negative_prompt_emb.shape: + ( + prompt_emb, + negative_prompt_emb, + ) = self.compel.pad_conditioning_tensors_to_same_length( + [prompt_emb, negative_prompt_emb] + ) + + return prompt_emb, negative_prompt_emb + + else: + # SDXL embed + if self.embed_loaded != textual_inversion and textual_inversion != []: + # Textual Inversion + for name, directory_name in textual_inversion: + try: + from safetensors.torch import load_file + state_dict = load_file(directory_name) + self.pipe.load_textual_inversion(state_dict["clip_g"], token=name, text_encoder=self.pipe.text_encoder_2, tokenizer=self.pipe.tokenizer_2) + self.pipe.load_textual_inversion(state_dict["clip_l"], token=name, text_encoder=self.pipe.text_encoder, tokenizer=self.pipe.tokenizer) + if not self.gui_active: + logger.info(f"Applied : {name}") + except Exception as e: + exception = str(e) + if name in exception: + logger.debug(f"Previous loaded embed {name}") + else: + logger.error(exception) + logger.error(f"Can't apply embed {name}") + 
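+            # [Editor's note: illustrative sketch, not original code] The Compel instance
+            # created below is built with requires_pooled=[False, True], so calling it
+            # returns an (embeddings, pooled) pair; the positive and negative prompts are
+            # then sliced apart, mirroring the commented-out run_pipe_SDXL above:
+            #     conditioning, pooled = compel(["a cat++", "blurry, low quality"])
+            #     images = pipe(prompt_embeds=conditioning[0:1],
+            #                   pooled_prompt_embeds=pooled[0:1],
+            #                   negative_prompt_embeds=conditioning[1:2],
+            #                   negative_pooled_prompt_embeds=pooled[1:2]).images
+            # In compel syntax, each trailing "+" up-weights a token by roughly 1.1x.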
self.embed_loaded = textual_inversion + + if not hasattr(self, "compel"): + # Clip skip + if clip_skip: + # clip_skip_diffusers = None #clip_skip - 1 # future update + self.compel = Compel( + tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2], + text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2], + returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NON_NORMALIZED, + requires_pooled=[False, True], + truncate_long_prompts=False, + ) + else: + # clip_skip_diffusers = None # clip_skip = None # future update + self.compel = Compel( + tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2], + text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2], + requires_pooled=[False, True], + truncate_long_prompts=False, + ) + + # Prompt weights for textual inversion + try: + prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer) + negative_prompt_ti = self.pipe.maybe_convert_prompt(negative_prompt, self.pipe.tokenizer) + except: + prompt_ti = prompt + negative_prompt_ti = negative_prompt + logger.error("FAILED: Convert prompt for textual inversion") + + # prompt syntax style a1... + if syntax_weights == "Classic": + self.pipe.to("cuda") + prompt_ti = get_embed_new(prompt_ti, self.pipe, self.compel, only_convert_string=True) + negative_prompt_ti = get_embed_new(negative_prompt_ti, self.pipe, self.compel, only_convert_string=True) + else: + prompt_ti = prompt + negative_prompt_ti = negative_prompt + + conditioning, pooled = self.compel([prompt_ti, negative_prompt_ti]) + + return conditioning, pooled + + + + def process_lora(self, select_lora, lora_weights_scale, unload=False): + device = "cuda" if torch.cuda.is_available() else "cpu" + if not unload: + if select_lora != None: + try: + self.pipe = lora_mix_load( + self.pipe, + select_lora, + lora_weights_scale, + device=device, + dtype=self.type_model_precision, + ) + logger.info(select_lora) + except Exception as e: + logger.error(f"ERROR: LoRA not compatible: {select_lora}") + logger.debug(f"{str(e)}") + return self.pipe + else: + # Unload numerically unstable but fast and need less memory + if select_lora != None: + try: + self.pipe = lora_mix_load( + self.pipe, + select_lora, + -lora_weights_scale, + device=device, + dtype=self.type_model_precision, + ) + logger.debug(f"Unload LoRA: {select_lora}") + except: + pass + return self.pipe + + def load_style_file(self, style_json_file): + if os.path.exists(style_json_file): + try: + file_json_read = get_json_content(style_json_file) + self.styles_data = {k["name"]: (k["prompt"], k["negative_prompt"]) for k in file_json_read} + self.STYLE_NAMES = list(self.styles_data.keys()) + self.style_json_file = style_json_file + logger.info(f"Styles json file loaded with {len(self.STYLE_NAMES)} styles") + logger.debug(str(self.STYLE_NAMES)) + except Exception as e: + logger.error(str(e)) + else: + logger.error("Not found styles json file in directory") + + def callback_pipe(self, iter, t, latents): + # convert latents to image + with torch.no_grad(): + latents = 1 / 0.18215 * latents + image = self.pipe.vae.decode(latents).sample + + image = (image / 2 + 0.5).clamp(0, 1) + + # we always cast to float32 as this does not cause significant overhead and is compatible with bfloa16 + image = image.cpu().permute(0, 2, 3, 1).float().numpy() + + # convert to PIL Images + image = self.pipe.numpy_to_pil(image) + + # show one image + # global preview_handle + if self.preview_handle == None: + self.preview_handle = display(image[0], display_id=True) + else: + 
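+            # [Editor's note: illustrative annotation, not part of the original file]
+            # display(..., display_id=True) above returns an IPython DisplayHandle; calling
+            # .update() on it redraws the same notebook output cell in place, which is what
+            # makes the step-by-step preview animate instead of stacking new images.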
self.preview_handle.update(image[0]) + + def __call__( + self, + prompt: str = "", + negative_prompt: str = "", + img_height: int = 512, + img_width: int = 512, + num_images: int = 1, + num_steps: int = 30, + guidance_scale: float = 7.5, + clip_skip: Optional[bool] = True, + seed: int = -1, + sampler: str = "DPM++ 2M", + syntax_weights: str = "Classic", + + lora_A: Optional[str] = None, + lora_scale_A: float = 1.0, + lora_B: Optional[str] = None, + lora_scale_B: float = 1.0, + lora_C: Optional[str] = None, + lora_scale_C: float = 1.0, + lora_D: Optional[str] = None, + lora_scale_D: float = 1.0, + lora_E: Optional[str] = None, + lora_scale_E: float = 1.0, + textual_inversion: List[Tuple[str, str]] = [], + FreeU: bool = False, + adetailer_A: bool = False, + adetailer_A_params: Dict[str, Any] = {}, + adetailer_B: bool = False, + adetailer_B_params: Dict[str, Any] = {}, + style_prompt: Optional[Any] = [""], + style_json_file: Optional[Any] = "", + + image: Optional[Any] = None, + preprocessor_name: Optional[str] = "None", + preprocess_resolution: int = 512, + image_resolution: int = 512, + image_mask: Optional[Any] = None, + strength: float = 0.35, + low_threshold: int = 100, + high_threshold: int = 200, + value_threshold: float = 0.1, + distance_threshold: float = 0.1, + controlnet_conditioning_scale: float = 1.0, + control_guidance_start: float = 0.0, + control_guidance_end: float = 1.0, + t2i_adapter_preprocessor: bool = True, + t2i_adapter_conditioning_scale: float = 1.0, + t2i_adapter_conditioning_factor: float = 1.0, + + upscaler_model_path: Optional[str] = None, # add latent + upscaler_increases_size: float = 1.5, + esrgan_tile: int = 100, + esrgan_tile_overlap: int = 10, + hires_steps: int = 25, + hires_denoising_strength: float = 0.35, + hires_prompt: str = "", + hires_negative_prompt: str = "", + hires_sampler: str = "Use same sampler", + + loop_generation: int = 1, + display_images: bool = False, + save_generated_images: bool = True, + image_storage_location: str = "./images", + generator_in_cpu: bool = False, + leave_progress_bar: bool = False, + disable_progress_bar: bool = False, + hires_before_adetailer: bool = False, + hires_after_adetailer: bool = True, + retain_compel_previous_load: bool = False, + retain_detailfix_model_previous_load: bool = False, + retain_hires_model_previous_load: bool = False, + image_previews: bool = False, + xformers_memory_efficient_attention: bool = False, + gui_active: bool = False, + ): + + """ + The call function for the generation. + + Args: + prompt (str , optional): + The prompt or prompts to guide image generation. + negative_prompt (str , optional): + The prompt or prompts to guide what to not include in image generation. Ignored when not using guidance (`guidance_scale < 1`). + img_height (int, optional, defaults to 512): + The height in pixels of the generated image. + img_width (int, optional, defaults to 512): + The width in pixels of the generated image. + num_images (int, optional, defaults to 1): + The number of images to generate per prompt. + num_steps (int, optional, defaults to 30): + The number of denoising steps. More denoising steps usually lead to a higher quality image at the + expense of slower inference. + guidance_scale (float, optional, defaults to 7.5): + A higher guidance scale value encourages the model to generate images closely linked to the text + `prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`. 
+ clip_skip (bool, optional): + Whether to skip a CLIP layer while computing the prompt embeddings: the embeddings are taken from + the penultimate (True) or the last (False) layer. + seed (int, optional, defaults to -1): + A seed for controlling the randomness of the image generation process. -1 designates a random seed. + sampler (str, optional, defaults to "DPM++ 2M"): + The sampler used for the generation process. Available samplers: DPM++ 2M, DPM++ 2M Karras, DPM++ 2M SDE, + DPM++ 2M SDE Karras, DPM++ SDE, DPM++ SDE Karras, DPM2, DPM2 Karras, Euler, Euler a, Heun, LMS, LMS Karras, + DDIM, DEIS, UniPC, DPM2 a, DPM2 a Karras, PNDM, LCM, DPM++ 2M Lu, DPM++ 2M Ef, DPM++ 2M SDE Lu and DPM++ 2M SDE Ef. + syntax_weights (str, optional, defaults to "Classic"): + Specifies the type of syntax weights used during generation. "Classic" is (word:weight), "Compel" is (word)weight. + lora_A (str, optional): + Placeholder for lora A parameter. + lora_scale_A (float, optional, defaults to 1.0): + Placeholder for lora scale A parameter. + lora_B (str, optional): + Placeholder for lora B parameter. + lora_scale_B (float, optional, defaults to 1.0): + Placeholder for lora scale B parameter. + lora_C (str, optional): + Placeholder for lora C parameter. + lora_scale_C (float, optional, defaults to 1.0): + Placeholder for lora scale C parameter. + lora_D (str, optional): + Placeholder for lora D parameter. + lora_scale_D (float, optional, defaults to 1.0): + Placeholder for lora scale D parameter. + lora_E (str, optional): + Placeholder for lora E parameter. + lora_scale_E (float, optional, defaults to 1.0): + Placeholder for lora scale E parameter. + textual_inversion (List[Tuple[str, str]], optional, defaults to []): + Placeholder for textual inversion list of tuples. Helps the model to adapt to a particular + style. [("",""),...] + FreeU (bool, optional, defaults to False): + A method that substantially improves diffusion model sample quality at no extra cost. + adetailer_A (bool, optional, defaults to False): + Guided inpainting to correct the image; it is preferable to use low values for strength. + adetailer_A_params (Dict[str, Any], optional, defaults to {}): + Placeholder for adetailer_A parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}. + If not specified, default values will be used: + - face_detector_ad (bool): Indicates whether face detection is enabled. Defaults to True. + - person_detector_ad (bool): Indicates whether person detection is enabled. Defaults to True. + - hand_detector_ad (bool): Indicates whether hand detection is enabled. Defaults to False. + - prompt (str): A prompt for the adetailer_A. Defaults to an empty string. + - negative_prompt (str): A negative prompt for the adetailer_A. Defaults to an empty string. + - strength (float): The strength parameter value. Defaults to 0.35. + - mask_dilation (int): The mask dilation value. Defaults to 4. + - mask_blur (int): The mask blur value. Defaults to 4. + - mask_padding (int): The mask padding value. Defaults to 32. + - inpaint_only (bool): Indicates if only inpainting is to be performed. Defaults to True. False is img2img mode. + - sampler (str): The sampler type to be used. Defaults to "Use same sampler". + adetailer_B (bool, optional, defaults to False): + Guided inpainting to correct the image; it is preferable to use low values for strength. + adetailer_B_params (Dict[str, Any], optional, defaults to {}): + Placeholder for adetailer_B parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}. 
+ If not specified, default values will be used. + style_prompt (str, optional): + If a style that is in STYLE_NAMES is specified, it will be added to the original prompt and negative prompt. + style_json_file (str, optional): + JSON with styles to be applied and used in style_prompt. + upscaler_model_path (str, optional): + Placeholder for upscaler model path. + upscaler_increases_size (float, optional, defaults to 1.5): + Factor by which the upscaler increases the image size. + esrgan_tile (int, optional, defaults to 100): + Tile size when using an ESRGAN model. + esrgan_tile_overlap (int, optional, defaults to 10): + Tile overlap when using an ESRGAN model. + hires_steps (int, optional, defaults to 25): + The number of denoising steps for hires. More denoising steps usually lead to a higher quality image at the + expense of slower inference. + hires_denoising_strength (float, optional, defaults to 0.35): + Strength parameter for the hires. + hires_prompt (str, optional): + The prompt for hires. If not specified, the main prompt will be used. + hires_negative_prompt (str, optional): + The negative prompt for hires. If not specified, the main negative prompt will be used. + hires_sampler (str, optional, defaults to "Use same sampler"): + The sampler used for the hires generation process. If not specified, the main sampler will be used. + image (Any, optional): + The image to be used for the Inpaint, ControlNet, or T2I adapter. + preprocessor_name (str, optional, defaults to "None"): + Preprocessor name for ControlNet. + preprocess_resolution (int, optional, defaults to 512): + Preprocess resolution for the Inpaint, ControlNet, or T2I adapter. + image_resolution (int, optional, defaults to 512): + Image resolution for the Img2Img, Inpaint, ControlNet, or T2I adapter. + image_mask (Any, optional): + Path of the image mask for the Inpaint. + strength (float, optional, defaults to 0.35): + Strength parameter for the Inpaint and Img2Img. + low_threshold (int, optional, defaults to 100): + Low threshold parameter for ControlNet and T2I Adapter Canny. + high_threshold (int, optional, defaults to 200): + High threshold parameter for ControlNet and T2I Adapter Canny. + value_threshold (float, optional, defaults to 0.1): + Value threshold parameter for ControlNet MLSD. + distance_threshold (float, optional, defaults to 0.1): + Distance threshold parameter for ControlNet MLSD. + controlnet_conditioning_scale (float, optional, defaults to 1.0): + The outputs of the ControlNet are multiplied by `controlnet_conditioning_scale` before they are added + to the residual in the original `unet`. Used in ControlNet and Inpaint. + control_guidance_start (float, optional, defaults to 0.0): + The percentage of total steps at which the ControlNet starts applying. Used in ControlNet and Inpaint. + control_guidance_end (float, optional, defaults to 1.0): + The percentage of total steps at which the ControlNet stops applying. Used in ControlNet and Inpaint. + t2i_adapter_preprocessor (bool, optional, defaults to True): + Whether to apply the preprocessor to the image in sdxl_canny; defaults to True. + t2i_adapter_conditioning_scale (float, optional, defaults to 1.0): + The outputs of the adapter are multiplied by `t2i_adapter_conditioning_scale` before they are added to the + residual in the original unet. + t2i_adapter_conditioning_factor (float, optional, defaults to 1.0): + The fraction of timesteps for which adapter should be applied. If `t2i_adapter_conditioning_factor` is + `0.0`, adapter is not applied at all. 
If `t2i_adapter_conditioning_factor` is `1.0`, adapter is applied for + all timesteps. If `t2i_adapter_conditioning_factor` is `0.5`, adapter is applied for half of the timesteps. + loop_generation (int, optional, defaults to 1): + The number of times the specified `num_images` will be generated. + display_images (bool, optional, defaults to False): + If you use a notebook, this parameter lets you display the generated images. + save_generated_images (bool, optional, defaults to True): + By default, the generated images are saved in the current location within the 'images' folder. You can disable this with this parameter. + image_storage_location (str, optional, defaults to "./images"): + The directory where the generated images are saved. + generator_in_cpu (bool, optional, defaults to False): + The generator by default is specified on the GPU. To obtain more consistent results across various environments, + it is preferable to use the generator on the CPU. + leave_progress_bar (bool, optional, defaults to False): + Leave the progress bar after generating the image. + disable_progress_bar (bool, optional, defaults to False): + Do not display the progress bar during image generation. + hires_before_adetailer (bool, optional, defaults to False): + Apply an upscale and high-resolution fix before adetailer. + hires_after_adetailer (bool, optional, defaults to True): + Apply an upscale and high-resolution fix after adetailer. + retain_compel_previous_load (bool, optional, defaults to False): + The previous compel remains preloaded in memory. + retain_detailfix_model_previous_load (bool, optional, defaults to False): + The previous adetailer model remains preloaded in memory. + retain_hires_model_previous_load (bool, optional, defaults to False): + The previous hires model remains preloaded in memory. + image_previews (bool, optional, defaults to False): + Display previews of the image during the denoising process. + xformers_memory_efficient_attention (bool, optional, defaults to False): + Improves generation time; currently disabled. + gui_active (bool, optional, defaults to False): + Utility for use with a GUI; it changes the behavior, especially by displaying confirmation messages or options. + + Specific parameter usage details: + + Additional parameters that will be used in Inpaint: + - image + - image_mask + - image_resolution + - strength + for SD 1.5: + - controlnet_conditioning_scale + - control_guidance_start + - control_guidance_end + + Additional parameters that will be used in img2img: + - image + - image_resolution + - strength + + Additional parameters that will be used in ControlNet for SD 1.5 depending on the task: + - image + - preprocessor_name + - preprocess_resolution + - image_resolution + - controlnet_conditioning_scale + - control_guidance_start + - control_guidance_end + for Canny: + - low_threshold + - high_threshold + for MLSD: + - value_threshold + - distance_threshold + + Additional parameters that will be used in T2I adapter for SDXL depending on the task: + - image + - preprocess_resolution + - image_resolution + - t2i_adapter_preprocessor + - t2i_adapter_conditioning_scale + - t2i_adapter_conditioning_factor + + """ + + if self.task_name != "txt2img" and image is None: + raise ValueError( + "You need to specify the image for this task." 
+ ) + if img_height % 8 != 0: + img_height = img_height + (8 - img_height % 8) + logger.warning(f"Height must be divisible by 8, changed to {str(img_height)}") + if img_width % 8 != 0: + img_width = img_width + (8 - img_width % 8) + logger.warning(f"Width must be divisible by 8, changed to {str(img_width)}") + if image_resolution % 8 != 0: + image_resolution = image_resolution + (8 - image_resolution % 8) + logger.warning(f"Image resolution must be divisible by 8, changed to {str(image_resolution)}") + if control_guidance_start >= control_guidance_end: + logger.error( + "Control guidance start (ControlNet Start Threshold) cannot be larger or equal to control guidance end (ControlNet Stop Threshold). The default values 0.0 and 1.0 will be used." + ) + control_guidance_start, control_guidance_end = 0.0, 1.0 + + self.gui_active = gui_active + self.image_previews = image_previews + + if self.pipe == None: + self.load_pipe( + self.base_model_id, + task_name=self.task_name, + vae_model=self.vae_model, + reload=True, + ) + + self.pipe.set_progress_bar_config(leave=leave_progress_bar) + self.pipe.set_progress_bar_config(disable=disable_progress_bar) + + xformers_memory_efficient_attention=False # disabled + if xformers_memory_efficient_attention and torch.cuda.is_available(): + self.pipe.disable_xformers_memory_efficient_attention() + self.pipe.to(self.device) + + # Load style prompt file + if style_json_file != "" and style_json_file != self.style_json_file: + self.load_style_file(style_json_file) + # Set style + if isinstance(style_prompt, str): + style_prompt = [style_prompt] + if style_prompt != [""]: + prompt, negative_prompt = apply_style(style_prompt, prompt, negative_prompt, self.styles_data, self.STYLE_NAMES) + + # LoRA load + if self.lora_memory == [ + lora_A, + lora_B, + lora_C, + lora_D, + lora_E, + ] and self.lora_scale_memory == [ + lora_scale_A, + lora_scale_B, + lora_scale_C, + lora_scale_D, + lora_scale_E, + ]: + for single_lora in self.lora_memory: + if single_lora != None: + logger.info(f"LoRA in memory: {single_lora}") + pass + + else: + logger.debug("_un, re and load_ lora") + self.pipe = self.process_lora( + self.lora_memory[0], self.lora_scale_memory[0], unload=True + ) + self.pipe = self.process_lora( + self.lora_memory[1], self.lora_scale_memory[1], unload=True + ) + self.pipe = self.process_lora( + self.lora_memory[2], self.lora_scale_memory[2], unload=True + ) + self.pipe = self.process_lora( + self.lora_memory[3], self.lora_scale_memory[3], unload=True + ) + self.pipe = self.process_lora( + self.lora_memory[4], self.lora_scale_memory[4], unload=True + ) + + self.pipe = self.process_lora(lora_A, lora_scale_A) + self.pipe = self.process_lora(lora_B, lora_scale_B) + self.pipe = self.process_lora(lora_C, lora_scale_C) + self.pipe = self.process_lora(lora_D, lora_scale_D) + self.pipe = self.process_lora(lora_E, lora_scale_E) + + self.lora_memory = [lora_A, lora_B, lora_C, lora_D, lora_E] + self.lora_scale_memory = [ + lora_scale_A, + lora_scale_B, + lora_scale_C, + lora_scale_D, + lora_scale_E, + ] + + # LCM config + if sampler == "LCM" and self.LCMconfig == None: + if self.class_name == "StableDiffusionPipeline": + adapter_id = "latent-consistency/lcm-lora-sdv1-5" + elif self.class_name == "StableDiffusionXLPipeline": + adapter_id = "latent-consistency/lcm-lora-sdxl" + + self.process_lora(adapter_id, 1.0) + self.LCMconfig = adapter_id + logger.info("LCM") + elif sampler != "LCM" and self.LCMconfig != None: + self.process_lora(self.LCMconfig, 1.0, unload=True) + 
+        # FreeU
+        if FreeU:
+            logger.info("FreeU active")
+            if self.class_name == "StableDiffusionPipeline":
+                # sd
+                self.pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.2, b2=1.4)
+            else:
+                # sdxl
+                self.pipe.enable_freeu(s1=0.6, s2=0.4, b1=1.1, b2=1.2)
+            self.FreeU = True
+        elif self.FreeU:
+            self.pipe.disable_freeu()
+            self.FreeU = False
+
+        # Prompt Optimizations
+        if hasattr(self, "compel") and not retain_compel_previous_load:
+            del self.compel
+
+        prompt_emb, negative_prompt_emb = self.create_prompt_embeds(
+            prompt=prompt,
+            negative_prompt=negative_prompt,
+            textual_inversion=textual_inversion,
+            clip_skip=clip_skip,
+            syntax_weights=syntax_weights,
+        )
+
+        if self.class_name != "StableDiffusionPipeline":
+            # Additional prompt for SDXL
+            conditioning, pooled = prompt_emb.clone(), negative_prompt_emb.clone()
+            prompt_emb = negative_prompt_emb = None
+
+        if torch.cuda.is_available() and xformers_memory_efficient_attention:
+            self.pipe.enable_xformers_memory_efficient_attention()
+
+        try:
+            # self.pipe.scheduler = DPMSolverSinglestepScheduler()  # fixing default params via a random scheduler is not recommended
+            self.pipe.scheduler = self.get_scheduler(sampler)
+        except Exception as e:
+            logger.debug(f"{e}")
+            logger.warning("Error in sampler, please try again")
+            # self.pipe = None
+            torch.cuda.empty_cache()
+            gc.collect()
+            return
+
+        self.pipe.safety_checker = None
+
+        # Get image Global
+        if self.task_name != "txt2img":
+            if isinstance(image, str):
+                # If the input is a string (file path), open it as an image
+                image_pil = Image.open(image)
+                numpy_array = np.array(image_pil, dtype=np.uint8)
+            elif isinstance(image, Image.Image):
+                # If the input is already a PIL Image, convert it to a NumPy array
+                numpy_array = np.array(image, dtype=np.uint8)
+            elif isinstance(image, np.ndarray):
+                # If the input is a NumPy array, cast it to np.uint8
+                numpy_array = image.astype(np.uint8)
+            else:
+                if gui_active:
+                    logger.info(
+                        "Image not found"
+                    )
+                    return
+                else:
+                    raise ValueError(
+                        "Unsupported image type or no control image found; report the bug to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
+                    )
+
+            # Extract the RGB channels
+            try:
+                array_rgb = numpy_array[:, :, :3]
+            except Exception:
+                logger.error("Unsupported image type")
+                raise ValueError(
+                    "Unsupported image type; report the bug to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
+                )
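+        # Worked example (illustrative): an RGBA PNG loads as a uint8 array of
+        # shape (H, W, 4); the channel slice above keeps only the RGB planes:
+        #
+        #   rgba = np.zeros((512, 512, 4), dtype=np.uint8)
+        #   rgb = rgba[:, :, :3]  # shape (512, 512, 3)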
+        # Get params preprocess Global SD 1.5
+        preprocess_params_config = {}
+        if self.task_name not in ["txt2img", "inpaint", "img2img"]:
+            preprocess_params_config["image"] = array_rgb
+            preprocess_params_config["image_resolution"] = image_resolution
+
+            if self.task_name != "ip2p":
+                if self.task_name != "shuffle":
+                    preprocess_params_config[
+                        "preprocess_resolution"
+                    ] = preprocess_resolution
+                if self.task_name != "mlsd" and self.task_name != "canny":
+                    preprocess_params_config["preprocessor_name"] = preprocessor_name
+
+        # RUN Preprocess SD 1.5
+        if self.task_name == "inpaint":
+            # Get mask for Inpaint
+            if gui_active or os.path.exists(str(image_mask)):
+                # Read the image mask provided by the GUI
+                mask_control_img = Image.open(image_mask)
+                numpy_array_mask = np.array(mask_control_img, dtype=np.uint8)
+                array_rgb_mask = numpy_array_mask[:, :, :3]
+            elif not gui_active:
+                # Draw the mask over the control image interactively
+                import base64
+                import matplotlib.pyplot as plt
+
+                name_without_extension = os.path.splitext(image.split("/")[-1])[0]
+                image64 = base64.b64encode(open(image, "rb").read())
+                image64 = image64.decode("utf-8")
+                img = np.array(plt.imread(image)[:, :, :3])
+
+                # Create mask interactively
+                logger.info("Draw the mask on this canvas using the mouse. When you finish, press 'Finish' at the bottom of the canvas.")
+                draw(
+                    image64,
+                    filename=f"./{name_without_extension}_draw.png",
+                    w=img.shape[1],
+                    h=img.shape[0],
+                    line_width=0.04 * img.shape[1],
+                )
+
+                # Create the mask and save it; pure red strokes become the mask
+                with_mask = np.array(
+                    plt.imread(f"./{name_without_extension}_draw.png")[:, :, :3]
+                )
+                mask = (
+                    (with_mask[:, :, 0] == 1)
+                    * (with_mask[:, :, 1] == 0)
+                    * (with_mask[:, :, 2] == 0)
+                )
+                plt.imsave(f"./{name_without_extension}_mask.png", mask, cmap="gray")
+                mask_control = f"./{name_without_extension}_mask.png"
+                logger.info(f"Mask saved: {mask_control}")
+
+                # Read the image mask
+                mask_control_img = Image.open(mask_control)
+                numpy_array_mask = np.array(mask_control_img, dtype=np.uint8)
+                array_rgb_mask = numpy_array_mask[:, :, :3]
+            else:
+                raise ValueError("No images found")
+
+            init_image, control_mask, control_image = self.process_inpaint(
+                image=array_rgb,
+                image_resolution=image_resolution,
+                preprocess_resolution=preprocess_resolution,  # Not used
+                image_mask=array_rgb_mask,
+            )
+
+        elif self.task_name == "openpose":
+            logger.info("Openpose")
+            control_image = self.process_openpose(**preprocess_params_config)
+
+        elif self.task_name == "canny":
+            logger.info("Canny")
+            control_image = self.process_canny(
+                **preprocess_params_config,
+                low_threshold=low_threshold,
+                high_threshold=high_threshold,
+            )
+
+        elif self.task_name == "mlsd":
+            logger.info("MLSD")
+            control_image = self.process_mlsd(
+                **preprocess_params_config,
+                value_threshold=value_threshold,
+                distance_threshold=distance_threshold,
+            )
+
+        elif self.task_name == "scribble":
+            logger.info("Scribble")
+            control_image = self.process_scribble(**preprocess_params_config)
+
+        elif self.task_name == "softedge":
+            logger.info("Softedge")
+            control_image = self.process_softedge(**preprocess_params_config)
+
+        elif self.task_name == "segmentation":
+            logger.info("Segmentation")
+            control_image = self.process_segmentation(**preprocess_params_config)
+
+        elif self.task_name == "depth":
+            logger.info("Depth")
+            control_image = self.process_depth(**preprocess_params_config)
+
+        elif self.task_name == "normalbae":
+            logger.info("NormalBae")
+            control_image = self.process_normal(**preprocess_params_config)
+
+        elif self.task_name == "lineart":
+            logger.info("Lineart")
+            control_image = self.process_lineart(**preprocess_params_config)
+
+        elif self.task_name == "shuffle":
+            logger.info("Shuffle")
+            control_image = self.process_shuffle(**preprocess_params_config)
+
+        elif self.task_name == "ip2p":
+            logger.info("Ip2p")
+            control_image = self.process_ip2p(**preprocess_params_config)
+
+        elif self.task_name == "img2img":
+            preprocess_params_config["image"] = array_rgb
+            preprocess_params_config["image_resolution"] = image_resolution
+            init_image = self.process_img2img(**preprocess_params_config)
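+        # Design note (sketch, not in the original code): the task dispatch above
+        # could equivalently be written as a lookup table, e.g.:
+        #
+        #   preprocessors = {
+        #       "openpose": self.process_openpose,
+        #       "scribble": self.process_scribble,
+        #       "depth": self.process_depth,
+        #   }
+        #   control_image = preprocessors[self.task_name](**preprocess_params_config)
+        #
+        # with extra keyword arguments for the canny and mlsd thresholds.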
+        # RUN Preprocess T2I for SDXL
+        if self.class_name == "StableDiffusionXLPipeline":
+            # Get params preprocess XL
+            preprocess_params_config_xl = {}
+            if self.task_name not in ["txt2img", "inpaint", "img2img"]:
+                preprocess_params_config_xl["image"] = array_rgb
+                preprocess_params_config_xl["preprocess_resolution"] = preprocess_resolution
+                preprocess_params_config_xl["image_resolution"] = image_resolution
+                # preprocess_params_config_xl["additional_prompt"] = additional_prompt  # ""
+
+            if self.task_name == "sdxl_canny":  # the preprocessor is active by default for this task
+                logger.info("SDXL Canny: Preprocessor active by default")
+                control_image = self.process_canny(
+                    **preprocess_params_config_xl,
+                    low_threshold=low_threshold,
+                    high_threshold=high_threshold,
+                )
+            elif self.task_name == "sdxl_openpose":
+                logger.info("SDXL Openpose")
+                control_image = self.process_openpose(
+                    preprocessor_name="Openpose" if t2i_adapter_preprocessor else "None",
+                    **preprocess_params_config_xl,
+                )
+            elif self.task_name == "sdxl_sketch":
+                logger.info("SDXL Scribble")
+                control_image = self.process_scribble(
+                    preprocessor_name="PidiNet" if t2i_adapter_preprocessor else "None",
+                    **preprocess_params_config_xl,
+                )
+            elif self.task_name == "sdxl_depth-midas":
+                logger.info("SDXL Depth")
+                control_image = self.process_depth(
+                    preprocessor_name="Midas" if t2i_adapter_preprocessor else "None",
+                    **preprocess_params_config_xl,
+                )
+            elif self.task_name == "sdxl_lineart":
+                logger.info("SDXL Lineart")
+                control_image = self.process_lineart(
+                    preprocessor_name="Lineart" if t2i_adapter_preprocessor else "None",
+                    **preprocess_params_config_xl,
+                )
+
+        # Get general params for TASK
+        if self.class_name == "StableDiffusionPipeline":
+            # Base params pipe sd
+            pipe_params_config = {
+                "prompt": None,  # prompt,
+                "negative_prompt": None,  # negative_prompt,
+                "prompt_embeds": prompt_emb,
+                "negative_prompt_embeds": negative_prompt_emb,
+                "num_images": num_images,
+                "num_steps": num_steps,
+                "guidance_scale": guidance_scale,
+                "clip_skip": None,  # clip_skip, because we use the clip skip of compel
+            }
+        else:
+            # Base params pipe sdxl
+            pipe_params_config = {
+                "prompt": None,
+                "negative_prompt": None,
+                "num_inference_steps": num_steps,
+                "guidance_scale": guidance_scale,
+                "clip_skip": None,
+                "num_images_per_prompt": num_images,
+            }
+
+        # New params
+        if self.class_name == "StableDiffusionXLPipeline":
+            # pipe sdxl
+            if self.task_name == "txt2img":
+                pipe_params_config["height"] = img_height
+                pipe_params_config["width"] = img_width
+            elif self.task_name == "inpaint":
+                pipe_params_config["strength"] = strength
+                pipe_params_config["image"] = init_image
+                pipe_params_config["mask_image"] = control_mask
+                logger.info(f"Image resolution: {init_image.size}")
+            elif self.task_name not in ["txt2img", "inpaint", "img2img"]:
+                pipe_params_config["image"] = control_image
+                pipe_params_config["adapter_conditioning_scale"] = t2i_adapter_conditioning_scale
+                pipe_params_config["adapter_conditioning_factor"] = t2i_adapter_conditioning_factor
+                logger.info(f"Image resolution: {control_image.size}")
+            elif self.task_name == "img2img":
+                pipe_params_config["strength"] = strength
+                pipe_params_config["image"] = init_image
+                logger.info(f"Image resolution: {init_image.size}")
+        elif self.task_name == "txt2img":
+            pipe_params_config["height"] = img_height
+            pipe_params_config["width"] = img_width
+        elif self.task_name == "inpaint":
+            pipe_params_config["strength"] = strength
+            pipe_params_config["init_image"] = init_image
+            pipe_params_config["control_mask"] = control_mask
+            pipe_params_config["control_image"] = control_image
+            pipe_params_config["controlnet_conditioning_scale"] = controlnet_conditioning_scale
+            pipe_params_config["control_guidance_start"] = control_guidance_start
+            pipe_params_config["control_guidance_end"] = control_guidance_end
+            logger.info(f"Image resolution: {init_image.size}")
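+        # Worked example (illustrative): control_guidance_start/end are fractions
+        # of the denoising schedule. With num_steps=30, control_guidance_start=0.0
+        # and control_guidance_end=0.5, the ControlNet conditioning is applied for
+        # roughly the first 15 steps and then turned off.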
logger.info(f"Image resolution: {str(init_image.size)}") + elif self.task_name not in ["txt2img", "inpaint", "img2img"]: + pipe_params_config["control_image"] = control_image + pipe_params_config[ + "controlnet_conditioning_scale" + ] = controlnet_conditioning_scale + pipe_params_config["control_guidance_start"] = control_guidance_start + pipe_params_config["control_guidance_end"] = control_guidance_end + logger.info(f"Image resolution: {str(control_image.size)}") + elif self.task_name == "img2img": + pipe_params_config["strength"] = strength + pipe_params_config["init_image"] = init_image + logger.info(f"Image resolution: {str(init_image.size)}") + + # detailfix params and pipe global + if adetailer_A or adetailer_B: + + # global params detailfix + default_params_detailfix = { + "face_detector_ad" : True, + "person_detector_ad" : True, + "hand_detector_ad" : False, + "prompt": "", + "negative_prompt" : "", + "strength" : 0.35, + "mask_dilation" : 4, + "mask_blur" : 4, + "mask_padding" : 32, + #"sampler" : "Use same sampler", + #"inpaint_only" : True, + } + + # Pipe detailfix_pipe + if not hasattr(self, "detailfix_pipe") or not retain_detailfix_model_previous_load: + if adetailer_A_params.get("inpaint_only", False) == True or adetailer_B_params.get("inpaint_only", False) == True: + detailfix_pipe = custom_task_model_loader( + pipe=self.pipe, + model_category="detailfix", + task_name=self.task_name, + torch_dtype=self.type_model_precision + ) + else: + detailfix_pipe = custom_task_model_loader( + pipe=self.pipe, + model_category="detailfix_img2img", + task_name=self.task_name, + torch_dtype=self.type_model_precision + ) + if hasattr(self, "detailfix_pipe"): + del self.detailfix_pipe + if retain_detailfix_model_previous_load: + if hasattr(self, "detailfix_pipe"): + detailfix_pipe = self.detailfix_pipe + else: + self.detailfix_pipe = detailfix_pipe + adetailer_A_params.pop("inpaint_only", None) + adetailer_B_params.pop("inpaint_only", None) + + # Define base scheduler detailfix + detailfix_pipe.default_scheduler = copy.deepcopy(self.default_scheduler) + if adetailer_A_params.get("sampler", "Use same sampler") != "Use same sampler": + logger.debug("detailfix_pipe will use the sampler from adetailer_A") + detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"]) + adetailer_A_params.pop("sampler", None) + if adetailer_B_params.get("sampler", "Use same sampler") != "Use same sampler": + logger.debug("detailfix_pipe will use the sampler from adetailer_B") + detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"]) + adetailer_B_params.pop("sampler", None) + + detailfix_pipe.set_progress_bar_config(leave=leave_progress_bar) + detailfix_pipe.set_progress_bar_config(disable=disable_progress_bar) + detailfix_pipe.to(self.device) + torch.cuda.empty_cache() + gc.collect() + + if adetailer_A: + for key_param, default_value in default_params_detailfix.items(): + if key_param not in adetailer_A_params: + adetailer_A_params[key_param] = default_value + elif type(default_value) != type(adetailer_A_params[key_param]): + logger.warning(f"DetailFix A: Error type param, set default {str(key_param)}") + adetailer_A_params[key_param] = default_value + + detailfix_params_A = { + "prompt": adetailer_A_params["prompt"], + "negative_prompt" : adetailer_A_params["negative_prompt"], + "strength" : adetailer_A_params["strength"], + "num_inference_steps" : num_steps, + "guidance_scale" : guidance_scale, + } + + # clear params yolo + adetailer_A_params.pop('strength', None) + 
+            # Verify prompt detailfix_params_A and get valid
+            prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A, prompt_df_A, negative_prompt_df_A = process_prompts_valid(
+                detailfix_params_A["prompt"], detailfix_params_A["negative_prompt"], prompt, negative_prompt
+            )
+
+            # Params detailfix
+            if self.class_name == "StableDiffusionPipeline":
+                # SD detailfix
+                # detailfix_params_A["controlnet_conditioning_scale"] = controlnet_conditioning_scale
+                # detailfix_params_A["control_guidance_start"] = control_guidance_start
+                # detailfix_params_A["control_guidance_end"] = control_guidance_end
+
+                if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A:
+                    detailfix_params_A["prompt_embeds"] = prompt_emb
+                    detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb
+                else:
+                    prompt_emb_ad, negative_prompt_emb_ad = self.create_prompt_embeds(
+                        prompt=prompt_df_A,
+                        negative_prompt=negative_prompt_df_A,
+                        textual_inversion=textual_inversion,
+                        clip_skip=clip_skip,
+                        syntax_weights=syntax_weights,
+                    )
+                    detailfix_params_A["prompt_embeds"] = prompt_emb_ad
+                    detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb_ad
+
+                detailfix_params_A["prompt"] = None
+                detailfix_params_A["negative_prompt"] = None
+
+            else:
+                # SDXL detailfix
+                if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A:
+                    conditioning_detailfix_A, pooled_detailfix_A = conditioning, pooled
+                else:
+                    conditioning_detailfix_A, pooled_detailfix_A = self.create_prompt_embeds(
+                        prompt=prompt_df_A,
+                        negative_prompt=negative_prompt_df_A,
+                        textual_inversion=textual_inversion,
+                        clip_skip=clip_skip,
+                        syntax_weights=syntax_weights,
+                    )
+
+                detailfix_params_A.pop('prompt', None)
+                detailfix_params_A.pop('negative_prompt', None)
+                detailfix_params_A["prompt_embeds"] = conditioning_detailfix_A[0:1]
+                detailfix_params_A["pooled_prompt_embeds"] = pooled_detailfix_A[0:1]
+                detailfix_params_A["negative_prompt_embeds"] = conditioning_detailfix_A[1:2]
+                detailfix_params_A["negative_pooled_prompt_embeds"] = pooled_detailfix_A[1:2]
+
+            logger.debug(f"detailfix A prompt empty {prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A}")
+            if not prompt_empty_detailfix_A or not negative_prompt_empty_detailfix_A:
+                logger.debug(f"Prompts detailfix A {prompt_df_A, negative_prompt_df_A}")
+            logger.debug(f"Pipe params detailfix A \n{detailfix_params_A}")
+            logger.debug(f"Params detailfix A \n{adetailer_A_params}")
+
+        if adetailer_B:
+            for key_param, default_value in default_params_detailfix.items():
+                if key_param not in adetailer_B_params:
+                    adetailer_B_params[key_param] = default_value
+                elif type(adetailer_B_params[key_param]) is not type(default_value):
+                    logger.warning(f"DetailFix B: wrong type for param {key_param}, using the default")
+                    adetailer_B_params[key_param] = default_value
+
+            detailfix_params_B = {
+                "prompt": adetailer_B_params["prompt"],
+                "negative_prompt": adetailer_B_params["negative_prompt"],
+                "strength": adetailer_B_params["strength"],
+                "num_inference_steps": num_steps,
+                "guidance_scale": guidance_scale,
+            }
+
+            # clear params yolo
+            adetailer_B_params.pop('strength', None)
+            adetailer_B_params.pop('prompt', None)
+            adetailer_B_params.pop('negative_prompt', None)
+
+            # Verify prompt detailfix_params_B and get valid
+            prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B, prompt_df_B, negative_prompt_df_B = process_prompts_valid(
+                detailfix_params_B["prompt"], detailfix_params_B["negative_prompt"], prompt, negative_prompt
+            )
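+            # Note (illustrative, inferred from the usage here): when the detailfix
+            # prompt and negative prompt are left empty, process_prompts_valid falls
+            # back to the main prompt and negative_prompt, so the branches below can
+            # reuse the already-computed embeddings instead of re-encoding.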
+            # Params detailfix
+            if self.class_name == "StableDiffusionPipeline":
+                # SD detailfix
+                # detailfix_params_B["controlnet_conditioning_scale"] = controlnet_conditioning_scale
+                # detailfix_params_B["control_guidance_start"] = control_guidance_start
+                # detailfix_params_B["control_guidance_end"] = control_guidance_end
+
+                if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B:
+                    detailfix_params_B["prompt_embeds"] = prompt_emb
+                    detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb
+                else:
+                    prompt_emb_ad_b, negative_prompt_emb_ad_b = self.create_prompt_embeds(
+                        prompt=prompt_df_B,
+                        negative_prompt=negative_prompt_df_B,
+                        textual_inversion=textual_inversion,
+                        clip_skip=clip_skip,
+                        syntax_weights=syntax_weights,
+                    )
+                    detailfix_params_B["prompt_embeds"] = prompt_emb_ad_b
+                    detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb_ad_b
+                detailfix_params_B["prompt"] = None
+                detailfix_params_B["negative_prompt"] = None
+            else:
+                # SDXL detailfix
+                if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B:
+                    conditioning_detailfix_B, pooled_detailfix_B = conditioning, pooled
+                else:
+                    conditioning_detailfix_B, pooled_detailfix_B = self.create_prompt_embeds(
+                        prompt=prompt_df_B,
+                        negative_prompt=negative_prompt_df_B,
+                        textual_inversion=textual_inversion,
+                        clip_skip=clip_skip,
+                        syntax_weights=syntax_weights,
+                    )
+                detailfix_params_B.pop('prompt', None)
+                detailfix_params_B.pop('negative_prompt', None)
+                detailfix_params_B["prompt_embeds"] = conditioning_detailfix_B[0:1]
+                detailfix_params_B["pooled_prompt_embeds"] = pooled_detailfix_B[0:1]
+                detailfix_params_B["negative_prompt_embeds"] = conditioning_detailfix_B[1:2]
+                detailfix_params_B["negative_pooled_prompt_embeds"] = pooled_detailfix_B[1:2]
+
+            logger.debug(f"detailfix B prompt empty {prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B}")
+            if not prompt_empty_detailfix_B or not negative_prompt_empty_detailfix_B:
+                logger.debug(f"Prompts detailfix B {prompt_df_B, negative_prompt_df_B}")
+            logger.debug(f"Pipe params detailfix B \n{detailfix_params_B}")
+            logger.debug(f"Params detailfix B \n{adetailer_B_params}")
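+        # Note (illustrative): for SDXL, create_prompt_embeds returns the positive
+        # and negative embeddings stacked along the batch axis, which is why the
+        # detailfix branches above slice [0:1] for the positive prompt and [1:2]
+        # for the negative prompt, on both the conditioning and pooled tensors.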
+        if hires_steps > 1 and upscaler_model_path is not None:
+            # Hires params BASE
+            hires_params_config = {
+                "prompt": None,
+                "negative_prompt": None,
+                "num_inference_steps": hires_steps,
+                "guidance_scale": guidance_scale,
+                "clip_skip": None,
+                "strength": hires_denoising_strength,
+            }
+            if self.class_name == "StableDiffusionPipeline":
+                hires_params_config["eta"] = 1.0
+
+            # Verify prompt hires and get valid
+            hires_prompt_empty, hires_negative_prompt_empty, prompt_hires_valid, negative_prompt_hires_valid = process_prompts_valid(
+                hires_prompt, hires_negative_prompt, prompt, negative_prompt
+            )
+
+            # Hires embed params
+            if self.class_name == "StableDiffusionPipeline":
+                if hires_prompt_empty and hires_negative_prompt_empty:
+                    hires_params_config["prompt_embeds"] = prompt_emb
+                    hires_params_config["negative_prompt_embeds"] = negative_prompt_emb
+                else:
+                    prompt_emb_hires, negative_prompt_emb_hires = self.create_prompt_embeds(
+                        prompt=prompt_hires_valid,
+                        negative_prompt=negative_prompt_hires_valid,
+                        textual_inversion=textual_inversion,
+                        clip_skip=clip_skip,
+                        syntax_weights=syntax_weights,
+                    )
+
+                    hires_params_config["prompt_embeds"] = prompt_emb_hires
+                    hires_params_config["negative_prompt_embeds"] = negative_prompt_emb_hires
+            else:
+                if hires_prompt_empty and hires_negative_prompt_empty:
+                    hires_conditioning, hires_pooled = conditioning, pooled
+                else:
+                    hires_conditioning, hires_pooled = self.create_prompt_embeds(
+                        prompt=prompt_hires_valid,
+                        negative_prompt=negative_prompt_hires_valid,
+                        textual_inversion=textual_inversion,
+                        clip_skip=clip_skip,
+                        syntax_weights=syntax_weights,
+                    )
+
+                hires_params_config.pop('prompt', None)
+                hires_params_config.pop('negative_prompt', None)
+                hires_params_config["prompt_embeds"] = hires_conditioning[0:1]
+                hires_params_config["pooled_prompt_embeds"] = hires_pooled[0:1]
+                hires_params_config["negative_prompt_embeds"] = hires_conditioning[1:2]
+                hires_params_config["negative_pooled_prompt_embeds"] = hires_pooled[1:2]
+
+            # Hires pipe
+            if not hasattr(self, "hires_pipe") or not retain_hires_model_previous_load:
+                hires_pipe = custom_task_model_loader(
+                    pipe=self.pipe,
+                    model_category="hires",
+                    task_name=self.task_name,
+                    torch_dtype=self.type_model_precision
+                )
+                if hasattr(self, "hires_pipe"):
+                    del self.hires_pipe
+            if retain_hires_model_previous_load:
+                if hasattr(self, "hires_pipe"):
+                    hires_pipe = self.hires_pipe
+                else:
+                    self.hires_pipe = hires_pipe
+
+            # Hires scheduler
+            if hires_sampler != "Use same sampler":
+                logger.debug("New hires sampler")
+                hires_pipe.scheduler = self.get_scheduler(hires_sampler)
+
+            hires_pipe.set_progress_bar_config(leave=leave_progress_bar)
+            hires_pipe.set_progress_bar_config(disable=disable_progress_bar)
+            hires_pipe.to(self.device)
+            torch.cuda.empty_cache()
+            gc.collect()
+        else:
+            hires_params_config = {}
+            hires_pipe = None
+
+        # Debug info
+        try:
+            logger.debug(f"INFO PIPE: {self.pipe.__class__.__name__}")
+            logger.debug(f"text_encoder_type: {self.pipe.text_encoder.dtype}")
+            logger.debug(f"unet_type: {self.pipe.unet.dtype}")
+            logger.debug(f"vae_type: {self.pipe.vae.dtype}")
+            logger.debug(f"pipe_type: {self.pipe.dtype}")
+            logger.debug(f"scheduler_main_pipe: {self.pipe.scheduler}")
+            if adetailer_A or adetailer_B:
+                logger.debug(f"scheduler_detailfix: {detailfix_pipe.scheduler}")
+            if hires_steps > 1 and upscaler_model_path is not None:
+                logger.debug(f"scheduler_hires: {hires_pipe.scheduler}")
+        except Exception as e:
+            logger.debug(f"{str(e)}")
+
+        # === RUN PIPE === #
+        for i in range(loop_generation):
+
+            # number seed
+            if seed == -1:
+                seeds = [random.randint(0, 2147483647) for _ in range(num_images)]
+            else:
+                if num_images == 1:
+                    seeds = [seed]
+                else:
+                    seeds = [seed] + [random.randint(0, 2147483647) for _ in range(num_images - 1)]
+
+            # generators
+            generators = []  # List to store all the generators
+            for calculate_seed in seeds:
+                if generator_in_cpu or self.device.type == "cpu":
+                    generator = torch.Generator().manual_seed(calculate_seed)
+                else:
+                    try:
+                        generator = torch.Generator("cuda").manual_seed(calculate_seed)
+                    except Exception:
+                        logger.warning("Generator in CPU")
+                        generator = torch.Generator().manual_seed(calculate_seed)
+
+                generators.append(generator)
+
+            # img2img workaround: the pipeline concatenates the tensor prompts, so
+            # batch inference needs a single generator rather than a list
+            pipe_params_config["generator"] = generators if self.task_name != "img2img" else generators[0]  # no list
+            seeds = seeds if self.task_name != "img2img" else [seeds[0]] * num_images
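+            # Worked example (illustrative): with seed=42 and num_images=3, seeds
+            # becomes [42, r1, r2] with r1 and r2 random; with seed=-1 all three
+            # are random. For img2img a single generator is reused, so every image
+            # in the batch reports seeds[0].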
"img2img"]: + images = [control_image] + images + elif self.task_name == "txt2img": + images = self.run_pipe_SD(**pipe_params_config) + elif self.task_name == "inpaint": + images = self.run_pipe_inpaint(**pipe_params_config) + elif self.task_name not in ["txt2img", "inpaint", "img2img"]: + results = self.run_pipe( + **pipe_params_config + ) ## pipe ControlNet add condition_weights + images = [control_image] + results + del results + elif self.task_name == "img2img": + images = self.run_pipe_img2img(**pipe_params_config) + except Exception as e: + e = str(e) + if "Tensor with 2 elements cannot be converted to Scalar" in e: + logger.debug(e) + logger.error("Error in sampler; trying with DDIM sampler") + self.pipe.scheduler = self.default_scheduler + self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config) + if self.class_name == "StableDiffusionXLPipeline": + # sdxl pipe + images = self.pipe( + prompt_embeds=conditioning[0:1], + pooled_prompt_embeds=pooled[0:1], + negative_prompt_embeds=conditioning[1:2], + negative_pooled_prompt_embeds=pooled[1:2], + #generator=pipe_params_config["generator"], + **pipe_params_config, + ).images + if self.task_name not in ["txt2img", "inpaint", "img2img"]: + images = [control_image] + images + elif self.task_name == "txt2img": + images = self.run_pipe_SD(**pipe_params_config) + elif self.task_name == "inpaint": + images = self.run_pipe_inpaint(**pipe_params_config) + elif self.task_name not in ["txt2img", "inpaint", "img2img"]: + results = self.run_pipe( + **pipe_params_config + ) ## pipe ControlNet add condition_weights + images = [control_image] + results + del results + elif self.task_name == "img2img": + images = self.run_pipe_img2img(**pipe_params_config) + elif "The size of tensor a (0) must match the size of tensor b (3) at non-singleton" in e: + raise ValueError(f"steps / strength too low for the model to produce a satisfactory response") + else: + raise ValueError(e) + + torch.cuda.empty_cache() + gc.collect() + + if hires_before_adetailer and upscaler_model_path != None: + logger.debug(f"Hires before; same seed for each image (no batch)") + images = process_images_high_resolution( + images, + upscaler_model_path, + upscaler_increases_size, + esrgan_tile, esrgan_tile_overlap, + hires_steps, hires_params_config, + self.task_name, + generators[0], #pipe_params_config["generator"][0], # no generator + hires_pipe, + ) + + # Adetailer stuff + if adetailer_A or adetailer_B: + # image_pil_list = [] + # for img_single in images: + # image_ad = img_single.convert("RGB") + # image_pil_list.append(image_ad) + if self.task_name not in ["txt2img", "inpaint", "img2img"]: + images = images[1:] + + if adetailer_A: + images = ad_model_process( + pipe_params_df=detailfix_params_A, + detailfix_pipe=detailfix_pipe, + image_list_task=images, + **adetailer_A_params, + ) + if adetailer_B: + images = ad_model_process( + pipe_params_df=detailfix_params_B, + detailfix_pipe=detailfix_pipe, + image_list_task=images, + **adetailer_B_params, + ) + + if self.task_name not in ["txt2img", "inpaint", "img2img"]: + images = [control_image] + images + # del detailfix_pipe + torch.cuda.empty_cache() + gc.collect() + + if hires_after_adetailer and upscaler_model_path != None: + logger.debug(f"Hires after; same seed for each image (no batch)") + images = process_images_high_resolution( + images, + upscaler_model_path, + upscaler_increases_size, + esrgan_tile, esrgan_tile_overlap, + hires_steps, hires_params_config, + self.task_name, + generators[0], 
+            logger.info(f"Seeds: {seeds}")
+
+            # Show images if loop
+            if display_images:
+                mediapy.show_images(images)
+                # logger.info(image_list)
+                # del images
+                if loop_generation > 1:
+                    time.sleep(0.5)
+
+            # List images and save; the metadata list is positional and index 7
+            # holds the seed, which is patched per image below
+            image_list = []
+            metadata = [
+                prompt,
+                negative_prompt,
+                self.base_model_id,
+                self.vae_model,
+                num_steps,
+                guidance_scale,
+                sampler,
+                0,  # seed placeholder, set per image below
+                img_width,
+                img_height,
+                clip_skip,
+            ]
+
+            valid_seeds = [0] + seeds if self.task_name not in ["txt2img", "inpaint", "img2img"] else seeds
+            for image_, seed_ in zip(images, valid_seeds):
+                image_path = "not saved in storage"
+                if save_generated_images:
+                    metadata[7] = seed_
+                    image_path = save_pil_image_with_metadata(image_, image_storage_location, metadata)
+                image_list.append(image_path)
+
+            torch.cuda.empty_cache()
+            gc.collect()
+
+            if image_list[0] != "not saved in storage":
+                logger.info(image_list)
+
+        if hasattr(self, "compel") and not retain_compel_previous_load:
+            del self.compel
+            torch.cuda.empty_cache()
+            gc.collect()
+
+        return images, image_list