"""Gradio demo for EPiC cinematic camera control: Step 1 renders a
camera-anchored (masked) video, Step 2 refines it with CogVideoX + ControlNet."""
import os
import shutil
import subprocess
from pathlib import Path

import gradio as gr

# Route Hugging Face downloads to a writable cache inside the container.
HF_HOME = "/app/hf_cache"
os.environ["HF_HOME"] = HF_HOME
os.environ["TRANSFORMERS_CACHE"] = HF_HOME
os.makedirs(HF_HOME, exist_ok=True)

PRETRAINED_DIR = "/app/pretrained"
os.makedirs(PRETRAINED_DIR, exist_ok=True)

|
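# download/download_models.sh is expected to fetch every checkpoint the two
# steps need (RAFT, CogVideoX-5b-I2V, the EPiC ControlNet weights) into
# PRETRAINED_DIR; the RAFT weight file doubles as the "already downloaded"
# sentinel below.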
def download_models():
    """Download pretrained checkpoints on first launch; skip if already present."""
    expected_model = os.path.join(PRETRAINED_DIR, "RAFT/raft-things.pth")
    if not Path(expected_model).exists():
        print("⚙️ Downloading pretrained models...")
        try:
            subprocess.check_call(["bash", "download/download_models.sh"])
            print("✅ Models downloaded.")
        except subprocess.CalledProcessError as e:
            print(f"❌ Model download failed: {e}")
    else:
        print("✅ Pretrained models already exist.")

|
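# ----- Step 1: camera anchor generation -----
# Wraps inference/v2v_data/inference.py, which renders the input clip along the
# requested camera trajectory and writes a masked anchor video that Step 2
# consumes from /app/output_anchor.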
def get_anchor_video(video_path, fps, num_frames, target_pose, mode,
                     radius_scale, near_far_estimated,
                     sampler_name, diffusion_guidance_scale, diffusion_inference_steps,
                     prompt, negative_prompt, refine_prompt,
                     depth_inference_steps, depth_guidance_scale,
                     window_size, overlap, max_res, sample_size,
                     seed_input, height, width, aspect_ratio_inputs,
                     init_dx, init_dy, init_dz):
    """Render the camera-anchored (masked) video for the given target pose."""
    temp_input_path = "/app/temp_input.mp4"
    output_dir = "/app/output_anchor"
    video_output_path = f"{output_dir}/masked_videos/output.mp4"

    if not video_path:
        return None, "Please upload a video first."
    # Copy the upload to a fixed path instead of shelling out with `cp`.
    shutil.copy(video_path, temp_input_path)

    try:
        theta, phi, r, x, y = target_pose.strip().split()
        w, h = aspect_ratio_inputs.strip().split(",")
        h_s, w_s = sample_size.strip().split(",")
    except ValueError:
        return None, ("Invalid input format. Target pose must be 'θ φ r x y'; "
                      "aspect ratio and sample size must be comma-separated pairs.")

    logs = f"Running inference with target pose: θ={theta}, φ={phi}, r={r}, x={x}, y={y}\n"

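    # Gradio Number inputs arrive as floats; integer-valued flags are cast via
    # int() below on the assumption that the CLI declares them as type=int.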
    command = [
        "python", "/app/inference/v2v_data/inference.py",
        "--video_path", temp_input_path,
        "--stride", "1",
        "--out_dir", output_dir,
        "--radius_scale", str(radius_scale),
        "--camera", "target",
        "--mask",
        "--target_pose", theta, phi, r, x, y,
        "--video_length", str(int(num_frames)),
        "--save_name", "output",
        "--mode", mode,
        "--fps", str(int(fps)),
        "--depth_inference_steps", str(int(depth_inference_steps)),
        "--depth_guidance_scale", str(depth_guidance_scale),
        "--near_far_estimated", str(near_far_estimated),
        "--sampler_name", sampler_name,
        "--diffusion_guidance_scale", str(diffusion_guidance_scale),
        "--diffusion_inference_steps", str(int(diffusion_inference_steps)),
        "--prompt", prompt or "",
        "--negative_prompt", negative_prompt,
        "--refine_prompt", refine_prompt,
        "--window_size", str(int(window_size)),
        "--overlap", str(int(overlap)),
        "--max_res", str(int(max_res)),
        "--sample_size", h_s.strip(), w_s.strip(),
        "--seed", str(int(seed_input)),
        "--height", str(int(height)),
        "--width", str(int(width)),
        "--target_aspect_ratio", w.strip(), h.strip(),
        "--init_dx", str(init_dx),
        "--init_dy", str(init_dy),
        "--init_dz", str(init_dz),
    ]

    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs += result.stdout
    except subprocess.CalledProcessError as e:
        logs += f"❌ Step 1 Inference Failed:\nSTDERR:\n{e.stderr}\nSTDOUT:\n{e.stdout}"
        return None, logs

    return video_output_path if os.path.exists(video_output_path) else None, logs

|
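# ----- Step 2: CogVideoX ControlNet refinement -----
# Wraps inference/cli_demo_camera_i2v_pcd.py, reading Step 1's output from
# /app/output_anchor and writing the refined video to /app/output.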
def inference(
    fps, num_frames, controlnet_weights, controlnet_guidance_start,
    controlnet_guidance_end, guidance_scale, num_inference_steps, dtype,
    seed, height, width, downscale_coef, vae_channels,
    controlnet_input_channels, controlnet_transformer_num_layers
):
    """Refine the Step 1 anchor video with the CogVideoX ControlNet pipeline."""
    model_path = "/app/pretrained/CogVideoX-5b-I2V"
    ckpt_path = "/app/out/EPiC_pretrained/checkpoint-500.pt"
    video_root_dir = "/app/output_anchor"
    out_dir = "/app/output"
    seed = int(seed)  # the output filename embeds the integer seed
|
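    # As in Step 1, integer-valued flags are cast with int() since gr.Number
    # delivers floats.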
    command = [
        "python", "/app/inference/cli_demo_camera_i2v_pcd.py",
        "--video_root_dir", video_root_dir,
        "--base_model_path", model_path,
        "--controlnet_model_path", ckpt_path,
        "--output_path", out_dir,
        "--controlnet_weights", str(controlnet_weights),
        "--controlnet_guidance_start", str(controlnet_guidance_start),
        "--controlnet_guidance_end", str(controlnet_guidance_end),
        "--guidance_scale", str(guidance_scale),
        "--num_inference_steps", str(int(num_inference_steps)),
        "--dtype", dtype,
        "--seed", str(seed),
        "--height", str(int(height)),
        "--width", str(int(width)),
        "--num_frames", str(int(num_frames)),
        "--fps", str(int(fps)),
        "--downscale_coef", str(int(downscale_coef)),
        "--vae_channels", str(int(vae_channels)),
        "--controlnet_input_channels", str(int(controlnet_input_channels)),
        "--controlnet_transformer_num_layers", str(int(controlnet_transformer_num_layers)),
    ]
|
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        logs = result.stdout
    except subprocess.CalledProcessError as e:
        logs = f"❌ Step 2 Inference Failed:\nSTDERR:\n{e.stderr}\nSTDOUT:\n{e.stdout}"
        return None, logs

    video_output = f"{out_dir}/00000_{seed}_out.mp4"
    return video_output if os.path.exists(video_output) else None, logs

|
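# ----- Gradio UI: one tab per pipeline step -----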
demo = gr.Blocks()

with demo:
    gr.Markdown("## 🎬 EPiC: Cinematic Camera Control")
    with gr.Tabs():
        with gr.TabItem("Step 1: Camera Anchor"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        near_far_estimated = gr.Checkbox(label="Near Far Estimation", value=True)
                        pose_input = gr.Textbox(label="Target Pose (θ φ r x y)", placeholder="e.g., 0 30 -0.6 0 0")
                        fps_input = gr.Number(value=24, label="FPS")
                        aspect_ratio_inputs = gr.Textbox(label="Target Aspect Ratio (e.g., 2,3)")

                        init_dx = gr.Number(value=0.0, label="Start Camera Offset X")
                        init_dy = gr.Number(value=0.0, label="Start Camera Offset Y")
                        init_dz = gr.Number(value=0.0, label="Start Camera Offset Z")

                        num_frames_input = gr.Number(value=49, label="Number of Frames")
                        radius_input = gr.Number(value=1.0, label="Radius Scale")
                        mode_input = gr.Dropdown(choices=["gradual"], value="gradual", label="Camera Mode")
                        sampler_input = gr.Dropdown(choices=["Euler", "Euler A", "DPM++", "PNDM", "DDIM_Cog", "DDIM_Origin"], value="DDIM_Origin", label="Sampler")
                        diff_guidance_input = gr.Number(value=6.0, label="Diffusion Guidance")
                        diff_steps_input = gr.Number(value=50, label="Diffusion Steps")
                        depth_steps_input = gr.Number(value=5, label="Depth Steps")
                        depth_guidance_input = gr.Number(value=1.0, label="Depth Guidance")
                        window_input = gr.Number(value=64, label="Window Size")
                        overlap_input = gr.Number(value=25, label="Overlap")
                        maxres_input = gr.Number(value=1920, label="Max Resolution")
                        sample_size = gr.Textbox(label="Sample Size (height, width)", placeholder="e.g., 384, 672", value="384, 672")
                        seed_input = gr.Number(value=43, label="Seed")
                        height = gr.Number(value=480, label="Height")
                        width = gr.Number(value=720, label="Width")
                        prompt_input = gr.Textbox(label="Prompt")
                        neg_prompt_input = gr.Textbox(label="Negative Prompt", value="The video is not of a high quality, it has a low resolution. Watermark present in each frame. The background is solid. Strange body and strange trajectory.")
                        refine_prompt_input = gr.Textbox(label="Refine Prompt", value="The video is of high quality, and the view is very clear.")
                with gr.Column():
                    video_input = gr.Video(label="Upload Video (MP4)")

                    step1_button = gr.Button("▶️ Run Step 1")
                    step1_video = gr.Video(label="[Step 1] Masked Video")
                    step1_logs = gr.Textbox(label="[Step 1] Logs")
|
with gr.TabItem("Step 2: CogVideoX Refinement"): |
|
with gr.Row(): |
|
with gr.Column(): |
|
with gr.Row(): |
|
|
|
controlnet_weights_input = gr.Number(value=0.5, label="ControlNet Weights") |
|
controlnet_guidance_start_input = gr.Number(value=0.0, label="Guidance Start") |
|
controlnet_guidance_end_input = gr.Number(value=0.5, label="Guidance End") |
|
guidance_scale_input = gr.Number(value=6.0, label="Guidance Scale") |
|
inference_steps_input = gr.Number(value=50, label="Num Inference Steps") |
|
dtype_input = gr.Dropdown(choices=["float16", "bfloat16"], value="bfloat16", label="Compute Dtype") |
|
seed_input2 = gr.Number(value=42, label="Seed") |
|
height_input = gr.Number(value=480, label="Height") |
|
width_input = gr.Number(value=720, label="Width") |
|
num_frames_input2 = gr.Number(value=49, label="Num Frames") |
|
fps_input2 = gr.Number(value=24, label="FPS") |
|
downscale_coef_input = gr.Number(value=8, label="Downscale Coef") |
|
vae_channels_input = gr.Number(value=16, label="VAE Channels") |
|
controlnet_input_channels_input = gr.Number(value=6, label="ControlNet Input Channels") |
|
controlnet_layers_input = gr.Number(value=8, label="ControlNet Transformer Layers") |
|
with gr.Column(): |
|
step2_video = gr.Video(label="[Step 2] Final Refined Video") |
|
step2_button = gr.Button("▶️ Run Step 2") |
|
step2_logs = gr.Textbox(label="[Step 2] Logs") |
|
|
|
|
|
    step1_button.click(
        get_anchor_video,
        inputs=[
            video_input, fps_input, num_frames_input, pose_input, mode_input,
            radius_input, near_far_estimated,
            sampler_input, diff_guidance_input, diff_steps_input,
            prompt_input, neg_prompt_input, refine_prompt_input,
            depth_steps_input, depth_guidance_input,
            window_input, overlap_input, maxres_input, sample_size,
            seed_input, height, width, aspect_ratio_inputs,
            init_dx, init_dy, init_dz
        ],
        outputs=[step1_video, step1_logs]
    )

    step2_button.click(
        inference,
        inputs=[
            fps_input2, num_frames_input2,
            controlnet_weights_input, controlnet_guidance_start_input,
            controlnet_guidance_end_input, guidance_scale_input,
            inference_steps_input, dtype_input, seed_input2,
            height_input, width_input, downscale_coef_input,
            vae_channels_input, controlnet_input_channels_input,
            controlnet_layers_input
        ],
        outputs=[step2_video, step2_logs]
    )


if __name__ == "__main__":
    download_models()
    demo.launch(server_name="0.0.0.0", server_port=7860)