mfarre's picture
mfarre HF staff
.
fc0912b
raw
history blame
13.1 kB
import os
import json
import gradio as gr
import tempfile
from PIL import Image, ImageDraw, ImageFont
import cv2
from typing import Tuple, Optional
import torch
from pathlib import Path
import time
import torch
import spaces
import os
from video_highlight_detector import (
load_model,
BatchedVideoHighlightDetector,
get_video_duration_seconds,
get_fixed_30s_segments
)
def load_examples(json_path: str) -> dict:
with open(json_path, 'r') as f:
return json.load(f)
def format_duration(seconds: int) -> str:
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
if hours > 0:
return f"{hours}:{minutes:02d}:{secs:02d}"
return f"{minutes}:{secs:02d}"
def create_ui(examples_path: str):
examples_data = load_examples(examples_path)
with gr.Blocks() as app:
gr.Markdown("# Video Highlight Generator")
gr.Markdown("Upload a video and get an automated highlight reel!")
with gr.Row():
gr.Markdown("## Example Results")
with gr.Row():
for example in examples_data["examples"]:
with gr.Column():
gr.Video(
value=example["original"]["url"],
label=f"Original ({format_duration(example['original']['duration_seconds'])})",
interactive=False
)
gr.Markdown(f"### {example['title']}")
with gr.Column():
gr.Video(
value=example["highlights"]["url"],
label=f"Highlights ({format_duration(example['highlights']['duration_seconds'])})",
interactive=False
)
with gr.Accordion("Model chain of thought details", open=False):
gr.Markdown(f"#Summary: {example['analysis']['video_description']}")
gr.Markdown(f"#Highlights to search for: {example['analysis']['highlight_types']}")
gr.Markdown("## Try It Yourself!")
with gr.Row():
with gr.Column(scale=1):
input_video = gr.Video(
label="Upload your video (max 20 minutes)",
interactive=True
)
process_btn = gr.Button("Process Video", variant="primary")
with gr.Column(scale=1):
output_video = gr.Video(
label="Highlight Video",
visible=False,
interactive=False,
)
status = gr.Markdown()
analysis_accordion = gr.Accordion(
"Model chain of thought details",
open=True,
visible=False
)
with analysis_accordion:
video_description = gr.Markdown("", elem_id="video_desc")
highlight_types = gr.Markdown("", elem_id="highlight_types")
@spaces.GPU
def on_process(video):
if not video:
yield [
"Please upload a video",
"",
"",
gr.update(visible=False),
gr.update(visible=False)
]
return
try:
duration = get_video_duration_seconds(video)
if duration > 1200: # 20 minutes
yield [
"Video must be shorter than 20 minutes",
"",
"",
gr.update(visible=False),
gr.update(visible=False)
]
return
# Make accordion visible as soon as processing starts
yield [
"Loading model...",
"",
"",
gr.update(visible=False),
gr.update(visible=True)
]
model, processor = load_model()
detector = BatchedVideoHighlightDetector(
model,
processor,
batch_size=8
)
yield [
"Analyzing video content...",
"",
"",
gr.update(visible=False),
gr.update(visible=True)
]
video_desc = detector.analyze_video_content(video)
formatted_desc = f"#Summary: {video_desc[:500] + '...' if len(video_desc) > 500 else video_desc}"
yield [
"Determining highlight types...",
formatted_desc,
"",
gr.update(visible=False),
gr.update(visible=True)
]
highlights = detector.determine_highlights(video_desc)
formatted_highlights = f"#Highlights to search for: {highlights[:500] + '...' if len(highlights) > 500 else highlights}"
# Get all segments
segments = get_fixed_30s_segments(video)
total_segments = len(segments)
kept_segments = []
# Process segments in batches with direct UI updates
for i in range(0, len(segments), detector.batch_size):
batch_segments = segments[i:i + detector.batch_size]
# Update progress
progress = int((i / total_segments) * 100)
yield [
f"Processing segments... {progress}% complete",
formatted_desc,
formatted_highlights,
gr.update(visible=False),
gr.update(visible=True)
]
# Process batch
keep_flags = detector._process_segment_batch(
video_path=video,
segments=batch_segments,
highlight_types=highlights,
total_segments=total_segments,
segments_processed=i
)
# Keep track of segments to include
for segment, keep in zip(batch_segments, keep_flags):
if keep:
kept_segments.append(segment)
# Create final video
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp_file:
temp_output = tmp_file.name
detector._concatenate_scenes(video, kept_segments, temp_output)
yield [
"Processing complete!",
formatted_desc,
formatted_highlights,
gr.update(value=temp_output, visible=True),
gr.update(visible=True)
]
except Exception as e:
yield [
f"Error processing video: {str(e)}",
"",
"",
gr.update(visible=False),
gr.update(visible=False)
]
process_btn.click(
on_process,
inputs=[input_video],
outputs=[
status,
video_description,
highlight_types,
output_video,
analysis_accordion
],
queue=True,
)
return app
# gr.Markdown("## Try It Yourself!")
# with gr.Row():
# with gr.Column(scale=1):
# input_video = gr.Video(
# label="Upload your video (max 20 minutes)",
# interactive=True
# )
# process_btn = gr.Button("Process Video", variant="primary")
# with gr.Column(scale=1):
# output_video = gr.Video(
# label="Highlight Video",
# visible=False,
# interactive=False,
# )
# status = gr.Markdown()
# analysis_accordion = gr.Accordion(
# "Model chain of thought details",
# open=True,
# visible=False
# )
# with analysis_accordion:
# video_description = gr.Markdown("", elem_id="video_desc")
# highlight_types = gr.Markdown("", elem_id="highlight_types")
# @spaces.GPU
# def on_process(video):
# if not video:
# yield [
# "Please upload a video", # status
# "", # video_description
# "", # highlight_types
# gr.update(visible=False), # output_video
# gr.update(visible=False) # analysis_accordion
# ]
# return
# try:
# duration = get_video_duration_seconds(video)
# if duration > 1200: # 20 minutes
# yield [
# "Video must be shorter than 20 minutes",
# "",
# "",
# gr.update(visible=False),
# gr.update(visible=False)
# ]
# return
# # Make accordion visible as soon as processing starts
# yield [
# "Loading model...",
# "",
# "",
# gr.update(visible=False),
# gr.update(visible=True)
# ]
# model, processor = load_model()
# detector = BatchedVideoHighlightDetector(model, processor, batch_size=8)
# yield [
# "Analyzing video content...",
# "",
# "",
# gr.update(visible=False),
# gr.update(visible=True)
# ]
# video_desc = detector.analyze_video_content(video)
# formatted_desc = f"#Summary: {video_desc[:500] + '...' if len(video_desc) > 500 else video_desc}"
# # Update description as soon as it's available
# yield [
# "Determining highlight types...",
# formatted_desc,
# "",
# gr.update(visible=False),
# gr.update(visible=True)
# ]
# highlights = detector.determine_highlights(video_desc)
# formatted_highlights = f"#Highlights to search for: {highlights[:500] + '...' if len(highlights) > 500 else highlights}"
# # Update highlights as soon as they're available
# yield [
# "Detecting and extracting highlights...",
# formatted_desc,
# formatted_highlights,
# gr.update(visible=False),
# gr.update(visible=True)
# ]
# with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp_file:
# temp_output = tmp_file.name
# detector.create_highlight_video(video, temp_output)
# yield [
# "Processing complete!",
# formatted_desc,
# formatted_highlights,
# gr.update(value=temp_output, visible=True),
# gr.update(visible=True)
# ]
# except Exception as e:
# yield [
# f"Error processing video: {str(e)}",
# "",
# "",
# gr.update(visible=False),
# gr.update(visible=False)
# ]
# process_btn.click(
# on_process,
# inputs=[input_video],
# outputs=[
# status,
# video_description,
# highlight_types,
# output_video,
# analysis_accordion
# ],
# queue=True,
# )
# return app
if __name__ == "__main__":
# Initialize CUDA
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
zero = torch.Tensor([0]).to(device)
app = create_ui("video_spec.json")
app.launch()