import os
import gradio as gr
import subprocess
import uuid
import shutil
import psutil
from huggingface_hub import snapshot_download
# ----------------------------------------
# Step 1: Download Model Weights
# ----------------------------------------
MODEL_REPO = "roll-ai/DOVE"
MODEL_PATH = "pretrained_models/DOVE"
if not os.path.exists(MODEL_PATH) or len(os.listdir(MODEL_PATH)) == 0:
    print("🔽 Downloading model weights from Hugging Face Hub...")
    snapshot_download(
        repo_id=MODEL_REPO,
        repo_type="dataset",
        local_dir=MODEL_PATH,
        local_dir_use_symlinks=False
    )
    print("✅ Download complete.")
print("\n📂 Directory structure after download:")
for root, dirs, files in os.walk(MODEL_PATH):
level = root.replace(MODEL_PATH, "").count(os.sep)
indent = " " * level
print(f"{indent}📁 {os.path.basename(root) or 'DOVE'}/")
subindent = " " * (level + 1)
for f in sorted(files):
print(f"{subindent}📄 {f}")
def list_downloaded_files(model_path):
lines = []
for root, dirs, files in os.walk(model_path):
level = root.replace(model_path, "").count(os.sep)
indent = " " * level
lines.append(f"{indent}📁 {os.path.basename(root) or 'DOVE'}/")
subindent = " " * (level + 1)
for f in sorted(files):
lines.append(f"{subindent}📄 {f}")
return "\n".join(lines)
# ----------------------------------------
# Step 2: Setup Directories
# ----------------------------------------
INFERENCE_SCRIPT = "inference_script.py"
OUTPUT_DIR = "results/DOVE/demo"
UPLOAD_DIR = "input_videos"
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
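# Note: run_inference() below points the inference script at the whole
# UPLOAD_DIR via --input_dir, so videos left over from earlier runs would be
# processed again. The helper below is an optional sketch (not wired in by
# default) that could be called at the start of run_inference to clear stale
# uploads and outputs; the name clear_workdirs is illustrative, not part of
# the original app.
def clear_workdirs():
    for folder in (UPLOAD_DIR, OUTPUT_DIR):
        for name in os.listdir(folder):
            path = os.path.join(folder, name)
            if os.path.isfile(path):
                os.remove(path)
            elif os.path.isdir(path):
                shutil.rmtree(path)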
# ----------------------------------------
# Step 3: Inference Function
# ----------------------------------------
def run_inference(video_path, save_format, upscale):
    # Copy the uploaded video into the input folder under a unique name.
    input_name = f"{uuid.uuid4()}.mp4"
    input_path = os.path.join(UPLOAD_DIR, input_name)
    shutil.copy(video_path, input_path)

    cmd = [
        "python", INFERENCE_SCRIPT,
        "--input_dir", UPLOAD_DIR,
        "--model_path", MODEL_PATH,
        "--output_path", OUTPUT_DIR,
        "--is_vae_st",
        "--save_format", save_format,
        "--upscale", str(upscale),
    ]

    try:
        inference_result = subprocess.run(
            cmd, capture_output=True, text=True, check=True
        )
        print("📄 Inference stdout:\n", inference_result.stdout)
        print("⚠️ Inference stderr:\n", inference_result.stderr)
    except subprocess.CalledProcessError as e:
        print("❌ Inference failed.")
        print("⚠️ STDOUT:\n", e.stdout)
        print("⚠️ STDERR:\n", e.stderr)
        return f"Inference failed:\n{e.stderr}", None

    # The inference script may write an MKV; remux it to MP4 (copying the
    # video stream) so the Gradio video component can play it.
    mkv_path = os.path.join(OUTPUT_DIR, input_name).replace(".mp4", ".mkv")
    mp4_path = os.path.join(OUTPUT_DIR, input_name)

    if os.path.exists(mkv_path):
        convert_cmd = [
            "ffmpeg", "-y", "-i", mkv_path, "-c:v", "copy", "-c:a", "aac", mp4_path
        ]
        try:
            convert_result = subprocess.run(
                convert_cmd, capture_output=True, text=True, check=True
            )
            print("🔄 FFmpeg stdout:\n", convert_result.stdout)
            print("⚠️ FFmpeg stderr:\n", convert_result.stderr)
        except subprocess.CalledProcessError as e:
            print("❌ FFmpeg conversion failed.")
            print("⚠️ STDOUT:\n", e.stdout)
            print("⚠️ STDERR:\n", e.stderr)
            return f"Inference OK, but conversion failed:\n{e.stderr}", None

    if os.path.exists(mp4_path):
        return "Inference successful!", mp4_path
    else:
        return "Output video not found.", None
# ----------------------------------------
# Step 4: System Resource Functions
# ----------------------------------------
def get_resources():
    # Report host RAM, disk, and CPU usage via psutil and shutil.
    ram = psutil.virtual_memory().used / (1024**3)
    disk = shutil.disk_usage('/').used / (1024**3)
    total_disk = shutil.disk_usage('/').total / (1024**3)
    cpu = psutil.cpu_percent()
    return (
        f"🧠 RAM Used: {ram:.2f} GB\n"
        f"📀 Disk Used: {disk:.2f} GB / {total_disk:.2f} GB\n"
        f"🖥️ CPU Usage: {cpu:.2f}%"
    )
def get_gpu_info():
    try:
        result = subprocess.run(["nvidia-smi"], capture_output=True, text=True, check=True)
        return result.stdout
    except Exception as e:
        return "❌ GPU not available or error:\n" + str(e)
# ----------------------------------------
# Step 5: Gradio UI
# ----------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# 🎥 DOVE Video Super-Resolution & Restoration Demo")
    gr.Markdown("⚙️ **Tip:** Default save format is `yuv444p`. If playback fails, try `yuv420p`.")

    with gr.Row():
        input_video = gr.Video(label="Upload input video")
        output_video = gr.Video(label="Output video")

    with gr.Row():
        save_format = gr.Dropdown(
            choices=["yuv444p", "yuv420p"],
            value="yuv444p",
            label="Save format"
        )
        upscale_input = gr.Number(value=4, label="Upscale Factor (e.g. 2, 4, 8)")

    run_button = gr.Button("Run Inference")
    status = gr.Textbox(label="Status")

    run_button.click(
        fn=run_inference,
        inputs=[input_video, save_format, upscale_input],
        outputs=[status, output_video],
    )

    gr.Markdown("## 🧾 Downloaded Model Files")
    gr.Textbox(value=list_downloaded_files(MODEL_PATH), label="Model Directory Tree", lines=20)

    gr.Markdown("## 🧠 System Resources")
    with gr.Row():
        res_btn = gr.Button("Check CPU/RAM/Disk")
        gpu_btn = gr.Button("Check GPU (if available)")
    sys_info = gr.Textbox(label="CPU, RAM, Disk")
    gpu_info = gr.Textbox(label="GPU Info", lines=10)

    res_btn.click(fn=get_resources, inputs=[], outputs=sys_info)
    gpu_btn.click(fn=get_gpu_info, inputs=[], outputs=gpu_info)
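# Note (an optional suggestion, not in the original app): inference can take
# minutes per video, so enabling Gradio's request queue before launching, e.g.
#     demo.queue().launch()
# helps long jobs avoid HTTP timeouts. The plain launch() call below is the
# original behavior.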
demo.launch()