Spaces:
Sleeping
Sleeping
import cv2 | |
import numpy as np | |
import glob | |
import os.path as osp | |
import os | |
import time | |
import torch | |
from pathlib import Path | |
from multiprocessing import Process, Queue | |
from dpvo.utils import Timer | |
from dpvo.dpvo import DPVO | |
from dpvo.config import cfg | |
from dpvo.stream import image_stream, video_stream | |
# Repository root, resolved lexically from this file's location: the first
# ".." drops the file name, the remaining three climb three directories.
ROOT_DIR = osp.abspath(f"{__file__}/../../../../")
# Location of the DPVO checkout relative to the repository root.
DPVO_DIR = osp.join(ROOT_DIR, "third-party/DPVO")
class SLAMModel(object):
    """Drive a DPVO instance with frames produced by a background reader.

    A separate process runs ``video_stream`` and pushes
    ``(t, image, intrinsics)`` tuples onto a bounded queue. ``track``
    consumes one tuple per call and ``process`` finalises the run.
    """

    def __init__(self, video, output_pth, width, height, calib=None, stride=1, skip=0, buffer=2048):
        """Resolve the calibration file and start the reader process.

        Args:
            video: Video source, forwarded unchanged to ``video_stream``.
            output_pth: Directory that holds the fallback ``calib.txt``.
            width: Frame width, used only when intrinsics are estimated.
            height: Frame height, used only when intrinsics are estimated.
            calib: Optional path to a calibration file. When it is ``None``
                or does not exist, ``<output_pth>/calib.txt`` is used and
                written first if it is missing.
            stride: Forwarded to ``video_stream``.
            skip: Forwarded to ``video_stream``.
            buffer: Value assigned to ``cfg.BUFFER_SIZE`` when the SLAM
                system is built.
        """
        if calib is None or not osp.exists(calib):
            calib = osp.join(output_pth, 'calib.txt')
            if not osp.exists(calib):
                self.estimate_intrinsics(width, height, calib)

        self.dpvo_cfg = osp.join(DPVO_DIR, 'config/default.yaml')
        self.dpvo_ckpt = osp.join(ROOT_DIR, 'checkpoints', 'dpvo.pth')

        self.buffer = buffer
        self.times = []  # per-frame wall-clock durations of the SLAM call
        self.slam = None  # built lazily on the first tracked frame

        # Bounded queue so the reader cannot run arbitrarily far ahead.
        self.queue = Queue(maxsize=8)
        self.reader = Process(target=video_stream, args=(self.queue, video, calib, stride, skip))
        self.reader.start()

    def estimate_intrinsics(self, width, height, calib):
        """Write approximate intrinsics ``fx fy cx cy`` to ``calib``.

        The focal length is set to the image diagonal and the principal
        point to the image centre.

        Args:
            width: Frame width.
            height: Frame height.
            calib: Destination file path; its parent directory is created
                when it does not exist yet.
        """
        focal_length = (height ** 2 + width ** 2) ** 0.5
        center_x = width / 2
        center_y = height / 2

        # Opening the file for writing fails if the directory is missing.
        parent = osp.dirname(calib)
        if parent:
            os.makedirs(parent, exist_ok=True)

        with open(calib, 'w') as fopen:
            line = f'{focal_length} {focal_length} {center_x} {center_y}'
            fopen.write(line)

    def track(self, ):
        """Take one item from the reader queue and feed it to SLAM.

        Blocks until an item is available. Items with a negative timestamp
        are ignored (presumably the end-of-stream marker emitted by
        ``video_stream`` -- confirm against the reader).
        """
        (t, image, intrinsics) = self.queue.get()

        if t < 0:
            return

        # Move the last axis to the front; shape[1] and shape[2] are then
        # used as height and width below.
        image = torch.from_numpy(image).permute(2, 0, 1).cuda()
        intrinsics = torch.from_numpy(intrinsics).cuda()

        if self.slam is None:
            cfg.merge_from_file(self.dpvo_cfg)
            cfg.BUFFER_SIZE = self.buffer
            self.slam = DPVO(cfg, self.dpvo_ckpt, ht=image.shape[1], wd=image.shape[2], viz=False)

        with Timer("SLAM", enabled=False):
            # NOTE(review): the wall-clock start time, not the queue
            # timestamp ``t``, is what has always been passed to SLAM here.
            # That is preserved; confirm whether ``t`` was intended.
            start = time.time()
            self.slam(start, image, intrinsics)
            self.times.append(time.time() - start)

    def process(self, ):
        """Run the final update passes and return the SLAM result.

        Returns:
            The first element of ``self.slam.terminate()`` (presumably the
            estimated poses -- confirm against DPVO).

        Raises:
            RuntimeError: If no frame was tracked, so SLAM was never built.
        """
        if self.slam is None:
            raise RuntimeError(
                "SLAMModel.process() called before any frame was tracked"
            )

        for _ in range(12):
            self.slam.update()

        self.reader.join()
        return self.slam.terminate()[0]