# Maximize performance settings
import multiprocessing

import cv2

# Configure OpenCV for multi-core processing
cv2.setNumThreads(multiprocessing.cpu_count())

##############
import torch
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging
import time

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Weight files to try, in order. The first entry is the name this script has
# always used (kept so an existing local file still wins); the second is the
# name Ultralytics publishes YOLO11 nano weights under.
MODEL_WEIGHTS = ("yolov11n.pt", "yolo11n.pt")

# Upper bound on remembered track ids before stale ones are pruned.
MAX_TRACKED_IDS = 1000

# Global state shared between the Gradio callbacks
line_params = None  # (slope, intercept, (x1, y1), (x2, y2)) once a line is set
_line_points = []   # clicks collected for the line currently being defined
model = None


def initialize_yolov11():
    """Load the YOLO model into the module-level ``model``.

    Each name in ``MODEL_WEIGHTS`` is tried in turn. The model is moved to
    CUDA when ``torch.cuda.is_available()``. ``model`` is only assigned once
    a candidate has loaded completely.

    Returns:
        True if a model was loaded, False otherwise (the error is logged).
    """
    global model
    last_error = None
    for weights in MODEL_WEIGHTS:
        try:
            candidate = YOLO(weights)
            if torch.cuda.is_available():
                candidate.to('cuda')
                logger.info("YOLOv11 initialized with CUDA acceleration")
            else:
                logger.info("YOLOv11 initialized with CPU")
        except Exception as e:  # boundary: any load failure means "try next"
            last_error = e
            logger.warning("Could not load weights %r: %s", weights, e)
            continue
        model = candidate
        return True

    logger.error("Model initialization failed: %s", last_error)
    return False


def extract_first_frame(stream_url, retries=3, retry_delay=1.0):
    """Grab one frame from ``stream_url``, retrying on failure.

    Args:
        stream_url: Anything ``cv2.VideoCapture`` accepts.
        retries: Maximum number of capture attempts.
        retry_delay: Seconds to wait between attempts.

    Returns:
        ``(frame, message)`` where ``frame`` is an RGB array, or ``None``
        when every attempt failed.
    """
    for attempt in range(retries):
        cap = cv2.VideoCapture(stream_url)
        try:
            ret, frame = cap.read() if cap.isOpened() else (False, None)
        finally:
            # Release even when the capture never opened.
            cap.release()
        if ret:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), "First frame extracted"
        if attempt < retries - 1:
            time.sleep(retry_delay)  # Wait before retry
    return None, "Error: Failed to capture initial frame"


def load_setup_frame(stream_url):
    """Fetch a fresh setup frame and discard any previously drawn line.

    Returns:
        The ``(frame, message)`` pair from ``extract_first_frame``.
    """
    global line_params
    frame, status = extract_first_frame(stream_url)
    if frame is not None:
        # The new frame carries no markers, so the stored line would be
        # invisible to the user; start from a clean slate.
        _line_points.clear()
        line_params = None
    return frame, status


def update_line(image, evt: gr.SelectData):
    """Record a click on the setup frame and draw the detection line.

    The first click is marked in blue, the second in green, and the two are
    joined by a red line whose parameters are stored in ``line_params``.
    A click made after a line is complete starts a new line; ``line_params``
    keeps the previous line until the new one has both points.

    Click state lives in module scope because the callback receives a new
    image object on every call, so it cannot be stored on the image.

    Args:
        image: The current setup frame (array or PIL image), or ``None``.
        evt: Gradio select event; ``evt.index`` holds the clicked ``(x, y)``.

    Returns:
        ``(image, status)`` — the annotated frame, in the same form it was
        received, and a short status string.
    """
    global line_params
    if image is None:
        return None, "Load a frame before drawing the line"

    was_array = not isinstance(image, Image.Image)
    canvas = Image.fromarray(np.asarray(image)) if was_array else image

    if len(_line_points) >= 2:
        _line_points.clear()

    x, y = int(evt.index[0]), int(evt.index[1])
    _line_points.append((x, y))

    draw = ImageDraw.Draw(canvas)
    color = "blue" if len(_line_points) == 1 else "green"
    draw.ellipse([x - 5, y - 5, x + 5, y + 5], fill=color, outline=color)

    if len(_line_points) == 2:
        (x1, y1), (x2, y2) = _line_points
        draw.line([(x1, y1), (x2, y2)], fill="red", width=2)

        # Store line parameters; a vertical line has no finite slope, so the
        # intercept slot carries its x position instead.
        if x2 != x1:
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - slope * x1
        else:
            slope = float('inf')
            intercept = x1
        line_params = (slope, intercept, (x1, y1), (x2, y2))
        status = "Line set!"
    else:
        status = f"Points: {len(_line_points)}/2"

    return (np.array(canvas) if was_array else canvas), status


def line_intersection(box, line):
    """Return whether a line segment touches an axis-aligned box.

    The segment is parameterised as ``P(t) = P1 + t * (P2 - P1)`` for
    ``t`` in ``[0, 1]``; the ``t`` interval is clipped against the box's
    x-range and then its y-range. The segment touches the box if the
    interval is still non-empty.

    Args:
        box: ``(x1, y1, x2, y2)`` corners of the box.
        line: ``(slope, intercept, (x1, y1), (x2, y2))``; only the two end
            points are used.

    Returns:
        True if any part of the segment lies inside or on the box.
    """
    _, _, (x1, y1), (x2, y2) = line
    bx1, by1, bx2, by2 = box
    # Normalise so the zero-delta containment test below also works for
    # boxes given with swapped corners.
    bx1, bx2 = min(bx1, bx2), max(bx1, bx2)
    by1, by2 = min(by1, by2), max(by1, by2)

    t0, t1 = 0.0, 1.0
    slabs = (
        (x2 - x1, bx1 - x1, bx2 - x1),  # X-axis boundaries
        (y2 - y1, by1 - y1, by2 - y1),  # Y-axis boundaries
    )
    for delta, low, high in slabs:
        if delta == 0:
            # Segment is parallel to this slab: it must start inside it.
            if low > 0 or high < 0:
                return False
            continue

        t_near, t_far = low / delta, high / delta
        if t_near > t_far:
            t_near, t_far = t_far, t_near

        t0 = max(t0, t_near)
        t1 = min(t1, t_far)
        if t0 > t1:
            return False

    return bool(t0 <= 1 and t1 >= 0)


def _resolve_class_ids(classes):
    """Translate UI class selections into the integer ids YOLO expects.

    Accepts class names (as listed in ``model.names``) or integer ids.

    Returns:
        A list of ids, or ``None`` (meaning "all classes") when nothing
        usable was selected.
    """
    if not classes:
        return None

    name_to_id = {name: idx for idx, name in model.names.items()}
    class_ids = []
    for entry in classes:
        if entry in name_to_id:
            class_ids.append(name_to_id[entry])
        elif isinstance(entry, (int, np.integer)):
            class_ids.append(int(entry))
        else:
            logger.warning("Ignoring unknown detection class %r", entry)
    return class_ids or None


def process_stream(conf_thresh, classes, stream_url):
    """Track objects on a video stream and count those touching the line.

    This is a generator: it yields ``(frame, message)`` pairs, where
    ``frame`` is an annotated RGB array (or ``None`` alongside an error
    message). Every second frame is processed. A track id is counted the
    first time its bounding box touches the detection line.

    Args:
        conf_thresh: Minimum detection confidence passed to the tracker.
        classes: Selected class names (or ids); empty means all classes.
        stream_url: Anything ``cv2.VideoCapture`` accepts.
    """
    if model is None:
        yield None, "Model not initialized"
        return

    line = line_params
    if not line:
        yield None, "No detection line set"
        return

    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        cap.release()
        yield None, "Failed to open video stream"
        return

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    class_ids = _resolve_class_ids(classes)
    crossed = set()    # track ids already counted
    total_crossed = 0  # kept separately so pruning ids never lowers the count
    frame_skip = 2     # Process every 2nd frame
    frame_index = 0

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_index += 1
            if frame_index % frame_skip != 0:
                continue

            # Follow a line redrawn mid-stream, but keep the last valid one
            # if the global has been cleared in the meantime.
            line = line_params or line

            # Detection
            results = model.track(
                frame,
                persist=True,
                conf=conf_thresh,
                classes=class_ids,
                verbose=False,
                device=device,
            )

            # Processing
            detections = results[0].boxes
            if detections.id is not None:
                boxes = detections.xyxy.cpu().numpy()
                track_ids = detections.id.int().cpu().tolist()

                for box, track_id in zip(boxes, track_ids):
                    if track_id not in crossed and line_intersection(box, line):
                        crossed.add(track_id)
                        total_crossed += 1

                if len(crossed) > MAX_TRACKED_IDS:
                    # Forget ids no longer in view; keeping the visible ones
                    # stops objects still on the line being counted twice.
                    crossed.intersection_update(track_ids)

            # Annotation
            annotated = results[0].plot()
            cv2.line(annotated, line[2], line[3], (0, 255, 0), 2)
            cv2.putText(annotated, f"Count: {total_crossed}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            yield cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), ""
    finally:
        # Runs on normal exit, on errors, and when the consumer closes the
        # generator, so the capture is never left open.
        cap.release()


# Gradio Interface
with gr.Blocks() as app:
    gr.Markdown("# CCTV Smart Monitor - YOLOv11")

    # Initialization
    if not initialize_yolov11():
        gr.Markdown("**Error**: Failed to initialize YOLOv11 model")

    # Stream URL input
    stream_url = gr.Textbox(
        label="RTSP Stream URL",
        value="rtsp://example.com/stream",
        visible=True
    )
    load_button = gr.Button("Load First Frame")

    # Frame setup
    with gr.Row():
        frame = gr.Image(label="Setup Frame", interactive=True)
        line_status = gr.Textbox(label="Line Status", interactive=False)

    # Controls
    with gr.Row():
        class_selector = gr.CheckboxGroup(
            choices=list(model.names.values()) if model is not None else [],
            label="Detection Classes"
        )
        confidence = gr.Slider(0.1, 1.0, value=0.4, label="Confidence Threshold")

    # Output. Frames arrive from the process_stream generator; the component's
    # `streaming` flag is for webcam input and is therefore not used here.
    output_video = gr.Image(label="Live Analysis", interactive=False)
    error_box = gr.Textbox(label="System Messages", interactive=False)

    # Interactions
    load_button.click(
        load_setup_frame,
        inputs=stream_url,
        outputs=[frame, line_status]
    )

    frame.select(
        update_line,
        inputs=frame,
        outputs=[frame, line_status]
    )

    gr.Button("Start Analysis").click(
        process_stream,
        inputs=[confidence, class_selector, stream_url],
        outputs=[output_video, error_box]
    )

if __name__ == "__main__":
    # Generator callbacks need the queue; enable it explicitly rather than
    # through the deprecated `enable_queue` launch argument.
    app.queue().launch(debug=True)