# Maximize CPU usage
import multiprocessing
import cv2
# Set OpenCV to use all available cores
cv2.setNumThreads(multiprocessing.cpu_count())
##############
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global variables
start_point = end_point = line_params = None
def extract_first_frame(stream_url):
    """Extracts first frame from IP camera"""
    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        return None, "Error: Could not open stream."
    ret, frame = cap.read()
    cap.release()
    if not ret:
        return None, "Error: Could not read frame."
    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)), "First frame extracted."

def update_line(image, evt: gr.SelectData):
    """Handles line drawing interactions"""
    global start_point, end_point, line_params

    # First click: record the start point and mark it
    if not start_point:
        start_point = (evt.index[0], evt.index[1])
        draw = ImageDraw.Draw(image)
        draw.ellipse((start_point[0]-5, start_point[1]-5, start_point[0]+5, start_point[1]+5),
                     fill="blue", outline="blue")
        return image, f"Start: {start_point}"

    # Second click: record the end point and draw the full line
    end_point = (evt.index[0], evt.index[1])
    draw = ImageDraw.Draw(image)
    draw.line([start_point, end_point], fill="red", width=2)
    draw.ellipse((end_point[0]-5, end_point[1]-5, end_point[0]+5, end_point[1]+5),
                 fill="green", outline="green")

    # Calculate line parameters (slope, intercept, endpoints)
    if start_point[0] != end_point[0]:
        slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
        intercept = start_point[1] - slope * start_point[0]
        line_params = (slope, intercept, start_point, end_point)
    else:
        # Vertical line: store the x position in place of an intercept
        line_params = (float('inf'), start_point[0], start_point, end_point)

    start_point = None
    return image, f"Line: {line_params[2]} to {line_params[3]}"

def intersect(A, B, C, D):
    """Check line segment intersection"""
    def ccw(A, B, C):
        return (C[1]-A[1])*(B[0]-A[0]) > (B[1]-A[1])*(C[0]-A[0])
    return ccw(A, C, D) != ccw(B, C, D) and ccw(A, B, C) != ccw(A, B, D)

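# Illustration with made-up coordinates (not part of the original script):
# the diagonals of a square cross, so intersect((0, 0), (10, 10), (0, 10), (10, 0))
# returns True, while two disjoint horizontal segments such as (0, 0)-(10, 0)
# and (0, 5)-(10, 5) return False. ccw() tests the turn direction of three
# points; the segments cross exactly when C and D lie on opposite sides of AB
# and A and B lie on opposite sides of CD.
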
def is_crossing(box, line_params):
    """Check if box crosses line"""
    if not line_params:
        return False
    (x1, y1), (x2, y2) = line_params[2], line_params[3]
    # The four edges of the box: top, right, bottom, left
    box_edges = [
        ((box[0], box[1]), (box[2], box[1])),
        ((box[2], box[1]), (box[2], box[3])),
        ((box[2], box[3]), (box[0], box[3])),
        ((box[0], box[3]), (box[0], box[1]))
    ]
    intersections = 0
    for edge in box_edges:
        if intersect((x1, y1), (x2, y2), edge[0], edge[1]):
            intersections += 1
            if intersections >= 2:
                return True
    return False

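# Note: a box entirely on one side of the line yields zero edge intersections
# and is not counted; requiring two intersections means the drawn segment
# passes through the box rather than merely touching a corner. Illustration
# with hypothetical values: with line endpoints (0, 0) and (100, 100), the box
# (40, 30, 80, 60) is crossed (the segment enters through its left edge and
# exits through its bottom edge), so is_crossing() would return True.
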
def process_video(conf=0.5, classes=None, stream_url=None):
    """Main processing function"""
    global line_params

    # Initialize YOLOv11
    model = YOLO('yolo11n.pt')

    # The UI supplies class names, but the tracker expects class IDs,
    # so translate them via the model's id-to-name mapping
    if classes:
        name_to_id = {name: idx for idx, name in model.names.items()}
        classes = [name_to_id[c] for c in classes if c in name_to_id]

    cap = cv2.VideoCapture(stream_url)
    crossed = set()

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Run inference with tracking
        results = model.track(frame, persist=True, conf=conf, classes=classes)

        # Process results: count each track ID only once when it crosses the line
        if results[0].boxes.id is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy()
            ids = results[0].boxes.id.cpu().numpy().astype(int)
            clss = results[0].boxes.cls.cpu().numpy().astype(int)
            for box, tid, cls in zip(boxes, ids, clss):
                if is_crossing(box, line_params) and tid not in crossed:
                    crossed.add(tid)

        # Draw overlays
        annotated = results[0].plot()
        if line_params:
            cv2.line(annotated, line_params[2], line_params[3], (0, 255, 0), 2)
        cv2.putText(annotated, f"Count: {len(crossed)}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        yield cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), ""

    cap.release()

# Gradio Interface
with gr.Blocks() as app:
    gr.Markdown("# CCTV Object Counter - YOLOv11")

    # Stream setup
    url = gr.Textbox(label="Stream URL", value="https://example.com/stream.m3u8")
    frame_btn = gr.Button("Get First Frame")

    # Image components (PIL type so ImageDraw can annotate click events)
    img = gr.Image(label="Draw Detection Line", type="pil", interactive=True)
    line_info = gr.Textbox(label="Line Coordinates")

    # Controls
    classes = gr.CheckboxGroup(label="Classes", choices=[
        "person", "car", "truck", "motorcycle"
    ], value=["person"])
    conf = gr.Slider(0.1, 1.0, value=0.4, label="Confidence Threshold")

    # Output
    video_out = gr.Image(label="Live View", streaming=True)
    status = gr.Textbox(label="Status")

    # Interactions
    frame_btn.click(
        extract_first_frame,
        inputs=url,
        outputs=[img, status]
    )
    img.select(
        update_line,
        inputs=img,
        outputs=[img, line_info]
    )
    gr.Button("Start Counting").click(
        process_video,
        inputs=[conf, classes, url],
        outputs=[video_out, status]
    )

app.launch()