From 32a5eca38676fbf096a30a530734c03953ebace8 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Tue, 12 May 2026 09:55:56 +0000 Subject: [PATCH 01/15] add planning thread pool and worker function to manip module --- dimos/manipulation/manipulation_module.py | 285 ++++++++++++------ .../planning/monitor/world_monitor.py | 13 + 2 files changed, 214 insertions(+), 84 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 54863353be..674c211d91 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -24,6 +24,8 @@ from __future__ import annotations +from concurrent.futures import Future, ThreadPoolExecutor +from dataclasses import dataclass from enum import Enum import threading import time @@ -71,6 +73,20 @@ """Maps robot_name -> planned trajectory""" +@dataclass +class PlanningJob: + """In-flight or completed planning job for a single robot. + """ + + request_id: int + accepted_at: float + future: Future[Any] | None = None + completed_at: float | None = None + success: bool | None = None + error: str | None = None + invalidated: bool = False + + class ManipulationState(Enum): """State machine for manipulation module.""" @@ -129,6 +145,12 @@ def __init__(self, **kwargs: Any) -> None: self._planned_paths: PlannedPaths = {} self._planned_trajectories: PlannedTrajectories = {} + # Per-robot async planning job tracking + self._planning_jobs: dict[RobotName, PlanningJob] = {} + self._executing: dict[RobotName, bool] = {} + self._next_request_id: int = 0 + self._planning_pool: ThreadPoolExecutor | None = None + # Coordinator integration (lazy initialized) self._coordinator_client: RPCClient | None = None @@ -203,6 +225,13 @@ def _initialize_planning(self) -> None: self._planner = create_planner(name=self.config.planner_name) self._kinematics = create_kinematics(name=self.config.kinematics_name) + # One worker per robot lets bimanual plans run in parallel without + # queuing one arm's job behind the other. + self._planning_pool = ThreadPoolExecutor( + max_workers=max(1, len(self._robots)), + thread_name_prefix="manip-plan", + ) + # Start TF publishing thread if any robot has tf_extra_links if any(c.tf_extra_links for _, c, _ in self._robots.values()): _ = self.tf # Eager init @@ -406,23 +435,37 @@ def is_collision_free(self, joints: list[float], robot_name: RobotName | None = def _begin_planning( self, robot_name: RobotName | None = None - ) -> tuple[RobotName, WorldRobotID] | None: - """Check state and begin planning. Returns (robot_name, robot_id) or None. + ) -> tuple[RobotName, WorldRobotID, int] | None: + """Per-robot gating + job registration. Returns (name, robot_id, request_id) or None. + + On acceptance, clears the robot's stale plan and registers a new + PlanningJob (with future=None — caller submits the worker and assigns it). Args: robot_name: Robot to plan for (required if multiple robots configured) """ - if self._world_monitor is None: + if self._world_monitor is None or self._planner is None or self._planning_pool is None: logger.error("Planning not initialized") return None if (robot := self._get_robot(robot_name)) is None: return None + name, robot_id, _, _ = robot with self._lock: - if self._state not in (ManipulationState.IDLE, ManipulationState.COMPLETED): - logger.warning(f"Cannot plan: state is {self._state.name}") + existing = self._planning_jobs.get(name) + if existing is not None and (existing.future is None or not existing.future.done()): + logger.warning(f"Cannot plan: '{name}' already has an active planning job") + return None + if self._executing.get(name, False): + logger.warning(f"Cannot plan: '{name}' is currently executing") return None - self._state = ManipulationState.PLANNING - return robot[0], robot[1] + self._planned_paths.pop(name, None) + self._planned_trajectories.pop(name, None) + request_id = self._next_request_id + self._next_request_id += 1 + self._planning_jobs[name] = PlanningJob( + request_id=request_id, accepted_at=time.time() + ) + return name, robot_id, request_id def _fail(self, msg: str) -> bool: """Set FAULT state with error message.""" @@ -431,57 +474,40 @@ def _fail(self, msg: str) -> bool: self._error_message = msg return False - def _dismiss_preview(self, robot_id: WorldRobotID) -> None: - """Hide the preview ghost if the world supports it.""" - if self._world_monitor is None: - return - world = self._world_monitor.world - if hasattr(world, "hide_preview"): - world.hide_preview(robot_id) - world.publish_visualization() - @rpc def plan_to_pose(self, pose: Pose, robot_name: RobotName | None = None) -> bool: - """Plan motion to pose. Use preview_path() then execute(). + """Submit an async motion plan to a target pose. Returns True on accept. + + Planning runs in the background. Use wait_for_planning_completion() + before preview_path() / execute(). Args: pose: Target end-effector pose robot_name: Robot to plan for (required if multiple robots configured) """ - if self._kinematics is None or (r := self._begin_planning(robot_name)) is None: + if self._kinematics is None: + logger.error("Planning not initialized") return False - robot_name, robot_id = r - assert self._world_monitor # guaranteed by _begin_planning - - current = self._world_monitor.get_current_joint_state(robot_id) - if current is None: - return self._fail("No joint state") - - # Convert Pose to PoseStamped for the IK solver - from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped - - target_pose = PoseStamped( - frame_id="world", - position=pose.position, - orientation=pose.orientation, - ) - - ik = self._kinematics.solve( - world=self._world_monitor.world, - robot_id=robot_id, - target_pose=target_pose, - seed=current, - check_collision=True, + if (r := self._begin_planning(robot_name)) is None: + return False + name, robot_id, request_id = r + logger.info(f"Plan-to-pose accepted for '{name}' (request_id={request_id})") + assert self._planning_pool is not None # guaranteed by _begin_planning + future = self._planning_pool.submit( + self._planning_worker, name, robot_id, request_id, pose, None ) - if not ik.is_success() or ik.joint_state is None: - return self._fail(f"IK failed: {ik.status.name}") - - logger.info(f"IK solved, error: {ik.position_error:.4f}m") - return self._plan_path_only(robot_name, robot_id, ik.joint_state) + with self._lock: + job = self._planning_jobs.get(name) + if job is not None and job.request_id == request_id: + job.future = future + return True @rpc def plan_to_joints(self, joints: JointState, robot_name: RobotName | None = None) -> bool: - """Plan motion to joint config. Use preview_path() then execute(). + """Submit an async motion plan to a joint configuration. Returns True on accept. + + Planning runs in the background. Use wait_for_planning_completion() + before preview_path() / execute(). Args: joints: Target joint state (names + positions) @@ -489,49 +515,134 @@ def plan_to_joints(self, joints: JointState, robot_name: RobotName | None = None """ if (r := self._begin_planning(robot_name)) is None: return False - robot_name, robot_id = r - logger.info(f"Planning to joints for {robot_name}: {[f'{j:.3f}' for j in joints.position]}") - return self._plan_path_only(robot_name, robot_id, joints) + name, robot_id, request_id = r + logger.info( + f"Plan-to-joints accepted for '{name}' (request_id={request_id}): " + f"[{', '.join(f'{j:.3f}' for j in joints.position)}]" + ) + assert self._planning_pool is not None # guaranteed by _begin_planning + future = self._planning_pool.submit( + self._planning_worker, name, robot_id, request_id, None, joints + ) + with self._lock: + job = self._planning_jobs.get(name) + if job is not None and job.request_id == request_id: + job.future = future + return True - def _plan_path_only( - self, robot_name: RobotName, robot_id: WorldRobotID, goal: JointState - ) -> bool: - """Plan path from current position to goal, store result.""" - assert self._world_monitor and self._planner # guaranteed by _begin_planning - self._dismiss_preview(robot_id) - start = self._world_monitor.get_current_joint_state(robot_id) - if start is None: - return self._fail("No joint state") - - # Trim goal to planner DOF (e.g. strip gripper joint from coordinator state) - planner_dof = len(start.position) - if len(goal.position) > planner_dof: - goal = JointState( - name=list(goal.name[:planner_dof]) if goal.name else [], - position=list(goal.position[:planner_dof]), - ) + def _planning_worker( + self, + robot_name: RobotName, + robot_id: WorldRobotID, + request_id: int, + target_pose: Pose | None, + target_joints: JointState | None, + ) -> None: + """Run a planning job on the pool. Result is recorded via compare-and-store. + + Lock-free against WorldMonitor (Drake scratch contexts handle isolation). + Only reacquires self._lock at the very end to publish the result. + """ + try: + assert self._world_monitor is not None and self._planner is not None - result = self._planner.plan_joint_path( - world=self._world_monitor.world, - robot_id=robot_id, - start=start, - goal=goal, - timeout=self.config.planning_timeout, - ) - if not result.is_success(): - return self._fail(f"Planning failed: {result.status.name}") + # Hide any active preview for this robot before planning kicks off. + # Serialized inside WorldMonitor — the live world is not thread-safe. + self._world_monitor.dismiss_preview(robot_id) - logger.info(f"Path: {len(result.path)} waypoints") - self._planned_paths[robot_name] = result.path + start = self._world_monitor.get_current_joint_state(robot_id) + if start is None: + self._complete_job(robot_name, request_id, False, "No joint state", None, None) + return - _, _, traj_gen = self._robots[robot_name] - # Convert JointState path to list of position lists for trajectory generator - traj = traj_gen.generate([list(state.position) for state in result.path]) - self._planned_trajectories[robot_name] = traj - logger.info(f"Trajectory: {traj.duration:.3f}s") + if target_pose is not None: + assert self._kinematics is not None + from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped - self._state = ManipulationState.COMPLETED - return True + ps = PoseStamped( + frame_id="world", + position=target_pose.position, + orientation=target_pose.orientation, + ) + ik = self._kinematics.solve( + world=self._world_monitor.world, + robot_id=robot_id, + target_pose=ps, + seed=start, + check_collision=True, + ) + if not ik.is_success() or ik.joint_state is None: + self._complete_job( + robot_name, request_id, False, + f"IK failed: {ik.status.name}", None, None, + ) + return + logger.info(f"IK solved for '{robot_name}', error: {ik.position_error:.4f}m") + goal = ik.joint_state + else: + assert target_joints is not None + goal = target_joints + + # Trim goal to planner DOF (e.g. strip gripper joint from coordinator state) + planner_dof = len(start.position) + if len(goal.position) > planner_dof: + goal = JointState( + name=list(goal.name[:planner_dof]) if goal.name else [], + position=list(goal.position[:planner_dof]), + ) + + result = self._planner.plan_joint_path( + world=self._world_monitor.world, + robot_id=robot_id, + start=start, + goal=goal, + timeout=self.config.planning_timeout, + ) + if not result.is_success(): + self._complete_job( + robot_name, request_id, False, + f"Planning failed: {result.status.name}", None, None, + ) + return + + logger.info(f"Path for '{robot_name}': {len(result.path)} waypoints") + _, _, traj_gen = self._robots[robot_name] + traj = traj_gen.generate([list(state.position) for state in result.path]) + logger.info(f"Trajectory for '{robot_name}': {traj.duration:.3f}s") + + self._complete_job(robot_name, request_id, True, None, result.path, traj) + except Exception as e: + logger.exception(f"Planning worker error for '{robot_name}'") + self._complete_job(robot_name, request_id, False, str(e), None, None) + + def _complete_job( + self, + robot_name: RobotName, + request_id: int, + success: bool, + error: str | None, + path: JointPath | None, + traj: JointTrajectory | None, + ) -> None: + """Compare-and-store completion. Drops the result if the job was + superseded by a newer accept or invalidated by reset(). + """ + with self._lock: + job = self._planning_jobs.get(robot_name) + if job is None or job.request_id != request_id or job.invalidated: + logger.info( + f"Dropping planning result for '{robot_name}' " + f"req={request_id} (superseded or invalidated)" + ) + return + job.completed_at = time.time() + job.success = success + job.error = error + if success and path is not None and traj is not None: + self._planned_paths[robot_name] = path + self._planned_trajectories[robot_name] = traj + elif not success and error: + self._error_message = error @rpc def preview_path(self, duration: float = 3.0, robot_name: RobotName | None = None) -> bool: @@ -1248,6 +1359,12 @@ def stop(self) -> None: self._tf_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) self._tf_thread = None + # Drain planning pool before the WorldMonitor goes away — in-flight RRT/IK + # calls hold Drake contexts derived from the world. + if self._planning_pool is not None: + self._planning_pool.shutdown(wait=True, cancel_futures=True) + self._planning_pool = None + # Stop world monitor (includes visualization thread) if self._world_monitor is not None: self._world_monitor.stop_all_monitors() diff --git a/dimos/manipulation/planning/monitor/world_monitor.py b/dimos/manipulation/planning/monitor/world_monitor.py index c55a5bad75..b56a75d235 100644 --- a/dimos/manipulation/planning/monitor/world_monitor.py +++ b/dimos/manipulation/planning/monitor/world_monitor.py @@ -428,6 +428,19 @@ def publish_visualization(self) -> None: if hasattr(self._world, "publish_visualization"): self._world.publish_visualization() + def dismiss_preview(self, robot_id: WorldRobotID) -> None: + """Hide the preview ghost for a robot and publish viz update. + + Serialized under the WorldMonitor lock — the live world mutates + non-scratch state, so concurrent planners must not call hide_preview + or publish_visualization unsynchronized. + """ + with self._lock: + if hasattr(self._world, "hide_preview"): + self._world.hide_preview(robot_id) + if hasattr(self._world, "publish_visualization"): + self._world.publish_visualization() + def start_visualization_thread(self, rate_hz: float = 10.0) -> None: """Start background thread for visualization updates at given rate.""" if self._viz_thread is not None and self._viz_thread.is_alive(): From a20011756a7340904fcd8e99c139413dc221cd66 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Tue, 12 May 2026 10:48:45 +0000 Subject: [PATCH 02/15] add planning state apis --- dimos/manipulation/manipulation_module.py | 113 +++++++++++++++++++++- 1 file changed, 108 insertions(+), 5 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 674c211d91..9298e094f4 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -24,7 +24,7 @@ from __future__ import annotations -from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor, wait as futures_wait from dataclasses import dataclass from enum import Enum import threading @@ -148,6 +148,7 @@ def __init__(self, **kwargs: Any) -> None: # Per-robot async planning job tracking self._planning_jobs: dict[RobotName, PlanningJob] = {} self._executing: dict[RobotName, bool] = {} + self._last_op_success: dict[RobotName, bool] = {} self._next_request_id: int = 0 self._planning_pool: ThreadPoolExecutor | None = None @@ -360,8 +361,29 @@ def _tf_publish_loop(self) -> None: @rpc def get_state(self) -> str: - """Get current manipulation state name.""" - return self._state.name + """Get aggregate manipulation state derived from per-robot job records. + + may need to be revisited + + Priority: FAULT > PLANNING > EXECUTING > COMPLETED > IDLE. + """ + with self._lock: + if self._state == ManipulationState.FAULT: + return ManipulationState.FAULT.name + if any(j.success is False for j in self._planning_jobs.values()): + return ManipulationState.FAULT.name + if any( + j.future is None or not j.future.done() + for j in self._planning_jobs.values() + ): + return ManipulationState.PLANNING.name + # During transition, execute() still flips self._state to EXECUTING/COMPLETED. + # Once execute() is rewritten to use self._executing, the self._state check drops. + if self._state == ManipulationState.EXECUTING or any(self._executing.values()): + return ManipulationState.EXECUTING.name + if any(self._last_op_success.values()): + return ManipulationState.COMPLETED.name + return ManipulationState.IDLE.name @rpc def get_error(self) -> str: @@ -391,8 +413,10 @@ def reset(self) -> str: """ if self._state == ManipulationState.EXECUTING: return "Error: Cannot reset while executing — cancel the motion first" - self._state = ManipulationState.IDLE - self._error_message = "" + with self._lock: + self._state = ManipulationState.IDLE + self._error_message = "" + self._last_op_success.clear() return "Reset to IDLE — ready for new commands" @rpc @@ -530,6 +554,84 @@ def plan_to_joints(self, joints: JointState, robot_name: RobotName | None = None job.future = future return True + def _build_status(self, robot_name: RobotName, job: PlanningJob | None) -> dict[str, Any]: + """Caller must hold self._lock.""" + if job is None: + return { + "robot_name": robot_name, + "active": False, + "accepted": False, + "done": False, + "success": None, + "error": None, + "duration_s": None, + } + done = job.future is not None and job.future.done() + end_time = job.completed_at if job.completed_at is not None else time.time() + return { + "robot_name": robot_name, + "active": not done, + "accepted": True, + "done": done, + "success": job.success, + "error": job.error, + "duration_s": end_time - job.accepted_at, + } + + @rpc + def get_planning_status( + self, robot_name: RobotName | None = None + ) -> dict[str, Any]: + """Return per-robot planning job status. + + With a robot_name, returns a single status dict for that robot. + With robot_name=None, returns a {robot_name: status_dict} map covering + every robot that has ever had a planning job submitted. + """ + with self._lock: + if robot_name is not None: + return self._build_status(robot_name, self._planning_jobs.get(robot_name)) + return { + name: self._build_status(name, job) + for name, job in self._planning_jobs.items() + } + + @rpc + def wait_for_planning_completion( + self, robot_name: RobotName | None = None, timeout: float | None = None + ) -> bool: + """Block until planning job(s) reach a terminal state. + + Returns True iff every waited-on job completed within the timeout + (regardless of success). Check get_planning_status() or has_planned_path() + to distinguish success from planning failure. + + With robot_name=None, timeout is a total wall-clock budget across all + currently-active robots, not per-robot. + + Args: + robot_name: robot name to wait for + timeout: Max time to wait in seconds + """ + with self._lock: + if robot_name is not None: + job = self._planning_jobs.get(robot_name) + futures = ( + [job.future] + if job is not None and job.future is not None + else [] + ) + else: + futures = [ + j.future for j in self._planning_jobs.values() if j.future is not None + ] + + if not futures: + return True + + _, not_done = futures_wait(futures, timeout=timeout) + return not not_done + def _planning_worker( self, robot_name: RobotName, @@ -638,6 +740,7 @@ def _complete_job( job.completed_at = time.time() job.success = success job.error = error + self._last_op_success[robot_name] = success if success and path is not None and traj is not None: self._planned_paths[robot_name] = path self._planned_trajectories[robot_name] = traj From 848b76ad218ff172ab86c965783f75a42416109c Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Tue, 12 May 2026 16:48:15 +0000 Subject: [PATCH 03/15] rewrite cancel, reset, execute for handling in-flight planning --- dimos/manipulation/manipulation_module.py | 199 +++++++++++++++------- 1 file changed, 142 insertions(+), 57 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 9298e094f4..74a8d4f453 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -370,16 +370,16 @@ def get_state(self) -> str: with self._lock: if self._state == ManipulationState.FAULT: return ManipulationState.FAULT.name - if any(j.success is False for j in self._planning_jobs.values()): + # FAULT reads _last_op_success (cleared by reset) rather than the + # historical _planning_jobs records, so reset truly clears the fault. + if any(s is False for s in self._last_op_success.values()): return ManipulationState.FAULT.name if any( j.future is None or not j.future.done() for j in self._planning_jobs.values() ): return ManipulationState.PLANNING.name - # During transition, execute() still flips self._state to EXECUTING/COMPLETED. - # Once execute() is rewritten to use self._executing, the self._state check drops. - if self._state == ManipulationState.EXECUTING or any(self._executing.values()): + if any(self._executing.values()): return ManipulationState.EXECUTING.name if any(self._last_op_success.values()): return ManipulationState.COMPLETED.name @@ -395,28 +395,52 @@ def get_error(self) -> str: return self._error_message @rpc - def cancel(self) -> bool: - """Cancel current motion.""" - if self._state != ManipulationState.EXECUTING: - return False - self._state = ManipulationState.IDLE - logger.info("Motion cancelled") - return True + def cancel(self, robot_name: RobotName | None = None) -> bool: + """Clear the execute() accept window for one robot or all robots. + + Best-effort signal — coordinator interaction is unchanged. Does not + cancel in-flight planning jobs (use reset() for that). + + Args: + robot_name: Robot to cancel. None cancels every robot currently + in its execute() accept window. + + Returns: + True if at least one robot was cancelled, else False. + """ + with self._lock: + if robot_name is not None: + if not self._executing.get(robot_name, False): + return False + self._executing[robot_name] = False + logger.info(f"Cancelled execute window for '{robot_name}'") + return True + if not any(self._executing.values()): + return False + self._executing = {k: False for k in self._executing} + logger.info("Cancelled execute window for all robots") + return True @rpc @skill def reset(self) -> str: - """Reset the robot module to IDLE state, clearing any fault. + """Reset the module to IDLE: drop stored plans and invalidate active jobs. - Use this after an error or fault to allow new commands. - Cannot reset while a motion is executing — cancel first. + In-flight planner threads keep running (Drake's RRT is not preemptable), + but their late results are dropped at compare-and-store via the + invalidated flag. Use cancel() to abort an active coordinator execution. """ - if self._state == ManipulationState.EXECUTING: - return "Error: Cannot reset while executing — cancel the motion first" with self._lock: - self._state = ManipulationState.IDLE - self._error_message = "" + if any(self._executing.values()): + return "Error: Cannot reset while executing — cancel the motion first" + for job in self._planning_jobs.values(): + if job.future is None or not job.future.done(): + job.invalidated = True + self._planned_paths.clear() + self._planned_trajectories.clear() self._last_op_success.clear() + self._error_message = "" + self._state = ManipulationState.IDLE return "Reset to IDLE — ready for new commands" @rpc @@ -476,7 +500,11 @@ def _begin_planning( name, robot_id, _, _ = robot with self._lock: existing = self._planning_jobs.get(name) - if existing is not None and (existing.future is None or not existing.future.done()): + if ( + existing is not None + and not existing.invalidated + and (existing.future is None or not existing.future.done()) + ): logger.warning(f"Cannot plan: '{name}' already has an active planning job") return None if self._executing.get(name, False): @@ -559,23 +587,27 @@ def _build_status(self, robot_name: RobotName, job: PlanningJob | None) -> dict[ if job is None: return { "robot_name": robot_name, + "request_id": None, "active": False, "accepted": False, "done": False, "success": None, "error": None, "duration_s": None, + "invalidated": False, } done = job.future is not None and job.future.done() end_time = job.completed_at if job.completed_at is not None else time.time() return { "robot_name": robot_name, + "request_id": job.request_id, "active": not done, "accepted": True, "done": done, "success": job.success, "error": job.error, "duration_s": end_time - job.accepted_at, + "invalidated": job.invalidated, } @rpc @@ -751,6 +783,8 @@ def _complete_job( def preview_path(self, duration: float = 3.0, robot_name: RobotName | None = None) -> bool: """Preview the planned path in the visualizer. + Rejects if a planning job is still in flight for this robot. + Args: duration: Total animation duration in seconds robot_name: Robot to preview (required if multiple robots configured) @@ -765,30 +799,53 @@ def preview_path(self, duration: float = 3.0, robot_name: RobotName | None = Non return False robot_name, robot_id, _, _ = robot - planned_path = self._planned_paths.get(robot_name) - if planned_path is None or len(planned_path) == 0: - logger.warning(f"No planned path to preview for {robot_name}") - return False - - # Interpolate and animate + with self._lock: + job = self._planning_jobs.get(robot_name) + if ( + job is not None + and not job.invalidated + and (job.future is None or not job.future.done()) + ): + logger.warning( + f"Cannot preview: planning still active for '{robot_name}'" + ) + return False + planned_path = self._planned_paths.get(robot_name) + if planned_path is None or len(planned_path) == 0: + logger.warning(f"No planned path to preview for {robot_name}") + return False + + # Animate outside the lock — the WorldMonitor handles its own + # synchronization for visualization writes. interpolated = interpolate_path(planned_path, resolution=0.1) self._world_monitor.world.animate_path(robot_id, interpolated, duration) return True @rpc - def has_planned_path(self) -> bool: - """Check if there's a planned path ready. + def has_planned_path(self, robot_name: RobotName | None = None) -> bool: + """Check if a usable planned path exists for the given robot. - Returns: - True if a path is planned and ready + Returns False if a planning job is still in flight, since the stored + path (if any) reflects a stale prior plan that was cleared on accept. + + Args: + robot_name: Robot to query (required if multiple robots configured) """ - robot = self._get_robot() + robot = self._get_robot(robot_name) if robot is None: return False robot_name, _, _, _ = robot - path = self._planned_paths.get(robot_name) - return path is not None and len(path) > 0 + with self._lock: + job = self._planning_jobs.get(robot_name) + if ( + job is not None + and not job.invalidated + and (job.future is None or not job.future.done()) + ): + return False + path = self._planned_paths.get(robot_name) + return path is not None and len(path) > 0 @rpc def get_visualization_url(self) -> str | None: @@ -802,19 +859,20 @@ def get_visualization_url(self) -> str | None: return self._world_monitor.get_visualization_url() @rpc - def clear_planned_path(self) -> bool: - """Clear the stored planned path. + def clear_planned_path(self, robot_name: RobotName | None = None) -> bool: + """Clear the stored planned path and trajectory for the given robot. - Returns: - True if cleared + Args: + robot_name: Robot whose stored plan to drop (required if multiple + robots configured). """ - robot = self._get_robot() + robot = self._get_robot(robot_name) if robot is None: return False robot_name, _, _, _ = robot - - self._planned_paths.pop(robot_name, None) - self._planned_trajectories.pop(robot_name, None) + with self._lock: + self._planned_paths.pop(robot_name, None) + self._planned_trajectories.pop(robot_name, None) return True @rpc @@ -958,14 +1016,16 @@ def _translate_trajectory_to_coordinator( @rpc def execute(self, robot_name: RobotName | None = None) -> bool: - """Execute planned trajectory via ControlCoordinator.""" + """Ship the stored trajectory to the coordinator for execution. + + Rejects if a planning job is still in flight or no trajectory is stored + for this robot. Sets self._executing[robot_name] during the brief + task_invoke accept window so cancel(robot_name) and aggregate + get_state() can see the EXECUTING state. + """ if (robot := self._get_robot(robot_name)) is None: return False robot_name, _, config, _ = robot - - if (traj := self._planned_trajectories.get(robot_name)) is None: - logger.warning("No planned trajectory") - return False if not config.coordinator_task_name: logger.error(f"No coordinator_task_name for '{robot_name}'") return False @@ -973,21 +1033,46 @@ def execute(self, robot_name: RobotName | None = None) -> bool: logger.error("No coordinator client") return False - translated = self._translate_trajectory_to_coordinator(traj, config) - logger.info( - f"Executing: task='{config.coordinator_task_name}', {len(translated.points)} pts, {translated.duration:.2f}s" - ) + with self._lock: + job = self._planning_jobs.get(robot_name) + if ( + job is not None + and not job.invalidated + and (job.future is None or not job.future.done()) + ): + logger.warning( + f"Cannot execute: planning still active for '{robot_name}'" + ) + return False + traj = self._planned_trajectories.get(robot_name) + if traj is None: + logger.warning(f"No planned trajectory for '{robot_name}'") + return False + self._executing[robot_name] = True - self._state = ManipulationState.EXECUTING - result = client.task_invoke( - config.coordinator_task_name, "execute", {"trajectory": translated} - ) + try: + translated = self._translate_trajectory_to_coordinator(traj, config) + logger.info( + f"Executing '{robot_name}': task='{config.coordinator_task_name}', " + f"{len(translated.points)} pts, {translated.duration:.2f}s" + ) + result = client.task_invoke( + config.coordinator_task_name, "execute", {"trajectory": translated} + ) + finally: + with self._lock: + self._executing[robot_name] = False + + with self._lock: + self._last_op_success[robot_name] = bool(result) + if not result: + self._state = ManipulationState.FAULT + self._error_message = f"Coordinator rejected trajectory for '{robot_name}'" if result: - logger.info("Trajectory accepted") - self._state = ManipulationState.COMPLETED + logger.info(f"Trajectory accepted for '{robot_name}'") return True - else: - return self._fail("Coordinator rejected trajectory") + logger.warning(f"Coordinator rejected trajectory for '{robot_name}'") + return False @rpc def get_trajectory_status(self, robot_name: RobotName | None = None) -> dict[str, Any] | None: From 7b7e9fd3c7779b0dc9a24d4b1664697b25204cd1 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Tue, 12 May 2026 17:50:10 +0000 Subject: [PATCH 04/15] pick & place, manip api tweaks --- dimos/manipulation/manipulation_module.py | 42 +++++++++++++++------ dimos/manipulation/pick_and_place_module.py | 22 +++++++---- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 74a8d4f453..9a816a383e 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -1290,20 +1290,40 @@ def _lift_if_low(self, robot_name: RobotName | None = None, min_z: float = 0.05) logger.info(f"EE z={ee.position.z:.3f} < {min_z}, lifting to z={lift_z:.3f}") lift_pose = Pose(Vector3(ee.position.x, ee.position.y, lift_z), ee.orientation) if not self.plan_to_pose(lift_pose, robot_name): - return f"Error: Failed to plan lift from z={ee.position.z:.3f}" + return f"Error: Planning request rejected for lift from z={ee.position.z:.3f}" return self._preview_execute_wait(robot_name) + def _wait_plan(self, robot_name: RobotName | None = None) -> str | None: + """Block on the in-flight planning job and validate its result. + + Returns None on success, or an error string. Callers that want to + retry on planning failure (e.g. pick() iterating grasp candidates) + should use this directly; sites that always bail on failure can use + the bundled _preview_execute_wait(). + """ + plan_timeout = self.config.planning_timeout + 5.0 + if not self.wait_for_planning_completion(robot_name, plan_timeout): + return f"Error: Planning timed out after {plan_timeout:.1f}s" + status = self.get_planning_status(robot_name) + if not status.get("success"): + err = status.get("error") or "unknown reason" + return f"Error: Planning failed — {err}" + return None + def _preview_execute_wait( self, robot_name: RobotName | None = None, preview_duration: float = 0.5 ) -> str | None: - """Preview planned path, execute, and wait for completion. + """Wait for in-flight planning, then preview, execute, and wait for the trajectory. - Returns None on success, or an error string on failure. - - Args: - robot_name: Robot to operate on - preview_duration: Duration to animate the preview in Meshcat (seconds) + Returns None on success, or an error string on failure. Run this after + plan_to_*() returns True (accepted): the planning job is in flight, + and this helper blocks until it finishes, then runs the rest of the + motion sequence. Idempotent w.r.t. the wait — if planning is already + done, the wait is a no-op. """ + if (err := self._wait_plan(robot_name)) is not None: + return err + logger.info("Previewing trajectory...") self.preview_path(preview_duration, robot_name) @@ -1405,7 +1425,7 @@ def move_to_pose( return err if not self.plan_to_pose(pose, robot_name): - return f"Error: Planning failed — pose ({x:.3f}, {y:.3f}, {z:.3f}) may be unreachable or in collision" + return f"Error: Planning request rejected for pose ({x:.3f}, {y:.3f}, {z:.3f})" err = self._preview_execute_wait(robot_name) if err: @@ -1440,7 +1460,7 @@ def move_to_joints( logger.info(f"Planning motion to joints [{', '.join(f'{j:.3f}' for j in joint_values)}]...") if not self.plan_to_joints(goal, rname): - return "Error: Planning failed — joint configuration may be unreachable or in collision" + return "Error: Planning request rejected for target joint configuration" err = self._preview_execute_wait(robot_name) if err: @@ -1472,7 +1492,7 @@ def go_home(self, robot_name: str | None = None) -> str: goal = JointState(name=config.joint_names, position=config.home_joints) logger.info("Planning motion to home position...") if not self.plan_to_joints(goal, rname): - return "Error: Failed to plan path to home position" + return "Error: Planning request rejected for home position" err = self._preview_execute_wait(robot_name) if err: @@ -1528,7 +1548,7 @@ def go_init(self, robot_name: str | None = None) -> str: f"Planning motion to init position [{', '.join(f'{j:.3f}' for j in init.position)}]..." ) if not self.plan_to_joints(init, robot_name): - return "Error: Failed to plan path to init position" + return "Error: Planning request rejected for init position" err = self._preview_execute_wait(robot_name) if err: diff --git a/dimos/manipulation/pick_and_place_module.py b/dimos/manipulation/pick_and_place_module.py index 9ca223832c..6a890549d2 100644 --- a/dimos/manipulation/pick_and_place_module.py +++ b/dimos/manipulation/pick_and_place_module.py @@ -577,15 +577,21 @@ def pick( logger.info(f"Planning approach to pre-grasp (attempt {i + 1}/{max_attempts})...") if not self.plan_to_pose(pre_grasp_pose, rname): - logger.info(f"Grasp candidate {i + 1} approach planning failed, trying next") - continue # Try next candidate + logger.info(f"Grasp candidate {i + 1} approach rejected, trying next") + continue + # Wait for the async planner before committing to this candidate — + # planning may still fail after acceptance (collision, IK). + plan_err = self._wait_plan(rname) + if plan_err: + logger.info(f"Grasp candidate {i + 1} {plan_err}, trying next") + continue # 3. Open gripper before approach logger.info("Opening gripper...") self._set_gripper_position(0.85, rname) time.sleep(0.5) - # 4. Execute approach to pre-grasp + # 4. Execute approach to pre-grasp (planning already validated) err = self._preview_execute_wait(rname) if err: return err @@ -593,7 +599,7 @@ def pick( # 5. Move to grasp pose logger.info("Moving to grasp position...") if not self.plan_to_pose(grasp_pose, rname): - return "Error: Grasp pose planning failed" + return "Error: Grasp pose planning request rejected" err = self._preview_execute_wait(rname) if err: return err @@ -606,7 +612,7 @@ def pick( # 7. Retract to pre-grasp logger.info("Retracting with object...") if not self.plan_to_pose(pre_grasp_pose, rname): - return "Error: Retract planning failed" + return "Error: Retract planning request rejected" err = self._preview_execute_wait(rname) if err: return err @@ -672,7 +678,7 @@ def _place_with_orientation( # 1. Move to pre-place logger.info(f"Planning approach to place position ({x:.3f}, {y:.3f}, {z:.3f})...") if not self.plan_to_pose(pre_place_pose, rname): - return "Error: Pre-place approach planning failed" + return "Error: Pre-place approach planning request rejected" err = self._preview_execute_wait(rname) if err: @@ -681,7 +687,7 @@ def _place_with_orientation( # 2. Lower to place position logger.info("Lowering to place position...") if not self.plan_to_pose(place_pose, rname): - return "Error: Place pose planning failed" + return "Error: Place pose planning request rejected" err = self._preview_execute_wait(rname) if err: return err @@ -694,7 +700,7 @@ def _place_with_orientation( # 4. Retract logger.info("Retracting...") if not self.plan_to_pose(pre_place_pose, rname): - return "Error: Retract planning failed" + return "Error: Retract planning request rejected" err = self._preview_execute_wait(rname) if err: return err From 0923a9d39af592014f19ac87c59ad03bd38d94fd Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Tue, 12 May 2026 18:27:41 +0000 Subject: [PATCH 05/15] add / modify unit tests --- dimos/manipulation/manipulation_module.py | 2 +- .../manipulation/test_manipulation_module.py | 302 +++++++++++++++++- 2 files changed, 297 insertions(+), 7 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 9a816a383e..cba0213e08 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -375,7 +375,7 @@ def get_state(self) -> str: if any(s is False for s in self._last_op_success.values()): return ManipulationState.FAULT.name if any( - j.future is None or not j.future.done() + not j.invalidated and (j.future is None or not j.future.done()) for j in self._planning_jobs.values() ): return ManipulationState.PLANNING.name diff --git a/dimos/manipulation/test_manipulation_module.py b/dimos/manipulation/test_manipulation_module.py index 3d9f22db9a..eea6ce8b24 100644 --- a/dimos/manipulation/test_manipulation_module.py +++ b/dimos/manipulation/test_manipulation_module.py @@ -21,7 +21,11 @@ from __future__ import annotations +from concurrent.futures import ThreadPoolExecutor import importlib.util +from pathlib import Path +import threading +import time from unittest.mock import MagicMock import pytest @@ -31,6 +35,11 @@ ManipulationState, ) from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.manipulation.planning.spec.enums import PlanningStatus +from dimos.manipulation.planning.spec.models import PlanningResult +from dimos.manipulation.planning.trajectory_generator.joint_trajectory_generator import ( + JointTrajectoryGenerator, +) from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion @@ -119,6 +128,281 @@ def module(xarm7_config): mod.stop() +class _FakeWorld: + def __init__(self) -> None: + self.animated: list[tuple[str, int, float]] = [] + + def animate_path(self, robot_id: str, path: list[JointState], duration: float) -> None: + self.animated.append((robot_id, len(path), duration)) + + +class _FakeWorldMonitor: + def __init__(self) -> None: + self.world = _FakeWorld() + self.dismissed: list[str] = [] + self._states = { + "left_id": JointState(name=["j1", "j2"], position=[0.0, 0.0]), + "right_id": JointState(name=["j1", "j2"], position=[0.0, 0.0]), + } + + def dismiss_preview(self, robot_id: str) -> None: + self.dismissed.append(robot_id) + + def get_current_joint_state(self, robot_id: str) -> JointState | None: + return self._states.get(robot_id) + + def stop_all_monitors(self) -> None: + pass + + +class _PlanControl: + def __init__(self, release: threading.Event | None = None, succeed: bool = True) -> None: + self.started = threading.Event() + self.release = release or threading.Event() + self.succeed = succeed + + +class _GatedPlanner: + def __init__(self, controls: list[_PlanControl]) -> None: + self.controls = controls + self.calls: list[tuple[str, JointState]] = [] + self._lock = threading.Lock() + + def plan_joint_path( + self, + world: object, + robot_id: str, + start: JointState, + goal: JointState, + timeout: float = 10.0, + max_iterations: int = 5000, + ) -> PlanningResult: + with self._lock: + call_index = len(self.calls) + self.calls.append((robot_id, goal)) + control = self.controls[call_index] + + control.started.set() + if not control.release.wait(timeout=2.0): + raise TimeoutError(f"test planner was not released for {robot_id}") + + if not control.succeed: + return PlanningResult( + status=PlanningStatus.NO_SOLUTION, + message="synthetic planning failure", + ) + return PlanningResult( + status=PlanningStatus.SUCCESS, + path=[start, goal], + planning_time=0.01, + path_length=1.0, + iterations=1, + message="synthetic success", + ) + + +def _mock_robot_config(name: str) -> RobotModelConfig: + return RobotModelConfig( + name=name, + model_path=Path("/tmp/mock.urdf"), + base_pose=PoseStamped(position=Vector3(), orientation=Quaternion()), + joint_names=["j1", "j2"], + end_effector_link="tool", + coordinator_task_name=f"{name}_task", + ) + + +@pytest.fixture +def async_module(): + """Create a ManipulationModule wired with fake world/planner components.""" + mod = ManipulationModule(robots=[], planning_timeout=1.0, enable_viz=False) + mod._world_monitor = _FakeWorldMonitor() + mod._planning_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="test-plan") + for name, robot_id in (("left_arm", "left_id"), ("right_arm", "right_id")): + config = _mock_robot_config(name) + mod._robots[name] = ( + robot_id, + config, + JointTrajectoryGenerator(num_joints=2, max_velocity=1.0, max_acceleration=2.0), + ) + try: + yield mod + finally: + mod.stop() + + +class TestAsyncPlanningUnit: + """Deterministic unit tests for ManipulationModule async planning semantics.""" + + def test_plan_accepts_quickly_and_path_appears_after_completion(self, async_module): + control = _PlanControl() + async_module._planner = _GatedPlanner([control]) + + start = time.monotonic() + accepted = async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + elapsed = time.monotonic() - start + + assert accepted is True + assert elapsed < 0.5 + assert control.started.wait(timeout=0.5) + assert async_module.get_state() == ManipulationState.PLANNING.name + assert async_module.has_planned_path("left_arm") is False + + control.release.set() + + assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + status = async_module.get_planning_status("left_arm") + assert status["active"] is False + assert status["done"] is True + assert status["success"] is True + assert status["duration_s"] is not None + assert async_module.get_state() == ManipulationState.COMPLETED.name + assert async_module.has_planned_path("left_arm") is True + + def test_distinct_robots_can_have_active_plans_simultaneously(self, async_module): + release = threading.Event() + left = _PlanControl(release=release) + right = _PlanControl(release=release) + planner = _GatedPlanner([left, right]) + async_module._planner = planner + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.2, 0.2]), "right_arm" + ) + + assert left.started.wait(timeout=0.5) + assert right.started.wait(timeout=0.5) + assert len(planner.calls) == 2 + assert async_module.get_planning_status("left_arm")["active"] is True + assert async_module.get_planning_status("right_arm")["active"] is True + + release.set() + + assert async_module.wait_for_planning_completion(None, timeout=1.0) + assert async_module.has_planned_path("left_arm") is True + assert async_module.has_planned_path("right_arm") is True + + def test_plan_for_other_robot_is_accepted_while_one_robot_is_planning(self, async_module): + left = _PlanControl() + right = _PlanControl() + async_module._planner = _GatedPlanner([left, right]) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert left.started.wait(timeout=0.5) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.2, 0.2]), "right_arm" + ) + assert right.started.wait(timeout=0.5) + + left.release.set() + right.release.set() + assert async_module.wait_for_planning_completion(None, timeout=1.0) + + def test_same_robot_rejects_duplicate_active_plan(self, async_module): + control = _PlanControl() + async_module._planner = _GatedPlanner([control]) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert control.started.wait(timeout=0.5) + + duplicate = async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.2, 0.2]), "left_arm" + ) + + assert duplicate is False + control.release.set() + assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + + def test_failed_planning_sets_fault_and_records_error(self, async_module): + control = _PlanControl(succeed=False) + async_module._planner = _GatedPlanner([control]) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert control.started.wait(timeout=0.5) + control.release.set() + + assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + status = async_module.get_planning_status("left_arm") + assert status["success"] is False + assert "Planning failed" in status["error"] + assert async_module.get_state() == ManipulationState.FAULT.name + assert "Planning failed" in async_module.get_error() + + def test_late_invalidated_job_does_not_overwrite_newer_plan(self, async_module): + first = _PlanControl() + second = _PlanControl() + async_module._planner = _GatedPlanner([first, second]) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert first.started.wait(timeout=0.5) + first_status = async_module.get_planning_status("left_arm") + assert first_status["active"] is True + + assert async_module.reset().startswith("Reset") + assert async_module.get_state() == ManipulationState.IDLE.name + assert async_module.get_planning_status("left_arm")["invalidated"] is True + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.2, 0.2]), "left_arm" + ) + assert second.started.wait(timeout=0.5) + second_request_id = async_module.get_planning_status("left_arm")["request_id"] + assert second_request_id != first_status["request_id"] + + first.release.set() + second.release.set() + + assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + status = async_module.get_planning_status("left_arm") + assert status["request_id"] == second_request_id + assert status["success"] is True + assert async_module._planned_paths["left_arm"][-1].position == [0.2, 0.2] + assert async_module._planned_trajectories["left_arm"].points[-1].positions == [0.2, 0.2] + + def test_preview_and_execute_reject_only_robot_with_active_plan(self, async_module): + right_done = _PlanControl() + left_active = _PlanControl() + async_module._planner = _GatedPlanner([right_done, left_active]) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.3, 0.3]), "right_arm" + ) + assert right_done.started.wait(timeout=0.5) + right_done.release.set() + assert async_module.wait_for_planning_completion("right_arm", timeout=1.0) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert left_active.started.wait(timeout=0.5) + + coordinator = MagicMock() + coordinator.task_invoke.return_value = True + async_module._coordinator_client = coordinator + + assert async_module.preview_path(0.1, "left_arm") is False + assert async_module.execute("left_arm") is False + assert async_module.preview_path(0.1, "right_arm") is True + assert async_module.execute("right_arm") is True + + left_active.release.set() + assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + + @pytest.mark.skipif(not _drake_available(), reason="Drake not installed") @pytest.mark.skipif(not _xarm_urdf_available(), reason="XArm URDF not available") class TestManipulationModuleIntegration: @@ -156,7 +440,8 @@ def test_plan_to_joints(self, module, joint_state_zeros): success = module.plan_to_joints(target) assert success is True - assert module._state == ManipulationState.COMPLETED + assert module.wait_for_planning_completion(timeout=15.0) is True + assert module.get_state() == ManipulationState.COMPLETED.name assert module.has_planned_path() is True assert "test_arm" in module._planned_trajectories @@ -208,6 +493,7 @@ def test_trajectory_name_translation(self, module, joint_state_zeros): success = module.plan_to_joints(JointState(position=[0.05] * 7)) assert success is True + assert module.wait_for_planning_completion(timeout=15.0) is True traj = module._planned_trajectories["test_arm"] robot_config = module._robots["test_arm"][1] @@ -229,6 +515,7 @@ def test_execute_with_mock_coordinator(self, module, joint_state_zeros): success = module.plan_to_joints(JointState(position=[0.05] * 7)) assert success is True + assert module.wait_for_planning_completion(timeout=15.0) is True # Mock the coordinator client mock_client = MagicMock() @@ -238,7 +525,7 @@ def test_execute_with_mock_coordinator(self, module, joint_state_zeros): result = module.execute() assert result is True - assert module._state == ManipulationState.COMPLETED + assert module.get_state() == ManipulationState.COMPLETED.name # Verify coordinator was called mock_client.task_invoke.assert_called_once() @@ -257,6 +544,7 @@ def test_execute_rejected_by_coordinator(self, module, joint_state_zeros): module._on_joint_state(joint_state_zeros) module.plan_to_joints(JointState(position=[0.05] * 7)) + assert module.wait_for_planning_completion(timeout=15.0) is True # Mock coordinator to reject mock_client = MagicMock() @@ -266,7 +554,7 @@ def test_execute_rejected_by_coordinator(self, module, joint_state_zeros): result = module.execute() assert result is False - assert module._state == ManipulationState.FAULT + assert module.get_state() == ManipulationState.FAULT.name assert "rejected" in module._error_message.lower() def test_state_transitions_during_execution(self, module, joint_state_zeros): @@ -277,14 +565,16 @@ def test_state_transitions_during_execution(self, module, joint_state_zeros): # Plan - should go through PLANNING -> COMPLETED module.plan_to_joints(JointState(position=[0.05] * 7)) - assert module._state == ManipulationState.COMPLETED + assert module.wait_for_planning_completion(timeout=15.0) is True + assert module.get_state() == ManipulationState.COMPLETED.name # Reset works from COMPLETED module.reset() - assert module._state == ManipulationState.IDLE + assert module.get_state() == ManipulationState.IDLE.name # Plan again module.plan_to_joints(JointState(position=[0.05] * 7)) + assert module.wait_for_planning_completion(timeout=15.0) is True # Mock coordinator mock_client = MagicMock() @@ -293,4 +583,4 @@ def test_state_transitions_during_execution(self, module, joint_state_zeros): # Execute - should go to EXECUTING then COMPLETED module.execute() - assert module._state == ManipulationState.COMPLETED + assert module.get_state() == ManipulationState.COMPLETED.name From ccd16fa3fe123eba9530f5932ee1cbb12b9dfe1a Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Tue, 12 May 2026 18:49:36 +0000 Subject: [PATCH 06/15] add bimanual e2e test --- .../test_openarm_bimanual_planning.py | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 dimos/e2e_tests/test_openarm_bimanual_planning.py diff --git a/dimos/e2e_tests/test_openarm_bimanual_planning.py b/dimos/e2e_tests/test_openarm_bimanual_planning.py new file mode 100644 index 0000000000..d112de71d5 --- /dev/null +++ b/dimos/e2e_tests/test_openarm_bimanual_planning.py @@ -0,0 +1,168 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""OpenArm bimanual manipulation e2e tests.""" + +from __future__ import annotations + +from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor +import time +from typing import Any + +import pytest + +from dimos.core.rpc_client import RPCClient +from dimos.manipulation.manipulation_module import ManipulationModule +from dimos.msgs.sensor_msgs.JointState import JointState + +_ROBOTS = ("left_arm", "right_arm") +_TARGET = [0.05] * 7 +_JOINT_STATE_TOPIC = "/coordinator/joint_state#sensor_msgs.JointState" +_PLANNING_TIMEOUT_S = 30.0 + + +def _wait_until( + predicate: Callable[[], bool], + *, + timeout_s: float, + interval_s: float = 0.1, +) -> None: + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + if predicate(): + return + time.sleep(interval_s) + raise AssertionError(f"condition was not met within {timeout_s:.1f}s") + + +def _wait_for_robot_state(client: RPCClient, robot_name: str) -> None: + _wait_until( + lambda: client.get_current_joints(robot_name) is not None, + timeout_s=10.0, + ) + + +def _assert_successful_plan_status(status: dict[str, Any], robot_name: str) -> None: + assert status["robot_name"] == robot_name + assert status["done"] is True + assert status["success"] is True + assert status["error"] is None + assert status["duration_s"] is not None + + +def _submit_plan_when_ready( + client: RPCClient, + robot_name: str, + target: list[float], + *, + timeout_s: float = 15.0, +) -> None: + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + if client.plan_to_joints(JointState(position=target), robot_name) is True: + return + time.sleep(0.25) + raise AssertionError(f"{robot_name} plan was not accepted within {timeout_s:.1f}s") + + +def _plan_and_wait( + client: RPCClient, + robot_name: str, + target: list[float], + *, + wait_until_ready: bool = False, +) -> float: + start = time.monotonic() + if wait_until_ready: + _submit_plan_when_ready(client, robot_name, target) + else: + accepted = client.plan_to_joints(JointState(position=target), robot_name) + assert accepted is True + assert client.wait_for_planning_completion(robot_name, _PLANNING_TIMEOUT_S) is True + status = client.get_planning_status(robot_name) + _assert_successful_plan_status(status, robot_name) + assert client.has_planned_path(robot_name) is True + return time.monotonic() - start + + +def _plan_concurrently(client: RPCClient, targets: dict[str, list[float]]) -> float: + def submit(robot_name: str) -> bool: + return bool(client.plan_to_joints(JointState(position=targets[robot_name]), robot_name)) + + start = time.monotonic() + with ThreadPoolExecutor(max_workers=len(targets)) as pool: + futures = {robot_name: pool.submit(submit, robot_name) for robot_name in targets} + for robot_name, future in futures.items(): + assert future.result(timeout=5.0) is True, f"{robot_name} plan was not accepted" + + assert client.wait_for_planning_completion(None, _PLANNING_TIMEOUT_S) is True + return time.monotonic() - start + + +def _execute_concurrently(client: RPCClient) -> None: + def execute(robot_name: str) -> bool: + return bool(client.execute(robot_name)) + + with ThreadPoolExecutor(max_workers=len(_ROBOTS)) as pool: + futures = {robot_name: pool.submit(execute, robot_name) for robot_name in _ROBOTS} + for robot_name, future in futures.items(): + assert future.result(timeout=5.0) is True, f"{robot_name} execute was not accepted" + + +@pytest.mark.skipif_in_ci +@pytest.mark.slow +def test_openarm_mock_bimanual_planning_overlap(lcm_spy, start_blueprint) -> None: + """OpenArm mock should plan both arms concurrently and execute both trajectories.""" + lcm_spy.save_topic(_JOINT_STATE_TOPIC) + start_blueprint("openarm-mock-planner-coordinator") + lcm_spy.wait_for_saved_topic(_JOINT_STATE_TOPIC) + + client = RPCClient(None, ManipulationModule) + try: + _wait_until( + lambda: set(_ROBOTS).issubset(set(client.list_robots())), + timeout_s=10.0, + ) + for robot_name in _ROBOTS: + _wait_for_robot_state(client, robot_name) + + targets = {robot_name: list(_TARGET) for robot_name in _ROBOTS} + + sequential_left_s = _plan_and_wait( + client, "left_arm", targets["left_arm"], wait_until_ready=True + ) + sequential_right_s = _plan_and_wait(client, "right_arm", targets["right_arm"]) + sequential_total_s = sequential_left_s + sequential_right_s + + assert client.reset().startswith("Reset") + + concurrent_total_s = _plan_concurrently(client, targets) + + for robot_name in _ROBOTS: + status = client.get_planning_status(robot_name) + _assert_successful_plan_status(status, robot_name) + assert client.has_planned_path(robot_name) is True + + # Very fast mock plans are dominated by process/RPC jitter. Enforce a + # ratio only once the sequential baseline is long enough to be meaningful. + if sequential_total_s >= 1.0: + assert concurrent_total_s < sequential_total_s * 0.85, ( + f"expected bimanual planning overlap: sequential={sequential_total_s:.2f}s, " + f"concurrent={concurrent_total_s:.2f}s" + ) + + _execute_concurrently(client) + finally: + client.stop_rpc_client() From 4f5851460eff1bb164ec3e9e42272fac154bf0e7 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Tue, 12 May 2026 19:29:39 +0000 Subject: [PATCH 07/15] fixes --- .../test_openarm_bimanual_planning.py | 36 +++++++++++++-- dimos/manipulation/manipulation_module.py | 31 ++++++++++--- .../manipulation/test_manipulation_module.py | 46 +++++++++++++++++++ 3 files changed, 103 insertions(+), 10 deletions(-) diff --git a/dimos/e2e_tests/test_openarm_bimanual_planning.py b/dimos/e2e_tests/test_openarm_bimanual_planning.py index d112de71d5..875d75bb0a 100644 --- a/dimos/e2e_tests/test_openarm_bimanual_planning.py +++ b/dimos/e2e_tests/test_openarm_bimanual_planning.py @@ -26,11 +26,15 @@ from dimos.core.rpc_client import RPCClient from dimos.manipulation.manipulation_module import ManipulationModule from dimos.msgs.sensor_msgs.JointState import JointState +from dimos.utils.logging_config import setup_logger _ROBOTS = ("left_arm", "right_arm") _TARGET = [0.05] * 7 _JOINT_STATE_TOPIC = "/coordinator/joint_state#sensor_msgs.JointState" _PLANNING_TIMEOUT_S = 30.0 +_PREVIEW_DURATION_S = 0.5 + +logger = setup_logger() def _wait_until( @@ -111,20 +115,34 @@ def submit(robot_name: str) -> bool: return time.monotonic() - start -def _execute_concurrently(client: RPCClient) -> None: +def _preview_concurrently(client: RPCClient) -> float: + def preview(robot_name: str) -> bool: + return bool(client.preview_path(_PREVIEW_DURATION_S, robot_name)) + + start = time.monotonic() + with ThreadPoolExecutor(max_workers=len(_ROBOTS)) as pool: + futures = {robot_name: pool.submit(preview, robot_name) for robot_name in _ROBOTS} + for robot_name, future in futures.items(): + assert future.result(timeout=5.0) is True, f"{robot_name} preview failed" + return time.monotonic() - start + + +def _execute_concurrently(client: RPCClient) -> float: def execute(robot_name: str) -> bool: return bool(client.execute(robot_name)) + start = time.monotonic() with ThreadPoolExecutor(max_workers=len(_ROBOTS)) as pool: futures = {robot_name: pool.submit(execute, robot_name) for robot_name in _ROBOTS} for robot_name, future in futures.items(): assert future.result(timeout=5.0) is True, f"{robot_name} execute was not accepted" + return time.monotonic() - start @pytest.mark.skipif_in_ci @pytest.mark.slow def test_openarm_mock_bimanual_planning_overlap(lcm_spy, start_blueprint) -> None: - """OpenArm mock should plan both arms concurrently and execute both trajectories.""" + """OpenArm mock should overlap bimanual plan, preview, and execute acceptance.""" lcm_spy.save_topic(_JOINT_STATE_TOPIC) start_blueprint("openarm-mock-planner-coordinator") lcm_spy.wait_for_saved_topic(_JOINT_STATE_TOPIC) @@ -163,6 +181,18 @@ def test_openarm_mock_bimanual_planning_overlap(lcm_spy, start_blueprint) -> Non f"concurrent={concurrent_total_s:.2f}s" ) - _execute_concurrently(client) + preview_total_s = _preview_concurrently(client) + execute_accept_s = _execute_concurrently(client) + logger.info( + "OpenArm bimanual timing: sequential_plan=%.2fs " + "(left=%.2fs right=%.2fs), concurrent_plan=%.2fs, " + "concurrent_preview=%.2fs, concurrent_execute_accept=%.2fs", + sequential_total_s, + sequential_left_s, + sequential_right_s, + concurrent_total_s, + preview_total_s, + execute_accept_s, + ) finally: client.stop_rpc_client() diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index cba0213e08..bcf6267acc 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -363,16 +363,10 @@ def _tf_publish_loop(self) -> None: def get_state(self) -> str: """Get aggregate manipulation state derived from per-robot job records. - may need to be revisited - Priority: FAULT > PLANNING > EXECUTING > COMPLETED > IDLE. """ with self._lock: - if self._state == ManipulationState.FAULT: - return ManipulationState.FAULT.name - # FAULT reads _last_op_success (cleared by reset) rather than the - # historical _planning_jobs records, so reset truly clears the fault. - if any(s is False for s in self._last_op_success.values()): + if self._has_fault_locked(): return ManipulationState.FAULT.name if any( not j.invalidated and (j.future is None or not j.future.done()) @@ -385,6 +379,13 @@ def get_state(self) -> str: return ManipulationState.COMPLETED.name return ManipulationState.IDLE.name + def _has_fault_locked(self) -> bool: + """Return whether the module is faulted. Caller must hold self._lock.""" + if self._state == ManipulationState.FAULT: + return True + # Failed jobs are kept as FAULT until reset() clears _last_op_success. + return any(s is False for s in self._last_op_success.values()) + @rpc def get_error(self) -> str: """Get last error message. @@ -499,6 +500,9 @@ def _begin_planning( return None name, robot_id, _, _ = robot with self._lock: + if self._has_fault_locked(): + logger.warning("Cannot plan: module is in FAULT, call reset() first") + return None existing = self._planning_jobs.get(name) if ( existing is not None @@ -871,6 +875,16 @@ def clear_planned_path(self, robot_name: RobotName | None = None) -> bool: return False robot_name, _, _, _ = robot with self._lock: + job = self._planning_jobs.get(robot_name) + if ( + job is not None + and not job.invalidated + and (job.future is None or not job.future.done()) + ): + logger.warning( + f"Cannot clear planned path: planning still active for '{robot_name}'" + ) + return False self._planned_paths.pop(robot_name, None) self._planned_trajectories.pop(robot_name, None) return True @@ -1034,6 +1048,9 @@ def execute(self, robot_name: RobotName | None = None) -> bool: return False with self._lock: + if self._has_fault_locked(): + logger.warning("Cannot execute: module is in FAULT, call reset() first") + return False job = self._planning_jobs.get(robot_name) if ( job is not None diff --git a/dimos/manipulation/test_manipulation_module.py b/dimos/manipulation/test_manipulation_module.py index eea6ce8b24..2042866dd9 100644 --- a/dimos/manipulation/test_manipulation_module.py +++ b/dimos/manipulation/test_manipulation_module.py @@ -340,6 +340,35 @@ def test_failed_planning_sets_fault_and_records_error(self, async_module): assert async_module.get_state() == ManipulationState.FAULT.name assert "Planning failed" in async_module.get_error() + def test_fault_rejects_new_plans_until_reset(self, async_module): + failed = _PlanControl(succeed=False) + after_reset = _PlanControl() + async_module._planner = _GatedPlanner([failed, after_reset]) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert failed.started.wait(timeout=0.5) + failed.release.set() + assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + assert async_module.get_state() == ManipulationState.FAULT.name + + assert ( + async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.2, 0.2]), "right_arm" + ) + is False + ) + + assert async_module.reset().startswith("Reset") + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.2, 0.2]), "right_arm" + ) + assert after_reset.started.wait(timeout=0.5) + after_reset.release.set() + assert async_module.wait_for_planning_completion("right_arm", timeout=1.0) + assert async_module.get_state() == ManipulationState.COMPLETED.name + def test_late_invalidated_job_does_not_overwrite_newer_plan(self, async_module): first = _PlanControl() second = _PlanControl() @@ -402,6 +431,23 @@ def test_preview_and_execute_reject_only_robot_with_active_plan(self, async_modu left_active.release.set() assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + def test_clear_planned_path_rejects_active_plan(self, async_module): + control = _PlanControl() + async_module._planner = _GatedPlanner([control]) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert control.started.wait(timeout=0.5) + + assert async_module.clear_planned_path("left_arm") is False + + control.release.set() + assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + assert async_module.has_planned_path("left_arm") is True + assert async_module.clear_planned_path("left_arm") is True + assert async_module.has_planned_path("left_arm") is False + @pytest.mark.skipif(not _drake_available(), reason="Drake not installed") @pytest.mark.skipif(not _xarm_urdf_available(), reason="XArm URDF not available") From 06745afd349e01c96ed01202d7f0f483afe6e56d Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Wed, 13 May 2026 09:12:50 +0000 Subject: [PATCH 08/15] update manipulation rpc client --- .../planning/examples/manipulation_client.py | 56 +++++++++++++++---- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/dimos/manipulation/planning/examples/manipulation_client.py b/dimos/manipulation/planning/examples/manipulation_client.py index 4dda1cb305..204b78d3b5 100644 --- a/dimos/manipulation/planning/examples/manipulation_client.py +++ b/dimos/manipulation/planning/examples/manipulation_client.py @@ -26,11 +26,13 @@ joints() Get current joint positions ee() Get end-effector pose state() Get module state (IDLE, PLANNING, EXECUTING, ...) - plan(joints) Plan to joint configuration, e.g. plan([0.1]*7) - plan_pose(x,y,z) Plan to Cartesian pose - preview(duration) Preview planned path in Meshcat + plan(joints) Submit plan to joint configuration (returns True if accepted) + plan_pose(x,y,z) Submit plan to Cartesian pose (returns True if accepted) + wait_plan(timeout) Block until planning job(s) finish; returns True on success + plan_status(robot) Get per-robot planning job status dict + preview(duration) Preview planned path in Meshcat (must wait_plan first) execute() Execute planned trajectory via coordinator - home() Move to home position + home() Plan and execute move to home position url() Get Meshcat visualization URL robots() List configured robots info(robot) Get robot config details @@ -72,7 +74,12 @@ def state() -> str: def plan(target_joints: list[float], robot_name: str | None = None) -> bool: - """Plan to joint configuration. e.g. plan([0.1]*7)""" + """Submit a joint-space plan. Returns True if accepted (not when finished). + + Planning is asynchronous: True means the request was queued. Call + wait_plan() before preview() / execute(). + e.g. plan([0.1]*7); wait_plan(); preview(); execute() + """ from dimos.msgs.sensor_msgs.JointState import JointState js = JointState(position=target_joints) @@ -88,7 +95,12 @@ def plan_pose( yaw: float | None = None, robot_name: str | None = None, ) -> bool: - """Plan to Cartesian pose. Preserves current orientation if rpy not given.""" + """Submit a Cartesian plan. Returns True if accepted (not when finished). + + Planning is asynchronous: True means the request was queued. Call + wait_plan() before preview() / execute(). Preserves current orientation + if rpy not given. + """ if roll is not None or pitch is not None or yaw is not None: orientation = Quaternion.from_euler(Vector3(x=roll or 0, y=pitch or 0, z=yaw or 0)) else: @@ -99,6 +111,25 @@ def plan_pose( return _client.plan_to_pose(target, robot_name) +def wait_plan(timeout: float = 30.0, robot_name: str | None = None) -> bool: + """Block until planning finishes; return True if it succeeded. + + With robot_name=None, waits on every active planning job. Returns False + if the wait timed out or the job ended in failure. + """ + if not _client.wait_for_planning_completion(robot_name, timeout): + return False + status = _client.get_planning_status(robot_name) + if robot_name is None: + return all(s.get("success") is True for s in status.values()) if status else True + return bool(status.get("success")) + + +def plan_status(robot_name: str | None = None) -> dict[str, Any]: + """Get planning job status. Per-robot dict, or map keyed by robot name.""" + return _client.get_planning_status(robot_name) + + def preview(duration: float = 3.0, robot_name: str | None = None) -> bool: """Preview planned path in Meshcat.""" return _client.preview_path(duration, robot_name) @@ -109,15 +140,16 @@ def execute(robot_name: str | None = None) -> bool: return _client.execute(robot_name) -def home(robot_name: str | None = None) -> bool: +def home(robot_name: str | None = None, timeout: float = 30.0) -> bool: """Plan and execute move to home position.""" from dimos.msgs.sensor_msgs.JointState import JointState home_joints = _client.get_robot_info(robot_name).get("home_joints", [0.0] * 7) - success = _client.plan_to_joints(JointState(position=home_joints), robot_name) - if success: - return _client.execute(robot_name) - return False + if not _client.plan_to_joints(JointState(position=home_joints), robot_name): + return False + if not wait_plan(timeout, robot_name): + return False + return _client.execute(robot_name) def url() -> str | None: @@ -189,4 +221,4 @@ def stop() -> None: if __name__ == "__main__": print("Manipulation RPC client ready.") print("Type commands() for available functions.") - print("Try: joints(), plan([0.1]*7), preview(), execute()") + print("Try: joints(), plan([0.1]*7), wait_plan(), preview(), execute()") From 60e7e0796e70caeef7f79b966ff950ccb07a0aa2 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Wed, 13 May 2026 09:17:15 +0000 Subject: [PATCH 09/15] update openarm integration and planning readme --- dimos/manipulation/planning/README.md | 6 ++- .../manipulation/openarm_integration.md | 37 ++++++++++++++----- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/dimos/manipulation/planning/README.md b/dimos/manipulation/planning/README.md index e7d34d29b2..12a01662d0 100644 --- a/dimos/manipulation/planning/README.md +++ b/dimos/manipulation/planning/README.md @@ -20,7 +20,8 @@ In the interactive client: ```python commands() # List available commands joints() # Get current joint positions -plan([0.1] * 7) # Plan to target +plan([0.1] * 7) # Submit a plan (returns True if accepted; planning is async) +wait_plan() # Block until planning finishes; True iff it succeeded preview() # Preview in Meshcat (url() for link) execute() # Execute via coordinator ``` @@ -82,7 +83,10 @@ module = ManipulationModule( kinematics_name="drake_optimization", # Or "jacobian" ) module.start() +# plan_to_joints / plan_to_pose return True on accept — planning runs on a +# per-robot worker thread. Wait for completion before previewing/executing. module.plan_to_joints([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7]) +module.wait_for_planning_completion(timeout=30.0) module.execute() # Sends to coordinator ``` diff --git a/docs/capabilities/manipulation/openarm_integration.md b/docs/capabilities/manipulation/openarm_integration.md index 6869864f5a..4f3f986364 100644 --- a/docs/capabilities/manipulation/openarm_integration.md +++ b/docs/capabilities/manipulation/openarm_integration.md @@ -147,13 +147,17 @@ This gives you an interactive Python prompt with these functions: | `joints(robot_name)` | Read current joint positions (7 floats) | | `ee(robot_name)` | Read current end-effector pose | | `state()` | Module state: `IDLE`, `PLANNING`, `EXECUTING`, `FAULT`, etc. | -| `plan([q1..q7], robot_name)` | Plan a collision-free trajectory to a joint configuration | -| `plan_pose(x, y, z, robot_name=...)` | Plan to a Cartesian EE pose (preserves current orientation) | +| `plan([q1..q7], robot_name)` | Submit a joint plan — returns `True` if accepted (planning is async) | +| `plan_pose(x, y, z, robot_name=...)` | Submit a Cartesian EE plan (preserves current orientation if rpy not given) | +| `wait_plan(timeout, robot_name)` | Block until planning finishes; returns `True` on success | +| `plan_status(robot_name)` | Per-robot job status dict (`done`, `success`, `error`, `duration_s`, ...) | | `preview(robot_name)` | Animate the planned path in Meshcat without executing | | `execute(robot_name)` | Send the planned trajectory to the coordinator | | `home(robot_name)` | Plan + execute to home joints | | `commands()` | Print all available functions | +Planning is **asynchronous**: `plan()` / `plan_pose()` return `True` as soon as the request is accepted, not when planning finishes. Each robot has its own planner thread, so two arms can plan in parallel. Call `wait_plan()` before `preview()` / `execute()` — they will reject while planning is still in flight. + #### Example session — simple joint moves ```python skip @@ -163,15 +167,19 @@ This gives you an interactive Python prompt with these functions: >>> joints(robot_name="left_arm") [0.02, -0.01, -0.13, 0.15, 0.17, -0.07, 0.10] ->>> # One-liner: plan → preview in Meshcat → execute on hardware ->>> plan([0.3, 0, 0, 0, 0, 0, 0], robot_name="left_arm") and preview(robot_name="left_arm") and execute(robot_name="left_arm") +>>> # plan → wait for the planner → preview in Meshcat → execute on hardware +>>> plan([0.3, 0, 0, 0, 0, 0, 0], robot_name="left_arm") # accepts the request (async) +True +>>> wait_plan(robot_name="left_arm") # blocks until planning finishes +True +>>> preview(robot_name="left_arm") and execute(robot_name="left_arm") True >>> joints(robot_name="left_arm") [0.30, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00] # arm is now at the commanded pose ``` -`plan()` returns `True` on success, `False` if planning failed (check the coordinator terminal for `COLLISION_AT_GOAL`, `INVALID_START`, `NO_SOLUTION`, etc). The `and` chaining is an idiom — if any step fails, the next one is short-circuited. +`plan()` returns `True` once the request is **accepted**, not once planning finishes. `wait_plan()` returns `True` only if the plan succeeded — on failure, check `plan_status(robot_name)["error"]` or the coordinator terminal for `COLLISION_AT_GOAL`, `INVALID_START`, `NO_SOLUTION`, etc. `preview()` / `execute()` will reject while a plan is still in flight, so always `wait_plan()` first. If you ever get stuck in a `FAULT` state (e.g. an invalid plan was sent), reset the state machine: @@ -183,20 +191,26 @@ If you ever get stuck in a `FAULT` state (e.g. an invalid plan was sent), reset #### Example session — bimanual ```python skip ->>> # Move both arms to mirrored poses ->>> plan([0.5, 0, 0, 0, 0, 0, 0], robot_name="left_arm") and execute(robot_name="left_arm") +>>> # Plan both arms concurrently, then execute once both are ready +>>> plan([0.5, 0, 0, 0, 0, 0, 0], robot_name="left_arm") # accepted immediately +True +>>> plan([-0.5, 0, 0, 0, 0, 0, 0], robot_name="right_arm") # planned in parallel with left_arm True ->>> plan([-0.5, 0, 0, 0, 0, 0, 0], robot_name="right_arm") and execute(robot_name="right_arm") +>>> wait_plan() # waits on every active plan +True +>>> execute(robot_name="left_arm") and execute(robot_name="right_arm") True ``` -Each arm plans and executes independently — the coordinator runs both trajectories simultaneously on separate tick-loop tasks. +Each arm has its own planner thread and its own coordinator task, so the two plans run in parallel and the two trajectories execute simultaneously. #### Example session — Cartesian target ```python skip >>> ee(robot_name="left_arm") # see where the EE currently is ->>> plan_pose(0.1, 0.3, 0.5, robot_name="left_arm") and preview(robot_name="left_arm") +>>> plan_pose(0.1, 0.3, 0.5, robot_name="left_arm") +True +>>> wait_plan(robot_name="left_arm") and preview(robot_name="left_arm") True >>> execute(robot_name="left_arm") True @@ -210,6 +224,9 @@ If you don't know which Cartesian targets are reachable, check first with the wo >>> add_box("table", 0.4, 0.0, 0.1, w=0.6, h=0.4, d=0.05) # rectangular obstacle >>> add_sphere("ball", 0.3, 0.2, 0.4, radius=0.05) >>> plan_pose(0.4, 0.0, 0.3, robot_name="left_arm") # now plans around it +True +>>> wait_plan(robot_name="left_arm") # check the plan succeeded +True >>> remove("table") # id returned by add_* ``` From 72bc0a6e24ae2c26e747fff18f6f77b58c10bfd4 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Wed, 13 May 2026 19:21:44 +0000 Subject: [PATCH 10/15] fix tests --- .../manipulation/test_manipulation_module.py | 4 +- dimos/manipulation/test_manipulation_unit.py | 49 ++++++++++++------- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/dimos/manipulation/test_manipulation_module.py b/dimos/manipulation/test_manipulation_module.py index 37d95228b8..1a38fdb90f 100644 --- a/dimos/manipulation/test_manipulation_module.py +++ b/dimos/manipulation/test_manipulation_module.py @@ -47,8 +47,6 @@ from dimos.msgs.sensor_msgs.JointState import JointState from dimos.utils.data import get_data -pytestmark = pytest.mark.self_hosted - def _drake_available() -> bool: return importlib.util.find_spec("pydrake") is not None @@ -453,6 +451,7 @@ def test_clear_planned_path_rejects_active_plan(self, async_module): @pytest.mark.skipif(not _drake_available(), reason="Drake not installed") @pytest.mark.skipif(not _xarm_urdf_available(), reason="XArm URDF not available") +@pytest.mark.self_hosted class TestManipulationModuleIntegration: """Integration tests for ManipulationModule with real Drake backend.""" @@ -554,6 +553,7 @@ def test_trajectory_name_translation(self, module, joint_state_zeros): @pytest.mark.skipif(not _drake_available(), reason="Drake not installed") @pytest.mark.skipif(not _xarm_urdf_available(), reason="XArm URDF not available") +@pytest.mark.self_hosted class TestCoordinatorIntegration: """Test coordinator integration with mocked RPC client.""" diff --git a/dimos/manipulation/test_manipulation_unit.py b/dimos/manipulation/test_manipulation_unit.py index 7f5552d116..cd85b8eb18 100644 --- a/dimos/manipulation/test_manipulation_unit.py +++ b/dimos/manipulation/test_manipulation_unit.py @@ -96,6 +96,11 @@ def _make_module(): module._robots = {} module._planned_paths = {} module._planned_trajectories = {} + module._planning_jobs = {} + module._executing = {} + module._last_op_success = {} + module._next_request_id = 0 + module._planning_pool = None module._world_monitor = None module._planner = None module._kinematics = None @@ -107,15 +112,14 @@ class TestStateMachine: """Test state transitions.""" def test_cancel_only_during_execution(self): - """Cancel only works in EXECUTING state.""" + """Cancel only works for robots in the execute accept window.""" module = _make_module() - module._state = ManipulationState.IDLE assert module.cancel() is False - module._state = ManipulationState.EXECUTING + module._executing["test_arm"] = True assert module.cancel() is True - assert module._state == ManipulationState.IDLE + assert module._executing["test_arm"] is False def test_reset_not_during_execution(self): """Reset works in any state except EXECUTING.""" @@ -128,7 +132,7 @@ def test_reset_not_during_execution(self): assert module._state == ManipulationState.IDLE assert module._error_message == "" - module._state = ManipulationState.EXECUTING + module._executing["test_arm"] = True result = module.reset() assert "Error" in result @@ -142,23 +146,31 @@ def test_fail_sets_fault_state(self): assert module._state == ManipulationState.FAULT assert module._error_message == "Test error" - def test_begin_planning_state_checks(self, robot_config): - """_begin_planning only allowed from IDLE or COMPLETED.""" + def test_begin_planning_registers_per_robot_job(self, robot_config): + """_begin_planning registers a per-robot async planning job.""" module = _make_module() module._world_monitor = MagicMock() + module._planner = MagicMock() + module._planning_pool = MagicMock() module._robots = {"test_arm": ("robot_id", robot_config, MagicMock())} - # From IDLE - OK - module._state = ManipulationState.IDLE - assert module._begin_planning() == ("test_arm", "robot_id") - assert module._state == ManipulationState.PLANNING + assert module._begin_planning() == ("test_arm", "robot_id", 0) + assert module.get_state() == ManipulationState.PLANNING.name + assert module._planning_jobs["test_arm"].request_id == 0 + + def test_begin_planning_rejects_fault_and_duplicate_active_job(self, robot_config): + """_begin_planning rejects faulted modules and duplicate jobs per robot.""" + module = _make_module() + module._world_monitor = MagicMock() + module._planner = MagicMock() + module._planning_pool = MagicMock() + module._robots = {"test_arm": ("robot_id", robot_config, MagicMock())} - # From COMPLETED - OK - module._state = ManipulationState.COMPLETED - assert module._begin_planning() == ("test_arm", "robot_id") + assert module._begin_planning() == ("test_arm", "robot_id", 0) + assert module._begin_planning() is None - # From EXECUTING - Fail - module._state = ManipulationState.EXECUTING + module.reset() + module._state = ManipulationState.FAULT assert module._begin_planning() is None @@ -220,6 +232,7 @@ def test_execute_requires_trajectory(self, robot_config): module = _make_module() module._robots = {"test_arm": ("id", robot_config, MagicMock())} module._planned_trajectories = {} + module._coordinator_client = MagicMock() assert module.execute() is False @@ -249,7 +262,7 @@ def test_execute_success(self, robot_config, simple_trajectory): module._coordinator_client = mock_client assert module.execute() is True - assert module._state == ManipulationState.COMPLETED + assert module.get_state() == ManipulationState.COMPLETED.name mock_client.task_invoke.assert_called_once_with( "traj_arm", "execute", {"trajectory": simple_trajectory} ) @@ -265,7 +278,7 @@ def test_execute_rejected(self, robot_config, simple_trajectory): module._coordinator_client = mock_client assert module.execute() is False - assert module._state == ManipulationState.FAULT + assert module.get_state() == ManipulationState.FAULT.name class TestRobotModelConfigMapping: From 57323a506e20898f9908999477db1e8abf8119ec Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Wed, 13 May 2026 19:29:17 +0000 Subject: [PATCH 11/15] fix wait_plan to work for default robot (none) --- dimos/manipulation/manipulation_module.py | 11 ++++++++--- dimos/manipulation/test_manipulation_module.py | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index bcf6267acc..70474c1d9b 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -1318,11 +1318,16 @@ def _wait_plan(self, robot_name: RobotName | None = None) -> str | None: should use this directly; sites that always bail on failure can use the bundled _preview_execute_wait(). """ + robot = self._get_robot(robot_name) + if robot is None: + return "Error: Robot not found" + rname, _, _, _ = robot + plan_timeout = self.config.planning_timeout + 5.0 - if not self.wait_for_planning_completion(robot_name, plan_timeout): + if not self.wait_for_planning_completion(rname, plan_timeout): return f"Error: Planning timed out after {plan_timeout:.1f}s" - status = self.get_planning_status(robot_name) - if not status.get("success"): + status = self.get_planning_status(rname) + if status.get("success") is not True: err = status.get("error") or "unknown reason" return f"Error: Planning failed — {err}" return None diff --git a/dimos/manipulation/test_manipulation_module.py b/dimos/manipulation/test_manipulation_module.py index 1a38fdb90f..446f681e30 100644 --- a/dimos/manipulation/test_manipulation_module.py +++ b/dimos/manipulation/test_manipulation_module.py @@ -261,6 +261,21 @@ def test_plan_accepts_quickly_and_path_appears_after_completion(self, async_modu assert async_module.get_state() == ManipulationState.COMPLETED.name assert async_module.has_planned_path("left_arm") is True + def test_wait_plan_resolves_single_robot_default(self, async_module): + control = _PlanControl() + async_module._planner = _GatedPlanner([control]) + async_module._robots = {"left_arm": async_module._robots["left_arm"]} + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]) + ) + assert control.started.wait(timeout=0.5) + control.release.set() + + assert async_module._wait_plan() is None + status = async_module.get_planning_status("left_arm") + assert status["success"] is True + def test_distinct_robots_can_have_active_plans_simultaneously(self, async_module): release = threading.Event() left = _PlanControl(release=release) From 7106313fcc66e83fef6ea7bfc46701e06c20c1d0 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Wed, 13 May 2026 19:40:16 +0000 Subject: [PATCH 12/15] fix narrow race condition --- dimos/manipulation/manipulation_module.py | 56 +++++++++---------- .../manipulation/test_manipulation_module.py | 29 ++++++++++ 2 files changed, 54 insertions(+), 31 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 70474c1d9b..d33ec2ef0b 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -368,10 +368,7 @@ def get_state(self) -> str: with self._lock: if self._has_fault_locked(): return ManipulationState.FAULT.name - if any( - not j.invalidated and (j.future is None or not j.future.done()) - for j in self._planning_jobs.values() - ): + if any(self._job_active_locked(j) for j in self._planning_jobs.values()): return ManipulationState.PLANNING.name if any(self._executing.values()): return ManipulationState.EXECUTING.name @@ -386,6 +383,23 @@ def _has_fault_locked(self) -> bool: # Failed jobs are kept as FAULT until reset() clears _last_op_success. return any(s is False for s in self._last_op_success.values()) + def _job_done_locked(self, job: PlanningJob) -> bool: + """Return whether a planning job has reached terminal state. + + Caller must hold self._lock. completed_at is authoritative because a + very fast worker can complete before plan_to_* records job.future. + """ + return job.completed_at is not None or ( + job.future is not None and job.future.done() + ) + + def _job_active_locked(self, job: PlanningJob | None) -> bool: + """Return whether a planning job should block plan/preview/execute. + + Caller must hold self._lock. + """ + return job is not None and not job.invalidated and not self._job_done_locked(job) + @rpc def get_error(self) -> str: """Get last error message. @@ -435,7 +449,7 @@ def reset(self) -> str: if any(self._executing.values()): return "Error: Cannot reset while executing — cancel the motion first" for job in self._planning_jobs.values(): - if job.future is None or not job.future.done(): + if not self._job_done_locked(job): job.invalidated = True self._planned_paths.clear() self._planned_trajectories.clear() @@ -504,11 +518,7 @@ def _begin_planning( logger.warning("Cannot plan: module is in FAULT, call reset() first") return None existing = self._planning_jobs.get(name) - if ( - existing is not None - and not existing.invalidated - and (existing.future is None or not existing.future.done()) - ): + if self._job_active_locked(existing): logger.warning(f"Cannot plan: '{name}' already has an active planning job") return None if self._executing.get(name, False): @@ -600,7 +610,7 @@ def _build_status(self, robot_name: RobotName, job: PlanningJob | None) -> dict[ "duration_s": None, "invalidated": False, } - done = job.future is not None and job.future.done() + done = self._job_done_locked(job) end_time = job.completed_at if job.completed_at is not None else time.time() return { "robot_name": robot_name, @@ -805,11 +815,7 @@ def preview_path(self, duration: float = 3.0, robot_name: RobotName | None = Non with self._lock: job = self._planning_jobs.get(robot_name) - if ( - job is not None - and not job.invalidated - and (job.future is None or not job.future.done()) - ): + if self._job_active_locked(job): logger.warning( f"Cannot preview: planning still active for '{robot_name}'" ) @@ -842,11 +848,7 @@ def has_planned_path(self, robot_name: RobotName | None = None) -> bool: with self._lock: job = self._planning_jobs.get(robot_name) - if ( - job is not None - and not job.invalidated - and (job.future is None or not job.future.done()) - ): + if self._job_active_locked(job): return False path = self._planned_paths.get(robot_name) return path is not None and len(path) > 0 @@ -876,11 +878,7 @@ def clear_planned_path(self, robot_name: RobotName | None = None) -> bool: robot_name, _, _, _ = robot with self._lock: job = self._planning_jobs.get(robot_name) - if ( - job is not None - and not job.invalidated - and (job.future is None or not job.future.done()) - ): + if self._job_active_locked(job): logger.warning( f"Cannot clear planned path: planning still active for '{robot_name}'" ) @@ -1052,11 +1050,7 @@ def execute(self, robot_name: RobotName | None = None) -> bool: logger.warning("Cannot execute: module is in FAULT, call reset() first") return False job = self._planning_jobs.get(robot_name) - if ( - job is not None - and not job.invalidated - and (job.future is None or not job.future.done()) - ): + if self._job_active_locked(job): logger.warning( f"Cannot execute: planning still active for '{robot_name}'" ) diff --git a/dimos/manipulation/test_manipulation_module.py b/dimos/manipulation/test_manipulation_module.py index 446f681e30..b7cf0dd2f6 100644 --- a/dimos/manipulation/test_manipulation_module.py +++ b/dimos/manipulation/test_manipulation_module.py @@ -33,6 +33,7 @@ from dimos.manipulation.manipulation_module import ( ManipulationModule, ManipulationState, + PlanningJob, ) from dimos.manipulation.planning.spec.config import RobotModelConfig from dimos.manipulation.planning.spec.enums import PlanningStatus @@ -276,6 +277,34 @@ def test_wait_plan_resolves_single_robot_default(self, async_module): status = async_module.get_planning_status("left_arm") assert status["success"] is True + def test_completed_job_with_missing_future_is_not_active(self, async_module): + control = _PlanControl() + async_module._planner = _GatedPlanner([control]) + + assert async_module.plan_to_joints( + JointState(name=["j1", "j2"], position=[0.1, 0.1]), "left_arm" + ) + assert control.started.wait(timeout=0.5) + control.release.set() + assert async_module.wait_for_planning_completion("left_arm", timeout=1.0) + + with async_module._lock: + job = async_module._planning_jobs["left_arm"] + async_module._planning_jobs["left_arm"] = PlanningJob( + request_id=job.request_id, + accepted_at=job.accepted_at, + future=None, + completed_at=job.completed_at, + success=job.success, + error=job.error, + ) + + status = async_module.get_planning_status("left_arm") + assert status["done"] is True + assert status["active"] is False + assert async_module.has_planned_path("left_arm") is True + assert async_module.preview_path(0.1, "left_arm") is True + def test_distinct_robots_can_have_active_plans_simultaneously(self, async_module): release = threading.Event() left = _PlanControl(release=release) From 08c43c22bf2c9b75237b9365cc812f26b8f2363a Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Wed, 13 May 2026 19:42:51 +0000 Subject: [PATCH 13/15] clarify module-wide FAULT in documentation --- dimos/manipulation/manipulation_module.py | 4 ++++ docs/capabilities/manipulation/openarm_integration.md | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index d33ec2ef0b..3fbcce8e05 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -364,6 +364,8 @@ def get_state(self) -> str: """Get aggregate manipulation state derived from per-robot job records. Priority: FAULT > PLANNING > EXECUTING > COMPLETED > IDLE. + FAULT is module-wide: any failed plan or coordinator rejection blocks + new plans/executions for every robot until reset() clears the fault. """ with self._lock: if self._has_fault_locked(): @@ -444,6 +446,8 @@ def reset(self) -> str: In-flight planner threads keep running (Drake's RRT is not preemptable), but their late results are dropped at compare-and-store via the invalidated flag. Use cancel() to abort an active coordinator execution. + Also clears the module-wide FAULT state raised by any robot's planning + or execution failure. """ with self._lock: if any(self._executing.values()): diff --git a/docs/capabilities/manipulation/openarm_integration.md b/docs/capabilities/manipulation/openarm_integration.md index 4f3f986364..51f1beef5d 100644 --- a/docs/capabilities/manipulation/openarm_integration.md +++ b/docs/capabilities/manipulation/openarm_integration.md @@ -181,7 +181,9 @@ True `plan()` returns `True` once the request is **accepted**, not once planning finishes. `wait_plan()` returns `True` only if the plan succeeded — on failure, check `plan_status(robot_name)["error"]` or the coordinator terminal for `COLLISION_AT_GOAL`, `INVALID_START`, `NO_SOLUTION`, etc. `preview()` / `execute()` will reject while a plan is still in flight, so always `wait_plan()` first. -If you ever get stuck in a `FAULT` state (e.g. an invalid plan was sent), reset the state machine: +Planning and execution failures are treated as module-wide faults. In a bimanual sequence, if either arm fails to plan or the coordinator rejects a trajectory, `state()` reports `FAULT` and new plans for both arms are blocked until reset. This keeps the two-arm workflow from continuing from a partial plan. + +If you ever get stuck in a `FAULT` state, reset the state machine: ```python skip >>> _client.reset() @@ -204,6 +206,8 @@ True Each arm has its own planner thread and its own coordinator task, so the two plans run in parallel and the two trajectories execute simultaneously. +If `wait_plan()` returns `False` in a bimanual flow, inspect `plan_status()` before resetting; one arm may have succeeded while the other failed, but the module intentionally requires `reset()` before any further planning. + #### Example session — Cartesian target ```python skip From be130fa7ae2863d4e65fc673e0bee1e596611450 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Wed, 13 May 2026 19:51:51 +0000 Subject: [PATCH 14/15] linting and fix exception handling in execute() --- dimos/manipulation/manipulation_module.py | 78 ++++++++++--------- .../manipulation/test_manipulation_module.py | 4 +- dimos/manipulation/test_manipulation_unit.py | 30 +++++++ 3 files changed, 71 insertions(+), 41 deletions(-) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 3fbcce8e05..3b69f8c3d6 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -75,8 +75,7 @@ @dataclass class PlanningJob: - """In-flight or completed planning job for a single robot. - """ + """In-flight or completed planning job for a single robot.""" request_id: int accepted_at: float @@ -391,9 +390,7 @@ def _job_done_locked(self, job: PlanningJob) -> bool: Caller must hold self._lock. completed_at is authoritative because a very fast worker can complete before plan_to_* records job.future. """ - return job.completed_at is not None or ( - job.future is not None and job.future.done() - ) + return job.completed_at is not None or (job.future is not None and job.future.done()) def _job_active_locked(self, job: PlanningJob | None) -> bool: """Return whether a planning job should block plan/preview/execute. @@ -532,9 +529,7 @@ def _begin_planning( self._planned_trajectories.pop(name, None) request_id = self._next_request_id self._next_request_id += 1 - self._planning_jobs[name] = PlanningJob( - request_id=request_id, accepted_at=time.time() - ) + self._planning_jobs[name] = PlanningJob(request_id=request_id, accepted_at=time.time()) return name, robot_id, request_id def _fail(self, msg: str) -> bool: @@ -629,9 +624,7 @@ def _build_status(self, robot_name: RobotName, job: PlanningJob | None) -> dict[ } @rpc - def get_planning_status( - self, robot_name: RobotName | None = None - ) -> dict[str, Any]: + def get_planning_status(self, robot_name: RobotName | None = None) -> dict[str, Any]: """Return per-robot planning job status. With a robot_name, returns a single status dict for that robot. @@ -642,8 +635,7 @@ def get_planning_status( if robot_name is not None: return self._build_status(robot_name, self._planning_jobs.get(robot_name)) return { - name: self._build_status(name, job) - for name, job in self._planning_jobs.items() + name: self._build_status(name, job) for name, job in self._planning_jobs.items() } @rpc @@ -666,15 +658,9 @@ def wait_for_planning_completion( with self._lock: if robot_name is not None: job = self._planning_jobs.get(robot_name) - futures = ( - [job.future] - if job is not None and job.future is not None - else [] - ) + futures = [job.future] if job is not None and job.future is not None else [] else: - futures = [ - j.future for j in self._planning_jobs.values() if j.future is not None - ] + futures = [j.future for j in self._planning_jobs.values() if j.future is not None] if not futures: return True @@ -725,8 +711,12 @@ def _planning_worker( ) if not ik.is_success() or ik.joint_state is None: self._complete_job( - robot_name, request_id, False, - f"IK failed: {ik.status.name}", None, None, + robot_name, + request_id, + False, + f"IK failed: {ik.status.name}", + None, + None, ) return logger.info(f"IK solved for '{robot_name}', error: {ik.position_error:.4f}m") @@ -752,8 +742,12 @@ def _planning_worker( ) if not result.is_success(): self._complete_job( - robot_name, request_id, False, - f"Planning failed: {result.status.name}", None, None, + robot_name, + request_id, + False, + f"Planning failed: {result.status.name}", + None, + None, ) return @@ -820,9 +814,7 @@ def preview_path(self, duration: float = 3.0, robot_name: RobotName | None = Non with self._lock: job = self._planning_jobs.get(robot_name) if self._job_active_locked(job): - logger.warning( - f"Cannot preview: planning still active for '{robot_name}'" - ) + logger.warning(f"Cannot preview: planning still active for '{robot_name}'") return False planned_path = self._planned_paths.get(robot_name) if planned_path is None or len(planned_path) == 0: @@ -1055,9 +1047,7 @@ def execute(self, robot_name: RobotName | None = None) -> bool: return False job = self._planning_jobs.get(robot_name) if self._job_active_locked(job): - logger.warning( - f"Cannot execute: planning still active for '{robot_name}'" - ) + logger.warning(f"Cannot execute: planning still active for '{robot_name}'") return False traj = self._planned_trajectories.get(robot_name) if traj is None: @@ -1065,28 +1055,40 @@ def execute(self, robot_name: RobotName | None = None) -> bool: return False self._executing[robot_name] = True + execution_error: str | None = None try: translated = self._translate_trajectory_to_coordinator(traj, config) logger.info( f"Executing '{robot_name}': task='{config.coordinator_task_name}', " f"{len(translated.points)} pts, {translated.duration:.2f}s" ) - result = client.task_invoke( - config.coordinator_task_name, "execute", {"trajectory": translated} + result = bool( + client.task_invoke( + config.coordinator_task_name, "execute", {"trajectory": translated} + ) ) - finally: - with self._lock: - self._executing[robot_name] = False + except Exception as e: + logger.exception(f"Execution request failed for '{robot_name}'") + result = False + execution_error = f"Execution request failed for '{robot_name}': {e}" + # Update execution and fault state together so observers never see a + # completed state after the coordinator has rejected or crashed. with self._lock: + self._executing[robot_name] = False self._last_op_success[robot_name] = bool(result) if not result: self._state = ManipulationState.FAULT - self._error_message = f"Coordinator rejected trajectory for '{robot_name}'" + self._error_message = ( + execution_error or f"Coordinator rejected trajectory for '{robot_name}'" + ) if result: logger.info(f"Trajectory accepted for '{robot_name}'") return True - logger.warning(f"Coordinator rejected trajectory for '{robot_name}'") + if execution_error: + logger.warning(execution_error) + else: + logger.warning(f"Coordinator rejected trajectory for '{robot_name}'") return False @rpc diff --git a/dimos/manipulation/test_manipulation_module.py b/dimos/manipulation/test_manipulation_module.py index b7cf0dd2f6..fe706d1f7e 100644 --- a/dimos/manipulation/test_manipulation_module.py +++ b/dimos/manipulation/test_manipulation_module.py @@ -267,9 +267,7 @@ def test_wait_plan_resolves_single_robot_default(self, async_module): async_module._planner = _GatedPlanner([control]) async_module._robots = {"left_arm": async_module._robots["left_arm"]} - assert async_module.plan_to_joints( - JointState(name=["j1", "j2"], position=[0.1, 0.1]) - ) + assert async_module.plan_to_joints(JointState(name=["j1", "j2"], position=[0.1, 0.1])) assert control.started.wait(timeout=0.5) control.release.set() diff --git a/dimos/manipulation/test_manipulation_unit.py b/dimos/manipulation/test_manipulation_unit.py index cd85b8eb18..fe8767c762 100644 --- a/dimos/manipulation/test_manipulation_unit.py +++ b/dimos/manipulation/test_manipulation_unit.py @@ -280,6 +280,36 @@ def test_execute_rejected(self, robot_config, simple_trajectory): assert module.execute() is False assert module.get_state() == ManipulationState.FAULT.name + def test_execute_task_invoke_exception_sets_fault(self, robot_config, simple_trajectory): + """Coordinator exceptions are recorded as execution failures.""" + module = _make_module() + module._robots = {"test_arm": ("id", robot_config, MagicMock())} + module._planned_trajectories = {"test_arm": simple_trajectory} + + mock_client = MagicMock() + mock_client.task_invoke.side_effect = RuntimeError("coordinator crashed") + module._coordinator_client = mock_client + + assert module.execute() is False + assert module._executing["test_arm"] is False + assert module.get_state() == ManipulationState.FAULT.name + assert "coordinator crashed" in module.get_error() + + def test_execute_translation_exception_sets_fault(self, robot_config, simple_trajectory): + """Trajectory translation exceptions are recorded as execution failures.""" + module = _make_module() + module._robots = {"test_arm": ("id", robot_config, MagicMock())} + module._planned_trajectories = {"test_arm": simple_trajectory} + module._coordinator_client = MagicMock() + module._translate_trajectory_to_coordinator = MagicMock( + side_effect=RuntimeError("bad trajectory") + ) + + assert module.execute() is False + assert module._executing["test_arm"] is False + assert module.get_state() == ManipulationState.FAULT.name + assert "bad trajectory" in module.get_error() + class TestRobotModelConfigMapping: """Test RobotModelConfig joint name mapping helpers.""" From b23064d0d14dd219efdd0a491bd73a6a03802131 Mon Sep 17 00:00:00 2001 From: parav nagarsheth Date: Wed, 13 May 2026 20:18:59 +0000 Subject: [PATCH 15/15] allow clearing per robot failure for searching through candidates, like pick() --- dimos/manipulation/manipulation_module.py | 26 ++++++ dimos/manipulation/pick_and_place_module.py | 1 + dimos/manipulation/test_manipulation_unit.py | 86 ++++++++++++++++++++ 3 files changed, 113 insertions(+) diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py index 3b69f8c3d6..6ba22c2d9e 100644 --- a/dimos/manipulation/manipulation_module.py +++ b/dimos/manipulation/manipulation_module.py @@ -532,6 +532,32 @@ def _begin_planning( self._planning_jobs[name] = PlanningJob(request_id=request_id, accepted_at=time.time()) return name, robot_id, request_id + def _clear_failed_plan_for_retry(self, robot_name: RobotName) -> bool: + """Clear one completed failed plan so an internal retry loop can continue. + + This is intentionally narrower than reset(): it only applies to a + terminal failed planning job for one robot, and it does not clear active + planning or execution. Use it for algorithms where a failed plan is an + expected candidate rejection, such as pick() trying the next grasp pose. + """ + with self._lock: + if self._executing.get(robot_name, False): + return False + job = self._planning_jobs.get(robot_name) + if job is None or self._job_active_locked(job) or job.success is not False: + return False + + self._planning_jobs.pop(robot_name, None) + self._planned_paths.pop(robot_name, None) + self._planned_trajectories.pop(robot_name, None) + self._last_op_success.pop(robot_name, None) + + has_remaining_planning_fault = any(s is False for s in self._last_op_success.values()) + if self._state != ManipulationState.FAULT and not has_remaining_planning_fault: + self._state = ManipulationState.IDLE + self._error_message = "" + return True + def _fail(self, msg: str) -> bool: """Set FAULT state with error message.""" logger.warning(msg) diff --git a/dimos/manipulation/pick_and_place_module.py b/dimos/manipulation/pick_and_place_module.py index 6a890549d2..8dbae70884 100644 --- a/dimos/manipulation/pick_and_place_module.py +++ b/dimos/manipulation/pick_and_place_module.py @@ -584,6 +584,7 @@ def pick( plan_err = self._wait_plan(rname) if plan_err: logger.info(f"Grasp candidate {i + 1} {plan_err}, trying next") + self._clear_failed_plan_for_retry(rname) continue # 3. Open gripper before approach diff --git a/dimos/manipulation/test_manipulation_unit.py b/dimos/manipulation/test_manipulation_unit.py index fe8767c762..331d39b3ef 100644 --- a/dimos/manipulation/test_manipulation_unit.py +++ b/dimos/manipulation/test_manipulation_unit.py @@ -25,8 +25,11 @@ from dimos.manipulation.manipulation_module import ( ManipulationModule, ManipulationState, + PlanningJob, ) +from dimos.manipulation.pick_and_place_module import PickAndPlaceModule from dimos.manipulation.planning.spec.config import RobotModelConfig +from dimos.msgs.geometry_msgs.Pose import Pose from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped from dimos.msgs.geometry_msgs.Quaternion import Quaternion from dimos.msgs.geometry_msgs.Vector3 import Vector3 @@ -173,6 +176,45 @@ def test_begin_planning_rejects_fault_and_duplicate_active_job(self, robot_confi module._state = ManipulationState.FAULT assert module._begin_planning() is None + def test_clear_failed_plan_for_retry_clears_only_completed_failed_plan(self): + """Internal retry clear removes one terminal failed plan.""" + module = _make_module() + module._planning_jobs["test_arm"] = PlanningJob( + request_id=1, + accepted_at=1.0, + completed_at=2.0, + success=False, + error="no IK", + ) + module._last_op_success["test_arm"] = False + module._planned_paths["test_arm"] = MagicMock() + module._planned_trajectories["test_arm"] = MagicMock() + module._error_message = "no IK" + + assert module.get_state() == ManipulationState.FAULT.name + assert module._clear_failed_plan_for_retry("test_arm") is True + assert module.get_state() == ManipulationState.IDLE.name + assert "test_arm" not in module._planning_jobs + assert "test_arm" not in module._last_op_success + assert module._error_message == "" + + def test_clear_failed_plan_for_retry_preserves_other_fault(self): + """Internal retry clear does not mask another robot's failed plan.""" + module = _make_module() + for name, success in {"left": False, "right": False}.items(): + module._planning_jobs[name] = PlanningJob( + request_id=1, + accepted_at=1.0, + completed_at=2.0, + success=success, + error=f"{name} failed", + ) + module._last_op_success[name] = success + + assert module._clear_failed_plan_for_retry("left") is True + assert module.get_state() == ManipulationState.FAULT.name + assert module._last_op_success == {"right": False} + class TestRobotSelection: """Test robot selection logic.""" @@ -203,6 +245,50 @@ def test_multiple_robots_require_name(self, robot_config): assert result[0] == "left" +class TestPickAndPlaceRetry: + """Test pick() grasp-candidate retry behavior.""" + + def test_pick_clears_failed_candidate_plan_before_retry(self, robot_config): + """A failed approach plan for one grasp candidate does not block the next.""" + with patch.object(PickAndPlaceModule, "__init__", lambda self: None): + module = PickAndPlaceModule.__new__(PickAndPlaceModule) + module._robots = {"test_arm": ("robot_id", robot_config, MagicMock())} + + grasp_poses = [ + Pose(Vector3(0.2, 0.0, 0.2), Quaternion()), + Pose(Vector3(0.25, 0.0, 0.2), Quaternion()), + ] + module._generate_grasps_for_pick = MagicMock(return_value=grasp_poses) + module._lift_if_low = MagicMock(return_value=None) + module._compute_pre_grasp_pose = MagicMock(side_effect=lambda pose, _offset: pose) + module._wait_plan = MagicMock(side_effect=["Error: Planning failed: no IK", None]) + module._set_gripper_position = MagicMock() + module._preview_execute_wait = MagicMock(return_value=None) + + events: list[str] = [] + + def plan_to_pose(_pose: Pose, _robot_name: str) -> bool: + events.append("plan") + return True + + def clear_failed_plan(_robot_name: str) -> bool: + events.append("clear") + return True + + module.plan_to_pose = MagicMock(side_effect=plan_to_pose) + module._clear_failed_plan_for_retry = MagicMock(side_effect=clear_failed_plan) + + with patch("dimos.manipulation.pick_and_place_module.time.sleep", return_value=None): + result = module.pick("cup", robot_name="test_arm") + + assert "Pick complete" in result + assert "'cup'" in result + assert module._wait_plan.call_count == 2 + assert module._clear_failed_plan_for_retry.call_count == 1 + assert events[:3] == ["plan", "clear", "plan"] + assert module.plan_to_pose.call_count == 4 + + class TestJointNameTranslation: """Test trajectory joint name translation for coordinator."""