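"""Gradio demo for DOVE video super-resolution and restoration.

On first launch the app pulls the model weights from the Hugging Face
Hub, then serves a web UI that shells out to the inference script for
each uploaded video.
"""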
import os
import shutil
import subprocess
import sys
import uuid

import gradio as gr
import psutil
from huggingface_hub import snapshot_download

# Where the DOVE weights live on the Hub and on local disk.
MODEL_REPO = "roll-ai/DOVE"
MODEL_PATH = "pretrained_models/DOVE"

# Download the weights on first launch (the local directory starts out empty).
if not os.path.exists(MODEL_PATH) or len(os.listdir(MODEL_PATH)) == 0:
    print("🔽 Downloading model weights from Hugging Face Hub...")
    snapshot_download(
        repo_id=MODEL_REPO,
        repo_type="dataset",  # the weights are published as a dataset repo
        local_dir=MODEL_PATH,
        local_dir_use_symlinks=False,  # no-op (deprecated) on recent huggingface_hub
    )
    print("✅ Download complete.")
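
# Helper to render the model directory tree; printed once at startup and
# shown again in the UI below.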
def list_downloaded_files(model_path):
    lines = []
    for root, dirs, files in os.walk(model_path):
        level = root.replace(model_path, "").count(os.sep)
        indent = " " * level
        lines.append(f"{indent}📁 {os.path.basename(root) or 'DOVE'}/")
        subindent = " " * (level + 1)
        for f in sorted(files):
            lines.append(f"{subindent}📄 {f}")
    return "\n".join(lines)


print("\n📂 Directory structure after download:")
print(list_downloaded_files(MODEL_PATH))
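
# Paths for uploads, results, and the script that runs the model.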
INFERENCE_SCRIPT = "inference_script.py"
OUTPUT_DIR = "results/DOVE/demo"
UPLOAD_DIR = "input_videos"

os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
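
# The model is not loaded in-process: each request shells out to
# INFERENCE_SCRIPT, which processes every video it finds in --input_dir.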
def run_inference(video_path, save_format, upscale):
    """Run DOVE on one uploaded video and return (status, output path)."""
    # Clear stale uploads so the script only processes the current video.
    for stale in os.listdir(UPLOAD_DIR):
        os.remove(os.path.join(UPLOAD_DIR, stale))

    input_name = f"{uuid.uuid4()}.mp4"
    input_path = os.path.join(UPLOAD_DIR, input_name)
    shutil.copy(video_path, input_path)

    cmd = [
        sys.executable, INFERENCE_SCRIPT,  # same interpreter as this app
        "--input_dir", UPLOAD_DIR,
        "--model_path", MODEL_PATH,
        "--output_path", OUTPUT_DIR,
        "--is_vae_st",
        "--save_format", save_format,
        "--upscale", str(int(upscale)),  # pass "4", not "4.0"
    ]

    try:
        inference_result = subprocess.run(
            cmd, capture_output=True, text=True, check=True
        )
        print("📄 Inference stdout:\n", inference_result.stdout)
        print("⚠️ Inference stderr:\n", inference_result.stderr)
    except subprocess.CalledProcessError as e:
        print("❌ Inference failed.")
        print("⚠️ STDOUT:\n", e.stdout)
        print("⚠️ STDERR:\n", e.stderr)
        return f"Inference failed:\n{e.stderr}", None

    # The script may save a .mkv; remux it (video stream copied, audio
    # re-encoded to AAC) so the browser can play the result as .mp4.
    mp4_path = os.path.join(OUTPUT_DIR, input_name)
    mkv_path = os.path.splitext(mp4_path)[0] + ".mkv"

    if os.path.exists(mkv_path):
        convert_cmd = [
            "ffmpeg", "-y", "-i", mkv_path, "-c:v", "copy", "-c:a", "aac", mp4_path
        ]
        try:
            convert_result = subprocess.run(
                convert_cmd, capture_output=True, text=True, check=True
            )
            print("🔄 FFmpeg stdout:\n", convert_result.stdout)
            print("⚠️ FFmpeg stderr:\n", convert_result.stderr)
        except subprocess.CalledProcessError as e:
            print("❌ FFmpeg conversion failed.")
            print("⚠️ STDOUT:\n", e.stdout)
            print("⚠️ STDERR:\n", e.stderr)
            return f"Inference OK, but conversion failed:\n{e.stderr}", None

    if os.path.exists(mp4_path):
        return "Inference successful!", mp4_path
    else:
        return "Output video not found.", None
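

# Lightweight system monitors surfaced in the demo UI.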
def get_resources():
    ram = psutil.virtual_memory().used / (1024**3)
    disk = shutil.disk_usage('/').used / (1024**3)
    total_disk = shutil.disk_usage('/').total / (1024**3)
    # Sample over a short interval; a bare cpu_percent() call returns 0.0
    # the first time it is used.
    cpu = psutil.cpu_percent(interval=0.5)
    return (
        f"🧠 RAM Used: {ram:.2f} GB\n"
        f"📀 Disk Used: {disk:.2f} GB / {total_disk:.2f} GB\n"
        f"🖥️ CPU Usage: {cpu:.2f}%"
    )


def get_gpu_info():
    try:
        result = subprocess.run(["nvidia-smi"], capture_output=True, text=True, check=True)
        return result.stdout
    except Exception as e:
        return "❌ GPU not available or error:\n" + str(e)
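

# Assemble the Gradio interface.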
with gr.Blocks() as demo:
    gr.Markdown("# 🎥 DOVE Video Super-Resolution & Restoration Demo")
    gr.Markdown("⚙️ **Tip:** Default save format is `yuv444p`. If playback fails, try `yuv420p`.")

    with gr.Row():
        input_video = gr.Video(label="Upload input video")
        output_video = gr.Video(label="Output video")

    with gr.Row():
        save_format = gr.Dropdown(
            choices=["yuv444p", "yuv420p"],
            value="yuv444p",
            label="Save format"
        )
        upscale_input = gr.Number(
            value=4,
            precision=0,  # integers only, so the CLI never sees "4.0"
            label="Upscale Factor (e.g. 2, 4, 8)"
        )

    run_button = gr.Button("Run Inference")
    status = gr.Textbox(label="Status")

    run_button.click(
        fn=run_inference,
        inputs=[input_video, save_format, upscale_input],
        outputs=[status, output_video],
    )

    gr.Markdown("## 🧾 Downloaded Model Files")
    gr.Textbox(value=list_downloaded_files(MODEL_PATH), label="Model Directory Tree", lines=20)

    gr.Markdown("## 🧠 System Resources")

    with gr.Row():
        res_btn = gr.Button("Check CPU/RAM/Disk")
        gpu_btn = gr.Button("Check GPU (if available)")

    sys_info = gr.Textbox(label="CPU, RAM, Disk")
    gpu_info = gr.Textbox(label="GPU Info", lines=10)

    res_btn.click(fn=get_resources, inputs=[], outputs=sys_info)
    gpu_btn.click(fn=get_gpu_info, inputs=[], outputs=gpu_info)

demo.launch()