import io import time import numpy as np import cv2 import torch import gradio as gr from transformers import DPTFeatureExtractor, DPTForDepthEstimation from fastapi import FastAPI, File, UploadFile from PIL import Image import uvicorn import threading # 🟢 Tạo FastAPI app = FastAPI() # 🟢 Chọn thiết bị xử lý (GPU nếu có) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 🟢 Tải model DPT-Hybrid để tăng tốc feature_extractor = DPTFeatureExtractor.from_pretrained("Intel/dpt-swinv2-tiny-256") model = DPTForDepthEstimation.from_pretrained("Intel/dpt-swinv2-tiny-256").to(device) model.eval() @app.post("/analyze_path/") async def analyze_path(file: UploadFile = File(...)): """Xử lý ảnh Depth Map và lưu ảnh để hiển thị trên Hugging Face""" start_time = time.time() # 🟢 Đọc file ảnh từ ESP32 image_bytes = await file.read() image = Image.open(io.BytesIO(image_bytes)).convert("RGB") # 🔵 Resize ảnh để xử lý nhanh hơn image = image.resize((256, 256)) image_np = np.array(image) flipped_image = cv2.flip(image_np, -1) # 🟢 Chuẩn bị ảnh cho mô hình inputs = feature_extractor(images=flipped_image, return_tensors="pt").to(device) # 🟢 Dự đoán Depth Map với DPT-Hybrid with torch.no_grad(): outputs = model(**inputs) # 🟢 Xử lý ảnh sau khi dự đoán predicted_depth = outputs.predicted_depth.squeeze().cpu().numpy() depth_map = (predicted_depth * 255 / predicted_depth.max()).astype("uint8") # 🔵 Chuyển depth_map thành ảnh có thể hiển thị depth_colored = cv2.applyColorMap(depth_map, cv2.COLORMAP_INFERNO) depth_pil = Image.fromarray(depth_colored) # 🟢 Lưu ảnh Depth Map để hiển thị trên Gradio depth_pil.save("depth_map.png") # 🎯 **Gradio sẽ truy cập ảnh này** end_time = time.time() print(f"⏳ DPT xử lý trong {end_time - start_time:.4f} giây") # 🟢 Đo thời gian xử lý đường đi start_detect_time = time.time() command = detect_path(depth_map) end_detect_time = time.time() print(f"⏳ detect_path() xử lý trong {end_detect_time - start_detect_time:.4f} giây") return command # Trả về lệnh điều hướng (không kèm ảnh) def detect_path(depth_map): """Phân tích đường đi từ ảnh Depth Map""" h, w = depth_map.shape center_x = w // 2 scan_y = int(h * 0.8) # Quét dòng 80% từ trên xuống left_region = np.mean(depth_map[scan_y, :center_x]) right_region = np.mean(depth_map[scan_y, center_x:]) center_region = np.mean(depth_map[scan_y, center_x - 40:center_x + 40]) # 🟢 Cải thiện logic xử lý threshold = 100 # Ngưỡng phân biệt vật cản if center_region > threshold: return "forward" elif left_region > right_region: return "left" elif right_region > left_region: return "right" else: return "backward" # 🟢 Hàm Gradio hiển thị ảnh Depth Map def get_depth_map(): """Trả về ảnh Depth Map mới nhất từ FastAPI""" try: return Image.open("depth_map.png") # 🟢 Lấy ảnh đã lưu từ FastAPI except FileNotFoundError: return "Chưa có ảnh Depth Map nào được xử lý!" # 🟢 Tạo giao diện Gradio trên Hugging Face Spaces def launch_gradio(): gr.Interface( fn=get_depth_map, inputs=[], outputs=gr.Image(type="pil"), title="🔍 Depth Map Viewer", description="Hiển thị ảnh Depth Map mới nhất từ FastAPI", live=True, # 🎯 **Tự động cập nhật ảnh mới** ).launch(server_name="0.0.0.0", server_port=7861, share=True) # 🟢 Chạy cả FastAPI và Gradio cùng lúc if __name__ == "__main__": # Chạy Gradio trong một luồng riêng threading.Thread(target=launch_gradio, daemon=True).start() # Chạy FastAPI uvicorn.run(app, host="0.0.0.0", port=7860)