Spaces:
Runtime error
Runtime error
"""
Utility functions built on the Diffusers framework.
"""
| import os | |
| import torch | |
| import cv2 | |
| import numpy as np | |
| import torch.nn.functional as F | |
| from tqdm import tqdm | |
| from PIL import Image | |
| from torchvision.utils import save_image | |
| from torchvision.io import read_image | |
| from diffusers import StableDiffusionPipeline | |
| from pytorch_lightning import seed_everything | |
class MasaCtrlPipeline(StableDiffusionPipeline):
    """Stable Diffusion pipeline with hand-written DDIM sampling and inversion.

    The update rules in ``step`` / ``next_step`` read ``alphas_cumprod`` and
    ``final_alpha_cumprod`` straight from ``self.scheduler`` instead of calling
    the scheduler's own ``step``; both are the deterministic DDIM form (the
    ``eta`` arguments are accepted but never used).
    """

    # Factor applied to VAE latents: multiplied in after encoding and divided
    # out again before decoding.
    _LATENT_SCALE = 0.18215
    # Every prompt is padded to this many tokens before text encoding.
    _MAX_TOKENS = 77

    @staticmethod
    def _default_device():
        """Return the CUDA device when CUDA is available, else the CPU device."""
        return torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    def _timestep_stride(self):
        """Return the number of training timesteps covered by one inference step."""
        return self.scheduler.config.num_train_timesteps // self.scheduler.num_inference_steps

    def _embed_text(self, text, device):
        """Tokenize ``text`` (padded to ``_MAX_TOKENS``) and return the text-encoder output."""
        tokens = self.tokenizer(
            text,
            padding="max_length",
            max_length=self._MAX_TOKENS,
            return_tensors="pt",
        )
        return self.text_encoder(tokens.input_ids.to(device))[0]

    def next_step(
        self,
        model_output: torch.FloatTensor,
        timestep: int,
        x: torch.FloatTensor,
        eta=0.,
        verbose=False
    ):
        """Advance ``x`` one step towards noise (inverse sampling for DDIM inversion).

        Args:
            model_output: Noise predicted by the UNet for ``x``.
            timestep: Timestep that the returned sample corresponds to; ``x``
                is treated as belonging to one stride before it.
            x: Current latent.
            eta: Unused; kept for interface compatibility.
            verbose: When true, print the incoming timestep.

        Returns:
            Tuple ``(x_next, pred_x0)``: the latent at ``timestep`` and the
            clean-latent estimate computed from ``x``.
        """
        if verbose:
            print("timestep: ", timestep)
        target_timestep = timestep
        current_timestep = min(timestep - self._timestep_stride(), 999)
        # Below index 0 there is no table entry; fall back to the scheduler's
        # dedicated final alpha.
        if current_timestep >= 0:
            alpha_prod_t = self.scheduler.alphas_cumprod[current_timestep]
        else:
            alpha_prod_t = self.scheduler.final_alpha_cumprod
        alpha_prod_t_next = self.scheduler.alphas_cumprod[target_timestep]
        beta_prod_t = 1 - alpha_prod_t
        pred_x0 = (x - beta_prod_t**0.5 * model_output) / alpha_prod_t**0.5
        pred_dir = (1 - alpha_prod_t_next)**0.5 * model_output
        x_next = alpha_prod_t_next**0.5 * pred_x0 + pred_dir
        return x_next, pred_x0

    def step(
        self,
        model_output: torch.FloatTensor,
        timestep: int,
        x: torch.FloatTensor,
        eta: float = 0.0,
        verbose=False,
    ):
        """Predict the sample at the previous timestep of the denoising process.

        Args:
            model_output: Noise predicted by the UNet for ``x``.
            timestep: Timestep that ``x`` belongs to.
            x: Current latent.
            eta: Unused; kept for interface compatibility.
            verbose: Unused; kept for interface compatibility.

        Returns:
            Tuple ``(x_prev, pred_x0)``: the latent one stride earlier and the
            clean-latent estimate computed from ``x``.
        """
        prev_timestep = timestep - self._timestep_stride()
        alpha_prod_t = self.scheduler.alphas_cumprod[timestep]
        # ``>= 0`` (not ``> 0``): index 0 is a valid table entry, and this is
        # the same boundary ``next_step`` uses, so sampling mirrors inversion.
        if prev_timestep >= 0:
            alpha_prod_t_prev = self.scheduler.alphas_cumprod[prev_timestep]
        else:
            alpha_prod_t_prev = self.scheduler.final_alpha_cumprod
        beta_prod_t = 1 - alpha_prod_t
        pred_x0 = (x - beta_prod_t**0.5 * model_output) / alpha_prod_t**0.5
        pred_dir = (1 - alpha_prod_t_prev)**0.5 * model_output
        x_prev = alpha_prod_t_prev**0.5 * pred_x0 + pred_dir
        return x_prev, pred_x0

    def image2latent(self, image):
        """Encode an image into scaled VAE latents.

        Args:
            image: Either a ``PIL.Image.Image`` or a value ``self.vae.encode``
                accepts directly. A PIL image is converted to RGB, scaled to
                [-1, 1], reordered to channels-first, given a batch axis and
                moved to the default device. Anything else is passed to the VAE
                unchanged (assumed to already be a suitably scaled tensor on
                the right device; this method does not check).

        Returns:
            The mean of the VAE latent distribution multiplied by
            ``_LATENT_SCALE``.
        """
        # ``Image`` is the PIL *module*; the image class is ``Image.Image``.
        if isinstance(image, Image.Image):
            # Force three channels so the permute below always sees H x W x 3.
            pixels = np.array(image.convert("RGB"))
            image = torch.from_numpy(pixels).float() / 127.5 - 1
            image = image.permute(2, 0, 1).unsqueeze(0).to(self._default_device())
        # input image density range [-1, 1]
        latents = self.vae.encode(image)['latent_dist'].mean
        return latents * self._LATENT_SCALE

    @torch.no_grad()
    def latent2image(self, latents, return_type='np'):
        """Decode latents to an image without tracking gradients.

        Args:
            latents: Scaled latents as produced by ``image2latent``.
            return_type: ``'np'`` returns the first batch item as a
                channels-last ``uint8`` array in [0, 255]; ``'pt'`` returns the
                whole batch as a tensor clamped to [0, 1]; any other value
                returns the raw decoder output unchanged.

        Returns:
            The decoded image in the representation selected by ``return_type``.
        """
        latents = 1 / self._LATENT_SCALE * latents.detach()
        image = self.vae.decode(latents)['sample']
        if return_type == 'np':
            image = (image / 2 + 0.5).clamp(0, 1)
            image = image.cpu().permute(0, 2, 3, 1).numpy()[0]
            image = (image * 255).astype(np.uint8)
        elif return_type == "pt":
            image = (image / 2 + 0.5).clamp(0, 1)
        return image

    def latent2image_grad(self, latents):
        """Decode latents while keeping the computation differentiable.

        Unlike ``latent2image`` the input is not detached and the output is not
        rescaled or clamped.
        """
        latents = 1 / self._LATENT_SCALE * latents
        image = self.vae.decode(latents)['sample']
        return image  # range [-1, 1]

    @torch.no_grad()
    def __call__(
        self,
        prompt,
        batch_size=1,
        height=512,
        width=512,
        num_inference_steps=50,
        guidance_scale=7.5,
        eta=0.0,
        latents=None,
        unconditioning=None,
        neg_prompt=None,
        ref_intermediate_latents=None,
        return_intermediates=False,
        **kwds):
        """Generate images with the deterministic DDIM sampler.

        Runs under ``torch.no_grad()`` so the sampling loop does not accumulate
        an autograd graph across steps.

        Args:
            prompt: A string or a list of strings. A list fixes the batch size
                to its length; a string is repeated ``batch_size`` times when
                ``batch_size > 1``.
            batch_size: Batch size used when ``prompt`` is a single string.
            height: Image height in pixels; the latent height is ``height // 8``.
            width: Image width in pixels; the latent width is ``width // 8``.
            num_inference_steps: Number of scheduler timesteps.
            guidance_scale: Classifier-free guidance weight; guidance is only
                applied when this is greater than 1.
            eta: Unused; kept for interface compatibility.
            latents: Optional starting latents of shape
                ``(batch_size, unet.config.in_channels, height // 8, width // 8)``.
                Random normal latents are drawn when omitted.
            unconditioning: Optional list with one unconditional embedding per
                step; when given it replaces the first half of the text
                embeddings at every step.
            neg_prompt: Optional negative prompt used instead of ``""`` for the
                unconditional branch.
            ref_intermediate_latents: Optional sequence of latents, indexed
                from the end, that overwrites the first half of the batch at
                every step (so the batch must hold at least two items).
            return_intermediates: When true, also return the decoded per-step
                ``pred_x0`` and latent lists.
            **kwds: Only ``dir`` is read: a truthy scale that shifts the last
                prompt embedding along the first principal direction of
                (second-to-last embedding - last embedding).

        Returns:
            The decoded image batch as a ``[0, 1]`` tensor, or the tuple
            ``(image, pred_x0_list, latents_list)`` when
            ``return_intermediates`` is true.

        Raises:
            AssertionError: If ``latents`` is given with an unexpected shape.
        """
        device = self._default_device()
        if isinstance(prompt, list):
            batch_size = len(prompt)
        elif isinstance(prompt, str) and batch_size > 1:
            prompt = [prompt] * batch_size

        # text embeddings
        text_embeddings = self._embed_text(prompt, device)
        print("input text embeddings :", text_embeddings.shape)
        shift_scale = kwds.get("dir")
        if shift_scale:
            direction = text_embeddings[-2] - text_embeddings[-1]
            u, s, v = torch.pca_lowrank(direction.transpose(-1, -2), q=1, center=True)
            text_embeddings[-1] = text_embeddings[-1] + shift_scale * v
            print(u.shape)
            print(v.shape)

        # define initial latents
        latents_shape = (batch_size, self.unet.config.in_channels, height//8, width//8)
        if latents is None:
            latents = torch.randn(latents_shape, device=device)
        else:
            assert latents.shape == latents_shape, f"The shape of input latent tensor {latents.shape} should equal to predefined one."

        # unconditional embedding for classifier free guidance
        if guidance_scale > 1.:
            uc_text = neg_prompt if neg_prompt else ""
            unconditional_embeddings = self._embed_text([uc_text] * batch_size, device)
            text_embeddings = torch.cat([unconditional_embeddings, text_embeddings], dim=0)

        print("latents shape: ", latents.shape)
        # iterative sampling
        self.scheduler.set_timesteps(num_inference_steps)
        latents_list = [latents]
        pred_x0_list = [latents]
        for i, t in enumerate(tqdm(self.scheduler.timesteps, desc="DDIM Sampler")):
            if ref_intermediate_latents is not None:
                # note that the batch_size >= 2
                latents_ref = ref_intermediate_latents[-1 - i]
                _, latents_cur = latents.chunk(2)
                latents = torch.cat([latents_ref, latents_cur])

            if guidance_scale > 1.:
                # One copy for the unconditional pass, one for the conditional.
                model_inputs = torch.cat([latents] * 2)
            else:
                model_inputs = latents
            if unconditioning is not None and isinstance(unconditioning, list):
                _, text_embeddings = text_embeddings.chunk(2)
                text_embeddings = torch.cat([unconditioning[i].expand(*text_embeddings.shape), text_embeddings])
            # predict the noise
            noise_pred = self.unet(model_inputs, t, encoder_hidden_states=text_embeddings).sample
            if guidance_scale > 1.:
                noise_pred_uncon, noise_pred_con = noise_pred.chunk(2, dim=0)
                noise_pred = noise_pred_uncon + guidance_scale * (noise_pred_con - noise_pred_uncon)
            # compute the previous noise sample x_t -> x_t-1
            latents, pred_x0 = self.step(noise_pred, t, latents)
            latents_list.append(latents)
            pred_x0_list.append(pred_x0)

        image = self.latent2image(latents, return_type="pt")
        if return_intermediates:
            pred_x0_list = [self.latent2image(img, return_type="pt") for img in pred_x0_list]
            latents_list = [self.latent2image(img, return_type="pt") for img in latents_list]
            return image, pred_x0_list, latents_list
        return image

    @torch.no_grad()
    def invert(
        self,
        image: torch.Tensor,
        prompt,
        num_inference_steps=50,
        guidance_scale=7.5,
        eta=0.0,
        return_intermediates=False,
        **kwds):
        """Invert a real image into a noise map with deterministic DDIM inversion.

        Runs under ``torch.no_grad()`` so the inversion loop does not
        accumulate an autograd graph across steps.

        Args:
            image: Image tensor whose first axis is the batch; it is handed to
                ``image2latent`` as is.
            prompt: A string or a list of strings. With a list and a single
                image, the image is expanded to one copy per prompt; a string
                is repeated per image when the batch holds several images.
            num_inference_steps: Number of scheduler timesteps.
            guidance_scale: Classifier-free guidance weight; guidance is only
                applied when this is greater than 1.
            eta: Unused; kept for interface compatibility.
            return_intermediates: When true, return every intermediate latent
                instead of only the starting latent.
            **kwds: Ignored.

        Returns:
            ``(latents, latents_list)`` when ``return_intermediates`` is true,
            where ``latents_list`` starts with the encoded image and gains one
            entry per step; otherwise ``(latents, start_latents)``.
        """
        device = self._default_device()
        batch_size = image.shape[0]
        if isinstance(prompt, list):
            if batch_size == 1:
                image = image.expand(len(prompt), -1, -1, -1)
                # Keep the unconditional batch in sync with the expanded image.
                batch_size = len(prompt)
        elif isinstance(prompt, str):
            if batch_size > 1:
                prompt = [prompt] * batch_size

        # text embeddings
        text_embeddings = self._embed_text(prompt, device)
        print("input text embeddings :", text_embeddings.shape)
        # define initial latents
        latents = self.image2latent(image)
        start_latents = latents

        # unconditional embedding for classifier free guidance
        if guidance_scale > 1.:
            unconditional_embeddings = self._embed_text([""] * batch_size, device)
            text_embeddings = torch.cat([unconditional_embeddings, text_embeddings], dim=0)

        print("latents shape: ", latents.shape)
        # iterative sampling
        self.scheduler.set_timesteps(num_inference_steps)
        print("Valid timesteps: ", reversed(self.scheduler.timesteps))
        latents_list = [latents]
        for t in tqdm(reversed(self.scheduler.timesteps), desc="DDIM Inversion"):
            if guidance_scale > 1.:
                model_inputs = torch.cat([latents] * 2)
            else:
                model_inputs = latents

            # predict the noise
            noise_pred = self.unet(model_inputs, t, encoder_hidden_states=text_embeddings).sample
            if guidance_scale > 1.:
                noise_pred_uncon, noise_pred_con = noise_pred.chunk(2, dim=0)
                noise_pred = noise_pred_uncon + guidance_scale * (noise_pred_con - noise_pred_uncon)
            # compute the next noise sample x_t-1 -> x_t
            latents, _ = self.next_step(noise_pred, t, latents)
            latents_list.append(latents)

        if return_intermediates:
            # return the intermediate latents during inversion
            return latents, latents_list
        return latents, start_latents