r"""Adapted from `Waymo to KITTI converter
<https://github.com/caizhongang/waymo_kitti_converter>`_.
"""

try:
    from waymo_open_dataset import dataset_pb2
except ImportError:
    raise ImportError(
        'Please run "pip install waymo-open-dataset-tf-2-6-0>=1.4.5" '
        'to install the official devkit first.')

import copy
import os
import os.path as osp
from glob import glob
from io import BytesIO
from os.path import exists, join

import mmengine
import numpy as np
import tensorflow as tf
from mmengine import print_log
from nuscenes.utils.geometry_utils import view_points
from PIL import Image
from waymo_open_dataset.utils import range_image_utils, transform_utils
from waymo_open_dataset.utils.frame_utils import \
    parse_range_image_and_camera_projection

from mmdet3d.datasets.convert_utils import post_process_coords
from mmdet3d.structures import Box3DMode, LiDARInstance3DBoxes, points_cam2img


class Waymo2KITTI(object):
    """Waymo to KITTI converter. There are 2 steps as follows:

    Step 1. Extract camera images and lidar point clouds from waymo raw
        data in '*.tfrecord' files and save them in KITTI format.
    Step 2. Generate waymo train/val/test infos and save them as pickle
        files.

    Args:
        load_dir (str): Directory to load waymo raw data.
        save_dir (str): Directory to save data in KITTI format.
        prefix (str): Prefix of filename. In general, 0 for training, 1 for
            validation and 2 for testing.
        workers (int, optional): Number of workers for the parallel process.
            Defaults to 64.
        test_mode (bool, optional): Whether in the test mode.
            Defaults to False.
        save_sensor_data (bool, optional): Whether to save image and lidar
            data. Defaults to True.
        save_cam_sync_instances (bool, optional): Whether to save cam sync
            instances. Defaults to True.
        save_cam_instances (bool, optional): Whether to save cam instances.
            Defaults to True.
        info_prefix (str, optional): Prefix of info filename.
            Defaults to 'waymo'.
        max_sweeps (int, optional): Max length of sweeps. Defaults to 10.
        split (str, optional): Split of the data. Defaults to 'training'.
    """

    def __init__(self,
                 load_dir,
                 save_dir,
                 prefix,
                 workers=64,
                 test_mode=False,
                 save_sensor_data=True,
                 save_cam_sync_instances=True,
                 save_cam_instances=True,
                 info_prefix='waymo',
                 max_sweeps=10,
                 split='training'):
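
        # TF 1.x only iterates a TFRecordDataset with a plain Python loop if
        # eager execution is enabled explicitly; TF 2.x is eager by default.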
        if int(tf.__version__.split('.')[0]) < 2:
            tf.enable_eager_execution()
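
        # Name suffixes and orderings follow the enums in
        # waymo_open_dataset.dataset_pb2 (CameraName, LaserName and
        # Label.Type respectively).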
        self.cam_list = [
            '_FRONT',
            '_FRONT_LEFT',
            '_FRONT_RIGHT',
            '_SIDE_LEFT',
            '_SIDE_RIGHT',
        ]
        self.lidar_list = ['TOP', 'FRONT', 'SIDE_LEFT', 'SIDE_RIGHT', 'REAR']
        self.type_list = [
            'UNKNOWN', 'VEHICLE', 'PEDESTRIAN', 'SIGN', 'CYCLIST'
        ]
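
        # MMDetection3D camera keys (same order as cam_list) and the classes
        # kept when gathering ground-truth instances.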
        self.camera_types = [
            'CAM_FRONT',
            'CAM_FRONT_LEFT',
            'CAM_FRONT_RIGHT',
            'CAM_SIDE_LEFT',
            'CAM_SIDE_RIGHT',
        ]
        self.selected_waymo_classes = ['VEHICLE', 'PEDESTRIAN', 'CYCLIST']
        self.info_map = {
            'training': '_infos_train.pkl',
            'validation': '_infos_val.pkl',
            'testing': '_infos_test.pkl',
            'testing_3d_camera_only_detection': '_infos_test_cam_only.pkl'
        }

        self.load_dir = load_dir
        self.save_dir = save_dir
        self.prefix = prefix
        self.workers = int(workers)
        self.test_mode = test_mode
        self.save_sensor_data = save_sensor_data
        self.save_cam_sync_instances = save_cam_sync_instances
        self.save_cam_instances = save_cam_instances
        self.info_prefix = info_prefix
        self.max_sweeps = max_sweeps
        self.split = split

        self.filter_empty_3dboxes = True
        self.filter_no_label_zone_points = True
        self.save_track_id = False

        self.tfrecord_pathnames = sorted(
            glob(join(self.load_dir, '*.tfrecord')))

        self.image_save_dir = f'{self.save_dir}/image_'
        self.point_cloud_save_dir = f'{self.save_dir}/velodyne'
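
        # The camera-only test split ships no lidar data, so the velodyne
        # folder is skipped for it; one image folder per camera is always
        # created.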
        if 'testing_3d_camera_only_detection' not in self.load_dir:
            mmengine.mkdir_or_exist(self.point_cloud_save_dir)
        for i in range(5):
            mmengine.mkdir_or_exist(f'{self.image_save_dir}{str(i)}')

    def convert(self):
        """Convert all tfrecord files in the split and dump dataset infos."""
        print_log(f'Start converting {self.split} dataset', logger='current')
        if self.workers == 0:
            data_infos = mmengine.track_progress(self.convert_one,
                                                 range(len(self)))
        else:
            data_infos = mmengine.track_parallel_progress(
                self.convert_one, range(len(self)), self.workers)
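
        # Each element of data_infos holds the frame infos of one tfrecord
        # file; flatten them into a single list before dumping.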
        data_list = []
        for data_info in data_infos:
            data_list.extend(data_info)
        metainfo = dict()
        metainfo['dataset'] = 'waymo'
        metainfo['version'] = 'waymo_v1.4'
        metainfo['info_version'] = 'mmdet3d_v1.4'
        waymo_infos = dict(data_list=data_list, metainfo=metainfo)
        filenames = osp.join(
            osp.dirname(self.save_dir),
            f'{self.info_prefix + self.info_map[self.split]}')
        print_log(f'Saving {self.split} dataset infos into {filenames}')
        mmengine.dump(waymo_infos, filenames)

    def convert_one(self, file_idx):
        """Convert one '*.tfrecord' file to KITTI format. Each file stores
        all the frames (about 200) of one scene. We treat each frame as a
        sample, save its images and point cloud in KITTI format, and then
        create an info dict for every frame.

        Args:
            file_idx (int): Index of the file to be converted.

        Returns:
            List[dict]: Waymo infos for all frames in the current file.
        """
        pathname = self.tfrecord_pathnames[file_idx]
        dataset = tf.data.TFRecordDataset(pathname, compression_type='')
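
        # file_infos is local to this call (and hence to this worker
        # process); it only collects the frame infos of one tfrecord file.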
        file_infos = []
        for frame_idx, data in enumerate(dataset):
            frame = dataset_pb2.Frame()
            frame.ParseFromString(bytearray(data.numpy()))
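
            # Step 1. Extract camera images and lidar point clouds and save
            # them in KITTI format.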
            if self.save_sensor_data:
                self.save_image(frame, file_idx, frame_idx)
                self.save_lidar(frame, file_idx, frame_idx)
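
            # Step 2. Generate the info dict for this frame and append it to
            # file_infos.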
            self.create_waymo_info_file(frame, file_idx, frame_idx,
                                        file_infos)
        return file_infos

    def __len__(self):
        """Length of the filename list."""
        return len(self.tfrecord_pathnames)

    def save_image(self, frame, file_idx, frame_idx):
        """Parse and save the images in jpg format.

        Args:
            frame (:obj:`Frame`): Open dataset frame proto.
            file_idx (int): Current file index.
            frame_idx (int): Current frame index.
        """
        for img in frame.images:
            img_path = f'{self.image_save_dir}{str(img.name - 1)}/' + \
                f'{self.prefix}{str(file_idx).zfill(3)}' + \
                f'{str(frame_idx).zfill(3)}.jpg'
            with open(img_path, 'wb') as fp:
                fp.write(img.image)

    def save_lidar(self, frame, file_idx, frame_idx):
        """Parse and save the lidar data in binary (.bin) format.

        Args:
            frame (:obj:`Frame`): Open dataset frame proto.
            file_idx (int): Current file index.
            frame_idx (int): Current frame index.
        """
        range_images, camera_projections, seg_labels, range_image_top_pose = \
            parse_range_image_and_camera_projection(frame)
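
        # The camera-only split contains no lidar points, in which case the
        # top-lidar pose is missing and there is nothing to save.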
        if range_image_top_pose is None:
            return
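
        # First return.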
        points_0, cp_points_0, intensity_0, elongation_0, mask_indices_0 = \
            self.convert_range_image_to_point_cloud(
                frame,
                range_images,
                camera_projections,
                range_image_top_pose,
                ri_index=0)
        points_0 = np.concatenate(points_0, axis=0)
        intensity_0 = np.concatenate(intensity_0, axis=0)
        elongation_0 = np.concatenate(elongation_0, axis=0)
        mask_indices_0 = np.concatenate(mask_indices_0, axis=0)
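
        # Second return.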
        points_1, cp_points_1, intensity_1, elongation_1, mask_indices_1 = \
            self.convert_range_image_to_point_cloud(
                frame,
                range_images,
                camera_projections,
                range_image_top_pose,
                ri_index=1)
        points_1 = np.concatenate(points_1, axis=0)
        intensity_1 = np.concatenate(intensity_1, axis=0)
        elongation_1 = np.concatenate(elongation_1, axis=0)
        mask_indices_1 = np.concatenate(mask_indices_1, axis=0)

        points = np.concatenate([points_0, points_1], axis=0)
        intensity = np.concatenate([intensity_0, intensity_1], axis=0)
        elongation = np.concatenate([elongation_0, elongation_1], axis=0)
        mask_indices = np.concatenate([mask_indices_0, mask_indices_1],
                                      axis=0)
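
        # Each point carries 6 features: x, y, z, intensity, elongation and
        # its offset in the range image (-1 for non-top lidars), matching
        # num_pts_feats = 6 in the info files.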
        point_cloud = np.column_stack(
            (points, intensity, elongation, mask_indices))

        pc_path = f'{self.point_cloud_save_dir}/{self.prefix}' + \
            f'{str(file_idx).zfill(3)}{str(frame_idx).zfill(3)}.bin'
        point_cloud.astype(np.float32).tofile(pc_path)

    def convert_range_image_to_point_cloud(self,
                                           frame,
                                           range_images,
                                           camera_projections,
                                           range_image_top_pose,
                                           ri_index=0):
        """Convert range images to point cloud.

        Args:
            frame (:obj:`Frame`): Open dataset frame.
            range_images (dict): Mapping from laser_name to a list of two
                range images corresponding to the two returns.
            camera_projections (dict): Mapping from laser_name to a list of
                two camera projections corresponding to the two returns.
            range_image_top_pose (:obj:`Transform`): Range image pixel pose
                for the top lidar.
            ri_index (int, optional): 0 for the first return, 1 for the
                second return. Defaults to 0.

        Returns:
            tuple[list[np.ndarray]]: (List of points with shape [N, 3],
                camera projections of points with shape [N, 6], intensity
                with shape [N, 1], elongation with shape [N, 1], points'
                position in the depth map (element offset if points come
                from the main lidar, otherwise -1) with shape [N, 1]).
                All lists have the length of the number of lidars (5).
        """
        calibrations = sorted(
            frame.context.laser_calibrations, key=lambda c: c.name)
        points = []
        cp_points = []
        intensity = []
        elongation = []
        mask_indices = []
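
        # Per-pixel pose of the top lidar's range image (the vehicle moves
        # while the lidar spins), assembled from its rotation and
        # translation components.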
        frame_pose = tf.convert_to_tensor(
            value=np.reshape(np.array(frame.pose.transform), [4, 4]))

        range_image_top_pose_tensor = tf.reshape(
            tf.convert_to_tensor(value=range_image_top_pose.data),
            range_image_top_pose.shape.dims)

        range_image_top_pose_tensor_rotation = \
            transform_utils.get_rotation_matrix(
                range_image_top_pose_tensor[..., 0],
                range_image_top_pose_tensor[..., 1],
                range_image_top_pose_tensor[..., 2])
        range_image_top_pose_tensor_translation = \
            range_image_top_pose_tensor[..., 3:]
        range_image_top_pose_tensor = transform_utils.get_transform(
            range_image_top_pose_tensor_rotation,
            range_image_top_pose_tensor_translation)
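
        # Extract one point cloud per lidar. When explicit beam inclinations
        # are missing from the calibration, fall back to a uniform spacing
        # between the min/max inclination.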
        for c in calibrations:
            range_image = range_images[c.name][ri_index]
            if len(c.beam_inclinations) == 0:
                beam_inclinations = range_image_utils.compute_inclination(
                    tf.constant(
                        [c.beam_inclination_min, c.beam_inclination_max]),
                    height=range_image.shape.dims[0])
            else:
                beam_inclinations = tf.constant(c.beam_inclinations)

            beam_inclinations = tf.reverse(beam_inclinations, axis=[-1])
            extrinsic = np.reshape(np.array(c.extrinsic.transform), [4, 4])

            range_image_tensor = tf.reshape(
                tf.convert_to_tensor(value=range_image.data),
                range_image.shape.dims)
            pixel_pose_local = None
            frame_pose_local = None
            if c.name == dataset_pb2.LaserName.TOP:
                pixel_pose_local = range_image_top_pose_tensor
                pixel_pose_local = tf.expand_dims(pixel_pose_local, axis=0)
                frame_pose_local = tf.expand_dims(frame_pose, axis=0)
            range_image_mask = range_image_tensor[..., 0] > 0
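
            # Channel 3 of the range image flags no-label-zone (NLZ) pixels
            # with 1.0; drop them when filtering is enabled.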
            if self.filter_no_label_zone_points:
                nlz_mask = range_image_tensor[..., 3] != 1.0
                range_image_mask = range_image_mask & nlz_mask

            range_image_cartesian = \
                range_image_utils.extract_point_cloud_from_range_image(
                    tf.expand_dims(range_image_tensor[..., 0], axis=0),
                    tf.expand_dims(extrinsic, axis=0),
                    tf.expand_dims(tf.convert_to_tensor(
                        value=beam_inclinations), axis=0),
                    pixel_pose=pixel_pose_local,
                    frame_pose=frame_pose_local)

            mask_index = tf.where(range_image_mask)

            range_image_cartesian = tf.squeeze(range_image_cartesian, axis=0)
            points_tensor = tf.gather_nd(range_image_cartesian, mask_index)

            cp = camera_projections[c.name][ri_index]
            cp_tensor = tf.reshape(
                tf.convert_to_tensor(value=cp.data), cp.shape.dims)
            cp_points_tensor = tf.gather_nd(cp_tensor, mask_index)
            points.append(points_tensor.numpy())
            cp_points.append(cp_points_tensor.numpy())

            intensity_tensor = tf.gather_nd(range_image_tensor[..., 1],
                                            mask_index)
            intensity.append(intensity_tensor.numpy())

            elongation_tensor = tf.gather_nd(range_image_tensor[..., 2],
                                             mask_index)
            elongation.append(elongation_tensor.numpy())
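
            # For the top lidar (LaserName.TOP == 1), keep each point's flat
            # offset in the (return-stacked) range image; other lidars are
            # marked with -1.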
            if c.name == 1:
                mask_index = (ri_index * range_image_mask.shape[0] +
                              mask_index[:, 0]
                              ) * range_image_mask.shape[1] + mask_index[:, 1]
                mask_index = mask_index.numpy().astype(elongation[-1].dtype)
            else:
                mask_index = np.full_like(elongation[-1], -1)

            mask_indices.append(mask_index)

        return points, cp_points, intensity, elongation, mask_indices

    def cart_to_homo(self, mat):
        """Convert transformation matrix in Cartesian coordinates to
        homogeneous format.

        Args:
            mat (np.ndarray): Transformation matrix in Cartesian.
                The input matrix shape is 3x3 or 3x4.

        Returns:
            np.ndarray: Transformation matrix in homogeneous format.
                The matrix shape is 4x4.
        """
        ret = np.eye(4)
        if mat.shape == (3, 3):
            ret[:3, :3] = mat
        elif mat.shape == (3, 4):
            ret[:3, :] = mat
        else:
            raise ValueError(mat.shape)
        return ret

    def create_waymo_info_file(self, frame, file_idx, frame_idx, file_infos):
        r"""Generate waymo train/val/test infos.

        For more details about infos, please refer to:
        https://mmdetection3d.readthedocs.io/en/latest/advanced_guides/datasets/waymo.html
        """
        frame_infos = dict()
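
        # Gather frame-level infos; the sample index encodes the split
        # prefix, the file index and the frame index.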
        sample_idx = \
            f'{self.prefix}{str(file_idx).zfill(3)}{str(frame_idx).zfill(3)}'
        frame_infos['sample_idx'] = int(sample_idx)
        frame_infos['timestamp'] = frame.timestamp_micros
        frame_infos['ego2global'] = np.array(frame.pose.transform).reshape(
            4, 4).astype(np.float32).tolist()
        frame_infos['context_name'] = frame.context.name
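
        # Gather camera infos.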
        frame_infos['images'] = dict()
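
        # Rotation from the Waymo front-camera frame to the KITTI reference
        # camera frame.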
        T_front_cam_to_ref = np.array([[0.0, -1.0, 0.0], [0.0, 0.0, -1.0],
                                       [1.0, 0.0, 0.0]])
        camera_calibs = []
        Tr_velo_to_cams = []
        for camera in frame.context.camera_calibrations:
            T_cam_to_vehicle = np.array(camera.extrinsic.transform).reshape(
                4, 4)
            T_vehicle_to_cam = np.linalg.inv(T_cam_to_vehicle)
            Tr_velo_to_cam = \
                self.cart_to_homo(T_front_cam_to_ref) @ T_vehicle_to_cam
            Tr_velo_to_cams.append(Tr_velo_to_cam)
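
            # Build the 3x4 intrinsic matrix from (fx, fy, cx, cy).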
            camera_calib = np.zeros((3, 4))
            camera_calib[0, 0] = camera.intrinsic[0]
            camera_calib[1, 1] = camera.intrinsic[1]
            camera_calib[0, 2] = camera.intrinsic[2]
            camera_calib[1, 2] = camera.intrinsic[3]
            camera_calib[2, 2] = 1
            camera_calibs.append(camera_calib)

        for i, (cam_key, camera_calib, Tr_velo_to_cam) in enumerate(
                zip(self.camera_types, camera_calibs, Tr_velo_to_cams)):
            cam_infos = dict()
            cam_infos['img_path'] = str(sample_idx) + '.jpg'
            for img in frame.images:
                if img.name == i + 1:
                    width, height = Image.open(BytesIO(img.image)).size
            cam_infos['height'] = height
            cam_infos['width'] = width
            cam_infos['lidar2cam'] = Tr_velo_to_cam.astype(np.float32).tolist()
            cam_infos['cam2img'] = camera_calib.astype(np.float32).tolist()
            cam_infos['lidar2img'] = (camera_calib @ Tr_velo_to_cam).astype(
                np.float32).tolist()
            frame_infos['images'][cam_key] = cam_infos

        lidar_infos = dict()
        lidar_infos['lidar_path'] = str(sample_idx) + '.bin'
        lidar_infos['num_pts_feats'] = 6
        frame_infos['lidar_points'] = lidar_infos
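
        # Gather sweep infos: reference up to max_sweeps previous frames of
        # the same scene by timestamp, ego pose and file path.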
        lidar_sweeps_infos, image_sweeps_infos = [], []
        for prev_offset in range(-1, -self.max_sweeps - 1, -1):
            prev_lidar_infos = dict()
            prev_image_infos = dict()
            if frame_idx + prev_offset >= 0:
                prev_frame_infos = file_infos[prev_offset]
                prev_lidar_infos['timestamp'] = prev_frame_infos['timestamp']
                prev_lidar_infos['ego2global'] = prev_frame_infos['ego2global']
                prev_lidar_infos['lidar_points'] = dict()
                lidar_path = prev_frame_infos['lidar_points']['lidar_path']
                prev_lidar_infos['lidar_points']['lidar_path'] = lidar_path
                lidar_sweeps_infos.append(prev_lidar_infos)

                prev_image_infos['timestamp'] = prev_frame_infos['timestamp']
                prev_image_infos['ego2global'] = prev_frame_infos['ego2global']
                prev_image_infos['images'] = dict()
                for cam_key in self.camera_types:
                    prev_image_infos['images'][cam_key] = dict()
                    img_path = prev_frame_infos['images'][cam_key]['img_path']
                    prev_image_infos['images'][cam_key]['img_path'] = img_path
                image_sweeps_infos.append(prev_image_infos)
        if lidar_sweeps_infos:
            frame_infos['lidar_sweeps'] = lidar_sweeps_infos
        if image_sweeps_infos:
            frame_infos['image_sweeps'] = image_sweeps_infos

        if not self.test_mode:
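            # Gather instances infos, used for lidar-based 3D detection.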
            frame_infos['instances'] = self.gather_instance_info(frame)
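
            # Gather cam_sync_instances infos, used for image-based
            # (multi-view) 3D detection.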
            if self.save_cam_sync_instances:
                frame_infos['cam_sync_instances'] = self.gather_instance_info(
                    frame, cam_sync=True)
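
            # Gather cam_instances infos, used for image-based (monocular)
            # 3D detection.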
            if self.save_cam_instances:
                frame_infos['cam_instances'] = self.gather_cam_instance_info(
                    copy.deepcopy(frame_infos['instances']),
                    frame_infos['images'])
        file_infos.append(frame_infos)

    def gather_instance_info(self, frame, cam_sync=False):
        """Generate instances and cam_sync_instances infos.

        For more details about infos, please refer to:
        https://mmdetection3d.readthedocs.io/en/latest/advanced_guides/datasets/waymo.html
        """
        id_to_bbox = dict()
        id_to_name = dict()
        for labels in frame.projected_lidar_labels:
            name = labels.name
            for label in labels.labels:
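                # Convert the projected box from center/size format to
                # [x_min, y_min, x_max, y_max] in pixels.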
                bbox = [
                    label.box.center_x - label.box.length / 2,
                    label.box.center_y - label.box.width / 2,
                    label.box.center_x + label.box.length / 2,
                    label.box.center_y + label.box.width / 2
                ]
                id_to_bbox[label.id] = bbox
                id_to_name[label.id] = name - 1

        group_id = 0
        instance_infos = []
        for obj in frame.laser_labels:
            instance_info = dict()
            bounding_box = None
            name = None
            id = obj.id
            for proj_cam in self.cam_list:
                if id + proj_cam in id_to_bbox:
                    bounding_box = id_to_bbox.get(id + proj_cam)
                    name = id_to_name.get(id + proj_cam)
                    break
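
            # Projected lidar labels are keyed by the object id plus a camera
            # suffix, so the loop above picks the first camera that has a 2D
            # projection of this object. For cam_sync_instances, the
            # camera-synced box of the most visible camera is used instead,
            # and objects without one are skipped.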
            if cam_sync:
                if obj.most_visible_camera_name:
                    name = self.cam_list.index(
                        f'_{obj.most_visible_camera_name}')
                    box3d = obj.camera_synced_box
                else:
                    continue
            else:
                box3d = obj.box

            if bounding_box is None or name is None:
                name = 0
                bounding_box = [0.0, 0.0, 0.0, 0.0]

            my_type = self.type_list[obj.type]

            if my_type not in self.selected_waymo_classes:
                continue
            else:
                label = self.selected_waymo_classes.index(my_type)

            if self.filter_empty_3dboxes and obj.num_lidar_points_in_box < 1:
                continue

            group_id += 1
            instance_info['group_id'] = group_id
            instance_info['camera_id'] = name
            instance_info['bbox'] = bounding_box
            instance_info['bbox_label'] = label

            height = box3d.height
            width = box3d.width
            length = box3d.length
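
            # Store the bottom center of the 3D box: Waymo labels give the
            # gravity center, so shift z down by half the height.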
            x = box3d.center_x
            y = box3d.center_y
            z = box3d.center_z - height / 2

            rotation_y = box3d.heading

            instance_info['bbox_3d'] = np.array(
                [x, y, z, length, width, height,
                 rotation_y]).astype(np.float32).tolist()
            instance_info['bbox_label_3d'] = label
            instance_info['num_lidar_pts'] = obj.num_lidar_points_in_box

            if self.save_track_id:
                instance_info['track_id'] = obj.id
            instance_infos.append(instance_info)
        return instance_infos

    def gather_cam_instance_info(self, instances: list, images: dict):
        """Generate cam_instances infos.

        For more details about infos, please refer to:
        https://mmdetection3d.readthedocs.io/en/latest/advanced_guides/datasets/waymo.html
        """
        cam_instances = dict()
        for cam_type in self.camera_types:
            lidar2cam = np.array(images[cam_type]['lidar2cam'])
            cam2img = np.array(images[cam_type]['cam2img'])
            cam_instances[cam_type] = []
            for instance in instances:
                cam_instance = dict()
                gt_bboxes_3d = np.array(instance['bbox_3d'])
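
                # Convert the lidar-frame box to this camera's frame and
                # project its corners, keeping only corners in front of the
                # camera (positive depth).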
                gt_bboxes_3d = LiDARInstance3DBoxes(
                    gt_bboxes_3d[None, :]).convert_to(
                        Box3DMode.CAM, lidar2cam, correct_yaw=True)
                corners_3d = gt_bboxes_3d.corners.numpy()
                corners_3d = corners_3d[0].T
                in_camera = np.argwhere(corners_3d[2, :] > 0).flatten()
                corners_3d = corners_3d[:, in_camera]
                corner_coords = view_points(corners_3d, cam2img,
                                            True).T[:, :2].tolist()
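
                # Clamp the projected box to the current camera's image;
                # post_process_coords returns None when the box falls
                # completely outside the view.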
                final_coords = post_process_coords(
                    corner_coords,
                    imsize=(images[cam_type]['width'],
                            images[cam_type]['height']))

                if final_coords is None:
                    continue
                else:
                    min_x, min_y, max_x, max_y = final_coords

                cam_instance['bbox'] = [min_x, min_y, max_x, max_y]
                cam_instance['bbox_label'] = instance['bbox_label']
                cam_instance['bbox_3d'] = gt_bboxes_3d.numpy().squeeze(
                ).astype(np.float32).tolist()
                cam_instance['bbox_label_3d'] = instance['bbox_label_3d']

                center_3d = gt_bboxes_3d.gravity_center.numpy()
                center_2d_with_depth = points_cam2img(
                    center_3d, cam2img, with_depth=True)
                center_2d_with_depth = center_2d_with_depth.squeeze().tolist()
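
                # Drop objects whose 3D center lies behind the image plane.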
                if center_2d_with_depth[2] <= 0:
                    continue
                cam_instance['center_2d'] = center_2d_with_depth[:2]
                cam_instance['depth'] = center_2d_with_depth[2]

                cam_instance['bbox_3d_isvalid'] = True
                cam_instance['velocity'] = -1
                cam_instances[cam_type].append(cam_instance)

        return cam_instances

    def merge_trainval_infos(self):
        """Merge training and validation infos into a single file."""
        train_infos_path = osp.join(
            osp.dirname(self.save_dir), f'{self.info_prefix}_infos_train.pkl')
        val_infos_path = osp.join(
            osp.dirname(self.save_dir), f'{self.info_prefix}_infos_val.pkl')
        train_infos = mmengine.load(train_infos_path)
        val_infos = mmengine.load(val_infos_path)
        trainval_infos = dict(
            metainfo=train_infos['metainfo'],
            data_list=train_infos['data_list'] + val_infos['data_list'])
        mmengine.dump(
            trainval_infos,
            osp.join(
                osp.dirname(self.save_dir),
                f'{self.info_prefix}_infos_trainval.pkl'))


def create_ImageSets_img_ids(root_dir, splits):
    """Create txt files indicating what to collect in each split."""
    save_dir = join(root_dir, 'ImageSets/')
    if not exists(save_dir):
        os.mkdir(save_dir)

    idx_all = [[] for _ in splits]
    for i, split in enumerate(splits):
        path = join(root_dir, split, 'image_0')
        if not exists(path):
            raw_names = []
        else:
            raw_names = os.listdir(path)
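
        # The leading digit of each sample index is the split prefix
        # (0 for training, 1 for validation, 2 for testing, ...), so samples
        # are bucketed by that digit rather than by the folder they were
        # found in.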
        for name in raw_names:
            if name.endswith('.jpg'):
                idx = name.replace('.jpg', '\n')
                idx_all[int(idx[0])].append(idx)
        idx_all[i].sort()

    open(save_dir + 'train.txt', 'w').writelines(idx_all[0])
    open(save_dir + 'val.txt', 'w').writelines(idx_all[1])
    open(save_dir + 'trainval.txt', 'w').writelines(idx_all[0] + idx_all[1])
    if len(idx_all) >= 3:
        open(save_dir + 'test.txt', 'w').writelines(idx_all[2])
    if len(idx_all) >= 4:
        open(save_dir + 'test_cam_only.txt', 'w').writelines(idx_all[3])
    print('created txt files indicating what to collect in ', splits)
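

if __name__ == '__main__':
    # Minimal usage sketch, not part of the converter itself. The directory
    # layout below is an assumption about a typical local setup, not
    # something this module mandates: raw '*.tfrecord' files are expected
    # under load_dir, and KITTI-format output is written under save_dir.
    splits = ['training', 'validation']
    for prefix, split in enumerate(splits):
        converter = Waymo2KITTI(
            load_dir=f'./data/waymo/waymo_format/{split}',
            save_dir='./data/waymo/kitti_format/training',
            prefix=str(prefix),
            workers=8,
            split=split)
        converter.convert()
    # Collect the converted sample indices into ImageSets txt files.
    create_ImageSets_img_ids('./data/waymo/kitti_format', splits)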