"""Drowsiness, yawn and face-lost detection for webcam or video-file input.

Each frame is resized, a face is located with a Haar cascade, dlib facial
landmarks are computed for it, and the eye / mouth aspect ratios are compared
against fixed thresholds over consecutive frames.  Detection state is kept in
module-level counters and flags, which ``reset_counters`` clears at the start
of every run.
"""

# PREP DEPENDENCIES
import argparse
import os
from threading import Thread  # noqa: F401 - only used by the disabled audio alerts

import cv2 as cv
import dlib
import imutils
import numpy as np  # noqa: F401
import pygame  # noqa: F401 - Used for playing alarm sounds cross-platform
from imutils import face_utils
from scipy.spatial import distance as dist

# --- INITIALIZE MODELS AND CONSTANTS ---
# Haar cascade classifier for face detection
haar_cascade_face_detector = "haarcascade_frontalface_default.xml"
face_detector = cv.CascadeClassifier(haar_cascade_face_detector)

# Dlib facial landmark detector
dlib_facial_landmark_predictor = "shape_predictor_68_face_landmarks.dat"
landmark_predictor = dlib.shape_predictor(dlib_facial_landmark_predictor)

# Important Variables
font = cv.FONT_HERSHEY_SIMPLEX

# Width every frame is resized to before detection.  process_video() derives
# the output video size from the same value, so the two can never disagree.
PROCESSING_WIDTH = 640

# --- DETECTION THRESHOLDS AND STATE ---
# Eye Drowsiness Detection
EYE_ASPECT_RATIO_THRESHOLD = 0.25  # EAR below this counts as "eyes closed"
EYE_CLOSED_THRESHOLD = 20  # Conseq. closed-eye frames needed to trigger alert
EYE_THRESH_COUNTER = 0  # Current run of consecutive closed-eye frames
DROWSY_COUNTER = 0  # Number of drowsiness events raised so far
drowsy_alert = False

# Mouth Yawn Detection
MOUTH_ASPECT_RATIO_THRESHOLD = 0.5  # MAR above this counts as "mouth open"
MOUTH_OPEN_THRESHOLD = 15  # Conseq. open-mouth frames needed to trigger alert
YAWN_THRESH_COUNTER = 0  # Current run of consecutive open-mouth frames
YAWN_COUNTER = 0  # Number of yawn events raised so far
yawn_alert = False

# Head Not Visible Detection
FACE_LOST_THRESHOLD = 25  # Conseq. frames face must be lost to trigger alert
FACE_LOST_COUNTER = 0  # Current run of consecutive frames without a face
HEAD_DOWN_COUNTER = 0  # Number of face-lost events raised so far
head_down_alert = False

# --- AUDIO SETUP (using Pygame) ---
# Audio alerts are currently disabled; the original setup is kept for reference.
# pygame.mixer.init()
# drowsiness_sound = pygame.mixer.Sound("drowsiness-detected.mp3")
# yawn_sound = pygame.mixer.Sound("yawning-detected.mp3")
# head_down_sound = pygame.mixer.Sound("dependencies/audio/head-down-detected.mp3")


# --- CORE FUNCTIONS ---
# def play_alarm(sound_to_play):
#     if not pygame.mixer.get_busy():
#         sound_to_play.play()


def generate_alert(final_eye_ratio, final_mouth_ratio):
    """Update the drowsiness and yawn state from one frame's ratios.

    A run of ``EYE_CLOSED_THRESHOLD`` consecutive frames with the eye ratio
    below ``EYE_ASPECT_RATIO_THRESHOLD`` raises ``drowsy_alert`` and counts
    one drowsiness event; the yawn check works the same way with the mouth
    ratio *above* its threshold.  Each event is counted once per run: the
    alert flag stays set until the ratio returns to normal.

    Args:
        final_eye_ratio: Averaged eye aspect ratio for the current frame.
        final_mouth_ratio: Mouth aspect ratio for the current frame.
    """
    global EYE_THRESH_COUNTER, YAWN_THRESH_COUNTER
    global drowsy_alert, yawn_alert
    global DROWSY_COUNTER, YAWN_COUNTER

    # Drowsiness check
    if final_eye_ratio < EYE_ASPECT_RATIO_THRESHOLD:
        EYE_THRESH_COUNTER += 1
        if EYE_THRESH_COUNTER >= EYE_CLOSED_THRESHOLD:
            if not drowsy_alert:
                DROWSY_COUNTER += 1
                drowsy_alert = True
                # Thread(target=play_alarm, args=(drowsiness_sound,)).start()
    else:
        EYE_THRESH_COUNTER = 0
        drowsy_alert = False

    # Yawn check
    if final_mouth_ratio > MOUTH_ASPECT_RATIO_THRESHOLD:
        YAWN_THRESH_COUNTER += 1
        if YAWN_THRESH_COUNTER >= MOUTH_OPEN_THRESHOLD:
            if not yawn_alert:
                YAWN_COUNTER += 1
                yawn_alert = True
                # Thread(target=play_alarm, args=(yawn_sound,)).start()
    else:
        YAWN_THRESH_COUNTER = 0
        yawn_alert = False


def detect_facial_landmarks(x, y, w, h, gray_frame):
    """Return the dlib landmarks for one face box as a NumPy array.

    Args:
        x, y, w, h: Face bounding box (top-left corner, width, height).
        gray_frame: Grayscale frame the box was detected in.

    Returns:
        The landmark coordinates converted with ``face_utils.shape_to_np``.
    """
    face = dlib.rectangle(int(x), int(y), int(x + w), int(y + h))
    face_landmarks = landmark_predictor(gray_frame, face)
    return face_utils.shape_to_np(face_landmarks)


def eye_aspect_ratio(eye):
    """Return the eye aspect ratio (EAR) for six eye landmarks.

    EAR is the sum of the two vertical landmark distances divided by twice
    the horizontal distance.

    Args:
        eye: Sequence of six (x, y) eye landmark points.

    Returns:
        The ratio as a float, or ``0.0`` when the horizontal distance is zero
        (degenerate landmarks).
    """
    A = dist.euclidean(eye[1], eye[5])
    B = dist.euclidean(eye[2], eye[4])
    C = dist.euclidean(eye[0], eye[3])
    if C == 0:
        # Same convention as head_pose_ratio(): avoid dividing by zero.
        return 0.0
    return (A + B) / (2.0 * C)


def final_eye_aspect_ratio(shape):
    """Return the mean EAR of both eyes plus the eye landmark slices.

    Args:
        shape: Array of facial landmarks for one face.

    Returns:
        Tuple ``(final_ear, left_eye, right_eye)``.
    """
    (lStart, lEnd) = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
    (rStart, rEnd) = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]

    left_eye = shape[lStart:lEnd]
    right_eye = shape[rStart:rEnd]

    left_ear = eye_aspect_ratio(left_eye)
    right_ear = eye_aspect_ratio(right_eye)

    final_ear = (left_ear + right_ear) / 2.0
    return final_ear, left_eye, right_eye


def mouth_aspect_ratio(mouth):
    """Return the mouth aspect ratio (MAR) for the mouth landmarks.

    MAR is the sum of two vertical landmark distances divided by twice the
    horizontal distance between ``mouth[0]`` and ``mouth[6]``.

    Args:
        mouth: Sequence of mouth landmark points (indices up to 10 are read).

    Returns:
        The ratio as a float, or ``0.0`` when the horizontal distance is zero
        (degenerate landmarks).
    """
    A = dist.euclidean(mouth[2], mouth[10])
    B = dist.euclidean(mouth[4], mouth[8])
    C = dist.euclidean(mouth[0], mouth[6])
    if C == 0:
        # Same convention as head_pose_ratio(): avoid dividing by zero.
        return 0.0
    return (A + B) / (2.0 * C)


def final_mouth_aspect_ratio(shape):
    """Return ``(mar, mouth)`` for the mouth landmarks found in ``shape``."""
    (mStart, mEnd) = face_utils.FACIAL_LANDMARKS_IDXS["mouth"]
    mouth = shape[mStart:mEnd]
    return mouth_aspect_ratio(mouth), mouth


def head_pose_ratio(shape):
    """Return the nose-to-chin distance divided by the face width.

    Args:
        shape: Array of facial landmarks for one face.

    Returns:
        The ratio as a float, or ``0.0`` when the face width is zero.
    """
    nose_tip = shape[30]
    chin_tip = shape[8]
    left_face_corner = shape[0]
    right_face_corner = shape[16]

    nose_to_chin_dist = dist.euclidean(nose_tip, chin_tip)
    face_width = dist.euclidean(left_face_corner, right_face_corner)

    if face_width == 0:
        return 0.0
    return nose_to_chin_dist / face_width


def reset_counters():
    """Reset every module-level counter and alert flag to its initial value."""
    global EYE_THRESH_COUNTER, YAWN_THRESH_COUNTER, FACE_LOST_COUNTER
    global DROWSY_COUNTER, YAWN_COUNTER, HEAD_DOWN_COUNTER
    global drowsy_alert, yawn_alert, head_down_alert

    EYE_THRESH_COUNTER, YAWN_THRESH_COUNTER, FACE_LOST_COUNTER = 0, 0, 0
    DROWSY_COUNTER, YAWN_COUNTER, HEAD_DOWN_COUNTER = 0, 0, 0
    drowsy_alert, yawn_alert, head_down_alert = False, False, False


def process_frame(frame):
    """Run detection on one frame and return it resized and annotated.

    The frame is resized to ``PROCESSING_WIDTH``.  If at least one face is
    detected, the first detection is used to update the drowsiness and yawn
    state; otherwise the face-lost counter advances and, after
    ``FACE_LOST_THRESHOLD`` consecutive misses, one "head down" event is
    counted.  Counters and any active alerts are drawn onto the frame.

    Args:
        frame: BGR image as read from ``cv.VideoCapture``.

    Returns:
        The resized frame with the overlay text drawn on it.
    """
    global FACE_LOST_COUNTER, head_down_alert, HEAD_DOWN_COUNTER

    frame = imutils.resize(frame, width=PROCESSING_WIDTH)
    gray_frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)

    faces = face_detector.detectMultiScale(
        gray_frame,
        scaleFactor=1.1,
        minNeighbors=5,
        minSize=(30, 30),
        flags=cv.CASCADE_SCALE_IMAGE,
    )

    # len() rather than truthiness: the detector result may be a NumPy array,
    # whose truth value is ambiguous.
    if len(faces) > 0:
        FACE_LOST_COUNTER = 0
        head_down_alert = False

        (x, y, w, h) = faces[0]
        face_landmarks = detect_facial_landmarks(x, y, w, h, gray_frame)

        final_ear, left_eye, right_eye = final_eye_aspect_ratio(face_landmarks)
        final_mar, mouth = final_mouth_aspect_ratio(face_landmarks)

        # Optional debug overlay (disabled): outline the eyes and the mouth.
        # left_eye_hull, right_eye_hull, mouth_hull = cv.convexHull(left_eye), cv.convexHull(right_eye), cv.convexHull(mouth)
        # cv.drawContours(frame, [left_eye_hull], -1, (0, 255, 0), 1)
        # cv.drawContours(frame, [right_eye_hull], -1, (0, 255, 0), 1)
        # cv.drawContours(frame, [mouth_hull], -1, (0, 255, 0), 1)

        generate_alert(final_ear, final_mar)

        cv.putText(frame, f"EAR: {final_ear:.2f}", (10, 30), font, 0.7, (0, 0, 255), 2)
        cv.putText(frame, f"MAR: {final_mar:.2f}", (10, 60), font, 0.7, (0, 0, 255), 2)
    else:
        FACE_LOST_COUNTER += 1
        if FACE_LOST_COUNTER >= FACE_LOST_THRESHOLD and not head_down_alert:
            HEAD_DOWN_COUNTER += 1
            head_down_alert = True

    cv.putText(frame, f"Drowsy: {DROWSY_COUNTER}", (480, 30), font, 0.7, (255, 255, 0), 2)
    cv.putText(frame, f"Yawn: {YAWN_COUNTER}", (480, 60), font, 0.7, (255, 255, 0), 2)
    cv.putText(frame, f"Head Down: {HEAD_DOWN_COUNTER}", (480, 90), font, 0.7, (255, 255, 0), 2)

    if drowsy_alert:
        cv.putText(frame, "DROWSINESS ALERT!", (150, 30), font, 0.9, (0, 0, 255), 2)
    if yawn_alert:
        cv.putText(frame, "YAWN ALERT!", (200, 60), font, 0.9, (0, 0, 255), 2)
    if head_down_alert:
        cv.putText(frame, "HEAD NOT VISIBLE!", (180, 90), font, 0.9, (0, 0, 255), 2)

    return frame


def process_video(input_path, output_path=None):
    """Process a video file frame by frame, optionally writing the result.

    Args:
        input_path: Path of the video file to read.
        output_path: Optional path of the annotated video to write (encoded
            with the ``mp4v`` fourcc).  When omitted, frames are processed
            for the statistics only.

    Returns:
        ``True`` when the whole video was processed; ``False`` when the input
        could not be opened, its frame size could not be determined for
        writing, or the output writer could not be opened.
    """
    reset_counters()

    video_stream = cv.VideoCapture(input_path)
    if not video_stream.isOpened():
        print(f"Error: Could not open video file {input_path}")
        return False

    video_writer = None
    try:
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes
        # the written video play at the wrong speed.
        fps = video_stream.get(cv.CAP_PROP_FPS)
        width = int(video_stream.get(cv.CAP_PROP_FRAME_WIDTH))
        height = int(video_stream.get(cv.CAP_PROP_FRAME_HEIGHT))

        print(f"Processing video: {input_path}")
        print(f"Original Res: {width}x{height}, FPS: {fps:g}")

        if output_path:
            if width <= 0 or height <= 0:
                print(f"Error: Could not determine frame size of {input_path}")
                return False

            fourcc = cv.VideoWriter_fourcc(*"mp4v")
            # process_frame() resizes every frame to PROCESSING_WIDTH while
            # keeping the aspect ratio; the writer must use the same size or
            # the output file is corrupted.
            output_height = int(height * (PROCESSING_WIDTH / float(width)))
            output_dims = (PROCESSING_WIDTH, output_height)

            video_writer = cv.VideoWriter(output_path, fourcc, fps, output_dims)
            if not video_writer.isOpened():
                # Without this check every write() would be dropped silently.
                print(f"Error: Could not open output file {output_path}")
                return False
            print(f"Outputting video with Res: {output_dims[0]}x{output_dims[1]}")

        while True:
            ret, frame = video_stream.read()
            if not ret:
                break

            processed_frame = process_frame(frame)
            if video_writer is not None:
                video_writer.write(processed_frame)
    finally:
        # Release the handles even if processing a frame raised.
        video_stream.release()
        if video_writer is not None:
            video_writer.release()

    print("Video processing complete!")
    print(f"Final Stats - Drowsy: {DROWSY_COUNTER}, Yawn: {YAWN_COUNTER}, Head Down: {HEAD_DOWN_COUNTER}")
    return True


def run_webcam():
    """Run live detection on webcam 0 until ``q`` is pressed or a read fails.

    Returns:
        ``True`` after the loop ends, ``False`` when the webcam could not be
        opened.
    """
    reset_counters()

    video_stream = cv.VideoCapture(0)
    if not video_stream.isOpened():
        print("Error: Could not open webcam")
        return False

    try:
        while True:
            ret, frame = video_stream.read()
            if not ret:
                print("Failed to grab frame")
                break

            processed_frame = process_frame(frame)
            cv.imshow("Live Drowsiness and Yawn Detection", processed_frame)

            if cv.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Free the camera and close the window even if a frame raised.
        video_stream.release()
        cv.destroyAllWindows()
    return True


def main():
    """Parse the command line and run the selected mode.

    Returns:
        Process exit status: ``0`` on success, ``1`` on failure.
    """
    parser = argparse.ArgumentParser(description='Drowsiness Detection System')
    parser.add_argument('--mode', choices=['webcam', 'video'], default='webcam', help='Mode of operation')
    parser.add_argument('--input', type=str, help='Input video file path for video mode')
    parser.add_argument('--output', type=str, help='Output video file path for video mode')
    args = parser.parse_args()

    if args.mode == 'webcam':
        print("Starting webcam detection...")
        return 0 if run_webcam() else 1

    # args.mode == 'video' (argparse restricts the choices)
    if not args.input:
        print("Error: --input argument is required for video mode.")
        return 1
    if not os.path.exists(args.input):
        print(f"Error: Input file not found at {args.input}")
        return 1
    return 0 if process_video(args.input, args.output) else 1


# --- MAIN EXECUTION LOOP ---
if __name__ == "__main__":
    raise SystemExit(main())