from typing import List, Optional, Union, Dict, Callable
import numbers
import time
from multiprocess.managers import SharedMemoryManager
import numpy as np
import pyrealsense2 as rs
from .single_realsense import SingleRealsense


class MultiRealsense:
    """Operate a group of ``SingleRealsense`` cameras as a single unit.

    One ``SingleRealsense`` is created per serial number, all sharing the
    same ``SharedMemoryManager``. Lifecycle calls (``start``/``stop``), data
    retrieval (``get``) and sensor options are fanned out to every camera.
    The object is also a context manager: entering calls ``start()`` and
    leaving calls ``stop()``.
    """

    def __init__(self,
            serial_numbers: Optional[List[str]] = None,
            shm_manager: Optional[SharedMemoryManager] = None,
            resolution=(1280,720),
            capture_fps=30,
            put_fps=None,
            put_downsample=True,
            enable_color=True,
            enable_depth=False,
            process_depth=False,
            enable_infrared=False,
            get_max_k=30,
            advanced_mode_config: Optional[Union[dict, List[dict]]] = None,
            transform: Optional[Union[Callable[[Dict], Dict], List[Callable]]] = None,
            vis_transform: Optional[Union[Callable[[Dict], Dict], List[Callable]]] = None,
            verbose=False
            ):
        """Create one ``SingleRealsense`` per requested serial number.

        Args:
            serial_numbers: Serial numbers of the cameras to use. When
                ``None``, the list returned by
                ``SingleRealsense.get_connected_devices_serial()`` is used.
            shm_manager: Manager shared by all cameras. When ``None``, a new
                ``SharedMemoryManager`` is created and started here.
                NOTE(review): nothing in this class shuts that manager
                down; confirm who is expected to own its lifetime.
            advanced_mode_config: A single dict applied to every camera, a
                list with one entry per camera, or ``None``.
            transform: A single callable applied to every camera, a list
                with one entry per camera, or ``None``.
            vis_transform: Same convention as ``transform``.

        All remaining arguments (``resolution``, ``capture_fps``,
        ``put_fps``, ``put_downsample``, ``enable_color``, ``enable_depth``,
        ``process_depth``, ``enable_infrared``, ``get_max_k``, ``verbose``)
        are forwarded unchanged to every ``SingleRealsense``.

        The first camera in ``serial_numbers`` is constructed with
        ``is_master=True``; every other camera with ``is_master=False``.
        """
        if shm_manager is None:
            shm_manager = SharedMemoryManager()
            shm_manager.start()
        if serial_numbers is None:
            serial_numbers = SingleRealsense.get_connected_devices_serial()
        camera_count = len(serial_numbers)

        # Normalise the per-camera settings to lists of length camera_count.
        advanced_mode_config = repeat_to_list(
            advanced_mode_config, camera_count, dict)
        transform = repeat_to_list(transform, camera_count, Callable)
        vis_transform = repeat_to_list(vis_transform, camera_count, Callable)

        # Settings that are identical for every camera.
        shared_kwargs = dict(
            shm_manager=shm_manager,
            resolution=resolution,
            capture_fps=capture_fps,
            put_fps=put_fps,
            put_downsample=put_downsample,
            enable_color=enable_color,
            enable_depth=enable_depth,
            process_depth=process_depth,
            enable_infrared=enable_infrared,
            get_max_k=get_max_k,
            verbose=verbose,
        )

        self.cameras = {
            serial: SingleRealsense(
                serial_number=serial,
                advanced_mode_config=advanced_mode_config[idx],
                transform=transform[idx],
                vis_transform=vis_transform[idx],
                is_master=(idx == 0),
                **shared_kwargs
            )
            for idx, serial in enumerate(serial_numbers)
        }
        self.serial_numbers = serial_numbers
        self.shm_manager = shm_manager
        self.resolution = resolution
        self.capture_fps = capture_fps

    def __enter__(self):
        """Start every camera (blocking) and return this object."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop every camera (blocking); exceptions are not suppressed."""
        self.stop()

    @property
    def n_cameras(self):
        """Number of managed cameras."""
        return len(self.cameras)

    @property
    def is_ready(self):
        """``True`` only when every camera reports ``is_ready``.

        Every camera is queried, even after one reports not-ready.
        """
        readiness = [camera.is_ready for camera in self.cameras.values()]
        return all(readiness)

    def start(self, wait=True, put_start_time=None):
        """Start all cameras with a common ``put_start_time``.

        Args:
            wait: When true, block in ``start_wait()`` after every camera
                has been asked to start.
            put_start_time: Value handed to each camera's ``start``;
                defaults to the current ``time.time()``.
        """
        shared_start = time.time() if put_start_time is None else put_start_time
        for camera in self.cameras.values():
            camera.start(wait=False, put_start_time=shared_start)
        if not wait:
            return
        self.start_wait()

    def stop(self, wait=True):
        """Ask all cameras to stop; optionally block in ``stop_wait()``."""
        for camera in self.cameras.values():
            camera.stop(wait=False)
        if not wait:
            return
        self.stop_wait()

    def start_wait(self):
        """Call ``start_wait()`` on each camera in turn, printing progress."""
        for camera in self.cameras.values():
            print('processing camera {}'.format(camera.serial_number))
            camera.start_wait()

    def stop_wait(self):
        """Call ``join()`` on each camera in turn."""
        for camera in self.cameras.values():
            camera.join()

    def get(self, k=None, index=None, out=None) -> Dict[int, Dict[str, np.ndarray]]:
        """Fetch data from one camera or from all of them.

        With ``index`` given, the result of that single camera's ``get`` is
        returned directly and ``out`` is not used. Otherwise a dict keyed by
        camera position is returned; per the original documentation the
        layout is, in T,H,W,C order:

            {
                0: {
                    'rgb': (T,H,W,C),
                    'timestamp': (T,)
                },
                1: ...
            }

        Args:
            k: Forwarded to each camera's ``get``.
            index: Position in ``serial_numbers`` of the single camera to
                read, or ``None`` to read all cameras.
            out: Optional dict from a previous call. Its per-camera entries
                are handed back to the cameras as their ``out`` argument and
                the dict itself is updated and returned.
        """
        if index is not None:
            serial = self.serial_numbers[index]
            return self.cameras[serial].get(k=k, out=None)

        if out is None:
            out = dict()
        for position, camera in enumerate(self.cameras.values()):
            previous = out[position] if position in out else None
            out[position] = camera.get(k=k, out=previous)
        return out

    def set_color_option(self, option, value):
        """Apply ``option`` to every camera.

        ``value`` is either one number used for all cameras or a sequence
        with one value per camera.
        """
        per_camera_values = repeat_to_list(
            value, len(self.cameras), numbers.Number)
        for position, camera in enumerate(self.cameras.values()):
            camera.set_color_option(option, per_camera_values[position])

    def set_exposure(self, exposure=None, gain=None):
        """Switch between automatic and manual exposure on every camera.

        With both arguments ``None`` auto exposure is enabled. Otherwise
        auto exposure is disabled and whichever of ``exposure`` / ``gain``
        was supplied is set.

        Note carried over from the original docstring:
        150nit. (0.1 ms, 1/10000s)
        gain: (0, 128)
        """
        use_auto = exposure is None and gain is None
        if use_auto:
            # auto exposure
            self.set_color_option(rs.option.enable_auto_exposure, 1.0)
            return

        # manual exposure
        self.set_color_option(rs.option.enable_auto_exposure, 0.0)
        if exposure is not None:
            self.set_color_option(rs.option.exposure, exposure)
        if gain is not None:
            self.set_color_option(rs.option.gain, gain)

    def set_white_balance(self, white_balance=None):
        """Enable auto white balance, or disable it and set a fixed value."""
        if white_balance is None:
            self.set_color_option(rs.option.enable_auto_white_balance, 1.0)
            return
        self.set_color_option(rs.option.enable_auto_white_balance, 0.0)
        self.set_color_option(rs.option.white_balance, white_balance)

    def get_intrinsics(self):
        """Return each camera's ``get_intrinsics()`` stacked via ``np.array``."""
        per_camera = [camera.get_intrinsics() for camera in self.cameras.values()]
        return np.array(per_camera)

    def get_depth_scale(self):
        """Return each camera's ``get_depth_scale()`` stacked via ``np.array``."""
        per_camera = [camera.get_depth_scale() for camera in self.cameras.values()]
        return np.array(per_camera)

    def restart_put(self, start_time):
        """Forward ``restart_put(start_time)`` to every camera."""
        for camera in self.cameras.values():
            camera.restart_put(start_time)


def repeat_to_list(x, n: int, cls):
    """Normalise ``x`` to a per-camera sequence of length ``n``.

    ``None`` becomes ``[None] * n``; an instance of ``cls`` is repeated
    ``n`` times; anything else is returned as-is. The two checks run in
    sequence, so a value produced by the first is also subject to the
    second. An ``AssertionError`` is raised when the resulting length is
    not ``n``.
    """
    result = [None] * n if x is None else x
    if isinstance(result, cls):
        result = [result] * n
    assert len(result) == n
    return result