# app.py -- copied from the Hugging Face Space file view (author: Sanshruth).
# Commit 6e400f8 "Update app.py" (verified), file size 8.56 kB.
import cv2
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging
import threading
import queue
import time
# Set up logging: INFO level on the root logger, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global state for the user-drawn line. update_line() fills these in from
# image clicks and reset_line() clears them.
start_point = None  # first click (x, y); None until the user clicks
end_point = None  # second click (x, y); cleared again once the line is stored
# Stores (slope, intercept, start_point, end_point). For a vertical line
# update_line() stores (inf, x, start_point, end_point) instead.
line_params = None
# Low-resolution (width, height) that frames are resized to before inference.
LOW_RES = (320, 180)
# Frame queue for processed frames: process_frames() is the producer and
# display_frames() the consumer. Frames are dropped while the queue is full.
frame_queue = queue.Queue(maxsize=30)  # Adjust queue size based on memory constraints
# Thread control flag: the loops in process_frames() and display_frames()
# keep running only while this is truthy.
processing_active = True
def extract_first_frame(stream_url):
    """
    Extracts the first available frame from the IP camera stream and returns it as a PIL image.

    Args:
        stream_url: Source passed straight to ``cv2.VideoCapture`` (e.g. a stream URL).

    Returns:
        A ``(image, status)`` tuple. ``image`` is an RGB PIL image on success and
        ``None`` on failure; ``status`` is a human-readable message in both cases.
    """
    logger.info("Attempting to extract the first frame from the stream...")
    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            logger.error("Error: Could not open stream.")
            return None, "Error: Could not open stream."
        ret, frame = cap.read()
    finally:
        # Release the capture on every path, including the "could not open"
        # early return, so the handle is never leaked.
        cap.release()
    if not ret or frame is None:
        logger.error("Error: Could not read the first frame.")
        return None, "Error: Could not read the first frame."
    # OpenCV delivers BGR; PIL expects RGB.
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(frame_rgb)
    logger.info("First frame extracted successfully.")
    return pil_image, "First frame extracted successfully."
def update_line(image, evt: gr.SelectData):
    """
    Updates the line based on user interaction (two successive clicks).

    The first click stores the start point and marks it in blue. The second
    click stores the end point, computes the line parameters into the global
    ``line_params``, draws the line in red with a green end marker, and then
    clears ``start_point``/``end_point`` so the next click starts a new line.

    Args:
        image: PIL image to draw on (modified in place).
        evt: Gradio select event; ``evt.index`` holds the clicked (x, y).

    Returns:
        A ``(image, text)`` tuple: the annotated image and a description of the
        line coordinates (plus the line equation once both points are known).
    """
    global start_point, end_point, line_params
    clicked = (evt.index[0], evt.index[1])
    radius = 5

    if start_point is None:
        # First click: remember the start point and mark it.
        start_point = clicked
        draw = ImageDraw.Draw(image)
        draw.ellipse(
            (clicked[0] - radius, clicked[1] - radius, clicked[0] + radius, clicked[1] + radius),
            fill="blue",
            outline="blue",
        )
        return image, f"Line Coordinates:\nStart: {start_point}, End: None"

    # Second click: finish the line.
    end_point = clicked
    if start_point[0] != end_point[0]:  # Avoid division by zero
        slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
        intercept = start_point[1] - slope * start_point[0]
        line_params = (slope, intercept, start_point, end_point)
        equation = f"y = {slope:.2f}x + {intercept:.2f}"
    else:
        # Vertical line: slope is infinite and the second slot holds x.
        line_params = (float('inf'), start_point[0], start_point, end_point)
        # Report it as "x = c"; "y = infx + c" is not a meaningful equation.
        equation = f"x = {start_point[0]:.2f}"

    draw = ImageDraw.Draw(image)
    draw.line([start_point, end_point], fill="red", width=2)
    draw.ellipse(
        (end_point[0] - radius, end_point[1] - radius, end_point[0] + radius, end_point[1] + radius),
        fill="green",
        outline="green",
    )
    line_info = f"Line Coordinates:\nStart: {start_point}, End: {end_point}\nLine Equation: {equation}"
    # Clear the click state so the next click begins a fresh line;
    # line_params keeps the finished line.
    start_point = None
    end_point = None
    return image, line_info
def reset_line():
    """
    Clears the stored line (both click points and the computed parameters).

    Returns:
        A ``(None, message)`` tuple: no image, and a prompt to draw a new line.
    """
    global start_point, end_point, line_params
    # All three pieces of line state go back to "nothing drawn".
    start_point = end_point = line_params = None
    message = "Line reset. Click to draw a new line."
    return None, message
def is_object_crossing_line(box, line_params):
    """
    Reports whether the user-drawn line segment cuts through a bounding box.

    Args:
        box: ``(x1, y1, x2, y2)`` corner coordinates of the box.
        line_params: ``(slope, intercept, start, end)``; only the two end
            points of the segment are used here.

    Returns:
        True when the segment intersects at least two of the box's four edges.
    """
    _slope, _intercept, seg_start, seg_end = line_params
    left, top, right, bottom = box
    # Corners in drawing order; each consecutive pair (wrapping around) is an edge.
    corners = [(left, top), (right, top), (right, bottom), (left, bottom)]
    edges = zip(corners, corners[1:] + corners[:1])
    hits = sum(
        1
        for corner_a, corner_b in edges
        if intersect(seg_start, seg_end, corner_a, corner_b)
    )
    return hits >= 2
def intersect(A, B, C, D):
    """
    Tests whether segment AB and segment CD share at least one point.

    Args:
        A, B: End points ``(x, y)`` of the first segment.
        C, D: End points ``(x, y)`` of the second segment.

    Returns:
        True for a proper crossing, or when an end point of one segment is
        collinear with the other segment and lies inside its bounding box.
    """
    def cross(P, Q, R):
        # Signed area term: zero when P, Q, R are collinear; the sign tells
        # which side of line PQ the point R is on.
        return (R[1] - P[1]) * (Q[0] - P[0]) - (Q[1] - P[1]) * (R[0] - P[0])

    def in_bounds(P, Q, R):
        # True when R lies within the axis-aligned bounding box of segment PQ.
        x_ok = min(P[0], Q[0]) <= R[0] <= max(P[0], Q[0])
        y_ok = min(P[1], Q[1]) <= R[1] <= max(P[1], Q[1])
        return x_ok and y_ok

    side_c = cross(A, B, C)
    side_d = cross(A, B, D)
    side_a = cross(C, D, A)
    side_b = cross(C, D, B)

    # Proper crossing: each segment's end points straddle the other segment.
    if side_c * side_d < 0 and side_a * side_b < 0:
        return True

    # Degenerate cases: an end point sits exactly on the other segment.
    touching_cases = (
        (side_c, A, B, C),
        (side_d, A, B, D),
        (side_a, C, D, A),
        (side_b, C, D, B),
    )
    return any(
        side == 0 and in_bounds(seg_p, seg_q, point)
        for side, seg_p, seg_q, point in touching_cases
    )
def process_frames(stream_url, confidence_threshold, selected_classes):
    """
    Processes frames in a separate thread and adds them to the frame queue.

    Each frame is resized to ``LOW_RES`` and tracked with YOLO. Tracked boxes
    are scaled back to the full frame size and tested against the user-drawn
    line, then the annotated low-resolution frame is queued for display.

    Args:
        stream_url: Source passed to ``cv2.VideoCapture``.
        confidence_threshold: Minimum detection confidence given to the tracker.
        selected_classes: Iterable of class names to detect; empty or ``None``
            means all classes.
    """
    cap = cv2.VideoCapture(stream_url)
    model = YOLO(model="yolo11n.pt")

    # Translate the selected class names into model class ids.
    # None leaves the tracker unrestricted.
    class_ids = None
    if selected_classes:
        wanted = set(selected_classes)
        class_ids = [idx for idx, name in model.names.items() if name in wanted] or None

    crossed_objects = {}
    try:
        while processing_active and cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Perform detection on low-res frame
            low_res_frame = cv2.resize(frame, LOW_RES)
            results = model.track(
                low_res_frame,
                persist=True,
                conf=confidence_threshold,
                classes=class_ids,
            )
            # Scale factors from low-res detection space to the full frame
            scale_x = frame.shape[1] / LOW_RES[0]
            scale_y = frame.shape[0] / LOW_RES[1]

            # Snapshot the line once per frame: the UI can replace or clear it
            # while this thread is running.
            current_line = line_params
            boxes = results[0].boxes
            # boxes.id is None when the tracker has no confirmed tracks, and
            # there is nothing to test when no line has been drawn yet.
            if current_line is not None and boxes.id is not None:
                track_ids = boxes.id.int().cpu().tolist()
                coords = boxes.xyxy.cpu().tolist()
                for (x1, y1, x2, y2), track_id in zip(coords, track_ids):
                    box = (int(x1 * scale_x), int(y1 * scale_y), int(x2 * scale_x), int(y2 * scale_y))
                    if is_object_crossing_line(box, current_line):
                        if track_id not in crossed_objects:
                            logger.info("Object %s crossed the line.", track_id)
                        # Key on this detection's own track id.
                        crossed_objects[track_id] = True

            # Draw bounding boxes and line on the frame
            annotated_frame = results[0].plot()
            if current_line:
                # The line was drawn in full-frame coordinates, but the
                # annotated frame is low-res, so scale the end points down.
                slope, intercept, line_start, line_end = current_line
                scaled_line = (
                    slope,
                    intercept,
                    (int(round(line_start[0] / scale_x)), int(round(line_start[1] / scale_y))),
                    (int(round(line_end[0] / scale_x)), int(round(line_end[1] / scale_y))),
                )
                draw_angled_line(annotated_frame, scaled_line, color=(0, 255, 0), thickness=2)

            # Add frame to the queue; drop it rather than block when the
            # consumer is behind.
            try:
                frame_queue.put_nowait(annotated_frame)
            except queue.Full:
                pass
    finally:
        # Always release the capture, even if detection raises.
        cap.release()
def draw_angled_line(image, line_params, color=(0, 255, 0), thickness=2):
    """
    Renders the user-defined line segment onto a frame, in place.

    Args:
        image: Frame (as accepted by ``cv2.line``) to draw on.
        line_params: ``(slope, intercept, start, end)``; only the two end
            points are used.
        color: Line colour passed to ``cv2.line``.
        thickness: Line thickness in pixels.
    """
    _slope, _intercept, segment_start, segment_end = line_params
    cv2.line(image, segment_start, segment_end, color, thickness)
def display_frames():
    """
    Generator that streams queued frames to the UI.

    While ``processing_active`` is truthy it yields ``(rgb_frame, "")`` for
    each frame taken from ``frame_queue``, sleeping briefly whenever the
    queue is empty.
    """
    while processing_active:
        if frame_queue.empty():
            # Nothing to show yet; wait for the next frame.
            time.sleep(0.03)
            continue
        bgr_frame = frame_queue.get()
        rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
        yield rgb_frame, ""
# Define the Gradio interface
with gr.Blocks() as demo:
    # The header needs a matching opening <center> for its closing tag.
    gr.Markdown("<center><h1>Real-time monitoring, object tracking, and line-crossing detection for CCTV camera streams.</h1></center>")
    gr.Markdown("## https://github.com/SanshruthR/CCTV_SENTRY_YOLO11")
    # Hidden textbox holding the IP camera stream URL
    stream_url = gr.Textbox(label="Enter IP Camera Stream URL", value="https://s104.ipcamlive.com/streams/68idokwtondsqpmkr/stream.m3u8", visible=False)
    # Step 1: Extract the first frame from the stream so the user can draw on it
    gr.Markdown("### Step 1: Click on the frame to draw a line, the objects crossing it would be counted in real-time.")
    first_frame, status = extract_first_frame(stream_url.value)
    if first_frame is None:
        gr.Markdown(f"**Error:** {status}")
    else:
        image = gr.Image(value=first_frame, label="First Frame of Stream", type="pil")
        line_info = gr.Textbox(label="Line Coordinates", value="Line Coordinates:\nStart: None, End: None")
        image.select(update_line, inputs=image, outputs=[image, line_info])
        # Step 2: Select classes to detect
        gr.Markdown("### Step 2: Select Classes to Detect")
        model = YOLO(model="yolo11n.pt")
        class_names = list(model.names.values())
        selected_classes = gr.CheckboxGroup(choices=class_names, label="Select Classes to Detect")
        # Step 3: Adjust confidence threshold
        gr.Markdown("### Step 3: Adjust Confidence Threshold (Optional)")
        confidence_threshold = gr.Slider(minimum=0.0, maximum=1.0, value=0.2, label="Confidence Threshold")
        # Process the stream
        process_button = gr.Button("Process Stream")
        output_image = gr.Image(label="Processed Frame", streaming=True)
        error_box = gr.Textbox(label="Errors/Warnings", interactive=False)

        def start_processing(confidence, classes):
            """Enable processing and start the background worker thread.

            ``confidence`` and ``classes`` are the live slider/checkbox values
            supplied by Gradio through ``inputs=``. Reading ``component.value``
            here would only give the initial values from build time.
            """
            global processing_active
            # globals() is a dict, so setattr() on it raises AttributeError;
            # assign the module-level flag directly instead.
            processing_active = True
            # NOTE: every click starts another worker; nothing stops an
            # earlier one while processing_active stays True.
            threading.Thread(
                target=process_frames,
                # The URL textbox is hidden, so its initial value is the only value.
                args=(stream_url.value, confidence, classes),
                daemon=True,  # do not keep the interpreter alive on shutdown
            ).start()

        # Event listener for processing the video
        process_button.click(
            fn=start_processing,
            inputs=[confidence_threshold, selected_classes],
            outputs=None,
        )
        # Display frames
        demo.load(display_frames, inputs=None, outputs=[output_image, error_box], every=0.03)
# Launch the interface
demo.launch(debug=True)