Spaces:
Sleeping
Sleeping
Update video_processing.py
Browse files- video_processing.py +20 -53
video_processing.py
CHANGED
@@ -1,8 +1,6 @@
|
|
1 |
import os
|
2 |
import cv2
|
3 |
-
from scenedetect import SceneManager, open_video, split_video_ffmpeg
|
4 |
from scenedetect import VideoManager, SceneManager
|
5 |
-
|
6 |
from scenedetect.detectors import ContentDetector
|
7 |
from moviepy.editor import VideoFileClip
|
8 |
from transformers import CLIPProcessor, CLIPModel
|
@@ -10,7 +8,6 @@ import torch
|
|
10 |
import yt_dlp
|
11 |
from PIL import Image
|
12 |
import uuid
|
13 |
-
import subprocess
|
14 |
|
15 |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
16 |
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
|
@@ -33,56 +30,28 @@ def download_video(url):
|
|
33 |
def sanitize_filename(filename):
|
34 |
return "".join([c if c.isalnum() or c in " .-_()" else "_" for c in filename])
|
35 |
|
36 |
-
def ensure_video_format(video_path):
|
37 |
-
output_dir = "temp_videos"
|
38 |
-
os.makedirs(output_dir, exist_ok=True)
|
39 |
-
temp_path = os.path.join(output_dir, f"formatted_{uuid.uuid4()}.mp4")
|
40 |
-
command = ['ffmpeg', '-i', video_path, '-c', 'copy', temp_path]
|
41 |
-
try:
|
42 |
-
subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
43 |
-
return temp_path
|
44 |
-
except subprocess.CalledProcessError as e:
|
45 |
-
print(f"Error processing video with ffmpeg: {e.stderr.decode()}")
|
46 |
-
return None
|
47 |
-
|
48 |
def find_scenes(video_path):
|
49 |
-
# Ensure video path is a list, as required by VideoManager
|
50 |
video_manager = VideoManager([video_path])
|
51 |
scene_manager = SceneManager()
|
52 |
-
|
53 |
-
|
54 |
-
scene_manager.add_detector(ContentDetector(threshold=33))
|
55 |
-
|
56 |
-
# Begin processing the video
|
57 |
video_manager.start()
|
58 |
-
|
59 |
-
# Detect scenes
|
60 |
scene_manager.detect_scenes(frame_source=video_manager)
|
61 |
-
|
62 |
-
# Get the list of detected scenes
|
63 |
scene_list = scene_manager.get_scene_list()
|
64 |
-
|
65 |
-
# Release the video manager resources
|
66 |
video_manager.release()
|
67 |
-
|
68 |
-
# Convert scene list to timecodes
|
69 |
scenes = [(start.get_timecode(), end.get_timecode()) for start, end in scene_list]
|
70 |
-
|
71 |
return scenes
|
72 |
|
73 |
-
|
74 |
-
|
75 |
def convert_timestamp_to_seconds(timestamp):
|
76 |
-
|
77 |
-
|
78 |
-
def timecode_to_seconds(timecode):
|
79 |
-
h, m, s = timecode.split(':')
|
80 |
-
return int(h) * 3600 + int(m) * 60 + float(s)
|
81 |
|
82 |
-
|
83 |
def extract_frames(video_path, start_time, end_time):
|
84 |
frames = []
|
85 |
-
|
|
|
|
|
|
|
86 |
for frame_time in range(0, int(video_clip.duration * video_clip.fps), int(video_clip.fps / 5)):
|
87 |
frame = video_clip.get_frame(frame_time / video_clip.fps)
|
88 |
frames.append(frame)
|
@@ -100,13 +69,12 @@ def analyze_scenes(video_path, scenes, description):
|
|
100 |
"Still-camera shot of a person's face"
|
101 |
]
|
102 |
|
|
|
103 |
text_inputs = processor(text=[description] + negative_descriptions, return_tensors="pt", padding=True).to(device)
|
104 |
text_features = model.get_text_features(**text_inputs).detach()
|
105 |
positive_feature, negative_features = text_features[0], text_features[1:]
|
106 |
|
107 |
for scene_num, (start_time, end_time) in enumerate(scenes):
|
108 |
-
start_seconds = timecode_to_seconds(start_time)
|
109 |
-
end_seconds = timecode_to_seconds(end_time)
|
110 |
frames = extract_frames(video_path, start_time, end_time)
|
111 |
if not frames:
|
112 |
print(f"Scene {scene_num + 1}: Start={start_time}, End={end_time} - No frames extracted")
|
@@ -123,14 +91,16 @@ def analyze_scenes(video_path, scenes, description):
|
|
123 |
scene_prob += positive_similarity - negative_similarities
|
124 |
|
125 |
scene_prob /= len(frames)
|
126 |
-
scene_duration =
|
127 |
-
|
128 |
print(f"Scene {scene_num + 1}: Start={start_time}, End={end_time}, Probability={scene_prob}, Duration={scene_duration}")
|
129 |
|
130 |
scene_scores.append((scene_prob, start_time, end_time, scene_duration))
|
131 |
|
|
|
132 |
scene_scores.sort(reverse=True, key=lambda x: x[0])
|
133 |
top_scenes = scene_scores[:5]
|
|
|
|
|
134 |
longest_scene = max(top_scenes, key=lambda x: x[3])
|
135 |
|
136 |
if longest_scene:
|
@@ -145,20 +115,17 @@ def extract_best_scene(video_path, scene):
|
|
145 |
return None
|
146 |
|
147 |
start_time, end_time = scene
|
148 |
-
|
|
|
|
|
149 |
return video_clip
|
150 |
|
151 |
-
def process_video(
|
152 |
-
video_path = download_video(
|
153 |
scenes = find_scenes(video_path)
|
154 |
-
if not scenes:
|
155 |
-
print("No scenes detected. Exiting.")
|
156 |
-
return None
|
157 |
best_scene = analyze_scenes(video_path, scenes, description)
|
158 |
-
if not best_scene:
|
159 |
-
print("No suitable scenes found. Exiting.")
|
160 |
-
return None
|
161 |
final_clip = extract_best_scene(video_path, best_scene)
|
|
|
162 |
if final_clip:
|
163 |
output_dir = "output"
|
164 |
os.makedirs(output_dir, exist_ok=True)
|
@@ -177,4 +144,4 @@ def cleanup_temp_files():
|
|
177 |
if os.path.isfile(file_path):
|
178 |
os.unlink(file_path)
|
179 |
except Exception as e:
|
180 |
-
print(f"Error
|
|
|
1 |
import os
|
2 |
import cv2
|
|
|
3 |
from scenedetect import VideoManager, SceneManager
|
|
|
4 |
from scenedetect.detectors import ContentDetector
|
5 |
from moviepy.editor import VideoFileClip
|
6 |
from transformers import CLIPProcessor, CLIPModel
|
|
|
8 |
import yt_dlp
|
9 |
from PIL import Image
|
10 |
import uuid
|
|
|
11 |
|
12 |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
13 |
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
|
|
|
30 |
def sanitize_filename(filename):
|
31 |
return "".join([c if c.isalnum() or c in " .-_()" else "_" for c in filename])
|
32 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
33 |
def find_scenes(video_path):
|
|
|
34 |
video_manager = VideoManager([video_path])
|
35 |
scene_manager = SceneManager()
|
36 |
+
scene_manager.add_detector(ContentDetector(threshold=33)) # Adjusted threshold for finer segmentation
|
37 |
+
video_manager.set_downscale_factor()
|
|
|
|
|
|
|
38 |
video_manager.start()
|
|
|
|
|
39 |
scene_manager.detect_scenes(frame_source=video_manager)
|
|
|
|
|
40 |
scene_list = scene_manager.get_scene_list()
|
|
|
|
|
41 |
video_manager.release()
|
|
|
|
|
42 |
scenes = [(start.get_timecode(), end.get_timecode()) for start, end in scene_list]
|
|
|
43 |
return scenes
|
44 |
|
|
|
|
|
45 |
def convert_timestamp_to_seconds(timestamp):
|
46 |
+
h, m, s = map(float, timestamp.split(':'))
|
47 |
+
return int(h) * 3600 + int(m) * 60 + s
|
|
|
|
|
|
|
48 |
|
|
|
49 |
def extract_frames(video_path, start_time, end_time):
|
50 |
frames = []
|
51 |
+
start_seconds = convert_timestamp_to_seconds(start_time)
|
52 |
+
end_seconds = convert_timestamp_to_seconds(end_time)
|
53 |
+
video_clip = VideoFileClip(video_path).subclip(start_seconds, end_seconds)
|
54 |
+
# Extract more frames: every frame in the scene
|
55 |
for frame_time in range(0, int(video_clip.duration * video_clip.fps), int(video_clip.fps / 5)):
|
56 |
frame = video_clip.get_frame(frame_time / video_clip.fps)
|
57 |
frames.append(frame)
|
|
|
69 |
"Still-camera shot of a person's face"
|
70 |
]
|
71 |
|
72 |
+
# Tokenize and encode the description text
|
73 |
text_inputs = processor(text=[description] + negative_descriptions, return_tensors="pt", padding=True).to(device)
|
74 |
text_features = model.get_text_features(**text_inputs).detach()
|
75 |
positive_feature, negative_features = text_features[0], text_features[1:]
|
76 |
|
77 |
for scene_num, (start_time, end_time) in enumerate(scenes):
|
|
|
|
|
78 |
frames = extract_frames(video_path, start_time, end_time)
|
79 |
if not frames:
|
80 |
print(f"Scene {scene_num + 1}: Start={start_time}, End={end_time} - No frames extracted")
|
|
|
91 |
scene_prob += positive_similarity - negative_similarities
|
92 |
|
93 |
scene_prob /= len(frames)
|
94 |
+
scene_duration = convert_timestamp_to_seconds(end_time) - convert_timestamp_to_seconds(start_time)
|
|
|
95 |
print(f"Scene {scene_num + 1}: Start={start_time}, End={end_time}, Probability={scene_prob}, Duration={scene_duration}")
|
96 |
|
97 |
scene_scores.append((scene_prob, start_time, end_time, scene_duration))
|
98 |
|
99 |
+
# Sort scenes by probability in descending order and select the top 5
|
100 |
scene_scores.sort(reverse=True, key=lambda x: x[0])
|
101 |
top_scenes = scene_scores[:5]
|
102 |
+
|
103 |
+
# Find the longest scene among the top 5
|
104 |
longest_scene = max(top_scenes, key=lambda x: x[3])
|
105 |
|
106 |
if longest_scene:
|
|
|
115 |
return None
|
116 |
|
117 |
start_time, end_time = scene
|
118 |
+
start_seconds = convert_timestamp_to_seconds(start_time)
|
119 |
+
end_seconds = convert_timestamp_to_seconds(end_time)
|
120 |
+
video_clip = VideoFileClip(video_path).subclip(start_seconds, end_seconds)
|
121 |
return video_clip
|
122 |
|
123 |
+
def process_video(video_url, description):
|
124 |
+
video_path = download_video(video_url)
|
125 |
scenes = find_scenes(video_path)
|
|
|
|
|
|
|
126 |
best_scene = analyze_scenes(video_path, scenes, description)
|
|
|
|
|
|
|
127 |
final_clip = extract_best_scene(video_path, best_scene)
|
128 |
+
|
129 |
if final_clip:
|
130 |
output_dir = "output"
|
131 |
os.makedirs(output_dir, exist_ok=True)
|
|
|
144 |
if os.path.isfile(file_path):
|
145 |
os.unlink(file_path)
|
146 |
except Exception as e:
|
147 |
+
print(f"Error: {e}")
|