jschwab21 committed
Commit d1b6fc5 · verified · 1 Parent(s): d94d4f7

Update video_processing.py

Files changed (1)
  1. video_processing.py +58 -82
video_processing.py CHANGED
@@ -1,3 +1,5 @@
+Let's go back to this version of video_processing.py:
+video_processing.py:
 import os
 import cv2
 from scenedetect import VideoManager, SceneManager
@@ -10,52 +12,23 @@ from PIL import Image
 import uuid
 from torchvision import models, transforms
 from torch.nn import functional as F
-from cachetools import cached, TTLCache
-import numpy as np
-import logging
-from multiprocessing import Pool
-from concurrent.futures import ThreadPoolExecutor
-from concurrent.futures import ProcessPoolExecutor
-
-
-
-
-# Setup basic logging
-#logging.basicConfig(level=logging.INFO)
-
 
 categories = ["Joy", "Trust", "Fear", "Surprise", "Sadness", "Disgust", "Anger", "Anticipation"]
 
-#initializing CLIP
+
 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
 processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
 
-#initializing ZG placeholder
-resnet50 = models.resnet50(pretrained=True).eval().to(device)
 
-#initialize caches
-#scene_cache = TTLCache(maxsize=100, ttl=86400)  # cache up to 100 items, each for 1 day
-#frame_cache = TTLCache(maxsize=1000, ttl=86400)
-#analysis_cache = TTLCache(maxsize=1000, ttl=86400)
-
-
-def cache_info_decorator(func, cache):
-    """Decorator to add caching and logging to a function."""
-    key_func = lambda *args, **kwargs: "_".join(map(str, args))  # Simple key func based on str(args)
-
-    @cached(cache, key=key_func)
-    def wrapper(*args, **kwargs):
-        key = key_func(*args, **kwargs)
-        if key in cache:
-            logging.info(f"Cache hit for key: {key}")
-        else:
-            logging.info(f"Cache miss for key: {key}. Caching result.")
-        return func(*args, **kwargs)
-    return wrapper
-
-
 def classify_frame(frame):
+    categories = ["Joy", "Trust", "Fear", "Surprise", "Sadness", "Disgust", "Anger", "Anticipation"]
+
+    # Load ResNet-50 model
+    resnet50 = models.resnet50(pretrained=True)
+    resnet50.eval().to(device)
+
+    # Preprocess the image
     preprocess = transforms.Compose([
         transforms.Resize(256),
         transforms.CenterCrop(224),
@@ -65,12 +38,15 @@ def classify_frame(frame):
     input_tensor = preprocess(Image.fromarray(frame))
     input_batch = input_tensor.unsqueeze(0).to(device)
 
-    # Use the globally loaded ResNet-50 model
+    # Predict with ResNet-50
    with torch.no_grad():
         output = resnet50(input_batch)
         probabilities = F.softmax(output[0], dim=0)
 
+    # Create a numpy array from the probabilities of the categories
+    # This example assumes each category is mapped to a model output directly
     results_array = np.array([probabilities[i].item() for i in range(len(categories))])
+
     return results_array
 
 
@@ -108,22 +84,17 @@ def convert_timestamp_to_seconds(timestamp):
     return int(h) * 3600 + int(m) * 60 + s
 
 def extract_frames(video_path, start_time, end_time):
-    video_clip = VideoFileClip(video_path).subclip(start_time, end_time)
-    return [video_clip.get_frame(t / video_clip.fps) for t in range(0, int(video_clip.duration * video_clip.fps), int(video_clip.fps / 10))]
-
+    frames = []
+    start_seconds = convert_timestamp_to_seconds(start_time)
+    end_seconds = convert_timestamp_to_seconds(end_time)
+    video_clip = VideoFileClip(video_path).subclip(start_seconds, end_seconds)
+    # Extract more frames: every frame in the scene
+    for frame_time in range(0, int(video_clip.duration * video_clip.fps), int(video_clip.fps / 10)):
+        frame = video_clip.get_frame(frame_time / video_clip.fps)
+        frames.append(frame)
+    return frames
 
-def analyze_frame(args):
-    frame, positive_feature, negative_features = args
-    image = Image.fromarray(frame[..., ::-1])
-    image_input = processor(images=image, return_tensors="pt").to(device)
-    with torch.no_grad():
-        image_features = model.get_image_features(**image_input).detach()
-    positive_similarity = torch.cosine_similarity(image_features, positive_feature.unsqueeze(0)).squeeze().item()
-    negative_similarities = torch.cosine_similarity(image_features, negative_features).squeeze().mean().item()
-
-    scene_prob = positive_similarity - negative_similarities
-    frame_sentiments = classify_frame(frame)
-    return scene_prob, frame_sentiments
+import numpy as np
 
 def analyze_scenes(video_path, scenes, description):
     scene_scores = []
@@ -140,42 +111,47 @@ def analyze_scenes(video_path, scenes, description):
     text_features = model.get_text_features(**text_inputs).detach()
     positive_feature, negative_features = text_features[0], text_features[1:]
 
-    tasks = []
-    for start_time, end_time in scenes:
+    for scene_num, (start_time, end_time) in enumerate(scenes):
         frames = extract_frames(video_path, start_time, end_time)
+        if not frames:
+            print(f"Scene {scene_num + 1}: Start={start_time}, End={end_time} - No frames extracted")
+            continue
+
+        scene_prob = 0.0
+        sentiment_distributions = np.zeros(8)  # Assuming there are 8 sentiments
         for frame in frames:
-            tasks.append((frame, positive_feature, negative_features))
-
-    scene_results = {}
-
-    with ProcessPoolExecutor(max_workers=8) as executor:
-        results = list(executor.map(analyze_frame, tasks))
-
-    for ((start_time, end_time), (scene_prob, sentiments)) in zip(scenes, results):
-        if (start_time, end_time) not in scene_results:
-            scene_results[(start_time, end_time)] = {
-                'probabilities': [],
-                'sentiments': np.zeros(8)
-            }
-        scene_results[(start_time, end_time)]['probabilities'].append(scene_prob)
-        scene_results[(start_time, end_time)]['sentiments'] += sentiments
-
-    # Calculate averages and prepare the final scores
-    for (start_time, end_time), data in scene_results.items():
-        avg_prob = np.mean(data['probabilities'])
-        avg_sentiments = data['sentiments'] / len(data['probabilities'])
-        sentiment_percentages = {category: round(prob * 100, 2) for category, prob in zip(categories, avg_sentiments)}
+            image = Image.fromarray(frame[..., ::-1])
+            image_input = processor(images=image, return_tensors="pt").to(device)
+            with torch.no_grad():
+                image_features = model.get_image_features(**image_input).detach()
+            positive_similarity = torch.cosine_similarity(image_features, positive_feature.unsqueeze(0)).squeeze().item()
+            negative_similarities = torch.cosine_similarity(image_features, negative_features).squeeze().mean().item()
+            scene_prob += positive_similarity - negative_similarities
+
+            frame_sentiments = classify_frame(frame)
+            sentiment_distributions += np.array(frame_sentiments)
+
+        sentiment_distributions /= len(frames)  # Normalize to get average probabilities
+        sentiment_percentages = {category: round(prob * 100, 2) for category, prob in zip(categories, sentiment_distributions)}
+        scene_prob /= len(frames)
         scene_duration = convert_timestamp_to_seconds(end_time) - convert_timestamp_to_seconds(start_time)
-        scene_scores.append((avg_prob, start_time, end_time, scene_duration, sentiment_percentages))
+        print(f"Scene {scene_num + 1}: Start={start_time}, End={end_time}, Probability={scene_prob}, Duration={scene_duration}, Sentiments: {sentiment_percentages}")
+
+        scene_scores.append((scene_prob, start_time, end_time, scene_duration, sentiment_percentages))
+
+    # Sort scenes by confidence, highest first
+    scene_scores.sort(reverse=True, key=lambda x: x[0])
+
+    # Select the longest scene from the top 3 highest confidence scenes
+    top_3_scenes = scene_scores[:3]  # Get the top 3 scenes
+    best_scene = max(top_3_scenes, key=lambda x: x[3])  # Find the longest scene from these top 3
 
-    # Sort and select the best scene
-    scene_scores.sort(reverse=True, key=lambda x: x[0])
-    top_3_scenes = scene_scores[:3]
-    best_scene = max(top_3_scenes, key=lambda x: x[3])
 
     if best_scene:
-        return (best_scene[1], best_scene[2]), best_scene[4]
+        print(f"Best Scene: Start={best_scene[1]}, End={best_scene[2]}, Probability={best_scene[0]}, Duration={best_scene[3]}, Sentiments: {best_scene[4]}")
+        return (best_scene[1], best_scene[2]), best_scene[4]  # Returning a tuple with scene times and sentiments
     else:
+        print("No suitable scene found")
        return None, {}
 
 
157