import argparse
import contextlib
from typing import Optional, List, Dict, Any
import io
import sys
from pathlib import Path
import numpy as np
from tqdm import tqdm
import pycolmap

from . import logger
from .utils.database import COLMAPDatabase
from .utils.io import get_keypoints, get_matches
from .utils.parsers import parse_retrieval
from .utils.geometry import compute_epipolar_errors


class OutputCapture:
    """Context manager that hides stdout unless ``verbose`` is set.

    When not verbose, everything written to ``sys.stdout`` inside the
    ``with`` body is redirected to an in-memory buffer. If the body raises,
    the buffered text is reported through ``logger.error``.
    """

    def __init__(self, verbose: bool):
        self.verbose = verbose

    def __enter__(self):
        if not self.verbose:
            self.capture = contextlib.redirect_stdout(io.StringIO())
            self.out = self.capture.__enter__()

    def __exit__(self, exc_type, *args):
        if not self.verbose:
            self.capture.__exit__(exc_type, *args)
            if exc_type is not None:
                logger.error('Failed with output:\n%s', self.out.getvalue())
        sys.stdout.flush()


def create_db_from_model(reconstruction: pycolmap.Reconstruction,
                         database_path: Path) -> Dict[str, int]:
    """Create a fresh database holding the cameras and images of a model.

    An existing file at ``database_path`` is deleted first.

    Returns:
        A mapping from image name to the image id used in the model.
    """
    if database_path.exists():
        logger.warning('The database already exists, deleting it.')
        database_path.unlink()

    db = COLMAPDatabase.connect(database_path)
    try:
        db.create_tables()

        for i, camera in reconstruction.cameras.items():
            db.add_camera(
                camera.model_id, camera.width, camera.height, camera.params,
                camera_id=i, prior_focal_length=True)

        for i, image in reconstruction.images.items():
            db.add_image(image.name, image.camera_id, image_id=i)

        db.commit()
    finally:
        # Release the connection even if one of the inserts fails.
        db.close()
    return {image.name: i for i, image in reconstruction.images.items()}


def import_features(image_ids: Dict[str, int],
                    database_path: Path,
                    features_path: Path):
    """Store the keypoints of every image in ``image_ids`` in the database."""
    logger.info('Importing features into the database...')
    db = COLMAPDatabase.connect(database_path)
    try:
        for image_name, image_id in tqdm(image_ids.items()):
            keypoints = get_keypoints(features_path, image_name)
            keypoints += 0.5  # COLMAP origin
            db.add_keypoints(image_id, keypoints)

        db.commit()
    finally:
        db.close()


def import_matches(image_ids: Dict[str, int],
                   database_path: Path,
                   pairs_path: Path,
                   matches_path: Path,
                   min_match_score: Optional[float] = None,
                   skip_geometric_verification: bool = False):
    """Store the matches of every listed image pair in the database.

    Args:
        image_ids: Mapping from image name to database image id.
        database_path: Path of the database to write to.
        pairs_path: Text file with one whitespace-separated pair of image
            names per line; blank lines are ignored.
        matches_path: File from which ``get_matches`` reads the matches.
        min_match_score: If truthy, only matches whose score is strictly
            greater than this value are kept.
        skip_geometric_verification: If set, the raw matches are also
            written as two-view geometry.
    """
    logger.info('Importing matches into the database...')

    with open(str(pairs_path), 'r') as f:
        # Skip blank lines (e.g. a trailing newline), which would otherwise
        # break the two-name unpacking below.
        pairs = [p.split() for p in f if p.strip()]

    db = COLMAPDatabase.connect(database_path)
    try:
        matched = set()
        for name0, name1 in tqdm(pairs):
            id0, id1 = image_ids[name0], image_ids[name1]
            # Each unordered pair is imported only once.
            if len({(id0, id1), (id1, id0)} & matched) > 0:
                continue
            matches, scores = get_matches(matches_path, name0, name1)
            if min_match_score:
                matches = matches[scores > min_match_score]
            db.add_matches(id0, id1, matches)
            matched |= {(id0, id1), (id1, id0)}

            if skip_geometric_verification:
                db.add_two_view_geometry(id0, id1, matches)

        db.commit()
    finally:
        db.close()


def estimation_and_geometric_verification(database_path: Path,
                                          pairs_path: Path,
                                          verbose: bool = False):
    """Verify the stored matches with ``pycolmap.verify_matches``.

    RANSAC is configured with ``max_num_trials=20000`` and
    ``min_inlier_ratio=0.1``. Output is hidden unless ``verbose`` is set.
    """
    logger.info('Performing geometric verification of the matches...')
    with OutputCapture(verbose):
        with pycolmap.ostream():
            pycolmap.verify_matches(
                database_path, pairs_path,
                options=dict(ransac=dict(
                    max_num_trials=20000, min_inlier_ratio=0.1)),)


def _load_normalized_keypoints(features_path: Path, name: str, camera):
    """Load the keypoints of ``name`` mapped through ``camera.image_to_world``.

    Returns:
        ``(keypoints, noise)`` where ``noise`` is the uncertainty returned by
        ``get_keypoints``, or ``1.0`` when none is stored. An image without
        keypoints yields an empty ``(0, 2)`` array.
    """
    kps, noise = get_keypoints(
        features_path, name, return_uncertainty=True)
    noise = 1.0 if noise is None else noise
    if len(kps) > 0:
        kps = np.stack(camera.image_to_world(kps))
    else:
        kps = np.zeros((0, 2))
    return kps, noise


def geometric_verification(image_ids: Dict[str, int],
                           reference: pycolmap.Reconstruction,
                           database_path: Path,
                           features_path: Path,
                           pairs_path: Path,
                           matches_path: Path,
                           max_error: float = 4.0):
    """Filter matches using the relative poses of the reference model.

    For every pair, the epipolar errors of the matches are computed from the
    relative pose of the two reference images. A match is kept when both
    errors are at most ``max_error * noise / mean_focal_length`` of the
    respective image. The surviving matches are written to the database as
    two-view geometry.
    """
    logger.info('Performing geometric verification of the matches...')

    pairs = parse_retrieval(pairs_path)
    db = COLMAPDatabase.connect(database_path)
    try:
        inlier_ratios = []
        matched = set()
        for name0 in tqdm(pairs):
            id0 = image_ids[name0]
            image0 = reference.images[id0]
            cam0 = reference.cameras[image0.camera_id]
            kps0, noise0 = _load_normalized_keypoints(
                features_path, name0, cam0)

            for name1 in pairs[name0]:
                id1 = image_ids[name1]
                # Check for duplicates before loading anything for the
                # pair, so that repeated pairs cost no file reads.
                if len({(id0, id1), (id1, id0)} & matched) > 0:
                    continue
                matched |= {(id0, id1), (id1, id0)}

                image1 = reference.images[id1]
                cam1 = reference.cameras[image1.camera_id]
                kps1, noise1 = _load_normalized_keypoints(
                    features_path, name1, cam1)

                matches = get_matches(matches_path, name0, name1)[0]

                if matches.shape[0] == 0:
                    db.add_two_view_geometry(id0, id1, matches)
                    continue

                qvec_01, tvec_01 = pycolmap.relative_pose(
                    image0.qvec, image0.tvec, image1.qvec, image1.tvec)
                _, errors0, errors1 = compute_epipolar_errors(
                    qvec_01, tvec_01,
                    kps0[matches[:, 0]], kps1[matches[:, 1]])
                valid_matches = np.logical_and(
                    errors0 <= max_error * noise0 / cam0.mean_focal_length(),
                    errors1 <= max_error * noise1 / cam1.mean_focal_length())
                # TODO: We could also add E to the database, but we need
                # to reverse the transformations if id0 > id1 in
                # utils/database.py.
                db.add_two_view_geometry(
                    id0, id1, matches[valid_matches, :])
                inlier_ratios.append(np.mean(valid_matches))

        if inlier_ratios:
            logger.info(
                'mean/med/min/max valid matches %.2f/%.2f/%.2f/%.2f%%.',
                np.mean(inlier_ratios) * 100, np.median(inlier_ratios) * 100,
                np.min(inlier_ratios) * 100, np.max(inlier_ratios) * 100)
        else:
            # np.min / np.max raise on an empty list; without this guard the
            # function would fail before committing its results.
            logger.info('No pair with matches was verified.')

        db.commit()
    finally:
        db.close()


def run_triangulation(model_path: Path,
                      database_path: Path,
                      image_dir: Path,
                      reference_model: pycolmap.Reconstruction,
                      verbose: bool = False,
                      options: Optional[Dict[str, Any]] = None,
                      ) -> pycolmap.Reconstruction:
    """Run ``pycolmap.triangulate_points`` and return its result.

    ``model_path`` is created if missing. ``options`` defaults to an empty
    dict. Output is hidden unless ``verbose`` is set.
    """
    model_path.mkdir(parents=True, exist_ok=True)
    logger.info('Running 3D triangulation...')
    if options is None:
        options = {}
    with OutputCapture(verbose):
        with pycolmap.ostream():
            reconstruction = pycolmap.triangulate_points(
                reference_model, database_path, image_dir, model_path,
                options=options)
    return reconstruction


def main(sfm_dir: Path,
         reference_model: Path,
         image_dir: Path,
         pairs: Path,
         features: Path,
         matches: Path,
         skip_geometric_verification: bool = False,
         estimate_two_view_geometries: bool = False,
         min_match_score: Optional[float] = None,
         verbose: bool = False,
         mapper_options: Optional[Dict[str, Any]] = None,
         ) -> pycolmap.Reconstruction:
    """Triangulate a new model from a reference model, features and matches.

    Steps: build ``sfm_dir / 'database.db'`` from the reference model, import
    features and matches, verify the matches (unless skipped), and run the
    triangulation.

    Args:
        sfm_dir: Output directory; created if missing.
        reference_model: Path of the reference reconstruction.
        image_dir: Image directory passed to the triangulation.
        pairs: File listing the image pairs.
        features: File containing the keypoints.
        matches: File containing the matches.
        skip_geometric_verification: Store the raw matches as two-view
            geometry and skip verification.
        estimate_two_view_geometries: Verify with
            ``pycolmap.verify_matches`` instead of the reference poses.
        min_match_score: Optional score threshold applied on import.
        verbose: Do not hide the output of pycolmap calls.
        mapper_options: Options forwarded to the triangulation.

    Returns:
        The triangulated reconstruction.
    """
    assert reference_model.exists(), reference_model
    assert features.exists(), features
    assert pairs.exists(), pairs
    assert matches.exists(), matches

    sfm_dir.mkdir(parents=True, exist_ok=True)
    database = sfm_dir / 'database.db'
    reference = pycolmap.Reconstruction(reference_model)

    image_ids = create_db_from_model(reference, database)
    import_features(image_ids, database, features)
    import_matches(image_ids, database, pairs, matches,
                   min_match_score, skip_geometric_verification)
    if not skip_geometric_verification:
        if estimate_two_view_geometries:
            estimation_and_geometric_verification(database, pairs, verbose)
        else:
            geometric_verification(
                image_ids, reference, database, features, pairs, matches)
    reconstruction = run_triangulation(sfm_dir, database, image_dir,
                                       reference, verbose, mapper_options)
    logger.info('Finished the triangulation with statistics:\n%s',
                reconstruction.summary())
    return reconstruction


def parse_option_args(args: List[str], default_options) -> Dict[str, Any]:
    """Parse ``key=value`` strings into a dict checked against defaults.

    Args:
        args: Strings of the form ``key=value``.
        default_options: Object whose attributes define the allowed keys and
            whose attribute types define the expected value types.

    Returns:
        A dict mapping each key to its evaluated value.

    Raises:
        ValueError: If an argument has no ``=``, the key is not an attribute
            of ``default_options``, or the value has the wrong type.
    """
    options = {}
    for arg in args:
        idx = arg.find('=')
        if idx == -1:
            raise ValueError('Options format: key1=value1 key2=value2 etc.')
        key, value = arg[:idx], arg[idx+1:]
        if not hasattr(default_options, key):
            raise ValueError(
                f'Unknown option "{key}", allowed options and default values'
                f' for {default_options.summary()}')
        # NOTE(security): eval executes arbitrary Python taken from the
        # command line. Only pass trusted input; consider
        # ast.literal_eval if only literals need to be supported.
        value = eval(value)
        target_type = type(getattr(default_options, key))
        if not isinstance(value, target_type):
            raise ValueError(f'Incorrect type for option "{key}":'
                             f' {type(value)} vs {target_type}')
        options[key] = value
    return options


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--sfm_dir', type=Path, required=True)
    # dest matches the ``reference_model`` parameter of main(), which is
    # called with the parsed arguments as keyword arguments.
    parser.add_argument('--reference_sfm_model', dest='reference_model',
                        type=Path, required=True)
    parser.add_argument('--image_dir', type=Path, required=True)

    parser.add_argument('--pairs', type=Path, required=True)
    parser.add_argument('--features', type=Path, required=True)
    parser.add_argument('--matches', type=Path, required=True)

    parser.add_argument('--skip_geometric_verification', action='store_true')
    parser.add_argument('--estimate_two_view_geometries',
                        action='store_true')
    parser.add_argument('--min_match_score', type=float)
    parser.add_argument('--verbose', action='store_true')
    # Popped below; it must be declared or the pop raises KeyError.
    parser.add_argument('--mapper_options', nargs='+', default=[],
                        help='List of key=value mapper options')
    args = parser.parse_args().__dict__

    mapper_options = parse_option_args(
        args.pop("mapper_options"), pycolmap.IncrementalMapperOptions())

    main(**args, mapper_options=mapper_options)