Spaces:
Running
Running
# Maximize CPU usage | |
import multiprocessing | |
import cv2 | |
# Set OpenCV to use all available cores | |
cv2.setNumThreads(multiprocessing.cpu_count()) | |
############## | |
import gradio as gr | |
import numpy as np | |
from PIL import Image, ImageDraw | |
from ultralytics import YOLO | |
import logging | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Global variables | |
start_point = end_point = line_params = None | |
def extract_first_frame(stream_url): | |
"""Extracts first frame from IP camera""" | |
cap = cv2.VideoCapture(stream_url) | |
if not cap.isOpened(): | |
return None, "Error: Could not open stream." | |
ret, frame = cap.read() | |
cap.release() | |
if not ret: | |
return None, "Error: Could not read frame." | |
return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)), "First frame extracted." | |
def update_line(image, evt: gr.SelectData): | |
"""Handles line drawing interactions""" | |
global start_point, end_point, line_params | |
if not start_point: | |
start_point = (evt.index[0], evt.index[1]) | |
draw = ImageDraw.Draw(image) | |
draw.ellipse((start_point[0]-5, start_point[1]-5, start_point[0]+5, start_point[1]+5), | |
fill="blue", outline="blue") | |
return image, f"Start: {start_point}" | |
end_point = (evt.index[0], evt.index[1]) | |
draw = ImageDraw.Draw(image) | |
draw.line([start_point, end_point], fill="red", width=2) | |
draw.ellipse((end_point[0]-5, end_point[1]-5, end_point[0]+5, end_point[1]+5), | |
fill="green", outline="green") | |
# Calculate line parameters | |
if start_point[0] != end_point[0]: | |
slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0]) | |
intercept = start_point[1] - slope * start_point[0] | |
line_params = (slope, intercept, start_point, end_point) | |
else: | |
line_params = (float('inf'), start_point[0], start_point, end_point) | |
start_point = None | |
return image, f"Line: {line_params[2]} to {line_params[3]}" | |
def intersect(A, B, C, D): | |
"""Check line segment intersection""" | |
def ccw(A, B, C): | |
return (C[1]-A[1])*(B[0]-A[0]) > (B[1]-A[1])*(C[0]-A[0]) | |
return ccw(A,C,D) != ccw(B,C,D) and ccw(A,B,C) != ccw(A,B,D) | |
def is_crossing(box, line_params): | |
"""Check if box crosses line""" | |
if not line_params: | |
return False | |
(x1, y1), (x2, y2) = line_params[2], line_params[3] | |
box_edges = [ | |
((box[0], box[1]), (box[2], box[1])), | |
((box[2], box[1]), (box[2], box[3])), | |
((box[2], box[3]), (box[0], box[3])), | |
((box[0], box[3]), (box[0], box[1])) | |
] | |
intersections = 0 | |
for edge in box_edges: | |
if intersect((x1,y1), (x2,y2), edge[0], edge[1]): | |
intersections += 1 | |
if intersections >= 2: | |
return True | |
return False | |
def process_video(conf=0.5, classes=None, stream_url=None): | |
"""Main processing function""" | |
global line_params | |
# Initialize YOLOv11 | |
model = YOLO('yolo11n.pt') | |
cap = cv2.VideoCapture(stream_url) | |
crossed = set() | |
while cap.isOpened(): | |
ret, frame = cap.read() | |
if not ret: | |
break | |
# Run inference | |
results = model.track(frame, persist=True, conf=conf, classes=classes) | |
# Process results | |
if results[0].boxes.id is not None: | |
boxes = results[0].boxes.xyxy.cpu().numpy() | |
ids = results[0].boxes.id.cpu().numpy().astype(int) | |
clss = results[0].boxes.cls.cpu().numpy().astype(int) | |
for box, tid, cls in zip(boxes, ids, clss): | |
if is_crossing(box, line_params) and tid not in crossed: | |
crossed.add(tid) | |
# Draw overlays | |
annotated = results[0].plot() | |
if line_params: | |
cv2.line(annotated, line_params[2], line_params[3], (0,255,0), 2) | |
cv2.putText(annotated, f"Count: {len(crossed)}", (10,30), | |
cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2) | |
yield cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), "" | |
cap.release() | |
# Gradio Interface | |
with gr.Blocks() as app: | |
gr.Markdown("# CCTV Object Counter - YOLOv11") | |
# Stream setup | |
url = gr.Textbox(label="Stream URL", value="https://example.com/stream.m3u8") | |
frame_btn = gr.Button("Get First Frame") | |
# Image components | |
img = gr.Image(label="Draw Detection Line", interactive=True) | |
line_info = gr.Textbox(label="Line Coordinates") | |
# Controls | |
classes = gr.CheckboxGroup(label="Classes", choices=[ | |
"person", "car", "truck", "motorcycle" | |
], value=["person"]) | |
conf = gr.Slider(0.1, 1.0, value=0.4, label="Confidence Threshold") | |
# Output | |
video_out = gr.Image(label="Live View", streaming=True) | |
status = gr.Textbox(label="Status") | |
# Interactions | |
frame_btn.click( | |
extract_first_frame, | |
inputs=url, | |
outputs=[img, status] | |
) | |
img.select( | |
update_line, | |
inputs=img, | |
outputs=[img, line_info] | |
) | |
gr.Button("Start Counting").click( | |
process_video, | |
inputs=[conf, classes, url], | |
outputs=[video_out, status] | |
) | |
app.launch() |