TheKnight115's picture
Update app.py
c6dd643 verified
raw
history blame
21.3 kB
import streamlit as st
import cv2
import numpy as np
import tempfile
import time
from ultralytics import YOLO
from huggingface_hub import hf_hub_download
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os
import smtplib
from transformers import AutoModel, AutoProcessor
from PIL import Image, ImageDraw, ImageFont
import re
import torch
# Email credentials
FROM_EMAIL = "[email protected]"
EMAIL_PASSWORD = "cawxqifzqiwjufde" # App-Specific Password
TO_EMAIL = "[email protected]"
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 465
# Arabic dictionary for converting license plate text
arabic_dict = {
"0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤", "5": "٥",
"6": "٦", "7": "٧", "8": "٨", "9": "٩", "A": "ا", "B": "ب",
"J": "ح", "D": "د", "R": "ر", "S": "س", "X": "ص", "T": "ط",
"E": "ع", "G": "ق", "K": "ك", "L": "ل", "Z": "م", "N": "ن",
"H": "ه", "U": "و", "V": "ي", " ": " "
}
# Color mapping for different classes
class_colors = {
0: (0, 255, 0), # Green (Helmet)
1: (255, 0, 0), # Blue (License Plate)
2: (0, 0, 255), # Red (MotorbikeDelivery)
3: (255, 255, 0), # Cyan (MotorbikeSport)
4: (255, 0, 255), # Magenta (No Helmet)
5: (0, 255, 255), # Yellow (Person)
}
# Load the OCR model
processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to('cuda')
# Define lane area coordinates (example coordinates)
red_lane = np.array([[2, 1583], [1, 1131], [1828, 1141], [1912, 1580]], np.int32)
# YOLO inference function
def run_yolo(image):
results = model(image)
return results
# Function to process YOLO results and draw bounding boxes
def process_results(results, image):
boxes = results[0].boxes
for box in boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0])
conf = box.conf[0]
cls = int(box.cls[0])
label = model.names[cls]
color = class_colors.get(cls, (255, 255, 255))
# Draw rectangle and label
cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
cv2.putText(image, f"{label} {conf:.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
return image
# Process uploaded images
def process_image(uploaded_file):
image = np.array(cv2.imdecode(np.frombuffer(uploaded_file.read(), np.uint8), 1))
results = run_yolo(image)
processed_image = process_results(results, image)
processed_image_rgb = cv2.cvtColor(processed_image, cv2.COLOR_BGR2RGB)
st.image(processed_image_rgb, caption='Detected Image', use_column_width=True)
# Create a download button for the processed image
im_pil = Image.fromarray(processed_image_rgb)
im_pil.save("processed_image.png")
with open("processed_image.png", "rb") as file:
btn = st.download_button(
label="Download Processed Image",
data=file,
file_name="processed_image.png",
mime="image/png"
)
# Process and save uploaded videos
@st.cache_data
# Define the function to process the video
def process_video_and_save(uploaded_file):
# Path for Arabic font
font_path = "/kaggle/input/fontss/alfont_com_arial-1.ttf"
# Paths for saving violation images
violation_image_path = '/kaggle/working/violation.jpg'
# Track emails already sent to avoid duplicate emails
sent_emails = {}
# Dictionary to track violations per license plate
violations_dict = {}
# Video path (input)
video_path = "/kaggle/working/uploaded_video.mp4" # Save the uploaded video file to this path
with open(video_path, "wb") as f:
f.write(uploaded_file.getbuffer())
cap = cv2.VideoCapture(video_path)
# Check if the video file opened successfully
if not cap.isOpened():
print("Error opening video file")
return None
# Define codec and output video settings
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_video_path = '/kaggle/working/output_violation.mp4'
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) # Frame width
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Frame height
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
margin_y = 50
# Process the video frame by frame
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break # End of video
# Draw the red lane rectangle on each frame
cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3) # Red lane
# Perform detection using YOLO on the current frame
results = model.track(frame)
# Process each detection in the results
for box in results[0].boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy()) # Bounding box coordinates
label = model.names[int(box.cls)] # Class name (MotorbikeDelivery, Helmet, etc.)
color = (255, 0, 0) # Use a fixed color for bounding boxes
confidence = box.conf[0].item()
# Initialize flags and variables for the violations
helmet_violation = False
lane_violation = False
violation_type = []
# Draw bounding box around detected object
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3) # 3 is the thickness of the rectangle
# Add label to the box (e.g., 'MotorbikeDelivery')
cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
# Detect MotorbikeDelivery
if label == 'MotorbikeDelivery' and confidence >= 0.4:
motorbike_crop = frame[max(0, y1 - margin_y):y2, x1:x2]
delivery_center = ((x1 + x2) // 2, (y2))
in_red_lane = cv2.pointPolygonTest(red_lane, delivery_center, False)
if in_red_lane >= 0:
lane_violation = True
violation_type.append("In Red Lane")
# Perform detection within the cropped motorbike region
sub_results = model(motorbike_crop)
for result in sub_results[0].boxes:
sub_x1, sub_y1, sub_x2, sub_y2 = map(int, result.xyxy[0].cpu().numpy()) # Bounding box coordinates
sub_label = model.names[int(result.cls)]
sub_color = (255, 0, 0) # Red color for the bounding box of sub-objects
# Draw bounding box around sub-detected objects (No_Helmet, License_plate, etc.)
cv2.rectangle(motorbike_crop, (sub_x1, sub_y1), (sub_x2, sub_y2), sub_color, 2)
cv2.putText(motorbike_crop, sub_label, (sub_x1, sub_y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, sub_color, 2)
if sub_label == 'No_Helmet':
helmet_violation = True
violation_type.append("No Helmet")
continue
if sub_label == 'License_plate':
license_crop = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
# Apply OCR if a violation is detected
if helmet_violation or lane_violation:
# Perform OCR on the license plate
cv2.imwrite(violation_image_path, frame)
license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
temp_image_path = '/kaggle/working/license_plate.png'
license_plate_pil.save(temp_image_path)
license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
filtered_text = filter_license_plate_text(license_plate_text)
# Check if the license plate is already detected and saved
if filtered_text:
# Get the email from the database
email = get_vehicle_information(filtered_text, lane_violation, helmet_violation, violation_image_path, "Riyadh")
# Add the license plate and its violations to the violations dictionary
if filtered_text not in violations_dict:
violations_dict[filtered_text] = violation_type #{"1234AB":[no_Helmet,In_red_Lane]}
send_email(filtered_text, violation_image_path, ', '.join(violation_type), email)
else:
# Update the violations for the license plate if new ones are found
current_violations = set(violations_dict[filtered_text]) # no helmet
new_violations = set(violation_type) # red lane, no helmet
updated_violations = list(current_violations | new_violations) # red_lane, no helmet
# If new violations are found, update and send email
if updated_violations != violations_dict[filtered_text]:
violations_dict[filtered_text] = updated_violations
send_email(filtered_text, violation_image_path, ', '.join(updated_violations), email)
# Draw OCR text (English and Arabic) on the original frame
arabic_text = convert_to_arabic(filtered_text)
frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path, font_size=30, color=(255, 255, 255))
frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path, font_size=30, color=(0, 255, 0))
# Write the processed frame to the output video
out.write(frame)
# Release resources when done
cap.release()
out.release()
return output_video_path # Return the path of the processed video
# Live video feed processing
def live_video_feed():
stframe = st.empty()
video = cv2.VideoCapture(0)
if not video.isOpened():
st.error("Unable to access the webcam.")
return
while True:
ret, frame = video.read()
if not ret:
st.error("Failed to capture frame.")
break
# Run YOLO on the captured frame
results = run_yolo(frame)
annotated_frame = process_results(results, frame)
annotated_frame_rgb = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
# Display the frame with detections
stframe.image(annotated_frame_rgb, channels="RGB", use_column_width=True)
if st.button("Stop"):
break
video.release()
st.stop()
# Function to filter license plate text
def filter_license_plate_text(license_plate_text):
license_plate_text = re.sub(r'[^A-Z0-9]+', "", license_plate_text)
match = re.search(r'(\d{3,4})\s*([A-Z]{2})', license_plate_text)
return f"{match.group(1)} {match.group(2)}" if match else None
# Function to convert license plate text to Arabic
def convert_to_arabic(license_plate_text):
return "".join(arabic_dict.get(char, char) for char in license_plate_text)
# Function to send email notification with image attachment
def send_email(license_text, violation_image_path, violation_type):
if violation_type == 'no_helmet':
subject = 'تنبيه مخالفة: عدم ارتداء خوذة'
body = f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
elif violation_type == 'in_red_lane':
subject = 'تنبيه مخالفة: دخول المسار الأيسر'
body = f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
elif violation_type == 'no_helmet_in_red_lane':
subject = 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر'
body = f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
msg = MIMEMultipart()
msg['From'] = FROM_EMAIL
msg['To'] = TO_EMAIL
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
if os.path.exists(violation_image_path):
with open(violation_image_path, 'rb') as attachment_file:
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment_file.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(violation_image_path)}')
msg.attach(part)
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
server.login(FROM_EMAIL, EMAIL_PASSWORD)
server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
print("Email with attachment sent successfully!")
def draw_text_pil(img, text, position, font_path, font_size, color):
img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
try:
font = ImageFont.truetype(font_path, size=font_size)
except IOError:
print(f"Font file not found at {font_path}. Using default font.")
font = ImageFont.load_default()
draw.text(position, text, font=font, fill=color)
img_np = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return img_np
import sqlite3
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, Boolean
def get_vehicle_information(detected_license_plate, lane_violation, no_helmet, image_link, city):
# Get current date and time
current_datetime = datetime.now()
current_date = current_datetime.strftime('%Y-%m-%d')
current_time = current_datetime.strftime('%H:%M:%S')
# Connect to SQLite database (motorbike_detections.db)
try:
motorbike_conn = sqlite3.connect('motorbike_detections.db')
motorbike_cursor = motorbike_conn.cursor()
# Check if the detection already exists
check_query = '''
SELECT DetectionID FROM MotorbikeDetections
WHERE LicensePlate = ? AND Date = ? AND Time = ? AND ImageLink = ?
'''
motorbike_cursor.execute(check_query, (detected_license_plate, current_date, current_time, image_link))
existing_detection = motorbike_cursor.fetchone()
if existing_detection:
print(f"Detection for license plate {detected_license_plate} at {current_date} {current_time} already exists.")
else:
# Insert the new detection record
insert_query = '''
INSERT INTO MotorbikeDetections (Date, Time, City, LicensePlate, LaneViolation, NoHelmet, ImageLink)
VALUES (?, ?, ?, ?, ?, ?, ?)
'''
motorbike_cursor.execute(insert_query, (
current_date,
current_time,
city,
detected_license_plate,
int(lane_violation), # Convert boolean to integer (1 or 0)
int(no_helmet), # Convert boolean to integer (1 or 0)
image_link
))
# Commit the transaction
motorbike_conn.commit()
print(f"Detection data for license plate {detected_license_plate} inserted successfully.")
except sqlite3.IntegrityError as e:
print(f"Integrity Error: {e}. This detection may already exist.")
except sqlite3.Error as e:
print(f"An error occurred while inserting detection data: {e}")
motorbike_conn.rollback()
finally:
# Close the motorbike detections database connection
motorbike_conn.close()
# Retrieve email from 'vehicle_information.db'
try:
# Create an engine and session for SQLAlchemy
engine = create_engine('sqlite:///vehicle_information.db')
Session = sessionmaker(bind=engine)
session = Session()
# Query the VehicleInformation table for the detected license plate
vehicle_info = session.query(VehicleInformation).filter_by(license_plate=detected_license_plate).first()
if vehicle_info:
print(f"Email found for license plate {detected_license_plate}: {vehicle_info.email}")
return vehicle_info.email
else:
print(f"No vehicle information found for license plate {detected_license_plate}.")
return None
except Exception as e:
print(f"An error occurred while retrieving vehicle information: {e}")
return None
finally:
# Close the SQLAlchemy session
session.close()
import sqlite3
from datetime import datetime
def setup_motorbike_detections_db():
# Connect to SQLite database (or create it if it doesn't exist)
conn = sqlite3.connect('motorbike_detections.db')
cursor = conn.cursor()
# Drop the old table if it exists, useful for restructuring
cursor.execute('DROP TABLE IF EXISTS MotorbikeDetections')
# Create the new table with a unique constraint on LicensePlate, Date, Time, and ImageLink
cursor.execute('''
CREATE TABLE IF NOT EXISTS MotorbikeDetections (
DetectionID INTEGER PRIMARY KEY AUTOINCREMENT,
Date DATE NOT NULL,
Time TIME NOT NULL,
City VARCHAR(100),
LicensePlate VARCHAR(100),
LaneViolation BOOLEAN NOT NULL,
NoHelmet BOOLEAN NOT NULL,
ImageLink VARCHAR(255),
UNIQUE(LicensePlate, Date, Time, ImageLink)
)
''')
# Commit changes
conn.commit()
# Close the connection
conn.close()
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Define the base for model creation
Base = declarative_base()
# Define the VehicleInformation table using SQLAlchemy ORM
class VehicleInformation(Base):
__tablename__ = 'VehicleInformation'
id = Column(Integer, primary_key=True, autoincrement=True)
license_plate = Column(String(100), unique=True, nullable=False)
email = Column(String(255), nullable=False)
phone_number = Column(String(15), nullable=False)
driver_id = Column(String(50), nullable=False) # Government ID or identity number
def setup_database():
# Create an SQLite database (this could be any database like PostgreSQL, MySQL, etc.)
engine = create_engine('sqlite:///vehicle_information.db')
# Drop existing tables and recreate them (for ensuring clean database on rerun)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
# Create a session to interact with the database
Session = sessionmaker(bind=engine)
session = Session()
# Insert some dummy data into the VehicleInformation table
vehicle_data_1 = VehicleInformation(
license_plate='1234 AB',
email='[email protected]',
phone_number='0559947203',
driver_id='ID1110000000'
)
vehicle_data_2 = VehicleInformation(
license_plate='3321 AR',
email='[email protected]',
phone_number='0539003545',
driver_id='ID2220000000'
)
# Add records to the session
session.add(vehicle_data_1)
session.add(vehicle_data_2)
# Commit the records to the database
session.commit()
# Query the table to confirm data
vehicles = session.query(VehicleInformation).all()
# Close the session
session.close()
# Streamlit app main function
def main():
model_file = hf_hub_download(repo_id="TheKnight115/Yolov8m", filename="yolov8_Medium.pt")
global model
model = YOLO(model_file)
st.title("Motorbike Violation Detection")
input_type = st.selectbox("Select Input Type", ("Image", "Video", "Live Feed"))
if input_type == "Image":
uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
if uploaded_file is not None:
process_image(uploaded_file)
elif input_type == "Video":
uploaded_file = st.file_uploader("Choose a video...", type=["mp4", "mov"])
if uploaded_file is not None:
output_path = process_video_and_save(uploaded_file)
# Now, move the download button here, outside the cached function
with open(output_path, "rb") as video_file:
btn = st.download_button(
label="Download Processed Video",
data=video_file,
file_name="processed_video.mp4",
mime="video/mp4"
)
elif input_type == "Live Feed":
live_video_feed()
if __name__ == "__main__":
main()