# simulation_modules.py import torch import numpy as np import cv2 import math from collections import deque from typing import List, Tuple, Dict, Any, Optional # ================== Constants ================== WAYPOINT_SCALE_FACTOR = 5.0 T1_FUTURE_TIME = 1.0 T2_FUTURE_TIME = 2.0 PIXELS_PER_METER = 8 MAX_DISTANCE = 32 IMG_SIZE = MAX_DISTANCE * PIXELS_PER_METER * 2 EGO_CAR_X = IMG_SIZE // 2 EGO_CAR_Y = IMG_SIZE - (4.0 * PIXELS_PER_METER) COLORS = { 'vehicle': [255, 0, 0], 'pedestrian': [0, 255, 0], 'cyclist': [0, 0, 255], 'waypoint': [255, 255, 0], 'ego_car': [255, 255, 255] } # ================== PID Controller ================== class PIDController: def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20): self._K_P = K_P self._K_I = K_I self._K_D = K_D self._window = deque([0 for _ in range(n)], maxlen=n) def step(self, error): self._window.append(error) if len(self._window) >= 2: integral = np.mean(self._window) derivative = self._window[-1] - self._window[-2] else: integral = derivative = 0.0 return self._K_P * error + self._K_I * integral + self._K_D * derivative # ================== Helper Functions ================== def ensure_rgb(image): if len(image.shape) == 2: return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) elif image.shape[2] == 1: return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) return image def add_rect(img, loc, ori, box, value, color): center_x = int(loc[0] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER) center_y = int(loc[1] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER) size_px = (int(box[0] * PIXELS_PER_METER), int(box[1] * PIXELS_PER_METER)) angle_deg = -np.degrees(math.atan2(ori[1], ori[0])) box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg)) box_points = np.int32(box_points) adjusted_color = [int(c * value) for c in color] cv2.fillConvexPoly(img, box_points, adjusted_color) return img def render(traffic_grid, t=0): img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8) counts = {'vehicles': 0, 'pedestrians': 0, 'cyclists': 0} if isinstance(traffic_grid, torch.Tensor): traffic_grid = traffic_grid.cpu().numpy() h, w, c = traffic_grid.shape for y in range(h): for x in range(w): for ch in range(c): if traffic_grid[y, x, ch] > 0.1: world_x = (x / w - 0.5) * MAX_DISTANCE * 2 world_y = (y / h - 0.5) * MAX_DISTANCE * 2 if ch < 3: color = COLORS['vehicle'] counts['vehicles'] += 1 box_size = [2.0, 4.0] elif ch < 5: color = COLORS['pedestrian'] counts['pedestrians'] += 1 box_size = [0.8, 0.8] else: color = COLORS['cyclist'] counts['cyclists'] += 1 box_size = [1.2, 2.0] img = add_rect(img, [world_x, world_y], [1.0, 0.0], box_size, traffic_grid[y, x, ch], color) return img, counts def render_waypoints(waypoints, scale_factor=WAYPOINT_SCALE_FACTOR): img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8) if isinstance(waypoints, torch.Tensor): waypoints = waypoints.cpu().numpy() scaled_waypoints = waypoints * scale_factor for i, wp in enumerate(scaled_waypoints): px = int(wp[0] * PIXELS_PER_METER + IMG_SIZE // 2) py = int(wp[1] * PIXELS_PER_METER + IMG_SIZE // 2) if 0 <= px < IMG_SIZE and 0 <= py < IMG_SIZE: radius = max(3, 8 - i) cv2.circle(img, (px, py), radius, COLORS['waypoint'], -1) if i > 0: prev_px = int(scaled_waypoints[i-1][0] * PIXELS_PER_METER + IMG_SIZE // 2) prev_py = int(scaled_waypoints[i-1][1] * PIXELS_PER_METER + IMG_SIZE // 2) if 0 <= prev_px < IMG_SIZE and 0 <= prev_py < IMG_SIZE: cv2.line(img, (prev_px, prev_py), (px, py), COLORS['waypoint'], 2) return img def render_self_car(img): car_pos = [0, -4.0] car_ori = [1.0, 0.0] car_size = [2.0, 4.5] return add_rect(img, car_pos, car_ori, car_size, 1.0, COLORS['ego_car']) # ================== Tracker Classes ================== class TrackedObject: def __init__(self, obj_id: int): self.id = obj_id self.last_step = 0 self.last_pos = [0.0, 0.0] self.historical_pos = [] self.historical_steps = [] self.velocity = [0.0, 0.0] self.confidence = 1.0 def update(self, step: int, obj_info: List[float]): self.last_step = step self.last_pos = obj_info[:2] self.historical_pos.append(obj_info[:2]) self.historical_steps.append(step) if len(self.historical_pos) >= 2: dt = self.historical_steps[-1] - self.historical_steps[-2] if dt > 0: dx = self.historical_pos[-1][0] - self.historical_pos[-2][0] dy = self.historical_pos[-1][1] - self.historical_pos[-2][1] self.velocity = [dx/dt, dy/dt] def predict_position(self, future_time: float) -> List[float]: predicted_x = self.last_pos[0] + self.velocity[0] * future_time predicted_y = self.last_pos[1] + self.velocity[1] * future_time return [predicted_x, predicted_y] def is_alive(self, current_step: int, max_age: int = 5) -> bool: return (current_step - self.last_step) <= max_age class Tracker: def __init__(self, frequency: int = 10): self.tracks: List[TrackedObject] = [] self.frequency = frequency self.next_id = 0 self.current_step = 0 def update_and_predict(self, detections: List[Dict], step: int) -> np.ndarray: self.current_step = step for detection in detections: pos = detection.get('position', [0, 0]) feature = detection.get('feature', 0.5) best_match = None min_distance = float('inf') for track in self.tracks: if track.is_alive(step): distance = np.linalg.norm(np.array(pos) - np.array(track.last_pos)) if distance < min_distance and distance < 2.0: min_distance = distance best_match = track if best_match: best_match.update(step, pos + [feature]) else: new_track = TrackedObject(self.next_id) new_track.update(step, pos + [feature]) self.tracks.append(new_track) self.next_id += 1 self.tracks = [t for t in self.tracks if t.is_alive(step)] return self._generate_prediction_grid() def _generate_prediction_grid(self) -> np.ndarray: grid = np.zeros((20, 20, 7), dtype=np.float32) for track in self.tracks: if track.is_alive(self.current_step): current_pos = track.last_pos future_pos_t1 = track.predict_position(T1_FUTURE_TIME) future_pos_t2 = track.predict_position(T2_FUTURE_TIME) for pos in [current_pos, future_pos_t1, future_pos_t2]: grid_x = int((pos[0] / (MAX_DISTANCE * 2) + 0.5) * 20) grid_y = int((pos[1] / (MAX_DISTANCE * 2) + 0.5) * 20) if 0 <= grid_x < 20 and 0 <= grid_y < 20: channel = 0 grid[grid_y, grid_x, channel] = max(grid[grid_y, grid_x, channel], track.confidence) return grid # ================== Controller Classes ================== class ControllerConfig: def __init__(self): self.turn_KP = 1.0 self.turn_KI = 0.1 self.turn_KD = 0.1 self.turn_n = 20 self.speed_KP = 0.5 self.speed_KI = 0.05 self.speed_KD = 0.1 self.speed_n = 20 self.max_speed = 6.0 self.max_throttle = 0.75 self.clip_delta = 0.25 self.brake_speed = 0.4 self.brake_ratio = 1.1 class InterfuserController: def __init__(self, config: ControllerConfig): self.config = config self.turn_controller = PIDController(config.turn_KP, config.turn_KI, config.turn_KD, config.turn_n) self.speed_controller = PIDController(config.speed_KP, config.speed_KI, config.speed_KD, config.speed_n) self.last_steer = 0.0 self.last_throttle = 0.0 self.target_speed = 3.0 def run_step(self, current_speed: float, waypoints: np.ndarray, junction: float, traffic_light_state: float, stop_sign: float, meta_data: Dict) -> Tuple[float, float, bool, str]: if isinstance(waypoints, torch.Tensor): waypoints = waypoints.cpu().numpy() if len(waypoints) > 1: dx = waypoints[1][0] - waypoints[0][0] dy = waypoints[1][1] - waypoints[0][1] target_yaw = math.atan2(dy, dx) steer = self.turn_controller.step(target_yaw) else: steer = 0.0 steer = np.clip(steer, -1.0, 1.0) target_speed = self.target_speed if junction > 0.5: target_speed *= 0.7 if abs(steer) > 0.3: target_speed *= 0.8 speed_error = target_speed - current_speed throttle = self.speed_controller.step(speed_error) throttle = np.clip(throttle, 0.0, self.config.max_throttle) brake = False if traffic_light_state > 0.5 or stop_sign > 0.5 or current_speed > self.config.max_speed: brake = True throttle = 0.0 self.last_steer = steer self.last_throttle = throttle metadata = f"Speed:{current_speed:.1f} Target:{target_speed:.1f} Junction:{junction:.2f}" return steer, throttle, brake, metadata # ================== Display Interface ================== class DisplayInterface: def __init__(self, width: int = 1200, height: int = 600): self._width = width self._height = height self.camera_width = width // 2 self.camera_height = height self.map_width = width // 2 self.map_height = height // 3 def run_interface(self, data: Dict[str, Any]) -> np.ndarray: dashboard = np.zeros((self._height, self._width, 3), dtype=np.uint8) # Camera view camera_view = data.get('camera_view') if camera_view is not None: camera_resized = cv2.resize(camera_view, (self.camera_width, self.camera_height)) dashboard[:, :self.camera_width] = camera_resized # Maps map_start_x = self.camera_width map_t0 = data.get('map_t0') if map_t0 is not None: map_resized = cv2.resize(map_t0, (self.map_width, self.map_height)) dashboard[:self.map_height, map_start_x:] = map_resized cv2.putText(dashboard, "Current (t=0)", (map_start_x + 10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) map_t1 = data.get('map_t1') if map_t1 is not None: map_resized = cv2.resize(map_t1, (self.map_width, self.map_height)) y_start = self.map_height dashboard[y_start:y_start + self.map_height, map_start_x:] = map_resized cv2.putText(dashboard, f"Future (t={T1_FUTURE_TIME}s)", (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) map_t2 = data.get('map_t2') if map_t2 is not None: map_resized = cv2.resize(map_t2, (self.map_width, self.map_height)) y_start = self.map_height * 2 dashboard[y_start:, map_start_x:] = map_resized cv2.putText(dashboard, f"Future (t={T2_FUTURE_TIME}s)", (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) # Text info text_info = data.get('text_info', {}) y_offset = 50 for key, value in text_info.items(): cv2.putText(dashboard, value, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) y_offset += 30 return dashboard