# simulation_modules.py
import torch
import numpy as np
import cv2
import math
from collections import deque
from typing import List, Tuple, Dict, Any, Optional

# ================== Constants ==================
WAYPOINT_SCALE_FACTOR = 5.0
T1_FUTURE_TIME = 1.0   # prediction horizon in seconds
T2_FUTURE_TIME = 2.0   # prediction horizon in seconds
PIXELS_PER_METER = 8
MAX_DISTANCE = 32      # metres covered in each direction of the bird's-eye view
IMG_SIZE = MAX_DISTANCE * PIXELS_PER_METER * 2
EGO_CAR_X = IMG_SIZE // 2
EGO_CAR_Y = IMG_SIZE - (4.0 * PIXELS_PER_METER)

COLORS = {
    'vehicle': [255, 0, 0],
    'pedestrian': [0, 255, 0],
    'cyclist': [0, 0, 255],
    'waypoint': [255, 255, 0],
    'ego_car': [255, 255, 255],
}

# ================== PID Controller ==================
class PIDController:
    def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20):
        self._K_P = K_P
        self._K_I = K_I
        self._K_D = K_D
        self._window = deque([0 for _ in range(n)], maxlen=n)

    def step(self, error):
        self._window.append(error)
        if len(self._window) >= 2:
            integral = np.mean(self._window)
            derivative = self._window[-1] - self._window[-2]
        else:
            integral = derivative = 0.0
        return self._K_P * error + self._K_I * integral + self._K_D * derivative
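
# Illustrative usage of PIDController (gains and error signal below are
# assumptions, not tuned values):
#   speed_pid = PIDController(K_P=0.5, K_I=0.05, K_D=0.1, n=20)
#   throttle = speed_pid.step(target_speed - current_speed)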

# ================== Helper Functions ==================
def ensure_rgb(image):
    if len(image.shape) == 2:
        return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    elif image.shape[2] == 1:
        return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    return image

def add_rect(img, loc, ori, box, value, color):
    # Convert world coordinates (metres, origin at the image centre) to pixels.
    center_x = int(loc[0] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
    center_y = int(loc[1] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
    size_px = (int(box[0] * PIXELS_PER_METER), int(box[1] * PIXELS_PER_METER))
    angle_deg = -np.degrees(math.atan2(ori[1], ori[0]))
    box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg))
    box_points = np.int32(box_points)
    adjusted_color = [int(c * value) for c in color]
    cv2.fillConvexPoly(img, box_points, adjusted_color)
    return img

def render(traffic_grid, t=0):
    img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
    counts = {'vehicles': 0, 'pedestrians': 0, 'cyclists': 0}
    if isinstance(traffic_grid, torch.Tensor):
        traffic_grid = traffic_grid.cpu().numpy()
    h, w, c = traffic_grid.shape
    for y in range(h):
        for x in range(w):
            for ch in range(c):
                if traffic_grid[y, x, ch] > 0.1:
                    world_x = (x / w - 0.5) * MAX_DISTANCE * 2
                    world_y = (y / h - 0.5) * MAX_DISTANCE * 2
                    if ch < 3:
                        color = COLORS['vehicle']
                        counts['vehicles'] += 1
                        box_size = [2.0, 4.0]
                    elif ch < 5:
                        color = COLORS['pedestrian']
                        counts['pedestrians'] += 1
                        box_size = [0.8, 0.8]
                    else:
                        color = COLORS['cyclist']
                        counts['cyclists'] += 1
                        box_size = [1.2, 2.0]
                    img = add_rect(img, [world_x, world_y], [1.0, 0.0],
                                   box_size, traffic_grid[y, x, ch], color)
    return img, counts

def render_waypoints(waypoints, scale_factor=WAYPOINT_SCALE_FACTOR):
    img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
    if isinstance(waypoints, torch.Tensor):
        waypoints = waypoints.cpu().numpy()
    scaled_waypoints = waypoints * scale_factor
    for i, wp in enumerate(scaled_waypoints):
        px = int(wp[0] * PIXELS_PER_METER + IMG_SIZE // 2)
        py = int(wp[1] * PIXELS_PER_METER + IMG_SIZE // 2)
        if 0 <= px < IMG_SIZE and 0 <= py < IMG_SIZE:
            radius = max(3, 8 - i)
            cv2.circle(img, (px, py), radius, COLORS['waypoint'], -1)
            if i > 0:
                prev_px = int(scaled_waypoints[i - 1][0] * PIXELS_PER_METER + IMG_SIZE // 2)
                prev_py = int(scaled_waypoints[i - 1][1] * PIXELS_PER_METER + IMG_SIZE // 2)
                if 0 <= prev_px < IMG_SIZE and 0 <= prev_py < IMG_SIZE:
                    cv2.line(img, (prev_px, prev_py), (px, py), COLORS['waypoint'], 2)
    return img

def render_self_car(img):
    car_pos = [0, -4.0]
    car_ori = [1.0, 0.0]
    car_size = [2.0, 4.5]
    return add_rect(img, car_pos, car_ori, car_size, 1.0, COLORS['ego_car'])

# ================== Tracker Classes ==================
class TrackedObject:
    def __init__(self, obj_id: int):
        self.id = obj_id
        self.last_step = 0
        self.last_pos = [0.0, 0.0]
        self.historical_pos = []
        self.historical_steps = []
        self.velocity = [0.0, 0.0]
        self.confidence = 1.0

    def update(self, step: int, obj_info: List[float]):
        self.last_step = step
        self.last_pos = obj_info[:2]
        self.historical_pos.append(obj_info[:2])
        self.historical_steps.append(step)
        if len(self.historical_pos) >= 2:
            dt = self.historical_steps[-1] - self.historical_steps[-2]
            if dt > 0:
                dx = self.historical_pos[-1][0] - self.historical_pos[-2][0]
                dy = self.historical_pos[-1][1] - self.historical_pos[-2][1]
                # Velocity is estimated in position units per tracker step.
                self.velocity = [dx / dt, dy / dt]

    def predict_position(self, future_time: float) -> List[float]:
        predicted_x = self.last_pos[0] + self.velocity[0] * future_time
        predicted_y = self.last_pos[1] + self.velocity[1] * future_time
        return [predicted_x, predicted_y]

    def is_alive(self, current_step: int, max_age: int = 5) -> bool:
        return (current_step - self.last_step) <= max_age

class Tracker:
    def __init__(self, frequency: int = 10):
        self.tracks: List[TrackedObject] = []
        self.frequency = frequency
        self.next_id = 0
        self.current_step = 0

    def update_and_predict(self, detections: List[Dict], step: int) -> np.ndarray:
        self.current_step = step
        for detection in detections:
            pos = detection.get('position', [0, 0])
            feature = detection.get('feature', 0.5)
            # Greedy nearest-neighbour association with a 2.0 m gating distance.
            best_match = None
            min_distance = float('inf')
            for track in self.tracks:
                if track.is_alive(step):
                    distance = np.linalg.norm(np.array(pos) - np.array(track.last_pos))
                    if distance < min_distance and distance < 2.0:
                        min_distance = distance
                        best_match = track
            if best_match:
                best_match.update(step, pos + [feature])
            else:
                new_track = TrackedObject(self.next_id)
                new_track.update(step, pos + [feature])
                self.tracks.append(new_track)
                self.next_id += 1
        # Drop stale tracks before generating the prediction grid.
        self.tracks = [t for t in self.tracks if t.is_alive(step)]
        return self._generate_prediction_grid()

    def _generate_prediction_grid(self) -> np.ndarray:
        grid = np.zeros((20, 20, 7), dtype=np.float32)
        for track in self.tracks:
            if track.is_alive(self.current_step):
                current_pos = track.last_pos
                future_pos_t1 = track.predict_position(T1_FUTURE_TIME)
                future_pos_t2 = track.predict_position(T2_FUTURE_TIME)
                for pos in [current_pos, future_pos_t1, future_pos_t2]:
                    grid_x = int((pos[0] / (MAX_DISTANCE * 2) + 0.5) * 20)
                    grid_y = int((pos[1] / (MAX_DISTANCE * 2) + 0.5) * 20)
                    if 0 <= grid_x < 20 and 0 <= grid_y < 20:
                        # Note: all three time horizons currently write into channel 0
                        # of the 7-channel grid.
                        channel = 0
                        grid[grid_y, grid_x, channel] = max(grid[grid_y, grid_x, channel], track.confidence)
        return grid
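
# Illustrative usage of Tracker (the detections below are synthetic assumptions):
#   tracker = Tracker(frequency=10)
#   detections = [{'position': [1.0, 5.0], 'feature': 0.9}]
#   grid = tracker.update_and_predict(detections, step=0)  # (20, 20, 7) grid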

# ================== Controller Classes ==================
class ControllerConfig:
    def __init__(self):
        self.turn_KP = 1.0
        self.turn_KI = 0.1
        self.turn_KD = 0.1
        self.turn_n = 20
        self.speed_KP = 0.5
        self.speed_KI = 0.05
        self.speed_KD = 0.1
        self.speed_n = 20
        self.max_speed = 6.0
        self.max_throttle = 0.75
        self.clip_delta = 0.25
        self.brake_speed = 0.4
        self.brake_ratio = 1.1

class InterfuserController:
    def __init__(self, config: ControllerConfig):
        self.config = config
        self.turn_controller = PIDController(config.turn_KP, config.turn_KI, config.turn_KD, config.turn_n)
        self.speed_controller = PIDController(config.speed_KP, config.speed_KI, config.speed_KD, config.speed_n)
        self.last_steer = 0.0
        self.last_throttle = 0.0
        self.target_speed = 3.0

    def run_step(self, current_speed: float, waypoints: np.ndarray,
                 junction: float, traffic_light_state: float,
                 stop_sign: float, meta_data: Dict) -> Tuple[float, float, bool, str]:
        if isinstance(waypoints, torch.Tensor):
            waypoints = waypoints.cpu().numpy()
        if len(waypoints) > 1:
            dx = waypoints[1][0] - waypoints[0][0]
            dy = waypoints[1][1] - waypoints[0][1]
            target_yaw = math.atan2(dy, dx)
            steer = self.turn_controller.step(target_yaw)
        else:
            steer = 0.0
        steer = np.clip(steer, -1.0, 1.0)
        target_speed = self.target_speed
        if junction > 0.5:
            target_speed *= 0.7
        if abs(steer) > 0.3:
            target_speed *= 0.8
        speed_error = target_speed - current_speed
        throttle = self.speed_controller.step(speed_error)
        throttle = np.clip(throttle, 0.0, self.config.max_throttle)
        brake = False
        if traffic_light_state > 0.5 or stop_sign > 0.5 or current_speed > self.config.max_speed:
            brake = True
            throttle = 0.0
        self.last_steer = steer
        self.last_throttle = throttle
        metadata = f"Speed:{current_speed:.1f} Target:{target_speed:.1f} Junction:{junction:.2f}"
        return steer, throttle, brake, metadata
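
# Illustrative usage of InterfuserController (inputs are synthetic assumptions):
#   controller = InterfuserController(ControllerConfig())
#   steer, throttle, brake, meta = controller.run_step(
#       current_speed=2.0, waypoints=np.array([[0.0, 0.0], [0.0, 1.0]]),
#       junction=0.0, traffic_light_state=0.0, stop_sign=0.0, meta_data={})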

# ================== Display Interface ==================
class DisplayInterface:
    def __init__(self, width: int = 1200, height: int = 600):
        self._width = width
        self._height = height
        self.camera_width = width // 2
        self.camera_height = height
        self.map_width = width // 2
        self.map_height = height // 3

    def run_interface(self, data: Dict[str, Any]) -> np.ndarray:
        dashboard = np.zeros((self._height, self._width, 3), dtype=np.uint8)
        # Camera view
        camera_view = data.get('camera_view')
        if camera_view is not None:
            camera_resized = cv2.resize(camera_view, (self.camera_width, self.camera_height))
            dashboard[:, :self.camera_width] = camera_resized
        # Maps
        map_start_x = self.camera_width
        map_t0 = data.get('map_t0')
        if map_t0 is not None:
            map_resized = cv2.resize(map_t0, (self.map_width, self.map_height))
            dashboard[:self.map_height, map_start_x:] = map_resized
            cv2.putText(dashboard, "Current (t=0)", (map_start_x + 10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        map_t1 = data.get('map_t1')
        if map_t1 is not None:
            map_resized = cv2.resize(map_t1, (self.map_width, self.map_height))
            y_start = self.map_height
            dashboard[y_start:y_start + self.map_height, map_start_x:] = map_resized
            cv2.putText(dashboard, f"Future (t={T1_FUTURE_TIME}s)",
                        (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX,
                        0.7, (255, 255, 255), 2)
        map_t2 = data.get('map_t2')
        if map_t2 is not None:
            map_resized = cv2.resize(map_t2, (self.map_width, self.map_height))
            y_start = self.map_height * 2
            dashboard[y_start:, map_start_x:] = map_resized
            cv2.putText(dashboard, f"Future (t={T2_FUTURE_TIME}s)",
                        (map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX,
                        0.7, (255, 255, 255), 2)
        # Text info
        text_info = data.get('text_info', {})
        y_offset = 50
        for key, value in text_info.items():
            cv2.putText(dashboard, value, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX,
                        0.6, (0, 255, 0), 2)
            y_offset += 30
        return dashboard
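
# ================== Example (illustrative) ==================
# Minimal end-to-end smoke test. This is an illustrative sketch only: every
# input below is a synthetic assumption, not the output of a real perception
# model or simulator.
if __name__ == "__main__":
    # Synthetic detections -> tracker -> (20, 20, 7) prediction grid.
    tracker = Tracker(frequency=10)
    detections = [{'position': [2.0, 8.0], 'feature': 0.9},
                  {'position': [-4.0, 12.0], 'feature': 0.7}]
    grid = tracker.update_and_predict(detections, step=0)

    # Bird's-eye-view renderings for the current step and the prediction horizons.
    map_t0, counts = render(grid, t=0)
    map_t0 = render_self_car(map_t0)
    map_t1, _ = render(grid, t=1)
    map_t2, _ = render(grid, t=2)

    # Synthetic straight-ahead waypoints and one control step.
    waypoints = np.array([[0.0, 0.2], [0.0, 0.5], [0.0, 0.9], [0.0, 1.4]])
    controller = InterfuserController(ControllerConfig())
    steer, throttle, brake, meta = controller.run_step(
        current_speed=2.0, waypoints=waypoints, junction=0.0,
        traffic_light_state=0.0, stop_sign=0.0, meta_data={})

    # Compose the dashboard from a blank camera frame and the rendered maps.
    display = DisplayInterface(width=1200, height=600)
    dashboard = display.run_interface({
        'camera_view': np.zeros((600, 600, 3), dtype=np.uint8),
        'map_t0': map_t0,
        'map_t1': map_t1,
        'map_t2': map_t2,
        'text_info': {
            'control': f"steer={steer:.2f} throttle={throttle:.2f} brake={brake}",
            'meta': meta,
        },
    })
    print("dashboard:", dashboard.shape, "object counts:", counts)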