import io import os import sys import time import numpy as np import cv2 import torch import torchvision from fastapi import FastAPI, File, UploadFile from PIL import Image import uvicorn app = FastAPI() # 🟢 Clone ZoeDepth nếu chưa có zoe_path = "ZoeDepth" if not os.path.exists(zoe_path): os.system("git clone https://github.com/isl-org/ZoeDepth.git") # 🟢 Thêm ZoeDepth vào sys.path để import được sys.path.append(os.path.abspath(zoe_path)) # 🟢 Import ZoeDepth sau khi đã tải về from zoedepth.models.builder import build_model from zoedepth.utils.config import get_config # 🟢 Sử dụng đúng model `zoedepth_nk` device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Load ZoeDepth Tiny model correctly config = get_config("zoedepth", version="tiny", pretrained_resource=None) model = ZoeDepth.build_from_config(config) model.eval() @app.post("/analyze_path/") async def analyze_path(file: UploadFile = File(...)): # 🟢 Đọc file ảnh từ ESP32 image_bytes = await file.read() image = Image.open(io.BytesIO(image_bytes)).convert("RGB") # 🟢 Lật ảnh trước khi tính toán Depth Map image_np = np.array(image) flipped_image = cv2.flip(image_np, -1) # 🟢 Chuyển đổi ảnh thành tensor phù hợp với ZoeDepth transform = torchvision.transforms.Compose([ torchvision.transforms.Resize((256, 256)), # ZoeDepth cần resize ảnh nhỏ hơn torchvision.transforms.ToTensor(), ]) img_tensor = transform(flipped_image).unsqueeze(0).to(device) # 🟢 Bắt đầu đo thời gian dự đoán Depth Map start_time = time.time() # 🟢 Dự đoán Depth Map với ZoeDepth with torch.no_grad(): depth_map = model.infer(img_tensor) depth_map = torch.nn.functional.interpolate( depth_map.unsqueeze(1), size=(image_np.shape[0], image_np.shape[1]), mode="bicubic", align_corners=False ).squeeze().cpu().numpy() end_time = time.time() print(f"⏳ ZoeDepth xử lý trong {end_time - start_time:.4f} giây") # 🟢 Đo thời gian xử lý đường đi start_detect_time = time.time() command = detect_path(depth_map) end_detect_time = time.time() print(f"⏳ detect_path() xử lý trong {end_detect_time - start_detect_time:.4f} giây") return {"command": command} def detect_path(depth_map): """Phân tích đường đi từ ảnh Depth Map""" h, w = depth_map.shape center_x = w // 2 scan_y = h - 20 # Quét dòng gần đáy ảnh left_region = np.mean(depth_map[scan_y, :center_x]) right_region = np.mean(depth_map[scan_y, center_x:]) center_region = np.mean(depth_map[scan_y, center_x - 20:center_x + 20]) if center_region > 200: return "forward" elif left_region > right_region: return "left" elif right_region > left_region: return "right" else: return "backward" # 🟢 Chạy server FastAPI if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)