import os
import shutil
import subprocess
from datetime import datetime
from pathlib import Path

import gradio as gr
import numpy as np

# -----------------------------
# Setup paths and env
# -----------------------------
HF_HOME = "/app/hf_cache"
os.environ["HF_HOME"] = HF_HOME
os.environ["TRANSFORMERS_CACHE"] = HF_HOME
os.makedirs(HF_HOME, exist_ok=True)

PRETRAINED_DIR = "/app/pretrained"
os.makedirs(PRETRAINED_DIR, exist_ok=True)

# -----------------------------
# Step 0: Optional Model Download
# -----------------------------
def download_models():
    # Use the RAFT checkpoint as a sentinel for "models already downloaded".
    expected_model = os.path.join(PRETRAINED_DIR, "RAFT/raft-things.pth")
    if not Path(expected_model).exists():
        print("⚙️ Downloading pretrained models...")
        try:
            subprocess.check_call(["bash", "download/download_models.sh"])
            print("✅ Models downloaded.")
        except subprocess.CalledProcessError as e:
            print(f"Model download failed: {e}")
    else:
        print("✅ Pretrained models already exist.")

# -----------------------------
# Step 1: Get Anchor Video
# -----------------------------
def get_anchor_video(video_path, fps, num_frames, target_pose, mode,
                     radius_scale, near_far_estimated, sampler_name,
                     diffusion_guidance_scale, diffusion_inference_steps,
                     prompt, negative_prompt, refine_prompt,
                     depth_inference_steps, depth_guidance_scale,
                     window_size, overlap, max_res, sample_size,
                     seed_input, height, width, aspect_ratio_inputs,
                     init_dx, init_dy, init_dz):
    temp_input_path = "/app/temp_input.mp4"
    output_dir = "/app/output_anchor"
    video_output_path = f"{output_dir}/masked_videos/output.mp4"

    if video_path:
        shutil.copy(video_path, temp_input_path)  # safer than shelling out to `cp`

    try:
        theta, phi, r, x, y = target_pose.strip().split()
    except ValueError:
        # Must match the (video, logs) output signature wired up in the UI.
        return None, "Invalid target pose format. Use: θ φ r x y"

    logs = f"Running inference with target pose: θ={theta}, φ={phi}, r={r}, x={x}, y={y}\n"

    w, h = aspect_ratio_inputs.strip().split(",")
    h_s, w_s = sample_size.strip().split(",")

    # gr.Number returns floats, so integer-valued flags are cast before stringifying.
    command = [
        "python", "/app/inference/v2v_data/inference.py",
        "--video_path", temp_input_path,
        "--stride", "1",
        "--out_dir", output_dir,
        "--radius_scale", str(radius_scale),
        "--camera", "target",
        "--mask",
        "--target_pose", theta, phi, r, x, y,
        "--video_length", str(int(num_frames)),
        "--save_name", "output",
        "--mode", mode,
        "--fps", str(int(fps)),
        "--depth_inference_steps", str(int(depth_inference_steps)),
        "--depth_guidance_scale", str(depth_guidance_scale),
        "--near_far_estimated", str(near_far_estimated),
        "--sampler_name", sampler_name,
        "--diffusion_guidance_scale", str(diffusion_guidance_scale),
        "--diffusion_inference_steps", str(int(diffusion_inference_steps)),
        "--prompt", prompt if prompt else "",
        "--negative_prompt", negative_prompt,
        "--refine_prompt", refine_prompt,
        "--window_size", str(int(window_size)),
        "--overlap", str(int(overlap)),
        "--max_res", str(int(max_res)),
        "--sample_size", h_s.strip(), w_s.strip(),
        "--seed", str(int(seed_input)),
        "--height", str(int(height)),
        "--width", str(int(width)),
        "--target_aspect_ratio", w.strip(), h.strip(),
        "--init_dx", str(init_dx),
        "--init_dy", str(init_dy),
        "--init_dz", str(init_dz),
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs += result.stdout
    except subprocess.CalledProcessError as e:
        logs += f"Inference failed:\n{e.stderr}{e.stdout}"
        return None, logs

    return video_output_path, logs

# -----------------------------
# Step 2: Run Inference
# -----------------------------
def inference(fps, num_frames, controlnet_weights, controlnet_guidance_start,
              controlnet_guidance_end, guidance_scale, num_inference_steps,
              dtype, seed, height, width, downscale_coef, vae_channels,
              controlnet_input_channels, controlnet_transformer_num_layers):
    model_path = "/app/pretrained/CogVideoX-5b-I2V"
    ckpt_path = "/app/out/EPiC_pretrained/checkpoint-500.pt"
    video_root_dir = "/app/output_anchor"
    out_dir = "/app/output"

    seed = int(seed)  # gr.Number yields floats; the CLI and output filename need an int

    command = [
        "python", "/app/inference/cli_demo_camera_i2v_pcd.py",
        "--video_root_dir", video_root_dir,
        "--base_model_path", model_path,
        "--controlnet_model_path", ckpt_path,
        "--output_path", out_dir,
        "--controlnet_weights", str(controlnet_weights),
        "--controlnet_guidance_start", str(controlnet_guidance_start),
        "--controlnet_guidance_end", str(controlnet_guidance_end),
        "--guidance_scale", str(guidance_scale),
        "--num_inference_steps", str(int(num_inference_steps)),
        "--dtype", dtype,
        "--seed", str(seed),
        "--height", str(int(height)),
        "--width", str(int(width)),
        "--num_frames", str(int(num_frames)),
        "--fps", str(int(fps)),
        "--downscale_coef", str(int(downscale_coef)),
        "--vae_channels", str(int(vae_channels)),
        "--controlnet_input_channels", str(int(controlnet_input_channels)),
        "--controlnet_transformer_num_layers", str(int(controlnet_transformer_num_layers)),
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs = result.stdout
    except subprocess.CalledProcessError as e:
        logs = f"❌ Step 2 Inference Failed:\nSTDERR:\n{e.stderr}\nSTDOUT:\n{e.stdout}"
        return None, logs

    video_output = f"{out_dir}/00000_{seed}_out.mp4"
    return video_output if os.path.exists(video_output) else None, logs

# -----------------------------
# UI
# -----------------------------
demo = gr.Blocks()

with demo:
    gr.Markdown("## 🎬 EPiC: Cinematic Camera Control")

    with gr.Tabs():
        with gr.TabItem("Step 1: Camera Anchor"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        near_far_estimated = gr.Checkbox(label="Near Far Estimation", value=True)
                        pose_input = gr.Textbox(label="Target Pose (θ φ r x y)", placeholder="e.g., 0 30 -0.6 0 0")
                        fps_input = gr.Number(value=24, label="FPS")
                        aspect_ratio_inputs = gr.Textbox(label="Target Aspect Ratio (e.g., 2,3)")
                        init_dx = gr.Number(value=0.0, label="Start Camera Offset X")
                        init_dy = gr.Number(value=0.0, label="Start Camera Offset Y")
                        init_dz = gr.Number(value=0.0, label="Start Camera Offset Z")
                        num_frames_input = gr.Number(value=49, label="Number of Frames")
                        radius_input = gr.Number(value=1.0, label="Radius Scale")
                        mode_input = gr.Dropdown(choices=["gradual"], value="gradual", label="Camera Mode")
                        sampler_input = gr.Dropdown(choices=["Euler", "Euler A", "DPM++", "PNDM", "DDIM_Cog", "DDIM_Origin"], value="DDIM_Origin", label="Sampler")
                        diff_guidance_input = gr.Number(value=6.0, label="Diffusion Guidance")
                        diff_steps_input = gr.Number(value=50, label="Diffusion Steps")
                        depth_steps_input = gr.Number(value=5, label="Depth Steps")
                        depth_guidance_input = gr.Number(value=1.0, label="Depth Guidance")
                        window_input = gr.Number(value=64, label="Window Size")
                        overlap_input = gr.Number(value=25, label="Overlap")
                        maxres_input = gr.Number(value=1920, label="Max Resolution")
                        sample_size = gr.Textbox(label="Sample Size (height, width)", placeholder="e.g., 384, 672", value="384, 672")
                        seed_input = gr.Number(value=43, label="Seed")
                        height = gr.Number(value=480, label="Height")
                        width = gr.Number(value=720, label="Width")
                        prompt_input = gr.Textbox(label="Prompt")
                        neg_prompt_input = gr.Textbox(label="Negative Prompt", value="The video is not of a high quality, it has a low resolution. Watermark present in each frame. The background is solid. Strange body and strange trajectory.")
                        refine_prompt_input = gr.Textbox(label="Refine Prompt", value="The video is of high quality, and the view is very clear.")
") with gr.Column(): video_input = gr.Video(label="Upload Video (MP4)") step1_button = gr.Button("▶️ Run Step 1") step1_video = gr.Video(label="[Step 1] Masked Video") step1_logs = gr.Textbox(label="[Step 1] Logs") with gr.TabItem("Step 2: CogVideoX Refinement"): with gr.Row(): with gr.Column(): with gr.Row(): controlnet_weights_input = gr.Number(value=0.5, label="ControlNet Weights") controlnet_guidance_start_input = gr.Number(value=0.0, label="Guidance Start") controlnet_guidance_end_input = gr.Number(value=0.5, label="Guidance End") guidance_scale_input = gr.Number(value=6.0, label="Guidance Scale") inference_steps_input = gr.Number(value=50, label="Num Inference Steps") dtype_input = gr.Dropdown(choices=["float16", "bfloat16"], value="bfloat16", label="Compute Dtype") seed_input2 = gr.Number(value=42, label="Seed") height_input = gr.Number(value=480, label="Height") width_input = gr.Number(value=720, label="Width") num_frames_input2 = gr.Number(value=49, label="Num Frames") fps_input2 = gr.Number(value=24, label="FPS") downscale_coef_input = gr.Number(value=8, label="Downscale Coef") vae_channels_input = gr.Number(value=16, label="VAE Channels") controlnet_input_channels_input = gr.Number(value=6, label="ControlNet Input Channels") controlnet_layers_input = gr.Number(value=8, label="ControlNet Transformer Layers") with gr.Column(): step2_video = gr.Video(label="[Step 2] Final Refined Video") step2_button = gr.Button("▶️ Run Step 2") step2_logs = gr.Textbox(label="[Step 2] Logs") step1_button.click( get_anchor_video, inputs=[ video_input, fps_input, num_frames_input, pose_input, mode_input, radius_input, near_far_estimated, sampler_input, diff_guidance_input, diff_steps_input, prompt_input, neg_prompt_input, refine_prompt_input, depth_steps_input, depth_guidance_input, window_input, overlap_input, maxres_input, sample_size, seed_input, height, width, aspect_ratio_inputs, init_dx, init_dy, init_dz # ← NEW INPUTS ], outputs=[step1_video, step1_logs] ) step2_button.click( inference, inputs=[ fps_input2, num_frames_input2, controlnet_weights_input, controlnet_guidance_start_input, controlnet_guidance_end_input, guidance_scale_input, inference_steps_input, dtype_input, seed_input2, height_input, width_input, downscale_coef_input, vae_channels_input, controlnet_input_channels_input, controlnet_layers_input ], outputs=[step2_video, step2_logs] ) if __name__ == "__main__": download_models() demo.launch(server_name="0.0.0.0", server_port=7860)