Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
- Enhance Message Validation
- Add TypeScript definitions and non-throwing variants for validator
- Add MessageIntrospector for message schema inspection
- Add Observable subscriptions with RxJS support

- **[Martins Mozeiko](https://github.com/martins-mozeiko)**
- QoS new/delete fix
Expand Down
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ rclnodejs.init().then(() => {
- [Tutorials](./tutorials/)
- [Electron-based Visualization](#electron-based-visualization)
- [Using TypeScript](#using-rclnodejs-with-typescript)
- [Observable Subscriptions](#observable-subscriptions)
- [ROS2 Interface Message Generation](#ros2-interface-message-generation)
- [Performance Benchmarks](#performance-benchmarks)
- [Efficient Usage Tips](./docs/EFFICIENCY.md)
Expand Down Expand Up @@ -133,6 +134,27 @@ rclnodejs.init().then(() => {

See [TypeScript demos](https://github.com/RobotWebTools/rclnodejs/tree/develop/ts_demo) for more examples.

## Observable Subscriptions

rclnodejs supports [RxJS](https://rxjs.dev/) Observable subscriptions for reactive programming with ROS 2 messages. Use operators like `throttleTime()`, `debounceTime()`, `map()`, and `combineLatest()` to build declarative message processing pipelines.

```javascript
const { throttleTime, map } = require('rxjs');

const obsSub = node.createObservableSubscription(
'sensor_msgs/msg/LaserScan',
'/scan'
);
obsSub.observable
.pipe(
throttleTime(200),
map((msg) => msg.ranges)
)
.subscribe((ranges) => console.log('Ranges:', ranges.length));
```

See the [Observable Subscriptions Tutorial](./tutorials/observable-subscriptions.md) for more details.

## ROS2 Interface Message Generation

ROS client libraries convert IDL message descriptions into target language source code. rclnodejs provides the `generate-ros-messages` script to generate JavaScript message interface files and TypeScript declarations.
Expand Down
17 changes: 17 additions & 0 deletions example/topics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,21 @@ The `subscriber/` directory contains examples of nodes that subscribe to topics:
- **Features**: Manual conversion of TypedArrays, BigInt, and special values for JSON serialization
- **Run Command**: `node subscriber/subscription-json-utilities-example.js`

### 10. Observable Subscriber (`subscription-observable-example.js`)

**Purpose**: Demonstrates RxJS Observable subscriptions for reactive programming.

- **Message Type**: `std_msgs/msg/String`
- **Topic**: `topic`
- **Functionality**: Shows how to use `createObservableSubscription()` with RxJS operators
- **Features**:
- Throttling with `throttleTime()` for rate limiting
- Message transformation with `map()`
- Content filtering with `filter()`
- Batching with `bufferCount()`
- **Run Command**: `node subscriber/subscription-observable-example.js`
- **Pair**: Works with `publisher-example.js`

## Validator Example

The `validator/` directory contains validation utilities:
Expand All @@ -211,6 +226,7 @@ Several examples work together to demonstrate complete communication:
| `publisher-multiarray-example.js` | `subscription-multiarray-example.js` | Multi-dimensional array data |
| `publisher-qos-example.js` | `subscription-qos-example.js` | QoS configuration |
| `publisher-raw-message.js` | `subscription-raw-message.js` | Raw binary data |
| `publisher-example.js` | `subscription-observable-example.js` | RxJS Observable subscription |

## How to Run Examples

Expand All @@ -237,6 +253,7 @@ Several examples work together to demonstrate complete communication:
- **Message Serialization**: TypedArray handling and JSON-safe conversion for web applications
- **Name Validation**: Topic names, node names, and namespace validation utilities
- **Message Validation**: Schema introspection and pre-publish message validation with detailed error reporting
- **Observable Subscriptions**: RxJS-based reactive programming with operators for throttling, filtering, and combining message streams

## Notes

Expand Down
72 changes: 72 additions & 0 deletions example/topics/subscriber/subscription-observable-example.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2025 Mahmoud Alghalayini. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const rclnodejs = require('../../../index.js');
const { throttleTime, map, filter, bufferCount } = require('rxjs');

async function main() {
await rclnodejs.init();

const node = rclnodejs.createNode('observable_subscription_example_node');

// Basic observable subscription
const obsSub = node.createObservableSubscription(
'std_msgs/msg/String',
'topic'
);

// Example 1: Throttled messages (max 2/sec)
obsSub.observable
.pipe(
throttleTime(500),
map((msg) => msg.data)
)
.subscribe((data) => {
console.log('[Throttled]', data);
});

// Example 2: Filtered messages (only containing "ROS")
// Note: RxJS filter() operates at the application level after messages are received.
// For more efficient filtering at the DDS middleware level (reducing network traffic),
// use the contentFilter option. See: tutorials/content-filtering-subscription.md
obsSub.observable
.pipe(
map((msg) => msg.data),
filter((data) => data.includes('ROS'))
Copy link
Member

@minggangw minggangw Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the ROS2 DDS also supports subscription content filtering, which is more efficient I believe, shall we point out the difference between the two filter?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the readme to have an example for content filter + RxJS

)
.subscribe((data) => {
console.log('[Filtered]', data);
});

// Example 3: Batched messages (every 3 messages)
obsSub.observable
.pipe(
map((msg) => msg.data),
bufferCount(3)
)
.subscribe((batch) => {
console.log('[Batched]', batch.length, 'messages');
});

console.log('Observable subscription created on /topic');
console.log(
'Run: ros2 topic pub /topic std_msgs/msg/String "{data: Hello ROS}" -r 5'
);

rclnodejs.spin(node);
}

main();
4 changes: 4 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const ParameterClient = require('./lib/parameter_client.js');
const errors = require('./lib/errors.js');
const ParameterWatcher = require('./lib/parameter_watcher.js');
const MessageIntrospector = require('./lib/message_introspector.js');
const ObservableSubscription = require('./lib/observable_subscription.js');
const { spawn } = require('child_process');
const {
ValidationProblem,
Expand Down Expand Up @@ -236,6 +237,9 @@ let rcl = {
/** {@link ParameterWatcher} class */
ParameterWatcher: ParameterWatcher,

/** {@link ObservableSubscription} class */
ObservableSubscription: ObservableSubscription,

/** {@link QoS} class */
QoS: QoS,

Expand Down
37 changes: 37 additions & 0 deletions lib/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const QoS = require('./qos.js');
const Rates = require('./rate.js');
const Service = require('./service.js');
const Subscription = require('./subscription.js');
const ObservableSubscription = require('./observable_subscription.js');
const TimeSource = require('./time_source.js');
const Timer = require('./timer.js');
const TypeDescriptionService = require('./type_description_service.js');
Expand Down Expand Up @@ -820,6 +821,42 @@ class Node extends rclnodejs.ShadowNode {
return subscription;
}

/**
* Create a Subscription that returns an RxJS Observable.
* This allows using reactive programming patterns with ROS 2 messages.
*
* @param {function|string|object} typeClass - The ROS message class,
* OR a string representing the message class, e.g. 'std_msgs/msg/String',
* OR an object representing the message class, e.g. {package: 'std_msgs', type: 'msg', name: 'String'}
* @param {string} topic - The name of the topic.
* @param {object} [options] - The options argument used to parameterize the subscription.
* @param {boolean} [options.enableTypedArray=true] - The topic will use TypedArray if necessary.
* @param {QoS} [options.qos=QoS.profileDefault] - ROS Middleware "quality of service" settings.
* @param {boolean} [options.isRaw=false] - The topic is serialized when true.
* @param {string} [options.serializationMode='default'] - Controls message serialization format.
* @param {object} [options.contentFilter] - The content-filter (if supported by RMW).
* @param {SubscriptionEventCallbacks} [eventCallbacks] - The event callbacks for the subscription.
* @return {ObservableSubscription} - An ObservableSubscription with an RxJS Observable.
*/
createObservableSubscription(typeClass, topic, options, eventCallbacks) {
let observableSubscription = null;

const subscription = this.createSubscription(
typeClass,
topic,
options,
(message) => {
if (observableSubscription) {
observableSubscription._emit(message);
}
},
eventCallbacks
);

observableSubscription = new ObservableSubscription(subscription);
return observableSubscription;
}

/**
* Create a Client.
* @param {function|string|object} typeClass - The ROS message class,
Expand Down
105 changes: 105 additions & 0 deletions lib/observable_subscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) 2025 Mahmoud Alghalayini. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const { Subject } = require('rxjs');

/**
* A wrapper that provides RxJS Observable support for ROS 2 subscriptions.
* This class wraps a standard Subscription and emits messages through an Observable.
*
* @class ObservableSubscription
* @hideconstructor
*/
class ObservableSubscription {
#subscription;
#subject;
#destroyed;

/**
* Create an ObservableSubscription wrapper.
* @param {Subscription} subscription - The underlying ROS 2 subscription
*/
constructor(subscription) {
this.#subscription = subscription;
this.#subject = new Subject();
this.#destroyed = false;
}

/**
* Get the RxJS Observable for this subscription.
* Use this to pipe operators and subscribe to messages.
* @type {Observable}
*/
get observable() {
return this.#subject.asObservable();
}

/**
* Get the underlying ROS 2 subscription.
* @type {Subscription}
*/
get subscription() {
return this.#subscription;
}

/**
* Get the topic name.
* @type {string}
*/
get topic() {
return this.#subscription.topic;
}

/**
* Check if this observable subscription has been destroyed.
* @type {boolean}
*/
get isDestroyed() {
return this.#destroyed;
}

/**
* Internal method to emit a message to subscribers.
* Called by the subscription's processResponse.
* @private
* @param {any} message - The message to emit
*/
_emit(message) {
if (!this.#destroyed) {
this.#subject.next(message);
}
}

/**
* Complete the observable and clean up resources.
* After calling this, no more messages will be emitted.
*/
complete() {
if (!this.#destroyed) {
this.#destroyed = true;
this.#subject.complete();
}
}

/**
* Alias for complete() for consistency with RxJS naming.
*/
destroy() {
this.complete();
}
}

module.exports = ObservableSubscription;
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"debug": "^4.4.0",
"json-bigint": "^1.0.0",
"node-addon-api": "^8.3.1",
"rxjs": "^7.8.1",
"walk": "^2.3.15"
},
"husky": {
Expand Down
Loading
Loading