Feature: async node #1620

Draft
nadavelkabets wants to merge 24 commits into ros2:rolling from nadavelkabets:feature/async-node

Conversation


@nadavelkabets nadavelkabets commented Mar 11, 2026

Summary

AsyncNode brings native asyncio support to rclpy, enabling async/await throughout subscription callbacks, service handlers, client calls, timers, and clock sleeps.

Changes

  • Base entity classes extracted: BasePublisher, BaseSubscription, BaseClient, BaseService, BaseTimer, BaseClock
  • BaseNode extracted from Node
  • New async entity classes: AsyncNode, AsyncPublisher, AsyncSubscription, AsyncService, AsyncClient, AsyncTimer, AsyncClock
  • ROSClock deprecated — matching rclcpp, ros_time_is_active / set_ros_time_override moved to regular Clock

Usage examples

run() — simple reactive node:

with rclpy.init():
    node = AsyncNode("my_node")
    node.create_subscription(String, "/topic", callback, 10)
    await node.run()

async with — composable, user-controlled lifetime:

with rclpy.init():
    async with AsyncNode("my_node") as node:
        node.create_subscription(String, "/topic", callback, 10)
        await some_other_async_loop()

client.call() — async service call, no futures or spinning:

async with AsyncNode("my_node") as node:
    client = node.create_client(SetBool, "/set_bool")
    response = await client.call(SetBool.Request(data=True))

clock.sleep() — sim-time-aware, cancels on node shutdown:

async with AsyncNode("my_node") as node:
    await node.get_clock().sleep(2.0)  # respects sim time

aiohttp — compose with any asyncio library in a callback:

async def handle_set(request, response):
    async with aiohttp.request("GET", "http://api.example.com/data") as resp:
        response.message = await resp.text()
    return response

async with AsyncNode("my_node") as node:
    node.create_service(SetBool, "/my_service", handle_set)
    await node.run()

serial — bridge ROS topics to a serial port:

reader, writer = await serial_asyncio.open_serial_connection(url="/dev/ttyUSB0")
async with AsyncNode("serial_bridge") as node:
    pub = node.create_publisher(Byte, "/serial/rx", 10)
    while data := await reader.read(1):
        pub.publish(Byte(data=data))

Design

Core mechanism

DDS pushes work onto the asyncio event loop instead of an executor polling a wait set:

DDS thread → on_new_message_callback → loop.call_soon_threadsafe(event.set) → asyncio executes

One reader task per entity waits on an asyncio Event, takes data, and runs the callback. The DDS callback only sets the event. This gives natural backpressure — the entity won't take another message until the current callback yields.
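The mechanism above can be sketched with plain asyncio primitives. This is an illustration, not the PR's actual code: `dds_thread` stands in for the middleware thread, a plain list stands in for the DDS take queue, and appending to `out` stands in for running the callback.

```python
import asyncio
import threading

async def reader(event: asyncio.Event, queue: list, out: list) -> None:
    """One reader task per entity: wait on the event, take, dispatch."""
    while True:
        await event.wait()          # parked until the DDS callback fires
        event.clear()
        while queue:                # take everything that has arrived
            msg = queue.pop(0)
            if msg is None:         # sentinel standing in for shutdown
                return
            out.append(msg)         # "run the callback"

def dds_thread(loop, event, queue) -> None:
    """Stands in for the middleware thread: enqueue, then wake the loop."""
    for msg in ("a", "b", None):
        queue.append(msg)
        loop.call_soon_threadsafe(event.set)   # the only work done here

async def main() -> list:
    loop = asyncio.get_running_loop()
    event, queue, out = asyncio.Event(), [], []
    t = threading.Thread(target=dds_thread, args=(loop, event, queue))
    t.start()
    await reader(event, queue, out)
    t.join()
    return out
```

Because the reader only takes the next message after the current callback yields, backpressure falls out of the pattern for free.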

Structured concurrency

The design follows structured concurrency (Trio, asyncio TaskGroup). Every task has a clear owner, lifetimes are bounded by async with scopes, and no task outlives its parent.

The node's outer TaskGroup owns entity reader tasks. Subscriptions and services can run callbacks concurrently via a nested inner TaskGroup. When an entity is destroyed, the inner group cancels all in-flight callbacks before the entity handle is cleaned up — no orphaned callbacks. This is required for service correctness (a callback needs the live service handle to send its response), and subscriptions use the same pattern for consistency. Clients and timers don't need inner groups — clients only route responses to futures, timers dispatch sequentially.

Resource cleanup is deterministic: when the async with block exits, all reader tasks have finished, all DDS callbacks have been cleared, and no orphaned coroutines remain.

Two entry points

Both entry points share the same lifecycle:

  • async with is for composable use cases where the user controls the lifetime (bridges, tests, multi-protocol applications).
  • run() is for simple reactive nodes.

Class hierarchy

BaseNode
├── Node                    (executor-based, existing)
└── AsyncNode               (asyncio-based, new)

BasePublisher
├── Publisher               (executor: callback groups, executor event)
└── AsyncPublisher          (no task — fire-and-forget publish)

BaseSubscription
├── Subscription            (executor: callback groups, executor event)
└── AsyncSubscription       (owns reader task, optional concurrent dispatch)

BaseService
├── Service                 (executor: callback groups, executor event)
└── AsyncService            (owns reader task, optional concurrent dispatch)

BaseClient
├── Client                  (executor: callback groups, call_async → rclpy.Future)
└── AsyncClient             (owns reader task, call → asyncio.Future)

BaseTimer
├── Timer                   (executor: callback groups, executor event)
└── AsyncTimer              (owns timer loop, sequential dispatch)

BaseClock
├── Clock                   (blocking sleep)
└── AsyncClock              (async sleep)

Entity-owned architecture

Each async entity class owns its full DDS bridge: event creation, DDS callback registration, the take loop, callback dispatch, and cleanup. The node is a thin coordinator — it creates C handles via the base class, wraps them in async entity classes, and hands them a reference to its TaskGroup so they can spawn their own reader tasks.

BaseNode holds shared logic (parameters, clock, logger, name resolution, graph discovery) and calls factory methods polymorphically during init, so AsyncNode and Node each produce their own entity types without the base class knowing which subclass it's in.

Entities self-remove from the node's tracking set when destroyed, via a callback passed at construction. This keeps the node's bookkeeping consistent regardless of whether destruction is triggered by the node, by user code, or by the entity's own cleanup.
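The self-removal hook can be sketched like this (hypothetical class names, not the PR's code; `set.discard` makes removal safe no matter which party triggers destruction first):

```python
class SketchEntity:
    """Stand-in for an async entity; not the PR's actual class."""

    def __init__(self, on_destroy):
        self._on_destroy = on_destroy
        self._destroyed = False

    def destroy(self) -> None:
        if self._destroyed:          # idempotent, as required at shutdown
            return
        self._destroyed = True
        self._on_destroy(self)       # self-remove from the node's tracking set

class SketchNode:
    """Stand-in node: tracks entities, destroys survivors on shutdown."""

    def __init__(self):
        self._entities = set()

    def create_entity(self) -> SketchEntity:
        entity = SketchEntity(on_destroy=self._entities.discard)
        self._entities.add(entity)
        return entity

    def destroy(self) -> None:
        for entity in list(self._entities):   # copy: set mutates during loop
            entity.destroy()
```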

Shutdown

Shutdown is synchronous and uses task cancellation as the only mechanism. Destroying the node destroys all entities (cancelling their tasks and DDS handles), cancels pending clock sleeps, then marks the node handle for deferred destruction. Each entity's destroy is idempotent.

Clock sleep and timers

Clock sleep is the async replacement for blocking sleep. For wall time it schedules a delayed callback on the event loop, for sim time it registers a jump callback that resolves when simulated time advances past the target. If the time source changes during a sleep (ROS time activated or deactivated), the sleep raises an error since the target is no longer meaningful. All pending sleeps are cancelled on node shutdown.
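The wall-time branch can be sketched as a delayed callback resolving a future (an illustration, not the PR's implementation; the sim-time branch would resolve the same future from a clock-jump callback instead of `call_later`):

```python
import asyncio

async def wall_sleep(seconds: float) -> None:
    """Sketch of an async wall-clock sleep: no thread, no busy wait."""
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    handle = loop.call_later(seconds, done.set_result, None)
    try:
        await done                 # suspends this coroutine only
    finally:
        handle.cancel()            # node shutdown cancels the pending sleep
```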

Timers use the same dual-mode wait pattern. They support cancel (parks the loop until reset) and reset (wakes the parked loop). Timer callbacks are always dispatched sequentially — if a callback runs longer than the period, the next tick is delayed.
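The park/wake behavior can be sketched with an `asyncio.Event` (hypothetical `SketchTimer`, wall time only, sequential dispatch as described above):

```python
import asyncio

class SketchTimer:
    """Illustrative timer loop: cancel() parks it, reset() wakes it."""

    def __init__(self, period: float, callback):
        self._period = period
        self._callback = callback
        self._running = asyncio.Event()
        self._running.set()

    def cancel(self) -> None:
        self._running.clear()       # park after the current tick

    def reset(self) -> None:
        self._running.set()         # wake a parked loop

    async def run(self, ticks: int) -> None:
        for _ in range(ticks):
            await self._running.wait()     # parked while cancelled
            await asyncio.sleep(self._period)
            await self._callback()         # sequential: next tick waits

async def demo() -> int:
    calls = 0

    async def tick():
        nonlocal calls
        calls += 1

    timer = SketchTimer(0.01, tick)
    await timer.run(3)
    return calls
```

Because the callback is awaited inline rather than spawned into a group, an overrunning callback simply delays the next tick, matching the sequential-dispatch rule above.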

Known limitations

Actions and waitables are not yet supported. Waitable support requires a set_on_ready_callback API on the waitable interface, matching the approach used by rclcpp’s EventsExecutor, which is not yet available in rclpy.

Performance

CPU usage

I ran the test_rclpy_performance.py benchmark from the EventsExecutor PR, adapted for AsyncNode.
AsyncNode nearly matches EventsExecutor performance while significantly outperforming SingleThreadedExecutor.

[Screenshot: CPU usage benchmark comparison]

Running the test with uvloop.run instead of asyncio.run achieved an even lower CPU usage of 10%.

Timer latency and jitter

At 50 Hz (20 ms period), AsyncNode's mean jitter is slightly higher than the existing executors: ~0.16 ms above SingleThreadedExecutor and ~0.38 ms above EventsExecutor.
I didn't invest much effort in optimizing this; honestly, the difference is really small.

[Screenshot: timer jitter benchmark comparison]


peci1 commented Mar 11, 2026

This looks quite interesting! I haven't acquainted myself with async patterns too much yet, so I won't be of much value as a reviewer of the whole PR.

One thing has, however, caught my eye: always subscribing to clock.

This is something I wouldn't recommend. I sometimes deliberately don't set sim time on a node if I know it doesn't need it (e.g. a plain relay that just gets a message, computes something, and outputs another message with the input timestamp). The reason I do this is performance: if you have a 1000 Hz sim clock and 50 nodes... You should use ROS 1 :D So I definitely don't want to lose the ability to not subscribe to clock.

Another thing: is it possible to specify service call timeouts with the async client?


nadavelkabets commented Mar 11, 2026

> 1000 Hz sim clock

The existing SingleThreadedExecutor would probably fry your CPU if you published at 1000 Hz, haha.
I reverted this change.

> Another thing: is it possible to specify service call timeouts with the async client?

Yeah, you could.
Notice that there are 2 separate mechanisms:

  • An asyncio.timeout() context manager cancels the async call operation.
  • The lifespan QoS setting drops the request at the middleware layer.

async with AsyncNode("node") as node:
    qos_profile = QoSProfile(lifespan=Duration(seconds=2))
    client = node.create_client(SetBool, "/set_bool", qos_profile=qos_profile)
    async with asyncio.timeout(2):
        response = await client.call(SetBool.Request(data=True))

That's a limitation of the current client rcl/rmw API.

mjcarroll (Member) commented:

As we have discussed before, this is a very high quality contribution. It brings our interactions in the Python client library in line with what many Python developers would expect.

I agree with the refactoring into Base classes. We wouldn't want to inherit from what is already there because it could cause confusion. This is a good move.

Otherwise the API ergonomics look great, and putting it in the experimental namespace should give us a little bit of latitude on getting improvements in over time.

With that, I would suggest we do this in ~6 PRs. You mentioned 12 on Zulip, but I feel like that would be a lot to manage.

I think (just guessing here):

  • Base Entity Classes (should be quick to review)
  • Moving shared logic into BaseNode
  • Clock and time
  • Async infrastructure (utilities/event loop)
  • Add the async entities and asyncnode objects
  • Tests and documentation

I think those are logical chunks that should each be pretty easy to review.

This was referenced Mar 14, 2026
nadavelkabets (Contributor Author) left a comment:

Reminder: test AsyncNode with Python 3.14
