# app.py - InterFuser Self-Driving API Server import uuid import base64 import cv2 # import torch # import numpy as np # from fastapi import FastAPI, HTTPException # from fastapi.responses import HTMLResponse # from pydantic import BaseModel # from torchvision import transforms # from typing import List, Dict, Any, Optional # import logging # # استيراد من ملفاتنا المحلية # from model_definition import InterfuserModel, load_and_prepare_model, create_model_config # from simulation_modules import ( # InterfuserController, ControllerConfig, Tracker, DisplayInterface, # render, render_waypoints, render_self_car, WAYPOINT_SCALE_FACTOR, # T1_FUTURE_TIME, T2_FUTURE_TIME # ) # # إعداد التسجيل # logging.basicConfig(level=logging.INFO) # logger = logging.getLogger(__name__) # # ================== إعدادات عامة وتحميل النموذج ================== # app = FastAPI( # title="Baseer Self-Driving API", # description="API للقيادة الذاتية باستخدام نموذج InterFuser", # version="1.0.0" # ) # device = torch.device("cpu") # logger.info(f"Using device: {device}") # # تحميل النموذج باستخدام الدالة المحسنة # try: # # إنشاء إعدادات النموذج باستخدام الإعدادات الصحيحة من التدريب # model_config = create_model_config( # model_path="model/best_model.pth" # # الإعدادات الصحيحة من التدريب ستطبق تلقائياً: # # embed_dim=256, rgb_backbone_name='r50', waypoints_pred_head='gru' # # with_lidar=False, with_right_left_sensors=False, with_center_sensor=False # ) # # تحميل النموذج مع الأوزان # model = load_and_prepare_model(model_config, device) # logger.info("✅ تم تحميل النموذج بنجاح") # except Exception as e: # logger.error(f"❌ خطأ في تحميل النموذج: {e}") # logger.info("🔄 محاولة إنشاء نموذج بأوزان عشوائية...") # try: # model = InterfuserModel() # model.to(device) # model.eval() # logger.warning("⚠️ تم إنشاء النموذج بأوزان عشوائية") # except Exception as e2: # logger.error(f"❌ فشل في إنشاء النموذج: {e2}") # model = None # # تهيئة واجهة العرض # display = DisplayInterface() # # قاموس لتخزين جلسات المستخدمين # 
# In-memory session store: session_id -> {'tracker', 'controller', 'frame_num'}.
SESSIONS: Dict[str, Dict] = {}


# ================== Pydantic data models ==================
class Measurements(BaseModel):
    """Vehicle state sent by the client for a single simulation step."""
    pos: List[float] = [0.0, 0.0]           # [x, y] world position
    theta: float = 0.0                      # orientation angle (radians assumed — TODO confirm)
    speed: float = 0.0                      # current speed (m/s assumed; dashboard converts to km/h)
    steer: float = 0.0                      # current steering
    throttle: float = 0.0                   # current throttle
    brake: bool = False                     # brake status
    command: int = 4                        # driving command (4 = FollowLane)
    target_point: List[float] = [0.0, 0.0]  # target point [x, y]


class ModelOutputs(BaseModel):
    """Raw perception outputs of the InterFuser model."""
    traffic: List[List[List[float]]]  # 20x20x7 BEV grid
    waypoints: List[List[float]]      # Nx2 predicted waypoints
    is_junction: float
    traffic_light_state: float
    stop_sign: float


class ControlCommands(BaseModel):
    """Low-level vehicle commands produced by the controller."""
    steer: float
    throttle: float
    brake: bool


class RunStepInput(BaseModel):
    """Request payload for /run_step."""
    session_id: str
    image_b64: str
    measurements: Measurements


class RunStepOutput(BaseModel):
    """Response payload for /run_step."""
    model_outputs: ModelOutputs
    control_commands: ControlCommands
    dashboard_image_b64: str


class SessionResponse(BaseModel):
    """Response payload for session-management endpoints."""
    session_id: str
    message: str


# ================== Helper functions ==================
def get_image_transform():
    """Build the image transforms exactly as used by PDMDataset at training time."""
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((224, 224), antialias=True),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])


# Build the transform object once at import time instead of per request.
image_transform = get_image_transform()


def preprocess_input(frame_rgb: np.ndarray, measurements: Measurements, device: torch.device) -> Dict[str, torch.Tensor]:
    """
    Mimic PDMDataset.__getitem__ to build a single-sample (batch of 1) model input.

    `frame_rgb` must be an RGB image (HxWx3 uint8) — callers decoding with
    OpenCV must convert from BGR first.
    """
    # 1. Main camera image -> normalized 224x224 tensor with a batch dim.
    from PIL import Image
    if isinstance(frame_rgb, np.ndarray):
        frame_rgb = Image.fromarray(frame_rgb)
    image_tensor = image_transform(frame_rgb).unsqueeze(0).to(device)

    # 2. Side/center cameras are clones of the main view
    #    (those sensors were disabled during training).
    batch = {
        'rgb': image_tensor,
        'rgb_left': image_tensor.clone(),
        'rgb_right': image_tensor.clone(),
        'rgb_center': image_tensor.clone(),
    }

    # 3. Dummy lidar input (zeros) — model trained with with_lidar=False.
    batch['lidar'] = torch.zeros(1, 3, 224, 224, dtype=torch.float32).to(device)

    # 4. Measurements packed in the same order PDMDataset uses.
    m = measurements
    measurements_tensor = torch.tensor([[
        m.pos[0], m.pos[1], m.theta,
        m.steer, m.throttle, float(m.brake),
        m.speed, float(m.command)
    ]], dtype=torch.float32).to(device)
    batch['measurements'] = measurements_tensor

    # 5. Target point.
    batch['target_point'] = torch.tensor([m.target_point], dtype=torch.float32).to(device)

    # Ground-truth (gt_*) entries are not needed at inference time.
    return batch


def decode_base64_image(image_b64: str) -> np.ndarray:
    """Decode a base64-encoded image into a BGR OpenCV array.

    Raises HTTPException(400) on malformed input. Note: cv2.imdecode signals
    failure by returning None rather than raising, so that case is checked
    explicitly (the previous version silently returned None).
    """
    try:
        image_bytes = base64.b64decode(image_b64)
        nparr = np.frombuffer(image_bytes, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if image is None:
            raise ValueError("cv2.imdecode could not parse the image bytes")
        return image
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image format: {str(e)}")


def encode_image_to_base64(image: np.ndarray) -> str:
    """Encode a BGR image as a base64 JPEG string (quality 85)."""
    _, buffer = cv2.imencode('.jpg', image, [cv2.IMWRITE_JPEG_QUALITY, 85])
    return base64.b64encode(buffer).decode('utf-8')


def _bev_image(grid: np.ndarray, size: int = 400) -> np.ndarray:
    """Cheap BEV visualization: objectness channel upscaled to a grayscale BGR image.

    NOTE(review): replaces the previously-called but undefined render_bev();
    reinstate the full render/render_waypoints/render_self_car pipeline from
    simulation_modules once those functions' signatures are confirmed.
    """
    conf = np.clip(grid[..., 0] * 255.0, 0, 255).astype(np.uint8)
    up = cv2.resize(conf, (size, size), interpolation=cv2.INTER_NEAREST)
    return cv2.cvtColor(up, cv2.COLOR_GRAY2BGR)


# ================== API endpoints ==================
@app.get("/", response_class=HTMLResponse)
async def root():
    """Landing page pointing users at the interactive API documentation."""
    # The original embedded HTML was corrupted in the source; rebuilt minimally
    # around the surviving text content.
    return """
    <!DOCTYPE html>
    <html>
      <head><meta charset="utf-8"><title>Baseer Self-Driving API</title></head>
      <body>
        <h1>نظام القيادة الذاتية المتقدم</h1>
        <p>Welcome! The API is running.</p>
        <p>Navigate to <a href="/docs">/docs</a> for the interactive API documentation.</p>
      </body>
    </html>
    """


@app.post("/start_session", response_model=SessionResponse,
          summary="Start a new driving session", tags=["Session Management"])
def start_session():
    """Create tracker/controller state for a new session and return its id."""
    session_id = str(uuid.uuid4())
    # NOTE(review): the previous version read settings from an undefined
    # get_master_config(); default constructions from simulation_modules are
    # used instead — confirm Tracker()/ControllerConfig() defaults match the
    # intended deployment configuration (match threshold, prune age, frequency).
    tracker = Tracker()
    controller = InterfuserController(ControllerConfig())
    SESSIONS[session_id] = {
        'tracker': tracker,
        'controller': controller,
        'frame_num': 0,
    }
    logger.info(f"New session started: {session_id}")
    return SessionResponse(session_id=session_id, message="Session started.")


@app.post("/run_step", response_model=RunStepOutput,
          summary="Process a single simulation step", tags=["Core"])
@torch.no_grad()
def run_step(request: RunStepInput):
    """Run one perception -> tracking -> control step.

    Returns model outputs, control commands, and a rendered dashboard frame.
    Raises 503 if the model failed to load and 404 for unknown sessions.
    """
    if model is None:  # module-level `model` (the old code checked an undefined MODEL)
        raise HTTPException(status_code=503, detail="Model is not available.")
    session = SESSIONS.get(request.session_id)
    if not session:
        raise HTTPException(status_code=404, detail="Session ID not found.")

    # --- 1. Perception ---
    image = decode_base64_image(request.image_b64)  # BGR from OpenCV
    # Convert to RGB: preprocess_input feeds PIL/ImageNet normalization,
    # which expects RGB channel order.
    model_input = preprocess_input(cv2.cvtColor(image, cv2.COLOR_BGR2RGB),
                                   request.measurements, device)
    traffic, waypoints, is_junction, traffic_light_state, stop_sign, _ = model(model_input)

    # --- 2. Post-process model outputs ---
    # Objectness channel -> sigmoid probability; regression channels pass through.
    traffic_processed = torch.cat([torch.sigmoid(traffic[0][:, 0:1]), traffic[0][:, 1:]], dim=1)
    traffic_np = traffic_processed.cpu().numpy().reshape(20, 20, -1)
    waypoints_np = waypoints[0].cpu().numpy()
    junction_prob = torch.softmax(is_junction, dim=1)[0, 1].item()
    light_prob = torch.softmax(traffic_light_state, dim=1)[0, 1].item()
    stop_prob = torch.softmax(stop_sign, dim=1)[0, 1].item()

    # --- 3. Tracking and control ---
    ego_pos = np.array(request.measurements.pos)  # was .pos_global (undefined field)
    ego_theta = request.measurements.theta
    frame_num = session['frame_num']
    # NOTE(review): process_frame/run_step signatures come from
    # simulation_modules — confirm against that module.
    active_tracks = session['tracker'].process_frame(traffic_np, ego_pos, ego_theta, frame_num)
    steer, throttle, brake, ctrl_info = session['controller'].run_step(
        speed=request.measurements.speed,
        waypoints=torch.from_numpy(waypoints_np),
        junction=junction_prob,
        traffic_light=light_prob,
        stop_sign=stop_prob,
        bev_map=traffic_np,
        ego_pos=ego_pos,
        ego_theta=ego_theta,
        frame_num=frame_num,
    )

    # --- 4. Dashboard rendering (module-level DisplayInterface, not per-request) ---
    bev = _bev_image(traffic_np)
    display_data = {
        'camera_view': image,
        'map_t0': bev,
        'map_t1': bev,
        'map_t2': bev,
        'frame_num': frame_num,
        'speed': request.measurements.speed * 3.6,            # m/s -> km/h
        'target_speed': ctrl_info.get('target_speed', 0) * 3.6,
        'steer': steer,
        'throttle': throttle,
        'brake': brake,
        'light_prob': light_prob,
        'stop_prob': stop_prob,
        'object_counts': {'car': len(active_tracks)},
    }
    dashboard = display.run_interface(display_data)

    # --- 5. Update session state and build the response ---
    session['frame_num'] += 1
    return RunStepOutput(
        model_outputs=ModelOutputs(
            traffic=traffic_np.tolist(),
            waypoints=waypoints_np.tolist(),
            is_junction=junction_prob,
            traffic_light_state=light_prob,
            stop_sign=stop_prob,
        ),
        control_commands=ControlCommands(steer=steer, throttle=throttle, brake=brake),
        dashboard_image_b64=encode_image_to_base64(dashboard),
    )


@app.post("/end_session", summary="End and clean up a session", tags=["Session Management"])
def end_session(session_id: str):
    """Delete a session's tracker/controller state; 404 if the id is unknown."""
    if session_id in SESSIONS:
        del SESSIONS[session_id]
        logger.info(f"Session ended: {session_id}")
        return {"message": f"Session {session_id} ended."}
    raise HTTPException(status_code=404, detail="Session not found.")


# ================== Server entry point ==================
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)