import spaces
import gradio as gr
import numpy as np
import random
import torch
import torch.nn as nn
from PIL import Image
from torchvision import transforms
from diffusers import DiffusionPipeline

# Constants
# Precision and device shared by the pipeline and every tensor created below.
dtype = torch.bfloat16
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Upper bounds for the seed (largest signed 32-bit integer) and image side length.
MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 2048

# Geometry and scaling values for the latent / text-embedding helpers.
LATENT_CHANNELS = 16
TEXT_EMBED_DIM = 768
MAX_TEXT_EMBEDDINGS = 77
SCALING_FACTOR = 0.3611

# Load FLUX model
# Loads the FLUX.1-schnell pipeline with `dtype` weights and moves it to `device`.
pipe = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell", torch_dtype=dtype).to(device)
# NOTE(review): model CPU offload is enabled right after the explicit
# `.to(device)` above; offloading presumably manages device placement itself,
# so one of the two calls may be redundant — confirm against the diffusers
# offloading documentation.
pipe.enable_model_cpu_offload()
# Turn on sliced and tiled VAE processing (presumably to limit peak memory on
# large images — TODO confirm).
pipe.vae.enable_slicing()
pipe.vae.enable_tiling()

# Add a projection layer to match text embedding dimension
# Linear map from LATENT_CHANNELS to TEXT_EMBED_DIM features, placed on `device`
# and cast to `dtype`.
# NOTE(review): no weights are loaded for this layer anywhere in the visible
# code, so it keeps nn.Linear's default initialisation — confirm that is intended.
projection = nn.Linear(LATENT_CHANNELS, TEXT_EMBED_DIM).to(device).to(dtype)

def preprocess_image(image, image_size):
    """Turn an input image into a normalised, batched tensor.

    The image is resized to ``image_size`` x ``image_size`` with Lanczos
    resampling, converted to a tensor, and normalised with mean 0.5 and
    std 0.5 (mapping ToTensor's [0, 1] range to [-1, 1]).

    Args:
        image: Image accepted by the torchvision transforms (the caller
            passes a PIL image).
        image_size: Side length, in pixels, of the square output.

    Returns:
        Tensor with a leading batch dimension of 1, on ``device`` in ``dtype``.
    """
    square_size = (image_size, image_size)
    steps = (
        transforms.Resize(square_size, interpolation=transforms.InterpolationMode.LANCZOS),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5]),
    )

    # Apply the transforms one after another, in order.
    tensor = image
    for step in steps:
        tensor = step(tensor)

    batched = tensor.unsqueeze(0)
    return batched.to(device, dtype=dtype)

def process_latents(latents, height, width):
    """Convert VAE latents into a fixed-length sequence of projected tokens.

    The latents are resized to ``(height // 8, width // 8)`` if necessary,
    flattened into one token per spatial position, passed through the
    module-level ``projection`` layer, and finally truncated or zero-padded
    so the sequence axis has exactly ``MAX_TEXT_EMBEDDINGS`` entries.

    Args:
        latents: 4-D tensor laid out as (batch, LATENT_CHANNELS, rows, cols).
        height: Target image height in pixels (int, or a float holding an
            integer value as UI sliders may deliver).
        width: Target image width in pixels (same convention as ``height``).

    Returns:
        Tensor of shape (batch, MAX_TEXT_EMBEDDINGS, TEXT_EMBED_DIM).
    """
    print(f"Input latent shape: {latents.shape}")

    # Ensure latents are the correct shape. The sizes are forced to int
    # because interpolate() rejects float sizes.
    target_size = (int(height) // 8, int(width) // 8)
    if tuple(latents.shape[2:]) != target_size:
        latents = torch.nn.functional.interpolate(latents, size=target_size, mode='bilinear')
    print(f"Latent shape after potential interpolation: {latents.shape}")

    # Reshape latents to [batch_size, seq_len, channels]. The real batch size
    # is kept so that several samples are not merged into one long sequence.
    batch_size = latents.shape[0]
    latents = latents.permute(0, 2, 3, 1).reshape(batch_size, -1, LATENT_CHANNELS)
    print(f"Reshaped latent shape: {latents.shape}")

    # Project latents to match text embedding dimension
    latents = projection(latents)
    print(f"Projected latent shape: {latents.shape}")

    # Adjust sequence length to match text embeddings
    seq_len = latents.shape[1]
    if seq_len > MAX_TEXT_EMBEDDINGS:
        latents = latents[:, :MAX_TEXT_EMBEDDINGS, :]
    elif seq_len < MAX_TEXT_EMBEDDINGS:
        pad_len = MAX_TEXT_EMBEDDINGS - seq_len
        # Pad tuple is (last dim left/right, seq dim left/right, batch
        # left/right): only the end of the sequence axis is extended.
        latents = torch.nn.functional.pad(latents, (0, 0, 0, pad_len, 0, 0))
    print(f"Final latent shape: {latents.shape}")

    return latents

# Fraction of the denoising schedule that is re-run on top of the initial
# image in img2img mode (per the diffusers docs, 1.0 essentially ignores it).
_IMG2IMG_STRENGTH = 0.75


def _get_img2img_pipe():
    """Return an image-to-image FLUX pipeline that shares ``pipe``'s components.

    The pipeline is created on first use and cached on the function object,
    so no model weights are loaded a second time.
    """
    cached = getattr(_get_img2img_pipe, "_cached", None)
    if cached is None:
        # Local import: only the img2img path needs this pipeline class.
        from diffusers import FluxImg2ImgPipeline

        # The dtype is passed explicitly so the shared weights keep the
        # precision they were loaded with.
        cached = FluxImg2ImgPipeline.from_pipe(pipe, torch_dtype=dtype)
        if device == "cuda":
            # Offload hooks belong to a pipeline object, so they are
            # re-applied on the derived pipeline.
            cached.enable_model_cpu_offload()
        _get_img2img_pipe._cached = cached
    return cached


@spaces.GPU()
def infer(prompt, init_image=None, seed=42, randomize_seed=False, width=1024, height=1024, num_inference_steps=4, progress=gr.Progress(track_tqdm=True)):
    """Generate an image from a prompt, optionally starting from an image.

    Without ``init_image`` the text-to-image pipeline is used. With one, the
    image is converted to RGB and handed to an image-to-image pipeline that
    shares the same model components; that pipeline handles resizing,
    encoding and noising itself.

    Args:
        prompt: Text prompt.
        init_image: Optional PIL image to start from.
        seed: Seed for the random generator (ignored if ``randomize_seed``).
        randomize_seed: If true, draw a fresh seed in ``[0, MAX_SEED]``.
        width: Output width in pixels.
        height: Output height in pixels.
        num_inference_steps: Number of denoising steps.
        progress: Gradio progress tracker (follows tqdm bars).

    Returns:
        Tuple ``(image, seed)``. If generation fails, the error is printed
        and a solid red image of the requested size is returned instead.
    """
    # UI sliders may deliver floats; the RNG, the pipelines and PIL need ints.
    seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    width, height = int(width), int(height)
    num_inference_steps = int(num_inference_steps)
    generator = torch.Generator(device=device).manual_seed(seed)

    # Arguments shared by the text2img and img2img calls.
    call_kwargs = dict(
        prompt=prompt,
        height=height,
        width=width,
        num_inference_steps=num_inference_steps,
        generator=generator,
        guidance_scale=0.0,
    )

    try:
        if init_image is None:
            # text2img case
            image = pipe(**call_kwargs).images[0]
        else:
            # img2img case
            image = _get_img2img_pipe()(
                image=init_image.convert("RGB"),
                strength=_IMG2IMG_STRENGTH,
                **call_kwargs,
            ).images[0]

        return image, seed
    except Exception as e:
        # Deliberate best effort: report the failure and keep the UI responsive.
        print(f"Error during inference: {e}")
        import traceback
        traceback.print_exc()
        return Image.new("RGB", (width, height), (255, 0, 0)), seed  # Red fallback image

# Gradio interface setup
with gr.Blocks() as demo:
    # Inputs: the text prompt next to the optional starting image.
    with gr.Row():
        prompt = gr.Textbox(label="Prompt")
        init_image = gr.Image(type="pil", label="Initial Image (optional)")

    with gr.Row():
        generate = gr.Button("Generate")

    # Outputs: the generated picture and the seed that was actually used.
    with gr.Row():
        result = gr.Image(label="Result")
        seed_output = gr.Number(label="Seed")

    with gr.Accordion("Advanced Settings", open=False):
        seed = gr.Slider(minimum=0, maximum=MAX_SEED, step=1, value=42, label="Seed")
        randomize_seed = gr.Checkbox(value=True, label="Randomize seed")
        width = gr.Slider(minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024, label="Width")
        height = gr.Slider(minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024, label="Height")
        num_inference_steps = gr.Slider(minimum=1, maximum=50, step=1, value=4, label="Number of inference steps")

    # The input order must match infer's positional parameters.
    generate.click(
        infer,
        inputs=[
            prompt,
            init_image,
            seed,
            randomize_seed,
            width,
            height,
            num_inference_steps,
        ],
        outputs=[result, seed_output],
    )

demo.launch()