# Maximize CPU usage
import multiprocessing

import cv2

# Get the number of CPU cores
cpu_cores = multiprocessing.cpu_count()

# Set OpenCV to use all available cores.
# NOTE(review): libraries imported further down may adjust OpenCV's thread
# count again on import -- confirm with cv2.getNumThreads() after all imports.
cv2.setNumThreads(cpu_cores)

# Print the number of threads being used (optional)
print(f"OpenCV using {cv2.getNumThreads()} threads out of {cpu_cores} available cores")

##############
import logging
import math

import gradio as gr
import numpy as np
import torch
from PIL import Image, ImageDraw
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global variables to store line coordinates and line equation
start_point = None  # First click of the line currently being drawn
end_point = None  # Second click of the most recently completed line
line_params = None  # Stores (start_point, end_point)

# Load model once globally
model = YOLO("yolo11n.pt")
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = model.to(device)


def liang_barsky(line, bbox) -> bool:
    """Return True if a line segment intersects an axis-aligned rectangle.

    Uses Liang-Barsky parametric clipping: the segment is written as
    ``P(u) = P1 + u * (P2 - P1)`` for ``u`` in ``[0, 1]`` and the interval of
    ``u`` that lies inside the rectangle is narrowed one edge at a time.

    Args:
        line: Pair of points ``((x1, y1), (x2, y2))`` describing the segment.
        bbox: Rectangle as ``(xmin, ymin, xmax, ymax)``.

    Returns:
        True when some part of the segment lies inside or on the rectangle.
    """
    (x1, y1), (x2, y2) = line
    xmin, ymin, xmax, ymax = bbox

    dx = x2 - x1
    dy = y2 - y1

    # One (p, q) pair per rectangle edge: left, right, top, bottom.
    p = (-dx, dx, -dy, dy)
    q = (x1 - xmin, xmax - x1, y1 - ymin, ymax - y1)

    u1 = 0.0  # Largest "entering" parameter seen so far
    u2 = 1.0  # Smallest "leaving" parameter seen so far

    for p_i, q_i in zip(p, q):
        if p_i == 0:
            # Segment is parallel to this edge; reject if it lies outside it.
            if q_i < 0:
                return False
            continue

        t = q_i / p_i
        if p_i < 0:
            # Moving from outside to inside across this edge.
            if t > u1:
                u1 = t
        elif t < u2:
            # Moving from inside to outside across this edge.
            u2 = t

    return bool(u1 <= u2)


def extract_first_frame(stream_url):
    """Grab the first available frame from a video stream.

    Args:
        stream_url: URL (or any source accepted by ``cv2.VideoCapture``) of
            the stream to open.

    Returns:
        A ``(image, message)`` tuple. ``image`` is a PIL image in RGB order on
        success and ``None`` on failure; ``message`` describes the outcome.
    """
    logger.info("Extracting first frame...")
    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            return None, "Error: Could not open stream."
        ret, frame = cap.read()
    finally:
        # Release on every path, including the "could not open" one.
        cap.release()

    if not ret:
        return None, "Error: Could not read frame."

    # OpenCV decodes to BGR; PIL expects RGB.
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return Image.fromarray(frame_rgb), "First frame extracted successfully."
def update_line(image, evt: gr.SelectData):
    """Handle a click on the preview image while the user draws the line.

    The first click stores the start point and marks it in blue. The second
    click stores the end point, draws the red line with a green end marker,
    publishes the pair in ``line_params`` and re-arms for a new line.

    Args:
        image: PIL image currently shown in the component; drawn on in place.
        evt: Gradio select event; ``evt.index`` holds the clicked ``(x, y)``.

    Returns:
        ``(image, text)`` -- the annotated image and a coordinate summary.
    """
    global start_point, end_point, line_params

    click = (evt.index[0], evt.index[1])
    draw = ImageDraw.Draw(image)

    if start_point is None:
        start_point = click
        draw.ellipse(
            (click[0] - 5, click[1] - 5, click[0] + 5, click[1] + 5),
            fill="blue",
            outline="blue",
        )
        return image, f"Line Coordinates:\nStart: {start_point}, End: None"

    end_point = click
    line_params = (start_point, end_point)
    draw.line([start_point, end_point], fill="red", width=2)
    draw.ellipse(
        (click[0] - 5, click[1] - 5, click[0] + 5, click[1] + 5),
        fill="green",
        outline="green",
    )
    # Clear the start so the next click begins a fresh line.
    start_point = None
    return image, f"Line Coordinates:\nStart: {line_params[0]}, End: {line_params[1]}"


def reset_line():
    """Forget any drawn line.

    Returns:
        ``(None, message)`` -- ``None`` for the image slot plus a status text.
    """
    global start_point, end_point, line_params
    start_point = end_point = line_params = None
    return None, "Line reset. Click to draw a new line."


def is_object_crossing_line(box, line_params) -> bool:
    """Return True if a bounding box touches the counting line.

    Args:
        box: Bounding box as ``(x1, y1, x2, y2)``.
        line_params: ``(start_point, end_point)`` of the line, or a falsy
            value when no line has been drawn.

    Returns:
        False when there is no line; otherwise the Liang-Barsky
        segment/rectangle intersection result.
    """
    if not line_params:
        return False
    line_start, line_end = line_params
    x1, y1, x2, y2 = box
    return liang_barsky((line_start, line_end), (x1, y1, x2, y2))


def draw_angled_line(image, line_params, color=(0, 255, 0), thickness=2):
    """Draw the user-defined line onto ``image`` in place.

    Args:
        image: Frame (NumPy array) to draw on.
        line_params: ``(start_point, end_point)`` of the line.
        color: Line colour passed to ``cv2.line``.
        thickness: Line thickness in pixels.
    """
    start, end = line_params
    cv2.line(image, start, end, color, thickness)


def process_video(confidence_threshold=0.5, selected_classes=None, stream_url=None):
    """Track objects on a stream and count those that touch the drawn line.

    This is a generator used as a Gradio event handler: every yielded
    ``(frame, message)`` pair updates the output image and the error box.

    Args:
        confidence_threshold: Minimum detection confidence passed to the model.
        selected_classes: Class names to count; others are ignored.
        stream_url: Source passed to ``cv2.VideoCapture``.

    Yields:
        ``(annotated_frame, "")`` for each processed frame (RGB array), or
        ``(None, error_text)`` once if the inputs are invalid or the stream
        cannot be opened.
    """
    errors = []
    if not line_params:
        errors.append("Error: No line drawn.")
    if not selected_classes:
        errors.append("Error: No classes selected.")
    if not stream_url:
        errors.append("Error: No stream URL provided.")
    if errors:
        # Inside a generator a bare `return value` is swallowed, so errors
        # must be yielded to reach the UI.
        yield None, "\n".join(errors)
        return

    # Convert class names to indices once
    selected_class_indices = {
        i for i, name in model.names.items() if name in selected_classes
    }

    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            yield None, "Error: Could not open stream."
            return
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Reduce buffer size

        # Insertion-ordered record of track IDs already counted; bounded so a
        # long-running stream does not grow it without limit.
        crossed_objects = {}
        max_tracked_objects = 1000
        # Kept separately so trimming `crossed_objects` never resets the
        # number shown to the user.
        crossed_count = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Snapshot once per frame: UI callbacks may replace or clear the
            # global while this loop is running.
            current_line = line_params

            results = model.track(
                frame,
                persist=True,
                conf=confidence_threshold,
                half=(device == 'cuda'),  # FP16 is only requested on CUDA
                device=device,
                verbose=False,
            )
            result = results[0]

            if current_line and result.boxes.id is not None:
                boxes = result.boxes
                track_ids = boxes.id.int().cpu().tolist()
                class_ids = boxes.cls.int().cpu().tolist()
                coords = boxes.xyxy.cpu().numpy()

                for box, cls, t_id in zip(coords, class_ids, track_ids):
                    if cls not in selected_class_indices or t_id in crossed_objects:
                        continue
                    if not is_object_crossing_line(box, current_line):
                        continue
                    crossed_objects[t_id] = True
                    crossed_count += 1
                    if len(crossed_objects) > max_tracked_objects:
                        # Drop the oldest remembered ID instead of clearing
                        # everything, so objects currently on the line are
                        # not immediately counted a second time.
                        del crossed_objects[next(iter(crossed_objects))]

            # Visualization
            annotated_frame = result.plot()
            if current_line:
                draw_angled_line(annotated_frame, current_line)

            # Draw count
            label = f"COUNT: {crossed_count}"
            (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
            cv2.rectangle(annotated_frame, (10, 10), (20 + w, 40 + h), (0, 0, 0), -1)
            cv2.putText(
                annotated_frame,
                label,
                (20, 40),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (0, 255, 0),
                2,
            )

            # The annotated frame is BGR (OpenCV order); Gradio displays RGB.
            yield cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB), ""
    finally:
        # Runs on normal exit, on errors, and when Gradio closes the
        # generator early.
        cap.release()


# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown(
        "\n\nReal-time monitoring, object tracking, and line-crossing "
        "detection for CCTV camera streams.\n\n"
    )
    gr.Markdown("## https://github.com/SanshruthR/CCTV_SENTRY_YOLO11")

    stream_url = gr.Textbox(
        label="IP Camera Stream URL",
        value="https://s104.ipcamlive.com/streams/68idokwtondsqpmkr/stream.m3u8",
        visible=False,
    )

    # First frame extraction
    first_frame, status = extract_first_frame(stream_url.value)
    # Always create the Image component: the select handler below is attached
    # to it, and a Markdown component has no `.select`.
    image = gr.Image(value=first_frame, label="First Frame", type="pil")
    if first_frame is None:
        gr.Markdown(f"**Error:** {status}")

    line_info = gr.Textbox(
        label="Line Coordinates",
        value="Line Coordinates:\nStart: None, End: None",
    )
    image.select(update_line, inputs=image, outputs=[image, line_info])

    # Class selection
    class_names = list(model.names.values())
    selected_classes = gr.CheckboxGroup(
        choices=class_names, label="Select Classes to Detect"
    )

    # Confidence threshold
    confidence_threshold = gr.Slider(0.0, 1.0, value=0.2, label="Confidence Threshold")

    # Process button
    process_button = gr.Button("Process Stream")
    output_image = gr.Image(label="Processed Frame", streaming=True)
    error_box = gr.Textbox(label="Errors/Warnings", interactive=False)

    process_button.click(
        process_video,
        inputs=[confidence_threshold, selected_classes, stream_url],
        outputs=[output_image, error_box],
    )

demo.launch(debug=True)