File size: 5,289 Bytes
ac55573
e93dd75
 
 
ac55573
a8054b3
e93dd75
 
3b03261
 
 
 
 
 
ac55573
3b03261
 
 
ac55573
 
29901d7
3b03261
ac55573
 
 
 
 
 
 
 
 
 
 
 
3b03261
 
ac55573
 
a8054b3
ac55573
 
a8054b3
ac55573
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3b03261
ac55573
 
 
 
a8054b3
ac55573
2b307a5
ac55573
 
a8054b3
ac55573
29901d7
ac55573
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29901d7
ac55573
 
 
 
a8054b3
 
ac55573
3b03261
 
 
a8054b3
ac55573
 
29901d7
ac55573
0848e32
29901d7
ac55573
 
29901d7
ac55573
 
 
a8054b3
ac55573
a8054b3
ac55573
 
a8054b3
 
29901d7
a8054b3
3b03261
 
 
a8054b3
 
ac55573
a8054b3
ac55573
 
 
a8054b3
ac55573
 
 
a8054b3
 
ac55573
 
 
 
a8054b3
 
ac55573
 
a8054b3
 
ac55573
 
 
 
 
 
 
a8054b3
ac55573
 
a8054b3
 
ac55573
 
 
 
a8054b3
 
ac55573
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Maximize CPU usage
import multiprocessing
import cv2

# Ask OpenCV to use as many threads as multiprocessing.cpu_count() reports.
cv2.setNumThreads(multiprocessing.cpu_count())

##############
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
from ultralytics import YOLO
import logging

# Set up logging: INFO-level root configuration plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global variables shared between the Gradio callbacks below:
#   start_point - first click of a line that is still being drawn; None when
#                 no line is in progress (update_line resets it after the
#                 second click).
#   end_point   - second click of the most recently completed line.
#   line_params - None until a line has been drawn, then a 4-tuple
#                 (slope, intercept, start, end); for a vertical line the
#                 slope is float('inf') and the second item is the line's x.
start_point = end_point = line_params = None

def extract_first_frame(stream_url):
    """Grab a single frame from a video stream.

    Args:
        stream_url: Source passed straight to ``cv2.VideoCapture``.

    Returns:
        A ``(image, status)`` tuple. ``image`` is a PIL image converted from
        the captured frame with ``cv2.COLOR_BGR2RGB``, or ``None`` when the
        stream could not be opened or no frame could be read. ``status`` is a
        human-readable message describing the outcome.
    """
    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            return None, "Error: Could not open stream."
        ret, frame = cap.read()
    finally:
        # Release on every path, including the early return above and any
        # exception raised while reading.
        cap.release()

    # Guard against a missing frame as well as a failed read so cvtColor is
    # never handed None.
    if not ret or frame is None:
        return None, "Error: Could not read frame."

    return Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)), "First frame extracted."

def _draw_marker(draw, point, color):
    """Draw a filled circle of radius 5 centred on ``point``."""
    px, py = point
    draw.ellipse((px - 5, py - 5, px + 5, py + 5), fill=color, outline=color)


def update_line(image, evt: gr.SelectData):
    """Handle a click on the frame to define the detection line.

    The first click stores the start point and marks it in blue. The second
    click stores the end point, draws the line in red with a green end
    marker, records the line in the module-level ``line_params`` and resets
    ``start_point`` so the next click begins a new line.

    Args:
        image: The image currently shown in the component. May be a PIL
            image or an array; arrays are converted with ``Image.fromarray``.
        evt: Gradio select event; ``evt.index`` holds the clicked ``(x, y)``.

    Returns:
        A ``(image, message)`` tuple: the annotated PIL image and a text
        description of the start point or of the finished line.
    """
    global start_point, end_point, line_params

    if image is None:
        # Nothing to draw on; leave the line state untouched.
        return None, "Error: No image loaded."

    # gr.Image hands the callback a numpy array unless it is created with
    # type="pil", and ImageDraw.Draw only accepts PIL images.
    if not isinstance(image, Image.Image):
        image = Image.fromarray(image)

    clicked = (evt.index[0], evt.index[1])
    draw = ImageDraw.Draw(image)

    if start_point is None:
        start_point = clicked
        _draw_marker(draw, start_point, "blue")
        return image, f"Start: {start_point}"

    end_point = clicked
    draw.line([start_point, end_point], fill="red", width=2)
    _draw_marker(draw, end_point, "green")

    # Calculate line parameters
    if start_point[0] != end_point[0]:
        slope = (end_point[1] - start_point[1]) / (end_point[0] - start_point[0])
        intercept = start_point[1] - slope * start_point[0]
        line_params = (slope, intercept, start_point, end_point)
    else:
        # Vertical line: slope is infinite, so store the x position instead
        # of an intercept.
        line_params = (float('inf'), start_point[0], start_point, end_point)

    start_point = None
    return image, f"Line: {line_params[2]} to {line_params[3]}"

def intersect(A, B, C, D):
    """Report whether segment AB crosses segment CD.

    Each argument is an indexable ``(x, y)`` point. The test compares the
    orientation of point triples using a strict inequality, so collinear
    triples all evaluate to the same (False) orientation.

    Returns:
        True when C and D lie on opposite sides of AB and A and B lie on
        opposite sides of CD, otherwise False.
    """
    def _orientation(p, q, r):
        # Sign test on the cross product of (q - p) and (r - p).
        return (r[1] - p[1]) * (q[0] - p[0]) > (q[1] - p[1]) * (r[0] - p[0])

    # A and B must fall on different sides of CD ...
    if _orientation(A, C, D) == _orientation(B, C, D):
        return False
    # ... and C and D on different sides of AB.
    return _orientation(A, B, C) != _orientation(A, B, D)

def is_crossing(box, line_params):
    """Decide whether the detection line cuts through a bounding box.

    Args:
        box: Indexable of four numbers; the box edges are built from the
            corner pairs (box[0], box[1]) and (box[2], box[3]).
        line_params: Falsy when no line is defined, otherwise a sequence
            whose items 2 and 3 are the line's ``(x, y)`` end points.

    Returns:
        True as soon as the line segment intersects at least two of the
        box's four edges; False otherwise (including when no line is set).
    """
    if not line_params:
        return False

    (sx, sy), (ex, ey) = line_params[2], line_params[3]
    seg_start, seg_end = (sx, sy), (ex, ey)

    left, top, right, bottom = box[0], box[1], box[2], box[3]
    # Corners walked in order so that consecutive pairs form the four edges.
    corners = [(left, top), (right, top), (right, bottom), (left, bottom)]

    hits = 0
    for idx, corner in enumerate(corners):
        following = corners[(idx + 1) % len(corners)]
        if not intersect(seg_start, seg_end, corner, following):
            continue
        hits += 1
        if hits >= 2:
            return True
    return False

def _resolve_class_ids(classes, names):
    """Translate a class selection into integer class IDs.

    Args:
        classes: ``None`` (no filtering) or an iterable of class names
            and/or numeric IDs.
        names: The model's ID-to-name mapping (a dict, or a sequence indexed
            by ID).

    Returns:
        ``None`` when ``classes`` is ``None``; otherwise a list of integer
        IDs. Names the model does not know are logged and skipped.
    """
    if classes is None:
        return None

    pairs = names.items() if isinstance(names, dict) else enumerate(names)
    name_to_id = {label: idx for idx, label in pairs}

    resolved = []
    for entry in classes:
        if isinstance(entry, str):
            if entry not in name_to_id:
                logger.warning("Ignoring unknown class name: %s", entry)
                continue
            resolved.append(name_to_id[entry])
        else:
            resolved.append(int(entry))
    return resolved


def process_video(conf=0.5, classes=None, stream_url=None):
    """Track objects on a stream and count those crossing the drawn line.

    This is a generator: for every frame read from the stream it runs the
    tracker, adds the track ID of any box for which ``is_crossing`` is true
    to a set, and yields an annotated frame showing the current count.

    Args:
        conf: Confidence threshold forwarded to ``model.track``.
        classes: ``None`` for no class filter, or an iterable of class names
            and/or integer IDs. Names are converted to IDs via
            ``model.names`` because the tracker's ``classes`` filter expects
            integer IDs.
        stream_url: Source passed to ``cv2.VideoCapture``.

    Yields:
        ``(frame, status)`` tuples. ``frame`` is the annotated image after a
        ``cv2.COLOR_BGR2RGB`` conversion and ``status`` is an empty string.
        If the stream cannot be opened, a single
        ``(None, "Error: Could not open stream.")`` is yielded instead.
    """
    # Initialize YOLOv11
    model = YOLO('yolo11n.pt')
    class_ids = _resolve_class_ids(classes, model.names)

    cap = cv2.VideoCapture(stream_url)
    try:
        if not cap.isOpened():
            yield None, "Error: Could not open stream."
            return

        crossed = set()

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret or frame is None:
                break

            # Read the shared line once per frame so the crossing test and
            # the overlay agree even if the user redraws the line meanwhile.
            current_line = line_params

            # Run inference
            results = model.track(frame, persist=True, conf=conf, classes=class_ids)

            # Process results
            detections = results[0].boxes
            if detections.id is not None:
                boxes = detections.xyxy.cpu().numpy()
                ids = detections.id.cpu().numpy().astype(int)

                for box, tid in zip(boxes, ids):
                    # Cheap set lookup first; geometry only for new tracks.
                    if tid not in crossed and is_crossing(box, current_line):
                        crossed.add(tid)

            # Draw overlays
            annotated = results[0].plot()
            if current_line:
                cv2.line(annotated, current_line[2], current_line[3], (0,255,0), 2)
            cv2.putText(annotated, f"Count: {len(crossed)}", (10,30),
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)

            yield cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB), ""
    finally:
        # Runs on normal exhaustion, on errors, and when the consumer closes
        # the generator early, so the capture is never leaked.
        cap.release()

# Gradio Interface
# Components are created in display order inside the Blocks context, then
# wired to the callbacks defined above.
with gr.Blocks() as app:
    gr.Markdown("# CCTV Object Counter - YOLOv11")
    
    # Stream setup: the URL is read by both the first-frame button and the
    # counting button.
    url = gr.Textbox(label="Stream URL", value="https://example.com/stream.m3u8")
    frame_btn = gr.Button("Get First Frame")
    
    # Image components: the extracted frame is shown here and clicked on to
    # place the detection line; line_info shows the text update_line returns.
    img = gr.Image(label="Draw Detection Line", interactive=True)
    line_info = gr.Textbox(label="Line Coordinates")
    
    # Controls: the selected class labels and the confidence value are passed
    # to process_video as its `classes` and `conf` arguments.
    classes = gr.CheckboxGroup(label="Classes", choices=[
        "person", "car", "truck", "motorcycle"
    ], value=["person"])
    conf = gr.Slider(0.1, 1.0, value=0.4, label="Confidence Threshold")
    
    # Output: receives the (frame, status) pairs yielded by process_video.
    video_out = gr.Image(label="Live View", streaming=True)
    status = gr.Textbox(label="Status")
    
    # Interactions
    # Button click -> extract_first_frame(url) -> (img, status).
    frame_btn.click(
        extract_first_frame,
        inputs=url,
        outputs=[img, status]
    )
    
    # Click on the image -> update_line(img, event) -> (img, line_info).
    img.select(
        update_line,
        inputs=img,
        outputs=[img, line_info]
    )
    
    # Input order matches process_video's positional parameters
    # (conf, classes, stream_url).
    gr.Button("Start Counting").click(
        process_video,
        inputs=[conf, classes, url],
        outputs=[video_out, status]
    )

# Launched unconditionally when this module is executed or imported (there is
# no __main__ guard).
app.launch()