Spaces:
Running
on
Zero
Running
on
Zero
| import gradio as gr | |
| import torch | |
| from PIL import Image | |
| from diffusers import PriorTransformer, UNet2DConditionModel, KandinskyV22Pipeline | |
| from huggingface_hub import hf_hub_download | |
| from transformers import CLIPVisionModelWithProjection, CLIPImageProcessor, CLIPTokenizer, CLIPTextModelWithProjection | |
| from model import pops_utils | |
| from model.pipeline_pops import pOpsPipeline | |
| kandinsky_prior_repo: str = 'kandinsky-community/kandinsky-2-2-prior' | |
| kandinsky_decoder_repo: str = 'kandinsky-community/kandinsky-2-2-decoder' | |
| prior_texture_repo: str = 'models/texturing/learned_prior.pth' | |
| prior_instruct_repo: str = 'models/instruct/learned_prior.pth' | |
| prior_scene_repo: str = 'models/scene/learned_prior.pth' | |
| prior_repo = "pOpsPaper/operators" | |
| # gpu = torch.device('cuda') | |
| # cpu = torch.device('cpu') | |
| class PopsPipelines: | |
| def __init__(self): | |
| weight_dtype = torch.float16 | |
| self.weight_dtype = weight_dtype | |
| device = 'cpu' #torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| self.device = 'cuda' #device | |
| self.image_encoder = CLIPVisionModelWithProjection.from_pretrained(kandinsky_prior_repo, | |
| subfolder='image_encoder', | |
| torch_dtype=weight_dtype).eval() | |
| self.image_encoder.requires_grad_(False) | |
| self.image_processor = CLIPImageProcessor.from_pretrained(kandinsky_prior_repo, | |
| subfolder='image_processor') | |
| self.tokenizer = CLIPTokenizer.from_pretrained(kandinsky_prior_repo, subfolder='tokenizer') | |
| self.text_encoder = CLIPTextModelWithProjection.from_pretrained(kandinsky_prior_repo, | |
| subfolder='text_encoder', | |
| torch_dtype=weight_dtype).eval().to(device) | |
| # Load full model for vis | |
| self.unet = UNet2DConditionModel.from_pretrained(kandinsky_decoder_repo, | |
| subfolder='unet').to(torch.float16).to(device) | |
| self.decoder = KandinskyV22Pipeline.from_pretrained(kandinsky_decoder_repo, unet=self.unet, | |
| torch_dtype=torch.float16) | |
| self.decoder = self.decoder.to(device) | |
| self.priors_dict = { | |
| 'texturing':{'repo':prior_texture_repo}, | |
| 'instruct': {'repo': prior_instruct_repo}, | |
| 'scene': {'repo':prior_scene_repo} | |
| } | |
| for prior_type in self.priors_dict: | |
| prior_path = self.priors_dict[prior_type]['repo'] | |
| prior = PriorTransformer.from_pretrained( | |
| kandinsky_prior_repo, subfolder="prior" | |
| ) | |
| # Load from huggingface | |
| prior_path = hf_hub_download(repo_id=prior_repo, filename=str(prior_path)) | |
| prior_state_dict = torch.load(prior_path, map_location=device) | |
| prior.load_state_dict(prior_state_dict, strict=False) | |
| prior.eval() | |
| prior = prior.to(weight_dtype) | |
| prior_pipeline = pOpsPipeline.from_pretrained(kandinsky_prior_repo, | |
| prior=prior, | |
| image_encoder=self.image_encoder, | |
| torch_dtype=torch.float16) | |
| self.priors_dict[prior_type]['pipeline'] = prior_pipeline | |
| def process_image(self, input_path): | |
| if input_path is None: | |
| return None | |
| image_pil = Image.open(input_path).convert("RGB").resize((512, 512)) | |
| image = torch.Tensor(self.image_processor(image_pil)['pixel_values'][0]).to(self.device).unsqueeze(0).to( | |
| self.weight_dtype) | |
| return image | |
| def process_text(self, text): | |
| self.text_encoder.to('cuda') | |
| text_inputs = self.tokenizer( | |
| text, | |
| padding="max_length", | |
| max_length=self.tokenizer.model_max_length, | |
| truncation=True, | |
| return_tensors="pt", | |
| ) | |
| mask = text_inputs.attention_mask.bool() # [0] | |
| text_encoder_output = self.text_encoder(text_inputs.input_ids.to(self.device)) | |
| text_encoder_hidden_states = text_encoder_output.last_hidden_state | |
| text_encoder_concat = text_encoder_hidden_states[:, :mask.sum().item()] | |
| self.text_encoder.to('cpu') | |
| return text_encoder_concat | |
| def run_binary(self, input_a, input_b, prior_type): | |
| # Move pipeline to GPU | |
| pipeline = self.priors_dict[prior_type]['pipeline'] | |
| pipeline.to('cuda') | |
| self.image_encoder.to('cuda') | |
| input_image_embeds, input_hidden_state = pops_utils.preprocess(input_a, input_b, | |
| self.image_encoder, | |
| pipeline.prior.clip_mean.detach(), | |
| pipeline.prior.clip_std.detach()) | |
| negative_input_embeds = torch.zeros_like(input_image_embeds) | |
| negative_hidden_states = torch.zeros_like(input_hidden_state) | |
| guidance_scale = 1.0 | |
| if prior_type == 'texturing': | |
| guidance_scale = 8.0 | |
| img_emb = pipeline(input_embeds=input_image_embeds, input_hidden_states=input_hidden_state, | |
| negative_input_embeds=negative_input_embeds, | |
| negative_input_hidden_states=negative_hidden_states, | |
| num_inference_steps=25, | |
| num_images_per_prompt=1, | |
| guidance_scale=guidance_scale) | |
| # Optional | |
| if prior_type == 'scene': | |
| # Scene is the closet to what avg represents for a background image so incorporate that as well | |
| mean_emb = 0.5 * input_hidden_state[:, 0] + 0.5 * input_hidden_state[:, 1] | |
| mean_emb = (mean_emb * pipeline.prior.clip_std) + pipeline.prior.clip_mean | |
| alpha = 0.4 | |
| img_emb.image_embeds = (1 - alpha) * img_emb.image_embeds + alpha * mean_emb | |
| # Move pipeline to CPU | |
| pipeline.to('cpu') | |
| self.image_encoder.to('cpu') | |
| return img_emb | |
| def run_instruct(self, input_a, text): | |
| text_encodings = self.process_text(text) | |
| # Move pipeline to GPU | |
| instruct_pipeline = self.priors_dict['instruct']['pipeline'] | |
| instruct_pipeline.to('cuda') | |
| self.image_encoder.to('cuda') | |
| input_image_embeds, input_hidden_state = pops_utils.preprocess(input_a, None, | |
| self.image_encoder, | |
| instruct_pipeline.prior.clip_mean.detach(), instruct_pipeline.prior.clip_std.detach(), | |
| concat_hidden_states=text_encodings) | |
| negative_input_embeds = torch.zeros_like(input_image_embeds) | |
| negative_hidden_states = torch.zeros_like(input_hidden_state) | |
| img_emb = instruct_pipeline(input_embeds=input_image_embeds, input_hidden_states=input_hidden_state, | |
| negative_input_embeds=negative_input_embeds, | |
| negative_input_hidden_states=negative_hidden_states, | |
| num_inference_steps=25, | |
| num_images_per_prompt=1, | |
| guidance_scale=1.0) | |
| # Move pipeline to CPU | |
| instruct_pipeline.to('cpu') | |
| self.image_encoder.to('cpu') | |
| return img_emb | |
| def render(self, img_emb): | |
| self.decoder.to('cuda') | |
| images = self.decoder(image_embeds=img_emb.image_embeds, negative_image_embeds=img_emb.negative_image_embeds, | |
| num_inference_steps=50, height=512, | |
| width=512, guidance_scale=4).images | |
| self.decoder.to('cpu') | |
| return images[0] | |
| def run_instruct_texture(self, image_object_path, text_instruct, image_texture_path): | |
| # Process both inputs | |
| image_object = self.process_image(image_object_path) | |
| image_texture = self.process_image(image_texture_path) | |
| if image_object is None: | |
| raise gr.Error('Object image is required') | |
| current_emb = None | |
| if image_texture is None: | |
| instruct_input = image_object | |
| else: | |
| # Run texturing | |
| current_emb = self.run_binary(input_a=image_object, input_b=image_texture,prior_type='texturing') | |
| instruct_input = current_emb.image_embeds | |
| if text_instruct != '': | |
| current_emb = self.run_instruct(input_a=instruct_input, text=text_instruct) | |
| if current_emb is None: | |
| raise gr.Error('At least one of the inputs is required') | |
| # Render as image | |
| image = self.render(current_emb) | |
| return image | |
| def run_texture_scene(self, image_object_path, image_texture_path, image_scene_path): | |
| # Process both inputs | |
| image_object = self.process_image(image_object_path) | |
| image_texture = self.process_image(image_texture_path) | |
| image_scene = self.process_image(image_scene_path) | |
| if image_object is None: | |
| raise gr.Error('Object image is required') | |
| current_emb = None | |
| if image_texture is None: | |
| scene_input = image_object | |
| else: | |
| # Run texturing | |
| current_emb = self.run_binary(input_a=image_object, input_b=image_scene,prior_type='scene') | |
| scene_input = current_emb.image_embeds | |
| # Run scene | |
| if image_scene is not None: | |
| current_emb = self.run_binary(input_a=scene_input, input_b=image_texture,prior_type='texturing') | |
| if current_emb is None: | |
| raise gr.Error('At least one of the images is required') | |
| # Render as image | |
| image = self.render(current_emb) | |
| return image | |