Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- ~20% performance improvement on large images and strings with ROS1 and ROS1-zenoh backends by removing copies, and improving pre-allocation

## 0.20.0 - March 2nd, 2026

### Added
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions roslibrust_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ roslibrust = { path = "../roslibrust", features = ["ros1", "codegen", "macro", "
lazy_static = "1.5"
tokio = { workspace = true }
log = { workspace = true }
zenoh = "1.7"

[dev-dependencies]
diffy = "0.3.0"
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
pprof = { version = "0.15", features = ["flamegraph", "criterion"] }
test-log = { workspace = true }
hex = "0.4"
roslibrust_zenoh = { path = "../roslibrust_zenoh" }

[[bin]]
path = "src/performance_ramp.rs"
Expand Down
98 changes: 86 additions & 12 deletions roslibrust_test/benches/image_bench.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,39 @@
use criterion::{criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use roslibrust::{Publish, Subscribe, TopicProvider};
use roslibrust_zenoh::ZenohClient;
use std::{
hint::black_box,
sync::{Arc, Mutex},
};

struct BenchContext {
struct BenchContextRos1 {
publisher: roslibrust::ros1::Publisher<roslibrust_test::ros1::sensor_msgs::Image>,
subscriber: roslibrust::ros1::Subscriber<roslibrust_test::ros1::sensor_msgs::Image>,
image: roslibrust_test::ros1::sensor_msgs::Image,
// Need to keep alive
_client: roslibrust::ros1::NodeHandle,
}

async fn setup_bench_context() -> BenchContext {
let client = roslibrust::ros1::NodeHandle::new("http://localhost:11311", "image_bench_rs")
struct BenchContextZenoh {
publisher: roslibrust_zenoh::ZenohPublisher<roslibrust_test::ros1::sensor_msgs::Image>,
subscriber: roslibrust_zenoh::ZenohSubscriber<roslibrust_test::ros1::sensor_msgs::Image>,
image: roslibrust_test::ros1::sensor_msgs::Image,
// Need to keep alive - separate clients to avoid short-circuiting
_publisher_client: ZenohClient,
_subscriber_client: ZenohClient,
}

async fn setup_bench_context_ros1() -> BenchContextRos1 {
let client = roslibrust::ros1::NodeHandle::new("http://localhost:11311", "image_bench_ros1")
.await
.unwrap();
let publisher = client
.advertise::<roslibrust_test::ros1::sensor_msgs::Image>("/image_bench", 1, false)
.advertise::<roslibrust_test::ros1::sensor_msgs::Image>("/image_bench_ros1", 1, false)
.await
.unwrap();
let subscriber = client
.subscribe::<roslibrust_test::ros1::sensor_msgs::Image>("/image_bench", 1)
.subscribe::<roslibrust_test::ros1::sensor_msgs::Image>("/image_bench_ros1", 1)
.await
.unwrap();

Expand All @@ -39,20 +50,66 @@ async fn setup_bench_context() -> BenchContext {
data: vec![0; 1920 * 1080 * 3],
};

BenchContext {
BenchContextRos1 {
publisher,
subscriber,
image,
_client: client,
}
}

async fn bench_iteration(context: &mut BenchContext) {
async fn setup_bench_context_zenoh() -> BenchContextZenoh {
// Create separate Zenoh sessions for publisher and subscriber to avoid short-circuiting
// This better emulates real communication between different nodes
let publisher_session = zenoh::open(zenoh::Config::default()).await.unwrap();
let publisher_client = ZenohClient::new(publisher_session);

let subscriber_session = zenoh::open(zenoh::Config::default()).await.unwrap();
let subscriber_client = ZenohClient::new(subscriber_session);

let publisher = publisher_client
.advertise::<roslibrust_test::ros1::sensor_msgs::Image>("/image_bench_zenoh")
.await
.unwrap();
let subscriber = subscriber_client
.subscribe::<roslibrust_test::ros1::sensor_msgs::Image>("/image_bench_zenoh")
.await
.unwrap();

// Wait for pub / sub to establish connection
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

let image = roslibrust_test::ros1::sensor_msgs::Image {
header: Default::default(),
height: 1080,
width: 1920,
encoding: "rgb8".to_owned(),
is_bigendian: 0,
step: 1920 * 3,
data: vec![0; 1920 * 1080 * 3],
};

BenchContextZenoh {
publisher,
subscriber,
image,
_publisher_client: publisher_client,
_subscriber_client: subscriber_client,
}
}

async fn bench_iteration_ros1(context: &mut BenchContextRos1) {
context.publisher.publish(&context.image).await.unwrap();
let received_image = context.subscriber.next().await.unwrap().unwrap();
black_box(received_image);
}

async fn bench_iteration_zenoh(context: &mut BenchContextZenoh) {
context.publisher.publish(&context.image).await.unwrap();
let received_image = context.subscriber.next().await.unwrap();
black_box(received_image);
}

fn criterion_benchmark(c: &mut Criterion) {
env_logger::init();

Expand All @@ -62,17 +119,34 @@ fn criterion_benchmark(c: &mut Criterion) {
.build()
.unwrap();

let context = runtime.block_on(async {
let context = setup_bench_context().await;
// Setup ROS1 TCPROS context
let context_ros1 = runtime.block_on(async {
let context = setup_bench_context_ros1().await;
Arc::new(Mutex::new(context))
});

c.bench_function("image_bench_ros1_tcpros", |b| {
b.to_async(&runtime).iter(|| {
#[allow(clippy::await_holding_lock)]
async {
let mut context = context_ros1.lock().unwrap();
bench_iteration_ros1(&mut context).await
}
})
});

// Setup Zenoh context
let context_zenoh = runtime.block_on(async {
let context = setup_bench_context_zenoh().await;
Arc::new(Mutex::new(context))
});

c.bench_function("image_bench", |b| {
c.bench_function("image_bench_ros1_zenoh", |b| {
b.to_async(&runtime).iter(|| {
#[allow(clippy::await_holding_lock)]
async {
let mut context = context.lock().unwrap();
bench_iteration(&mut context).await
let mut context = context_zenoh.lock().unwrap();
bench_iteration_zenoh(&mut context).await
}
})
});
Expand Down
53 changes: 25 additions & 28 deletions roslibrust_zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use roslibrust_common::topic_name::{GlobalTopicName, ToGlobalTopicName};
use roslibrust_common::*;

use log::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use zenoh::bytes::ZBytes;

/// A wrapper around a normal zenoh session that adds roslibrust specific functionality.
Expand All @@ -25,16 +26,24 @@ impl ZenohClient {
/// This type is self de-registering, and dropping the publisher will automatically un-advertise the topic.
pub struct ZenohPublisher<T> {
publisher: zenoh::pubsub::Publisher<'static>,
// Used to track buffer capacity size to minimize allocations for fixed-size streams.
capacity_hint: AtomicUsize,
_marker: std::marker::PhantomData<T>,
}

impl<T: RosMessageType> Publish<T> for ZenohPublisher<T> {
async fn publish(&self, data: &T) -> Result<()> {
let bytes = roslibrust_serde_rosmsg::to_vec_skip_length(data).map_err(|e| {
let size_hint = self.capacity_hint.load(Ordering::Relaxed);
let mut bytes = Vec::with_capacity(size_hint);
roslibrust_serde_rosmsg::to_writer_skip_length(&mut bytes, data).map_err(|e| {
Error::SerializationError(format!("Failed to serialize message: {e:?}"))
})?;

match self.publisher.put(&bytes).await {
if bytes.len() > size_hint {
self.capacity_hint.store(bytes.len(), Ordering::Relaxed);
}

match self.publisher.put(bytes).await {
Ok(()) => Ok(()),
Err(e) => Err(Error::Unexpected(anyhow::anyhow!(
"Failed to publish message to zenoh: {e:?}"
Expand Down Expand Up @@ -70,17 +79,18 @@ impl<T: RosMessageType> Subscribe<T> for ZenohSubscriber<T> {
}
};

let bytes = sample.payload().to_bytes();
// Note: Zenoh decided to not make the 4 byte length header part of the payload
// So we use the known length version of the deserialization
let msg = roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32)
.map_err(|e| {
Error::SerializationError(format!("Failed to deserialize sample: {e:?}"))
})?;
let msg = deserialize_payload(sample.payload(), "sample")?;
Ok(msg)
}
}

fn deserialize_payload<T: RosMessageType>(payload: &ZBytes, context: &str) -> Result<T> {
// Note: Zenoh decided to not make the 4 byte length header part of the payload.
let mut reader = payload.reader();
roslibrust_serde_rosmsg::from_reader_known_length(&mut reader, payload.len() as u32)
.map_err(|e| Error::SerializationError(format!("Failed to deserialize {context}: {e:?}")))
}

impl TopicProvider for ZenohClient {
type Publisher<T: RosMessageType> = ZenohPublisher<T>;

Expand All @@ -104,6 +114,7 @@ impl TopicProvider for ZenohClient {

Ok(ZenohPublisher {
publisher,
capacity_hint: 1024.into(),
_marker: std::marker::PhantomData,
})
}
Expand Down Expand Up @@ -166,12 +177,11 @@ impl<T: RosServiceType> Service<T> for ZenohServiceClient<T> {
let request_bytes = roslibrust_serde_rosmsg::to_vec_skip_length(request).map_err(|e| {
Error::SerializationError(format!("Failed to serialize message: {e:?}"))
})?;
debug!("request bytes: {request_bytes:?}");

let query = match self
.session
.get(&self.zenoh_query)
.payload(&request_bytes)
.payload(request_bytes)
.await
{
Ok(query) => query,
Expand Down Expand Up @@ -202,13 +212,7 @@ impl<T: RosServiceType> Service<T> for ZenohServiceClient<T> {
}
};

let bytes = sample.payload().to_bytes();

// Note: Zenoh decided to not make the 4 byte length header part of the payload
let msg = roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32)
.map_err(|e| {
Error::SerializationError(format!("Failed to deserialize sample: {e:?}"))
})?;
let msg = deserialize_payload(sample.payload(), "service response")?;
Ok(msg)
}
}
Expand Down Expand Up @@ -289,16 +293,9 @@ impl ServiceProvider for ZenohClient {
error!("Received a query with no payload for a ros0 service {query:?}");
continue;
};
let bytes = payload.to_bytes();
debug!("Got bytes: {bytes:?}");

// Note: Zenoh decided the 4 byte length header is not part of the payload
let Ok(request) =
roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32)
.map_err(|e| {
error!("Failed to deserialize request: {e:?}");
})
else {
let Ok(request) = deserialize_payload(payload, "service request").map_err(|e| {
error!("{e:?}");
}) else {
continue;
};

Expand Down
Loading