flux-lightning / app.py
Jordan Legg
target the text encoder, merge latent space before the pipeline
3be64a5
raw
history blame
5.49 kB
import spaces
import gradio as gr
import numpy as np
import random
import torch
import torch.nn as nn
from PIL import Image
from torchvision import transforms
from diffusers import DiffusionPipeline
# Constants
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"
MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 2048
LATENT_CHANNELS = 16
TEXT_EMBED_DIM = 768
MAX_TEXT_EMBEDDINGS = 77
SCALING_FACTOR = 0.3611
# Load FLUX model
pipe = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell", torch_dtype=dtype).to(device)
pipe.enable_model_cpu_offload()
pipe.vae.enable_slicing()
pipe.vae.enable_tiling()
# Add a projection layer to match text embedding dimension
projection = nn.Linear(LATENT_CHANNELS, TEXT_EMBED_DIM).to(device).to(dtype)
def preprocess_image(image, image_size):
preprocess = transforms.Compose([
transforms.Resize((image_size, image_size), interpolation=transforms.InterpolationMode.LANCZOS),
transforms.ToTensor(),
transforms.Normalize([0.5], [0.5])
])
image = preprocess(image).unsqueeze(0).to(device, dtype=dtype)
return image
def process_latents(latents, height, width):
print(f"Input latent shape: {latents.shape}")
# Ensure latents are the correct shape
if latents.shape[2:] != (height // 8, width // 8):
latents = torch.nn.functional.interpolate(latents, size=(height // 8, width // 8), mode='bilinear')
print(f"Latent shape after potential interpolation: {latents.shape}")
# Reshape latents to [batch_size, seq_len, channels]
latents = latents.permute(0, 2, 3, 1).reshape(1, -1, LATENT_CHANNELS)
print(f"Reshaped latent shape: {latents.shape}")
# Project latents to match text embedding dimension
latents = projection(latents)
print(f"Projected latent shape: {latents.shape}")
# Adjust sequence length to match text embeddings
seq_len = latents.shape[1]
if seq_len > MAX_TEXT_EMBEDDINGS:
latents = latents[:, :MAX_TEXT_EMBEDDINGS, :]
elif seq_len < MAX_TEXT_EMBEDDINGS:
pad_len = MAX_TEXT_EMBEDDINGS - seq_len
latents = torch.nn.functional.pad(latents, (0, 0, 0, pad_len, 0, 0))
print(f"Final latent shape: {latents.shape}")
return latents
@spaces.GPU()
def infer(prompt, init_image=None, seed=42, randomize_seed=False, width=1024, height=1024, num_inference_steps=4, progress=gr.Progress(track_tqdm=True)):
if randomize_seed:
seed = random.randint(0, MAX_SEED)
generator = torch.Generator(device=device).manual_seed(seed)
try:
if init_image is None:
# text2img case
image = pipe(
prompt=prompt,
height=height,
width=width,
num_inference_steps=num_inference_steps,
generator=generator,
guidance_scale=0.0
).images[0]
else:
# img2img case
init_image = init_image.convert("RGB")
init_image = preprocess_image(init_image, 1024) # Using 1024 as FLUX VAE sample size
# Encode the image using FLUX VAE
latents = pipe.vae.encode(init_image).latent_dist.sample() * SCALING_FACTOR
print(f"Initial latent shape from VAE: {latents.shape}")
# Process latents to match text embedding format
latents = process_latents(latents, height, width)
# Get text embeddings
text_embeddings = pipe.transformer.text_encoder([prompt])
print(f"Text embedding shape: {text_embeddings.shape}")
# Combine image latents and text embeddings
combined_embeddings = torch.cat([latents, text_embeddings], dim=1)
print(f"Combined embedding shape: {combined_embeddings.shape}")
image = pipe(
prompt=prompt,
height=height,
width=width,
num_inference_steps=num_inference_steps,
generator=generator,
guidance_scale=0.0,
latents=combined_embeddings
).images[0]
return image, seed
except Exception as e:
print(f"Error during inference: {e}")
import traceback
traceback.print_exc()
return Image.new("RGB", (width, height), (255, 0, 0)), seed # Red fallback image
# Gradio interface setup
with gr.Blocks() as demo:
with gr.Row():
prompt = gr.Textbox(label="Prompt")
init_image = gr.Image(label="Initial Image (optional)", type="pil")
with gr.Row():
generate = gr.Button("Generate")
with gr.Row():
result = gr.Image(label="Result")
seed_output = gr.Number(label="Seed")
with gr.Accordion("Advanced Settings", open=False):
seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=42)
randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
width = gr.Slider(label="Width", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
height = gr.Slider(label="Height", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=1024)
num_inference_steps = gr.Slider(label="Number of inference steps", minimum=1, maximum=50, step=1, value=4)
generate.click(
infer,
inputs=[prompt, init_image, seed, randomize_seed, width, height, num_inference_steps],
outputs=[result, seed_output]
)
demo.launch()