import numpy as np import h5py import os import os.path as osp import shutil from glob import glob import torch from torch_scatter import scatter from torch_geometric.data import InMemoryDataset, Data, extract_zip from tqdm import tqdm import torch_geometric as tg def intersection(lst1, lst2): return list(set(lst1) & set(lst2)) def process_indexes(idx_list): idx_dict = {} for i, idx in enumerate(idx_list): idx_dict[idx] = i return [idx_dict[i] for i in sorted(idx_dict.keys())] def add_zeros(data): data.x = torch.zeros(data.num_nodes, dtype=torch.long) return data def extract_node_feature(data, reduce='add'): if reduce in ['mean', 'max', 'add']: data.x = scatter(data.edge_attr, data.edge_index[0], dim=0, dim_size=data.num_nodes, reduce=reduce) else: raise Exception('Unknown Aggregation Type') return data # random partition graph def random_partition_graph(num_nodes, cluster_number=10): parts = np.random.randint(cluster_number, size=num_nodes) return parts def generate_sub_graphs(adj, parts, cluster_number=10, batch_size=1): # convert sparse tensor to scipy csr adj = adj.to_scipy(layout='csr') num_batches = cluster_number // batch_size sg_nodes = [[] for _ in range(num_batches)] sg_edges = [[] for _ in range(num_batches)] for cluster in range(num_batches): sg_nodes[cluster] = np.where(parts == cluster)[0] sg_edges[cluster] = tg.utils.from_scipy_sparse_matrix(adj[sg_nodes[cluster], :][:, sg_nodes[cluster]])[0] return sg_nodes, sg_edges def random_rotate(points): theta = np.random.uniform(0, np.pi * 2) rotation_matrix = np.array([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]]) rotation_matrix = torch.from_numpy(rotation_matrix).float() points[:, 0:2] = torch.matmul(points[:, [0, 1]].transpose(1, 3), rotation_matrix).transpose(1, 3) return points def random_translate(points, mean=0, std=0.02): points += torch.randn(points.shape)*std + mean return points def random_points_augmentation(points, rotate=False, translate=False, **kwargs): if rotate: points = random_rotate(points) if translate: points = random_translate(points, **kwargs) return points def scale_translate_pointcloud(pointcloud, shift=[-0.2, 0.2], scale=[2. / 3., 3. /2.]): """ for scaling and shifting the point cloud :param pointcloud: :return: """ B, C, N = pointcloud.shape[0:3] scale = scale[0] + torch.rand([B, C, 1, 1])*(scale[1]-scale[0]) shift = shift[0] + torch.rand([B, C, 1, 1]) * (shift[1]-shift[0]) translated_pointcloud = torch.mul(pointcloud, scale) + shift return translated_pointcloud class PartNet(InMemoryDataset): r"""The PartNet dataset from the `"PartNet: A Large-scale Benchmark for Fine-grained and Hierarchical Part-level 3D Object Understanding" `_ paper, containing 3D objects annotated with fine-grained, instance-level, and hierarchical 3D part information. Args: root (string): Root directory where the dataset should be saved. dataset (str, optional): Which dataset to use (ins_seg_h5, or sem_seg_h5). (default: :obj:`sem_seg_h5`) obj_category (str, optional): which category to load. (default: :obj:`Bed`) level (str, optional): Which level of part semantic segmentation to use. (default: :obj:`3`) phase (str, optional): If :obj:`test`, loads the testing dataset, If :obj:`val`, loads the validation dataset, otherwise the training dataset. (default: :obj:`train`) transform (callable, optional): A function/transform that takes in an :obj:`torch_geometric.data.Data` object and returns a transformed version. The data object will be transformed before every access. (default: :obj:`None`) pre_transform (callable, optional): A function/transform that takes in an :obj:`torch_geometric.data.Data` object and returns a transformed version. The data object will be transformed before being saved to disk. (default: :obj:`None`) pre_filter (callable, optional): A function that takes in an :obj:`torch_geometric.data.Data` object and returns a boolean value, indicating whether the data object should be included in the final dataset. (default: :obj:`None`) """ # the dataset we use for our paper is pre-released version def __init__(self, root, dataset='sem_seg_h5', obj_category='Bed', level=3, phase='train', transform=None, pre_transform=None, pre_filter=None): self.dataset = dataset self.level = level self.obj_category = obj_category self.object = '-'.join([self.obj_category, str(self.level)]) self.level_folder = 'level_'+str(self.level) self.processed_file_folder = osp.join(self.dataset, self.level_folder, self.object) super(PartNet, self).__init__(root, transform, pre_transform, pre_filter) if phase == 'test': path = self.processed_paths[1] elif phase == 'val': path = self.processed_paths[2] else: path = self.processed_paths[0] self.data, self.slices = torch.load(path) @property def raw_file_names(self): return [self.dataset] @property def processed_file_names(self): return osp.join(self.processed_file_folder, 'train.pt'), osp.join(self.processed_file_folder, 'test.pt'), \ osp.join(self.processed_file_folder, 'val.pt') def download(self): path = osp.join(self.raw_dir, self.dataset) if not osp.exists(path): raise FileExistsError('PartNet can only downloaded via application. ' 'See details in https://cs.stanford.edu/~kaichun/partnet/') # path = download_url(self.url, self.root) extract_zip(path, self.root) os.unlink(path) shutil.rmtree(self.raw_dir) name = self.url.split(os.sep)[-1].split('.')[0] os.rename(osp.join(self.root, name), self.raw_dir) def process(self): # save to processed_paths processed_path = osp.join(self.processed_dir, self.processed_file_folder) if not osp.exists(processed_path): os.makedirs(osp.join(processed_path)) torch.save(self.process_set('train'), self.processed_paths[0]) torch.save(self.process_set('test'), self.processed_paths[1]) torch.save(self.process_set('val'), self.processed_paths[2]) def process_set(self, dataset): if self.dataset == 'ins_seg_h5': raw_path = osp.join(self.raw_dir, 'ins_seg_h5_for_sgpn', self.dataset) categories = glob(osp.join(raw_path, '*')) categories = sorted([x.split(os.sep)[-1] for x in categories]) data_list = [] for target, category in enumerate(tqdm(categories)): folder = osp.join(raw_path, category) paths = glob('{}/{}-*.h5'.format(folder, dataset)) labels, nors, opacitys, pts, rgbs = [], [], [], [], [] for path in paths: f = h5py.File(path) pts += torch.from_numpy(f['pts'][:]).unbind(0) labels += torch.from_numpy(f['label'][:]).to(torch.long).unbind(0) nors += torch.from_numpy(f['nor'][:]).unbind(0) opacitys += torch.from_numpy(f['opacity'][:]).unbind(0) rgbs += torch.from_numpy(f['rgb'][:]).to(torch.float32).unbind(0) for i, (pt, label, nor, opacity, rgb) in enumerate(zip(pts, labels, nors, opacitys, rgbs)): data = Data(pos=pt[:, :3], y=label, norm=nor[:, :3], x=torch.cat((opacity.unsqueeze(-1), rgb/255.), 1)) if self.pre_filter is not None and not self.pre_filter(data): continue if self.pre_transform is not None: data = self.pre_transform(data) data_list.append(data) else: raw_path = osp.join(self.raw_dir, self.dataset) categories = glob(osp.join(raw_path, self.object)) categories = sorted([x.split(os.sep)[-1] for x in categories]) data_list = [] # class_name = [] for target, category in enumerate(tqdm(categories)): folder = osp.join(raw_path, category) paths = glob('{}/{}-*.h5'.format(folder, dataset)) labels, pts = [], [] # clss = category.split('-')[0] for path in paths: f = h5py.File(path) pts += torch.from_numpy(f['data'][:].astype(np.float32)).unbind(0) labels += torch.from_numpy(f['label_seg'][:].astype(np.float32)).to(torch.long).unbind(0) for i, (pt, label) in enumerate(zip(pts, labels)): data = Data(pos=pt[:, :3], y=label) # data = PartData(pos=pt[:, :3], y=label, clss=clss) if self.pre_filter is not None and not self.pre_filter(data): continue if self.pre_transform is not None: data = self.pre_transform(data) data_list.append(data) return self.collate(data_list) class PartData(Data): def __init__(self, y=None, pos=None, clss=None): super(PartData).__init__(pos=pos, y=y) self.clss = clss # allowable multiple choice node and edge features # code from https://github.com/snap-stanford/ogb/blob/master/ogb/utils/features.py allowable_features = { 'possible_atomic_num_list' : list(range(1, 119)) + ['misc'], 'possible_chirality_list' : [ 'CHI_UNSPECIFIED', 'CHI_TETRAHEDRAL_CW', 'CHI_TETRAHEDRAL_CCW', 'CHI_OTHER' ], 'possible_degree_list' : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 'misc'], 'possible_formal_charge_list' : [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 'misc'], 'possible_numH_list' : [0, 1, 2, 3, 4, 5, 6, 7, 8, 'misc'], 'possible_number_radical_e_list': [0, 1, 2, 3, 4, 'misc'], 'possible_hybridization_list' : [ 'SP', 'SP2', 'SP3', 'SP3D', 'SP3D2', 'misc' ], 'possible_is_aromatic_list': [False, True], 'possible_is_in_ring_list': [False, True], 'possible_bond_type_list' : [ 'SINGLE', 'DOUBLE', 'TRIPLE', 'AROMATIC', 'misc' ], 'possible_bond_stereo_list': [ 'STEREONONE', 'STEREOZ', 'STEREOE', 'STEREOCIS', 'STEREOTRANS', 'STEREOANY', ], 'possible_is_conjugated_list': [False, True], } def safe_index(l, e): """ Return index of element e in list l. If e is not present, return the last index """ try: return l.index(e) except: return len(l) - 1 def atom_to_feature_vector(atom): """ Converts rdkit atom object to feature list of indices :param mol: rdkit atom object :return: list """ atom_feature = [ safe_index(allowable_features['possible_atomic_num_list'], atom.GetAtomicNum()), allowable_features['possible_chirality_list'].index(str(atom.GetChiralTag())), safe_index(allowable_features['possible_degree_list'], atom.GetTotalDegree()), safe_index(allowable_features['possible_formal_charge_list'], atom.GetFormalCharge()), safe_index(allowable_features['possible_numH_list'], atom.GetTotalNumHs()), safe_index(allowable_features['possible_number_radical_e_list'], atom.GetNumRadicalElectrons()), safe_index(allowable_features['possible_hybridization_list'], str(atom.GetHybridization())), allowable_features['possible_is_aromatic_list'].index(atom.GetIsAromatic()), allowable_features['possible_is_in_ring_list'].index(atom.IsInRing()), ] return atom_feature def get_atom_feature_dims(): return list(map(len, [ allowable_features['possible_atomic_num_list'], allowable_features['possible_chirality_list'], allowable_features['possible_degree_list'], allowable_features['possible_formal_charge_list'], allowable_features['possible_numH_list'], allowable_features['possible_number_radical_e_list'], allowable_features['possible_hybridization_list'], allowable_features['possible_is_aromatic_list'], allowable_features['possible_is_in_ring_list'] ])) def bond_to_feature_vector(bond): """ Converts rdkit bond object to feature list of indices :param mol: rdkit bond object :return: list """ bond_feature = [ safe_index(allowable_features['possible_bond_type_list'], str(bond.GetBondType())), allowable_features['possible_bond_stereo_list'].index(str(bond.GetStereo())), allowable_features['possible_is_conjugated_list'].index(bond.GetIsConjugated()), ] return bond_feature def get_bond_feature_dims(): return list(map(len, [ allowable_features['possible_bond_type_list'], allowable_features['possible_bond_stereo_list'], allowable_features['possible_is_conjugated_list'] ])) def atom_feature_vector_to_dict(atom_feature): [atomic_num_idx, chirality_idx, degree_idx, formal_charge_idx, num_h_idx, number_radical_e_idx, hybridization_idx, is_aromatic_idx, is_in_ring_idx] = atom_feature feature_dict = { 'atomic_num': allowable_features['possible_atomic_num_list'][atomic_num_idx], 'chirality': allowable_features['possible_chirality_list'][chirality_idx], 'degree': allowable_features['possible_degree_list'][degree_idx], 'formal_charge': allowable_features['possible_formal_charge_list'][formal_charge_idx], 'num_h': allowable_features['possible_numH_list'][num_h_idx], 'num_rad_e': allowable_features['possible_number_radical_e_list'][number_radical_e_idx], 'hybridization': allowable_features['possible_hybridization_list'][hybridization_idx], 'is_aromatic': allowable_features['possible_is_aromatic_list'][is_aromatic_idx], 'is_in_ring': allowable_features['possible_is_in_ring_list'][is_in_ring_idx] } return feature_dict def bond_feature_vector_to_dict(bond_feature): [bond_type_idx, bond_stereo_idx, is_conjugated_idx] = bond_feature feature_dict = { 'bond_type': allowable_features['possible_bond_type_list'][bond_type_idx], 'bond_stereo': allowable_features['possible_bond_stereo_list'][bond_stereo_idx], 'is_conjugated': allowable_features['possible_is_conjugated_list'][is_conjugated_idx] } return feature_dict def one_hot_vector_sm(input, set1, set2, set3): return torch.cat([set1[int(input[0]),:], set2[int(input[1]),:], set3[int(input[2]),:]]) def one_hot_vector_am(input, feat_dims): features = [torch.eye(feat_dims[i])[int(input[i]),:] for i in range(len(feat_dims))] return torch.cat(features)