
#1986 concurrent planning#2072

Open
pgn-dev wants to merge 17 commits into dimensionalOS:main from pgn-dev:feature/1986-concurrent-planning

Conversation

@pgn-dev

@pgn-dev pgn-dev commented May 13, 2026

Problem

Details here - #1986

With multiple arms (e.g. OpenArm bimanual), every plan call blocks the whole module for 5–15 s, so a bimanual sequence pays planning latency twice before either arm moves. ManipulationModule already keys _planned_paths / _planned_trajectories by robot_name, but plan_to_* calls self._planner.plan_joint_path(...) inline, so a second plan can't start until the first returns.

This PR lets ManipulationModule plan concurrently for an arbitrary number of arms, with preview and execute overlapped across robots.

Closes DIM-854

Solution

  • Async planning API. plan_to_pose / plan_to_joints now submit work to a per-module ThreadPoolExecutor (one worker per configured robot) and return immediately on accept. The Drake IK + RRT call moved into a new _planning_worker that runs lock-free against WorldMonitor (Drake scratch contexts handle isolation) and publishes its result via compare-and-store on request_id.
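The accept-and-submit shape described above can be sketched roughly as follows. The class and member names here (AsyncPlannerSketch, _jobs, _results) are illustrative stand-ins, not the PR's actual internals; the real worker runs Drake IK + RRT instead of the placeholder:

```python
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor

class AsyncPlannerSketch:
    """Illustration only: submit planning work per robot, return on accept."""

    def __init__(self, robot_names):
        self._lock = threading.Lock()
        self._jobs = {}       # robot_name -> request_id currently in flight
        self._results = {}    # robot_name -> (request_id, path)
        self._pool = ThreadPoolExecutor(max_workers=len(robot_names))

    def plan_to_joints(self, robot_name, target):
        with self._lock:
            if robot_name in self._jobs:   # duplicate rejection for this robot
                return False
            request_id = uuid.uuid4().hex
            self._jobs[robot_name] = request_id
        self._pool.submit(self._planning_worker, robot_name, request_id, target)
        return True                        # accepted immediately, no blocking

    def _planning_worker(self, robot_name, request_id, target):
        path = [target]                    # stand-in for the Drake IK + RRT call
        with self._lock:                   # compare-and-store on request_id
            if self._jobs.get(robot_name) == request_id:
                self._results[robot_name] = (request_id, path)
                del self._jobs[robot_name]
```

Two plan calls for distinct robots both return immediately and run in parallel workers, which is the property the bimanual timing test asserts.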

  • Per-robot job tracking. New PlanningJob dataclass. State is no longer a single module-wide enum; get_state() derives an aggregate (FAULT > PLANNING > EXECUTING > COMPLETED > IDLE) from the _planning_jobs, _executing, and _last_op_success maps keyed by robot.
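A minimal sketch of how the per-robot tracking and the aggregate-state precedence might fit together; PlanningJob's field names beyond request_id, and the exact shapes of the three maps, are assumptions for illustration:

```python
from dataclasses import dataclass
from typing import Optional
from concurrent.futures import Future

@dataclass
class PlanningJob:
    # Fields other than request_id are assumed for this sketch.
    request_id: str
    future: Optional[Future] = None
    invalidated: bool = False
    error: Optional[str] = None

def aggregate_state(planning_jobs, executing, last_op_success):
    """Derive a module state with precedence FAULT > PLANNING > EXECUTING > COMPLETED > IDLE."""
    if any(ok is False for ok in last_op_success.values()):
        return "FAULT"
    if any(not job.invalidated for job in planning_jobs.values()):
        return "PLANNING"
    if any(executing.values()):
        return "EXECUTING"
    if any(last_op_success.values()):
        return "COMPLETED"
    return "IDLE"
```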

  • New RPCs on ManipulationModule:

    • wait_for_planning_completion(robot_name=None, timeout=None): blocks on one robot's future or all active ones.
    • get_planning_status(robot_name=None): returns per-robot dict (active, done, success, error, duration_s, invalidated, request_id) or map of all.
    • has_planned_path, clear_planned_path, cancel: now take robot_name and refuse to act on a robot with an in-flight job.
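The one-or-all waiting behavior could be implemented along these lines; this is a sketch over a plain robot-to-Future map, not the module's real signature:

```python
from concurrent.futures import ThreadPoolExecutor, wait

def wait_for_planning_completion(futures_by_robot, robot_name=None, timeout=None):
    """Block on one robot's planning future, or on all active ones.

    Returns True if everything waited on finished before the timeout.
    """
    if robot_name is not None:
        targets = [futures_by_robot[robot_name]] if robot_name in futures_by_robot else []
    else:
        targets = list(futures_by_robot.values())
    _done, not_done = wait(targets, timeout=timeout)
    return not not_done   # empty not_done set means nothing timed out
```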
  • reset(), cancel(), execute() rewrites.

    • reset(): invalidates active jobs (Drake RRT isn't preemptable; late results are dropped at compare-and-store), clears stored paths/trajectories and _last_op_success, and refuses if any robot is executing.
    • cancel(robot_name=None): clears the execute accept window for one or all robots; no longer touches planning.
    • execute(): per-robot gating: refuses if planning is in flight or the module is faulted; sets _executing[robot_name] only during the coordinator task_invoke accept window.
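The invalidation-by-forgetting pattern, where reset() clears the accepted request_id so a late worker result fails its compare-and-store, can be sketched as follows (names are hypothetical):

```python
import threading

class JobTable:
    """Sketch: reset() forgets request ids; late worker results are then dropped."""

    def __init__(self):
        self._lock = threading.Lock()
        self._active = {}   # robot -> request_id currently accepted
        self.paths = {}     # robot -> committed planned path

    def begin(self, robot, request_id):
        with self._lock:
            self._active[robot] = request_id

    def reset(self):
        with self._lock:    # the RRT thread can't be preempted; just forget its id
            self._active.clear()
            self.paths.clear()

    def complete(self, robot, request_id, path):
        with self._lock:    # compare-and-store: stale request_ids are ignored
            if self._active.get(robot) == request_id:
                self.paths[robot] = path
                return True
            return False
```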
  • Concurrency-safe WorldMonitor.dismiss_preview(robot_id): serialized under the monitor lock since the live world isn't thread-safe; the planner worker calls this before each new plan.
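The serialization pattern is straightforward; this sketch uses a hypothetical preview store, since only the locking discipline is the point:

```python
import threading

class WorldMonitorSketch:
    """Sketch: preview mutations serialized under one lock (live world isn't thread-safe)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._previews = {}   # robot_id -> preview geometry handle (hypothetical)

    def show_preview(self, robot_id, geometry):
        with self._lock:
            self._previews[robot_id] = geometry

    def dismiss_preview(self, robot_id):
        with self._lock:      # planner workers may call this concurrently
            return self._previews.pop(robot_id, None)
```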

  • Downstream consumers updated. pick_and_place_module.py now waits for the async planner between grasp candidates via the new private _wait_plan helper; _preview_execute_wait waits internally so existing callers (move_to_pose, move_to_joints, home/init, lift) keep their pre-async semantics. Interactive client (manipulation_client.py) adds wait_plan() and plan_status(). README + OpenArm integration doc updated.

  • Tests.

    • New TestAsyncPlanningUnit (8 deterministic unit tests, gated fake planner): accept latency, concurrent distinct-robot plans, duplicate rejection, fault gating, reset-invalidates-late-results, preview/execute per-robot gating, clear_planned_path gating.
    • Updated Drake-backed integration tests to wait for completion before asserting state.
    • New test_openarm_bimanual_planning.py e2e: sequential-vs-concurrent timing assertion, plus overlapped preview and execute on the openarm-mock-planner-coordinator blueprint.
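A gated fake planner of the kind these tests describe might look like this; the PR's actual fixture is called _GatedPlanner, but the shape below is an assumption:

```python
import threading

class GatedPlanner:
    """Fake planner that blocks until the test opens the gate, making overlap deterministic."""

    def __init__(self):
        self.gate = threading.Event()
        self.calls = []

    def plan_joint_path(self, robot_name, target):
        self.calls.append(robot_name)
        self.gate.wait(timeout=5.0)   # the test decides exactly when planning "finishes"
        return [target]
```

While the gate is closed, the test can assert that a second robot's plan is accepted even though the first is still "planning"; setting the gate releases all workers at once.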

How to Test

# Unit tests
pytest dimos/manipulation/test_manipulation_module.py -v

# Bimanual e2e on the OpenArm mock blueprint
pytest dimos/e2e_tests/test_openarm_bimanual_planning.py -v

# Interactive
python -m dimos.manipulation.planning.examples.manipulation_client
>>> plan([0.1]*7, "left_arm"); plan([0.1]*7, "right_arm")
>>> wait_plan()             # waits on all active jobs
>>> preview(); execute()

Contributor License Agreement

  • I have read and approved the CLA.

@pgn-dev pgn-dev marked this pull request as ready for review May 13, 2026 18:16
@greptile-apps
Contributor

greptile-apps Bot commented May 13, 2026

Greptile Summary

This PR refactors ManipulationModule to support fully concurrent motion planning across multiple arms. Instead of blocking the module while Drake IK+RRT runs (5–15 s per arm), each plan_to_pose/plan_to_joints call now submits work to a per-robot ThreadPoolExecutor and returns immediately; results are committed via compare-and-store keyed on a request_id.

  • Async planning API: PlanningJob dataclass tracks per-robot state; wait_for_planning_completion, get_planning_status, and the reworked _wait_plan helper provide blocking and polling interfaces; reset() invalidates in-flight jobs via the invalidated flag without cancelling non-preemptable Drake threads.
  • Execution hardening: execute() now wraps the task_invoke call in a try/except so coordinator crashes correctly record _last_op_success[robot] = False and set FAULT; _clear_failed_plan_for_retry narrows fault-clearing to one robot's terminal failed plan so pick() can retry grasp candidates without a full reset().
  • WorldMonitor.dismiss_preview is serialized under the monitor lock so concurrent planning workers don't race on live-world visualization state.

Confidence Score: 4/5

Safe to merge with awareness that the module-wide FAULT policy — where any single arm's planning failure blocks all arms from new plans — remains in place; this is documented as intentional but affects bimanual workflows in the failure path.

The five previously-flagged bugs are substantively addressed: _wait_plan now resolves the robot name before calling get_planning_status so single-robot skills no longer always return failure; execute() wraps task_invoke in try/except so coordinator crashes properly land in FAULT; and pick() calls _clear_failed_plan_for_retry between grasp candidates. The refactor introduces a large amount of new concurrent state (PlanningJob maps, per-robot executing/last_op_success dicts, ThreadPoolExecutor) in the critical manipulation path, and concurrent Drake context access relies on the planner using scratch contexts internally — both reasonable designs, but difficult to verify exhaustively from code review alone.

dimos/manipulation/manipulation_module.py deserves the closest look — it carries most of the new concurrency logic, and the interactions between _planning_jobs, _last_op_success, _executing, and _state span several methods that must stay mutually consistent.

Important Files Changed

Filename Overview
dimos/manipulation/manipulation_module.py Core async refactor: per-robot PlanningJob tracking, ThreadPoolExecutor, compare-and-store completion, and rewritten cancel/reset/execute with proper exception handling; large scope warrants careful review
dimos/manipulation/pick_and_place_module.py Adds explicit _wait_plan + _clear_failed_plan_for_retry between grasp candidates, fixing the bug where the first planning failure blocked all subsequent candidates
dimos/manipulation/planning/monitor/world_monitor.py Adds dismiss_preview() correctly serialized under the WorldMonitor lock so concurrent planning workers don't race on live-world visualization state
dimos/manipulation/test_manipulation_unit.py New unit tests cover async job registration, fault gating, retry-clear semantics, and execution exception paths; all deterministic with mocked infrastructure
dimos/manipulation/test_manipulation_module.py New TestAsyncPlanningUnit with 8 deterministic tests using _GatedPlanner; covers concurrency, invalidation, per-robot preview/execute gating, and failure paths
dimos/e2e_tests/test_openarm_bimanual_planning.py End-to-end bimanual timing assertion and overlapped preview/execute tests; gated behind a mock blueprint so CI stays fast
dimos/manipulation/planning/examples/manipulation_client.py Interactive client updated with wait_plan() and plan_status() helpers; handles the None-robot-name case correctly at the client level

Sequence Diagram

sequenceDiagram
    participant Caller
    participant ManipModule
    participant Pool as ThreadPoolExecutor
    participant Worker as _planning_worker
    participant WorldMonitor
    participant Planner as Drake RRT/IK
    participant Coordinator

    Caller->>ManipModule: plan_to_pose(pose, "left_arm")
    ManipModule->>ManipModule: "_begin_planning() → PlanningJob(future=None)"
    ManipModule->>Pool: submit(_planning_worker, "left_arm", ...)
    Pool-->>ManipModule: future
    ManipModule->>ManipModule: "job.future = future (under lock)"
    ManipModule-->>Caller: True (accepted immediately)

    Caller->>ManipModule: plan_to_pose(pose, "right_arm")
    ManipModule->>ManipModule: "_begin_planning() → PlanningJob(future=None)"
    ManipModule->>Pool: submit(_planning_worker, "right_arm", ...)
    Pool-->>ManipModule: future
    ManipModule-->>Caller: True (both in flight concurrently)

    par left_arm planning
        Worker->>WorldMonitor: dismiss_preview(left_id) [locked]
        Worker->>WorldMonitor: get_current_joint_state(left_id)
        Worker->>Planner: plan_joint_path(world, left_id, ...)
        Planner-->>Worker: path
        Worker->>ManipModule: _complete_job("left_arm", req_id, True, path, traj)
    and right_arm planning
        Worker->>WorldMonitor: dismiss_preview(right_id) [locked]
        Worker->>WorldMonitor: get_current_joint_state(right_id)
        Worker->>Planner: plan_joint_path(world, right_id, ...)
        Planner-->>Worker: path
        Worker->>ManipModule: _complete_job("right_arm", req_id, True, path, traj)
    end

    Caller->>ManipModule: wait_for_planning_completion(None, timeout)
    ManipModule-->>Caller: True (both done)

    Caller->>ManipModule: execute("left_arm")
    ManipModule->>Coordinator: task_invoke("left_arm_task", "execute", traj)
    Coordinator-->>ManipModule: True
    ManipModule-->>Caller: True

    Caller->>ManipModule: execute("right_arm")
    ManipModule->>Coordinator: task_invoke("right_arm_task", "execute", traj)
    Coordinator-->>ManipModule: True
    ManipModule-->>Caller: True

Reviews (5): Last reviewed commit: "Merge branch 'main' into feature/1986-co..."

Comment threads: dimos/manipulation/manipulation_module.py (4), dimos/manipulation/pick_and_place_module.py (1)
