From d12e692db58a0628d958cabf5bfea7945bd28299 Mon Sep 17 00:00:00 2001 From: douglasswng Date: Thu, 7 May 2026 11:16:50 +0800 Subject: [PATCH 1/3] feat(agents): agent-encode PoseStamped + position nav skills --- dimos/agents/skills/navigation.py | 87 ++++++++++++++++++-- dimos/agents/skills/test_navigation.py | 78 ++++++++++++++++++ dimos/msgs/geometry_msgs/PoseStamped.py | 19 ++++- dimos/msgs/geometry_msgs/test_PoseStamped.py | 39 +++++++++ 4 files changed, 217 insertions(+), 6 deletions(-) diff --git a/dimos/agents/skills/navigation.py b/dimos/agents/skills/navigation.py index d88bec452e..a7202fb94c 100644 --- a/dimos/agents/skills/navigation.py +++ b/dimos/agents/skills/navigation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math import time from typing import Any @@ -37,6 +38,14 @@ logger = setup_logger() +def _make_position_goal(x: float, y: float, yaw_rad: float, frame_id: str) -> PoseStamped: + return PoseStamped( + position=make_vector3(x, y, 0.0), + orientation=Quaternion.from_euler(make_vector3(0.0, 0.0, yaw_rad)), + frame_id=frame_id, + ) + + class NavigationSkillContainer(Module): _latest_image: Image | None = None _latest_odom: PoseStamped | None = None @@ -243,6 +252,78 @@ def _navigate_using_semantic_map(self, query: str) -> str: message = f"Found a location in the semantic map matching '{query}'." return self._navigate_to(goal_pose, message) + @skill + def navigate_to_position( + self, + x: float, + y: float, + yaw_deg: float = 0.0, + frame_id: str = "map", + ) -> str: + """Navigate to an absolute position in the given frame. + + Use this to act on a pose returned by another tool — pass its x, y, yaw_deg, and frame_id straight in. + + Args: + x: target x in meters + y: target y in meters + yaw_deg: final heading in degrees, 0 = facing +x + frame_id: coordinate frame + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + goal = _make_position_goal(x, y, math.radians(yaw_deg), frame_id) + return self._navigate_to(goal, f"Heading to ({x:.2f}, {y:.2f}) in {frame_id}") + + @skill + def rotate_toward_position( + self, + x: float, + y: float, + frame_id: str = "map", + ) -> str: + """Rotate in place to face an absolute position in the given frame. + + Pass the target x and y from a pose's fields. Yaw is computed against the robot's current odometry. + + Args: + x: target x in meters + y: target y in meters + frame_id: coordinate frame + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + if self._latest_odom is None: + return "No odometry data received yet, cannot rotate." + + cur_x = self._latest_odom.position.x + cur_y = self._latest_odom.position.y + yaw_rad = math.atan2(y - cur_y, x - cur_x) + + goal = _make_position_goal(cur_x, cur_y, yaw_rad, frame_id) + return self._navigate_to(goal, f"Rotating to face ({x:.2f}, {y:.2f}) in {frame_id}") + + @skill + def current_pose(self) -> str: + """Return the robot's current pose. + + Use this to reason about a target pose relative to the robot (e.g. distance or bearing). Compare only against poses with the same frame_id. + """ + if not self._skill_started: + raise ValueError(f"{self} has not been started.") + + if self._latest_odom is None: + return "No odometry data received yet." + + data = self._latest_odom.agent_encode() + return ( + f"Pose: ({data['x']}, {data['y']}, {data['z']})\n" + f"Yaw: {data['yaw_deg']}°\n" + f"Frame: {data['frame_id']}" + ) + @skill def stop_navigation(self) -> str: """Immediatly stop moving.""" @@ -273,8 +354,4 @@ def _get_goal_pose_from_result(self, result: dict[str, Any]) -> PoseStamped | No pos_y = first.get("pos_y", 0) theta = first.get("rot_z", 0) - return PoseStamped( - position=make_vector3(pos_x, pos_y, 0), - orientation=Quaternion.from_euler(make_vector3(0, 0, theta)), - frame_id="map", - ) + return _make_position_goal(pos_x, pos_y, theta, "map") diff --git a/dimos/agents/skills/test_navigation.py b/dimos/agents/skills/test_navigation.py index c837492d50..395d4a10e5 100644 --- a/dimos/agents/skills/test_navigation.py +++ b/dimos/agents/skills/test_navigation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math from typing import Any from langchain_core.messages import HumanMessage @@ -118,6 +119,21 @@ def _navigate_using_semantic_map(self, query): return f"Successfuly arrived at '{query}'" +class MockedPositionNavSkill(NavigationSkillContainer): + """Direct-instantiation harness for navigate_to_position / + rotate_toward_position / current_pose tests. Skips the heavy parent + __init__ and records goals at the _navigate_to boundary.""" + + def __init__(self, latest_odom: PoseStamped | None = None) -> None: + self._skill_started = True + self._latest_odom = latest_odom + self.recorded_goals: list[PoseStamped] = [] + + def _navigate_to(self, pose: PoseStamped, message: str) -> str: + self.recorded_goals.append(pose) + return message + + @pytest.mark.slow def test_stop_movement(agent_setup) -> None: history = agent_setup( @@ -163,3 +179,65 @@ def test_go_to_semantic_location(agent_setup) -> None: ) assert "success" in history[-1].content.lower() + + +def test_navigate_to_position_sets_goal_with_yaw() -> None: + skill = MockedPositionNavSkill() + skill.navigate_to_position(x=3.0, y=4.0, yaw_deg=90.0, frame_id="map") + + assert len(skill.recorded_goals) == 1 + goal = skill.recorded_goals[0] + assert goal.frame_id == "map" + assert goal.position.x == pytest.approx(3.0) + assert goal.position.y == pytest.approx(4.0) + assert math.degrees(goal.yaw) == pytest.approx(90.0, abs=0.1) + + +def test_rotate_toward_position_computes_yaw_from_odom() -> None: + skill = MockedPositionNavSkill( + latest_odom=PoseStamped( + position=(0.0, 0.0, 0.0), + orientation=(0.0, 0.0, 0.0, 1.0), + frame_id="map", + ) + ) + skill.rotate_toward_position(x=0.0, y=1.0, frame_id="map") + + assert len(skill.recorded_goals) == 1 + goal = skill.recorded_goals[0] + assert goal.position.x == pytest.approx(0.0) + assert goal.position.y == pytest.approx(0.0) + assert math.degrees(goal.yaw) == pytest.approx(90.0, abs=0.1) + + +def test_rotate_toward_position_without_odom_returns_message() -> None: + skill = MockedPositionNavSkill() + + result = skill.rotate_toward_position(x=1.0, y=0.0, frame_id="map") + + assert "no odometry" in result.lower() + assert skill.recorded_goals == [] + + +def test_current_pose_returns_formatted_pose() -> None: + skill = MockedPositionNavSkill( + latest_odom=PoseStamped( + position=(1.0, 2.0, 0.0), + orientation=(0.0, 0.0, 0.0, 1.0), + frame_id="map", + ) + ) + + result = skill.current_pose() + + assert "Pose: (1.0, 2.0, 0.0)" in result + assert "Yaw: 0.0" in result + assert "Frame: map" in result + + +def test_current_pose_without_odom_returns_message() -> None: + skill = MockedPositionNavSkill() + + result = skill.current_pose() + + assert "no odometry" in result.lower() diff --git a/dimos/msgs/geometry_msgs/PoseStamped.py b/dimos/msgs/geometry_msgs/PoseStamped.py index acf0af8b32..722e4dce3d 100644 --- a/dimos/msgs/geometry_msgs/PoseStamped.py +++ b/dimos/msgs/geometry_msgs/PoseStamped.py @@ -16,7 +16,7 @@ import math import time -from typing import TYPE_CHECKING, BinaryIO, TypeAlias +from typing import TYPE_CHECKING, Any, BinaryIO, TypeAlias if TYPE_CHECKING: from rerun._baseclasses import Archetype @@ -82,6 +82,23 @@ def __str__(self) -> str: f"euler=[{math.degrees(self.roll):.1f}, {math.degrees(self.pitch):.1f}, {math.degrees(self.yaw):.1f}])" ) + def agent_encode(self) -> dict[str, Any]: + """Render the pose for an LLM agent. + + Returns a flat dict with ``frame_id``, ``x``, ``y``, ``z``, + ``yaw_deg``. Pass these straight into navigation skills (e.g. + ``navigate_to_position``) to act on the pose. To reason about + the pose relative to the robot, the agent should fetch its own + pose (e.g. via ``current_pose``) and combine the two. + """ + return { + "frame_id": self.frame_id, + "x": round(self.x, 3), + "y": round(self.y, 3), + "z": round(self.z, 3), + "yaw_deg": round(math.degrees(self.yaw), 1), + } + def to_rerun(self) -> Archetype: """Convert to rerun Transform3D format. diff --git a/dimos/msgs/geometry_msgs/test_PoseStamped.py b/dimos/msgs/geometry_msgs/test_PoseStamped.py index a486f33303..8a4d8ba18d 100644 --- a/dimos/msgs/geometry_msgs/test_PoseStamped.py +++ b/dimos/msgs/geometry_msgs/test_PoseStamped.py @@ -12,10 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import math import pickle import time +import pytest + from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Vector3 import Vector3 def test_lcm_encode_decode() -> None: @@ -53,3 +58,37 @@ def test_pickle_encode_decode() -> None: assert isinstance(pose_dest, PoseStamped) assert pose_dest is not pose_source assert pose_dest == pose_source + + +def test_agent_encode_returns_absolute_fields() -> None: + """Test that agent_encode returns frame_id, x, y, z, and yaw_deg.""" + + pose = PoseStamped( + position=Vector3(1.0, 2.0, 3.0), + orientation=Quaternion.from_euler(Vector3(0.0, 0.0, math.radians(45.0))), + frame_id="map", + ) + encoded = pose.agent_encode() + + assert set(encoded.keys()) == {"frame_id", "x", "y", "z", "yaw_deg"} + assert encoded["frame_id"] == "map" + assert encoded["x"] == pytest.approx(1.0) + assert encoded["y"] == pytest.approx(2.0) + assert encoded["z"] == pytest.approx(3.0) + assert encoded["yaw_deg"] == pytest.approx(45.0, abs=0.1) + + +def test_agent_encode_rounds_values() -> None: + """Test that agent_encode rounds position to 3 dp and yaw to 1 dp.""" + + pose = PoseStamped( + position=Vector3(1.23456, 2.34567, 3.45678), + orientation=Quaternion.from_euler(Vector3(0.0, 0.0, math.radians(45.123))), + frame_id="map", + ) + encoded = pose.agent_encode() + + assert encoded["x"] == pytest.approx(1.235) + assert encoded["y"] == pytest.approx(2.346) + assert encoded["z"] == pytest.approx(3.457) + assert encoded["yaw_deg"] == pytest.approx(45.1, abs=0.05) From 2b10b9893ba647a647a211b7aad6e5b3bddabe5a Mon Sep 17 00:00:00 2001 From: douglasswng Date: Fri, 15 May 2026 11:49:10 +0800 Subject: [PATCH 2/3] fix(msgs): include roll_deg and pitch_deg in PoseStamped.agent_encode agent_encode lives on the generic PoseStamped type; excluding roll and pitch baked in a ground-robot assumption. Drones and manipulators need full orientation. --- dimos/msgs/geometry_msgs/PoseStamped.py | 11 +++++--- dimos/msgs/geometry_msgs/test_PoseStamped.py | 28 +++++++++++++++----- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/dimos/msgs/geometry_msgs/PoseStamped.py b/dimos/msgs/geometry_msgs/PoseStamped.py index 722e4dce3d..06546419f4 100644 --- a/dimos/msgs/geometry_msgs/PoseStamped.py +++ b/dimos/msgs/geometry_msgs/PoseStamped.py @@ -86,16 +86,19 @@ def agent_encode(self) -> dict[str, Any]: """Render the pose for an LLM agent. Returns a flat dict with ``frame_id``, ``x``, ``y``, ``z``, - ``yaw_deg``. Pass these straight into navigation skills (e.g. - ``navigate_to_position``) to act on the pose. To reason about - the pose relative to the robot, the agent should fetch its own - pose (e.g. via ``current_pose``) and combine the two. + ``roll_deg``, ``pitch_deg``, ``yaw_deg``. Pass these straight + into navigation skills (e.g. ``navigate_to_position``) to act + on the pose. To reason about the pose relative to the robot, + the agent should fetch its own pose (e.g. via ``current_pose``) + and combine the two. """ return { "frame_id": self.frame_id, "x": round(self.x, 3), "y": round(self.y, 3), "z": round(self.z, 3), + "roll_deg": round(math.degrees(self.roll), 1), + "pitch_deg": round(math.degrees(self.pitch), 1), "yaw_deg": round(math.degrees(self.yaw), 1), } diff --git a/dimos/msgs/geometry_msgs/test_PoseStamped.py b/dimos/msgs/geometry_msgs/test_PoseStamped.py index 8a4d8ba18d..6d8917f1ed 100644 --- a/dimos/msgs/geometry_msgs/test_PoseStamped.py +++ b/dimos/msgs/geometry_msgs/test_PoseStamped.py @@ -61,29 +61,43 @@ def test_pickle_encode_decode() -> None: def test_agent_encode_returns_absolute_fields() -> None: - """Test that agent_encode returns frame_id, x, y, z, and yaw_deg.""" + """Test that agent_encode returns frame_id, position, and full orientation.""" pose = PoseStamped( position=Vector3(1.0, 2.0, 3.0), - orientation=Quaternion.from_euler(Vector3(0.0, 0.0, math.radians(45.0))), + orientation=Quaternion.from_euler( + Vector3(math.radians(10.0), math.radians(20.0), math.radians(45.0)) + ), frame_id="map", ) encoded = pose.agent_encode() - assert set(encoded.keys()) == {"frame_id", "x", "y", "z", "yaw_deg"} + assert set(encoded.keys()) == { + "frame_id", + "x", + "y", + "z", + "roll_deg", + "pitch_deg", + "yaw_deg", + } assert encoded["frame_id"] == "map" assert encoded["x"] == pytest.approx(1.0) assert encoded["y"] == pytest.approx(2.0) assert encoded["z"] == pytest.approx(3.0) + assert encoded["roll_deg"] == pytest.approx(10.0, abs=0.1) + assert encoded["pitch_deg"] == pytest.approx(20.0, abs=0.1) assert encoded["yaw_deg"] == pytest.approx(45.0, abs=0.1) def test_agent_encode_rounds_values() -> None: - """Test that agent_encode rounds position to 3 dp and yaw to 1 dp.""" + """Test that agent_encode rounds position to 3 dp and angles to 1 dp.""" pose = PoseStamped( position=Vector3(1.23456, 2.34567, 3.45678), - orientation=Quaternion.from_euler(Vector3(0.0, 0.0, math.radians(45.123))), + orientation=Quaternion.from_euler( + Vector3(math.radians(10.123), math.radians(20.456), math.radians(45.789)) + ), frame_id="map", ) encoded = pose.agent_encode() @@ -91,4 +105,6 @@ def test_agent_encode_rounds_values() -> None: assert encoded["x"] == pytest.approx(1.235) assert encoded["y"] == pytest.approx(2.346) assert encoded["z"] == pytest.approx(3.457) - assert encoded["yaw_deg"] == pytest.approx(45.1, abs=0.05) + assert encoded["roll_deg"] == pytest.approx(10.1, abs=0.05) + assert encoded["pitch_deg"] == pytest.approx(20.5, abs=0.05) + assert encoded["yaw_deg"] == pytest.approx(45.8, abs=0.05) From 04bb0914df45bd5dd51937da9dde8c4787e9a227 Mon Sep 17 00:00:00 2001 From: douglasswng Date: Fri, 15 May 2026 11:52:26 +0800 Subject: [PATCH 3/3] refactor(skills): drop frame_id param from position nav skills The rest of the file hardcodes "map" (tag_location, _get_goal_pose_from_result). Exposing frame_id as an LLM-settable param was inconsistent and gave the agent a knob it had no basis to set correctly. Documented the map-frame assumption in the docstrings. Will re-introduce frame_id when TF-tree lookup is added in a follow-up PR. --- dimos/agents/skills/navigation.py | 20 ++++++++------------ dimos/agents/skills/test_navigation.py | 6 +++--- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/dimos/agents/skills/navigation.py b/dimos/agents/skills/navigation.py index a7202fb94c..bc2ed28dfb 100644 --- a/dimos/agents/skills/navigation.py +++ b/dimos/agents/skills/navigation.py @@ -258,39 +258,35 @@ def navigate_to_position( x: float, y: float, yaw_deg: float = 0.0, - frame_id: str = "map", ) -> str: - """Navigate to an absolute position in the given frame. + """Navigate to an absolute position in the map frame. - Use this to act on a pose returned by another tool — pass its x, y, yaw_deg, and frame_id straight in. + Use this to act on a pose returned by another tool — pass its x, y, and yaw_deg straight in. Positions are interpreted in the map frame. Args: x: target x in meters y: target y in meters yaw_deg: final heading in degrees, 0 = facing +x - frame_id: coordinate frame """ if not self._skill_started: raise ValueError(f"{self} has not been started.") - goal = _make_position_goal(x, y, math.radians(yaw_deg), frame_id) - return self._navigate_to(goal, f"Heading to ({x:.2f}, {y:.2f}) in {frame_id}") + goal = _make_position_goal(x, y, math.radians(yaw_deg), "map") + return self._navigate_to(goal, f"Heading to ({x:.2f}, {y:.2f})") @skill def rotate_toward_position( self, x: float, y: float, - frame_id: str = "map", ) -> str: - """Rotate in place to face an absolute position in the given frame. + """Rotate in place to face an absolute position in the map frame. - Pass the target x and y from a pose's fields. Yaw is computed against the robot's current odometry. + Pass the target x and y from a pose's fields. Yaw is computed against the robot's current odometry. Positions are interpreted in the map frame. Args: x: target x in meters y: target y in meters - frame_id: coordinate frame """ if not self._skill_started: raise ValueError(f"{self} has not been started.") @@ -302,8 +298,8 @@ def rotate_toward_position( cur_y = self._latest_odom.position.y yaw_rad = math.atan2(y - cur_y, x - cur_x) - goal = _make_position_goal(cur_x, cur_y, yaw_rad, frame_id) - return self._navigate_to(goal, f"Rotating to face ({x:.2f}, {y:.2f}) in {frame_id}") + goal = _make_position_goal(cur_x, cur_y, yaw_rad, "map") + return self._navigate_to(goal, f"Rotating to face ({x:.2f}, {y:.2f})") @skill def current_pose(self) -> str: diff --git a/dimos/agents/skills/test_navigation.py b/dimos/agents/skills/test_navigation.py index 6a5d959030..34e5435a29 100644 --- a/dimos/agents/skills/test_navigation.py +++ b/dimos/agents/skills/test_navigation.py @@ -180,7 +180,7 @@ def test_go_to_semantic_location(agent_setup) -> None: def test_navigate_to_position_sets_goal_with_yaw() -> None: skill = MockedPositionNavSkill() - skill.navigate_to_position(x=3.0, y=4.0, yaw_deg=90.0, frame_id="map") + skill.navigate_to_position(x=3.0, y=4.0, yaw_deg=90.0) assert len(skill.recorded_goals) == 1 goal = skill.recorded_goals[0] @@ -198,7 +198,7 @@ def test_rotate_toward_position_computes_yaw_from_odom() -> None: frame_id="map", ) ) - skill.rotate_toward_position(x=0.0, y=1.0, frame_id="map") + skill.rotate_toward_position(x=0.0, y=1.0) assert len(skill.recorded_goals) == 1 goal = skill.recorded_goals[0] @@ -210,7 +210,7 @@ def test_rotate_toward_position_computes_yaw_from_odom() -> None: def test_rotate_toward_position_without_odom_returns_message() -> None: skill = MockedPositionNavSkill() - result = skill.rotate_toward_position(x=1.0, y=0.0, frame_id="map") + result = skill.rotate_toward_position(x=1.0, y=0.0) assert "no odometry" in result.lower() assert skill.recorded_goals == []