import gradio as gr
import torch
import os
import gc
import numpy as np
import tempfile
from typing import Optional, Tuple
import time

# ZeroGPU support
try:
    import spaces
    SPACES_AVAILABLE = True
except ImportError:
    SPACES_AVAILABLE = False

    class spaces:
        """Fallback no-op decorator for environments without the `spaces` package."""
        @staticmethod
        def GPU(duration=300):
            def decorator(func):
                return func
            return decorator

# Environment
IS_ZERO_GPU = os.environ.get("SPACES_ZERO_GPU") == "true"
IS_SPACES = os.environ.get("SPACE_ID") is not None
HAS_CUDA = torch.cuda.is_available()

print(f"🚀 H200 CogVideoX Setup: ZeroGPU={IS_ZERO_GPU}, Spaces={IS_SPACES}, CUDA={HAS_CUDA}")

# Working models, tested and confirmed, tried in priority order
WORKING_MODELS = [
    {
        "id": "THUDM/CogVideoX-2b",
        "name": "CogVideoX-2B",
        "pipeline_class": "CogVideoXPipeline",
        "resolution": (720, 480),
        "max_frames": 49,
        "dtype": torch.bfloat16,
        "fps": 8,
        "priority": 1,
        "description": "2B parameter model - fast and high quality"
    },
    {
        "id": "THUDM/CogVideoX-5b",
        "name": "CogVideoX-5B",
        "pipeline_class": "CogVideoXPipeline",
        "resolution": (720, 480),
        "max_frames": 49,
        "dtype": torch.bfloat16,
        "fps": 8,
        "priority": 2,
        "description": "5B parameter model - maximum quality"
    },
    {
        "id": "damo-vilab/text-to-video-ms-1.7b",
        "name": "ModelScope T2V 1.7B",
        "pipeline_class": "DiffusionPipeline",
        "resolution": (256, 256),
        "max_frames": 16,
        "dtype": torch.float16,
        "fps": 8,
        "priority": 3,
        "description": "Reliable fallback model"
    }
]

# Global state
MODEL = None
MODEL_INFO = None
LOADING_LOGS = []


def log_loading(message):
    """Log to stdout and keep a timestamped in-memory history for the UI."""
    global LOADING_LOGS
    timestamp = time.strftime('%H:%M:%S')
    formatted_msg = f"[{timestamp}] {message}"
    print(formatted_msg)
    LOADING_LOGS.append(formatted_msg)


def get_h200_memory():
    """Return (total, allocated) H200 memory in GB."""
    if HAS_CUDA:
        try:
            total = torch.cuda.get_device_properties(0).total_memory / (1024**3)
            allocated = torch.cuda.memory_allocated(0) / (1024**3)
            return total, allocated
        except Exception:
            return 0, 0
    return 0, 0


def load_working_model():
    """Load the first working model, trying CogVideoX first."""
    global MODEL, MODEL_INFO, LOADING_LOGS

    if MODEL is not None:
        return True

    LOADING_LOGS = []
    log_loading("🎯 H200 Working Model Loading - CogVideoX Priority")

    total_mem, allocated_mem = get_h200_memory()
    log_loading(f"💾 H200 Memory: {total_mem:.1f}GB total, {allocated_mem:.1f}GB allocated")

    # Try models in priority order
    sorted_models = sorted(WORKING_MODELS, key=lambda x: x["priority"])
    for model_config in sorted_models:
        if try_load_working_model(model_config):
            return True

    log_loading("❌ All working models failed")
    return False


def try_load_working_model(config):
    """Try loading a specific working model; return True on success."""
    global MODEL, MODEL_INFO

    model_id = config["id"]
    model_name = config["name"]

    log_loading(f"🔄 Loading {model_name}...")
    log_loading(f"   📋 Config: {model_id}")
    log_loading(f"   🎯 Target: {config['max_frames']} frames, {config['fps']} fps, {config['resolution']}")

    try:
        # Clear H200 memory first
        if HAS_CUDA:
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        gc.collect()
        log_loading("   🧹 Memory cleared")

        # Import the appropriate pipeline
        if config["pipeline_class"] == "CogVideoXPipeline":
            try:
                from diffusers import CogVideoXPipeline
                PipelineClass = CogVideoXPipeline
                log_loading("   📥 Using CogVideoXPipeline")
            except ImportError as e:
                log_loading(f"   ❌ CogVideoXPipeline import failed: {e}")
                return False
        else:
            from diffusers import DiffusionPipeline
            PipelineClass = DiffusionPipeline
            log_loading("   📥 Using DiffusionPipeline")
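        # Note: from_pretrained pulls the full checkpoint from the Hugging Face
        # Hub on first use (several GB for CogVideoX-5B), so the first load is
        # slow; subsequent loads hit the local cache. The dtypes above follow
        # each model's published configs: bfloat16 for the CogVideoX releases,
        # float16 for the ModelScope fallback.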
        # Load model with minimal parameters
        log_loading(f"   🔄 Downloading/Loading {model_name}...")
        start_load = time.time()

        pipe = PipelineClass.from_pretrained(
            model_id,
            torch_dtype=config["dtype"],
            trust_remote_code=True
        )

        load_time = time.time() - start_load
        log_loading(f"   ✅ Model loaded in {load_time:.1f}s")

        # Move to H200 GPU
        if HAS_CUDA:
            log_loading("   📱 Moving to H200 CUDA...")
            pipe = pipe.to("cuda")
            torch.cuda.synchronize()
            log_loading("   ✅ Model on H200 GPU")

        # H200 optimizations (each is optional, depending on the pipeline)
        if hasattr(pipe, 'enable_vae_slicing'):
            pipe.enable_vae_slicing()
            log_loading("   ⚡ VAE slicing enabled")
        if hasattr(pipe, 'enable_vae_tiling'):
            pipe.enable_vae_tiling()
            log_loading("   ⚡ VAE tiling enabled")
        if hasattr(pipe, 'enable_memory_efficient_attention'):
            pipe.enable_memory_efficient_attention()
            log_loading("   ⚡ Memory efficient attention enabled")

        # Memory check after setup
        total_mem, allocated_mem = get_h200_memory()
        log_loading(f"   💾 Final memory: {allocated_mem:.1f}GB / {total_mem:.1f}GB")

        MODEL = pipe
        MODEL_INFO = config

        log_loading(f"🎯 SUCCESS: {model_name} ready for generation!")
        log_loading(f"📊 Capabilities: {config['max_frames']} frames @ {config['fps']} fps = {config['max_frames'] / config['fps']:.1f}s videos")
        return True

    except Exception as e:
        log_loading(f"❌ {model_name} failed: {str(e)}")
        # Thorough cleanup before trying the next model
        if HAS_CUDA:
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        gc.collect()
        return False


# The fallback `spaces` class above makes this decorator a no-op outside
# Spaces, so no conditional decorator expression is needed here.
@spaces.GPU(duration=300)
def generate_video(
    prompt: str,
    negative_prompt: str = "",
    num_frames: int = 49,
    num_inference_steps: int = 50,
    guidance_scale: float = 6.0,
    seed: int = -1
) -> Tuple[Optional[str], str]:
    """Generate a video with the loaded working model."""
    global MODEL, MODEL_INFO

    # Load working model on first use
    if not load_working_model():
        logs = "\n".join(LOADING_LOGS[-10:])
        return None, f"❌ No working models could be loaded\n\nDetailed Logs:\n{logs}"

    # Input validation
    if not prompt.strip():
        return None, "❌ Please enter a detailed prompt."
    if len(prompt) < 5:
        return None, "❌ Please provide a more descriptive prompt."
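    # The UI sliders accept arbitrary values, so everything below is clamped
    # to the loaded model's published limits before generation. CogVideoX was
    # trained around its 49-frame, 720x480 setting, which is why its guidance
    # and step counts are pinned to the ranges the model card recommends.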
    # Get model specifications
    max_frames = MODEL_INFO["max_frames"]
    width, height = MODEL_INFO["resolution"]
    target_fps = MODEL_INFO["fps"]

    # Validate and adjust parameters
    num_frames = min(max(num_frames, 8), max_frames)

    # Model-specific optimizations
    if MODEL_INFO["name"].startswith("CogVideoX"):
        # CogVideoX optimal settings
        guidance_scale = max(6.0, min(guidance_scale, 7.0))
        num_inference_steps = max(50, num_inference_steps)

    try:
        # H200 memory baseline before generation
        start_memory = torch.cuda.memory_allocated(0) / (1024**3) if HAS_CUDA else 0

        # Seed handling
        if seed == -1:
            seed = np.random.randint(0, 2**32 - 1)
        device = "cuda" if HAS_CUDA else "cpu"
        generator = torch.Generator(device=device).manual_seed(seed)

        log_loading(f"🎬 GENERATION START - {MODEL_INFO['name']}")
        log_loading(f"📝 Prompt: {prompt[:80]}...")
        log_loading(f"📏 Settings: {width}x{height}, {num_frames} frames, {num_inference_steps} steps")
        log_loading(f"🎯 Expected duration: {num_frames / target_fps:.1f} seconds @ {target_fps} fps")

        start_time = time.time()

        # Generate under autocast so mixed precision matches the model dtype
        with torch.autocast(device, dtype=MODEL_INFO["dtype"], enabled=HAS_CUDA):
            # Prepare generation parameters
            gen_kwargs = {
                "prompt": prompt,
                "height": height,
                "width": width,
                "num_frames": num_frames,
                "num_inference_steps": num_inference_steps,
                "guidance_scale": guidance_scale,
                "generator": generator,
            }

            # Negative prompt: use the user's, else a default quality prompt
            if negative_prompt.strip():
                gen_kwargs["negative_prompt"] = negative_prompt
            else:
                quality_negative = ("blurry, low quality, distorted, pixelated, "
                                    "compression artifacts, static, boring, amateur, "
                                    "watermark, text")
                gen_kwargs["negative_prompt"] = quality_negative
                log_loading("🚫 Applied quality negative prompt")

            # CogVideoX-specific parameters
            if MODEL_INFO["name"].startswith("CogVideoX"):
                gen_kwargs["num_videos_per_prompt"] = 1
                log_loading("🎥 CogVideoX generation starting...")

            # Generate
            log_loading("🚀 H200 generation in progress...")
            result = MODEL(**gen_kwargs)

        end_time = time.time()
        generation_time = end_time - start_time

        # Extract frames (pipelines differ in their output attribute)
        if hasattr(result, 'frames'):
            video_frames = result.frames[0]
            log_loading(f"📹 Extracted {len(video_frames)} frames")
        elif hasattr(result, 'videos'):
            video_frames = result.videos[0]
            log_loading("📹 Extracted video tensor")
        else:
            log_loading("❌ Unknown result format")
            return None, "❌ Could not extract video frames"

        # Export with the model's native FPS
        actual_duration = num_frames / target_fps
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            from diffusers.utils import export_to_video
            export_to_video(video_frames, tmp_file.name, fps=target_fps)
            video_path = tmp_file.name

        log_loading(f"🎬 Exported: {actual_duration:.1f}s video @ {target_fps} fps")

        # Memory usage
        end_memory = torch.cuda.memory_allocated(0) / (1024**3) if HAS_CUDA else 0
        memory_used = end_memory - start_memory

        # Success report
        success_msg = f"""🎯 **H200 VIDEO GENERATED SUCCESSFULLY**

🤖 **Model:** {MODEL_INFO['name']}
📝 **Prompt:** {prompt}
🎬 **Video:** {num_frames} frames @ {target_fps} fps = **{actual_duration:.1f} seconds**
📏 **Resolution:** {width}x{height}
⚙️ **Quality:** {num_inference_steps} inference steps
🎯 **Guidance:** {guidance_scale}
🎲 **Seed:** {seed}
⏱️ **Generation Time:** {generation_time:.1f}s ({generation_time/60:.1f} min)
🖥️ **Device:** H200 MIG (69.5GB)
💾 **Memory Used:** {memory_used:.1f}GB
📋 **Model:** {MODEL_INFO['description']}

**🎥 Result:** {actual_duration:.1f} second high-quality video!**"""

        log_loading(f"✅ SUCCESS: {actual_duration:.1f}s video generated in {generation_time:.1f}s")
        return video_path, success_msg
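    # OOM gets its own handler so the user sees an actionable message
    # (reduce frames or steps) rather than a raw traceback; emptying the
    # CUDA cache afterwards lets the next attempt start from a clean slate.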
    except torch.cuda.OutOfMemoryError:
        if HAS_CUDA:
            torch.cuda.empty_cache()
        gc.collect()
        return None, "❌ H200 memory exceeded. Try reducing frames or steps."

    except Exception as e:
        if HAS_CUDA:
            torch.cuda.empty_cache()
        gc.collect()
        error_msg = str(e)
        log_loading(f"❌ Generation error: {error_msg}")
        return None, f"❌ Generation failed: {error_msg}"


def get_model_status():
    """Get current model status for the UI."""
    if MODEL is None:
        return "⏳ **No model loaded** - will auto-load CogVideoX on first generation"

    name = MODEL_INFO['name']
    max_frames = MODEL_INFO['max_frames']
    fps = MODEL_INFO['fps']
    width, height = MODEL_INFO['resolution']
    max_duration = max_frames / fps

    return f"""🎯 **{name} READY**

**📊 Video Capabilities:**
- **Maximum Duration:** {max_duration:.1f} seconds ({max_frames} frames @ {fps} fps)
- **Resolution:** {width}x{height}
- **Quality Level:** {MODEL_INFO['description']}

**⚡ H200 Status:**
- Model fully loaded in GPU memory
- All optimizations enabled
- Ready for {max_duration:.1f} second video generation

**💡 This model creates {max_duration:.1f} second videos with {max_frames} frames!**"""


def get_loading_logs():
    """Get formatted loading logs."""
    global LOADING_LOGS
    if not LOADING_LOGS:
        return "No loading logs yet. Click generate to start loading."
    return "\n".join(LOADING_LOGS)


def suggest_optimal_settings():
    """Suggest optimal settings for the loaded model."""
    if MODEL is None:
        return "No model loaded yet. Generate a video to auto-load CogVideoX."

    name = MODEL_INFO['name']
    max_frames = MODEL_INFO['max_frames']
    fps = MODEL_INFO['fps']
    max_duration = max_frames / fps

    return f"""## 🎯 Optimal Settings for {name}

**🏆 Maximum Quality (Recommended):**
- Frames: {max_frames} (full {max_duration:.1f} second video)
- Inference Steps: 50-70
- Guidance Scale: 6.0-6.5
- Expected Time: 3-5 minutes

**⚖️ Balanced Quality:**
- Frames: {max_frames // 2} ({(max_frames // 2) / fps:.1f} second video)
- Inference Steps: 40-50
- Guidance Scale: 6.0
- Expected Time: 2-3 minutes

**⚡ Quick Test:**
- Frames: 25 ({25 / fps:.1f} second video)
- Inference Steps: 30-40
- Guidance Scale: 6.0
- Expected Time: 1-2 minutes

**📝 {name} Prompt Tips:**
- Be very specific and detailed
- Describe camera movements: "slow zoom in", "tracking shot", "aerial view"
- Include lighting: "golden hour", "soft lighting", "dramatic shadows"
- Add motion description: "smooth movement", "graceful motion", "flowing"
- Specify style: "cinematic", "professional", "documentary style"

**🏆 Example Premium Prompt:**
"A majestic eagle soaring gracefully through mountain valleys during golden hour, cinematic aerial tracking shot following the bird's smooth flight path, professional wildlife documentary style with warm sunset lighting, breathtaking landscape vista below"

Remember: {name} excels at smooth, natural motion and cinematic quality!"""


# Build the interface
with gr.Blocks(title="H200 CogVideoX Generator", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎯 H200 CogVideoX Video Generator

    **CogVideoX-2B/5B Priority** • **6+ Second Videos** • **H200 MIG Optimized**
    """)
    # Status indicator banner
    with gr.Row():
        gr.Markdown("### 🚀 H200 MIG 69.5GB - COGVIDEOX READY - 6+ SECOND VIDEOS 🚀")
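    # Two tabs follow: generation (prompt, sliders, video output) and
    # diagnostics (model status, loading logs, suggested settings). The
    # status panel is also refreshed on page load via demo.load below.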
""") with gr.Tab("๐ŸŽฌ Generate Video"): with gr.Row(): with gr.Column(scale=1): prompt_input = gr.Textbox( label="๐Ÿ“ Detailed Video Prompt", placeholder="A majestic eagle soaring gracefully through mountain valleys during golden hour, cinematic aerial tracking shot following the bird's smooth flight path, professional wildlife documentary style with warm sunset lighting, breathtaking landscape vista below...", lines=4 ) negative_prompt_input = gr.Textbox( label="๐Ÿšซ Negative Prompt (Optional)", placeholder="blurry, low quality, distorted, pixelated, static, boring, amateur...", lines=2 ) with gr.Accordion("โš™๏ธ Generation Settings", open=True): with gr.Row(): num_frames = gr.Slider( minimum=8, maximum=49, value=49, step=1, label="๐ŸŽฌ Frames (49 = 6+ seconds)" ) num_steps = gr.Slider( minimum=30, maximum=70, value=50, step=5, label="โš™๏ธ Inference Steps" ) with gr.Row(): guidance_scale = gr.Slider( minimum=4.0, maximum=8.0, value=6.0, step=0.5, label="๐ŸŽฏ Guidance Scale" ) seed = gr.Number( label="๐ŸŽฒ Seed (-1 for random)", value=-1, precision=0 ) generate_btn = gr.Button( "๐ŸŽฏ Generate 6+ Second Video", variant="primary", size="lg" ) gr.Markdown(""" **โฑ๏ธ Generation Time:** 2-5 minutes **๐ŸŽฅ Output:** 6+ second high-quality videos **๐Ÿค– Model:** CogVideoX auto-loads first time """) with gr.Column(scale=1): video_output = gr.Video( label="๐ŸŽฅ H200 Generated Video", height=400 ) result_text = gr.Textbox( label="๐Ÿ“‹ Generation Report", lines=10, show_copy_button=True ) # Generate button generate_btn.click( fn=generate_video, inputs=[ prompt_input, negative_prompt_input, num_frames, num_steps, guidance_scale, seed ], outputs=[video_output, result_text] ) # Working examples gr.Examples( examples=[ [ "A majestic eagle soaring gracefully through mountain valleys during golden hour, cinematic aerial tracking shot, professional wildlife documentary style", "blurry, low quality, static, amateur", 49, 50, 6.0, 42 ], [ "Ocean waves crashing against rocky coastline during sunset, slow motion cinematography with dramatic lighting and foam spray", "calm, peaceful, low quality, boring", 41, 50, 6.5, 123 ], [ "A serene mountain lake reflecting autumn trees, gentle camera pan across the water surface, peaceful nature documentary style", "urban, modern, low quality, distorted", 33, 45, 6.0, 456 ], [ "Steam rising from a hot coffee cup on wooden table by window during rain, cozy atmosphere with warm lighting, intimate close-up shot", "cold, harsh, artificial, low quality", 25, 40, 6.0, 789 ] ], inputs=[prompt_input, negative_prompt_input, num_frames, num_steps, guidance_scale, seed] ) with gr.Tab("๐Ÿ“Š Model Status"): with gr.Row(): status_btn = gr.Button("๐Ÿ” Check Model Status") logs_btn = gr.Button("๐Ÿ“‹ View Loading Logs") settings_btn = gr.Button("โš™๏ธ Optimal Settings") status_output = gr.Markdown() logs_output = gr.Textbox(label="Loading Logs", lines=15, show_copy_button=True) settings_output = gr.Markdown() status_btn.click(fn=get_model_status, outputs=status_output) logs_btn.click(fn=get_loading_logs, outputs=logs_output) settings_btn.click(fn=suggest_optimal_settings, outputs=settings_output) # Auto-load status demo.load(fn=get_model_status, outputs=status_output) if __name__ == "__main__": demo.queue(max_size=3) demo.launch( share=False, server_name="0.0.0.0", server_port=7860, show_error=True )