Spaces:
Running
Running
# Copyright 2024 The HuggingFace Inc. team. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
""" | |
This file contains utilities for recording frames from Intel Realsense cameras. | |
""" | |
import argparse | |
import concurrent.futures | |
import logging | |
import math | |
import shutil | |
import threading | |
import time | |
import traceback | |
from collections import Counter | |
from pathlib import Path | |
from threading import Thread | |
import numpy as np | |
from PIL import Image | |
from lerobot.common.robot_devices.cameras.configs import IntelRealSenseCameraConfig | |
from lerobot.common.robot_devices.utils import ( | |
RobotDeviceAlreadyConnectedError, | |
RobotDeviceNotConnectedError, | |
busy_wait, | |
) | |
from lerobot.common.utils.utils import capture_timestamp_utc | |
SERIAL_NUMBER_INDEX = 1 | |
def find_cameras(raise_when_empty=True, mock=False) -> list[dict]: | |
""" | |
Find the names and the serial numbers of the Intel RealSense cameras | |
connected to the computer. | |
""" | |
if mock: | |
import lerobot.common.mocks.cameras.mock_pyrealsense2 as rs | |
else: | |
import pyrealsense2 as rs | |
cameras = [] | |
for device in rs.context().query_devices(): | |
serial_number = int(device.get_info(rs.camera_info(SERIAL_NUMBER_INDEX))) | |
name = device.get_info(rs.camera_info.name) | |
cameras.append( | |
{ | |
"serial_number": serial_number, | |
"name": name, | |
} | |
) | |
if raise_when_empty and len(cameras) == 0: | |
raise OSError( | |
"Not a single camera was detected. Try re-plugging, or re-installing `librealsense` and its python wrapper `pyrealsense2`, or updating the firmware." | |
) | |
return cameras | |
def save_image(img_array, serial_number, frame_index, images_dir): | |
try: | |
img = Image.fromarray(img_array) | |
path = images_dir / f"camera_{serial_number}_frame_{frame_index:06d}.png" | |
path.parent.mkdir(parents=True, exist_ok=True) | |
img.save(str(path), quality=100) | |
logging.info(f"Saved image: {path}") | |
except Exception as e: | |
logging.error(f"Failed to save image for camera {serial_number} frame {frame_index}: {e}") | |
def save_images_from_cameras( | |
images_dir: Path, | |
serial_numbers: list[int] | None = None, | |
fps=None, | |
width=None, | |
height=None, | |
record_time_s=2, | |
mock=False, | |
): | |
""" | |
Initializes all the cameras and saves images to the directory. Useful to visually identify the camera | |
associated to a given serial number. | |
""" | |
if serial_numbers is None or len(serial_numbers) == 0: | |
camera_infos = find_cameras(mock=mock) | |
serial_numbers = [cam["serial_number"] for cam in camera_infos] | |
if mock: | |
import lerobot.common.mocks.cameras.mock_cv2 as cv2 | |
else: | |
import cv2 | |
print("Connecting cameras") | |
cameras = [] | |
for cam_sn in serial_numbers: | |
print(f"{cam_sn=}") | |
config = IntelRealSenseCameraConfig( | |
serial_number=cam_sn, fps=fps, width=width, height=height, mock=mock | |
) | |
camera = IntelRealSenseCamera(config) | |
camera.connect() | |
print( | |
f"IntelRealSenseCamera({camera.serial_number}, fps={camera.fps}, width={camera.capture_width}, height={camera.capture_height}, color_mode={camera.color_mode})" | |
) | |
cameras.append(camera) | |
images_dir = Path(images_dir) | |
if images_dir.exists(): | |
shutil.rmtree( | |
images_dir, | |
) | |
images_dir.mkdir(parents=True, exist_ok=True) | |
print(f"Saving images to {images_dir}") | |
frame_index = 0 | |
start_time = time.perf_counter() | |
try: | |
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: | |
while True: | |
now = time.perf_counter() | |
for camera in cameras: | |
# If we use async_read when fps is None, the loop will go full speed, and we will end up | |
# saving the same images from the cameras multiple times until the RAM/disk is full. | |
image = camera.read() if fps is None else camera.async_read() | |
if image is None: | |
print("No Frame") | |
bgr_converted_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) | |
executor.submit( | |
save_image, | |
bgr_converted_image, | |
camera.serial_number, | |
frame_index, | |
images_dir, | |
) | |
if fps is not None: | |
dt_s = time.perf_counter() - now | |
busy_wait(1 / fps - dt_s) | |
if time.perf_counter() - start_time > record_time_s: | |
break | |
print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}") | |
frame_index += 1 | |
finally: | |
print(f"Images have been saved to {images_dir}") | |
for camera in cameras: | |
camera.disconnect() | |
class IntelRealSenseCamera: | |
""" | |
The IntelRealSenseCamera class is similar to OpenCVCamera class but adds additional features for Intel Real Sense cameras: | |
- is instantiated with the serial number of the camera - won't randomly change as it can be the case of OpenCVCamera for Linux, | |
- can also be instantiated with the camera's name — if it's unique — using IntelRealSenseCamera.init_from_name(), | |
- depth map can be returned. | |
To find the camera indices of your cameras, you can run our utility script that will save a few frames for each camera: | |
```bash | |
python lerobot/common/robot_devices/cameras/intelrealsense.py --images-dir outputs/images_from_intelrealsense_cameras | |
``` | |
When an IntelRealSenseCamera is instantiated, if no specific config is provided, the default fps, width, height and color_mode | |
of the given camera will be used. | |
Example of instantiating with a serial number: | |
```python | |
from lerobot.common.robot_devices.cameras.configs import IntelRealSenseCameraConfig | |
config = IntelRealSenseCameraConfig(serial_number=128422271347) | |
camera = IntelRealSenseCamera(config) | |
camera.connect() | |
color_image = camera.read() | |
# when done using the camera, consider disconnecting | |
camera.disconnect() | |
``` | |
Example of instantiating with a name if it's unique: | |
``` | |
config = IntelRealSenseCameraConfig(name="Intel RealSense D405") | |
``` | |
Example of changing default fps, width, height and color_mode: | |
```python | |
config = IntelRealSenseCameraConfig(serial_number=128422271347, fps=30, width=1280, height=720) | |
config = IntelRealSenseCameraConfig(serial_number=128422271347, fps=90, width=640, height=480) | |
config = IntelRealSenseCameraConfig(serial_number=128422271347, fps=90, width=640, height=480, color_mode="bgr") | |
# Note: might error out upon `camera.connect()` if these settings are not compatible with the camera | |
``` | |
Example of returning depth: | |
```python | |
config = IntelRealSenseCameraConfig(serial_number=128422271347, use_depth=True) | |
camera = IntelRealSenseCamera(config) | |
camera.connect() | |
color_image, depth_map = camera.read() | |
``` | |
""" | |
def __init__( | |
self, | |
config: IntelRealSenseCameraConfig, | |
): | |
self.config = config | |
if config.name is not None: | |
self.serial_number = self.find_serial_number_from_name(config.name) | |
else: | |
self.serial_number = config.serial_number | |
# Store the raw (capture) resolution from the config. | |
self.capture_width = config.width | |
self.capture_height = config.height | |
# If rotated by ±90, swap width and height. | |
if config.rotation in [-90, 90]: | |
self.width = config.height | |
self.height = config.width | |
else: | |
self.width = config.width | |
self.height = config.height | |
self.fps = config.fps | |
self.channels = config.channels | |
self.color_mode = config.color_mode | |
self.use_depth = config.use_depth | |
self.force_hardware_reset = config.force_hardware_reset | |
self.mock = config.mock | |
self.camera = None | |
self.is_connected = False | |
self.thread = None | |
self.stop_event = None | |
self.color_image = None | |
self.depth_map = None | |
self.logs = {} | |
if self.mock: | |
import lerobot.common.mocks.cameras.mock_cv2 as cv2 | |
else: | |
import cv2 | |
self.rotation = None | |
if config.rotation == -90: | |
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE | |
elif config.rotation == 90: | |
self.rotation = cv2.ROTATE_90_CLOCKWISE | |
elif config.rotation == 180: | |
self.rotation = cv2.ROTATE_180 | |
def find_serial_number_from_name(self, name): | |
camera_infos = find_cameras() | |
camera_names = [cam["name"] for cam in camera_infos] | |
this_name_count = Counter(camera_names)[name] | |
if this_name_count > 1: | |
# TODO(aliberts): Test this with multiple identical cameras (Aloha) | |
raise ValueError( | |
f"Multiple {name} cameras have been detected. Please use their serial number to instantiate them." | |
) | |
name_to_serial_dict = {cam["name"]: cam["serial_number"] for cam in camera_infos} | |
cam_sn = name_to_serial_dict[name] | |
return cam_sn | |
def connect(self): | |
if self.is_connected: | |
raise RobotDeviceAlreadyConnectedError( | |
f"IntelRealSenseCamera({self.serial_number}) is already connected." | |
) | |
if self.mock: | |
import lerobot.common.mocks.cameras.mock_pyrealsense2 as rs | |
else: | |
import pyrealsense2 as rs | |
config = rs.config() | |
config.enable_device(str(self.serial_number)) | |
if self.fps and self.capture_width and self.capture_height: | |
# TODO(rcadene): can we set rgb8 directly? | |
config.enable_stream( | |
rs.stream.color, self.capture_width, self.capture_height, rs.format.rgb8, self.fps | |
) | |
else: | |
config.enable_stream(rs.stream.color) | |
if self.use_depth: | |
if self.fps and self.capture_width and self.capture_height: | |
config.enable_stream( | |
rs.stream.depth, self.capture_width, self.capture_height, rs.format.z16, self.fps | |
) | |
else: | |
config.enable_stream(rs.stream.depth) | |
self.camera = rs.pipeline() | |
try: | |
profile = self.camera.start(config) | |
is_camera_open = True | |
except RuntimeError: | |
is_camera_open = False | |
traceback.print_exc() | |
# If the camera doesn't work, display the camera indices corresponding to | |
# valid cameras. | |
if not is_camera_open: | |
# Verify that the provided `serial_number` is valid before printing the traceback | |
camera_infos = find_cameras() | |
serial_numbers = [cam["serial_number"] for cam in camera_infos] | |
if self.serial_number not in serial_numbers: | |
raise ValueError( | |
f"`serial_number` is expected to be one of these available cameras {serial_numbers}, but {self.serial_number} is provided instead. " | |
"To find the serial number you should use, run `python lerobot/common/robot_devices/cameras/intelrealsense.py`." | |
) | |
raise OSError(f"Can't access IntelRealSenseCamera({self.serial_number}).") | |
color_stream = profile.get_stream(rs.stream.color) | |
color_profile = color_stream.as_video_stream_profile() | |
actual_fps = color_profile.fps() | |
actual_width = color_profile.width() | |
actual_height = color_profile.height() | |
# Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30) | |
if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3): | |
# Using `OSError` since it's a broad that encompasses issues related to device communication | |
raise OSError( | |
f"Can't set {self.fps=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_fps}." | |
) | |
if self.capture_width is not None and self.capture_width != actual_width: | |
raise OSError( | |
f"Can't set {self.capture_width=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_width}." | |
) | |
if self.capture_height is not None and self.capture_height != actual_height: | |
raise OSError( | |
f"Can't set {self.capture_height=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_height}." | |
) | |
self.fps = round(actual_fps) | |
self.capture_width = round(actual_width) | |
self.capture_height = round(actual_height) | |
self.is_connected = True | |
def read(self, temporary_color: str | None = None) -> np.ndarray | tuple[np.ndarray, np.ndarray]: | |
"""Read a frame from the camera returned in the format height x width x channels (e.g. 480 x 640 x 3) | |
of type `np.uint8`, contrarily to the pytorch format which is float channel first. | |
When `use_depth=True`, returns a tuple `(color_image, depth_map)` with a depth map in the format | |
height x width (e.g. 480 x 640) of type np.uint16. | |
Note: Reading a frame is done every `camera.fps` times per second, and it is blocking. | |
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`. | |
""" | |
if not self.is_connected: | |
raise RobotDeviceNotConnectedError( | |
f"IntelRealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first." | |
) | |
if self.mock: | |
import lerobot.common.mocks.cameras.mock_cv2 as cv2 | |
else: | |
import cv2 | |
start_time = time.perf_counter() | |
frame = self.camera.wait_for_frames(timeout_ms=5000) | |
color_frame = frame.get_color_frame() | |
if not color_frame: | |
raise OSError(f"Can't capture color image from IntelRealSenseCamera({self.serial_number}).") | |
color_image = np.asanyarray(color_frame.get_data()) | |
requested_color_mode = self.color_mode if temporary_color is None else temporary_color | |
if requested_color_mode not in ["rgb", "bgr"]: | |
raise ValueError( | |
f"Expected color values are 'rgb' or 'bgr', but {requested_color_mode} is provided." | |
) | |
# IntelRealSense uses RGB format as default (red, green, blue). | |
if requested_color_mode == "bgr": | |
color_image = cv2.cvtColor(color_image, cv2.COLOR_RGB2BGR) | |
h, w, _ = color_image.shape | |
if h != self.capture_height or w != self.capture_width: | |
raise OSError( | |
f"Can't capture color image with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead." | |
) | |
if self.rotation is not None: | |
color_image = cv2.rotate(color_image, self.rotation) | |
# log the number of seconds it took to read the image | |
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time | |
# log the utc time at which the image was received | |
self.logs["timestamp_utc"] = capture_timestamp_utc() | |
if self.use_depth: | |
depth_frame = frame.get_depth_frame() | |
if not depth_frame: | |
raise OSError(f"Can't capture depth image from IntelRealSenseCamera({self.serial_number}).") | |
depth_map = np.asanyarray(depth_frame.get_data()) | |
h, w = depth_map.shape | |
if h != self.capture_height or w != self.capture_width: | |
raise OSError( | |
f"Can't capture depth map with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead." | |
) | |
if self.rotation is not None: | |
depth_map = cv2.rotate(depth_map, self.rotation) | |
return color_image, depth_map | |
else: | |
return color_image | |
def read_loop(self): | |
while not self.stop_event.is_set(): | |
if self.use_depth: | |
self.color_image, self.depth_map = self.read() | |
else: | |
self.color_image = self.read() | |
def async_read(self): | |
"""Access the latest color image""" | |
if not self.is_connected: | |
raise RobotDeviceNotConnectedError( | |
f"IntelRealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first." | |
) | |
if self.thread is None: | |
self.stop_event = threading.Event() | |
self.thread = Thread(target=self.read_loop, args=()) | |
self.thread.daemon = True | |
self.thread.start() | |
num_tries = 0 | |
while self.color_image is None: | |
# TODO(rcadene, aliberts): intelrealsense has diverged compared to opencv over here | |
num_tries += 1 | |
time.sleep(1 / self.fps) | |
if num_tries > self.fps and (self.thread.ident is None or not self.thread.is_alive()): | |
raise Exception( | |
"The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.thread.start()` has been called." | |
) | |
if self.use_depth: | |
return self.color_image, self.depth_map | |
else: | |
return self.color_image | |
def disconnect(self): | |
if not self.is_connected: | |
raise RobotDeviceNotConnectedError( | |
f"IntelRealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first." | |
) | |
if self.thread is not None and self.thread.is_alive(): | |
# wait for the thread to finish | |
self.stop_event.set() | |
self.thread.join() | |
self.thread = None | |
self.stop_event = None | |
self.camera.stop() | |
self.camera = None | |
self.is_connected = False | |
def __del__(self): | |
if getattr(self, "is_connected", False): | |
self.disconnect() | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser( | |
description="Save a few frames using `IntelRealSenseCamera` for all cameras connected to the computer, or a selected subset." | |
) | |
parser.add_argument( | |
"--serial-numbers", | |
type=int, | |
nargs="*", | |
default=None, | |
help="List of serial numbers used to instantiate the `IntelRealSenseCamera`. If not provided, find and use all available camera indices.", | |
) | |
parser.add_argument( | |
"--fps", | |
type=int, | |
default=30, | |
help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.", | |
) | |
parser.add_argument( | |
"--width", | |
type=str, | |
default=640, | |
help="Set the width for all cameras. If not provided, use the default width of each camera.", | |
) | |
parser.add_argument( | |
"--height", | |
type=str, | |
default=480, | |
help="Set the height for all cameras. If not provided, use the default height of each camera.", | |
) | |
parser.add_argument( | |
"--images-dir", | |
type=Path, | |
default="outputs/images_from_intelrealsense_cameras", | |
help="Set directory to save a few frames for each camera.", | |
) | |
parser.add_argument( | |
"--record-time-s", | |
type=float, | |
default=2.0, | |
help="Set the number of seconds used to record the frames. By default, 2 seconds.", | |
) | |
args = parser.parse_args() | |
save_images_from_cameras(**vars(args)) | |