import spaces
import os
import uuid
import shutil
import subprocess
import torch
import logging
import tempfile
import numpy as np
import gradio as gr
from datetime import datetime
from diffusers import WanImageToVideoPipeline
from diffusers.utils import export_to_video
from huggingface_hub import upload_file
from PIL import Image
# ----------------- Setup -----------------
logging.basicConfig(level=logging.INFO)

HF_MODEL = "rahul7star/rahulAI"
dtype = torch.bfloat16
device = "cuda"

model_id = "FastDM/Wan2.2-I2V-A14B-Merge-Lightning-V1.0-Diffusers"
pipe = WanImageToVideoPipeline.from_pretrained(model_id, torch_dtype=dtype)
pipe.to(device)
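# Note: calling pipe.to("cuda") at import time is the documented pattern for
# ZeroGPU Spaces — the `spaces` runtime defers the actual GPU allocation until
# a @spaces.GPU-decorated function runs.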
# Default Wan negative prompt, kept in Chinese as shipped with the model.
# Rough translation: "vivid colors, overexposed, static, blurry details,
# subtitles, style, artwork, painting, frame, stillness, overall gray tone,
# worst quality, low quality, JPEG compression artifacts, ugly, mutilated,
# extra fingers, poorly drawn hands, poorly drawn face, deformed, disfigured,
# malformed limbs, fused fingers, motionless frame, cluttered background,
# three legs, crowded background, walking backwards"
default_negative_prompt = (
    "色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,"
    "JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,"
    "手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走"
)
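# The upload helper below reads HUGGINGFACE_HUB_TOKEN from the environment;
# the token needs write access to HF_MODEL. On Spaces, set it as a repository
# secret in the Space settings.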
# ----------------- Upload helper -----------------
def upscale_and_upload_4k(input_video_path: str, input_image, summary_text: str) -> str:
    """
    Upscale the video to 4K with ffmpeg, then upload the video, the input
    image, and a summary text file to HF_MODEL. Returns the repo folder path.
    """
    logging.info(f"Upscaling video to 4K for upload: {input_video_path}")

    # Upscale video to 3840x2160 with Lanczos filtering
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_upscaled:
        upscaled_path = tmp_upscaled.name
    cmd = [
        "ffmpeg", "-i", input_video_path,
        "-vf", "scale=3840:2160:flags=lanczos",
        "-c:v", "libx264", "-crf", "18", "-preset", "slow", "-y", upscaled_path,
    ]
    subprocess.run(cmd, check=True)  # no shell involved; a failed encode raises
    # Create a unique, dated folder in the HF repo
    today_str = datetime.now().strftime("%Y-%m-%d")
    unique_subfolder = f"upload_{uuid.uuid4().hex[:8]}"
    hf_folder = f"{today_str}-WAN-I2V/{unique_subfolder}"

    # Upload video
    video_filename = os.path.basename(input_video_path)
    video_hf_path = f"{hf_folder}/{video_filename}"
    upload_file(
        path_or_fileobj=upscaled_path,
        path_in_repo=video_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
    )
    # Upload input image (accepts either a file path or a PIL.Image)
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_img:
        if isinstance(input_image, str):
            shutil.copy(input_image, tmp_img.name)
        else:
            input_image.save(tmp_img.name, format="PNG")
        tmp_img_path = tmp_img.name
    image_hf_path = f"{hf_folder}/input_image.png"
    upload_file(
        path_or_fileobj=tmp_img_path,
        path_in_repo=image_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
    )
    # Upload summary text
    with tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="w", encoding="utf-8") as f:
        f.write(summary_text)
        summary_file = f.name
    summary_hf_path = f"{hf_folder}/summary.txt"
    upload_file(
        path_or_fileobj=summary_file,
        path_in_repo=summary_hf_path,
        repo_id=HF_MODEL,
        repo_type="model",
        token=os.environ.get("HUGGINGFACE_HUB_TOKEN"),
    )

    # Cleanup local temp files
    os.remove(upscaled_path)
    os.remove(tmp_img_path)
    os.remove(summary_file)
    return hf_folder
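# Resulting repo layout after one call (folder names illustrative):
#   2025-01-01-WAN-I2V/upload_ab12cd34/
#     ├── <source video name>.mp4   (4K upscale)
#     ├── input_image.png
#     └── summary.txt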
# ----------------- Video generation -----------------
def get_duration(input_image, prompt, negative_prompt, duration_seconds,
                 guidance_scale, steps, seed):
    # Rough GPU-time budget: ~15 s per inference step. The signature mirrors
    # generate_video so @spaces.GPU(duration=...) can call it with the same
    # arguments.
    return steps * 15
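# On a ZeroGPU ("Running on Zero") Space, GPU work must happen inside a
# @spaces.GPU-decorated function; passing a callable as `duration` lets the
# allocator budget GPU time per call. Wiring it up here is assumed intent —
# the script imports `spaces` and defines get_duration for this purpose.
@spaces.GPU(duration=get_duration)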
def generate_video(input_image, prompt, negative_prompt=default_negative_prompt,
                   duration_seconds=2, guidance_scale=3.5, steps=40, seed=0):
    if input_image is None:
        return None, "Please upload an image!"

    # Resize so height/width are divisible by the model's spatial patch size
    max_area = 480 * 832
    aspect_ratio = input_image.height / input_image.width
    mod_value = pipe.vae_scale_factor_spatial * pipe.transformer.config.patch_size[1]
    height = round(np.sqrt(max_area * aspect_ratio)) // mod_value * mod_value
    width = round(np.sqrt(max_area / aspect_ratio)) // mod_value * mod_value
    input_image = input_image.resize((width, height))
    generator = torch.Generator(device=device).manual_seed(int(seed))
    with torch.inference_mode():
        output_frames_list = pipe(
            image=input_image,
            prompt=prompt,
            negative_prompt=negative_prompt,
            height=height,
            width=width,
            num_frames=int(duration_seconds * 16),  # 16 fps
            guidance_scale=float(guidance_scale),
            num_inference_steps=int(steps),
            generator=generator,
        ).frames[0]
    # Save temp video
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpfile:
        video_path = tmpfile.name
    export_to_video(output_frames_list, video_path, fps=16)

    # Upscale and upload to HF
    hf_folder = upscale_and_upload_4k(video_path, input_image, prompt)
    return video_path, f"✅ Uploaded to HF: {hf_folder}"
# ----------------- Gradio UI -----------------
with gr.Blocks() as demo:
    gr.Markdown("# 🖼️➡️🎥 Image to Video with Wan 2.2 I2V (14B Lightning)")
    with gr.Row():
        with gr.Column():
            input_image = gr.Image(type="pil", label="Upload an Image")
            prompt = gr.Textbox(lines=4, label="Prompt")
            negative_prompt = gr.Textbox(value=default_negative_prompt, lines=3, label="Negative Prompt")
            duration = gr.Slider(1, 4, value=2, step=1, label="Duration (seconds)")
            guidance_scale = gr.Slider(0, 10, value=3.5, step=0.5, label="Guidance Scale")
            steps = gr.Slider(10, 50, value=40, step=1, label="Inference Steps")
            seed = gr.Number(value=0, precision=0, label="Seed")
            generate_btn = gr.Button("🚀 Generate Video")
        with gr.Column():
            output_video = gr.Video(label="Generated Video")
            upload_status = gr.Textbox(label="Upload Status", interactive=False)

    generate_btn.click(
        generate_video,
        inputs=[input_image, prompt, negative_prompt, duration, guidance_scale, steps, seed],
        outputs=[output_video, upload_status],
    )
if __name__ == "__main__":
    demo.launch()