Sanshruth's picture
Update app.py
a8054b3 verified
raw
history blame
6.84 kB
# Performance setup: runs at import time, before the remaining imports.
import multiprocessing
import cv2
# Let OpenCV use as many threads as multiprocessing reports CPUs.
cv2.setNumThreads(multiprocessing.cpu_count())
# ---- remaining imports ----
import torch
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging
import time
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Detection line shared between the UI callbacks; update_line() stores it as
# (slope, intercept, (x1, y1), (x2, y2)). None until a line has been set.
line_params = None
# Detector instance assigned by initialize_yolov11(); None until it is loaded.
model = None
def initialize_yolov11():
    """Load the YOLO11 nano detector into the module-level ``model``.

    The weights are looked up under two names, in order:

    1. ``yolov11n.pt`` - the name this app historically used; it only works
       when a file with that exact name is available locally.
    2. ``yolo11n.pt`` - the name Ultralytics publishes the weights under,
       which the library can fetch automatically.

    The model is moved to CUDA when ``torch.cuda.is_available()``.

    Returns:
        bool: True if a model was loaded (and moved to the target device),
        False if every candidate failed. On failure the global ``model`` is
        left unchanged.
    """
    global model
    candidates = ('yolov11n.pt', 'yolo11n.pt')
    last_error = None
    for weights in candidates:
        try:
            loaded = YOLO(weights)
            if torch.cuda.is_available():
                loaded.to('cuda')
                logger.info("YOLOv11 initialized with CUDA acceleration")
            else:
                logger.info("YOLOv11 initialized with CPU")
        except Exception as e:  # boundary: report and try the next candidate
            last_error = e
            logger.warning("Could not load weights %r: %s", weights, e)
            continue
        # Publish the model only once it is fully set up.
        model = loaded
        return True
    logger.error(f"Model initialization failed: {str(last_error)}")
    return False
def extract_first_frame(stream_url):
    """Grab the first frame of a video stream, retrying on failure.

    Up to three attempts are made, one second apart. The capture handle is
    released after every attempt, whether or not it opened or delivered a
    frame.

    Args:
        stream_url: Anything ``cv2.VideoCapture`` accepts as a source.

    Returns:
        tuple: ``(frame, message)`` where ``frame`` is the first frame
        converted from BGR to RGB, or ``None`` when no frame could be read,
        and ``message`` is a human-readable status string.
    """
    attempts = 3
    for attempt in range(attempts):
        cap = cv2.VideoCapture(stream_url)
        try:
            if cap.isOpened():
                ret, frame = cap.read()
            else:
                ret, frame = False, None
        finally:
            # Always free the handle, including when the stream never opened.
            cap.release()
        if ret and frame is not None:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), "First frame extracted"
        # Wait before retrying, but do not delay the final error report.
        if attempt < attempts - 1:
            time.sleep(1)
    return None, "Error: Failed to capture initial frame"
def update_line(image, evt: gr.SelectData):
    """Record a click on the setup frame and draw the detection line.

    The first click is marked in blue, the second in green; once two points
    exist a red line is drawn between them and the module-level
    ``line_params`` is set to ``(slope, intercept, (x1, y1), (x2, y2))``.
    For a vertical line the slope is ``inf`` and the intercept holds ``x1``.
    Clicks after the second one add no points; they only redraw the line.

    The clicked points are kept on the function object (``update_line._points``)
    rather than on ``image``: the callback receives a new image object on every
    event, so per-image attributes would never accumulate, and a numpy array
    does not accept new attributes at all.

    Args:
        image: The current setup frame, as a PIL image or a numpy array.
        evt: Gradio select event; ``evt.index`` holds the clicked ``(x, y)``.

    Returns:
        tuple: ``(image, status)`` - the annotated PIL image and a status
        string (``"Points: n/2"`` or ``"Line set!"``).
    """
    global line_params
    if image is None:
        return None, "Error: no setup frame loaded"
    # ImageDraw only works on PIL images; convert array input.
    if not isinstance(image, Image.Image):
        image = Image.fromarray(np.asarray(image))

    points = getattr(update_line, "_points", None)
    if points is None:
        points = update_line._points = []

    x, y = int(evt.index[0]), int(evt.index[1])
    draw = ImageDraw.Draw(image)

    if len(points) < 2:
        points.append((x, y))
        color = "blue" if len(points) == 1 else "green"
        draw.ellipse([x - 5, y - 5, x + 5, y + 5], fill=color, outline=color)

    if len(points) < 2:
        return image, f"Points: {len(points)}/2"

    (x1, y1), (x2, y2) = points
    draw.line([(x1, y1), (x2, y2)], fill="red", width=2)
    # Store line parameters
    if x2 != x1:
        slope = (y2 - y1) / (x2 - x1)
        intercept = y1 - slope * x1
    else:
        # Vertical line: slope is undefined, keep the x position instead.
        slope = float('inf')
        intercept = x1
    line_params = (slope, intercept, (x1, y1), (x2, y2))
    return image, "Line set!"
def line_intersection(box, line):
    """Return True if the line segment touches or crosses the box.

    The segment is written as ``P(t) = P1 + t * (P2 - P1)`` with ``t`` in
    ``[0, 1]`` and clipped against the box one axis at a time: each axis
    narrows the range of ``t`` for which the point lies between the two box
    edges. The segment intersects the box exactly when a non-empty range of
    ``t`` remains after both axes.

    Args:
        box: ``(x1, y1, x2, y2)`` corners of an axis-aligned box. The corners
            may be given in either order.
        line: ``(slope, intercept, (x1, y1), (x2, y2))``; only the two end
            points are used.

    Returns:
        bool: True when the segment and the box share at least one point.
    """
    _, _, (x1, y1), (x2, y2) = line
    bx1, by1, bx2, by2 = box

    # (segment start, segment delta, box low edge, box high edge) per axis;
    # min/max make the test independent of the corner order.
    axes = (
        (x1, x2 - x1, min(bx1, bx2), max(bx1, bx2)),
        (y1, y2 - y1, min(by1, by2), max(by1, by2)),
    )

    t_enter, t_exit = 0.0, 1.0
    for start, delta, low, high in axes:
        if delta == 0:
            # Segment is parallel to this axis' edges: it can only hit the
            # box if its fixed coordinate already lies between them.
            if start < low or start > high:
                return False
            continue
        t_a = (low - start) / delta
        t_b = (high - start) / delta
        if t_a > t_b:
            t_a, t_b = t_b, t_a
        t_enter = max(t_enter, t_a)
        t_exit = min(t_exit, t_b)
        if t_enter > t_exit:
            return False
    return True
def process_stream(conf_thresh, classes, stream_url):
    """Track objects in a video stream and count detection-line crossings.

    This is a generator: it yields ``(frame, message)`` pairs, where
    ``frame`` is an annotated RGB image (or ``None`` on a setup error) and
    ``message`` is an error string (empty while streaming normally).

    Every second frame is run through ``model.track``. A track id is counted
    the first time its bounding box intersects the line stored in the
    module-level ``line_params``.

    Args:
        conf_thresh: Minimum detection confidence passed to the tracker.
        classes: Classes to detect, as class names (what the UI checkbox
            group delivers) and/or integer class ids. Empty or ``None``
            means all classes.
        stream_url: Anything ``cv2.VideoCapture`` accepts as a source.
    """
    if model is None:
        yield None, "Model not initialized"
        return
    if line_params is None:
        yield None, "No detection line set"
        return

    # The tracker filters by integer class id, so translate names to ids.
    class_ids = None
    if classes:
        name_to_id = {name: idx for idx, name in model.names.items()}
        class_ids = []
        for entry in classes:
            if isinstance(entry, int):
                class_ids.append(entry)
            elif entry in name_to_id:
                class_ids.append(name_to_id[entry])
        if not class_ids:
            class_ids = None

    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        cap.release()
        yield None, "Failed to open video stream"
        return

    counted_ids = set()   # track ids that have already been counted
    total_crossings = 0   # running count; survives pruning of counted_ids
    max_remembered = 1000
    frame_skip = 2        # Process every 2nd frame
    frame_index = 0
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    # cv2.line needs integer pixel coordinates.
    line_start = tuple(int(v) for v in line_params[2])
    line_end = tuple(int(v) for v in line_params[3])

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_index += 1
            if frame_index % frame_skip != 0:
                continue

            # Detection
            results = model.track(
                frame,
                persist=True,
                conf=conf_thresh,
                classes=class_ids,
                verbose=False,
                device=device
            )

            # Processing
            detections = results[0].boxes
            if detections.id is not None:
                boxes = detections.xyxy.cpu().numpy()
                ids = detections.id.int().cpu().tolist()
                for box, track_id in zip(boxes, ids):
                    if track_id not in counted_ids and line_intersection(box, line_params):
                        counted_ids.add(track_id)
                        total_crossings += 1
                if len(counted_ids) > max_remembered:
                    # Bound memory without resetting the count: forget ids
                    # that are no longer visible, keep the ones still in
                    # frame so they are not counted twice.
                    counted_ids.intersection_update(ids)

            # Annotation
            annotated = results[0].plot()
            cv2.line(annotated, line_start, line_end, (0, 255, 0), 2)
            cv2.putText(annotated, f"Count: {total_crossings}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            yield cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), ""
    finally:
        # Runs on normal end, on errors, and when the consumer closes the
        # generator (e.g. the client disconnects).
        cap.release()
# Gradio Interface
# Layout: stream URL, a setup frame on which the detection line is clicked,
# detector controls, and a live output image fed by process_stream().
with gr.Blocks() as app:
    gr.Markdown("# CCTV Smart Monitor - YOLOv11")

    # Initialization: load the detector while the UI is being built so the
    # class list below can be populated from it.
    if not initialize_yolov11():
        gr.Markdown("**Error**: Failed to initialize YOLOv11 model")

    # Stream URL input
    stream_url = gr.Textbox(
        label="RTSP Stream URL",
        value="rtsp://example.com/stream",
        visible=True
    )
    load_button = gr.Button("Load First Frame")

    # Frame setup
    with gr.Row():
        frame = gr.Image(label="Setup Frame", interactive=True)
        line_status = gr.Textbox(label="Line Status", interactive=False)

    # Controls
    with gr.Row():
        class_selector = gr.CheckboxGroup(
            # A concrete list, not a dict view, so the choices can be serialized.
            choices=list(model.names.values()) if model else [],
            label="Detection Classes"
        )
        confidence = gr.Slider(0.1, 1.0, value=0.4, label="Confidence Threshold")

    # Output
    output_video = gr.Image(label="Live Analysis", streaming=True)
    error_box = gr.Textbox(label="System Messages", interactive=False)

    # Interactions
    # Fetch the first frame of the stream into the setup image; the status
    # message from extract_first_frame() is shown in the line status box.
    load_button.click(
        extract_first_frame,
        inputs=stream_url,
        outputs=[frame, line_status]
    )
    # Each click on the setup frame adds one end point of the detection line.
    frame.select(
        update_line,
        inputs=frame,
        outputs=[frame, line_status]
    )
    gr.Button("Start Analysis").click(
        process_stream,
        inputs=[confidence, class_selector, stream_url],
        outputs=[output_video, error_box]
    )
if __name__ == "__main__":
    # launch() does not accept enable_queue in Gradio 4 and later; enabling
    # the queue through queue() works across versions and is what lets the
    # generator-based process_stream() push frames to the client.
    app.queue().launch(debug=True)