flux-lightning / app.py
Jordan Legg
loaded vae
c5a49a2
raw
history blame
8.27 kB
import gradio as gr
import numpy as np
import random
import torch
import spaces
from PIL import Image
from torchvision import transforms
from diffusers import DiffusionPipeline, AutoencoderKL
# Define constants
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"
MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 2048
# Load the initial VAE model for preprocessing YAY
# Load the initial VAE model for preprocessing
vae_model_name = "runwayml/stable-diffusion-v1-5" # Adjusted VAE model path
vae = AutoencoderKL.from_pretrained(vae_model_name, subfolder="vae").to(device)
# Load the FLUX diffusion pipeline with optimizations
pipe = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell", torch_dtype=dtype)
pipe.enable_model_cpu_offload()
pipe.vae.enable_slicing()
pipe.vae.enable_tiling()
pipe.to(device)
def preprocess_image(image, image_size):
preprocess = transforms.Compose([
transforms.Resize((image_size, image_size)),
transforms.ToTensor(),
transforms.Normalize([0.5], [0.5])
])
image = preprocess(image).unsqueeze(0).to(device, dtype=dtype)
print("Image processed successfully.")
return image
def encode_image(image, vae):
with torch.no_grad():
latents = vae.encode(image).latent_dist.sample() * 0.18215
print("Image encoded successfully.")
return latents
@spaces.GPU()
def infer(prompt, init_image=None, seed=42, randomize_seed=False, width=1024, height=1024, num_inference_steps=4, progress=gr.Progress(track_tqdm=True)):
if randomize_seed:
seed = random.randint(0, MAX_SEED)
generator = torch.Generator().manual_seed(seed)
fallback_image = Image.new("RGB", (width, height), (255, 0, 0)) # Red image as a fallback
if init_image is None:
# text2img case
try:
result = pipe(
prompt=prompt,
height=height,
width=width,
num_inference_steps=num_inference_steps,
generator=generator,
guidance_scale=0.0,
max_sequence_length=256
)
image = result.images[0]
return image, seed
except Exception as e:
print(f"Pipeline call failed with error: {e}")
return fallback_image, seed
else:
# img2img case
print("Initial image provided, starting preprocessing...")
vae_image_size = 1024 # Using FLUX VAE sample size for preprocessing
init_image = init_image.convert("RGB")
init_image = preprocess_image(init_image, vae_image_size)
print("Starting encoding of the image...")
latents = encode_image(init_image, vae)
print(f"Latents shape after encoding: {latents.shape}")
# Ensure the latents size matches the expected input size for the FLUX model
print("Interpolating latents to match model's input size...")
latents = torch.nn.functional.interpolate(latents, size=(height // 8, width // 8))
latent_channels = 16 # Using FLUX VAE latent channels
print(f"Latent channels from VAE: {latent_channels}, expected by FLUX model: {pipe.vae.config.latent_channels}")
if latent_channels != pipe.vae.config.latent_channels:
print(f"Adjusting latent channels from {latent_channels} to {pipe.vae.config.latent_channels}")
conv = torch.nn.Conv2d(latent_channels, pipe.vae.config.latent_channels, kernel_size=1).to(device, dtype=dtype)
latents = conv(latents)
latents = latents.permute(0, 2, 3, 1).contiguous().view(-1, pipe.vae.config.latent_channels)
print(f"Latents shape after permutation: {latents.shape}")
try:
print("Sending latents to the FLUX transformer...")
# Determine if 'timesteps' is required for the transformer
if hasattr(pipe.transformer, 'forward') and hasattr(pipe.transformer.forward, '__code__') and 'timesteps' in pipe.transformer.forward.__code__.co_varnames:
timestep = torch.tensor([num_inference_steps], device=device, dtype=dtype)
_ = pipe.transformer(latents, timesteps=timestep)
else:
_ = pipe.transformer(latents)
except Exception as e:
print(f"Transformer call failed with error: {e}. Skipping transformer step.")
return fallback_image, seed
try:
print("Generating final image with the FLUX pipeline...")
image = pipe(
prompt=prompt,
height=height,
width=width,
num_inference_steps=num_inference_steps,
generator=generator,
guidance_scale=0.0,
latents=latents
).images[0]
print("Image generation completed.")
except Exception as e:
print(f"Pipeline call with latents failed with error: {e}")
return fallback_image, seed
return image, seed
# Define example prompts
examples = [
"a tiny astronaut hatching from an egg on the moon",
"a cat holding a sign that says hello world",
"an anime illustration of a wiener schnitzel",
]
# CSS styling for the Japanese-inspired interface
css = """
body {
background-color: #fff;
font-family: 'Noto Sans JP', sans-serif;
color: #333;
}
#col-container {
margin: 0 auto;
max-width: 520px;
border: 2px solid #000;
padding: 20px;
background-color: #f7f7f7;
border-radius: 10px;
}
.gr-button {
background-color: #e60012;
color: #fff;
border: 2px solid #000;
}
.gr-button:hover {
background-color: #c20010;
}
.gr-slider, .gr-checkbox, .gr-textbox {
border: 2px solid #000;
}
.gr-accordion {
border: 2px solid #000;
background-color: #fff;
}
.gr-image {
border: 2px solid #000;
}
"""
# Create the Gradio interface
with gr.Blocks(css=css) as demo:
with gr.Column(elem_id="col-container"):
gr.Markdown("""
# FLUX.1 [schnell]
12B param rectified flow transformer distilled from [FLUX.1 [pro]](https://blackforestlabs.ai/) for 4 step generation
[[blog](https://blackforestlabs.ai/announcing-black-forest-labs/)] [[model](https://huggingface.co/black-forest-labs/FLUX.1-schnell)]
""")
with gr.Row():
prompt = gr.Textbox(
label="Prompt",
show_label=False,
max_lines=1,
placeholder="Enter your prompt",
container=False,
)
run_button = gr.Button("Run", scale=0)
with gr.Row():
init_image = gr.Image(label="Initial Image (optional)", type="pil")
result = gr.Image(label="Result", show_label=False)
with gr.Accordion("Advanced Settings", open=False):
seed = gr.Slider(
label="Seed",
minimum=0,
maximum=MAX_SEED,
step=1,
value=42,
)
randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
with gr.Row():
width = gr.Slider(
label="Width",
minimum=256,
maximum=MAX_IMAGE_SIZE,
step=32,
value=1024,
)
height = gr.Slider(
label="Height",
minimum=256,
maximum=MAX_IMAGE_SIZE,
step=32,
value=1024,
)
with gr.Row():
num_inference_steps = gr.Slider(
label="Number of inference steps",
minimum=1,
maximum=50,
step=1,
value=4,
)
gr.Examples(
examples=examples,
fn=infer,
inputs=[prompt],
outputs=[result, seed],
cache_examples="lazy"
)
run_button.click(
infer,
inputs=[prompt, init_image, seed, randomize_seed, width, height, num_inference_steps],
outputs=[result, seed]
)
demo.launch()