# app.py - InterFuser Self-Driving API Server import uuid import base64 import cv2 import torch import numpy as np from fastapi import FastAPI, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel from torchvision import transforms from typing import List, Dict, Any, Optional import logging # # استيراد من ملفاتنا المحلية # from model_definition import InterfuserModel, load_and_prepare_model, create_model_config # from simulation_modules import ( # InterfuserController, ControllerConfig, Tracker, DisplayInterface, # render, render_waypoints, render_self_car, WAYPOINT_SCALE_FACTOR, # T1_FUTURE_TIME, T2_FUTURE_TIME # ) # # إعداد التسجيل # logging.basicConfig(level=logging.INFO) # logger = logging.getLogger(__name__) # # ================== إعدادات عامة وتحميل النموذج ================== # app = FastAPI( # title="Baseer Self-Driving API", # description="API للقيادة الذاتية باستخدام نموذج InterFuser", # version="1.0.0" # ) # device = torch.device("cpu") # logger.info(f"Using device: {device}") # # تحميل النموذج باستخدام الدالة المحسنة # try: # # إنشاء إعدادات النموذج باستخدام الإعدادات الصحيحة من التدريب # model_config = create_model_config( # model_path="model/best_model.pth" # # الإعدادات الصحيحة من التدريب ستطبق تلقائياً: # # embed_dim=256, rgb_backbone_name='r50', waypoints_pred_head='gru' # # with_lidar=False, with_right_left_sensors=False, with_center_sensor=False # ) # # تحميل النموذج مع الأوزان # model = load_and_prepare_model(model_config, device) # logger.info("✅ تم تحميل النموذج بنجاح") # except Exception as e: # logger.error(f"❌ خطأ في تحميل النموذج: {e}") # logger.info("🔄 محاولة إنشاء نموذج بأوزان عشوائية...") # try: # model = InterfuserModel() # model.to(device) # model.eval() # logger.warning("⚠️ تم إنشاء النموذج بأوزان عشوائية") # except Exception as e2: # logger.error(f"❌ فشل في إنشاء النموذج: {e2}") # model = None # # تهيئة واجهة العرض # display = DisplayInterface() # # قاموس لتخزين جلسات المستخدمين # SESSIONS: 
Dict[str, Dict] = {} # # ================== هياكل بيانات Pydantic ================== # class Measurements(BaseModel): # pos: List[float] = [0.0, 0.0] # [x, y] position # theta: float = 0.0 # orientation angle # speed: float = 0.0 # current speed # steer: float = 0.0 # current steering # throttle: float = 0.0 # current throttle # brake: bool = False # brake status # command: int = 4 # driving command (4 = FollowLane) # target_point: List[float] = [0.0, 0.0] # target point [x, y] # class ModelOutputs(BaseModel): # traffic: List[List[List[float]]] # 20x20x7 grid # waypoints: List[List[float]] # Nx2 waypoints # is_junction: float # traffic_light_state: float # stop_sign: float # class ControlCommands(BaseModel): # steer: float # throttle: float # brake: bool # class RunStepInput(BaseModel): # session_id: str # image_b64: str # measurements: Measurements # class RunStepOutput(BaseModel): # model_outputs: ModelOutputs # control_commands: ControlCommands # dashboard_image_b64: str # class SessionResponse(BaseModel): # session_id: str # message: str # # ================== دوال المساعدة ================== # def get_image_transform(): # """إنشاء تحويلات الصورة كما في PDMDataset""" # return transforms.Compose([ # transforms.ToTensor(), # transforms.Resize((224, 224), antialias=True), # transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # ]) # # إنشاء كائن التحويل مرة واحدة # image_transform = get_image_transform() # def preprocess_input(frame_rgb: np.ndarray, measurements: Measurements, device: torch.device) -> Dict[str, torch.Tensor]: # """ # تحاكي ما يفعله PDMDataset.__getitem__ لإنشاء دفعة (batch) واحدة. # """ # # 1. معالجة الصورة الرئيسية # from PIL import Image # if isinstance(frame_rgb, np.ndarray): # frame_rgb = Image.fromarray(frame_rgb) # image_tensor = image_transform(frame_rgb).unsqueeze(0).to(device) # إضافة بُعد الدفعة # # 2. 
إنشاء مدخلات الكاميرات الأخرى عن طريق الاستنساخ # batch = { # 'rgb': image_tensor, # 'rgb_left': image_tensor.clone(), # 'rgb_right': image_tensor.clone(), # 'rgb_center': image_tensor.clone(), # } # # 3. إنشاء مدخل ليدار وهمي (أصفار) # batch['lidar'] = torch.zeros(1, 3, 224, 224, dtype=torch.float32).to(device) # # 4. تجميع القياسات بنفس ترتيب PDMDataset # m = measurements # measurements_tensor = torch.tensor([[ # m.pos[0], m.pos[1], m.theta, # m.steer, m.throttle, float(m.brake), # m.speed, float(m.command) # ]], dtype=torch.float32).to(device) # batch['measurements'] = measurements_tensor # # 5. إنشاء نقطة هدف # batch['target_point'] = torch.tensor([m.target_point], dtype=torch.float32).to(device) # # لا نحتاج إلى قيم ground truth (gt_*) أثناء التنبؤ # return batch # def decode_base64_image(image_b64: str) -> np.ndarray: # """ # فك تشفير صورة Base64 # """ # try: # image_bytes = base64.b64decode(image_b64) # nparr = np.frombuffer(image_bytes, np.uint8) # image = cv2.imdecode(nparr, cv2.IMREAD_COLOR) # return image # except Exception as e: # raise HTTPException(status_code=400, detail=f"Invalid image format: {str(e)}") # def encode_image_to_base64(image: np.ndarray) -> str: # """ # تشفير صورة إلى Base64 # """ # _, buffer = cv2.imencode('.jpg', image, [cv2.IMWRITE_JPEG_QUALITY, 85]) # return base64.b64encode(buffer).decode('utf-8') # # ================== نقاط نهاية الـ API ================== # @app.get("/", response_class=HTMLResponse) # async def root(): # """ # الصفحة الرئيسية للـ API # """ # html_content = f""" # # #
# # #نظام القيادة الذاتية المتقدم
#Welcome! The API is running.
Navigate to /docs for the interactive API documentation.
# ================== API endpoints: session lifecycle + per-step inference ==================


@app.post("/start_session", summary="Start a new driving session", tags=["Session Management"])
def start_session():
    """Create a new driving session with its own tracker/controller state.

    Returns:
        dict: ``{"session_id": <uuid4 string>}`` — the id the client must pass
        to subsequent ``/run_step`` and ``/end_session`` calls.
    """
    session_id = str(uuid.uuid4())

    # 1. Pull the full configuration from the single source of truth so the
    #    tracker and the controller are always initialised consistently.
    config = get_master_config()

    # 2. Explicitly extract the sub-configs each component needs.
    grid_conf = config['grid_conf']
    controller_params = config['controller_params']
    simulation_freq = config['simulation']['frequency']

    # 3. Tracker — .get() keeps defaults if the tuning keys are absent.
    tracker = Tracker(
        grid_conf=grid_conf,
        match_threshold=controller_params.get('tracker_match_thresh', 2.5),
        prune_age=controller_params.get('tracker_prune_age', 5),
    )

    # 4. Controller.
    controller = InterfuserController({
        'controller_params': controller_params,
        'grid_conf': grid_conf,
        'frequency': simulation_freq,
    })

    # 5. Register the session with its freshly initialised components.
    SESSIONS[session_id] = {
        'tracker': tracker,
        'controller': controller,
        'frame_num': 0,
    }
    logging.info("New session started: %s", session_id)
    return {"session_id": session_id}


@app.post("/run_step", response_model=RunStepResponse, summary="Process a single simulation step", tags=["Core"])
@torch.no_grad()
def run_step(request: RunStepRequest):
    """Run one perception → tracking → control step for an existing session.

    Args:
        request: session id, base64-encoded camera frame, and ego measurements.

    Returns:
        RunStepResponse: control commands, scene analysis, predicted
        waypoints, a rendered dashboard image (base64) and a brake reason.

    Raises:
        HTTPException: 503 if the model failed to load, 404 for an unknown
        session id.
    """
    if MODEL is None:
        raise HTTPException(status_code=503, detail="Model is not available.")
    session = SESSIONS.get(request.session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session ID not found.")

    # --- 1. Perception ---
    image = b64_to_cv2(request.image_b64)
    model_input = prepare_model_input(image, request.measurements)
    traffic, waypoints, junc, light, stop, _ = MODEL(model_input)

    # --- 2. Post-process model outputs ---
    # Objectness channel (index 0) goes through a sigmoid; the remaining
    # channels are kept as raw regression outputs.
    traffic_processed = torch.cat([torch.sigmoid(traffic[0][:, 0:1]), traffic[0][:, 1:]], dim=1)
    traffic_np = traffic_processed.cpu().numpy().reshape(20, 20, -1)
    waypoints_np = waypoints[0].cpu().numpy()
    # Binary heads: softmax over 2 classes, take P(class 1).
    junction_prob = torch.softmax(junc, dim=1)[0, 1].item()
    light_prob = torch.softmax(light, dim=1)[0, 1].item()
    stop_prob = torch.softmax(stop, dim=1)[0, 1].item()

    # --- 3. Tracking and control ---
    ego_pos = np.array(request.measurements.pos_global)
    ego_theta = request.measurements.theta
    frame_num = session['frame_num']

    active_tracks = session['tracker'].process_frame(traffic_np, ego_pos, ego_theta, frame_num)

    steer, throttle, brake, ctrl_info = session['controller'].run_step(
        speed=request.measurements.speed,
        waypoints=torch.from_numpy(waypoints_np),
        junction=junction_prob,
        traffic_light=light_prob,
        stop_sign=stop_prob,
        bev_map=traffic_np,
        ego_pos=ego_pos,
        ego_theta=ego_theta,
        frame_num=frame_num,
    )

    # --- 4. Render the dashboard ---
    # NOTE(review): a DisplayInterface is rebuilt on every request; if it is
    # stateless this could be hoisted to module level — confirm before moving.
    display_iface = DisplayInterface(DisplayConfig(width=1280, height=720))
    bev_maps = render_bev(active_tracks, waypoints_np, ego_pos, ego_theta)
    display_data = {
        'camera_view': image,
        'map_t0': bev_maps['t0'],
        'map_t1': bev_maps['t1'],
        'map_t2': bev_maps['t2'],
        'frame_num': frame_num,
        'speed': request.measurements.speed * 3.6,          # m/s -> km/h
        'target_speed': ctrl_info.get('target_speed', 0) * 3.6,  # m/s -> km/h
        'steer': steer,
        'throttle': throttle,
        'brake': brake,
        'light_prob': light_prob,
        'stop_prob': stop_prob,
        'object_counts': {'car': len(active_tracks)},
    }
    dashboard = display_iface.run_interface(display_data)

    # --- 5. Advance the session clock and build the response ---
    session['frame_num'] += 1

    return RunStepResponse(
        control_commands=ControlCommands(steer=steer, throttle=throttle, brake=brake),
        scene_analysis=SceneAnalysis(
            is_junction=junction_prob,
            traffic_light_state=light_prob,
            stop_sign=stop_prob,
        ),
        predicted_waypoints=[tuple(wp) for wp in waypoints_np.tolist()],
        dashboard_b64=cv2_to_b64(dashboard),
        reason=ctrl_info.get('brake_reason', "Cruising"),
    )


@app.post("/end_session", summary="End and clean up a session", tags=["Session Management"])
def end_session(session_id: str):
    """Delete a session's tracker/controller state.

    Raises:
        HTTPException: 404 if the session id is unknown.
    """
    if session_id in SESSIONS:
        del SESSIONS[session_id]
        logging.info("Session ended: %s", session_id)
        return {"message": f"Session {session_id} ended."}
    raise HTTPException(status_code=404, detail="Session not found.")


# ================== Server entry point ==================
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)