# Spaces: Sleeping  (page-scrape residue; not part of the program)
# Maximize CPU usage: size OpenCV's thread pool to the machine before the
# rest of the module is loaded.
import multiprocessing
import cv2

# Get the number of CPU cores
cpu_cores = multiprocessing.cpu_count()

# Set OpenCV to use all available cores
cv2.setNumThreads(cpu_cores)

# NOTE(review): libraries imported further down may reconfigure OpenCV's
# thread count when they load -- confirm this setting is still in effect at
# inference time.

# Print the number of threads being used (optional)
print(f"OpenCV using {cv2.getNumThreads()} threads out of {cpu_cores} available cores")
##############
import cv2
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors
import logging
import math
import torch

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global variables to store line coordinates and line equation.
# They are shared between the Gradio callbacks defined below.
start_point = None  # first click of a line that is still being drawn
end_point = None  # second click of the most recently completed line
line_params = None  # Stores (start_point, end_point)

# Load model once globally so every request reuses the same weights.
model = YOLO("yolo11n.pt")
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = model.to(device)
def liang_barsky(line, bbox):
    """Return whether a line segment intersects an axis-aligned rectangle.

    Uses Liang-Barsky parametric clipping: the segment is written as
    ``P(u) = start + u * (end - start)`` for ``u`` in ``[0, 1]`` and each of
    the four rectangle edges narrows the admissible ``[u1, u2]`` window.

    Args:
        line: Pair of endpoints ``((x1, y1), (x2, y2))``.
        bbox: Rectangle as ``(xmin, ymin, xmax, ymax)``.

    Returns:
        True when some part of the segment lies inside or on the border of
        the rectangle (touching counts), False otherwise.
    """
    x1, y1 = line[0]
    x2, y2 = line[1]
    xmin, ymin, xmax, ymax = bbox

    dx = x2 - x1
    dy = y2 - y1

    # One (p, q) pair per rectangle edge: left, right, top, bottom.
    edges = (
        (-dx, x1 - xmin),
        (dx, xmax - x1),
        (-dy, y1 - ymin),
        (dy, ymax - y1),
    )

    u1 = 0.0  # latest point at which the segment enters the rectangle
    u2 = 1.0  # earliest point at which the segment leaves it

    for p, q in edges:
        if p == 0:
            # Segment is parallel to this edge: it is either entirely on the
            # inner side (no constraint) or entirely outside (no hit).
            if q < 0:
                return False
            continue

        t = q / p
        if p < 0:
            # Entering across this edge.
            if t > u1:
                u1 = t
        elif t < u2:
            # Leaving across this edge.
            u2 = t

        # The window is already empty; later edges can only shrink it.
        if u1 > u2:
            return False

    return True
def extract_first_frame(stream_url):
    """Grab a single frame from a video stream for use as a drawing canvas.

    Args:
        stream_url: Anything ``cv2.VideoCapture`` accepts as a source.

    Returns:
        ``(image, status)`` where ``image`` is an RGB ``PIL.Image`` on
        success or ``None`` on failure, and ``status`` is a human-readable
        message describing the outcome.
    """
    logger.info("Extracting first frame...")
    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            return None, "Error: Could not open stream."
        ret, frame = cap.read()
    finally:
        # Release on every path, including a failed open or a raising read.
        cap.release()

    if not ret or frame is None:
        return None, "Error: Could not read frame."

    # OpenCV decodes to BGR; PIL expects RGB.
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return Image.fromarray(frame_rgb), "First frame extracted successfully."
def update_line(image, evt: gr.SelectData):
    """Handle a click on the first-frame image while the user draws a line.

    The first click stores the start point and marks it in blue. The second
    click stores the end point, publishes ``line_params`` as
    ``(start_point, end_point)``, draws the red line with a green end marker,
    and clears ``start_point`` so the next click begins a new line.

    Args:
        image: PIL image being annotated; it is drawn on in place.
        evt: Gradio select event; ``evt.index[0]`` and ``evt.index[1]`` are
            used as the clicked x and y.

    Returns:
        ``(image, text)``: the annotated image and a description of the
        current line coordinates.
    """
    global start_point, end_point, line_params

    def draw_marker(canvas, point, colour):
        # Filled dot spanning 5 px either side of the clicked pixel.
        x, y = point
        canvas.ellipse((x - 5, y - 5, x + 5, y + 5), fill=colour, outline=colour)

    clicked = (evt.index[0], evt.index[1])
    draw = ImageDraw.Draw(image)

    if start_point is None:
        # First click: remember where the line starts.
        start_point = clicked
        draw_marker(draw, start_point, "blue")
        return image, f"Line Coordinates:\nStart: {start_point}, End: None"

    # Second click: complete the line and publish it.
    end_point = clicked
    line_params = (start_point, end_point)
    draw.line([start_point, end_point], fill="red", width=2)
    draw_marker(draw, end_point, "green")

    # Reset so the next click starts a fresh line.
    start_point = None
    return image, f"Line Coordinates:\nStart: {line_params[0]}, End: {line_params[1]}"
def reset_line():
    """Forget any line that has been drawn or is in progress.

    Returns:
        ``(None, message)``: ``None`` for the image output and a prompt
        telling the user to draw a new line.
    """
    global start_point, end_point, line_params
    start_point = None
    end_point = None
    line_params = None
    return None, "Line reset. Click to draw a new line."
def is_object_crossing_line(box, line_params):
    """Return whether a bounding box touches the user-defined line.

    Args:
        box: Four values ``(x1, y1, x2, y2)``, passed to ``liang_barsky`` as
            ``(xmin, ymin, xmax, ymax)``.
        line_params: ``(line_start, line_end)`` or a falsy value when no line
            has been drawn.

    Returns:
        False when no line is defined; otherwise the result of the
        ``liang_barsky`` segment/rectangle intersection test.
    """
    if not line_params:
        return False

    line_start, line_end = line_params
    x1, y1, x2, y2 = box
    return liang_barsky((line_start, line_end), (x1, y1, x2, y2))
def draw_angled_line(image, line_params, color=(0, 255, 0), thickness=2):
    """Draw the user-defined line onto a frame in place.

    Args:
        image: Frame to draw on; modified in place by ``cv2.line``.
        line_params: ``(start, end)`` endpoints, each an ``(x, y)`` pair.
        color: Line colour passed straight to ``cv2.line``.
        thickness: Line thickness in pixels.
    """
    start, end = line_params
    # cv2.line needs integer pixel coordinates; coerce so float endpoints
    # (e.g. from rescaled clicks) do not raise.
    start_px = tuple(int(round(v)) for v in start)
    end_px = tuple(int(round(v)) for v in end)
    cv2.line(image, start_px, end_px, color, thickness)
def process_video(confidence_threshold=0.5, selected_classes=None, stream_url=None):
    """Track objects on a video stream and count those touching the line.

    This is a generator: each iteration yields ``(frame, message)`` for the
    Gradio outputs. ``frame`` is an annotated RGB image (or ``None`` when
    reporting an error) and ``message`` is an error string or ``""``.

    Args:
        confidence_threshold: Minimum detection confidence passed to the
            tracker.
        selected_classes: Class names (as found in ``model.names``) that are
            eligible to be counted.
        stream_url: Source handed to ``cv2.VideoCapture``.

    Yields:
        ``(None, error_text)`` once if the inputs are invalid or the stream
        cannot be opened; otherwise ``(annotated_frame, "")`` per frame until
        the stream ends.
    """
    global line_params

    # Validate up front. Problems must be *yielded*: because this function
    # contains ``yield`` it is a generator, and ``return value`` would end the
    # iteration without delivering anything to the outputs.
    errors = []
    if not line_params:
        errors.append("Error: No line drawn.")
    if not selected_classes:
        errors.append("Error: No classes selected.")
    if not stream_url:
        errors.append("Error: No stream URL provided.")
    if errors:
        yield None, "\n".join(errors)
        return

    # Convert class names to indices once
    selected_class_indices = {i for i, name in model.names.items() if name in selected_classes}

    cap = cv2.VideoCapture(stream_url)
    try:
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Reduce buffer size
        if not cap.isOpened():
            yield None, "Error: Could not open stream."
            return

        # FP16 inference is only requested when not running on the CPU.
        use_half = device != 'cpu'

        # Track ids already counted, so an object is counted once while it
        # stays on the line. The set is pruned to bound memory; the running
        # total is kept separately so pruning never resets the displayed count.
        counted_ids = set()
        max_tracked_objects = 1000
        count = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            results = model.track(
                frame,
                persist=True,
                conf=confidence_threshold,
                half=use_half,
                device=device,
                verbose=False,
            )

            boxes = results[0].boxes
            if boxes.id is not None:
                track_ids = boxes.id.int().cpu().tolist()
                class_ids = boxes.cls.int().cpu().tolist()
                for box, cls, t_id in zip(boxes.xyxy.cpu(), class_ids, track_ids):
                    if cls in selected_class_indices and t_id not in counted_ids:
                        if is_object_crossing_line(box.numpy(), line_params):
                            counted_ids.add(t_id)
                            count += 1
                if len(counted_ids) > max_tracked_objects:
                    counted_ids.clear()

            # Visualization
            annotated_frame = results[0].plot()
            draw_angled_line(annotated_frame, line_params)

            # Draw count on a black backing rectangle sized to the text.
            label = f"COUNT: {count}"
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
            cv2.rectangle(annotated_frame, (10, 10), (20 + w, 40 + h), (0, 0, 0), -1)
            cv2.putText(annotated_frame, label, (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # The annotated frame is in OpenCV's BGR order; convert to RGB for
            # display, as extract_first_frame does.
            yield cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB), ""
    finally:
        # Runs on normal end, on errors, and when the consumer closes the
        # generator early (e.g. the client disconnects).
        cap.release()
# Gradio interface: first-frame canvas for drawing the line, class and
# confidence controls, and a streamed output of annotated frames.
with gr.Blocks() as demo:
    gr.Markdown("<h1>Real-time monitoring, object tracking, and line-crossing detection for CCTV camera streams.</h1>")
    gr.Markdown("## https://github.com/SanshruthR/CCTV_SENTRY_YOLO11")

    # Hidden textbox holding the stream source used by both callbacks.
    stream_url = gr.Textbox(
        label="IP Camera Stream URL",
        value="https://s104.ipcamlive.com/streams/68idokwtondsqpmkr/stream.m3u8",
        visible=False
    )

    # First frame extraction (done once, while the UI is being built)
    first_frame, status = extract_first_frame(stream_url.value)
    has_first_frame = first_frame is not None
    if has_first_frame:
        image = gr.Image(value=first_frame, label="First Frame", type="pil")
    else:
        image = gr.Markdown(f"**Error:** {status}")

    line_info = gr.Textbox(label="Line Coordinates", value="Line Coordinates:\nStart: None, End: None")

    # Only an Image can be clicked; when extraction failed `image` is a
    # Markdown error message and must not be wired to the select handler.
    if has_first_frame:
        image.select(update_line, inputs=image, outputs=[image, line_info])

    # Class selection
    class_names = list(model.names.values())
    selected_classes = gr.CheckboxGroup(choices=class_names, label="Select Classes to Detect")

    # Confidence threshold
    confidence_threshold = gr.Slider(0.0, 1.0, value=0.2, label="Confidence Threshold")

    # Process button
    process_button = gr.Button("Process Stream")
    output_image = gr.Image(label="Processed Frame", streaming=True)
    error_box = gr.Textbox(label="Errors/Warnings", interactive=False)

    # process_video is a generator; each yielded pair updates both outputs.
    process_button.click(
        process_video,
        inputs=[confidence_threshold, selected_classes, stream_url],
        outputs=[output_image, error_box]
    )

demo.launch(debug=True)