Spaces:
Sleeping
Sleeping
Create app.py
Browse files
app.py
ADDED
@@ -0,0 +1,82 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import torch
|
2 |
+
from transformers import AutoImageProcessor, AutoModelForObjectDetection
|
3 |
+
from PIL import Image
|
4 |
+
import cv2
|
5 |
+
import numpy as np
|
6 |
+
import time
|
7 |
+
from flask import Flask, jsonify, request
|
8 |
+
|
9 |
+
# Initialize Flask app
|
10 |
+
app = Flask(__name__)
|
11 |
+
|
12 |
+
# Device setup (GPU or CPU)
|
13 |
+
device = 'cpu'
|
14 |
+
if torch.cuda.is_available():
|
15 |
+
device = torch.device('cuda')
|
16 |
+
elif torch.backends.mps.is_available():
|
17 |
+
device = torch.device('mps')
|
18 |
+
|
19 |
+
# Load pre-trained model and image processor from Hugging Face
|
20 |
+
ckpt = 'yainage90/fashion-object-detection'
|
21 |
+
image_processor = AutoImageProcessor.from_pretrained(ckpt)
|
22 |
+
model = AutoModelForObjectDetection.from_pretrained(ckpt).to(device)
|
23 |
+
|
24 |
+
def detect_objects(frame):
|
25 |
+
"""Detect objects in the video frame."""
|
26 |
+
# Convert the frame to PIL image
|
27 |
+
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
|
28 |
+
|
29 |
+
# Prepare inputs for the model
|
30 |
+
with torch.no_grad():
|
31 |
+
inputs = image_processor(images=[image], return_tensors="pt")
|
32 |
+
outputs = model(**inputs.to(device))
|
33 |
+
target_sizes = torch.tensor([[image.size[1], image.size[0]]])
|
34 |
+
results = image_processor.post_process_object_detection(outputs, threshold=0.4, target_sizes=target_sizes)[0]
|
35 |
+
|
36 |
+
# Extract the detected items
|
37 |
+
items = []
|
38 |
+
for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
|
39 |
+
score = score.item()
|
40 |
+
label = label.item()
|
41 |
+
box = [i.item() for i in box]
|
42 |
+
print(f"{model.config.id2label[label]}: {round(score, 3)} at {box}")
|
43 |
+
items.append((score, label, box))
|
44 |
+
|
45 |
+
return items
|
46 |
+
|
47 |
+
def save_data(frame, items):
|
48 |
+
"""Save image and extract plate number."""
|
49 |
+
filename = f"helmet_violation_{int(time.time())}.jpg"
|
50 |
+
cv2.imwrite(filename, frame)
|
51 |
+
|
52 |
+
# Here, you'd extract plate numbers or process further
|
53 |
+
plate_number = extract_plate_number(frame)
|
54 |
+
save_to_database(filename, plate_number, items)
|
55 |
+
|
56 |
+
def extract_plate_number(frame):
|
57 |
+
"""Extract license plate number (simplified)."""
|
58 |
+
plate_number = "XYZ 1234" # Replace with an actual license plate recognition method
|
59 |
+
return plate_number
|
60 |
+
|
61 |
+
def save_to_database(image_filename, plate_number, items):
|
62 |
+
"""Save the data (for simplicity, we just print it here)."""
|
63 |
+
print(f"Plate Number: {plate_number}, Image saved as {image_filename}")
|
64 |
+
print("Detected items:", items)
|
65 |
+
|
66 |
+
@app.route("/process_frame", methods=["POST"])
|
67 |
+
def process_frame():
|
68 |
+
"""Process incoming video frame via API."""
|
69 |
+
frame = request.files["frame"].read()
|
70 |
+
np_array = np.frombuffer(frame, np.uint8)
|
71 |
+
img = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
|
72 |
+
|
73 |
+
# Detect objects (e.g., helmets) in the frame
|
74 |
+
items = detect_objects(img)
|
75 |
+
|
76 |
+
if items: # If objects are detected, save the data
|
77 |
+
save_data(img, items)
|
78 |
+
|
79 |
+
return jsonify({"status": "processed"})
|
80 |
+
|
81 |
+
if __name__ == "__main__":
|
82 |
+
app.run(debug=True, host="0.0.0.0", port=5000)
|