|
import asyncio |
|
import sys |
|
|
|
|
|
if sys.platform.startswith('linux') and sys.version_info >= (3, 8): |
|
try: |
|
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) |
|
except Exception: |
|
pass |
|
|
|
import streamlit as st |
|
from PIL import Image |
|
import numpy as np |
|
import subprocess |
|
import time |
|
import tempfile |
|
import os |
|
from ultralytics import YOLO |
|
import cv2 as cv |
|
|
|
|
|
from video_processor import process_video_with_progress |
|
|
|
|
|
model_path = "best.pt" |
|
if not os.path.exists(model_path): |
|
st.error(f"Model file '{model_path}' not found. Please ensure it's included in your deployment.") |
|
st.stop() |
|
|
|
|
|
st.set_page_config( |
|
page_title="Driver Distraction System", |
|
page_icon="π", |
|
layout="wide", |
|
initial_sidebar_state="expanded", |
|
) |
|
|
|
|
|
st.sidebar.title("π Driver Distraction System") |
|
st.sidebar.write("Choose an option below:") |
|
|
|
|
|
if os.getenv("SPACE_ID"): |
|
available_features = [ |
|
"Distraction System", |
|
"Video Drowsiness Detection" |
|
] |
|
st.sidebar.info("π‘ Note: Real-time webcam detection is not available in cloud deployment.") |
|
else: |
|
available_features = [ |
|
"Distraction System", |
|
"Video Drowsiness Detection", |
|
"Real-time Drowsiness Detection" |
|
] |
|
|
|
|
|
page = st.sidebar.radio("Select Feature", available_features) |
|
|
|
|
|
st.sidebar.subheader("Class Names") |
|
class_names = ['drinking', 'hair and makeup', 'operating the radio', 'reaching behind', |
|
'safe driving', 'talking on the phone', 'talking to passenger', 'texting'] |
|
for idx, class_name in enumerate(class_names): |
|
st.sidebar.write(f"{idx}: {class_name}") |
|
|
|
|
|
if page == "Distraction System": |
|
st.title("Driver Distraction System") |
|
st.write("Upload an image or video to detect distractions using YOLO model.") |
|
|
|
|
|
file_type = st.radio("Select file type:", ["Image", "Video"]) |
|
|
|
if file_type == "Image": |
|
uploaded_file = st.file_uploader("Upload Image", type=["jpg", "jpeg", "png"]) |
|
if uploaded_file is not None: |
|
try: |
|
image = Image.open(uploaded_file).convert('RGB') |
|
image_np = np.array(image) |
|
col1, col2 = st.columns([1, 1]) |
|
with col1: |
|
st.subheader("Uploaded Image") |
|
st.image(image, caption="Original Image", use_container_width=True) |
|
with col2: |
|
st.subheader("Detection Results") |
|
|
|
|
|
try: |
|
model = YOLO(model_path) |
|
start_time = time.time() |
|
results = model(image_np) |
|
end_time = time.time() |
|
prediction_time = end_time - start_time |
|
|
|
result = results[0] |
|
if len(result.boxes) > 0: |
|
boxes = result.boxes |
|
confidences = boxes.conf.cpu().numpy() |
|
classes = boxes.cls.cpu().numpy() |
|
class_names_dict = result.names |
|
max_conf_idx = confidences.argmax() |
|
predicted_class = class_names_dict[int(classes[max_conf_idx])] |
|
confidence_score = confidences[max_conf_idx] |
|
st.markdown(f"### Predicted Class: **{predicted_class}**") |
|
st.markdown(f"### Confidence Score: **{confidence_score:.4f}** ({confidence_score*100:.1f}%)") |
|
st.markdown(f"Inference Time: {prediction_time:.2f} seconds") |
|
else: |
|
st.warning("No distractions detected.") |
|
except Exception as e: |
|
st.error(f"Error loading or running model: {str(e)}") |
|
st.info("Please ensure the model file 'best.pt' is present and valid.") |
|
except Exception as e: |
|
st.error(f"Error processing image: {str(e)}") |
|
|
|
elif file_type == "Video": |
|
uploaded_file = st.file_uploader("Upload Video", type=["mp4", "avi", "mov", "mkv", "webm"]) |
|
if uploaded_file is not None: |
|
try: |
|
|
|
tfile = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") |
|
tfile.write(uploaded_file.read()) |
|
temp_input_path = tfile.name |
|
temp_output_path = tempfile.mktemp(suffix="_processed.mp4") |
|
|
|
st.subheader("Original Video Preview") |
|
st.video(uploaded_file) |
|
|
|
if st.button("Process Video for Distraction Detection"): |
|
progress_bar = st.progress(0, text="Preparing to process video...") |
|
|
|
try: |
|
model = YOLO(model_path) |
|
cap = cv.VideoCapture(temp_input_path) |
|
total_frames = int(cap.get(cv.CAP_PROP_FRAME_COUNT)) |
|
fps = cap.get(cv.CAP_PROP_FPS) |
|
|
|
|
|
width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH)) |
|
height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT)) |
|
|
|
|
|
fourcc = cv.VideoWriter_fourcc(*'mp4v') |
|
out = cv.VideoWriter(temp_output_path, fourcc, fps, (width, height)) |
|
|
|
frame_count = 0 |
|
detections = [] |
|
|
|
while True: |
|
ret, frame = cap.read() |
|
if not ret: |
|
break |
|
|
|
frame_count += 1 |
|
|
|
|
|
results = model(frame) |
|
result = results[0] |
|
|
|
|
|
annotated_frame = result.plot() |
|
out.write(annotated_frame) |
|
|
|
|
|
if len(result.boxes) > 0: |
|
boxes = result.boxes |
|
for i in range(len(boxes)): |
|
conf = boxes.conf[i].cpu().numpy() |
|
cls = int(boxes.cls[i].cpu().numpy()) |
|
class_name = result.names[cls] |
|
detections.append({ |
|
'frame': frame_count, |
|
'class': class_name, |
|
'confidence': conf |
|
}) |
|
|
|
|
|
progress = int((frame_count / total_frames) * 100) |
|
progress_bar.progress(progress, text=f"Processing frame {frame_count}/{total_frames}") |
|
|
|
cap.release() |
|
out.release() |
|
|
|
st.success("Video processed successfully!") |
|
|
|
|
|
st.subheader("Detection Results") |
|
if detections: |
|
|
|
class_counts = {} |
|
for det in detections: |
|
class_name = det['class'] |
|
if class_name not in class_counts: |
|
class_counts[class_name] = 0 |
|
class_counts[class_name] += 1 |
|
|
|
|
|
cols = st.columns(len(class_counts)) |
|
for i, (class_name, count) in enumerate(class_counts.items()): |
|
cols[i].metric(class_name.title(), count) |
|
else: |
|
st.info("No distractions detected in the video.") |
|
|
|
|
|
if os.path.exists(temp_output_path): |
|
with open(temp_output_path, "rb") as file: |
|
video_bytes = file.read() |
|
st.download_button( |
|
label="π₯ Download Processed Video", |
|
data=video_bytes, |
|
file_name=f"distraction_detected_{uploaded_file.name}", |
|
mime="video/mp4" |
|
) |
|
|
|
except Exception as e: |
|
st.error(f"Error processing video: {str(e)}") |
|
finally: |
|
|
|
try: |
|
if os.path.exists(temp_input_path): |
|
os.unlink(temp_input_path) |
|
if os.path.exists(temp_output_path): |
|
os.unlink(temp_output_path) |
|
except Exception as e: |
|
st.warning(f"Failed to clean up temporary files: {e}") |
|
|
|
except Exception as e: |
|
st.error(f"Error handling video upload: {str(e)}") |
|
|
|
|
|
elif page == "Real-time Drowsiness Detection": |
|
st.title("π§ Real-time Drowsiness Detection") |
|
|
|
if os.getenv("SPACE_ID"): |
|
st.error("β οΈ Real-time webcam detection is not available in cloud deployment.") |
|
st.info("This feature requires direct access to your camera and only works in local environments.") |
|
st.markdown(""" |
|
**To use this feature:** |
|
1. Download the code to your local machine |
|
2. Install the required dependencies |
|
3. Run the application locally with `streamlit run streamlit_app.py` |
|
""") |
|
else: |
|
st.info("This feature requires a local webcam and will open a new window.") |
|
st.warning("This feature is intended for local use and will not function in cloud deployment.") |
|
if st.button("Start Drowsiness Detection"): |
|
try: |
|
subprocess.Popen(["python3", "drowsiness_detection.py", "--mode", "webcam"]) |
|
st.success("Attempted to launch detection window. Please check your desktop.") |
|
except Exception as e: |
|
st.error(f"Failed to start process: {e}") |
|
|
|
|
|
elif page == "Video Drowsiness Detection": |
|
st.title("πΉ Video Drowsiness Detection") |
|
st.write("Upload a video file to detect drowsiness and generate a report.") |
|
uploaded_video = st.file_uploader("Upload Video", type=["mp4", "avi", "mov", "mkv", "webm"]) |
|
|
|
if uploaded_video is not None: |
|
try: |
|
|
|
tfile = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") |
|
tfile.write(uploaded_video.read()) |
|
temp_input_path = tfile.name |
|
temp_output_path = tempfile.mktemp(suffix="_processed.mp4") |
|
|
|
st.subheader("Original Video Preview") |
|
st.video(uploaded_video) |
|
|
|
if st.button("Process Video for Drowsiness Detection"): |
|
progress_bar = st.progress(0, text="Preparing to process video...") |
|
|
|
|
|
def streamlit_progress_callback(current, total): |
|
if total > 0: |
|
percent_complete = int((current / total) * 100) |
|
progress_bar.progress(percent_complete, text=f"Analyzing frame {current}/{total}...") |
|
|
|
try: |
|
with st.spinner("Processing video... This may take a while."): |
|
|
|
stats = process_video_with_progress( |
|
input_path=temp_input_path, |
|
output_path=temp_output_path, |
|
progress_callback=streamlit_progress_callback |
|
) |
|
|
|
progress_bar.progress(100, text="Video processing completed!") |
|
st.success("Video processed successfully!") |
|
|
|
|
|
st.subheader("Detection Results") |
|
col1, col2, col3 = st.columns(3) |
|
col1.metric("Drowsy Events", stats.get('drowsy_events', 0)) |
|
col2.metric("Yawn Events", stats.get('yawn_events', 0)) |
|
col3.metric("Head Down Events", stats.get('head_down_events', 0)) |
|
|
|
|
|
if os.path.exists(temp_output_path): |
|
with open(temp_output_path, "rb") as file: |
|
video_bytes = file.read() |
|
st.download_button( |
|
label="π₯ Download Processed Video", |
|
data=video_bytes, |
|
file_name=f"drowsiness_detected_{uploaded_video.name}", |
|
mime="video/mp4" |
|
) |
|
except Exception as e: |
|
st.error(f"An error occurred during video processing: {e}") |
|
st.info("Please ensure all required model files are present and the video format is supported.") |
|
finally: |
|
|
|
try: |
|
if os.path.exists(temp_input_path): |
|
os.unlink(temp_input_path) |
|
if os.path.exists(temp_output_path): |
|
os.unlink(temp_output_path) |
|
except Exception as e_clean: |
|
st.warning(f"Failed to clean up temporary files: {e_clean}") |
|
|
|
except Exception as e: |
|
st.error(f"Error handling video upload: {str(e)}") |
|
|
|
|
|
st.sidebar.markdown("---") |
|
st.sidebar.markdown("### π Notes") |
|
st.sidebar.markdown(""" |
|
- **Image Detection**: Upload JPG, PNG images |
|
- **Video Detection**: Upload MP4, AVI, MOV videos |
|
- **Cloud Limitations**: Webcam access not available in cloud deployment |
|
- **Model**: Uses YOLO for distraction detection |
|
""") |