# Spaces: Sleeping
# Maximize CPU usage and GPU utilization | |
import multiprocessing | |
import cv2 | |
# Query the machine's CPU core count once; it is reused in the report below.
cpu_cores = multiprocessing.cpu_count()

# Hand that many threads to OpenCV's internal thread pool.
cv2.setNumThreads(cpu_cores)

# Optional diagnostic: show the thread count OpenCV reports after the call.
print(f"OpenCV using {cv2.getNumThreads()} threads out of {cpu_cores} available cores")
############## | |
import torch | |
import gradio as gr | |
import numpy as np | |
from PIL import Image, ImageDraw | |
from ultralytics import YOLO | |
from ultralytics.utils.plotting import Annotator, colors | |
import logging | |
import math | |
# Configure logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Shared line-drawing state, written by the click handler:
#   start_point / end_point -- clicked endpoints of the counting line
#   line_params             -- (slope, intercept, start_point, end_point)
start_point = end_point = line_params = None

# Load the detector a single time at import.
model = YOLO('yolov8n.pt')  # Use smaller model if needed

# Prefer CUDA when torch reports it as available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
model.to(device)
logger.info(f"Using device: {device}")

# Video processing parameters
FRAME_SKIP = 1  # Process every nth frame
FRAME_SCALE = 0.5  # Scale factor for input frames
def extract_first_frame(stream_url):
    """Extract the first available frame from the IP camera stream.

    Args:
        stream_url: Source handed straight to ``cv2.VideoCapture``.

    Returns:
        An ``(image, status)`` tuple. ``image`` is a ``PIL.Image`` built from
        the RGB-converted frame on success and ``None`` on failure; ``status``
        is a human-readable message describing the outcome.
    """
    logger.info("Extracting first frame...")
    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            logger.error("Could not open stream.")
            return None, "Error: Could not open stream."
        ret, frame = cap.read()
    finally:
        # Release on every path (stream never opened, read() raised, success)
        # so the capture handle is never left dangling.
        cap.release()

    if not ret or frame is None:
        logger.error("Could not read frame.")
        return None, "Error: Could not read frame."

    # OpenCV delivers BGR; convert so PIL receives RGB channel order.
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return Image.fromarray(frame_rgb), "First frame extracted."
def update_line(image, evt: gr.SelectData):
    """Record a click on the preview image and draw the counting line.

    Clicks alternate between two roles. While no start point is stored, the
    click becomes the start point and is marked in blue. The next click
    becomes the end point: the segment is drawn in red, the end is marked in
    green, ``line_params`` is rebuilt and ``start_point`` is cleared so the
    following click starts a new line.

    Args:
        image: Image the user clicked on; it is drawn on in place through
            ``ImageDraw.Draw``.
        evt: Select event; ``evt.index[0]`` and ``evt.index[1]`` are read as
            the clicked x and y coordinates.

    Returns:
        ``(image, message)`` where ``message`` describes the stored start
        point or the resulting line equation.
    """
    global start_point, end_point, line_params

    x, y = evt.index[0], evt.index[1]
    marker = (x - 5, y - 5, x + 5, y + 5)  # 10px dot centred on the click
    draw = ImageDraw.Draw(image)

    if start_point is None:
        start_point = (x, y)
        draw.ellipse(marker, fill="blue", outline="blue")
        return image, f"Line Start: {start_point}"

    end_point = (x, y)
    draw.line([start_point, end_point], fill="red", width=2)
    draw.ellipse(marker, fill="green", outline="green")

    # Calculate line parameters
    if start_point[0] != end_point[0]:
        slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
        intercept = start_point[1] - slope * start_point[0]
        message = f"Line: {slope:.2f}x + {intercept:.2f}"
    else:
        # Vertical line: slope is infinite and the second slot holds the x
        # position, so report it as "x = c" instead of "infx + c".
        slope = float('inf')
        intercept = start_point[0]
        message = f"Line: x = {intercept:.2f}"

    line_params = (slope, intercept, start_point, end_point)
    start_point = None  # re-arm: the next click begins a new line
    return image, message
def optimized_intersection_check(box, line_params):
    """Return True when the line segment touches or crosses the box.

    The segment is written as ``P(t) = start + t * (end - start)`` with
    ``t`` in ``[0, 1]`` and clipped against the box one axis at a time
    (slab method). The segment hits the box when a non-empty range of ``t``
    survives both axes.

    Args:
        box: ``(x1, y1, x2, y2)`` corners of an axis-aligned box.
        line_params: 4-item sequence whose last two items are the segment's
            ``(x, y)`` start and end points; the first two items are ignored.

    Returns:
        ``True`` if the segment and the box share at least one point,
        ``False`` otherwise.
    """
    _, _, (x1, y1), (x2, y2) = line_params
    box_x1, box_y1, box_x2, box_y2 = box

    # Start from the full segment and shrink the range per axis.
    t_near = 0.0
    t_far = 1.0

    axes = (
        (x1, x2 - x1, box_x1, box_x2),
        (y1, y2 - y1, box_y1, box_y2),
    )
    for origin, delta, low, high in axes:
        if low > high:
            low, high = high, low

        if delta == 0:
            # Segment is parallel to this slab: it can only hit the box when
            # its fixed coordinate already lies inside the slab. This also
            # makes a zero-length segment behave as a point-in-box test.
            if origin < low or origin > high:
                return False
            continue

        t0 = (low - origin) / delta
        t1 = (high - origin) / delta
        if t0 > t1:
            t0, t1 = t1, t0

        if t0 > t_near:
            t_near = t0
        if t1 < t_far:
            t_far = t1
        if t_near > t_far:
            return False

    return True
def _scale_line(line, scale):
    """Return ``line`` with its intercept and both endpoints multiplied by ``scale``."""
    slope, intercept, (start_x, start_y), (end_x, end_y) = line
    # Endpoints are rounded to ints because they are also passed to cv2.line.
    start = (int(round(start_x * scale)), int(round(start_y * scale)))
    end = (int(round(end_x * scale)), int(round(end_y * scale)))
    return slope, intercept * scale, start, end


def process_video(confidence_threshold=0.5, selected_classes=None, stream_url=None):
    """Track objects on a stream and count those that touch the drawn line.

    This is a generator. For every processed frame it yields
    ``(annotated_frame, "")``. When configuration is missing or the stream
    cannot be opened it yields a single ``(None, message)`` pair and stops,
    so the message reaches whoever is iterating.

    Args:
        confidence_threshold: Passed to ``model.track`` as ``conf``.
        selected_classes: Iterable of class names (as found in
            ``model.names``) that are allowed to be counted.
        stream_url: Source handed to ``cv2.VideoCapture``.

    Yields:
        ``(frame, message)`` tuples as described above. Frames are converted
        from BGR to RGB before being yielded.
    """
    # Validation checks. A plain ``return value`` inside a generator is never
    # seen by the consumer, so the message has to be yielded.
    if not line_params or not selected_classes or not stream_url:
        yield None, "Missing configuration parameters"
        return

    # Convert to set for faster lookups
    selected_classes = set(selected_classes)

    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            yield None, "Error opening stream"
            return

        crossed_objects = set()
        frame_count = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            if frame_count % FRAME_SKIP != 0:
                continue

            # Preprocess frame
            frame = cv2.resize(frame, None, fx=FRAME_SCALE, fy=FRAME_SCALE)

            # The line is picked on the unscaled first frame, while detection
            # boxes live in the resized frame; bring the line into the same
            # coordinate space. Recomputed per frame so a line redrawn while
            # streaming is picked up.
            scaled_line = _scale_line(line_params, FRAME_SCALE)

            # Object detection
            results = model.track(
                frame,
                persist=True,
                conf=confidence_threshold,
                verbose=False,
                device=device,
                tracker="botsort.yaml",
            )
            detections = results[0].boxes

            # Process detections (id is None when nothing is being tracked)
            if detections.id is not None:
                boxes = detections.xyxy.cpu().numpy()
                track_ids = detections.id.int().cpu().numpy()
                classes = detections.cls.cpu().numpy()

                for box, track_id, cls in zip(boxes, track_ids, classes):
                    if model.names[int(cls)] not in selected_classes:
                        continue
                    track_id = int(track_id)
                    if track_id in crossed_objects:
                        continue
                    if optimized_intersection_check(box, scaled_line):
                        crossed_objects.add(track_id)

            # NOTE(review): clearing the set caps memory but also resets the
            # displayed count to zero after 1000 objects - confirm intended.
            if len(crossed_objects) > 1000:
                crossed_objects.clear()

            # Annotation
            annotated_frame = results[0].plot()
            cv2.line(annotated_frame, scaled_line[2], scaled_line[3], (0, 255, 0), 2)
            cv2.putText(annotated_frame, f"COUNT: {len(crossed_objects)}",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # plot() returns a BGR array; convert to RGB to match what
            # extract_first_frame hands to the UI.
            # NOTE(review): assumes the consumer expects RGB - confirm.
            yield cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB), ""
    finally:
        # Runs on normal exhaustion, on errors, and when the consumer closes
        # the generator early, so the stream handle is always released.
        cap.release()