Update video_processing.py
video_processing.py CHANGED: +57 -49
@@ -13,9 +13,11 @@ from torch.nn import functional as F
 from cachetools import cached, TTLCache
 import numpy as np
 import logging
+from multiprocessing import Pool
+
 
 # Setup basic logging
-logging.basicConfig(level=logging.INFO)
+#logging.basicConfig(level=logging.INFO)
 
 
 categories = ["Joy", "Trust", "Fear", "Surprise", "Sadness", "Disgust", "Anger", "Anticipation"]
@@ -29,9 +31,9 @@ processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
 resnet50 = models.resnet50(pretrained=True).eval().to(device)
 
 #initialize caches
-scene_cache = TTLCache(maxsize=100, ttl=86400) # cache up to 100 items, each for 1 day
-frame_cache = TTLCache(maxsize=1000, ttl=86400)
-analysis_cache = TTLCache(maxsize=1000, ttl=86400)
+#scene_cache = TTLCache(maxsize=100, ttl=86400) # cache up to 100 items, each for 1 day
+#frame_cache = TTLCache(maxsize=1000, ttl=86400)
+#analysis_cache = TTLCache(maxsize=1000, ttl=86400)
 
 
 def cache_info_decorator(func, cache):
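The commented-out TTL caches above pair with cache_info_decorator, whose body lies outside this hunk. For reference, such a wrapper around a cachetools TTLCache is typically a thin layer over cachetools.cached; the sketch below is an assumption about its shape, not the repo's actual implementation.

import logging
from functools import wraps
from cachetools import TTLCache, cached

def cache_info_decorator(func, cache):
    # Sketch: memoize func in the supplied TTLCache and log each call for visibility.
    cached_func = cached(cache)(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        logging.info("%s called (cache currently holds %d entries)", func.__name__, len(cache))
        return cached_func(*args, **kwargs)

    return wrapper

# Usage sketch (hypothetical):
# scene_cache = TTLCache(maxsize=100, ttl=86400)
# find_scenes = cache_info_decorator(find_scenes, scene_cache)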
@@ -85,7 +87,6 @@ def download_video(url):
 def sanitize_filename(filename):
     return "".join([c if c.isalnum() or c in " .-_()" else "_" for c in filename])
 
-@cache_info_decorator
 def find_scenes(video_path):
     video_manager = VideoManager([video_path])
     scene_manager = SceneManager()
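find_scenes is only partially visible in this hunk. For orientation, the usual PySceneDetect flow with VideoManager and SceneManager looks roughly like the sketch below; the detector choice, threshold, and return format are assumptions, not taken from this commit.

from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector

def find_scenes_sketch(video_path):
    # Assumed pattern: detect content cuts and return (start, end) timecode strings.
    video_manager = VideoManager([video_path])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=30.0))
    video_manager.set_downscale_factor()
    video_manager.start()
    scene_manager.detect_scenes(frame_source=video_manager)
    scene_list = scene_manager.get_scene_list()
    video_manager.release()
    return [(start.get_timecode(), end.get_timecode()) for start, end in scene_list]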
@@ -102,21 +103,30 @@ def convert_timestamp_to_seconds(timestamp):
     h, m, s = map(float, timestamp.split(':'))
     return int(h) * 3600 + int(m) * 60 + s
 
-@cache_info_decorator
 def extract_frames(video_path, start_time, end_time):
+    def extract_frame_at_time(t):
+        return video_clip.get_frame(t / video_clip.fps)
+
     video_clip = VideoFileClip(video_path).subclip(start_seconds, end_seconds)
+    frame_times = range(0, int(video_clip.duration * video_clip.fps), int(video_clip.fps / 10))
+
+    # Create a pool of workers to extract frames in parallel
+    with Pool() as pool:
+        frames = pool.map(extract_frame_at_time, frame_times)
+
     return frames
 
+def analyze_scene(params):
+    video_path, start_time, end_time, description = params
+    frames = extract_frames(video_path, start_time, end_time)
+    if not frames:
+        print(f"Scene: Start={start_time}, End={end_time} - No frames extracted")
+        return (start_time, end_time, None) # Adjust as needed for error handling
+
+    scene_prob = 0.0
+    sentiment_distributions = np.zeros(8) # Assuming there are 8 sentiments
+
+    # Preparing text inputs and features once per scene
     negative_descriptions = [
         "black screen",
         "Intro text for a video",
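Worth noting when reading this hunk: multiprocessing.Pool sends the mapped callable to worker processes by pickling it, and a function nested inside extract_frames (like extract_frame_at_time) generally cannot be pickled, so the common workaround is a module-level worker that opens its own clip handle. The sketch below illustrates that pattern; extract_frames_parallel and _extract_chunk are illustrative names and are not part of this commit.

import math
from multiprocessing import Pool
from moviepy.editor import VideoFileClip

def _extract_chunk(args):
    # Illustrative worker: open a clip handle in this process and grab a batch of frames.
    video_path, start_seconds, end_seconds, times = args
    clip = VideoFileClip(video_path).subclip(start_seconds, end_seconds)
    try:
        return [clip.get_frame(t) for t in times]  # t is in seconds
    finally:
        clip.close()

def extract_frames_parallel(video_path, start_seconds, end_seconds, samples_per_second=10, workers=4):
    # Sample timestamps once in the parent, then hand contiguous batches to the pool.
    clip = VideoFileClip(video_path).subclip(start_seconds, end_seconds)
    duration = clip.duration
    clip.close()
    times = [i / samples_per_second for i in range(int(duration * samples_per_second))]
    batch = max(1, math.ceil(len(times) / workers))
    batches = [times[i:i + batch] for i in range(0, len(times), batch)]
    with Pool(processes=workers) as pool:
        results = pool.map(_extract_chunk,
                           [(video_path, start_seconds, end_seconds, b) for b in batches])
    return [frame for chunk in results for frame in chunk]

Giving each worker its own VideoFileClip avoids sharing the unpicklable clip object across processes.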
@@ -125,53 +135,51 @@ def analyze_scenes(video_path, scenes, description):
         "A still shot of natural scenery",
         "Still-camera shot of a person's face"
     ]
     text_inputs = processor(text=[description] + negative_descriptions, return_tensors="pt", padding=True).to(device)
     text_features = model.get_text_features(**text_inputs).detach()
     positive_feature, negative_features = text_features[0], text_features[1:]
 
-            with torch.no_grad():
-                image_features = model.get_image_features(**image_input).detach()
-            positive_similarity = torch.cosine_similarity(image_features, positive_feature.unsqueeze(0)).squeeze().item()
-            negative_similarities = torch.cosine_similarity(image_features, negative_features).squeeze().mean().item()
-            scene_prob += positive_similarity - negative_similarities
-            frame_sentiments = classify_frame(frame)
-            sentiment_distributions += np.array(frame_sentiments)
+    for frame in frames:
+        image = Image.fromarray(frame[..., ::-1])
+        image_input = processor(images=image, return_tensors="pt").to(device)
+        with torch.no_grad():
+            image_features = model.get_image_features(**image_input).detach()
+        positive_similarity = torch.cosine_similarity(image_features, positive_feature.unsqueeze(0)).squeeze().item()
+        negative_similarities = torch.cosine_similarity(image_features, negative_features).squeeze().mean().item()
+        scene_prob += positive_similarity - negative_similarities
+
+        frame_sentiments = classify_frame(frame)
+        sentiment_distributions += np.array(frame_sentiments)
 
+    if len(frames) > 0:
         sentiment_distributions /= len(frames) # Normalize to get average probabilities
         sentiment_percentages = {category: round(prob * 100, 2) for category, prob in zip(categories, sentiment_distributions)}
         scene_prob /= len(frames)
         scene_duration = convert_timestamp_to_seconds(end_time) - convert_timestamp_to_seconds(start_time)
+        print(f"Scene: Start={start_time}, End={end_time}, Probability={scene_prob}, Duration={scene_duration}, Sentiments: {sentiment_percentages}")
+        return (start_time, end_time, scene_prob, scene_duration, sentiment_percentages)
 
+    return (start_time, end_time, None) # Adjust as needed for error handling
 
+def analyze_scenes(video_path, scenes, description):
+    scene_params = [(video_path, start, end, description) for start, end in scenes]
+
+    # Analyze each scene in parallel
+    with Pool(processes=4) as pool: # You can set the number of processes based on your system's CPU cores
+        results = pool.map(analyze_scene, scene_params)
+
+    # Process results to find the best scene
+    scene_scores = [result for result in results if result[2] is not None] # Filter out scenes with no data
+    if scene_scores:
+        scene_scores.sort(reverse=True, key=lambda x: x[2]) # Sort scenes by confidence, highest first
         top_3_scenes = scene_scores[:3] # Get the top 3 scenes
         best_scene = max(top_3_scenes, key=lambda x: x[3]) # Find the longest scene from these top 3
+        if best_scene:
+            print(f"Best Scene: Start={best_scene[0]}, End={best_scene[1]}, Probability={best_scene[2]}, Duration={best_scene[3]}, Sentiments: {best_scene[4]}")
+            return (best_scene[0], best_scene[1]), best_scene[4] # Returning a tuple with scene times and sentiments
 
-        print(f"Best Scene: Start={best_scene[1]}, End={best_scene[2]}, Probability={best_scene[0]}, Duration={best_scene[3]}, Sentiments: {best_scene[4]}")
-        return (best_scene[1], best_scene[2]), best_scene[4] # Returning a tuple with scene times and sentiments
-    else:
-        print("No suitable scene found")
-        return None, {}
+    print("No suitable scene found")
+    return None, {}
 
 
 def extract_best_scene(video_path, scene):
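To make the result-tuple layout of the new analyze_scenes concrete, here is a tiny standalone illustration of the sort-then-pick-longest step with made-up values, not output from the actual pipeline.

# Each entry: (start, end, probability, duration_seconds, sentiment_percentages)
scene_scores = [
    ("00:00:01.000", "00:00:04.000", 0.82, 3.0, {"Joy": 40.0}),
    ("00:00:10.000", "00:00:20.000", 0.79, 10.0, {"Trust": 25.0}),
    ("00:00:30.000", "00:00:33.000", 0.91, 3.0, {"Joy": 55.0}),
    ("00:00:40.000", "00:00:41.000", 0.40, 1.0, {"Fear": 12.0}),
]
scene_scores.sort(reverse=True, key=lambda x: x[2])  # highest CLIP score first
top_3_scenes = scene_scores[:3]
best_scene = max(top_3_scenes, key=lambda x: x[3])   # longest of the top three
print((best_scene[0], best_scene[1]))                # ('00:00:10.000', '00:00:20.000')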