Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion book/src/chapters/actions.md
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl ZAction for CountAction {

**Key points:**

- `Goal`, `Feedback`, and `Result` can be any struct that implements `Serialize + Deserialize + Clone + Send + Sync + 'static` (blanket implementation via `ZMessage`)
- `Goal`, `Feedback`, and `Result` can be any struct that is `Send + Sync + 'static` — the `ZMessage` trait is satisfied automatically via a blanket impl, no manual `impl ZMessage` needed
- `name()` sets the Zenoh key prefix for the action's internal services and topics
- The default `send_goal_type_info()`, `get_result_type_info()`, etc. all return `TypeHash::zero()`, which is correct for ros-z-to-ros-z communication. For ROS 2 interop, override these with the correct RIHS01 hashes.

Expand Down
55 changes: 52 additions & 3 deletions crates/rmw-zenoh-rs/src/msg.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::type_support::MessageTypeSupport;
use ros_z::entity::{TypeHash, TypeInfo};
use ros_z::msg::{ZDeserializer, ZMessage, ZSerializer, ZService};
use ros_z::msg::{ZDeserializer, ZSerdes, ZSerializer, ZService};
use ros_z::ros_msg::WithTypeInfo;
use ros_z::{MessageTypeInfo, ServiceTypeInfo};

Expand Down Expand Up @@ -79,8 +79,57 @@ impl ZSerializer for RosSerdes {
}
}

impl ZMessage for RosMessage {
type Serdes = RosSerdes;
/// `ZSerdes<RosMessage>` implementation for `RosSerdes`.
///
/// Serialization uses the C FFI path (`ts.serialize_message`).
/// Deserialization via the `ZSerdes` interface is not supported — the RMW path
/// deserializes directly into a pre-allocated C++ message via `ts.deserialize_message`
/// in `SubscriptionImpl::take`. Calling `deserialize` on this impl returns an error.
impl ZSerdes<RosMessage> for RosSerdes {
type Error = std::io::Error;

fn serialize(msg: &RosMessage) -> zenoh_buffers::ZBuf {
let RosMessage { msg: ptr, ts } = msg;
let bytes = unsafe { ts.serialize_message(*ptr) };
zenoh_buffers::ZBuf::from(bytes)
}

fn serialize_with_hint(msg: &RosMessage, _capacity_hint: usize) -> zenoh_buffers::ZBuf {
<RosSerdes as ZSerdes<RosMessage>>::serialize(msg)
}

fn serialize_to_vec(msg: &RosMessage) -> Vec<u8> {
let RosMessage { msg: ptr, ts } = msg;
unsafe { ts.serialize_message(*ptr) }
}

fn serialize_to_shm(
msg: &RosMessage,
_estimated_size: usize,
provider: &zenoh::shm::ShmProvider<zenoh::shm::PosixShmProviderBackend>,
) -> zenoh::Result<(zenoh_buffers::ZBuf, usize)> {
let data = Self::serialize_to_vec(msg);
let actual_size = data.len();

use zenoh::Wait;
use zenoh::shm::{BlockOn, GarbageCollect};

let mut shm_buf = provider
.alloc(actual_size)
.with_policy::<BlockOn<GarbageCollect>>()
.wait()
.map_err(|e| zenoh::Error::from(format!("SHM allocation failed: {}", e)))?;

shm_buf[0..actual_size].copy_from_slice(&data);
Ok((zenoh_buffers::ZBuf::from(shm_buf), actual_size))
}

fn deserialize(_buf: &[u8]) -> Result<RosMessage, std::io::Error> {
Err(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"RosMessage deserialization requires a pre-allocated target; use SubscriptionImpl::take instead",
))
}
}

impl MessageTypeInfo for RosMessage {
Expand Down
6 changes: 4 additions & 2 deletions crates/rmw-zenoh-rs/src/rmw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,8 @@ pub extern "C" fn rmw_create_client(
let mut zclient_builder = node_impl
.inner
.create_client::<crate::msg::RosService>(&qualified_service)
.with_type_info(service_type_support.get_type_info());
.with_type_info(service_type_support.get_type_info())
.with_serdes::<crate::msg::RosSerdes>();
zclient_builder.entity.qos = qos.to_protocol_qos();
let entity = zclient_builder.entity.clone();

Expand Down Expand Up @@ -1052,7 +1053,8 @@ pub extern "C" fn rmw_create_service(
let mut zserver_builder = node_impl
.inner
.create_service::<crate::msg::RosService>(&qualified_service)
.with_type_info(service_type_support.get_type_info());
.with_type_info(service_type_support.get_type_info())
.with_serdes::<crate::msg::RosSerdes>();
zserver_builder.entity.qos = qos.to_protocol_qos();
let entity = zserver_builder.entity.clone();

Expand Down
7 changes: 4 additions & 3 deletions crates/rmw-zenoh-rs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use zenoh::Result;

/// Client implementation for RMW
pub struct ClientImpl {
pub inner: ros_z::service::ZClient<crate::msg::RosService>,
pub inner: ros_z::service::ZClient<crate::msg::RosService, crate::msg::RosSerdes>,
pub service_name: CString,
pub options: rmw_client_options_t,
pub request_ts: crate::type_support::ServiceTypeSupport,
Expand Down Expand Up @@ -171,7 +171,8 @@ impl ClientImpl {

/// Service implementation for RMW
pub struct ServiceImpl {
pub inner: ros_z::service::ZServer<crate::msg::RosService>,
pub inner:
ros_z::service::ZServer<crate::msg::RosService, zenoh::query::Query, crate::msg::RosSerdes>,
pub service_name: CString,
pub request_ts: crate::type_support::ServiceTypeSupport,
pub response_ts: crate::type_support::ServiceTypeSupport,
Expand Down Expand Up @@ -317,7 +318,7 @@ impl ServiceImpl {
let resp = crate::msg::RosMessage::new(response, self.response_ts.response);

// Send response
match self.inner.send_response(&resp, &key) {
match self.inner.rmw_send_response(&resp, &key) {
Ok(_) => {
tracing::debug!("[ServiceImpl::send_response] Response sent successfully");
Ok(())
Expand Down
6 changes: 0 additions & 6 deletions crates/ros-z-codegen/src/protobuf_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ impl ProtobufMessageGenerator {
combined_output.push_str("#[allow(unused_imports)]\n");
combined_output.push_str("use ros_z::ros_msg::WithTypeInfo;\n");
combined_output.push_str("#[allow(unused_imports)]\n");
combined_output.push_str("use ros_z::msg::ZMessage;\n");
combined_output.push_str("#[allow(unused_imports)]\n");
combined_output.push_str("use ros_z::msg::ProtobufSerdes;\n\n");

// Compile all proto files at once to avoid duplicates
Expand Down Expand Up @@ -311,10 +309,6 @@ impl ProtobufMessageGenerator {

impl ::ros_z::WithTypeInfo for {proto_type} {{}}

impl ::ros_z::msg::ZMessage for {proto_type} {{
type Serdes = ::ros_z::msg::ProtobufSerdes<{proto_type}>;
}}

"#
));
}
Expand Down
8 changes: 4 additions & 4 deletions crates/ros-z-codegen/src/python_msgspec_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ fn generate_serialize_to_zbuf(
match_arms.push(quote! {
#full_name => {
let rust_msg = <ros::#package_ident::#name_ident>::from_py(msg)?;
Ok(rust_msg.serialize_to_zbuf())
Ok(::ros_z::msg::CdrSerdes::serialize(&rust_msg))
}
});
}
Expand All @@ -660,7 +660,7 @@ fn generate_serialize_to_zbuf(
match_arms.push(quote! {
#req_full_name => {
let rust_msg = <ros::#package_ident::#req_name_ident>::from_py(msg)?;
Ok(rust_msg.serialize_to_zbuf())
Ok(::ros_z::msg::CdrSerdes::serialize(&rust_msg))
}
});

Expand All @@ -670,7 +670,7 @@ fn generate_serialize_to_zbuf(
match_arms.push(quote! {
#resp_full_name => {
let rust_msg = <ros::#package_ident::#resp_name_ident>::from_py(msg)?;
Ok(rust_msg.serialize_to_zbuf())
Ok(::ros_z::msg::CdrSerdes::serialize(&rust_msg))
}
});
}
Expand All @@ -679,7 +679,7 @@ fn generate_serialize_to_zbuf(
/// Direct ZBuf serialization - bypasses Python registry for zero-copy performance
pub fn serialize_to_zbuf(type_name: &str, msg: &Bound<'_, PyAny>) -> PyResult<::zenoh_buffers::ZBuf> {
use ::ros_z::python_bridge::FromPyMessage;
use ::ros_z::msg::ZMessage;
use ::ros_z::msg::ZSerdes as _;
match type_name {
#(#match_arms)*
_ => Err(pyo3::exceptions::PyValueError::new_err(
Expand Down
4 changes: 2 additions & 2 deletions crates/ros-z-console/src/core/dynamic_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{sync::Arc, time::Duration};

use flume::Receiver;
use ros_z::{
dynamic::{DynamicMessage, DynamicSerdeCdrSerdes, MessageSchema},
dynamic::{DynamicCdrCompatSerdes, DynamicMessage, MessageSchema},
node::ZNode,
pubsub::ZSub,
};
Expand All @@ -25,7 +25,7 @@ pub struct DynamicTopicSubscriber {
/// Channel for receiving messages asynchronously
message_rx: Receiver<DynamicMessage>,
/// Subscriber handle (kept alive to maintain subscription)
_subscriber: Arc<ZSub<DynamicMessage, Sample, DynamicSerdeCdrSerdes>>,
_subscriber: Arc<ZSub<DynamicMessage, Sample, DynamicCdrCompatSerdes>>,
}

impl DynamicTopicSubscriber {
Expand Down
13 changes: 7 additions & 6 deletions crates/ros-z-msgs/tests/shm_size_estimation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;

use ros_z::{
ZBuf,
msg::{ZMessage, ZSerializer},
msg::{CdrSerdes, ZSerdes},
shm::ShmProviderBuilder,
};
use zenoh_buffers::buffer::Buffer;
Expand Down Expand Up @@ -82,7 +82,8 @@ fn test_pointcloud2_shm_serialization_with_accurate_estimate() {
);

// Serialize to SHM - should not panic!
let result = <PointCloud2 as ZMessage>::Serdes::serialize_to_shm(&cloud, estimated, &provider);
let result =
<CdrSerdes as ZSerdes<PointCloud2>>::serialize_to_shm(&cloud, estimated, &provider);

assert!(result.is_ok(), "SHM serialization should succeed");

Expand Down Expand Up @@ -148,7 +149,7 @@ fn test_image_shm_serialization_with_accurate_estimate() {
.expect("Failed to create SHM provider"),
);

let result = <Image as ZMessage>::Serdes::serialize_to_shm(&image, estimated, &provider);
let result = <CdrSerdes as ZSerdes<Image>>::serialize_to_shm(&image, estimated, &provider);
assert!(result.is_ok(), "SHM serialization should succeed");

let (zbuf, actual_size) = result.unwrap();
Expand Down Expand Up @@ -205,7 +206,7 @@ fn test_laserscan_shm_serialization_with_accurate_estimate() {
.expect("Failed to create SHM provider"),
);

let result = <LaserScan as ZMessage>::Serdes::serialize_to_shm(&scan, estimated, &provider);
let result = <CdrSerdes as ZSerdes<LaserScan>>::serialize_to_shm(&scan, estimated, &provider);
assert!(result.is_ok(), "SHM serialization should succeed");

let (zbuf, actual_size) = result.unwrap();
Expand Down Expand Up @@ -256,7 +257,7 @@ fn test_compressed_image_shm_serialization() {
);

let result =
<CompressedImage as ZMessage>::Serdes::serialize_to_shm(&img, estimated, &provider);
<CdrSerdes as ZSerdes<CompressedImage>>::serialize_to_shm(&img, estimated, &provider);
assert!(result.is_ok(), "SHM serialization should succeed");

let (zbuf, actual_size) = result.unwrap();
Expand Down Expand Up @@ -298,7 +299,7 @@ fn test_multiple_messages_share_shm_pool() {
};

let estimated = image.estimated_serialized_size();
let result = <Image as ZMessage>::Serdes::serialize_to_shm(&image, estimated, &provider);
let result = <CdrSerdes as ZSerdes<Image>>::serialize_to_shm(&image, estimated, &provider);

assert!(
result.is_ok(),
Expand Down
29 changes: 15 additions & 14 deletions crates/ros-z-msgs/tests/size_estimation_performance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

use std::time::Instant;

use ros_z::{ZBuf, msg::ZMessage};
use ros_z::{
ZBuf,
msg::{CdrSerdes, ZSerdes},
};
use ros_z_msgs::{builtin_interfaces::Time, sensor_msgs::*, std_msgs::Header};
use zenoh_buffers::buffer::Buffer;

Expand All @@ -30,13 +33,13 @@ fn test_pointcloud2_serialization_performance() {

// Warm up
for _ in 0..5 {
let _ = cloud.serialize_to_zbuf();
let _ = CdrSerdes::serialize(&cloud);
}

// Benchmark with accurate size estimation (current implementation)
let start = Instant::now();
for _ in 0..100 {
let zbuf = cloud.serialize_to_zbuf();
let zbuf = CdrSerdes::serialize(&cloud);
assert!(zbuf.len() > 1_000_000);
}
let with_estimation = start.elapsed();
Expand Down Expand Up @@ -68,13 +71,13 @@ fn test_image_serialization_performance() {

// Warm up
for _ in 0..5 {
let _ = image.serialize_to_zbuf();
let _ = CdrSerdes::serialize(&image);
}

// Benchmark
let start = Instant::now();
for _ in 0..100 {
let zbuf = image.serialize_to_zbuf();
let zbuf = CdrSerdes::serialize(&image);
assert!(zbuf.len() > 920_000);
}
let elapsed = start.elapsed();
Expand Down Expand Up @@ -105,7 +108,7 @@ fn test_estimated_size_matches_actual() {
};

let estimated = cloud.estimated_serialized_size();
let zbuf = cloud.serialize_to_zbuf();
let zbuf = CdrSerdes::serialize(&cloud);
let actual = zbuf.len();

println!("PointCloud2: estimated={}, actual={}", estimated, actual);
Expand All @@ -130,7 +133,7 @@ fn test_estimated_size_matches_actual() {
};

let estimated = image.estimated_serialized_size();
let zbuf = image.serialize_to_zbuf();
let zbuf = CdrSerdes::serialize(&image);
let actual = zbuf.len();

println!("Image: estimated={}, actual={}", estimated, actual);
Expand Down Expand Up @@ -158,7 +161,7 @@ fn test_estimated_size_matches_actual() {
};

let estimated = scan.estimated_serialized_size();
let zbuf = scan.serialize_to_zbuf();
let zbuf = CdrSerdes::serialize(&scan);
let actual = zbuf.len();

println!("LaserScan: estimated={}, actual={}", estimated, actual);
Expand All @@ -171,8 +174,6 @@ fn test_estimated_size_matches_actual() {

#[test]
fn test_capacity_hint_api() {
use ros_z::msg::{SerdeCdrSerdes, ZSerializer};

let cloud = PointCloud2 {
header: Header {
stamp: Time { sec: 0, nanosec: 0 },
Expand All @@ -188,14 +189,14 @@ fn test_capacity_hint_api() {
is_dense: true,
};

// Test the low-level API with explicit hint
// Test serialize_with_hint (capacity hint API)
let hint = cloud.estimated_serialized_size();
let zbuf = SerdeCdrSerdes::<PointCloud2>::serialize_to_zbuf_with_hint(&cloud, hint);
let zbuf = <CdrSerdes as ZSerdes<PointCloud2>>::serialize_with_hint(&cloud, hint);

assert!(zbuf.len() > 50_000);
println!("Serialized with explicit hint: {} bytes", zbuf.len());

// Test that ZMessage::serialize_to_zbuf uses the hint automatically
let zbuf2 = cloud.serialize_to_zbuf();
// Both serialize paths should produce the same length
let zbuf2 = CdrSerdes::serialize(&cloud);
assert_eq!(zbuf.len(), zbuf2.len());
}
Loading
Loading