diff --git a/data/.lfs/groot.tar.gz b/data/.lfs/groot.tar.gz new file mode 100644 index 0000000000..16602bca84 --- /dev/null +++ b/data/.lfs/groot.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:305ed4b8538cb8083d91518e687b184b374c977afd185adecabc1152cef8d701 +size 3517892 diff --git a/data/.lfs/mujoco_sim.tar.gz b/data/.lfs/mujoco_sim.tar.gz index 57833fbbc6..47d2df201d 100644 --- a/data/.lfs/mujoco_sim.tar.gz +++ b/data/.lfs/mujoco_sim.tar.gz @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:d178439569ed81dfad05455419dc51da2c52021313b6d7b9259d9e30946db7c6 -size 60186340 +oid sha256:4232d61e4af19dee0c0e3a8f55f2c1b48b28a70f810ff17039876a48b999d6d2 +size 60251722 diff --git a/dimos/control/components.py b/dimos/control/components.py index 04ca6f03a7..3a93f6891b 100644 --- a/dimos/control/components.py +++ b/dimos/control/components.py @@ -16,8 +16,11 @@ from dataclasses import dataclass, field from enum import Enum +from pathlib import Path from typing import Any +from dimos.hardware.whole_body.spec import WholeBodyConfig + HardwareId = str JointName = str TaskName = str @@ -61,16 +64,28 @@ class HardwareComponent: address: Connection address - IP for TCP, port for CAN auto_enable: Whether to auto-enable servos gripper_joints: Joints that use adapter gripper methods (separate from joints). + domain_id: DDS domain ID for adapters that use DDS transport + (e.g. Unitree G1). Real robot uses 0; unitree_mujoco sim + defaults to 1. Ignored by non-DDS adapters. + adapter_kwargs: Generic untyped kwargs forwarded to the adapter + constructor — use for adapter-specific knobs that don't + belong in the spec. + wb_config: Whole-body-specific config (PD gains etc.). Populate + on hardware_type=WHOLE_BODY components. Keeps WB-only knobs + off the generic HardwareComponent shared by manipulators, + bases, and grippers. 
""" hardware_id: HardwareId hardware_type: HardwareType joints: list[JointName] = field(default_factory=list) adapter_type: str = "mock" - address: str | None = None + address: str | Path | None = None auto_enable: bool = True gripper_joints: list[JointName] = field(default_factory=list) + domain_id: int = 0 adapter_kwargs: dict[str, Any] = field(default_factory=dict) + wb_config: WholeBodyConfig | None = None @property def all_joints(self) -> list[JointName]: diff --git a/dimos/control/coordinator.py b/dimos/control/coordinator.py index 13c959cd89..00e97cac65 100644 --- a/dimos/control/coordinator.py +++ b/dimos/control/coordinator.py @@ -75,22 +75,23 @@ class TaskConfig: Attributes: name: Task name (e.g., "traj_arm") - type: Task type ("trajectory", "servo", "velocity", "cartesian_ik", "teleop_ik") + type: Registered task type name. See ``dimos.control.tasks.registry``. joint_names: List of joint names this task controls priority: Task priority (higher wins arbitration) - model_path: Path to URDF/MJCF for IK solver (cartesian_ik/teleop_ik only) + model_path: Optional model path used by task factories that need one. ee_joint_id: End-effector joint ID in model (cartesian_ik/teleop_ik only) hand: "left" or "right" controller hand (teleop_ik only) gripper_joint: Joint name for gripper virtual joint gripper_open_pos: Gripper position at trigger 0.0 gripper_closed_pos: Gripper position at trigger 1.0 + hardware_id: Optional hardware dependency used by task factories. 
""" name: str type: str = "trajectory" joint_names: list[str] = field(default_factory=lambda: []) priority: int = 10 - # Cartesian IK / Teleop IK specific + # Cartesian IK / Teleop IK / GR00T WBC specific model_path: str | Path | None = None ee_joint_id: int = 6 hand: Literal["left", "right"] | None = None # teleop_ik only @@ -98,6 +99,20 @@ class TaskConfig: gripper_joint: str | None = None gripper_open_pos: float = 0.0 gripper_closed_pos: float = 0.0 + # For tasks that need access to a configured hardware adapter. + hardware_id: str | None = None + # Servo task: optional initial target held until/unless a new one arrives. + default_positions: list[float] | None = None + # Start the task immediately after registration. + auto_start: bool = False + # For tasks with arm()/disarm(): arm automatically on start. + auto_arm: bool = False + # For tasks with dry-run support: compute but suppress output on start. + auto_dry_run: bool = False + # Default arming ramp duration, for tasks that interpolate on arm(). + default_ramp_seconds: float = 10.0 + # Optional task-level compute decimation; ``None`` keeps the task default. + decimation: int | None = None class ControlCoordinatorConfig(ModuleConfig): @@ -110,6 +125,11 @@ class ControlCoordinatorConfig(ModuleConfig): log_ticks: Whether to log tick information (verbose) hardware: List of hardware configurations to create on start tasks: List of task configurations to create on start + + Odom is no longer published here — most ``WholeBodyAdapter`` impls + don't (and shouldn't) produce base pose. Sim engines publish odom + on their own Out port; real-hardware setups attach a dedicated + estimator Module that fuses IMU + leg kinematics. """ tick_rate: float = 100.0 @@ -166,11 +186,13 @@ class ControlCoordinator(Module): # Input: Teleop buttons for engage/disengage signaling buttons: In[Buttons] + # Arming and dry-run are one-shot RPCs, not streams. 
+ def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) # Connected hardware (keyed by hardware_id) - self._hardware: dict[HardwareId, ConnectedHardware] = {} + self._hardware: dict[HardwareId, ConnectedHardware | ConnectedWholeBody] = {} self._hardware_lock = threading.Lock() # Joint -> hardware mapping (built when hardware added) @@ -203,6 +225,10 @@ def _setup_from_config(self) -> None: for task_cfg in self.config.tasks: task = self._create_task_from_config(task_cfg) self.add_task(task) + if task_cfg.auto_start: + start = getattr(task, "start", None) + if callable(start): + start() except Exception: # Rollback: clean up all successfully added hardware @@ -262,105 +288,36 @@ def _create_twist_base_adapter(self, component: HardwareComponent) -> TwistBaseA def _create_whole_body_adapter(self, component: HardwareComponent) -> WholeBodyAdapter: """Create a whole-body adapter from component config. - ``component.address`` carries the DDS network interface — int (CAN port) - or str ("enp60s0"); cyclonedds requires the right type, so try int() first - and fall back to keeping the original string. + ``component.address`` is the universal hardware-locator field. + It carries the DDS network interface for real-hw adapters (e.g. + ``"enp2s0"``) and the MJCF path for sim adapters — each adapter + knows how to interpret a string vs. an integer in its own + ``__init__``. Adapters that need extra arguments accept them via + ``**component.adapter_kwargs``. This unification (vs. the earlier + dual ``address`` + ``network_interface``) matches the convention + used by ``_create_manipulator_adapter`` and + ``_create_twist_base_adapter``. """ from dimos.hardware.whole_body.registry import whole_body_adapter_registry - addr: int | str | None = component.address - if addr is not None: - try: - addr = int(addr) - except ValueError: - pass # keep as string (e.g. 
"enp60s0") - return whole_body_adapter_registry.create( component.adapter_type, dof=len(component.joints), hardware_id=component.hardware_id, - network_interface=addr if addr is not None else "", + address=component.address, + domain_id=component.domain_id, **component.adapter_kwargs, ) def _create_task_from_config(self, cfg: TaskConfig) -> ControlTask: - """Create a control task from config.""" - task_type = cfg.type.lower() - - if task_type == "trajectory": - from dimos.control.tasks.trajectory_task import ( - JointTrajectoryTask, - JointTrajectoryTaskConfig, - ) - - return JointTrajectoryTask( - cfg.name, - JointTrajectoryTaskConfig( - joint_names=cfg.joint_names, - priority=cfg.priority, - ), - ) - - elif task_type == "servo": - from dimos.control.tasks.servo_task import JointServoTask, JointServoTaskConfig - - return JointServoTask( - cfg.name, - JointServoTaskConfig( - joint_names=cfg.joint_names, - priority=cfg.priority, - ), - ) - - elif task_type == "velocity": - from dimos.control.tasks.velocity_task import JointVelocityTask, JointVelocityTaskConfig - - return JointVelocityTask( - cfg.name, - JointVelocityTaskConfig( - joint_names=cfg.joint_names, - priority=cfg.priority, - ), - ) - - elif task_type == "cartesian_ik": - from dimos.control.tasks.cartesian_ik_task import CartesianIKTask, CartesianIKTaskConfig - - if cfg.model_path is None: - raise ValueError(f"CartesianIKTask '{cfg.name}' requires model_path in TaskConfig") - - return CartesianIKTask( - cfg.name, - CartesianIKTaskConfig( - joint_names=cfg.joint_names, - model_path=cfg.model_path, - ee_joint_id=cfg.ee_joint_id, - priority=cfg.priority, - ), - ) + """Create a control task from config via the task registry. 
- elif task_type == "teleop_ik": - from dimos.control.tasks.teleop_task import TeleopIKTask, TeleopIKTaskConfig - - if cfg.model_path is None: - raise ValueError(f"TeleopIKTask '{cfg.name}' requires model_path in TaskConfig") - - return TeleopIKTask( - cfg.name, - TeleopIKTaskConfig( - joint_names=cfg.joint_names, - model_path=cfg.model_path, - ee_joint_id=cfg.ee_joint_id, - priority=cfg.priority, - hand=cfg.hand, - gripper_joint=cfg.gripper_joint, - gripper_open_pos=cfg.gripper_open_pos, - gripper_closed_pos=cfg.gripper_closed_pos, - ), - ) + Each task module self-registers a factory under its type name. + Factories can resolve hardware dependencies from the hardware map. + """ + from dimos.control.tasks.registry import control_task_registry - else: - raise ValueError(f"Unknown task type: {task_type}") + return control_task_registry.create(cfg.type, cfg, hardware=self._hardware) @rpc def add_hardware( @@ -594,12 +551,45 @@ def _on_twist_command(self, msg: Twist) -> None: joint_state = JointState(name=names, velocity=velocities) self._on_joint_command(joint_state) + # Velocity-capable tasks opt in with set_velocity_command(). 
+ t_now = time.perf_counter() + with self._task_lock: + for task in self._tasks.values(): + set_vel = getattr(task, "set_velocity_command", None) + if set_vel is not None: + set_vel(msg.linear.x, msg.linear.y, msg.angular.z, t_now) + def _on_buttons(self, msg: Buttons) -> None: """Forward button state to all tasks.""" with self._task_lock: for task in self._tasks.values(): task.on_buttons(msg) + @rpc + def set_activated(self, engaged: bool) -> None: + """Arm/disarm every task exposing ``arm()`` / ``disarm()``.""" + with self._task_lock: + for task in self._tasks.values(): + method_name = "arm" if engaged else "disarm" + handler = getattr(task, method_name, None) + if callable(handler): + try: + handler() + except Exception: + logger.exception(f"{method_name}() raised on task {task.name!r}") + + @rpc + def set_dry_run(self, enabled: bool) -> None: + """Toggle dry-run on every task exposing ``set_dry_run``.""" + with self._task_lock: + for task in self._tasks.values(): + handler = getattr(task, "set_dry_run", None) + if callable(handler): + try: + handler(enabled) + except Exception: + logger.exception(f"set_dry_run() raised on task {task.name!r}") + @rpc def task_invoke( self, task_name: TaskName, method: str, kwargs: dict[str, Any] | None = None @@ -711,16 +701,21 @@ def start(self) -> None: "Use task_invoke RPC or set transport via blueprint." ) - # Subscribe to twist commands if any twist base hardware configured + # Twist commands drive either base hardware or velocity-capable tasks. 
has_twist_base = any(c.hardware_type == HardwareType.BASE for c in self.config.hardware) - if has_twist_base: + with self._task_lock: + has_velocity_task = any( + callable(getattr(task, "set_velocity_command", None)) + for task in self._tasks.values() + ) + if has_twist_base or has_velocity_task: try: self._twist_command_unsub = self.twist_command.subscribe(self._on_twist_command) - logger.info("Subscribed to twist_command for twist base control") + logger.info("Subscribed to twist_command for twist base / velocity-capable tasks") except Exception: logger.warning( - "Twist base configured but could not subscribe to twist_command. " - "Use task_invoke RPC or set transport via blueprint." + "Twist base or velocity-capable task configured but could not subscribe " + "to twist_command. Use task_invoke RPC or set transport via blueprint." ) # Subscribe to buttons if any teleop_ik tasks configured (engage/disengage) @@ -729,6 +724,8 @@ def start(self) -> None: self._buttons_unsub = self.buttons.subscribe(self._on_buttons) logger.info("Subscribed to buttons for engage/disengage") + # Arming + dry-run are RPC-only; no stream subscription here. + logger.info(f"ControlCoordinator started at {self.config.tick_rate}Hz") @rpc diff --git a/dimos/control/hardware_interface.py b/dimos/control/hardware_interface.py index 0169321600..5e5bb2a3da 100644 --- a/dimos/control/hardware_interface.py +++ b/dimos/control/hardware_interface.py @@ -14,11 +14,12 @@ """Connected hardware for the ControlCoordinator. -Provides two wrapper types: +Provides runtime wrappers for coordinator-managed hardware: - ConnectedHardware: Wraps ManipulatorAdapter for joint-controlled arms - ConnectedTwistBase: Wraps TwistBaseAdapter for velocity-commanded platforms +- ConnectedWholeBody: Wraps WholeBodyAdapter for full-body motor control -Both share the same duck-type interface (read_state, write_command, etc.) +They share the same duck-type interface (read_state, write_command, etc.) 
so the tick loop treats them uniformly. """ @@ -332,6 +333,34 @@ def __init__( self._component = component self._joint_names = component.joints + # Resolve per-joint PD gains once at wire-up time. Gains live on + # the WB-specific sub-config; fall back to _DEFAULT_KP/_DEFAULT_KD + # if the blueprint didn't supply a wb_config. + n = len(self._joint_names) + wb = component.wb_config + kp_in = wb.kp if wb is not None else None + kd_in = wb.kd if wb is not None else None + if kp_in is not None: + if len(kp_in) != n: + raise ValueError( + f"HardwareComponent '{component.hardware_id}': wb_config.kp length " + f"{len(kp_in)} does not match joints length {n}" + ) + self._kp = list(kp_in) + else: + self._kp = [_DEFAULT_KP] * n + if kd_in is not None: + if len(kd_in) != n: + raise ValueError( + f"HardwareComponent '{component.hardware_id}': wb_config.kd length " + f"{len(kd_in)} does not match joints length {n}" + ) + self._kd = list(kd_in) + else: + self._kd = [_DEFAULT_KD] * n + self._kp_by_name = dict(zip(self._joint_names, self._kp, strict=False)) + self._kd_by_name = dict(zip(self._joint_names, self._kd, strict=False)) + self._last_commanded: dict[str, float] = {} self._initialized = False self._warned_unknown_joints: set[str] = set() @@ -361,10 +390,13 @@ def read_state(self) -> dict[JointName, JointState]: } def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: - """Write position commands — converts to MotorCommand with PD gains. + """Write position commands — converts to MotorCommand with per-joint PD gains. Only POSITION / SERVO_POSITION are supported; other modes are warned and dropped (matches ConnectedHardware's warn-and-skip pattern). + Per-joint kp/kd come from ``component.wb_config`` (resolved in + ``__init__``); fall back to ``_DEFAULT_KP``/``_DEFAULT_KD`` when + the blueprint didn't supply gains. 
""" from dimos.hardware.whole_body.spec import MotorCommand @@ -392,8 +424,8 @@ def write_command(self, commands: dict[str, float], mode: ControlMode) -> bool: MotorCommand( q=self._last_commanded[name], dq=0.0, - kp=_DEFAULT_KP, - kd=_DEFAULT_KD, + kp=self._kp_by_name[name], + kd=self._kd_by_name[name], tau=0.0, ) for name in self._joint_names diff --git a/dimos/control/task.py b/dimos/control/task.py index 56c2c4d426..0b32fae4ef 100644 --- a/dimos/control/task.py +++ b/dimos/control/task.py @@ -32,6 +32,7 @@ from dimos.control.components import JointName from dimos.hardware.manipulators.spec import ControlMode +from dimos.hardware.whole_body.spec import IMUState if TYPE_CHECKING: from dimos.msgs.geometry_msgs.Pose import Pose @@ -106,13 +107,16 @@ class CoordinatorState: Attributes: joints: Aggregated joint states from all hardware + imu: Per-whole-body IMU readings, keyed by hardware_id. + Empty dict when no whole-body hardware exposes IMU this tick. t_now: Current tick time (time.perf_counter()) dt: Time since last tick (seconds) """ joints: JointStateSnapshot - t_now: float # Coordinator time (perf_counter) - USE THIS, NOT time.time()! - dt: float # Time since last tick + imu: dict[str, IMUState] = field(default_factory=dict) + t_now: float = 0.0 # Coordinator time (perf_counter) - USE THIS, NOT time.time()! 
+ dt: float = 0.0 # Time since last tick @dataclass diff --git a/dimos/control/tasks/cartesian_ik_task.py b/dimos/control/tasks/cartesian_ik_task.py index 24b2728e14..c071c1b8fc 100644 --- a/dimos/control/tasks/cartesian_ik_task.py +++ b/dimos/control/tasks/cartesian_ik_task.py @@ -330,3 +330,17 @@ def forward_kinematics(self, joint_positions: NDArray[np.floating[Any]]) -> pino "CartesianIKTask", "CartesianIKTaskConfig", ] + + +def create_task(cfg: Any, hardware: Any) -> CartesianIKTask: + if cfg.model_path is None: + raise ValueError(f"CartesianIKTask {cfg.name!r} requires model_path in TaskConfig") + return CartesianIKTask( + cfg.name, + CartesianIKTaskConfig( + joint_names=cfg.joint_names, + model_path=cfg.model_path, + ee_joint_id=cfg.ee_joint_id, + priority=cfg.priority, + ), + ) diff --git a/dimos/control/tasks/g1_groot_wbc_task.py b/dimos/control/tasks/g1_groot_wbc_task.py new file mode 100644 index 0000000000..0b03d6853f --- /dev/null +++ b/dimos/control/tasks/g1_groot_wbc_task.py @@ -0,0 +1,810 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""GR00T whole-body-control task for the Unitree G1 humanoid. + +Runs the two-model GR00T WBC locomotion policy (balance + walk) inside +the coordinator tick loop. Claims the 15 legs+waist joints at high +priority; arm joints are left to lower-priority tasks in the blueprint. + +Reference implementation: g1_control/backends/groot_wbc_backend.py. 
+Observation, action, and model-selection semantics are preserved +verbatim — changing them drifts us away from the ONNX policies trained +by GR00T-WholeBodyControl. + +CRITICAL: Uses t_now from CoordinatorState, never calls time.time(). +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +import threading +from typing import TYPE_CHECKING, Any + +import numpy as np +import onnxruntime as ort # type: ignore[import-untyped] + +from dimos.control.components import make_humanoid_joints +from dimos.control.task import ( + BaseControlTask, + ControlMode, + CoordinatorState, + JointCommandOutput, + ResourceClaim, +) +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from pathlib import Path + + from dimos.hardware.whole_body.spec import WholeBodyAdapter + from dimos.msgs.geometry_msgs.Twist import Twist + +logger = setup_logger() + + +# The 29 DDS motor names + their kp/kd for the GR00T-trained policies. +# Lifted verbatim from g1-control-api/configs/g1_groot_wbc.yaml, which +# itself copies GR00T-WBC's g1_29dof_gear_wbc.yaml. Diverging from these +# on real hardware risks instability — the ONNX models were trained +# against them. 
+g1_joints = make_humanoid_joints("g1") +g1_legs_waist = g1_joints[:15] # indices 0..14 — legs (12) + waist (3) +g1_arms = g1_joints[15:] # indices 15..28 — left arm (7) + right arm (7) + +G1_GROOT_KP: list[float] = [ + 150.0, + 150.0, + 150.0, + 200.0, + 40.0, + 40.0, # left leg + 150.0, + 150.0, + 150.0, + 200.0, + 40.0, + 40.0, # right leg + 250.0, + 250.0, + 250.0, # waist + 100.0, + 100.0, + 40.0, + 40.0, + 20.0, + 20.0, + 20.0, # left arm + 100.0, + 100.0, + 40.0, + 40.0, + 20.0, + 20.0, + 20.0, # right arm +] +G1_GROOT_KD: list[float] = [ + 2.0, + 2.0, + 2.0, + 4.0, + 2.0, + 2.0, # left leg + 2.0, + 2.0, + 2.0, + 4.0, + 2.0, + 2.0, # right leg + 5.0, + 5.0, + 5.0, # waist + 5.0, + 5.0, + 2.0, + 2.0, + 2.0, + 2.0, + 2.0, # left arm + 5.0, + 5.0, + 2.0, + 2.0, + 2.0, + 2.0, + 2.0, # right arm +] + +# Relaxed arms-down pose. From g1_control/backends/groot_wbc_backend.py +# DEFAULT_29[15:] (all zeros) — the zero-offset pose the policy was +# trained against. Operators can override at runtime by publishing +# joint targets on the arms via the coordinator's joint_command transport. +ARM_DEFAULT_POSE: list[float] = [0.0] * 14 + + +# Default joint angles copied verbatim from +# g1_control/backends/groot_wbc_backend.py DEFAULT_29. Policy was trained +# against these as the zero-offset pose. +_DEFAULT_POSITIONS_29 = [ + -0.1, + 0.0, + 0.0, + 0.3, + -0.2, + 0.0, # left leg + -0.1, + 0.0, + 0.0, + 0.3, + -0.2, + 0.0, # right leg + 0.0, + 0.0, + 0.0, # waist + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, # left arm (not driven by policy) + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, + 0.0, # right arm (not driven by policy) +] + +_SINGLE_OBS_DIM = 86 +_OBS_HISTORY_LEN = 6 +_NUM_ACTIONS = 15 +_NUM_MOTORS = 29 + + +@dataclass +class G1GrootWBCTaskConfig: + """Configuration for the GR00T WBC task. + + Attributes: + balance_onnx: Path to the balance ONNX model. Used when + ``||cmd|| <= cmd_norm_threshold``. + walk_onnx: Path to the walk ONNX model. Used otherwise. 
+ joint_names: The 15 coordinator joint names this task claims + (legs 0-11 + waist 12-14, in DDS order). + all_joint_names: All 29 coordinator joint names in DDS order + (legs 0-11 + waist 12-14 + arms 15-28). Required to build + the observation, which feeds all 29 joint states. + default_positions_29: Default joint angles for all 29 joints + (DDS order). First 15 are the policy's zero-offset pose. + priority: Arbitration priority (higher wins). 50 is the + recommended WBC priority per the task.py conventions. + decimation: Run inference every N ticks. At 500 Hz tick / + 50 Hz policy → decimation=10. + action_scale: Multiplier on raw policy output before adding + defaults. + obs_ang_vel_scale: Scale for base angular velocity in obs. + obs_dof_pos_scale: Scale for joint position offset in obs. + obs_dof_vel_scale: Scale for joint velocity in obs. + cmd_scale: Per-axis scale applied to (vx, vy, wz) in obs. + cmd_norm_threshold: ||cmd|| below this selects the balance + model, otherwise walk. + height_cmd: Fixed height command slot in obs. + timeout: Seconds without a velocity command before zeroing it. + auto_arm: Arm the policy automatically on ``start()``. Default + False — safe for real hardware; the blueprint sets True for + simulation. + auto_dry_run: Enter dry-run mode on ``start()``. Policy still + runs but outputs are not emitted to the adapter — useful for + verifying on real hardware without commanding motors. + default_ramp_seconds: Duration of the arming ramp (current pose + → ``default_15``) when ``arm()`` is called without an + explicit duration. Set to 0 in simulation (no ramp needed); + 10 s on real hardware mirrors the g1-control-api default. 
+ """ + + balance_onnx: str | Path + walk_onnx: str | Path + joint_names: list[str] + all_joint_names: list[str] + default_positions_29: list[float] = field(default_factory=lambda: list(_DEFAULT_POSITIONS_29)) + priority: int = 50 + decimation: int = 10 + action_scale: float = 0.25 + obs_ang_vel_scale: float = 0.5 + obs_dof_pos_scale: float = 1.0 + obs_dof_vel_scale: float = 0.05 + cmd_scale: tuple[float, float, float] = (2.0, 2.0, 0.5) + cmd_norm_threshold: float = 0.05 + height_cmd: float = 0.74 + timeout: float = 1.0 + auto_arm: bool = False + auto_dry_run: bool = False + default_ramp_seconds: float = 10.0 + + +class G1GrootWBCTask(BaseControlTask): + """Runs the GR00T balance / walk ONNX policies inside the coordinator tick loop. + + Observation vector (86 dims, built each inference tick, replicates + ``groot_wbc_backend.GrootWBCBackend._compute_obs`` verbatim): + + [0:3] cmd_vel * cmd_scale # scaled velocity command + [3] height_cmd # fixed slot (0.74) + [4:7] (0, 0, 0) # rpy_cmd, zeros + [7:10] gyro * obs_ang_vel_scale # body-frame ang vel + [10:13] projected_gravity(quat) # gravity in body frame + [13:42] (q_29 - default_29) * dof_pos_scale + [42:71] dq_29 * dof_vel_scale + [71:86] last_action (15 dims) + + The observation is stacked into a 6-frame history buffer (516 dims) + before being fed to ONNX. + + Action (15 dims, legs + waist only): + + target_q_15 = action * action_scale + default_15 + + Arms are NOT driven by this task — the blueprint pairs this task + with a lower-priority servo task scoped to the 14 arm joints. 
+ """ + + def __init__( + self, + name: str, + config: G1GrootWBCTaskConfig, + adapter: WholeBodyAdapter, + ) -> None: + if len(config.joint_names) != _NUM_ACTIONS: + raise ValueError( + f"G1GrootWBCTask '{name}' requires exactly {_NUM_ACTIONS} joint names " + f"(legs + waist), got {len(config.joint_names)}" + ) + if len(config.all_joint_names) != _NUM_MOTORS: + raise ValueError( + f"G1GrootWBCTask '{name}' requires exactly {_NUM_MOTORS} all_joint_names " + f"(full 29-DOF G1), got {len(config.all_joint_names)}" + ) + if len(config.default_positions_29) != _NUM_MOTORS: + raise ValueError( + f"G1GrootWBCTask '{name}' requires exactly {_NUM_MOTORS} " + f"default_positions_29, got {len(config.default_positions_29)}" + ) + if config.decimation < 1: + raise ValueError(f"G1GrootWBCTask '{name}' requires decimation >= 1") + + self._name = name + self._config = config + self._adapter = adapter + self._joint_names_list = list(config.joint_names) + self._joint_names_set = frozenset(config.joint_names) + self._all_joint_names = list(config.all_joint_names) + + providers = ort.get_available_providers() + self._balance_session = ort.InferenceSession(str(config.balance_onnx), providers=providers) + self._walk_session = ort.InferenceSession(str(config.walk_onnx), providers=providers) + self._balance_input = self._balance_session.get_inputs()[0].name + self._walk_input = self._walk_session.get_inputs()[0].name + logger.info( + f"G1GrootWBCTask '{name}' loaded balance={config.balance_onnx}, " + f"walk={config.walk_onnx} (providers: {providers})" + ) + + self._default_29 = np.asarray(config.default_positions_29, dtype=np.float32) + self._default_15 = self._default_29[:_NUM_ACTIONS] + self._cmd_scale = np.asarray(config.cmd_scale, dtype=np.float32) + + # Inference state + self._last_action = np.zeros(_NUM_ACTIONS, dtype=np.float32) + self._obs_buf = np.zeros((1, _SINGLE_OBS_DIM * _OBS_HISTORY_LEN), dtype=np.float32) + self._first_inference = True + self._tick_count = 0 + 
self._last_targets: list[float] | None = None + + # Last-known-good state caches. compute() falls back to these + # whenever a joint is missing from CoordinatorState (transient + # packet drop, late publisher, etc) instead of substituting 0.0 + # — feeding a zero pose to the policy makes it think the robot + # is at the URDF zero (legs straight) and command a snap-back, + # which on real hardware tips the robot over. ``_state_seen`` + # tracks whether we've ever observed a fully-populated state; + # until then compute() returns None rather than running on + # half-cached defaults. + self._cached_q_29 = self._default_29.copy() + self._cached_dq_29 = np.zeros(_NUM_MOTORS, dtype=np.float32) + self._cached_q_15 = self._default_15.copy() + self._state_seen = False + + # Lifecycle state machine. + # + # _active — task is registered and compute() is being invoked + # by the coordinator. Gate for the whole compute() + # path; start()/stop() toggle this. + # _armed — policy outputs are emitted to the adapter. Flip + # via arm()/disarm(). + # _arming — currently ramping current-pose → default_15 over + # ``_arming_duration`` seconds. Set by arm() with + # a non-zero ramp, cleared when alpha reaches 1. + # _arm_pending — arm() was called; compute() captures the ramp + # start pose from state on the next tick and flips + # into _arming (or _armed directly if ramp=0). + # _dry_run — compute() still runs the policy (obs history + # stays hot) but returns None so the coordinator + # sends no command to the adapter. Throttled log + # lets the operator see what WOULD have gone out. + # + # When active-but-unarmed, compute() echoes back the current + # joint positions so the PD error is zero and the robot sits in + # pure damping (kd-only) — this mirrors the reference backend's + # "hold current pose" inactive state. 
+ self._active = False + self._armed = False + self._arming = False + self._arm_pending = False + self._dry_run = bool(config.auto_dry_run) + self._arming_duration = 0.0 + self._arming_start_t = 0.0 + self._ramp_start: np.ndarray | None = None + self._last_dry_run_log_t: float = 0.0 + + self._cmd_lock = threading.Lock() + self._cmd = np.zeros(3, dtype=np.float32) + self._last_cmd_time: float = 0.0 + + @property + def name(self) -> str: + return self._name + + def claim(self) -> ResourceClaim: + return ResourceClaim( + joints=self._joint_names_set, + priority=self._config.priority, + mode=ControlMode.SERVO_POSITION, + ) + + def is_active(self) -> bool: + return self._active + + def _refresh_state_caches(self, state: CoordinatorState) -> bool: + """Pull current q/dq for the 15 claimed joints and the full 29 + from ``CoordinatorState``, updating last-known-good caches and + returning True iff the full 29 came back populated this tick. + + On a missing joint we keep the cached value rather than dropping + in 0.0 — the policy interprets 0.0 as "at URDF zero / legs + straight" and commands a recovery, which tips the robot. + """ + all_present = True + for i, jname in enumerate(self._joint_names_list): + pos = state.joints.get_position(jname) + if pos is None: + all_present = False + else: + self._cached_q_15[i] = pos + for i, jname in enumerate(self._all_joint_names): + pos = state.joints.get_position(jname) + vel = state.joints.get_velocity(jname) + if pos is None or vel is None: + all_present = False + else: + self._cached_q_29[i] = pos + self._cached_dq_29[i] = vel + if all_present: + self._state_seen = True + return all_present + + def compute(self, state: CoordinatorState) -> JointCommandOutput | None: + if not self._active: + return None + + # Refresh the last-known-good state caches. If we've never seen + # a fully-populated state and this tick is also incomplete, hold + # off — emitting a command from defaults would snap the robot. 
+ fresh = self._refresh_state_caches(state) + if not self._state_seen and not fresh: + return None + + current_15 = self._cached_q_15.copy() + + # arm() was called — snapshot the ramp start and enter arming / + # armed state (ramp=0 arms immediately). + if self._arm_pending: + self._ramp_start = current_15.copy() + self._arming_start_t = state.t_now + if self._arming_duration > 0.0: + self._arming = True + self._armed = False + logger.info( + f"G1GrootWBCTask '{self._name}' arming: " + f"ramp → default_15 over {self._arming_duration:.1f}s" + ) + else: + self._arming = False + self._armed = True + self._reset_policy_state() + logger.info(f"G1GrootWBCTask '{self._name}' armed (no ramp)") + self._arm_pending = False + + # Unarmed & not arming: echo current joint positions. With the + # component's kp/kd applied downstream, q_tgt == q_actual yields + # pure damping (tau = -kd * dq), which mirrors the reference + # backend's inactive "hold current pose" behaviour. + if not self._armed and not self._arming: + self._last_targets = current_15.tolist() + return JointCommandOutput( + joint_names=self._joint_names_list, + positions=self._last_targets, + mode=ControlMode.SERVO_POSITION, + ) + + # Arming: lerp ramp_start → default_15 over arming_duration. + if self._arming: + assert self._ramp_start is not None + elapsed = state.t_now - self._arming_start_t + alpha = ( + 1.0 if self._arming_duration <= 0.0 else min(1.0, elapsed / self._arming_duration) + ) + target = self._ramp_start + alpha * (self._default_15 - self._ramp_start) + self._last_targets = target.tolist() + if alpha >= 1.0: + self._arming = False + self._armed = True + self._reset_policy_state() + logger.info( + f"G1GrootWBCTask '{self._name}' ramp complete — policy armed " + f"({'dry-run' if self._dry_run else 'live'})" + ) + return JointCommandOutput( + joint_names=self._joint_names_list, + positions=self._last_targets, + mode=ControlMode.SERVO_POSITION, + ) + + # Armed: run the policy. 
In dry-run mode we still compute (so + # the obs buffer stays hot), but return None so no command goes + # downstream. A throttled log line shows what WOULD have been + # sent, which is how g1-control-api lets operators verify pre-go. + self._tick_count += 1 + + # Decimation: only run inference every N ticks. Between inference + # ticks, re-emit the last target so the coordinator keeps driving + # the joints (or nothing, in dry-run). + if self._tick_count % self._config.decimation != 0: + if self._dry_run or self._last_targets is None: + return None + return JointCommandOutput( + joint_names=self._joint_names_list, + positions=self._last_targets, + mode=ControlMode.SERVO_POSITION, + ) + + # State was refreshed up top (with fall-back-to-last-good on + # missing joints). Snapshot the caches now so concurrent state + # updates don't tear the obs vector. + q_29 = self._cached_q_29.copy() + dq_29 = self._cached_dq_29.copy() + + # Prefer IMU from CoordinatorState (populated by the coordinator + # each tick from every whole-body adapter); fall back to the + # adapter-direct read if state.imu is empty (e.g. unit tests + # that build a bare CoordinatorState). The state path is what + # decouples this task from the WholeBodyAdapter Protocol. + if state.imu: + # Single whole-body adapter is the common case — take any. + imu = next(iter(state.imu.values())) + else: + imu = self._adapter.read_imu() + gyro = np.asarray(imu.gyroscope, dtype=np.float32) + gravity = self._projected_gravity(imu.quaternion) + + # Velocity command (with timeout → zero). + with self._cmd_lock: + if ( + self._config.timeout > 0.0 + and self._last_cmd_time > 0.0 + and (state.t_now - self._last_cmd_time) > self._config.timeout + ): + cmd = np.zeros(3, dtype=np.float32) + else: + cmd = self._cmd.copy() + + obs = self._build_obs(cmd=cmd, gyro=gyro, gravity=gravity, q=q_29, dq=dq_29) + + # History buffer: first inference fills all slots with the current + # obs (warm-start); subsequent ticks roll the window. 
+ if self._first_inference: + tiled = np.tile(obs, _OBS_HISTORY_LEN) + self._obs_buf[0, :] = tiled + self._first_inference = False + else: + self._obs_buf[0, : _SINGLE_OBS_DIM * (_OBS_HISTORY_LEN - 1)] = self._obs_buf[ + 0, _SINGLE_OBS_DIM: + ] + self._obs_buf[0, _SINGLE_OBS_DIM * (_OBS_HISTORY_LEN - 1) :] = obs + + # Model selection: balance when near-stationary, walk otherwise. + cmd_norm = float(np.linalg.norm(cmd)) + if cmd_norm <= self._config.cmd_norm_threshold: + raw = self._balance_session.run(None, {self._balance_input: self._obs_buf})[0] + else: + raw = self._walk_session.run(None, {self._walk_input: self._obs_buf})[0] + + action = raw[0, :_NUM_ACTIONS].astype(np.float32) + self._last_action[:] = action + + target_q_15 = action * self._config.action_scale + self._default_15 + self._last_targets = target_q_15.tolist() + + if self._dry_run: + # Throttled peek at the commanded pose so the operator can + # decide whether it looks sane before flipping dry-run off. + if (state.t_now - self._last_dry_run_log_t) >= 1.0: + max_delta = float(np.max(np.abs(target_q_15 - current_15))) + logger.info( + f"G1GrootWBCTask '{self._name}' DRY-RUN (|Δq|_max={max_delta:.3f} rad, " + f"model={'walk' if cmd_norm > self._config.cmd_norm_threshold else 'balance'})" + ) + self._last_dry_run_log_t = state.t_now + return None + + return JointCommandOutput( + joint_names=self._joint_names_list, + positions=self._last_targets, + mode=ControlMode.SERVO_POSITION, + ) + + def on_preempted(self, by_task: str, joints: frozenset[str]) -> None: + if joints & self._joint_names_set: + logger.warning(f"G1GrootWBCTask '{self._name}' preempted by {by_task} on {joints}") + + # Velocity command input + + def set_velocity_command(self, vx: float, vy: float, yaw_rate: float, t_now: float) -> None: + """Set the (vx, vy, yaw_rate) commanded to the policy. + + Called by the coordinator's twist_command dispatcher and by + external Python callers. Thread-safe. 
+ """ + with self._cmd_lock: + self._cmd[:] = [vx, vy, yaw_rate] + self._last_cmd_time = t_now + + def on_twist(self, msg: Twist, t_now: float) -> bool: + """Accept a Twist message, e.g. from an LCM cmd_vel transport.""" + self.set_velocity_command( + float(msg.linear.x), + float(msg.linear.y), + float(msg.angular.z), + t_now, + ) + return True + + # Lifecycle + + def start(self) -> None: + """Enter the coordinator tick loop. + + Starts in "active but unarmed" — compute() echoes current joint + positions every tick, which (combined with the component's + kp/kd) produces damping-only behaviour on real hardware (the + robot sits quietly in dev mode). + + If ``config.auto_arm`` is set, schedules an immediate + ``arm()`` using ``config.default_ramp_seconds`` — this is how + the simulation blueprint bypasses the activation ritual. + If ``config.auto_dry_run`` is set, starts in dry-run mode. + """ + self._active = True + self._armed = False + self._arming = False + self._arm_pending = False + self._dry_run = bool(self._config.auto_dry_run) + self._last_targets = None + self._reset_policy_state() + with self._cmd_lock: + self._cmd[:] = 0.0 + self._last_cmd_time = 0.0 + logger.info( + f"G1GrootWBCTask '{self._name}' started (unarmed" + + (", dry-run" if self._dry_run else "") + + ")" + ) + if self._config.auto_arm: + self.arm(self._config.default_ramp_seconds) + + def stop(self) -> None: + """Leave the tick loop. Re-activation resets policy state.""" + self._active = False + self._armed = False + self._arming = False + self._arm_pending = False + self._last_targets = None + logger.info(f"G1GrootWBCTask '{self._name}' stopped") + + # Arming / dry-run (RPC-callable via coordinator.task_invoke) + + def arm(self, ramp_seconds: float | None = None) -> bool: + """Begin the arming sequence. 
+ + ``compute()`` will snapshot the current joint positions on the + next tick, lerp toward ``default_15`` over ``ramp_seconds``, + then flip ``_armed`` true and hand control to the ONNX policy. + A ramp of 0 arms immediately with no interpolation (what sim + uses — the subprocess already holds the MJCF's default pose). + + Safe to call redundantly; calls while already armed or arming + are ignored. No-op if the task is not ``_active``. + """ + if not self._active: + logger.warning(f"G1GrootWBCTask '{self._name}' arm() called before start() — ignoring") + return False + if self._armed: + logger.info(f"G1GrootWBCTask '{self._name}' already armed — arm() ignored") + return False + if self._arming or self._arm_pending: + logger.info(f"G1GrootWBCTask '{self._name}' arm in progress -- arm() ignored") + return False + ramp = ramp_seconds if ramp_seconds is not None else self._config.default_ramp_seconds + self._arming_duration = max(0.0, float(ramp)) + self._arm_pending = True + logger.info( + f"G1GrootWBCTask '{self._name}' arm requested (ramp={self._arming_duration:.1f}s)" + ) + return True + + def disarm(self) -> bool: + """Stop emitting policy outputs; fall back to hold-current-pose. + + Called either from an operator ``Disarm`` button or from + safety watchdogs. Resets obs history so the next ``arm()`` + starts with a clean buffer. + """ + if not self._armed and not self._arming and not self._arm_pending: + return False + self._armed = False + self._arming = False + self._arm_pending = False + self._ramp_start = None + self._reset_policy_state() + logger.info(f"G1GrootWBCTask '{self._name}' disarmed (holding current pose)") + return True + + def set_dry_run(self, enabled: bool) -> None: + """Enable/disable dry-run. + + In dry-run the policy still runs (obs history stays hot) but + ``compute()`` returns ``None``, so the coordinator forwards no + command to the adapter. Use to verify policy sanity on real + hardware before committing motor torques. 
+ """ + new_val = bool(enabled) + if new_val == self._dry_run: + return + self._dry_run = new_val + self._last_dry_run_log_t = 0.0 + logger.info(f"G1GrootWBCTask '{self._name}' dry_run = {new_val}") + + def state_snapshot(self) -> dict[str, object]: + """Return the current state-machine flags for UI / telemetry.""" + return { + "active": self._active, + "armed": self._armed, + "arming": self._arming, + "arm_pending": self._arm_pending, + "dry_run": self._dry_run, + "arming_duration": self._arming_duration, + } + + # Internal helpers + + def _reset_policy_state(self) -> None: + """Clear inference state — obs history, last action, tick count.""" + self._last_action[:] = 0.0 + self._obs_buf[:] = 0.0 + self._first_inference = True + self._tick_count = 0 + + def _build_obs( + self, + cmd: np.ndarray, + gyro: np.ndarray, + gravity: np.ndarray, + q: np.ndarray, + dq: np.ndarray, + ) -> np.ndarray: + """Build the 86-dim GR00T observation. Layout matches + ``groot_wbc_backend.py`` exactly.""" + obs = np.zeros(_SINGLE_OBS_DIM, dtype=np.float32) + obs[0:3] = cmd * self._cmd_scale + obs[3] = self._config.height_cmd + obs[4:7] = 0.0 + obs[7:10] = gyro * self._config.obs_ang_vel_scale + obs[10:13] = gravity + obs[13:42] = (q - self._default_29) * self._config.obs_dof_pos_scale + obs[42:71] = dq * self._config.obs_dof_vel_scale + obs[71:86] = self._last_action + return obs + + @staticmethod + def _projected_gravity(quaternion: tuple[float, ...]) -> np.ndarray: + """Project world gravity into body frame. + + Uses Unitree DDS quaternion order (w, x, y, z). Formula matches + ``groot_wbc_backend._get_gravity_orientation`` and is + algebraically equivalent to the Go2 RLPolicyTask helper. 
+ """ + w, x, y, z = quaternion + gx = 2.0 * (-x * z + w * y) + gy = 2.0 * (-y * z - w * x) + gz = -(w * w - x * x - y * y + z * z) + return np.array([gx, gy, gz], dtype=np.float32) + + +__all__ = [ + "ARM_DEFAULT_POSE", + "G1_GROOT_KD", + "G1_GROOT_KP", + "G1GrootWBCTask", + "G1GrootWBCTaskConfig", + "g1_arms", + "g1_joints", + "g1_legs_waist", +] + + +def create_task(cfg: Any, hardware: Any) -> G1GrootWBCTask: + from pathlib import Path + + from dimos.control.hardware_interface import ConnectedWholeBody + + if cfg.model_path is None: + raise ValueError( + f"G1GrootWBCTask {cfg.name!r} requires model_path " + f"(directory containing balance.onnx + walk.onnx)" + ) + if cfg.hardware_id is None: + raise ValueError(f"G1GrootWBCTask {cfg.name!r} requires hardware_id in TaskConfig") + hw = hardware.get(cfg.hardware_id) if hardware else None + if hw is None: + raise ValueError( + f"G1GrootWBCTask {cfg.name!r} references unknown hardware " + f"{cfg.hardware_id!r}. Declare the hardware before the task " + f"in the blueprint config." + ) + if not isinstance(hw, ConnectedWholeBody): + raise TypeError( + f"G1GrootWBCTask {cfg.name!r} requires a WHOLE_BODY hardware " + f"component for {cfg.hardware_id!r}, got {type(hw).__name__}. " + f"Set hardware_type=HardwareType.WHOLE_BODY." 
+ ) + + model_dir = Path(cfg.model_path) + kwargs: dict[str, Any] = dict( + balance_onnx=model_dir / "balance.onnx", + walk_onnx=model_dir / "walk.onnx", + joint_names=cfg.joint_names, + all_joint_names=hw.joint_names, + priority=cfg.priority, + auto_arm=cfg.auto_arm, + auto_dry_run=cfg.auto_dry_run, + default_ramp_seconds=cfg.default_ramp_seconds, + ) + if cfg.decimation is not None: + kwargs["decimation"] = cfg.decimation + return G1GrootWBCTask( + cfg.name, + G1GrootWBCTaskConfig(**kwargs), + adapter=hw.adapter, + ) diff --git a/dimos/control/tasks/registry.py b/dimos/control/tasks/registry.py new file mode 100644 index 0000000000..f61cd0aeb6 --- /dev/null +++ b/dimos/control/tasks/registry.py @@ -0,0 +1,97 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ControlTask registry with lazy factories. + +The registry keeps task-specific imports out of ``ControlCoordinator`` +without importing every task module at startup. Some task modules depend +on heavier packages such as Pinocchio or ONNX Runtime, so factories are +resolved only for the requested task type. 
+ +Usage: + from dimos.control.tasks.registry import control_task_registry + + task = control_task_registry.create(cfg.type, cfg, hardware=self._hardware) + print(control_task_registry.available()) +""" + +from __future__ import annotations + +from collections.abc import Callable, Mapping +import importlib +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from dimos.control.coordinator import TaskConfig + from dimos.control.hardware_interface import ConnectedHardware, ConnectedWholeBody + from dimos.control.task import ControlTask + +TaskFactory = Callable[..., "ControlTask"] + + +class ControlTaskRegistry: + """Registry for control-task factories with lazy imports.""" + + def __init__(self) -> None: + self._factory_paths: dict[str, str] = { + "trajectory": "dimos.control.tasks.trajectory_task:create_task", + "servo": "dimos.control.tasks.servo_task:create_task", + "velocity": "dimos.control.tasks.velocity_task:create_task", + "cartesian_ik": "dimos.control.tasks.cartesian_ik_task:create_task", + "teleop_ik": "dimos.control.tasks.teleop_task:create_task", + "g1_groot_wbc": "dimos.control.tasks.g1_groot_wbc_task:create_task", + } + self._factories: dict[str, TaskFactory] = {} + + def create( + self, + name: str, + cfg: TaskConfig, + *, + hardware: Mapping[str, ConnectedHardware | ConnectedWholeBody] | None = None, + ) -> ControlTask: + """Instantiate a task by registered name. + + Args: + name: Registered task-type name (e.g. ``"trajectory"``). + cfg: ``TaskConfig`` carrying name/joint_names/priority and + whatever else this task needs. + hardware: Coordinator's hardware map. Tasks that need an + adapter resolve via ``cfg.hardware_id``; pass ``None`` + only if no task in this registry needs hardware. 
+ """ + key = name.lower() + factory = self._resolve_factory(key) + return factory(cfg=cfg, hardware=hardware or {}) + + def available(self) -> list[str]: + return sorted(self._factory_paths.keys()) + + def _resolve_factory(self, key: str) -> TaskFactory: + if key in self._factories: + return self._factories[key] + if key not in self._factory_paths: + raise ValueError(f"Unknown task type: {key!r}. Available: {self.available()}") + module_name, attr = self._factory_paths[key].split(":", maxsplit=1) + module = importlib.import_module(module_name) + factory = getattr(module, attr) + if not callable(factory): + raise TypeError(f"Task factory {self._factory_paths[key]!r} is not callable") + self._factories[key] = factory + return factory + + +control_task_registry = ControlTaskRegistry() + +__all__ = ["ControlTaskRegistry", "TaskFactory", "control_task_registry"] diff --git a/dimos/control/tasks/servo_task.py b/dimos/control/tasks/servo_task.py index ac83a16244..1130bb697c 100644 --- a/dimos/control/tasks/servo_task.py +++ b/dimos/control/tasks/servo_task.py @@ -25,6 +25,8 @@ from dataclasses import dataclass import threading +import time +from typing import Any from dimos.control.task import ( BaseControlTask, @@ -46,11 +48,17 @@ class JointServoTaskConfig: joint_names: List of joint names this task controls priority: Priority for arbitration (higher wins) timeout: If no command received for this many seconds, go inactive (0 = never timeout) + default_positions: Optional initial target held until/unless a + new target arrives via set_target(). Must match joint_names + length if provided. Useful for "hold at this pose" tasks + (e.g. arms during whole-body locomotion). Pair with + timeout=0.0 to hold indefinitely. 
""" joint_names: list[str] priority: int = 10 timeout: float = 0.5 # 500ms default timeout + default_positions: list[float] | None = None class JointServoTask(BaseControlTask): @@ -99,6 +107,15 @@ def __init__(self, name: str, config: JointServoTaskConfig) -> None: self._last_update_time: float = 0.0 self._active = False + if config.default_positions is not None: + if len(config.default_positions) != self._num_joints: + raise ValueError( + f"JointServoTask '{name}': default_positions length " + f"{len(config.default_positions)} does not match " + f"joint_names length {self._num_joints}" + ) + self._target = list(config.default_positions) + logger.info(f"JointServoTask {name} initialized for joints: {config.joint_names}") @property @@ -211,6 +228,11 @@ def start(self) -> None: """Activate the task (start accepting and outputting commands).""" with self._lock: self._active = True + # Refresh the timeout reference so a caller that re-starts + # the task after a long idle window (or uses default_positions + # with a non-zero timeout) doesn't time out on the first tick + # from the stale 0.0 left at construction. + self._last_update_time = time.perf_counter() logger.info(f"JointServoTask {self._name} started") def stop(self) -> None: @@ -236,3 +258,15 @@ def is_streaming(self) -> bool: "JointServoTask", "JointServoTaskConfig", ] + + +def create_task(cfg: Any, hardware: Any) -> JointServoTask: + kwargs: dict[str, object] = { + "joint_names": cfg.joint_names, + "priority": cfg.priority, + } + if cfg.default_positions is not None: + kwargs["default_positions"] = cfg.default_positions + # Zero timeout pairs naturally with default-hold. 
+ kwargs["timeout"] = 0.0 + return JointServoTask(cfg.name, JointServoTaskConfig(**kwargs)) # type: ignore[arg-type] diff --git a/dimos/control/tasks/teleop_task.py b/dimos/control/tasks/teleop_task.py index 359e425c57..14f1b5596b 100644 --- a/dimos/control/tasks/teleop_task.py +++ b/dimos/control/tasks/teleop_task.py @@ -360,3 +360,21 @@ def stop(self) -> None: "TeleopIKTask", "TeleopIKTaskConfig", ] + + +def create_task(cfg: Any, hardware: Any) -> TeleopIKTask: + if cfg.model_path is None: + raise ValueError(f"TeleopIKTask {cfg.name!r} requires model_path in TaskConfig") + return TeleopIKTask( + cfg.name, + TeleopIKTaskConfig( + joint_names=cfg.joint_names, + model_path=cfg.model_path, + ee_joint_id=cfg.ee_joint_id, + priority=cfg.priority, + hand=cfg.hand, + gripper_joint=cfg.gripper_joint, + gripper_open_pos=cfg.gripper_open_pos, + gripper_closed_pos=cfg.gripper_closed_pos, + ), + ) diff --git a/dimos/control/tasks/trajectory_task.py b/dimos/control/tasks/trajectory_task.py index 90a7d57479..e3a4857368 100644 --- a/dimos/control/tasks/trajectory_task.py +++ b/dimos/control/tasks/trajectory_task.py @@ -24,6 +24,7 @@ from __future__ import annotations from dataclasses import dataclass +from typing import Any from dimos.control.task import ( BaseControlTask, @@ -256,3 +257,13 @@ def get_progress(self, t_now: float) -> float: "JointTrajectoryTask", "JointTrajectoryTaskConfig", ] + + +def create_task(cfg: Any, hardware: Any) -> JointTrajectoryTask: + return JointTrajectoryTask( + cfg.name, + JointTrajectoryTaskConfig( + joint_names=cfg.joint_names, + priority=cfg.priority, + ), + ) diff --git a/dimos/control/tasks/velocity_task.py b/dimos/control/tasks/velocity_task.py index 21f67a7293..51eac8f2b0 100644 --- a/dimos/control/tasks/velocity_task.py +++ b/dimos/control/tasks/velocity_task.py @@ -27,6 +27,7 @@ from dataclasses import dataclass import threading +from typing import Any from dimos.control.task import ( BaseControlTask, @@ -271,3 +272,13 @@ def 
is_streaming(self) -> bool: "JointVelocityTask", "JointVelocityTaskConfig", ] + + +def create_task(cfg: Any, hardware: Any) -> JointVelocityTask: + return JointVelocityTask( + cfg.name, + JointVelocityTaskConfig( + joint_names=cfg.joint_names, + priority=cfg.priority, + ), + ) diff --git a/dimos/control/tick_loop.py b/dimos/control/tick_loop.py index b06cc8a5d4..d38b7e76d2 100644 --- a/dimos/control/tick_loop.py +++ b/dimos/control/tick_loop.py @@ -48,6 +48,7 @@ from dimos.control.components import HardwareId, JointName, TaskName from dimos.control.hardware_interface import ConnectedHardware from dimos.hardware.manipulators.spec import ControlMode + from dimos.hardware.whole_body.spec import IMUState logger = setup_logger() @@ -174,7 +175,8 @@ def _tick(self) -> None: self._tick_count += 1 joint_states = self._read_all_hardware() - state = CoordinatorState(joints=joint_states, t_now=t_now, dt=dt) + imu_states = self._read_all_imu() + state = CoordinatorState(joints=joint_states, imu=imu_states, t_now=t_now, dt=dt) commands = self._compute_all_tasks(state) @@ -222,6 +224,29 @@ def _read_all_hardware(self) -> JointStateSnapshot: timestamp=time.time(), ) + def _read_all_imu(self) -> dict[str, IMUState]: + """Poll IMU from every whole-body hardware in the pool. + + Tasks read this through ``CoordinatorState.imu[hardware_id]`` + instead of reaching into adapters directly. Hardware without + IMU support is absent from the dict. 
+ """ + from dimos.control.hardware_interface import ConnectedWholeBody + + out: dict[str, IMUState] = {} + with self._hardware_lock: + for hw_id, hw in self._hardware.items(): + if not isinstance(hw, ConnectedWholeBody): + continue + read_imu = getattr(hw.adapter, "read_imu", None) + if not callable(read_imu): + continue + try: + out[hw_id] = read_imu() + except Exception as e: + logger.error(f"Failed to read IMU from {hw_id}: {e}") + return out + def _compute_all_tasks( self, state: CoordinatorState ) -> list[tuple[ControlTask, ResourceClaim, JointCommandOutput | None]]: diff --git a/dimos/hardware/whole_body/registry.py b/dimos/hardware/whole_body/registry.py index ddab15b0fc..caf305601e 100644 --- a/dimos/hardware/whole_body/registry.py +++ b/dimos/hardware/whole_body/registry.py @@ -14,14 +14,21 @@ """WholeBodyAdapter registry with auto-discovery. -Mirrors the TwistBaseAdapterRegistry pattern: each subpackage provides a -``register(registry)`` function in its ``adapter.py`` module. +Each adapter module exposes a ``register(registry)`` function that the +registry calls during discovery. Two roots are scanned: + +* ``dimos/hardware/whole_body/`` — real-hardware adapters (Unitree DDS, + transport-LCM bridge). Subpackages are either flat ``/adapter.py`` + or nested ``//adapter.py``. +* ``dimos/simulation/adapters/whole_body/`` — sim adapters + (``g1.py``, etc.). Sim engines live under ``dimos/simulation/`` so + their adapter glue lives there too instead of under ``hardware/``. Usage: from dimos.hardware.whole_body.registry import whole_body_adapter_registry - adapter = whole_body_adapter_registry.create("unitree_go2") - print(whole_body_adapter_registry.available()) # ["unitree_go2"] + adapter = whole_body_adapter_registry.create("sim_mujoco_g1") + print(whole_body_adapter_registry.available()) # ["sim_mujoco_g1", ...] 
""" from __future__ import annotations @@ -64,20 +71,58 @@ def available(self) -> list[str]: return sorted(self._adapters.keys()) def discover(self) -> None: - """Discover and register adapters from subpackages.""" - import dimos.hardware.whole_body as pkg + """Discover and register adapters from both subpackages. + + Walks each scan root recursively (max depth 2) looking for either + an ``adapter.py`` module (legacy hardware/whole_body layout) or a + flat ``.py`` module (simulation/adapters/whole_body layout), + each providing a ``register(registry)`` function. Skips dunder/ + dot directories. + """ + import dimos.hardware.whole_body as hw_pkg + import dimos.simulation.adapters.whole_body as sim_pkg + + self._discover_in( + "dimos.hardware.whole_body", hw_pkg.__path__[0], max_depth=2 + ) + self._discover_flat( + "dimos.simulation.adapters.whole_body", sim_pkg.__path__[0] + ) + + def _discover_flat(self, pkg_path: str, dir_path: str) -> None: + """Discovery variant for the simulation adapters layout: each + module is a flat ``.py`` (not a ``/adapter.py`` + subpackage). 
Skips dunder files and __init__.py.""" + for entry in sorted(os.listdir(dir_path)): + if not entry.endswith(".py") or entry.startswith("_"): + continue + mod_name = f"{pkg_path}.{entry[:-3]}" + try: + mod = importlib.import_module(mod_name) + except ImportError as e: + logger.warning(f"Skipping whole-body adapter {mod_name}: {e}") + continue + if hasattr(mod, "register"): + mod.register(self) - pkg_dir = pkg.__path__[0] - for entry in sorted(os.listdir(pkg_dir)): - entry_path = os.path.join(pkg_dir, entry) + def _discover_in(self, pkg_path: str, dir_path: str, *, max_depth: int) -> None: + for entry in sorted(os.listdir(dir_path)): + entry_path = os.path.join(dir_path, entry) if not os.path.isdir(entry_path) or entry.startswith(("_", ".")): continue + sub_pkg_path = f"{pkg_path}.{entry}" + adapter_module = f"{sub_pkg_path}.adapter" try: - mod = importlib.import_module(f"dimos.hardware.whole_body.{entry}.adapter") - if hasattr(mod, "register"): - mod.register(self) + mod = importlib.import_module(adapter_module) except ImportError as e: - logger.warning(f"Skipping whole-body adapter {entry}: {e}") + # No adapter.py at this level — recurse one deeper if budget left. + if max_depth > 1: + self._discover_in(sub_pkg_path, entry_path, max_depth=max_depth - 1) + else: + logger.warning(f"Skipping whole-body adapter {entry}: {e}") + continue + if hasattr(mod, "register"): + mod.register(self) whole_body_adapter_registry = WholeBodyAdapterRegistry() diff --git a/dimos/hardware/whole_body/spec.py b/dimos/hardware/whole_body/spec.py index 4645e8c44e..6cb88751f0 100644 --- a/dimos/hardware/whole_body/spec.py +++ b/dimos/hardware/whole_body/spec.py @@ -54,6 +54,25 @@ class IMUState: rpy: tuple[float, float, float] = (0.0, 0.0, 0.0) +@dataclass(frozen=True) +class WholeBodyConfig: + """Whole-body-specific component config. + + Lives on ``HardwareComponent.wb_config`` for components of type WHOLE_BODY. 
+ Keeps PD gains (and any future whole-body-only knobs) off the generic + HardwareComponent shared by all hardware kinds. + + Attributes: + kp: Per-joint position gains used by ConnectedWholeBody when + translating position commands to MotorCommand. Length must + match the component's ``joints`` list when set. + kd: Per-joint velocity gains. Same length constraint. + """ + + kp: tuple[float, ...] | None = None + kd: tuple[float, ...] | None = None + + @runtime_checkable class WholeBodyAdapter(Protocol): """Joint-level whole-body motor IO. SI units (rad, rad/s, Nm).""" @@ -64,6 +83,7 @@ def is_connected(self) -> bool: ... def read_motor_states(self) -> list[MotorState]: ... def has_motor_states(self) -> bool: ... def read_imu(self) -> IMUState: ... + def write_motor_commands(self, commands: list[MotorCommand]) -> bool: ... @@ -74,4 +94,5 @@ def write_motor_commands(self, commands: list[MotorCommand]) -> bool: ... "MotorCommand", "MotorState", "WholeBodyAdapter", + "WholeBodyConfig", ] diff --git a/dimos/hardware/whole_body/transport/adapter.py b/dimos/hardware/whole_body/transport/adapter.py index 8950803bb3..14f79cce3d 100644 --- a/dimos/hardware/whole_body/transport/adapter.py +++ b/dimos/hardware/whole_body/transport/adapter.py @@ -15,7 +15,8 @@ """Transport-based whole-body adapter: bridges coordinator ↔ Module via pub/sub. Subscribes /{hardware_id}/motor_states + /{hardware_id}/imu, publishes -/{hardware_id}/motor_command. ``network_interface`` is accepted but ignored. +/{hardware_id}/motor_command. Address-style arguments are accepted via +``**_`` and ignored — the transport layer owns the wire. 
""" from __future__ import annotations @@ -45,7 +46,6 @@ def __init__( dof: int = 29, hardware_id: str = "wholebody", transport_cls: type = LCMTransport, - network_interface: int | str = "", # accepted-and-ignored — see module docstring **_: object, ) -> None: self._dof = dof diff --git a/dimos/protocol/service/system_configurator/libpython.py b/dimos/protocol/service/system_configurator/libpython.py index 1aa16f4d3d..82341d79e8 100644 --- a/dimos/protocol/service/system_configurator/libpython.py +++ b/dimos/protocol/service/system_configurator/libpython.py @@ -15,8 +15,14 @@ """Ensure libpython is available in the venv for MuJoCo's mjpython on macOS. When Python is installed via uv, mjpython fails because it expects -libpython at .venv/lib/ but uv places it in its own managed directory. -This configurator creates a symlink so mjpython can find the library. +libpython at a path that uv doesn't populate. mjpython's launcher script +parses @executable_path rpaths from the python binary via otool and +applies os.path.dirname() to each, feeding the results into +DYLD_FALLBACK_LIBRARY_PATH. For uv's cpython build with +``LC_RPATH = @executable_path/../lib``, this buggy dirname computation +resolves to the venv root (``.venv/``), not ``.venv/lib/`` — so that's +where mjpython actually searches. This configurator creates the symlink +at the venv root so the dlopen succeeds. 
""" from __future__ import annotations @@ -44,10 +50,14 @@ def check(self) -> bool: return True self._missing.clear() - venv_lib = Path(sys.prefix) / "lib" + venv_root = Path(sys.prefix) + venv_lib = venv_root / "lib" real_lib = Path(sys.executable).resolve().parent.parent / "lib" for dylib in real_lib.glob("libpython*.dylib"): + target = venv_root / dylib.name + if not target.exists(): + self._missing.append((target, dylib)) target = venv_lib / dylib.name if not target.exists(): self._missing.append((target, dylib)) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 48658a53bf..53c124d737 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -87,6 +87,7 @@ "unitree-g1-coordinator": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_coordinator:unitree_g1_coordinator", "unitree-g1-detection": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1_detection:unitree_g1_detection", "unitree-g1-full": "dimos.robot.unitree.g1.blueprints.agentic.unitree_g1_full:unitree_g1_full", + "unitree-g1-groot-wbc": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_groot_wbc:unitree_g1_groot_wbc", "unitree-g1-joystick": "dimos.robot.unitree.g1.blueprints.basic.unitree_g1_joystick:unitree_g1_joystick", "unitree-g1-shm": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1_shm:unitree_g1_shm", "unitree-g1-sim": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1_sim:unitree_g1_sim", diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 522faf50be..5c3169723f 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -24,7 +24,7 @@ import sys import time import types -from typing import TYPE_CHECKING, Any, Union, get_args, get_origin +from typing import TYPE_CHECKING, Any, Union, cast, get_args, get_origin import click from dotenv import load_dotenv @@ -121,6 +121,13 @@ def callback(**kwargs) -> None: # type: ignore[no-untyped-def] main.add_typer(go2tool_app, name="go2tool") +def 
return f"LfsPath({filename})"
new file mode 100644 index 0000000000..55ff431c64 --- /dev/null +++ b/dimos/robot/unitree/g1/blueprints/basic/unitree_g1_groot_wbc.py @@ -0,0 +1,186 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unitree G1 GR00T whole-body-control blueprint. + +One blueprint, ``--simulation`` flag picks the backend: + +Real hardware (default): + G1WholeBodyConnection (DDS rt/lowstate <-> rt/lowcmd) + transport_lcm + whole-body adapter. 500 Hz tick. Safety profile: unarmed + dry-run on + start; the operator clicks Activate in the dashboard at :7779 and the + policy ramps from the current pose to its bent-knee default over 10 s + before taking torque control. The 14 arm joints are held at the + relaxed GR00T-trained default via a lower-priority servo task. + +Sim (``--simulation``): + MujocoSimModule (in-process MuJoCo + SHM) + sim_mujoco_g1 adapter. + 50 Hz tick (matches the rate the policy was trained at). No arming + ramp, no dry-run, no servo_arms -- sim physics doesn't gravity-collapse + the arms between trajectories. Optional passive viewer runs in the + engine subprocess; flip on via + ``-o mujocosimmodule.engine_mode=subprocess -o mujocosimmodule.headless=false`` + so the viewer lives on that subprocess's main thread. 
+ +Usage: + dimos run unitree-g1-groot-wbc # real hardware + dimos --simulation run unitree-g1-groot-wbc # sim + +Overrides (replace the old env-var dance): + dimos run unitree-g1-groot-wbc \\ + -o g1wholebodyconnection.network_interface=enp2s0 + dimos --simulation run unitree-g1-groot-wbc \\ + -o mujocosimmodule.engine_mode=subprocess -o mujocosimmodule.headless=false +""" + +from __future__ import annotations + +from pathlib import Path + +from dimos.control.components import HardwareComponent, HardwareType +from dimos.control.coordinator import ControlCoordinator, TaskConfig +from dimos.control.tasks.g1_groot_wbc_task import ( + ARM_DEFAULT_POSE, + G1_GROOT_KD, + G1_GROOT_KP, + g1_arms, + g1_joints, + g1_legs_waist, +) +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.global_config import global_config +from dimos.core.transport import LCMTransport +from dimos.hardware.whole_body.spec import WholeBodyConfig +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.msgs.sensor_msgs.Imu import Imu +from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.msgs.sensor_msgs.MotorCommandArray import MotorCommandArray +from dimos.robot.unitree.g1.wholebody_connection import G1WholeBodyConnection +from dimos.simulation.engines.mujoco_sim_module import MujocoSimModule +from dimos.utils.data import LfsPath +from dimos.web.websocket_vis.websocket_vis_module import WebsocketVisModule + +# Lazy data handles. LfsPath only triggers the LFS pull on first +# str()/open(); using ``get_data(...)`` at import time would block the +# whole CLI on a multi-GB download every time the module is imported. +_GROOT_MODEL_DIR = LfsPath("groot") +_MJCF_PATH = LfsPath("mujoco_sim/g1_gear_wbc.xml") + +_adapter_address: str | Path + +if global_config.simulation: + # Sim backend: MuJoCo engine via SHM. 
+ _backend = MujocoSimModule.blueprint( + address=_MJCF_PATH, + headless=True, + dof=29, + enable_color=False, + enable_depth=False, + enable_pointcloud=False, + inject_legacy_assets=True, + ) + # MujocoSimModule's ``odom`` Out is the sole producer of ``/odom`` + # now — the coordinator no longer polls the whole-body adapter for + # base pose (read_odom was dropped from the Protocol). autoconnect + # maps ``(odom, PoseStamped)`` to ``/odom`` by default; no override. + _adapter_type = "sim_mujoco_g1" + _adapter_address = _MJCF_PATH + _tick_rate = 50.0 + _auto_arm = True + _auto_dry_run = False + _default_ramp_seconds = 0.0 + _decimation: int | None = 1 + # Sim physics holds the arms between trajectories on its own -- no + # servo task needed. + _arm_holder: TaskConfig | None = None +else: + # Real-hw backend: DDS connection module + transport_lcm adapter. + _backend = G1WholeBodyConnection.blueprint(release_sport_mode=True) + _adapter_type = "transport_lcm" + _adapter_address = "" + _tick_rate = 500.0 + # Real hardware: come up unarmed + dry-run; operator must click + # Activate (10 s ramp) after verifying commands. + _auto_arm = False + _auto_dry_run = True + _default_ramp_seconds = 10.0 + _decimation = None # task default (10) pairs with 500 Hz tick. + # Real hardware needs the arms held -- kd damping alone would let + # them sag toward singular configurations between trajectories. 
+ _arm_holder = TaskConfig( + name="servo_arms", + type="servo", + joint_names=g1_arms, + priority=10, + default_positions=ARM_DEFAULT_POSE, + auto_start=True, + ) + + +_coordinator = ControlCoordinator.blueprint( + tick_rate=_tick_rate, + publish_joint_state=True, + joint_state_frame_id="coordinator", + hardware=[ + HardwareComponent( + hardware_id="g1", + hardware_type=HardwareType.WHOLE_BODY, + joints=g1_joints, + adapter_type=_adapter_type, + address=_adapter_address, + auto_enable=True, + wb_config=WholeBodyConfig(kp=tuple(G1_GROOT_KP), kd=tuple(G1_GROOT_KD)), + ), + ], + tasks=[ + TaskConfig( + name="groot_wbc", + type="g1_groot_wbc", + joint_names=g1_legs_waist, + priority=50, + model_path=_GROOT_MODEL_DIR, + hardware_id="g1", + auto_start=True, + auto_arm=_auto_arm, + auto_dry_run=_auto_dry_run, + default_ramp_seconds=_default_ramp_seconds, + decimation=_decimation, + ), + *([_arm_holder] if _arm_holder is not None else []), + ], +).transports( + { + ("joint_state", JointState): LCMTransport("/coordinator/joint_state", JointState), + ("joint_command", JointState): LCMTransport("/g1/joint_command", JointState), + ("twist_command", Twist): LCMTransport("/g1/cmd_vel", Twist), + # Real-hw only: the transport_lcm adapter speaks to + # G1WholeBodyConnection over these topics. autoconnect already + # matches by (name, type) so sim doesn't need them -- they're + # harmless when the sim engine doesn't expose those ports. + ("motor_states", JointState): LCMTransport("/g1/motor_states", JointState), + ("imu", Imu): LCMTransport("/g1/imu", Imu), + ("motor_command", MotorCommandArray): LCMTransport("/g1/motor_command", MotorCommandArray), + } +) + +# Operator dashboard at http://localhost:7779/ -- Arm/Disarm + Dry-Run +# buttons call ControlCoordinator.set_activated() / .set_dry_run() +# directly via RPC (no LCM round-trip). 
+_ws_vis = WebsocketVisModule.blueprint().transports( + {("tele_cmd_vel", Twist): LCMTransport("/g1/cmd_vel", Twist)} +) + +unitree_g1_groot_wbc = autoconnect(_backend, _coordinator, _ws_vis) + +__all__ = ["unitree_g1_groot_wbc"] diff --git a/dimos/robot/unitree/g1/wholebody_connection.py b/dimos/robot/unitree/g1/wholebody_connection.py index f4fb762bae..925df49fff 100644 --- a/dimos/robot/unitree/g1/wholebody_connection.py +++ b/dimos/robot/unitree/g1/wholebody_connection.py @@ -51,7 +51,12 @@ _NUM_MOTORS = 29 _NUM_MOTOR_SLOTS = 35 # G1 hg LowCmd has 35 slots; only 29 are used -_MODE_MACHINE_WAIT_S = 10.0 +# mode_machine is a static value identifying the G1 firmware/hardware +# variant. Older code read it back from the first LowState frame and +# echoed it into LowCmd; that callback path is unreliable on macOS +# cyclonedds, and the value never changes for a given robot, so we +# hardcode it. 29-DOF G1 (gear) reports 5. +_MODE_MACHINE_G1: int = 5 # Joint names sourced from the canonical helper. Order matches the motor index # convention above. Single-source-of-truth so any coordinator-side adapter built @@ -84,8 +89,12 @@ def __init__(self, **kwargs: Any) -> None: self._low_cmd: LowCmd_ | None = None self._low_state: LowState_ | None = None self._crc: CRC | None = None - # mode_machine: read from first LowState, echoed back in every LowCmd. + # mode_machine: hardcoded at start() to the static value for the + # 29-DOF G1. We log a one-shot warning if the first LowState we + # read disagrees — that's the early signal of firmware drift on a + # variant that needs a different value. self._mode_machine: int | None = None + self._mode_machine_verified: bool = False # Guards _low_cmd / _low_state / _mode_machine across DDS, publish, and LCM threads. 
self._lock = threading.Lock() self._stop_event = threading.Event() @@ -119,12 +128,19 @@ def start(self) -> None: self._publisher = ChannelPublisher("rt/lowcmd", LowCmd_) self._publisher.Init() + # Passive subscriber — Read() per tick from the publish loop. The + # callback variant (Init(self._on_low_state, 10)) doesn't fire + # reliably under cyclonedds on macOS, which used to leave us + # blocked here forever waiting for a first LowState. self._subscriber = ChannelSubscriber("rt/lowstate", LowState_) - self._subscriber.Init(self._on_low_state, 10) + self._subscriber.Init(None, 0) # POS_STOP/VEL_STOP + zero gains so the robot can't twitch pre-command. self._low_cmd = unitree_hg_msg_dds__LowCmd_() self._low_cmd.mode_pr = 0 # PR (pitch/roll) mode + # mode_machine is a static value (see comment above the constant). + self._mode_machine = _MODE_MACHINE_G1 + self._low_cmd.mode_machine = self._mode_machine for i in range(_NUM_MOTOR_SLOTS): self._low_cmd.motor_cmd[i].mode = 0x01 # enable self._low_cmd.motor_cmd[i].q = POS_STOP @@ -141,16 +157,6 @@ def start(self) -> None: else: logger.info("Skipping sport mode release (release_sport_mode=False)") - logger.info("Waiting for first LowState to capture mode_machine...") - deadline = time.time() + _MODE_MACHINE_WAIT_S - while self._mode_machine is None and time.time() < deadline: - time.sleep(0.1) - if self._mode_machine is None: - raise RuntimeError( - f"Timed out after {_MODE_MACHINE_WAIT_S:.1f}s waiting for " - f"first LowState — mode_machine never captured" - ) - logger.info(f"G1WholeBodyConnection connected (mode_machine={self._mode_machine})") self.register_disposable(Disposable(self.motor_command.subscribe(self._on_motor_command))) @@ -167,6 +173,29 @@ def stop(self) -> None: self._publish_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) self._publish_thread = None + # Final safe-stop lowcmd: disable every motor (mode=0x00, kp=kd=0, + # tau=0). 
Without this, the motors freeze stiffly at whatever + # the last commanded pose was and the next ``dimos run`` opens + # against a robot that's actively fighting its own controllers + # — observed as horrible mechanical noise during sport-mode + # release. Best-effort: any failure is logged, not raised, so + # cleanup still drains the DDS endpoints. + if self._publisher is not None and self._low_cmd is not None and self._crc is not None: + try: + with self._lock: + for i in range(_NUM_MOTOR_SLOTS): + self._low_cmd.motor_cmd[i].mode = 0x00 # disable + self._low_cmd.motor_cmd[i].q = POS_STOP + self._low_cmd.motor_cmd[i].dq = VEL_STOP + self._low_cmd.motor_cmd[i].kp = 0 + self._low_cmd.motor_cmd[i].kd = 0 + self._low_cmd.motor_cmd[i].tau = 0 + self._low_cmd.crc = self._crc.Crc(self._low_cmd) + self._publisher.Write(self._low_cmd) + logger.info("Sent safe-stop lowcmd (motors disabled)") + except (OSError, RuntimeError, AttributeError) as e: + logger.warning(f"Safe-stop lowcmd failed: {e}") + # Close DDS endpoints explicitly — GC-based cleanup races with in-flight # callbacks and segfaults on process exit (mirrors the Go2 adapter). if self._subscriber is not None: @@ -190,55 +219,120 @@ def stop(self) -> None: logger.info("G1WholeBodyConnection disconnected") super().stop() + # Identity quaternion + zeros while LowState hasn't arrived (start() blocks + # for it, but the publish loop may also see _low_state cleared during stop()). + _ZERO_QUAT = (1.0, 0.0, 0.0, 0.0) + _ZERO_VEC3 = (0.0, 0.0, 0.0) + + def _drain_low_state(self) -> None: + """Pull the freshest LowState frame off the subscriber and stash it.""" + sub = self._subscriber + if sub is None: + return + fresh = sub.Read() + if fresh is None: + return + with self._lock: + self._low_state = fresh + self._verify_mode_machine_once(fresh) + + def _verify_mode_machine_once(self, sample: object) -> None: + """One-shot sanity check: log if the hardcoded mode_machine + doesn't match what the firmware reports. 
Commands with a + wrong mode_machine are silently rejected, so this prevents + a confusing "everything looks fine but the robot doesn't + move" failure mode on G1 variants we haven't tested.""" + if self._mode_machine_verified: + return + self._mode_machine_verified = True + actual = int(getattr(sample, "mode_machine", -1)) + if actual != self._mode_machine: + logger.warning( + f"mode_machine mismatch: hardcoded {self._mode_machine}, " + f"robot reports {actual}. Commands may be silently rejected " + f"by firmware — set _MODE_MACHINE_G1 to {actual} for this variant." + ) + + def _snapshot_motor_imu( + self, + ) -> tuple[ + list[float], list[float], list[float], + tuple[float, float, float, float], + tuple[float, float, float], + tuple[float, float, float], + ]: + """Return (positions, velocities, efforts, quat, gyro, accel) for + the latest cached LowState, or zero-defaults when none has been + received yet.""" + with self._lock: + ls = self._low_state + if ls is None: + return ( + [0.0] * _NUM_MOTORS, + [0.0] * _NUM_MOTORS, + [0.0] * _NUM_MOTORS, + self._ZERO_QUAT, + self._ZERO_VEC3, + self._ZERO_VEC3, + ) + return ( + [ls.motor_state[i].q for i in range(_NUM_MOTORS)], + [ls.motor_state[i].dq for i in range(_NUM_MOTORS)], + [ls.motor_state[i].tau_est for i in range(_NUM_MOTORS)], + tuple(ls.imu_state.quaternion), + tuple(ls.imu_state.gyroscope), + tuple(ls.imu_state.accelerometer), + ) + + def _publish_motor_state_and_imu( + self, + now: float, + frame_id: str, + positions: list[float], + velocities: list[float], + efforts: list[float], + quat: tuple[float, float, float, float], + gyro: tuple[float, float, float], + accel: tuple[float, float, float], + ) -> None: + self.motor_states.publish( + JointState( + ts=now, + frame_id=frame_id, + name=G1_JOINT_NAMES, + position=positions, + velocity=velocities, + effort=efforts, + ) + ) + # Unitree quat is (w,x,y,z); dimos Quaternion is (x,y,z,w). 
+ self.imu.publish( + Imu( + ts=now, + frame_id=frame_id, + orientation=Quaternion(quat[1], quat[2], quat[3], quat[0]), + angular_velocity=Vector3(gyro[0], gyro[1], gyro[2]), + linear_acceleration=Vector3(accel[0], accel[1], accel[2]), + ) + ) + def _publish_loop(self) -> None: period = 1.0 / float(self.config.publish_rate_hz) next_tick = time.perf_counter() frame_id = self.config.frame_id - # Identity quaternion + zeros while LowState hasn't arrived (start() blocks - # for it, but the publish loop may also see _low_state cleared during stop()). - zero_quat = (1.0, 0.0, 0.0, 0.0) - zero_vec3 = (0.0, 0.0, 0.0) - while not self._stop_event.is_set(): - with self._lock: - ls = self._low_state - if ls is None: - positions: list[float] = [0.0] * _NUM_MOTORS - velocities: list[float] = [0.0] * _NUM_MOTORS - efforts: list[float] = [0.0] * _NUM_MOTORS - quat = zero_quat - gyro = zero_vec3 - accel = zero_vec3 - else: - positions = [ls.motor_state[i].q for i in range(_NUM_MOTORS)] - velocities = [ls.motor_state[i].dq for i in range(_NUM_MOTORS)] - efforts = [ls.motor_state[i].tau_est for i in range(_NUM_MOTORS)] - quat = tuple(ls.imu_state.quaternion) - gyro = tuple(ls.imu_state.gyroscope) - accel = tuple(ls.imu_state.accelerometer) - - now = time.time() - self.motor_states.publish( - JointState( - ts=now, - frame_id=frame_id, - name=G1_JOINT_NAMES, - position=positions, - velocity=velocities, - effort=efforts, - ) - ) - - # Unitree quat is (w,x,y,z); dimos Quaternion is (x,y,z,w). 
- self.imu.publish( - Imu( - ts=now, - frame_id=frame_id, - orientation=Quaternion(quat[1], quat[2], quat[3], quat[0]), - angular_velocity=Vector3(gyro[0], gyro[1], gyro[2]), - linear_acceleration=Vector3(accel[0], accel[1], accel[2]), - ) + self._drain_low_state() + positions, velocities, efforts, quat, gyro, accel = self._snapshot_motor_imu() + self._publish_motor_state_and_imu( + now=time.time(), + frame_id=frame_id, + positions=positions, + velocities=velocities, + efforts=efforts, + quat=quat, + gyro=gyro, + accel=accel, ) next_tick += period @@ -276,15 +370,17 @@ def _on_motor_command(self, msg: MotorCommandArray) -> None: self._low_cmd.crc = self._crc.Crc(self._low_cmd) self._publisher.Write(self._low_cmd) - def _on_low_state(self, msg: Any) -> None: - """rt/lowstate callback — captures mode_machine and the latest snapshot.""" - with self._lock: - self._low_state = msg - if self._mode_machine is None: - self._mode_machine = msg.mode_machine - def _release_sport_mode(self) -> None: - """Loop ReleaseMode until MotionSwitcher reports no active controller.""" + """Loop ReleaseMode until MotionSwitcher reports no active controller. + + Bails early if the first CheckMode reports nothing active. That + matters for back-to-back ``dimos run`` invocations: the first run + already released sport mode, so on a clean second start there's + nothing to release. Calling ReleaseMode anyway opens a window + where motor controllers are mid-handoff while we're already + publishing rt/lowcmd, which has been observed to cause horrible + mechanical noise from the gearboxes. + """ from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import ( MotionSwitcherClient, ) @@ -293,8 +389,14 @@ def _release_sport_mode(self) -> None: msc.SetTimeout(5.0) msc.Init() - # CheckMode returns (status, None) once nothing is active — null-tolerant. + # CheckMode returns (status, None) — or (status, {"name": ""}) on + # some firmwares — once nothing is active. 
Treat both as "already + # released" and return without poking ReleaseMode. _status, result = msc.CheckMode() + if not result or not result.get("name"): + logger.info("Sport mode already released — skipping ReleaseMode") + return + while result and result.get("name"): msc.ReleaseMode() _status, result = msc.CheckMode() diff --git a/dimos/simulation/adapters/whole_body/g1.py b/dimos/simulation/adapters/whole_body/g1.py new file mode 100644 index 0000000000..b0b8693642 --- /dev/null +++ b/dimos/simulation/adapters/whole_body/g1.py @@ -0,0 +1,205 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""MuJoCo simulation ``WholeBodyAdapter`` for the Unitree G1. + +Pairs with ``MujocoSimModule`` (in-process MuJoCo engine + SHM publisher). +The blueprint composes both modules; they share the same ``MujocoEngine`` +indirectly via SHM keyed on the MJCF path. + +The adapter conforms to the same ``WholeBodyAdapter`` Protocol the real-hw +DDS adapter implements, so ControlCoordinator (and the GR00T WBC task on +top of it) can't tell sim from real. 
+""" + +from __future__ import annotations + +import math +from pathlib import Path +import time +from typing import TYPE_CHECKING, Any + +from dimos.hardware.whole_body.spec import ( + POS_STOP, + IMUState, + MotorCommand, + MotorState, +) +from dimos.simulation.engines.mujoco_shm import ( + ManipShmReader, + shm_key_from_path, +) +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.hardware.whole_body.registry import WholeBodyAdapterRegistry + +logger = setup_logger() + +_NUM_MOTORS = 29 + +_READY_WAIT_TIMEOUT_S = 60.0 +_READY_WAIT_POLL_S = 0.1 +_ATTACH_RETRY_TIMEOUT_S = 30.0 +_ATTACH_RETRY_POLL_S = 0.2 + + +class SimMujocoG1WholeBodyAdapter: + """G1 ``WholeBodyAdapter`` that proxies to a ``MujocoSimModule`` via SHM. + + The sim module owns the engine and publishes joint state + IMU into + SHM each step; this adapter reads them and forwards per-joint + (q, kp, kd, tau) commands back into SHM for the engine's pre-step + PD-with-feedforward hook to apply. + + ``address`` (the MJCF XML path) is the discovery key — both sides + derive the same SHM names from it via ``shm_key_from_path``. + """ + + def __init__( + self, + address: str | Path | None = None, + domain_id: int = 0, + **_: Any, + ) -> None: + if address is None: + raise ValueError( + "SimMujocoG1WholeBodyAdapter: address (MJCF XML path) is required — " + "set HardwareComponent.address to the same MJCF path the " + "MujocoSimModule loads." + ) + self._address = address + self._shm_key = shm_key_from_path(address) + self._shm: ManipShmReader | None = None + self._connected = False + + # Lifecycle + + def connect(self) -> bool: + # Attach with retry — MujocoSimModule may still be starting up. 
+ deadline = time.monotonic() + _ATTACH_RETRY_TIMEOUT_S + while True: + try: + self._shm = ManipShmReader(self._shm_key) + break + except FileNotFoundError: + if time.monotonic() > deadline: + logger.error( + "SimMujocoG1WholeBodyAdapter: SHM buffers not found", + address=self._address, + shm_key=self._shm_key, + timeout_s=_ATTACH_RETRY_TIMEOUT_S, + ) + return False + time.sleep(_ATTACH_RETRY_POLL_S) + + # Wait for the sim to signal ready (engine connected, first + # joint-state packet written). Without this the first + # read_motor_states() returns zeros and the WBC obs is junk. + deadline = time.monotonic() + _READY_WAIT_TIMEOUT_S + while not self._shm.is_ready(): + if time.monotonic() > deadline: + logger.error( + "SimMujocoG1WholeBodyAdapter: sim module not ready", + timeout_s=_READY_WAIT_TIMEOUT_S, + ) + self._shm.cleanup() + self._shm = None + return False + time.sleep(_READY_WAIT_POLL_S) + + self._connected = True + logger.info( + "SimMujocoG1WholeBodyAdapter connected", + num_motors=_NUM_MOTORS, + shm_key=self._shm_key, + ) + return True + + def disconnect(self) -> None: + if self._shm is not None: + try: + self._shm.cleanup() + except Exception as e: # best-effort cleanup + logger.warning(f"SHM cleanup raised: {e}") + self._shm = None + self._connected = False + + def is_connected(self) -> bool: + return self._connected and self._shm is not None + + # IO (WholeBodyAdapter protocol) + + def read_motor_states(self) -> list[MotorState]: + if not self._connected or self._shm is None: + return [MotorState()] * _NUM_MOTORS + positions = self._shm.read_positions(_NUM_MOTORS) + velocities = self._shm.read_velocities(_NUM_MOTORS) + efforts = self._shm.read_efforts(_NUM_MOTORS) + return [ + MotorState(q=positions[i], dq=velocities[i], tau=efforts[i]) for i in range(_NUM_MOTORS) + ] + + def has_motor_states(self) -> bool: + # Sim ground truth is available the moment SHM attaches. 
+ # No ramp-up window like real DDS adapters need before the + # first state msg arrives. + return self._connected and self._shm is not None + + def read_imu(self) -> IMUState: + if not self._connected or self._shm is None: + return IMUState() + quat, gyro, accel = self._shm.read_imu() + # Derive ZYX Euler from the quaternion — matches the real G1 adapter. + w, x, y, z = quat + sinr = 2.0 * (w * x + y * z) + cosr = 1.0 - 2.0 * (x * x + y * y) + roll = math.atan2(sinr, cosr) + sinp = 2.0 * (w * y - z * x) + pitch = math.copysign(math.pi / 2.0, sinp) if abs(sinp) >= 1.0 else math.asin(sinp) + siny = 2.0 * (w * z + x * y) + cosy = 1.0 - 2.0 * (y * y + z * z) + yaw = math.atan2(siny, cosy) + return IMUState( + quaternion=quat, + gyroscope=gyro, + accelerometer=accel, + rpy=(roll, pitch, yaw), + ) + + def write_motor_commands(self, commands: list[MotorCommand]) -> bool: + if not self._connected or self._shm is None: + return False + if len(commands) != _NUM_MOTORS: + logger.error( + f"SimMujocoG1WholeBodyAdapter: expected {_NUM_MOTORS} commands, got {len(commands)}" + ) + return False + # Flatten the per-motor command into per-joint arrays. POS_STOP + # ("no command") is replaced with 0.0 — the engine's PD only + # acts when kp > 0 anyway, so a zeroed q is harmless. 
+ q = [cmd.q if cmd.q != POS_STOP else 0.0 for cmd in commands] + kp = [cmd.kp for cmd in commands] + kd = [cmd.kd for cmd in commands] + tau = [cmd.tau for cmd in commands] + self._shm.write_pd_tau_command(q, kp, kd, tau) + return True + + +def register(registry: WholeBodyAdapterRegistry) -> None: + """Register with the whole-body adapter registry.""" + registry.register("sim_mujoco_g1", SimMujocoG1WholeBodyAdapter) + + +__all__ = ["SimMujocoG1WholeBodyAdapter"] diff --git a/dimos/simulation/engines/mujoco_engine.py b/dimos/simulation/engines/mujoco_engine.py index ada85dd477..4266948fb4 100644 --- a/dimos/simulation/engines/mujoco_engine.py +++ b/dimos/simulation/engines/mujoco_engine.py @@ -19,6 +19,7 @@ from collections.abc import Callable from dataclasses import dataclass from pathlib import Path +import signal import threading import time from typing import TYPE_CHECKING @@ -29,7 +30,14 @@ from numpy.typing import NDArray from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT +from dimos.core.transport import LCMTransport +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.sensor_msgs.Imu import Imu from dimos.simulation.engines.base import SimulationEngine +from dimos.simulation.engines.mujoco_shm import ManipShmWriter +from dimos.simulation.engines.wholebody_sim_hooks import WholeBodySimHooks from dimos.simulation.utils.xml_parser import JointMapping, build_joint_mappings from dimos.utils.logging_config import setup_logger @@ -86,13 +94,22 @@ def __init__( cameras: list[CameraConfig] | None = None, on_before_step: StepHook | None = None, on_after_step: StepHook | None = None, + assets: dict[str, bytes] | None = None, ) -> None: super().__init__(config_path=config_path, headless=headless) self._on_before_step: StepHook | None = on_before_step self._on_after_step: StepHook | None = on_after_step xml_path = 
self._resolve_xml_path(config_path) - self._model = mujoco.MjModel.from_xml_path(str(xml_path)) + if assets is not None: + # MJCFs that reference meshes by bare filename (e.g. menagerie + # G1) need the mesh bytes injected by name; from_xml_path can't + # find them on disk. + with open(xml_path) as f: + xml_str = f.read() + self._model = mujoco.MjModel.from_xml_string(xml_str, assets=assets) + else: + self._model = mujoco.MjModel.from_xml_path(str(xml_path)) self._xml_path = xml_path self._data = mujoco.MjData(self._model) @@ -208,12 +225,33 @@ def connect(self) -> bool: logger.error("connect() failed", cls=self.__class__.__name__, error=str(e)) return False - def disconnect(self) -> bool: + def run_blocking(self, on_started: Callable[[], None] | None = None) -> None: + """Run the simulation loop on the current thread until stopped. + + This is used by the subprocess entry point so ``mujoco.viewer`` runs + on that process's main thread. The normal Module path still uses + ``connect()``, which starts the loop on a worker thread. 
+ """ + logger.info("run_blocking()", cls=self.__class__.__name__) + with self._lock: + self._connected = True + self._stop_event.clear() try: - logger.info("disconnect()", cls=self.__class__.__name__) + self._sim_loop(on_started=on_started) + finally: with self._lock: self._connected = False - self._stop_event.set() + + def request_stop(self) -> None: + """Ask the sim loop to stop without joining a thread.""" + with self._lock: + self._connected = False + self._stop_event.set() + + def disconnect(self) -> bool: + try: + logger.info("disconnect()", cls=self.__class__.__name__) + self.request_stop() if self._sim_thread and self._sim_thread.is_alive(): self._sim_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) self._sim_thread = None @@ -273,7 +311,7 @@ def _close_cam_renderers(cam_renderers: dict[str, _CameraRendererState]) -> None state.rgb_renderer.close() state.depth_renderer.close() - def _sim_loop(self) -> None: + def _sim_loop(self, on_started: Callable[[], None] | None = None) -> None: logger.info("sim loop started", cls=self.__class__.__name__) dt = 1.0 / self._control_frequency @@ -305,12 +343,16 @@ def _step_once(sync_viewer: bool) -> None: time.sleep(sleep_time) if self._headless: + if on_started is not None: + on_started() while not self._stop_event.is_set(): _step_once(sync_viewer=False) else: with viewer.launch_passive( self._model, self._data, show_left_ui=False, show_right_ui=False ) as m_viewer: + if on_started is not None: + on_started() while m_viewer.is_running() and not self._stop_event.is_set(): _step_once(sync_viewer=True) @@ -334,6 +376,14 @@ def joint_names(self) -> list[str]: def model(self) -> mujoco.MjModel: return self._model + @property + def data(self) -> mujoco.MjData: + """Live MjData. 
In-process consumers (sensors, PD hooks) read it + directly; physics integration in the sim thread mutates it under + ``self._lock`` so reads inside the same MujocoEngine instance are + coherent without extra locking.""" + return self._data + @property def joint_positions(self) -> list[float]: with self._lock: @@ -458,9 +508,212 @@ def get_camera_fovy(self, camera_name: str) -> float | None: return float(self._model.cam_fovy[cam_id]) +def engine_main( + mjcf_path: str, + shm_key: str, + dof: int, + *, + headless: bool = True, + inject_legacy_assets: bool = True, + odom_topic: str = "/odom", + imu_topic: str = "/imu", + imu_gyro_sensor_names: tuple[str, ...] = ( + "imu-pelvis-angular-velocity", + "imu-torso-angular-velocity", + "gyro_pelvis", + "imu_gyro", + ), + imu_accel_sensor_names: tuple[str, ...] = ( + "imu-pelvis-linear-acceleration", + "imu-torso-linear-acceleration", + "accelerometer_pelvis", + "imu_accel", + ), +) -> None: + """Standalone whole-body sim entry point. + + Runs an in-process MuJoCo engine + the whole-body SHM bridge + (optionally) + a passive viewer, all on the main thread. ``MujocoSimModule`` spawns this + as a subprocess when ``engine_mode='subprocess'`` is set — that way MuJoCo + can render with ``viewer.launch_passive`` on macOS (which requires main + thread) while dimos workers remain free to be daemonic. + + SHM layout matches ``ManipShmWriter`` so the same + ``WholeBodyAdapter`` (sim_mujoco_g1) reads commands + writes states from + the dimos side. The /odom + /imu LCM publishes mirror what + ``MujocoSimModule`` does in thread mode. + """ + # SHM writer that mirrors the in-process module's layout — the + # dimos-side adapter reads the same buffers either way. + shm = ManipShmWriter(shm_key) + + # Engine + asset injection (mirrors MujocoSimModule.start()). 
+ assets: dict[str, bytes] | None = None + if inject_legacy_assets: + try: + from dimos.simulation.mujoco.model import get_assets + + assets = get_assets() + except Exception as e: # pragma: no cover - bare MJCFs don't need this + logger.warning(f"engine_main: asset injection skipped: {e}") + eng = MujocoEngine( + config_path=Path(mjcf_path), + headless=headless, + cameras=[], + assets=assets, + ) + + # Resolve IMU sensors + base-qpos slice once. + imu_gyro_slice = _find_sensor_slice_inline(eng.model, imu_gyro_sensor_names) + imu_accel_slice = _find_sensor_slice_inline(eng.model, imu_accel_sensor_names) + has_freejoint = bool( + eng.model.njnt > 0 + and int(eng.model.jnt_type[0]) == int(mujoco.mjtJoint.mjJNT_FREE) + ) + + # SHM bridge — runs in the engine's sim loop. + hooks = WholeBodySimHooks(shm, dof=dof) + + # LCM publishers (started lazily; .start() spawns the LCM thread). + odom_tx: LCMTransport[PoseStamped] = LCMTransport(odom_topic, PoseStamped) + odom_tx.start() + imu_tx: LCMTransport[Imu] = LCMTransport(imu_topic, Imu) + imu_tx.start() + + def _on_after_step(engine: MujocoEngine) -> None: + """Composite post-step: SHM writes + LCM publishes.""" + hooks.post_step(engine) + + data = engine.data + ts = time.time() + + # Base pose (qpos[0:7]) → /odom + if has_freejoint: + pos = data.qpos[0:3] + quat = data.qpos[3:7] # (w, x, y, z) MuJoCo convention + odom_tx.publish( + PoseStamped( + ts=ts, + frame_id="world", + position=Vector3(float(pos[0]), float(pos[1]), float(pos[2])), + orientation=Quaternion( + float(quat[1]), float(quat[2]), float(quat[3]), float(quat[0]) + ), + ) + ) + + # IMU sensors → SHM + /imu + if imu_gyro_slice is None and imu_accel_slice is None and not has_freejoint: + return + quat_tup = ( + ( + float(data.qpos[3]), + float(data.qpos[4]), + float(data.qpos[5]), + float(data.qpos[6]), + ) + if has_freejoint + else (1.0, 0.0, 0.0, 0.0) + ) + if imu_gyro_slice is not None: + gyro_vals = data.sensordata[imu_gyro_slice] + gyro_tup = 
(float(gyro_vals[0]), float(gyro_vals[1]), float(gyro_vals[2])) + else: + gyro_tup = (0.0, 0.0, 0.0) + if imu_accel_slice is not None: + accel_vals = data.sensordata[imu_accel_slice] + accel_tup = (float(accel_vals[0]), float(accel_vals[1]), float(accel_vals[2])) + else: + accel_tup = (0.0, 0.0, 0.0) + shm.write_imu(quaternion=quat_tup, gyroscope=gyro_tup, accelerometer=accel_tup) + imu_tx.publish( + Imu( + ts=ts, + frame_id="pelvis", + orientation=Quaternion(quat_tup[1], quat_tup[2], quat_tup[3], quat_tup[0]), + angular_velocity=Vector3(*gyro_tup), + linear_acceleration=Vector3(*accel_tup), + ) + ) + + eng._on_before_step = hooks.pre_step # type: ignore[attr-defined] + eng._on_after_step = _on_after_step # type: ignore[attr-defined] + + def _handle_sig(signum: int, frame: object) -> None: + logger.info(f"engine_main: signal {signum} received, stopping") + eng.request_stop() + + signal.signal(signal.SIGINT, _handle_sig) + signal.signal(signal.SIGTERM, _handle_sig) + + def _mark_ready() -> None: + shm.signal_ready(num_joints=eng.num_joints) + logger.info( + "engine_main: ready", + mjcf=mjcf_path, + shm_key=shm_key, + dof=dof, + headless=headless, + ) + + try: + eng.run_blocking(on_started=_mark_ready) + finally: + try: + eng.request_stop() + except Exception as e: + logger.warning(f"engine_main: request_stop raised: {e}") + try: + shm.signal_stop() + shm.cleanup() + except Exception as e: + logger.warning(f"engine_main: shm cleanup raised: {e}") + odom_tx.stop() + imu_tx.stop() + + +def _find_sensor_slice_inline( + model: mujoco.MjModel, names: tuple[str, ...], dim: int = 3 +) -> slice | None: + for n in names: + sid = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_SENSOR, n) + if sid >= 0: + adr = int(model.sensor_adr[sid]) + return slice(adr, adr + dim) + return None + + __all__ = [ "CameraConfig", "CameraFrame", "MujocoEngine", "StepHook", + "engine_main", ] + + +if __name__ == "__main__": + import argparse + + p = argparse.ArgumentParser( + description="Standalone 
MuJoCo whole-body sim subprocess.", + prog="python -m dimos.simulation.engines.mujoco_engine", + ) + p.add_argument("mjcf", help="Path to MJCF XML") + p.add_argument("shm_key", help="SHM key (matches the dimos-side adapter)") + p.add_argument("dof", type=int, help="Number of motor DOFs") + p.add_argument("--view", action="store_true", help="Launch passive viewer") + p.add_argument("--no-asset-inject", action="store_true", help="Skip menagerie asset injection") + p.add_argument("--odom-topic", default="/odom") + p.add_argument("--imu-topic", default="/imu") + args = p.parse_args() + + engine_main( + mjcf_path=args.mjcf, + shm_key=args.shm_key, + dof=args.dof, + headless=not args.view, + inject_legacy_assets=not args.no_asset_inject, + odom_topic=args.odom_topic, + imu_topic=args.imu_topic, + ) diff --git a/dimos/simulation/engines/mujoco_shm.py b/dimos/simulation/engines/mujoco_shm.py index c0623c7915..4138d28c5a 100644 --- a/dimos/simulation/engines/mujoco_shm.py +++ b/dimos/simulation/engines/mujoco_shm.py @@ -40,26 +40,38 @@ logger = setup_logger() -# Upper bound on joint count per sim. Arms + gripper are typically <= 10. -MAX_JOINTS = 16 +# Upper bound on joint count per sim. Manipulators use ≤10; humanoids +# (Unitree G1: 29) push higher. 32 leaves headroom while keeping all +# per-joint buffers tiny (32 floats = 256 B). +MAX_JOINTS = 32 _FLOAT_BYTES = 8 # float64 _INT32_BYTES = 4 +# IMU layout: quat (4) + gyro (3) + accel (3) = 10 floats. +_IMU_FLOATS = 10 + _joint_array_size = MAX_JOINTS * _FLOAT_BYTES # float64 array # Element counts for control and sequence arrays. _NUM_CTRL_FIELDS = 4 # [ready, stop, command_mode, num_joints] -_NUM_SEQ_COUNTERS = 8 # one per buffer type +_NUM_SEQ_COUNTERS = 12 # one per buffer type (manipulator + WB additions) # Buffer sizes (in bytes). # Keys are short to stay under macOS PSHMNAMLEN (31 bytes). 
_shm_sizes = { + # Manipulator-shared layout "pos": _joint_array_size, "vel": _joint_array_size, "eff": _joint_array_size, "pos_t": _joint_array_size, "vel_t": _joint_array_size, "grp": 2 * _FLOAT_BYTES, # [gripper_position, gripper_target] + # Whole-body additions (unused by manipulator path). + "imu": _IMU_FLOATS * _FLOAT_BYTES, # [w,x,y,z, gx,gy,gz, ax,ay,az] + "kp_t": _joint_array_size, # per-joint position-gain target + "kd_t": _joint_array_size, # per-joint velocity-gain target + "tau_t": _joint_array_size, # per-joint feedforward torque + # Bookkeeping "seq": _NUM_SEQ_COUNTERS * _FLOAT_BYTES, # int64 counters "ctl": _NUM_CTRL_FIELDS * _INT32_BYTES, # [ready, stop, command_mode, num_joints] } @@ -72,6 +84,11 @@ SEQ_VELOCITY_CMD = 4 SEQ_GRIPPER_STATE = 5 SEQ_GRIPPER_CMD = 6 +# Whole-body additions +SEQ_IMU = 7 +SEQ_KP_CMD = 8 +SEQ_KD_CMD = 9 +SEQ_TAU_CMD = 10 # Control indices. CTRL_READY = 0 @@ -82,6 +99,9 @@ # Command modes. CMD_MODE_POSITION = 0 CMD_MODE_VELOCITY = 1 +# Whole-body PD-with-feedforward: ctrl = kp*(q_t - q) + kd*(0 - dq) + tau_t. +# Per-step kp/kd lets a policy retune gains online if it wants to. +CMD_MODE_PD_TAU = 2 _NAME_PREFIX = "dmjm" @@ -114,7 +134,13 @@ def _unregister(shm: SharedMemory) -> SharedMemory: @dataclass(frozen=True) class ManipShmSet: - """Frozen set of named SharedMemory buffers for manipulator IPC.""" + """Frozen set of named SharedMemory buffers for sim ↔ adapter IPC. + + Despite the name (kept for backward compat with existing manipulator + consumers), the layout now also covers whole-body needs: IMU, per-joint + PD gain commands, and per-joint feedforward torque commands. The + extra buffers are unused by the manipulator path. 
+ """ pos: SharedMemory vel: SharedMemory @@ -122,6 +148,12 @@ class ManipShmSet: pos_t: SharedMemory vel_t: SharedMemory grp: SharedMemory + # Whole-body additions + imu: SharedMemory + kp_t: SharedMemory + kd_t: SharedMemory + tau_t: SharedMemory + # Bookkeeping seq: SharedMemory ctl: SharedMemory @@ -170,6 +202,9 @@ def __init__(self, key: str) -> None: self._last_pos_cmd_seq = 0 self._last_vel_cmd_seq = 0 self._last_gripper_cmd_seq = 0 + self._last_kp_cmd_seq = 0 + self._last_kd_cmd_seq = 0 + self._last_tau_cmd_seq = 0 # Zero everything. for buf in self.shm.as_list(): np.ndarray((buf.size,), dtype=np.uint8, buffer=buf.buf)[:] = 0 @@ -226,6 +261,47 @@ def read_gripper_command(self) -> float | None: def read_command_mode(self) -> int: return int(self._control()[CTRL_COMMAND_MODE]) + # Whole-body additions + + def write_imu( + self, + quaternion: tuple[float, float, float, float], + gyroscope: tuple[float, float, float], + accelerometer: tuple[float, float, float], + ) -> None: + """Write IMU sample. 
Quaternion is (w, x, y, z).""" + arr = self._array(self.shm.imu, _IMU_FLOATS, np.float64) + arr[0:4] = quaternion + arr[4:7] = gyroscope + arr[7:10] = accelerometer + self._increment_seq(SEQ_IMU) + + def read_kp_command(self, num_joints: int) -> NDArray[np.float64] | None: + """Per-joint position-gain target if a new command landed since last call.""" + seq = self._get_seq(SEQ_KP_CMD) + if seq <= self._last_kp_cmd_seq: + return None + self._last_kp_cmd_seq = seq + arr = self._array(self.shm.kp_t, MAX_JOINTS, np.float64) + return arr[:num_joints].copy() + + def read_kd_command(self, num_joints: int) -> NDArray[np.float64] | None: + seq = self._get_seq(SEQ_KD_CMD) + if seq <= self._last_kd_cmd_seq: + return None + self._last_kd_cmd_seq = seq + arr = self._array(self.shm.kd_t, MAX_JOINTS, np.float64) + return arr[:num_joints].copy() + + def read_tau_command(self, num_joints: int) -> NDArray[np.float64] | None: + """Per-joint feedforward torque if a new command landed since last call.""" + seq = self._get_seq(SEQ_TAU_CMD) + if seq <= self._last_tau_cmd_seq: + return None + self._last_tau_cmd_seq = seq + arr = self._array(self.shm.tau_t, MAX_JOINTS, np.float64) + return arr[:num_joints].copy() + def signal_ready(self, num_joints: int) -> None: ctrl = self._control() ctrl[CTRL_NUM_JOINTS] = num_joints @@ -314,6 +390,79 @@ def write_gripper_command(self, position: float) -> None: arr[1] = position self._increment_seq(SEQ_GRIPPER_CMD) + # Whole-body additions + + def read_imu( + self, + ) -> tuple[ + tuple[float, float, float, float], + tuple[float, float, float], + tuple[float, float, float], + ]: + """Read IMU sample: ((qw, qx, qy, qz), (gx, gy, gz), (ax, ay, az)).""" + arr = np.ndarray((_IMU_FLOATS,), dtype=np.float64, buffer=self.shm.imu.buf) + return ( + (float(arr[0]), float(arr[1]), float(arr[2]), float(arr[3])), + (float(arr[4]), float(arr[5]), float(arr[6])), + (float(arr[7]), float(arr[8]), float(arr[9])), + ) + + def write_kp_command(self, kp: list[float]) -> 
None: + """Per-joint position-gain target. Switches command mode to PD+τ.""" + n = min(len(kp), MAX_JOINTS) + arr = np.ndarray((MAX_JOINTS,), dtype=np.float64, buffer=self.shm.kp_t.buf) + arr[:n] = kp[:n] + self._set_command_mode(CMD_MODE_PD_TAU) + self._increment_seq(SEQ_KP_CMD) + + def write_kd_command(self, kd: list[float]) -> None: + n = min(len(kd), MAX_JOINTS) + arr = np.ndarray((MAX_JOINTS,), dtype=np.float64, buffer=self.shm.kd_t.buf) + arr[:n] = kd[:n] + self._set_command_mode(CMD_MODE_PD_TAU) + self._increment_seq(SEQ_KD_CMD) + + def write_tau_command(self, tau: list[float]) -> None: + """Per-joint feedforward torque, applied on top of PD.""" + n = min(len(tau), MAX_JOINTS) + arr = np.ndarray((MAX_JOINTS,), dtype=np.float64, buffer=self.shm.tau_t.buf) + arr[:n] = tau[:n] + self._set_command_mode(CMD_MODE_PD_TAU) + self._increment_seq(SEQ_TAU_CMD) + + def write_pd_tau_command( + self, + positions: list[float], + kp: list[float], + kd: list[float], + tau: list[float], + ) -> None: + """Write a whole-body PD+tau command without transient mode flips. + + The sim engine runs in a different process, so setting position mode + first and PD mode later creates a small but real race. Write all arrays, + publish PD mode once, then bump the sequence counters. 
+ """ + n_pos = min(len(positions), MAX_JOINTS) + n_kp = min(len(kp), MAX_JOINTS) + n_kd = min(len(kd), MAX_JOINTS) + n_tau = min(len(tau), MAX_JOINTS) + np.ndarray((MAX_JOINTS,), dtype=np.float64, buffer=self.shm.pos_t.buf)[:n_pos] = ( + positions[:n_pos] + ) + np.ndarray((MAX_JOINTS,), dtype=np.float64, buffer=self.shm.kp_t.buf)[:n_kp] = kp[:n_kp] + np.ndarray((MAX_JOINTS,), dtype=np.float64, buffer=self.shm.kd_t.buf)[:n_kd] = kd[:n_kd] + np.ndarray((MAX_JOINTS,), dtype=np.float64, buffer=self.shm.tau_t.buf)[:n_tau] = tau[ + :n_tau + ] + self._set_command_mode(CMD_MODE_PD_TAU) + self._increment_seq(SEQ_KP_CMD) + self._increment_seq(SEQ_KD_CMD) + self._increment_seq(SEQ_TAU_CMD) + # Position is the engine-side trigger for latching a new PD target, + # so publish it last after gains/torque are visible. + self._increment_seq(SEQ_POSITION_CMD) + def is_ready(self) -> bool: return bool(self._control()[CTRL_READY] == 1) diff --git a/dimos/simulation/engines/mujoco_sim_module.py b/dimos/simulation/engines/mujoco_sim_module.py index 3d2ff927fe..7b71b77177 100644 --- a/dimos/simulation/engines/mujoco_sim_module.py +++ b/dimos/simulation/engines/mujoco_sim_module.py @@ -28,10 +28,14 @@ import math from pathlib import Path +import shutil +import subprocess +import sys import threading import time -from typing import Any +from typing import Any, Literal +import mujoco from pydantic import Field import reactivex as rx from scipy.spatial.transform import Rotation as R @@ -40,12 +44,13 @@ from dimos.core.module import Module, ModuleConfig from dimos.core.stream import Out from dimos.hardware.sensors.camera.spec import DepthCameraConfig, DepthCameraHardware +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Transform import Transform from dimos.msgs.geometry_msgs.Vector3 import Vector3 from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo from dimos.msgs.sensor_msgs.Image 
import Image, ImageFormat -from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.msgs.sensor_msgs.Imu import Imu from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.simulation.engines.mujoco_engine import ( CameraConfig, @@ -61,6 +66,17 @@ logger = setup_logger() + +def _find_sensor_slice(model: mujoco.MjModel, *names: str, dim: int = 3) -> slice | None: + """Return the first matching MJCF sensor's slice into sensordata, or None.""" + for n in names: + sid = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_SENSOR, n) + if sid >= 0: + adr = int(model.sensor_adr[sid]) + return slice(adr, adr + dim) + return None + + _RX180 = R.from_euler("x", 180, degrees=True) @@ -74,7 +90,7 @@ def _default_identity_transform() -> Transform: class MujocoSimModuleConfig(ModuleConfig, DepthCameraConfig): """Configuration for the unified MuJoCo simulation module.""" - address: str = "" + address: str | Path = "" headless: bool = False dof: int = 7 @@ -86,10 +102,48 @@ class MujocoSimModuleConfig(ModuleConfig, DepthCameraConfig): base_frame_id: str = "link7" base_transform: Transform | None = Field(default_factory=_default_identity_transform) align_depth_to_color: bool = True + enable_color: bool = True enable_depth: bool = True enable_pointcloud: bool = False pointcloud_fps: float = 5.0 camera_info_fps: float = 1.0 + # Inject menagerie/dimos-bundled mesh bytes (via + # dimos.simulation.mujoco.model.get_assets) into MjModel.from_xml_string. + # MJCFs that reference meshes by bare filename (G1 GR00T, Go2) need this; + # self-contained MJCFs with on-disk meshes (xarm scene.xml) don't. + inject_legacy_assets: bool = False + # MJCF sensor names used to publish IMU. The module probes these in + # order and uses the first that exists in the model; if none match + # IMU publishing stays silent. Default list covers the common + # humanoid pelvis-mounted naming conventions (menagerie + dimos + # bundled MJCFs); pass robot-specific names for other platforms. 
+ imu_gyro_sensor_names: list[str] = Field( + default_factory=lambda: [ + "imu-pelvis-angular-velocity", + "imu-torso-angular-velocity", + "gyro_pelvis", + "imu_gyro", + ] + ) + imu_accel_sensor_names: list[str] = Field( + default_factory=lambda: [ + "imu-pelvis-linear-acceleration", + "imu-torso-linear-acceleration", + "accelerometer_pelvis", + "imu_accel", + ] + ) + # Engine execution mode: + # "thread" — engine runs on a dimos worker thread inside this + # Module (current default). Cameras supported. + # "subprocess" — Module spawns ``python -m + # dimos.simulation.engines.mujoco_engine`` and proxies + # to it via SHM. Needed when a passive viewer is wanted + # (mujoco.viewer.launch_passive requires main thread, + # which a dimos worker can't provide on macOS). + # Cameras + image-stream Out ports are not supported + # in this mode — set enable_color/depth/pointcloud=False. + engine_mode: Literal["thread", "subprocess"] = "thread" class MujocoSimModule( @@ -111,11 +165,18 @@ class MujocoSimModule( pointcloud: Out[PointCloud2] camera_info: Out[CameraInfo] depth_camera_info: Out[CameraInfo] + imu: Out[Imu] + # Floating-base pose (qpos[0:7]) for robots whose MJCF has a free + # joint at the root. Published every step; consumers like the viser + # viewer use this to translate the robot in world space. 
+ odom: Out[PoseStamped] def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self._engine: MujocoEngine | None = None self._shm: ManipShmWriter | None = None + self._sim_hooks: Any | None = None # WholeBodySimHooks; thread mode only + self._engine_proc: subprocess.Popen[Any] | None = None # subprocess mode only self._gripper_idx: int | None = None self._gripper_ctrl_range: tuple[float, float] = (0.0, 1.0) self._gripper_joint_range: tuple[float, float] = (0.0, 1.0) @@ -123,6 +184,14 @@ def __init__(self, **kwargs: Any) -> None: self._publish_thread: threading.Thread | None = None self._camera_info_base: CameraInfo | None = None + # IMU sensor slices into MjData.sensordata, resolved once at start. + # None if the MJCF has no recognized IMU sensors (e.g. arm-only sims). + self._imu_gyro_slice: slice | None = None + self._imu_accel_slice: slice | None = None + # Quaternion is read from the floating-base qpos[3:7] when the model + # has a free joint at the root; None otherwise. + self._imu_base_qpos_slice: slice | None = None + @property def _camera_link(self) -> str: return f"{self.config.camera_name}_link" @@ -165,23 +234,50 @@ def start(self) -> None: raise RuntimeError("MujocoSimModule: config.address (MJCF path) is required") # SHM key — adapter derives the same key from the same MJCF path. + # Both engine_mode paths use it. shm_key = shm_key_from_path(self.config.address) + + if self.config.engine_mode == "subprocess": + self._start_subprocess(shm_key) + return + + # Thread mode is the default engine path. self._shm = ManipShmWriter(shm_key) # Build engine with SHM hooks installed. - self._engine = MujocoEngine( - config_path=Path(self.config.address), - headless=self.config.headless, - cameras=[ + engine_assets: dict[str, bytes] | None = None + if self.config.inject_legacy_assets: + from dimos.simulation.mujoco.model import get_assets + + engine_assets = get_assets() + # Compose the camera list. 
Each registered camera blocks the + # sim thread inside _step_once (mujoco_engine._render_cameras + # does update_scene + GPU render synchronously between physics + # steps — typically 5-30 ms per camera), so registering a camera + # nobody consumes burns the 500 Hz tick deadline for nothing. + # Skip the primary camera entirely when none of color / depth / + # pointcloud is enabled. + cameras: list[CameraConfig] = [] + primary_needed = ( + self.config.enable_color or self.config.enable_depth or self.config.enable_pointcloud + ) + if primary_needed: + cameras.append( CameraConfig( name=self.config.camera_name, width=self.config.width, height=self.config.height, fps=float(self.config.fps), ) - ], - on_before_step=self._apply_shm_commands, - on_after_step=self._publish_shm_state, + ) + + self._engine = MujocoEngine( + config_path=Path(self.config.address), + headless=self.config.headless, + cameras=cameras, + on_before_step=None, # set after gripper detection below + on_after_step=None, + assets=engine_assets, ) # Detect gripper (extra joint beyond dof). @@ -202,6 +298,38 @@ def start(self) -> None: joint_range=joint_range, ) + # Resolve IMU sensors once. Names come from config so robot- + # specific blueprints (G1, H1, Optimus, …) can override; manipulator + # MJCFs typically have neither — we leave the slices as None and + # skip IMU publishing for those. + self._imu_gyro_slice = _find_sensor_slice( + self._engine.model, *self.config.imu_gyro_sensor_names, dim=3 + ) + self._imu_accel_slice = _find_sensor_slice( + self._engine.model, *self.config.imu_accel_sensor_names, dim=3 + ) + # Floating-base orientation is qpos[3:7] (w,x,y,z) when the root + # joint is a free joint. Detect by checking jnt_type[0]. + if self._engine.model.njnt > 0 and int(self._engine.model.jnt_type[0]) == int( + mujoco.mjtJoint.mjJNT_FREE + ): + self._imu_base_qpos_slice = slice(3, 7) + else: + self._imu_base_qpos_slice = None + + # Wire SHM bridge hooks (shared with subprocess mode). 
+ from dimos.simulation.engines.wholebody_sim_hooks import WholeBodySimHooks + + self._sim_hooks = WholeBodySimHooks( + self._shm, + dof=dof, + gripper_idx=self._gripper_idx, + gripper_ctrl_range=self._gripper_ctrl_range, + gripper_joint_range=self._gripper_joint_range, + ) + self._engine._on_before_step = self._sim_hooks.pre_step # type: ignore[attr-defined] + self._engine._on_after_step = self._publish_shm_and_lcm # type: ignore[attr-defined] + # Start physics (sim thread spawned inside engine.connect()). if not self._engine.connect(): raise RuntimeError("MujocoSimModule: engine.connect() failed") @@ -226,7 +354,7 @@ def start(self) -> None: ) ) - # Optional pointcloud generation. + # Optional pointcloud generation: back-projects primary camera depth. if self.config.enable_pointcloud and self.config.enable_depth: pc_interval = 1.0 / self.config.pointcloud_fps self.register_disposable( @@ -244,6 +372,63 @@ def start(self) -> None: shm_key=shm_key, ) + def _start_subprocess(self, shm_key: str) -> None: + """Spawn ``mujoco_engine.engine_main`` as a child process. + + macOS needs ``mjpython`` for ``viewer.launch_passive`` (it bridges + the Cocoa main-thread requirement); Linux runs fine under the + regular Python interpreter. Cameras + image-stream outputs are + not supported in this mode — frame transport would need an SHM + ring buffer that doesn't exist yet — so we refuse early to give + a clear error. + """ + if self.config.enable_color or self.config.enable_depth or self.config.enable_pointcloud: + raise RuntimeError( + "MujocoSimModule(engine_mode='subprocess') does not support cameras " + "(no cross-process frame buffer yet). Set enable_color / enable_depth / " + "enable_pointcloud to False or use engine_mode='thread'." 
+ ) + if sys.platform == "darwin": + interp = shutil.which("mjpython") or shutil.which("python") + else: + interp = sys.executable + if interp is None: + raise RuntimeError( + "MujocoSimModule(engine_mode='subprocess'): no mjpython/python on PATH" + ) + + cmd = [ + interp, + "-m", + "dimos.simulation.engines.mujoco_engine", + str(self.config.address), + shm_key, + str(self.config.dof), + ] + if not self.config.headless: + cmd.append("--view") + if not self.config.inject_legacy_assets: + cmd.append("--no-asset-inject") + + self._engine_proc = subprocess.Popen(cmd) + time.sleep(0.2) + returncode = self._engine_proc.poll() + if returncode is not None: + self._engine_proc = None + raise RuntimeError( + "MujocoSimModule engine subprocess exited immediately " + f"(returncode={returncode}, address={self.config.address})" + ) + logger.info( + "MujocoSimModule spawned engine subprocess", + pid=self._engine_proc.pid, + interp=interp, + address=self.config.address, + shm_key=shm_key, + ) + # The dimos-side adapter polls SHM readiness; start() only catches + # immediate spawn failures here. + @rpc def stop(self) -> None: self._stop_event.set() @@ -252,6 +437,7 @@ def stop(self) -> None: self._publish_thread = None errors: list[tuple[str, BaseException]] = [] + # Thread-mode engine teardown. if self._engine is not None: try: self._engine.disconnect() @@ -259,6 +445,23 @@ def stop(self) -> None: except Exception as exc: logger.error("engine.disconnect() failed", error=str(exc)) errors.append(("engine.disconnect", exc)) + # Subprocess-mode teardown — SIGTERM, escalate to SIGKILL after + # a grace period if the subprocess doesn't exit (SHM cleanup + # happens inside engine_main's finally block). 
+ if self._engine_proc is not None and self._engine_proc.poll() is None: + try: + self._engine_proc.terminate() + self._engine_proc.wait(timeout=3.0) + except subprocess.TimeoutExpired: + logger.warning( + f"engine subprocess pid={self._engine_proc.pid} didn't exit in 3 s; SIGKILL" + ) + self._engine_proc.kill() + except Exception as exc: + logger.error("engine subprocess terminate raised", error=str(exc)) + errors.append(("engine_proc.terminate", exc)) + finally: + self._engine_proc = None if self._shm is not None: try: self._shm.signal_stop() @@ -268,6 +471,7 @@ def stop(self) -> None: logger.error("SHM cleanup failed", error=str(exc)) errors.append(("shm.cleanup", exc)) + self._sim_hooks = None self._camera_info_base = None super().stop() @@ -275,51 +479,74 @@ def stop(self) -> None: op, err = errors[0] raise RuntimeError(f"MujocoSimModule.stop() failed during {op}: {err}") from err - def _apply_shm_commands(self, engine: MujocoEngine) -> None: - """Pre-step hook: pull command targets from SHM into the engine.""" + def _publish_shm_and_lcm(self, engine: MujocoEngine) -> None: + """Post-step hook: SHM writes (via WholeBodySimHooks) + LCM publishes. + + Subprocess-mode engines run the SHM half in their own process; the + LCM half there too (see ``mujoco_engine.engine_main``). This thread- + mode version stays here so existing in-Module behaviour is identical. 
+ """ + if self._sim_hooks is not None: + self._sim_hooks.post_step(engine) shm = self._shm if shm is None: return - dof = self.config.dof - - pos_cmd = shm.read_position_command(dof) - if pos_cmd is not None: - engine.write_joint_command(JointState(position=pos_cmd.tolist())) - - vel_cmd = shm.read_velocity_command(dof) - if vel_cmd is not None: - engine.write_joint_command(JointState(velocity=vel_cmd.tolist())) - if self._gripper_idx is not None: - gripper_cmd = shm.read_gripper_command() - if gripper_cmd is not None: - ctrl_value = self._gripper_joint_to_ctrl(gripper_cmd) - engine.set_position_target(self._gripper_idx, ctrl_value) + # Odom — when the MJCF has a free-joint root, publish base pose + # from qpos[0:7] every step. Without this, downstream consumers + # (viser viewer, nav stack) only see joint articulation, not + # base translation through the world. + data = engine.data # in-process: same MjData the sim thread mutates + if self._imu_base_qpos_slice is not None: + base_pos = data.qpos[0:3] + base_quat = data.qpos[3:7] # (w, x, y, z) per MuJoCo convention + self.odom.publish( + PoseStamped( + ts=time.time(), + frame_id="world", + position=Vector3(float(base_pos[0]), float(base_pos[1]), float(base_pos[2])), + orientation=Quaternion( + float(base_quat[1]), + float(base_quat[2]), + float(base_quat[3]), + float(base_quat[0]), + ), # PoseStamped uses x,y,z,w + ) + ) - def _publish_shm_state(self, engine: MujocoEngine) -> None: - """Post-step hook: publish joint state to SHM.""" - shm = self._shm - if shm is None: + # IMU — only if MJCF declared the sensors. 
+ if ( + self._imu_gyro_slice is None + and self._imu_accel_slice is None + and self._imu_base_qpos_slice is None + ): return - shm.write_joint_state( - positions=engine.joint_positions, - velocities=engine.joint_velocities, - efforts=engine.joint_efforts, + if self._imu_base_qpos_slice is not None: + q = data.qpos[self._imu_base_qpos_slice] + quat = (float(q[0]), float(q[1]), float(q[2]), float(q[3])) + else: + quat = (1.0, 0.0, 0.0, 0.0) + if self._imu_gyro_slice is not None: + g = data.sensordata[self._imu_gyro_slice] + gyro = (float(g[0]), float(g[1]), float(g[2])) + else: + gyro = (0.0, 0.0, 0.0) + if self._imu_accel_slice is not None: + a = data.sensordata[self._imu_accel_slice] + accel = (float(a[0]), float(a[1]), float(a[2])) + else: + accel = (0.0, 0.0, 0.0) + shm.write_imu(quaternion=quat, gyroscope=gyro, accelerometer=accel) + # Also publish on the stream port for downstream consumers. + self.imu.publish( + Imu( + ts=time.time(), + frame_id="pelvis", + orientation=Quaternion(quat[1], quat[2], quat[3], quat[0]), + angular_velocity=Vector3(gyro[0], gyro[1], gyro[2]), + linear_acceleration=Vector3(accel[0], accel[1], accel[2]), + ) ) - if self._gripper_idx is not None: - positions = engine.joint_positions - if self._gripper_idx < len(positions): - shm.write_gripper_state(positions[self._gripper_idx]) - - def _gripper_joint_to_ctrl(self, joint_position: float) -> float: - """Map joint-space gripper position to actuator control value.""" - jlo, jhi = self._gripper_joint_range - clo, chi = self._gripper_ctrl_range - clamped = max(jlo, min(jhi, joint_position)) - if jhi == jlo: - return clo - t = (clamped - jlo) / (jhi - jlo) - return chi - t * (chi - clo) def _build_camera_info(self) -> None: if self._engine is None: @@ -382,13 +609,14 @@ def _publish_loop(self) -> None: last_timestamp = frame.timestamp ts = time.time() - color_img = Image( - data=frame.rgb, - format=ImageFormat.RGB, - frame_id=self._color_optical_frame, - ts=ts, - ) - 
self.color_image.publish(color_img) + if self.config.enable_color: + color_img = Image( + data=frame.rgb, + format=ImageFormat.RGB, + frame_id=self._color_optical_frame, + ts=ts, + ) + self.color_image.publish(color_img) if self.config.enable_depth: depth_img = Image( @@ -469,7 +697,10 @@ def _publish_tf(self, ts: float, frame: CameraFrame | None) -> None: ) def _generate_pointcloud(self) -> None: - if self._engine is None or self._camera_info_base is None: + if self._engine is None: + return + # Back-project the primary camera's depth image. + if self._camera_info_base is None: return frame = self._engine.read_camera(self.config.camera_name) if frame is None: diff --git a/dimos/simulation/engines/wholebody_sim_hooks.py b/dimos/simulation/engines/wholebody_sim_hooks.py new file mode 100644 index 0000000000..c8a37bdf44 --- /dev/null +++ b/dimos/simulation/engines/wholebody_sim_hooks.py @@ -0,0 +1,156 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pre-step + post-step hooks bridging ``MujocoEngine`` to the whole-body SHM. + +Extracted from ``MujocoSimModule`` so the same hook bodies run regardless of +*where* the engine lives: + +* ``MujocoSimModule(engine_mode="thread")`` — engine runs on a dimos worker + thread; hooks fire there. +* ``MujocoSimModule(engine_mode="subprocess")`` — engine runs in a child + process via ``mujoco_engine.engine_main``; hooks fire in that process's + main thread. 
+ +The same SHM layout serves both modes, so the ``WholeBodyAdapter`` reading +SHM on the dimos side is unaffected by which mode the engine is using. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np + +from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.simulation.engines.mujoco_shm import CMD_MODE_PD_TAU +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + from dimos.simulation.engines.mujoco_engine import MujocoEngine + from dimos.simulation.engines.mujoco_shm import ManipShmWriter + +logger = setup_logger() + + +class WholeBodySimHooks: + """Owns the per-step state bridging engine ↔ whole-body SHM. + + Pre-step: read motor commands (pos/vel/kp/kd/tau and an optional gripper + command) from SHM, latch them for PD-with-feedforward, and + drive the engine actuators accordingly. + Post-step: write motor states (positions, velocities, efforts) and the + gripper position back to SHM for the ``WholeBodyAdapter`` to + consume on the dimos side. + """ + + def __init__( + self, + shm: ManipShmWriter, + dof: int, + *, + gripper_idx: int | None = None, + gripper_ctrl_range: tuple[float, float] = (0.0, 1.0), + gripper_joint_range: tuple[float, float] = (0.0, 1.0), + ) -> None: + self._shm = shm + self._dof = dof + self._gripper_idx = gripper_idx + self._gripper_ctrl_range = gripper_ctrl_range + self._gripper_joint_range = gripper_joint_range + # PD-with-feedforward latching: kp/kd come from the controller's + # WholeBodyConfig (constant across ticks once set) so we keep the + # last seen values and apply them every step against the live q/dq. 
+ self._latest_pd_pos_target: np.ndarray | None = None + self._latest_pd_kp: np.ndarray | None = None + self._latest_pd_kd: np.ndarray | None = None + self._latest_pd_tau: np.ndarray | None = None + + def pre_step(self, engine: MujocoEngine) -> None: + """Pull command targets from SHM into the engine.""" + shm = self._shm + dof = self._dof + + pos_cmd = shm.read_position_command(dof) + if pos_cmd is not None: + if shm.read_command_mode() == CMD_MODE_PD_TAU: + # Latch position target for the per-step PD computation + # below; do NOT route through engine.write_joint_command + # (that would set position-mode and override our tau). + self._latest_pd_pos_target = pos_cmd + else: + engine.write_joint_command(JointState(position=pos_cmd.tolist())) + + vel_cmd = shm.read_velocity_command(dof) + if vel_cmd is not None: + engine.write_joint_command(JointState(velocity=vel_cmd.tolist())) + + kp_cmd = shm.read_kp_command(dof) + if kp_cmd is not None: + self._latest_pd_kp = kp_cmd + kd_cmd = shm.read_kd_command(dof) + if kd_cmd is not None: + self._latest_pd_kd = kd_cmd + tau_cmd = shm.read_tau_command(dof) + if tau_cmd is not None: + self._latest_pd_tau = tau_cmd + + # Apply latched PD-tau if all four pieces have arrived at least once. + # Manipulator path (no kp/kd writes) skips this entirely. 
+ if ( + self._latest_pd_pos_target is not None + and self._latest_pd_kp is not None + and self._latest_pd_kd is not None + ): + q = np.asarray(engine.joint_positions[:dof], dtype=np.float64) + dq = np.asarray(engine.joint_velocities[:dof], dtype=np.float64) + tau_ff = self._latest_pd_tau if self._latest_pd_tau is not None else np.zeros(dof) + tau = ( + self._latest_pd_kp * (self._latest_pd_pos_target - q) + + self._latest_pd_kd * (-dq) + + tau_ff + ) + engine.write_joint_command(JointState(effort=tau.tolist())) + + if self._gripper_idx is not None: + gripper_cmd = shm.read_gripper_command() + if gripper_cmd is not None: + ctrl_value = self._gripper_joint_to_ctrl(gripper_cmd) + engine.set_position_target(self._gripper_idx, ctrl_value) + + def post_step(self, engine: MujocoEngine) -> None: + """Publish joint state (and optionally gripper) back into SHM.""" + shm = self._shm + shm.write_joint_state( + positions=engine.joint_positions, + velocities=engine.joint_velocities, + efforts=engine.joint_efforts, + ) + if self._gripper_idx is not None: + positions = engine.joint_positions + if self._gripper_idx < len(positions): + shm.write_gripper_state(positions[self._gripper_idx]) + + def _gripper_joint_to_ctrl(self, joint_position: float) -> float: + """Map joint-space gripper position to actuator control value.""" + jlo, jhi = self._gripper_joint_range + clo, chi = self._gripper_ctrl_range + clamped = max(jlo, min(jhi, joint_position)) + if jhi == jlo: + return clo + t = (clamped - jlo) / (jhi - jlo) + return chi - t * (chi - clo) + + +__all__ = ["WholeBodySimHooks"] diff --git a/dimos/web/websocket_vis/websocket_vis_module.py b/dimos/web/websocket_vis/websocket_vis_module.py index 3d6b3df11c..fdeb851c12 100644 --- a/dimos/web/websocket_vis/websocket_vis_module.py +++ b/dimos/web/websocket_vis/websocket_vis_module.py @@ -105,8 +105,9 @@ class WebsocketVisModule(Module): gps_goal: Out[LatLon] explore_cmd: Out[Bool] stop_explore_cmd: Out[Bool] - cmd_vel: Out[Twist] + 
tele_cmd_vel: Out[Twist] movecmd_stamped: Out[TwistStamped] + # Arm/disarm/dry-run are one-shot coordinator RPCs, not output streams. def __init__(self, **kwargs: Any) -> None: """Initialize the WebSocket visualization module. @@ -129,10 +130,33 @@ def __init__(self, **kwargs: Any) -> None: # Track GPS goal points for visualization self.gps_goal_points: list[dict[str, float]] = [] + + # Lazy-initialised RPC client to ControlCoordinator. Used by the + # dashboard's arm/disarm/set_dry_run buttons which call the + # coordinator's @rpc set_activated / set_dry_run directly (no + # stream round-trip — arming is one-shot). + self._coordinator_client: Any | None = None + logger.info( f"WebSocket visualization module initialized on port {self.config.port}, GPS goal tracking enabled" ) + def _get_coordinator_client(self) -> Any | None: + """Lazy-init RPC client to ControlCoordinator. Returns None and + warns once if the coordinator isn't reachable (e.g. ws_vis is + being used in a blueprint that has no coordinator). 
Mirrors the + pattern in ``G1ManipulationModule._get_coordinator_client``.""" + if self._coordinator_client is None: + try: + from dimos.control.coordinator import ControlCoordinator + from dimos.core.rpc_client import RPCClient + + self._coordinator_client = RPCClient(None, ControlCoordinator) + except Exception as e: + logger.warning(f"ControlCoordinator RPC unreachable: {e}") + return None + return self._coordinator_client + def _start_broadcast_loop(self) -> None: def websocket_vis_loop() -> None: self._broadcast_loop = asyncio.new_event_loop() # type: ignore[assignment] @@ -147,6 +171,12 @@ def websocket_vis_loop() -> None: self._broadcast_thread = threading.Thread(target=websocket_vis_loop, daemon=True) # type: ignore[assignment] self._broadcast_thread.start() # type: ignore[attr-defined] + def _uses_rerun_web(self) -> bool: + rerun_open = getattr(self.config.g, "rerun_open", "native") + return self.config.g.viewer == "rerun-web" or ( + self.config.g.viewer == "rerun" and rerun_open in ("web", "both") + ) + @rpc def start(self) -> None: super().start() @@ -158,9 +188,8 @@ def start(self) -> None: self._uvicorn_server_thread = threading.Thread(target=self._run_uvicorn_server, daemon=True) self._uvicorn_server_thread.start() - # Auto-open browser only for rerun-web (dashboard with Rerun iframe + command center) - # For rerun and foxglove, users access the command center manually if needed - if self.config.g.viewer == "rerun-web": + # Only auto-open when the user chose web-based viewing. 
+ if self._uses_rerun_web(): url = f"http://localhost:{self.config.port}/" logger.info(f"Dimensional Command Center: {url}") @@ -236,11 +265,8 @@ def _create_server(self) -> None: async def serve_index(request): # type: ignore[no-untyped-def] """Serve appropriate HTML based on viewer mode.""" - # If running native Rerun, redirect to standalone command center - if self.config.g.viewer != "rerun-web": + if not self._uses_rerun_web(): return RedirectResponse(url="/command-center") - - # Otherwise serve full dashboard with Rerun iframe return FileResponse(_DASHBOARD_HTML, media_type="text/html") async def serve_command_center(request): # type: ignore[no-untyped-def] @@ -330,17 +356,52 @@ async def clear_gps_goals(sid: str) -> None: await self.sio.emit("gps_travel_goal_points", self.gps_goal_points) logger.info("GPS goal points cleared and updated clients") + @self.sio.event # type: ignore[untyped-decorator] + async def arm(sid: str, data: dict[str, Any] | None = None) -> None: + """Dashboard → arm the locomotion policy (with ramp).""" + client = self._get_coordinator_client() + if client is None: + logger.warning("arm requested but ControlCoordinator RPC is unavailable") + return + logger.info("Dashboard requested arm") + client.set_activated(engaged=True) + + @self.sio.event # type: ignore[untyped-decorator] + async def disarm(sid: str, data: dict[str, Any] | None = None) -> None: + """Dashboard → disarm; task falls back to hold-current-pose.""" + client = self._get_coordinator_client() + if client is None: + logger.warning("disarm requested but ControlCoordinator RPC is unavailable") + return + logger.info("Dashboard requested disarm") + client.set_activated(engaged=False) + + @self.sio.event # type: ignore[untyped-decorator] + async def set_dry_run(sid: str, data: dict[str, Any] | None = None) -> None: + """Dashboard → toggle dry-run on the locomotion policy. + + Payload: ``{"enabled": bool}``. 
Task still computes but + coordinator sends nothing to the adapter when enabled. + """ + client = self._get_coordinator_client() + if client is None: + logger.warning("set_dry_run requested but ControlCoordinator RPC is unavailable") + return + enabled = bool((data or {}).get("enabled", False)) + logger.info(f"Dashboard set dry_run = {enabled}") + client.set_dry_run(enabled=enabled) + @self.sio.event # type: ignore[untyped-decorator] async def move_command(sid: str, data: dict[str, Any]) -> None: # Publish Twist if transport is configured - if self.cmd_vel and self.cmd_vel.transport: + if self.tele_cmd_vel and self.tele_cmd_vel.transport: twist = Twist( linear=Vector3(data["linear"]["x"], data["linear"]["y"], data["linear"]["z"]), angular=Vector3( data["angular"]["x"], data["angular"]["y"], data["angular"]["z"] ), ) - self.cmd_vel.publish(twist) + self.tele_cmd_vel.publish(twist) # Publish TwistStamped if transport is configured if self.movecmd_stamped and self.movecmd_stamped.transport: