jmercat's picture
Removed history to avoid any unverified information being released
5769ee4
import atexit
import copy
import os
from mmcv import Config
import numpy as np
import pytest
from pytorch_lightning import seed_everything
import torch
import shutil
from torch.utils.data import DataLoader
from risk_biased.scene_dataset.loaders import SceneDataLoaders
from risk_biased.scene_dataset.scene import SceneDataset, RandomSceneParams
from risk_biased.scene_dataset.scene import load_create_dataset
def clean_up_dataset_dir():
"""
This function is designed to delete the directories
that might have created even if the test fails early
by being called on exit.
"""
current_dir = os.path.dirname(os.path.realpath(__file__))
dataset_dir0 = os.path.join(current_dir, "scene_dataset_000")
if os.path.exists(dataset_dir0):
shutil.rmtree(dataset_dir0)
dataset_dir1 = os.path.join(current_dir, "scene_dataset_001")
if os.path.exists(dataset_dir1):
shutil.rmtree(dataset_dir1)
# atexit.register(clean_up_dataset_dir)
@pytest.fixture(scope="module")
def params():
seed_everything(0)
cfg = Config()
cfg.batch_size = 4
cfg.time_scene = 5.0
cfg.dt = 0.1
cfg.sample_times = [t * cfg.dt for t in range(0, int(cfg.time_scene / cfg.dt))]
cfg.ego_ref_speed = 14
cfg.ego_speed_init_low = 4.0
cfg.ego_speed_init_high = 16.0
cfg.ego_acceleration_mean_low = -1.5
cfg.ego_acceleration_mean_high = 1.5
cfg.ego_acceleration_std = 1.5
cfg.ego_length = 4
cfg.ego_width = 1.75
cfg.fast_speed = 2.0
cfg.slow_speed = 1.0
cfg.p_change_pace = 0.2
cfg.proportion_fast = 0.5
cfg.perception_noise_std = 0.03
cfg.state_dim = 2
cfg.num_steps = 3
cfg.num_steps_future = len(cfg.sample_times) - cfg.num_steps
cfg.file_name = "test_scene_data"
cfg.datasets_sizes = {"train": 100, "val": 10, "test": 30}
cfg.datasets = list(cfg.datasets_sizes.keys())
cfg.num_workers = 2
cfg.dataset_parameters = {
"dt": cfg.dt,
"time_scene": cfg.time_scene,
"sample_times": cfg.sample_times,
"ego_ref_speed": cfg.ego_ref_speed,
"ego_speed_init_low": cfg.ego_speed_init_low,
"ego_speed_init_high": cfg.ego_speed_init_high,
"ego_acceleration_mean_low": cfg.ego_acceleration_mean_low,
"ego_acceleration_mean_high": cfg.ego_acceleration_mean_high,
"ego_acceleration_std": cfg.ego_acceleration_std,
"fast_speed": cfg.fast_speed,
"slow_speed": cfg.slow_speed,
"p_change_pace": cfg.p_change_pace,
"proportion_fast": cfg.proportion_fast,
"file_name": cfg.file_name,
"datasets_sizes": cfg.datasets_sizes,
"state_dim": cfg.state_dim,
"num_steps": cfg.num_steps,
"num_steps_future": cfg.num_steps_future,
"perception_noise_std": cfg.perception_noise_std,
}
return cfg
@pytest.mark.parametrize(
"n_data, batch_size, sample_times, state_dim",
[(1024, 128, [0.0, 1.0, 2.0, 3.0, 4.0], 2)],
)
def test_load_data(params, n_data, batch_size, sample_times, state_dim):
params = copy.deepcopy(params)
params.batch_size = batch_size
params.sample_times = sample_times
scene_params = RandomSceneParams.from_config(params)
dataset_rand = SceneDataset(n_data, scene_params, pre_fetch=False)
data_loader_rand = DataLoader(
dataset_rand, batch_size, collate_fn=dataset_rand.collate_fn, shuffle=False
)
dataset_prefetch = SceneDataset(n_data, scene_params, pre_fetch=True)
data_loader_prefetch = DataLoader(
dataset_prefetch,
batch_size,
collate_fn=dataset_prefetch.collate_fn,
shuffle=False,
)
for i, (batch_rand, batch_prefetch) in enumerate(
zip(data_loader_rand, data_loader_prefetch)
):
if i == 0:
first_batch_prefetch = batch_prefetch
first_batch_rand = batch_rand
# Check the shape of the data is the expected one
assert (
batch_rand.shape
== batch_prefetch.shape
== (batch_size, 1, len(sample_times), state_dim)
)
# Shuffle false and pre-fetch should loop back to the same batch
assert torch.allclose(next(iter(data_loader_prefetch)), first_batch_prefetch)
# Shuffle false but producing random batches should not loop back to the same batch
assert not torch.allclose(next(iter(data_loader_rand)), first_batch_rand)
class TestDataset:
@pytest.fixture(autouse=True)
def setup(self, params):
clean_up_dataset_dir()
current_dir = os.path.dirname(os.path.realpath(__file__))
[data_train, data_val, data_test] = load_create_dataset(params, current_dir)
self.loaders = SceneDataLoaders(
params.state_dim,
params.num_steps,
params.num_steps_future,
params.batch_size,
data_train=data_train,
data_val=data_val,
data_test=data_test,
num_workers=params.num_workers,
)
assert os.path.exists(os.path.join(current_dir, "scene_dataset_000"))
assert not os.path.exists(os.path.join(current_dir, "scene_dataset_001"))
self.batch = torch.rand(
params.batch_size,
params.num_steps + params.num_steps_future,
params.state_dim,
)
self.normalized_batch, self.offset = self.loaders.normalize_trajectory(
self.batch
)
(
self.normalized_batch_past,
self.normalized_batch_future,
) = self.loaders.split_trajectory(self.normalized_batch)
# Setup is done but some cleanup must be defined
yield
# Remove data directory after use
dataset_dir = os.path.join(current_dir, "scene_dataset_000")
shutil.rmtree(dataset_dir)
def test_setup_datasets(self, params):
current_dir = os.path.dirname(os.path.realpath(__file__))
assert os.path.exists(os.path.join(current_dir, "scene_dataset_000"))
# Should only load from directory that was created, not create a new one
[train_set, val_set, test_set] = load_create_dataset(
params, base_dir=current_dir
)
assert not os.path.exists(os.path.join(current_dir, "scene_dataset_001"))
train_path = os.path.join(
current_dir, "scene_dataset_000", "scene_dataset_train.npy"
)
val_path = os.path.join(
current_dir, "scene_dataset_000", "scene_dataset_val.npy"
)
test_path = os.path.join(
current_dir, "scene_dataset_000", "scene_dataset_test.npy"
)
# make sure paths for datasets exist
assert os.path.exists(train_path)
assert os.path.exists(val_path)
assert os.path.exists(test_path)
# make sure datasets match the specifications made in config
assert np.load(train_path).shape == (
2,
params.datasets_sizes["train"],
1,
params.num_steps + params.num_steps_future,
params.state_dim,
)
assert np.load(val_path).shape == (
2,
params.datasets_sizes["val"],
1,
params.num_steps + params.num_steps_future,
params.state_dim,
)
assert np.load(test_path).shape == (
2,
params.datasets_sizes["test"],
1,
params.num_steps + params.num_steps_future,
params.state_dim,
)
total_steps = params.num_steps + params.num_steps_future
assert list(train_set.shape) == [
2,
params.datasets_sizes.train,
1,
total_steps,
2,
]
assert list(val_set.shape) == [2, params.datasets_sizes.val, 1, total_steps, 2]
assert list(test_set.shape) == [
2,
params.datasets_sizes.test,
1,
total_steps,
2,
]
def test_split_trajectory(self, params):
batch_history, batch_future = self.loaders.split_trajectory(self.batch)
# make sure split_trajectory splits batch into history and future
assert torch.all(torch.eq(batch_history, self.batch[:, : params.num_steps, :]))
assert torch.all(torch.eq(batch_future, self.batch[:, params.num_steps :, :]))
def test_normalize_trajectory(self, params):
batch_copied = self.batch.detach().clone()
# make sure batch remains the same
assert torch.all(torch.eq(batch_copied, self.batch))
# test normalization of whole batch
assert torch.allclose(
self.normalized_batch + self.offset.unsqueeze(1), self.batch
)
assert torch.allclose(
self.batch - self.offset.unsqueeze(1), self.normalized_batch
)
batch_past, batch_fut = self.loaders.split_trajectory(self.batch)
# test normalization of history
assert torch.allclose(
self.normalized_batch_past + self.offset.unsqueeze(1), batch_past
)
def test_unnormalize_trajectory(self, params):
batch_future_test = self.loaders.unnormalize_trajectory(
self.normalized_batch_future, self.offset
)
# test unnormalization
assert torch.allclose(
self.normalized_batch_future + self.offset.unsqueeze(1), batch_future_test
)