# run7.py
# Updated to implement Option 1 directional crossing:
# - Detect directional crossing of L1 then L2 (L1 coords and L2 coords provided)
# - Maintain a global counter that increments only when an ID crosses L1 (outside->inside) then later crosses L2 (outside->inside)
# - Maintain a live "inside polygon" counter
# - Visualize both counters in Zone Summary panel
# - Keeps all previous features: homography patch, foot-point mapping, travel distance, avg time, occlusion tolerance and reappearance inheritance
# Paste and run. Output video and person_times_2.xlsx are saved in the working folder.

import cv2
import numpy as np
import time
import torch
import pandas as pd
from collections import defaultdict, deque
from scipy.ndimage import gaussian_filter1d
from ultralytics import YOLO
import os
import platform
import sys

# Mac-specific optimizations
if platform.system() == "Darwin":
    os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'
    os.environ['OMP_NUM_THREADS'] = '1'

# ---------------- Points in image (given) - adjust if needed
A = (440.0, 829.0)
B = (883.0, 928.0)
C = (1052.0, 325.0)
D = (739.0, 297.0)
E = (727.0, 688.0)
F = (893.0, 312.0)

POLYGON = np.array([A, B, C, D], dtype=np.float32)

# ---------------- Real-world segment lengths for path C -> B -> A -> D (meters)
SEG_REAL_M = [5.0, 2.5, 5.0]  # C->B, B->A, A->D

# image path (order C,B,A,D)
PATH_IMAGE = np.array([C, B, A, D], dtype=np.float32)

# Patch base scaling (pixels per meter). Will adapt to fit.
BASE_SCALE_PX_PER_M = 80.0
RIGHT_PANEL_W = 350
SMOOTH_ALPHA = 0.65
MISSING_TIMEOUT = 3.0

# ---------------- Lines (L1, L2) coordinates (image space) - use these for counting
L1_p1 = (898.0, 322.0)
L1_p2 = (1020.0, 453.0)
L2_p1 = (786.0, 576.0)
L2_p2 = (977.0, 607.0)

# ---------------- Utilities
def progress_bar(current, total, bar_length=30):
    if total <= 0:
        return
    ratio = current / total
    filled = int(ratio * bar_length)
    bar = "█" * filled + "-" * (bar_length - filled)
    print(f"\r[{bar}] {int(ratio * 100)}% Frame {current}/{total}", end="")

def point_in_polygon(cx, cy, polygon):
    return cv2.pointPolygonTest(polygon.astype(np.int32), (int(cx), int(cy)), False) >= 0

def euclid(a, b):
    return float(np.hypot(a[0]-b[0], a[1]-b[1]))

def fmt(t):
    return time.strftime('%H:%M:%S', time.gmtime(t))

def calculate_foot_from_head(head_box, head_center):
    """Calculate foot position from head detection."""
    x1, y1, x2, y2 = head_box
    head_cx, head_cy = head_center
    head_height = y2 - y1
    body_length_est = head_height * 5.5
    foot_x = head_cx
    foot_y = head_cy + body_length_est
    return foot_x, foot_y

def nms_obb(boxes, scores, threshold=0.4):
    """Non-Maximum Suppression for Oriented Bounding Boxes"""
    if len(boxes) == 0:
        return []
    boxes_np = np.array(boxes)
    scores_np = np.array(scores)
    x_coords = boxes_np[:, 0::2]
    y_coords = boxes_np[:, 1::2]
    x_min = np.min(x_coords, axis=1)
    y_min = np.min(y_coords, axis=1)
    x_max = np.max(x_coords, axis=1)
    y_max = np.max(y_coords, axis=1)
    areas = (x_max - x_min) * (y_max - y_min)
    order = scores_np.argsort()[::-1]
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        xx1 = np.maximum(x_min[i], x_min[order[1:]])
        yy1 = np.maximum(y_min[i], y_min[order[1:]])
        xx2 = np.minimum(x_max[i], x_max[order[1:]])
        yy2 = np.minimum(y_max[i], y_max[order[1:]])
        w = np.maximum(0.0, xx2 - xx1)
        h = np.maximum(0.0, yy2 - yy1)
        intersection = w * h
        union = areas[i] + areas[order[1:]] - intersection
        iou = intersection / union
        inds = np.where(iou <= threshold)[0]
        order = order[inds + 1]
    return keep
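
# Usage sketch for nms_obb (hypothetical values, kept as a comment so nothing runs at
# import time): two heavily overlapping flattened OBBs are reduced to the higher-scoring one.
#   boxes  = [[0, 0, 20, 0, 20, 20, 0, 20],
#             [2, 2, 22, 2, 22, 22, 2, 22]]   # x1,y1,...,x4,y4 per box
#   scores = [0.9, 0.6]
#   nms_obb(boxes, scores, threshold=0.4)     # -> [0]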

# ---------------- Project point onto polyline (returns along distance in px and proj point)
def project_point_to_polyline(pt, poly):
    best_dist = None
    best_proj = None
    best_cum = 0.0
    cum = 0.0
    for i in range(1, len(poly)):
        a = np.array(poly[i-1], dtype=np.float32)
        b = np.array(poly[i], dtype=np.float32)
        v = b - a
        w = np.array(pt, dtype=np.float32) - a
        seg_len = float(np.hypot(v[0], v[1]))
        if seg_len == 0:
            t = 0.0
            proj = a.copy()
        else:
            t = float(np.dot(w, v) / (seg_len*seg_len))
            t = max(0.0, min(1.0, t))
            proj = a + t*v
        d = float(np.hypot(proj[0]-pt[0], proj[1]-pt[1]))
        along_px = cum + t * seg_len
        if best_dist is None or d < best_dist:
            best_dist = d
            best_proj = proj
            best_cum = along_px
        cum += seg_len
    return float(best_cum), (float(best_proj[0]), float(best_proj[1]))

def polyline_pixel_lengths(poly):
    return [euclid(poly[i-1], poly[i]) for i in range(1, len(poly))]

# ---------------- Compute conversion per segment (image)
img_seg_px_lengths = polyline_pixel_lengths(PATH_IMAGE)
if len(img_seg_px_lengths) != len(SEG_REAL_M):
    raise RuntimeError("PATH_IMAGE and SEG_REAL_M length mismatch")
seg_px_to_m = []
for px_len, m_len in zip(img_seg_px_lengths, SEG_REAL_M):
    seg_px_to_m.append((m_len / px_len) if px_len > 1e-6 else 0.0)

# helper: compute along_m from an image point using image PATH_IMAGE
def image_point_to_along_m(pt):
    along_px, _ = project_point_to_polyline(pt, PATH_IMAGE)
    px_cum = 0.0
    cum_m = 0.0
    for i, seg_px in enumerate(img_seg_px_lengths):
        next_px = px_cum + seg_px
        if along_px <= next_px + 1e-9:
            offset_px = along_px - px_cum
            along_m = cum_m + offset_px * seg_px_to_m[i]
            return float(max(0.0, min(sum(SEG_REAL_M), along_m)))
        px_cum = next_px
        cum_m += SEG_REAL_M[i]
    return float(sum(SEG_REAL_M))

# ---------------- Build patch rectangle layout (pixel coordinates)
def build_patch_layout(scale_px_per_m):
    margin = 18
    rect_w_px = int(2.5 * scale_px_per_m)
    rect_h_px = int(5.0 * scale_px_per_m)
    patch_w = rect_w_px + 2*margin
    patch_h = rect_h_px + 2*margin
    left_x = margin
    right_x = margin + rect_w_px
    top_y = margin
    bottom_y = margin + rect_h_px
    # top row: D (left-top), F (mid-top), C (right-top)
    D_p = (left_x, top_y)
    F_p = ((left_x + right_x)//2, top_y)
    C_p = (right_x, top_y)
    A_p = (left_x, bottom_y)
    B_p = (right_x, bottom_y)
    # E point down from F
    E_p = (F_p[0], top_y + int(rect_h_px * 0.55))
    path_patch = np.array([C_p, B_p, A_p, D_p], dtype=np.float32)  # C->B->A->D
    extras = {"patch_w": patch_w, "patch_h": patch_h, "D": D_p, "F": F_p, "C": C_p,
              "A": A_p, "B": B_p, "E": E_p, "scale": scale_px_per_m}
    return path_patch, extras

PATCH_PATH, PATCH_EXTRAS = build_patch_layout(BASE_SCALE_PX_PER_M)
PATCH_W = PATCH_EXTRAS["patch_w"]
PATCH_H = PATCH_EXTRAS["patch_h"]

# ---------------- Line helpers for crossing detection
def line_coeffs(p1, p2):
    # returns a,b,c for line ax+by+c=0
    (x1, y1), (x2, y2) = p1, p2
    a = y1 - y2
    b = x2 - x1
    c = x1*y2 - x2*y1
    return a, b, c

def signed_dist_to_line(p, line_coeff):
    a, b, c = line_coeff
    x, y = p
    return (a*x + b*y + c) / (np.hypot(a, b) + 1e-12)

def segment_intersects(a1, a2, b1, b2):
    # standard segment intersection test
    def ccw(A, B, C):
        return (C[1]-A[1])*(B[0]-A[0]) > (B[1]-A[1])*(C[0]-A[0])
    A = a1; B = a2; C = b1; D = b2
    return (ccw(A, C, D) != ccw(B, C, D)) and (ccw(A, B, C) != ccw(A, B, D))

L1_coeff = line_coeffs(L1_p1, L1_p2)
L2_coeff = line_coeffs(L2_p1, L2_p2)

# Determine inside side for each line using polygon centroid:
poly_centroid = tuple(np.mean(POLYGON, axis=0).tolist())
L1_inside_sign = np.sign(signed_dist_to_line(poly_centroid, L1_coeff))
if L1_inside_sign == 0:
    L1_inside_sign = 1.0
L2_inside_sign = np.sign(signed_dist_to_line(poly_centroid, L2_coeff))
if L2_inside_sign == 0:
    L2_inside_sign = 1.0
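
# Worked sketch of the directional test used later (coordinates are hypothetical and the
# calls are not executed here): signed_dist_to_line gives a signed perpendicular distance,
# so a foot point moving from the outside of L1 onto the polygon side flips sign, e.g.
#   signed_dist_to_line((900.0, 300.0), L1_coeff)   # one side of L1
#   signed_dist_to_line((960.0, 420.0), L1_coeff)   # opposite side
# A crossing counts toward the L1 -> L2 sequence only when np.sign() of the new distance
# equals L1_inside_sign, i.e. the point ends up on the same side of L1 as the polygon centroid.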

# ---------------- BBox smoother
class BBoxSmoother:
    def __init__(self, buffer_size=5):
        self.buf = buffer_size
        self.hist = defaultdict(lambda: deque(maxlen=buffer_size))

    def smooth(self, boxes, ids):
        out = []
        for box, tid in zip(boxes, ids):
            self.hist[tid].append(box)
            arr = np.array(self.hist[tid])
            if arr.shape[0] >= 3:
                sm = gaussian_filter1d(arr, sigma=1, axis=0)[-1]
            else:
                sm = arr[-1]
            out.append(sm)
        return np.array(out)

# ---------------- Main processing function
def process_video(
    input_video_path="crop_video.mp4",
    output_video_path="people_polygon_tracking_corrected.avi",
    model_name="yolo11x.pt",
    head_model_name="head_detection_model.pt",
    conf_threshold=0.3,
    img_size=1280,
    use_gpu=True,
    enhance_frames=False,
    smooth_bbox_tracks=True,
    missing_timeout=MISSING_TIMEOUT
):
    device = "cuda" if torch.cuda.is_available() and use_gpu else "cpu"
    model = YOLO(model_name)
    PERSON_CLASS = 0
    head_model = YOLO(head_model_name)  # Your OBB head detection model
    HEAD_CLASS = 0
    bbox_smoother = BBoxSmoother(5) if smooth_bbox_tracks else None

    # persistent state
    inside_state = {}
    entry_time = {}
    accumulated_time = defaultdict(float)
    first_entry_vid = {}
    last_exit_vid = {}
    last_seen = {}
    prev_along = {}
    prev_time = {}
    entry_along = {}
    travel_distance = defaultdict(float)
    display_pos = {}
    head_foot_positions = {}  # Stores head detections with estimated foot positions
    person_only_ids = set()   # Track person-only detections
    head_only_ids = set()     # Track head-only detections

    # crossing trackers
    prev_foot = {}            # {id: (x,y)} previous foot coordinate (image space)
    crossed_l1_flag = {}      # {id: bool} whether this id has crossed L1 (in required direction) and not yet used to count
    crossed_l2_counted = {}   # {id: bool} whether this id has already triggered the global count by crossing L2 after L1
    prev_l1_dist = {}         # Track distance to L1
    prev_l2_dist = {}         # Track distance to L2
    global_counter = 0        # counts completed L1->L2 sequences
    completed_times = []      # for avg time taken
    sequential_entries = []

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        raise RuntimeError("Cannot open input video: " + input_video_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    out_w = width + RIGHT_PANEL_W
    out_h = height
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # or 'H264' or 'avc1'
    # Keep the caller-supplied output path but force an .mp4 extension to match the 'mp4v' fourcc.
    if not output_video_path.lower().endswith(".mp4"):
        output_video_path = os.path.splitext(output_video_path)[0] + ".mp4"
    writer = cv2.VideoWriter(output_video_path, fourcc, fps, (out_w, out_h))
    if not writer.isOpened():
        raise RuntimeError("Failed to open VideoWriter. Try different codec or path.")
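
    # Note on the rescale below (derived from the layout constants above): with
    # BASE_SCALE_PX_PER_M = 80 the patch is int(5.0 * 80) + 2 * 18 = 436 px tall, so for
    # frames shorter than ~476 px it is shrunk by (height - 60) / patch_h so the whole
    # rectangle still fits inside the output frame.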

    # adjust patch scale if too tall
    PATCH_PATH_local = PATCH_PATH.copy()
    patch_w = PATCH_W
    patch_h = PATCH_H
    patch_scale = PATCH_EXTRAS["scale"]
    if patch_h > height - 40:
        factor = (height - 60) / patch_h
        PATCH_PATH_local = PATCH_PATH_local * factor
        patch_w = int(patch_w * factor)
        patch_h = int(patch_h * factor)
        patch_scale = patch_scale * factor

    # Create homography from POLYGON (image A,B,C,D) to rect corners in patch coordinates (A_p,B_p,C_p,D_p)
    A_p = PATCH_EXTRAS["A"]
    B_p = PATCH_EXTRAS["B"]
    C_p = PATCH_EXTRAS["C"]
    D_p = PATCH_EXTRAS["D"]
    dest_rect = np.array([A_p, B_p, C_p, D_p], dtype=np.float32)
    H_img2patch = cv2.getPerspectiveTransform(POLYGON.astype(np.float32), dest_rect.astype(np.float32))
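
    # Sanity sketch for the homography (illustrative, not executed): H_img2patch maps the
    # image-space polygon corners onto the patch rectangle, e.g.
    #   cv2.perspectiveTransform(np.array([[A]], dtype=np.float32), H_img2patch)
    # returns (approximately) the patch corner A_p, and any foot point inside POLYGON lands
    # at the corresponding position inside the 2.5 m x 5.0 m rectangle.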

    start_time = time.time()
    frame_idx = 0

    # precompute line endpoints for visualization and intersection tests
    L1 = (L1_p1, L1_p2)
    L2 = (L2_p1, L2_p2)

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1
        progress_bar(frame_idx, total_frames)
        now = time.time()
        vid_seconds = now - start_time

        if enhance_frames:
            frame = cv2.fastNlMeansDenoisingColored(frame, None, 5, 5, 7, 21)

        results = model.track(
            frame,
            persist=True,
            tracker="bytetrack.yaml",
            classes=[PERSON_CLASS],
            conf=conf_threshold,
            iou=0.5,
            imgsz=img_size,
            device=device,
            half=use_gpu,
            verbose=False
        )

        # Head detection (NEW - runs in parallel)
        head_results = head_model(frame, conf=conf_threshold, classes=[HEAD_CLASS], verbose=False)[0]

        # Process head detections
        obb_boxes = []
        obb_scores = []
        obb_data = []
        head_foot_positions = {}  # {estimated_foot_pos: (head_box, conf)}

        if head_results.obb is not None and len(head_results.obb) > 0:
            for obb in head_results.obb:
                xyxyxyxy = obb.xyxyxyxy[0].cpu().numpy()
                conf = float(obb.conf[0])
                if conf < conf_threshold:
                    continue
                obb_boxes.append(xyxyxyxy.flatten().tolist())
                obb_scores.append(conf)
                obb_data.append((xyxyxyxy, conf))

        # Apply NMS to head detections
        if len(obb_boxes) > 0:
            keep_indices = nms_obb(obb_boxes, obb_scores, 0.4)
            for idx in keep_indices:
                xyxyxyxy, conf = obb_data[idx]
                # Convert OBB to axis-aligned bbox
                x_min = int(xyxyxyxy[:, 0].min())
                y_min = int(xyxyxyxy[:, 1].min())
                x_max = int(xyxyxyxy[:, 0].max())
                y_max = int(xyxyxyxy[:, 1].max())
                head_cx = (x_min + x_max) / 2.0
                head_cy = float(y_min)
                # Calculate foot from head
                foot_x, foot_y = calculate_foot_from_head(
                    [x_min, y_min, x_max, y_max],
                    (head_cx, head_cy)
                )
                head_foot_positions[(foot_x, foot_y)] = ((x_min, y_min, x_max, y_max, xyxyxyxy), conf)

        # draw polygon on frame
        cv2.polylines(frame, [POLYGON.astype(np.int32)], True, (255, 0, 0), 3)
        # draw L1 and L2 on frame (blue)
        cv2.line(frame, tuple(map(int, L1_p1)), tuple(map(int, L1_p2)), (255, 180, 0), 3)
        cv2.line(frame, tuple(map(int, L2_p1)), tuple(map(int, L2_p2)), (255, 180, 0), 3)

        right_panel = np.ones((height, RIGHT_PANEL_W, 3), dtype=np.uint8) * 40
        patch = np.ones((patch_h, patch_w, 3), dtype=np.uint8) * 255

        # draw patch structure: rectangle and center divider
        A_px = (int(dest_rect[0][0]), int(dest_rect[0][1]))
        B_px = (int(dest_rect[1][0]), int(dest_rect[1][1]))
        C_px = (int(dest_rect[2][0]), int(dest_rect[2][1]))
        D_px = (int(dest_rect[3][0]), int(dest_rect[3][1]))
        # walls (thick black lines)
        cv2.line(patch, A_px, D_px, (0, 0, 0), 6)  # left
        cv2.line(patch, A_px, B_px, (0, 0, 0), 6)  # bottom
        cv2.line(patch, B_px, C_px, (0, 0, 0), 6)  # right
        cv2.line(patch, D_px, C_px, (0, 0, 0), 6)  # top
        # center divider F->E
        F_px = ((D_px[0] + C_px[0])//2, D_px[1])
        E_px = (F_px[0], D_px[1] + int((patch_h) * 0.5))
        cv2.line(patch, F_px, E_px, (0, 0, 0), 6)
        for p in [A_px, B_px, C_px, D_px, F_px, E_px]:
            cv2.circle(patch, p, 5, (0, 0, 0), -1)

        # Match person detections with head detections
        person_head_matches = {}  # {person_id: head_foot_pos}
        matched_heads = set()

        b = results[0].boxes
        detected_ids = set()
        current_inside = []
        current_projs = []

        if b is not None and b.id is not None:
            boxes = b.xyxy.cpu().numpy()
            ids = b.id.cpu().numpy().astype(int)
            if bbox_smoother is not None:
                boxes = bbox_smoother.smooth(boxes, ids)

            # First pass: match person detections with head detections
            for box, tid in zip(boxes, ids):
                x1, y1, x2, y2 = map(int, box)
                person_foot_x = float((x1 + x2) / 2.0)
                person_foot_y = float(y2)

                # Find closest head detection within reasonable distance
                best_head = None
                best_dist = 100  # pixels threshold
                for head_foot_pos, (head_box_data, head_conf) in head_foot_positions.items():
                    head_fx, head_fy = head_foot_pos
                    dist = np.sqrt((person_foot_x - head_fx)**2 + (person_foot_y - head_fy)**2)
                    # Check if head is roughly above person bbox (y_head < y_person_top)
                    head_box = head_box_data[:4]
                    if head_box[3] < y1 + 50:  # head bottom should be near person top
                        if dist < best_dist and head_foot_pos not in matched_heads:
                            best_dist = dist
                            best_head = head_foot_pos

                if best_head:
                    person_head_matches[tid] = best_head
                    matched_heads.add(best_head)
                    person_only_ids.discard(tid)
                else:
                    person_only_ids.add(tid)
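
            # Second pass (below): for each tracked box, derive a foot point, then run the
            # L1 -> L2 directional counting. Sketch of the per-ID state machine:
            #   1. the foot path crosses L1 while the foot is inside the polygon
            #        -> crossed_l1_flag[tid] = True
            #   2. the same ID later crosses L2 (also inside, L1 flag set, not yet counted)
            #        -> global_counter += 1 and crossed_l2_counted[tid] = True
            # so each track contributes at most one completed L1 -> L2 sequence.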
            for box, tid in zip(boxes, ids):
                x1, y1, x2, y2 = map(int, box)

                # Use head-derived foot if available, otherwise use person bbox foot
                if tid in person_head_matches:
                    fx, fy = person_head_matches[tid]
                    head_box_data, head_conf = head_foot_positions[person_head_matches[tid]]
                    head_box = head_box_data[:4]
                    xyxyxyxy = head_box_data[4]
                    # Draw head OBB (cyan for matched detection)
                    points = xyxyxyxy.astype(np.int32)
                    cv2.polylines(frame, [points], True, (255, 255, 0), 2)
                else:
                    fx = float((x1 + x2) / 2.0)
                    fy = float(y2)  # bottom center (foot)

                detected_ids.add(tid)
                last_seen[tid] = now
                inside = point_in_polygon(fx, fy, POLYGON)
                prev = inside_state.get(tid, False)

                # maintain prev_foot for intersection tests
                prev_pt = prev_foot.get(tid, None)
                current_pt = (fx, fy)

                # Crossing detection for L1
                # if prev_pt is not None:
                #     # check intersection with L1
                #     inter_l1 = segment_intersects(prev_pt, current_pt, L1_p1, L1_p2)
                #     if inter_l1:
                #         # check direction: we want prev_sign != curr_sign and curr_sign == inside sign
                #         prev_sign = np.sign(signed_dist_to_line(prev_pt, L1_coeff))
                #         curr_sign = np.sign(signed_dist_to_line(current_pt, L1_coeff))
                #         if prev_sign == 0:
                #             prev_sign = -curr_sign if curr_sign != 0 else 1.0
                #         if curr_sign == 0:
                #             curr_sign = prev_sign
                #         if prev_sign != curr_sign and curr_sign == L1_inside_sign:
                #             # crossed L1 in correct direction (outside -> inside)
                #             crossed_l1_flag[tid] = True
                #     # check intersection with L2
                #     inter_l2 = segment_intersects(prev_pt, current_pt, L2_p1, L2_p2)
                #     if inter_l2:
                #         prev_sign = np.sign(signed_dist_to_line(prev_pt, L2_coeff))
                #         curr_sign = np.sign(signed_dist_to_line(current_pt, L2_coeff))
                #         if prev_sign == 0:
                #             prev_sign = -curr_sign if curr_sign != 0 else 1.0
                #         if curr_sign == 0:
                #             curr_sign = prev_sign
                #         if prev_sign != curr_sign and curr_sign == L2_inside_sign:
                #             # crossed L2 in correct direction; if previously crossed L1 and not yet counted => count
                #             if crossed_l1_flag.get(tid, False) and not crossed_l2_counted.get(tid, False):
                #                 global_counter += 1
                #                 crossed_l2_counted[tid] = True
                #                 # Record the sequential entry
                #                 entry_vid_time = first_entry_vid.get(tid, vid_seconds)
                #                 sequential_entries.append({
                #                     'person_num': global_counter,
                #                     'tid': tid,
                #                     'entry_time': entry_vid_time,
                #                     'exit_time': None,
                #                     'duration': None
                #                 })
                # # once person completed crossing sequence, we keep their travel/time records intact
                # # update prev_foot
                # prev_foot[tid] = current_pt

                # Calculate signed distances to both lines
                curr_l1_dist = signed_dist_to_line(current_pt, L1_coeff)
                curr_l2_dist = signed_dist_to_line(current_pt, L2_coeff)

                # Robust crossing detection
                if prev_pt is not None and tid in prev_l1_dist and tid in prev_l2_dist:
                    prev_l1 = prev_l1_dist[tid]
                    prev_l2 = prev_l2_dist[tid]

                    # === L1 CROSSING (3 detection methods) ===
                    # Method 1: Segment intersection (current method)
                    inter_l1 = segment_intersects(prev_pt, current_pt, L1_p1, L1_p2)

                    # Method 2: Sign change in distance
                    prev_sign_l1 = np.sign(prev_l1)
                    curr_sign_l1 = np.sign(curr_l1_dist)
                    if prev_sign_l1 == 0:
                        prev_sign_l1 = 1.0
                    if curr_sign_l1 == 0:
                        curr_sign_l1 = prev_sign_l1
                    sign_change_l1 = (prev_sign_l1 != curr_sign_l1)
                    correct_dir_l1 = (curr_sign_l1 == L1_inside_sign)

                    # Method 3: Close proximity check (catches near-misses)
                    close_to_l1 = abs(curr_l1_dist) < 35   # now within 35 pixels of L1
                    was_far_l1 = abs(prev_l1) > 40         # was more than 40 pixels away
                    moving_toward_l1 = abs(curr_l1_dist) < abs(prev_l1)  # getting closer

                    # Trigger L1 crossing if ANY method detects it
                    if (inter_l1 or (sign_change_l1 and correct_dir_l1) or (close_to_l1 and was_far_l1 and moving_toward_l1 and correct_dir_l1)):
                        if inside and not crossed_l1_flag.get(tid, False):
                            crossed_l1_flag[tid] = True
                            print(f"L1 crossed by ID {tid}")

                    # === L2 CROSSING (3 detection methods) ===
                    # Method 1: Segment intersection
                    inter_l2 = segment_intersects(prev_pt, current_pt, L2_p1, L2_p2)

                    # Method 2: Sign change in distance
                    prev_sign_l2 = np.sign(prev_l2)
                    curr_sign_l2 = np.sign(curr_l2_dist)
                    if prev_sign_l2 == 0:
                        prev_sign_l2 = 1.0
                    if curr_sign_l2 == 0:
                        curr_sign_l2 = prev_sign_l2
                    sign_change_l2 = (prev_sign_l2 != curr_sign_l2)
                    correct_dir_l2 = (curr_sign_l2 == L2_inside_sign)

                    # Method 3: Close proximity check
                    close_to_l2 = abs(curr_l2_dist) < 40   # now within 40 pixels of L2
                    was_far_l2 = abs(prev_l2) > 20         # was more than 20 pixels away
                    moving_toward_l2 = abs(curr_l2_dist) < abs(prev_l2)

                    # Trigger L2 crossing if ANY method detects it
                    if (inter_l2 or (sign_change_l2 and correct_dir_l2) or (close_to_l2 and was_far_l2 and moving_toward_l2 and correct_dir_l2)):
                        # Count only if L1 was already crossed and not yet counted
                        if inside and crossed_l1_flag.get(tid, False) and not crossed_l2_counted.get(tid, False):
                            global_counter += 1
                            crossed_l2_counted[tid] = True
                            print(f"✓ COUNTED: ID {tid} | Global count now: {global_counter}")
                            entry_vid_time = first_entry_vid.get(tid, vid_seconds)
                            sequential_entries.append({
                                'person_num': global_counter,
                                'tid': tid,
                                'entry_time': entry_vid_time,
                                'exit_time': None,
                                'duration': None
                            })

                # Update distance tracking for next frame
                prev_l1_dist[tid] = curr_l1_dist
                prev_l2_dist[tid] = curr_l2_dist
                prev_foot[tid] = current_pt

                if inside and not prev:
                    inside_state[tid] = True
                    if tid not in entry_time:
                        entry_time[tid] = now
                    if tid not in first_entry_vid:
                        first_entry_vid[tid] = vid_seconds
                    if tid not in accumulated_time:
                        accumulated_time[tid] = 0.0
                    if tid not in travel_distance:
                        travel_distance[tid] = 0.0

                # draw bbox only for inside persons
                if inside:
                    # Green if matched with head, yellow if person-only
                    color = (0, 200, 0) if tid in person_head_matches else (0, 200, 200)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(frame, f"ID {tid}", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
                    # map foot point through homography to patch coordinates (this is the key)
                    pt_img = np.array([[[fx, fy]]], dtype=np.float32)
                    mapped = cv2.perspectiveTransform(pt_img, H_img2patch)[0][0]
                    mx = float(np.clip(mapped[0], 0, patch_w - 1))
                    my = float(np.clip(mapped[1], 0, patch_h - 1))

                    # smooth display position
                    if tid in display_pos:
                        px_prev, py_prev = display_pos[tid]
                        sx = SMOOTH_ALPHA
                        dx = px_prev*(1 - sx) + mx*sx
                        dy = py_prev*(1 - sx) + my*sx
                    else:
                        dx, dy = mx, my
                    display_pos[tid] = (dx, dy)
                    current_inside.append(tid)

                    # compute along_m using image-based method for metric consistency
                    along_m = image_point_to_along_m((fx, fy))
                    current_projs.append((tid, along_m))

                    # initialize prev_along if first time
                    if tid not in prev_along:
                        prev_along[tid] = along_m
                        entry_along[tid] = along_m
                        prev_time[tid] = now

                    # compute forward-only travel distance
                    delta = along_m - prev_along.get(tid, along_m)
                    if delta > 0:
                        travel_distance[tid] += delta
                    prev_along[tid] = along_m
                    prev_time[tid] = now

        # Head-only recovery: unmatched head detections can re-activate a recently lost ID
        for head_foot_pos, (head_box_data, head_conf) in head_foot_positions.items():
            if head_foot_pos in matched_heads:
                continue  # Already matched with a person
            fx, fy = head_foot_pos
            # Only process if inside polygon
            if not point_in_polygon(fx, fy, POLYGON):
                continue
            # Try to match with existing tracked IDs by proximity
            matched_existing = False
            for tid in list(inside_state.keys()):
                if tid in detected_ids:
                    continue  # Already detected this frame
                if tid in display_pos:
                    prev_x, prev_y = display_pos[tid]
                    # Check if head is near previous position
                    dist = np.sqrt((fx - prev_x)**2 + (fy - prev_y)**2)
                    if dist < 80:  # pixels threshold
                        # Reactivate this ID using head detection
                        detected_ids.add(tid)
                        last_seen[tid] = now
                        prev_foot[tid] = (fx, fy)
                        matched_existing = True
                        head_only_ids.add(tid)
                        # Draw head detection (red for head-only recovery)
                        head_box = head_box_data[:4]
                        xyxyxyxy = head_box_data[4]
                        points = xyxyxyxy.astype(np.int32)
                        cv2.polylines(frame, [points], True, (0, 0, 255), 2)
                        cv2.putText(frame, f"ID {tid} (H)", (int(head_box[0]), int(head_box[1]) - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                        # Continue tracking
                        inside_state[tid] = True
                        current_inside.append(tid)
                        # Map through homography
                        pt_img = np.array([[[fx, fy]]], dtype=np.float32)
                        mapped = cv2.perspectiveTransform(pt_img, H_img2patch)[0][0]
                        mx = float(np.clip(mapped[0], 0, patch_w - 1))
                        my = float(np.clip(mapped[1], 0, patch_h - 1))
                        # Smooth display position
                        if tid in display_pos:
                            px_prev, py_prev = display_pos[tid]
                            sx = SMOOTH_ALPHA
                            dx = px_prev*(1 - sx) + mx*sx
                            dy = py_prev*(1 - sx) + my*sx
                        else:
                            dx, dy = mx, my
                        display_pos[tid] = (dx, dy)
                        # Track travel distance
                        along_m = image_point_to_along_m((fx, fy))
                        current_projs.append((tid, along_m))
                        if tid not in prev_along:
                            prev_along[tid] = along_m
                            entry_along[tid] = along_m
                            prev_time[tid] = now
                        delta = along_m - prev_along.get(tid, along_m)
                        if delta > 0:
                            travel_distance[tid] += delta
                        prev_along[tid] = along_m
                        prev_time[tid] = now
                        break
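
        # Exit finalization (below) uses an occlusion grace window: an ID that is inside but
        # undetected keeps its state until it has been missing for longer than missing_timeout
        # (e.g. with the default 3.0 s, a 2 s occlusion does not end the visit; once the gap
        # exceeds 3 s the visit is closed at the last_seen timestamp).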
        # finalize exits after missing timeout
        known_ids = set(list(inside_state.keys()) + list(last_seen.keys()))
        for tid in list(known_ids):
            if inside_state.get(tid, False) and tid not in detected_ids:
                ls = last_seen.get(tid, None)
                if ls is None:
                    continue
                missing = now - ls
                if missing > missing_timeout:
                    inside_state[tid] = False
                    if tid in entry_time:
                        accumulated_time[tid] += now - entry_time[tid]
                        exit_vid_time = ls - start_time
                        last_exit_vid[tid] = exit_vid_time
                        completed_times.append(accumulated_time[tid])
                        # Update sequential entry exit time
                        for entry in sequential_entries:
                            if entry['tid'] == tid and entry['exit_time'] is None:
                                entry['exit_time'] = exit_vid_time
                                entry['duration'] = accumulated_time[tid]
                                break
                        entry_time.pop(tid, None)
                else:
                    # within occlusion grace window -> keep inside state
                    pass

        # Reappearance inheritance logic (same as prior): copy neighbor state if ID lost & reappears
        current_projs_map = {tid: a for tid, a in current_projs}
        for tid, along in current_projs:
            if tid in prev_along:
                continue
            candidates = []
            for other_tid, other_al in current_projs_map.items():
                if other_tid == tid:
                    continue
                candidates.append((other_tid, other_al))
            if not candidates and prev_along:
                candidates = [(other_tid, prev_along_val) for other_tid, prev_along_val in prev_along.items() if other_tid != tid]
            if not candidates:
                prev_along[tid] = along
                entry_along.setdefault(tid, along)
                prev_time[tid] = now
                continue
            neighbor_tid, neighbor_al = min(candidates, key=lambda x: abs(x[1] - along))
            if abs(neighbor_al - along) < max(0.5, sum(SEG_REAL_M)*0.5):
                prev_along[tid] = prev_along.get(neighbor_tid, neighbor_al)
                entry_along[tid] = entry_along.get(neighbor_tid, neighbor_al)
                prev_time[tid] = now
                accumulated_time[tid] = accumulated_time.get(neighbor_tid, 0.0)
                if neighbor_tid in entry_time:
                    entry_time[tid] = entry_time[neighbor_tid]
                else:
                    entry_time[tid] = now - accumulated_time[tid]
                # also inherit crossed L1/L2 flags if neighbor had them (helps maintain global count consistency)
                if crossed_l1_flag.get(neighbor_tid, False) and not crossed_l1_flag.get(tid, False):
                    crossed_l1_flag[tid] = True
                if crossed_l2_counted.get(neighbor_tid, False) and not crossed_l2_counted.get(tid, False):
                    crossed_l2_counted[tid] = True
            else:
                prev_along[tid] = along
                entry_along.setdefault(tid, along)
                prev_time[tid] = now
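
        # Illustrative note on the inheritance rule above (values hypothetical): if a lost
        # track reappears at along = 6.2 m and its nearest candidate neighbour sits at 6.0 m,
        # the 0.2 m gap is below the max(0.5, sum(SEG_REAL_M) * 0.5) = 6.25 m threshold, so
        # the new ID inherits the neighbour's accumulated time and L1/L2 crossing flags.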

        # build display list sorted by along for consistent ordering
        disp = []
        for tid in current_inside:
            if tid not in display_pos:
                continue
            dx, dy = display_pos[tid]
            cur_al = prev_along.get(tid, entry_along.get(tid, 0.0))
            t_inside = int(now - entry_time[tid]) if tid in entry_time else int(accumulated_time.get(tid, 0.0))
            trav = travel_distance.get(tid, 0.0)
            disp.append((tid, int(round(dx)), int(round(dy)), t_inside, trav, cur_al))
        disp.sort(key=lambda x: x[5])  # by along

        # draw patch dots and labels (no velocity)
        for tid, xi, yi, t_inside, trav, _ in disp:
            cv2.circle(patch, (xi, yi), 6, (0, 0, 255), -1)
            cv2.putText(patch, f"ID {tid}", (xi+8, yi-8), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 0, 0), 1)
            cv2.putText(patch, f"{t_inside}s {trav:.2f}m", (xi+8, yi+8), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 0), 1)

        # compute avg time taken from completed_times
        avg_time_taken = float(np.mean(completed_times)) if len(completed_times) > 0 else 0.0

        # top-right summary: show both counters
        panel_h, panel_w = 220, 350
        panel = np.ones((panel_h, panel_w, 3), dtype=np.uint8) * 255
        cv2.putText(panel, "Zone Summary", (12, 24), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
        cv2.putText(panel, f"Inside count: {len(disp)}", (12, 58), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 120, 0), 2)
        cv2.putText(panel, f"Global count: {global_counter}", (12, 92), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 128), 2)
        cv2.putText(panel, f"Avg time taken: {int(avg_time_taken)}s", (12, 126), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
        yv = 150
        for tid, _, _, t_inside, trav, _ in disp[:8]:
            cv2.putText(panel, f"ID {tid}: {t_inside}s, {trav:.2f}m", (12, yv), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (50, 50, 50), 1)
            yv += 18

        final = np.hstack((frame, right_panel))
        # place panel top-right inside right panel
        panel_x = width + (RIGHT_PANEL_W - panel_w)//2
        panel_y = 10
        final[panel_y:panel_y+panel_h, panel_x:panel_x+panel_w] = panel

        # place patch below panel
        patch_x = width + (RIGHT_PANEL_W - patch_w)//2
        patch_y = panel_y + panel_h + 10
        if patch_y + patch_h > height:
            patch_y = height - patch_h - 10
        final[patch_y:patch_y+patch_h, patch_x:patch_x+patch_w] = patch

        writer.write(np.ascontiguousarray(final))

    # finalize
    end_t = time.time()
    for tid in list(entry_time.keys()):
        accumulated_time[tid] += end_t - entry_time[tid]
        exit_vid_time = last_seen.get(tid, end_t) - start_time
        last_exit_vid[tid] = exit_vid_time
        completed_times.append(accumulated_time[tid])
        # Update sequential entry exit time
        for entry in sequential_entries:
            if entry['tid'] == tid and entry['exit_time'] is None:
                entry['exit_time'] = exit_vid_time
                entry['duration'] = accumulated_time[tid]
                break
        entry_time.pop(tid, None)
        inside_state[tid] = False

    cap.release()
    writer.release()

    # export excel with sequential person numbers (only durations > 0)
    rows = []
    for entry in sequential_entries:
        if entry['exit_time'] is not None and entry['duration'] is not None and entry['duration'] > 0:
            rows.append({
                "Person": entry['person_num'],
                "Time in": fmt(entry['entry_time']),
                "Time out": fmt(entry['exit_time']),
                "Time in queue (seconds)": round(float(entry['duration']), 2)
            })
    df = pd.DataFrame(rows, columns=["Person", "Time in", "Time out", "Time in queue (seconds)"])
    if len(df) > 0:
        df.to_excel("person_times_2.xlsx", index=False)
    else:
        pd.DataFrame(columns=["Person", "Time in", "Time out", "Time in queue (seconds)"]).to_excel("person_times_2.xlsx", index=False)

    print("\nFinished. Output:", os.path.abspath(output_video_path))
    print("Saved times:", os.path.abspath("person_times_2.xlsx"))

# # ---------------- Runner
# if __name__ == "__main__":
#     CONFIG = {
#         'input_video_path': "sample_vid_o.mp4",
#         'output_video_path': "output24.avi",
#         'model_name': "yolo11x.pt",
#         'head_model_name': "head_detection_single_video_best.pt",
#         'conf_threshold': 0.3,
#         'img_size': 1280,
#         'use_gpu': True,
#         'enhance_frames': False,
#         'smooth_bbox_tracks': True,
#         'missing_timeout': 3.0
#     }
#     process_video(
#         input_video_path = CONFIG['input_video_path'],
#         output_video_path = CONFIG['output_video_path'],
#         model_name = CONFIG['model_name'],
#         head_model_name = CONFIG['head_model_name'],
#         conf_threshold = CONFIG['conf_threshold'],
#         img_size = CONFIG['img_size'],
#         use_gpu = CONFIG['use_gpu'],
#         enhance_frames = CONFIG['enhance_frames'],
#         smooth_bbox_tracks = CONFIG['smooth_bbox_tracks'],
#         missing_timeout = CONFIG['missing_timeout']
#     )

# ---------------- Gradio Interface
import gradio as gr
import tempfile
import shutil

def gradio_process_video(input_video, conf_threshold=0.3, missing_timeout=3.0):
    """
    Wrapper function for Gradio interface
    """
    try:
        # Create temporary directory for outputs
        temp_dir = tempfile.mkdtemp()

        # Define output paths
        output_video_path = os.path.join(temp_dir, "output_tracking.mp4")
        excel_path = os.path.join(temp_dir, "person_times.xlsx")

        # Copy the excel file path for the process_video function to use
        original_excel = "person_times_2.xlsx"

        # Run the processing
        CONFIG = {
            'input_video_path': input_video,
            'output_video_path': output_video_path,
            'model_name': "yolo11x.pt",
            'head_model_name': "head_detection_single_video_best.pt",
            'conf_threshold': float(conf_threshold),
            'img_size': 1280,
            'use_gpu': torch.cuda.is_available(),
            'enhance_frames': False,
            'smooth_bbox_tracks': True,
            'missing_timeout': float(missing_timeout)
        }

        process_video(
            input_video_path=CONFIG['input_video_path'],
            output_video_path=CONFIG['output_video_path'],
            model_name=CONFIG['model_name'],
            head_model_name=CONFIG['head_model_name'],
            conf_threshold=CONFIG['conf_threshold'],
            img_size=CONFIG['img_size'],
            use_gpu=CONFIG['use_gpu'],
            enhance_frames=CONFIG['enhance_frames'],
            smooth_bbox_tracks=CONFIG['smooth_bbox_tracks'],
            missing_timeout=CONFIG['missing_timeout']
        )

        # Copy the generated excel file to temp directory
        if os.path.exists(original_excel):
            shutil.copy(original_excel, excel_path)

        return output_video_path, excel_path

    except Exception as e:
        print(f"Error processing video: {str(e)}")
        import traceback
        traceback.print_exc()
        return None, None

# Create Gradio interface
with gr.Blocks(title="Queue Tracking System") as demo:
    gr.Markdown(
        """
        # 🎯 Queue Tracking & Analytics System

        Upload a video to track people in a defined polygon area. The system will:
        - Track people entering and exiting the zone
        - Count directional crossings through L1 and L2 lines
        - Calculate time spent in queue
        - Measure travel distance
        - Detect both full body and head-only detections

        **Note:** Processing may take several minutes depending on video length.
        """
    )

    with gr.Row():
        with gr.Column():
            video_input = gr.Video(
                label="Upload Video",
                format="mp4"
            )
            conf_threshold = gr.Slider(
                minimum=0.1,
                maximum=0.9,
                value=0.3,
                step=0.05,
                label="Detection Confidence Threshold",
                info="Lower values detect more objects but may include false positives"
            )
            missing_timeout = gr.Slider(
                minimum=1.0,
                maximum=10.0,
                value=3.0,
                step=0.5,
                label="Missing Timeout (seconds)",
                info="How long to wait before considering a person has left the zone"
            )
            process_btn = gr.Button("🚀 Process Video", variant="primary", size="lg")

        with gr.Column():
            video_output = gr.Video(
                label="Processed Video with Tracking",
                format="mp4"
            )
            excel_output = gr.File(
                label="Download Excel Report",
                file_types=[".xlsx"]
            )
            gr.Markdown(
                """
                ### 📊 Output Information:
                - **Processed Video**: Shows tracking overlay with IDs, polygon area, and crossing lines
                - **Excel Report**: Contains entry/exit times and queue duration for each person
                """
            )

    gr.Markdown(
        """
        ---
        ### 🔧 Technical Details:
        - Uses YOLO11x for person detection
        - Custom head detection model for occlusion handling
        - Homographic transformation for accurate spatial mapping
        - ByteTrack for robust ID tracking
        - Directional crossing detection (L1 → L2)
        """
    )

    # Connect the button to the processing function
    process_btn.click(
        fn=gradio_process_video,
        inputs=[video_input, conf_threshold, missing_timeout],
        outputs=[video_output, excel_output]
    )

    # Add examples if you have sample videos
    gr.Examples(
        examples=[
            ["sample_vid_o.mp4", 0.3, 3.0],
        ],
        inputs=[video_input, conf_threshold, missing_timeout],
        outputs=[video_output, excel_output],
        fn=gradio_process_video,
        cache_examples=False,
    )

# Launch the app
if __name__ == "__main__":
    demo.launch(
        share=False,            # Set to True if you want a temporary public link
        server_name="0.0.0.0",  # Important for Hugging Face Spaces
        server_port=7860        # Default port for HF Spaces
    )