Sanshruth's picture
Update app.py
6e3fd3f verified
raw
history blame
7.54 kB
# Maximize CPU usage
import multiprocessing
import cv2
# Get the number of CPU cores
cpu_cores = multiprocessing.cpu_count()
# Set OpenCV to use all available cores
cv2.setNumThreads(cpu_cores)
# Print the number of threads being used (optional)
print(f"OpenCV using {cv2.getNumThreads()} threads out of {cpu_cores} available cores")
##############
import cv2
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors
import logging
import math
import torch
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global variables to store line coordinates and line equation
start_point = None
end_point = None
line_params = None # Stores (start_point, end_point)
# Load model once globally
model = YOLO("yolo11n.pt")
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = model.to(device)
def liang_barsky(line, bbox):
"""Optimized line-rectangle intersection check using Liang-Barsky algorithm"""
x1, y1 = line[0]
x2, y2 = line[1]
xmin, ymin, xmax, ymax = bbox
dx = x2 - x1
dy = y2 - y1
p = [-dx, dx, -dy, dy]
q = [x1 - xmin, xmax - x1, y1 - ymin, ymax - y1]
u1 = 0.0
u2 = 1.0
for i in range(4):
if p[i] == 0:
if q[i] < 0:
return False
continue
t = q[i] / p[i]
if p[i] < 0:
if t > u1:
u1 = t
else:
if t < u2:
u2 = t
return u1 <= u2
def extract_first_frame(stream_url):
"""Extracts the first available frame from the IP camera stream"""
logger.info("Extracting first frame...")
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
return None, "Error: Could not open stream."
ret, frame = cap.read()
cap.release()
if not ret:
return None, "Error: Could not read frame."
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return Image.fromarray(frame_rgb), "First frame extracted successfully."
def update_line(image, evt: gr.SelectData):
"""Handles line drawing interactions"""
global start_point, end_point, line_params
if start_point is None:
start_point = (evt.index[0], evt.index[1])
draw = ImageDraw.Draw(image)
draw.ellipse((start_point[0]-5, start_point[1]-5, start_point[0]+5, start_point[1]+5),
fill="blue", outline="blue")
return image, f"Line Coordinates:\nStart: {start_point}, End: None"
end_point = (evt.index[0], evt.index[1])
line_params = (start_point, end_point)
draw = ImageDraw.Draw(image)
draw.line([start_point, end_point], fill="red", width=2)
draw.ellipse((end_point[0]-5, end_point[1]-5, end_point[0]+5, end_point[1]+5),
fill="green", outline="green")
start_point = None
return image, f"Line Coordinates:\nStart: {line_params[0]}, End: {line_params[1]}"
def reset_line():
"""Resets line coordinates"""
global start_point, end_point, line_params
start_point = end_point = line_params = None
return None, "Line reset. Click to draw a new line."
def is_object_crossing_line(box, line_params):
"""Optimized line crossing check using Liang-Barsky algorithm"""
if not line_params:
return False
line_start, line_end = line_params
x1, y1, x2, y2 = box
return liang_barsky((line_start, line_end), (x1, y1, x2, y2))
def draw_angled_line(image, line_params, color=(0, 255, 0), thickness=2):
"""Draws the user-defined line on the frame"""
start, end = line_params
cv2.line(image, start, end, color, thickness)
def process_video(confidence_threshold=0.5, selected_classes=None, stream_url=None):
"""Main video processing function with optimizations"""
global line_params
errors = []
if not line_params:
errors.append("Error: No line drawn.")
if not selected_classes:
errors.append("Error: No classes selected.")
if not stream_url:
errors.append("Error: No stream URL provided.")
if errors:
return None, "\n".join(errors)
# Convert class names to indices once
selected_class_indices = {i for i, name in model.names.items() if name in selected_classes}
cap = cv2.VideoCapture(stream_url)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Reduce buffer size
if not cap.isOpened():
return None, "Error: Could not open stream."
crossed_objects = {}
max_tracked_objects = 1000
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Optimized inference
results = model.track(
frame,
persist=True,
conf=confidence_threshold,
half=True,
device=device,
verbose=False
)
if results[0].boxes.id is not None:
boxes = results[0].boxes
track_ids = boxes.id.int().cpu().tolist()
clss = boxes.cls.cpu().tolist()
for box, cls, t_id in zip(boxes.xyxy.cpu(), clss, track_ids):
if cls in selected_class_indices and t_id not in crossed_objects:
if is_object_crossing_line(box.numpy(), line_params):
crossed_objects[t_id] = True
if len(crossed_objects) > max_tracked_objects:
crossed_objects.clear()
# Visualization
annotated_frame = results[0].plot()
draw_angled_line(annotated_frame, line_params)
# Draw count
count = len(crossed_objects)
(w, h), _ = cv2.getTextSize(f"COUNT: {count}", cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
cv2.rectangle(annotated_frame, (10, 10), (20 + w, 40 + h), (0, 0, 0), -1)
cv2.putText(annotated_frame, f"COUNT: {count}", (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
yield annotated_frame, ""
cap.release()
# Gradio interface remains unchanged
with gr.Blocks() as demo:
gr.Markdown("<h1>Real-time monitoring, object tracking, and line-crossing detection for CCTV camera streams.</h1>")
gr.Markdown("## https://github.com/SanshruthR/CCTV_SENTRY_YOLO11")
stream_url = gr.Textbox(
label="IP Camera Stream URL",
value="https://s104.ipcamlive.com/streams/68idokwtondsqpmkr/stream.m3u8",
visible=False
)
# First frame extraction
first_frame, status = extract_first_frame(stream_url.value)
image = gr.Image(value=first_frame, label="First Frame", type="pil") if first_frame else gr.Markdown(f"**Error:** {status}")
line_info = gr.Textbox(label="Line Coordinates", value="Line Coordinates:\nStart: None, End: None")
image.select(update_line, inputs=image, outputs=[image, line_info])
# Class selection
class_names = list(model.names.values())
selected_classes = gr.CheckboxGroup(choices=class_names, label="Select Classes to Detect")
# Confidence threshold
confidence_threshold = gr.Slider(0.0, 1.0, value=0.2, label="Confidence Threshold")
# Process button
process_button = gr.Button("Process Stream")
output_image = gr.Image(label="Processed Frame", streaming=True)
error_box = gr.Textbox(label="Errors/Warnings", interactive=False)
process_button.click(
process_video,
inputs=[confidence_threshold, selected_classes, stream_url],
outputs=[output_image, error_box]
)
demo.launch(debug=True)