import cv2 import gradio as gr import numpy as np from PIL import Image, ImageDraw from ultralytics import YOLO import logging import math # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Global variables to store line coordinates and line equation start_point = None end_point = None line_params = None # Stores (slope, intercept) of the line def extract_first_frame(stream_url): """ Extracts the first available frame from the IP camera stream and returns it as a PIL image. """ logger.info("Attempting to extract the first frame from the stream...") cap = cv2.VideoCapture(stream_url) if not cap.isOpened(): logger.error("Error: Could not open stream.") return None, "Error: Could not open stream." ret, frame = cap.read() cap.release() if not ret: logger.error("Error: Could not read the first frame.") return None, "Error: Could not read the first frame." # Convert the frame to a PIL image frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(frame_rgb) logger.info("First frame extracted successfully.") return pil_image, "First frame extracted successfully." def update_line(image, evt: gr.SelectData): """ Updates the line based on user interaction (click and drag). """ global start_point, end_point, line_params # If it's the first click, set the start point and show it on the image if start_point is None: start_point = (evt.index[0], evt.index[1]) # Draw the start point on the image draw = ImageDraw.Draw(image) draw.ellipse( (start_point[0] - 5, start_point[1] - 5, start_point[0] + 5, start_point[1] + 5), fill="blue", outline="blue" ) return image, f"Line Coordinates:\nStart: {start_point}, End: None" # If it's the second click, set the end point and draw the line end_point = (evt.index[0], evt.index[1]) # Calculate the slope (m) and intercept (b) of the line: y = mx + b if start_point[0] != end_point[0]: # Avoid division by zero slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0]) intercept = start_point[1] - slope * start_point[0] line_params = (slope, intercept, start_point, end_point) # Store slope, intercept, and points else: # Vertical line (special case) line_params = (float('inf'), start_point[0], start_point, end_point) # Draw the line and end point on the image draw = ImageDraw.Draw(image) draw.line([start_point, end_point], fill="red", width=2) draw.ellipse( (end_point[0] - 5, end_point[1] - 5, end_point[0] + 5, end_point[1] + 5), fill="green", outline="green" ) # Return the updated image and line info line_info = f"Line Coordinates:\nStart: {start_point}, End: {end_point}\nLine Equation: y = {line_params[0]:.2f}x + {line_params[1]:.2f}" # Reset the points for the next interaction start_point = None end_point = None return image, line_info def reset_line(): """ Resets the line coordinates. """ global start_point, end_point, line_params start_point = None end_point = None line_params = None return None, "Line reset. Click to draw a new line." def is_object_crossing_line(box, line_params): """ Determines if an object's bounding box is fully intersected by the user-drawn line. """ _, _, line_start, line_end = line_params # Get the bounding box coordinates x1, y1, x2, y2 = box # Define the four edges of the bounding box box_edges = [ ((x1, y1), (x2, y1)), # Top edge ((x2, y1), (x2, y2)), # Right edge ((x2, y2), (x1, y2)), # Bottom edge ((x1, y2), (x1, y1)) # Left edge ] # Count the number of intersections between the line and the bounding box edges intersection_count = 0 for edge_start, edge_end in box_edges: if intersect(line_start, line_end, edge_start, edge_end): intersection_count += 1 # Only count the object if the line intersects the bounding box at least twice return intersection_count >= 2 def draw_angled_line(image, line_params, color=(0, 255, 0), thickness=2): """ Draws the user-defined line on the frame. """ _, _, start_point, end_point = line_params cv2.line(image, start_point, end_point, color, thickness) def process_video(confidence_threshold=0.5, selected_classes=None, stream_url=None): """ Processes the IP camera stream to count objects of the selected classes crossing the line. """ global line_params errors = [] if line_params is None: errors.append("Error: No line drawn. Please draw a line on the first frame.") if selected_classes is None or len(selected_classes) == 0: errors.append("Error: No classes selected. Please select at least one class to detect.") if stream_url is None or stream_url.strip() == "": errors.append("Error: No stream URL provided.") if errors: return None, "\n".join(errors) logger.info("Connecting to the IP camera stream...") cap = cv2.VideoCapture(stream_url) if not cap.isOpened(): errors.append("Error: Could not open stream.") return None, "\n".join(errors) model = YOLO(model="yolov8n.pt") crossed_objects = set() # Use a set to store unique object IDs (if available) logger.info("Starting to process the stream...") while cap.isOpened(): ret, frame = cap.read() if not ret: errors.append("Error: Could not read frame from the stream.") break # Perform object detection (no tracking) results = model.predict(frame, conf=confidence_threshold) for result in results: boxes = result.boxes.xyxy.cpu().numpy() clss = result.boxes.cls.cpu().numpy() confs = result.boxes.conf.cpu().numpy() for box, cls, conf in zip(boxes, clss, confs): if conf >= confidence_threshold and model.names[int(cls)] in selected_classes: # Check if the object crosses the line if is_object_crossing_line(box, line_params): # Use the bounding box center as a unique identifier center = ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2) crossed_objects.add(tuple(center)) # Add the center to the set # Visualize the results with bounding boxes annotated_frame = results[0].plot() # Draw the angled line on the frame draw_angled_line(annotated_frame, line_params, color=(0, 255, 0), thickness=2) # Display the count on the frame count = len(crossed_objects) (text_width, text_height), _ = cv2.getTextSize(f"COUNT: {count}", cv2.FONT_HERSHEY_SIMPLEX, 1, 2) # Calculate the position for the middle of the top margin = 10 # Margin from the top x = (annotated_frame.shape[1] - text_width) // 2 # Center-align the text horizontally y = text_height + margin # Top-align the text # Draw the black background rectangle cv2.rectangle(annotated_frame, (x - margin, y - text_height - margin), (x + text_width + margin, y + margin), (0, 0, 0), -1) # Draw the text cv2.putText(annotated_frame, f"COUNT: {count}", (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) # Yield the annotated frame to Gradio yield annotated_frame, "" cap.release() logger.info("Stream processing completed.") # Define the Gradio interface with gr.Blocks() as demo: gr.Markdown("