diff --git a/CHANGELOG.md b/CHANGELOG.md index d94d59f..7c6297b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- ~20% performance improvement on large images and strings with ROS1 and ROS1-zenoh backends by removing copies, and improving pre-allocation + ## 0.20.0 - March 2nd, 2026 ### Added diff --git a/Cargo.lock b/Cargo.lock index 78fe051..689646e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3579,8 +3579,10 @@ dependencies = [ "log", "pprof", "roslibrust", + "roslibrust_zenoh", "test-log", "tokio", + "zenoh", ] [[package]] diff --git a/roslibrust_test/Cargo.toml b/roslibrust_test/Cargo.toml index a6bc587..5a33836 100644 --- a/roslibrust_test/Cargo.toml +++ b/roslibrust_test/Cargo.toml @@ -10,6 +10,7 @@ roslibrust = { path = "../roslibrust", features = ["ros1", "codegen", "macro", " lazy_static = "1.5" tokio = { workspace = true } log = { workspace = true } +zenoh = "1.7" [dev-dependencies] diffy = "0.3.0" @@ -17,6 +18,7 @@ criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } pprof = { version = "0.15", features = ["flamegraph", "criterion"] } test-log = { workspace = true } hex = "0.4" +roslibrust_zenoh = { path = "../roslibrust_zenoh" } [[bin]] path = "src/performance_ramp.rs" diff --git a/roslibrust_test/benches/image_bench.rs b/roslibrust_test/benches/image_bench.rs index c04b6d8..4c9403a 100644 --- a/roslibrust_test/benches/image_bench.rs +++ b/roslibrust_test/benches/image_bench.rs @@ -1,11 +1,13 @@ use criterion::{criterion_group, criterion_main, Criterion}; use pprof::criterion::{Output, PProfProfiler}; +use roslibrust::{Publish, Subscribe, TopicProvider}; +use roslibrust_zenoh::ZenohClient; use std::{ hint::black_box, sync::{Arc, Mutex}, }; -struct BenchContext { +struct BenchContextRos1 { publisher: roslibrust::ros1::Publisher, subscriber: roslibrust::ros1::Subscriber, image: roslibrust_test::ros1::sensor_msgs::Image, @@ -13,16 +15,25 @@ struct BenchContext { _client: roslibrust::ros1::NodeHandle, } -async fn setup_bench_context() -> BenchContext { - let client = roslibrust::ros1::NodeHandle::new("http://localhost:11311", "image_bench_rs") +struct BenchContextZenoh { + publisher: roslibrust_zenoh::ZenohPublisher, + subscriber: roslibrust_zenoh::ZenohSubscriber, + image: roslibrust_test::ros1::sensor_msgs::Image, + // Need to keep alive - separate clients to avoid short-circuiting + _publisher_client: ZenohClient, + _subscriber_client: ZenohClient, +} + +async fn setup_bench_context_ros1() -> BenchContextRos1 { + let client = roslibrust::ros1::NodeHandle::new("http://localhost:11311", "image_bench_ros1") .await .unwrap(); let publisher = client - .advertise::("/image_bench", 1, false) + .advertise::("/image_bench_ros1", 1, false) .await .unwrap(); let subscriber = client - .subscribe::("/image_bench", 1) + .subscribe::("/image_bench_ros1", 1) .await .unwrap(); @@ -39,7 +50,7 @@ async fn setup_bench_context() -> BenchContext { data: vec![0; 1920 * 1080 * 3], }; - BenchContext { + BenchContextRos1 { publisher, subscriber, image, @@ -47,12 +58,58 @@ async fn setup_bench_context() -> BenchContext { } } -async fn bench_iteration(context: &mut BenchContext) { +async fn setup_bench_context_zenoh() -> BenchContextZenoh { + // Create separate Zenoh sessions for publisher and subscriber to avoid short-circuiting + // This better emulates real communication between different nodes + let publisher_session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let publisher_client = ZenohClient::new(publisher_session); + + let subscriber_session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let subscriber_client = ZenohClient::new(subscriber_session); + + let publisher = publisher_client + .advertise::("/image_bench_zenoh") + .await + .unwrap(); + let subscriber = subscriber_client + .subscribe::("/image_bench_zenoh") + .await + .unwrap(); + + // Wait for pub / sub to establish connection + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let image = roslibrust_test::ros1::sensor_msgs::Image { + header: Default::default(), + height: 1080, + width: 1920, + encoding: "rgb8".to_owned(), + is_bigendian: 0, + step: 1920 * 3, + data: vec![0; 1920 * 1080 * 3], + }; + + BenchContextZenoh { + publisher, + subscriber, + image, + _publisher_client: publisher_client, + _subscriber_client: subscriber_client, + } +} + +async fn bench_iteration_ros1(context: &mut BenchContextRos1) { context.publisher.publish(&context.image).await.unwrap(); let received_image = context.subscriber.next().await.unwrap().unwrap(); black_box(received_image); } +async fn bench_iteration_zenoh(context: &mut BenchContextZenoh) { + context.publisher.publish(&context.image).await.unwrap(); + let received_image = context.subscriber.next().await.unwrap(); + black_box(received_image); +} + fn criterion_benchmark(c: &mut Criterion) { env_logger::init(); @@ -62,17 +119,34 @@ fn criterion_benchmark(c: &mut Criterion) { .build() .unwrap(); - let context = runtime.block_on(async { - let context = setup_bench_context().await; + // Setup ROS1 TCPROS context + let context_ros1 = runtime.block_on(async { + let context = setup_bench_context_ros1().await; + Arc::new(Mutex::new(context)) + }); + + c.bench_function("image_bench_ros1_tcpros", |b| { + b.to_async(&runtime).iter(|| { + #[allow(clippy::await_holding_lock)] + async { + let mut context = context_ros1.lock().unwrap(); + bench_iteration_ros1(&mut context).await + } + }) + }); + + // Setup Zenoh context + let context_zenoh = runtime.block_on(async { + let context = setup_bench_context_zenoh().await; Arc::new(Mutex::new(context)) }); - c.bench_function("image_bench", |b| { + c.bench_function("image_bench_ros1_zenoh", |b| { b.to_async(&runtime).iter(|| { #[allow(clippy::await_holding_lock)] async { - let mut context = context.lock().unwrap(); - bench_iteration(&mut context).await + let mut context = context_zenoh.lock().unwrap(); + bench_iteration_zenoh(&mut context).await } }) }); diff --git a/roslibrust_zenoh/src/lib.rs b/roslibrust_zenoh/src/lib.rs index 32dcb8d..391e190 100644 --- a/roslibrust_zenoh/src/lib.rs +++ b/roslibrust_zenoh/src/lib.rs @@ -5,6 +5,7 @@ use roslibrust_common::topic_name::{GlobalTopicName, ToGlobalTopicName}; use roslibrust_common::*; use log::*; +use std::sync::atomic::{AtomicUsize, Ordering}; use zenoh::bytes::ZBytes; /// A wrapper around a normal zenoh session that adds roslibrust specific functionality. @@ -25,16 +26,24 @@ impl ZenohClient { /// This type is self de-registering, and dropping the publisher will automatically un-advertise the topic. pub struct ZenohPublisher { publisher: zenoh::pubsub::Publisher<'static>, + // Used to track buffer capacity size to minimize allocations for fixed-size streams. + capacity_hint: AtomicUsize, _marker: std::marker::PhantomData, } impl Publish for ZenohPublisher { async fn publish(&self, data: &T) -> Result<()> { - let bytes = roslibrust_serde_rosmsg::to_vec_skip_length(data).map_err(|e| { + let size_hint = self.capacity_hint.load(Ordering::Relaxed); + let mut bytes = Vec::with_capacity(size_hint); + roslibrust_serde_rosmsg::to_writer_skip_length(&mut bytes, data).map_err(|e| { Error::SerializationError(format!("Failed to serialize message: {e:?}")) })?; - match self.publisher.put(&bytes).await { + if bytes.len() > size_hint { + self.capacity_hint.store(bytes.len(), Ordering::Relaxed); + } + + match self.publisher.put(bytes).await { Ok(()) => Ok(()), Err(e) => Err(Error::Unexpected(anyhow::anyhow!( "Failed to publish message to zenoh: {e:?}" @@ -70,17 +79,18 @@ impl Subscribe for ZenohSubscriber { } }; - let bytes = sample.payload().to_bytes(); - // Note: Zenoh decided to not make the 4 byte length header part of the payload - // So we use the known length version of the deserialization - let msg = roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32) - .map_err(|e| { - Error::SerializationError(format!("Failed to deserialize sample: {e:?}")) - })?; + let msg = deserialize_payload(sample.payload(), "sample")?; Ok(msg) } } +fn deserialize_payload(payload: &ZBytes, context: &str) -> Result { + // Note: Zenoh decided to not make the 4 byte length header part of the payload. + let mut reader = payload.reader(); + roslibrust_serde_rosmsg::from_reader_known_length(&mut reader, payload.len() as u32) + .map_err(|e| Error::SerializationError(format!("Failed to deserialize {context}: {e:?}"))) +} + impl TopicProvider for ZenohClient { type Publisher = ZenohPublisher; @@ -104,6 +114,7 @@ impl TopicProvider for ZenohClient { Ok(ZenohPublisher { publisher, + capacity_hint: 1024.into(), _marker: std::marker::PhantomData, }) } @@ -166,12 +177,11 @@ impl Service for ZenohServiceClient { let request_bytes = roslibrust_serde_rosmsg::to_vec_skip_length(request).map_err(|e| { Error::SerializationError(format!("Failed to serialize message: {e:?}")) })?; - debug!("request bytes: {request_bytes:?}"); let query = match self .session .get(&self.zenoh_query) - .payload(&request_bytes) + .payload(request_bytes) .await { Ok(query) => query, @@ -202,13 +212,7 @@ impl Service for ZenohServiceClient { } }; - let bytes = sample.payload().to_bytes(); - - // Note: Zenoh decided to not make the 4 byte length header part of the payload - let msg = roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32) - .map_err(|e| { - Error::SerializationError(format!("Failed to deserialize sample: {e:?}")) - })?; + let msg = deserialize_payload(sample.payload(), "service response")?; Ok(msg) } } @@ -289,16 +293,9 @@ impl ServiceProvider for ZenohClient { error!("Received a query with no payload for a ros0 service {query:?}"); continue; }; - let bytes = payload.to_bytes(); - debug!("Got bytes: {bytes:?}"); - - // Note: Zenoh decided the 4 byte length header is not part of the payload - let Ok(request) = - roslibrust_serde_rosmsg::from_slice_known_length(&bytes, bytes.len() as u32) - .map_err(|e| { - error!("Failed to deserialize request: {e:?}"); - }) - else { + let Ok(request) = deserialize_payload(payload, "service request").map_err(|e| { + error!("{e:?}"); + }) else { continue; };