import gradio as gr
import json
import torch
import time
import random
try:
    # Only on HuggingFace
    import spaces
    is_space_imported = True
except ImportError:
    is_space_imported = False

from tqdm import tqdm
from huggingface_hub import snapshot_download
from models import AudioDiffusion, DDPMScheduler
from audioldm.audio.stft import TacotronSTFT
from audioldm.variational_autoencoder import AutoencoderKL

# Old import
import numpy as np
import torch.nn.functional as F
from torchvision.transforms.functional import normalize
from huggingface_hub import hf_hub_download
from gradio_imageslider import ImageSlider
from briarmbg import BriaRMBG
import PIL
from PIL import Image
from typing import Tuple

# Largest value of a signed 64-bit integer.
max_64_bit_int = (1 << 63) - 1

# Automatic device detection: use the first CUDA device when one is available,
# otherwise stay on the CPU.
device_type, device_selection = (
    ("cuda", "cuda:0") if torch.cuda.is_available() else ("cpu", "cpu")
)

class Tango:
    """Text-to-audio wrapper bundling the TANGO VAE, STFT and diffusion model.

    The constructor downloads the checkpoint repository from the Hugging Face
    Hub and builds the three sub-modules from the JSON configs it contains.

    NOTE(review): weight loading, ``eval()`` and the scheduler creation are
    commented out in ``__init__``. As written, ``self.scheduler`` is never
    assigned, so ``generate`` and ``generate_for_batch`` raise
    ``AttributeError`` until that section is restored.
    """

    def __init__(self, name = "declare-lab/tango2", device = device_selection):
        """Download repository ``name`` and build the sub-modules on ``device``.

        Args:
            name: Hugging Face Hub repository id passed to ``snapshot_download``.
            device: Device the VAE, STFT and diffusion modules are moved to.
        """
        path = snapshot_download(repo_id = name)

        vae_config = self._read_json("{}/vae_config.json".format(path))
        stft_config = self._read_json("{}/stft_config.json".format(path))
        main_config = self._read_json("{}/main_config.json".format(path))

        self.vae = AutoencoderKL(**vae_config).to(device)
        self.stft = TacotronSTFT(**stft_config).to(device)
        self.model = AudioDiffusion(**main_config).to(device)

#        vae_weights = torch.load("{}/pytorch_model_vae.bin".format(path), map_location = device)
#        stft_weights = torch.load("{}/pytorch_model_stft.bin".format(path), map_location = device)
#        main_weights = torch.load("{}/pytorch_model_main.bin".format(path), map_location = device)
#        
#        self.vae.load_state_dict(vae_weights)
#        self.stft.load_state_dict(stft_weights)
#        self.model.load_state_dict(main_weights)
#
#        print ("Successfully loaded checkpoint from:", name)
#        
#        self.vae.eval()
#        self.stft.eval()
#        self.model.eval()
#        
#        self.scheduler = DDPMScheduler.from_pretrained(main_config["scheduler_name"], subfolder = "scheduler")

    @staticmethod
    def _read_json(file_path):
        """Parse the JSON file at ``file_path`` and close the handle afterwards."""
        with open(file_path, encoding = "utf-8") as handle:
            return json.load(handle)

    def chunks(self, lst, n):
        """Yield successive ``n``-sized slices of ``lst`` (the last may be shorter)."""
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    def generate(self, prompt, steps = 100, guidance = 3, samples = 1, disable_progress = True):
        """Generate audio for a single prompt string.

        Args:
            prompt: Text prompt; it is wrapped in a one-element list for the model.
            steps: Forwarded to ``self.model.inference``.
            guidance: Forwarded to ``self.model.inference``.
            samples: Forwarded to ``self.model.inference``.
            disable_progress: Forwarded to ``self.model.inference``.

        Returns:
            The value returned by ``self.vae.decode_to_waveform``.
        """
        with torch.no_grad():
            latents = self.model.inference([prompt], self.scheduler, steps, guidance, samples, disable_progress = disable_progress)
            mel = self.vae.decode_first_stage(latents)
            wave = self.vae.decode_to_waveform(mel)
        return wave

    def generate_for_batch(self, prompts, steps = 200, guidance = 3, samples = 1, batch_size = 8, disable_progress = True):
        """Generate audio for a list of prompt strings, ``batch_size`` at a time.

        Args:
            prompts: List of text prompts.
            steps: Forwarded to ``self.model.inference``.
            guidance: Forwarded to ``self.model.inference``.
            samples: Forwarded to ``self.model.inference``; also the group size
                of the returned list when it is not 1.
            batch_size: Number of prompts sent to the model per call.
            disable_progress: Forwarded to ``self.model.inference``.

        Returns:
            A flat list of the decoded items when ``samples == 1``; otherwise a
            list of lists, each holding ``samples`` consecutive items.
        """
        outputs = []
        for k in tqdm(range(0, len(prompts), batch_size)):
            batch = prompts[k: k + batch_size]
            with torch.no_grad():
                latents = self.model.inference(batch, self.scheduler, steps, guidance, samples, disable_progress = disable_progress)
                mel = self.vae.decode_first_stage(latents)
                wave = self.vae.decode_to_waveform(mel)
                outputs.extend(wave)
        if samples == 1:
            return outputs
        return list(self.chunks(outputs, samples))

# Initialize TANGO

# The wrapper is built on the CPU when this module is loaded, regardless of
# the device detected above; the commented-out lines below would move its
# sub-modules to `device_type` afterwards.
tango = Tango(device = "cpu")
#tango.vae.to(device_type)
#tango.stft.to(device_type)
#tango.model.to(device_type)

#def update_seed(is_randomize_seed, seed):
#    if is_randomize_seed:
#        return random.randint(0, max_64_bit_int)
#    return seed
#
#def check(
#    prompt,
#    output_number,
#    steps,
#    guidance,
#    is_randomize_seed,
#    seed
#):
#    if prompt is None or prompt == "":
#        raise gr.Error("Please provide a prompt input.")
#    if not output_number in [1, 2, 3]:
#        raise gr.Error("Please ask for 1, 2 or 3 output files.")
#
#def update_output(output_format, output_number):
#    return [
#        gr.update(format = output_format),
#        gr.update(format = output_format, visible = (2 <= output_number)),
#        gr.update(format = output_format, visible = (output_number == 3)),
#        gr.update(visible = False)
#    ]
#
#def text2audio(
#    prompt,
#    output_number,
#    steps,
#    guidance,
#    is_randomize_seed,
#    seed
#):
#    start = time.time()
#
#    if seed is None:
#        seed = random.randint(0, max_64_bit_int)
#
#    random.seed(seed)
#    torch.manual_seed(seed)
#
#    output_wave = tango.generate(prompt, steps, guidance, output_number)
#
#    output_wave_1 = gr.make_waveform((16000, output_wave[0]))
#    output_wave_2 = gr.make_waveform((16000, output_wave[1])) if (2 <= output_number) else None
#    output_wave_3 = gr.make_waveform((16000, output_wave[2])) if (output_number == 3) else None
#
#    end = time.time()
#    secondes = int(end - start)
#    minutes = secondes // 60
#    secondes = secondes - (minutes * 60)
#    hours = minutes // 60
#    minutes = minutes - (hours * 60)
#    return [
#        output_wave_1,
#        output_wave_2,
#        output_wave_3,
#        gr.update(visible = True, value = "Start again to get a different result. The output have been generated in " + ((str(hours) + " h, ") if hours != 0 else "") + ((str(minutes) + " min, ") if hours != 0 or minutes != 0 else "") + str(secondes) + " sec.")
#    ]
#
#if is_space_imported:
#    text2audio = spaces.GPU(text2audio, duration = 420)

# Old code
# Background-removal network: build it, download its checkpoint from the Hub
# and place it on the preferred backend (CUDA first, then MPS, then CPU).
net = BriaRMBG()
model_path = hf_hub_download("cocktailpeanut/gbmr", 'model.pth')
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
# An explicit map_location keeps loading independent of the device the
# checkpoint was saved from.
# NOTE(review): torch.load unpickles the downloaded file; consider
# weights_only=True if the installed torch version supports it.
net.load_state_dict(torch.load(model_path, map_location=device))
net = net.to(device)
net.eval()

    
def resize_image(image):
    """Return ``image`` converted to RGB and resized to 1024x1024 (bilinear)."""
    return image.convert('RGB').resize((1024, 1024), Image.BILINEAR)


def process(image):
    """Remove the background of ``image`` using the module-level ``net``.

    Args:
        image: Input picture in a form accepted by ``PIL.Image.fromarray``
            (presumably the NumPy array Gradio's "image" input provides —
            TODO confirm).

    Returns:
        An RGBA ``PIL.Image`` of the original size: the original picture
        pasted onto a fully transparent canvas through the predicted mask.

    Raises:
        gr.Error: If no image was supplied.
    """
    if image is None:
        raise gr.Error("Please provide an input image.")

    # prepare input
    orig_image = Image.fromarray(image)
    w, h = orig_image.size
    im_np = np.array(resize_image(orig_image))
    # HWC -> CHW, add a batch dimension, then scale to [0, 1] and shift by 0.5
    im_tensor = torch.tensor(im_np, dtype=torch.float32).permute(2, 0, 1)
    im_tensor = torch.unsqueeze(im_tensor, 0)
    im_tensor = torch.divide(im_tensor, 255.0)
    im_tensor = normalize(im_tensor, [0.5, 0.5, 0.5], [1.0, 1.0, 1.0])
    # `device` is the module-level backend the network was placed on
    im_tensor = im_tensor.to(device)

    # inference + post process; no gradients are needed, so avoid building
    # an autograd graph
    with torch.no_grad():
        result = net(im_tensor)
        # resize the prediction back to the original resolution
        result = torch.squeeze(F.interpolate(result[0][0], size=(h, w), mode='bilinear'), 0)
        ma = torch.max(result)
        mi = torch.min(result)
        span = ma - mi
        if span > 0:
            # min-max normalise to [0, 1]
            result = (result - mi) / span
        else:
            # constant prediction: avoid 0/0 (NaN) and fall back to an empty mask
            result = torch.zeros_like(result)

    # image to pil
    im_array = (result * 255).cpu().numpy().astype(np.uint8)
    pil_im = Image.fromarray(np.squeeze(im_array))
    # paste the mask on the original image
    new_im = Image.new("RGBA", pil_im.size, (0, 0, 0, 0))
    new_im.paste(orig_image, mask=pil_im)

    return new_im

# Header components.
# NOTE(review): these are created outside any gr.Blocks context and are not
# passed to the Interface below — confirm whether they are actually displayed.
gr.Markdown("## BRIA RMBG 1.4")
gr.HTML('''
  <p style="margin-bottom: 10px; font-size: 94%">
    This is a demo for BRIA RMBG 1.4 that using
    <a href="https://huggingface.co/briaai/RMBG-1.4" target="_blank">BRIA RMBG-1.4 image matting model</a> as backbone. 
  </p>
''')

# Texts and sample input shown by the Interface.
title = "Background Removal"
description = r"""Background removal model developed by <a href='https://BRIA.AI' target='_blank'><b>BRIA.AI</b></a>, trained on a carefully selected dataset and is available as an open-source model for non-commercial use.<br> 
For test upload your image and wait. Read more at model card <a href='https://huggingface.co/briaai/RMBG-1.4' target='_blank'><b>briaai/RMBG-1.4</b></a>.<br>
"""
examples = [["./input.jpg"]]

# Single image-in / image-out interface around `process`.
demo = gr.Interface(
    fn=process,
    inputs="image",
    outputs="image",
    examples=examples,
    title=title,
    description=description,
)

if __name__ == "__main__":
    demo.launch(share=False)