File size: 8,959 Bytes
7be1f70
 
 
 
074ef51
 
 
7be1f70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
074ef51
 
7be1f70
 
074ef51
7be1f70
074ef51
 
7be1f70
 
 
074ef51
 
 
7be1f70
 
 
074ef51
 
7be1f70
 
 
074ef51
7be1f70
074ef51
7be1f70
 
 
074ef51
7be1f70
 
 
074ef51
7be1f70
 
 
074ef51
7be1f70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
074ef51
7be1f70
 
074ef51
 
7be1f70
074ef51
 
 
 
 
 
 
7be1f70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
074ef51
7be1f70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
074ef51
7be1f70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
074ef51
7be1f70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
074ef51
 
 
 
 
 
7be1f70
 
074ef51
7be1f70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
074ef51
7be1f70
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import cv2
import numpy as np
from ultralytics import YOLO
import os
import shutil
from typing import Optional
import uuid
import base64
from io import BytesIO
from PIL import Image

# Create the FastAPI application object that all route decorators below attach to
app = FastAPI(
    title="YOLO Intrusion Detection API",
    description="API for detecting intrusions using YOLOv8 model",
    version="1.0.0"
)

# Add CORS middleware: every origin, method and header is allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended before exposing the API publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# YOLO weights file name; passed to YOLO() in startup_event and echoed by /health/
model_name = 'yolov8n.pt'
model = None  # Will be loaded on startup

# Define trapezoidal restricted area as four (x, y) vertices.
# The coordinates are fixed and are used as-is against every frame (they are
# drawn on the frame and compared with detection box centres), so they are
# not rescaled for different image sizes.
trapezoid_pts = np.array([[250, 150], [400, 150], [450, 300], [200, 300]], np.int32)

# Create temp directory for uploads if it doesn't exist.
# Runs at import time; the video endpoints read and write files under "temp/".
os.makedirs("temp", exist_ok=True)

def is_inside_trapezoid(box, trapezoid_pts):
    """Tell whether the centre of a bounding box lies within the given polygon.

    Args:
        box: Four values ``(x1, y1, x2, y2)`` describing the box corners.
        trapezoid_pts: Polygon vertices, handed unchanged to
            ``cv2.pointPolygonTest``.

    Returns:
        True when the test result for the centre point is non-negative
        (i.e. the point is not reported as outside), False otherwise.
    """
    left, top, right, bottom = box

    # The centre is truncated to integer coordinates before testing
    center_point = (int((left + right) / 2), int((top + bottom) / 2))

    # Third argument False: no distance is measured, only the sign matters
    location = cv2.pointPolygonTest(trapezoid_pts, center_point, False)
    return location >= 0

def process_image(frame):
    """Run detection on one frame and flag intrusions into the restricted area.

    The frame is passed to the module-level ``model`` with a confidence
    threshold of 0.5. The restricted trapezoid, a red box around every
    intruding object and a one-line status text are drawn on the annotated
    copy returned by the model's ``plot()``.

    Args:
        frame: Image to analyse. The endpoints in this file pass arrays
            obtained from ``cv2.imdecode`` or ``cv2.VideoCapture.read``.

    Returns:
        tuple: ``(annotated_frame, response)`` where ``response`` is a dict
        with the keys ``intrusion_detected`` (bool), ``intruding_object``
        (name of the last intruding object seen, or ``""``), ``person_count``
        (number of class-0 detections) and ``detections`` (one dict per box
        with ``class``, ``confidence``, ``bbox`` and ``in_restricted_area``).

    Raises:
        RuntimeError: If the module-level ``model`` has not been loaded yet.
    """
    # Fail with a clear message instead of an AttributeError on None
    if model is None:
        raise RuntimeError("YOLO model is not loaded")

    # Perform object detection
    results = model.predict(frame, conf=0.5)
    annotated_frame = results[0].plot()  # Draw bounding boxes

    # Draw trapezoidal restricted area
    cv2.polylines(annotated_frame, [trapezoid_pts.reshape((-1, 1, 2))], isClosed=True, color=(0, 0, 255), thickness=2)

    # Class ids 0-3 are the ones that can raise an alert; the list index is
    # the class id (person, bicycle, car, motorcycle).
    intrusion_classes = ['person', 'bicycle', 'car', 'motorcycle']

    intrusion_detected = False
    intruding_object = ""
    person_count = 0
    detections = []

    # Loop through detected objects
    for r in results:
        for box, cls, conf in zip(r.boxes.xyxy, r.boxes.cls, r.boxes.conf):
            class_id = int(cls.item())
            confidence = float(conf.item())
            coords = box.tolist()
            x1, y1, x2, y2 = map(int, coords)

            watched = 0 <= class_id < len(intrusion_classes)
            class_name = intrusion_classes[class_id] if watched else f"class_{class_id}"

            # Evaluate the polygon test once and reuse it for both the
            # detection record and the alert decision.
            inside = is_inside_trapezoid(coords, trapezoid_pts)

            detections.append({
                "class": class_name,
                "confidence": confidence,
                "bbox": [x1, y1, x2, y2],
                "in_restricted_area": inside
            })

            if class_id == 0:  # Person
                person_count += 1

            if watched and inside:
                # Later intruders overwrite earlier ones: only the last
                # intruding object's name is reported.
                intrusion_detected = True
                intruding_object = class_name
                # Mark the intrusion with a red box
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 0, 255), 3)

    # Add alert text on the frame
    alert_text = f"Intrusion Alert: {intrusion_detected}, Object: {intruding_object}, Persons: {person_count}"
    cv2.putText(annotated_frame, alert_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    response = {
        "intrusion_detected": intrusion_detected,
        "intruding_object": intruding_object,
        "person_count": person_count,
        "detections": detections
    }

    return annotated_frame, response

def encode_image_to_base64(image):
    """Convert an OpenCV image to a base64-encoded JPEG string.

    Args:
        image: Image accepted by ``cv2.imencode``.

    Returns:
        str: The JPEG bytes, base64-encoded and decoded as UTF-8 text.

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding failed.
    """
    ok, buffer = cv2.imencode('.jpg', image)
    # imencode signals failure through its first return value; do not
    # base64-encode whatever buffer came back in that case.
    if not ok:
        raise ValueError("Could not encode image as JPEG")
    return base64.b64encode(buffer).decode('utf-8')

@app.on_event("startup")
async def startup_event():
    """Instantiate the YOLO model once, when the application starts.

    The instance is stored in the module-level ``model`` variable so that
    ``process_image`` can use it; a confirmation line is printed afterwards.
    """
    global model
    loaded_model = YOLO(model_name)
    model = loaded_model
    print(f"Model {model_name} loaded successfully")

@app.get("/")
async def root():
    """Return a short service banner with the docs path and main endpoints."""
    endpoint_index = {
        "process_image": "/process_image/",
        "process_video": "/process_video/",
        "health": "/health/"
    }
    banner = {
        "message": "YOLO Intrusion Detection API is running",
        "documentation": "/docs",
        "endpoints": endpoint_index
    }
    return banner

@app.get("/health/")
async def health_check():
    """Report a fixed "healthy" status together with the configured model name."""
    payload = {"status": "healthy"}
    payload["model"] = model_name
    return payload

@app.post("/process_image/")
async def api_process_image(file: UploadFile = File(...), return_image: bool = True):
    """
    Process an image file and detect intrusions.

    Args:
        file: The image file to process
        return_image: If True, returns the annotated image as base64

    Returns:
        JSON with detection results and optionally the annotated image
        (under the "image" key)

    Raises:
        HTTPException: 400 if the file name is missing or does not end in
            .png/.jpg/.jpeg, if the upload is empty, or if the bytes cannot
            be decoded as an image
    """
    # Check file extension; an upload may arrive without a file name
    filename = file.filename or ""
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        raise HTTPException(status_code=400, detail="Only PNG and JPG images are supported")

    # Read the upload; reject empty bodies up front so that cv2.imdecode is
    # never handed an empty buffer (which would surface as a server error)
    contents = await file.read()
    if not contents:
        raise HTTPException(status_code=400, detail="Could not decode image")

    nparr = np.frombuffer(contents, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

    if img is None:
        raise HTTPException(status_code=400, detail="Could not decode image")

    # Process the image
    annotated_img, results = process_image(img)

    # Optionally include the annotated image
    if return_image:
        results["image"] = encode_image_to_base64(annotated_img)

    return results

@app.post("/process_video/")
async def api_process_video(file: UploadFile = File(...)):
    """
    Process a video file and detect intrusions.

    The upload is saved under temp/, every frame is run through
    process_image, and the annotated frames are written to a new file under
    temp/. The temporary input file is always removed; the output file is
    removed as well if processing does not complete.

    Args:
        file: The video file to process

    Returns:
        JSON with aggregated detection results ("results") and the download
        path of the processed video ("video_path")

    Raises:
        HTTPException: 400 if the file name is missing or does not end in
            .mp4/.avi/.mov, or if the video cannot be opened; 500 if the
            output video file cannot be created
    """
    # Check file extension; an upload may arrive without a file name
    filename = file.filename or ""
    if not filename.lower().endswith(('.mp4', '.avi', '.mov')):
        raise HTTPException(status_code=400, detail="Only MP4, AVI, and MOV videos are supported")

    # Create unique temporary file names
    temp_input = f"temp/input_{uuid.uuid4()}.mp4"
    temp_output = f"temp/output_{uuid.uuid4()}.mp4"

    cap = None
    out = None
    completed = False

    final_results = {
        "intrusion_detected": False,
        "intruding_objects": set(),
        "max_person_count": 0,
        "frames_processed": 0,
        "total_detections": 0
    }

    try:
        # Save uploaded file
        with open(temp_input, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        cap = cv2.VideoCapture(temp_input)
        if not cap.isOpened():
            raise HTTPException(status_code=400, detail="Could not open video file")

        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Some containers report no usable frame rate (0 or NaN); fall back
        # to an assumed 30 fps so the writer gets a valid value.
        if not fps > 0:
            fps = 30.0

        # Create output video file
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height))
        if not out.isOpened():
            # Without this check a download path to a missing/empty file
            # would be returned.
            raise HTTPException(status_code=500, detail="Could not create output video file")

        # Process frames until the capture stops returning them
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            annotated_frame, frame_results = process_image(frame)

            # Update aggregated results
            final_results["frames_processed"] += 1
            final_results["total_detections"] += len(frame_results["detections"])

            if frame_results["intrusion_detected"]:
                final_results["intrusion_detected"] = True
                if frame_results["intruding_object"]:
                    final_results["intruding_objects"].add(frame_results["intruding_object"])

            final_results["max_person_count"] = max(
                final_results["max_person_count"],
                frame_results["person_count"]
            )

            # Write the frame
            out.write(annotated_frame)

        completed = True
    finally:
        # Release resources and clean up even when a frame fails to process
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
        if os.path.exists(temp_input):
            os.remove(temp_input)
        # Do not leave a partial output behind after a failure
        if not completed and os.path.exists(temp_output):
            os.remove(temp_output)

    # Convert set to list for JSON serialization
    final_results["intruding_objects"] = list(final_results["intruding_objects"])

    return {
        "results": final_results,
        "video_path": f"/download_video/{os.path.basename(temp_output)}"
    }

@app.get("/download_video/{filename}")
async def download_video(filename: str):
    """
    Download the processed video file.

    Args:
        filename: The name of the processed video file inside temp/; it must
            be a plain file name without any path components

    Returns:
        The video file, served as video/mp4 under the download name
        "processed_video.mp4"

    Raises:
        HTTPException: 404 if the name contains path components or no such
            regular file exists in temp/
    """
    # The name comes from the URL: refuse anything that could point outside
    # temp/ (separators of either flavour, or the special directory names).
    has_separator = "/" in filename or "\\" in filename
    if has_separator or filename in ("", ".", ".."):
        raise HTTPException(status_code=404, detail="Video not found")

    file_path = f"temp/{filename}"
    # isfile rather than exists: a directory is not a downloadable video
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="Video not found")

    return FileResponse(file_path, media_type="video/mp4", filename="processed_video.mp4")

# # For local development
# if __name__ == "__main__":
#     uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)