|
|
|
|
|
|
|
|
|
|
|
|
|
import pycolmap |
|
import os |
|
import os.path as path |
|
import kapture.io |
|
import kapture.io.csv |
|
import subprocess |
|
import PIL |
|
from tqdm import tqdm |
|
import PIL.Image |
|
import numpy as np |
|
from typing import List, Tuple, Union |
|
|
|
from mast3r.model import AsymmetricMASt3R |
|
from mast3r.colmap.database import export_matches, get_im_matches |
|
|
|
import mast3r.utils.path_to_dust3r |
|
from dust3r_visloc.datasets.utils import get_resize_function |
|
|
|
import kapture |
|
from kapture.converter.colmap.database_extra import get_colmap_camera_ids_from_db, get_colmap_image_ids_from_db |
|
from kapture.utils.paths import path_secure |
|
|
|
from dust3r.datasets.utils.transforms import ImgNorm |
|
from dust3r.inference import inference |
|
|
|
|
|
def scene_prepare_images(root: str, maxdim: int, patch_size: int, image_paths: List[str]): |
|
images = [] |
|
|
|
for idx in tqdm(range(len(image_paths))): |
|
rgb_image = PIL.Image.open(os.path.join(root, image_paths[idx])).convert('RGB') |
|
|
|
|
|
W, H = rgb_image.size |
|
resize_func, _, to_orig = get_resize_function(maxdim, patch_size, H, W) |
|
rgb_tensor = resize_func(ImgNorm(rgb_image)) |
|
|
|
|
|
images.append({'img': rgb_tensor.unsqueeze(0), |
|
'true_shape': np.int32([rgb_tensor.shape[1:]]), |
|
'to_orig': to_orig, |
|
'idx': idx, |
|
'instance': image_paths[idx], |
|
'orig_shape': np.int32([H, W])}) |
|
return images |
|
|
|
|
|
def remove_duplicates(images, image_pairs): |
|
pairs_added = set() |
|
pairs = [] |
|
for (i, _), (j, _) in image_pairs: |
|
smallidx, bigidx = min(i, j), max(i, j) |
|
if (smallidx, bigidx) in pairs_added: |
|
continue |
|
pairs_added.add((smallidx, bigidx)) |
|
pairs.append((images[i], images[j])) |
|
return pairs |
|
|
|
|
|
def run_mast3r_matching(model: AsymmetricMASt3R, maxdim: int, patch_size: int, device, |
|
kdata: kapture.Kapture, root_path: str, image_pairs_kapture: List[Tuple[str, str]], |
|
colmap_db, |
|
dense_matching: bool, pixel_tol: int, conf_thr: float, skip_geometric_verification: bool, |
|
min_len_track: int): |
|
assert kdata.records_camera is not None |
|
image_paths = kdata.records_camera.data_list() |
|
image_path_to_idx = {image_path: idx for idx, image_path in enumerate(image_paths)} |
|
image_path_to_ts = {kdata.records_camera[ts, camid]: (ts, camid) for ts, camid in kdata.records_camera.key_pairs()} |
|
|
|
images = scene_prepare_images(root_path, maxdim, patch_size, image_paths) |
|
image_pairs = [((image_path_to_idx[image_path1], image_path1), (image_path_to_idx[image_path2], image_path2)) |
|
for image_path1, image_path2 in image_pairs_kapture] |
|
matching_pairs = remove_duplicates(images, image_pairs) |
|
|
|
colmap_camera_ids = get_colmap_camera_ids_from_db(colmap_db, kdata.records_camera) |
|
colmap_image_ids = get_colmap_image_ids_from_db(colmap_db) |
|
im_keypoints = {idx: {} for idx in range(len(image_paths))} |
|
|
|
im_matches = {} |
|
image_to_colmap = {} |
|
for image_path, idx in image_path_to_idx.items(): |
|
_, camid = image_path_to_ts[image_path] |
|
colmap_camid = colmap_camera_ids[camid] |
|
colmap_imid = colmap_image_ids[image_path] |
|
image_to_colmap[idx] = { |
|
'colmap_imid': colmap_imid, |
|
'colmap_camid': colmap_camid |
|
} |
|
|
|
|
|
for chunk in tqdm(range(0, len(matching_pairs), 4)): |
|
pairs_chunk = matching_pairs[chunk:chunk + 4] |
|
output = inference(pairs_chunk, model, device, batch_size=1, verbose=False) |
|
pred1, pred2 = output['pred1'], output['pred2'] |
|
|
|
im_images_chunk = get_im_matches(pred1=pred1, pred2=pred2, pairs=pairs_chunk, image_to_colmap=image_to_colmap, |
|
im_keypoints=im_keypoints, conf_thr=conf_thr, is_sparse=not dense_matching, |
|
pixel_tol=pixel_tol) |
|
im_matches.update(im_images_chunk.items()) |
|
|
|
|
|
colmap_image_pairs = export_matches( |
|
colmap_db, images, image_to_colmap, im_keypoints, im_matches, min_len_track, skip_geometric_verification) |
|
colmap_db.commit() |
|
|
|
return colmap_image_pairs |
|
|
|
|
|
def pycolmap_run_triangulator(colmap_db_path, prior_recon_path, recon_path, image_root_path): |
|
print("running mapping") |
|
reconstruction = pycolmap.Reconstruction(prior_recon_path) |
|
pycolmap.triangulate_points( |
|
reconstruction=reconstruction, |
|
database_path=colmap_db_path, |
|
image_path=image_root_path, |
|
output_path=recon_path, |
|
refine_intrinsics=False, |
|
) |
|
|
|
|
|
def pycolmap_run_mapper(colmap_db_path, recon_path, image_root_path): |
|
print("running mapping") |
|
reconstructions = pycolmap.incremental_mapping( |
|
database_path=colmap_db_path, |
|
image_path=image_root_path, |
|
output_path=recon_path, |
|
options=pycolmap.IncrementalPipelineOptions({'multiple_models': False, |
|
'extract_colors': True, |
|
}) |
|
) |
|
|
|
|
|
def glomap_run_mapper(glomap_bin, colmap_db_path, recon_path, image_root_path): |
|
print("running mapping") |
|
args = [ |
|
'mapper', |
|
'--database_path', |
|
colmap_db_path, |
|
'--image_path', |
|
image_root_path, |
|
'--output_path', |
|
recon_path |
|
] |
|
args.insert(0, glomap_bin) |
|
glomap_process = subprocess.Popen(args) |
|
glomap_process.wait() |
|
|
|
if glomap_process.returncode != 0: |
|
raise ValueError( |
|
'\nSubprocess Error (Return code:' |
|
f' {glomap_process.returncode} )') |
|
|
|
|
|
def kapture_import_image_folder_or_list(images_path: Union[str, Tuple[str, List[str]]], use_single_camera=False) -> kapture.Kapture: |
|
images = kapture.RecordsCamera() |
|
|
|
if isinstance(images_path, str): |
|
images_root = images_path |
|
file_list = [path.relpath(path.join(dirpath, filename), images_root) |
|
for dirpath, dirs, filenames in os.walk(images_root) |
|
for filename in filenames] |
|
file_list = sorted(file_list) |
|
else: |
|
images_root, file_list = images_path |
|
|
|
sensors = kapture.Sensors() |
|
for n, filename in enumerate(file_list): |
|
|
|
try: |
|
|
|
with PIL.Image.open(path.join(images_root, filename)) as im: |
|
width, height = im.size |
|
model_params = [width, height] |
|
except (OSError, PIL.UnidentifiedImageError): |
|
|
|
print(f'Skipping invalid image file {filename}') |
|
continue |
|
|
|
camera_id = f'sensor' |
|
if use_single_camera and camera_id not in sensors: |
|
sensors[camera_id] = kapture.Camera(kapture.CameraType.UNKNOWN_CAMERA, model_params) |
|
elif use_single_camera: |
|
assert sensors[camera_id].camera_params[0] == width and sensors[camera_id].camera_params[1] == height |
|
else: |
|
camera_id = camera_id + f'{n}' |
|
sensors[camera_id] = kapture.Camera(kapture.CameraType.UNKNOWN_CAMERA, model_params) |
|
|
|
images[(n, camera_id)] = path_secure(filename) |
|
|
|
return kapture.Kapture(sensors=sensors, records_camera=images) |
|
|