Spaces:
Running
Running
# Copyright 2024 The HuggingFace Inc. team. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
"""Contains logic to instantiate a robot, read information from its motors and cameras, | |
and send orders to its motors. | |
""" | |
# TODO(rcadene, aliberts): reorganize the codebase into one file per robot, with the associated | |
# calibration procedure, to make it easy for people to add their own robot. | |
import json | |
import logging | |
import time | |
import warnings | |
from pathlib import Path | |
import numpy as np | |
import torch | |
from lerobot.common.robot_devices.cameras.utils import make_cameras_from_configs | |
from lerobot.common.robot_devices.motors.utils import MotorsBus, make_motors_buses_from_configs | |
from lerobot.common.robot_devices.robots.configs import ManipulatorRobotConfig | |
from lerobot.common.robot_devices.robots.utils import get_arm_id | |
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError | |
def ensure_safe_goal_position( | |
goal_pos: torch.Tensor, present_pos: torch.Tensor, max_relative_target: float | list[float] | |
): | |
# Cap relative action target magnitude for safety. | |
diff = goal_pos - present_pos | |
max_relative_target = torch.tensor(max_relative_target) | |
safe_diff = torch.minimum(diff, max_relative_target) | |
safe_diff = torch.maximum(safe_diff, -max_relative_target) | |
safe_goal_pos = present_pos + safe_diff | |
if not torch.allclose(goal_pos, safe_goal_pos): | |
logging.warning( | |
"Relative goal position magnitude had to be clamped to be safe.\n" | |
f" requested relative goal position target: {diff}\n" | |
f" clamped relative goal position target: {safe_diff}" | |
) | |
return safe_goal_pos | |
class ManipulatorRobot: | |
# TODO(rcadene): Implement force feedback | |
"""This class allows to control any manipulator robot of various number of motors. | |
Non exhaustive list of robots: | |
- [Koch v1.0](https://github.com/AlexanderKoch-Koch/low_cost_robot), with and without the wrist-to-elbow expansion, developed | |
by Alexander Koch from [Tau Robotics](https://tau-robotics.com) | |
- [Koch v1.1](https://github.com/jess-moss/koch-v1-1) developed by Jess Moss | |
- [Aloha](https://www.trossenrobotics.com/aloha-kits) developed by Trossen Robotics | |
Example of instantiation, a pre-defined robot config is required: | |
```python | |
robot = ManipulatorRobot(KochRobotConfig()) | |
``` | |
Example of overwriting motors during instantiation: | |
```python | |
# Defines how to communicate with the motors of the leader and follower arms | |
leader_arms = { | |
"main": DynamixelMotorsBusConfig( | |
port="/dev/tty.usbmodem575E0031751", | |
motors={ | |
# name: (index, model) | |
"shoulder_pan": (1, "xl330-m077"), | |
"shoulder_lift": (2, "xl330-m077"), | |
"elbow_flex": (3, "xl330-m077"), | |
"wrist_flex": (4, "xl330-m077"), | |
"wrist_roll": (5, "xl330-m077"), | |
"gripper": (6, "xl330-m077"), | |
}, | |
), | |
} | |
follower_arms = { | |
"main": DynamixelMotorsBusConfig( | |
port="/dev/tty.usbmodem575E0032081", | |
motors={ | |
# name: (index, model) | |
"shoulder_pan": (1, "xl430-w250"), | |
"shoulder_lift": (2, "xl430-w250"), | |
"elbow_flex": (3, "xl330-m288"), | |
"wrist_flex": (4, "xl330-m288"), | |
"wrist_roll": (5, "xl330-m288"), | |
"gripper": (6, "xl330-m288"), | |
}, | |
), | |
} | |
robot_config = KochRobotConfig(leader_arms=leader_arms, follower_arms=follower_arms) | |
robot = ManipulatorRobot(robot_config) | |
``` | |
Example of overwriting cameras during instantiation: | |
```python | |
# Defines how to communicate with 2 cameras connected to the computer. | |
# Here, the webcam of the laptop and the phone (connected in USB to the laptop) | |
# can be reached respectively using the camera indices 0 and 1. These indices can be | |
# arbitrary. See the documentation of `OpenCVCamera` to find your own camera indices. | |
cameras = { | |
"laptop": OpenCVCamera(camera_index=0, fps=30, width=640, height=480), | |
"phone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480), | |
} | |
robot = ManipulatorRobot(KochRobotConfig(cameras=cameras)) | |
``` | |
Once the robot is instantiated, connect motors buses and cameras if any (Required): | |
```python | |
robot.connect() | |
``` | |
Example of highest frequency teleoperation, which doesn't require cameras: | |
```python | |
while True: | |
robot.teleop_step() | |
``` | |
Example of highest frequency data collection from motors and cameras (if any): | |
```python | |
while True: | |
observation, action = robot.teleop_step(record_data=True) | |
``` | |
Example of controlling the robot with a policy: | |
```python | |
while True: | |
# Uses the follower arms and cameras to capture an observation | |
observation = robot.capture_observation() | |
# Assumes a policy has been instantiated | |
with torch.inference_mode(): | |
action = policy.select_action(observation) | |
# Orders the robot to move | |
robot.send_action(action) | |
``` | |
Example of disconnecting which is not mandatory since we disconnect when the object is deleted: | |
```python | |
robot.disconnect() | |
``` | |
""" | |
def __init__( | |
self, | |
config: ManipulatorRobotConfig, | |
): | |
self.config = config | |
self.robot_type = self.config.type | |
self.calibration_dir = Path(self.config.calibration_dir) | |
self.leader_arms = make_motors_buses_from_configs(self.config.leader_arms) | |
self.follower_arms = make_motors_buses_from_configs(self.config.follower_arms) | |
self.cameras = make_cameras_from_configs(self.config.cameras) | |
self.is_connected = False | |
self.logs = {} | |
def get_motor_names(self, arm: dict[str, MotorsBus]) -> list: | |
return [f"{arm}_{motor}" for arm, bus in arm.items() for motor in bus.motors] | |
def camera_features(self) -> dict: | |
cam_ft = {} | |
for cam_key, cam in self.cameras.items(): | |
key = f"observation.images.{cam_key}" | |
cam_ft[key] = { | |
"shape": (cam.height, cam.width, cam.channels), | |
"names": ["height", "width", "channels"], | |
"info": None, | |
} | |
return cam_ft | |
def motor_features(self) -> dict: | |
action_names = self.get_motor_names(self.leader_arms) | |
state_names = self.get_motor_names(self.leader_arms) | |
return { | |
"action": { | |
"dtype": "float32", | |
"shape": (len(action_names),), | |
"names": action_names, | |
}, | |
"observation.state": { | |
"dtype": "float32", | |
"shape": (len(state_names),), | |
"names": state_names, | |
}, | |
} | |
def features(self): | |
return {**self.motor_features, **self.camera_features} | |
def has_camera(self): | |
return len(self.cameras) > 0 | |
def num_cameras(self): | |
return len(self.cameras) | |
def available_arms(self): | |
available_arms = [] | |
for name in self.follower_arms: | |
arm_id = get_arm_id(name, "follower") | |
available_arms.append(arm_id) | |
for name in self.leader_arms: | |
arm_id = get_arm_id(name, "leader") | |
available_arms.append(arm_id) | |
return available_arms | |
def connect(self): | |
if self.is_connected: | |
raise RobotDeviceAlreadyConnectedError( | |
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice." | |
) | |
if not self.leader_arms and not self.follower_arms and not self.cameras: | |
raise ValueError( | |
"ManipulatorRobot doesn't have any device to connect. See example of usage in docstring of the class." | |
) | |
# Connect the arms | |
for name in self.follower_arms: | |
print(f"Connecting {name} follower arm.") | |
self.follower_arms[name].connect() | |
for name in self.leader_arms: | |
print(f"Connecting {name} leader arm.") | |
self.leader_arms[name].connect() | |
if self.robot_type in ["koch", "koch_bimanual", "aloha"]: | |
from lerobot.common.robot_devices.motors.dynamixel import TorqueMode | |
elif self.robot_type in ["so100", "moss", "lekiwi"]: | |
from lerobot.common.robot_devices.motors.feetech import TorqueMode | |
# We assume that at connection time, arms are in a rest position, and torque can | |
# be safely disabled to run calibration and/or set robot preset configurations. | |
for name in self.follower_arms: | |
self.follower_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value) | |
for name in self.leader_arms: | |
self.leader_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value) | |
self.activate_calibration() | |
# Set robot preset (e.g. torque in leader gripper for Koch v1.1) | |
if self.robot_type in ["koch", "koch_bimanual"]: | |
self.set_koch_robot_preset() | |
elif self.robot_type == "aloha": | |
self.set_aloha_robot_preset() | |
elif self.robot_type in ["so100", "moss", "lekiwi"]: | |
self.set_so100_robot_preset() | |
# Enable torque on all motors of the follower arms | |
for name in self.follower_arms: | |
print(f"Activating torque on {name} follower arm.") | |
self.follower_arms[name].write("Torque_Enable", 1) | |
if self.config.gripper_open_degree is not None: | |
if self.robot_type not in ["koch", "koch_bimanual"]: | |
raise NotImplementedError( | |
f"{self.robot_type} does not support position AND current control in the handle, which is require to set the gripper open." | |
) | |
# Set the leader arm in torque mode with the gripper motor set to an angle. This makes it possible | |
# to squeeze the gripper and have it spring back to an open position on its own. | |
for name in self.leader_arms: | |
self.leader_arms[name].write("Torque_Enable", 1, "gripper") | |
self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper") | |
# Check both arms can be read | |
for name in self.follower_arms: | |
self.follower_arms[name].read("Present_Position") | |
for name in self.leader_arms: | |
self.leader_arms[name].read("Present_Position") | |
# Connect the cameras | |
for name in self.cameras: | |
self.cameras[name].connect() | |
self.is_connected = True | |
def activate_calibration(self): | |
"""After calibration all motors function in human interpretable ranges. | |
Rotations are expressed in degrees in nominal range of [-180, 180], | |
and linear motions (like gripper of Aloha) in nominal range of [0, 100]. | |
""" | |
def load_or_run_calibration_(name, arm, arm_type): | |
arm_id = get_arm_id(name, arm_type) | |
arm_calib_path = self.calibration_dir / f"{arm_id}.json" | |
if arm_calib_path.exists(): | |
with open(arm_calib_path) as f: | |
calibration = json.load(f) | |
else: | |
# TODO(rcadene): display a warning in __init__ if calibration file not available | |
print(f"Missing calibration file '{arm_calib_path}'") | |
if self.robot_type in ["koch", "koch_bimanual", "aloha"]: | |
from lerobot.common.robot_devices.robots.dynamixel_calibration import run_arm_calibration | |
calibration = run_arm_calibration(arm, self.robot_type, name, arm_type) | |
elif self.robot_type in ["so100", "moss", "lekiwi"]: | |
from lerobot.common.robot_devices.robots.feetech_calibration import ( | |
run_arm_manual_calibration, | |
) | |
calibration = run_arm_manual_calibration(arm, self.robot_type, name, arm_type) | |
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'") | |
arm_calib_path.parent.mkdir(parents=True, exist_ok=True) | |
with open(arm_calib_path, "w") as f: | |
json.dump(calibration, f) | |
return calibration | |
for name, arm in self.follower_arms.items(): | |
calibration = load_or_run_calibration_(name, arm, "follower") | |
arm.set_calibration(calibration) | |
for name, arm in self.leader_arms.items(): | |
calibration = load_or_run_calibration_(name, arm, "leader") | |
arm.set_calibration(calibration) | |
def set_koch_robot_preset(self): | |
def set_operating_mode_(arm): | |
from lerobot.common.robot_devices.motors.dynamixel import TorqueMode | |
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any(): | |
raise ValueError("To run set robot preset, the torque must be disabled on all motors.") | |
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't | |
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm, | |
# you could end up with a servo with a position 0 or 4095 at a crucial point See [ | |
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11] | |
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"] | |
if len(all_motors_except_gripper) > 0: | |
# 4 corresponds to Extended Position on Koch motors | |
arm.write("Operating_Mode", 4, all_motors_except_gripper) | |
# Use 'position control current based' for gripper to be limited by the limit of the current. | |
# For the follower gripper, it means it can grasp an object without forcing too much even tho, | |
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch). | |
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger | |
# to make it move, and it will move back to its original target position when we release the force. | |
# 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288" | |
arm.write("Operating_Mode", 5, "gripper") | |
for name in self.follower_arms: | |
set_operating_mode_(self.follower_arms[name]) | |
# Set better PID values to close the gap between recorded states and actions | |
# TODO(rcadene): Implement an automatic procedure to set optimal PID values for each motor | |
self.follower_arms[name].write("Position_P_Gain", 1500, "elbow_flex") | |
self.follower_arms[name].write("Position_I_Gain", 0, "elbow_flex") | |
self.follower_arms[name].write("Position_D_Gain", 600, "elbow_flex") | |
if self.config.gripper_open_degree is not None: | |
for name in self.leader_arms: | |
set_operating_mode_(self.leader_arms[name]) | |
# Enable torque on the gripper of the leader arms, and move it to 45 degrees, | |
# so that we can use it as a trigger to close the gripper of the follower arms. | |
self.leader_arms[name].write("Torque_Enable", 1, "gripper") | |
self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper") | |
def set_aloha_robot_preset(self): | |
def set_shadow_(arm): | |
# Set secondary/shadow ID for shoulder and elbow. These joints have two motors. | |
# As a result, if only one of them is required to move to a certain position, | |
# the other will follow. This is to avoid breaking the motors. | |
if "shoulder_shadow" in arm.motor_names: | |
shoulder_idx = arm.read("ID", "shoulder") | |
arm.write("Secondary_ID", shoulder_idx, "shoulder_shadow") | |
if "elbow_shadow" in arm.motor_names: | |
elbow_idx = arm.read("ID", "elbow") | |
arm.write("Secondary_ID", elbow_idx, "elbow_shadow") | |
for name in self.follower_arms: | |
set_shadow_(self.follower_arms[name]) | |
for name in self.leader_arms: | |
set_shadow_(self.leader_arms[name]) | |
for name in self.follower_arms: | |
# Set a velocity limit of 131 as advised by Trossen Robotics | |
self.follower_arms[name].write("Velocity_Limit", 131) | |
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't | |
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm, | |
# you could end up with a servo with a position 0 or 4095 at a crucial point See [ | |
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11] | |
all_motors_except_gripper = [ | |
name for name in self.follower_arms[name].motor_names if name != "gripper" | |
] | |
if len(all_motors_except_gripper) > 0: | |
# 4 corresponds to Extended Position on Aloha motors | |
self.follower_arms[name].write("Operating_Mode", 4, all_motors_except_gripper) | |
# Use 'position control current based' for follower gripper to be limited by the limit of the current. | |
# It can grasp an object without forcing too much even tho, | |
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch). | |
# 5 corresponds to Current Controlled Position on Aloha gripper follower "xm430-w350" | |
self.follower_arms[name].write("Operating_Mode", 5, "gripper") | |
# Note: We can't enable torque on the leader gripper since "xc430-w150" doesn't have | |
# a Current Controlled Position mode. | |
if self.config.gripper_open_degree is not None: | |
warnings.warn( | |
f"`gripper_open_degree` is set to {self.config.gripper_open_degree}, but None is expected for Aloha instead", | |
stacklevel=1, | |
) | |
def set_so100_robot_preset(self): | |
for name in self.follower_arms: | |
# Mode=0 for Position Control | |
self.follower_arms[name].write("Mode", 0) | |
# Set P_Coefficient to lower value to avoid shakiness (Default is 32) | |
self.follower_arms[name].write("P_Coefficient", 16) | |
# Set I_Coefficient and D_Coefficient to default value 0 and 32 | |
self.follower_arms[name].write("I_Coefficient", 0) | |
self.follower_arms[name].write("D_Coefficient", 32) | |
# Close the write lock so that Maximum_Acceleration gets written to EPROM address, | |
# which is mandatory for Maximum_Acceleration to take effect after rebooting. | |
self.follower_arms[name].write("Lock", 0) | |
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of | |
# the motors. Note: this configuration is not in the official STS3215 Memory Table | |
self.follower_arms[name].write("Maximum_Acceleration", 254) | |
self.follower_arms[name].write("Acceleration", 254) | |
def teleop_step( | |
self, record_data=False | |
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: | |
if not self.is_connected: | |
raise RobotDeviceNotConnectedError( | |
"ManipulatorRobot is not connected. You need to run `robot.connect()`." | |
) | |
# Prepare to assign the position of the leader to the follower | |
leader_pos = {} | |
for name in self.leader_arms: | |
before_lread_t = time.perf_counter() | |
leader_pos[name] = self.leader_arms[name].read("Present_Position") | |
leader_pos[name] = torch.from_numpy(leader_pos[name]) | |
self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - before_lread_t | |
# Send goal position to the follower | |
follower_goal_pos = {} | |
for name in self.follower_arms: | |
before_fwrite_t = time.perf_counter() | |
goal_pos = leader_pos[name] | |
# Cap goal position when too far away from present position. | |
# Slower fps expected due to reading from the follower. | |
if self.config.max_relative_target is not None: | |
present_pos = self.follower_arms[name].read("Present_Position") | |
present_pos = torch.from_numpy(present_pos) | |
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target) | |
# Used when record_data=True | |
follower_goal_pos[name] = goal_pos | |
goal_pos = goal_pos.numpy().astype(np.float32) | |
self.follower_arms[name].write("Goal_Position", goal_pos) | |
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t | |
# Early exit when recording data is not requested | |
if not record_data: | |
return | |
# TODO(rcadene): Add velocity and other info | |
# Read follower position | |
follower_pos = {} | |
for name in self.follower_arms: | |
before_fread_t = time.perf_counter() | |
follower_pos[name] = self.follower_arms[name].read("Present_Position") | |
follower_pos[name] = torch.from_numpy(follower_pos[name]) | |
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t | |
# Create state by concatenating follower current position | |
state = [] | |
for name in self.follower_arms: | |
if name in follower_pos: | |
state.append(follower_pos[name]) | |
state = torch.cat(state) | |
# Create action by concatenating follower goal position | |
action = [] | |
for name in self.follower_arms: | |
if name in follower_goal_pos: | |
action.append(follower_goal_pos[name]) | |
action = torch.cat(action) | |
# Capture images from cameras | |
images = {} | |
for name in self.cameras: | |
before_camread_t = time.perf_counter() | |
images[name] = self.cameras[name].async_read() | |
images[name] = torch.from_numpy(images[name]) | |
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] | |
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t | |
# Populate output dictionaries | |
obs_dict, action_dict = {}, {} | |
obs_dict["observation.state"] = state | |
action_dict["action"] = action | |
for name in self.cameras: | |
obs_dict[f"observation.images.{name}"] = images[name] | |
return obs_dict, action_dict | |
def capture_observation(self): | |
"""The returned observations do not have a batch dimension.""" | |
if not self.is_connected: | |
raise RobotDeviceNotConnectedError( | |
"ManipulatorRobot is not connected. You need to run `robot.connect()`." | |
) | |
# Read follower position | |
follower_pos = {} | |
for name in self.follower_arms: | |
before_fread_t = time.perf_counter() | |
follower_pos[name] = self.follower_arms[name].read("Present_Position") | |
follower_pos[name] = torch.from_numpy(follower_pos[name]) | |
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t | |
# Create state by concatenating follower current position | |
state = [] | |
for name in self.follower_arms: | |
if name in follower_pos: | |
state.append(follower_pos[name]) | |
state = torch.cat(state) | |
# Capture images from cameras | |
images = {} | |
for name in self.cameras: | |
before_camread_t = time.perf_counter() | |
images[name] = self.cameras[name].async_read() | |
images[name] = torch.from_numpy(images[name]) | |
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] | |
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t | |
# Populate output dictionaries and format to pytorch | |
obs_dict = {} | |
obs_dict["observation.state"] = state | |
for name in self.cameras: | |
obs_dict[f"observation.images.{name}"] = images[name] | |
return obs_dict | |
def send_action(self, action: torch.Tensor) -> torch.Tensor: | |
"""Command the follower arms to move to a target joint configuration. | |
The relative action magnitude may be clipped depending on the configuration parameter | |
`max_relative_target`. In this case, the action sent differs from original action. | |
Thus, this function always returns the action actually sent. | |
Args: | |
action: tensor containing the concatenated goal positions for the follower arms. | |
""" | |
if not self.is_connected: | |
raise RobotDeviceNotConnectedError( | |
"ManipulatorRobot is not connected. You need to run `robot.connect()`." | |
) | |
from_idx = 0 | |
to_idx = 0 | |
action_sent = [] | |
for name in self.follower_arms: | |
# Get goal position of each follower arm by splitting the action vector | |
to_idx += len(self.follower_arms[name].motor_names) | |
goal_pos = action[from_idx:to_idx] | |
from_idx = to_idx | |
# Cap goal position when too far away from present position. | |
# Slower fps expected due to reading from the follower. | |
if self.config.max_relative_target is not None: | |
present_pos = self.follower_arms[name].read("Present_Position") | |
present_pos = torch.from_numpy(present_pos) | |
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target) | |
# Save tensor to concat and return | |
action_sent.append(goal_pos) | |
# Send goal position to each follower | |
goal_pos = goal_pos.numpy().astype(np.float32) | |
self.follower_arms[name].write("Goal_Position", goal_pos) | |
return torch.cat(action_sent) | |
def print_logs(self): | |
pass | |
# TODO(aliberts): move robot-specific logs logic here | |
def disconnect(self): | |
if not self.is_connected: | |
raise RobotDeviceNotConnectedError( | |
"ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." | |
) | |
for name in self.follower_arms: | |
self.follower_arms[name].disconnect() | |
for name in self.leader_arms: | |
self.leader_arms[name].disconnect() | |
for name in self.cameras: | |
self.cameras[name].disconnect() | |
self.is_connected = False | |
def __del__(self): | |
if getattr(self, "is_connected", False): | |
self.disconnect() | |