File size: 12,603 Bytes
36ab56f 6e3fd3f 3b03261 36ab56f 3b03261 36ab56f 6e400f8 36ab56f 3b03261 ac55573 3b03261 36ab56f 6e3fd3f 36ab56f 29901d7 36ab56f 6e400f8 3b03261 b58c110 f7a2ee2 b58c110 f7a2ee2 ac55573 b58c110 ac55573 6e3fd3f ac55573 6e3fd3f ac55573 f7a2ee2 6e3fd3f b58c110 6e3fd3f b58c110 f7a2ee2 3b03261 b58c110 ac55573 6e3fd3f 36ab56f 6e3fd3f ac55573 36ab56f a8054b3 36ab56f 6e3fd3f 36ab56f ac55573 36ab56f b58c110 36ab56f b58c110 36ab56f b58c110 36ab56f ac55573 36ab56f b58c110 36ab56f b58c110 36ab56f ac55573 b58c110 36ab56f b58c110 3b03261 6e3fd3f b58c110 6e3fd3f b58c110 6e3fd3f 2b307a5 6e400f8 b58c110 6e400f8 b58c110 6e400f8 6e3fd3f 6e400f8 36ab56f b58c110 36ab56f 6e400f8 6e3fd3f 36ab56f 6e400f8 36ab56f 6e400f8 36ab56f 6e3fd3f 36ab56f 6e3fd3f 36ab56f b58c110 36ab56f b58c110 36ab56f b58c110 6e400f8 3b03261 6cacbad 6e400f8 36ab56f 6e400f8 36ab56f 6cacbad 36ab56f d3d411e 36ab56f d3d411e 36ab56f 3b03261 b58c110 6e3fd3f b58c110 539cf6e 36ab56f b58c110 a4c4c84 fb742e1 b58c110 6e3fd3f b58c110 36ab56f b58c110 36ab56f b58c110 6cacbad 36ab56f b58c110 36ab56f b58c110 d3d411e 896b496 d3d411e 6cacbad 8bd04e0 6cacbad 339ebae b58c110 36ab56f b58c110 36ab56f b58c110 6cacbad a8054b3 b58c110 6cacbad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
import multiprocessing
import cv2
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors
import logging
import math
import time
from collections import deque
# Set up a module-level logger (handler/level configuration is left to the host).
logger = logging.getLogger(__name__)

# Global state shared between the Gradio callbacks.
# start_point / end_point hold the clicks of the line currently being drawn.
start_point = None
end_point = None
# Once a line is complete this holds (slope, intercept, start_point, end_point).
# For a vertical line the slope is float('inf') and the second slot holds the x coordinate.
line_params = None

# Maximize CPU usage: let OpenCV use every available core.
cpu_cores = multiprocessing.cpu_count()
cv2.setNumThreads(cpu_cores)
logger.info("OpenCV using %s threads out of %s available cores", cv2.getNumThreads(), cpu_cores)
def extract_first_frame(stream_url):
    """
    Extracts the first available frame from the IP camera stream and returns it as a PIL image.

    Args:
        stream_url: Source handed to ``cv2.VideoCapture``.

    Returns:
        A ``(image, status)`` tuple. On success ``image`` is an RGB PIL image and
        ``status`` a success message; on failure ``image`` is ``None`` and
        ``status`` describes the error.
    """
    logger.info("Attempting to extract the first frame from the stream...")
    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            logger.error("Error: Could not open stream.")
            return None, "Error: Could not open stream."

        ret, frame = cap.read()
    finally:
        # Only one frame is needed; always free the capture handle.
        cap.release()

    if not ret:
        logger.error("Error: Could not read the first frame.")
        return None, "Error: Could not read the first frame."

    # Convert the frame to a PIL image (OpenCV delivers BGR, PIL expects RGB)
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)
    logger.info("First frame extracted successfully.")
    return pil_image, "First frame extracted successfully."
def update_line(image, evt: gr.SelectData):
    """
    Updates the line based on user interaction (two successive clicks).

    The first click stores the start point and marks it in blue. The second
    click stores the end point, computes the line parameters into the global
    ``line_params`` as ``(slope, intercept, start_point, end_point)``, draws the
    line in red with a green end marker, and clears the pending points so the
    next click starts a new line.

    Args:
        image: PIL image to draw on (modified in place).
        evt: Gradio select event; ``evt.index`` supplies the clicked (x, y).

    Returns:
        A ``(image, info_text)`` tuple for the image and the line-info textbox.
    """
    global start_point, end_point, line_params

    marker_radius = 5
    click = (evt.index[0], evt.index[1])
    draw = ImageDraw.Draw(image)

    # If it's the first click, set the start point and show it on the image
    if start_point is None:
        start_point = click
        # NOTE(review): the drawing call name was lost in the source; an
        # ellipse marker is assumed from the bounding-box arguments — confirm.
        draw.ellipse(
            (start_point[0] - marker_radius, start_point[1] - marker_radius,
             start_point[0] + marker_radius, start_point[1] + marker_radius),
            fill="blue", outline="blue"
        )
        return image, f"Line Coordinates:\nStart: {start_point}, End: None"

    # If it's the second click, set the end point and draw the line
    end_point = click

    # Calculate the slope (m) and intercept (b) of the line: y = mx + b
    if start_point[0] != end_point[0]:  # Avoid division by zero
        slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
        intercept = start_point[1] - slope * start_point[0]
        line_params = (slope, intercept, start_point, end_point)
        equation = f"y = {slope:.2f}x + {intercept:.2f}"
    else:
        # Vertical line (special case): no finite slope, so store x in the
        # intercept slot and describe the line as x = const instead of the
        # misleading "y = infx + ...".
        line_params = (float('inf'), start_point[0], start_point, end_point)
        equation = f"x = {start_point[0]:.2f}"

    # Draw the line and end point on the image
    draw.line([start_point, end_point], fill="red", width=2)
    draw.ellipse(
        (end_point[0] - marker_radius, end_point[1] - marker_radius,
         end_point[0] + marker_radius, end_point[1] + marker_radius),
        fill="green", outline="green"
    )

    line_info = f"Line Coordinates:\nStart: {start_point}, End: {end_point}\nLine Equation: {equation}"

    # Reset the points for the next interaction
    start_point = None
    end_point = None

    return image, line_info
def reset_line():
    """
    Resets the line coordinates.

    Clears the pending click points and the stored line parameters.

    Returns:
        ``(None, message)`` — ``None`` for the image output and a prompt
        telling the user to draw a new line.
    """
    global start_point, end_point, line_params
    start_point = None
    end_point = None
    line_params = None
    return None, "Line reset. Click to draw a new line."
def intersect(A, B, C, D):
    """
    Determines if two line segments AB and CD intersect.

    Args:
        A, B: End points ``(x, y)`` of the first segment.
        C, D: End points ``(x, y)`` of the second segment.

    Returns:
        True if the segments properly cross, or if an end point of one
        segment is collinear with the other segment and lies within that
        segment's bounding box; False otherwise.
    """
    def ccw(A, B, C):
        # Cross product of AB and AC; its sign gives the orientation of C
        # relative to the directed line A -> B, zero means collinear.
        return (C[1] - A[1]) * (B[0] - A[0]) - (B[1] - A[1]) * (C[0] - A[0])

    def on_segment(A, B, C):
        # Bounding-box test; only meaningful once C is known to be collinear with AB.
        return (min(A[0], B[0]) <= C[0] <= max(A[0], B[0])
                and min(A[1], B[1]) <= C[1] <= max(A[1], B[1]))

    ccw1 = ccw(A, B, C)
    ccw2 = ccw(A, B, D)
    ccw3 = ccw(C, D, A)
    ccw4 = ccw(C, D, B)

    # Proper crossing: each segment's end points lie strictly on opposite
    # sides of the other segment.
    if (ccw1 * ccw2 < 0) and (ccw3 * ccw4 < 0):
        return True

    # Touching / collinear cases: an end point lies on the other segment.
    if ccw1 == 0 and on_segment(A, B, C):
        return True
    if ccw2 == 0 and on_segment(A, B, D):
        return True
    if ccw3 == 0 and on_segment(C, D, A):
        return True
    if ccw4 == 0 and on_segment(C, D, B):
        return True

    return False
def is_object_crossing_line(box, line_params):
    """
    Determines if an object's bounding box is crossed by the user-drawn line.

    The box counts as crossed when the line segment intersects (or touches)
    at least two of the box's four edges.

    Args:
        box: Bounding box as ``(x1, y1, x2, y2)``.
        line_params: ``(slope, intercept, line_start, line_end)``; only the
            two end points are used here.

    Returns:
        True if at least two box edges are intersected by the line segment.
    """
    _, _, line_start, line_end = line_params

    # Get the bounding box coordinates
    x1, y1, x2, y2 = box

    # Define the four edges of the bounding box
    box_edges = [
        ((x1, y1), (x2, y1)),  # Top edge
        ((x2, y1), (x2, y2)),  # Right edge
        ((x2, y2), (x1, y2)),  # Bottom edge
        ((x1, y2), (x1, y1)),  # Left edge
    ]

    # Count the number of box edges the line intersects
    intersection_count = sum(
        1
        for edge_start, edge_end in box_edges
        if intersect(line_start, line_end, edge_start, edge_end)
    )

    # Only count the object if the line intersects the bounding box at least twice
    return intersection_count >= 2
def draw_angled_line(image, line_params, color=(0, 255, 0), thickness=2):
    """
    Draws the user-defined line on the frame.

    Args:
        image: Frame to draw on (modified in place by ``cv2.line``).
        line_params: ``(slope, intercept, start_point, end_point)``; only the
            two end points are used.
        color: Line color passed to ``cv2.line``.
        thickness: Line thickness in pixels.
    """
    # Local names deliberately differ from the module-level start_point /
    # end_point globals to avoid confusion with the pending-click state.
    _, _, line_start, line_end = line_params
    # cv2.line needs integer pixel coordinates.
    pt1 = (int(line_start[0]), int(line_start[1]))
    pt2 = (int(line_end[0]), int(line_end[1]))
    cv2.line(image, pt1, pt2, color, thickness)
def _draw_count_overlay(frame, count):
    """Draws a centered "COUNT: n" banner at the top of *frame* (in place)."""
    label = f"COUNT: {count}"
    (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)

    margin = 10  # Margin from the top
    x = (frame.shape[1] - text_width) // 2  # Center-align the text horizontally
    y = text_height + margin  # Top-align the text

    # Black background rectangle, then the text on top of it
    cv2.rectangle(frame, (x - margin, y - text_height - margin), (x + text_width + margin, y + margin), (0, 0, 0), -1)
    cv2.putText(frame, label, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)


def process_video(confidence_threshold=0.5, selected_classes=None, stream_url=None, target_fps=30, model_name=""):
    """
    Processes the IP camera stream to count objects of the selected classes crossing the line.

    This is a generator: every item is an ``(image, message)`` pair for the
    output image and the error textbox. Because it is a generator, errors are
    yielded rather than returned — a ``return value`` would never reach the UI.

    Args:
        confidence_threshold: Minimum detection confidence.
        selected_classes: Class names to count; must be non-empty.
        stream_url: Source handed to ``cv2.VideoCapture``.
        target_fps: Upper bound on the processing rate.
        model_name: Model identifier passed to ``YOLO``.

    Yields:
        ``(annotated RGB frame, "")`` per processed frame, or
        ``(last frame or None, error text)`` when something goes wrong.
    """
    global line_params

    errors = []
    if line_params is None:
        errors.append("Error: No line drawn. Please draw a line on the first frame.")
    if selected_classes is None or len(selected_classes) == 0:
        errors.append("Error: No classes selected. Please select at least one class to detect.")
    if stream_url is None or stream_url.strip() == "":
        errors.append("Error: No stream URL provided.")
    if errors:
        yield None, "\n".join(errors)
        return

    logger.info("Connecting to the IP camera stream...")
    cap = cv2.VideoCapture(stream_url)
    last_output = None  # Last frame shown, so an error message does not blank the view
    try:
        if not cap.isOpened():
            yield None, "Error: Could not open stream."
            return

        model = YOLO(model=model_name)
        crossed_objects = {}  # Track IDs already counted (dedupe only)
        max_tracked_objects = 1000  # Maximum number of objects to track before clearing
        count = 0  # Running total, kept separately so clearing the dedupe dict does not reset it

        # Queue to hold frames for processing
        frame_queue = deque(maxlen=10)
        frame_interval = 1.0 / target_fps if target_fps and target_fps > 0 else 0.0

        logger.info("Starting to process the stream...")
        while cap.isOpened():
            frame_start = time.time()

            ret, frame = cap.read()
            if not ret:
                yield last_output, "Error: Could not read frame from the stream."
                break

            # The line can be reset from the UI while streaming; re-check per frame.
            current_line = line_params
            if current_line is None:
                yield last_output, "Error: No line drawn. Please draw a line on the first frame."
                break

            # Add frame to the queue
            frame_queue.append(frame)

            # Process frames in the queue
            if len(frame_queue) > 0:
                process_frame = frame_queue.popleft()

                # Perform object tracking with confidence threshold
                results = model.track(process_frame, persist=True, conf=confidence_threshold)
                result = results[0]
                detections = result.boxes

                # NOTE(review): the track-id expression was truncated in the
                # source; reconstructed from the usual Ultralytics tracking
                # pattern (boxes.id is None when nothing is tracked) — confirm.
                if detections is not None and detections.id is not None:
                    track_ids = detections.id.int().cpu().tolist()
                    clss = detections.cls.cpu().tolist()
                    boxes = detections.xyxy.cpu().tolist()
                    confs = detections.conf.cpu().tolist()

                    for box, cls, t_id, conf in zip(boxes, clss, track_ids, confs):
                        if conf < confidence_threshold or model.names[int(cls)] not in selected_classes:
                            continue
                        # Count each track only once, the first time it crosses the line
                        if t_id not in crossed_objects and is_object_crossing_line(box, current_line):
                            crossed_objects[t_id] = True
                            count += 1

                    # Clear the dictionary if it gets too large
                    if len(crossed_objects) > max_tracked_objects:
                        crossed_objects.clear()

                # Visualize the results with bounding boxes, masks, and IDs
                annotated_frame = result.plot()

                # Draw the angled line and the running count on the frame
                draw_angled_line(annotated_frame, current_line, color=(0, 255, 0), thickness=2)
                _draw_count_overlay(annotated_frame, count)

                # plot() produces a BGR array; the UI image expects RGB.
                last_output = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

                # Yield the annotated frame to Gradio
                yield last_output, ""

            # Sleep for whatever is left of this frame's time budget. Timing is
            # measured from the start of the iteration so the previous sleep is
            # not mistaken for processing time.
            elapsed_time = time.time() - frame_start
            sleep_time = max(0, frame_interval - elapsed_time)
            if sleep_time > 0:
                time.sleep(sleep_time)
    finally:
        # Release the capture even if the consumer stops early or an error is raised.
        cap.release()
        logger.info("Stream processing completed.")
# Define the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("<center><h1>Real-time monitoring, object tracking, and line-crossing detection for CCTV camera streams.</h1></center>")

    # Step 1: Enter the IP Camera Stream URL (pre-filled and hidden from the user)
    # NOTE(review): `ip` is not defined in the visible source — confirm where
    # the stream address is supposed to come from.
    stream_url = gr.Textbox(label="Enter IP Camera Stream URL", value=ip, visible=False)

    # Step 1: Extract the first frame from the stream
    gr.Markdown("### Step 1: Click on the frame to draw a line, the objects crossing it would be counted in real-time.")
    first_frame, status = extract_first_frame(stream_url.value)
    if first_frame is None:
        gr.Markdown(f"**Error:** {status}")

    # Image component for displaying the first frame
    image = gr.Image(value=first_frame, label="First Frame of Stream", type="pil")
    line_info = gr.Textbox(label="Line Coordinates", value="Line Coordinates:\nStart: None, End: None")
    # Each click on the image feeds update_line, which redraws the image and the line text
    image.select(update_line, inputs=image, outputs=[image, line_info])

    # Step 2: Select classes to detect
    gr.Markdown("### Step 2: Select Classes to Detect")
    # NOTE(review): the model file name is empty in the visible source
    # (apparently lost); fill in the intended weights file.
    model = YOLO(model="")  # Load the model to get class names
    class_names = list(model.names.values())  # Get class names
    selected_classes = gr.CheckboxGroup(choices=class_names, label="Select Classes to Detect")

    # Step 3: Adjust confidence threshold
    gr.Markdown("### Step 3: Adjust Confidence Threshold (Optional)")
    confidence_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, label="Confidence Threshold")

    # Step 4: Set target FPS
    gr.Markdown("### Step 4: Set Target FPS (Optional)")
    target_fps = gr.Slider(minimum=1, maximum=120*4, value=60, label="Target FPS")

    # Step 5: Select YOLO model
    gr.Markdown("### Step 5: Select YOLO Model")
    # NOTE(review): the model choices are empty strings in the visible source
    # (apparently lost); restore the intended model names.
    model_name = gr.Dropdown(choices=["", "", ""], label="Select YOLO Model", value="")

    # Process the stream
    process_button = gr.Button("Process Stream")

    # Output image for real-time frame rendering
    output_image = gr.Image(label="Processed Frame", streaming=True)

    # Error box to display warnings/errors
    error_box = gr.Textbox(label="Errors/Warnings", interactive=False)

    # Event listener for processing the video
    process_button.click(process_video, inputs=[confidence_threshold, selected_classes, stream_url, target_fps, model_name], outputs=[output_image, error_box])

# Launch the interface
demo.launch(debug=True)