Spaces:
Running
Running
# =================================================================== | |
# ملف: utils.py (نسخة v2.0 - متوافقة مع الخرائط الديناميكية) | |
# =================================================================== | |
import numpy as np | |
from collections import deque | |
import cv2 | |
import math | |
from typing import Dict, List | |
import torch | |
from torchvision import transforms | |
import os | |
import json | |
import gzip | |
import torch | |
from torch.utils.data import Dataset | |
from dataclasses import dataclass | |
from typing import Dict, Tuple, Optional | |
import time | |
from dataclasses import dataclass | |
from typing import Dict, Tuple, Optional | |
def find_peak_box_and_classify(data): | |
""" | |
[v2.0] تجد القمم في شبكة الكشف وتستخرج معلومات الكائنات. | |
هذه النسخة مرنة وتعمل مع أي حجم خريطة. | |
""" | |
# ✅ إصلاح: الحصول على أبعاد الخريطة ديناميكيًا | |
H, W, _ = data.shape | |
# ✅ إصلاح: إضافة إطار الأمان يعتمد على حجم الخريطة الفعلي | |
det_data = np.zeros((H + 2, W + 2, 7)) | |
det_data[1:H+1, 1:W+1] = data | |
detected_objects, object_counts = [], {"car": 0, "bike": 0, "pedestrian": 0, "unknown": 0} | |
# ✅ إصلاح: حلقة التكرار تمر على حجم الخريطة الفعلي | |
for i in range(1, H + 1): | |
for j in range(1, W + 1): | |
confidence = det_data[i, j, 0] | |
# تم رفع حد الثقة قليلاً لتقليل الاكتشافات الخاطئة | |
if confidence > 0.12: | |
# البحث عن القمة المحلية | |
if (confidence >= det_data[i,j-1,0] and confidence >= det_data[i,j+1,0] and | |
confidence >= det_data[i-1,j,0] and confidence >= det_data[i+1,j,0]): | |
# استخراج البيانات | |
length, width = det_data[i,j,4], det_data[i,j,5] | |
# تصنيف مبسط يعتمد على الأبعاد | |
if length > 3.5: obj_type = 'car' | |
elif length > 1.5 and width > 0.5: obj_type = 'bike' | |
else: obj_type = 'pedestrian' | |
object_counts[obj_type] += 1 | |
# ✅ إصلاح: إحداثيات الشبكة الآن نسبة إلى الخريطة الأصلية (i-1, j-1) | |
detected_objects.append({ | |
'grid_coords': (i - 1, j - 1), | |
'raw_data': det_data[i, j], | |
'type': obj_type | |
}) | |
return detected_objects, object_counts | |
def check_for_nearby_obstacle(meta_data, grid_conf, max_dist=15.0, threshold=0.4): | |
""" | |
[v2.0] تتحقق من وجود أي عائق قريب. | |
""" | |
detected_objects, _ = find_peak_box_and_classify(meta_data) | |
if not detected_objects: | |
return False | |
# استخدام إعدادات الشبكة من القاموس | |
x_res = grid_conf['x_res'] | |
y_res = grid_conf['y_res'] | |
x_min = grid_conf['x_min'] | |
y_min = grid_conf['y_min'] | |
for obj_dict in detected_objects: | |
if obj_dict['raw_data'][0] > threshold: | |
raw_data = obj_dict['raw_data'] | |
grid_i, grid_j = obj_dict['grid_coords'] | |
offset_x, offset_y = raw_data[1], raw_data[2] | |
# ✅ إصلاح: استخدام نظام الإحداثيات الصحيح (Y-أمام, X-يسار) | |
# لاحظ أن grid_j يرتبط بـ x_rel و grid_i يرتبط بـ y_rel | |
x_rel = (grid_j * x_res) + x_min + (x_res / 2) + offset_x | |
y_rel = (grid_i * y_res) + y_min + (y_res / 2) + offset_y | |
distance = np.linalg.norm([x_rel, y_rel]) | |
if distance < max_dist: | |
print(f"🛑 تم اكتشاف عائق قريب على مسافة {distance:.2f} متر.") | |
return True, distance | |
# print("✅ لا توجد عائق قريب.") | |
return False | |
# =================================================================== | |
# وحدات مساعدة (Helpers) - وظائف نقية ومفصولة | |
# =================================================================== | |
def convert_grid_to_relative_xy(grid_i, grid_j, grid_conf): | |
""" | |
يحول إحداثيات الشبكة (i, j) إلى إحداثيات مترية نسبية للسيارة (y-أمام, x-جانب). | |
Args: | |
grid_i (int): مؤشر الصف في الشبكة (المحور الأمامي). | |
grid_j (int): مؤشر العمود في الشبكة (المحور الجانبي). | |
grid_conf (dict): قاموس يحتوي على أبعاد ودقة الشبكة. | |
Returns: | |
tuple[float, float]: الإحداثيات النسبية (relative_y, relative_x) لمركز الخلية. | |
""" | |
x_res = grid_conf['x_res'] | |
y_res = grid_conf['y_res'] | |
x_min = grid_conf['x_min'] | |
y_min = grid_conf['y_min'] | |
# حساب مركز الخلية | |
relative_x = (grid_j * x_res) + x_min + (x_res / 2.0) | |
relative_y = (grid_i * y_res) + y_min + (y_res / 2.0) | |
return relative_y, relative_x | |
# =================================================================== | |
# الفئات الرئيسية (Core Classes) | |
# =================================================================== | |
class TrackedObject: | |
""" | |
يمثل كائنًا واحدًا يتم تتبعه عبر الزمن. | |
يحتفظ بمعرف فريد وتاريخ لمواقعه. | |
""" | |
def __init__(self, obj_id, initial_detection, frame_num): | |
self.id = obj_id | |
self.type = initial_detection['type'] | |
self.history = deque(maxlen=20) # تخزين آخر 20 موقعًا | |
self.last_frame_seen = frame_num | |
self.last_confidence = 0.0 | |
self.update(initial_detection, frame_num) | |
def update(self, detection, frame_num): | |
"""تحديث حالة الكائن ببيانات اكتشاف جديدة.""" | |
self.history.append(detection['global_pos']) | |
self.last_frame_seen = frame_num | |
self.last_confidence = detection['raw_data'][0] | |
def last_pos(self): | |
"""الحصول على آخر موقع مسجل للكائن.""" | |
return self.history[-1] if self.history else None | |
def __repr__(self): | |
"""تمثيل نصي مفيد لتصحيح الأخطاء.""" | |
return f"Track(ID={self.id}, Type='{self.type}', LastSeen={self.last_frame_seen})" | |
class Tracker: | |
""" | |
يتتبع الكائنات المكتشفة في خرائط BEV عبر الإطارات. | |
يستخدم نظام إحداثيات عالمي للمطابقة والحفاظ على هوية الكائنات. | |
""" | |
def __init__(self, grid_conf, match_threshold=2.5, prune_age=5): | |
""" | |
Args: | |
grid_conf (dict): إعدادات الشبكة لتحويل الإحداثيات. | |
match_threshold (float): أقصى مسافة (بالمتر) لاعتبار كائن متطابقًا مع مسار. | |
prune_age (int): عدد الإطارات التي يجب انتظارها قبل حذف مسار غير نشط. | |
""" | |
self.grid_conf = grid_conf | |
self.match_threshold = match_threshold | |
self.prune_age = prune_age | |
self.tracks = {} # قاموس لتخزين المسارات باستخدام ID كـ مفتاح | |
self.next_track_id = 0 | |
def process_frame(self, bev_map, ego_pos, ego_theta, frame_num): | |
""" | |
الدالة الرئيسية: تعالج إطارًا واحدًا، تكتشف الكائنات، وتحدث المسارات. | |
""" | |
# 1. اكتشاف الكائنات من خريطة BEV | |
# (نفترض أن find_peak_box_and_classify معرفة في مكان آخر) | |
detections, _ = find_peak_box_and_classify(bev_map) | |
# 2. تحويل مواقع الكائنات إلى إحداثيات عالمية | |
self._add_global_positions(detections, ego_pos, ego_theta) | |
# 3. مطابقة الاكتشافات بالمسارات الموجودة | |
matches, unmatched_detections = self._match_detections_to_tracks(detections) | |
# 4. تحديث المسارات المتطابقة | |
for track_id, detection_idx in matches.items(): | |
self.tracks[track_id].update(detections[detection_idx], frame_num) | |
# 5. إنشاء مسارات جديدة للاكتشافات غير المتطابقة | |
for detection_idx in unmatched_detections: | |
self._create_new_track(detections[detection_idx], frame_num) | |
# 6. حذف المسارات القديمة (التي لم يتم رؤيتها منذ فترة) | |
self._prune_tracks(frame_num) | |
return list(self.tracks.values()) | |
def _add_global_positions(self, detections, ego_pos, ego_theta): | |
"""يضيف مفتاح 'global_pos' لكل اكتشاف في القائمة.""" | |
R = np.array([[np.cos(ego_theta), -np.sin(ego_theta)], | |
[np.sin(ego_theta), np.cos(ego_theta)]]) | |
for det in detections: | |
grid_i, grid_j = det['grid_coords'] | |
raw_data = det['raw_data'] | |
# حساب الموقع النسبي (y-أمام, x-جانب) | |
relative_y, relative_x = convert_grid_to_relative_xy(grid_i, grid_j, self.grid_conf) | |
relative_x += raw_data[1] # الإزاحة الجانبية بالأمتار | |
relative_y += raw_data[2] # الإزاحة الأمامية بالأمتار | |
# التحويل إلى إحداثيات عالمية | |
global_offset = R.dot(np.array([relative_y, relative_x])) | |
det['global_pos'] = ego_pos + global_offset | |
def _match_detections_to_tracks(self, detections): | |
"""مطابقة بسيطة وجشعة (Greedy Matching) بين الاكتشافات والمسارات.""" | |
matches = {} | |
unmatched_detections = set(range(len(detections))) | |
if not self.tracks or not detections: | |
return matches, unmatched_detections | |
active_track_ids = list(self.tracks.keys()) | |
for track_id in active_track_ids: | |
track = self.tracks[track_id] | |
min_dist = self.match_threshold | |
best_det_idx = -1 | |
for i in range(len(detections)): | |
if i not in unmatched_detections: continue | |
dist = np.linalg.norm(track.last_pos - detections[i]['global_pos']) | |
if dist < min_dist: | |
min_dist = dist | |
best_det_idx = i | |
if best_det_idx != -1: | |
matches[track_id] = best_det_idx | |
unmatched_detections.remove(best_det_idx) | |
return matches, unmatched_detections | |
def _create_new_track(self, detection, frame_num): | |
"""إنشاء وإضافة مسار جديد إلى القاموس.""" | |
new_id = self.next_track_id | |
new_track = TrackedObject(new_id, detection, frame_num) | |
self.tracks[new_id] = new_track | |
self.next_track_id += 1 | |
def _prune_tracks(self, current_frame_num): | |
"""حذف المسارات التي لم يتم تحديثها منذ فترة طويلة.""" | |
ids_to_delete = [] | |
for track_id, track in self.tracks.items(): | |
if current_frame_num - track.last_frame_seen > self.prune_age: | |
ids_to_delete.append(track_id) | |
for track_id in ids_to_delete: | |
del self.tracks[track_id] | |
class PIDController: | |
def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20): | |
self._K_P, self._K_I, self._K_D = K_P, K_I, K_D | |
self._window = deque([0 for _ in range(n)], maxlen=n) | |
def step(self, error): | |
self._window.append(error) | |
integral = np.mean(self._window) if len(self._window) > 1 else 0.0 | |
derivative = (self._window[-1] - self._window[-2]) if len(self._window) > 1 else 0.0 | |
return self._K_P * error + self._K_I * integral + self._K_D * derivative | |
def reset(self): | |
self._window.clear() | |
self._window.extend([0 for _ in range(self._window.maxlen)]) | |
class InterfuserController: | |
""" | |
[النسخة 34.0] المتحكم ذو الذاكرة (Memory-Enhanced). | |
يعالج مشكلة "التردد المميت" عن طريق: | |
1. عدم الثقة بتقديرات السرعة الأولية للكائنات المكتشفة حديثًا. | |
2. إضافة "فترة سماح" (ذاكرة قصيرة المدى) للحفاظ على الحذر بعد فقدان أثر | |
المركبة التي يتم متابعتها، مما يمنع التسارع غير الآمن. | |
""" | |
def __init__(self, config_dict): | |
self.config = config_dict | |
c = self.config['controller_params'] | |
self.freq = self.config.get('frequency', 10.0) # تردد النظام | |
# 1. إعداد وحدات التحكم الأساسية | |
self.turn_controller = PIDController(c['turn_KP'], c['turn_KI'], c['turn_KD'], c['turn_n']) | |
self.speed_controller = PIDController(c['speed_KP'], c['speed_KI'], c['speed_KD'], c['speed_n']) | |
# 2. دمج المتتبع | |
self.tracker = Tracker(grid_conf=config_dict.get('grid_conf', {}), | |
match_threshold=c.get('tracker_match_thresh', 2.5), | |
prune_age=c.get('tracker_prune_age', 5)) | |
# 3. إعداد متغيرات الحالة | |
self.current_target_speed = 0.0 | |
self.last_steer = 0 | |
# [تعديل] إضافة متغيرات الذاكرة والسلوك الحذر | |
self.last_followed_track_id = -1 | |
self.follow_grace_period_timer = 0 | |
# متغيرات الحالة الأخرى | |
self.stop_sign_timer, self.red_light_block_timer = 0, 0 | |
self.stop_steps_counter, self.forced_move_timer = 0, 0 | |
def run_step(self, speed, waypoints, junction, traffic_light, stop_sign, bev_map, ego_pos, ego_theta, frame_num): | |
active_tracks = self.tracker.process_frame(bev_map, ego_pos, ego_theta, frame_num) | |
self._update_system_states(speed, traffic_light) | |
steer = self._get_steering_command(waypoints, speed) | |
final_goal_speed, reason = self._get_goal_speed( | |
speed, waypoints, traffic_light, stop_sign, junction, active_tracks, ego_pos | |
) | |
self._apply_speed_smoothing(final_goal_speed) | |
throttle, brake = self._get_longitudinal_control(speed, self.current_target_speed) | |
final_reason = reason if brake else "Cruising" | |
if self.forced_move_timer > 0: | |
throttle, brake, final_reason = self.config['controller_params']['forced_throttle'], False, "Forced Move" | |
return steer, throttle, brake, {'target_speed': self.current_target_speed, 'brake_reason': final_reason, 'active_tracks': len(active_tracks)} | |
def _get_goal_speed(self, current_speed, waypoints, traffic_light, stop_sign, junction, active_tracks, ego_pos): | |
c = self.config['controller_params'] | |
max_speed = c['max_speed'] | |
# القاعدة 1: قواعد السلامة الثابتة | |
if (traffic_light > c['light_threshold']): return 0.0, "Red Light" | |
if self._is_stop_sign_active(stop_sign, junction): return 0.0, "Stop Sign" | |
# القاعدة 2: منطق تجنب العوائق الديناميكي مع الذاكرة | |
obstacle_speed_limit, obstacle_reason, is_following = self._obstacle_avoidance_logic(active_tracks, ego_pos, current_speed) | |
if is_following: | |
return obstacle_speed_limit, obstacle_reason | |
# القاعدة 3: منطق الملاحة (نقاط المسار) | |
return self._navigation_logic(waypoints, max_speed) | |
def _obstacle_avoidance_logic(self, active_tracks, ego_pos, current_speed): | |
"""[دالة جديدة] تحتوي على كل منطق التعامل مع العوائق.""" | |
c = self.config['controller_params'] | |
obstacle_speed_limit = c['max_speed'] | |
is_following_a_track = False | |
if self.follow_grace_period_timer > 0: | |
self.follow_grace_period_timer -= 1 | |
for track in active_tracks: | |
distance = np.linalg.norm(track.last_pos - ego_pos) | |
if distance < c.get('critical_distance', 4.0): | |
self.last_followed_track_id = -1 | |
return 0.0, f"Critical Obstacle (ID: {track.id})", True | |
if distance < c.get('follow_distance', 12.0): | |
is_following_a_track = True | |
self.last_followed_track_id = track.id | |
self.follow_grace_period_timer = c.get('follow_grace_period', int(2 * self.freq)) # ثانيتان من الذاكرة | |
track_speed = self._estimate_track_speed(track) | |
# [منطق جديد] تجاهل التقديرات الأولية غير الموثوقة | |
if track_speed < 0.1 and len(track.history) < 3: | |
# لا تثق بالتقدير، حافظ على سرعتك الحالية مؤقتًا لمنع الفرملة غير الضرورية | |
target_speed = current_speed | |
else: | |
target_speed = track_speed * c.get('speed_match_factor', 0.9) | |
if target_speed < obstacle_speed_limit: | |
obstacle_speed_limit = target_speed | |
if is_following_a_track: | |
return obstacle_speed_limit, f"Following ID {self.last_followed_track_id}", True | |
# [منطق جديد] إذا لم نعد نرى السيارة ولكن الذاكرة نشطة | |
if self.follow_grace_period_timer > 0: | |
cautious_speed = c.get('cautious_speed', 5.0) # سرعة منخفضة حذرة | |
return min(current_speed, cautious_speed), f"Cautious (Lost Track {self.last_followed_track_id})", True | |
return -1, "No Obstacle", False # إشارة لعدم وجود عائق | |
def _navigation_logic(self, waypoints, max_speed): | |
"""[دالة جديدة] تحتوي على منطق الاقتراب من الهدف النهائي.""" | |
if not waypoints.any(): return max_speed, "Cruising (No Waypoints)" | |
APPROACH_ZONE, STOPPING_ZONE, MIN_APPROACH_SPEED = 15.0, 3.0, 2.5 | |
distance_to_target = np.linalg.norm(waypoints[-1].numpy()) # .numpy() للأمان | |
if distance_to_target > APPROACH_ZONE: | |
return max_speed, "Cruising" | |
elif distance_to_target > STOPPING_ZONE: | |
ratio = (distance_to_target - STOPPING_ZONE) / (APPROACH_ZONE - STOPPING_ZONE) | |
return MIN_APPROACH_SPEED + ratio * (max_speed - MIN_APPROACH_SPEED), "Approaching Target" | |
else: | |
return 0.0, "Stopping at Target" | |
def _apply_speed_smoothing(self, final_goal_speed): | |
"""[دالة جديدة] تحتوي على منطق تنعيم السرعة.""" | |
c = self.config['controller_params'] | |
accel_rate = c.get('accel_rate', 0.1) | |
decel_rate = c.get('decel_rate', 0.2) | |
if final_goal_speed > self.current_target_speed: | |
self.current_target_speed = min(self.current_target_speed + accel_rate, final_goal_speed) | |
elif final_goal_speed < self.current_target_speed: | |
self.current_target_speed = max(self.current_target_speed - decel_rate, final_goal_speed) | |
def _estimate_track_speed(self, track): | |
if len(track.history) < 2: return 0.0 | |
dist_moved = np.linalg.norm(track.history[-1] - track.history[-2]) | |
return dist_moved * self.freq # السرعة = المسافة * التردد | |
# --- باقي الدوال المساعدة تبقى كما هي --- | |
def _is_stop_sign_active(self, stop_sign, junction): # ... | |
c = self.config['controller_params'] | |
is_active = self.stop_sign_timer > 0 | |
if self.stop_sign_timer > 0: self.stop_sign_timer -= 1 | |
elif stop_sign > c['stop_threshold']: self.stop_sign_timer = c['stop_sign_duration'] | |
if junction < 0.1: self.stop_sign_timer = 0 | |
return is_active | |
def _get_longitudinal_control(self, current_speed, target_speed): # ... | |
c = self.config['controller_params'] | |
speed_error = target_speed - current_speed | |
control_signal = self.speed_controller.step(speed_error) | |
if control_signal > 0: | |
throttle, brake = np.clip(control_signal, 0.0, c['max_throttle']), False | |
else: | |
throttle, brake = 0.0, abs(control_signal) > c['brake_sensitivity'] | |
if target_speed < c['min_speed']: throttle, brake = 0.0, True | |
return throttle, brake | |
def _update_system_states(self, speed, traffic_light): # ... | |
c = self.config['controller_params'] | |
if speed < 0.1: self.stop_steps_counter += 1 | |
else: self.stop_steps_counter = 0 | |
if self.stop_steps_counter > c['max_stop_time']: | |
self.forced_move_timer, self.stop_steps_counter = c['forced_move_duration'], 0 | |
if self.forced_move_timer > 0: self.forced_move_timer -= 1 | |
if self.red_light_block_timer > 0: self.red_light_block_timer -= 1 | |
elif speed < 0.1 and traffic_light > c['light_threshold']: | |
self.red_light_block_timer = c['max_red_light_time'] | |
else: | |
self.red_light_block_timer = 0 | |
def _get_steering_command(self, waypoints, speed): | |
if not waypoints.any() or speed < 0.2: return 0.0 | |
# .numpy() للأمان عند التعامل مع NumPy | |
aim_point = (waypoints[1].numpy() + waypoints[0].numpy()) / 2.0 | |
angle_rad = np.arctan2(aim_point[0], aim_point[1]) | |
steer = self.turn_controller.step(np.degrees(angle_rad) / -90.0) | |
steer = self.last_steer * 0.4 + steer * 0.6 | |
self.last_steer = steer | |
return np.clip(steer, -1.0, 1.0) | |
def unnormalize_image(tensor: torch.Tensor) -> np.ndarray: | |
""" | |
يعكس عملية تطبيع الصورة في PyTorch ويحولها إلى صورة BGR | |
جاهزة للعرض باستخدام OpenCV. | |
Args: | |
tensor: موتر (Tensor) الصورة المُطبع، بالشكل (C, H, W). | |
Returns: | |
صورة NumPy array بتنسيق BGR، بالشكل (H, W, C). | |
""" | |
# 1. تحديد قيم المتوسط والانحراف المعياري المستخدمة في التطبيع الأصلي | |
# (هذه هي القيم القياسية لمجموعة بيانات ImageNet) | |
mean = torch.tensor([0.485, 0.456, 0.406], device=tensor.device) | |
std = torch.tensor([0.229, 0.224, 0.225], device=tensor.device) | |
# 2. عكس العملية الحسابية: (tensor * std) + mean | |
# يتم تغيير شكل mean و std لتتناسب مع أبعاد الموتر (C, H, W) | |
tensor = tensor * std[:, None, None] + mean[:, None, None] | |
# 3. قص القيم للتأكد من أنها ضمن النطاق [0, 1] | |
# قد تتجاوز العملية الحسابية هذا النطاق بشكل طفيف بسبب أخطاء التقريب. | |
tensor = torch.clamp(tensor, 0, 1) | |
# 4. تحويل الموتر إلى مصفوفة NumPy | |
img_np = tensor.cpu().numpy() | |
# 5. تغيير ترتيب الأبعاد من (C, H, W) إلى (H, W, C) | |
# PyTorch: (القنوات، الارتفاع، العرض) | |
# NumPy/OpenCV: (الارتفاع، العرض، القنوات) | |
img_np = np.transpose(img_np, (1, 2, 0)) | |
# 6. تحويل نطاق الألوان من [0, 1] إلى [0, 255] وتغيير النوع | |
img_np = (img_np * 255).astype(np.uint8) | |
# 7. تحويل قنوات الألوان من RGB إلى BGR | |
# معظم مكتبات التعلم العميق (بما في ذلك PyTorch) تستخدم RGB. | |
# مكتبة OpenCV تتوقع صيغة BGR. | |
img_bgr = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) | |
return img_bgr | |
# ========================================================= | |
# الخطوة 1: دالة world_to_pixel (تبقى كما هي) | |
# ========================================================= | |
def world_to_pixel(world_points: np.ndarray, grid_size_pixels: tuple, grid_size_meters: tuple) -> np.ndarray: | |
pixel_per_meter_x = grid_size_pixels[0] / grid_size_meters[0] | |
pixel_per_meter_y = grid_size_pixels[1] / grid_size_meters[1] | |
pixel_x = (grid_size_pixels[0] / 2) + (world_points[:, 1] * pixel_per_meter_x) | |
pixel_y = grid_size_pixels[1] - (world_points[:, 0] * pixel_per_meter_y) | |
return np.vstack((pixel_x, pixel_y)).T | |
# =================================================================== | |
# =================================================================== | |
def render_bev( | |
active_tracks: List, | |
waypoints_to_draw: np.ndarray, | |
ego_pos_global: np.ndarray, | |
ego_theta_global: float, | |
pixels_per_meter: int = 20, | |
grid_size_meters: tuple = (20, 20), | |
future_time_steps: tuple = (1.0, 2.0) | |
) -> Dict[str, np.ndarray]: | |
side_m, fwd_m = grid_size_meters | |
width_px, height_px = int(side_m * pixels_per_meter), int(fwd_m * pixels_per_meter) | |
# ... (كود مصفوفة التحويل وإنشاء الخرائط يبقى كما هو) | |
R_world_to_ego = np.array([[math.cos(ego_theta_global), math.sin(ego_theta_global)], | |
[-math.sin(ego_theta_global), math.cos(ego_theta_global)]]) | |
bev_maps = { | |
't0': np.zeros((height_px, width_px, 3), dtype=np.uint8), | |
't1': np.zeros((height_px, width_px, 3), dtype=np.uint8), | |
't2': np.zeros((height_px, width_px, 3), dtype=np.uint8) | |
} | |
for track in active_tracks: | |
pos_global=track.last_pos;yaw_rad_global=getattr(track,'last_yaw',0); | |
speed=getattr(track,'speed',0);extent=getattr(track,'last_extent',(1.0,2.0)); | |
relative_yaw_rad=yaw_rad_global-ego_theta_global;angle_deg=90-math.degrees(relative_yaw_rad); | |
width_px_obj=extent[1]*2*pixels_per_meter; | |
length_px_obj=extent[0]*2*pixels_per_meter; | |
box_size_px=(float(width_px_obj),float(length_px_obj)); | |
relative_pos_t0=R_world_to_ego.dot(pos_global-ego_pos_global); | |
center_pixel_t0=world_to_pixel(np.array([relative_pos_t0]),(width_px,height_px),grid_size_meters)[0]; | |
box_points_t0=cv2.boxPoints(((float(center_pixel_t0[0]),float(center_pixel_t0[1])-40),box_size_px,angle_deg)); | |
cv2.drawContours(bev_maps['t0'],[box_points_t0.astype(np.int32)],0,(0,0,255),2); | |
for i,t in enumerate(future_time_steps): | |
offset=np.array([math.cos(yaw_rad_global),math.sin(yaw_rad_global)])*speed*t; | |
future_pos_global=pos_global+offset; | |
relative_pos_future=R_world_to_ego.dot(future_pos_global-ego_pos_global); | |
center_pixel_future=world_to_pixel(np.array([relative_pos_future]),(width_px,height_px-40),grid_size_meters)[0];key=f't{i+1}';box_points_future=cv2.boxPoints(((float(center_pixel_future[0]),float(center_pixel_future[1])),box_size_px,angle_deg)); | |
cv2.drawContours(bev_maps[key],[box_points_future.astype(np.int32)],0,(255,0,128),2); | |
ego_center_pixel=((width_px/2)+20,height_px); | |
ego_size_px=(1.8*pixels_per_meter,3.8*pixels_per_meter); | |
ego_box=cv2.boxPoints((ego_center_pixel,ego_size_px,0)); | |
for key in bev_maps: | |
cv2.drawContours(bev_maps[key],[ego_box.astype(np.int32)],0,(0,255,255),-1); | |
# ========================================================================= | |
# ========================================================================= | |
if waypoints_to_draw is not None and waypoints_to_draw.size > 0: | |
waypoints_corrected = waypoints_to_draw.copy() | |
waypoints_corrected[:, 1] *= 1 # [Forward, Left] -> [Forward, -Left] which is [Forward, Right] | |
waypoints_pixels = world_to_pixel( | |
waypoints_corrected, | |
(width_px, height_px), | |
grid_size_meters | |
) | |
for point in waypoints_pixels: | |
center = (int(point[0])+20, int(point[1])) | |
cv2.circle( | |
bev_maps['t0'], | |
center, | |
radius=3, | |
color=(0, 255, 0), | |
thickness=-1, | |
lineType=cv2.LINE_AA | |
) | |
return bev_maps | |
# ========================================================= | |
# # (دالة world_to_pixel تبقى كما هي من الرد السابق) | |
# def world_to_pixel(world_points, grid_size_pixels, grid_size_meters): | |
# pixel_per_meter_x = grid_size_pixels[0] / grid_size_meters[0] | |
# pixel_per_meter_y = grid_size_pixels[1] / grid_size_meters[1] | |
# pixel_x = (world_points[:, 1] * pixel_per_meter_x) + (grid_size_pixels[0] / 2) | |
# pixel_y = (grid_size_pixels[1]) - (world_points[:, 0] * pixel_per_meter_y) | |
# return np.vstack((pixel_x, pixel_y)).T | |
# ========================================================== | |
# الدالة الثانية: generate_bev_image (النسخة النهائية) | |
# ========================================================== | |
def generate_bev_image(sample, grid_size_pixels=(400, 400), grid_size_meters=(20, 20)): | |
""" | |
تأخذ عينة بيانات وتنشئ صورة BEV خام باستخدام OpenCV. | |
:return: صورة (numpy array) بتنسيق BGR. | |
""" | |
side_meters, forward_meters = grid_size_meters | |
width_px, height_px = grid_size_pixels | |
bev_image = np.zeros((height_px, width_px, 3), dtype=np.uint8) | |
route_data = sample['measurements'].get('route', []) | |
if route_data: | |
route_world = np.array([[point[0], point[1]] for point in route_data]) | |
route_pixels = world_to_pixel(route_world, (width_px, height_px), (side_meters, forward_meters)) | |
cv2.polylines(bev_image, [route_pixels.astype(np.int_)], isClosed=False, color=(0, 255, 0), thickness=2) | |
objects_data = sample['objects'] | |
for obj in objects_data: | |
obj_class = obj.get('class', 'غير معروف') | |
if obj_class in ['weather', 'ego_info', 'static']: continue | |
pos = obj.get('position', [0,0,0]) | |
ext = obj.get('extent', [0,0,0]) | |
yaw = obj.get('yaw', 0) | |
center_world = np.array([[pos[0], pos[1]]]) | |
center_pixel = world_to_pixel(center_world, (width_px, height_px), (side_meters, forward_meters))[0] | |
length_m, width_m = ext[0]*2, ext[1]*2 | |
width_px_obj = width_m * (width_px / side_meters) | |
length_px_obj = length_m * (height_px / forward_meters) | |
color = (255, 255, 0) if obj_class == 'ego_car' else (0, 0, 255) | |
center_tuple = (float(center_pixel[0]), float(center_pixel[1])) | |
# ملاحظة: OpenCV تتوقع (العرض، الطول) في size_tuple | |
size_tuple = (float(width_px_obj), float(length_px_obj)) | |
angle_deg = np.degrees(yaw) # عكس الزاوية لتناسب OpenCV | |
box = cv2.boxPoints((center_tuple, size_tuple, angle_deg)) | |
box = box.astype(np.int_) | |
cv2.drawContours(bev_image, [box], 0, color, 2) | |
return bev_image | |
# # ========================================================== | |
# # الكلاس الثالث: الواجهة الاحترافية للعرض | |
# # ========================================================== | |
# ============================================================================== | |
# الكلاس الأول: إعدادات العرض | |
# ============================================================================== | |
class DisplayConfig: | |
width: int = 1920 | |
height: int = 1080 | |
camera_ratio: float = 0.65 | |
panel_margin: int = 30 | |
section_spacing: int = 25 | |
min_section_height: int = 180 | |
# ============================================================================== | |
# الكلاس الثاني: الواجهة الاحترافية للعرض (النسخة المحسنة) | |
# ============================================================================== | |
class DisplayInterface: | |
def __init__(self, config: Optional[DisplayConfig] = None): | |
self.config = config if config else DisplayConfig() | |
self._init_colors_and_fonts() | |
self._error_log = [] | |
def _init_colors_and_fonts(self): | |
self.colors = {'panel_bg':(28,30,34), 'text':(240,240,240), 'text_header':(0,165,255), 'separator':(75,75,75), 'throttle':(100,200,100), 'brake':(30,30,240), 'steer_bar':(0,190,255), 'steer_neutral':(100,100,100), 'light_red':(30,30,240), 'light_green':(100,200,100), 'stop_sign':(200,100,255), 'gauge_bg':(50,50,50), 'gauge_needle':(30,30,240), 'error':(30,30,240)} | |
self.fonts = {'header': cv2.FONT_HERSHEY_DUPLEX, 'normal': cv2.FONT_HERSHEY_SIMPLEX} | |
def run_interface(self, data: Dict) -> np.ndarray: | |
"""الدالة الرئيسية التي تنشئ وتعيد لوحة التحكم النهائية.""" | |
try: | |
dashboard = np.zeros((self.config.height, self.config.width, 3), dtype=np.uint8) | |
cam_w = int(self.config.width * self.config.camera_ratio) | |
# --- 1. رسم المكونات الرئيسية --- | |
self._draw_camera_view(dashboard, data.get('camera_view'), cam_w) | |
self._draw_info_overlay(dashboard, data, cam_w) | |
# --- 2. رسم لوحة المعلومات الجانبية --- | |
dashboard[:, cam_w:] = self.colors['panel_bg'] | |
current_y = self.config.panel_margin | |
current_y = self._draw_bev_maps_section(dashboard, data, cam_w, current_y) | |
self._draw_controls_section(dashboard, data, cam_w, current_y + self.config.section_spacing) | |
return dashboard | |
except Exception as e: | |
self._log_error(str(e)) | |
return self._create_error_display(str(e)) | |
# -------------------------------------------------------------------------- | |
# الدوال المساعدة للرسم | |
# -------------------------------------------------------------------------- | |
def _draw_camera_view(self, db: np.ndarray, view: np.ndarray, cam_w: int): | |
if view is not None: | |
db[:, :cam_w] = cv2.resize(view, (cam_w, self.config.height)) | |
def _draw_info_overlay(self, db: np.ndarray, data: Dict, cam_w: int): | |
"""يرسم شريط المعلومات الشفاف فوق عرض الكاميرا.""" | |
overlay = np.zeros_like(db); panel_h=90; alpha=0.6 | |
cv2.rectangle(overlay, (0, self.config.height-panel_h), (cam_w, self.config.height), self.colors['panel_bg'], -1) | |
db[:,:cam_w] = cv2.addWeighted(db[:,:cam_w], 1, overlay[:,:cam_w], alpha, 0) | |
base_y = self.config.height - panel_h + 35 | |
self._draw_text(db, f"Frame: {data.get('frame_num', 'N/A')}", (20, base_y), 1.2) | |
# ============ | |
counts=data.get('object_counts',{}); | |
self._draw_text(db, f"Detections: C:{counts.get('car',0)}", (20, base_y+35), 1.0, font_type='normal') | |
# self._draw_text(db, f"Detections: C:{counts.get('car',0)} B:{counts.get('bike',0)} P:{counts.get('pedestrian',0)}", | |
# (20, base_y + 35), self.fonts['normal'], 1.0) | |
right_x=cam_w-320 | |
light_prob = data.get('light_prob', 0.0) | |
light_color = self.colors['light_red'] if light_prob > 0.5 else self.colors['light_green'] | |
cv2.circle(db, (right_x, base_y), 12, light_color, -1) | |
self._draw_text(db, f"Light: {light_prob:.2f}", (right_x+25, base_y+8), 1.0, font_type='normal') | |
stop_prob = data.get('stop_prob', 0.0) | |
stop_color = self.colors['stop_sign'] if stop_prob > 0.5 else self.colors['steer_neutral'] | |
cv2.circle(db, (right_x, base_y+35), 12, stop_color, -1) | |
self._draw_text(db, f"Stop: {stop_prob:.2f}", (right_x+25, base_y+43), 1.0, font_type='normal') | |
def _draw_bev_maps_section(self, db: np.ndarray, data: Dict, info_x: int, start_y: int) -> int: | |
"""يرسم قسم خرائط BEV (الحالية والمستقبلية).""" | |
x = info_x + self.config.panel_margin | |
panel_w = (self.config.width - info_x) - 2 * self.config.panel_margin | |
main_bev_h = 380; future_bev_h = 180 | |
# الخريطة الرئيسية | |
bev_t0 = cv2.resize(data.get('map_t0', np.zeros((1,1,3))), (panel_w, main_bev_h)) | |
self._draw_text(bev_t0, "BEV t+0.0s", (10, 30), 1.2, color=self.colors['text_header']) | |
db[start_y:start_y+main_bev_h, x:x+panel_w] = bev_t0 | |
# الخرائط المستقبلية | |
future_y = start_y + main_bev_h + 10 | |
future_bev_w = panel_w // 2 | |
bev_t1 = cv2.resize(data.get('map_t1', np.zeros((1,1,3))), (future_bev_w, future_bev_h)) | |
self._draw_text(bev_t1, "t+1.0s", (10, 20), 0.7, font_type='normal') | |
bev_t2 = cv2.resize(data.get('map_t2', np.zeros((1,1,3))), (future_bev_w, future_bev_h)) | |
self._draw_text(bev_t2, "t+2.0s", (10, 20), 0.7, font_type='normal') | |
db[future_y:future_y+future_bev_h, x:x+future_bev_w] = bev_t1 | |
db[future_y:future_y+future_bev_h, x+future_bev_w:x+panel_w] = bev_t2 | |
separator_y = future_y + future_bev_h + self.config.section_spacing | |
cv2.line(db, (info_x, separator_y), (self.config.width, separator_y), self.colors['separator'], 2) | |
return separator_y | |
def _draw_controls_section(self, db: np.ndarray, data: Dict, info_x: int, start_y: int): | |
"""يرسم قسم عناصر التحكم بالمركبة.""" | |
x = info_x + self.config.panel_margin | |
self._draw_text(db, "VEHICLE CONTROL", (x, start_y+40), 1.2, color=self.colors['text_header']) | |
self._draw_gauge_display(db, data, x, start_y + 80) | |
self._draw_control_bars(db, data, x + 180, start_y + 80) | |
def _draw_gauge_display(self, db: np.ndarray, data: Dict, x: int, y: int): | |
"""يرسم عداد السرعة.""" | |
radius = 65 | |
self._draw_speed_gauge(db, x+radius, y+radius, radius, data.get('speed',0), data.get('target_speed',0), 60.0) | |
def _draw_control_bars(self, db: np.ndarray, data: Dict, x: int, y: int): | |
"""يرسم مؤشرات التوجيه، الوقود، والمكابح.""" | |
bar_w = self.config.width - x - self.config.panel_margin | |
self._draw_text(db, f"Steer: {data.get('steer', 0.0):.2f}", (x, y), 0.8, font_type='normal') | |
self._draw_steer_indicator(db, x, y+15, bar_w, 20, data.get('steer', 0.0)) | |
self._draw_text(db, f"Throttle: {data.get('throttle', 0.0):.2f}", (x, y+50), 0.8, font_type='normal') | |
self._draw_bar(db, x, y+65, bar_w, 20, data.get('throttle', 0.0), 1.0, self.colors['throttle']) | |
brake_on = data.get('brake', False) | |
brake_color = self.colors['brake'] if brake_on else self.colors['text'] | |
self._draw_text(db, f"Brake: {'ON' if brake_on else 'OFF'}", (x, y+100), 0.8, brake_color, font_type='normal') | |
self._draw_bar(db, x, y+115, bar_w, 20, float(brake_on), 1.0, self.colors['brake']) | |
# -------------------------------------------------------------------------- | |
# الدوال الأساسية للرسم (Primitives) | |
# -------------------------------------------------------------------------- | |
def _draw_text(self, img, text, pos, size, color=None, font_type='header', thickness=1): | |
color = color if color is not None else self.colors['text'] | |
font = self.fonts[font_type] | |
cv2.putText(img, text, (pos[0]+1, pos[1]+1), font, size, (0,0,0), thickness+1, cv2.LINE_AA) | |
cv2.putText(img, text, pos, font, size, color, thickness, cv2.LINE_AA) | |
def _draw_bar(self, img, x, y, w, h, val, max_val, color): | |
val=np.clip(val,0,max_val); ratio=val/max_val | |
cv2.rectangle(img,(x,y),(x+w,y+h),self.colors['steer_neutral'],-1) | |
cv2.rectangle(img,(x,y),(x+int(w*ratio),y+h),color,-1) | |
cv2.rectangle(img,(x,y),(x+w,y+h),self.colors['text'],1) | |
def _draw_steer_indicator(self, img, x, y, w, h, val): | |
center_x=x+w//2; val=np.clip(val,-1.0,1.0) | |
cv2.rectangle(img,(x,y),(x+w,y+h),self.colors['steer_neutral'],-1) | |
cv2.line(img,(center_x,y),(center_x,y+h),self.colors['text'],1) | |
indicator_x=center_x+int(val*(w//2*0.95)) | |
cv2.line(img,(indicator_x,y),(indicator_x,y+h),self.colors['steer_bar'],6) | |
def _draw_speed_gauge(self, img, cx, cy, r, spd, t_spd, m_spd): | |
cv2.circle(img,(cx,cy),r,self.colors['gauge_bg'],-1); cv2.circle(img,(cx,cy),r,self.colors['text'],2) | |
ratio=np.clip(spd/m_spd,0,1); angle=math.radians(135+ratio*270) | |
ex=cx+int(r*0.85*math.cos(angle)); ey=cy+int(r*0.85*math.sin(angle)) | |
cv2.line(img,(cx,cy),(ex,ey),self.colors['gauge_needle'],3); cv2.circle(img,(cx,cy),5,self.colors['gauge_needle'],-1) | |
(w,h),_=cv2.getTextSize(f"{spd:.1f}",self.fonts['header'],1.5,3) | |
self._draw_text(img,f"{spd:.1f}",(cx-w//2,cy+10),1.5); self._draw_text(img,"km/h",(cx-25,cy+35),0.5, font_type='normal') | |
self._draw_text(img,f"Target: {t_spd:.1f}",(cx-50,cy-r-10),0.7) | |
def _create_error_display(self, error_msg: str): | |
error_display = np.zeros((self.config.height, self.config.width, 3), dtype=np.uint8) | |
error_display[:] = self.colors['panel_bg'] | |
self._draw_text(error_display, "SYSTEM ERROR", (self.config.width//2-200, self.config.height//2-50), 1.5, self.colors['error']) | |
self._draw_text(error_display, error_msg, (50, self.config.height//2+20), 0.8, self.colors['steer_bar'], font_type='normal') | |
return error_display | |
def _log_error(self, error_msg: str): | |
self._error_log.append(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_msg}") | |
if len(self._error_log) > 50: self._error_log.pop(0) | |