Spaces:
Sleeping
Sleeping
File size: 2,360 Bytes
c87d1bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
import cv2
import numpy as np
import glob
import os.path as osp
import os
import time
import torch
from pathlib import Path
from multiprocessing import Process, Queue
from dpvo.utils import Timer
from dpvo.dpvo import DPVO
from dpvo.config import cfg
from dpvo.stream import image_stream, video_stream
# Absolute path obtained by applying four ".." components to this file's own
# path, i.e. the directory three levels above the one containing this module.
# NOTE(review): presumably the repository root -- confirm against the layout.
ROOT_DIR = osp.abspath(f"{__file__}/../../../../")
# Location of the DPVO checkout, expected under ROOT_DIR/third-party.
DPVO_DIR = osp.join(ROOT_DIR, "third-party/DPVO")
class SLAMModel(object):
    """Thin driver around DPVO that consumes frames from a reader process.

    A child process running ``video_stream`` pushes ``(t, image, intrinsics)``
    tuples into a bounded queue. ``track`` pops one tuple per call and feeds
    it to a lazily constructed ``DPVO`` instance; ``process`` finalises the
    run and returns the first element of ``DPVO.terminate()``.
    """

    def __init__(self, video, output_pth, width, height, calib=None, stride=1, skip=0, buffer=2048):
        """Resolve the calibration file and start the background frame reader.

        Args:
            video: Video source forwarded unchanged to ``video_stream``.
            output_pth: Directory used for the fallback ``calib.txt``.
            width: Frame width, used only when intrinsics must be estimated.
            height: Frame height, used only when intrinsics must be estimated.
            calib: Optional path to a calibration file. When ``None`` or not
                present on disk, ``<output_pth>/calib.txt`` is used instead
                and created from ``width``/``height`` if it does not exist.
            stride: Forwarded to ``video_stream``.
            skip: Forwarded to ``video_stream``.
            buffer: Value assigned to ``cfg.BUFFER_SIZE`` when DPVO is built.
        """
        # Fall back to a calibration file inside the output directory when the
        # caller supplied none, or pointed at a file that does not exist.
        if calib is None or not osp.exists(calib):
            calib = osp.join(output_pth, 'calib.txt')
            if not osp.exists(calib):
                self.estimate_intrinsics(width, height, calib)

        self.dpvo_cfg = osp.join(DPVO_DIR, 'config/default.yaml')
        self.dpvo_ckpt = osp.join(ROOT_DIR, 'checkpoints', 'dpvo.pth')

        self.buffer = buffer
        self.times = []    # elapsed seconds of each DPVO call made by track()
        self.slam = None   # built on the first tracked frame (needs frame size)

        # Bounded queue so the reader cannot run arbitrarily far ahead.
        self.queue = Queue(maxsize=8)
        self.reader = Process(target=video_stream, args=(self.queue, video, calib, stride, skip))
        self.reader.start()

    def estimate_intrinsics(self, width, height, calib):
        """Write heuristic pinhole intrinsics to ``calib``.

        The focal length is set to the image diagonal (in the same units as
        ``width``/``height``) and the principal point to the image centre.
        The file holds a single line: ``fx fy cx cy`` separated by spaces.

        Args:
            width: Image width.
            height: Image height.
            calib: Destination file path; missing parent directories are
                created.
        """
        focal_length = (height ** 2 + width ** 2) ** 0.5
        center_x = width / 2
        center_y = height / 2

        # The output directory may not exist yet on a fresh run; without this
        # the open() below fails with FileNotFoundError.
        parent = osp.dirname(calib)
        if parent:
            os.makedirs(parent, exist_ok=True)

        with open(calib, 'w') as fopen:
            fopen.write(f'{focal_length} {focal_length} {center_x} {center_y}')

    def track(self):
        """Pop one item from the reader queue and feed it to DPVO.

        Blocks until the reader has produced an item. Returns ``None`` without
        doing any work when the item's timestamp is negative.
        """
        (t, image, intrinsics) = self.queue.get()
        if t < 0:
            return

        # Reorder axes (2, 0, 1) and move to the GPU.
        # NOTE(review): assumes the reader yields HxWxC arrays -- confirm.
        image = torch.from_numpy(image).permute(2, 0, 1).cuda()
        intrinsics = torch.from_numpy(intrinsics).cuda()

        if self.slam is None:
            # DPVO needs the frame size, so it is built on the first frame.
            cfg.merge_from_file(self.dpvo_cfg)
            cfg.BUFFER_SIZE = self.buffer
            self.slam = DPVO(cfg, self.dpvo_ckpt, ht=image.shape[1], wd=image.shape[2], viz=False)

        with Timer("SLAM", enabled=False):
            # Use a dedicated local for timing so the stream timestamp `t` is
            # what DPVO receives, rather than the wall-clock start time.
            started = time.perf_counter()
            self.slam(t, image, intrinsics)
            self.times.append(time.perf_counter() - started)

    def process(self):
        """Finalise tracking and return the first element of ``terminate()``.

        Calls ``update()`` on the DPVO instance 12 more times, waits for the
        reader process to exit, then calls ``terminate()``.

        Returns:
            ``self.slam.terminate()[0]``.
            NOTE(review): presumably the estimated camera poses -- confirm
            against DPVO.

        Raises:
            RuntimeError: If no frame was tracked, so DPVO was never built.
        """
        if self.slam is None:
            raise RuntimeError(
                "SLAMModel.process() called before any frame was tracked; "
                "call track() at least once with a valid frame first."
            )

        for _ in range(12):
            self.slam.update()

        self.reader.join()
        return self.slam.terminate()[0]