Baseer_Server / simulation_modules.py
Adam
Deploy Baseer Self-Driving API v1.0
7b0dd2f
raw
history blame
12.9 kB
# simulation_modules.py
import torch
import numpy as np
import cv2
import math
from collections import deque
from typing import List, Tuple, Dict, Any, Optional
# ================== Constants ==================
WAYPOINT_SCALE_FACTOR = 5.0
T1_FUTURE_TIME = 1.0
T2_FUTURE_TIME = 2.0
PIXELS_PER_METER = 8
MAX_DISTANCE = 32
IMG_SIZE = MAX_DISTANCE * PIXELS_PER_METER * 2
EGO_CAR_X = IMG_SIZE // 2
EGO_CAR_Y = IMG_SIZE - (4.0 * PIXELS_PER_METER)
COLORS = {
'vehicle': [255, 0, 0],
'pedestrian': [0, 255, 0],
'cyclist': [0, 0, 255],
'waypoint': [255, 255, 0],
'ego_car': [255, 255, 255]
}
# ================== PID Controller ==================
class PIDController:
def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20):
self._K_P = K_P
self._K_I = K_I
self._K_D = K_D
self._window = deque([0 for _ in range(n)], maxlen=n)
def step(self, error):
self._window.append(error)
if len(self._window) >= 2:
integral = np.mean(self._window)
derivative = self._window[-1] - self._window[-2]
else:
integral = derivative = 0.0
return self._K_P * error + self._K_I * integral + self._K_D * derivative
# ================== Helper Functions ==================
def ensure_rgb(image):
if len(image.shape) == 2:
return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
elif image.shape[2] == 1:
return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
return image
def add_rect(img, loc, ori, box, value, color):
center_x = int(loc[0] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
center_y = int(loc[1] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
size_px = (int(box[0] * PIXELS_PER_METER), int(box[1] * PIXELS_PER_METER))
angle_deg = -np.degrees(math.atan2(ori[1], ori[0]))
box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg))
box_points = np.int32(box_points)
adjusted_color = [int(c * value) for c in color]
cv2.fillConvexPoly(img, box_points, adjusted_color)
return img
def render(traffic_grid, t=0):
img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
counts = {'vehicles': 0, 'pedestrians': 0, 'cyclists': 0}
if isinstance(traffic_grid, torch.Tensor):
traffic_grid = traffic_grid.cpu().numpy()
h, w, c = traffic_grid.shape
for y in range(h):
for x in range(w):
for ch in range(c):
if traffic_grid[y, x, ch] > 0.1:
world_x = (x / w - 0.5) * MAX_DISTANCE * 2
world_y = (y / h - 0.5) * MAX_DISTANCE * 2
if ch < 3:
color = COLORS['vehicle']
counts['vehicles'] += 1
box_size = [2.0, 4.0]
elif ch < 5:
color = COLORS['pedestrian']
counts['pedestrians'] += 1
box_size = [0.8, 0.8]
else:
color = COLORS['cyclist']
counts['cyclists'] += 1
box_size = [1.2, 2.0]
img = add_rect(img, [world_x, world_y], [1.0, 0.0],
box_size, traffic_grid[y, x, ch], color)
return img, counts
def render_waypoints(waypoints, scale_factor=WAYPOINT_SCALE_FACTOR):
img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
if isinstance(waypoints, torch.Tensor):
waypoints = waypoints.cpu().numpy()
scaled_waypoints = waypoints * scale_factor
for i, wp in enumerate(scaled_waypoints):
px = int(wp[0] * PIXELS_PER_METER + IMG_SIZE // 2)
py = int(wp[1] * PIXELS_PER_METER + IMG_SIZE // 2)
if 0 <= px < IMG_SIZE and 0 <= py < IMG_SIZE:
radius = max(3, 8 - i)
cv2.circle(img, (px, py), radius, COLORS['waypoint'], -1)
if i > 0:
prev_px = int(scaled_waypoints[i-1][0] * PIXELS_PER_METER + IMG_SIZE // 2)
prev_py = int(scaled_waypoints[i-1][1] * PIXELS_PER_METER + IMG_SIZE // 2)
if 0 <= prev_px < IMG_SIZE and 0 <= prev_py < IMG_SIZE:
cv2.line(img, (prev_px, prev_py), (px, py), COLORS['waypoint'], 2)
return img
def render_self_car(img):
car_pos = [0, -4.0]
car_ori = [1.0, 0.0]
car_size = [2.0, 4.5]
return add_rect(img, car_pos, car_ori, car_size, 1.0, COLORS['ego_car'])
# ================== Tracker Classes ==================
class TrackedObject:
def __init__(self, obj_id: int):
self.id = obj_id
self.last_step = 0
self.last_pos = [0.0, 0.0]
self.historical_pos = []
self.historical_steps = []
self.velocity = [0.0, 0.0]
self.confidence = 1.0
def update(self, step: int, obj_info: List[float]):
self.last_step = step
self.last_pos = obj_info[:2]
self.historical_pos.append(obj_info[:2])
self.historical_steps.append(step)
if len(self.historical_pos) >= 2:
dt = self.historical_steps[-1] - self.historical_steps[-2]
if dt > 0:
dx = self.historical_pos[-1][0] - self.historical_pos[-2][0]
dy = self.historical_pos[-1][1] - self.historical_pos[-2][1]
self.velocity = [dx/dt, dy/dt]
def predict_position(self, future_time: float) -> List[float]:
predicted_x = self.last_pos[0] + self.velocity[0] * future_time
predicted_y = self.last_pos[1] + self.velocity[1] * future_time
return [predicted_x, predicted_y]
def is_alive(self, current_step: int, max_age: int = 5) -> bool:
return (current_step - self.last_step) <= max_age
class Tracker:
def __init__(self, frequency: int = 10):
self.tracks: List[TrackedObject] = []
self.frequency = frequency
self.next_id = 0
self.current_step = 0
def update_and_predict(self, detections: List[Dict], step: int) -> np.ndarray:
self.current_step = step
for detection in detections:
pos = detection.get('position', [0, 0])
feature = detection.get('feature', 0.5)
best_match = None
min_distance = float('inf')
for track in self.tracks:
if track.is_alive(step):
distance = np.linalg.norm(np.array(pos) - np.array(track.last_pos))
if distance < min_distance and distance < 2.0:
min_distance = distance
best_match = track
if best_match:
best_match.update(step, pos + [feature])
else:
new_track = TrackedObject(self.next_id)
new_track.update(step, pos + [feature])
self.tracks.append(new_track)
self.next_id += 1
self.tracks = [t for t in self.tracks if t.is_alive(step)]
return self._generate_prediction_grid()
def _generate_prediction_grid(self) -> np.ndarray:
grid = np.zeros((20, 20, 7), dtype=np.float32)
for track in self.tracks:
if track.is_alive(self.current_step):
current_pos = track.last_pos
future_pos_t1 = track.predict_position(T1_FUTURE_TIME)
future_pos_t2 = track.predict_position(T2_FUTURE_TIME)
for pos in [current_pos, future_pos_t1, future_pos_t2]:
grid_x = int((pos[0] / (MAX_DISTANCE * 2) + 0.5) * 20)
grid_y = int((pos[1] / (MAX_DISTANCE * 2) + 0.5) * 20)
if 0 <= grid_x < 20 and 0 <= grid_y < 20:
channel = 0
grid[grid_y, grid_x, channel] = max(grid[grid_y, grid_x, channel], track.confidence)
return grid
# ================== Controller Classes ==================
class ControllerConfig:
def __init__(self):
self.turn_KP = 1.0
self.turn_KI = 0.1
self.turn_KD = 0.1
self.turn_n = 20
self.speed_KP = 0.5
self.speed_KI = 0.05
self.speed_KD = 0.1
self.speed_n = 20
self.max_speed = 6.0
self.max_throttle = 0.75
self.clip_delta = 0.25
self.brake_speed = 0.4
self.brake_ratio = 1.1
class InterfuserController:
def __init__(self, config: ControllerConfig):
self.config = config
self.turn_controller = PIDController(config.turn_KP, config.turn_KI, config.turn_KD, config.turn_n)
self.speed_controller = PIDController(config.speed_KP, config.speed_KI, config.speed_KD, config.speed_n)
self.last_steer = 0.0
self.last_throttle = 0.0
self.target_speed = 3.0
def run_step(self, current_speed: float, waypoints: np.ndarray,
junction: float, traffic_light_state: float,
stop_sign: float, meta_data: Dict) -> Tuple[float, float, bool, str]:
if isinstance(waypoints, torch.Tensor):
waypoints = waypoints.cpu().numpy()
if len(waypoints) > 1:
dx = waypoints[1][0] - waypoints[0][0]
dy = waypoints[1][1] - waypoints[0][1]
target_yaw = math.atan2(dy, dx)
steer = self.turn_controller.step(target_yaw)
else:
steer = 0.0
steer = np.clip(steer, -1.0, 1.0)
target_speed = self.target_speed
if junction > 0.5:
target_speed *= 0.7
if abs(steer) > 0.3:
target_speed *= 0.8
speed_error = target_speed - current_speed
throttle = self.speed_controller.step(speed_error)
throttle = np.clip(throttle, 0.0, self.config.max_throttle)
brake = False
if traffic_light_state > 0.5 or stop_sign > 0.5 or current_speed > self.config.max_speed:
brake = True
throttle = 0.0
self.last_steer = steer
self.last_throttle = throttle
metadata = f"Speed:{current_speed:.1f} Target:{target_speed:.1f} Junction:{junction:.2f}"
return steer, throttle, brake, metadata
# ================== Display Interface ==================
class DisplayInterface:
def __init__(self, width: int = 1200, height: int = 600):
self._width = width
self._height = height
self.camera_width = width // 2
self.camera_height = height
self.map_width = width // 2
self.map_height = height // 3
def run_interface(self, data: Dict[str, Any]) -> np.ndarray:
dashboard = np.zeros((self._height, self._width, 3), dtype=np.uint8)
# Camera view
camera_view = data.get('camera_view')
if camera_view is not None:
camera_resized = cv2.resize(camera_view, (self.camera_width, self.camera_height))
dashboard[:, :self.camera_width] = camera_resized
# Maps
map_start_x = self.camera_width
map_t0 = data.get('map_t0')
if map_t0 is not None:
map_resized = cv2.resize(map_t0, (self.map_width, self.map_height))
dashboard[:self.map_height, map_start_x:] = map_resized
cv2.putText(dashboard, "Current (t=0)", (map_start_x + 10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
map_t1 = data.get('map_t1')
if map_t1 is not None:
map_resized = cv2.resize(map_t1, (self.map_width, self.map_height))
y_start = self.map_height
dashboard[y_start:y_start + self.map_height, map_start_x:] = map_resized
cv2.putText(dashboard, f"Future (t={T1_FUTURE_TIME}s)",
(map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX,
0.7, (255, 255, 255), 2)
map_t2 = data.get('map_t2')
if map_t2 is not None:
map_resized = cv2.resize(map_t2, (self.map_width, self.map_height))
y_start = self.map_height * 2
dashboard[y_start:, map_start_x:] = map_resized
cv2.putText(dashboard, f"Future (t={T2_FUTURE_TIME}s)",
(map_start_x + 10, y_start + 30), cv2.FONT_HERSHEY_SIMPLEX,
0.7, (255, 255, 255), 2)
# Text info
text_info = data.get('text_info', {})
y_offset = 50
for key, value in text_info.items():
cv2.putText(dashboard, value, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (0, 255, 0), 2)
y_offset += 30
return dashboard