# prediction_sequences.py
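"""
Real-time eye/eyebrow movement prediction from a webcam feed.

Loads a trained Keras sequence model, uses dlib's 68-point landmark predictor to
extract eye and eyebrow ROIs from each frame, buffers a fixed-length sequence of
preprocessed frames, and runs predictions on a background worker thread so the
display loop stays responsive.
"""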
import tensorflow as tf
import cv2
import numpy as np
import dlib
from imutils import face_utils
import os
import pickle
from collections import deque
import threading
import queue
import time


def load_model(model_path='final_model_sequences.keras'):
    """
    Loads the trained model.

    Args:
        model_path (str): Path to the saved model.

    Returns:
        tensorflow.keras.Model: Loaded model.
    """
    model = tf.keras.models.load_model(model_path)
    return model


def get_facial_landmarks(detector, predictor, image):
    """
    Detects facial landmarks in an image.

    Args:
        detector: dlib face detector.
        predictor: dlib shape predictor.
        image (numpy.ndarray): Input image.

    Returns:
        dict: Coordinates of eyes and eyebrows.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    rects = detector(gray, 1)
    if len(rects) == 0:
        return None  # No face detected
    # Assuming the first detected face is the target
    rect = rects[0]
    shape = predictor(gray, rect)
    shape = face_utils.shape_to_np(shape)
    landmarks = {}
    # Define landmarks for left and right eyes and eyebrows
    landmarks['left_eye'] = shape[36:42]       # Left eye landmarks
    landmarks['right_eye'] = shape[42:48]      # Right eye landmarks
    landmarks['left_eyebrow'] = shape[17:22]   # Left eyebrow landmarks
    landmarks['right_eyebrow'] = shape[22:27]  # Right eyebrow landmarks
    return landmarks


def extract_roi(image, landmarks, region='left_eye', padding=5):
    """
    Extracts a region of interest (ROI) from the image based on landmarks.

    Args:
        image (numpy.ndarray): Input image.
        landmarks (dict): Facial landmarks.
        region (str): Region to extract ('left_eye', 'right_eye', 'left_eyebrow', 'right_eyebrow').
        padding (int): Padding around the ROI.

    Returns:
        numpy.ndarray: Extracted ROI.
    """
    points = landmarks.get(region)
    if points is None:
        return None
    # Compute the bounding box around the landmark points
    x, y, w, h = cv2.boundingRect(points)
    x = max(x - padding, 0)
    y = max(y - padding, 0)
    w = w + 2 * padding
    h = h + 2 * padding
    roi = image[y:y+h, x:x+w]
    return roi


def preprocess_frame(image, detector, predictor, img_size=(64, 64)):
    """
    Preprocesses a single frame: detects landmarks, extracts ROIs, and prepares the input.

    Args:
        image (numpy.ndarray): Input frame.
        detector: dlib face detector.
        predictor: dlib shape predictor.
        img_size (tuple): Desired image size for ROIs.

    Returns:
        numpy.ndarray: Preprocessed frame as a concatenated ROI image.
    """
    landmarks = get_facial_landmarks(detector, predictor, image)
    if landmarks is None:
        return None  # No face detected
    # Extract ROIs for eyes and eyebrows
    rois = {}
    rois['left_eye'] = extract_roi(image, landmarks, 'left_eye')
    rois['right_eye'] = extract_roi(image, landmarks, 'right_eye')
    rois['left_eyebrow'] = extract_roi(image, landmarks, 'left_eyebrow')
    rois['right_eyebrow'] = extract_roi(image, landmarks, 'right_eyebrow')
    # Process ROIs
    roi_images = []
    for region in ['left_eye', 'right_eye', 'left_eyebrow', 'right_eyebrow']:
        roi = rois.get(region)
        if roi is not None:
            roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)  # Convert to grayscale
            roi = cv2.resize(roi, img_size)
            roi = roi.astype('float32') / 255.0  # Normalize to [0, 1]
            roi = np.expand_dims(roi, axis=-1)  # Add channel dimension
            roi_images.append(roi)
    if len(roi_images) == 0:
        return None  # No ROIs extracted
    # Concatenate ROIs horizontally to form a single image
    combined_roi = np.hstack(roi_images)
    return combined_roi


def movement_to_text(label_map):
    """
    Creates a mapping from class indices to text.

    Args:
        label_map (dict): Mapping from class names to indices.

    Returns:
        dict: Mapping from indices to text descriptions.
    """
    movement_to_text_map = {
        'upward_eyebrow': 'Eyebrow Raised',
        'downward_eyebrow': 'Eyebrow Lowered',
        'left_eye': 'Left Eye Movement',
        'right_eye': 'Right Eye Movement',
        # Add more mappings as needed
    }
    # Create index-to-text mapping
    index_to_text = {}
    for cls, idx in label_map.items():
        text = movement_to_text_map.get(cls, cls)
        index_to_text[idx] = text
    return index_to_text


def prediction_worker(model, input_queue, output_queue, max_seq_length):
    """
    Worker thread for handling model predictions.

    Args:
        model (tensorflow.keras.Model): Trained model.
        input_queue (queue.Queue): Queue to receive sequences for prediction.
        output_queue (queue.Queue): Queue to send prediction results.
        max_seq_length (int): Fixed sequence length for the model.
    """
    while True:
        sequence = input_queue.get()
        if sequence is None:
            break  # Sentinel to stop the thread
        # Pad or truncate the sequence to match the model's expected input
        if sequence.shape[0] < max_seq_length:
            pad_width = max_seq_length - sequence.shape[0]
            padding = np.zeros((pad_width, *sequence.shape[1:]), dtype=sequence.dtype)
            sequence_padded = np.concatenate((sequence, padding), axis=0)
        else:
            sequence_padded = sequence[:max_seq_length]
        # Expand dimensions to match model input (1, frames, height, width, channels)
        sequence_padded = np.expand_dims(sequence_padded, axis=0)
        # Perform prediction
        prediction = model.predict(sequence_padded)
        class_idx = np.argmax(prediction)
        confidence = np.max(prediction)
        # Put the result in the output queue
        output_queue.put((class_idx, confidence))


def main():
    # Load the trained model
    model = load_model('final_model_sequences.keras')

    # Load label map
    with open('dataset_sequences.pkl', 'rb') as f:
        data = pickle.load(f)
    label_map = data['label_map']
    index_to_text = movement_to_text(label_map)

    # Initialize dlib's face detector and landmark predictor
    detector = dlib.get_frontal_face_detector()
    predictor_path = 'shape_predictor_68_face_landmarks.dat'
    if not os.path.exists(predictor_path):
        print(f"Error: {predictor_path} not found. Download it from http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2")
        return
    predictor = dlib.shape_predictor(predictor_path)

    # Initialize queues for communication between threads
    input_queue = queue.Queue()
    output_queue = queue.Queue()

    # Define sequence length (number of frames)
    max_seq_length = 20  # Adjust based on your training data

    # Start the prediction worker thread
    pred_thread = threading.Thread(
        target=prediction_worker,
        args=(model, input_queue, output_queue, max_seq_length)
    )
    pred_thread.daemon = True
    pred_thread.start()

    # Start video capture
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open webcam.")
        return
    print("Starting real-time prediction. Press 'q' to quit.")

    # Initialize a deque to store the sequence of preprocessed frames
    frame_buffer = deque(maxlen=max_seq_length)
    # Variable to store the latest prediction result
    latest_prediction = "Initializing..."

    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame.")
            break
        # Preprocess the current frame
        preprocessed_frame = preprocess_frame(frame, detector, predictor, img_size=(64, 64))
        if preprocessed_frame is not None:
            frame_buffer.append(preprocessed_frame)
        else:
            # If no face is detected, append a zero array to maintain sequence length
            # (64 x 256 x 1 matches four concatenated 64x64 grayscale ROIs)
            frame_buffer.append(np.zeros((64, 256, 1), dtype='float32'))
        # If the buffer is full, send the sequence to the prediction thread
        if len(frame_buffer) == max_seq_length:
            # Convert deque to numpy array
            sequence_array = np.array(frame_buffer)
            input_queue.put(sequence_array)
        # Check if there's a new prediction result
        try:
            while True:
                class_idx, confidence = output_queue.get_nowait()
                movement = index_to_text.get(class_idx, "Unknown")
                latest_prediction = f"{movement} ({confidence*100:.2f}%)"
        except queue.Empty:
            pass  # No new prediction yet
        # Display the prediction on the frame
        cv2.putText(frame, latest_prediction, (30, 30), cv2.FONT_HERSHEY_SIMPLEX,
                    0.8, (0, 255, 0), 2, cv2.LINE_AA)
        # Display the frame
        cv2.imshow('Real-time Movement Prediction', frame)
        # Exit condition
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # Cleanup
    cap.release()
    cv2.destroyAllWindows()
    # Stop the prediction thread
    input_queue.put(None)  # Sentinel to stop the thread
    pred_thread.join()


if __name__ == "__main__":
    main()
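
# Usage note (assumed file layout): the model file (final_model_sequences.keras),
# the pickled dataset with its label map (dataset_sequences.pkl), and dlib's
# shape_predictor_68_face_landmarks.dat are expected alongside this script.
# With those in place, run:
#   python prediction_sequences.py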