Spaces:
Sleeping
Sleeping
File size: 6,842 Bytes
a8054b3 e93dd75 a8054b3 e93dd75 29901d7 3b03261 a8054b3 3b03261 a8054b3 3b03261 a8054b3 29901d7 3b03261 a8054b3 3b03261 a8054b3 3b03261 a8054b3 3b03261 a8054b3 29901d7 a8054b3 29901d7 a8054b3 29901d7 a8054b3 29901d7 a8054b3 29901d7 a8054b3 29901d7 a8054b3 2b307a5 a8054b3 29901d7 3b03261 a8054b3 29901d7 a8054b3 3b03261 a8054b3 29901d7 a8054b3 29901d7 a8054b3 29901d7 a8054b3 29901d7 a8054b3 0848e32 29901d7 a8054b3 29901d7 a8054b3 29901d7 a8054b3 29901d7 a8054b3 3b03261 a8054b3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# Maximize performance settings
import multiprocessing
import cv2
# Configure OpenCV for multi-core processing
cv2.setNumThreads(multiprocessing.cpu_count())
##############
import torch
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging
import time
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global variables for line coordinates
line_params = None
model = None
def initialize_yolov11():
"""Initialize YOLOv11 model with error handling"""
global model
try:
model = YOLO('yolov11n.pt') # Make sure this model file exists
if torch.cuda.is_available():
model.to('cuda')
logger.info("YOLOv11 initialized with CUDA acceleration")
else:
logger.info("YOLOv11 initialized with CPU")
return True
except Exception as e:
logger.error(f"Model initialization failed: {str(e)}")
return False
def extract_first_frame(stream_url):
"""Robust frame extraction with retries"""
for _ in range(3): # Retry up to 3 times
cap = cv2.VideoCapture(stream_url)
if cap.isOpened():
ret, frame = cap.read()
cap.release()
if ret:
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), "First frame extracted"
time.sleep(1) # Wait before retry
return None, "Error: Failed to capture initial frame"
def update_line(image, evt: gr.SelectData):
"""Optimized line drawing with validation"""
global line_params
if not hasattr(image, 'points'):
image.points = []
if len(image.points) < 2:
image.points.append((evt.index[0], evt.index[1]))
draw = ImageDraw.Draw(image)
color = "blue" if len(image.points) == 1 else "green"
draw.ellipse([evt.index[0]-5, evt.index[1]-5, evt.index[0]+5, evt.index[1]+5],
fill=color, outline=color)
if len(image.points) == 2:
x1, y1 = image.points[0]
x2, y2 = image.points[1]
draw = ImageDraw.Draw(image)
draw.line([(x1,y1), (x2,y2)], fill="red", width=2)
# Store line parameters
if x2 - x1 != 0:
slope = (y2 - y1) / (x2 - x1)
intercept = y1 - slope * x1
else:
slope = float('inf')
intercept = x1
line_params = (slope, intercept, (x1,y1), (x2,y2))
status = f"Points: {len(image.points)}/2" if len(image.points) < 2 else "Line set!"
return image, status
def line_intersection(box, line):
"""Fast line-box intersection using vector math"""
(m, b, (x1,y1), (x2,y2)) = line
box_x1, box_y1, box_x2, box_y2 = box
# Convert line to parametric form
dx = x2 - x1
dy = y2 - y1
# Check box edges
t0 = 0.0
t1 = 1.0
for edge in [0, 1]: # Check both x and y axes
if edge == 0: # X-axis boundaries
dir = dx
p = box_x1 - x1
q = box_x2 - x1
else: # Y-axis boundaries
dir = dy
p = box_y1 - y1
q = box_y2 - y1
if dir == 0:
if p > 0 or q < 0: return False
continue
t_near = p / dir
t_far = q / dir
if t_near > t_far: t_near, t_far = t_far, t_near
t0 = max(t0, t_near)
t1 = min(t1, t_far)
if t0 > t1: return False
return t0 <= 1 and t1 >= 0
def process_stream(conf_thresh, classes, stream_url):
"""Optimized video processing pipeline"""
if not model:
yield None, "Model not initialized"
return
if not line_params:
yield None, "No detection line set"
return
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
yield None, "Failed to open video stream"
return
tracker = {} # {track_id: last_seen}
crossed = set()
frame_skip = 2 # Process every 2nd frame
count = 0
while True:
ret, frame = cap.read()
if not ret:
break
count += 1
if count % frame_skip != 0:
continue
# Detection
results = model.track(
frame,
persist=True,
conf=conf_thresh,
classes=classes,
verbose=False,
device='cuda' if torch.cuda.is_available() else 'cpu'
)
# Processing
if results[0].boxes.id is not None:
boxes = results[0].boxes.xyxy.cpu().numpy()
ids = results[0].boxes.id.int().cpu().numpy()
scores = results[0].boxes.conf.cpu().numpy()
labels = results[0].boxes.cls.cpu().numpy()
for box, track_id, score, label in zip(boxes, ids, scores, labels):
if line_intersection(box, line_params) and track_id not in crossed:
crossed.add(track_id)
if len(crossed) > 1000:
crossed.clear()
# Annotation
annotated = results[0].plot()
cv2.line(annotated, line_params[2], line_params[3], (0,255,0), 2)
cv2.putText(annotated, f"Count: {len(crossed)}", (10,30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
yield cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), ""
cap.release()
# Gradio Interface
with gr.Blocks() as app:
gr.Markdown("# CCTV Smart Monitor - YOLOv11")
# Initialization
if not initialize_yolov11():
gr.Markdown("**Error**: Failed to initialize YOLOv11 model")
# Stream URL input
stream_url = gr.Textbox(
label="RTSP Stream URL",
value="rtsp://example.com/stream",
visible=True
)
# Frame setup
with gr.Row():
frame = gr.Image(label="Setup Frame", interactive=True)
line_status = gr.Textbox(label="Line Status", interactive=False)
# Controls
with gr.Row():
class_selector = gr.CheckboxGroup(
choices=model.names.values() if model else [],
label="Detection Classes"
)
confidence = gr.Slider(0.1, 1.0, value=0.4, label="Confidence Threshold")
# Output
output_video = gr.Image(label="Live Analysis", streaming=True)
error_box = gr.Textbox(label="System Messages", interactive=False)
# Interactions
frame.select(
update_line,
inputs=frame,
outputs=[frame, line_status]
)
gr.Button("Start Analysis").click(
process_stream,
inputs=[confidence, class_selector, stream_url],
outputs=[output_video, error_box]
)
if __name__ == "__main__":
app.launch(debug=True, enable_queue=True) |