File size: 4,067 Bytes
8d36ed5 4d26de9 1b0fc86 4d26de9 8d36ed5 02cc722 8d36ed5 1b0fc86 81def50 64c49d0 8d36ed5 4d26de9 81def50 b3652fb 64c49d0 3b1c0bd 4d26de9 8d36ed5 3bdfe12 4d26de9 64c49d0 8d36ed5 4d26de9 64c49d0 4a9c7f0 64c49d0 4d26de9 8d36ed5 4d26de9 8d36ed5 4d26de9 fac5d14 4d26de9 64c49d0 3bdfe12 64c49d0 4d26de9 8d36ed5 4d26de9 8d36ed5 64c49d0 8d36ed5 64c49d0 b3652fb 8d36ed5 b3652fb 64c49d0 8d36ed5 81def50 64c49d0 8d36ed5 64c49d0 8d36ed5 64c49d0 81def50 1b0fc86 17c3ee8 1b0fc86 8d36ed5 1b0fc86 64c49d0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
import io
import time
import numpy as np
import cv2
import torch
import gradio as gr
from transformers import DPTFeatureExtractor, DPTForDepthEstimation
from fastapi import FastAPI, File, UploadFile
from PIL import Image
import uvicorn
import threading
# 🟢 Tạo FastAPI
app = FastAPI()
# 🟢 Chọn thiết bị xử lý (GPU nếu có)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 🟢 Tải model DPT-Hybrid để tăng tốc
feature_extractor = DPTFeatureExtractor.from_pretrained("Intel/dpt-swinv2-tiny-256")
model = DPTForDepthEstimation.from_pretrained("Intel/dpt-swinv2-tiny-256").to(device)
model.eval()
@app.post("/analyze_path/")
async def analyze_path(file: UploadFile = File(...)):
"""Xử lý ảnh Depth Map và lưu ảnh để hiển thị trên Hugging Face"""
start_time = time.time()
# 🟢 Đọc file ảnh từ ESP32
image_bytes = await file.read()
image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
# 🔵 Resize ảnh để xử lý nhanh hơn
image = image.resize((256, 256))
image_np = np.array(image)
flipped_image = cv2.flip(image_np, -1)
# 🟢 Chuẩn bị ảnh cho mô hình
inputs = feature_extractor(images=flipped_image, return_tensors="pt").to(device)
# 🟢 Dự đoán Depth Map với DPT-Hybrid
with torch.no_grad():
outputs = model(**inputs)
# 🟢 Xử lý ảnh sau khi dự đoán
predicted_depth = outputs.predicted_depth.squeeze().cpu().numpy()
depth_map = (predicted_depth * 255 / predicted_depth.max()).astype("uint8")
# 🔵 Chuyển depth_map thành ảnh có thể hiển thị
depth_colored = cv2.applyColorMap(depth_map, cv2.COLORMAP_INFERNO)
depth_pil = Image.fromarray(depth_colored)
# 🟢 Lưu ảnh Depth Map để hiển thị trên Gradio
depth_pil.save("depth_map.png") # 🎯 **Gradio sẽ truy cập ảnh này**
end_time = time.time()
print(f"⏳ DPT xử lý trong {end_time - start_time:.4f} giây")
# 🟢 Đo thời gian xử lý đường đi
start_detect_time = time.time()
command = detect_path(depth_map)
end_detect_time = time.time()
print(f"⏳ detect_path() xử lý trong {end_detect_time - start_detect_time:.4f} giây")
return command # Trả về lệnh điều hướng (không kèm ảnh)
def detect_path(depth_map):
"""Phân tích đường đi từ ảnh Depth Map"""
h, w = depth_map.shape
center_x = w // 2
scan_y = int(h * 0.8) # Quét dòng 80% từ trên xuống
left_region = np.mean(depth_map[scan_y, :center_x])
right_region = np.mean(depth_map[scan_y, center_x:])
center_region = np.mean(depth_map[scan_y, center_x - 40:center_x + 40])
# 🟢 Cải thiện logic xử lý
threshold = 100 # Ngưỡng phân biệt vật cản
if center_region > threshold:
return "forward"
elif left_region > right_region:
return "left"
elif right_region > left_region:
return "right"
else:
return "backward"
# 🟢 Hàm Gradio hiển thị ảnh Depth Map
def get_depth_map():
"""Trả về ảnh Depth Map mới nhất từ FastAPI"""
try:
return Image.open("depth_map.png") # 🟢 Lấy ảnh đã lưu từ FastAPI
except FileNotFoundError:
return "Chưa có ảnh Depth Map nào được xử lý!"
# 🟢 Tạo giao diện Gradio trên Hugging Face Spaces
def launch_gradio():
gr.Interface(
fn=get_depth_map,
inputs=[],
outputs=gr.Image(type="pil"),
title="🔍 Depth Map Viewer",
description="Hiển thị ảnh Depth Map mới nhất từ FastAPI",
live=True, # 🎯 **Tự động cập nhật ảnh mới**
).launch(server_name="0.0.0.0", server_port=7861, share=True)
# 🟢 Chạy cả FastAPI và Gradio cùng lúc
if __name__ == "__main__":
# Chạy Gradio trong một luồng riêng
threading.Thread(target=launch_gradio, daemon=True).start()
# Chạy FastAPI
uvicorn.run(app, host="0.0.0.0", port=7860)
|