# Scraped Hugging Face page header (not part of the program):
# author Krokodilpirat, commit 213db45 ("Update app.py", verified), file size 15.8 kB.
import os
# Redirect the Hugging Face, transformers and matplotlib caches to writable
# /tmp locations. This runs before the matplotlib / huggingface_hub imports
# further down so those libraries see the variables when they are loaded.
os.environ.update(
    {
        "HF_HOME": "/tmp/huggingface",
        "TRANSFORMERS_CACHE": "/tmp/huggingface/transformers",
        "MPLCONFIGDIR": "/tmp/matplotlib",
    }
)
# Patching the schema handling problem in Gradio 5.x
# This needs to be done before any Gradio imports
import sys
def patch_gradio_utils():
    """Wrap ``gradio_client.utils.get_type`` so it tolerates non-dict schemas.

    The wrapper answers ``"boolean"`` for a bool schema and ``"any"`` for any
    other non-dict value; dict schemas are passed to the original function.
    The patch is best-effort: any failure (including gradio_client being
    unavailable) is printed and otherwise ignored.
    """
    try:
        from gradio_client import utils as client_utils

        wrapped_get_type = client_utils.get_type

        def patched_get_type(schema):
            # Only real dict schemas are understood by the wrapped function.
            if isinstance(schema, dict):
                return wrapped_get_type(schema)
            return "boolean" if isinstance(schema, bool) else "any"

        client_utils.get_type = patched_get_type
        print("Successfully patched Gradio utils.get_type")
    except Exception as err:
        print(f"Could not patch Gradio utils: {err}")


# Apply the patch immediately, ahead of the gradio import below.
patch_gradio_utils()
import gc
import torch
import cv2
import gradio as gr
print("📦 Gradio version:", gr.__version__)
import numpy as np
import matplotlib.cm as cm
import matplotlib # New import for the updated colormap API
import subprocess
from video_depth_anything.video_depth import VideoDepthAnything
from utils.dc_utils import read_video_frames, save_video
from huggingface_hub import hf_hub_download
# Run on CUDA when torch reports a usable GPU, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

# Constructor arguments for each supported encoder variant.
model_configs = {
    'vits': dict(encoder='vits', features=64, out_channels=[48, 96, 192, 384]),
    'vitl': dict(encoder='vitl', features=256, out_channels=[256, 512, 1024, 1024]),
}

# Maps the encoder key to the size name used in the Hub repository id.
encoder2name = dict(vits='Small', vitl='Large')

encoder = 'vitl'
model_name = encoder2name[encoder]

# Build the network, fetch its checkpoint from the Hugging Face Hub, load the
# weights on the CPU, then move the model to DEVICE and switch to eval mode.
video_depth_anything = VideoDepthAnything(**model_configs[encoder])
filepath = hf_hub_download(
    repo_id=f"depth-anything/Video-Depth-Anything-{model_name}",
    filename=f"video_depth_anything_{encoder}.pth",
    repo_type="model",
    cache_dir="/tmp/huggingface",  # Set the cache directory explicitly.
)
video_depth_anything.load_state_dict(torch.load(filepath, map_location='cpu'))
video_depth_anything = video_depth_anything.to(DEVICE)
video_depth_anything = video_depth_anything.eval()

# Markdown shown at the top of the demo page.
title = "# Video Depth Anything + RGBD sbs output"
description = """Official demo for **Video Depth Anything** + RGBD sbs output for viewing with Looking Glass Factory displays.
Please refer to our [paper](https://arxiv.org/abs/2501.12375), [project page](https://videodepthanything.github.io/), and [github](https://github.com/DepthAnything/Video-Depth-Anything) for more details."""
def _run_ffmpeg(args):
    """Run ``ffmpeg -y <args>`` and return True only if it exited with status 0.

    A binary that cannot be started or a non-zero exit status is printed and
    reported as ``False`` so the caller can fall back instead of crashing.
    """
    try:
        result = subprocess.run(
            ["ffmpeg", "-y", *args],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except OSError as exc:
        print(f"ffmpeg could not be started: {exc}")
        return False
    if result.returncode != 0:
        tail = result.stderr.decode("utf-8", errors="replace")[-500:]
        print(f"ffmpeg exited with status {result.returncode}: {tail}")
        return False
    return True


def _has_audio_stream(video_path):
    """Return True when ffmpeg's log lists an audio stream for *video_path*."""
    try:
        probe = subprocess.run(
            ["ffmpeg", "-i", video_path, "-c", "copy", "-f", "null", "-"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except OSError:
        return False
    log = probe.stderr.decode("utf-8", errors="replace")
    # Look at stream listing lines only; a bare "Audio" substring could also
    # come from the file name or from metadata echoed in the log.
    return any(
        "Stream #" in line and "Audio:" in line for line in log.splitlines()
    )


def _write_concat_list(list_path, media_path, repeats):
    """Write an ffmpeg concat list that names *media_path* *repeats* times."""
    # ffmpeg's concat demuxer resolves relative entries against the list
    # file's own directory, so the entry is made absolute. A single quote in
    # the path is written as '\'' to stay inside the quoted string.
    quoted = os.path.abspath(media_path).replace("'", "'\\''")
    with open(list_path, "w", encoding="utf-8") as handle:
        handle.writelines(f"file '{quoted}'\n" for _ in range(repeats))


def _concat_repeat(source_path, target_path, repeats, work_dir, tag):
    """Stream-copy *source_path* *repeats* times into *target_path*.

    Returns True when ffmpeg succeeded.
    """
    list_path = os.path.join(work_dir, f"concat_{tag}.txt")
    _write_concat_list(list_path, source_path, repeats)
    return _run_ffmpeg(
        ["-f", "concat", "-safe", "0", "-i", list_path, "-c", "copy", target_path]
    )


def _loop_stitched_video(stitched_path, input_video, repeats, work_dir):
    """Replace *stitched_path* in place with a copy repeated *repeats* times.

    If the input has audio, its track is extracted, looped separately and
    muxed onto the looped video. Should any audio step fail, the plain
    concatenation of the stitched file is used instead. If even that
    concatenation fails, the stitched file is left untouched.
    """
    looped_video = os.path.join(work_dir, "looped_rgbd.mp4")
    if not _concat_repeat(stitched_path, looped_video, repeats, work_dir, "stitched"):
        return

    if _has_audio_stream(input_video):
        audio_path = os.path.join(work_dir, "extracted_audio.aac")
        looped_audio = os.path.join(work_dir, "looped_audio.aac")
        muxed_path = os.path.join(work_dir, "looped_rgbd_audio.mp4")
        # Stream copy into a raw .aac file is expected to work only for AAC
        # input; other codecs end up in the fallback below.
        audio_ok = (
            _run_ffmpeg(["-i", input_video, "-vn", "-acodec", "copy", audio_path])
            and _concat_repeat(audio_path, looped_audio, repeats, work_dir, "audio")
            and _run_ffmpeg([
                "-i", looped_video,
                "-i", looped_audio,
                "-c:v", "copy",
                "-c:a", "aac",
                "-map", "0:v:0",
                "-map", "1:a:0",
                "-shortest",
                muxed_path,
            ])
        )
        if audio_ok:
            # Written to a scratch file first so a failed mux can never leave
            # a truncated file at the final path.
            os.replace(muxed_path, stitched_path)
            return
        print("Separate audio looping failed; using the concatenated video as is.")

    os.replace(looped_video, stitched_path)


def _render_depth_frame(depth_frame, d_min, depth_range, cmap, grayscale,
                        convert_from_color, blur):
    """Turn one depth map into a 3-channel uint8 image.

    *cmap* is only used (and may be None otherwise) when a colour image is
    needed, i.e. unless ``grayscale and not convert_from_color``.
    """
    depth_norm = ((depth_frame - d_min) / depth_range * 255).astype(np.uint8)
    if grayscale and not convert_from_color:
        # Use the normalized depth values directly as gray levels.
        depth_vis = np.stack([depth_norm] * 3, axis=-1)
    else:
        depth_vis = (cmap(depth_norm / 255.0)[..., :3] * 255).astype(np.uint8)
        if grayscale:
            # Gray levels derived from the colour-mapped image.
            depth_gray = cv2.cvtColor(depth_vis, cv2.COLOR_RGB2GRAY)
            depth_vis = np.stack([depth_gray] * 3, axis=-1)
    if blur > 0:
        kernel_size = int(blur * 20) * 2 + 1  # Always odd, as GaussianBlur requires.
        depth_vis = cv2.GaussianBlur(depth_vis, (kernel_size, kernel_size), 0)
    return depth_vis


def infer_video_depth(
    input_video: str,
    max_len: int = -1,
    target_fps: int = -1,
    max_res: int = 1280,
    stitch: bool = True,
    grayscale: bool = True,
    convert_from_color: bool = True,
    blur: float = 0.3,
    loop_factor: int = 1,
    output_dir: str = './outputs',
    input_size: int = 518,
):
    """Run depth inference on a video and write the result videos.

    Always writes ``<name>_src.mp4`` (the frames used for inference) and
    ``<name>_vis.mp4`` (the depth visualization) into *output_dir*. With
    *stitch* enabled it also writes ``<first 20 chars of name>_RGBD.mp4``:
    the full-resolution frame on the left, the depth image on the right, plus
    the input's first audio stream when ffmpeg can add it.

    Args:
        input_video: Path of the video to process.
        max_len: Forwarded to ``read_video_frames``.
        target_fps: Forwarded to ``read_video_frames``.
        max_res: Forwarded to ``read_video_frames`` for the inference frames;
            the stitched output re-reads the video with ``max_res=-1``.
        stitch: Whether to build the side-by-side RGBD video.
        grayscale: Render the stitched depth half in gray instead of colour.
        convert_from_color: With *grayscale*, derive the gray levels from the
            inferno-coloured image rather than from the raw normalized depth.
        blur: Gaussian blur strength for the stitched depth half; ``0``
            disables it.
        loop_factor: Number of times the output videos are repeated; values
            of 1 or less leave them unchanged.
        output_dir: Directory for all output files (created if missing).
        input_size: Forwarded to the model's ``infer_video_depth``.

    Returns:
        ``[depth_vis_path, stitched_video_path]``; the second entry is
        ``None`` when *stitch* is false.
    """
    import tempfile

    # The UI slider may hand over a float (e.g. 3.0); range() needs an int.
    loop_factor = int(loop_factor)

    # 1. Read the frames used for inference (limited by max_res).
    frames, target_fps = read_video_frames(input_video, max_len, target_fps, max_res)
    # 2. Run the depth model.
    depths, fps = video_depth_anything.infer_video_depth(
        frames, target_fps, input_size=input_size, device=DEVICE
    )

    os.makedirs(output_dir, exist_ok=True)
    base_name = os.path.splitext(os.path.basename(input_video))[0]

    # The preprocessed RGB video is still written even though the UI no
    # longer displays it.
    processed_video_path = os.path.join(output_dir, base_name + '_src.mp4')
    depth_vis_path = os.path.join(output_dir, base_name + '_vis.mp4')
    save_video(frames, processed_video_path, fps=fps)
    save_video(depths, depth_vis_path, fps=fps, is_depths=True)

    stitched_video_path = None
    if stitch:
        # Re-read the input without the max_res limit for the RGB half.
        full_frames, _ = read_video_frames(input_video, max_len, target_fps, max_res=-1)

        # Normalize against the range of the whole clip. A constant (or NaN)
        # depth map would otherwise divide by zero.
        d_min, d_max = depths.min(), depths.max()
        depth_range = d_max - d_min
        if not depth_range > 0:
            depth_range = 1.0

        # Look the colormap up once instead of once per frame, and only when
        # a colour image is actually needed.
        needs_cmap = convert_from_color or not grayscale
        cmap = matplotlib.colormaps.get_cmap("inferno") if needs_cmap else None

        stitched_frames = []
        # zip stops at the shorter sequence, like the former min(len, len) loop.
        for rgb_full, depth_frame in zip(full_frames, depths):
            depth_vis = _render_depth_frame(
                depth_frame, d_min, depth_range, cmap, grayscale, convert_from_color, blur
            )
            # Scale the depth image up to the full-resolution frame size and
            # place it to the right of the RGB frame.
            full_height, full_width = rgb_full.shape[:2]
            depth_vis_resized = cv2.resize(depth_vis, (full_width, full_height))
            stitched_frames.append(cv2.hconcat([rgb_full, depth_vis_resized]))
        stitched_frames = np.array(stitched_frames)

        # Only the first 20 characters of the base name go into the file name.
        short_name = base_name[:20]
        stitched_video_path = os.path.join(output_dir, short_name + '_RGBD.mp4')
        save_video(stitched_frames, stitched_video_path, fps=fps)

        # Add the input's audio. "1:a:0?" makes the audio stream optional, so
        # silent inputs still succeed. The result replaces the silent file
        # only if ffmpeg actually produced it.
        temp_audio_path = os.path.join(output_dir, short_name + '_RGBD_audio.mp4')
        merged = _run_ffmpeg([
            "-i", stitched_video_path,
            "-i", input_video,
            "-c:v", "copy",
            "-c:a", "aac",
            "-map", "0:v:0",
            "-map", "1:a:0?",
            "-shortest",
            temp_audio_path,
        ])
        if merged:
            os.replace(temp_audio_path, stitched_video_path)
        elif os.path.exists(temp_audio_path):
            try:
                os.remove(temp_audio_path)
            except OSError as exc:
                print(f"Could not remove {temp_audio_path}: {exc}")

    # Once the videos exist, apply the loop factor.
    if loop_factor > 1:
        # All scratch files live in a private directory inside output_dir
        # (same file system, so os.replace works) that is removed on exit.
        with tempfile.TemporaryDirectory(dir=output_dir) as work_dir:
            vis_stem = os.path.splitext(os.path.basename(depth_vis_path))[0]
            depth_looped_path = os.path.join(
                output_dir, vis_stem + f'_loop{loop_factor}.mp4'
            )
            # Switch to the looped file only if it was really written.
            if _concat_repeat(depth_vis_path, depth_looped_path, loop_factor, work_dir, "depth"):
                depth_vis_path = depth_looped_path

            if stitched_video_path is not None:
                _loop_stitched_video(stitched_video_path, input_video, loop_factor, work_dir)

    gc.collect()
    torch.cuda.empty_cache()
    # Only the depth visualization and the stitched video are returned, not
    # the preprocessed video.
    return [depth_vis_path, stitched_video_path]
def construct_demo():
    """Build the Gradio Blocks interface and connect the Generate button.

    The page has an upload widget, two read-only output players (depth video
    and stitched RGBD video) and an "Advanced Settings" accordion whose
    controls are passed positionally to ``infer_video_depth``.

    Returns:
        The assembled ``gr.Blocks`` object; it is not launched here.
    """
    star_note = "### If you find this work useful, please help ⭐ the [Github Repo](https://github.com/DepthAnything/Video-Depth-Anything). Thanks for your attention!"
    # Settings shared by both output players.
    player_options = dict(interactive=False, autoplay=True, show_share_button=True, scale=5)

    with gr.Blocks(analytics_enabled=False) as demo:
        for markdown_text in (title, description, star_note):
            gr.Markdown(markdown_text)

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                # Upload widget for the source video.
                source_video = gr.Video(label="Input Video")
            with gr.Column(scale=2):
                with gr.Row(equal_height=True):
                    # The preprocessed-video player is intentionally absent.
                    depth_player = gr.Video(label="Generated Depth Video", **player_options)
                    rgbd_player = gr.Video(label="Stitched RGBD Video", **player_options)

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                with gr.Accordion("Advanced Settings", open=False):
                    max_len_slider = gr.Slider(minimum=-1, maximum=1000, step=1, value=-1, label="Max process length")
                    fps_slider = gr.Slider(minimum=-1, maximum=30, step=1, value=-1, label="Target FPS")
                    res_slider = gr.Slider(minimum=480, maximum=1920, step=1, value=1280, label="Max side resolution")
                    stitch_box = gr.Checkbox(value=True, label="Stitch RGB & Depth Videos")
                    gray_box = gr.Checkbox(value=True, label="Output Depth as Grayscale")
                    from_color_box = gr.Checkbox(value=True, label="Convert Grayscale from Color")
                    blur_slider = gr.Slider(label="Depth Blur (can reduce edge artifacts on display)", minimum=0, maximum=1, step=0.01, value=0.3)
                    # Slider for the loop factor.
                    loop_slider = gr.Slider(minimum=1, maximum=20, step=1, value=1, label="Loop Factor (repeats the output video)")
                run_button = gr.Button("Generate")
            with gr.Column(scale=2):
                pass

        # No Examples block: leaving it out improves loading time.
        # The order below must match infer_video_depth's positional parameters.
        click_inputs = [
            source_video,
            max_len_slider,
            fps_slider,
            res_slider,
            stitch_box,
            gray_box,
            from_color_box,
            blur_slider,
            loop_slider,
        ]
        run_button.click(
            fn=infer_video_depth,
            inputs=click_inputs,
            outputs=[depth_player, rgbd_player],
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the UI, enable request queuing, then serve it
    # with a share link and without the API page.
    demo = construct_demo()
    demo.queue()
    demo.launch(show_api=False, share=True)