Spaces:
Running
Running
from typing import Tuple | |
import numpy as np | |
import torch | |
from torch.utils.data import DataLoader, TensorDataset | |
class SceneDataLoaders: | |
""" | |
This class loads a scene dataset and pre-process it (normalization, unnormalization) | |
Args: | |
state_dim : dimension of the observed state (2 for x,y position observation) | |
num_steps : number of observed steps | |
num_steps_future : number of steps in the future | |
batch_size: set data loader with this batch size | |
data_train: training dataset | |
data_val: validation dataset | |
data_test: test dataset | |
num_workers: number of workers to use for data loading | |
""" | |
def __init__( | |
self, | |
state_dim: int, | |
num_steps: int, | |
num_steps_future: int, | |
batch_size: int, | |
data_train: torch.Tensor, | |
data_val: torch.Tensor, | |
data_test: torch.Tensor, | |
num_workers: int = 0, | |
): | |
self._batch_size = batch_size | |
self._num_workers = num_workers | |
self._state_dim = state_dim | |
self._num_steps = num_steps | |
self._num_steps_future = num_steps_future | |
self._setup_datasets(data_train, data_val, data_test) | |
def train_dataloader(self, shuffle=True, drop_last=True) -> DataLoader: | |
"""Setup and return training DataLoader | |
Returns: | |
DataLoader: training DataLoader | |
""" | |
data_size = self._data_train_past.shape[0] | |
# This is a didactic data loader that only defines minimalistic inputs. | |
# This dataloader adds some empty tensors and ones to match the expected format with masks and map information. | |
train_loader = DataLoader( | |
dataset=TensorDataset( | |
self._data_train_past, | |
torch.ones_like(self._data_train_past[..., 0]), # Mask past | |
self._data_train_fut, | |
torch.ones_like(self._data_train_fut[..., 0]), # Mask fut | |
torch.ones_like(self._data_train_fut[..., 0]), # Mask loss | |
torch.empty( | |
data_size, 1, 0, 0, device=self._data_train_past.device | |
), # Map | |
torch.empty( | |
data_size, 1, 0, device=self._data_train_past.device | |
), # Mask map | |
self._offset_train, | |
self._data_train_ego_past, | |
self._data_train_ego_fut, | |
), | |
batch_size=self._batch_size, | |
shuffle=shuffle, | |
drop_last=drop_last, | |
num_workers=self._num_workers, | |
) | |
return train_loader | |
def val_dataloader(self, shuffle=False, drop_last=False) -> DataLoader: | |
"""Setup and return validation DataLoader | |
Returns: | |
DataLoader: validation DataLoader | |
""" | |
data_size = self._data_val_past.shape[0] | |
# This is a didactic data loader that only defines minimalistic inputs. | |
# This dataloader adds some empty tensors and ones to match the expected format with masks and map information. | |
val_loader = DataLoader( | |
dataset=TensorDataset( | |
self._data_val_past, | |
torch.ones_like(self._data_val_past[..., 0]), # Mask past | |
self._data_val_fut, | |
torch.ones_like(self._data_val_fut[..., 0]), # Mask fut | |
torch.ones_like(self._data_val_fut[..., 0]), # Mask loss | |
torch.zeros( | |
data_size, 1, 0, 0, device=self._data_val_past.device | |
), # Map | |
torch.ones( | |
data_size, 1, 0, device=self._data_val_past.device | |
), # Mask map | |
self._offset_val, | |
self._data_val_ego_past, | |
self._data_val_ego_fut, | |
), | |
batch_size=self._batch_size, | |
shuffle=shuffle, | |
drop_last=drop_last, | |
num_workers=self._num_workers, | |
) | |
return val_loader | |
def test_dataloader(self) -> DataLoader: | |
"""Setup and return test DataLoader | |
Returns: | |
DataLoader: test DataLoader | |
""" | |
data_size = self._data_test_past.shape[0] | |
# This is a didactic data loader that only defines minimalistic inputs. | |
# This dataloader adds some empty tensors and ones to match the expected format with masks and map information. | |
test_loader = DataLoader( | |
dataset=TensorDataset( | |
self._data_test_past, | |
torch.ones_like(self._data_test_past[..., 0]), # Mask | |
torch.zeros( | |
data_size, 0, 1, 0, device=self._data_test_past.device | |
), # Map | |
torch.ones( | |
data_size, 0, 1, device=self._data_test_past.device | |
), # Mask map | |
self._offset_test, | |
self._data_test_ego_past, | |
self._data_test_ego_fut, | |
), | |
batch_size=self._batch_size, | |
shuffle=False, | |
num_workers=self._num_workers, | |
) | |
return test_loader | |
def _setup_datasets( | |
self, data_train: torch.Tensor, data_val: torch.Tensor, data_test: torch.Tensor | |
): | |
"""Setup datasets: normalize and split into past future | |
Args: | |
data_train: training dataset | |
data_val: validation dataset | |
data_test: test dataset | |
""" | |
data_train, data_train_ego = data_train[0], data_train[1] | |
data_val, data_val_ego = data_val[0], data_val[1] | |
data_test, data_test_ego = data_test[0], data_test[1] | |
data_train, self._offset_train = self.normalize_trajectory(data_train) | |
data_val, self._offset_val = self.normalize_trajectory(data_val) | |
data_test, self._offset_test = self.normalize_trajectory(data_test) | |
# This is a didactic data loader that only defines minimalistic inputs. | |
# An extra dimension is added to account for the number of agents in the scene. | |
# In this minimal input there is only one but the model using the data expects any number of agents. | |
self._data_train_past, self._data_train_fut = self.split_trajectory(data_train) | |
self._data_val_past, self._data_val_fut = self.split_trajectory(data_val) | |
self._data_test_past, self._data_test_fut = self.split_trajectory(data_test) | |
self._data_train_ego_past, self._data_train_ego_fut = self.split_trajectory( | |
data_train_ego | |
) | |
self._data_val_ego_past, self._data_val_ego_fut = self.split_trajectory( | |
data_val_ego | |
) | |
self._data_test_ego_past, self._data_test_ego_fut = self.split_trajectory( | |
data_test_ego | |
) | |
def split_trajectory( | |
self, input: torch.Tensor | |
) -> Tuple[torch.Tensor, torch.Tensor]: | |
"""Split input trajectory into history and future | |
Args: | |
input : (batch_size, (n_agents), num_steps + num_steps_future, state_dim) tensor of | |
entire trajectory [x, y] | |
Returns: | |
Tuple of history and future trajectories | |
""" | |
assert ( | |
input.shape[-2] == self._num_steps + self._num_steps_future | |
), "trajectory length ({}) does not match the expected length".format( | |
input.shape[-2] | |
) | |
assert ( | |
input.shape[-1] == self._state_dim | |
), "state dimension ({}) does no match the expected dimension".format( | |
input.shape[-1] | |
) | |
input_history, input_future = torch.split( | |
input, [self._num_steps, self._num_steps_future], dim=-2 | |
) | |
return input_history, input_future | |
def normalize_trajectory(input: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: | |
"""Normalize input trajectory by subtracting initial state | |
Args: | |
input : (some_shape, n_agents, num_steps + num_steps_future, state_dim) tensor of | |
entire trajectory [x, y], or (some_shape, num_steps, state_dim) tensor of history x | |
Returns: | |
Tuple of (normalized_trajectory, offset), where | |
normalized_trajectory has the same dimension as the input and offset is a | |
(some_shape, state_dim) tensor corresponding to the initial state | |
""" | |
offset = input[..., 0, :].clone() | |
return input - offset.unsqueeze(-2), offset | |
def unnormalize_trajectory( | |
input: torch.Tensor, offset: torch.Tensor | |
) -> torch.Tensor: | |
"""Unnormalize trajectory by adding offset to input | |
Args: | |
input : (some_shape, (n_sample), num_steps_future, state_dim) tensor of future | |
trajectory y | |
offset : (some_shape, 2 or 4 or 5) tensor of offset to add to y | |
Returns: | |
Unnormalized trajectory that has the same size as input | |
""" | |
offset_dim = offset.shape[-1] | |
assert input.shape[-1] >= offset_dim | |
input_clone = input.clone() | |
if offset.ndim == 2: | |
batch_size, _ = offset.shape | |
assert input_clone.shape[0] == batch_size | |
input_clone[..., :offset_dim] = input_clone[ | |
..., :offset_dim | |
] + offset.reshape( | |
[batch_size, *([1] * (input_clone.ndim - 2)), offset_dim] | |
) | |
elif offset.ndim == 3: | |
batch_size, num_agents, _ = offset.shape | |
assert input_clone.shape[0] == batch_size | |
assert input_clone.shape[1] == num_agents | |
input_clone[..., :offset_dim] = input_clone[ | |
..., :offset_dim | |
] + offset.reshape( | |
[batch_size, num_agents, *([1] * (input_clone.ndim - 3)), offset_dim] | |
) | |
return input_clone | |