from typing import  Dict, List, Any
import torch
from diffusers import DPMSolverMultistepScheduler, EulerDiscreteScheduler, EulerDiscreteScheduler, DDIMScheduler, StableDiffusionInpaintPipeline, AutoPipelineForInpainting, AutoPipelineForImage2Image, DiffusionPipeline, StableDiffusionXLImg2ImgPipeline, StableDiffusionControlNetInpaintPipeline, ControlNetModel, StableDiffusionPipeline
from PIL import Image
import base64
from io import BytesIO
import numpy as np
import cv2

# set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

if device.type != 'cuda':
    raise ValueError("need to run on GPU")

class EndpointHandler():
    def __init__(self, path=""):

        #self.fast_pipe = AutoPipelineForInpainting.from_pretrained("diffusers/stable-diffusion-xl-1.0-inpainting-0.1", torch_dtype=torch.float16, variant="fp16").to("cuda")
        #self.generator = torch.Generator(device="cuda").manual_seed(0)


        # self.smooth_pipe = StableDiffusionXLImg2ImgPipeline.from_pretrained(
        #     "stabilityai/stable-diffusion-xl-refiner-1.0", torch_dtype=torch.float16, variant="fp16", use_safetensors=True
        # )
        # self.smooth_pipe.to("cuda")

        
        self.controlnet = ControlNetModel.from_pretrained(
            "lllyasviel/control_v11p_sd15_inpaint", torch_dtype=torch.float16
        )


        self.pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5", controlnet=self.controlnet, torch_dtype=torch.float16
        )

        self.pipe.scheduler = EulerDiscreteScheduler.from_config(self.pipe.scheduler.config)
        self.pipe.enable_model_cpu_offload()
        self.pipe.enable_xformers_memory_efficient_attention()
        
        """
        # load StableDiffusionInpaintPipeline pipeline
        self.pipe = AutoPipelineForInpainting.from_pretrained(
            "runwayml/stable-diffusion-inpainting",
            revision="fp16",
            torch_dtype=torch.float16,
        )
        # use DPMSolverMultistepScheduler
        self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(self.pipe.scheduler.config)

        self.pipe.enable_model_cpu_offload()

        self.pipe.enable_xformers_memory_efficient_attention()
        
        # move to device
        #self.pipe = self.pipe.to(device)

        self.pipe2 = AutoPipelineForInpainting.from_pretrained("stabilityai/stable-diffusion-xl-refiner-1.0", torch_dtype=torch.float16, variant="fp16", use_safetensors=True)
        #self.pipe2.enable_model_cpu_offload()
        self.pipe2.enable_xformers_memory_efficient_attention()
        
        self.pipe2.to("cuda")

        self.pipe3 = AutoPipelineForImage2Image.from_pipe(self.pipe2)
        #self.pipe3.enable_model_cpu_offload()
        self.pipe3.enable_xformers_memory_efficient_attention()
        """


    def __call__(self, data: Any) -> List[List[Dict[str, float]]]:
        """
        :param data: A dictionary contains `inputs` and optional `image` field.
        :return: A dictionary with `image` field contains image in base64.
        """
        encoded_image = data.pop("image", None)
        encoded_mask_image = data.pop("mask_image", None)
        
        prompt = data.pop("prompt", "")

        negative_prompt = data.pop("negative_prompt", "")

        method = data.pop("method", "slow")
        strength = data.pop("strength", 0.2)
        guidance_scale = data.pop("guidance_scale", 8.0)
        num_inference_steps = data.pop("num_inference_steps", 20)
        """
        if(method == "smooth"):
            if encoded_image is not None:
                image = self.decode_base64_image(encoded_image)
                out = self.smooth_pipe(prompt, image=image).images[0]
    
                return out
        """
        
        # process image
        if encoded_image is not None and encoded_mask_image is not None:
            image = self.decode_base64_image(encoded_image).convert("RGB")
            mask_image = self.decode_base64_image(encoded_mask_image).convert("RGB")
        else:
            image = None
            mask_image = None 

        
        """
        if(method == "fast"):
            image = self.fast_pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                image=image,
                mask_image=mask_image,
                guidance_scale=guidance_scale,
                num_inference_steps=num_inference_steps,  # steps between 15 and 30 work well for us
                strength=strength,  # make sure to use `strength` below 1.0
                generator=self.generator,
            ).images[0]
    
            return image
        """

        #pipe = AutoPipelineForInpainting.from_pretrained("diffusers/stable-diffusion-xl-1.0-inpainting-0.1", torch_dtype=torch.float16, variant="fp16").to("cuda")
        """
        # run inference pipeline
        out = self.pipe(prompt=prompt, negative_prompt=negative_prompt, image=image, mask_image=mask_image, num_inference_steps=num_inference_steps, guidance_scale=guidance_scale)

        print("1st pipeline part successful!")

        image = out.images[0].resize((1024, 1024))

        print("image resizing successful!")
        
        image = self.pipe2(
            prompt=prompt,
            negative_prompt=negative_prompt,
            image=image,
            mask_image=mask_image,
            guidance_scale=guidance_scale, #8.0
            num_inference_steps=int(num_inference_steps/10), #100
            strength=strength, #0.2
            output_type="latent",  # let's keep in latent to save some VRAM
        ).images[0]

        print("2nd pipeline part successful!")
        
        image2 = self.pipe3(
            prompt=prompt,
            image=image,
            guidance_scale=guidance_scale, #8.0
            num_inference_steps=int(num_inference_steps/10), #100
            strength=strength, #0.2
        ).images[0]

        print("3rd pipeline part successful!")
        
            
        # return first generate PIL image
        return image2
        """
        
        
        control_image = self.make_inpaint_condition(image, mask_image)

        # generate image
        image = self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=num_inference_steps,
            eta=1.0,
            image=image,
            mask_image=mask_image,
            control_image=control_image,
            guidance_scale=guidance_scale,
            strength=strength
        ).images[0]

        return image
        
        
    
    # helper to decode input image
    def decode_base64_image(self, image_string):
        base64_image = base64.b64decode(image_string)
        buffer = BytesIO(base64_image)
        image = Image.open(buffer)
        return image

    def make_inpaint_condition(self, image, image_mask):
        image = np.array(image.convert("RGB")).astype(np.float32) / 255.0
        image_mask = np.array(image_mask.convert("L")).astype(np.float32) / 255.0
    
        assert image.shape[0:1] == image_mask.shape[0:1], "image and image_mask must have the same image size"
        image[image_mask > 0.5] = -1.0  # set as masked pixel
        image = np.expand_dims(image, 0).transpose(0, 3, 1, 2)
        image = torch.from_numpy(image)
        return image