diff --git "a/stablepy_model.py" "b/stablepy_model.py" deleted file mode 100644--- "a/stablepy_model.py" +++ /dev/null @@ -1,2593 +0,0 @@ -import gc, time -import numpy as np -import PIL.Image -from diffusers import ( - ControlNetModel, - DiffusionPipeline, - StableDiffusionControlNetPipeline, - StableDiffusionControlNetInpaintPipeline, - StableDiffusionPipeline, - AutoencoderKL, - StableDiffusionXLInpaintPipeline, - StableDiffusionXLAdapterPipeline, - T2IAdapter, - StableDiffusionXLPipeline, - AutoPipelineForImage2Image -) -from huggingface_hub import hf_hub_download -import torch, random, json -from controlnet_aux import ( - CannyDetector, - ContentShuffleDetector, - HEDdetector, - LineartAnimeDetector, - LineartDetector, - MidasDetector, - MLSDdetector, - NormalBaeDetector, - OpenposeDetector, - PidiNetDetector, -) -from transformers import pipeline -from controlnet_aux.util import HWC3, ade_palette -from transformers import AutoImageProcessor, UperNetForSemanticSegmentation -import cv2 -from diffusers import ( - DPMSolverMultistepScheduler, - DPMSolverSinglestepScheduler, - KDPM2DiscreteScheduler, - EulerDiscreteScheduler, - EulerAncestralDiscreteScheduler, - HeunDiscreteScheduler, - LMSDiscreteScheduler, - DDIMScheduler, - DEISMultistepScheduler, - UniPCMultistepScheduler, - LCMScheduler, - PNDMScheduler, - KDPM2AncestralDiscreteScheduler, - EDMDPMSolverMultistepScheduler, - EDMEulerScheduler, -) -from .prompt_weights import get_embed_new, add_comma_after_pattern_ti -from .utils import save_pil_image_with_metadata -from .lora_loader import lora_mix_load -from .inpainting_canvas import draw, make_inpaint_condition -from .adetailer import ad_model_process -from ..upscalers.esrgan import UpscalerESRGAN, UpscalerLanczos, UpscalerNearest -from ..logging.logging_setup import logger -from .extra_model_loaders import custom_task_model_loader -from .high_resolution import process_images_high_resolution -from .style_prompt_config import styles_data, STYLE_NAMES, get_json_content, apply_style -import os -from compel import Compel, ReturnedEmbeddingsType -import ipywidgets as widgets, mediapy -from IPython.display import display -from PIL import Image -from typing import Union, Optional, List, Tuple, Dict, Any, Callable -import logging, diffusers, copy, warnings -logging.getLogger("diffusers").setLevel(logging.ERROR) -#logging.getLogger("transformers").setLevel(logging.ERROR) -diffusers.utils.logging.set_verbosity(40) -warnings.filterwarnings(action="ignore", category=FutureWarning, module="diffusers") -warnings.filterwarnings(action="ignore", category=FutureWarning, module="transformers") - -# ===================================== -# Utils preprocessor -# ===================================== -def resize_image(input_image, resolution, interpolation=None): - H, W, C = input_image.shape - H = float(H) - W = float(W) - k = float(resolution) / max(H, W) - H *= k - W *= k - H = int(np.round(H / 64.0)) * 64 - W = int(np.round(W / 64.0)) * 64 - if interpolation is None: - interpolation = cv2.INTER_LANCZOS4 if k > 1 else cv2.INTER_AREA - img = cv2.resize(input_image, (W, H), interpolation=interpolation) - return img - - -class DepthEstimator: - def __init__(self): - self.model = pipeline("depth-estimation") - - def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image: - detect_resolution = kwargs.pop("detect_resolution", 512) - image_resolution = kwargs.pop("image_resolution", 512) - image = np.array(image) - image = HWC3(image) - image = resize_image(image, resolution=detect_resolution) - 
image = PIL.Image.fromarray(image) - image = self.model(image) - image = image["depth"] - image = np.array(image) - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - return PIL.Image.fromarray(image) - - -class ImageSegmentor: - def __init__(self): - self.image_processor = AutoImageProcessor.from_pretrained( - "openmmlab/upernet-convnext-small" - ) - self.image_segmentor = UperNetForSemanticSegmentation.from_pretrained( - "openmmlab/upernet-convnext-small" - ) - - @torch.inference_mode() - def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image: - detect_resolution = kwargs.pop("detect_resolution", 512) - image_resolution = kwargs.pop("image_resolution", 512) - image = HWC3(image) - image = resize_image(image, resolution=detect_resolution) - image = PIL.Image.fromarray(image) - - pixel_values = self.image_processor(image, return_tensors="pt").pixel_values - outputs = self.image_segmentor(pixel_values) - seg = self.image_processor.post_process_semantic_segmentation( - outputs, target_sizes=[image.size[::-1]] - )[0] - color_seg = np.zeros((seg.shape[0], seg.shape[1], 3), dtype=np.uint8) - for label, color in enumerate(ade_palette()): - color_seg[seg == label, :] = color - color_seg = color_seg.astype(np.uint8) - - color_seg = resize_image( - color_seg, resolution=image_resolution, interpolation=cv2.INTER_NEAREST - ) - return PIL.Image.fromarray(color_seg) - - -class Preprocessor: - MODEL_ID = "lllyasviel/Annotators" - - def __init__(self): - self.model = None - self.name = "" - - def load(self, name: str) -> None: - if name == self.name: - return - if name == "HED": - self.model = HEDdetector.from_pretrained(self.MODEL_ID) - elif name == "Midas": - self.model = MidasDetector.from_pretrained(self.MODEL_ID) - elif name == "MLSD": - self.model = MLSDdetector.from_pretrained(self.MODEL_ID) - elif name == "Openpose": - self.model = OpenposeDetector.from_pretrained(self.MODEL_ID) - elif name == "PidiNet": - self.model = PidiNetDetector.from_pretrained(self.MODEL_ID) - elif name == "NormalBae": - self.model = NormalBaeDetector.from_pretrained(self.MODEL_ID) - elif name == "Lineart": - self.model = LineartDetector.from_pretrained(self.MODEL_ID) - elif name == "LineartAnime": - self.model = LineartAnimeDetector.from_pretrained(self.MODEL_ID) - elif name == "Canny": - self.model = CannyDetector() - elif name == "ContentShuffle": - self.model = ContentShuffleDetector() - elif name == "DPT": - self.model = DepthEstimator() - elif name == "UPerNet": - self.model = ImageSegmentor() - else: - raise ValueError - torch.cuda.empty_cache() - gc.collect() - self.name = name - - def __call__(self, image: PIL.Image.Image, **kwargs) -> PIL.Image.Image: - if self.name == "Canny": - if "detect_resolution" in kwargs: - detect_resolution = kwargs.pop("detect_resolution") - image = np.array(image) - image = HWC3(image) - image = resize_image(image, resolution=detect_resolution) - image = self.model(image, **kwargs) - return PIL.Image.fromarray(image) - elif self.name == "Midas": - detect_resolution = kwargs.pop("detect_resolution", 512) - image_resolution = kwargs.pop("image_resolution", 512) - image = np.array(image) - image = HWC3(image) - image = resize_image(image, resolution=detect_resolution) - image = self.model(image, **kwargs) - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - return PIL.Image.fromarray(image) - else: - return self.model(image, **kwargs) - - -# ===================================== -# Base Model -# 
===================================== - -CONTROLNET_MODEL_IDS = { - "openpose": "lllyasviel/control_v11p_sd15_openpose", - "canny": "lllyasviel/control_v11p_sd15_canny", - "mlsd": "lllyasviel/control_v11p_sd15_mlsd", - "scribble": "lllyasviel/control_v11p_sd15_scribble", - "softedge": "lllyasviel/control_v11p_sd15_softedge", - "segmentation": "lllyasviel/control_v11p_sd15_seg", - "depth": "lllyasviel/control_v11f1p_sd15_depth", - "normalbae": "lllyasviel/control_v11p_sd15_normalbae", - "lineart": "lllyasviel/control_v11p_sd15_lineart", - "lineart_anime": "lllyasviel/control_v11p_sd15s2_lineart_anime", - "shuffle": "lllyasviel/control_v11e_sd15_shuffle", - "ip2p": "lllyasviel/control_v11e_sd15_ip2p", - "inpaint": "lllyasviel/control_v11p_sd15_inpaint", - "txt2img": "Nothinghere", - "sdxl_canny": "TencentARC/t2i-adapter-canny-sdxl-1.0", - "sdxl_sketch": "TencentARC/t2i-adapter-sketch-sdxl-1.0", - "sdxl_lineart": "TencentARC/t2i-adapter-lineart-sdxl-1.0", - "sdxl_depth-midas": "TencentARC/t2i-adapter-depth-midas-sdxl-1.0", - "sdxl_openpose": "TencentARC/t2i-adapter-openpose-sdxl-1.0", - #"sdxl_depth-zoe": "TencentARC/t2i-adapter-depth-zoe-sdxl-1.0", - #"sdxl_recolor": "TencentARC/t2i-adapter-recolor-sdxl-1.0", - "img2img": "Nothinghere", -} - - -# def download_all_controlnet_weights() -> None: -# for model_id in CONTROLNET_MODEL_IDS.values(): -# ControlNetModel.from_pretrained(model_id) - - -SCHEDULER_CONFIG_MAP = { - "DPM++ 2M": (DPMSolverMultistepScheduler, {}), - "DPM++ 2M Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True}), - "DPM++ 2M SDE": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++"}), - "DPM++ 2M SDE Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True, "algorithm_type": "sde-dpmsolver++"}), - "DPM++ SDE": (DPMSolverSinglestepScheduler, {}), - "DPM++ SDE Karras": (DPMSolverSinglestepScheduler, {"use_karras_sigmas": True}), - "DPM2": (KDPM2DiscreteScheduler, {}), - "DPM2 Karras": (KDPM2DiscreteScheduler, {"use_karras_sigmas": True}), - "DPM2 a" : (KDPM2AncestralDiscreteScheduler, {}), - "DPM2 a Karras" : (KDPM2AncestralDiscreteScheduler, {"use_karras_sigmas": True}), - "Euler": (EulerDiscreteScheduler, {}), - "Euler a": (EulerAncestralDiscreteScheduler, {}), - "Heun": (HeunDiscreteScheduler, {}), - "LMS": (LMSDiscreteScheduler, {}), - "LMS Karras": (LMSDiscreteScheduler, {"use_karras_sigmas": True}), - "DDIM": (DDIMScheduler, {}), - "DEIS": (DEISMultistepScheduler, {}), - "UniPC": (UniPCMultistepScheduler, {}), - "PNDM" : (PNDMScheduler, {}), - - "DPM++ 2M Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True}), - "DPM++ 2M Ef": (DPMSolverMultistepScheduler, {"euler_at_final": True}), - "DPM++ 2M SDE Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True, "algorithm_type": "sde-dpmsolver++"}), - "DPM++ 2M SDE Ef": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++", "euler_at_final": True}), - - "EDMDPM": (EDMDPMSolverMultistepScheduler, {}), - "EDMEuler": (EDMEulerScheduler, {}), - - "LCM" : (LCMScheduler, {}), -} - -scheduler_names = list(SCHEDULER_CONFIG_MAP.keys()) - -def process_prompts_valid(specific_prompt, specific_negative_prompt, prompt, negative_prompt): - specific_prompt_empty = (specific_prompt in [None, ""]) - specific_negative_prompt_empty = (specific_negative_prompt in [None, ""]) - - prompt_valid = prompt if specific_prompt_empty else specific_prompt - negative_prompt_valid = negative_prompt if specific_negative_prompt_empty else specific_negative_prompt - - return specific_prompt_empty, 
specific_negative_prompt_empty, prompt_valid, negative_prompt_valid - -class Model_Diffusers: - def __init__( - self, - base_model_id: str = "runwayml/stable-diffusion-v1-5", - task_name: str = "txt2img", - vae_model=None, - type_model_precision=torch.float16, - sdxl_safetensors = False, - ): - self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") - self.base_model_id = "" - self.task_name = "" - self.vae_model = None - self.type_model_precision = ( - type_model_precision if torch.cuda.is_available() else torch.float32 - ) # For SD 1.5 - - self.load_pipe( - base_model_id, task_name, vae_model, type_model_precision, sdxl_safetensors = sdxl_safetensors - ) - self.preprocessor = Preprocessor() - - self.styles_data = styles_data - self.STYLE_NAMES = STYLE_NAMES - self.style_json_file = "" - - - def load_pipe( - self, - base_model_id: str, - task_name="txt2img", - vae_model=None, - type_model_precision=torch.float16, - reload=False, - sdxl_safetensors = False, - retain_model_in_memory = True, - ) -> DiffusionPipeline: - if ( - base_model_id == self.base_model_id - and task_name == self.task_name - and hasattr(self, "pipe") - and self.vae_model == vae_model - and self.pipe is not None - and reload == False - ): - if self.type_model_precision == type_model_precision or self.device.type == "cpu": - return - - if hasattr(self, "pipe") and os.path.isfile(base_model_id): - unload_model = False - if self.pipe == None: - unload_model = True - elif type_model_precision != self.type_model_precision and self.device.type != "cpu": - unload_model = True - else: - if hasattr(self, "pipe"): - unload_model = False - if self.pipe == None: - unload_model = True - else: - unload_model = True - self.type_model_precision = ( - type_model_precision if torch.cuda.is_available() else torch.float32 - ) - - if self.type_model_precision == torch.float32 and os.path.isfile(base_model_id): - logger.info(f"Working with full precision {str(self.type_model_precision)}") - - # Load model - if self.base_model_id == base_model_id and self.pipe is not None and reload == False and self.vae_model == vae_model and unload_model == False: - #logger.info("Previous loaded base model") # not return - class_name = self.class_name - else: - # Unload previous model and stuffs - self.pipe = None - self.model_memory = {} - self.lora_memory = [None, None, None, None, None] - self.lora_scale_memory = [1.0, 1.0, 1.0, 1.0, 1.0] - self.LCMconfig = None - self.embed_loaded = [] - self.FreeU = False - torch.cuda.empty_cache() - gc.collect() - - # Load new model - if os.path.isfile(base_model_id): # exists or not same # if os.path.exists(base_model_id): - - if sdxl_safetensors: - logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix") - self.pipe = StableDiffusionXLPipeline.from_single_file( - base_model_id, - vae=AutoencoderKL.from_pretrained( - "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16 - ), - torch_dtype=self.type_model_precision, - ) - class_name = "StableDiffusionXLPipeline" - else: - self.pipe = StableDiffusionPipeline.from_single_file( - base_model_id, - # vae=None - # if vae_model == None - # else AutoencoderKL.from_single_file( - # vae_model - # ), - torch_dtype=self.type_model_precision, - ) - class_name = "StableDiffusionPipeline" - else: - file_config = hf_hub_download(repo_id=base_model_id, filename="model_index.json") - - # Reading data from the JSON file - with open(file_config, 'r') as json_config: - data_config = json.load(json_config) - - # Searching for the value of the "_class_name" key - 
if '_class_name' in data_config: - class_name = data_config['_class_name'] - - match class_name: - case "StableDiffusionPipeline": - self.pipe = StableDiffusionPipeline.from_pretrained( - base_model_id, - torch_dtype=self.type_model_precision, - ) - - case "StableDiffusionXLPipeline": - logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix") - try: - self.pipe = DiffusionPipeline.from_pretrained( - base_model_id, - vae=AutoencoderKL.from_pretrained( - "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16 - ), - torch_dtype=torch.float16, - use_safetensors=True, - variant="fp16", - add_watermarker=False, - ) - except Exception as e: - logger.debug(e) - logger.debug("Loading model without parameter variant=fp16") - self.pipe = DiffusionPipeline.from_pretrained( - base_model_id, - vae=AutoencoderKL.from_pretrained( - "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16 - ), - torch_dtype=torch.float16, - use_safetensors=True, - add_watermarker=False, - ) - self.base_model_id = base_model_id - self.class_name = class_name - - # Load VAE after loaded model - if vae_model is None : - logger.debug("Default VAE") - pass - else: - if os.path.isfile(vae_model): - self.pipe.vae = AutoencoderKL.from_single_file( - vae_model - ) - else: - self.pipe.vae = AutoencoderKL.from_pretrained( - vae_model, - subfolder = "vae", - ) - try: - self.pipe.vae.to(self.type_model_precision) - except: - logger.warning(f"VAE: not in {self.type_model_precision}") - self.vae_model = vae_model - - # Define base scheduler - self.default_scheduler = copy.deepcopy(self.pipe.scheduler) - logger.debug(f"Base sampler: {self.default_scheduler}") - - if task_name in self.model_memory: - self.pipe = self.model_memory[task_name] - # Create new base values - #self.pipe.to(self.device) - # torch.cuda.empty_cache() - # gc.collect() - self.base_model_id = base_model_id - self.task_name = task_name - self.vae_model = vae_model - self.class_name = class_name - self.pipe.watermark = None - return - - # Load task - model_id = CONTROLNET_MODEL_IDS[task_name] - - if task_name == "inpaint": - match class_name: - case "StableDiffusionPipeline": - - controlnet = ControlNetModel.from_pretrained( - model_id, torch_dtype=self.type_model_precision - ) - - self.pipe = StableDiffusionControlNetInpaintPipeline( - vae=self.pipe.vae, - text_encoder=self.pipe.text_encoder, - tokenizer=self.pipe.tokenizer, - unet=self.pipe.unet, - controlnet=controlnet, - scheduler=self.pipe.scheduler, - safety_checker=self.pipe.safety_checker, - feature_extractor=self.pipe.feature_extractor, - requires_safety_checker=self.pipe.config.requires_safety_checker, - ) - case "StableDiffusionXLPipeline": - - self.pipe = StableDiffusionXLInpaintPipeline( - vae=self.pipe.vae, - text_encoder=self.pipe.text_encoder, - text_encoder_2=self.pipe.text_encoder_2, - tokenizer=self.pipe.tokenizer, - tokenizer_2=self.pipe.tokenizer_2, - unet=self.pipe.unet, - # controlnet=self.controlnet, - scheduler=self.pipe.scheduler, - ) - - - if task_name not in ["txt2img", "inpaint", "img2img"]: - match class_name: - case "StableDiffusionPipeline": - - controlnet = ControlNetModel.from_pretrained( - model_id, torch_dtype=self.type_model_precision - ) - - self.pipe = StableDiffusionControlNetPipeline( - vae=self.pipe.vae, - text_encoder=self.pipe.text_encoder, - tokenizer=self.pipe.tokenizer, - unet=self.pipe.unet, - controlnet=controlnet, - scheduler=self.pipe.scheduler, - safety_checker=self.pipe.safety_checker, - feature_extractor=self.pipe.feature_extractor, - 
requires_safety_checker=self.pipe.config.requires_safety_checker, - ) - self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config) - - case "StableDiffusionXLPipeline": - - adapter = T2IAdapter.from_pretrained( - model_id, - torch_dtype=torch.float16, - variant="fp16", - ).to(self.device) - - self.pipe = StableDiffusionXLAdapterPipeline( - vae=self.pipe.vae, - text_encoder=self.pipe.text_encoder, - text_encoder_2=self.pipe.text_encoder_2, - tokenizer=self.pipe.tokenizer, - tokenizer_2=self.pipe.tokenizer_2, - unet=self.pipe.unet, - adapter=adapter, - scheduler=self.pipe.scheduler, - ).to(self.device) - - - if task_name in ["txt2img", "img2img"]: - match class_name: - - case "StableDiffusionPipeline": - self.pipe = StableDiffusionPipeline( - vae=self.pipe.vae, - text_encoder=self.pipe.text_encoder, - tokenizer=self.pipe.tokenizer, - unet=self.pipe.unet, - scheduler=self.pipe.scheduler, - safety_checker=self.pipe.safety_checker, - feature_extractor=self.pipe.feature_extractor, - requires_safety_checker=self.pipe.config.requires_safety_checker, - ) - - case "StableDiffusionXLPipeline": - self.pipe = StableDiffusionXLPipeline( - vae=self.pipe.vae, - text_encoder=self.pipe.text_encoder, - text_encoder_2=self.pipe.text_encoder_2, - tokenizer=self.pipe.tokenizer, - tokenizer_2=self.pipe.tokenizer_2, - unet=self.pipe.unet, - scheduler=self.pipe.scheduler, - ) - - if task_name == "img2img": - self.pipe = AutoPipelineForImage2Image.from_pipe(self.pipe) - - # Create new base values - self.pipe.to(self.device) - torch.cuda.empty_cache() - gc.collect() - - self.base_model_id = base_model_id - self.task_name = task_name - self.vae_model = vae_model - self.class_name = class_name - - if self.class_name == "StableDiffusionXLPipeline": - self.pipe.enable_vae_slicing() - self.pipe.enable_vae_tiling() - self.pipe.watermark = None - - if retain_model_in_memory == True and task_name not in self.model_memory: - self.model_memory[task_name] = self.pipe - - return - - def load_controlnet_weight(self, task_name: str) -> None: - torch.cuda.empty_cache() - gc.collect() - model_id = CONTROLNET_MODEL_IDS[task_name] - controlnet = ControlNetModel.from_pretrained( - model_id, torch_dtype=self.type_model_precision - ) - controlnet.to(self.device) - torch.cuda.empty_cache() - gc.collect() - self.pipe.controlnet = controlnet - #self.task_name = task_name - - @torch.autocast("cuda") - def run_pipe( - self, - prompt: str, - negative_prompt: str, - prompt_embeds, - negative_prompt_embeds, - control_image: PIL.Image.Image, - num_images: int, - num_steps: int, - guidance_scale: float, - clip_skip: int, - generator, - controlnet_conditioning_scale, - control_guidance_start, - control_guidance_end, - ) -> list[PIL.Image.Image]: - # Return PIL images - # generator = torch.Generator().manual_seed(seed) - return self.pipe( - prompt=prompt, - negative_prompt=negative_prompt, - prompt_embeds=prompt_embeds, - negative_prompt_embeds=negative_prompt_embeds, - guidance_scale=guidance_scale, - clip_skip=clip_skip, - num_images_per_prompt=num_images, - num_inference_steps=num_steps, - generator=generator, - controlnet_conditioning_scale=controlnet_conditioning_scale, - control_guidance_start=control_guidance_start, - control_guidance_end=control_guidance_end, - image=control_image, - ).images - - @torch.autocast("cuda") - def run_pipe_SD( - self, - prompt: str, - negative_prompt: str, - prompt_embeds, - negative_prompt_embeds, - num_images: int, - num_steps: int, - guidance_scale: float, - clip_skip: int, -
height: int, - width: int, - generator, - ) -> list[PIL.Image.Image]: - # Return PIL images - # generator = torch.Generator().manual_seed(seed) - self.preview_handle = None - return self.pipe( - prompt=prompt, - negative_prompt=negative_prompt, - prompt_embeds=prompt_embeds, - negative_prompt_embeds=negative_prompt_embeds, - guidance_scale=guidance_scale, - clip_skip=clip_skip, - num_images_per_prompt=num_images, - num_inference_steps=num_steps, - generator=generator, - height=height, - width=width, - callback=self.callback_pipe if self.image_previews else None, - callback_steps=10 if self.image_previews else 100, - ).images - - # @torch.autocast('cuda') - # def run_pipe_SDXL( - # self, - # prompt: str, - # negative_prompt: str, - # prompt_embeds, - # negative_prompt_embeds, - # num_images: int, - # num_steps: int, - # guidance_scale: float, - # clip_skip: int, - # height : int, - # width : int, - # generator, - # seddd, - # conditioning, - # pooled, - # ) -> list[PIL.Image.Image]: - # # Return PIL images - # #generator = torch.Generator("cuda").manual_seed(seddd) # generator = torch.Generator("cuda").manual_seed(seed), - # return self.pipe( - # prompt = None, - # negative_prompt = None, - # prompt_embeds=conditioning[0:1], - # pooled_prompt_embeds=pooled[0:1], - # negative_prompt_embeds=conditioning[1:2], - # negative_pooled_prompt_embeds=pooled[1:2], - # height = height, - # width = width, - # num_inference_steps = num_steps, - # guidance_scale = guidance_scale, - # clip_skip = clip_skip, - # num_images_per_prompt = num_images, - # generator = generator, - # ).images - - @torch.autocast("cuda") - def run_pipe_inpaint( - self, - prompt: str, - negative_prompt: str, - prompt_embeds, - negative_prompt_embeds, - control_image: PIL.Image.Image, - num_images: int, - num_steps: int, - guidance_scale: float, - clip_skip: int, - strength: float, - init_image, - control_mask, - controlnet_conditioning_scale, - control_guidance_start, - control_guidance_end, - generator, - ) -> list[PIL.Image.Image]: - # Return PIL images - # generator = torch.Generator().manual_seed(seed) - return self.pipe( - prompt=None, - negative_prompt=None, - prompt_embeds=prompt_embeds, - negative_prompt_embeds=negative_prompt_embeds, - eta=1.0, - strength=strength, - image=init_image, # original image - mask_image=control_mask, # mask, values of 0 to 255 - control_image=control_image, # tensor control image - num_images_per_prompt=num_images, - num_inference_steps=num_steps, - guidance_scale=guidance_scale, - clip_skip=clip_skip, - generator=generator, - controlnet_conditioning_scale=controlnet_conditioning_scale, - control_guidance_start=control_guidance_start, - control_guidance_end=control_guidance_end, - ).images - - @torch.autocast("cuda") - def run_pipe_img2img( - self, - prompt: str, - negative_prompt: str, - prompt_embeds, - negative_prompt_embeds, - num_images: int, - num_steps: int, - guidance_scale: float, - clip_skip: int, - strength: float, - init_image, - generator, - ) -> list[PIL.Image.Image]: - # Return PIL images - # generator = torch.Generator().manual_seed(seed) - return self.pipe( - prompt=None, - negative_prompt=None, - prompt_embeds=prompt_embeds, - negative_prompt_embeds=negative_prompt_embeds, - eta=1.0, - strength=strength, - image=init_image, # original image - num_images_per_prompt=num_images, - num_inference_steps=num_steps, - guidance_scale=guidance_scale, - clip_skip=clip_skip, - generator=generator, - ).images - - ### self.x_process return image_preprocessor### - @torch.inference_mode() - 
def process_canny( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - low_threshold: int, - high_threshold: int, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - self.preprocessor.load("Canny") - control_image = self.preprocessor( - image=image, - low_threshold=low_threshold, - high_threshold=high_threshold, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - ) - - return control_image - - @torch.inference_mode() - def process_mlsd( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - value_threshold: float, - distance_threshold: float, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - self.preprocessor.load("MLSD") - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - thr_v=value_threshold, - thr_d=distance_threshold, - ) - - return control_image - - @torch.inference_mode() - def process_scribble( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - preprocessor_name: str, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - if preprocessor_name == "None": - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - elif preprocessor_name == "HED": - self.preprocessor.load(preprocessor_name) - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - scribble=False, - ) - elif preprocessor_name == "PidiNet": - self.preprocessor.load(preprocessor_name) - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - safe=False, - ) - - return control_image - - @torch.inference_mode() - def process_scribble_interactive( - self, - image_and_mask: dict[str, np.ndarray], - image_resolution: int, - ) -> list[PIL.Image.Image]: - if image_and_mask is None: - raise ValueError - - image = image_and_mask["mask"] - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - - return control_image - - @torch.inference_mode() - def process_softedge( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - preprocessor_name: str, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - if preprocessor_name == "None": - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - elif preprocessor_name in ["HED", "HED safe"]: - safe = "safe" in preprocessor_name - self.preprocessor.load("HED") - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - scribble=safe, - ) - elif preprocessor_name in ["PidiNet", "PidiNet safe"]: - safe = "safe" in preprocessor_name - self.preprocessor.load("PidiNet") - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - safe=safe, - ) - else: - raise ValueError - - return control_image - - @torch.inference_mode() - def process_openpose( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - preprocessor_name: str, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - if preprocessor_name == "None": - image = HWC3(image) - 
image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - else: - self.preprocessor.load("Openpose") - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - hand_and_face=True, - ) - - return control_image - - @torch.inference_mode() - def process_segmentation( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - preprocessor_name: str, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - if preprocessor_name == "None": - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - else: - self.preprocessor.load(preprocessor_name) - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - ) - - return control_image - - @torch.inference_mode() - def process_depth( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - preprocessor_name: str, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - if preprocessor_name == "None": - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - else: - self.preprocessor.load(preprocessor_name) - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - ) - - return control_image - - @torch.inference_mode() - def process_normal( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - preprocessor_name: str, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - if preprocessor_name == "None": - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - else: - self.preprocessor.load("NormalBae") - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - ) - - return control_image - - @torch.inference_mode() - def process_lineart( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - preprocessor_name: str, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - if preprocessor_name in ["None", "None (anime)"]: - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - elif preprocessor_name in ["Lineart", "Lineart coarse"]: - coarse = "coarse" in preprocessor_name - self.preprocessor.load("Lineart") - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - coarse=coarse, - ) - elif preprocessor_name == "Lineart (anime)": - self.preprocessor.load("LineartAnime") - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - detect_resolution=preprocess_resolution, - ) - - if self.class_name == "StableDiffusionPipeline": - if "anime" in preprocessor_name: - self.load_controlnet_weight("lineart_anime") - logger.info("Lineart anime") - else: - self.load_controlnet_weight("lineart") - - return control_image - - @torch.inference_mode() - def process_shuffle( - self, - image: np.ndarray, - image_resolution: int, - preprocessor_name: str, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - if preprocessor_name == "None": - image = 
HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - else: - self.preprocessor.load(preprocessor_name) - control_image = self.preprocessor( - image=image, - image_resolution=image_resolution, - ) - - return control_image - - @torch.inference_mode() - def process_ip2p( - self, - image: np.ndarray, - image_resolution: int, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - control_image = PIL.Image.fromarray(image) - - return control_image - - @torch.inference_mode() - def process_inpaint( - self, - image: np.ndarray, - image_resolution: int, - preprocess_resolution: int, - image_mask: str, ### - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - init_image = PIL.Image.fromarray(image) - - image_mask = HWC3(image_mask) - image_mask = resize_image(image_mask, resolution=image_resolution) - control_mask = PIL.Image.fromarray(image_mask) - - control_image = make_inpaint_condition(init_image, control_mask) - - return init_image, control_mask, control_image - - @torch.inference_mode() - def process_img2img( - self, - image: np.ndarray, - image_resolution: int, - ) -> list[PIL.Image.Image]: - if image is None: - raise ValueError - - image = HWC3(image) - image = resize_image(image, resolution=image_resolution) - init_image = PIL.Image.fromarray(image) - - return init_image - - def get_scheduler(self, name): - if name in SCHEDULER_CONFIG_MAP: - scheduler_class, config = SCHEDULER_CONFIG_MAP[name] - #return scheduler_class.from_config(self.pipe.scheduler.config, **config) - # beta self.default_scheduler - return scheduler_class.from_config(self.default_scheduler.config, **config) - else: - raise ValueError(f"Scheduler with name {name} not found. 
Valid schedulers: {', '.join(scheduler_names)}") - - def create_prompt_embeds( - self, - prompt, - negative_prompt, - textual_inversion, - clip_skip, - syntax_weights, - ): - if self.class_name == "StableDiffusionPipeline": - if self.embed_loaded != textual_inversion and textual_inversion != []: - # Textual Inversion - for name, directory_name in textual_inversion: - try: - if directory_name.endswith(".pt"): - model = torch.load(directory_name, map_location=self.device) - model_tensors = model.get("string_to_param").get("*") - s_model = {"emb_params": model_tensors} - # save_file(s_model, directory_name[:-3] + '.safetensors') - self.pipe.load_textual_inversion(s_model, token=name) - - else: - # self.pipe.text_encoder.resize_token_embeddings(len(self.pipe.tokenizer),pad_to_multiple_of=128) - # self.pipe.load_textual_inversion("./bad_prompt.pt", token="baddd") - self.pipe.load_textual_inversion(directory_name, token=name) - if not self.gui_active: - logger.info(f"Applied : {name}") - - except Exception as e: - exception = str(e) - if name in exception: - logger.debug(f"Previous loaded embed {name}") - else: - logger.error(exception) - logger.error(f"Can't apply embed {name}") - self.embed_loaded = textual_inversion - - # Clip skip - # clip_skip_diffusers = None #clip_skip - 1 # future update - if not hasattr(self, "compel"): - self.compel = Compel( - tokenizer=self.pipe.tokenizer, - text_encoder=self.pipe.text_encoder, - truncate_long_prompts=False, - returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NORMALIZED if clip_skip else ReturnedEmbeddingsType.LAST_HIDDEN_STATES_NORMALIZED, - ) - - # Prompt weights for textual inversion - prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer) - negative_prompt_ti = self.pipe.maybe_convert_prompt( - negative_prompt, self.pipe.tokenizer - ) - - # separate the multi-vector textual inversion by comma - if self.embed_loaded != []: - prompt_ti = add_comma_after_pattern_ti(prompt_ti) - negative_prompt_ti = add_comma_after_pattern_ti(negative_prompt_ti) - - # Syntax weights - self.pipe.to(self.device) - if syntax_weights == "Classic": - prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel) - negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel) - else: - prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel, compel_process_sd=True) - negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel, compel_process_sd=True) - - # Fix error shape - if prompt_emb.shape != negative_prompt_emb.shape: - ( - prompt_emb, - negative_prompt_emb, - ) = self.compel.pad_conditioning_tensors_to_same_length( - [prompt_emb, negative_prompt_emb] - ) - - return prompt_emb, negative_prompt_emb - - else: - # SDXL embed - if self.embed_loaded != textual_inversion and textual_inversion != []: - # Textual Inversion - for name, directory_name in textual_inversion: - try: - from safetensors.torch import load_file - state_dict = load_file(directory_name) - self.pipe.load_textual_inversion(state_dict["clip_g"], token=name, text_encoder=self.pipe.text_encoder_2, tokenizer=self.pipe.tokenizer_2) - self.pipe.load_textual_inversion(state_dict["clip_l"], token=name, text_encoder=self.pipe.text_encoder, tokenizer=self.pipe.tokenizer) - if not self.gui_active: - logger.info(f"Applied : {name}") - except Exception as e: - exception = str(e) - if name in exception: - logger.debug(f"Previous loaded embed {name}") - else: - logger.error(exception) - logger.error(f"Can't apply embed {name}") - 
self.embed_loaded = textual_inversion - - if not hasattr(self, "compel"): - # Clip skip - if clip_skip: - # clip_skip_diffusers = None #clip_skip - 1 # future update - self.compel = Compel( - tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2], - text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2], - returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NON_NORMALIZED, - requires_pooled=[False, True], - truncate_long_prompts=False, - ) - else: - # clip_skip_diffusers = None # clip_skip = None # future update - self.compel = Compel( - tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2], - text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2], - requires_pooled=[False, True], - truncate_long_prompts=False, - ) - - # Prompt weights for textual inversion - try: - prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer) - negative_prompt_ti = self.pipe.maybe_convert_prompt(negative_prompt, self.pipe.tokenizer) - except: - prompt_ti = prompt - negative_prompt_ti = negative_prompt - logger.error("FAILED: Could not convert prompt for textual inversion") - - # prompt syntax style a1... - if syntax_weights == "Classic": - self.pipe.to("cuda") - prompt_ti = get_embed_new(prompt_ti, self.pipe, self.compel, only_convert_string=True) - negative_prompt_ti = get_embed_new(negative_prompt_ti, self.pipe, self.compel, only_convert_string=True) - else: - prompt_ti = prompt - negative_prompt_ti = negative_prompt - - conditioning, pooled = self.compel([prompt_ti, negative_prompt_ti]) - - return conditioning, pooled - - - - def process_lora(self, select_lora, lora_weights_scale, unload=False): - device = "cuda" if torch.cuda.is_available() else "cpu" - if not unload: - if select_lora != None: - try: - self.pipe = lora_mix_load( - self.pipe, - select_lora, - lora_weights_scale, - device=device, - dtype=self.type_model_precision, - ) - logger.info(select_lora) - except Exception as e: - logger.error(f"ERROR: LoRA not compatible: {select_lora}") - logger.debug(f"{str(e)}") - return self.pipe - else: - # Unloading by negative scale is numerically unstable, but fast and needs less memory - if select_lora != None: - try: - self.pipe = lora_mix_load( - self.pipe, - select_lora, - -lora_weights_scale, - device=device, - dtype=self.type_model_precision, - ) - logger.debug(f"Unload LoRA: {select_lora}") - except: - pass - return self.pipe - - def load_style_file(self, style_json_file): - if os.path.exists(style_json_file): - try: - file_json_read = get_json_content(style_json_file) - self.styles_data = {k["name"]: (k["prompt"], k["negative_prompt"]) for k in file_json_read} - self.STYLE_NAMES = list(self.styles_data.keys()) - self.style_json_file = style_json_file - logger.info(f"Styles json file loaded with {len(self.STYLE_NAMES)} styles") - logger.debug(str(self.STYLE_NAMES)) - except Exception as e: - logger.error(str(e)) - else: - logger.error("Styles JSON file not found in directory") - - def callback_pipe(self, iter, t, latents): - # convert latents to image - with torch.no_grad(): - latents = 1 / 0.18215 * latents - image = self.pipe.vae.decode(latents).sample - - image = (image / 2 + 0.5).clamp(0, 1) - - # we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16 - image = image.cpu().permute(0, 2, 3, 1).float().numpy() - - # convert to PIL Images - image = self.pipe.numpy_to_pil(image) - - # show one image - # global preview_handle - if self.preview_handle == None: - self.preview_handle = display(image[0], display_id=True) - else: -
self.preview_handle.update(image[0]) - - def __call__( - self, - prompt: str = "", - negative_prompt: str = "", - img_height: int = 512, - img_width: int = 512, - num_images: int = 1, - num_steps: int = 30, - guidance_scale: float = 7.5, - clip_skip: Optional[bool] = True, - seed: int = -1, - sampler: str = "DPM++ 2M", - syntax_weights: str = "Classic", - - lora_A: Optional[str] = None, - lora_scale_A: float = 1.0, - lora_B: Optional[str] = None, - lora_scale_B: float = 1.0, - lora_C: Optional[str] = None, - lora_scale_C: float = 1.0, - lora_D: Optional[str] = None, - lora_scale_D: float = 1.0, - lora_E: Optional[str] = None, - lora_scale_E: float = 1.0, - textual_inversion: List[Tuple[str, str]] = [], - FreeU: bool = False, - adetailer_A: bool = False, - adetailer_A_params: Dict[str, Any] = {}, - adetailer_B: bool = False, - adetailer_B_params: Dict[str, Any] = {}, - style_prompt: Optional[Any] = [""], - style_json_file: Optional[Any] = "", - - image: Optional[Any] = None, - preprocessor_name: Optional[str] = "None", - preprocess_resolution: int = 512, - image_resolution: int = 512, - image_mask: Optional[Any] = None, - strength: float = 0.35, - low_threshold: int = 100, - high_threshold: int = 200, - value_threshold: float = 0.1, - distance_threshold: float = 0.1, - controlnet_conditioning_scale: float = 1.0, - control_guidance_start: float = 0.0, - control_guidance_end: float = 1.0, - t2i_adapter_preprocessor: bool = True, - t2i_adapter_conditioning_scale: float = 1.0, - t2i_adapter_conditioning_factor: float = 1.0, - - upscaler_model_path: Optional[str] = None, # add latent - upscaler_increases_size: float = 1.5, - esrgan_tile: int = 100, - esrgan_tile_overlap: int = 10, - hires_steps: int = 25, - hires_denoising_strength: float = 0.35, - hires_prompt: str = "", - hires_negative_prompt: str = "", - hires_sampler: str = "Use same sampler", - - loop_generation: int = 1, - display_images: bool = False, - save_generated_images: bool = True, - image_storage_location: str = "./images", - generator_in_cpu: bool = False, - leave_progress_bar: bool = False, - disable_progress_bar: bool = False, - hires_before_adetailer: bool = False, - hires_after_adetailer: bool = True, - retain_compel_previous_load: bool = False, - retain_detailfix_model_previous_load: bool = False, - retain_hires_model_previous_load: bool = False, - image_previews: bool = False, - xformers_memory_efficient_attention: bool = False, - gui_active: bool = False, - ): - - """ - The call function for the generation. - - Args: - prompt (str , optional): - The prompt or prompts to guide image generation. - negative_prompt (str , optional): - The prompt or prompts to guide what to not include in image generation. Ignored when not using guidance (`guidance_scale < 1`). - img_height (int, optional, defaults to 512): - The height in pixels of the generated image. - img_width (int, optional, defaults to 512): - The width in pixels of the generated image. - num_images (int, optional, defaults to 1): - The number of images to generate per prompt. - num_steps (int, optional, defaults to 30): - The number of denoising steps. More denoising steps usually lead to a higher quality image at the - expense of slower inference. - guidance_scale (float, optional, defaults to 7.5): - A higher guidance scale value encourages the model to generate images closely linked to the text - `prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`. 
- clip_skip (bool, optional): - Whether to skip layers in CLIP while computing the prompt embeddings. The embeddings can be taken from - the penultimate layer (True) or the last layer (False). - seed (int, optional, defaults to -1): - A seed for controlling the randomness of the image generation process. -1 means a random seed. - sampler (str, optional, defaults to "DPM++ 2M"): - The sampler used for the generation process. Available samplers: DPM++ 2M, DPM++ 2M Karras, DPM++ 2M SDE, - DPM++ 2M SDE Karras, DPM++ SDE, DPM++ SDE Karras, DPM2, DPM2 Karras, Euler, Euler a, Heun, LMS, LMS Karras, - DDIM, DEIS, UniPC, DPM2 a, DPM2 a Karras, PNDM, LCM, DPM++ 2M Lu, DPM++ 2M Ef, DPM++ 2M SDE Lu and DPM++ 2M SDE Ef. - syntax_weights (str, optional, defaults to "Classic"): - Specifies the type of syntax weights used during generation. "Classic" is (word:weight), "Compel" is (word)weight - lora_A (str, optional): - Path to the LoRA model for slot A. - lora_scale_A (float, optional, defaults to 1.0): - Scale applied to lora_A. - lora_B (str, optional): - Path to the LoRA model for slot B. - lora_scale_B (float, optional, defaults to 1.0): - Scale applied to lora_B. - lora_C (str, optional): - Path to the LoRA model for slot C. - lora_scale_C (float, optional, defaults to 1.0): - Scale applied to lora_C. - lora_D (str, optional): - Path to the LoRA model for slot D. - lora_scale_D (float, optional, defaults to 1.0): - Scale applied to lora_D. - lora_E (str, optional): - Path to the LoRA model for slot E. - lora_scale_E (float, optional, defaults to 1.0): - Scale applied to lora_E. - textual_inversion (List[Tuple[str, str]], optional, defaults to []): - List of (token, embedding path) tuples for textual inversion. Helps the model adapt to a particular - style. [("",""),...] - FreeU (bool, optional, defaults to False): - A method that substantially improves diffusion model sample quality at no extra cost. - adetailer_A (bool, optional, defaults to False): - Guided inpainting to correct the image; it is preferable to use low values for strength. - adetailer_A_params (Dict[str, Any], optional, defaults to {}): - Dictionary of adetailer_A parameters, for example {"prompt": "my prompt", "inpaint_only": True ...}. - If not specified, default values will be used: - - face_detector_ad (bool): Indicates whether face detection is enabled. Defaults to True. - - person_detector_ad (bool): Indicates whether person detection is enabled. Defaults to True. - - hand_detector_ad (bool): Indicates whether hand detection is enabled. Defaults to False. - - prompt (str): A prompt for the adetailer_A. Defaults to an empty string. - - negative_prompt (str): A negative prompt for the adetailer_A. Defaults to an empty string. - - strength (float): The strength parameter value. Defaults to 0.35. - - mask_dilation (int): The mask dilation value. Defaults to 4. - - mask_blur (int): The mask blur value. Defaults to 4. - - mask_padding (int): The mask padding value. Defaults to 32. - - inpaint_only (bool): Indicates if only inpainting is to be performed. Defaults to True. False uses img2img mode. - - sampler (str): The sampler type to be used. Defaults to "Use same sampler". - adetailer_B (bool, optional, defaults to False): - Guided inpainting to correct the image; it is preferable to use low values for strength. - adetailer_B_params (Dict[str, Any], optional, defaults to {}): - Dictionary of adetailer_B parameters, for example {"prompt": "my prompt", "inpaint_only": True ...}. - If not specified, default values will be used.
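As an illustrative sketch only (built strictly from the default keys listed above, with hypothetical values), such a dict might look like:

    adetailer_A_params = {
        "face_detector_ad": True,
        "person_detector_ad": False,
        "hand_detector_ad": False,
        "prompt": "high quality face",
        "negative_prompt": "blurry",
        "strength": 0.3,
        "mask_dilation": 4,
        "mask_blur": 4,
        "mask_padding": 32,
        "inpaint_only": True,
        "sampler": "Use same sampler",
    }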
- style_prompt (str, optional): - If a style that is in STYLE_NAMES is specified, it will be added to the original prompt and negative prompt. - style_json_file (str, optional): - JSON with styles to be applied and used in style_prompt. - upscaler_model_path (str, optional): - Path to the upscaler model. - upscaler_increases_size (float, optional, defaults to 1.5): - Factor by which the upscaler increases the image size. - esrgan_tile (int, optional, defaults to 100): - Tile size when using an ESRGAN model. - esrgan_tile_overlap (int, optional, defaults to 10): - Tile overlap when using an ESRGAN model. - hires_steps (int, optional, defaults to 25): - The number of denoising steps for hires. More denoising steps usually lead to a higher quality image at the - expense of slower inference. - hires_denoising_strength (float, optional, defaults to 0.35): - Denoising strength for the hires pass. - hires_prompt (str, optional): - The prompt for hires. If not specified, the main prompt will be used. - hires_negative_prompt (str, optional): - The negative prompt for hires. If not specified, the main negative prompt will be used. - hires_sampler (str, optional, defaults to "Use same sampler"): - The sampler used for the hires generation process. If not specified, the main sampler will be used. - image (Any, optional): - The image to be used for the Inpaint, ControlNet, or T2I adapter. - preprocessor_name (str, optional, defaults to "None"): - Preprocessor name for ControlNet. - preprocess_resolution (int, optional, defaults to 512): - Preprocess resolution for the Inpaint, ControlNet, or T2I adapter. - image_resolution (int, optional, defaults to 512): - Image resolution for the Img2Img, Inpaint, ControlNet, or T2I adapter. - image_mask (Any, optional): - Path to the image mask for the Inpaint. - strength (float, optional, defaults to 0.35): - Strength parameter for the Inpaint and Img2Img. - low_threshold (int, optional, defaults to 100): - Low threshold parameter for ControlNet and T2I Adapter Canny. - high_threshold (int, optional, defaults to 200): - High threshold parameter for ControlNet and T2I Adapter Canny. - value_threshold (float, optional, defaults to 0.1): - Value threshold parameter for ControlNet MLSD. - distance_threshold (float, optional, defaults to 0.1): - Distance threshold parameter for ControlNet MLSD. - controlnet_conditioning_scale (float, optional, defaults to 1.0): - The outputs of the ControlNet are multiplied by `controlnet_conditioning_scale` before they are added - to the residual in the original `unet`. Used in ControlNet and Inpaint. - control_guidance_start (float, optional, defaults to 0.0): - The percentage of total steps at which the ControlNet starts applying. Used in ControlNet and Inpaint. - control_guidance_end (float, optional, defaults to 1.0): - The percentage of total steps at which the ControlNet stops applying. Used in ControlNet and Inpaint. - t2i_adapter_preprocessor (bool, optional, defaults to True): - Whether to apply the preprocessor to the input image for adapter tasks such as sdxl_canny. Defaults to True. - t2i_adapter_conditioning_scale (float, optional, defaults to 1.0): - The outputs of the adapter are multiplied by `t2i_adapter_conditioning_scale` before they are added to the - residual in the original unet. - t2i_adapter_conditioning_factor (float, optional, defaults to 1.0): - The fraction of timesteps for which adapter should be applied. If `t2i_adapter_conditioning_factor` is - `0.0`, adapter is not applied at all. If `t2i_adapter_conditioning_factor` is `1.0`, adapter is applied for - all timesteps. If `t2i_adapter_conditioning_factor` is `0.5`, adapter is applied for half of the timesteps.
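As a hedged sketch of how the ControlNet parameters above combine (the `model` instance, prompt, and image path are hypothetical, and this assumes the model was loaded with task_name="canny" on an SD 1.5 checkpoint):

    images = model(
        prompt="a detailed house",
        image="./condition.png",
        preprocess_resolution=512,
        image_resolution=768,
        low_threshold=100,
        high_threshold=200,
        controlnet_conditioning_scale=1.0,
        control_guidance_start=0.0,
        control_guidance_end=1.0,
    )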
- loop_generation (int, optional, defaults to 1): - The number of times the specified `num_images` will be generated. - display_images (bool, optional, defaults to False): - If you use a notebook, you will be able to display the images generated with this parameter. - save_generated_images (bool, optional, defaults to True): - By default, the generated images are saved in the current location within the 'images' folder. You can disable this with this parameter. - image_storage_location (str, optional, defaults to "./images"): - The directory where the generated images are saved. - generator_in_cpu (bool, optional, defaults to False): - The generator by default is specified on the GPU. To obtain more consistent results across various environments, - it is preferable to use the generator on the CPU. - leave_progress_bar (bool, optional, defaults to False): - Leave the progress bar after generating the image. - disable_progress_bar (bool, optional, defaults to False): - Do not display the progress bar during image generation. - hires_before_adetailer (bool, optional, defaults to False): - Apply an upscale and high-resolution fix before adetailer. - hires_after_adetailer (bool, optional, defaults to True): - Apply an upscale and high-resolution fix after adetailer. - retain_compel_previous_load (bool, optional, defaults to False): - The previous compel remains preloaded in memory. - retain_detailfix_model_previous_load (bool, optional, defaults to False): - The previous adetailer model remains preloaded in memory. - retain_hires_model_previous_load (bool, optional, defaults to False): - The previous hires model remains preloaded in memory. - image_previews (bool, optional, defaults to False): - Display previews of the image denoising process. - xformers_memory_efficient_attention (bool, optional, defaults to False): - Improves generation time; currently disabled. - gui_active (bool, optional, defaults to False): - Utility when used with a GUI; it changes the behavior, especially by displaying confirmation messages or options. - - Specific parameter usage details: - - Additional parameters that will be used in Inpaint: - - image - - image_mask - - image_resolution - - strength - for SD 1.5: - - controlnet_conditioning_scale - - control_guidance_start - - control_guidance_end - - Additional parameters that will be used in img2img: - - image - - image_resolution - - strength - - Additional parameters that will be used in ControlNet for SD 1.5 depending on the task: - - image - - preprocessor_name - - preprocess_resolution - - image_resolution - - controlnet_conditioning_scale - - control_guidance_start - - control_guidance_end - for Canny: - - low_threshold - - high_threshold - for MLSD: - - value_threshold - - distance_threshold - - Additional parameters that will be used in T2I adapter for SDXL depending on the task: - - image - - preprocess_resolution - - image_resolution - - t2i_adapter_preprocessor - - t2i_adapter_conditioning_scale - - t2i_adapter_conditioning_factor - - """ - - if self.task_name != "txt2img" and image == None: - raise ValueError( - "You need to specify an image for this task." - )
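A minimal txt2img usage sketch for this method (assuming `Model_Diffusers` is importable from the package root; the checkpoint path and prompt are placeholders):

    model = Model_Diffusers(
        base_model_id="./model.safetensors",
        task_name="txt2img",
    )
    images = model(
        prompt="a photo of a cat",
        num_steps=30,
        guidance_scale=7.5,
        sampler="DPM++ 2M Karras",
        img_height=512,
        img_width=512,
    )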
- if img_height % 8 != 0: - img_height = img_height + (8 - img_height % 8) - logger.warning(f"Height must be divisible by 8, changed to {str(img_height)}") - if img_width % 8 != 0: - img_width = img_width + (8 - img_width % 8) - logger.warning(f"Width must be divisible by 8, changed to {str(img_width)}") - if image_resolution % 8 != 0: - image_resolution = image_resolution + (8 - image_resolution % 8) - logger.warning(f"Image resolution must be divisible by 8, changed to {str(image_resolution)}") - if control_guidance_start >= control_guidance_end: - logger.error( - "Control guidance start (ControlNet Start Threshold) cannot be greater than or equal to control guidance end (ControlNet Stop Threshold). The default values 0.0 and 1.0 will be used." - ) - control_guidance_start, control_guidance_end = 0.0, 1.0 - - self.gui_active = gui_active - self.image_previews = image_previews - - if self.pipe == None: - self.load_pipe( - self.base_model_id, - task_name=self.task_name, - vae_model=self.vae_model, - reload=True, - ) - - self.pipe.set_progress_bar_config(leave=leave_progress_bar) - self.pipe.set_progress_bar_config(disable=disable_progress_bar) - - xformers_memory_efficient_attention=False # disabled - if xformers_memory_efficient_attention and torch.cuda.is_available(): - self.pipe.disable_xformers_memory_efficient_attention() - self.pipe.to(self.device) - - # Load style prompt file - if style_json_file != "" and style_json_file != self.style_json_file: - self.load_style_file(style_json_file) - # Set style - if isinstance(style_prompt, str): - style_prompt = [style_prompt] - if style_prompt != [""]: - prompt, negative_prompt = apply_style(style_prompt, prompt, negative_prompt, self.styles_data, self.STYLE_NAMES) - - # LoRA load - if self.lora_memory == [ - lora_A, - lora_B, - lora_C, - lora_D, - lora_E, - ] and self.lora_scale_memory == [ - lora_scale_A, - lora_scale_B, - lora_scale_C, - lora_scale_D, - lora_scale_E, - ]: - for single_lora in self.lora_memory: - if single_lora != None: - logger.info(f"LoRA in memory: {single_lora}") - pass - - else: - logger.debug("_un, re and load_ lora") - self.pipe = self.process_lora( - self.lora_memory[0], self.lora_scale_memory[0], unload=True - ) - self.pipe = self.process_lora( - self.lora_memory[1], self.lora_scale_memory[1], unload=True - ) - self.pipe = self.process_lora( - self.lora_memory[2], self.lora_scale_memory[2], unload=True - ) - self.pipe = self.process_lora( - self.lora_memory[3], self.lora_scale_memory[3], unload=True - ) - self.pipe = self.process_lora( - self.lora_memory[4], self.lora_scale_memory[4], unload=True - ) - - self.pipe = self.process_lora(lora_A, lora_scale_A) - self.pipe = self.process_lora(lora_B, lora_scale_B) - self.pipe = self.process_lora(lora_C, lora_scale_C) - self.pipe = self.process_lora(lora_D, lora_scale_D) - self.pipe = self.process_lora(lora_E, lora_scale_E) - - self.lora_memory = [lora_A, lora_B, lora_C, lora_D, lora_E] - self.lora_scale_memory = [ - lora_scale_A, - lora_scale_B, - lora_scale_C, - lora_scale_D, - lora_scale_E, - ] - - # LCM config - if sampler == "LCM" and self.LCMconfig == None: - if self.class_name == "StableDiffusionPipeline": - adapter_id = "latent-consistency/lcm-lora-sdv1-5" - elif self.class_name == "StableDiffusionXLPipeline": - adapter_id = "latent-consistency/lcm-lora-sdxl" - - self.process_lora(adapter_id, 1.0) - self.LCMconfig = adapter_id - logger.info("LCM") - elif sampler != "LCM" and self.LCMconfig != None: - self.process_lora(self.LCMconfig, 1.0, unload=True) -
-        # LCM config
-        if sampler == "LCM" and self.LCMconfig is None:
-            if self.class_name == "StableDiffusionPipeline":
-                adapter_id = "latent-consistency/lcm-lora-sdv1-5"
-            elif self.class_name == "StableDiffusionXLPipeline":
-                adapter_id = "latent-consistency/lcm-lora-sdxl"
-
-            self.process_lora(adapter_id, 1.0)
-            self.LCMconfig = adapter_id
-            logger.info("LCM")
-        elif sampler != "LCM" and self.LCMconfig is not None:
-            self.process_lora(self.LCMconfig, 1.0, unload=True)
-            self.LCMconfig = None
-        elif self.LCMconfig is not None:
-            logger.info("LCM")
-
-        # FreeU
-        if FreeU:
-            logger.info("FreeU active")
-            if self.class_name == "StableDiffusionPipeline":
-                # sd
-                self.pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.2, b2=1.4)
-            else:
-                # sdxl
-                self.pipe.enable_freeu(s1=0.6, s2=0.4, b1=1.1, b2=1.2)
-            self.FreeU = True
-        elif self.FreeU:
-            self.pipe.disable_freeu()
-            self.FreeU = False
-
-        # Prompt Optimizations
-        if hasattr(self, "compel") and not retain_compel_previous_load:
-            del self.compel
-
-        prompt_emb, negative_prompt_emb = self.create_prompt_embeds(
-            prompt=prompt,
-            negative_prompt=negative_prompt,
-            textual_inversion=textual_inversion,
-            clip_skip=clip_skip,
-            syntax_weights=syntax_weights,
-        )
-
-        if self.class_name != "StableDiffusionPipeline":
-            # For SDXL, create_prompt_embeds returns the (conditioning, pooled) pair
-            conditioning, pooled = prompt_emb.clone(), negative_prompt_emb.clone()
-            prompt_emb = negative_prompt_emb = None
-
-        if torch.cuda.is_available() and xformers_memory_efficient_attention:
-            self.pipe.enable_xformers_memory_efficient_attention()
-
-        try:
-            # self.pipe.scheduler = DPMSolverSinglestepScheduler()  # fixing default params via a random scheduler, not recommended
-            self.pipe.scheduler = self.get_scheduler(sampler)
-        except Exception as e:
-            logger.debug(f"{e}")
-            logger.warning("Error in sampler, please try again")
-            torch.cuda.empty_cache()
-            gc.collect()
-            return
-
-        self.pipe.safety_checker = None
-
-        # Get image Global
-        if self.task_name != "txt2img":
-            if isinstance(image, str):
-                # If the input is a string (file path), open it as an image
-                image_pil = Image.open(image)
-                numpy_array = np.array(image_pil, dtype=np.uint8)
-            elif isinstance(image, Image.Image):
-                # If the input is already a PIL Image, convert it to a NumPy array
-                numpy_array = np.array(image, dtype=np.uint8)
-            elif isinstance(image, np.ndarray):
-                # If the input is a NumPy array, cast it to np.uint8
-                numpy_array = image.astype(np.uint8)
-            else:
-                if gui_active:
-                    logger.info("Image not found")
-                    return
-                else:
-                    raise ValueError(
-                        "Unsupported image type or no control image found; report the bug to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
-                    )
-
-            # Extract the RGB channels
-            try:
-                array_rgb = numpy_array[:, :, :3]
-            except Exception:
-                logger.error("Unsupported image type")
-                raise ValueError(
-                    "Unsupported image type; report the bug to https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
-                )
-
-        # Get params preprocess Global SD 1.5
-        preprocess_params_config = {}
-        if self.task_name not in ["txt2img", "inpaint", "img2img"]:
-            preprocess_params_config["image"] = array_rgb
-            preprocess_params_config["image_resolution"] = image_resolution
-
-            if self.task_name != "ip2p":
-                if self.task_name != "shuffle":
-                    preprocess_params_config["preprocess_resolution"] = preprocess_resolution
-                if self.task_name != "mlsd" and self.task_name != "canny":
-                    preprocess_params_config["preprocessor_name"] = preprocessor_name
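-        # Illustrative result: for the "openpose" task with the default resolutions,
-        # the dict built above would be, e.g.:
-        #   {"image": array_rgb, "image_resolution": 512,
-        #    "preprocess_resolution": 512, "preprocessor_name": "Openpose"}
-        # "canny"/"mlsd" omit "preprocessor_name", "shuffle" omits
-        # "preprocess_resolution", and "ip2p" only gets the image and resolution.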
-        # RUN Preprocess SD 1.5
-        if self.task_name == "inpaint":
-            # Get mask for Inpaint
-            if gui_active or os.path.exists(str(image_mask)):
-                # Read the image mask from the GUI
-                mask_control_img = Image.open(image_mask)
-                numpy_array_mask = np.array(mask_control_img, dtype=np.uint8)
-                array_rgb_mask = numpy_array_mask[:, :, :3]
-            elif not gui_active:
-                # Convert the control image for interactive drawing
-                import base64
-                import matplotlib.pyplot as plt
-
-                name_without_extension = os.path.splitext(image.split("/")[-1])[0]
-                image64 = base64.b64encode(open(image, "rb").read())
-                image64 = image64.decode("utf-8")
-                img = np.array(plt.imread(f"{image}")[:, :, :3])
-
-                # Create the mask interactively
-                logger.info("Draw the mask on this canvas using the mouse. When you finish, press 'Finish' at the bottom of the canvas.")
-                draw(
-                    image64,
-                    filename=f"./{name_without_extension}_draw.png",
-                    w=img.shape[1],
-                    h=img.shape[0],
-                    line_width=0.04 * img.shape[1],
-                )
-
-                # Create the mask and save it: pure red strokes (R=1, G=0, B=0) become the mask
-                with_mask = np.array(
-                    plt.imread(f"./{name_without_extension}_draw.png")[:, :, :3]
-                )
-                mask = (
-                    (with_mask[:, :, 0] == 1)
-                    * (with_mask[:, :, 1] == 0)
-                    * (with_mask[:, :, 2] == 0)
-                )
-                plt.imsave(f"./{name_without_extension}_mask.png", mask, cmap="gray")
-                mask_control = f"./{name_without_extension}_mask.png"
-                logger.info(f"Mask saved: {mask_control}")
-
-                # Read the image mask
-                mask_control_img = Image.open(mask_control)
-                numpy_array_mask = np.array(mask_control_img, dtype=np.uint8)
-                array_rgb_mask = numpy_array_mask[:, :, :3]
-            else:
-                raise ValueError("No images found")
-
-            init_image, control_mask, control_image = self.process_inpaint(
-                image=array_rgb,
-                image_resolution=image_resolution,
-                preprocess_resolution=preprocess_resolution,  # Not used
-                image_mask=array_rgb_mask,
-            )
-
-        elif self.task_name == "openpose":
-            logger.info("Openpose")
-            control_image = self.process_openpose(**preprocess_params_config)
-
-        elif self.task_name == "canny":
-            logger.info("Canny")
-            control_image = self.process_canny(
-                **preprocess_params_config,
-                low_threshold=low_threshold,
-                high_threshold=high_threshold,
-            )
-
-        elif self.task_name == "mlsd":
-            logger.info("MLSD")
-            control_image = self.process_mlsd(
-                **preprocess_params_config,
-                value_threshold=value_threshold,
-                distance_threshold=distance_threshold,
-            )
-
-        elif self.task_name == "scribble":
-            logger.info("Scribble")
-            control_image = self.process_scribble(**preprocess_params_config)
-
-        elif self.task_name == "softedge":
-            logger.info("Softedge")
-            control_image = self.process_softedge(**preprocess_params_config)
-
-        elif self.task_name == "segmentation":
-            logger.info("Segmentation")
-            control_image = self.process_segmentation(**preprocess_params_config)
-
-        elif self.task_name == "depth":
-            logger.info("Depth")
-            control_image = self.process_depth(**preprocess_params_config)
-
-        elif self.task_name == "normalbae":
-            logger.info("NormalBae")
-            control_image = self.process_normal(**preprocess_params_config)
-
-        elif self.task_name == "lineart":
-            logger.info("Lineart")
-            control_image = self.process_lineart(**preprocess_params_config)
-
-        elif self.task_name == "shuffle":
-            logger.info("Shuffle")
-            control_image = self.process_shuffle(**preprocess_params_config)
-
-        elif self.task_name == "ip2p":
-            logger.info("Ip2p")
-            control_image = self.process_ip2p(**preprocess_params_config)
-
-        elif self.task_name == "img2img":
-            preprocess_params_config["image"] = array_rgb
-            preprocess_params_config["image_resolution"] = image_resolution
-            init_image = self.process_img2img(**preprocess_params_config)
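-        # In the SDXL branch below, each T2I-Adapter task maps to a fixed detector
-        # when `t2i_adapter_preprocessor` is True (otherwise the raw image passes through):
-        #   sdxl_openpose -> "Openpose", sdxl_sketch -> "PidiNet",
-        #   sdxl_depth-midas -> "Midas", sdxl_lineart -> "Lineart";
-        #   sdxl_canny always runs its preprocessor.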
preprocess_params_config_xl["image_resolution"] = image_resolution - # preprocess_params_config_xl["additional_prompt"] = additional_prompt # "" - - if self.task_name == "sdxl_canny": # preprocessor true default - logger.info("SDXL Canny: Preprocessor active by default") - control_image = self.process_canny( - **preprocess_params_config_xl, - low_threshold=low_threshold, - high_threshold=high_threshold, - ) - elif self.task_name == "sdxl_openpose": - logger.info("SDXL Openpose") - control_image = self.process_openpose( - preprocessor_name = "Openpose" if t2i_adapter_preprocessor else "None", - **preprocess_params_config_xl, - ) - elif self.task_name == "sdxl_sketch": - logger.info("SDXL Scribble") - control_image = self.process_scribble( - preprocessor_name = "PidiNet" if t2i_adapter_preprocessor else "None", - **preprocess_params_config_xl, - ) - elif self.task_name == "sdxl_depth-midas": - logger.info("SDXL Depth") - control_image = self.process_depth( - preprocessor_name = "Midas" if t2i_adapter_preprocessor else "None", - **preprocess_params_config_xl, - ) - elif self.task_name == "sdxl_lineart": - logger.info("SDXL Lineart") - control_image = self.process_lineart( - preprocessor_name = "Lineart" if t2i_adapter_preprocessor else "None", - **preprocess_params_config_xl, - ) - - # Get general params for TASK - if self.class_name == "StableDiffusionPipeline": - # Base params pipe sd - pipe_params_config = { - "prompt": None, # prompt, - "negative_prompt": None, # negative_prompt, - "prompt_embeds": prompt_emb, - "negative_prompt_embeds": negative_prompt_emb, - "num_images": num_images, - "num_steps": num_steps, - "guidance_scale": guidance_scale, - "clip_skip": None, # clip_skip, because we use clip skip of compel - } - else: - # Base params pipe sdxl - pipe_params_config = { - "prompt" : None, - "negative_prompt" : None, - "num_inference_steps" : num_steps, - "guidance_scale" : guidance_scale, - "clip_skip" : None, - "num_images_per_prompt" : num_images, - } - - # New params - if self.class_name == "StableDiffusionXLPipeline": - # pipe sdxl - if self.task_name == "txt2img": - pipe_params_config["height"] = img_height - pipe_params_config["width"] = img_width - elif self.task_name == "inpaint": - pipe_params_config["strength"] = strength - pipe_params_config["image"] = init_image - pipe_params_config["mask_image"] = control_mask - logger.info(f"Image resolution: {str(init_image.size)}") - elif self.task_name not in ["txt2img", "inpaint", "img2img"]: - pipe_params_config["image"] = control_image - pipe_params_config["adapter_conditioning_scale"] = t2i_adapter_conditioning_scale - pipe_params_config["adapter_conditioning_factor"] = t2i_adapter_conditioning_factor - logger.info(f"Image resolution: {str(control_image.size)}") - elif self.task_name == "img2img": - pipe_params_config["strength"] = strength - pipe_params_config["image"] = init_image - logger.info(f"Image resolution: {str(init_image.size)}") - elif self.task_name == "txt2img": - pipe_params_config["height"] = img_height - pipe_params_config["width"] = img_width - elif self.task_name == "inpaint": - pipe_params_config["strength"] = strength - pipe_params_config["init_image"] = init_image - pipe_params_config["control_mask"] = control_mask - pipe_params_config["control_image"] = control_image - pipe_params_config[ - "controlnet_conditioning_scale" - ] = controlnet_conditioning_scale - pipe_params_config["control_guidance_start"] = control_guidance_start - pipe_params_config["control_guidance_end"] = control_guidance_end - 
logger.info(f"Image resolution: {str(init_image.size)}") - elif self.task_name not in ["txt2img", "inpaint", "img2img"]: - pipe_params_config["control_image"] = control_image - pipe_params_config[ - "controlnet_conditioning_scale" - ] = controlnet_conditioning_scale - pipe_params_config["control_guidance_start"] = control_guidance_start - pipe_params_config["control_guidance_end"] = control_guidance_end - logger.info(f"Image resolution: {str(control_image.size)}") - elif self.task_name == "img2img": - pipe_params_config["strength"] = strength - pipe_params_config["init_image"] = init_image - logger.info(f"Image resolution: {str(init_image.size)}") - - # detailfix params and pipe global - if adetailer_A or adetailer_B: - - # global params detailfix - default_params_detailfix = { - "face_detector_ad" : True, - "person_detector_ad" : True, - "hand_detector_ad" : False, - "prompt": "", - "negative_prompt" : "", - "strength" : 0.35, - "mask_dilation" : 4, - "mask_blur" : 4, - "mask_padding" : 32, - #"sampler" : "Use same sampler", - #"inpaint_only" : True, - } - - # Pipe detailfix_pipe - if not hasattr(self, "detailfix_pipe") or not retain_detailfix_model_previous_load: - if adetailer_A_params.get("inpaint_only", False) == True or adetailer_B_params.get("inpaint_only", False) == True: - detailfix_pipe = custom_task_model_loader( - pipe=self.pipe, - model_category="detailfix", - task_name=self.task_name, - torch_dtype=self.type_model_precision - ) - else: - detailfix_pipe = custom_task_model_loader( - pipe=self.pipe, - model_category="detailfix_img2img", - task_name=self.task_name, - torch_dtype=self.type_model_precision - ) - if hasattr(self, "detailfix_pipe"): - del self.detailfix_pipe - if retain_detailfix_model_previous_load: - if hasattr(self, "detailfix_pipe"): - detailfix_pipe = self.detailfix_pipe - else: - self.detailfix_pipe = detailfix_pipe - adetailer_A_params.pop("inpaint_only", None) - adetailer_B_params.pop("inpaint_only", None) - - # Define base scheduler detailfix - detailfix_pipe.default_scheduler = copy.deepcopy(self.default_scheduler) - if adetailer_A_params.get("sampler", "Use same sampler") != "Use same sampler": - logger.debug("detailfix_pipe will use the sampler from adetailer_A") - detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"]) - adetailer_A_params.pop("sampler", None) - if adetailer_B_params.get("sampler", "Use same sampler") != "Use same sampler": - logger.debug("detailfix_pipe will use the sampler from adetailer_B") - detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"]) - adetailer_B_params.pop("sampler", None) - - detailfix_pipe.set_progress_bar_config(leave=leave_progress_bar) - detailfix_pipe.set_progress_bar_config(disable=disable_progress_bar) - detailfix_pipe.to(self.device) - torch.cuda.empty_cache() - gc.collect() - - if adetailer_A: - for key_param, default_value in default_params_detailfix.items(): - if key_param not in adetailer_A_params: - adetailer_A_params[key_param] = default_value - elif type(default_value) != type(adetailer_A_params[key_param]): - logger.warning(f"DetailFix A: Error type param, set default {str(key_param)}") - adetailer_A_params[key_param] = default_value - - detailfix_params_A = { - "prompt": adetailer_A_params["prompt"], - "negative_prompt" : adetailer_A_params["negative_prompt"], - "strength" : adetailer_A_params["strength"], - "num_inference_steps" : num_steps, - "guidance_scale" : guidance_scale, - } - - # clear params yolo - adetailer_A_params.pop('strength', None) - 
-            if adetailer_A:
-                for key_param, default_value in default_params_detailfix.items():
-                    if key_param not in adetailer_A_params:
-                        adetailer_A_params[key_param] = default_value
-                    elif type(default_value) != type(adetailer_A_params[key_param]):
-                        logger.warning(f"DetailFix A: wrong param type, using default for {key_param}")
-                        adetailer_A_params[key_param] = default_value
-
-                detailfix_params_A = {
-                    "prompt": adetailer_A_params["prompt"],
-                    "negative_prompt": adetailer_A_params["negative_prompt"],
-                    "strength": adetailer_A_params["strength"],
-                    "num_inference_steps": num_steps,
-                    "guidance_scale": guidance_scale,
-                }
-
-                # clear params yolo
-                adetailer_A_params.pop("strength", None)
-                adetailer_A_params.pop("prompt", None)
-                adetailer_A_params.pop("negative_prompt", None)
-
-                # Verify the detailfix_params_A prompts and get valid ones
-                prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A, prompt_df_A, negative_prompt_df_A = process_prompts_valid(
-                    detailfix_params_A["prompt"], detailfix_params_A["negative_prompt"], prompt, negative_prompt
-                )
-
-                # Params detailfix
-                if self.class_name == "StableDiffusionPipeline":
-                    # SD detailfix
-                    # detailfix_params_A["controlnet_conditioning_scale"] = controlnet_conditioning_scale
-                    # detailfix_params_A["control_guidance_start"] = control_guidance_start
-                    # detailfix_params_A["control_guidance_end"] = control_guidance_end
-
-                    if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A:
-                        detailfix_params_A["prompt_embeds"] = prompt_emb
-                        detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb
-                    else:
-                        prompt_emb_ad, negative_prompt_emb_ad = self.create_prompt_embeds(
-                            prompt=prompt_df_A,
-                            negative_prompt=negative_prompt_df_A,
-                            textual_inversion=textual_inversion,
-                            clip_skip=clip_skip,
-                            syntax_weights=syntax_weights,
-                        )
-                        detailfix_params_A["prompt_embeds"] = prompt_emb_ad
-                        detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb_ad
-
-                    detailfix_params_A["prompt"] = None
-                    detailfix_params_A["negative_prompt"] = None
-
-                else:
-                    # SDXL detailfix
-                    if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A:
-                        conditioning_detailfix_A, pooled_detailfix_A = conditioning, pooled
-                    else:
-                        conditioning_detailfix_A, pooled_detailfix_A = self.create_prompt_embeds(
-                            prompt=prompt_df_A,
-                            negative_prompt=negative_prompt_df_A,
-                            textual_inversion=textual_inversion,
-                            clip_skip=clip_skip,
-                            syntax_weights=syntax_weights,
-                        )
-
-                    detailfix_params_A.pop("prompt", None)
-                    detailfix_params_A.pop("negative_prompt", None)
-                    detailfix_params_A["prompt_embeds"] = conditioning_detailfix_A[0:1]
-                    detailfix_params_A["pooled_prompt_embeds"] = pooled_detailfix_A[0:1]
-                    detailfix_params_A["negative_prompt_embeds"] = conditioning_detailfix_A[1:2]
-                    detailfix_params_A["negative_pooled_prompt_embeds"] = pooled_detailfix_A[1:2]
-
-                logger.debug(f"detailfix A prompt empty {prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A}")
-                if not prompt_empty_detailfix_A or not negative_prompt_empty_detailfix_A:
-                    logger.debug(f"Prompts detailfix A {prompt_df_A, negative_prompt_df_A}")
-                logger.debug(f"Pipe params detailfix A \n{detailfix_params_A}")
-                logger.debug(f"Params detailfix A \n{adetailer_A_params}")
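-            # Note on the SDXL slicing above: create_prompt_embeds packs positive and
-            # negative embeddings into one batch, so conditioning[0:1]/pooled[0:1] are
-            # the positive embeds and conditioning[1:2]/pooled[1:2] the negative ones.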
-            if adetailer_B:
-                for key_param, default_value in default_params_detailfix.items():
-                    if key_param not in adetailer_B_params:
-                        adetailer_B_params[key_param] = default_value
-                    elif type(default_value) != type(adetailer_B_params[key_param]):
-                        logger.warning(f"DetailFix B: wrong param type, using default for {key_param}")
-                        adetailer_B_params[key_param] = default_value
-
-                detailfix_params_B = {
-                    "prompt": adetailer_B_params["prompt"],
-                    "negative_prompt": adetailer_B_params["negative_prompt"],
-                    "strength": adetailer_B_params["strength"],
-                    "num_inference_steps": num_steps,
-                    "guidance_scale": guidance_scale,
-                }
-
-                # clear params yolo
-                adetailer_B_params.pop("strength", None)
-                adetailer_B_params.pop("prompt", None)
-                adetailer_B_params.pop("negative_prompt", None)
-
-                # Verify the detailfix_params_B prompts and get valid ones
-                prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B, prompt_df_B, negative_prompt_df_B = process_prompts_valid(
-                    detailfix_params_B["prompt"], detailfix_params_B["negative_prompt"], prompt, negative_prompt
-                )
-
-                # Params detailfix
-                if self.class_name == "StableDiffusionPipeline":
-                    # SD detailfix
-                    # detailfix_params_B["controlnet_conditioning_scale"] = controlnet_conditioning_scale
-                    # detailfix_params_B["control_guidance_start"] = control_guidance_start
-                    # detailfix_params_B["control_guidance_end"] = control_guidance_end
-
-                    if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B:
-                        detailfix_params_B["prompt_embeds"] = prompt_emb
-                        detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb
-                    else:
-                        prompt_emb_ad_b, negative_prompt_emb_ad_b = self.create_prompt_embeds(
-                            prompt=prompt_df_B,
-                            negative_prompt=negative_prompt_df_B,
-                            textual_inversion=textual_inversion,
-                            clip_skip=clip_skip,
-                            syntax_weights=syntax_weights,
-                        )
-                        detailfix_params_B["prompt_embeds"] = prompt_emb_ad_b
-                        detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb_ad_b
-                    detailfix_params_B["prompt"] = None
-                    detailfix_params_B["negative_prompt"] = None
-                else:
-                    # SDXL detailfix
-                    if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B:
-                        conditioning_detailfix_B, pooled_detailfix_B = conditioning, pooled
-                    else:
-                        conditioning_detailfix_B, pooled_detailfix_B = self.create_prompt_embeds(
-                            prompt=prompt_df_B,
-                            negative_prompt=negative_prompt_df_B,
-                            textual_inversion=textual_inversion,
-                            clip_skip=clip_skip,
-                            syntax_weights=syntax_weights,
-                        )
-                    detailfix_params_B.pop("prompt", None)
-                    detailfix_params_B.pop("negative_prompt", None)
-                    detailfix_params_B["prompt_embeds"] = conditioning_detailfix_B[0:1]
-                    detailfix_params_B["pooled_prompt_embeds"] = pooled_detailfix_B[0:1]
-                    detailfix_params_B["negative_prompt_embeds"] = conditioning_detailfix_B[1:2]
-                    detailfix_params_B["negative_pooled_prompt_embeds"] = pooled_detailfix_B[1:2]
-
-                logger.debug(f"detailfix B prompt empty {prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B}")
-                if not prompt_empty_detailfix_B or not negative_prompt_empty_detailfix_B:
-                    logger.debug(f"Prompts detailfix B {prompt_df_B, negative_prompt_df_B}")
-                logger.debug(f"Pipe params detailfix B \n{detailfix_params_B}")
-                logger.debug(f"Params detailfix B \n{adetailer_B_params}")
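-        # Hedged reading of process_prompts_valid, based on its usage here: it returns
-        # two "is empty" flags plus the prompts to actually use, falling back to the
-        # main prompt/negative_prompt when the section-specific ones are empty, so the
-        # already-computed main embeddings can be reused unchanged.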
-        if hires_steps > 1 and upscaler_model_path is not None:
-            # Hires params BASE
-            hires_params_config = {
-                "prompt": None,
-                "negative_prompt": None,
-                "num_inference_steps": hires_steps,
-                "guidance_scale": guidance_scale,
-                "clip_skip": None,
-                "strength": hires_denoising_strength,
-            }
-            if self.class_name == "StableDiffusionPipeline":
-                hires_params_config["eta"] = 1.0
-
-            # Verify the hires prompts and get valid ones
-            hires_prompt_empty, hires_negative_prompt_empty, prompt_hires_valid, negative_prompt_hires_valid = process_prompts_valid(
-                hires_prompt, hires_negative_prompt, prompt, negative_prompt
-            )
-
-            # Hires embed params
-            if self.class_name == "StableDiffusionPipeline":
-                if hires_prompt_empty and hires_negative_prompt_empty:
-                    hires_params_config["prompt_embeds"] = prompt_emb
-                    hires_params_config["negative_prompt_embeds"] = negative_prompt_emb
-                else:
-                    prompt_emb_hires, negative_prompt_emb_hires = self.create_prompt_embeds(
-                        prompt=prompt_hires_valid,
-                        negative_prompt=negative_prompt_hires_valid,
-                        textual_inversion=textual_inversion,
-                        clip_skip=clip_skip,
-                        syntax_weights=syntax_weights,
-                    )
-
-                    hires_params_config["prompt_embeds"] = prompt_emb_hires
-                    hires_params_config["negative_prompt_embeds"] = negative_prompt_emb_hires
-            else:
-                if hires_prompt_empty and hires_negative_prompt_empty:
-                    hires_conditioning, hires_pooled = conditioning, pooled
-                else:
-                    hires_conditioning, hires_pooled = self.create_prompt_embeds(
-                        prompt=prompt_hires_valid,
-                        negative_prompt=negative_prompt_hires_valid,
-                        textual_inversion=textual_inversion,
-                        clip_skip=clip_skip,
-                        syntax_weights=syntax_weights,
-                    )
-
-                hires_params_config.pop("prompt", None)
-                hires_params_config.pop("negative_prompt", None)
-                hires_params_config["prompt_embeds"] = hires_conditioning[0:1]
-                hires_params_config["pooled_prompt_embeds"] = hires_pooled[0:1]
-                hires_params_config["negative_prompt_embeds"] = hires_conditioning[1:2]
-                hires_params_config["negative_pooled_prompt_embeds"] = hires_pooled[1:2]
-
-            # Hires pipe
-            if not hasattr(self, "hires_pipe") or not retain_hires_model_previous_load:
-                hires_pipe = custom_task_model_loader(
-                    pipe=self.pipe,
-                    model_category="hires",
-                    task_name=self.task_name,
-                    torch_dtype=self.type_model_precision,
-                )
-                if hasattr(self, "hires_pipe"):
-                    del self.hires_pipe
-            if retain_hires_model_previous_load:
-                if hasattr(self, "hires_pipe"):
-                    hires_pipe = self.hires_pipe
-                else:
-                    self.hires_pipe = hires_pipe
-
-            # Hires scheduler
-            if hires_sampler != "Use same sampler":
-                logger.debug("New hires sampler")
-                hires_pipe.scheduler = self.get_scheduler(hires_sampler)
-
-            hires_pipe.set_progress_bar_config(leave=leave_progress_bar)
-            hires_pipe.set_progress_bar_config(disable=disable_progress_bar)
-            hires_pipe.to(self.device)
-            torch.cuda.empty_cache()
-            gc.collect()
-        else:
-            hires_params_config = {}
-            hires_pipe = None
-
-        # Debug info
-        try:
-            logger.debug(f"INFO PIPE: {self.pipe.__class__.__name__}")
-            logger.debug(f"text_encoder_type: {self.pipe.text_encoder.dtype}")
-            logger.debug(f"unet_type: {self.pipe.unet.dtype}")
-            logger.debug(f"vae_type: {self.pipe.vae.dtype}")
-            logger.debug(f"pipe_type: {self.pipe.dtype}")
-            logger.debug(f"scheduler_main_pipe: {self.pipe.scheduler}")
-            if adetailer_A or adetailer_B:
-                logger.debug(f"scheduler_detailfix: {detailfix_pipe.scheduler}")
-            if hires_steps > 1 and upscaler_model_path is not None:
-                logger.debug(f"scheduler_hires: {hires_pipe.scheduler}")
-        except Exception as e:
-            logger.debug(f"{str(e)}")
-
-        # === RUN PIPE === #
-        for i in range(loop_generation):
-
-            # number seed
-            if seed == -1:
-                seeds = [random.randint(0, 2147483647) for _ in range(num_images)]
-            else:
-                if num_images == 1:
-                    seeds = [seed]
-                else:
-                    seeds = [seed] + [random.randint(0, 2147483647) for _ in range(num_images - 1)]
-
-            # generators
-            generators = []  # List to store all the generators
-            for calculate_seed in seeds:
-                if generator_in_cpu or self.device.type == "cpu":
-                    generator = torch.Generator().manual_seed(calculate_seed)
-                else:
-                    try:
-                        generator = torch.Generator("cuda").manual_seed(calculate_seed)
-                    except Exception:
-                        logger.warning("Generator in CPU")
-                        generator = torch.Generator().manual_seed(calculate_seed)
-
-                generators.append(generator)
-
-            # img2img in batch needs a single generator, since the prompt tensors are concatenated
-            pipe_params_config["generator"] = generators if self.task_name != "img2img" else generators[0]  # no list
-            seeds = seeds if self.task_name != "img2img" else [seeds[0]] * num_images
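-            # Seed scheme (illustrative): with seed=-1 every image gets a random seed;
-            # with seed=42 and num_images=3 the list is [42, <rnd>, <rnd>]; for img2img
-            # a single generator is reused, so all images effectively share seeds[0].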
"img2img"]: - images = [control_image] + images - elif self.task_name == "txt2img": - images = self.run_pipe_SD(**pipe_params_config) - elif self.task_name == "inpaint": - images = self.run_pipe_inpaint(**pipe_params_config) - elif self.task_name not in ["txt2img", "inpaint", "img2img"]: - results = self.run_pipe( - **pipe_params_config - ) ## pipe ControlNet add condition_weights - images = [control_image] + results - del results - elif self.task_name == "img2img": - images = self.run_pipe_img2img(**pipe_params_config) - except Exception as e: - e = str(e) - if "Tensor with 2 elements cannot be converted to Scalar" in e: - logger.debug(e) - logger.error("Error in sampler; trying with DDIM sampler") - self.pipe.scheduler = self.default_scheduler - self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config) - if self.class_name == "StableDiffusionXLPipeline": - # sdxl pipe - images = self.pipe( - prompt_embeds=conditioning[0:1], - pooled_prompt_embeds=pooled[0:1], - negative_prompt_embeds=conditioning[1:2], - negative_pooled_prompt_embeds=pooled[1:2], - #generator=pipe_params_config["generator"], - **pipe_params_config, - ).images - if self.task_name not in ["txt2img", "inpaint", "img2img"]: - images = [control_image] + images - elif self.task_name == "txt2img": - images = self.run_pipe_SD(**pipe_params_config) - elif self.task_name == "inpaint": - images = self.run_pipe_inpaint(**pipe_params_config) - elif self.task_name not in ["txt2img", "inpaint", "img2img"]: - results = self.run_pipe( - **pipe_params_config - ) ## pipe ControlNet add condition_weights - images = [control_image] + results - del results - elif self.task_name == "img2img": - images = self.run_pipe_img2img(**pipe_params_config) - elif "The size of tensor a (0) must match the size of tensor b (3) at non-singleton" in e: - raise ValueError(f"steps / strength too low for the model to produce a satisfactory response") - else: - raise ValueError(e) - - torch.cuda.empty_cache() - gc.collect() - - if hires_before_adetailer and upscaler_model_path != None: - logger.debug(f"Hires before; same seed for each image (no batch)") - images = process_images_high_resolution( - images, - upscaler_model_path, - upscaler_increases_size, - esrgan_tile, esrgan_tile_overlap, - hires_steps, hires_params_config, - self.task_name, - generators[0], #pipe_params_config["generator"][0], # no generator - hires_pipe, - ) - - # Adetailer stuff - if adetailer_A or adetailer_B: - # image_pil_list = [] - # for img_single in images: - # image_ad = img_single.convert("RGB") - # image_pil_list.append(image_ad) - if self.task_name not in ["txt2img", "inpaint", "img2img"]: - images = images[1:] - - if adetailer_A: - images = ad_model_process( - pipe_params_df=detailfix_params_A, - detailfix_pipe=detailfix_pipe, - image_list_task=images, - **adetailer_A_params, - ) - if adetailer_B: - images = ad_model_process( - pipe_params_df=detailfix_params_B, - detailfix_pipe=detailfix_pipe, - image_list_task=images, - **adetailer_B_params, - ) - - if self.task_name not in ["txt2img", "inpaint", "img2img"]: - images = [control_image] + images - # del detailfix_pipe - torch.cuda.empty_cache() - gc.collect() - - if hires_after_adetailer and upscaler_model_path != None: - logger.debug(f"Hires after; same seed for each image (no batch)") - images = process_images_high_resolution( - images, - upscaler_model_path, - upscaler_increases_size, - esrgan_tile, esrgan_tile_overlap, - hires_steps, hires_params_config, - self.task_name, - generators[0], 
-            if hires_after_adetailer and upscaler_model_path is not None:
-                logger.debug("Hires after adetailer; same seed for each image (no batch)")
-                images = process_images_high_resolution(
-                    images,
-                    upscaler_model_path,
-                    upscaler_increases_size,
-                    esrgan_tile, esrgan_tile_overlap,
-                    hires_steps, hires_params_config,
-                    self.task_name,
-                    generators[0],  # only the first generator; the hires pass is not batched
-                    hires_pipe,
-                )
-
-            logger.info(f"Seeds: {seeds}")
-
-            # Show images if loop
-            if display_images:
-                mediapy.show_images(images)
-                if loop_generation > 1:
-                    time.sleep(0.5)
-
-            # List images and save
-            image_list = []
-            metadata = [
-                prompt,
-                negative_prompt,
-                self.base_model_id,
-                self.vae_model,
-                num_steps,
-                guidance_scale,
-                sampler,
-                0,  # placeholder; replaced with the seed of each image below
-                img_width,
-                img_height,
-                clip_skip,
-            ]
-
-            valid_seeds = [0] + seeds if self.task_name not in ["txt2img", "inpaint", "img2img"] else seeds
-            for image_, seed_ in zip(images, valid_seeds):
-                image_path = "not saved in storage"
-                if save_generated_images:
-                    metadata[7] = seed_
-                    image_path = save_pil_image_with_metadata(image_, image_storage_location, metadata)
-                image_list.append(image_path)
-
-            torch.cuda.empty_cache()
-            gc.collect()
-
-            if image_list[0] != "not saved in storage":
-                logger.info(image_list)
-
-        if hasattr(self, "compel") and not retain_compel_previous_load:
-            del self.compel
-        torch.cuda.empty_cache()
-        gc.collect()
-
-        return images, image_list
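-        # Hedged note on the return value, as built above: `images` is the list of PIL
-        # images from the last loop_generation pass (with the control image prepended
-        # for ControlNet / T2I-Adapter tasks), and `image_list` holds the saved file
-        # paths, or the string "not saved in storage" when saving is disabled.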