File size: 3,265 Bytes
8d36ed5 4d26de9 8d36ed5 02cc722 8d36ed5 81def50 8d36ed5 4d26de9 81def50 b3652fb 4d26de9 3b1c0bd 4d26de9 8d36ed5 4d26de9 8d36ed5 4d26de9 4b32247 4a9c7f0 4d26de9 8d36ed5 4d26de9 8d36ed5 4d26de9 fac5d14 4d26de9 8d36ed5 4d26de9 8d36ed5 b098c95 8d36ed5 b3652fb 8d36ed5 b3652fb e8e141f 8d36ed5 81def50 e8e141f 8d36ed5 e8e141f 8d36ed5 e8e141f 81def50 8d36ed5 4a9c7f0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
import io
import time
import numpy as np
import cv2
import torch
from transformers import DPTFeatureExtractor, DPTForDepthEstimation
from fastapi import FastAPI, File, UploadFile
from PIL import Image
import uvicorn
app = FastAPI()
# 🟢 Chọn thiết bị xử lý (GPU nếu có)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 🟢 Tải model DPT-Hybrid thay cho ZoeDepth để tăng tốc
feature_extractor = DPTFeatureExtractor.from_pretrained("Intel/dpt-swinv2-tiny-256")
model = DPTForDepthEstimation.from_pretrained("Intel/dpt-swinv2-tiny-256").to(device)
model.eval()
@app.post("/analyze_path/")
async def analyze_path(file: UploadFile = File(...)):
# 🟢 Bắt đầu đo thời gian dự đoán Depth Map
start_time = time.time()
# 🟢 Đọc file ảnh từ ESP32
image_bytes = await file.read()
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
# 🔵 Resize ảnh để xử lý nhanh hơn
image = image.resize((192, 192)) # Giảm kích thước giúp tăng tốc độ xử lý
image_np = np.array(image)
flipped_image = cv2.flip(image_np, -1)
# 🟢 Chuẩn bị ảnh cho mô hình
inputs = feature_extractor(images=flipped_image, return_tensors="pt").to(device)
# 🟢 Dự đoán Depth Map với DPT-Hybrid
with torch.no_grad():
outputs = model(**inputs)
# 🟢 Xử lý ảnh sau khi dự đoán
predicted_depth = outputs.predicted_depth.squeeze().cpu().numpy()
depth_map = (predicted_depth * 255 / predicted_depth.max()).astype("uint8")
end_time = time.time()
print(f"⏳ DPT xử lý trong {end_time - start_time:.4f} giây")
# 🟢 Đo thời gian xử lý đường đi
start_detect_time = time.time()
command = detect_path(depth_map)
end_detect_time = time.time()
print(f"⏳ detect_path() xử lý trong {end_detect_time - start_detect_time:.4f} giây Lệnh: {command}")
return {"command": command}
def detect_path(depth_map):
"""Phân tích đường đi từ ảnh Depth Map"""
h, w = depth_map.shape
center_x = w // 2
scan_y = int(h * 0.8) # Quét dòng 80% từ trên xuống
# 🟢 Chia ảnh thành 3 vùng: trái, giữa, phải
left_region = np.mean(depth_map[scan_y, :center_x - 40])
right_region = np.mean(depth_map[scan_y, center_x + 40:])
center_region = np.mean(depth_map[scan_y, center_x - 40:center_x + 40])
# 🟢 Ngưỡng phát hiện vật cản (càng thấp, càng nhạy)
threshold = 80
# 🟢 Không có vật cản ở cả 3 vùng → đi thẳng
if left_region > threshold and center_region > threshold and right_region > threshold:
return "forward"
# 🟢 Nếu chỉ có giữa trống → đi thẳng
if center_region > threshold:
return "forward"
# 🟢 Nếu chỉ có trái hoặc phải trống → chọn hướng có vùng trống lớn nhất
if left_region > right_region:
return "left"
elif right_region > left_region:
return "right"
# 🟢 Nếu tất cả đều có vật cản → lùi lại
return "backward"
# 🟢 Chạy server FastAPI
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860) |