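"""Unit tests for the MPCPlanner: reset behavior and replanning output shapes."""
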
import os

import pytest
import torch
from mmcv import Config

from risk_biased.mpc_planner.planner import MPCPlanner, MPCPlannerParams
from risk_biased.predictors.biased_predictor import (
    LitTrajectoryPredictorParams,
    LitTrajectoryPredictor,
)
from risk_biased.scene_dataset.loaders import SceneDataLoaders
from risk_biased.utils.cost import TTCCostParams
from risk_biased.utils.planner_utils import to_state

@pytest.fixture(scope="module")
def params():
    """Load the learning config and override it with small values for fast tests."""
    torch.manual_seed(0)
    working_dir = os.path.dirname(os.path.realpath(__file__))
    config_path = os.path.join(
        working_dir, "..", "..", "..", "risk_biased", "config", "learning_config.py"
    )
    cfg = Config.fromfile(config_path)
    # Sampling-based solver settings
    cfg.num_control_samples = 10
    cfg.num_elite = 3
    cfg.iter_max = 3
    cfg.smoothing_factor = 0.2
    cfg.mean_warm_start = True
    cfg.acceleration_std_x_m_s2 = 2.0
    cfg.acceleration_std_y_m_s2 = 0.0
    # Time discretization and horizons
    cfg.dt = 0.1
    cfg.num_steps = 3
    cfg.num_steps_future = 5
    # Tracking cost, TTC cost, and risk estimator settings
    cfg.tracking_cost_scale_longitudinal = 0.1
    cfg.tracking_cost_scale_lateral = 1.0
    cfg.tracking_cost_reduce = "mean"
    cfg.cost_scale = 10
    cfg.cost_reduce = "mean"
    cfg.distance_bandwidth = 2
    cfg.time_bandwidth = 0.5
    cfg.min_velocity_diff = 0.01
    cfg.risk_estimator = {"type": "cvar", "eps": 1e-3}
    # Predictor architecture settings
    cfg.interaction_type = ""
    cfg.mcg_dim_expansion = 2
    cfg.mcg_num_layers = 0
    cfg.num_attention_heads = 4
    cfg.num_blocks = 3
    cfg.sequence_encoder_type = "MLP"  # one of "MLP", "LSTM", "maskedLSTM"
    cfg.sequence_decoder_type = "MLP"  # one of "MLP", "LSTM"
    cfg.state_dim = 2
    cfg.dynamic_state_dim = 2
    cfg.map_state_dim = 2
    cfg.max_size_lane = 0
    cfg.latent_dim = 2
    cfg.hidden_dim = 64
    cfg.num_hidden_layers = 3
    # Training and risk-biasing settings
    cfg.risk_distribution = {"type": "log-uniform", "min": 0, "max": 1, "scale": 3}
    cfg.kl_weight = 1.0
    cfg.kl_threshold = 0.1
    cfg.learning_rate = 1e-3
    cfg.n_mc_samples_risk = 2048
    cfg.n_mc_samples_biased = 128
    cfg.risk_weight = 1e3
    cfg.use_risk_constraint = True
    cfg.risk_constraint_update_every_n_epoch = 20
    cfg.risk_constraint_weight_update_factor = 1.5
    cfg.risk_constraint_weight_maximum = 1e5
    cfg.condition_on_ego_future = True
    cfg.is_mlp_residual = True
    cfg.num_samples_min_fde = 6
    return cfg

class TestMPCPlanner:
    @pytest.fixture(autouse=True)
    def setup(self, params):
        # Build the planner under test: an MPC planner wrapping a biased
        # trajectory predictor and the scene trajectory normalizer.
        self.planner_params = MPCPlannerParams.from_config(params)
        predictor_params = LitTrajectoryPredictorParams.from_config(params)
        self.predictor = LitTrajectoryPredictor(
            predictor_params,
            TTCCostParams.from_config(params),
            SceneDataLoaders.unnormalize_trajectory,
        )
        self.normalizer = SceneDataLoaders.normalize_trajectory
        self.planner = MPCPlanner(self.planner_params, self.predictor, self.normalizer)

    def test_reset(self):
        # After a reset, the solver statistics must be back at their initial
        # values and all planner state buffers must be cleared.
        self.planner.reset()
        assert torch.allclose(
            self.planner.solver.control_input_mean_init,
            self.planner.control_input_mean_init,
        )
        assert torch.allclose(
            self.planner.solver.control_input_std_init,
            self.planner.control_input_std_init,
        )
        assert self.planner._ego_state_history == []
        assert self.planner._ego_state_target_trajectory is None
        assert self.planner._ego_state_planned_trajectory is None
        assert self.planner._ado_state_history == []
        assert self.planner._latest_ado_position_future_samples is None

    def test_replan(self, params):
        num_prediction_samples = 100
        num_agents = 1
        self.planner.reset()
        # Fill the ego state history with a constant state over the past horizon.
        current_ego_state = to_state(torch.Tensor([[1, 1, 0, 0]]), params.dt)
        for _ in range(params.num_steps + 1):
            self.planner._update_ego_state_history(current_ego_state)
        # Fill the ado (other agent) state history the same way.
        current_ado_state = to_state(torch.Tensor([[2.0, 0.0, 0, 0]]), params.dt)
        for _ in range(params.num_steps + 1):
            self.planner._update_ado_state_history(current_ado_state)
        target_velocity = torch.Tensor([3.0, 0.0])
        self.planner.replan(
            current_ado_state,
            current_ego_state,
            target_velocity,
            num_prediction_samples=num_prediction_samples,
        )
        # The plan covers the future horizon for the single ego agent.
        assert self.planner._ego_state_planned_trajectory.shape == torch.Size(
            [num_agents, params.num_steps_future]
        )
        next_ego_state = self.planner.get_planned_next_ego_state()
        assert next_ego_state.shape == torch.Size([1])
        # The cached prediction holds one future trajectory per sample and agent.
        assert self.planner.fetch_latest_prediction().shape == torch.Size(
            [num_prediction_samples, num_agents, params.num_steps_future]
        )
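
# A minimal closed-loop sketch (not part of the test suite), mirroring the
# calls exercised in test_replan above; `num_sim_steps`, `current_ego_state`,
# `current_ado_state`, and `target_velocity` are illustrative placeholders,
# and feeding the planned next state back into the history is an assumption:
#
#     planner.reset()
#     for _ in range(num_sim_steps):
#         planner._update_ego_state_history(current_ego_state)
#         planner._update_ado_state_history(current_ado_state)
#         planner.replan(current_ado_state, current_ego_state, target_velocity,
#                        num_prediction_samples=100)
#         current_ego_state = planner.get_planned_next_ego_state()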