"""Gradio app that generates an automated highlight reel from an uploaded video."""

import json
import tempfile

import gradio as gr
import spaces
import torch

from video_highlight_detector import (
    load_model,
    BatchedVideoHighlightDetector,
    get_video_duration_seconds,
    get_fixed_30s_segments,
)

def load_examples(json_path: str) -> dict:
    """Load the pre-computed showcase examples from a JSON file."""
    with open(json_path, 'r') as f:
        return json.load(f)
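
# Expected shape of the examples JSON, inferred from the keys accessed in
# create_ui below (the exact contents of video_spec.json are an assumption):
#
# {
#   "examples": [
#     {
#       "title": "...",
#       "original":   {"url": "...", "duration_seconds": 123},
#       "highlights": {"url": "...", "duration_seconds": 45},
#       "analysis": {
#         "video_description": "...",
#         "highlight_types": "..."
#       }
#     }
#   ]
# }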

def format_duration(seconds: int) -> str:
    """Format a duration in seconds as H:MM:SS, or M:SS when under an hour."""
    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    secs = seconds % 60
    if hours > 0:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes}:{secs:02d}"
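
# e.g. format_duration(3725) -> "1:02:05"; format_duration(125) -> "2:05"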

def create_ui(examples_path: str):
    """Build the Gradio Blocks app: pre-computed example results on top,
    an interactive upload-and-process section below."""
    examples_data = load_examples(examples_path)

    with gr.Blocks() as app:
        gr.Markdown("# Video Highlight Generator")
        gr.Markdown("Upload a video and get an automated highlight reel!")

        gr.Markdown("## Example Results")

        with gr.Row():
            for example in examples_data["examples"]:
                with gr.Column():
                    gr.Video(
                        value=example["original"]["url"],
                        label=f"Original ({format_duration(example['original']['duration_seconds'])})",
                        interactive=False
                    )
                    gr.Markdown(f"### {example['title']}")

                with gr.Column():
                    gr.Video(
                        value=example["highlights"]["url"],
                        label=f"Highlights ({format_duration(example['highlights']['duration_seconds'])})",
                        interactive=False
                    )
                    with gr.Accordion("Model chain of thought details", open=False):
                        gr.Markdown(f"**Summary:** {example['analysis']['video_description']}")
                        gr.Markdown(f"**Highlights to search for:** {example['analysis']['highlight_types']}")

        gr.Markdown("## Try It Yourself!")
        with gr.Row():
            with gr.Column(scale=1):
                input_video = gr.Video(
                    label="Upload your video (max 20 minutes)",
                    interactive=True
                )
                process_btn = gr.Button("Process Video", variant="primary")

            with gr.Column(scale=1):
                output_video = gr.Video(
                    label="Highlight Video",
                    visible=False,
                    interactive=False,
                )

        status = gr.Markdown()

        analysis_accordion = gr.Accordion(
            "Model chain of thought details",
            open=True,
            visible=False
        )

        with analysis_accordion:
            video_description = gr.Markdown("", elem_id="video_desc")
            highlight_types = gr.Markdown("", elem_id="highlight_types")
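
        # on_process is a generator: every `yield` streams a 5-tuple to the
        # components wired as `outputs` in process_btn.click below, in order:
        #   (status, video_description, highlight_types, output_video, analysis_accordion)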
        @spaces.GPU
        def on_process(video):
            if not video:
                yield [
                    "Please upload a video",
                    "",
                    "",
                    gr.update(visible=False),
                    gr.update(visible=False),
                ]
                return

            try:
                duration = get_video_duration_seconds(video)
                if duration > 1200:  # 20 minutes
                    yield [
                        "Video must be shorter than 20 minutes",
                        "",
                        "",
                        gr.update(visible=False),
                        gr.update(visible=False),
                    ]
                    return

                yield [
                    "Loading model...",
                    "",
                    "",
                    gr.update(visible=False),
                    gr.update(visible=True),
                ]

                model, processor = load_model()
                detector = BatchedVideoHighlightDetector(
                    model,
                    processor,
                    batch_size=8,
                )

                yield [
                    "Analyzing video content...",
                    "",
                    "",
                    gr.update(visible=False),
                    gr.update(visible=True),
                ]

                # Stage 1: describe the overall video content.
                video_desc = detector.analyze_video_content(video)
                formatted_desc = f"**Summary:** {video_desc[:500] + '...' if len(video_desc) > 500 else video_desc}"

                yield [
                    "Determining highlight types...",
                    formatted_desc,
                    "",
                    gr.update(visible=False),
                    gr.update(visible=True),
                ]

                # Stage 2: decide what kinds of moments count as highlights.
                highlights = detector.determine_highlights(video_desc)
                formatted_highlights = f"**Highlights to search for:** {highlights[:500] + '...' if len(highlights) > 500 else highlights}"

                # Stage 3: score fixed 30-second segments in batches, keeping
                # the ones the model flags as highlights.
                segments = get_fixed_30s_segments(video)
                total_segments = len(segments)
                kept_segments = []

                for i in range(0, len(segments), detector.batch_size):
                    batch_segments = segments[i:i + detector.batch_size]

                    progress = int((i / total_segments) * 100)
                    yield [
                        f"Processing segments... {progress}% complete",
                        formatted_desc,
                        formatted_highlights,
                        gr.update(visible=False),
                        gr.update(visible=True),
                    ]

                    keep_flags = detector._process_segment_batch(
                        video_path=video,
                        segments=batch_segments,
                        highlight_types=highlights,
                        total_segments=total_segments,
                        segments_processed=i,
                    )

                    for segment, keep in zip(batch_segments, keep_flags):
                        if keep:
                            kept_segments.append(segment)

                # Stage 4: concatenate the kept segments into the highlight reel.
                with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp_file:
                    temp_output = tmp_file.name
                detector._concatenate_scenes(video, kept_segments, temp_output)

                yield [
                    "Processing complete!",
                    formatted_desc,
                    formatted_highlights,
                    gr.update(value=temp_output, visible=True),
                    gr.update(visible=True),
                ]

            except Exception as e:
                yield [
                    f"Error processing video: {str(e)}",
                    "",
                    "",
                    gr.update(visible=False),
                    gr.update(visible=False),
                ]

        process_btn.click(
            on_process,
            inputs=[input_video],
            outputs=[
                status,
                video_description,
                highlight_types,
                output_video,
                analysis_accordion,
            ],
            queue=True,
        )

    return app
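
# When hosted as a Hugging Face ZeroGPU Space, the @spaces.GPU decorator on
# on_process requests a GPU only for the duration of each call; outside of
# Spaces the decorator should be a no-op (an assumption based on the `spaces`
# import; verify against the package docs).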
if __name__ == "__main__":
    # Move a trivial tensor to the selected device as a CUDA availability
    # check / warm-up (it is not used elsewhere).
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    zero = torch.Tensor([0]).to(device)

    app = create_ui("video_spec.json")
    app.launch()