David Driscoll committed
Commit b37a8e6 · 1 Parent(s): 8947b35

Caching and lag reduction

Files changed (1)
  1. app.py +108 -81
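
The commit drops the ThreadPoolExecutor-based background workers and instead recomputes the heavy detections only every SKIP_RATE frames, reusing the cached overlay info in between while drawing on every incoming frame. Below is a minimal, self-contained sketch of that frame-skipping cache pattern; the heavy detector and the drawing step are toy stand-ins, and only SKIP_RATE and the cache layout mirror the code in the diff that follows.

import numpy as np

SKIP_RATE = 5  # recompute heavy results every 5th frame, as in the diff

# Cache layout mirrors posture_cache/objects_cache in app.py: last overlay info plus a frame counter.
cache = {"boxes": None, "text": "Initializing...", "counter": 0}

def heavy_detect(frame):
    # Stand-in for an expensive model call (Faster R-CNN, FER, MediaPipe, ...).
    h, w = frame.shape[:2]
    return [(w // 4, h // 4, 3 * w // 4, 3 * h // 4)], "Detected 1 object(s)"

def analyze_current(frame):
    cache["counter"] += 1
    # Run the expensive detector only every SKIP_RATE frames (or before the first result exists).
    if cache["counter"] % SKIP_RATE == 0 or cache["boxes"] is None:
        cache["boxes"], cache["text"] = heavy_detect(frame)
    # Drawing the cached boxes on the current frame is cheap and happens every frame.
    output = frame.copy()
    for (x1, y1, x2, y2) in cache["boxes"]:
        output[y1, x1:x2] = 255  # cheap placeholder for cv2.rectangle
    return output, cache["text"]

if __name__ == "__main__":
    for i in range(12):  # simulate a short webcam stream
        frame = np.zeros((240, 320, 3), dtype=np.uint8)
        _, text = analyze_current(frame)
        print(i + 1, text)

In the actual change, each Gradio tab keeps its own cache (posture_cache, emotion_cache, objects_cache, faces_cache), so the streams stay independent.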
app.py CHANGED
@@ -7,144 +7,171 @@ from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights
  from PIL import Image
  import mediapipe as mp
  from fer import FER # Facial emotion recognition
- from concurrent.futures import ThreadPoolExecutor

  # -----------------------------
- # Asynchronous Processing Setup
+ # Configuration: Adjust skip rate (lower = more frequent heavy updates)
  # -----------------------------
- executor = ThreadPoolExecutor(max_workers=4)
- latest_results = {
-     "posture": None,
-     "emotion": None,
-     "objects": None,
-     "faces": None
- }
- futures = {
-     "posture": None,
-     "emotion": None,
-     "objects": None,
-     "faces": None
- }
+ SKIP_RATE = 5

- def async_analyze(key, func, image):
-     # If a background task is done, update our cache.
-     if futures[key] is not None and futures[key].done():
-         latest_results[key] = futures[key].result()
-         futures[key] = None
-     # If we already have a cached result, return it immediately and schedule a new update if none is running.
-     if latest_results[key] is not None:
-         if futures[key] is None:
-             futures[key] = executor.submit(func, image)
-         return latest_results[key]
-     # Otherwise, compute synchronously (blocking) to initialize the cache.
-     result = func(image)
-     latest_results[key] = result
-     futures[key] = executor.submit(func, image)
-     return result
+ # -----------------------------
+ # Global caches for overlay info and frame counters
+ # -----------------------------
+ posture_cache = {"landmarks": None, "text": "Initializing...", "counter": 0}
+ emotion_cache = {"text": "Initializing...", "counter": 0}
+ objects_cache = {"boxes": None, "text": "Initializing...", "counter": 0}
+ faces_cache = {"boxes": None, "text": "Initializing...", "counter": 0}

  # -----------------------------
  # Initialize Models and Helpers
  # -----------------------------
- # MediaPipe Pose for posture analysis
  mp_pose = mp.solutions.pose
  pose = mp_pose.Pose()
  mp_drawing = mp.solutions.drawing_utils

- # MediaPipe Face Detection for face detection
  mp_face_detection = mp.solutions.face_detection
  face_detection = mp_face_detection.FaceDetection(min_detection_confidence=0.5)

- # Object Detection Model: Faster R-CNN (pretrained on COCO)
  object_detection_model = models.detection.fasterrcnn_resnet50_fpn(
      weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT
  )
  object_detection_model.eval()
  obj_transform = transforms.Compose([transforms.ToTensor()])

- # Facial Emotion Detection using FER (requires TensorFlow)
  emotion_detector = FER(mtcnn=True)

  # -----------------------------
- # Heavy (Synchronous) Analysis Functions
+ # Fast Overlay Functions
  # -----------------------------
- def _analyze_posture(image):
+ def draw_posture_overlay(raw_frame, landmarks):
+     # Draw each landmark as a small circle
+     for (x, y) in landmarks:
+         cv2.circle(raw_frame, (x, y), 4, (0, 255, 0), -1)
+     return raw_frame
+
+ def draw_boxes_overlay(raw_frame, boxes, color):
+     for (x1, y1, x2, y2) in boxes:
+         cv2.rectangle(raw_frame, (x1, y1), (x2, y2), color, 2)
+     return raw_frame
+
+ # -----------------------------
+ # Heavy (Synchronous) Detection Functions
+ # These functions compute the overlay info on the current frame.
+ # -----------------------------
+ def compute_posture_overlay(image):
      frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
-     output_frame = frame.copy()
+     h, w, _ = frame.shape
      frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
-     posture_result = "No posture detected"
      pose_results = pose.process(frame_rgb)
      if pose_results.pose_landmarks:
-         posture_result = "Posture detected"
-         mp_drawing.draw_landmarks(
-             output_frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
-             mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2),
-             mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2)
+         landmarks = []
+         for lm in pose_results.pose_landmarks.landmark:
+             landmarks.append((int(lm.x * w), int(lm.y * h)))
          )
-     annotated_image = cv2.cvtColor(output_frame, cv2.COLOR_BGR2RGB)
-     return annotated_image, f"Posture Analysis: {posture_result}"
+         text = "Posture detected"
+     else:
+         landmarks = []
+         text = "No posture detected"
+     return landmarks, text

- def _analyze_emotion(image):
+ def compute_emotion_overlay(image):
      frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
      frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
      emotions = emotion_detector.detect_emotions(frame_rgb)
      if emotions:
          top_emotion, score = max(emotions[0]["emotions"].items(), key=lambda x: x[1])
-         emotion_text = f"{top_emotion} ({score:.2f})"
+         text = f"{top_emotion} ({score:.2f})"
      else:
-         emotion_text = "No face detected for emotion analysis"
-     annotated_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
-     return annotated_image, f"Emotion Analysis: {emotion_text}"
+         text = "No face detected"
+     return text

- def _analyze_objects(image):
+ def compute_objects_overlay(image):
      frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
-     output_frame = frame.copy()
      frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
      image_pil = Image.fromarray(frame_rgb)
      img_tensor = obj_transform(image_pil)
      with torch.no_grad():
          detections = object_detection_model([img_tensor])[0]
      threshold = 0.8
-     detected_boxes = detections["boxes"][detections["scores"] > threshold]
-     for box in detected_boxes:
-         box = box.int().cpu().numpy()
-         cv2.rectangle(output_frame, (box[0], box[1]), (box[2], box[3]), (255, 255, 0), 2)
-     object_result = f"Detected {len(detected_boxes)} object(s)" if len(detected_boxes) else "No objects detected"
-     annotated_image = cv2.cvtColor(output_frame, cv2.COLOR_BGR2RGB)
-     return annotated_image, f"Object Detection: {object_result}"
-
- def _analyze_faces(image):
+     boxes = []
+     for box, score in zip(detections["boxes"], detections["scores"]):
+         if score > threshold:
+             boxes.append(tuple(box.int().cpu().numpy()))
+     text = f"Detected {len(boxes)} object(s)" if boxes else "No objects detected"
+     return boxes, text
+
+ def compute_faces_overlay(image):
      frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
-     output_frame = frame.copy()
+     h, w, _ = frame.shape
      frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
      face_results = face_detection.process(frame_rgb)
-     face_result = "No faces detected"
+     boxes = []
      if face_results.detections:
-         face_result = f"Detected {len(face_results.detections)} face(s)"
-         h, w, _ = output_frame.shape
          for detection in face_results.detections:
              bbox = detection.location_data.relative_bounding_box
              x = int(bbox.xmin * w)
              y = int(bbox.ymin * h)
              box_w = int(bbox.width * w)
              box_h = int(bbox.height * h)
-             cv2.rectangle(output_frame, (x, y), (x + box_w, y + box_h), (0, 0, 255), 2)
-     annotated_image = cv2.cvtColor(output_frame, cv2.COLOR_BGR2RGB)
-     return annotated_image, f"Face Detection: {face_result}"
+             boxes.append((x, y, x + box_w, y + box_h))
+         text = f"Detected {len(boxes)} face(s)"
+     else:
+         text = "No faces detected"
+     return boxes, text

  # -----------------------------
- # Asynchronous Wrappers for Each Analysis
+ # Main Analysis Functions (run every frame)
+ # They update the cache every SKIP_RATE frames and always return a current frame with overlay.
  # -----------------------------
- def analyze_posture_async(image):
-     return async_analyze("posture", _analyze_posture, image)
+ def analyze_posture_current(image):
+     global posture_cache
+     posture_cache["counter"] += 1
+     current_frame = np.array(image)  # raw RGB frame (as numpy array)
+     # Update overlay info every SKIP_RATE frames
+     if posture_cache["counter"] % SKIP_RATE == 0 or posture_cache["landmarks"] is None:
+         landmarks, text = compute_posture_overlay(image)
+         posture_cache["landmarks"] = landmarks
+         posture_cache["text"] = text
+     # Draw cached landmarks on the current frame copy
+     output = current_frame.copy()
+     if posture_cache["landmarks"]:
+         output = draw_posture_overlay(output, posture_cache["landmarks"])
+     return output, f"Posture Analysis: {posture_cache['text']}"

- def analyze_emotion_async(image):
-     return async_analyze("emotion", _analyze_emotion, image)
+ def analyze_emotion_current(image):
+     global emotion_cache
+     emotion_cache["counter"] += 1
+     current_frame = np.array(image)
+     if emotion_cache["counter"] % SKIP_RATE == 0 or emotion_cache["text"] is None:
+         text = compute_emotion_overlay(image)
+         emotion_cache["text"] = text
+     # For emotion, we don't overlay anything; just return the current frame.
+     return current_frame, f"Emotion Analysis: {emotion_cache['text']}"

- def analyze_objects_async(image):
-     return async_analyze("objects", _analyze_objects, image)
+ def analyze_objects_current(image):
+     global objects_cache
+     objects_cache["counter"] += 1
+     current_frame = np.array(image)
+     if objects_cache["counter"] % SKIP_RATE == 0 or objects_cache["boxes"] is None:
+         boxes, text = compute_objects_overlay(image)
+         objects_cache["boxes"] = boxes
+         objects_cache["text"] = text
+     output = current_frame.copy()
+     if objects_cache["boxes"]:
+         output = draw_boxes_overlay(output, objects_cache["boxes"], (255, 255, 0))
+     return output, f"Object Detection: {objects_cache['text']}"

- def analyze_faces_async(image):
-     return async_analyze("faces", _analyze_faces, image)
+ def analyze_faces_current(image):
+     global faces_cache
+     faces_cache["counter"] += 1
+     current_frame = np.array(image)
+     if faces_cache["counter"] % SKIP_RATE == 0 or faces_cache["boxes"] is None:
+         boxes, text = compute_faces_overlay(image)
+         faces_cache["boxes"] = boxes
+         faces_cache["text"] = text
+     output = current_frame.copy()
+     if faces_cache["boxes"]:
+         output = draw_boxes_overlay(output, faces_cache["boxes"], (0, 0, 255))
+     return output, f"Face Detection: {faces_cache['text']}"

  # -----------------------------
  # Custom CSS for a High-Tech Look (White Font)
@@ -183,7 +210,7 @@ body {
  # Create Individual Interfaces for Each Analysis
  # -----------------------------
  posture_interface = gr.Interface(
-     fn=analyze_posture_async,
+     fn=analyze_posture_current,
      inputs=gr.Image(sources=["webcam"], streaming=True, label="Capture Your Posture"),
      outputs=[gr.Image(type="numpy", label="Annotated Output"), gr.Textbox(label="Posture Analysis")],
      title="Posture Analysis",
@@ -192,7 +219,7 @@ posture_interface = gr.Interface(
  )

  emotion_interface = gr.Interface(
-     fn=analyze_emotion_async,
+     fn=analyze_emotion_current,
      inputs=gr.Image(sources=["webcam"], streaming=True, label="Capture Your Face"),
      outputs=[gr.Image(type="numpy", label="Annotated Output"), gr.Textbox(label="Emotion Analysis")],
      title="Emotion Analysis",
@@ -201,7 +228,7 @@ emotion_interface = gr.Interface(
  )

  objects_interface = gr.Interface(
-     fn=analyze_objects_async,
+     fn=analyze_objects_current,
      inputs=gr.Image(sources=["webcam"], streaming=True, label="Capture the Scene"),
      outputs=[gr.Image(type="numpy", label="Annotated Output"), gr.Textbox(label="Object Detection")],
      title="Object Detection",
@@ -210,7 +237,7 @@ objects_interface = gr.Interface(
  )

  faces_interface = gr.Interface(
-     fn=analyze_faces_async,
+     fn=analyze_faces_current,
      inputs=gr.Image(sources=["webcam"], streaming=True, label="Capture Your Face"),
      outputs=[gr.Image(type="numpy", label="Annotated Output"), gr.Textbox(label="Face Detection")],
      title="Face Detection",