Spaces:
Running
Running
# Maximize performance settings | |
import multiprocessing | |
import cv2 | |
# Configure OpenCV for multi-core processing | |
cv2.setNumThreads(multiprocessing.cpu_count()) | |
############## | |
import torch | |
import gradio as gr | |
import numpy as np | |
from PIL import Image, ImageDraw | |
from ultralytics import YOLO | |
import logging | |
import time | |
# Module-wide logger; INFO level so initialization messages are emitted.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Shared state written by the handlers below:
#   model       - the loaded detector, or None until initialization succeeds
#   line_params - (slope, intercept, (x1, y1), (x2, y2)) once a line is drawn
model = None
line_params = None
def initialize_yolov11():
    """Load the YOLO11 nano detector into the module-level ``model``.

    The detector is moved to the GPU when CUDA is available.  ``model`` is
    assigned only after both loading and device placement succeed, so a
    failed initialization never leaves a half-configured detector behind.

    Returns:
        bool: True when the model is ready; False when loading failed (the
        error is logged together with its traceback).
    """
    global model
    from pathlib import Path

    # Ultralytics publishes the nano weights as "yolo11n.pt" (no "v"); that
    # is the name it can fetch automatically.  A local file that uses the
    # older spelling is still honoured for backward compatibility.
    legacy_weights = 'yolov11n.pt'
    weights = legacy_weights if Path(legacy_weights).is_file() else 'yolo11n.pt'

    try:
        detector = YOLO(weights)
        if torch.cuda.is_available():
            detector.to('cuda')
            logger.info("YOLOv11 initialized with CUDA acceleration")
        else:
            logger.info("YOLOv11 initialized with CPU")
    except Exception as e:
        # Top-level boundary: report the failure instead of crashing the UI.
        logger.exception("Model initialization failed: %s", e)
        return False

    model = detector
    return True
def extract_first_frame(stream_url):
    """Grab a single frame from ``stream_url``, retrying on failure.

    Up to three attempts are made, pausing one second between attempts.
    The capture handle is released after every attempt, whether or not the
    stream could be opened.

    Args:
        stream_url: Anything ``cv2.VideoCapture`` accepts as a source.

    Returns:
        tuple: ``(frame, message)`` where ``frame`` is the first frame
        converted from BGR to RGB, or ``None`` if every attempt failed, and
        ``message`` is a human-readable status string.
    """
    max_attempts = 3
    for attempt in range(max_attempts):
        cap = cv2.VideoCapture(stream_url)
        try:
            ret, frame = cap.read() if cap.isOpened() else (False, None)
        finally:
            cap.release()
        if ret and frame is not None:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), "First frame extracted"
        # Only wait when another attempt will actually follow.
        if attempt < max_attempts - 1:
            time.sleep(1)
    return None, "Error: Failed to capture initial frame"
# Click points of the line currently being drawn.  Kept at module level
# because every callback receives a fresh image object, so state attached to
# the image itself cannot survive from one click to the next.
_line_points = []


def update_line(image, evt: gr.SelectData):
    """Record a click on the setup frame and draw the detection line.

    The first click marks the start point (blue), the second marks the end
    point (green) and draws the line in red, storing it in the module-level
    ``line_params`` as ``(slope, intercept, (x1, y1), (x2, y2))``.  For a
    vertical line the slope is ``inf`` and the intercept holds the x value.
    A click after the line is complete starts a new line; the previous
    ``line_params`` stays in effect until the new line is finished.

    Args:
        image: The frame shown in the setup component, as a NumPy array or
            a PIL image.  ``None`` when no frame is loaded.
        evt: Gradio select event; ``evt.index`` holds the clicked (x, y).

    Returns:
        tuple: ``(image, status)`` - the annotated PIL image (or ``None``
        when no frame was supplied) and a status string.
    """
    global line_params

    if image is None:
        return image, "Error: No frame loaded"

    # gr.Image hands over a NumPy array unless configured otherwise, and
    # ImageDraw can only draw on PIL images.
    if isinstance(image, np.ndarray):
        image = Image.fromarray(image)

    if len(_line_points) >= 2:
        _line_points.clear()

    x, y = int(evt.index[0]), int(evt.index[1])
    _line_points.append((x, y))

    draw = ImageDraw.Draw(image)
    color = "blue" if len(_line_points) == 1 else "green"
    draw.ellipse([x - 5, y - 5, x + 5, y + 5], fill=color, outline=color)

    if len(_line_points) < 2:
        return image, f"Points: {len(_line_points)}/2"

    (x1, y1), (x2, y2) = _line_points
    draw.line([(x1, y1), (x2, y2)], fill="red", width=2)

    # Store line parameters; a vertical line has no finite slope.
    if x2 != x1:
        slope = (y2 - y1) / (x2 - x1)
        intercept = y1 - slope * x1
    else:
        slope = float('inf')
        intercept = x1
    line_params = (slope, intercept, (x1, y1), (x2, y2))
    return image, "Line set!"
def line_intersection(box, line):
    """Return True when the line segment touches or crosses the box.

    The segment is parameterised as ``start + t * (end - start)`` for ``t``
    in [0, 1].  The interval of ``t`` is clipped against the box's x extent
    and then its y extent; the segment hits the box when the interval is
    still non-empty afterwards.

    Args:
        box: ``(x_min, y_min, x_max, y_max)``.
        line: ``(slope, intercept, (x1, y1), (x2, y2))``; only the two end
            points are used.

    Returns:
        bool: Whether the segment and the box intersect.
    """
    _, _, (start_x, start_y), (end_x, end_y) = line
    left, top, right, bottom = box

    t_enter, t_exit = 0.0, 1.0
    axes = (
        (end_x - start_x, left - start_x, right - start_x),
        (end_y - start_y, top - start_y, bottom - start_y),
    )
    for delta, to_low, to_high in axes:
        if delta == 0:
            # Segment is parallel to this axis: it misses the box entirely
            # when its fixed coordinate lies outside the box's extent.
            if to_low > 0 or to_high < 0:
                return False
            continue
        near = to_low / delta
        far = to_high / delta
        if near > far:
            near, far = far, near
        t_enter = max(t_enter, near)
        t_exit = min(t_exit, far)
        if t_enter > t_exit:
            return False
    return t_enter <= 1 and t_exit >= 0
def _resolve_class_ids(classes, names):
    """Translate UI class selections into integer class ids, or None for all."""
    if not classes:
        return None
    name_to_id = {name: idx for idx, name in names.items()}
    class_ids = []
    for entry in classes:
        if isinstance(entry, str):
            if entry in name_to_id:
                class_ids.append(name_to_id[entry])
        else:
            class_ids.append(int(entry))
    return class_ids or None


def process_stream(conf_thresh, classes, stream_url):
    """Track objects on a video stream and count those touching the line.

    This is a generator: every processed frame yields ``(image, message)``
    where ``image`` is the annotated RGB frame.  When a precondition fails
    it yields ``(None, <error message>)`` once and stops.

    Args:
        conf_thresh: Minimum detection confidence passed to the tracker.
        classes: Selected classes, as class names (what the checkbox group
            supplies) or integer ids.  Empty/None means "all classes".
        stream_url: Anything ``cv2.VideoCapture`` accepts as a source.
    """
    if model is None:
        yield None, "Model not initialized"
        return
    if not line_params:
        yield None, "No detection line set"
        return

    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        cap.release()
        yield None, "Failed to open video stream"
        return

    # The tracker filters by integer class id, not by class name.
    class_ids = _resolve_class_ids(classes, model.names)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    crossed = set()
    frame_skip = 2  # Process every 2nd frame
    count = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            count += 1
            if count % frame_skip != 0:
                continue

            # Detection
            results = model.track(
                frame,
                persist=True,
                conf=conf_thresh,
                classes=class_ids,
                verbose=False,
                device=device,
            )
            detections = results[0]

            # Processing: an id is counted the first time its box touches
            # the line.
            if detections.boxes.id is not None:
                boxes = detections.boxes.xyxy.cpu().numpy()
                ids = detections.boxes.id.int().cpu().numpy()
                for box, track_id in zip(boxes, ids):
                    track_id = int(track_id)
                    if track_id not in crossed and line_intersection(box, line_params):
                        crossed.add(track_id)
                        # NOTE: bounds memory on long-running streams, at the
                        # cost of the displayed count restarting from zero.
                        if len(crossed) > 1000:
                            crossed.clear()

            # Annotation
            annotated = detections.plot()
            cv2.line(annotated, line_params[2], line_params[3], (0, 255, 0), 2)
            cv2.putText(annotated, f"Count: {len(crossed)}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            yield cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), ""
    finally:
        # Runs on normal exit, on errors, and when the consumer closes the
        # generator early, so the stream handle is never leaked.
        cap.release()
# Gradio Interface
with gr.Blocks() as app:
    gr.Markdown("# CCTV Smart Monitor - YOLOv11")

    # Initialization: load the detector once while the UI is being built and
    # surface a visible error if that fails.
    if not initialize_yolov11():
        gr.Markdown("**Error**: Failed to initialize YOLOv11 model")

    # Stream URL input
    stream_url = gr.Textbox(
        label="RTSP Stream URL",
        value="rtsp://example.com/stream",
        visible=True
    )
    load_frame_button = gr.Button("Load First Frame")

    # Frame setup: click twice on the frame to define the detection line.
    with gr.Row():
        frame = gr.Image(label="Setup Frame", interactive=True)
        line_status = gr.Textbox(label="Line Status", interactive=False)

    # Controls
    with gr.Row():
        class_selector = gr.CheckboxGroup(
            # A concrete list, not a dict view, so the choices can be
            # serialized for the front end.
            choices=list(model.names.values()) if model else [],
            label="Detection Classes"
        )
        confidence = gr.Slider(0.1, 1.0, value=0.4, label="Confidence Threshold")

    # Output
    output_video = gr.Image(label="Live Analysis", streaming=True)
    error_box = gr.Textbox(label="System Messages", interactive=False)

    # Interactions
    load_frame_button.click(
        extract_first_frame,
        inputs=stream_url,
        outputs=[frame, line_status]
    )
    frame.select(
        update_line,
        inputs=frame,
        outputs=[frame, line_status]
    )
    gr.Button("Start Analysis").click(
        process_stream,
        inputs=[confidence, class_selector, stream_url],
        outputs=[output_video, error_box]
    )
if __name__ == "__main__":
    # The analysis handler is a generator, which streams its results through
    # the request queue.  The queue is enabled via ``queue()`` because the
    # ``enable_queue`` keyword is no longer accepted by ``launch()`` in
    # current Gradio releases.
    app.queue().launch(debug=True)