|
import asyncio |
|
import sys |
|
|
|
|
|
# Best-effort: on Linux, install a fresh default asyncio event loop policy.
# NOTE(review): presumably a workaround for an event-loop clash with a library
# imported later in this script — confirm it is still needed.
if sys.platform.startswith("linux") and sys.version_info >= (3, 8):
    try:
        asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
    except Exception as policy_error:
        # Never block app start-up on this, but leave a trace instead of
        # silently discarding the failure.
        print(
            f"Could not reset the asyncio event loop policy: {policy_error}",
            file=sys.stderr,
        )
|
|
|
import streamlit as st |
|
from PIL import Image |
|
import numpy as np |
|
import subprocess |
|
import time |
|
import tempfile |
|
import os |
|
from ultralytics import YOLO |
|
import cv2 as cv |
|
|
|
|
|
from video_processor import process_video_with_progress |
|
|
|
# Weights file passed to YOLO by the image-detection page (relative path).
model_path = "best.pt"

# Browser-tab metadata and overall layout, gathered in one mapping so the
# settings read as data.
# NOTE(review): the page icon looks like a mis-decoded emoji — confirm the
# intended glyph.
_PAGE_SETTINGS = {
    "page_title": "Driver Distraction System",
    "page_icon": "π",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
|
|
|
|
|
# --- Sidebar: heading, feature selector and class legend ---
# NOTE(review): the leading character of the title looks like a mis-decoded
# emoji — confirm the intended glyph.
_sidebar = st.sidebar
_sidebar.title("π Driver Distraction System")
_sidebar.write("Choose an option below:")

# The chosen label selects which page body is rendered further down.
_FEATURE_LABELS = [
    "Distraction System",
    "Video Drowsiness Detection",
    "Real-time Drowsiness Detection",
]
page = _sidebar.radio("Select Feature", _FEATURE_LABELS)

# Legend listing each class name next to its positional index.
_sidebar.subheader("Class Names")
class_names = [
    'drinking',
    'hair and makeup',
    'operating the radio',
    'reaching behind',
    'safe driving',
    'talking on the phone',
    'talking to passenger',
    'texting',
]
for idx, class_name in enumerate(class_names):
    legend_entry = f"{idx}: {class_name}"
    _sidebar.write(legend_entry)
|
|
|
|
|
@st.cache_resource(show_spinner="Loading detection model...")
def _load_detection_model(weights_path):
    """Build the YOLO model for *weights_path* once and reuse it afterwards.

    Streamlit re-executes the script on every widget interaction; caching the
    model avoids re-reading the weights on each rerun.
    """
    return YOLO(weights_path)


def _top_detection(result):
    """Return ``(class_name, confidence)`` for the most confident box.

    Returns ``None`` when *result* carries no boxes.
    """
    boxes = result.boxes
    if boxes is None or len(boxes) == 0:
        return None
    confidences = boxes.conf.cpu().numpy()
    class_ids = boxes.cls.cpu().numpy()
    best_idx = confidences.argmax()
    return result.names[int(class_ids[best_idx])], confidences[best_idx]


if page == "Distraction System":
    st.title("Driver Distraction System")
    st.write("Upload an image or video to detect distractions using YOLO model.")

    file_type = st.radio("Select file type:", ["Image", "Video"])

    if file_type == "Image":
        uploaded_file = st.file_uploader("Upload Image", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            image = Image.open(uploaded_file).convert('RGB')
            col1, col2 = st.columns([1, 1])
            with col1:
                st.subheader("Uploaded Image")
                st.image(image, caption="Original Image", use_container_width=True)
            with col2:
                st.subheader("Detection Results")
                try:
                    model = _load_detection_model(model_path)
                except Exception as load_error:
                    # Show a readable message instead of a raw traceback when
                    # the weights are missing or unreadable.
                    model = None
                    st.error(f"Could not load the detection model from '{model_path}': {load_error}")

                if model is not None:
                    # Pass the PIL image itself: Ultralytics interprets NumPy
                    # input as BGR but PIL input as RGB, so handing over the
                    # RGB array would swap the colour channels.
                    start_time = time.perf_counter()
                    results = model(image)
                    prediction_time = time.perf_counter() - start_time

                    top = _top_detection(results[0])
                    if top is not None:
                        predicted_class, confidence_score = top
                        st.markdown(f"### Predicted Class: **{predicted_class}**")
                        st.markdown(f"### Confidence Score: **{confidence_score:.4f}** ({confidence_score*100:.1f}%)")
                        st.markdown(f"Inference Time: {prediction_time:.2f} seconds")
                    else:
                        st.warning("No distractions detected.")
    else:
        # The "Video" choice previously rendered nothing at all; say so
        # explicitly rather than leaving an empty page.
        st.info("Distraction detection on video files is not available yet. Please select \"Image\" to analyze a picture.")
|
|
|
|
|
# Written as a plain `if`: the sidebar radio yields exactly one label, so the
# page branches are mutually exclusive.
# NOTE(review): the leading characters of the title look like a mis-decoded
# emoji — confirm the intended glyph.
if page == "Real-time Drowsiness Detection":
    st.title("π§ Real-time Drowsiness Detection")
    st.info("This feature requires a local webcam and will open a new window.")
    st.warning("This feature is intended for local use and will not function in the cloud deployment.")

    if st.button("Start Drowsiness Detection"):
        detector_script = "drowsiness_detection.py"
        if not os.path.isfile(detector_script):
            # Popen would still "succeed" here (the interpreter starts and then
            # exits), so check for the script up front.
            st.error(f"Failed to start process: '{detector_script}' was not found in {os.getcwd()}")
        else:
            try:
                # Launch with the interpreter running this app so the child
                # sees the same environment and packages; "python3" may be a
                # different installation or absent altogether.
                subprocess.Popen([sys.executable or "python3", detector_script, "--mode", "webcam"])
                st.success("Attempted to launch detection window. Please check your desktop.")
            except (OSError, ValueError) as e:
                st.error(f"Failed to start process: {e}")
|
|
|
|
|
def _progress_percent(current, total):
    """Convert a frame counter into an integer percentage clamped to 0-100.

    Clamping matters because ``st.progress`` rejects values above 100, which
    would happen if more frames are reported than the announced total.
    """
    if total <= 0:
        return 0
    return max(0, min(100, int((current / total) * 100)))


# Written as a plain `if`: the sidebar radio yields exactly one label, so the
# page branches are mutually exclusive.
if page == "Video Drowsiness Detection":
    # "\U0001F4F9" is the video-camera emoji; the original literal was
    # mis-decoded. NOTE(review): glyph restored from context — confirm.
    st.title("\U0001F4F9 Video Drowsiness Detection")
    st.write("Upload a video file to detect drowsiness and generate a report.")
    uploaded_video = st.file_uploader("Upload Video", type=["mp4", "avi", "mov", "mkv", "webm"])

    if uploaded_video is not None:
        st.subheader("Original Video Preview")
        st.video(uploaded_video)

        if st.button("Process Video for Drowsiness Detection"):
            progress_bar = st.progress(0, text="Preparing to process video...")

            def streamlit_progress_callback(current, total):
                """Mirror the processor's frame counter onto the progress bar."""
                if total > 0:
                    progress_bar.progress(
                        _progress_percent(current, total),
                        text=f"Analyzing frame {current}/{total}...",
                    )

            # Scratch files are created only once processing is requested, so
            # ordinary reruns no longer leave temporary files behind. A private
            # directory from mkdtemp replaces the deprecated, race-prone mktemp.
            work_dir = tempfile.mkdtemp(prefix="drowsiness_")
            temp_input_path = os.path.join(work_dir, "input.mp4")
            temp_output_path = os.path.join(work_dir, "output_processed.mp4")

            try:
                # Write and close the upload before the processor opens it, so
                # the data is fully flushed to disk.
                with open(temp_input_path, "wb") as input_file:
                    input_file.write(uploaded_video.getvalue())

                with st.spinner("Processing video... This may take a while."):
                    stats = process_video_with_progress(
                        input_path=temp_input_path,
                        output_path=temp_output_path,
                        progress_callback=streamlit_progress_callback
                    )

                progress_bar.progress(100, text="Video processing completed!")
                st.success("Video processed successfully!")

                st.subheader("Detection Results")
                col1, col2, col3 = st.columns(3)
                col1.metric("Drowsy Events", stats.get('drowsy_events', 0))
                col2.metric("Yawn Events", stats.get('yawn_events', 0))
                col3.metric("Head Down Events", stats.get('head_down_events', 0))

                if os.path.exists(temp_output_path):
                    with open(temp_output_path, "rb") as file:
                        video_bytes = file.read()
                    # The download is served as MP4, so give it an .mp4 name
                    # even when the upload was .avi/.mov/.mkv/.webm.
                    source_stem = os.path.splitext(uploaded_video.name)[0]
                    st.download_button(
                        # "\U0001F4E5" is the inbox-tray (download) emoji; the
                        # original literal was mis-decoded.
                        # NOTE(review): glyph restored from context — confirm.
                        label="\U0001F4E5 Download Processed Video",
                        data=video_bytes,
                        file_name=f"drowsiness_detected_{source_stem}.mp4",
                        mime="video/mp4"
                    )
            except Exception as e:
                st.error(f"An error occurred during video processing: {e}")
            finally:
                # Remove the scratch files and their directory; a cleanup
                # failure is reported but never masks the processing outcome.
                try:
                    for leftover_path in (temp_input_path, temp_output_path):
                        if os.path.exists(leftover_path):
                            os.unlink(leftover_path)
                    os.rmdir(work_dir)
                except OSError as e_clean:
                    st.warning(f"Failed to clean up temporary files: {e_clean}")