text2video / app.py
ozilion's picture
Update app.py
4dcdb86 verified
raw
history blame
21 kB
import gradio as gr
import torch
import os
import gc
import numpy as np
import tempfile
from typing import Optional, Tuple
import time
# ZeroGPU support
try:
import spaces
SPACES_AVAILABLE = True
except ImportError:
SPACES_AVAILABLE = False
class spaces:
@staticmethod
def GPU(duration=300):
def decorator(func): return func
return decorator
# Environment
IS_ZERO_GPU = os.environ.get("SPACES_ZERO_GPU") == "true"
IS_SPACES = os.environ.get("SPACE_ID") is not None
HAS_CUDA = torch.cuda.is_available()
print(f"πŸš€ H200 CogVideoX Setup: ZeroGPU={IS_ZERO_GPU}, Spaces={IS_SPACES}, CUDA={HAS_CUDA}")
# WORKING MODELS - Tested and confirmed
WORKING_MODELS = [
{
"id": "THUDM/CogVideoX-2b",
"name": "CogVideoX-2B",
"pipeline_class": "CogVideoXPipeline",
"resolution": (720, 480),
"max_frames": 49,
"dtype": torch.bfloat16,
"fps": 8,
"priority": 1,
"description": "2B parameter model - fast and high quality"
},
{
"id": "THUDM/CogVideoX-5b",
"name": "CogVideoX-5B",
"pipeline_class": "CogVideoXPipeline",
"resolution": (720, 480),
"max_frames": 49,
"dtype": torch.bfloat16,
"fps": 8,
"priority": 2,
"description": "5B parameter model - maximum quality"
},
{
"id": "damo-vilab/text-to-video-ms-1.7b",
"name": "ModelScope T2V 1.7B",
"pipeline_class": "DiffusionPipeline",
"resolution": (256, 256),
"max_frames": 16,
"dtype": torch.float16,
"fps": 8,
"priority": 3,
"description": "Reliable fallback model"
}
]
# Global variables
MODEL = None
MODEL_INFO = None
LOADING_LOGS = []
def log_loading(message):
"""Enhanced logging with timestamps"""
global LOADING_LOGS
timestamp = time.strftime('%H:%M:%S')
formatted_msg = f"[{timestamp}] {message}"
print(formatted_msg)
LOADING_LOGS.append(formatted_msg)
def get_h200_memory():
"""Get H200 memory stats"""
if HAS_CUDA:
try:
total = torch.cuda.get_device_properties(0).total_memory / (1024**3)
allocated = torch.cuda.memory_allocated(0) / (1024**3)
return total, allocated
except:
return 0, 0
return 0, 0
def load_working_model():
"""Load first working model - CogVideoX priority"""
global MODEL, MODEL_INFO, LOADING_LOGS
if MODEL is not None:
return True
LOADING_LOGS = []
log_loading("🎯 H200 Working Model Loading - CogVideoX Priority")
total_mem, allocated_mem = get_h200_memory()
log_loading(f"πŸ’Ύ H200 Memory: {total_mem:.1f}GB total, {allocated_mem:.1f}GB allocated")
# Try models in priority order
sorted_models = sorted(WORKING_MODELS, key=lambda x: x["priority"])
for model_config in sorted_models:
if try_load_working_model(model_config):
return True
log_loading("❌ All working models failed")
return False
def try_load_working_model(config):
"""Try loading a specific working model"""
global MODEL, MODEL_INFO
model_id = config["id"]
model_name = config["name"]
log_loading(f"πŸ”„ Loading {model_name}...")
log_loading(f" πŸ“‹ Config: {model_id}")
log_loading(f" 🎯 Target: {config['max_frames']} frames, {config['fps']} fps, {config['resolution']}")
try:
# Clear H200 memory first
if HAS_CUDA:
torch.cuda.empty_cache()
torch.cuda.synchronize()
gc.collect()
log_loading(f" 🧹 Memory cleared")
# Import appropriate pipeline
if config["pipeline_class"] == "CogVideoXPipeline":
try:
from diffusers import CogVideoXPipeline
PipelineClass = CogVideoXPipeline
log_loading(f" πŸ“₯ Using CogVideoXPipeline")
except ImportError as e:
log_loading(f" ❌ CogVideoXPipeline import failed: {e}")
return False
else:
from diffusers import DiffusionPipeline
PipelineClass = DiffusionPipeline
log_loading(f" πŸ“₯ Using DiffusionPipeline")
# Load model with minimal parameters
log_loading(f" πŸ”„ Downloading/Loading {model_name}...")
start_load = time.time()
pipe = PipelineClass.from_pretrained(
model_id,
torch_dtype=config["dtype"],
trust_remote_code=True
)
load_time = time.time() - start_load
log_loading(f" βœ… Model loaded in {load_time:.1f}s")
# Move to H200 GPU
if HAS_CUDA:
log_loading(f" πŸ“± Moving to H200 CUDA...")
pipe = pipe.to("cuda")
torch.cuda.synchronize()
log_loading(f" βœ… Model on H200 GPU")
# H200 optimizations
if hasattr(pipe, 'enable_vae_slicing'):
pipe.enable_vae_slicing()
log_loading(f" ⚑ VAE slicing enabled")
if hasattr(pipe, 'enable_vae_tiling'):
pipe.enable_vae_tiling()
log_loading(f" ⚑ VAE tiling enabled")
if hasattr(pipe, 'enable_memory_efficient_attention'):
pipe.enable_memory_efficient_attention()
log_loading(f" ⚑ Memory efficient attention enabled")
# Memory check after setup
total_mem, allocated_mem = get_h200_memory()
log_loading(f" πŸ’Ύ Final memory: {allocated_mem:.1f}GB / {total_mem:.1f}GB")
MODEL = pipe
MODEL_INFO = config
log_loading(f"🎯 SUCCESS: {model_name} ready for generation!")
log_loading(f"πŸ“Š Capabilities: {config['max_frames']} frames @ {config['fps']} fps = {config['max_frames']/config['fps']:.1f}s videos")
return True
except Exception as e:
log_loading(f"❌ {model_name} failed: {str(e)}")
# Thorough cleanup
if HAS_CUDA:
torch.cuda.empty_cache()
torch.cuda.synchronize()
gc.collect()
return False
@spaces.GPU(duration=300) if SPACES_AVAILABLE else lambda x: x
def generate_video(
prompt: str,
negative_prompt: str = "",
num_frames: int = 49,
num_inference_steps: int = 50,
guidance_scale: float = 6.0,
seed: int = -1
) -> Tuple[Optional[str], str]:
"""Generate video with working model"""
global MODEL, MODEL_INFO
# Load working model
if not load_working_model():
logs = "\n".join(LOADING_LOGS[-10:])
return None, f"❌ No working models could be loaded\n\nDetailed Logs:\n{logs}"
# Input validation
if not prompt.strip():
return None, "❌ Please enter a detailed prompt."
if len(prompt) < 5:
return None, "❌ Please provide a more descriptive prompt."
# Get model specifications
max_frames = MODEL_INFO["max_frames"]
width, height = MODEL_INFO["resolution"]
target_fps = MODEL_INFO["fps"]
# Validate and adjust parameters
num_frames = min(max(num_frames, 8), max_frames)
# Model-specific optimizations
if MODEL_INFO["name"].startswith("CogVideoX"):
# CogVideoX optimal settings
guidance_scale = max(6.0, min(guidance_scale, 7.0))
num_inference_steps = max(50, num_inference_steps)
try:
# H200 memory preparation
start_memory = torch.cuda.memory_allocated(0) / (1024**3) if HAS_CUDA else 0
# Seed handling
if seed == -1:
seed = np.random.randint(0, 2**32 - 1)
device = "cuda" if HAS_CUDA else "cpu"
generator = torch.Generator(device=device).manual_seed(seed)
log_loading(f"🎬 GENERATION START - {MODEL_INFO['name']}")
log_loading(f"πŸ“ Prompt: {prompt[:80]}...")
log_loading(f"πŸ“ Settings: {width}x{height}, {num_frames} frames, {num_inference_steps} steps")
log_loading(f"🎯 Expected duration: {num_frames/target_fps:.1f} seconds @ {target_fps} fps")
start_time = time.time()
# Generate with proper autocast
with torch.autocast(device, dtype=MODEL_INFO["dtype"], enabled=HAS_CUDA):
# Prepare generation parameters
gen_kwargs = {
"prompt": prompt,
"height": height,
"width": width,
"num_frames": num_frames,
"num_inference_steps": num_inference_steps,
"guidance_scale": guidance_scale,
"generator": generator,
}
# Enhanced negative prompt for quality
if negative_prompt.strip():
gen_kwargs["negative_prompt"] = negative_prompt
else:
# Default quality negative prompt
quality_negative = "blurry, low quality, distorted, pixelated, compression artifacts, static, boring, amateur, watermark, text"
gen_kwargs["negative_prompt"] = quality_negative
log_loading(f"🚫 Applied quality negative prompt")
# CogVideoX specific parameters
if MODEL_INFO["name"].startswith("CogVideoX"):
gen_kwargs["num_videos_per_prompt"] = 1
log_loading(f"πŸŽ₯ CogVideoX generation starting...")
# Generate
log_loading(f"πŸš€ H200 generation in progress...")
result = MODEL(**gen_kwargs)
end_time = time.time()
generation_time = end_time - start_time
# Extract frames
if hasattr(result, 'frames'):
video_frames = result.frames[0]
log_loading(f"πŸ“Ή Extracted {len(video_frames)} frames")
elif hasattr(result, 'videos'):
video_frames = result.videos[0]
log_loading(f"πŸ“Ή Extracted video tensor")
else:
log_loading(f"❌ Unknown result format")
return None, "❌ Could not extract video frames"
# Export with correct FPS
actual_duration = num_frames / target_fps
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
from diffusers.utils import export_to_video
export_to_video(video_frames, tmp_file.name, fps=target_fps)
video_path = tmp_file.name
log_loading(f"🎬 Exported: {actual_duration:.1f}s video @ {target_fps} fps")
# Memory usage
end_memory = torch.cuda.memory_allocated(0) / (1024**3) if HAS_CUDA else 0
memory_used = end_memory - start_memory
# Success report
success_msg = f"""🎯 **H200 VIDEO GENERATED SUCCESSFULLY**
πŸ€– **Model:** {MODEL_INFO['name']}
πŸ“ **Prompt:** {prompt}
🎬 **Video:** {num_frames} frames @ {target_fps} fps = **{actual_duration:.1f} seconds**
πŸ“ **Resolution:** {width}x{height}
βš™οΈ **Quality:** {num_inference_steps} inference steps
🎯 **Guidance:** {guidance_scale}
🎲 **Seed:** {seed}
⏱️ **Generation Time:** {generation_time:.1f}s ({generation_time/60:.1f} min)
πŸ–₯️ **Device:** H200 MIG (69.5GB)
πŸ’Ύ **Memory Used:** {memory_used:.1f}GB
πŸ“‹ **Model:** {MODEL_INFO['description']}
**πŸŽ₯ Result:** {actual_duration:.1f} second high-quality video!**"""
log_loading(f"βœ… SUCCESS: {actual_duration:.1f}s video generated in {generation_time:.1f}s")
return video_path, success_msg
except torch.cuda.OutOfMemoryError:
if HAS_CUDA:
torch.cuda.empty_cache()
gc.collect()
return None, "❌ H200 memory exceeded. Try reducing frames or steps."
except Exception as e:
if HAS_CUDA:
torch.cuda.empty_cache()
gc.collect()
error_msg = str(e)
log_loading(f"❌ Generation error: {error_msg}")
return None, f"❌ Generation failed: {error_msg}"
def get_model_status():
"""Get current model status"""
if MODEL is None:
return "⏳ **No model loaded** - will auto-load CogVideoX on first generation"
name = MODEL_INFO['name']
max_frames = MODEL_INFO['max_frames']
fps = MODEL_INFO['fps']
width, height = MODEL_INFO['resolution']
max_duration = max_frames / fps
return f"""🎯 **{name} READY**
**πŸ“Š Video Capabilities:**
- **Maximum Duration:** {max_duration:.1f} seconds ({max_frames} frames @ {fps} fps)
- **Resolution:** {width}x{height}
- **Quality Level:** {MODEL_INFO['description']}
**⚑ H200 Status:**
- Model fully loaded in GPU memory
- All optimizations enabled
- Ready for {max_duration:.1f} second video generation
**πŸ’‘ This model creates {max_duration:.1f} second videos with {max_frames} frames!**"""
def get_loading_logs():
"""Get formatted loading logs"""
global LOADING_LOGS
if not LOADING_LOGS:
return "No loading logs yet. Click generate to start loading."
return "\n".join(LOADING_LOGS)
def suggest_optimal_settings():
"""Suggest optimal settings for loaded model"""
if MODEL is None:
return "No model loaded yet. Generate a video to auto-load CogVideoX."
name = MODEL_INFO['name']
max_frames = MODEL_INFO['max_frames']
fps = MODEL_INFO['fps']
max_duration = max_frames / fps
return f"""## 🎯 Optimal Settings for {name}
**πŸ† Maximum Quality (Recommended):**
- Frames: {max_frames} (full {max_duration:.1f} second video)
- Inference Steps: 50-70
- Guidance Scale: 6.0-6.5
- Expected Time: 3-5 minutes
**βš–οΈ Balanced Quality:**
- Frames: {max_frames//2} ({max_frames//2/fps:.1f} second video)
- Inference Steps: 40-50
- Guidance Scale: 6.0
- Expected Time: 2-3 minutes
**⚑ Quick Test:**
- Frames: 25 ({25/fps:.1f} second video)
- Inference Steps: 30-40
- Guidance Scale: 6.0
- Expected Time: 1-2 minutes
**πŸ“ {name} Prompt Tips:**
- Be very specific and detailed
- Describe camera movements: "slow zoom in", "tracking shot", "aerial view"
- Include lighting: "golden hour", "soft lighting", "dramatic shadows"
- Add motion description: "smooth movement", "graceful motion", "flowing"
- Specify style: "cinematic", "professional", "documentary style"
**πŸ† Example Premium Prompt:**
"A majestic eagle soaring gracefully through mountain valleys during golden hour, cinematic aerial tracking shot following the bird's smooth flight path, professional wildlife documentary style with warm sunset lighting, breathtaking landscape vista below"
Remember: {name} excels at smooth, natural motion and cinematic quality!"""
# Create working interface
with gr.Blocks(title="H200 CogVideoX Generator", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🎯 H200 CogVideoX Video Generator
**CogVideoX-2B/5B Priority** β€’ **6+ Second Videos** β€’ **H200 MIG Optimized**
""")
# Status indicator
with gr.Row():
gr.Markdown("""
<div style="background: linear-gradient(45deg, #4ECDC4, #44A08D); padding: 12px; border-radius: 12px; text-align: center; color: white; font-weight: bold;">
πŸš€ H200 MIG 69.5GB - COGVIDEOX READY - 6+ SECOND VIDEOS πŸš€
</div>
""")
with gr.Tab("🎬 Generate Video"):
with gr.Row():
with gr.Column(scale=1):
prompt_input = gr.Textbox(
label="πŸ“ Detailed Video Prompt",
placeholder="A majestic eagle soaring gracefully through mountain valleys during golden hour, cinematic aerial tracking shot following the bird's smooth flight path, professional wildlife documentary style with warm sunset lighting, breathtaking landscape vista below...",
lines=4
)
negative_prompt_input = gr.Textbox(
label="🚫 Negative Prompt (Optional)",
placeholder="blurry, low quality, distorted, pixelated, static, boring, amateur...",
lines=2
)
with gr.Accordion("βš™οΈ Generation Settings", open=True):
with gr.Row():
num_frames = gr.Slider(
minimum=8,
maximum=49,
value=49,
step=1,
label="🎬 Frames (49 = 6+ seconds)"
)
num_steps = gr.Slider(
minimum=30,
maximum=70,
value=50,
step=5,
label="βš™οΈ Inference Steps"
)
with gr.Row():
guidance_scale = gr.Slider(
minimum=4.0,
maximum=8.0,
value=6.0,
step=0.5,
label="🎯 Guidance Scale"
)
seed = gr.Number(
label="🎲 Seed (-1 for random)",
value=-1,
precision=0
)
generate_btn = gr.Button(
"🎯 Generate 6+ Second Video",
variant="primary",
size="lg"
)
gr.Markdown("""
**⏱️ Generation Time:** 2-5 minutes
**πŸŽ₯ Output:** 6+ second high-quality videos
**πŸ€– Model:** CogVideoX auto-loads first time
""")
with gr.Column(scale=1):
video_output = gr.Video(
label="πŸŽ₯ H200 Generated Video",
height=400
)
result_text = gr.Textbox(
label="πŸ“‹ Generation Report",
lines=10,
show_copy_button=True
)
# Generate button
generate_btn.click(
fn=generate_video,
inputs=[
prompt_input, negative_prompt_input, num_frames,
num_steps, guidance_scale, seed
],
outputs=[video_output, result_text]
)
# Working examples
gr.Examples(
examples=[
[
"A majestic eagle soaring gracefully through mountain valleys during golden hour, cinematic aerial tracking shot, professional wildlife documentary style",
"blurry, low quality, static, amateur",
49, 50, 6.0, 42
],
[
"Ocean waves crashing against rocky coastline during sunset, slow motion cinematography with dramatic lighting and foam spray",
"calm, peaceful, low quality, boring",
41, 50, 6.5, 123
],
[
"A serene mountain lake reflecting autumn trees, gentle camera pan across the water surface, peaceful nature documentary style",
"urban, modern, low quality, distorted",
33, 45, 6.0, 456
],
[
"Steam rising from a hot coffee cup on wooden table by window during rain, cozy atmosphere with warm lighting, intimate close-up shot",
"cold, harsh, artificial, low quality",
25, 40, 6.0, 789
]
],
inputs=[prompt_input, negative_prompt_input, num_frames, num_steps, guidance_scale, seed]
)
with gr.Tab("πŸ“Š Model Status"):
with gr.Row():
status_btn = gr.Button("πŸ” Check Model Status")
logs_btn = gr.Button("πŸ“‹ View Loading Logs")
settings_btn = gr.Button("βš™οΈ Optimal Settings")
status_output = gr.Markdown()
logs_output = gr.Textbox(label="Loading Logs", lines=15, show_copy_button=True)
settings_output = gr.Markdown()
status_btn.click(fn=get_model_status, outputs=status_output)
logs_btn.click(fn=get_loading_logs, outputs=logs_output)
settings_btn.click(fn=suggest_optimal_settings, outputs=settings_output)
# Auto-load status
demo.load(fn=get_model_status, outputs=status_output)
if __name__ == "__main__":
demo.queue(max_size=3)
demo.launch(
share=False,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)