diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 63c24da6..6d968cab 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -47,6 +47,7 @@ - Enhance Message Validation - Add TypeScript definitions and non-throwing variants for validator - Add MessageIntrospector for message schema inspection + - Add Observable subscriptions with RxJS support - **[Martins Mozeiko](https://github.com/martins-mozeiko)** - QoS new/delete fix diff --git a/README.md b/README.md index 915246fc..f3ebf6fd 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ rclnodejs.init().then(() => { - [Tutorials](./tutorials/) - [Electron-based Visualization](#electron-based-visualization) - [Using TypeScript](#using-rclnodejs-with-typescript) +- [Observable Subscriptions](#observable-subscriptions) - [ROS2 Interface Message Generation](#ros2-interface-message-generation) - [Performance Benchmarks](#performance-benchmarks) - [Efficient Usage Tips](./docs/EFFICIENCY.md) @@ -133,6 +134,27 @@ rclnodejs.init().then(() => { See [TypeScript demos](https://github.com/RobotWebTools/rclnodejs/tree/develop/ts_demo) for more examples. +## Observable Subscriptions + +rclnodejs supports [RxJS](https://rxjs.dev/) Observable subscriptions for reactive programming with ROS 2 messages. Use operators like `throttleTime()`, `debounceTime()`, `map()`, and `combineLatest()` to build declarative message processing pipelines. + +```javascript +const { throttleTime, map } = require('rxjs'); + +const obsSub = node.createObservableSubscription( + 'sensor_msgs/msg/LaserScan', + '/scan' +); +obsSub.observable + .pipe( + throttleTime(200), + map((msg) => msg.ranges) + ) + .subscribe((ranges) => console.log('Ranges:', ranges.length)); +``` + +See the [Observable Subscriptions Tutorial](./tutorials/observable-subscriptions.md) for more details. + ## ROS2 Interface Message Generation ROS client libraries convert IDL message descriptions into target language source code. rclnodejs provides the `generate-ros-messages` script to generate JavaScript message interface files and TypeScript declarations. diff --git a/example/topics/README.md b/example/topics/README.md index 8fbdda16..0b147304 100644 --- a/example/topics/README.md +++ b/example/topics/README.md @@ -187,6 +187,21 @@ The `subscriber/` directory contains examples of nodes that subscribe to topics: - **Features**: Manual conversion of TypedArrays, BigInt, and special values for JSON serialization - **Run Command**: `node subscriber/subscription-json-utilities-example.js` +### 10. Observable Subscriber (`subscription-observable-example.js`) + +**Purpose**: Demonstrates RxJS Observable subscriptions for reactive programming. + +- **Message Type**: `std_msgs/msg/String` +- **Topic**: `topic` +- **Functionality**: Shows how to use `createObservableSubscription()` with RxJS operators +- **Features**: + - Throttling with `throttleTime()` for rate limiting + - Message transformation with `map()` + - Content filtering with `filter()` + - Batching with `bufferCount()` +- **Run Command**: `node subscriber/subscription-observable-example.js` +- **Pair**: Works with `publisher-example.js` + ## Validator Example The `validator/` directory contains validation utilities: @@ -211,6 +226,7 @@ Several examples work together to demonstrate complete communication: | `publisher-multiarray-example.js` | `subscription-multiarray-example.js` | Multi-dimensional array data | | `publisher-qos-example.js` | `subscription-qos-example.js` | QoS configuration | | `publisher-raw-message.js` | `subscription-raw-message.js` | Raw binary data | +| `publisher-example.js` | `subscription-observable-example.js` | RxJS Observable subscription | ## How to Run Examples @@ -237,6 +253,7 @@ Several examples work together to demonstrate complete communication: - **Message Serialization**: TypedArray handling and JSON-safe conversion for web applications - **Name Validation**: Topic names, node names, and namespace validation utilities - **Message Validation**: Schema introspection and pre-publish message validation with detailed error reporting +- **Observable Subscriptions**: RxJS-based reactive programming with operators for throttling, filtering, and combining message streams ## Notes diff --git a/example/topics/subscriber/subscription-observable-example.js b/example/topics/subscriber/subscription-observable-example.js new file mode 100644 index 00000000..3dccac3e --- /dev/null +++ b/example/topics/subscriber/subscription-observable-example.js @@ -0,0 +1,72 @@ +// Copyright (c) 2025 Mahmoud Alghalayini. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const rclnodejs = require('../../../index.js'); +const { throttleTime, map, filter, bufferCount } = require('rxjs'); + +async function main() { + await rclnodejs.init(); + + const node = rclnodejs.createNode('observable_subscription_example_node'); + + // Basic observable subscription + const obsSub = node.createObservableSubscription( + 'std_msgs/msg/String', + 'topic' + ); + + // Example 1: Throttled messages (max 2/sec) + obsSub.observable + .pipe( + throttleTime(500), + map((msg) => msg.data) + ) + .subscribe((data) => { + console.log('[Throttled]', data); + }); + + // Example 2: Filtered messages (only containing "ROS") + // Note: RxJS filter() operates at the application level after messages are received. + // For more efficient filtering at the DDS middleware level (reducing network traffic), + // use the contentFilter option. See: tutorials/content-filtering-subscription.md + obsSub.observable + .pipe( + map((msg) => msg.data), + filter((data) => data.includes('ROS')) + ) + .subscribe((data) => { + console.log('[Filtered]', data); + }); + + // Example 3: Batched messages (every 3 messages) + obsSub.observable + .pipe( + map((msg) => msg.data), + bufferCount(3) + ) + .subscribe((batch) => { + console.log('[Batched]', batch.length, 'messages'); + }); + + console.log('Observable subscription created on /topic'); + console.log( + 'Run: ros2 topic pub /topic std_msgs/msg/String "{data: Hello ROS}" -r 5' + ); + + rclnodejs.spin(node); +} + +main(); diff --git a/index.js b/index.js index 5a4ba581..ce91afc5 100644 --- a/index.js +++ b/index.js @@ -62,6 +62,7 @@ const ParameterClient = require('./lib/parameter_client.js'); const errors = require('./lib/errors.js'); const ParameterWatcher = require('./lib/parameter_watcher.js'); const MessageIntrospector = require('./lib/message_introspector.js'); +const ObservableSubscription = require('./lib/observable_subscription.js'); const { spawn } = require('child_process'); const { ValidationProblem, @@ -236,6 +237,9 @@ let rcl = { /** {@link ParameterWatcher} class */ ParameterWatcher: ParameterWatcher, + /** {@link ObservableSubscription} class */ + ObservableSubscription: ObservableSubscription, + /** {@link QoS} class */ QoS: QoS, diff --git a/lib/node.js b/lib/node.js index c602e628..d78eee8d 100644 --- a/lib/node.js +++ b/lib/node.js @@ -45,6 +45,7 @@ const QoS = require('./qos.js'); const Rates = require('./rate.js'); const Service = require('./service.js'); const Subscription = require('./subscription.js'); +const ObservableSubscription = require('./observable_subscription.js'); const TimeSource = require('./time_source.js'); const Timer = require('./timer.js'); const TypeDescriptionService = require('./type_description_service.js'); @@ -820,6 +821,42 @@ class Node extends rclnodejs.ShadowNode { return subscription; } + /** + * Create a Subscription that returns an RxJS Observable. + * This allows using reactive programming patterns with ROS 2 messages. + * + * @param {function|string|object} typeClass - The ROS message class, + * OR a string representing the message class, e.g. 'std_msgs/msg/String', + * OR an object representing the message class, e.g. {package: 'std_msgs', type: 'msg', name: 'String'} + * @param {string} topic - The name of the topic. + * @param {object} [options] - The options argument used to parameterize the subscription. + * @param {boolean} [options.enableTypedArray=true] - The topic will use TypedArray if necessary. + * @param {QoS} [options.qos=QoS.profileDefault] - ROS Middleware "quality of service" settings. + * @param {boolean} [options.isRaw=false] - The topic is serialized when true. + * @param {string} [options.serializationMode='default'] - Controls message serialization format. + * @param {object} [options.contentFilter] - The content-filter (if supported by RMW). + * @param {SubscriptionEventCallbacks} [eventCallbacks] - The event callbacks for the subscription. + * @return {ObservableSubscription} - An ObservableSubscription with an RxJS Observable. + */ + createObservableSubscription(typeClass, topic, options, eventCallbacks) { + let observableSubscription = null; + + const subscription = this.createSubscription( + typeClass, + topic, + options, + (message) => { + if (observableSubscription) { + observableSubscription._emit(message); + } + }, + eventCallbacks + ); + + observableSubscription = new ObservableSubscription(subscription); + return observableSubscription; + } + /** * Create a Client. * @param {function|string|object} typeClass - The ROS message class, diff --git a/lib/observable_subscription.js b/lib/observable_subscription.js new file mode 100644 index 00000000..775723bc --- /dev/null +++ b/lib/observable_subscription.js @@ -0,0 +1,105 @@ +// Copyright (c) 2025 Mahmoud Alghalayini. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const { Subject } = require('rxjs'); + +/** + * A wrapper that provides RxJS Observable support for ROS 2 subscriptions. + * This class wraps a standard Subscription and emits messages through an Observable. + * + * @class ObservableSubscription + * @hideconstructor + */ +class ObservableSubscription { + #subscription; + #subject; + #destroyed; + + /** + * Create an ObservableSubscription wrapper. + * @param {Subscription} subscription - The underlying ROS 2 subscription + */ + constructor(subscription) { + this.#subscription = subscription; + this.#subject = new Subject(); + this.#destroyed = false; + } + + /** + * Get the RxJS Observable for this subscription. + * Use this to pipe operators and subscribe to messages. + * @type {Observable} + */ + get observable() { + return this.#subject.asObservable(); + } + + /** + * Get the underlying ROS 2 subscription. + * @type {Subscription} + */ + get subscription() { + return this.#subscription; + } + + /** + * Get the topic name. + * @type {string} + */ + get topic() { + return this.#subscription.topic; + } + + /** + * Check if this observable subscription has been destroyed. + * @type {boolean} + */ + get isDestroyed() { + return this.#destroyed; + } + + /** + * Internal method to emit a message to subscribers. + * Called by the subscription's processResponse. + * @private + * @param {any} message - The message to emit + */ + _emit(message) { + if (!this.#destroyed) { + this.#subject.next(message); + } + } + + /** + * Complete the observable and clean up resources. + * After calling this, no more messages will be emitted. + */ + complete() { + if (!this.#destroyed) { + this.#destroyed = true; + this.#subject.complete(); + } + } + + /** + * Alias for complete() for consistency with RxJS naming. + */ + destroy() { + this.complete(); + } +} + +module.exports = ObservableSubscription; diff --git a/package.json b/package.json index 323daab4..4964498d 100644 --- a/package.json +++ b/package.json @@ -80,6 +80,7 @@ "debug": "^4.4.0", "json-bigint": "^1.0.0", "node-addon-api": "^8.3.1", + "rxjs": "^7.8.1", "walk": "^2.3.15" }, "husky": { diff --git a/test/test-observable-subscription.js b/test/test-observable-subscription.js new file mode 100644 index 00000000..865ef646 --- /dev/null +++ b/test/test-observable-subscription.js @@ -0,0 +1,223 @@ +// Copyright (c) 2025 Mahmoud Alghalayini. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +'use strict'; + +const assert = require('assert'); +const rclnodejs = require('../index.js'); +const { take, map, filter, toArray } = require('rxjs'); + +describe('rclnodejs observable subscription test suite', function () { + this.timeout(60 * 1000); + + beforeEach(function () { + return rclnodejs.init(); + }); + + afterEach(function () { + rclnodejs.shutdown(); + }); + + it('createObservableSubscription returns an ObservableSubscription', function () { + const node = new rclnodejs.Node('test_node'); + const String = 'std_msgs/msg/String'; + + const obsSub = node.createObservableSubscription(String, 'test_topic'); + + assert.ok(obsSub, 'ObservableSubscription should be created'); + assert.ok(obsSub.observable, 'Should have an observable property'); + assert.ok(obsSub.subscription, 'Should have a subscription property'); + assert.ok(obsSub.topic.includes('test_topic')); + assert.strictEqual(obsSub.isDestroyed, false); + + node.destroy(); + }); + + it('ObservableSubscription exports the class', function () { + assert.ok( + rclnodejs.ObservableSubscription, + 'ObservableSubscription should be exported' + ); + }); + + it('Observable receives messages from publisher', function (done) { + const node = new rclnodejs.Node('test_node'); + const String = 'std_msgs/msg/String'; + + const publisher = node.createPublisher(String, 'observable_test_topic'); + const obsSub = node.createObservableSubscription( + String, + 'observable_test_topic' + ); + + const receivedMessages = []; + let interval; + + obsSub.observable.pipe(take(3)).subscribe({ + next: (msg) => { + receivedMessages.push(msg.data); + }, + complete: () => { + clearInterval(interval); + assert.strictEqual(receivedMessages.length, 3); + assert.deepStrictEqual(receivedMessages, [ + 'message_0', + 'message_1', + 'message_2', + ]); + node.destroy(); + done(); + }, + error: (err) => { + clearInterval(interval); + node.destroy(); + done(err); + }, + }); + + node.spin(); + + let count = 0; + interval = setInterval(() => { + publisher.publish({ data: `message_${count}` }); + count++; + if (count >= 10) { + clearInterval(interval); + } + }, 50); + }); + + it('Observable can use RxJS operators', function (done) { + const node = new rclnodejs.Node('test_node'); + const Int32 = 'std_msgs/msg/Int32'; + + const publisher = node.createPublisher(Int32, 'rxjs_operators_topic'); + const obsSub = node.createObservableSubscription( + Int32, + 'rxjs_operators_topic' + ); + + let interval; + + obsSub.observable + .pipe( + take(5), + map((msg) => msg.data), + filter((data) => data % 2 === 0), + toArray() + ) + .subscribe({ + next: (results) => { + // From 0,1,2,3,4 we filter even: 0,2,4 + assert.deepStrictEqual(results, [0, 2, 4]); + }, + complete: () => { + clearInterval(interval); + node.destroy(); + done(); + }, + error: (err) => { + clearInterval(interval); + node.destroy(); + done(err); + }, + }); + + node.spin(); + + let count = 0; + interval = setInterval(() => { + publisher.publish({ data: count }); + count++; + if (count >= 10) { + clearInterval(interval); + } + }, 50); + }); + + it('complete() stops the observable', function (done) { + const node = new rclnodejs.Node('test_node'); + const String = 'std_msgs/msg/String'; + + const obsSub = node.createObservableSubscription( + String, + 'complete_test_topic' + ); + + let completed = false; + + obsSub.observable.subscribe({ + complete: () => { + completed = true; + }, + }); + + obsSub.complete(); + + assert.strictEqual(obsSub.isDestroyed, true); + assert.strictEqual(completed, true); + + node.destroy(); + done(); + }); + + it('destroy() is an alias for complete()', function () { + const node = new rclnodejs.Node('test_node'); + const String = 'std_msgs/msg/String'; + + const obsSub = node.createObservableSubscription( + String, + 'destroy_test_topic' + ); + + obsSub.destroy(); + + assert.strictEqual(obsSub.isDestroyed, true); + + node.destroy(); + }); + + it('can be used with options', function () { + const node = new rclnodejs.Node('test_node'); + const String = 'std_msgs/msg/String'; + + const obsSub = node.createObservableSubscription( + String, + 'options_test_topic', + { + qos: rclnodejs.QoS.profileDefault, + enableTypedArray: false, + } + ); + + assert.ok(obsSub, 'ObservableSubscription should be created with options'); + assert.ok(obsSub.observable, 'Should have an observable property'); + + node.destroy(); + }); + + it('underlying subscription can be destroyed via node', function () { + const node = new rclnodejs.Node('test_node'); + const String = 'std_msgs/msg/String'; + + const obsSub = node.createObservableSubscription( + String, + 'destroy_sub_test_topic' + ); + + node.destroySubscription(obsSub.subscription); + + node.destroy(); + }); +}); diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts index e4eb7d32..56238c22 100644 --- a/test/types/index.test-d.ts +++ b/test/types/index.test-d.ts @@ -3,6 +3,7 @@ import { expectType, expectAssignable } from 'tsd'; import * as rclnodejs from 'rclnodejs'; import { ChildProcess } from 'child_process'; +import { Observable } from 'rxjs'; const NODE_NAME = 'test_node'; const LIFECYCLE_NODE_NAME = 'lifecycle_test_node'; @@ -235,6 +236,32 @@ expectType(subscription.clearContentFilter()); expectType(subscription.hasContentFilter()); expectType(subscription.loggerName); +// ---- ObservableSubscription ---- +const observableSubscription = node.createObservableSubscription( + TYPE_CLASS, + TOPIC +); +expectType>( + observableSubscription +); +expectType>( + observableSubscription.observable +); +expectType(observableSubscription.subscription); +expectType(observableSubscription.topic); +expectType(observableSubscription.isDestroyed); +expectType(observableSubscription.complete()); +expectType(observableSubscription.destroy()); + +const observableSubscriptionWithOptions = node.createObservableSubscription( + TYPE_CLASS, + TOPIC, + { qos: rclnodejs.QoS.profileDefault } +); +expectType>( + observableSubscriptionWithOptions +); + // ---- Service ---- const service = node.createService( 'example_interfaces/srv/AddTwoInts', diff --git a/tutorials/README.md b/tutorials/README.md index d69faccc..e61d098a 100644 --- a/tutorials/README.md +++ b/tutorials/README.md @@ -38,6 +38,10 @@ Debug and monitor service interactions in real-time. Learn to observe service ca Reduce network traffic and improve performance by filtering messages at the DDS middleware level using SQL-like expressions. Essential for high-frequency data streams. +#### [Observable Subscriptions with RxJS](observable-subscriptions.md) + +Use RxJS Observables for reactive programming with ROS 2 subscriptions. Apply operators like `throttleTime()`, `combineLatest()`, and `filter()` to process message streams declaratively. + ### 🔍 Introspection & Debugging #### [Type Description Service](type-description-service.md) diff --git a/tutorials/observable-subscriptions.md b/tutorials/observable-subscriptions.md new file mode 100644 index 00000000..7caf2550 --- /dev/null +++ b/tutorials/observable-subscriptions.md @@ -0,0 +1,420 @@ +# Observable Subscriptions in rclnodejs + +rclnodejs provides RxJS Observable support for subscriptions, enabling reactive programming patterns when working with ROS 2 messages. + +## Table of Contents + +- [What are Observable Subscriptions?](#what-are-observable-subscriptions) +- [Basic Usage](#basic-usage) +- [API Reference](#api-reference) +- [Examples](#examples) +- [Cleanup](#cleanup) +- [Best Practices](#best-practices) +- [Running the Examples](#running-the-examples) +- [Comparison with Callback API](#comparison-with-callback-api) + +## What are Observable Subscriptions? + +**Observable Subscriptions** wrap standard ROS 2 subscriptions and expose messages through RxJS Observables. This allows you to use the full power of reactive programming for: + +- 🔄 **Rate limiting**: `throttleTime()`, `debounceTime()`, `auditTime()` +- 🔧 **Transformation**: `map()`, `scan()`, `buffer()` +- 🔍 **Filtering**: `filter()`, `distinctUntilChanged()`, `take()` +- 🔗 **Combining topics**: `combineLatest()`, `merge()`, `zip()` +- ⚠️ **Error handling**: `catchError()`, `retry()`, `timeout()` + +Observable subscriptions are ideal for complex message processing pipelines where you need to combine, filter, or transform data from multiple sources. + +> **Note:** RxJS `filter()` operates at the application level after messages are received. For simple content-based filtering that reduces network traffic, consider using [DDS Content Filtering](content-filtering-subscription.md) instead, which filters at the middleware level before messages reach your application. See [Example 6](#example-6-combining-dds-content-filtering-with-rxjs) for combined usage. + +## Basic Usage + +```javascript +const rclnodejs = require('rclnodejs'); +const { throttleTime, map, filter } = require('rxjs'); + +async function main() { + await rclnodejs.init(); + const node = new rclnodejs.Node('observable_example'); + + // Create an Observable subscription + const obsSub = node.createObservableSubscription( + 'sensor_msgs/msg/LaserScan', + '/scan' + ); + + // Use RxJS operators to process messages + obsSub.observable + .pipe( + throttleTime(200), // Limit to 5 Hz + map((msg) => msg.ranges), + filter((ranges) => ranges.length > 0) + ) + .subscribe((ranges) => { + console.log('Received ranges:', ranges.length); + }); + + node.spin(); +} + +main(); +``` + +## API Reference + +### `node.createObservableSubscription(typeClass, topic, options?, eventCallbacks?)` + +Creates a subscription that returns an `ObservableSubscription`. + +**Parameters:** + +- `typeClass` - Message type (string, object, or class) +- `topic` - Topic name +- `options` - Optional subscription options (same as `createSubscription()`) +- `eventCallbacks` - Optional event callbacks + +**Returns:** `ObservableSubscription` + +### `ObservableSubscription` + +| Property/Method | Type | Description | +| --------------- | --------------- | ----------------------------------------- | +| `observable` | `Observable` | RxJS Observable that emits messages | +| `subscription` | `Subscription` | Underlying ROS 2 subscription | +| `topic` | `string` | Topic name | +| `isDestroyed` | `boolean` | Whether the observable has been completed | +| `complete()` | `void` | Complete the observable and stop emitting | +| `destroy()` | `void` | Alias for `complete()` | + +## Examples + +### Example 1: Throttling High-Frequency Sensors + +```javascript +const { throttleTime } = require('rxjs'); + +// LiDAR publishes at 20Hz, but we only need 5Hz for visualization +const lidarSub = node.createObservableSubscription( + 'sensor_msgs/msg/LaserScan', + '/scan' +); + +lidarSub.observable + .pipe(throttleTime(200)) // 200ms = 5Hz + .subscribe((scan) => { + visualize(scan); + }); +``` + +### Example 2: Combining Multiple Topics + +```javascript +const { combineLatest, map } = require('rxjs'); + +const odomSub = node.createObservableSubscription( + 'nav_msgs/msg/Odometry', + '/odom' +); +const imuSub = node.createObservableSubscription('sensor_msgs/msg/Imu', '/imu'); + +combineLatest([odomSub.observable, imuSub.observable]) + .pipe( + map(([odom, imu]) => ({ + position: odom.pose.pose.position, + orientation: imu.orientation, + })) + ) + .subscribe((combined) => { + console.log('Combined data:', combined); + }); +``` + +### Example 3: Debouncing Burst Events + +```javascript +const { debounceTime } = require('rxjs'); + +// Joystick commands may come in bursts - only act on the final value +const joySub = node.createObservableSubscription('sensor_msgs/msg/Joy', '/joy'); + +joySub.observable + .pipe(debounceTime(50)) // Wait 50ms of quiet before processing + .subscribe((joy) => { + processJoystickCommand(joy); + }); +``` + +### Example 4: Sampling Every Nth Message + +```javascript +const { filter } = require('rxjs'); + +// IMU at 100Hz - sample every 10th message for logging +let count = 0; +const imuSub = node.createObservableSubscription('sensor_msgs/msg/Imu', '/imu'); + +imuSub.observable.pipe(filter(() => ++count % 10 === 0)).subscribe((imu) => { + logToFile(imu); +}); +``` + +### Example 5: Buffering Messages + +```javascript +const { bufferTime, filter } = require('rxjs'); + +const tempSub = node.createObservableSubscription( + 'sensor_msgs/msg/Temperature', + '/temperature' +); + +// Collect messages over 1 second, then process as batch +tempSub.observable + .pipe( + bufferTime(1000), + filter((batch) => batch.length > 0) + ) + .subscribe((batch) => { + const avgTemp = + batch.reduce((sum, msg) => sum + msg.temperature, 0) / batch.length; + console.log('Average temperature:', avgTemp); + }); +``` + +### Example 6: Combining DDS Content Filtering with RxJS + +For optimal performance, use DDS content filtering to reduce network traffic at the middleware level, then apply RxJS operators for additional processing: + +```javascript +const { throttleTime, map } = require('rxjs'); + +// DDS filters at middleware level - only temperatures > 30°C are delivered +const tempSub = node.createObservableSubscription( + 'sensor_msgs/msg/Temperature', + '/temperature', + { + contentFilter: { + expression: 'temperature > %0', + parameters: ['30.0'], + }, + } +); + +// RxJS processes the pre-filtered stream +tempSub.observable + .pipe( + throttleTime(1000), // Rate limit to 1 msg/sec + map((msg) => ({ + celsius: msg.temperature, + fahrenheit: msg.temperature * 1.8 + 32, + })) + ) + .subscribe((temp) => { + console.log(`High temp alert: ${temp.celsius}°C (${temp.fahrenheit}°F)`); + }); +``` + +This approach provides: + +- **Network efficiency**: DDS drops messages below 30°C before transmission +- **CPU efficiency**: RxJS only processes relevant messages +- **Flexibility**: RxJS handles rate limiting and transformation + +> See [Content Filtering Subscription](content-filtering-subscription.md) for more details on DDS content filtering. + +## Cleanup + +Always clean up subscriptions when done: + +```javascript +// Option 1: Complete the observable +obsSub.complete(); + +// Option 2: Destroy via node +node.destroySubscription(obsSub.subscription); + +// Option 3: Destroy the entire node +node.destroy(); +``` + +## Best Practices + +### 1. Always Unsubscribe + +Prevent memory leaks by unsubscribing when done: + +```javascript +const rxjsSubscription = obsSub.observable + .pipe(throttleTime(100)) + .subscribe((msg) => console.log(msg)); + +// Later, when cleanup is needed: +rxjsSubscription.unsubscribe(); +obsSub.complete(); +``` + +### 2. Use `take()` for Finite Streams + +```javascript +const { take } = require('rxjs'); + +// Only process the first 10 messages +obsSub.observable.pipe(take(10)).subscribe({ + next: (msg) => console.log(msg), + complete: () => console.log('Received 10 messages'), +}); +``` + +### 3. Handle Errors Gracefully + +```javascript +const { catchError } = require('rxjs'); +const { of } = require('rxjs'); + +obsSub.observable + .pipe( + map((msg) => processMessage(msg)), + catchError((err) => { + console.error('Processing error:', err); + return of(null); // Continue with null on error + }), + filter((result) => result !== null) + ) + .subscribe((result) => { + console.log('Processed:', result); + }); +``` + +### 4. Combine with Content Filtering + +For maximum efficiency, combine RxJS operators with DDS-level content filtering: + +```javascript +const { map } = require('rxjs'); + +// DDS filters at middleware level (reduces network traffic) +const obsSub = node.createObservableSubscription( + 'sensor_msgs/msg/Temperature', + '/temperature', + { + contentFilter: { + expression: 'temperature > %0', + parameters: [50.0], + }, + } +); + +// RxJS operators for additional processing +obsSub.observable + .pipe( + map((msg) => ({ temp: msg.temperature, critical: msg.temperature > 80 })) + ) + .subscribe((data) => { + if (data.critical) { + console.warn('Critical temperature:', data.temp); + } + }); +``` + +## Running the Examples + +The rclnodejs repository includes an Observable subscription example in `example/topics/subscriber/`. + +### Run the Built-in Example + +```bash +# Terminal 1 - Start a publisher +node example/topics/publisher/publisher-example.js + +# Terminal 2 - Run the observable subscription example +node example/topics/subscriber/subscription-observable-example.js +``` + +### Expected Output + +``` +Observable subscription created on /topic +Run: ros2 topic pub /topic std_msgs/msg/String "{data: Hello ROS}" -r 5 +[Throttled] Hello ROS 2 from rclnodejs +[Filtered] Hello ROS 2 from rclnodejs +[Throttled] Hello ROS 2 from rclnodejs +[Filtered] Hello ROS 2 from rclnodejs +[Batched] 3 messages +[Filtered] Hello ROS 2 from rclnodejs +[Throttled] Hello ROS 2 from rclnodejs +[Filtered] Hello ROS 2 from rclnodejs +[Batched] 3 messages +``` + +- **[Throttled]** — Rate limited to ~2 messages/second via `throttleTime(500)` +- **[Filtered]** — Only messages containing "ROS" (all pass in this case) +- **[Batched]** — Emits after every 3 messages via `bufferCount(3)` + +### Custom Example + +```javascript +// observable-example.js +const rclnodejs = require('rclnodejs'); +const { take, map } = require('rxjs'); + +async function main() { + await rclnodejs.init(); + const node = new rclnodejs.Node('observable_demo'); + + const obsSub = node.createObservableSubscription( + 'std_msgs/msg/String', + '/test_topic' + ); + + obsSub.observable + .pipe( + take(5), + map((msg) => msg.data.toUpperCase()) + ) + .subscribe({ + next: (data) => console.log('Received:', data), + complete: () => { + console.log('Done - received 5 messages'); + node.destroy(); + rclnodejs.shutdown(); + }, + }); + + node.spin(); +} + +main().catch(console.error); +``` + +```bash +# Terminal 2 - Run the example +node observable-example.js +``` + +### Expected Output + +``` +Received: HELLO +Received: HELLO +Received: HELLO +Received: HELLO +Received: HELLO +Done - received 5 messages +``` + +## Comparison with Callback API + +| Feature | `createSubscription()` | `createObservableSubscription()` | +| ---------------- | ---------------------- | -------------------------------- | +| Style | Callback-based | Observable-based | +| Rate limiting | Manual implementation | Via RxJS operators | +| Combining topics | Manual | Built-in with RxJS | +| Learning curve | Lower | Requires RxJS knowledge | +| Use case | Simple subscriptions | Complex reactive pipelines | +| Dependencies | None | RxJS (included) | + +Both APIs can coexist in the same application. Use the callback-based API for simple cases and the Observable API when you need advanced reactive patterns. + +--- + +_This tutorial is part of the rclnodejs documentation. For more tutorials and examples, visit the [rclnodejs GitHub repository](https://github.com/RobotWebTools/rclnodejs)._ diff --git a/types/base.d.ts b/types/base.d.ts index 7de0dda5..9f06c4d2 100644 --- a/types/base.d.ts +++ b/types/base.d.ts @@ -26,6 +26,7 @@ /// /// /// +/// /// /// /// diff --git a/types/node.d.ts b/types/node.d.ts index 6fd2cb1c..47676b3f 100644 --- a/types/node.d.ts +++ b/types/node.d.ts @@ -363,6 +363,23 @@ declare module 'rclnodejs' { eventCallbacks?: (event: object) => void ): Subscription; + /** + * Create a Subscription that returns an RxJS Observable. + * This allows using reactive programming patterns with ROS 2 messages. + * + * @param typeClass - Type of ROS messages the subscription will subscribe to. + * @param topic - Name of the topic the subscription will subscribe to. + * @param options - Configuration options, see DEFAULT_OPTIONS. + * @param eventCallbacks - Optional event callbacks for the subscription. + * @returns An ObservableSubscription with an RxJS Observable. + */ + createObservableSubscription>( + typeClass: T, + topic: string, + options?: Options, + eventCallbacks?: (event: object) => void + ): ObservableSubscription>; + /** * Create a Client for making server requests. * diff --git a/types/observable_subscription.d.ts b/types/observable_subscription.d.ts new file mode 100644 index 00000000..c0213865 --- /dev/null +++ b/types/observable_subscription.d.ts @@ -0,0 +1,39 @@ +declare module 'rclnodejs' { + /** + * A wrapper that provides RxJS Observable support for ROS 2 subscriptions. + * This class wraps a standard Subscription and emits messages through an Observable. + */ + class ObservableSubscription { + /** + * Get the RxJS Observable for this subscription. + * Use this to pipe operators and subscribe to messages. + */ + readonly observable: import('rxjs').Observable; + + /** + * Get the underlying ROS 2 subscription. + */ + readonly subscription: Subscription; + + /** + * Get the topic name. + */ + readonly topic: string; + + /** + * Check if this observable subscription has been destroyed. + */ + readonly isDestroyed: boolean; + + /** + * Complete the observable and clean up resources. + * After calling this, no more messages will be emitted. + */ + complete(): void; + + /** + * Alias for complete() for consistency with RxJS naming. + */ + destroy(): void; + } +}