Sanshruth's picture
Update app.py
ac55573 verified
raw
history blame
5.29 kB
# Maximize CPU usage
import multiprocessing
import cv2
# Set OpenCV to use all available cores
cv2.setNumThreads(multiprocessing.cpu_count())
##############
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global variables
start_point = end_point = line_params = None
def extract_first_frame(stream_url):
"""Extracts first frame from IP camera"""
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
return None, "Error: Could not open stream."
ret, frame = cap.read()
cap.release()
if not ret:
return None, "Error: Could not read frame."
return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)), "First frame extracted."
def update_line(image, evt: gr.SelectData):
"""Handles line drawing interactions"""
global start_point, end_point, line_params
if not start_point:
start_point = (evt.index[0], evt.index[1])
draw = ImageDraw.Draw(image)
draw.ellipse((start_point[0]-5, start_point[1]-5, start_point[0]+5, start_point[1]+5),
fill="blue", outline="blue")
return image, f"Start: {start_point}"
end_point = (evt.index[0], evt.index[1])
draw = ImageDraw.Draw(image)
draw.line([start_point, end_point], fill="red", width=2)
draw.ellipse((end_point[0]-5, end_point[1]-5, end_point[0]+5, end_point[1]+5),
fill="green", outline="green")
# Calculate line parameters
if start_point[0] != end_point[0]:
slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
intercept = start_point[1] - slope * start_point[0]
line_params = (slope, intercept, start_point, end_point)
else:
line_params = (float('inf'), start_point[0], start_point, end_point)
start_point = None
return image, f"Line: {line_params[2]} to {line_params[3]}"
def intersect(A, B, C, D):
"""Check line segment intersection"""
def ccw(A, B, C):
return (C[1]-A[1])*(B[0]-A[0]) > (B[1]-A[1])*(C[0]-A[0])
return ccw(A,C,D) != ccw(B,C,D) and ccw(A,B,C) != ccw(A,B,D)
def is_crossing(box, line_params):
"""Check if box crosses line"""
if not line_params:
return False
(x1, y1), (x2, y2) = line_params[2], line_params[3]
box_edges = [
((box[0], box[1]), (box[2], box[1])),
((box[2], box[1]), (box[2], box[3])),
((box[2], box[3]), (box[0], box[3])),
((box[0], box[3]), (box[0], box[1]))
]
intersections = 0
for edge in box_edges:
if intersect((x1,y1), (x2,y2), edge[0], edge[1]):
intersections += 1
if intersections >= 2:
return True
return False
def process_video(conf=0.5, classes=None, stream_url=None):
"""Main processing function"""
global line_params
# Initialize YOLOv11
model = YOLO('yolo11n.pt')
cap = cv2.VideoCapture(stream_url)
crossed = set()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Run inference
results = model.track(frame, persist=True, conf=conf, classes=classes)
# Process results
if results[0].boxes.id is not None:
boxes = results[0].boxes.xyxy.cpu().numpy()
ids = results[0].boxes.id.cpu().numpy().astype(int)
clss = results[0].boxes.cls.cpu().numpy().astype(int)
for box, tid, cls in zip(boxes, ids, clss):
if is_crossing(box, line_params) and tid not in crossed:
crossed.add(tid)
# Draw overlays
annotated = results[0].plot()
if line_params:
cv2.line(annotated, line_params[2], line_params[3], (0,255,0), 2)
cv2.putText(annotated, f"Count: {len(crossed)}", (10,30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)
yield cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), ""
cap.release()
# Gradio Interface
with gr.Blocks() as app:
gr.Markdown("# CCTV Object Counter - YOLOv11")
# Stream setup
url = gr.Textbox(label="Stream URL", value="https://example.com/stream.m3u8")
frame_btn = gr.Button("Get First Frame")
# Image components
img = gr.Image(label="Draw Detection Line", interactive=True)
line_info = gr.Textbox(label="Line Coordinates")
# Controls
classes = gr.CheckboxGroup(label="Classes", choices=[
"person", "car", "truck", "motorcycle"
], value=["person"])
conf = gr.Slider(0.1, 1.0, value=0.4, label="Confidence Threshold")
# Output
video_out = gr.Image(label="Live View", streaming=True)
status = gr.Textbox(label="Status")
# Interactions
frame_btn.click(
extract_first_frame,
inputs=url,
outputs=[img, status]
)
img.select(
update_line,
inputs=img,
outputs=[img, line_info]
)
gr.Button("Start Counting").click(
process_video,
inputs=[conf, classes, url],
outputs=[video_out, status]
)
app.launch()