import cv2
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging
import threading
import queue
import time
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global variables for line coordinates and line equation
start_point = None
end_point = None
line_params = None # Stores (slope, intercept, start_point, end_point)
# Low-resolution for inference
LOW_RES = (320, 180)
# Frame queue for processed frames
frame_queue = queue.Queue(maxsize=30) # Adjust queue size based on memory constraints
# Thread control flag
processing_active = True
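# Pipeline overview: process_frames() runs in a worker thread as the producer,
# pushing annotated frames into frame_queue; display_frames() is the consumer
# generator that Gradio polls to stream those frames to the browser.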
def extract_first_frame(stream_url):
    """
    Extracts the first available frame from the IP camera stream and returns it as a PIL image.
    """
    logger.info("Attempting to extract the first frame from the stream...")
    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        logger.error("Error: Could not open stream.")
        return None, "Error: Could not open stream."
    ret, frame = cap.read()
    cap.release()
    if not ret:
        logger.error("Error: Could not read the first frame.")
        return None, "Error: Could not read the first frame."
    # Convert the frame to a PIL image
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)
    logger.info("First frame extracted successfully.")
    return pil_image, "First frame extracted successfully."
def update_line(image, evt: gr.SelectData):
    """
    Updates the line based on user interaction (two clicks: start point, then end point).
    """
    global start_point, end_point, line_params
    if start_point is None:
        start_point = (evt.index[0], evt.index[1])
        draw = ImageDraw.Draw(image)
        draw.ellipse((start_point[0] - 5, start_point[1] - 5, start_point[0] + 5, start_point[1] + 5), fill="blue", outline="blue")
        return image, f"Line Coordinates:\nStart: {start_point}, End: None"
    end_point = (evt.index[0], evt.index[1])
    if start_point[0] != end_point[0]:  # Avoid division by zero
        slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
        intercept = start_point[1] - slope * start_point[0]
        line_params = (slope, intercept, start_point, end_point)
        equation = f"y = {slope:.2f}x + {intercept:.2f}"
    else:
        # Vertical line: store an infinite slope and the shared x-coordinate
        line_params = (float('inf'), start_point[0], start_point, end_point)
        equation = f"x = {start_point[0]}"
    draw = ImageDraw.Draw(image)
    draw.line([start_point, end_point], fill="red", width=2)
    draw.ellipse((end_point[0] - 5, end_point[1] - 5, end_point[0] + 5, end_point[1] + 5), fill="green", outline="green")
    line_info = f"Line Coordinates:\nStart: {start_point}, End: {end_point}\nLine Equation: {equation}"
    start_point = None
    end_point = None
    return image, line_info
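# Worked example: clicking (100, 200) and then (300, 400) gives
# slope = (400 - 200) / (300 - 100) = 1.00 and intercept = 200 - 1.00 * 100 = 100.00,
# so the reported equation is "y = 1.00x + 100.00".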
def reset_line():
    """
    Resets the line coordinates.
    """
    global start_point, end_point, line_params
    start_point = None
    end_point = None
    line_params = None
    return None, "Line reset. Click to draw a new line."
def is_object_crossing_line(box, line_params):
    """
    Returns True if the user-drawn line segment crosses the bounding box,
    i.e. it intersects at least two of the box's four edges.
    """
    _, _, line_start, line_end = line_params
    x1, y1, x2, y2 = box
    box_edges = [((x1, y1), (x2, y1)), ((x2, y1), (x2, y2)), ((x2, y2), (x1, y2)), ((x1, y2), (x1, y1))]
    intersection_count = 0
    for edge_start, edge_end in box_edges:
        if intersect(line_start, line_end, edge_start, edge_end):
            intersection_count += 1
    return intersection_count >= 2
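# Illustration: for box (0, 0, 10, 10), a line from (-5, 5) to (15, 5) cuts both the
# left and right edges (2 intersections) -> True, while a line from (-5, 5) to (5, 5)
# ends inside the box and only cuts the left edge (1 intersection) -> False.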
def intersect(A, B, C, D):
    """
    Determines if two line segments AB and CD intersect.
    """
    def ccw(A, B, C):
        # Sign of the cross product of AB and AC; 0 means A, B, C are collinear
        return (C[1] - A[1]) * (B[0] - A[0]) - (B[1] - A[1]) * (C[0] - A[0])
    def on_segment(A, B, C):
        # True if point C lies within the bounding box of segment AB
        return min(A[0], B[0]) <= C[0] <= max(A[0], B[0]) and min(A[1], B[1]) <= C[1] <= max(A[1], B[1])
    ccw1 = ccw(A, B, C)
    ccw2 = ccw(A, B, D)
    ccw3 = ccw(C, D, A)
    ccw4 = ccw(C, D, B)
    # Proper crossing: C and D lie on opposite sides of AB, and A and B on opposite
    # sides of CD. The remaining clauses handle collinear endpoints on the other segment.
    return ((ccw1 * ccw2 < 0) and (ccw3 * ccw4 < 0)) or (ccw1 == 0 and on_segment(A, B, C)) or (ccw2 == 0 and on_segment(A, B, D)) or (ccw3 == 0 and on_segment(C, D, A)) or (ccw4 == 0 and on_segment(C, D, B))
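# Quick sanity checks:
#   intersect((-1, -1), (1, 1), (-1, 1), (1, -1))  -> True  (diagonals cross at the origin)
#   intersect((0, 0), (1, 0), (0, 1), (1, 1))      -> False (parallel segments)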
def process_frames(stream_url, confidence_threshold, selected_classes):
    """
    Processes frames in a separate thread and adds them to the frame queue.
    """
    global processing_active, frame_queue
    cap = cv2.VideoCapture(stream_url)
    model = YOLO(model="yolo11n.pt")
    # Restrict detection to the classes picked in the UI (None = all classes)
    selected_ids = [i for i, name in model.names.items() if name in selected_classes] if selected_classes else None
    crossed_objects = {}
    while processing_active and cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Perform tracking on a low-res copy to keep inference fast
        low_res_frame = cv2.resize(frame, LOW_RES)
        results = model.track(low_res_frame, persist=True, conf=confidence_threshold, classes=selected_ids)
        # Scale bounding boxes back to the original resolution
        scale_x = frame.shape[1] / LOW_RES[0]
        scale_y = frame.shape[0] / LOW_RES[1]
        boxes = results[0].boxes
        track_ids = boxes.id.int().cpu().tolist() if boxes.id is not None else [None] * len(boxes)
        for (bx1, by1, bx2, by2), track_id in zip(boxes.xyxy.cpu().tolist(), track_ids):
            x1, y1 = int(bx1 * scale_x), int(by1 * scale_y)
            x2, y2 = int(bx2 * scale_x), int(by2 * scale_y)
            # Record each track ID once when its box is crossed by the line
            if line_params and track_id is not None and is_object_crossing_line((x1, y1, x2, y2), line_params):
                crossed_objects[track_id] = True
        # Draw bounding boxes on the low-res frame, then upscale so the overlay
        # matches the coordinates of the user-drawn line
        annotated_frame = results[0].plot()
        annotated_frame = cv2.resize(annotated_frame, (frame.shape[1], frame.shape[0]))
        if line_params:
            draw_angled_line(annotated_frame, line_params, color=(0, 255, 0), thickness=2)
        # Overlay the running count of unique objects that crossed the line
        cv2.putText(annotated_frame, f"Count: {len(crossed_objects)}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)
        # Add frame to the queue, dropping it if the consumer has fallen behind
        if not frame_queue.full():
            frame_queue.put(annotated_frame)
    cap.release()
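# Example of the scaling step above: for a 1280x720 source and LOW_RES = (320, 180),
# scale_x = 1280 / 320 = 4.0 and scale_y = 720 / 180 = 4.0, so a low-res box
# (80, 45, 160, 90) maps back to (320, 180, 640, 360) in the original frame.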
def draw_angled_line(image, line_params, color=(0, 255, 0), thickness=2):
    """
    Draws the user-defined line on the frame.
    """
    _, _, start_point, end_point = line_params
    cv2.line(image, start_point, end_point, color, thickness)
def display_frames():
    """
    Displays frames from the queue at a consistent frame rate.
    """
    while processing_active:
        if not frame_queue.empty():
            frame = frame_queue.get()
            yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), ""
        else:
            time.sleep(0.03)  # Wait for the next frame
# Define the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("<h1>Real-time monitoring, object tracking, and line-crossing detection for CCTV camera streams</h1>")
    gr.Markdown("## https://github.com/SanshruthR/CCTV_SENTRY_YOLO11")
    # IP camera stream URL (hidden from the UI)
    stream_url = gr.Textbox(label="Enter IP Camera Stream URL", value="https://s104.ipcamlive.com/streams/68idokwtondsqpmkr/stream.m3u8", visible=False)
    # Step 1: Extract the first frame from the stream and let the user draw a line
    gr.Markdown("### Step 1: Click on the frame to draw a line; objects crossing it are counted in real time.")
    first_frame, status = extract_first_frame(stream_url.value)
    if first_frame is None:
        gr.Markdown(f"**Error:** {status}")
    else:
        image = gr.Image(value=first_frame, label="First Frame of Stream", type="pil")
        line_info = gr.Textbox(label="Line Coordinates", value="Line Coordinates:\nStart: None, End: None")
        image.select(update_line, inputs=image, outputs=[image, line_info])
    # Step 2: Select classes to detect
    gr.Markdown("### Step 2: Select Classes to Detect")
    model = YOLO(model="yolo11n.pt")
    class_names = list(model.names.values())
    selected_classes = gr.CheckboxGroup(choices=class_names, label="Select Classes to Detect")
    # Step 3: Adjust confidence threshold
    gr.Markdown("### Step 3: Adjust Confidence Threshold (Optional)")
    confidence_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, label="Confidence Threshold")
    # Process the stream
    process_button = gr.Button("Process Stream")
    output_image = gr.Image(label="Processed Frame", streaming=True)
    error_box = gr.Textbox(label="Errors/Warnings", interactive=False)
    def start_processing(url, conf, classes):
        # Launch the background producer thread; a daemon thread exits with the app
        global processing_active
        processing_active = True
        threading.Thread(target=process_frames, args=(url, conf, classes), daemon=True).start()
    # Event listener for processing the video; components are passed as inputs so
    # Gradio supplies their current values at click time
    process_button.click(fn=start_processing, inputs=[stream_url, confidence_threshold, selected_classes], outputs=None)
    # Stream frames from the queue; display_frames is a generator, so no polling interval is needed
    demo.load(display_frames, inputs=None, outputs=[output_image, error_box])
# Launch the interface
demo.launch(debug=True)