From 2e7f492f561370245862821d0c326da4a1fc3608 Mon Sep 17 00:00:00 2001 From: carter Date: Wed, 3 Jun 2026 10:49:32 -0600 Subject: [PATCH 1/2] Initial Cut at Graph APIs --- roslibrust_common/src/traits.rs | 23 ++ roslibrust_mock/src/lib.rs | 138 ++++++- roslibrust_ros1/src/master_client.rs | 8 + roslibrust_ros1/src/node/handle.rs | 53 ++- roslibrust_ros1/src/tcpros.rs | 43 +++ roslibrust_ros2/src/lib.rs | 26 ++ roslibrust_rosbridge/src/lib.rs | 56 +++ roslibrust_rosbridge/src/rosapi_discovery.rs | 71 ++++ roslibrust_zenoh/src/lib.rs | 377 ++++++++++++++++--- 9 files changed, 726 insertions(+), 69 deletions(-) create mode 100644 roslibrust_rosbridge/src/rosapi_discovery.rs diff --git a/roslibrust_common/src/traits.rs b/roslibrust_common/src/traits.rs index 1a484d1..1f84a2d 100644 --- a/roslibrust_common/src/traits.rs +++ b/roslibrust_common/src/traits.rs @@ -2,6 +2,20 @@ use crate::topic_name::*; use crate::{Result, ServiceError}; use std::future::Future; +/// Information about a topic currently visible in the ROS graph. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TopicInfo { + pub name: String, + pub type_name: String, +} + +/// Information about a service currently visible in the ROS graph. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ServiceInfo { + pub name: String, + pub type_name: String, +} + /// Fundamental traits for message types this crate works with /// This trait will be satisfied for any types generated with this crate's message_gen functionality pub trait RosMessageType: @@ -194,6 +208,15 @@ pub trait ServiceProvider { ) -> impl Future> + Send; } +/// Describes the ability to inspect the visible ROS graph. +pub trait GraphProvider { + /// List topics currently visible to this backend. + fn list_topics(&self) -> impl Future>> + Send; + + /// List services currently visible to this backend. + fn list_services(&self) -> impl Future>> + Send; +} + // ANCHOR: ros_trait /// Represents all "standard" ROS functionality generically supported by roslibrust /// diff --git a/roslibrust_mock/src/lib.rs b/roslibrust_mock/src/lib.rs index 4c65a11..3aa679c 100644 --- a/roslibrust_mock/src/lib.rs +++ b/roslibrust_mock/src/lib.rs @@ -44,8 +44,19 @@ type TypeErasedCallback = Arc< + 'static, >; +struct TopicEntry { + sender: Channel::Sender>, + receiver: Channel::Receiver>, + type_name: String, +} + +struct ServiceEntry { + callback: TypeErasedCallback, + type_name: String, +} + // Internal type for storing services -type ServiceStore = RwLock>; +type ServiceStore = RwLock>; /// A mock ROS implementation that can be substituted for any roslibrust backend in unit tests. /// @@ -55,7 +66,7 @@ type ServiceStore = RwLock>; pub struct MockRos { // We could probably achieve some fancier type erasure than actually serializing the data // but this ends up being pretty simple - topics: Arc>, Channel::Receiver>)>>>, + topics: Arc>>, services: Arc, } @@ -89,10 +100,17 @@ impl TopicProvider for MockRos { // Check if we already have this channel { let topics = self.topics.read().await; - if let Some((sender, _)) = topics.get(topic_str) { + if let Some(entry) = topics.get(topic_str) { + if entry.type_name != MsgType::ROS_TYPE_NAME { + return Err(Error::ServerError(format!( + "Topic {topic_str} already registered with type {}, cannot also use {}", + entry.type_name, + MsgType::ROS_TYPE_NAME + ))); + } debug!("Issued new publisher to existing topic {}", topic_str); return Ok(MockPublisher { - sender: sender.clone(), + sender: entry.sender.clone(), _marker: Default::default(), }); } @@ -101,7 +119,14 @@ impl TopicProvider for MockRos { let tx_rx = Channel::channel(10); let tx_copy = tx_rx.0.clone(); let mut topics = self.topics.write().await; - topics.insert(topic_str.to_string(), tx_rx); + topics.insert( + topic_str.to_string(), + TopicEntry { + sender: tx_rx.0, + receiver: tx_rx.1, + type_name: MsgType::ROS_TYPE_NAME.to_string(), + }, + ); debug!("Created new publisher and channel for topic {}", topic_str); Ok(MockPublisher { sender: tx_copy, @@ -118,10 +143,17 @@ impl TopicProvider for MockRos { // Check if we already have this channel { let topics = self.topics.read().await; - if let Some((_, receiver)) = topics.get(topic_str) { + if let Some(entry) = topics.get(topic_str) { + if entry.type_name != MsgType::ROS_TYPE_NAME { + return Err(Error::ServerError(format!( + "Topic {topic_str} already registered with type {}, cannot also use {}", + entry.type_name, + MsgType::ROS_TYPE_NAME + ))); + } debug!("Issued new subscriber to existing topic {}", topic_str); return Ok(MockSubscriber { - receiver: receiver.resubscribe(), + receiver: entry.receiver.resubscribe(), _marker: Default::default(), }); } @@ -130,7 +162,14 @@ impl TopicProvider for MockRos { let tx_rx = Channel::channel(10); let rx_copy = tx_rx.1.resubscribe(); let mut topics = self.topics.write().await; - topics.insert(topic_str.to_string(), tx_rx); + topics.insert( + topic_str.to_string(), + TopicEntry { + sender: tx_rx.0, + receiver: tx_rx.1, + type_name: MsgType::ROS_TYPE_NAME.to_string(), + }, + ); debug!("Created new subscriber and channel for topic {}", topic_str); Ok(MockSubscriber { receiver: rx_copy, @@ -172,7 +211,9 @@ impl Service for MockServiceClient { // Check if a service exists for this topic let callback = { let services = services.read().await; - services.get(&self.topic).cloned() + services + .get(&self.topic) + .map(|entry| entry.callback.clone()) }; let callback = match callback { Some(callback) => callback, @@ -237,6 +278,19 @@ impl ServiceProvider for MockRos { server: F, ) -> Result { let service: GlobalTopicName = service.to_global_name()?; + let service_name = service.as_ref().to_string(); + { + let services = self.services.read().await; + if let Some(existing) = services.get(&service_name) { + if existing.type_name != SrvType::ROS_SERVICE_NAME { + return Err(Error::ServerError(format!( + "Service {service_name} already registered with type {}, cannot also use {}", + existing.type_name, + SrvType::ROS_SERVICE_NAME + ))); + } + } + } // Type erase the service function here let erased_closure = move |message: Vec| -> std::result::Result< Vec, @@ -251,7 +305,13 @@ impl ServiceProvider for MockRos { }; let erased_closure = Arc::new(erased_closure); let mut services = self.services.write().await; - services.insert(String::from(service), erased_closure); + services.insert( + service_name, + ServiceEntry { + callback: erased_closure, + type_name: SrvType::ROS_SERVICE_NAME.to_string(), + }, + ); // We technically need to hand back a token that shuts the service down here // But we haven't implemented that yet in this mock @@ -259,6 +319,30 @@ impl ServiceProvider for MockRos { } } +impl GraphProvider for MockRos { + async fn list_topics(&self) -> Result> { + let topics = self.topics.read().await; + Ok(topics + .iter() + .map(|(name, entry)| TopicInfo { + name: name.clone(), + type_name: entry.type_name.clone(), + }) + .collect()) + } + + async fn list_services(&self) -> Result> { + let services = self.services.read().await; + Ok(services + .iter() + .map(|(name, entry)| ServiceInfo { + name: name.clone(), + type_name: entry.type_name.clone(), + }) + .collect()) + } +} + /// The publisher type returned by calling [MockRos::advertise]. pub struct MockPublisher { sender: Channel::Sender>, @@ -353,6 +437,40 @@ mod tests { assert_eq!(response.message, "You set my bool!"); } + #[tokio::test(flavor = "multi_thread")] + async fn test_mock_graph_provider() { + let mock_ros = MockRos::new(); + + let _publisher = mock_ros + .advertise::("/test_topic") + .await + .unwrap(); + mock_ros + .advertise_service::("/test_service", |request| { + Ok(std_srvs::SetBoolResponse { + success: request.data, + message: String::new(), + }) + }) + .await + .unwrap(); + + assert_eq!( + mock_ros.list_topics().await.unwrap(), + vec![TopicInfo { + name: "/test_topic".to_string(), + type_name: "std_msgs/String".to_string(), + }] + ); + assert_eq!( + mock_ros.list_services().await.unwrap(), + vec![ServiceInfo { + name: "/test_service".to_string(), + type_name: "std_srvs/SetBool".to_string(), + }] + ); + } + #[tokio::test(flavor = "multi_thread")] async fn test_mock_node() { // Proves that MockRos impls the Ros trait (via auto impl in roslibrust_common) diff --git a/roslibrust_ros1/src/master_client.rs b/roslibrust_ros1/src/master_client.rs index a6dd185..91dc42a 100644 --- a/roslibrust_ros1/src/master_client.rs +++ b/roslibrust_ros1/src/master_client.rs @@ -88,6 +88,14 @@ impl SystemState { }; entry.nodes.iter().any(|name| name.as_str().eq(node)) } + + /// Returns all service names currently registered with the master. + pub fn service_names(&self) -> Vec { + self.service_providers + .iter() + .map(|entry| entry.topic.clone()) + .collect() + } } impl MasterClient { diff --git a/roslibrust_ros1/src/node/handle.rs b/roslibrust_ros1/src/node/handle.rs index b3ed8b8..161c515 100644 --- a/roslibrust_ros1/src/node/handle.rs +++ b/roslibrust_ros1/src/node/handle.rs @@ -3,7 +3,7 @@ use crate::{ names::Name, publisher::Publisher, publisher::PublisherAny, service_client::ServiceClient, subscriber::Subscriber, subscriber::SubscriberAny, NodeError, ServiceServer, }; -use roslibrust_common::ServiceFn; +use roslibrust_common::{GraphProvider, ServiceFn, ServiceInfo, TopicInfo}; /// Represents a handle to an underlying Node. NodeHandle's can be freely cloned, moved, copied, etc. /// This class provides the user facing API for interacting with ROS. @@ -174,3 +174,54 @@ impl NodeHandle { Ok(ServiceServer::new(service_name, weak_node)) } } + +impl GraphProvider for NodeHandle { + async fn list_topics(&self) -> roslibrust_common::Result> { + let client = { + let node = self.inner.node.lock().await; + node.client.clone() + }; + + let mut topics: Vec<_> = client + .get_topic_types() + .await + .map_err(NodeError::from)? + .into_iter() + .map(|(name, type_name)| TopicInfo { name, type_name }) + .collect(); + topics.sort_by(|a, b| a.name.cmp(&b.name)); + Ok(topics) + } + + async fn list_services(&self) -> roslibrust_common::Result> { + let (client, caller_id) = { + let node = self.inner.node.lock().await; + (node.client.clone(), node.node_name.to_string()) + }; + + let service_names = client + .get_system_state() + .await + .map_err(NodeError::from)? + .service_names(); + + let mut services = Vec::with_capacity(service_names.len()); + for service_name in service_names { + let service_uri = client + .lookup_service(&service_name) + .await + .map_err(NodeError::from)?; + let type_name = + crate::tcpros::probe_service_type(&caller_id, &service_name, &service_uri) + .await + .map_err(NodeError::from)?; + services.push(ServiceInfo { + name: service_name, + type_name, + }); + } + + services.sort_by(|a, b| a.name.cmp(&b.name)); + Ok(services) + } +} diff --git a/roslibrust_ros1/src/tcpros.rs b/roslibrust_ros1/src/tcpros.rs index 2d67692..68bd277 100644 --- a/roslibrust_ros1/src/tcpros.rs +++ b/roslibrust_ros1/src/tcpros.rs @@ -245,6 +245,49 @@ pub async fn establish_connection( .map_err(std::io::Error::from) } +/// Connects to a ROS1 service server only long enough to discover its connection header. +pub async fn probe_service_type( + caller_id: &str, + service_name: &str, + service_uri: &str, +) -> Result { + use tokio::io::AsyncWriteExt; + + let server_uri = service_uri.replace("rosrpc://", ""); + let mut stream = TcpStream::connect(&server_uri).await.map_err(|err| { + log::error!( + "Failed to establish TCPROS probe connection to service {service_name} at {server_uri}: {err}" + ); + err + })?; + + let mut header_data = Vec::with_capacity(256); + WriteBytesExt::write_u32::(&mut header_data, 0)?; + for field in [ + format!("callerid={caller_id}"), + "md5sum=*".to_string(), + format!("service={service_name}"), + "probe=1".to_string(), + ] { + WriteBytesExt::write_u32::(&mut header_data, field.len() as u32)?; + std::io::Write::write_all(&mut header_data, field.as_bytes())?; + } + + let total_length = (header_data.len() - 4) as u32; + header_data[..4].copy_from_slice(&total_length.to_le_bytes()); + stream.write_all(&header_data).await?; + + let responded_header = receive_header(&mut stream).await?; + if responded_header.topic_type.is_empty() { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Service {service_name} probe response did not include a type field"), + )) + } else { + Ok(responded_header.topic_type) + } +} + pub async fn receive_header_bytes(stream: &mut TcpStream) -> Result, std::io::Error> { // Bring trait def into scope use tokio::io::AsyncReadExt; diff --git a/roslibrust_ros2/src/lib.rs b/roslibrust_ros2/src/lib.rs index 092117b..85756bb 100644 --- a/roslibrust_ros2/src/lib.rs +++ b/roslibrust_ros2/src/lib.rs @@ -123,6 +123,32 @@ impl roslibrust_common::TopicProvider for ZenohClient { } } +impl roslibrust_common::GraphProvider for ZenohClient { + async fn list_topics(&self) -> Result> { + let mut topics: Vec<_> = self + .node + .graph() + .get_topic_names_and_types() + .into_iter() + .map(|(name, type_name)| TopicInfo { name, type_name }) + .collect(); + topics.sort_by(|a, b| a.name.cmp(&b.name)); + Ok(topics) + } + + async fn list_services(&self) -> Result> { + let mut services: Vec<_> = self + .node + .graph() + .get_service_names_and_types() + .into_iter() + .map(|(name, type_name)| ServiceInfo { name, type_name }) + .collect(); + services.sort_by(|a, b| a.name.cmp(&b.name)); + Ok(services) + } +} + pub struct ZenohServiceServer { cancellation_token: tokio_util::sync::CancellationToken, } diff --git a/roslibrust_rosbridge/src/lib.rs b/roslibrust_rosbridge/src/lib.rs index 3336c6c..72fc856 100644 --- a/roslibrust_rosbridge/src/lib.rs +++ b/roslibrust_rosbridge/src/lib.rs @@ -60,6 +60,7 @@ type TestResult = std::result::Result<(), anyhow::Error>; /// Communication primitives for the rosbridge_suite protocol mod comm; +mod rosapi_discovery; use futures_util::stream::{SplitSink, SplitStream}; use std::collections::HashMap; @@ -194,6 +195,61 @@ impl ServiceProvider for crate::ClientHandle { } } +impl GraphProvider for crate::ClientHandle { + async fn list_topics(&self) -> Result> { + let response = self + .call_service::( + "/rosapi/topics", + rosapi_discovery::EmptyRequest {}, + ) + .await?; + + if response.topics.len() != response.types.len() { + return Err(Error::SerializationError(format!( + "rosapi returned {} topic names but {} topic types", + response.topics.len(), + response.types.len() + ))); + } + + let mut topics: Vec<_> = response + .topics + .into_iter() + .zip(response.types) + .map(|(name, type_name)| TopicInfo { name, type_name }) + .collect(); + topics.sort_by(|a, b| a.name.cmp(&b.name)); + Ok(topics) + } + + async fn list_services(&self) -> Result> { + let response = self + .call_service::( + "/rosapi/services", + rosapi_discovery::EmptyRequest {}, + ) + .await?; + + let mut services = Vec::with_capacity(response.services.len()); + for service in response.services { + let service_type = self + .call_service::( + "/rosapi/service_type", + rosapi_discovery::ServiceTypeRequest { + service: service.clone(), + }, + ) + .await?; + services.push(ServiceInfo { + name: service, + type_name: service_type.type_name, + }); + } + services.sort_by(|a, b| a.name.cmp(&b.name)); + Ok(services) + } +} + // Implementation of TopicProvider trait for rosbridge client impl TopicProvider for crate::ClientHandle { type Publisher = crate::Publisher; diff --git a/roslibrust_rosbridge/src/rosapi_discovery.rs b/roslibrust_rosbridge/src/rosapi_discovery.rs new file mode 100644 index 0000000..6914319 --- /dev/null +++ b/roslibrust_rosbridge/src/rosapi_discovery.rs @@ -0,0 +1,71 @@ +use roslibrust_common::{RosMessageType, RosServiceType}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct EmptyRequest {} + +impl RosMessageType for EmptyRequest { + const ROS_TYPE_NAME: &'static str = ""; +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct TopicsResponse { + pub topics: Vec, + pub types: Vec, +} + +impl RosMessageType for TopicsResponse { + const ROS_TYPE_NAME: &'static str = ""; +} + +pub struct Topics; + +impl RosServiceType for Topics { + const ROS_SERVICE_NAME: &'static str = "rosapi/Topics"; + type Request = EmptyRequest; + type Response = TopicsResponse; +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct ServicesResponse { + pub services: Vec, +} + +impl RosMessageType for ServicesResponse { + const ROS_TYPE_NAME: &'static str = ""; +} + +pub struct Services; + +impl RosServiceType for Services { + const ROS_SERVICE_NAME: &'static str = "rosapi/Services"; + type Request = EmptyRequest; + type Response = ServicesResponse; +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct ServiceTypeRequest { + pub service: String, +} + +impl RosMessageType for ServiceTypeRequest { + const ROS_TYPE_NAME: &'static str = ""; +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct ServiceTypeResponse { + #[serde(rename = "type")] + pub type_name: String, +} + +impl RosMessageType for ServiceTypeResponse { + const ROS_TYPE_NAME: &'static str = ""; +} + +pub struct ServiceType; + +impl RosServiceType for ServiceType { + const ROS_SERVICE_NAME: &'static str = "rosapi/ServiceType"; + type Request = ServiceTypeRequest; + type Response = ServiceTypeResponse; +} diff --git a/roslibrust_zenoh/src/lib.rs b/roslibrust_zenoh/src/lib.rs index b0c06da..e5976a3 100644 --- a/roslibrust_zenoh/src/lib.rs +++ b/roslibrust_zenoh/src/lib.rs @@ -5,27 +5,186 @@ use roslibrust_common::topic_name::{GlobalTopicName, ToGlobalTopicName}; use roslibrust_common::*; use log::*; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; +use tokio::sync::RwLock; use zenoh::bytes::ZBytes; +const DISCOVERY_NAMESPACE: &str = "*"; +const BRIDGE_NAMESPACE: &str = "*"; +const DISCOVERY_KEYEXPR: &str = "ros1_discovery_info/*/*/*/*/*/**"; +const DISCOVERY_BEACON_PERIOD: Duration = Duration::from_secs(1); +const DISCOVERY_LOST_AFTER: Duration = Duration::from_secs(3); + /// A wrapper around a normal zenoh session that adds roslibrust specific functionality. /// Should be created via [ZenohClient::new], and then used via the [TopicProvider] and [ServiceProvider] traits. #[derive(Clone)] pub struct ZenohClient { session: zenoh::Session, + graph: Arc>>, } impl ZenohClient { /// Creates a new client wrapped around a Zenoh session pub fn new(session: zenoh::Session) -> Self { - Self { session } + let graph = Arc::new(RwLock::new(BTreeMap::new())); + spawn_discovery_monitor(session.clone(), graph.clone()); + Self { session, graph } + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum DiscoveryClass { + Publisher, + Subscriber, + Service, + Client, +} + +impl DiscoveryClass { + fn as_str(self) -> &'static str { + match self { + Self::Publisher => "pub", + Self::Subscriber => "sub", + Self::Service => "srv", + Self::Client => "cl", + } + } + + fn parse(value: &str) -> Option { + match value { + "pub" => Some(Self::Publisher), + "sub" => Some(Self::Subscriber), + "srv" => Some(Self::Service), + "cl" => Some(Self::Client), + _ => None, + } } } +#[derive(Clone, Debug, Eq, PartialEq)] +struct DiscoveryFact { + class: DiscoveryClass, + name: String, + type_name: String, + md5sum: String, +} + +struct DiscoveryDeclaration { + shutdown: Option>, +} + +impl Drop for DiscoveryDeclaration { + fn drop(&mut self) { + if let Some(shutdown) = self.shutdown.take() { + let _ = shutdown.send(()); + } + } +} + +impl DiscoveryDeclaration { + async fn new( + session: zenoh::Session, + class: DiscoveryClass, + name: &str, + type_name: &str, + md5sum: &str, + ) -> Result { + let key = make_discovery_key(class, name, type_name, md5sum); + let publisher = session.declare_publisher(key).await.map_err(|e| { + Error::Unexpected(anyhow::anyhow!( + "Failed to declare discovery publisher: {e:?}" + )) + })?; + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(DISCOVERY_BEACON_PERIOD); + loop { + if let Err(e) = publisher.put(ZBytes::default()).await { + error!("Failed to publish discovery info: {e:?}"); + } + tokio::select! { + _ = interval.tick() => {} + _ = &mut shutdown_rx => break, + } + } + }); + Ok(Self { + shutdown: Some(shutdown_tx), + }) + } +} + +fn spawn_discovery_monitor( + session: zenoh::Session, + graph: Arc>>, +) { + let Ok(handle) = tokio::runtime::Handle::try_current() else { + error!("Failed to start Zenoh discovery monitor: no active Tokio runtime"); + return; + }; + + handle.spawn(async move { + let subscriber = match session.declare_subscriber(DISCOVERY_KEYEXPR).await { + Ok(subscriber) => subscriber, + Err(e) => { + error!("Failed to subscribe to {DISCOVERY_KEYEXPR}: {e:?}"); + return; + } + }; + let mut active_facts: HashMap = HashMap::new(); + let mut interval = tokio::time::interval(DISCOVERY_BEACON_PERIOD); + loop { + tokio::select! { + sample = subscriber.recv_async() => { + let sample = match sample { + Ok(sample) => sample, + Err(e) => { + error!("Failed to receive discovery info: {e:?}"); + continue; + } + }; + let key = sample.key_expr().as_str().to_string(); + let Some(fact) = parse_discovery_key(&key) else { + continue; + }; + active_facts.insert(key.clone(), Instant::now()); + graph.write().await.insert(key, fact); + } + _ = interval.tick() => { + let now = Instant::now(); + let lost: Vec<_> = active_facts + .iter() + .filter_map(|(key, last_seen)| { + now.duration_since(*last_seen) + .gt(&DISCOVERY_LOST_AFTER) + .then(|| key.clone()) + }) + .collect(); + if !lost.is_empty() { + let mut graph = graph.write().await; + for key in lost { + active_facts.remove(&key); + graph.remove(&key); + } + } + } + } + } + }); +} + /// The publisher type returned by [TopicProvider::advertise] on [ZenohClient] /// This type is self de-registering, and dropping the publisher will automatically un-advertise the topic. pub struct ZenohPublisher { publisher: zenoh::pubsub::Publisher<'static>, + _discovery: DiscoveryDeclaration, // Used to track buffer capacity size to minimize allocations for fixed-size streams. capacity_hint: AtomicUsize, _marker: std::marker::PhantomData, @@ -84,6 +243,7 @@ type ZenohSubInner = /// It is typically used with types generated by roslibrust's codegen. pub struct ZenohSubscriber { subscriber: ZenohSubInner, + _discovery: DiscoveryDeclaration, _marker: std::marker::PhantomData, } @@ -133,9 +293,18 @@ impl TopicProvider for ZenohClient { ))); } }; + let discovery = DiscoveryDeclaration::new( + self.session.clone(), + DiscoveryClass::Publisher, + topic.as_ref(), + MsgType::ROS_TYPE_NAME, + MsgType::MD5SUM, + ) + .await?; Ok(ZenohPublisher { publisher, + _discovery: discovery, capacity_hint: 1024.into(), _marker: std::marker::PhantomData, }) @@ -156,8 +325,17 @@ impl TopicProvider for ZenohClient { ))); } }; + let discovery = DiscoveryDeclaration::new( + self.session.clone(), + DiscoveryClass::Subscriber, + topic.as_ref(), + MsgType::ROS_TYPE_NAME, + MsgType::MD5SUM, + ) + .await?; Ok(ZenohSubscriber { subscriber: sub, + _discovery: discovery, _marker: std::marker::PhantomData, }) } @@ -174,15 +352,98 @@ fn mangle_topic(topic: &str, type_str: &str, md5sum: &str) -> String { let topic = topic.trim_start_matches('/').trim_end_matches("/"); // Encode the type as hex let type_str = hex::encode(type_str.as_bytes()); - format!("{type_str}/{md5sum}/{topic}") + format!("{type_str}/{md5sum}/{BRIDGE_NAMESPACE}/{topic}") +} + +impl GraphProvider for ZenohClient { + async fn list_topics(&self) -> Result> { + let graph = self.graph.read().await; + let mut topics = BTreeMap::new(); + for fact in graph.values() { + if matches!( + fact.class, + DiscoveryClass::Publisher | DiscoveryClass::Subscriber + ) { + insert_discovered_type( + &mut topics, + fact.name.clone(), + fact.type_name.clone(), + "topic", + )?; + } + } + Ok(topics + .into_iter() + .map(|(name, type_name)| TopicInfo { name, type_name }) + .collect()) + } + + async fn list_services(&self) -> Result> { + let graph = self.graph.read().await; + let mut services = BTreeMap::new(); + for fact in graph.values() { + if fact.class == DiscoveryClass::Service { + insert_discovered_type( + &mut services, + fact.name.clone(), + fact.type_name.clone(), + "service", + )?; + } + } + Ok(services + .into_iter() + .map(|(name, type_name)| ServiceInfo { name, type_name }) + .collect()) + } } -/// Identical to mangle_topic, but for services we want to separate the datatype stuff from the service name -fn mangle_service(service: &str, type_str: &str, md5sum: &str) -> (String, String) { - let service = service.trim_start_matches('/').trim_end_matches("/"); +fn make_discovery_key(class: DiscoveryClass, name: &str, type_name: &str, md5sum: &str) -> String { + let name = name.trim_start_matches('/').trim_end_matches('/'); + format!( + "ros1_discovery_info/{DISCOVERY_NAMESPACE}/{}/{}/{md5sum}/{BRIDGE_NAMESPACE}/{name}", + class.as_str(), + hex::encode(type_name.as_bytes()), + ) +} - let type_str = hex::encode(type_str.as_bytes()); - (format!("{type_str}/{md5sum}"), service.to_string()) +fn parse_discovery_key(key: &str) -> Option { + let parts: Vec<_> = key.split('/').collect(); + if parts.len() < 7 || parts[0] != "ros1_discovery_info" { + return None; + } + let class = DiscoveryClass::parse(parts[2])?; + let type_name = decode_hex_type(parts[3])?; + let md5sum = parts[4].to_string(); + let name = format!("/{}", parts[6..].join("/")); + Some(DiscoveryFact { + class, + name, + type_name, + md5sum, + }) +} + +fn decode_hex_type(hex_type: &str) -> Option { + String::from_utf8(hex::decode(hex_type).ok()?).ok() +} + +fn insert_discovered_type( + types: &mut BTreeMap, + name: String, + type_name: String, + kind: &str, +) -> Result<()> { + if let Some(existing) = types.get(&name) { + if existing != &type_name { + return Err(Error::ServerError(format!( + "{kind} {name} discovered with conflicting types {existing} and {type_name}" + ))); + } + } else { + types.insert(name, type_name); + } + Ok(()) } /// The client type returned by [ServiceProvider::service_client] on [ZenohClient] @@ -190,6 +451,7 @@ fn mangle_service(service: &str, type_str: &str, md5sum: &str) -> (String, Strin pub struct ZenohServiceClient { session: zenoh::Session, zenoh_query: String, + _discovery: DiscoveryDeclaration, _marker: std::marker::PhantomData, } @@ -244,8 +506,7 @@ impl Service for ZenohServiceClient { pub struct ZenohServiceServer { // Dropping this will stop zenoh's declaration of the queryable _queryable: zenoh::query::Queryable<()>, - // Dropping this will stop the advertising of the service - _shutdown_channel: tokio::sync::oneshot::Sender<()>, + _discovery: DiscoveryDeclaration, } impl ServiceProvider for ZenohClient { @@ -270,10 +531,19 @@ impl ServiceProvider for ZenohClient { let service: GlobalTopicName = service.to_global_name()?; let mangled_topic = mangle_topic(service.as_ref(), SrvType::ROS_SERVICE_NAME, SrvType::MD5SUM); + let discovery = DiscoveryDeclaration::new( + self.session.clone(), + DiscoveryClass::Client, + service.as_ref(), + SrvType::ROS_SERVICE_NAME, + SrvType::MD5SUM, + ) + .await?; Ok(ZenohServiceClient { session: self.session.clone(), zenoh_query: mangled_topic, + _discovery: discovery, _marker: std::marker::PhantomData, }) } @@ -288,7 +558,6 @@ impl ServiceProvider for ZenohClient { mangle_topic(service.as_ref(), SrvType::ROS_SERVICE_NAME, SrvType::MD5SUM); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); let x = self .session @@ -302,6 +571,14 @@ impl ServiceProvider for ZenohClient { .map_err(|e| { Error::Unexpected(anyhow::anyhow!("Failed to declare queryable: {e:?}")) })?; + let discovery = DiscoveryDeclaration::new( + self.session.clone(), + DiscoveryClass::Service, + service.as_ref(), + SrvType::ROS_SERVICE_NAME, + SrvType::MD5SUM, + ) + .await?; // Move the server into an Arc so we can ensure lifetime of it remains valid across spawn_blocking: let server = std::sync::Arc::new(server); @@ -352,54 +629,10 @@ impl ServiceProvider for ZenohClient { }); } }); - // zenoh-ros1-bridge won't serve our service without us publishing info on 'ros1_discovery_info' - // We don't have to worry about this for publishers, because zenoh will initiate the bridge whenever someone subscribes on the ros side. - // For service, zenoh-ros1-bridge has to create the service before anyone can call it so it has to know that it needs to do that. - // This is a bit of a brittle implementation as it relies on internal implementation details for zenoh-ros1-bridge, but it works for now. - // See: https://github.com/eclipse-zenoh/zenoh-plugin-ros1/blob/main/zenoh-plugin-ros1/src/ros_to_zenoh_bridge/discovery.rs - - // Note: I'm uncertain about "discovery_namespace" and just using * for now - // Note: I'm uncertain about "bridge_namespace" and just using * for now - let (type_mangle, svc_name) = - mangle_service(service.as_ref(), SrvType::ROS_SERVICE_NAME, SrvType::MD5SUM); - let zenoh_info_topic = format!("ros1_discovery_info/*/srv/{type_mangle}/*/{svc_name}"); - - let q2 = self - .session - .declare_publisher(zenoh_info_topic) - .await - .map_err(|e| { - Error::Unexpected(anyhow::anyhow!( - "Failed to declare queryable for service discovery: {e:?}" - )) - })?; - tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); - loop { - let shutdown = shutdown_rx.try_recv(); - match shutdown { - Ok(_) => { - break; - } - Err(tokio::sync::oneshot::error::TryRecvError::Empty) => { - // Continue no shutdown yet - } - Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - break; - } - } - // Send an empty message to the discovery topic - let res = q2.put(ZBytes::default()).await; - if let Err(e) = res { - error!("Failed to publish service discovery info: {e:?}"); - } - interval.tick().await; - } - }); Ok(ZenohServiceServer { _queryable: x, - _shutdown_channel: shutdown_tx, + _discovery: discovery, }) } } @@ -416,7 +649,7 @@ mod tests { "std_msgs/String", "992ce8a1687cec8c8bd883ec73ca41d1" ), - "7374645f6d7367732f537472696e67/992ce8a1687cec8c8bd883ec73ca41d1/chatter" + "7374645f6d7367732f537472696e67/992ce8a1687cec8c8bd883ec73ca41d1/*/chatter" ); } @@ -428,7 +661,35 @@ mod tests { "std_srvs/SetBool", "09fb03525b03e7ea1fd3992bafd87e16" ), - "7374645f737276732f536574426f6f6c/09fb03525b03e7ea1fd3992bafd87e16/service_server_rs/my_set_bool"); + "7374645f737276732f536574426f6f6c/09fb03525b03e7ea1fd3992bafd87e16/*/service_server_rs/my_set_bool"); + } + + #[test] + fn test_make_discovery_key() { + assert_eq!( + make_discovery_key( + DiscoveryClass::Service, + "/service_server_rs/my_set_bool", + "std_srvs/SetBool", + "09fb03525b03e7ea1fd3992bafd87e16" + ), + "ros1_discovery_info/*/srv/7374645f737276732f536574426f6f6c/09fb03525b03e7ea1fd3992bafd87e16/*/service_server_rs/my_set_bool" + ); + } + + #[test] + fn test_parse_discovery_key() { + assert_eq!( + parse_discovery_key( + "ros1_discovery_info/*/srv/7374645f737276732f536574426f6f6c/09fb03525b03e7ea1fd3992bafd87e16/*/service_server_rs/my_set_bool" + ), + Some(DiscoveryFact { + class: DiscoveryClass::Service, + name: "/service_server_rs/my_set_bool".to_string(), + type_name: "std_srvs/SetBool".to_string(), + md5sum: "09fb03525b03e7ea1fd3992bafd87e16".to_string(), + }) + ); } #[test] From bc2aab0f19b134e2a43ae701a7401ab619ae505f Mon Sep 17 00:00:00 2001 From: carter Date: Wed, 3 Jun 2026 14:38:55 -0600 Subject: [PATCH 2/2] Add integration tests for graph_provider --- Cargo.lock | 3 + roslibrust_test/Cargo.toml | 6 + roslibrust_test/tests/graph_provider_tests.rs | 233 ++++++++++++++++++ 3 files changed, 242 insertions(+) create mode 100644 roslibrust_test/tests/graph_provider_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 700f2af..63decad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3659,7 +3659,10 @@ dependencies = [ "lazy_static", "log", "pprof", + "ros-z", "roslibrust", + "roslibrust_ros2", + "roslibrust_rosbridge", "roslibrust_zenoh", "test-log", "tokio", diff --git a/roslibrust_test/Cargo.toml b/roslibrust_test/Cargo.toml index 5a33836..46bad85 100644 --- a/roslibrust_test/Cargo.toml +++ b/roslibrust_test/Cargo.toml @@ -19,6 +19,9 @@ pprof = { version = "0.15", features = ["flamegraph", "criterion"] } test-log = { workspace = true } hex = "0.4" roslibrust_zenoh = { path = "../roslibrust_zenoh" } +roslibrust_ros2 = { path = "../roslibrust_ros2" } +roslibrust_rosbridge = { path = "../roslibrust_rosbridge" } +ros-z = { git = "https://github.com/ZettaScaleLabs/ros-z.git", rev = "9bb6305" } [[bin]] path = "src/performance_ramp.rs" @@ -30,3 +33,6 @@ harness = false [features] ros1_test = [] +ros2_zenoh_test = [] +rosbridge_ros1_test = [] +rosbridge_ros2_test = [] diff --git a/roslibrust_test/tests/graph_provider_tests.rs b/roslibrust_test/tests/graph_provider_tests.rs new file mode 100644 index 0000000..f928adc --- /dev/null +++ b/roslibrust_test/tests/graph_provider_tests.rs @@ -0,0 +1,233 @@ +//! Integration tests for GraphProvider trait across all backends +//! +//! These tests verify that list_topics() and list_services() work correctly +//! for each backend implementation. +//! +//! Run tests for specific backends: +//! - Mock: cargo test --test graph_provider_tests (no feature needed) +//! - ROS1: cargo test --test graph_provider_tests --features ros1_test +//! - ROS2 Zenoh: cargo test --test graph_provider_tests --features ros2_zenoh_test +//! - Rosbridge ROS1: cargo test --test graph_provider_tests --features rosbridge_ros1_test +//! - Rosbridge ROS2: cargo test --test graph_provider_tests --features rosbridge_ros2_test + +use roslibrust::traits::*; +use roslibrust_test::ros1::*; + +/// Generic test function that exercises GraphProvider topics for any backend +async fn test_graph_provider_topics( + ros: T, +) -> roslibrust::Result<()> { + // Initially, we might have topics from other tests or system, so just verify the call works + let initial_topics = ros.list_topics().await?; + + // Advertise a topic + let _publisher = ros + .advertise::("/test_graph_topic") + .await?; + + // Give the graph time to update (backends may need time to propagate) + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + // List topics again + let topics = ros.list_topics().await?; + + // Verify our topic appears in the list + let found = topics.iter().any(|t| t.name == "/test_graph_topic"); + assert!( + found, + "Expected to find /test_graph_topic in list, got: {:?}", + topics + ); + + // Verify the topic has the correct type + let topic_info = topics + .iter() + .find(|t| t.name == "/test_graph_topic") + .expect("Topic should exist"); + assert_eq!( + topic_info.type_name, "std_msgs/String", + "Topic type should be std_msgs/String" + ); + + // Verify we have more topics now than initially (or at least the same if our topic already existed) + assert!( + topics.len() >= initial_topics.len(), + "Should have at least as many topics after advertising" + ); + + Ok(()) +} + +/// Generic test function that exercises GraphProvider services for any backend +async fn test_graph_provider_services( + ros: T, +) -> roslibrust::Result<()> { + // Initially, we might have services from other tests or system + let initial_services = ros.list_services().await?; + + // Advertise a service + let _service = ros + .advertise_service::("/test_graph_service", |req| { + Ok(std_srvs::SetBoolResponse { + success: req.data, + message: "test".to_string(), + }) + }) + .await?; + + // Give the graph time to update + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + // List services + let services = ros.list_services().await?; + + // Verify our service appears in the list + let found = services.iter().any(|s| s.name == "/test_graph_service"); + assert!( + found, + "Expected to find /test_graph_service in list, got: {:?}", + services + ); + + // Verify the service has the correct type + let service_info = services + .iter() + .find(|s| s.name == "/test_graph_service") + .expect("Service should exist"); + assert_eq!( + service_info.type_name, "std_srvs/SetBool", + "Service type should be std_srvs/SetBool" + ); + + // Verify we have more services now than initially + assert!( + services.len() >= initial_services.len(), + "Should have at least as many services after advertising" + ); + + Ok(()) +} + +// ============================================================================ +// Mock Backend Tests +// ============================================================================ + +#[tokio::test(flavor = "multi_thread")] +async fn test_mock_graph_provider_topics() { + let ros = roslibrust::mock::MockRos::new(); + test_graph_provider_topics(ros).await.unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_mock_graph_provider_services() { + let ros = roslibrust::mock::MockRos::new(); + test_graph_provider_services(ros).await.unwrap(); +} + +// ============================================================================ +// ROS1 Backend Tests +// ============================================================================ + +#[cfg(feature = "ros1_test")] +mod ros1_tests { + use super::*; + use roslibrust::ros1::NodeHandle; + + #[test_log::test(tokio::test)] + async fn test_ros1_graph_provider_topics() { + let ros = NodeHandle::new("http://localhost:11311", "/test_graph_provider_topics") + .await + .unwrap(); + test_graph_provider_topics(ros).await.unwrap(); + } + + #[test_log::test(tokio::test)] + async fn test_ros1_graph_provider_services() { + let ros = NodeHandle::new("http://localhost:11311", "/test_graph_provider_services") + .await + .unwrap(); + test_graph_provider_services(ros).await.unwrap(); + } +} + +// ============================================================================ +// ROS2 Zenoh Backend Tests +// ============================================================================ + +#[cfg(feature = "ros2_zenoh_test")] +mod ros2_zenoh_tests { + use super::*; + use ros_z::context::ZContextBuilder; + use ros_z::Builder; + use roslibrust_ros2::ZenohClient; + + fn make_test_context() -> ros_z::context::ZContext { + ZContextBuilder::default() + .with_domain_id(0) + .with_connect_endpoints(["tcp/[::]:7447"]) + .build() + .unwrap() + } + + #[test_log::test(tokio::test)] + async fn test_ros2_zenoh_graph_provider_topics() { + let ctx = make_test_context(); + let ros = ZenohClient::new(&ctx, "test_graph_provider_topics") + .await + .unwrap(); + test_graph_provider_topics(ros).await.unwrap(); + } + + #[test_log::test(tokio::test)] + async fn test_ros2_zenoh_graph_provider_services() { + let ctx = make_test_context(); + let ros = ZenohClient::new(&ctx, "test_graph_provider_services") + .await + .unwrap(); + test_graph_provider_services(ros).await.unwrap(); + } +} + +// ============================================================================ +// Rosbridge ROS1 Backend Tests +// ============================================================================ + +#[cfg(feature = "rosbridge_ros1_test")] +mod rosbridge_ros1_tests { + use super::*; + use roslibrust_rosbridge::ClientHandle; + + #[test_log::test(tokio::test)] + async fn test_rosbridge_ros1_graph_provider_topics() { + let ros = ClientHandle::new("ws://localhost:9090").await.unwrap(); + test_graph_provider_topics(ros).await.unwrap(); + } + + #[test_log::test(tokio::test)] + async fn test_rosbridge_ros1_graph_provider_services() { + let ros = ClientHandle::new("ws://localhost:9090").await.unwrap(); + test_graph_provider_services(ros).await.unwrap(); + } +} + +// ============================================================================ +// Rosbridge ROS2 Backend Tests +// ============================================================================ + +#[cfg(feature = "rosbridge_ros2_test")] +mod rosbridge_ros2_tests { + use super::*; + use roslibrust_rosbridge::ClientHandle; + + #[test_log::test(tokio::test)] + async fn test_rosbridge_ros2_graph_provider_topics() { + let ros = ClientHandle::new("ws://localhost:9090").await.unwrap(); + test_graph_provider_topics(ros).await.unwrap(); + } + + #[test_log::test(tokio::test)] + async fn test_rosbridge_ros2_graph_provider_services() { + let ros = ClientHandle::new("ws://localhost:9090").await.unwrap(); + test_graph_provider_services(ros).await.unwrap(); + } +}