Spaces:
Running
Running
import torch | |
from diffusers import UniPCMultistepScheduler, FlowMatchEulerDiscreteScheduler, DDIMScheduler, DPMSolverMultistepScheduler | |
from diffusers import WanPipeline, AutoencoderKLWan # Use Wan-specific VAE | |
# from diffusers.hooks import apply_first_block_cache, FirstBlockCacheConfig | |
from para_attn.first_block_cache.diffusers_adapters import apply_cache_on_pipe | |
from diffusers.models import UNetSpatioTemporalConditionModel | |
from transformers import T5EncoderModel, T5Tokenizer | |
from huggingface_hub import hf_hub_download | |
from PIL import Image | |
import numpy as np | |
import gradio as gr | |
import spaces | |
model_id = "Wan-AI/Wan2.1-T2V-1.3B-Diffusers" | |
vae = AutoencoderKLWan.from_pretrained(model_id, subfolder="vae", torch_dtype=torch.float32) | |
pipe = WanPipeline.from_pretrained(model_id, vae=vae, torch_dtype=torch.bfloat16) | |
flow_shift = 1.0 #5.0 1.0 for image, 5.0 for 720P, 3.0 for 480P | |
pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config, flow_shift=flow_shift) | |
# Configure DDIMScheduler with a beta schedule | |
# pipe.scheduler = DDIMScheduler.from_config( | |
# pipe.scheduler.config, | |
# beta_start=0.00085, # Starting beta value | |
# beta_end=0.012, # Ending beta value | |
# beta_schedule="linear", # Linear beta schedule (other options: "scaled_linear", "squaredcos_cap_v2") | |
# num_train_timesteps=1000, # Number of timesteps | |
# flow_shift=flow_shift | |
# ) | |
# Configure FlowMatchEulerDiscreteScheduler | |
# pipe.scheduler = FlowMatchEulerDiscreteScheduler.from_config( | |
# pipe.scheduler.config, | |
# flow_shift=flow_shift # Retain flow_shift for WanPipeline compatibility | |
# ) | |
# --- LoRA State Management --- | |
# Define unique names for our adapters | |
DEFAULT_LORA_NAME = "causvid_lora" | |
CUSTOM_LORA_NAME = "custom_lora" | |
# Track which custom LoRA is currently loaded to avoid reloading | |
CURRENTLY_LOADED_CUSTOM_LORA = None | |
# Load the default base LoRA ONCE at startup | |
print("Loading base LoRA...") | |
CAUSVID_LORA_REPO = "Kijai/WanVideo_comfy" | |
CAUSVID_LORA_FILENAME = "Wan21_CausVid_bidirect2_T2V_1_3B_lora_rank32.safetensors" | |
try: | |
causvid_path = hf_hub_download(repo_id=CAUSVID_LORA_REPO, filename=CAUSVID_LORA_FILENAME) | |
pipe.load_lora_weights(causvid_path, adapter_name=DEFAULT_LORA_NAME) | |
print(f"✅ Default LoRA '{DEFAULT_LORA_NAME}' loaded successfully.") | |
except Exception as e: | |
print(f"⚠️ Default LoRA could not be loaded: {e}") | |
DEFAULT_LORA_NAME = None | |
print("Initialization complete. Gradio is starting...") | |
def generate(prompt, negative_prompt, width=1024, height=1024, num_inference_steps=30, lora_id=None, progress=gr.Progress(track_tqdm=True)): | |
# if lora_id and lora_id.strip() != "": | |
# pipe.unload_lora_weights() | |
# pipe.load_lora_weights(lora_id.strip()) | |
print("Loading base LoRA for this run...") | |
causvid_path = hf_hub_download(repo_id=CAUSVID_LORA_REPO, filename=CAUSVID_LORA_FILENAME) | |
pipe.load_lora_weights(causvid_path, adapter_name=DEFAULT_LORA_NAME) | |
# If a custom LoRA is provided, load it as well. | |
if clean_lora_id: | |
print(f"Loading custom LoRA '{clean_lora_id}' for this run...") | |
pipe.load_lora_weights(clean_lora_id, adapter_name=CUSTOM_LORA_NAME) | |
# If a custom LoRA is present, activate both. | |
pipe.set_adapters([DEFAULT_LORA_NAME, CUSTOM_LORA_NAME], adapter_weights=[1.0, 1.0]) | |
else: | |
# If no custom LoRA, just activate the base one. | |
print("Activating base LoRA only.") | |
pipe.set_adapters([DEFAULT_LORA_NAME], adapter_weights=[1.0]) | |
pipe.to("cuda") | |
# apply_first_block_cache(pipe.transformer, FirstBlockCacheConfig(threshold=0.2)) | |
apply_cache_on_pipe( | |
pipe, | |
# residual_diff_threshold=0.2, | |
) | |
try: | |
output = pipe( | |
prompt=prompt, | |
negative_prompt=negative_prompt, | |
height=height, | |
width=width, | |
num_frames=1, | |
num_inference_steps=num_inference_steps, | |
guidance_scale=1.0, #5.0 | |
) | |
image = output.frames[0][0] | |
image = (image * 255).astype(np.uint8) | |
return Image.fromarray(image) | |
finally: | |
# if lora_id and lora_id.strip() != "": | |
# pass | |
# pipe.unload_lora_weights() | |
# if clean_lora_id: | |
# print(f"Unloading '{CUSTOM_LORA_NAME}' from this run.") | |
# pipe.unload_lora_weights(CUSTOM_LORA_NAME) | |
# # Always disable all active LoRAs to reset the state. | |
# pipe.disable_lora() | |
print("Unloading all LoRAs to clean up.") | |
pipe.unload_lora_weights() | |
iface = gr.Interface( | |
fn=generate, | |
inputs=[ | |
gr.Textbox(label="Input prompt"), | |
], | |
additional_inputs = [ | |
gr.Textbox(label="Negative prompt", value = "Bright tones, overexposed, static, blurred details, subtitles, style, works, paintings, images, static, overall gray, worst quality, low quality, JPEG compression residue, ugly, incomplete, extra fingers, poorly drawn hands, poorly drawn faces, deformed, disfigured, misshapen limbs, fused fingers, still picture, messy background, three legs, many people in the background, walking backwards"), | |
gr.Slider(label="Width", minimum=480, maximum=1280, step=16, value=1024), | |
gr.Slider(label="Height", minimum=480, maximum=1280, step=16, value=1024), | |
gr.Slider(minimum=1, maximum=80, step=1, label="Inference Steps", value=30), | |
gr.Textbox(label="LoRA ID"), | |
], | |
outputs=gr.Image(label="output"), | |
) | |
iface.launch() |