Spaces:
Running
Running
File size: 20,256 Bytes
5769ee4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 |
from dataclasses import dataclass
import os
from typing import Union, List, Optional
import warnings
import copy
from mmcv import Config
import numpy as np
import torch
from torch import Tensor
from torch.utils.data import Dataset
from risk_biased.scene_dataset.pedestrian import RandomPedestrians
from risk_biased.utils.torch_utils import torch_linspace
@dataclass
class RandomSceneParams:
"""Dataclass that defines all the listed parameters that are necessary for a RandomScene object
Args:
batch_size: number of scenes in the batch
time_scene: time length of the scene in seconds
sample_times: list of times to get the positions
ego_ref_speed: constant reference speed of the ego vehicle in meters/seconds
ego_speed_init_low: lowest initial speed of the ego vehicle in meters/seconds
ego_speed_init_high: higest initial speed of the ego vehicle in meters/seconds
ego_acceleration_mean_low: lowest mean acceleration of the ego vehicle in m/s^2
ego_acceleration_mean_high: highest mean acceleration of the ego vehicle in m/s^2
ego_acceleration_std: std for acceleration of the ego vehicle in m/s^2
ego_length: length of the ego vehicle in meters
ego_width: width of the ego vehicle in meters
dt: time step to use in the trajectory sequence
fast_speed: fast walking speed for the random pedestrian in meters/seconds
slow_speed: slow walking speed for the random pedestrian in meters/seconds
p_change_pace: probability that a slow (resp. fast) pedestrian walk at fast_speed (resp. slow_speed) at each time step
proportion_fast: proportion of the pedestrians that are mainly walking at fast_speed
perception_noise_std: standard deviation of the gaussian noise that is affecting the position observations
"""
batch_size: int
time_scene: float
sample_times: list
ego_ref_speed: float
ego_speed_init_low: float
ego_speed_init_high: float
ego_acceleration_mean_low: float
ego_acceleration_mean_high: float
ego_acceleration_std: float
ego_length: float
ego_width: float
dt: float
fast_speed: float
slow_speed: float
p_change_pace: float
proportion_fast: float
perception_noise_std: float
@staticmethod
def from_config(cfg: Config):
return RandomSceneParams(
batch_size=cfg.batch_size,
sample_times=cfg.sample_times,
time_scene=cfg.time_scene,
ego_ref_speed=cfg.ego_ref_speed,
ego_speed_init_low=cfg.ego_speed_init_low,
ego_speed_init_high=cfg.ego_speed_init_high,
ego_acceleration_mean_low=cfg.ego_acceleration_mean_low,
ego_acceleration_mean_high=cfg.ego_acceleration_mean_high,
ego_acceleration_std=cfg.ego_acceleration_std,
ego_length=cfg.ego_length,
ego_width=cfg.ego_width,
dt=cfg.dt,
fast_speed=cfg.fast_speed,
slow_speed=cfg.slow_speed,
p_change_pace=cfg.p_change_pace,
proportion_fast=cfg.proportion_fast,
perception_noise_std=cfg.perception_noise_std,
)
class RandomScene:
"""
Batched scenes with one vehicle at constant velocity and one random pedestrian. Utility functions to draw the scene and compute risk factors (time to collision etc...)
Args:
params: dataclass containing the necessary parameters
is_torch: set to True to produce Tensor batches and to False to produce numpy arrays
"""
def __init__(
self,
params: RandomSceneParams,
is_torch: bool = False,
) -> None:
self._is_torch = is_torch
self._batch_size = params.batch_size
self._fast_speed = params.fast_speed
self._slow_speed = params.slow_speed
self._p_change_pace = params.p_change_pace
self._proportion_fast = params.proportion_fast
self.dt = params.dt
self.sample_times = params.sample_times
self.ego_ref_speed = params.ego_ref_speed
self._ego_speed_init_low = params.ego_speed_init_low
self._ego_speed_init_high = params.ego_speed_init_high
self._ego_acceleration_mean_low = params.ego_acceleration_mean_low
self._ego_acceleration_mean_high = params.ego_acceleration_mean_high
self._ego_acceleration_std = params.ego_acceleration_std
self.perception_noise_std = params.perception_noise_std
self.road_length = (
params.ego_ref_speed + params.fast_speed
) * params.time_scene
self.time_scene = params.time_scene
self.lane_width = 3
self.sidewalks_width = 1.5
self.road_width = 2 * self.lane_width + 2 * self.sidewalks_width
self.bottom = -self.lane_width / 2 - self.sidewalks_width
self.top = 3 * self.lane_width / 2 + self.sidewalks_width
self.ego_width = 1.75
self.ego_length = 4
self.current_time = 0
if self._is_torch:
pedestrians_x = (
torch.rand(params.batch_size, 1)
* (self.road_length - self.ego_length / 2)
+ self.ego_length / 2
)
pedestrians_y = (
torch.rand(params.batch_size, 1) * (self.top - self.bottom)
+ self.bottom
)
self._pedestrians_positions = torch.stack(
(pedestrians_x, pedestrians_y), -1
)
else:
pedestrians_x = np.random.uniform(
low=self.ego_length / 2,
high=self.road_length,
size=(params.batch_size, 1),
)
pedestrians_y = np.random.uniform(
low=self.bottom, high=self.top, size=(params.batch_size, 1)
)
self._pedestrians_positions = np.stack((pedestrians_x, pedestrians_y), -1)
self.pedestrians = RandomPedestrians(
batch_size=self._batch_size,
dt=self.dt,
fast_speed=self._fast_speed,
slow_speed=self._slow_speed,
p_change_pace=self._p_change_pace,
proportion_fast=self._proportion_fast,
is_torch=self._is_torch,
)
self._set_pedestrians()
@property
def pedestrians_positions(self):
# relative_positions = self._pedestrians_positions/[[(self.road_length - self.ego_length / 2), (self.top - self.bottom)]] - [[self.ego_length / 2, self.bottom]]
return self._pedestrians_positions
def set_pedestrians_states(
self,
relative_pedestrians_positions: Union[torch.Tensor, np.ndarray],
pedestrians_angles: Optional[Union[torch.Tensor, np.ndarray]] = None,
):
"""Force pedestrian initial states
Args:
relative_pedestrians_positions: Relative positions in the scene as percentage distance from left to right and from bottom to top
pedestrians_angles: Pedestrian heading angles in radiants
"""
if self._is_torch:
assert isinstance(relative_pedestrians_positions, torch.Tensor)
else:
assert isinstance(relative_pedestrians_positions, np.ndarray)
self._batch_size = relative_pedestrians_positions.shape[0]
if (0 > relative_pedestrians_positions).any() or (
relative_pedestrians_positions > 1
).any():
warnings.warn(
"Some of the given pedestrian initial positions are outside of the road range"
)
center_y = (self.top - self.bottom) * relative_pedestrians_positions[
:, :, 1
] + self.bottom
center_x = (
self.road_length - self.ego_length / 2
) * relative_pedestrians_positions[:, :, 0] + self.ego_length / 2
if self._is_torch:
pedestrians_positions = torch.stack([center_x, center_y], -1)
else:
pedestrians_positions = np.stack([center_x, center_y], -1)
self.pedestrians = RandomPedestrians(
batch_size=self._batch_size,
dt=self.dt,
fast_speed=self._fast_speed,
slow_speed=self._slow_speed,
p_change_pace=self._p_change_pace,
proportion_fast=self._proportion_fast,
is_torch=self._is_torch,
)
self._pedestrians_positions = pedestrians_positions
if pedestrians_angles is not None:
self.pedestrians.angle = pedestrians_angles
self._set_pedestrians()
def _set_pedestrians(self):
self.pedestrians_trajectories = self.sample_pedestrians_trajectories(
self.sample_times
)
self.final_pedestrians_positions = self.pedestrians_trajectories[:, :, -1]
def get_ego_ref_trajectory(self, time_sequence: list):
"""
Returns only one ego reference trajectory and not a batch because it is always the same.
Args:
time_sequence: the time points at which to get the positions
"""
out = np.array([[[[t * self.ego_ref_speed, 0] for t in time_sequence]]])
if self._is_torch:
return torch.from_numpy(out.astype("float32"))
else:
return out
def get_pedestrians_velocities(self):
"""
Returns the batch of mean pedestrian velocities between their positions and their final positions.
"""
return (self.final_pedestrians_positions - self._pedestrians_positions)[
:, None
] / self.time_scene
def get_ego_ref_velocity(self):
"""
Returns the reference ego velocity.
"""
if self._is_torch:
return torch.from_numpy(
np.array([[[[self.ego_ref_speed, 0]]]], dtype="float32")
)
else:
return np.array([[[[self.ego_ref_speed, 0]]]])
def get_ego_ref_position(self):
"""
Returns the current reference ego position (at set time self.current_time)
"""
if self._is_torch:
return torch.from_numpy(
np.array(
[[[[self.ego_ref_speed * self.current_time, 0]]]], dtype="float32"
)
)
else:
return np.array([[[[self.ego_ref_speed * self.current_time, 0]]]])
def set_current_time(self, time: float):
"""
Set the current time of the scene.
Args:
time : The current time to set. It should be between 0 and 1
"""
assert 0 <= time <= self.time_scene
self.current_time = time
def sample_ego_velocities(self, time_sequence: list):
"""
Get ego velocity trajectories following the ego's acceleration distribution and the initial
velocity distribution.
Args:
time_sequence: a list of time points at which to sample the trajectory positions.
Returns:
batch of sequence of velocities of shape (batch_size, 1, len(time_sequence), 2)
"""
vel_traj = []
# uniform sampling of acceleration_mean between self._ego_acceleration_mean_low and
# self._ego_acceleration_mean_high
acceleration_mean = np.random.rand(self._batch_size, 2) * np.array(
[
self._ego_acceleration_mean_high - self._ego_acceleration_mean_low,
0.0,
]
) + np.array([self._ego_acceleration_mean_low, 0.0])
t_prev = 0
# uniform sampling of initial velocity between self._ego_speed_init_low and
# self._ego_speed_init_high
vel_prev = np.random.rand(self._batch_size, 2) * np.array(
[self._ego_speed_init_high - self._ego_speed_init_low, 0.0]
) + np.array([self._ego_speed_init_low, 0.0])
for t in time_sequence:
# integrate accelerations once to get velocities
acceleration = acceleration_mean + np.random.randn(
self._batch_size, 2
) * np.array([self._ego_acceleration_std, 0.0])
vel_prev = vel_prev + acceleration * (t - t_prev)
t_prev = t
vel_traj.append(vel_prev)
vel_traj = np.stack(vel_traj, 1)
if self._is_torch:
vel_traj = torch.from_numpy(vel_traj.astype("float32"))
return vel_traj[:, None]
def sample_ego_trajectories(self, time_sequence: list):
"""
Get ego trajectories following the ego's acceleration distribution and the initial velocity
distribution.
Args:
time_sequence: a list of time points at which to sample the trajectory positions.
Returns:
batch of sequence of positions of shape (batch_size, len(time_sequence), 2)
"""
vel_traj = self.sample_ego_velocities(time_sequence)
traj = []
t_prev = 0
pos_prev = np.array([[0, 0]], dtype="float32")
if self._is_torch:
pos_prev = torch.from_numpy(pos_prev)
for idx, t in enumerate(time_sequence):
# integrate velocities once to get positions
vel = vel_traj[:, :, idx, :]
pos_prev = pos_prev + vel * (t - t_prev)
t_prev = t
traj.append(pos_prev)
if self._is_torch:
return torch.stack(traj, -2)
else:
return np.stack(traj, -2)
def sample_pedestrians_trajectories(self, time_sequence: list):
"""
Produce pedestrian trajectories following the pedestrian behavior distribution
(it is resampled, the final position will not match self.final_pedestrians_positions)
Args:
time_sequence: a list of time points at which to sample the trajectory positions.
Returns:
batch of sequence of positions of shape (batch_size, len(time_sequence), 2)
"""
traj = []
t_prev = 0
pos_prev = self.pedestrians_positions
for t in time_sequence:
pos_prev = (
pos_prev
+ self.pedestrians.get_final_position(t - t_prev)
- self.pedestrians.position
)
t_prev = t
traj.append(pos_prev)
if self._is_torch:
traj = torch.stack(traj, 2)
return traj + torch.randn_like(traj) * self.perception_noise_std
else:
traj = np.stack(traj, 2)
return traj + np.random.randn(*traj.shape) * self.perception_noise_std
def get_pedestrians_trajectories(self):
"""
Returns the batch of pedestrian trajectories sampled every dt.
"""
return self.pedestrians_trajectories
def get_pedestrian_trajectory(self, ind: int, time_sequence: list = None):
"""
Returns one pedestrian trajectory of index ind sampled at times set in time_sequence.
Args:
ind: index of the pedestrian in the batch.
time_sequence: a list of time points at which to sample the trajectory positions.
Returns:
A pedestrian trajectory of shape (len(time_sequence), 2)
"""
len_traj = len(self.sample_times)
if self._is_torch:
ped_traj = torch_linspace(
self.pedestrians_positions[ind],
self.final_pedestrians_positions[ind],
len_traj,
)
else:
ped_traj = np.linspace(
self.pedestrians_positions[ind],
self.final_pedestrians_positions[ind],
len_traj,
)
if time_sequence is not None:
n_steps = [int(t / self.dt) for t in time_sequence]
else:
n_steps = range(int(self.time_scene / self.dt))
return ped_traj[n_steps]
class SceneDataset(Dataset):
"""
Dataset of scenes with one vehicle at constant velocity and one random pedestrian.
The scenes are randomly generated so the distribution can be sampled at each batch or pre-fetched.
Args:
len: int number of scenes per epoch
params: dataclass defining all the necessary parameters
pre_fetch: set to True to fetch the whole dataset at initialization
"""
def __init__(
self,
len: int,
params: RandomSceneParams,
pre_fetch: bool = True,
) -> None:
super().__init__()
self._pre_fetch = pre_fetch
self._len = len
self._sample_times = params.sample_times
self.params = copy.deepcopy(params)
params.batch_size = len
if self._pre_fetch:
self.scene_set = RandomScene(
params, is_torch=True
).sample_pedestrians_trajectories(self._sample_times)
def __len__(self) -> int:
return self._len
# This is a hack, get item only returns the index so that the collate_fn can handle making the batch without looping on RandomScene creation.
def __getitem__(self, index: int) -> Tensor:
return index
def collate_fn(self, index_list: list) -> Tensor:
if self._pre_fetch:
return self.scene_set[torch.from_numpy(np.array(index_list))]
else:
self.params.batch_size = len(index_list)
return RandomScene(
self.params,
is_torch=True,
).sample_pedestrians_trajectories(self._sample_times)
# Call this function to create a dataset as a .npy file that can be loaded as a numpy array with np.load(file_name.npy)
def save_dataset(file_path: str, size: int, config: Config):
"""
Save a dataset at file_path using the configuration.
Args:
file_path: Where to save the dataset
size: Number of samples to save
config: Configuration to use for the dataset generation
"""
dir_path = os.path.dirname(file_path)
config_path = os.path.join(dir_path, "config.py")
config = copy.deepcopy(config)
config.batch_size = size
params = RandomSceneParams.from_config(config)
scene = RandomScene(
params,
is_torch=False,
)
data_pedestrian = scene.sample_pedestrians_trajectories(config.sample_times)
data_ego = scene.sample_ego_trajectories(config.sample_times)
data = np.stack([data_pedestrian, data_ego], 0)
np.save(file_path, data)
# Cannot use config.dump here because it is buggy and does not work if config was not loaded from a file.
with open(config_path, "w", encoding="utf-8") as f:
f.write(config.pretty_text)
def load_create_dataset(
config: Config,
base_dir=None,
) -> List:
"""
Load the dataset described by its config if it exists or create one.
Args:
config: Configuration to use for the dataset
base_dir: Where to look for the dataset or to save it.
"""
if base_dir is None:
base_dir = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "..", "..", "data"
)
found = False
dataset_out = []
i = 0
dir_path = os.path.join(base_dir, f"scene_dataset_{i:03d}")
while os.path.exists(dir_path):
config_path = os.path.join(dir_path, "config.py")
if os.path.exists(config_path):
config_check = Config.fromfile(config_path)
if config_check.dataset_parameters == config.dataset_parameters:
found = True
break
else:
warnings.warn(
f"Dataset directory {dir_path} exists but doesn't contain a config file. Cannot use it."
)
i += 1
dir_path = os.path.join(base_dir, f"scene_dataset_{i:03d}")
if not found:
print(f"Dataset not found, creating a new one.")
os.makedirs(dir_path)
for dataset in config.datasets:
dataset_name = f"scene_dataset_{dataset}.npy"
dataset_path = os.path.join(dir_path, dataset_name)
save_dataset(dataset_path, config.datasets_sizes[dataset], config)
if found:
print(f"Loading existing dataset at {dir_path}.")
for dataset in config.datasets:
dataset_path = os.path.join(dir_path, f"scene_dataset_{dataset}.npy")
dataset_out.append(torch.from_numpy(np.load(dataset_path).astype("float32")))
return dataset_out
|