File size: 14,826 Bytes
eaa4e30
 
7d35b07
486b028
 
4a7ddd0
ad414f5
4a7ddd0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eaa4e30
d69ce69
759bcfc
 
 
 
 
 
 
 
 
4a7ddd0
 
 
d69ce69
26f78a4
 
4a7ddd0
 
d69ce69
 
 
 
4a7ddd0
 
9ec5726
4a7ddd0
9ec5726
4a7ddd0
 
 
 
 
 
 
 
 
9ec5726
 
3f85f8f
4a7ddd0
d69ce69
 
 
 
 
 
 
37663c7
 
 
 
 
 
 
 
 
 
4a7ddd0
 
d69ce69
26f78a4
d69ce69
26f78a4
5a983b3
d69ce69
26f78a4
5a983b3
d69ce69
26f78a4
 
92dd333
26f78a4
 
d69ce69
5316aad
5a983b3
 
5316aad
 
26f78a4
 
92dd333
26f78a4
5316aad
26f78a4
5316aad
26f78a4
 
5316aad
26f78a4
 
5316aad
 
26f78a4
8be0123
b3b5e5e
 
5316aad
26f78a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f0a70b1
26f78a4
 
 
 
c0507c7
26f78a4
 
 
c0507c7
26f78a4
 
 
 
 
 
 
 
 
c0507c7
26f78a4
 
 
 
 
 
 
 
 
 
 
8be0123
5316aad
 
c6dd643
26f78a4
d69ce69
8be0123
37663c7
4a7ddd0
759bcfc
4a7ddd0
 
4de3665
 
 
 
759bcfc
 
 
 
4de3665
759bcfc
d69ce69
4de3665
759bcfc
4de3665
 
d69ce69
4de3665
 
5469b05
759bcfc
 
 
 
4a7ddd0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26f78a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4a7ddd0
 
7d35b07
6e47efd
d69ce69
6e47efd
d69ce69
7d35b07
eaa4e30
759bcfc
7d35b07
759bcfc
 
 
d69ce69
7d35b07
759bcfc
 
 
d69ce69
4de3665
 
 
 
 
 
 
 
 
d69ce69
759bcfc
 
7d35b07
d69ce69
7d35b07
5469b05
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
import streamlit as st
import cv2
import numpy as np
import tempfile
import time
from ultralytics import YOLO
from huggingface_hub import hf_hub_download
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os
import smtplib
from transformers import AutoModel, AutoProcessor
from PIL import Image, ImageDraw, ImageFont
import re
import torch

# Email credentials.
# Each credential can be overridden through an environment variable of the
# same name, so secrets do not have to live in the source file.
# NOTE(review): the literal fallbacks are kept only so existing deployments
# keep working; the committed app password should be rotated and supplied via
# the environment instead.
FROM_EMAIL = os.environ.get("FROM_EMAIL", "[email protected]")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "cawxqifzqiwjufde")  # App-Specific Password
TO_EMAIL = os.environ.get("TO_EMAIL", "[email protected]")
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 465  # used with smtplib.SMTP_SSL in send_email

# Character-by-character lookup used by convert_to_arabic: Latin digits and
# plate letters map to the Arabic glyph rendered next to the OCR text.
arabic_dict = {
    # Digits
    "0": "٠", "1": "١", "2": "٢", "3": "٣", "4": "٤",
    "5": "٥", "6": "٦", "7": "٧", "8": "٨", "9": "٩",
    # Letters
    "A": "ا", "B": "ب", "J": "ح", "D": "د",
    "R": "ر", "S": "س", "X": "ص", "T": "ط",
    "E": "ع", "G": "ق", "K": "ك", "L": "ل",
    "Z": "م", "N": "ن", "H": "ه", "U": "و",
    "V": "ي",
    # The separator between digits and letters is kept as-is
    " ": " ",
}

# Box color per detector class id, looked up in process_results.
# Tuples are in OpenCV's BGR channel order; the class names are the ones
# noted by the original author for each id.
class_colors = dict(enumerate([
    (0, 255, 0),    # 0: green   - Helmet
    (255, 0, 0),    # 1: blue    - License Plate
    (0, 0, 255),    # 2: red     - MotorbikeDelivery
    (255, 255, 0),  # 3: cyan    - MotorbikeSport
    (255, 0, 255),  # 4: magenta - No Helmet
    (0, 255, 255),  # 5: yellow  - Person
]))

# Load the OCR model.
# Pick the device at runtime: an unconditional .to('cuda') raises at import
# time on hosts without a GPU, which takes the whole app down even for the
# image / live-feed modes that never run OCR.
# NOTE(review): the model's remote code may itself assume CUDA inside chat();
# confirm OCR actually works on CPU before relying on the fallback.
_ocr_device = 'cuda' if torch.cuda.is_available() else 'cpu'
processor = AutoProcessor.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True)
model_ocr = AutoModel.from_pretrained("stepfun-ai/GOT-OCR2_0", trust_remote_code=True).to(_ocr_device)

# Polygon (x, y pixel vertices) of the restricted "red" lane. It is drawn on
# every video frame and used for the point-in-polygon lane check.
# The values are example coordinates for one specific camera view.
red_lane = np.asarray(
    [(2, 1583), (1, 1131), (1828, 1141), (1912, 1580)],
    dtype=np.int32,
)

# YOLO inference function
def run_yolo(image):
    """Run the module-level YOLO ``model`` on ``image`` and return its results.

    ``model`` is the global assigned in ``main()``; it must be set before this
    function is called.
    """
    return model(image)


# Function to process YOLO results and draw bounding boxes
def process_results(results, image):
    """Annotate ``image`` in place with the detections of ``results[0]``.

    For every box a rectangle is drawn in the color registered for its class
    id in ``class_colors`` (white when the id is unknown), together with a
    "<class name> <confidence>" caption just above the box.

    Returns the same ``image`` object that was passed in.
    """
    fallback_color = (255, 255, 255)
    for detection in results[0].boxes:
        left, top, right, bottom = (int(value) for value in detection.xyxy[0])
        score = detection.conf[0]
        class_id = int(detection.cls[0])
        name = model.names[class_id]
        box_color = class_colors.get(class_id, fallback_color)

        # Box outline first, then the caption 10 px above its top-left corner
        cv2.rectangle(image, (left, top), (right, bottom), box_color, 2)
        cv2.putText(
            image,
            f"{name} {score:.2f}",
            (left, top - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            box_color,
            2,
        )
    return image


# Process uploaded images
def process_image(uploaded_file):
    """Detect objects in an uploaded image, display it and offer a download.

    Args:
        uploaded_file: File-like object (as returned by ``st.file_uploader``)
            whose ``read()`` yields the encoded image bytes.

    The image is decoded with OpenCV, annotated via ``run_yolo`` and
    ``process_results``, shown in the page, and exposed as a PNG download.
    If the bytes cannot be decoded an error is shown and nothing else happens.
    """
    import io  # local import: only needed for the in-memory PNG buffer

    raw = np.frombuffer(uploaded_file.read(), np.uint8)
    # imdecode raises on an empty buffer and returns None on undecodable data;
    # either way there is nothing to run detection on.
    image = cv2.imdecode(raw, cv2.IMREAD_COLOR) if raw.size else None
    if image is None:
        st.error("Could not decode the uploaded image.")
        return

    results = run_yolo(image)
    processed_image = process_results(results, image)
    # OpenCV works in BGR; Streamlit and PIL expect RGB.
    processed_image_rgb = cv2.cvtColor(processed_image, cv2.COLOR_BGR2RGB)
    st.image(processed_image_rgb, caption='Detected Image', use_column_width=True)

    # Encode the PNG in memory instead of going through a fixed file name in
    # the working directory, which concurrent sessions would overwrite.
    png_buffer = io.BytesIO()
    Image.fromarray(processed_image_rgb).save(png_buffer, format="PNG")
    st.download_button(
        label="Download Processed Image",
        data=png_buffer.getvalue(),
        file_name="processed_image.png",
        mime="image/png"
    )

def _violation_code(violation_type):
    """Translate collected violation labels into the code send_email branches on."""
    no_helmet = "No Helmet" in violation_type
    in_red_lane = "In Red Lane" in violation_type
    if no_helmet and in_red_lane:
        return 'no_helmet_in_red_lane'
    if no_helmet:
        return 'no_helmet'
    return 'in_red_lane'


# Process and save uploaded videos
@st.cache_data
def process_video_and_save(uploaded_file):
    """Annotate an uploaded video with detections and violation info.

    Every frame gets the red-lane polygon and the tracked detections drawn on
    it. For each ``MotorbikeDelivery`` detection (confidence >= 0.4) the rider
    region is re-scanned for ``No_Helmet`` and ``License_plate`` objects; when
    a helmet or lane violation is present the plate is OCR'd, an email is sent
    the first time a plate (or a new violation for it) is seen, and the plate
    text is drawn under the motorbike in Latin and Arabic characters.

    Args:
        uploaded_file: Uploaded video exposing ``getbuffer()``.

    Returns:
        The path of the written output video, or ``None`` when the uploaded
        video could not be opened.

    NOTE(review): the function is cached but always writes to the same fixed
    output path, so a cached result for an earlier upload points at a file
    that a later upload may have overwritten -- confirm this is acceptable.
    """
    # Path for Arabic font
    font_path = "alfont_com_arial-1.ttf"

    # Fixed working files used during processing
    violation_image_path = 'violation.jpg'
    temp_image_path = 'license_plate.png'
    video_path = "uploaded_video.mp4"
    output_video_path = 'output_violation.mp4'

    # Violations already reported per license plate, e.g. {"1234 AB": ["No Helmet"]}
    violations_dict = {}

    # Extra pixels above the motorbike box so the rider's head is in the crop
    margin_y = 50

    # Save the uploaded video so OpenCV can read it from disk
    with open(video_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Error opening video file.")
        return None

    # Codec and output settings
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Some containers report 0 FPS; fall back to a sane default in that case.
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # End of video

            # Draw the red lane polygon on each frame
            cv2.polylines(frame, [red_lane], isClosed=True, color=(0, 0, 255), thickness=3)

            # Perform detection using YOLO on the current frame
            results = model.track(frame)

            for box in results[0].boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
                label = model.names[int(box.cls)]
                color = (255, 0, 0)  # Fixed color for top-level boxes
                confidence = box.conf[0].item()

                # Draw bounding box and label for the detected object
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
                cv2.putText(frame, f'{label}: {confidence:.2f}', (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

                if label != 'MotorbikeDelivery' or confidence < 0.4:
                    continue

                violation_type = []

                # Lane check: bottom-centre of the box against the lane polygon
                delivery_center = ((x1 + x2) // 2, y2)
                if cv2.pointPolygonTest(red_lane, delivery_center, False) >= 0:
                    violation_type.append("In Red Lane")

                motorbike_crop = frame[max(0, y1 - margin_y):y2, x1:x2]
                if motorbike_crop.size == 0:
                    continue  # Degenerate box: nothing to scan

                # Perform detection within the cropped motorbike region
                sub_results = model(motorbike_crop)

                # First pass: collect everything found in the crop. Plates are
                # handled afterwards so a No_Helmet box listed *after* the
                # plate still counts as a violation for that plate.
                plate_crops = []
                for result in sub_results[0].boxes:
                    sub_x1, sub_y1, sub_x2, sub_y2 = map(int, result.xyxy[0].cpu().numpy())
                    sub_label = model.names[int(result.cls)]
                    sub_color = (255, 0, 0)

                    if sub_label == 'License_plate':
                        plate = motorbike_crop[sub_y1:sub_y2, sub_x1:sub_x2]
                        if plate.size:
                            # Copy before drawing so the OCR input has no overlay
                            plate_crops.append(plate.copy())
                    elif sub_label == 'No_Helmet' and "No Helmet" not in violation_type:
                        violation_type.append("No Helmet")

                    # motorbike_crop is a view, so this draws onto the frame
                    cv2.rectangle(motorbike_crop, (sub_x1, sub_y1), (sub_x2, sub_y2), sub_color, 2)
                    cv2.putText(motorbike_crop, sub_label, (sub_x1, sub_y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, sub_color, 2)

                # OCR is only needed when there is a violation to report
                if not violation_type:
                    continue

                for license_crop in plate_crops:
                    cv2.imwrite(violation_image_path, frame)
                    license_plate_pil = Image.fromarray(cv2.cvtColor(license_crop, cv2.COLOR_BGR2RGB))
                    license_plate_pil.save(temp_image_path)
                    license_plate_text = model_ocr.chat(processor, temp_image_path, ocr_type='ocr')
                    filtered_text = filter_license_plate_text(license_plate_text)
                    if not filtered_text:
                        continue  # OCR output did not look like a plate

                    known = violations_dict.get(filtered_text)
                    if known is None:
                        # First time this plate is seen with a violation
                        violations_dict[filtered_text] = list(violation_type)
                        send_email(filtered_text, violation_image_path,
                                   _violation_code(violation_type))
                    else:
                        # Only notify again when a violation is genuinely new
                        fresh = [v for v in violation_type if v not in known]
                        if fresh:
                            known.extend(fresh)
                            send_email(filtered_text, violation_image_path,
                                       _violation_code(known))

                    # Draw OCR text (English and Arabic) on the original frame
                    arabic_text = convert_to_arabic(filtered_text)
                    frame = draw_text_pil(frame, filtered_text, (x1, y2 + 30), font_path,
                                          font_size=30, color=(255, 255, 255))
                    frame = draw_text_pil(frame, arabic_text, (x1, y2 + 60), font_path,
                                          font_size=30, color=(0, 255, 0))

            # Write the processed frame to the output video
            out.write(frame)
    finally:
        # Release the capture and writer even if detection/OCR/email fails
        cap.release()
        out.release()

    if not os.path.exists(output_video_path):
        st.error("Error: Processed video was not created.")
    return output_video_path  # Return the path of the processed video




# Live video feed processing
def live_video_feed():
    """Stream annotated webcam frames into the page until stopped.

    Frames are read from camera 0, passed through ``run_yolo`` and
    ``process_results`` and shown in a single placeholder that is overwritten
    on every iteration.

    The "Stop" button is created once, before the loop: creating it inside the
    loop registers the same widget repeatedly within one script run, which
    Streamlit rejects. Clicking it makes Streamlit rerun the script; on that
    rerun the button reads True and the feed is not started again.
    """
    if st.button("Stop"):
        st.stop()

    stframe = st.empty()
    video = cv2.VideoCapture(0)

    if not video.isOpened():
        video.release()
        st.error("Unable to access the webcam.")
        return

    try:
        while True:
            ret, frame = video.read()
            if not ret:
                st.error("Failed to capture frame.")
                break

            # Run YOLO on the captured frame
            results = run_yolo(frame)
            annotated_frame = process_results(results, frame)
            annotated_frame_rgb = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

            # Display the frame with detections
            stframe.image(annotated_frame_rgb, channels="RGB", use_column_width=True)
    finally:
        # Free the camera on every exit path, including errors and reruns
        video.release()

    st.stop()


# Function to filter license plate text
def filter_license_plate_text(license_plate_text):
    """Extract a "<3-4 digits> <2 letters>" plate from raw OCR output.

    The text is upper-cased (OCR may return lowercase letters), stripped of
    everything except ``A-Z`` and ``0-9``, and then searched for three or four
    digits immediately followed by two letters.

    Args:
        license_plate_text: Raw OCR string; ``None`` or empty is accepted.

    Returns:
        The plate formatted as ``"1234 AB"``, or ``None`` when no such pattern
        is found (including for ``None``/empty input).
    """
    if not license_plate_text:
        return None
    cleaned = re.sub(r'[^A-Z0-9]+', "", license_plate_text.upper())
    # Whitespace is already gone at this point, so no separator is matched.
    match = re.search(r'(\d{3,4})([A-Z]{2})', cleaned)
    return f"{match.group(1)} {match.group(2)}" if match else None


# Function to convert license plate text to Arabic
def convert_to_arabic(license_plate_text):
    """Return ``license_plate_text`` with each character mapped via ``arabic_dict``.

    Characters that have no entry in ``arabic_dict`` are kept unchanged.
    """
    converted = []
    for symbol in license_plate_text:
        converted.append(arabic_dict.get(symbol, symbol))
    return "".join(converted)


# Function to send email notification with image attachment
def send_email(license_text, violation_image_path, violation_type):
    """Email a violation notice, attaching the violation image if it exists.

    Args:
        license_text: Plate text inserted into the message body.
        violation_image_path: Path of the image to attach; skipped when the
            file does not exist.
        violation_type: Description of the violation(s). Both the code form
            (``'no_helmet'``, ``'in_red_lane'``, ``'no_helmet_in_red_lane'``)
            and the label form produced by the video pipeline
            (``"No Helmet"``, ``"In Red Lane"``, or both joined with ``", "``)
            are accepted.

    Raises:
        ValueError: If ``violation_type`` names neither a helmet nor a lane
            violation (previously this surfaced as an UnboundLocalError).
    """
    # Match on keywords so every spelling of the same violation picks the
    # same message.
    kind = violation_type.lower() if isinstance(violation_type, str) else ""
    no_helmet = "helmet" in kind
    in_red_lane = "lane" in kind

    if no_helmet and in_red_lane:
        subject = 'تنبيه مخالفة: عدم ارتداء خوذة ودخول المسار الأيسر'
        body = f"لعدم ارتداء الخوذة ولدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    elif no_helmet:
        subject = 'تنبيه مخالفة: عدم ارتداء خوذة'
        body = f"لعدم ارتداء الخوذة ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    elif in_red_lane:
        subject = 'تنبيه مخالفة: دخول المسار الأيسر'
        body = f"لدخولها المسار الأيسر ({license_text}) تم تغريم دراجة نارية التي تحمل لوحة."
    else:
        raise ValueError(f"Unsupported violation type: {violation_type!r}")

    msg = MIMEMultipart()
    msg['From'] = FROM_EMAIL
    msg['To'] = TO_EMAIL
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    if os.path.exists(violation_image_path):
        with open(violation_image_path, 'rb') as attachment_file:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment_file.read())
        encoders.encode_base64(part)
        part.add_header('Content-Disposition', f'attachment; filename={os.path.basename(violation_image_path)}')
        msg.attach(part)

    with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
        server.login(FROM_EMAIL, EMAIL_PASSWORD)
        server.sendmail(FROM_EMAIL, TO_EMAIL, msg.as_string())
        print("Email with attachment sent successfully!")

def draw_text_pil(img, text, position, font_path, font_size, color):
    """Render ``text`` onto a BGR image using PIL and return a new BGR image.

    The image is converted to RGB for PIL, the text is drawn at ``position``
    with the TrueType font at ``font_path`` (PIL's default font is used, and a
    message printed, when the font cannot be loaded), and the result is
    converted back to BGR. ``color`` is applied while the image is in RGB.
    """
    rgb_pixels = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    canvas = Image.fromarray(rgb_pixels)
    pen = ImageDraw.Draw(canvas)

    try:
        typeface = ImageFont.truetype(font_path, size=font_size)
    except OSError:
        print(f"Font file not found at {font_path}. Using default font.")
        typeface = ImageFont.load_default()

    pen.text(position, text, font=typeface, fill=color)

    return cv2.cvtColor(np.array(canvas), cv2.COLOR_RGB2BGR)


@st.cache_resource
def _load_model():
    """Download the YOLO weights and build the model once per server process."""
    model_file = hf_hub_download(repo_id="TheKnight115/Yolov8m", filename="yolov8_Medium.pt")
    return YOLO(model_file)


# Streamlit app main function
def main():
    """Render the app: pick an input type and run the matching pipeline.

    Sets the module-level ``model`` used by the detection helpers. The model
    is obtained through a cached loader because Streamlit re-executes this
    function on every widget interaction.
    """
    global model
    model = _load_model()

    st.title("Motorbike Violation Detection")

    input_type = st.selectbox("Select Input Type", ("Image", "Video", "Live Feed"))

    if input_type == "Image":
        uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            process_image(uploaded_file)

    elif input_type == "Video":
        uploaded_file = st.file_uploader("Choose a video...", type=["mp4", "mov"])
        if uploaded_file is not None:
            output_path = process_video_and_save(uploaded_file)

            # process_video_and_save reports its own errors; it returns None
            # when the video cannot be opened and may return a path that was
            # never written, so only offer the download when a file exists.
            if output_path is None or not os.path.exists(output_path):
                return

            # The download button lives here, outside the cached function
            with open(output_path, "rb") as video_file:
                st.download_button(
                    label="Download Processed Video",
                    data=video_file,
                    file_name="processed_video.mp4",
                    mime="video/mp4"
                )

    elif input_type == "Live Feed":
        live_video_feed()


if __name__ == "__main__":
    main()