From 29a5a1fc38f3eb1e4d77f4303225e02ae342461f Mon Sep 17 00:00:00 2001 From: mark Date: Fri, 27 Feb 2026 17:29:38 +0000 Subject: [PATCH] feat: add leader arm support --- example-agilex-dictionary.txt | 3 + examples/X_leader_arm_teleop_agilex.py | 270 +++++++++++++++++++++++ examples/common/data_manager.py | 48 ++++ examples/common/leader_arm.py | 139 ++++++++++++ examples/common/threads/leader_reader.py | 47 ++++ 5 files changed, 507 insertions(+) create mode 100644 examples/X_leader_arm_teleop_agilex.py create mode 100644 examples/common/leader_arm.py create mode 100644 examples/common/threads/leader_reader.py diff --git a/example-agilex-dictionary.txt b/example-agilex-dictionary.txt index 25fbae6..718fffc 100644 --- a/example-agilex-dictionary.txt +++ b/example-agilex-dictionary.txt @@ -11,6 +11,9 @@ fromarray hyperparameters imshow Levenberg +lerobot +Lerobot +LEROBOT linalg Marquardt mathbb diff --git a/examples/X_leader_arm_teleop_agilex.py b/examples/X_leader_arm_teleop_agilex.py new file mode 100644 index 0000000..c4c1e9c --- /dev/null +++ b/examples/X_leader_arm_teleop_agilex.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python3 +"""Piper (Agilex) teleop with LeRobot S0100 leader arm โ€“ direct joint mapping. + +Leader arm drives the Piper URDF in the GUI; optionally drive the real robot +with --real-robot. The leader arm class is configured with Piper follower +limits, offsets, and directions and outputs mapped joint angles + gripper. + +Requirements: +- Leader arm: calibrated S0100 (use same --leader-id as when calibrating). +- Conda: piper-teleop (or env with lerobot). + +Usage: + cd example_agilex/examples + python 0_leader_arm_teleop_agilex.py --leader-port /dev/ttyACM0 --leader-id my_awesome_leader_arm + python 0_leader_arm_teleop_agilex.py --real-robot --leader-port /dev/ttyACM0 --leader-id my_awesome_leader_arm + + Move the leader arm to drive the URDF (and robot if --real-robot). Ctrl+C to exit. +""" + +import argparse +import sys +import threading +import time +import traceback +from pathlib import Path + +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from common.configs import NEUTRAL_JOINT_ANGLES, ROBOT_RATE, VISUALIZATION_RATE +from common.data_manager import DataManager, RobotActivityState +from common.leader_arm import LerobotSO100LeaderArm +from common.robot_visualizer import RobotVisualizer +from common.threads.leader_reader import leader_reader_thread + +# NOTE:Piper follower config for the leader arm (limits, offsets, directions, mapping). +# Leader 5 DOF + gripper -> Piper 6 DOF + gripper; Piper joint 4 is fixed at 0. + +PIPER_JOINT_LIMITS_DEG = np.array( + [ + (-150.0, 150.0), + (0.0, 180.0), + (-170.0, 0.0), + (-100.0, 100.0), + (-70.0, 70.0), + (-120.0, 120.0), + ], + dtype=np.float64, +) + +# NOTE: Measure leader zero vs Piper zero per joint (degrees). joint4 unused. +PIPER_OFFSETS_DEG = np.array( + [0.0, 90.0, -90.0, 0.0, 0.0, -45.0], + dtype=np.float64, +) + +PIPER_DIRECTIONS = np.array( + [-1.0, 1.0, 1.0, 1.0, 1.0, -1.0], + dtype=np.float64, +) + +# Leader joint index -> Piper joint index. Piper joint 4 (index 3) has no leader. +LEADER_TO_PIPER_JOINT = {0: 0, 1: 1, 2: 2, 3: 4, 4: 5} +PIPER_FIXED_JOINTS = {3: 0.0} + + +def main() -> None: + """Run Piper teleop with LeRobot S0100 leader arm (URDF and optionally real robot).""" + from common.configs import URDF_PATH + + parser = argparse.ArgumentParser( + description="Piper URDF teleop with LeRobot S0100 leader arm." + ) + parser.add_argument("--leader-port", type=str, default="/dev/ttyACM0") + parser.add_argument("--leader-id", type=str, default="my_awesome_leader_arm") + parser.add_argument("--leader-rate", type=float, default=50.0) + parser.add_argument( + "--real-robot", + action="store_true", + help="Use the real Piper robot (default: URDF only)", + ) + parser.add_argument("--can-interface", type=str, default="can0") + args = parser.parse_args() + + use_real_robot = args.real_robot + print("=" * 60) + print( + "PIPER TELEOP WITH LEROBOT S0100 LEADER ARM" + + (" โ€“ REAL ROBOT" if use_real_robot else " โ€“ URDF only") + ) + print("=" * 60) + + leader = LerobotSO100LeaderArm(port=args.leader_port, calibration_id=args.leader_id) + leader.configure_follower( + follower_limits_deg=PIPER_JOINT_LIMITS_DEG, + follower_offsets_deg=PIPER_OFFSETS_DEG, + follower_directions=PIPER_DIRECTIONS, + leader_to_follower_joint=LEADER_TO_PIPER_JOINT, + fixed_joints=PIPER_FIXED_JOINTS, + ) + print("\n๐Ÿฆพ Connecting to S0100 leader arm...") + try: + leader.connect(calibrate=False) + except Exception as e: + print(f"Failed to connect to leader: {e}") + if "no calibration registered" in str(e): + print( + "Run: lerobot-calibrate --teleop.type=so100_leader --teleop.port=... --teleop.id=..." + ) + sys.exit(1) + print("โœ“ Leader arm connected") + + data_manager = DataManager() + + robot_controller = None + joint_state_thread_obj = None + if use_real_robot: + from common.threads.joint_state import joint_state_thread + + from piper_controller import PiperController + + print("\n๐Ÿค– Initializing Piper robot controller...") + robot_controller = PiperController( + can_interface=args.can_interface, + robot_rate=ROBOT_RATE, + control_mode=PiperController.ControlMode.JOINT_SPACE, + neutral_joint_angles=NEUTRAL_JOINT_ANGLES, + debug_mode=False, + ) + robot_controller.start_control_loop() + print("๐Ÿ“Š Starting joint state thread...") + joint_state_thread_obj = threading.Thread( + target=joint_state_thread, + args=(data_manager, robot_controller), + daemon=True, + ) + joint_state_thread_obj.start() + + leader_thread = threading.Thread( + target=leader_reader_thread, + args=(data_manager, leader, args.leader_rate), + daemon=True, + ) + leader_thread.start() + + visualizer = RobotVisualizer(urdf_path=URDF_PATH) + visualizer.add_basic_controls() + visualizer.add_teleop_controls() + visualizer.add_gripper_status_controls() + if use_real_robot: + visualizer.add_homing_controls() + visualizer.add_toggle_robot_enabled_status_button() + + def toggle_robot_enabled() -> None: + assert robot_controller is not None # Only registered when use_real_robot + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + robot_controller.graceful_stop() + data_manager.set_teleop_state(False, None, None) + visualizer.update_toggle_robot_enabled_status(False) + print("โœ“ ๐Ÿ”ด Robot disabled") + elif state == RobotActivityState.DISABLED: + if robot_controller.resume_robot(): + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + visualizer.update_toggle_robot_enabled_status(True) + print("โœ“ ๐ŸŸข Robot enabled") + else: + print("โœ— Failed to enable robot") + + def on_go_home() -> None: + assert robot_controller is not None # Only registered when use_real_robot + state = data_manager.get_robot_activity_state() + if state == RobotActivityState.ENABLED: + data_manager.set_robot_activity_state(RobotActivityState.HOMING) + data_manager.set_teleop_state(False, None, None) + ok = robot_controller.move_to_home() + if not ok: + data_manager.set_robot_activity_state(RobotActivityState.ENABLED) + else: + print("โš ๏ธ Cannot home: robot not enabled") + + visualizer.set_toggle_robot_enabled_status_callback(toggle_robot_enabled) + visualizer.set_go_home_callback(on_go_home) + + print() + if use_real_robot: + print( + "๐Ÿš€ Leader arm driving REAL Piper robot. Enable robot in GUI, then move leader. Ctrl+C to exit." + ) + else: + print("๐Ÿš€ Leader arm driving Piper URDF. Move the leader arm. Ctrl+C to exit.") + print() + + dt_viz = 1.0 / VISUALIZATION_RATE + try: + while True: + t0 = time.time() + mapped_angles, mapped_gripper = data_manager.get_leader_mapped_state() + if mapped_angles is not None and mapped_gripper is not None: + data_manager.set_target_joint_angles(mapped_angles) + data_manager.set_controller_data(None, 1.0, 1.0 - mapped_gripper) + data_manager.set_teleop_state(True, None, None) + if not use_real_robot: + data_manager.set_current_joint_angles(mapped_angles) + + current_joint_angles = data_manager.get_current_joint_angles() + _, grip_value, trigger_value = data_manager.get_controller_data() + target_joint_angles = data_manager.get_target_joint_angles() + + visualizer.set_grip_value(grip_value) + visualizer.set_trigger_value(trigger_value) + visualizer.update_teleop_status(data_manager.get_teleop_active()) + + if use_real_robot: + robot_activity_state = data_manager.get_robot_activity_state() + if robot_activity_state == RobotActivityState.ENABLED: + visualizer.update_robot_status("Robot Status: Enabled") + elif robot_activity_state == RobotActivityState.HOMING: + visualizer.update_robot_status("Robot Status: Homing") + else: + visualizer.update_robot_status("Robot Status: Disabled") + if ( + target_joint_angles is not None + and robot_activity_state == RobotActivityState.ENABLED + ): + visualizer.update_ghost_robot_visibility(True) + visualizer.update_ghost_robot_pose(np.radians(target_joint_angles)) + else: + visualizer.update_ghost_robot_visibility(False) + visualizer.update_gripper_status( + trigger_value, + robot_enabled=(robot_activity_state == RobotActivityState.ENABLED), + ) + else: + visualizer.update_robot_status("URDF only โ€“ leader arm driving") + visualizer.update_ghost_robot_visibility(False) + visualizer.update_gripper_status(trigger_value, robot_enabled=True) + + if current_joint_angles is not None: + current_rad = np.radians(current_joint_angles) + visualizer.update_robot_pose(current_rad) + visualizer.update_joint_angles_display(current_rad) + + elapsed = time.time() - t0 + if dt_viz - elapsed > 0: + time.sleep(dt_viz - elapsed) + + except KeyboardInterrupt: + print("\n\n๐Ÿ‘‹ Interrupt โ€“ shutting down...") + except Exception as e: + print(f"\nโŒ Error: {e}") + traceback.print_exc() + + print("\n๐Ÿงน Cleaning up...") + data_manager.request_shutdown() + data_manager.set_robot_activity_state(RobotActivityState.DISABLED) + leader_thread.join(timeout=2.0) + if joint_state_thread_obj is not None: + joint_state_thread_obj.join(timeout=2.0) + if robot_controller is not None: + robot_controller.cleanup() + leader.disconnect() + visualizer.stop() + print("๐Ÿ‘‹ Done.") + + +if __name__ == "__main__": + main() diff --git a/examples/common/data_manager.py b/examples/common/data_manager.py index 9e159a1..d2a380c 100644 --- a/examples/common/data_manager.py +++ b/examples/common/data_manager.py @@ -93,6 +93,17 @@ def __init__(self) -> None: self.rgb_image: np.ndarray | None = None +class LeaderMappedState: + """Leader arm mapped state - joint angles and gripper from leader read_mapped().""" + + def __init__(self) -> None: + """Initialize LeaderMappedState with default values.""" + self._lock = threading.Lock() + + self.joint_angles: np.ndarray | None = None + self.gripper_open: float | None = None + + class DataManager: """Main state container coordinating all state groups. @@ -112,6 +123,7 @@ def __init__(self) -> None: self._robot_state = RobotState() self._ik_state = IKState() self._camera_state = CameraState() + self._leader_mapped_state = LeaderMappedState() # System state self._shutdown_event = threading.Event() @@ -530,6 +542,42 @@ def get_ik_success(self) -> bool: with self._ik_state._lock: return self._ik_state.success + # ============================================================================ + # Leader Mapped State Methods + # ============================================================================ + + def set_leader_mapped_state( + self, joint_angles: np.ndarray, gripper_open: float + ) -> None: + """Set leader-mapped joint angles and gripper (thread-safe). + + Args: + joint_angles: Follower-space joint angles from leader read_mapped(). + gripper_open: Gripper open amount in [0, 1]. + """ + with self._leader_mapped_state._lock: + self._leader_mapped_state.joint_angles = ( + joint_angles.copy() if joint_angles is not None else None + ) + self._leader_mapped_state.gripper_open = gripper_open + + def get_leader_mapped_state( + self, + ) -> tuple[np.ndarray | None, float | None]: + """Get leader-mapped joint angles and gripper (thread-safe). + + Returns: + Tuple of (joint_angles, gripper_open); either may be None if not set. + """ + with self._leader_mapped_state._lock: + angles = ( + self._leader_mapped_state.joint_angles.copy() + if self._leader_mapped_state.joint_angles is not None + else None + ) + gripper = self._leader_mapped_state.gripper_open + return (angles, gripper) + # ============================================================================ # System State Methods # ============================================================================ diff --git a/examples/common/leader_arm.py b/examples/common/leader_arm.py new file mode 100644 index 0000000..aec43cf --- /dev/null +++ b/examples/common/leader_arm.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +"""LeRobot S0100 leader arm: connect, read, and map to a configured follower. + +The class holds follower-specific config (limits, offsets, directions, joint +mapping) and can output mapped joint angles + gripper for any follower DOF. +Raw actions are still available via read() for debugging. +""" + +import numpy as np +from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig + +# Fixed S0100 leader arm parameters (do not change per follower). +JOINT_ACTION_KEYS = [ + "shoulder_pan.pos", + "shoulder_lift.pos", + "elbow_flex.pos", + "wrist_flex.pos", + "wrist_roll.pos", +] +GRIPPER_ACTION_KEY = "gripper.pos" +NUM_JOINTS = 5 +USE_DEGREES = True + + +class LerobotSO100LeaderArm: + """LeRobot S0100 leader arm: read raw or mapped to a configured follower.""" + + def __init__(self, port: str, calibration_id: str) -> None: + """Configure the leader arm (does not connect). + + Args: + port: Serial port (e.g. /dev/ttyACM0). + calibration_id: Id used when calibrating (lerobot-calibrate --teleop.id=...). + """ + self._config = SO100LeaderConfig( + port=port, + id=calibration_id, + use_degrees=USE_DEGREES, + ) + self._leader = SO100Leader(self._config) + self._follower_limits_deg: np.ndarray | None = None + self._follower_offsets_deg: np.ndarray | None = None + self._follower_directions: np.ndarray | None = None + self._leader_to_follower_joint: dict[int, int] | None = None + self._fixed_joints: dict[int, float] = {} + + def connect(self, calibrate: bool = False) -> None: + """Connect to the leader arm. Raises if port or calibration fails.""" + self._leader.connect(calibrate=calibrate) + + def disconnect(self) -> None: + """Disconnect from the leader arm.""" + self._leader.disconnect() + + def read(self) -> dict[str, float]: + """Read current joint angles (degrees) and gripper (0โ€“100). Raw leader output.""" + return self._leader.get_action() + + def configure_follower( + self, + follower_limits_deg: np.ndarray, + follower_offsets_deg: np.ndarray, + follower_directions: np.ndarray, + leader_to_follower_joint: dict[int, int] | list[int], + fixed_joints: dict[int, float] | None = None, + ) -> None: + """Set follower mapping so read_mapped() returns follower-space angles. + + Args: + follower_limits_deg: (n_follower_joints, 2) min/max in degrees per joint. + follower_offsets_deg: (n_follower_joints,) offset per follower joint (leader 0ยฐ -> follower offsetยฐ). + follower_directions: (n_follower_joints,) sign per follower joint (+1 or -1). + leader_to_follower_joint: leader joint index -> follower joint index (dict or list of length NUM_JOINTS). + fixed_joints: optional dict {follower_joint_index: value} for joints with no leader (e.g. {3: 0.0}). + """ + n = follower_limits_deg.shape[0] + assert follower_limits_deg.shape == (n, 2) + assert follower_offsets_deg.shape == (n,) + assert follower_directions.shape == (n,) + if isinstance(leader_to_follower_joint, dict): + assert set(leader_to_follower_joint.keys()) == set(range(NUM_JOINTS)) + else: + assert len(leader_to_follower_joint) == NUM_JOINTS + self._follower_limits_deg = np.asarray(follower_limits_deg, dtype=np.float64) + self._follower_offsets_deg = np.asarray(follower_offsets_deg, dtype=np.float64) + self._follower_directions = np.asarray(follower_directions, dtype=np.float64) + self._leader_to_follower_joint = ( + dict(leader_to_follower_joint) + if isinstance(leader_to_follower_joint, dict) + else {i: v for i, v in enumerate(leader_to_follower_joint)} + ) + self._fixed_joints = dict(fixed_joints) if fixed_joints else {} + + def read_mapped(self) -> tuple[np.ndarray, float]: + """Read leader and return follower-space joint angles (degrees) and gripper open (0โ€“1). + + Must call configure_follower() first. Clips to follower limits; fixed joints set as configured. + """ + if ( + self._follower_limits_deg is None + or self._follower_offsets_deg is None + or self._follower_directions is None + or self._leader_to_follower_joint is None + ): + raise RuntimeError( + "configure_follower() must be called before read_mapped()" + ) + raw = self.read() + n = self._follower_limits_deg.shape[0] + angles = np.zeros(n, dtype=np.float64) + for fj, val in self._fixed_joints.items(): + angles[fj] = val + for i, key in enumerate(JOINT_ACTION_KEYS): + leader_val = raw.get(key, 0.0) + fj = self._leader_to_follower_joint[i] + lo, hi = self._follower_limits_deg[fj, 0], self._follower_limits_deg[fj, 1] + angles[fj] = np.clip( + leader_val * self._follower_directions[fj] + + self._follower_offsets_deg[fj], + lo, + hi, + ) + gripper = float(np.clip(raw.get(GRIPPER_ACTION_KEY, 50.0) / 100.0, 0.0, 1.0)) + return angles, gripper + + @property + def is_connected(self) -> bool: + """True if the leader arm is connected.""" + return self._leader.is_connected + + @staticmethod + def joint_keys() -> list[str]: + """Ordered list of body joint keys (no gripper).""" + return list(JOINT_ACTION_KEYS) + + @staticmethod + def gripper_key() -> str: + """Key for gripper value in the action dict.""" + return GRIPPER_ACTION_KEY diff --git a/examples/common/threads/leader_reader.py b/examples/common/threads/leader_reader.py new file mode 100644 index 0000000..8d7691f --- /dev/null +++ b/examples/common/threads/leader_reader.py @@ -0,0 +1,47 @@ +"""Leader arm reader thread โ€“ reads mapped output from leader and writes to DataManager.""" + +import time +import traceback + +from common.data_manager import DataManager +from common.leader_arm import LerobotSO100LeaderArm + + +def leader_reader_thread( + data_manager: DataManager, + leader_arm: LerobotSO100LeaderArm, + rate_hz: float, +) -> None: + """Read leader arm at rate_hz via read_mapped() and store in data_manager. + + Leader arm must have configure_follower() called before starting this thread. + """ + dt = 1.0 / rate_hz + print("๐Ÿฆพ Leader reader thread started") + try: + while not data_manager.is_shutdown_requested(): + t0 = time.perf_counter() + try: + joint_angles, gripper_open = leader_arm.read_mapped() + data_manager.set_leader_mapped_state(joint_angles, gripper_open) + except RuntimeError as e: + if "no calibration registered" in str(e): + print( + "โŒ Leader has no calibration. Run lerobot-calibrate for the leader." + ) + data_manager.request_shutdown() + elif "configure_follower" in str(e): + print( + "โŒ Leader arm has no follower config. Call configure_follower() before starting the thread." + ) + data_manager.request_shutdown() + break + elapsed = time.perf_counter() - t0 + if dt - elapsed > 0: + time.sleep(dt - elapsed) + except Exception as e: + print(f"โŒ Leader reader error: {e}") + traceback.print_exc() + data_manager.request_shutdown() + finally: + print("๐Ÿฆพ Leader reader thread stopped")