# Maximize CPU usage and GPU utilization
import multiprocessing
import cv2
# Get the number of CPU cores
cpu_cores = multiprocessing.cpu_count()
# Set OpenCV to use all available cores
cv2.setNumThreads(cpu_cores)
# Print the number of threads being used (optional)
print(f"OpenCV using {cv2.getNumThreads()} threads out of {cpu_cores} available cores")
##############
import torch
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors
import logging
import math
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global variables to store line coordinates and line equation
start_point = None
end_point = None
line_params = None  # Stores (slope, intercept, start_point, end_point) of the line
# Initialize model once
model = YOLO('yolov8n.pt')  # Nano model; swap in a larger variant if accuracy matters more than speed
# Check for GPU availability
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)
logger.info(f"Using device: {device}")
# Video processing parameters
FRAME_SKIP = 1     # Process every nth frame (1 = process every frame)
FRAME_SCALE = 0.5  # Scale factor applied to input frames before detection
def extract_first_frame(stream_url):
    """Extracts the first available frame from the IP camera stream."""
    logger.info("Extracting first frame...")
    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        logger.error("Could not open stream.")
        return None, "Error: Could not open stream."
    ret, frame = cap.read()
    cap.release()
    if not ret:
        logger.error("Could not read frame.")
        return None, "Error: Could not read frame."
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return Image.fromarray(frame_rgb), "First frame extracted."
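# Optional, more defensive variant (an assumption, not part of the original
# pipeline): some IP cameras deliver an empty or corrupt first frame, so a short
# warm-up loop can be more robust than a single cap.read(). The function name
# is hypothetical.
def extract_first_frame_with_retries(stream_url, max_attempts=5):
    """Like extract_first_frame, but retries a few reads before giving up."""
    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        return None, "Error: Could not open stream."
    ret, frame = False, None
    for _ in range(max_attempts):
        ret, frame = cap.read()
        if ret:
            break
    cap.release()
    if not ret:
        return None, "Error: Could not read frame."
    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)), "First frame extracted."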
def update_line(image, evt: gr.SelectData):
    """Updates the counting line from two user clicks on the first frame."""
    global start_point, end_point, line_params
    # First click: record and mark the start point.
    if start_point is None:
        start_point = (evt.index[0], evt.index[1])
        draw = ImageDraw.Draw(image)
        draw.ellipse((start_point[0]-5, start_point[1]-5, start_point[0]+5, start_point[1]+5),
                     fill="blue", outline="blue")
        return image, f"Line Start: {start_point}"
    # Second click: record the end point and draw the line.
    end_point = (evt.index[0], evt.index[1])
    draw = ImageDraw.Draw(image)
    draw.line([start_point, end_point], fill="red", width=2)
    draw.ellipse((end_point[0]-5, end_point[1]-5, end_point[0]+5, end_point[1]+5),
                 fill="green", outline="green")
    # Calculate line parameters (slope/intercept plus the raw endpoints)
    if start_point[0] != end_point[0]:
        slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
        intercept = start_point[1] - slope * start_point[0]
        line_params = (slope, intercept, start_point, end_point)
        message = f"Line: y = {slope:.2f}x + {intercept:.2f}"
    else:
        # Vertical line: store infinity as the slope and x as the "intercept"
        line_params = (float('inf'), start_point[0], start_point, end_point)
        message = f"Line: x = {start_point[0]}"
    start_point = None  # Reset so the next click starts a new line
    return image, message
def optimized_intersection_check(box, line_params):
    """Line-segment/box intersection via the slab method: clip the segment's
    parameter range [0, 1] against the box's x and y slabs; the segment hits
    the box iff a non-empty range survives both clips."""
    _, _, (x1, y1), (x2, y2) = line_params
    box_x1, box_y1, box_x2, box_y2 = box
    dx = x2 - x1
    dy = y2 - y1
    t_near = 0.0  # The segment corresponds to t in [0, 1]
    t_far = 1.0
    for p, d, lo, hi in ((x1, dx, box_x1, box_x2), (y1, dy, box_y1, box_y2)):
        if d == 0:
            # Segment is parallel to this slab: reject if it lies outside it
            if p < lo or p > hi:
                return False
            continue
        t0 = (lo - p) / d
        t1 = (hi - p) / d
        if t0 > t1:
            t0, t1 = t1, t0
        t_near = max(t_near, t0)
        t_far = min(t_far, t1)
        if t_near > t_far:
            return False
    return True
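# Quick sanity check for the slab test (illustrative coordinates): the diagonal
# segment (0,0)->(10,10) should hit the box (4,4,6,6) and miss (20,0,22,2).
_demo_line = (1.0, 0.0, (0, 0), (10, 10))
assert optimized_intersection_check((4, 4, 6, 6), _demo_line)
assert not optimized_intersection_check((20, 0, 22, 2), _demo_line)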
def process_video(confidence_threshold=0.5, selected_classes=None, stream_url=None):
    """Optimized video processing pipeline: detect, track, and count line crossings."""
    global line_params
    # Validation checks (this is a generator, so errors must be yielded,
    # not returned, to reach the UI)
    if not line_params or not selected_classes or not stream_url:
        yield None, "Missing configuration parameters"
        return
    # Convert to a set for O(1) membership lookups
    selected_classes = set(selected_classes)
    # Video capture setup
    cap = cv2.VideoCapture(stream_url)
    if not cap.isOpened():
        yield None, "Error opening stream"
        return
    # The line was drawn on the full-resolution first frame, but detection runs
    # on frames scaled by FRAME_SCALE, so scale the endpoints to match
    p1 = tuple(int(c * FRAME_SCALE) for c in line_params[2])
    p2 = tuple(int(c * FRAME_SCALE) for c in line_params[3])
    scaled_line = (line_params[0], line_params[1] * FRAME_SCALE, p1, p2)
    crossed_objects = set()
    crossing_count = 0
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1
        if frame_count % FRAME_SKIP != 0:
            continue
        # Preprocess frame
        frame = cv2.resize(frame, None, fx=FRAME_SCALE, fy=FRAME_SCALE)
        # Object detection and tracking
        results = model.track(
            frame,
            persist=True,
            conf=confidence_threshold,
            verbose=False,
            device=device,
            tracker="botsort.yaml"  # Use optimized tracker config
        )
        # Process detections
        if results[0].boxes.id is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy()
            track_ids = results[0].boxes.id.int().cpu().numpy()
            classes = results[0].boxes.cls.cpu().numpy()
            for box, track_id, cls in zip(boxes, track_ids, classes):
                if model.names[int(cls)] not in selected_classes:
                    continue
                if optimized_intersection_check(box, scaled_line) and track_id not in crossed_objects:
                    crossed_objects.add(track_id)
                    crossing_count += 1
            # Bound memory; clearing may rarely double count a long-lived ID
            if len(crossed_objects) > 1000:
                crossed_objects.clear()
        # Annotation
        annotated_frame = results[0].plot()
        cv2.line(annotated_frame, p1, p2, (0, 255, 0), 2)
        cv2.putText(annotated_frame, f"COUNT: {crossing_count}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        # Gradio expects RGB; OpenCV/ultralytics frames are BGR
        yield cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB), ""
    cap.release()
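# Minimal UI sketch (an assumption: the full Space presumably wires these
# functions into a gr.Blocks app roughly like this; component names and layout
# here are illustrative, not the original interface).
with gr.Blocks() as demo:
    stream_url = gr.Textbox(label="IP camera stream URL")
    status = gr.Textbox(label="Status", interactive=False)
    first_frame = gr.Image(label="First frame (click twice to draw the counting line)", type="pil")
    classes = gr.CheckboxGroup(choices=list(model.names.values()),
                               value=["person", "car"], label="Classes to count")
    conf = gr.Slider(0.1, 1.0, value=0.5, label="Confidence threshold")
    out = gr.Image(label="Annotated stream")
    gr.Button("Load first frame").click(extract_first_frame,
                                        inputs=stream_url,
                                        outputs=[first_frame, status])
    first_frame.select(update_line, inputs=first_frame, outputs=[first_frame, status])
    # process_video is a generator, so Gradio streams each yielded frame to `out`
    gr.Button("Start counting").click(process_video,
                                      inputs=[conf, classes, stream_url],
                                      outputs=[out, status])

if __name__ == "__main__":
    demo.launch()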