import importlib
import inspect
import math
import re
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union

import cv2
import k_diffusion
import numpy as np
import PIL
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from einops import rearrange
from packaging import version
from PIL import Image
from safetensors.torch import load_file
from torch import einsum
from torch.autograd.function import Function
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer

from diffusers import (
    ControlNetModel,
    LMSDiscreteScheduler,
    StableDiffusionPipeline,
    StableDiffusionControlNetPipeline,
    StableDiffusionControlNetImg2ImgPipeline,
    StableDiffusionImg2ImgPipeline,
    StableDiffusionInpaintPipeline,
    StableDiffusionControlNetInpaintPipeline,
)
from diffusers.callbacks import MultiPipelineCallbacks, PipelineCallback
from diffusers.configuration_utils import FrozenDict
from diffusers.image_processor import PipelineImageInput, VaeImageProcessor
from diffusers.loaders import FromSingleFileMixin, LoraLoaderMixin, TextualInversionLoaderMixin
from diffusers.models import AsymmetricAutoencoderKL, AutoencoderKL, ImageProjection
from diffusers.models.lora import adjust_lora_scale_text_encoder
from diffusers.pipelines.controlnet.multicontrolnet import MultiControlNetModel
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput
from diffusers.pipelines.stable_diffusion.safety_checker import StableDiffusionSafetyChecker
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import (
    PIL_INTERPOLATION,
    USE_PEFT_BACKEND,
    deprecate,
    is_accelerate_available,
    logging,
    replace_example_docstring,
    scale_lora_layers,
    unscale_lora_layers,
)
from diffusers.utils.torch_utils import is_compiled_module, is_torch_version, randn_tensor

from .external_k_diffusion import CompVisDenoiser, CompVisVDenoiser
from .u_net_condition_modify import UNet2DConditionModel
from .ip_adapter import IPAdapterMixin
from .t2i_adapter import preprocessing_t2i_adapter, default_height_width
from .encoder_prompt_modify import encode_prompt_function
from .encode_region_map_function import encode_region_map


def get_image_size(image):
    """Return the size of `image` as a (width, height) tuple."""
    if isinstance(image, Image.Image):
        return image.size
    elif isinstance(image, np.ndarray):
        height, width = image.shape[:2]
        return (width, height)
    elif torch.is_tensor(image):
        # RGB tensor in (C, H, W) layout; otherwise assume (H, W)
        if len(image.shape) == 3:
            _, height, width = image.shape
        else:
            height, width = image.shape
        return (width, height)
    else:
        raise TypeError("The image must be an instance of PIL.Image, numpy.ndarray, or torch.Tensor.")
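

# Illustrative sketch (not executed at import and not part of the module API):
# `get_image_size` normalizes the three accepted input types to a single
# (width, height) convention. The helper name is an example only.
def _example_get_image_size():
    pil_img = Image.new("RGB", (640, 480))
    np_img = np.zeros((480, 640, 3), dtype=np.uint8)
    tensor_img = torch.zeros(3, 480, 640)
    # All three calls report (640, 480).
    return get_image_size(pil_img), get_image_size(np_img), get_image_size(tensor_img)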


# Get the token ids of a prompt. At present this only supports batch_size = 1,
# because the prompt is handled as a single string (for simplicity).
# `class_name` is the pipeline instance (e.g. a StableDiffusion pipeline).
def get_id_text(
    class_name,
    prompt,
    max_length,
    negative_prompt=None,
    prompt_embeds: Optional[torch.Tensor] = None,
    negative_prompt_embeds: Optional[torch.Tensor] = None,
):
    # If embeddings were passed directly, the prompt was not used as input,
    # so there are no token ids to recover.
    if prompt_embeds is not None or negative_prompt_embeds is not None:
        return None, None

    if prompt is not None and isinstance(prompt, str):
        batch_size = 1
    elif prompt is not None and isinstance(prompt, list):
        batch_size = len(prompt)
    else:
        batch_size = prompt_embeds.shape[0]

    if isinstance(class_name, TextualInversionLoaderMixin):
        prompt = class_name.maybe_convert_prompt(prompt, class_name.tokenizer)

    text_inputs = class_name.tokenizer(
        prompt,
        padding="max_length",
        max_length=class_name.tokenizer.model_max_length,
        truncation=True,
        return_tensors="pt",
    )
    text_input_ids = text_inputs.input_ids.detach().cpu().numpy()

    uncond_tokens: List[str]
    if negative_prompt is None:
        uncond_tokens = [""] * batch_size
    elif prompt is not None and type(prompt) is not type(negative_prompt):
        raise TypeError(
            f"`negative_prompt` should be the same type as `prompt`, but got {type(negative_prompt)} !="
            f" {type(prompt)}."
        )
    elif isinstance(negative_prompt, str):
        uncond_tokens = [negative_prompt]
    elif batch_size != len(negative_prompt):
        raise ValueError(
            f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"
            f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"
            " the batch size of `prompt`."
        )
    else:
        uncond_tokens = negative_prompt

    # textual inversion: process multi-vector tokens if necessary
    if isinstance(class_name, TextualInversionLoaderMixin):
        uncond_tokens = class_name.maybe_convert_prompt(uncond_tokens, class_name.tokenizer)

    uncond_input = class_name.tokenizer(
        uncond_tokens,
        padding="max_length",
        max_length=max_length,
        truncation=True,
        return_tensors="pt",
    )
    uncond_input_ids = uncond_input.input_ids.detach().cpu().numpy()

    if batch_size == 1:
        return text_input_ids.reshape((1, -1)), uncond_input_ids.reshape((1, -1))
    return text_input_ids, uncond_input_ids


# from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.rescale_noise_cfg
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0):
    """
    Rescale `noise_cfg` according to `guidance_rescale`. Based on findings of [Common Diffusion Noise Schedules and
    Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). See Section 3.4.
    """
    std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True)
    std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True)
    # rescale the results from guidance (fixes overexposure)
    noise_pred_rescaled = noise_cfg * (std_text / std_cfg)
    # mix with the original results from guidance by factor guidance_rescale to avoid "plain looking" images
    noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg
    return noise_cfg
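

# Illustrative sketch (not executed at import and not part of the module API):
# with guidance_rescale=1.0 the rescaled CFG output takes on the per-sample
# standard deviation of the text-conditional prediction; with 0.0 it is
# returned unchanged. The helper name is an example only.
def _example_rescale_noise_cfg():
    noise_pred_text = torch.randn(2, 4, 64, 64)
    noise_cfg = noise_pred_text * 3.0  # an over-amplified CFG result
    rescaled = rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=1.0)
    return rescaled.std(), noise_pred_text.std()  # the two now match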


def retrieve_timesteps(
    scheduler,
    num_inference_steps: Optional[int] = None,
    device: Optional[Union[str, torch.device]] = None,
    timesteps: Optional[List[int]] = None,
    sigmas: Optional[List[float]] = None,
    **kwargs,
):
    """
    Calls the scheduler's `set_timesteps` method and retrieves timesteps from the scheduler after the call. Handles
    custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`.

    Args:
        scheduler (`SchedulerMixin`):
            The scheduler to get timesteps from.
        num_inference_steps (`int`):
            The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
            must be `None`.
        device (`str` or `torch.device`, *optional*):
            The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
        timesteps (`List[int]`, *optional*):
            Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is
            passed, `num_inference_steps` and `sigmas` must be `None`.
        sigmas (`List[float]`, *optional*):
            Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
            `num_inference_steps` and `timesteps` must be `None`.

    Returns:
        `Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and
        the second element is the number of inference steps.
    """
    if timesteps is not None and sigmas is not None:
        raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
    if timesteps is not None:
        accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        if not accepts_timesteps:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" timestep schedules. Please check whether you are using the correct scheduler."
            )
        scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
        timesteps = scheduler.timesteps
        num_inference_steps = len(timesteps)
    elif sigmas is not None:
        accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
        if not accept_sigmas:
            raise ValueError(
                f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
                f" sigmas schedules. Please check whether you are using the correct scheduler."
            )
        scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
        timesteps = scheduler.timesteps
        num_inference_steps = len(timesteps)
    else:
        scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
        timesteps = scheduler.timesteps
    return timesteps, num_inference_steps
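

# Illustrative sketch (not executed at import and not part of the module API):
# the helper accepts either a step count, explicit timesteps, or explicit
# sigmas, and always returns the resolved (timesteps, num_inference_steps)
# pair. The scheduler choice and helper name are examples only.
def _example_retrieve_timesteps():
    scheduler = LMSDiscreteScheduler()  # any diffusers scheduler with set_timesteps works
    timesteps, n = retrieve_timesteps(scheduler, num_inference_steps=10, device="cpu")
    return timesteps, n  # a tensor of 10 timesteps, and n == 10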


class StableDiffusionPipeline_finetune(IPAdapterMixin, StableDiffusionPipeline):
    def type_output(self, output_type, device, d_type, return_dict, latents, generator):
        if not output_type == "latent":
            image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False, generator=generator)[0]
            image, has_nsfw_concept = self.run_safety_checker(image, device, d_type)
        else:
            image = latents
            has_nsfw_concept = None

        if has_nsfw_concept is None:
            do_denormalize = [True] * image.shape[0]
        else:
            do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]

        image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)

        # Offload all models
        self.maybe_free_model_hooks()

        if not return_dict:
            return (image, has_nsfw_concept)
        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)

    @torch.no_grad()
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        num_inference_steps: int = 50,
        timesteps: List[int] = None,
        sigmas: List[float] = None,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.Tensor] = None,
        prompt_embeds: Optional[torch.Tensor] = None,
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        ip_adapter_image: Optional[PipelineImageInput] = None,
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        guidance_rescale: float = 0.0,
        clip_skip: Optional[int] = 0,
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        region_map_state=None,
        weight_func=lambda w, sigma, qk: w * sigma * qk.std(),
        latent_processing=0,
        image_t2i_adapter: Optional[PipelineImageInput] = None,
        adapter_conditioning_scale: Union[float, List[float]] = 1.0,
        adapter_conditioning_factor: float = 1.0,
        long_encode: int = 0,
        **kwargs,
    ):
        callback = kwargs.pop("callback", None)
        callback_steps = kwargs.pop("callback_steps", None)

        if callback is not None:
            deprecate(
                "callback",
                "1.0.0",
                "Passing `callback` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )
        if callback_steps is not None:
            deprecate(
                "callback_steps",
                "1.0.0",
                "Passing `callback_steps` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )

        if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
            callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs

        # 0. Default height and width to unet
        height = height or self.unet.config.sample_size * self.vae_scale_factor
        width = width or self.unet.config.sample_size * self.vae_scale_factor

        # 1. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt,
            height,
            width,
            callback_steps,
            negative_prompt,
            prompt_embeds,
            negative_prompt_embeds,
            ip_adapter_image,
            ip_adapter_image_embeds,
            callback_on_step_end_tensor_inputs,
        )

        self._guidance_scale = guidance_scale
        self._guidance_rescale = guidance_rescale
        self._clip_skip = clip_skip
        self._cross_attention_kwargs = cross_attention_kwargs
        self._interrupt = False

        adapter_state = None
        if image_t2i_adapter is not None:
            height, width = default_height_width(self, height, width, image_t2i_adapter)
            adapter_state = preprocessing_t2i_adapter(self, image_t2i_adapter, width, height, adapter_conditioning_scale, num_images_per_prompt)

        # 2. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
        # Here `guidance_scale` is defined analogously to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.
        do_classifier_free_guidance = guidance_scale > 1.0

        # 3. Encode input prompt
        # to deal with lora scaling and other possible forward hooks
        lora_scale = (
            self.cross_attention_kwargs.get("scale", None) if self.cross_attention_kwargs is not None else None
        )

        # Keep a copy of any embeddings passed by the caller (used when recovering token ids via `get_id_text`).
        prompt_embeds_copy = None
        negative_prompt_embeds_copy = None
        if prompt_embeds is not None:
            prompt_embeds_copy = prompt_embeds.clone().detach()
        if negative_prompt_embeds is not None:
            negative_prompt_embeds_copy = negative_prompt_embeds.clone().detach()

        prompt_embeds, negative_prompt_embeds, text_input_ids = encode_prompt_function(
            self,
            prompt,
            device,
            num_images_per_prompt,
            self.do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=lora_scale,
            clip_skip=self.clip_skip,
            long_encode=long_encode,
        )
        # Kept for reference, in case token ids must be recovered from raw prompts:
        # text_input_ids, uncond_input_ids = get_id_text(self, prompt, max_length=prompt_embeds.shape[1], negative_prompt=negative_prompt, prompt_embeds=prompt_embeds_copy, negative_prompt_embeds=negative_prompt_embeds_copy)

        # For classifier free guidance, we need to do two forward passes.
        # Here we concatenate the unconditional and text embeddings into a single batch
        # to avoid doing two forward passes
        if self.do_classifier_free_guidance:
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        if ip_adapter_image is not None or ip_adapter_image_embeds is not None:
            image_embeds = self.prepare_ip_adapter_image_embeds(
                ip_adapter_image,
                ip_adapter_image_embeds,
                device,
                batch_size * num_images_per_prompt,
                self.do_classifier_free_guidance,
            )

        # 4. Prepare timesteps
        timesteps, num_inference_steps = retrieve_timesteps(
            self.scheduler, num_inference_steps, device, timesteps, sigmas
        )

        # 4.1 Prepare region map state
        region_state = encode_region_map(
            self,
            region_map_state,
            width=width,
            height=height,
            num_images_per_prompt=num_images_per_prompt,
            text_ids=text_input_ids,
        )
        if self.cross_attention_kwargs is None:
            self._cross_attention_kwargs = {}

        # 5. Prepare latent variables
        num_channels_latents = self.unet.config.in_channels
        latents = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
        )
        lst_latent = []
        if latent_processing == 1:
            lst_latent = [self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0]]

        # 6. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 6.1 Add image embeds for IP-Adapter
        added_cond_kwargs = (
            {"image_embeds": image_embeds}
            if (ip_adapter_image is not None or ip_adapter_image_embeds is not None)
            else None
        )

        # 6.2 Optionally get Guidance Scale Embedding
        timestep_cond = None
        if self.unet.config.time_cond_proj_dim is not None:
            guidance_scale_tensor = torch.tensor(self.guidance_scale - 1).repeat(batch_size * num_images_per_prompt)
            timestep_cond = self.get_guidance_scale_embedding(
                guidance_scale_tensor, embedding_dim=self.unet.config.time_cond_proj_dim
            ).to(device=device, dtype=latents.dtype)

        # 7. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        self._num_timesteps = len(timesteps)
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                if self.interrupt:
                    continue

                # expand the latents if we are doing classifier free guidance
                latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                region_prompt = {
                    "region_state": region_state,
                    "sigma": self.scheduler.sigmas[i],
                    "weight_func": weight_func,
                }
                self._cross_attention_kwargs["region_prompt"] = region_prompt

                down_intrablock_additional_residuals = None
                if adapter_state is not None:
                    if i < int(num_inference_steps * adapter_conditioning_factor):
                        down_intrablock_additional_residuals = [state.clone() for state in adapter_state]

                # predict the noise residual
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=prompt_embeds,
                    timestep_cond=timestep_cond,
                    cross_attention_kwargs=self.cross_attention_kwargs,
                    down_intrablock_additional_residuals=down_intrablock_additional_residuals,
                    added_cond_kwargs=added_cond_kwargs,
                    return_dict=False,
                )[0]

                # perform guidance
                if self.do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + self.guidance_scale * (noise_pred_text - noise_pred_uncond)
                if self.do_classifier_free_guidance and self.guidance_rescale > 0.0:
                    # Based on Section 3.4 of https://arxiv.org/pdf/2305.08891.pdf
                    noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=self.guidance_rescale)

                # compute the previous noisy sample x_t -> x_t-1
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]
                if latent_processing == 1:
                    lst_latent.append(self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0])

                if callback_on_step_end is not None:
                    callback_kwargs = {}
                    for k in callback_on_step_end_tensor_inputs:
                        callback_kwargs[k] = locals()[k]
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

                    latents = callback_outputs.pop("latents", latents)
                    prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
                    negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)

                # call the callback, if provided
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)

        torch.cuda.empty_cache()
        if latent_processing == 1:
            if output_type == "latent":
                lst_latent.append(self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0])
            return lst_latent
        if output_type == "latent":
            return [
                self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0],
                self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0],
            ]
        return [self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0]]
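

# Illustrative usage sketch (not executed at import and not part of the module
# API). The model id is an example; it assumes an SD 1.x checkpoint whose
# components are compatible with this subclass.
def _example_text2img():
    pipe = StableDiffusionPipeline_finetune.from_pretrained(
        "runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16
    ).to("cuda")
    # With the default latent_processing=0, the call returns a list holding
    # the final PIL image; latent_processing=1 also collects intermediates.
    images = pipe(prompt="a photo of an astronaut riding a horse", num_inference_steps=30)
    return images[0]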


class StableDiffusionControlNetPipeline_finetune(IPAdapterMixin, StableDiffusionControlNetPipeline):
    def type_output(self, output_type, device, d_type, return_dict, latents, generator):
        if not output_type == "latent":
            image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False, generator=generator)[0]
            image, has_nsfw_concept = self.run_safety_checker(image, device, d_type)
        else:
            image = latents
            has_nsfw_concept = None

        if has_nsfw_concept is None:
            do_denormalize = [True] * image.shape[0]
        else:
            do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]

        image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)

        # Offload all models
        self.maybe_free_model_hooks()

        if not return_dict:
            return (image, has_nsfw_concept)
        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)

    @torch.no_grad()
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        image: PipelineImageInput = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        num_inference_steps: int = 50,
        timesteps: List[int] = None,
        sigmas: List[float] = None,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.Tensor] = None,
        prompt_embeds: Optional[torch.Tensor] = None,
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        ip_adapter_image: Optional[PipelineImageInput] = None,
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        guidance_rescale: float = 0.0,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        controlnet_conditioning_scale: Union[float, List[float]] = 1.0,
        guess_mode: bool = False,
        control_guidance_start: Union[float, List[float]] = 0.0,
        control_guidance_end: Union[float, List[float]] = 1.0,
        clip_skip: Optional[int] = 0,
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        region_map_state=None,
        weight_func=lambda w, sigma, qk: w * sigma * qk.std(),
        latent_processing=0,
        image_t2i_adapter: Optional[PipelineImageInput] = None,
        adapter_conditioning_scale: Union[float, List[float]] = 1.0,
        adapter_conditioning_factor: float = 1.0,
        long_encode: int = 0,
        **kwargs,
    ):
        callback = kwargs.pop("callback", None)
        callback_steps = kwargs.pop("callback_steps", None)

        if callback is not None:
            deprecate(
                "callback",
                "1.0.0",
                "Passing `callback` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )
        if callback_steps is not None:
            deprecate(
                "callback_steps",
                "1.0.0",
                "Passing `callback_steps` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )

        if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
            callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs

        controlnet = self.controlnet._orig_mod if is_compiled_module(self.controlnet) else self.controlnet

        # align format for control guidance
        if not isinstance(control_guidance_start, list) and isinstance(control_guidance_end, list):
            control_guidance_start = len(control_guidance_end) * [control_guidance_start]
        elif not isinstance(control_guidance_end, list) and isinstance(control_guidance_start, list):
            control_guidance_end = len(control_guidance_start) * [control_guidance_end]
        elif not isinstance(control_guidance_start, list) and not isinstance(control_guidance_end, list):
            mult = len(controlnet.nets) if isinstance(controlnet, MultiControlNetModel) else 1
            control_guidance_start, control_guidance_end = (
                mult * [control_guidance_start],
                mult * [control_guidance_end],
            )

        if height is None:
            _, height = get_image_size(image)
            height = int((height // 8) * 8)
        if width is None:
            width, _ = get_image_size(image)
            width = int((width // 8) * 8)

        # 1. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt,
            image,
            callback_steps,
            negative_prompt,
            prompt_embeds,
            negative_prompt_embeds,
            ip_adapter_image,
            ip_adapter_image_embeds,
            controlnet_conditioning_scale,
            control_guidance_start,
            control_guidance_end,
            callback_on_step_end_tensor_inputs,
        )

        self._guidance_scale = guidance_scale
        self._clip_skip = clip_skip
        self._cross_attention_kwargs = cross_attention_kwargs

        adapter_state = None
        if image_t2i_adapter is not None:
            height, width = default_height_width(self, height, width, image_t2i_adapter)
            adapter_state = preprocessing_t2i_adapter(self, image_t2i_adapter, width, height, adapter_conditioning_scale, num_images_per_prompt)

        # 2. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
        # Here `guidance_scale` is defined analogously to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.

        if isinstance(controlnet, MultiControlNetModel) and isinstance(controlnet_conditioning_scale, float):
            controlnet_conditioning_scale = [controlnet_conditioning_scale] * len(controlnet.nets)

        global_pool_conditions = (
            controlnet.config.global_pool_conditions
            if isinstance(controlnet, ControlNetModel)
            else controlnet.nets[0].config.global_pool_conditions
        )
        guess_mode = guess_mode or global_pool_conditions

        # 3. Encode input prompt
        text_encoder_lora_scale = (
            self.cross_attention_kwargs.get("scale", None) if self.cross_attention_kwargs is not None else None
        )

        # Keep a copy of any embeddings passed by the caller (used when recovering token ids via `get_id_text`).
        prompt_embeds_copy = None
        negative_prompt_embeds_copy = None
        if prompt_embeds is not None:
            prompt_embeds_copy = prompt_embeds.clone().detach()
        if negative_prompt_embeds is not None:
            negative_prompt_embeds_copy = negative_prompt_embeds.clone().detach()

        prompt_embeds, negative_prompt_embeds, text_input_ids = encode_prompt_function(
            self,
            prompt,
            device,
            num_images_per_prompt,
            self.do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=text_encoder_lora_scale,
            clip_skip=self.clip_skip,
            long_encode=long_encode,
        )

        # For classifier free guidance, we need to do two forward passes.
        # Here we concatenate the unconditional and text embeddings into a single batch
        # to avoid doing two forward passes
        if self.do_classifier_free_guidance:
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        if ip_adapter_image is not None or ip_adapter_image_embeds is not None:
            image_embeds = self.prepare_ip_adapter_image_embeds(
                ip_adapter_image,
                ip_adapter_image_embeds,
                device,
                batch_size * num_images_per_prompt,
                self.do_classifier_free_guidance,
            )

        # 4. Prepare image
        if isinstance(controlnet, ControlNetModel):
            image = self.prepare_image(
                image=image,
                width=width,
                height=height,
                batch_size=batch_size * num_images_per_prompt,
                num_images_per_prompt=num_images_per_prompt,
                device=device,
                dtype=controlnet.dtype,
                do_classifier_free_guidance=self.do_classifier_free_guidance,
                guess_mode=guess_mode,
            )
        elif isinstance(controlnet, MultiControlNetModel):
            images = []

            # Nested lists as ControlNet condition
            if isinstance(image[0], list):
                # Transpose the nested image list
                image = [list(t) for t in zip(*image)]

            for image_ in image:
                image_ = self.prepare_image(
                    image=image_,
                    width=width,
                    height=height,
                    batch_size=batch_size * num_images_per_prompt,
                    num_images_per_prompt=num_images_per_prompt,
                    device=device,
                    dtype=controlnet.dtype,
                    do_classifier_free_guidance=self.do_classifier_free_guidance,
                    guess_mode=guess_mode,
                )
                images.append(image_)

            image = images
            height, width = image[0].shape[-2:]
        else:
            assert False

        # 5. Prepare timesteps
        timesteps, num_inference_steps = retrieve_timesteps(
            self.scheduler, num_inference_steps, device, timesteps, sigmas
        )
        self._num_timesteps = len(timesteps)

        # 6. Prepare region map state and latent variables
        region_state = encode_region_map(
            self,
            region_map_state,
            width=width,
            height=height,
            num_images_per_prompt=num_images_per_prompt,
            text_ids=text_input_ids,
        )
        if self.cross_attention_kwargs is None:
            self._cross_attention_kwargs = {}
        num_channels_latents = self.unet.config.in_channels
        latents = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
        )

        # 6.5 Optionally get Guidance Scale Embedding
        timestep_cond = None
        if self.unet.config.time_cond_proj_dim is not None:
            guidance_scale_tensor = torch.tensor(self.guidance_scale - 1).repeat(batch_size * num_images_per_prompt)
            timestep_cond = self.get_guidance_scale_embedding(
                guidance_scale_tensor, embedding_dim=self.unet.config.time_cond_proj_dim
            ).to(device=device, dtype=latents.dtype)

        lst_latent = []
        if latent_processing == 1:
            lst_latent = [self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0]]

        # 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 7.1 Add image embeds for IP-Adapter
        added_cond_kwargs = (
            {"image_embeds": image_embeds}
            if ip_adapter_image is not None or ip_adapter_image_embeds is not None
            else None
        )

        # 7.2 Create tensor stating which controlnets to keep
        controlnet_keep = []
        for i in range(len(timesteps)):
            keeps = [
                1.0 - float(i / len(timesteps) < s or (i + 1) / len(timesteps) > e)
                for s, e in zip(control_guidance_start, control_guidance_end)
            ]
            controlnet_keep.append(keeps[0] if isinstance(controlnet, ControlNetModel) else keeps)

        # 8. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        is_unet_compiled = is_compiled_module(self.unet)
        is_controlnet_compiled = is_compiled_module(self.controlnet)
        is_torch_higher_equal_2_1 = is_torch_version(">=", "2.1")
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # Relevant thread:
                # https://dev-discuss.pytorch.org/t/cudagraphs-in-pytorch-2-0/1428
                if (is_unet_compiled and is_controlnet_compiled) and is_torch_higher_equal_2_1:
                    torch._inductor.cudagraph_mark_step_begin()
                # expand the latents if we are doing classifier free guidance
                latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                # controlnet(s) inference
                if guess_mode and self.do_classifier_free_guidance:
                    # Infer ControlNet only for the conditional batch.
                    control_model_input = latents
                    control_model_input = self.scheduler.scale_model_input(control_model_input, t)
                    controlnet_prompt_embeds = prompt_embeds.chunk(2)[1]
                else:
                    control_model_input = latent_model_input
                    controlnet_prompt_embeds = prompt_embeds

                if isinstance(controlnet_keep[i], list):
                    cond_scale = [c * s for c, s in zip(controlnet_conditioning_scale, controlnet_keep[i])]
                else:
                    controlnet_cond_scale = controlnet_conditioning_scale
                    if isinstance(controlnet_cond_scale, list):
                        controlnet_cond_scale = controlnet_cond_scale[0]
                    cond_scale = controlnet_cond_scale * controlnet_keep[i]

                down_block_res_samples, mid_block_res_sample = self.controlnet(
                    control_model_input,
                    t,
                    encoder_hidden_states=controlnet_prompt_embeds,
                    controlnet_cond=image,
                    conditioning_scale=cond_scale,
                    guess_mode=guess_mode,
                    return_dict=False,
                )

                if guess_mode and self.do_classifier_free_guidance:
                    # ControlNet was inferred only for the conditional batch.
                    # To apply the output of ControlNet to both the unconditional and conditional batches,
                    # add 0 to the unconditional batch to keep it unchanged.
                    down_block_res_samples = [torch.cat([torch.zeros_like(d), d]) for d in down_block_res_samples]
                    mid_block_res_sample = torch.cat([torch.zeros_like(mid_block_res_sample), mid_block_res_sample])

                region_prompt = {
                    "region_state": region_state,
                    "sigma": self.scheduler.sigmas[i],
                    "weight_func": weight_func,
                }
                self._cross_attention_kwargs["region_prompt"] = region_prompt

                down_intrablock_additional_residuals = None
                if adapter_state is not None:
                    if i < int(num_inference_steps * adapter_conditioning_factor):
                        down_intrablock_additional_residuals = [state.clone() for state in adapter_state]

                # predict the noise residual
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=prompt_embeds,
                    timestep_cond=timestep_cond,
                    cross_attention_kwargs=self.cross_attention_kwargs,
                    down_block_additional_residuals=down_block_res_samples,
                    mid_block_additional_residual=mid_block_res_sample,
                    down_intrablock_additional_residuals=down_intrablock_additional_residuals,
                    added_cond_kwargs=added_cond_kwargs,
                    return_dict=False,
                )[0]

                # perform guidance
                if self.do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + self.guidance_scale * (noise_pred_text - noise_pred_uncond)
                if self.do_classifier_free_guidance and guidance_rescale > 0.0:
                    # Based on Section 3.4 of https://arxiv.org/pdf/2305.08891.pdf
                    noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=guidance_rescale)

                # compute the previous noisy sample x_t -> x_t-1
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]
                if latent_processing == 1:
                    lst_latent.append(self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0])

                if callback_on_step_end is not None:
                    callback_kwargs = {}
                    for k in callback_on_step_end_tensor_inputs:
                        callback_kwargs[k] = locals()[k]
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

                    latents = callback_outputs.pop("latents", latents)
                    prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
                    negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)

                # call the callback, if provided
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)

        # If we do sequential model offloading, let's offload unet and controlnet
        # manually for max memory savings
        if hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:
            self.unet.to("cpu")
            self.controlnet.to("cpu")
            torch.cuda.empty_cache()

        if latent_processing == 1:
            if output_type == "latent":
                lst_latent.append(self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0])
            return lst_latent
        if output_type == "latent":
            return [
                self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0],
                self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0],
            ]
        return [self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0]]
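

# Illustrative usage sketch (not executed at import and not part of the module
# API). Both model ids are examples; any SD 1.x checkpoint with a matching
# ControlNet will do, and "canny_edges.png" is a placeholder for a
# preprocessed control image.
def _example_controlnet_text2img():
    controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)
    pipe = StableDiffusionControlNetPipeline_finetune.from_pretrained(
        "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16
    ).to("cuda")
    canny_map = Image.open("canny_edges.png")
    images = pipe(prompt="a futuristic city street", image=canny_map, num_inference_steps=30)
    return images[0]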


class StableDiffusionControlNetImg2ImgPipeline_finetune(IPAdapterMixin, StableDiffusionControlNetImg2ImgPipeline):
    def type_output(self, output_type, device, d_type, return_dict, latents, generator):
        if not output_type == "latent":
            image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False, generator=generator)[0]
            image, has_nsfw_concept = self.run_safety_checker(image, device, d_type)
        else:
            image = latents
            has_nsfw_concept = None

        if has_nsfw_concept is None:
            do_denormalize = [True] * image.shape[0]
        else:
            do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]

        image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)

        # Offload all models
        self.maybe_free_model_hooks()

        if not return_dict:
            return (image, has_nsfw_concept)
        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)

    @torch.no_grad()
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        image: PipelineImageInput = None,
        control_image: PipelineImageInput = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        strength: float = 0.8,
        num_inference_steps: int = 50,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.Tensor] = None,
        prompt_embeds: Optional[torch.Tensor] = None,
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        ip_adapter_image: Optional[PipelineImageInput] = None,
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        guidance_rescale: float = 0.0,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        controlnet_conditioning_scale: Union[float, List[float]] = 0.8,
        guess_mode: bool = False,
        control_guidance_start: Union[float, List[float]] = 0.0,
        control_guidance_end: Union[float, List[float]] = 1.0,
        clip_skip: Optional[int] = 0,
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        region_map_state=None,
        weight_func=lambda w, sigma, qk: w * sigma * qk.std(),
        latent_processing=0,
        image_t2i_adapter: Optional[PipelineImageInput] = None,
        adapter_conditioning_scale: Union[float, List[float]] = 1.0,
        adapter_conditioning_factor: float = 1.0,
        long_encode: int = 0,
        **kwargs,
    ):
        init_step = num_inference_steps
        callback = kwargs.pop("callback", None)
        callback_steps = kwargs.pop("callback_steps", None)

        if callback is not None:
            deprecate(
                "callback",
                "1.0.0",
                "Passing `callback` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )
        if callback_steps is not None:
            deprecate(
                "callback_steps",
                "1.0.0",
                "Passing `callback_steps` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )

        if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
            callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs

        controlnet = self.controlnet._orig_mod if is_compiled_module(self.controlnet) else self.controlnet

        if height is None:
            _, height = get_image_size(image)
            height = int((height // 8) * 8)
        if width is None:
            width, _ = get_image_size(image)
            width = int((width // 8) * 8)

        # align format for control guidance
        if not isinstance(control_guidance_start, list) and isinstance(control_guidance_end, list):
            control_guidance_start = len(control_guidance_end) * [control_guidance_start]
        elif not isinstance(control_guidance_end, list) and isinstance(control_guidance_start, list):
            control_guidance_end = len(control_guidance_start) * [control_guidance_end]
        elif not isinstance(control_guidance_start, list) and not isinstance(control_guidance_end, list):
            mult = len(controlnet.nets) if isinstance(controlnet, MultiControlNetModel) else 1
            control_guidance_start, control_guidance_end = (
                mult * [control_guidance_start],
                mult * [control_guidance_end],
            )

        # 1. Check inputs. Raise error if not correct
        self.check_inputs(
            prompt,
            control_image,
            callback_steps,
            negative_prompt,
            prompt_embeds,
            negative_prompt_embeds,
            ip_adapter_image,
            ip_adapter_image_embeds,
            controlnet_conditioning_scale,
            control_guidance_start,
            control_guidance_end,
            callback_on_step_end_tensor_inputs,
        )

        self._guidance_scale = guidance_scale
        self._clip_skip = clip_skip
        self._cross_attention_kwargs = cross_attention_kwargs

        adapter_state = None
        if image_t2i_adapter is not None:
            height, width = default_height_width(self, height, width, image_t2i_adapter)
            adapter_state = preprocessing_t2i_adapter(self, image_t2i_adapter, width, height, adapter_conditioning_scale, num_images_per_prompt)

        # 2. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
        # Here `guidance_scale` is defined analogously to the guidance weight `w` of equation (2)
        # of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`
        # corresponds to doing no classifier free guidance.

        if isinstance(controlnet, MultiControlNetModel) and isinstance(controlnet_conditioning_scale, float):
            controlnet_conditioning_scale = [controlnet_conditioning_scale] * len(controlnet.nets)

        global_pool_conditions = (
            controlnet.config.global_pool_conditions
            if isinstance(controlnet, ControlNetModel)
            else controlnet.nets[0].config.global_pool_conditions
        )
        guess_mode = guess_mode or global_pool_conditions

        # 3. Encode input prompt
        text_encoder_lora_scale = (
            self.cross_attention_kwargs.get("scale", None) if self.cross_attention_kwargs is not None else None
        )

        # Keep a copy of any embeddings passed by the caller (used when recovering token ids via `get_id_text`).
        prompt_embeds_copy = None
        negative_prompt_embeds_copy = None
        if prompt_embeds is not None:
            prompt_embeds_copy = prompt_embeds.clone().detach()
        if negative_prompt_embeds is not None:
            negative_prompt_embeds_copy = negative_prompt_embeds.clone().detach()

        prompt_embeds, negative_prompt_embeds, text_input_ids = encode_prompt_function(
            self,
            prompt,
            device,
            num_images_per_prompt,
            self.do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=text_encoder_lora_scale,
            clip_skip=self.clip_skip,
            long_encode=long_encode,
        )

        # For classifier free guidance, we need to do two forward passes.
        # Here we concatenate the unconditional and text embeddings into a single batch
        # to avoid doing two forward passes
        if self.do_classifier_free_guidance:
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        if ip_adapter_image is not None or ip_adapter_image_embeds is not None:
            image_embeds = self.prepare_ip_adapter_image_embeds(
                ip_adapter_image,
                ip_adapter_image_embeds,
                device,
                batch_size * num_images_per_prompt,
                self.do_classifier_free_guidance,
            )

        # 4. Prepare image
        image = self.image_processor.preprocess(image, height=height, width=width).to(dtype=torch.float32)

        # 5. Prepare controlnet conditioning image
        if isinstance(controlnet, ControlNetModel):
            control_image = self.prepare_control_image(
                image=control_image,
                width=width,
                height=height,
                batch_size=batch_size * num_images_per_prompt,
                num_images_per_prompt=num_images_per_prompt,
                device=device,
                dtype=controlnet.dtype,
                do_classifier_free_guidance=self.do_classifier_free_guidance,
                guess_mode=guess_mode,
            )
        elif isinstance(controlnet, MultiControlNetModel):
            control_images = []

            # Nested lists as ControlNet condition (note: the original checked `image[0]` here,
            # but the condition images live in `control_image`)
            if isinstance(control_image[0], list):
                # Transpose the nested image list
                control_image = [list(t) for t in zip(*control_image)]

            for control_image_ in control_image:
                control_image_ = self.prepare_control_image(
                    image=control_image_,
                    width=width,
                    height=height,
                    batch_size=batch_size * num_images_per_prompt,
                    num_images_per_prompt=num_images_per_prompt,
                    device=device,
                    dtype=controlnet.dtype,
                    do_classifier_free_guidance=self.do_classifier_free_guidance,
                    guess_mode=guess_mode,
                )
                control_images.append(control_image_)

            control_image = control_images
        else:
            assert False

        # 6. Prepare region map state and timesteps
        region_state = encode_region_map(
            self,
            region_map_state,
            width=width,
            height=height,
            num_images_per_prompt=num_images_per_prompt,
            text_ids=text_input_ids,
        )
        if self.cross_attention_kwargs is None:
            self._cross_attention_kwargs = {}
        self.scheduler.set_timesteps(num_inference_steps, device=device)
        timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, device)
        latent_timestep = timesteps[:1].repeat(batch_size * num_images_per_prompt)
        self._num_timesteps = len(timesteps)

        # 7. Prepare latent variables
        if latents is None:
            latents = self.prepare_latents(
                image,
                latent_timestep,
                batch_size,
                num_images_per_prompt,
                prompt_embeds.dtype,
                device,
                generator,
            )
        lst_latent = []
        if latent_processing == 1:
            lst_latent = [self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0]]

        # 8. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 8.1 Add image embeds for IP-Adapter
        added_cond_kwargs = (
            {"image_embeds": image_embeds}
            if ip_adapter_image is not None or ip_adapter_image_embeds is not None
            else None
        )

        # 8.2 Create tensor stating which controlnets to keep
        controlnet_keep = []
        for i in range(len(timesteps)):
            keeps = [
                1.0 - float(i / len(timesteps) < s or (i + 1) / len(timesteps) > e)
                for s, e in zip(control_guidance_start, control_guidance_end)
            ]
            controlnet_keep.append(keeps[0] if isinstance(controlnet, ControlNetModel) else keeps)

        # Sigma schedule aligned with the (strength-truncated) timesteps
        sigmas = self.scheduler.sigmas[init_step - len(timesteps):]

        # 9. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # expand the latents if we are doing classifier free guidance
                latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                # controlnet(s) inference
                if guess_mode and self.do_classifier_free_guidance:
                    # Infer ControlNet only for the conditional batch.
                    control_model_input = latents
                    control_model_input = self.scheduler.scale_model_input(control_model_input, t)
                    controlnet_prompt_embeds = prompt_embeds.chunk(2)[1]
                else:
                    control_model_input = latent_model_input
                    controlnet_prompt_embeds = prompt_embeds

                if isinstance(controlnet_keep[i], list):
                    cond_scale = [c * s for c, s in zip(controlnet_conditioning_scale, controlnet_keep[i])]
                else:
                    controlnet_cond_scale = controlnet_conditioning_scale
                    if isinstance(controlnet_cond_scale, list):
                        controlnet_cond_scale = controlnet_cond_scale[0]
                    cond_scale = controlnet_cond_scale * controlnet_keep[i]

                down_block_res_samples, mid_block_res_sample = self.controlnet(
                    control_model_input,
                    t,
                    encoder_hidden_states=controlnet_prompt_embeds,
                    controlnet_cond=control_image,
                    conditioning_scale=cond_scale,
                    guess_mode=guess_mode,
                    return_dict=False,
                )

                if guess_mode and self.do_classifier_free_guidance:
                    # ControlNet was inferred only for the conditional batch.
                    # To apply the output of ControlNet to both the unconditional and conditional batches,
                    # add 0 to the unconditional batch to keep it unchanged.
                    down_block_res_samples = [torch.cat([torch.zeros_like(d), d]) for d in down_block_res_samples]
                    mid_block_res_sample = torch.cat([torch.zeros_like(mid_block_res_sample), mid_block_res_sample])

                region_prompt = {
                    "region_state": region_state,
                    # index into the offset sigma schedule (the original indexed the full
                    # schedule, ignoring the steps skipped by `strength`)
                    "sigma": sigmas[i],
                    "weight_func": weight_func,
                }
                self._cross_attention_kwargs["region_prompt"] = region_prompt

                down_intrablock_additional_residuals = None
                if adapter_state is not None:
                    if i < int(num_inference_steps * adapter_conditioning_factor):
                        down_intrablock_additional_residuals = [state.clone() for state in adapter_state]

                # predict the noise residual
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=prompt_embeds,
                    cross_attention_kwargs=self.cross_attention_kwargs,
                    down_block_additional_residuals=down_block_res_samples,
                    mid_block_additional_residual=mid_block_res_sample,
                    down_intrablock_additional_residuals=down_intrablock_additional_residuals,
                    added_cond_kwargs=added_cond_kwargs,
                    return_dict=False,
                )[0]

                # perform guidance
                if self.do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
                if self.do_classifier_free_guidance and guidance_rescale > 0.0:
                    # Based on Section 3.4 of https://arxiv.org/pdf/2305.08891.pdf
                    noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=guidance_rescale)

                # compute the previous noisy sample x_t -> x_t-1
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]
                if latent_processing == 1:
                    lst_latent.append(self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0])

                if callback_on_step_end is not None:
                    callback_kwargs = {}
                    for k in callback_on_step_end_tensor_inputs:
                        callback_kwargs[k] = locals()[k]
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

                    latents = callback_outputs.pop("latents", latents)
                    prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
                    negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)

                # call the callback, if provided
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)

        # If we do sequential model offloading, let's offload unet and controlnet
        # manually for max memory savings
        if hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:
            self.unet.to("cpu")
            self.controlnet.to("cpu")
            torch.cuda.empty_cache()

        if latent_processing == 1:
            if output_type == "latent":
                lst_latent.append(self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0])
            return lst_latent
        if output_type == "latent":
            return [
                self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0],
                self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0],
            ]
        return [self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0]]
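

# Illustrative usage sketch (not executed at import and not part of the module
# API). Model ids and file names are examples; `image` is the init image to be
# partially renoised and `control_image` the preprocessed ControlNet condition.
def _example_controlnet_img2img():
    controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)
    pipe = StableDiffusionControlNetImg2ImgPipeline_finetune.from_pretrained(
        "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16
    ).to("cuda")
    init_image = Image.open("init.png")
    canny_map = Image.open("canny_edges.png")
    images = pipe(prompt="a watercolor landscape", image=init_image, control_image=canny_map, strength=0.6)
    return images[0]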
Optional[str] = "pil", return_dict: bool = True, guidance_rescale: float = 0.0, #callback: Optional[Callable[[int, int, torch.Tensor], None]] = None, #callback_steps: int = 1, cross_attention_kwargs: Optional[Dict[str, Any]] = None, clip_skip: int = 0, callback_on_step_end: Optional[ Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks] ] = None, callback_on_step_end_tensor_inputs: List[str] = ["latents"], region_map_state=None, weight_func = lambda w, sigma, qk: w * sigma * qk.std(), latent_processing = 0, image_t2i_adapter : Optional[PipelineImageInput] = None, adapter_conditioning_scale: Union[float, List[float]] = 1.0, adapter_conditioning_factor: float = 1.0, long_encode: int = 0, **kwargs, ): init_step = num_inference_steps callback = kwargs.pop("callback", None) callback_steps = kwargs.pop("callback_steps", None) if callback is not None: deprecate( "callback", "1.0.0", "Passing `callback` as an input argument to `__call__` is deprecated, consider use `callback_on_step_end`", ) if callback_steps is not None: deprecate( "callback_steps", "1.0.0", "Passing `callback_steps` as an input argument to `__call__` is deprecated, consider use `callback_on_step_end`", ) if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)): callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs # 1. Check inputs. Raise error if not correct self.check_inputs( prompt, strength, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds, ip_adapter_image, ip_adapter_image_embeds, callback_on_step_end_tensor_inputs, ) #self.prompt_parser = FrozenCLIPEmbedderWithCustomWords(self.tokenizer, self.text_encoder,clip_skip+1) self._guidance_scale = guidance_scale self._clip_skip = clip_skip self._cross_attention_kwargs = cross_attention_kwargs self._interrupt = False if height is None: _,height = get_image_size(image) height = int((height // 8)*8) if width is None: width,_ = get_image_size(image) width = int((width // 8)*8) adapter_state = None if image_t2i_adapter is not None: height, width = default_height_width(self,height, width, image_t2i_adapter) adapter_state = preprocessing_t2i_adapter(self,image_t2i_adapter,width,height,adapter_conditioning_scale,num_images_per_prompt) # 2. Define call parameters if prompt is not None and isinstance(prompt, str): batch_size = 1 elif prompt is not None and isinstance(prompt, list): batch_size = len(prompt) else: batch_size = prompt_embeds.shape[0] device = self._execution_device # 3. 

        # 3. Encode input prompt
        text_encoder_lora_scale = (
            self.cross_attention_kwargs.get("scale", None) if self.cross_attention_kwargs is not None else None
        )

        # Keep a copy of any embeddings passed by the caller (used when recovering token ids via `get_id_text`).
        prompt_embeds_copy = None
        negative_prompt_embeds_copy = None
        if prompt_embeds is not None:
            prompt_embeds_copy = prompt_embeds.clone().detach()
        if negative_prompt_embeds is not None:
            negative_prompt_embeds_copy = negative_prompt_embeds.clone().detach()

        prompt_embeds, negative_prompt_embeds, text_input_ids = encode_prompt_function(
            self,
            prompt,
            device,
            num_images_per_prompt,
            self.do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=text_encoder_lora_scale,
            clip_skip=self.clip_skip,
            long_encode=long_encode,
        )

        # For classifier free guidance, we need to do two forward passes.
        # Here we concatenate the unconditional and text embeddings into a single batch
        # to avoid doing two forward passes
        if self.do_classifier_free_guidance:
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        if ip_adapter_image is not None or ip_adapter_image_embeds is not None:
            image_embeds = self.prepare_ip_adapter_image_embeds(
                ip_adapter_image,
                ip_adapter_image_embeds,
                device,
                batch_size * num_images_per_prompt,
                self.do_classifier_free_guidance,
            )

        # 4. Preprocess image
        image = self.image_processor.preprocess(image)

        # 5. Prepare region map state and set timesteps
        region_state = encode_region_map(
            self,
            region_map_state,
            width=width,
            height=height,
            num_images_per_prompt=num_images_per_prompt,
            text_ids=text_input_ids,
        )
        if self.cross_attention_kwargs is None:
            self._cross_attention_kwargs = {}
        timesteps, num_inference_steps = retrieve_timesteps(
            self.scheduler, num_inference_steps, device, timesteps, sigmas
        )
        timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, device)
        latent_timestep = timesteps[:1].repeat(batch_size * num_images_per_prompt)

        # 6. Prepare latent variables
        latents = self.prepare_latents(
            image,
            latent_timestep,
            batch_size,
            num_images_per_prompt,
            prompt_embeds.dtype,
            device,
            generator,
        )
        lst_latent = []
        if latent_processing == 1:
            lst_latent = [self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0]]
        # 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 7.1 Add image embeds for IP-Adapter
        added_cond_kwargs = (
            {"image_embeds": image_embeds}
            if ip_adapter_image is not None or ip_adapter_image_embeds is not None
            else None
        )

        # 7.2 Optionally get guidance scale embedding
        timestep_cond = None
        if self.unet.config.time_cond_proj_dim is not None:
            guidance_scale_tensor = torch.tensor(self.guidance_scale - 1).repeat(batch_size * num_images_per_prompt)
            timestep_cond = self.get_guidance_scale_embedding(
                guidance_scale_tensor, embedding_dim=self.unet.config.time_cond_proj_dim
            ).to(device=device, dtype=latents.dtype)

        # Keep only the sigmas that correspond to the (strength-reduced) timesteps.
        sigmas = self.scheduler.sigmas[init_step - len(timesteps):]

        # 8. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        self._num_timesteps = len(timesteps)
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                if self.interrupt:
                    continue

                # Expand the latents if we are doing classifier-free guidance.
                latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                region_prompt = {
                    "region_state": region_state,
                    "sigma": self.scheduler.sigmas[i],
                    "weight_func": weight_func,
                }
                self._cross_attention_kwargs["region_prompt"] = region_prompt

                down_intrablock_additional_residuals = None
                if adapter_state is not None and i < int(num_inference_steps * adapter_conditioning_factor):
                    down_intrablock_additional_residuals = [state.clone() for state in adapter_state]

                # Predict the noise residual.
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=prompt_embeds,
                    timestep_cond=timestep_cond,
                    cross_attention_kwargs=self.cross_attention_kwargs,
                    down_intrablock_additional_residuals=down_intrablock_additional_residuals,
                    added_cond_kwargs=added_cond_kwargs,
                    return_dict=False,
                )[0]

                # Perform guidance.
                if self.do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + self.guidance_scale * (noise_pred_text - noise_pred_uncond)
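                # Classifier-free guidance arithmetic (illustrative comment): with guidance_scale=7.5,
                #   noise_pred = uncond + 7.5 * (text - uncond)
                # i.e. the prediction is pushed 7.5x further along the direction separating the
                # text-conditioned output from the unconditional one.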
                if self.do_classifier_free_guidance and guidance_rescale > 0.0:
                    # Rescale noise_pred following section 3.4 of https://arxiv.org/pdf/2305.08891.pdf
                    noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=guidance_rescale)

                # Compute the previous noisy sample x_t -> x_t-1.
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]
                if latent_processing == 1:
                    lst_latent.append(self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0])

                if callback_on_step_end is not None:
                    callback_kwargs = {}
                    for k in callback_on_step_end_tensor_inputs:
                        callback_kwargs[k] = locals()[k]
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

                    latents = callback_outputs.pop("latents", latents)
                    prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
                    negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)

                # Call the legacy callback, if provided.
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)

        if latent_processing == 1:
            if output_type == "latent":
                lst_latent.append(self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0])
            return lst_latent
        if output_type == "latent":
            return [
                self.type_output("pil", device, prompt_embeds.dtype, return_dict, latents, generator).images[0],
                self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0],
            ]
        return [self.type_output(output_type, device, prompt_embeds.dtype, return_dict, latents, generator).images[0]]


class StableDiffusionInpaintPipeline_finetune(IPAdapterMixin, StableDiffusionInpaintPipeline):
    def type_output(self, output_type, device, d_type, return_dict, latents, generator, init_image, padding_mask_crop, mask_image, original_image, crops_coords):
        if not output_type == "latent":
            condition_kwargs = {}
            if isinstance(self.vae, AsymmetricAutoencoderKL):
                # The asymmetric VAE conditions the decode on the original pixels and the mask.
                # The mask condition is rebuilt here from `mask_image` (assumed to match
                # `init_image`'s size), and casting uses the latents' dtype.
                init_image = init_image.to(device=device, dtype=latents.dtype)
                init_image_condition = init_image.clone()
                init_image = self._encode_vae_image(init_image, generator=generator)
                mask_condition = self.mask_processor.preprocess(mask_image).to(device=device, dtype=latents.dtype)
                condition_kwargs = {"image": init_image_condition, "mask": mask_condition}
            image = self.vae.decode(
                latents / self.vae.config.scaling_factor, return_dict=False, generator=generator, **condition_kwargs
            )[0]
            image, has_nsfw_concept = self.run_safety_checker(image, device, d_type)
        else:
            image = latents
            has_nsfw_concept = None

        if has_nsfw_concept is None:
            do_denormalize = [True] * image.shape[0]
        else:
            do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]
        image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)
        if padding_mask_crop is not None:
            image = [self.image_processor.apply_overlay(mask_image, original_image, i, crops_coords) for i in image]

        # Offload all models.
        self.maybe_free_model_hooks()

        if not return_dict:
            return (image, has_nsfw_concept)
        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
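    # Note on the AsymmetricAutoencoderKL branch above (sketch of the call it reduces to; the
    # conditioning kwargs are only consumed by that VAE class):
    #   image = self.vae.decode(
    #       latents / self.vae.config.scaling_factor,
    #       return_dict=False,
    #       generator=generator,
    #       image=init_image_condition,   # original pixels
    #       mask=mask_condition,          # inpainting mask
    #   )[0]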
    @torch.no_grad()
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        image: PipelineImageInput = None,
        mask_image: PipelineImageInput = None,
        masked_image_latents: torch.Tensor = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        padding_mask_crop: Optional[int] = None,
        strength: float = 1.0,
        num_inference_steps: int = 50,
        timesteps: List[int] = None,
        sigmas: List[float] = None,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.Tensor] = None,
        prompt_embeds: Optional[torch.Tensor] = None,
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        ip_adapter_image: Optional[PipelineImageInput] = None,
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        clip_skip: int = None,
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        region_map_state=None,
        weight_func=lambda w, sigma, qk: w * sigma * qk.std(),
        latent_processing=0,
        image_t2i_adapter: Optional[PipelineImageInput] = None,
        adapter_conditioning_scale: Union[float, List[float]] = 1.0,
        adapter_conditioning_factor: float = 1.0,
        long_encode: int = 0,
        guidance_rescale: float = 0.0,
        **kwargs,
    ):
        callback = kwargs.pop("callback", None)
        callback_steps = kwargs.pop("callback_steps", None)

        if callback is not None:
            deprecate(
                "callback",
                "1.0.0",
                "Passing `callback` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )
        if callback_steps is not None:
            deprecate(
                "callback_steps",
                "1.0.0",
                "Passing `callback_steps` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )

        if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
            callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs

        # 0. Default height and width to the input image size, floored to a multiple of 8.
        if height is None:
            _, height = get_image_size(image)
            height = int((height // 8) * 8)
        if width is None:
            width, _ = get_image_size(image)
            width = int((width // 8) * 8)

        adapter_state = None
        if image_t2i_adapter is not None:
            height, width = default_height_width(self, height, width, image_t2i_adapter)
            adapter_state = preprocessing_t2i_adapter(
                self, image_t2i_adapter, width, height, adapter_conditioning_scale, num_images_per_prompt
            )

        # 1. Check inputs
        self.check_inputs(
            prompt,
            image,
            mask_image,
            height,
            width,
            strength,
            callback_steps,
            output_type,
            negative_prompt,
            prompt_embeds,
            negative_prompt_embeds,
            ip_adapter_image,
            ip_adapter_image_embeds,
            callback_on_step_end_tensor_inputs,
            padding_mask_crop,
        )

        self._guidance_scale = guidance_scale
        self._clip_skip = clip_skip
        self._cross_attention_kwargs = cross_attention_kwargs
        self._interrupt = False

        # 2. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        device = self._execution_device
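        # A `callback_on_step_end` hook replaces the deprecated `callback`/`callback_steps` pair.
        # Minimal sketch (hypothetical user code, shown as a comment):
        #   def on_step_end(pipe, step, timestep, tensors):
        #       tensors["latents"] = tensors["latents"]  # inspect or modify tracked tensors
        #       return tensors
        #   pipe(..., callback_on_step_end=on_step_end,
        #        callback_on_step_end_tensor_inputs=["latents"])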
        # 3. Encode input prompt
        text_encoder_lora_scale = (
            cross_attention_kwargs.get("scale", None) if cross_attention_kwargs is not None else None
        )
        prompt_embeds, negative_prompt_embeds, text_input_ids = encode_prompt_function(
            self,
            prompt,
            device,
            num_images_per_prompt,
            self.do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=text_encoder_lora_scale,
            clip_skip=self.clip_skip,
            long_encode=long_encode,
        )
        # For classifier-free guidance we need two forward passes. Concatenating the
        # unconditional and text embeddings into a single batch avoids running them separately.
        if self.do_classifier_free_guidance:
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        if ip_adapter_image is not None or ip_adapter_image_embeds is not None:
            image_embeds = self.prepare_ip_adapter_image_embeds(
                ip_adapter_image,
                ip_adapter_image_embeds,
                device,
                batch_size * num_images_per_prompt,
                self.do_classifier_free_guidance,
            )

        # 4. Set timesteps
        timesteps, num_inference_steps = retrieve_timesteps(self.scheduler, num_inference_steps, device, timesteps, sigmas)
        timesteps, num_inference_steps = self.get_timesteps(
            num_inference_steps=num_inference_steps, strength=strength, device=device
        )
        # Check that the number of inference steps is not < 1, as that makes no sense.
        if num_inference_steps < 1:
            raise ValueError(
                f"After adjusting the num_inference_steps by strength parameter: {strength}, the number of pipeline"
                f" steps is {num_inference_steps}, which is < 1 and not appropriate for this pipeline."
            )
        # The timestep at which to set the initial noise (n.b. 50% if strength is 0.5).
        latent_timestep = timesteps[:1].repeat(batch_size * num_images_per_prompt)
        # If strength is 1, initialise the latents with pure noise.
        is_strength_max = strength == 1.0

        # 4.1 Preprocess region map
        region_state = encode_region_map(
            self,
            region_map_state,
            width=width,
            height=height,
            num_images_per_prompt=num_images_per_prompt,
            text_ids=text_input_ids,
        )
        if self.cross_attention_kwargs is None:
            self._cross_attention_kwargs = {}

        # 5. Preprocess mask and image
        if padding_mask_crop is not None:
            crops_coords = self.mask_processor.get_crop_region(mask_image, width, height, pad=padding_mask_crop)
            resize_mode = "fill"
        else:
            crops_coords = None
            resize_mode = "default"

        original_image = image
        init_image = self.image_processor.preprocess(
            image, height=height, width=width, crops_coords=crops_coords, resize_mode=resize_mode
        )
        init_image = init_image.to(dtype=torch.float32)
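        # `padding_mask_crop` behaviour (illustrative comment): a crop region around the mask is
        # computed with `pad` pixels of context, the crop is resized with resize_mode="fill" to
        # the target size, and after denoising the result is pasted back via `apply_overlay`, so
        # only the masked neighbourhood is actually regenerated.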
        # 6. Prepare latent variables
        num_channels_latents = self.vae.config.latent_channels
        num_channels_unet = self.unet.config.in_channels
        return_image_latents = num_channels_unet == 4

        latents_outputs = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
            image=init_image,
            timestep=latent_timestep,
            is_strength_max=is_strength_max,
            return_noise=True,
            return_image_latents=return_image_latents,
        )
        if return_image_latents:
            latents, noise, image_latents = latents_outputs
        else:
            latents, noise = latents_outputs

        # 7. Prepare mask latent variables
        mask_condition = self.mask_processor.preprocess(
            mask_image, height=height, width=width, resize_mode=resize_mode, crops_coords=crops_coords
        )
        if masked_image_latents is None:
            masked_image = init_image * (mask_condition < 0.5)
        else:
            masked_image = masked_image_latents

        mask, masked_image_latents = self.prepare_mask_latents(
            mask_condition,
            masked_image,
            batch_size * num_images_per_prompt,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            self.do_classifier_free_guidance,
        )

        # 8. Check that the sizes of mask, masked image and latents match
        if num_channels_unet == 9:
            # default case for runwayml/stable-diffusion-inpainting
            num_channels_mask = mask.shape[1]
            num_channels_masked_image = masked_image_latents.shape[1]
            if num_channels_latents + num_channels_mask + num_channels_masked_image != self.unet.config.in_channels:
                raise ValueError(
                    f"Incorrect configuration settings! The config of `pipeline.unet`: {self.unet.config} expects"
                    f" {self.unet.config.in_channels} but received `num_channels_latents`: {num_channels_latents} +"
                    f" `num_channels_mask`: {num_channels_mask} + `num_channels_masked_image`: {num_channels_masked_image}"
                    f" = {num_channels_latents + num_channels_masked_image + num_channels_mask}. Please verify the config of"
                    " `pipeline.unet` or your `mask_image` or `image` input."
                )
        elif num_channels_unet != 4:
            raise ValueError(
                f"The unet {self.unet.__class__} should have either 4 or 9 input channels, not {self.unet.config.in_channels}."
            )

        # 9. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 9.1 Add image embeds for IP-Adapter
        added_cond_kwargs = (
            {"image_embeds": image_embeds}
            if ip_adapter_image is not None or ip_adapter_image_embeds is not None
            else None
        )

        # 9.2 Optionally get guidance scale embedding
        timestep_cond = None
        if self.unet.config.time_cond_proj_dim is not None:
            guidance_scale_tensor = torch.tensor(self.guidance_scale - 1).repeat(batch_size * num_images_per_prompt)
            timestep_cond = self.get_guidance_scale_embedding(
                guidance_scale_tensor, embedding_dim=self.unet.config.time_cond_proj_dim
            ).to(device=device, dtype=latents.dtype)

        lst_latent = []
        if latent_processing == 1:
            lst_latent = [
                self.type_output(
                    "pil", device, prompt_embeds.dtype, return_dict, latents, generator,
                    init_image, padding_mask_crop, mask_image, original_image, crops_coords,
                ).images[0]
            ]
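        # Channel bookkeeping for the 9-channel inpainting UNet (illustrative comment, SD-1.x
        # inpainting checkpoints): with a 512x512 input,
        #   latents               -> (b, 4, 64, 64)
        #   mask                  -> (b, 1, 64, 64)
        #   masked_image_latents  -> (b, 4, 64, 64)
        # and their channel-wise concat gives the expected 4 + 1 + 4 = 9 input channels.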
        # 10. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        self._num_timesteps = len(timesteps)
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                if self.interrupt:
                    continue

                # Expand the latents if we are doing classifier-free guidance.
                latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
                # Concat latents, mask and masked_image_latents in the channel dimension.
                if num_channels_unet == 9:
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)

                region_prompt = {
                    "region_state": region_state,
                    "sigma": self.scheduler.sigmas[i],
                    "weight_func": weight_func,
                }
                self._cross_attention_kwargs["region_prompt"] = region_prompt

                down_intrablock_additional_residuals = None
                if adapter_state is not None and i < int(num_inference_steps * adapter_conditioning_factor):
                    down_intrablock_additional_residuals = [state.clone() for state in adapter_state]

                # Predict the noise residual.
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=prompt_embeds,
                    timestep_cond=timestep_cond,
                    cross_attention_kwargs=self.cross_attention_kwargs,
                    down_intrablock_additional_residuals=down_intrablock_additional_residuals,
                    added_cond_kwargs=added_cond_kwargs,
                    return_dict=False,
                )[0]

                # Perform guidance.
                if self.do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + self.guidance_scale * (noise_pred_text - noise_pred_uncond)

                if self.do_classifier_free_guidance and guidance_rescale > 0.0:
                    # Rescale noise_pred following section 3.4 of https://arxiv.org/pdf/2305.08891.pdf
                    noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=guidance_rescale)

                # Compute the previous noisy sample x_t -> x_t-1.
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]

                if num_channels_unet == 4:
                    init_latents_proper = image_latents
                    if self.do_classifier_free_guidance:
                        init_mask, _ = mask.chunk(2)
                    else:
                        init_mask = mask
                    if i < len(timesteps) - 1:
                        noise_timestep = timesteps[i + 1]
                        init_latents_proper = self.scheduler.add_noise(
                            init_latents_proper, noise, torch.tensor([noise_timestep])
                        )
                    latents = (1 - init_mask) * init_latents_proper + init_mask * latents

                if latent_processing == 1:
                    lst_latent.append(
                        self.type_output(
                            "pil", device, prompt_embeds.dtype, return_dict, latents, generator,
                            init_image, padding_mask_crop, mask_image, original_image, crops_coords,
                        ).images[0]
                    )

                if callback_on_step_end is not None:
                    callback_kwargs = {}
                    for k in callback_on_step_end_tensor_inputs:
                        callback_kwargs[k] = locals()[k]
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

                    latents = callback_outputs.pop("latents", latents)
                    prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
                    negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)
                    mask = callback_outputs.pop("mask", mask)
                    masked_image_latents = callback_outputs.pop("masked_image_latents", masked_image_latents)

                # Call the legacy callback, if provided.
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)
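        # The `num_channels_unet == 4` branch inside the loop keeps unmasked pixels anchored to
        # the input image (illustrative comment): with m = init_mask in [0, 1],
        #   latents = (1 - m) * noised_original + m * denoised
        # so only the masked region is free to change at each step.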
        if latent_processing == 1:
            if output_type == "latent":
                lst_latent.append(
                    self.type_output(
                        output_type, device, prompt_embeds.dtype, return_dict, latents, generator,
                        init_image, padding_mask_crop, mask_image, original_image, crops_coords,
                    ).images[0]
                )
            return lst_latent
        if output_type == "latent":
            return [
                self.type_output(
                    "pil", device, prompt_embeds.dtype, return_dict, latents, generator,
                    init_image, padding_mask_crop, mask_image, original_image, crops_coords,
                ).images[0],
                self.type_output(
                    output_type, device, prompt_embeds.dtype, return_dict, latents, generator,
                    init_image, padding_mask_crop, mask_image, original_image, crops_coords,
                ).images[0],
            ]
        return [
            self.type_output(
                output_type, device, prompt_embeds.dtype, return_dict, latents, generator,
                init_image, padding_mask_crop, mask_image, original_image, crops_coords,
            ).images[0]
        ]


class StableDiffusionControlNetInpaintPipeline_finetune(IPAdapterMixin, StableDiffusionControlNetInpaintPipeline):
    def type_output(self, output_type, device, d_type, return_dict, latents, generator, padding_mask_crop, mask_image, original_image, crops_coords):
        if not output_type == "latent":
            image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False, generator=generator)[0]
            image, has_nsfw_concept = self.run_safety_checker(image, device, d_type)
        else:
            image = latents
            has_nsfw_concept = None

        if has_nsfw_concept is None:
            do_denormalize = [True] * image.shape[0]
        else:
            do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]
        image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)
        if padding_mask_crop is not None:
            image = [self.image_processor.apply_overlay(mask_image, original_image, i, crops_coords) for i in image]

        # Offload all models.
        self.maybe_free_model_hooks()

        if not return_dict:
            return (image, has_nsfw_concept)
        return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
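    # Decoding convention used by `type_output` above (illustrative comment): SD-1.x VAEs store
    # latents pre-scaled by vae.config.scaling_factor (0.18215 for the standard checkpoint), so
    # decoding divides it back out:
    #   image = vae.decode(latents / 0.18215).sample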
    @torch.no_grad()
    def __call__(
        self,
        prompt: Union[str, List[str]] = None,
        image: PipelineImageInput = None,
        mask_image: PipelineImageInput = None,
        control_image: PipelineImageInput = None,
        height: Optional[int] = None,
        width: Optional[int] = None,
        padding_mask_crop: Optional[int] = None,
        strength: float = 1.0,
        num_inference_steps: int = 50,
        guidance_scale: float = 7.5,
        negative_prompt: Optional[Union[str, List[str]]] = None,
        num_images_per_prompt: Optional[int] = 1,
        eta: float = 0.0,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        latents: Optional[torch.Tensor] = None,
        prompt_embeds: Optional[torch.Tensor] = None,
        negative_prompt_embeds: Optional[torch.Tensor] = None,
        ip_adapter_image: Optional[PipelineImageInput] = None,
        ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
        cross_attention_kwargs: Optional[Dict[str, Any]] = None,
        controlnet_conditioning_scale: Union[float, List[float]] = 0.5,
        guess_mode: bool = False,
        control_guidance_start: Union[float, List[float]] = 0.0,
        control_guidance_end: Union[float, List[float]] = 1.0,
        clip_skip: Optional[int] = None,
        callback_on_step_end: Optional[
            Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
        ] = None,
        callback_on_step_end_tensor_inputs: List[str] = ["latents"],
        region_map_state=None,
        weight_func=lambda w, sigma, qk: w * sigma * qk.std(),
        latent_processing=0,
        image_t2i_adapter: Optional[PipelineImageInput] = None,
        adapter_conditioning_scale: Union[float, List[float]] = 1.0,
        adapter_conditioning_factor: float = 1.0,
        long_encode: int = 0,
        guidance_rescale: float = 0.0,
        **kwargs,
    ):
        callback = kwargs.pop("callback", None)
        callback_steps = kwargs.pop("callback_steps", None)

        if callback is not None:
            deprecate(
                "callback",
                "1.0.0",
                "Passing `callback` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )
        if callback_steps is not None:
            deprecate(
                "callback_steps",
                "1.0.0",
                "Passing `callback_steps` as an input argument to `__call__` is deprecated, consider using `callback_on_step_end`",
            )

        if isinstance(callback_on_step_end, (PipelineCallback, MultiPipelineCallbacks)):
            callback_on_step_end_tensor_inputs = callback_on_step_end.tensor_inputs

        if height is None:
            _, height = get_image_size(image)
            height = int((height // 8) * 8)
        if width is None:
            width, _ = get_image_size(image)
            width = int((width // 8) * 8)

        adapter_state = None
        if image_t2i_adapter is not None:
            height, width = default_height_width(self, height, width, image_t2i_adapter)
            adapter_state = preprocessing_t2i_adapter(
                self, image_t2i_adapter, width, height, adapter_conditioning_scale, num_images_per_prompt
            )

        controlnet = self.controlnet._orig_mod if is_compiled_module(self.controlnet) else self.controlnet

        # Align the format of the control guidance start/end values.
        if not isinstance(control_guidance_start, list) and isinstance(control_guidance_end, list):
            control_guidance_start = len(control_guidance_end) * [control_guidance_start]
        elif not isinstance(control_guidance_end, list) and isinstance(control_guidance_start, list):
            control_guidance_end = len(control_guidance_start) * [control_guidance_end]
        elif not isinstance(control_guidance_start, list) and not isinstance(control_guidance_end, list):
            mult = len(controlnet.nets) if isinstance(controlnet, MultiControlNetModel) else 1
            control_guidance_start, control_guidance_end = (
                mult * [control_guidance_start],
                mult * [control_guidance_end],
            )

        # 1. Check inputs. Raise an error if they are not correct.
        self.check_inputs(
            prompt,
            control_image,
            mask_image,
            height,
            width,
            callback_steps,
            output_type,
            negative_prompt,
            prompt_embeds,
            negative_prompt_embeds,
            ip_adapter_image,
            ip_adapter_image_embeds,
            controlnet_conditioning_scale,
            control_guidance_start,
            control_guidance_end,
            callback_on_step_end_tensor_inputs,
            padding_mask_crop,
        )

        self._guidance_scale = guidance_scale
        self._clip_skip = clip_skip
        self._cross_attention_kwargs = cross_attention_kwargs

        # 2. Define call parameters
        if prompt is not None and isinstance(prompt, str):
            batch_size = 1
        elif prompt is not None and isinstance(prompt, list):
            batch_size = len(prompt)
        else:
            batch_size = prompt_embeds.shape[0]

        if padding_mask_crop is not None:
            height, width = self.image_processor.get_default_height_width(image, height, width)
            crops_coords = self.mask_processor.get_crop_region(mask_image, width, height, pad=padding_mask_crop)
            resize_mode = "fill"
        else:
            crops_coords = None
            resize_mode = "default"

        device = self._execution_device

        if isinstance(controlnet, MultiControlNetModel) and isinstance(controlnet_conditioning_scale, float):
            controlnet_conditioning_scale = [controlnet_conditioning_scale] * len(controlnet.nets)

        global_pool_conditions = (
            controlnet.config.global_pool_conditions
            if isinstance(controlnet, ControlNetModel)
            else controlnet.nets[0].config.global_pool_conditions
        )
        guess_mode = guess_mode or global_pool_conditions
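        # Scale broadcasting above (illustrative comment): with a MultiControlNetModel of two
        # nets and controlnet_conditioning_scale=0.5, the float is expanded to [0.5, 0.5], one
        # entry per ControlNet; a list input is used as-is.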
        # 3. Encode input prompt
        text_encoder_lora_scale = (
            self.cross_attention_kwargs.get("scale", None) if self.cross_attention_kwargs is not None else None
        )
        prompt_embeds, negative_prompt_embeds, text_input_ids = encode_prompt_function(
            self,
            prompt,
            device,
            num_images_per_prompt,
            self.do_classifier_free_guidance,
            negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            lora_scale=text_encoder_lora_scale,
            clip_skip=self.clip_skip,
            long_encode=long_encode,
        )
        # For classifier-free guidance we need two forward passes. Concatenating the
        # unconditional and text embeddings into a single batch avoids running them separately.
        if self.do_classifier_free_guidance:
            prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])

        if ip_adapter_image is not None or ip_adapter_image_embeds is not None:
            image_embeds = self.prepare_ip_adapter_image_embeds(
                ip_adapter_image,
                ip_adapter_image_embeds,
                device,
                batch_size * num_images_per_prompt,
                self.do_classifier_free_guidance,
            )
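        # Expected embedding shapes (illustrative comment, SD-1.x text encoder): prompt_embeds is
        # (batch_size * num_images_per_prompt, 77, 768) per pass; after the CFG concat above the
        # leading dim doubles. When `long_encode` is enabled the sequence dim may exceed 77
        # (chunked long-prompt encoding - an assumption about encode_prompt_function's behaviour).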
        # 4. Prepare the control image(s)
        if isinstance(controlnet, ControlNetModel):
            control_image = self.prepare_control_image(
                image=control_image,
                width=width,
                height=height,
                batch_size=batch_size * num_images_per_prompt,
                num_images_per_prompt=num_images_per_prompt,
                device=device,
                dtype=controlnet.dtype,
                crops_coords=crops_coords,
                resize_mode=resize_mode,
                do_classifier_free_guidance=self.do_classifier_free_guidance,
                guess_mode=guess_mode,
            )
        elif isinstance(controlnet, MultiControlNetModel):
            control_images = []
            for control_image_ in control_image:
                control_image_ = self.prepare_control_image(
                    image=control_image_,
                    width=width,
                    height=height,
                    batch_size=batch_size * num_images_per_prompt,
                    num_images_per_prompt=num_images_per_prompt,
                    device=device,
                    dtype=controlnet.dtype,
                    crops_coords=crops_coords,
                    resize_mode=resize_mode,
                    do_classifier_free_guidance=self.do_classifier_free_guidance,
                    guess_mode=guess_mode,
                )
                control_images.append(control_image_)
            control_image = control_images
        else:
            raise TypeError(f"Unexpected controlnet type: {type(controlnet)}")

        # 4.1 Preprocess mask and image - resizes image and mask w.r.t. height and width
        original_image = image
        init_image = self.image_processor.preprocess(
            image, height=height, width=width, crops_coords=crops_coords, resize_mode=resize_mode
        )
        init_image = init_image.to(dtype=torch.float32)

        mask = self.mask_processor.preprocess(
            mask_image, height=height, width=width, resize_mode=resize_mode, crops_coords=crops_coords
        )
        masked_image = init_image * (mask < 0.5)
        _, _, height, width = init_image.shape

        # 4.2 Preprocess region map
        region_state = encode_region_map(
            self,
            region_map_state,
            width=width,
            height=height,
            num_images_per_prompt=num_images_per_prompt,
            text_ids=text_input_ids,
        )
        if self.cross_attention_kwargs is None:
            self._cross_attention_kwargs = {}

        # 5. Prepare timesteps
        self.scheduler.set_timesteps(num_inference_steps, device=device)
        timesteps, num_inference_steps = self.get_timesteps(
            num_inference_steps=num_inference_steps, strength=strength, device=device
        )
        # The timestep at which to set the initial noise (n.b. 50% if strength is 0.5).
        latent_timestep = timesteps[:1].repeat(batch_size * num_images_per_prompt)
        # If strength is 1, initialise the latents with pure noise.
        is_strength_max = strength == 1.0
        self._num_timesteps = len(timesteps)

        # 6. Prepare latent variables
        num_channels_latents = self.vae.config.latent_channels
        num_channels_unet = self.unet.config.in_channels
        return_image_latents = num_channels_unet == 4

        latents_outputs = self.prepare_latents(
            batch_size * num_images_per_prompt,
            num_channels_latents,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            latents,
            image=init_image,
            timestep=latent_timestep,
            is_strength_max=is_strength_max,
            return_noise=True,
            return_image_latents=return_image_latents,
        )
        if return_image_latents:
            latents, noise, image_latents = latents_outputs
        else:
            latents, noise = latents_outputs

        # 7. Prepare mask latent variables
        mask, masked_image_latents = self.prepare_mask_latents(
            mask,
            masked_image,
            batch_size * num_images_per_prompt,
            height,
            width,
            prompt_embeds.dtype,
            device,
            generator,
            self.do_classifier_free_guidance,
        )
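        # Mask latent shapes (illustrative comment): prepare_mask_latents interpolates the pixel
        # mask down to the latent grid and duplicates it for CFG, e.g. for 512x512 inputs
        #   mask                 -> (2 * b, 1, 64, 64)
        #   masked_image_latents -> (2 * b, 4, 64, 64)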
        # 8. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline
        extra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)

        # 8.1 Add image embeds for IP-Adapter
        added_cond_kwargs = (
            {"image_embeds": image_embeds}
            if ip_adapter_image is not None or ip_adapter_image_embeds is not None
            else None
        )

        # 8.2 Create a list stating which controlnets to keep at each timestep
        controlnet_keep = []
        for i in range(len(timesteps)):
            keeps = [
                1.0 - float(i / len(timesteps) < s or (i + 1) / len(timesteps) > e)
                for s, e in zip(control_guidance_start, control_guidance_end)
            ]
            controlnet_keep.append(keeps[0] if isinstance(controlnet, ControlNetModel) else keeps)

        lst_latent = []
        if latent_processing == 1:
            lst_latent = [
                self.type_output(
                    "pil", device, prompt_embeds.dtype, return_dict, latents, generator,
                    padding_mask_crop, mask_image, original_image, crops_coords,
                ).images[0]
            ]
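        # `controlnet_keep` worked example (illustrative comment): with 50 timesteps,
        # control_guidance_start=0.0 and control_guidance_end=0.5, entry i is
        #   1.0 - float(i / 50 < 0.0 or (i + 1) / 50 > 0.5)
        # which is 1.0 for the first 25 steps and 0.0 afterwards, so ControlNet conditioning is
        # applied only during the first half of sampling.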
        # 9. Denoising loop
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
        with self.progress_bar(total=num_inference_steps) as progress_bar:
            for i, t in enumerate(timesteps):
                # Expand the latents if we are doing classifier-free guidance.
                latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)

                # ControlNet(s) inference
                if guess_mode and self.do_classifier_free_guidance:
                    # Infer ControlNet only for the conditional batch.
                    control_model_input = latents
                    control_model_input = self.scheduler.scale_model_input(control_model_input, t)
                    controlnet_prompt_embeds = prompt_embeds.chunk(2)[1]
                else:
                    control_model_input = latent_model_input
                    controlnet_prompt_embeds = prompt_embeds

                if isinstance(controlnet_keep[i], list):
                    cond_scale = [c * s for c, s in zip(controlnet_conditioning_scale, controlnet_keep[i])]
                else:
                    controlnet_cond_scale = controlnet_conditioning_scale
                    if isinstance(controlnet_cond_scale, list):
                        controlnet_cond_scale = controlnet_cond_scale[0]
                    cond_scale = controlnet_cond_scale * controlnet_keep[i]

                down_block_res_samples, mid_block_res_sample = self.controlnet(
                    control_model_input,
                    t,
                    encoder_hidden_states=controlnet_prompt_embeds,
                    controlnet_cond=control_image,
                    conditioning_scale=cond_scale,
                    guess_mode=guess_mode,
                    return_dict=False,
                )

                if guess_mode and self.do_classifier_free_guidance:
                    # ControlNet was inferred only for the conditional batch. To apply its output
                    # to both batches, prepend zeros for the unconditional batch so it stays unchanged.
                    down_block_res_samples = [torch.cat([torch.zeros_like(d), d]) for d in down_block_res_samples]
                    mid_block_res_sample = torch.cat([torch.zeros_like(mid_block_res_sample), mid_block_res_sample])

                # Concat latents, mask and masked_image_latents in the channel dimension.
                if num_channels_unet == 9:
                    latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)

                region_prompt = {
                    "region_state": region_state,
                    "sigma": self.scheduler.sigmas[i],
                    "weight_func": weight_func,
                }
                self._cross_attention_kwargs["region_prompt"] = region_prompt

                down_intrablock_additional_residuals = None
                if adapter_state is not None and i < int(num_inference_steps * adapter_conditioning_factor):
                    down_intrablock_additional_residuals = [state.clone() for state in adapter_state]

                # Predict the noise residual.
                noise_pred = self.unet(
                    latent_model_input,
                    t,
                    encoder_hidden_states=prompt_embeds,
                    cross_attention_kwargs=self.cross_attention_kwargs,
                    down_block_additional_residuals=down_block_res_samples,
                    mid_block_additional_residual=mid_block_res_sample,
                    down_intrablock_additional_residuals=down_intrablock_additional_residuals,
                    added_cond_kwargs=added_cond_kwargs,
                    return_dict=False,
                )[0]

                # Perform guidance.
                if self.do_classifier_free_guidance:
                    noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
                    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)

                if self.do_classifier_free_guidance and guidance_rescale > 0.0:
                    # Rescale noise_pred following section 3.4 of https://arxiv.org/pdf/2305.08891.pdf
                    noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=guidance_rescale)

                # Compute the previous noisy sample x_t -> x_t-1.
                latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]

                if num_channels_unet == 4:
                    init_latents_proper = image_latents
                    if self.do_classifier_free_guidance:
                        init_mask, _ = mask.chunk(2)
                    else:
                        init_mask = mask
                    if i < len(timesteps) - 1:
                        noise_timestep = timesteps[i + 1]
                        init_latents_proper = self.scheduler.add_noise(
                            init_latents_proper, noise, torch.tensor([noise_timestep])
                        )
                    latents = (1 - init_mask) * init_latents_proper + init_mask * latents

                if latent_processing == 1:
                    lst_latent.append(
                        self.type_output(
                            "pil", device, prompt_embeds.dtype, return_dict, latents, generator,
                            padding_mask_crop, mask_image, original_image, crops_coords,
                        ).images[0]
                    )

                if callback_on_step_end is not None:
                    callback_kwargs = {}
                    for k in callback_on_step_end_tensor_inputs:
                        callback_kwargs[k] = locals()[k]
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)

                    latents = callback_outputs.pop("latents", latents)
                    prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
                    negative_prompt_embeds = callback_outputs.pop("negative_prompt_embeds", negative_prompt_embeds)

                # Call the legacy callback, if provided.
                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
                    progress_bar.update()
                    if callback is not None and i % callback_steps == 0:
                        step_idx = i // getattr(self.scheduler, "order", 1)
                        callback(step_idx, t, latents)

        # If we do sequential model offloading, offload unet and controlnet
        # manually for max memory savings.
        if hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:
            self.unet.to("cpu")
            self.controlnet.to("cpu")
            torch.cuda.empty_cache()

        if latent_processing == 1:
            if output_type == "latent":
                lst_latent.append(
                    self.type_output(
                        output_type, device, prompt_embeds.dtype, return_dict, latents, generator,
                        padding_mask_crop, mask_image, original_image, crops_coords,
                    ).images[0]
                )
            return lst_latent
        if output_type == "latent":
            return [
                self.type_output(
                    "pil", device, prompt_embeds.dtype, return_dict, latents, generator,
                    padding_mask_crop, mask_image, original_image, crops_coords,
                ).images[0],
                self.type_output(
                    output_type, device, prompt_embeds.dtype, return_dict, latents, generator,
                    padding_mask_crop, mask_image, original_image, crops_coords,
                ).images[0],
            ]
        return [
            self.type_output(
                output_type, device, prompt_embeds.dtype, return_dict, latents, generator,
                padding_mask_crop, mask_image, original_image, crops_coords,
            ).images[0]
        ]
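

# Minimal usage sketch (a module-level helper that is never called on import). The model ids,
# file names and prompt are placeholders, not part of the pipeline API above.
def _example_controlnet_inpaint_usage():
    pipe = StableDiffusionControlNetInpaintPipeline_finetune.from_pretrained(
        "runwayml/stable-diffusion-inpainting",  # hypothetical base checkpoint
        controlnet=ControlNetModel.from_pretrained(
            "lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16
        ),
        torch_dtype=torch.float16,
    ).to("cuda")
    init = Image.open("input.png").convert("RGB")
    mask = Image.open("mask.png").convert("L")
    canny = Image.open("canny.png").convert("RGB")
    # The finetune `__call__` returns a plain list of PIL images; pass latent_processing=1 to get
    # one preview image per denoising step instead.
    images = pipe(
        prompt="a red sofa in a bright living room",
        image=init,
        mask_image=mask,
        control_image=canny,
        num_inference_steps=30,
        strength=0.9,
        guidance_scale=7.5,
    )
    images[0].save("result.png")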