# EPiC / gradio_app.py
import os
import shutil
import subprocess
from pathlib import Path

import gradio as gr
# -----------------------------
# Setup paths and env
# -----------------------------
HF_HOME = "/app/hf_cache"
os.environ["HF_HOME"] = HF_HOME
os.environ["TRANSFORMERS_CACHE"] = HF_HOME
os.makedirs(HF_HOME, exist_ok=True)
PRETRAINED_DIR = "/app/pretrained"
os.makedirs(PRETRAINED_DIR, exist_ok=True)
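
# Expected weight layout (these paths are all referenced later in this file;
# the exact contents of download/download_models.sh are assumed, not verified here):
#   /app/pretrained/RAFT/raft-things.pth        (sentinel checked by download_models)
#   /app/pretrained/CogVideoX-5b-I2V            (base model used in Step 2)
#   /app/out/EPiC_pretrained/checkpoint-500.pt  (ControlNet checkpoint used in Step 2)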
# -----------------------------
# Step 0: Optional Model Download
# -----------------------------
def download_models():
    """Download pretrained weights on first launch if the sentinel file is missing."""
    expected_model = os.path.join(PRETRAINED_DIR, "RAFT/raft-things.pth")
    if not Path(expected_model).exists():
        print("⚙️ Downloading pretrained models...")
        try:
            subprocess.check_call(["bash", "download/download_models.sh"])
            print("✅ Models downloaded.")
        except subprocess.CalledProcessError as e:
            print(f"❌ Model download failed: {e}")
    else:
        print("✅ Pretrained models already exist.")
# -----------------------------
# Step 1: Get Anchor Video
# -----------------------------
def get_anchor_video(video_path, fps, num_frames, target_pose, mode,
                     radius_scale, near_far_estimated,
                     sampler_name, diffusion_guidance_scale, diffusion_inference_steps,
                     prompt, negative_prompt, refine_prompt,
                     depth_inference_steps, depth_guidance_scale,
                     window_size, overlap, max_res, sample_size,
                     seed_input, height, width, aspect_ratio_inputs,
                     init_dx, init_dy, init_dz):
    """Step 1: render the camera-anchored (masked) video for the requested target pose."""
    temp_input_path = "/app/temp_input.mp4"
    output_dir = "/app/output_anchor"
    video_output_path = f"{output_dir}/masked_videos/output.mp4"

    if not video_path:
        return None, "Please upload an input video first."
    shutil.copy(video_path, temp_input_path)

    try:
        theta, phi, r, x, y = target_pose.strip().split()
    except ValueError:
        return None, "Invalid target pose format. Use: θ φ r x y (e.g., 0 30 -0.6 0 0)"

    logs = f"Running inference with target pose: θ={theta}, φ={phi}, r={r}, x={x}, y={y}\n"
    w, h = aspect_ratio_inputs.strip().split(",")
    h_s, w_s = sample_size.strip().split(",")

    command = [
        "python", "/app/inference/v2v_data/inference.py",
        "--video_path", temp_input_path,
        "--stride", "1",
        "--out_dir", output_dir,
        "--radius_scale", str(radius_scale),
        "--camera", "target",
        "--mask",
        "--target_pose", theta, phi, r, x, y,
        "--video_length", str(num_frames),
        "--save_name", "output",
        "--mode", mode,
        "--fps", str(fps),
        "--depth_inference_steps", str(depth_inference_steps),
        "--depth_guidance_scale", str(depth_guidance_scale),
        "--near_far_estimated", str(near_far_estimated),
        "--sampler_name", sampler_name,
        "--diffusion_guidance_scale", str(diffusion_guidance_scale),
        "--diffusion_inference_steps", str(diffusion_inference_steps),
        "--prompt", prompt or "",
        "--negative_prompt", negative_prompt,
        "--refine_prompt", refine_prompt,
        "--window_size", str(window_size),
        "--overlap", str(overlap),
        "--max_res", str(max_res),
        "--sample_size", h_s.strip(), w_s.strip(),
        "--seed", str(seed_input),
        "--height", str(height),
        "--width", str(width),
        "--target_aspect_ratio", w.strip(), h.strip(),
        "--init_dx", str(init_dx),
        "--init_dy", str(init_dy),
        "--init_dz", str(init_dz),
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs += result.stdout
    except subprocess.CalledProcessError as e:
        logs += f"Inference failed:\n{e.stderr}{e.stdout}"
        return None, logs
    return video_output_path, logs
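
# Pose format: five whitespace-separated values "θ φ r x y", e.g. the UI
# placeholder "0 30 -0.6 0 0". Their interpretation (rotation angles, radial
# offset, planar shift) is assumed from the parameter names; the authoritative
# parsing lives in inference/v2v_data/inference.py.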
# -----------------------------
# Step 2: Run Inference
# -----------------------------
def inference(
    fps, num_frames, controlnet_weights, controlnet_guidance_start,
    controlnet_guidance_end, guidance_scale, num_inference_steps, dtype,
    seed, height, width, downscale_coef, vae_channels,
    controlnet_input_channels, controlnet_transformer_num_layers
):
    """Step 2: refine the Step 1 anchor video with the CogVideoX ControlNet pipeline."""
    model_path = "/app/pretrained/CogVideoX-5b-I2V"
    ckpt_path = "/app/out/EPiC_pretrained/checkpoint-500.pt"
    video_root_dir = "/app/output_anchor"
    out_dir = "/app/output"

    command = [
        "python", "/app/inference/cli_demo_camera_i2v_pcd.py",
        "--video_root_dir", video_root_dir,
        "--base_model_path", model_path,
        "--controlnet_model_path", ckpt_path,
        "--output_path", out_dir,
        "--controlnet_weights", str(controlnet_weights),
        "--controlnet_guidance_start", str(controlnet_guidance_start),
        "--controlnet_guidance_end", str(controlnet_guidance_end),
        "--guidance_scale", str(guidance_scale),
        "--num_inference_steps", str(num_inference_steps),
        "--dtype", dtype,
        "--seed", str(seed),
        "--height", str(height),
        "--width", str(width),
        "--num_frames", str(num_frames),
        "--fps", str(fps),
        "--downscale_coef", str(downscale_coef),
        "--vae_channels", str(vae_channels),
        "--controlnet_input_channels", str(controlnet_input_channels),
        "--controlnet_transformer_num_layers", str(controlnet_transformer_num_layers),
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs = result.stdout
    except subprocess.CalledProcessError as e:
        logs = f"❌ Step 2 Inference Failed:\nSTDERR:\n{e.stderr}\nSTDOUT:\n{e.stdout}"
        return None, logs

    video_output = f"{out_dir}/00000_{seed}_out.mp4"
    return video_output if os.path.exists(video_output) else None, logs
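
# Note: the "00000_{seed}_out.mp4" pattern mirrors the filename that
# cli_demo_camera_i2v_pcd.py is expected to write for the first input video;
# if the CLI's naming scheme changes, update video_output accordingly.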
# -----------------------------
# UI
# -----------------------------
demo = gr.Blocks()
with demo:
    gr.Markdown("## 🎬 EPiC: Cinematic Camera Control")
    with gr.Tabs():
        with gr.TabItem("Step 1: Camera Anchor"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        near_far_estimated = gr.Checkbox(label="Near Far Estimation", value=True)
                        pose_input = gr.Textbox(label="Target Pose (θ φ r x y)", placeholder="e.g., 0 30 -0.6 0 0")
                        fps_input = gr.Number(value=24, label="FPS", precision=0)
                    aspect_ratio_inputs = gr.Textbox(label="Target Aspect Ratio (w,h)", placeholder="e.g., 2,3")
                    init_dx = gr.Number(value=0.0, label="Start Camera Offset X")
                    init_dy = gr.Number(value=0.0, label="Start Camera Offset Y")
                    init_dz = gr.Number(value=0.0, label="Start Camera Offset Z")
                    num_frames_input = gr.Number(value=49, label="Number of Frames", precision=0)
                    radius_input = gr.Number(value=1.0, label="Radius Scale")
                    mode_input = gr.Dropdown(choices=["gradual"], value="gradual", label="Camera Mode")
                    sampler_input = gr.Dropdown(choices=["Euler", "Euler A", "DPM++", "PNDM", "DDIM_Cog", "DDIM_Origin"], value="DDIM_Origin", label="Sampler")
                    diff_guidance_input = gr.Number(value=6.0, label="Diffusion Guidance")
                    diff_steps_input = gr.Number(value=50, label="Diffusion Steps", precision=0)
                    depth_steps_input = gr.Number(value=5, label="Depth Steps", precision=0)
                    depth_guidance_input = gr.Number(value=1.0, label="Depth Guidance")
                    window_input = gr.Number(value=64, label="Window Size", precision=0)
                    overlap_input = gr.Number(value=25, label="Overlap", precision=0)
                    maxres_input = gr.Number(value=1920, label="Max Resolution", precision=0)
                    sample_size = gr.Textbox(label="Sample Size (height, width)", placeholder="e.g., 384, 672", value="384, 672")
                    seed_input = gr.Number(value=43, label="Seed", precision=0)
                    height = gr.Number(value=480, label="Height", precision=0)
                    width = gr.Number(value=720, label="Width", precision=0)
                    prompt_input = gr.Textbox(label="Prompt")
                    neg_prompt_input = gr.Textbox(label="Negative Prompt", value="The video is not of a high quality, it has a low resolution. Watermark present in each frame. The background is solid. Strange body and strange trajectory.")
                    refine_prompt_input = gr.Textbox(label="Refine Prompt", value="The video is of high quality, and the view is very clear.")
                with gr.Column():
                    video_input = gr.Video(label="Upload Video (MP4)")
                    step1_button = gr.Button("▶️ Run Step 1")
                    step1_video = gr.Video(label="[Step 1] Masked Video")
                    step1_logs = gr.Textbox(label="[Step 1] Logs")
with gr.TabItem("Step 2: CogVideoX Refinement"):
with gr.Row():
with gr.Column():
with gr.Row():
controlnet_weights_input = gr.Number(value=0.5, label="ControlNet Weights")
controlnet_guidance_start_input = gr.Number(value=0.0, label="Guidance Start")
controlnet_guidance_end_input = gr.Number(value=0.5, label="Guidance End")
guidance_scale_input = gr.Number(value=6.0, label="Guidance Scale")
inference_steps_input = gr.Number(value=50, label="Num Inference Steps")
dtype_input = gr.Dropdown(choices=["float16", "bfloat16"], value="bfloat16", label="Compute Dtype")
seed_input2 = gr.Number(value=42, label="Seed")
height_input = gr.Number(value=480, label="Height")
width_input = gr.Number(value=720, label="Width")
num_frames_input2 = gr.Number(value=49, label="Num Frames")
fps_input2 = gr.Number(value=24, label="FPS")
downscale_coef_input = gr.Number(value=8, label="Downscale Coef")
vae_channels_input = gr.Number(value=16, label="VAE Channels")
controlnet_input_channels_input = gr.Number(value=6, label="ControlNet Input Channels")
controlnet_layers_input = gr.Number(value=8, label="ControlNet Transformer Layers")
with gr.Column():
step2_video = gr.Video(label="[Step 2] Final Refined Video")
step2_button = gr.Button("▶️ Run Step 2")
step2_logs = gr.Textbox(label="[Step 2] Logs")
    step1_button.click(
        get_anchor_video,
        inputs=[
            video_input, fps_input, num_frames_input, pose_input, mode_input,
            radius_input, near_far_estimated,
            sampler_input, diff_guidance_input, diff_steps_input,
            prompt_input, neg_prompt_input, refine_prompt_input,
            depth_steps_input, depth_guidance_input,
            window_input, overlap_input, maxres_input, sample_size,
            seed_input, height, width, aspect_ratio_inputs,
            init_dx, init_dy, init_dz,
        ],
        outputs=[step1_video, step1_logs],
    )
    step2_button.click(
        inference,
        inputs=[
            fps_input2, num_frames_input2,
            controlnet_weights_input, controlnet_guidance_start_input,
            controlnet_guidance_end_input, guidance_scale_input,
            inference_steps_input, dtype_input, seed_input2,
            height_input, width_input, downscale_coef_input,
            vae_channels_input, controlnet_input_channels_input,
            controlnet_layers_input,
        ],
        outputs=[step2_video, step2_logs],
    )
if __name__ == "__main__":
    download_models()
    demo.launch(server_name="0.0.0.0", server_port=7860)
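
# Usage sketch (assuming the project's Docker image, where the /app paths exist):
#   python gradio_app.py
# Then open http://localhost:7860; missing weights are fetched on first launch.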