File size: 8,559 Bytes
6e3fd3f
3b03261
 
 
 
 
6e400f8
 
 
3b03261
ac55573
3b03261
 
 
6e400f8
6e3fd3f
 
6e400f8
29901d7
339ebae
 
 
6e400f8
 
 
 
 
 
3b03261
b58c110
 
 
 
ac55573
 
b58c110
ac55573
6e3fd3f
ac55573
 
6e3fd3f
ac55573
b58c110
 
6e3fd3f
b58c110
6e3fd3f
b58c110
 
 
 
3b03261
 
b58c110
 
 
ac55573
6e3fd3f
 
ac55573
a8054b3
6e400f8
6e3fd3f
 
ac55573
b58c110
 
 
6e400f8
b58c110
 
 
ac55573
 
6e400f8
b58c110
 
ac55573
b58c110
 
3b03261
6e3fd3f
b58c110
 
 
6e3fd3f
b58c110
 
 
6e3fd3f
2b307a5
6e3fd3f
b58c110
 
 
 
6e3fd3f
6e400f8
b58c110
 
 
 
 
6e3fd3f
6e400f8
b58c110
6e400f8
b58c110
6e400f8
 
6e3fd3f
6e400f8
 
b58c110
6e400f8
 
 
 
 
6e3fd3f
6e400f8
 
 
 
 
ac55573
2eb3e56
 
6e3fd3f
6e400f8
3b03261
 
 
6e3fd3f
6e400f8
 
 
6e3fd3f
6e400f8
 
 
 
 
 
 
 
2eb3e56
6e400f8
 
 
 
b58c110
6e400f8
 
 
b58c110
6e400f8
b58c110
6e400f8
 
 
 
 
 
3b03261
6e400f8
 
 
 
 
 
 
 
 
 
3b03261
b58c110
6e3fd3f
b58c110
6e3fd3f
6e400f8
b58c110
 
 
 
 
6e3fd3f
b58c110
 
 
 
 
 
 
 
 
6e400f8
 
b58c110
 
6e400f8
b58c110
 
 
339ebae
b58c110
 
 
 
 
6e400f8
 
 
 
 
 
 
a8054b3
b58c110
6e3fd3f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import cv2
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging
import threading
import queue
import time

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global variables for line coordinates and line equation
start_point = None
end_point = None
line_params = None  # Stores (slope, intercept, start_point, end_point)

# Low-resolution for inference
LOW_RES = (320, 180)

# Frame queue for processed frames
frame_queue = queue.Queue(maxsize=30)  # Adjust queue size based on memory constraints

# Thread control flag
processing_active = True

def extract_first_frame(stream_url):
    """
    Extracts the first available frame from the IP camera stream and returns it as a PIL image.
    """
    logger.info("Attempting to extract the first frame from the stream...")
    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        logger.error("Error: Could not open stream.")
        return None, "Error: Could not open stream."

    ret, frame = cap.read()
    cap.release()

    if not ret:
        logger.error("Error: Could not read the first frame.")
        return None, "Error: Could not read the first frame."

    # Convert the frame to a PIL image
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)

    logger.info("First frame extracted successfully.")
    return pil_image, "First frame extracted successfully."

def update_line(image, evt: gr.SelectData):
    """
    Updates the line based on user interaction (click and drag).
    """
    global start_point, end_point, line_params

    if start_point is None:
        start_point = (evt.index[0], evt.index[1])
        draw = ImageDraw.Draw(image)
        draw.ellipse((start_point[0] - 5, start_point[1] - 5, start_point[0] + 5, start_point[1] + 5), fill="blue", outline="blue")
        return image, f"Line Coordinates:\nStart: {start_point}, End: None"

    end_point = (evt.index[0], evt.index[1])
    if start_point[0] != end_point[0]:  # Avoid division by zero
        slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
        intercept = start_point[1] - slope * start_point[0]
        line_params = (slope, intercept, start_point, end_point)
    else:
        line_params = (float('inf'), start_point[0], start_point, end_point)

    draw = ImageDraw.Draw(image)
    draw.line([start_point, end_point], fill="red", width=2)
    draw.ellipse((end_point[0] - 5, end_point[1] - 5, end_point[0] + 5, end_point[1] + 5), fill="green", outline="green")

    line_info = f"Line Coordinates:\nStart: {start_point}, End: {end_point}\nLine Equation: y = {line_params[0]:.2f}x + {line_params[1]:.2f}"
    start_point = None
    end_point = None
    return image, line_info

def reset_line():
    """
    Resets the line coordinates.
    """
    global start_point, end_point, line_params
    start_point = None
    end_point = None
    line_params = None
    return None, "Line reset. Click to draw a new line."

def is_object_crossing_line(box, line_params):
    """
    Determines if an object's bounding box is fully intersected by the user-drawn line.
    """
    _, _, line_start, line_end = line_params
    x1, y1, x2, y2 = box
    box_edges = [((x1, y1), (x2, y1)), ((x2, y1), (x2, y2)), ((x2, y2), (x1, y2)), ((x1, y2), (x1, y1))]
    intersection_count = 0
    for edge_start, edge_end in box_edges:
        if intersect(line_start, line_end, edge_start, edge_end):
            intersection_count += 1
    return intersection_count >= 2

def intersect(A, B, C, D):
    """
    Determines if two line segments AB and CD intersect.
    """
    def ccw(A, B, C):
        return (C[1] - A[1]) * (B[0] - A[0]) - (B[1] - A[1]) * (C[0] - A[0])

    def on_segment(A, B, C):
        return min(A[0], B[0]) <= C[0] <= max(A[0], B[0]) and min(A[1], B[1]) <= C[1] <= max(A[1], B[1])

    ccw1 = ccw(A, B, C)
    ccw2 = ccw(A, B, D)
    ccw3 = ccw(C, D, A)
    ccw4 = ccw(C, D, B)
    return ((ccw1 * ccw2 < 0) and (ccw3 * ccw4 < 0)) or (ccw1 == 0 and on_segment(A, B, C)) or (ccw2 == 0 and on_segment(A, B, D)) or (ccw3 == 0 and on_segment(C, D, A)) or (ccw4 == 0 and on_segment(C, D, B))

def process_frames(stream_url, confidence_threshold, selected_classes):
    """
    Processes frames in a separate thread and adds them to the frame queue.
    """
    global processing_active, frame_queue
    cap = cv2.VideoCapture(stream_url)
    model = YOLO(model="yolo11n.pt")
    crossed_objects = {}

    while processing_active and cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform detection on low-res frame
        low_res_frame = cv2.resize(frame, LOW_RES)
        results = model.track(low_res_frame, persist=True, conf=confidence_threshold)

        # Scale bounding boxes to high-res
        scale_x = frame.shape[1] / LOW_RES[0]
        scale_y = frame.shape[0] / LOW_RES[1]
        for detection in results[0].boxes.data:
            x1, y1, x2, y2, conf, cls = detection
            x1, y1, x2, y2 = int(x1 * scale_x), int(y1 * scale_y), int(x2 * scale_x), int(y2 * scale_y)
            if is_object_crossing_line((x1, y1, x2, y2), line_params):
                crossed_objects[results[0].boxes.id.int().cpu().tolist()[0]] = True

        # Draw bounding boxes and line on the frame
        annotated_frame = results[0].plot()
        if line_params:
            draw_angled_line(annotated_frame, line_params, color=(0, 255, 0), thickness=2)

        # Add frame to the queue
        if not frame_queue.full():
            frame_queue.put(annotated_frame)

    cap.release()

def draw_angled_line(image, line_params, color=(0, 255, 0), thickness=2):
    """
    Draws the user-defined line on the frame.
    """
    _, _, start_point, end_point = line_params
    cv2.line(image, start_point, end_point, color, thickness)

def display_frames():
    """
    Displays frames from the queue at a consistent frame rate.
    """
    while processing_active:
        if not frame_queue.empty():
            frame = frame_queue.get()
            yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), ""
        else:
            time.sleep(0.03)  # Wait for the next frame

# Define the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("<h1>Real-time monitoring, object tracking, and line-crossing detection for CCTV camera streams.</h1></center>")
    gr.Markdown("## https://github.com/SanshruthR/CCTV_SENTRY_YOLO11")

    # Step 1: Enter the IP Camera Stream URL
    stream_url = gr.Textbox(label="Enter IP Camera Stream URL", value="https://s104.ipcamlive.com/streams/68idokwtondsqpmkr/stream.m3u8", visible=False)

    # Step 1: Extract the first frame from the stream
    gr.Markdown("### Step 1: Click on the frame to draw a line, the objects crossing it would be counted in real-time.")
    first_frame, status = extract_first_frame(stream_url.value)
    if first_frame is None:
        gr.Markdown(f"**Error:** {status}")
    else:
        image = gr.Image(value=first_frame, label="First Frame of Stream", type="pil")
        line_info = gr.Textbox(label="Line Coordinates", value="Line Coordinates:\nStart: None, End: None")
        image.select(update_line, inputs=image, outputs=[image, line_info])

        # Step 2: Select classes to detect
        gr.Markdown("### Step 2: Select Classes to Detect")
        model = YOLO(model="yolo11n.pt")
        class_names = list(model.names.values())
        selected_classes = gr.CheckboxGroup(choices=class_names, label="Select Classes to Detect")

        # Step 3: Adjust confidence threshold
        gr.Markdown("### Step 3: Adjust Confidence Threshold (Optional)")
        confidence_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, label="Confidence Threshold")

        # Process the stream
        process_button = gr.Button("Process Stream")
        output_image = gr.Image(label="Processed Frame", streaming=True)
        error_box = gr.Textbox(label="Errors/Warnings", interactive=False)

        # Event listener for processing the video
        process_button.click(
            fn=lambda: (setattr(globals(), "processing_active", True), threading.Thread(target=process_frames, args=(stream_url.value, confidence_threshold.value, selected_classes.value)).start()),
            outputs=None
        )

        # Display frames
        demo.load(display_frames, inputs=None, outputs=[output_image, error_box], every=0.03)

# Launch the interface
demo.launch(debug=True)