Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions roslibrust_common/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@ use crate::topic_name::*;
use crate::{Result, ServiceError};
use std::future::Future;

/// Information about a topic currently visible in the ROS graph.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopicInfo {
pub name: String,
pub type_name: String,
}

/// Information about a service currently visible in the ROS graph.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ServiceInfo {
pub name: String,
pub type_name: String,
}

/// Fundamental traits for message types this crate works with
/// This trait will be satisfied for any types generated with this crate's message_gen functionality
pub trait RosMessageType:
Expand Down Expand Up @@ -194,6 +208,15 @@ pub trait ServiceProvider {
) -> impl Future<Output = Result<Self::ServiceServer>> + Send;
}

/// Describes the ability to inspect the visible ROS graph.
pub trait GraphProvider {
/// List topics currently visible to this backend.
fn list_topics(&self) -> impl Future<Output = Result<Vec<TopicInfo>>> + Send;

/// List services currently visible to this backend.
fn list_services(&self) -> impl Future<Output = Result<Vec<ServiceInfo>>> + Send;
}

// ANCHOR: ros_trait
/// Represents all "standard" ROS functionality generically supported by roslibrust
///
Expand Down
138 changes: 128 additions & 10 deletions roslibrust_mock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,19 @@ type TypeErasedCallback = Arc<
+ 'static,
>;

struct TopicEntry {
sender: Channel::Sender<Vec<u8>>,
receiver: Channel::Receiver<Vec<u8>>,
type_name: String,
}

struct ServiceEntry {
callback: TypeErasedCallback,
type_name: String,
}

// Internal type for storing services
type ServiceStore = RwLock<BTreeMap<String, TypeErasedCallback>>;
type ServiceStore = RwLock<BTreeMap<String, ServiceEntry>>;

/// A mock ROS implementation that can be substituted for any roslibrust backend in unit tests.
///
Expand All @@ -55,7 +66,7 @@ type ServiceStore = RwLock<BTreeMap<String, TypeErasedCallback>>;
pub struct MockRos {
// We could probably achieve some fancier type erasure than actually serializing the data
// but this ends up being pretty simple
topics: Arc<RwLock<BTreeMap<String, (Channel::Sender<Vec<u8>>, Channel::Receiver<Vec<u8>>)>>>,
topics: Arc<RwLock<BTreeMap<String, TopicEntry>>>,
services: Arc<ServiceStore>,
}

Expand Down Expand Up @@ -89,10 +100,17 @@ impl TopicProvider for MockRos {
// Check if we already have this channel
{
let topics = self.topics.read().await;
if let Some((sender, _)) = topics.get(topic_str) {
if let Some(entry) = topics.get(topic_str) {
if entry.type_name != MsgType::ROS_TYPE_NAME {
return Err(Error::ServerError(format!(
"Topic {topic_str} already registered with type {}, cannot also use {}",
entry.type_name,
MsgType::ROS_TYPE_NAME
)));
}
debug!("Issued new publisher to existing topic {}", topic_str);
return Ok(MockPublisher {
sender: sender.clone(),
sender: entry.sender.clone(),
_marker: Default::default(),
});
}
Expand All @@ -101,7 +119,14 @@ impl TopicProvider for MockRos {
let tx_rx = Channel::channel(10);
let tx_copy = tx_rx.0.clone();
let mut topics = self.topics.write().await;
topics.insert(topic_str.to_string(), tx_rx);
topics.insert(
topic_str.to_string(),
TopicEntry {
sender: tx_rx.0,
receiver: tx_rx.1,
type_name: MsgType::ROS_TYPE_NAME.to_string(),
},
);
debug!("Created new publisher and channel for topic {}", topic_str);
Ok(MockPublisher {
sender: tx_copy,
Expand All @@ -118,10 +143,17 @@ impl TopicProvider for MockRos {
// Check if we already have this channel
{
let topics = self.topics.read().await;
if let Some((_, receiver)) = topics.get(topic_str) {
if let Some(entry) = topics.get(topic_str) {
if entry.type_name != MsgType::ROS_TYPE_NAME {
return Err(Error::ServerError(format!(
"Topic {topic_str} already registered with type {}, cannot also use {}",
entry.type_name,
MsgType::ROS_TYPE_NAME
)));
}
debug!("Issued new subscriber to existing topic {}", topic_str);
return Ok(MockSubscriber {
receiver: receiver.resubscribe(),
receiver: entry.receiver.resubscribe(),
_marker: Default::default(),
});
}
Expand All @@ -130,7 +162,14 @@ impl TopicProvider for MockRos {
let tx_rx = Channel::channel(10);
let rx_copy = tx_rx.1.resubscribe();
let mut topics = self.topics.write().await;
topics.insert(topic_str.to_string(), tx_rx);
topics.insert(
topic_str.to_string(),
TopicEntry {
sender: tx_rx.0,
receiver: tx_rx.1,
type_name: MsgType::ROS_TYPE_NAME.to_string(),
},
);
debug!("Created new subscriber and channel for topic {}", topic_str);
Ok(MockSubscriber {
receiver: rx_copy,
Expand Down Expand Up @@ -172,7 +211,9 @@ impl<T: RosServiceType> Service<T> for MockServiceClient<T> {
// Check if a service exists for this topic
let callback = {
let services = services.read().await;
services.get(&self.topic).cloned()
services
.get(&self.topic)
.map(|entry| entry.callback.clone())
};
let callback = match callback {
Some(callback) => callback,
Expand Down Expand Up @@ -237,6 +278,19 @@ impl ServiceProvider for MockRos {
server: F,
) -> Result<Self::ServiceServer> {
let service: GlobalTopicName = service.to_global_name()?;
let service_name = service.as_ref().to_string();
{
let services = self.services.read().await;
if let Some(existing) = services.get(&service_name) {
if existing.type_name != SrvType::ROS_SERVICE_NAME {
return Err(Error::ServerError(format!(
"Service {service_name} already registered with type {}, cannot also use {}",
existing.type_name,
SrvType::ROS_SERVICE_NAME
)));
}
}
}
// Type erase the service function here
let erased_closure = move |message: Vec<u8>| -> std::result::Result<
Vec<u8>,
Expand All @@ -251,14 +305,44 @@ impl ServiceProvider for MockRos {
};
let erased_closure = Arc::new(erased_closure);
let mut services = self.services.write().await;
services.insert(String::from(service), erased_closure);
services.insert(
service_name,
ServiceEntry {
callback: erased_closure,
type_name: SrvType::ROS_SERVICE_NAME.to_string(),
},
);

// We technically need to hand back a token that shuts the service down here
// But we haven't implemented that yet in this mock
Ok(())
}
}

impl GraphProvider for MockRos {
async fn list_topics(&self) -> Result<Vec<TopicInfo>> {
let topics = self.topics.read().await;
Ok(topics
.iter()
.map(|(name, entry)| TopicInfo {
name: name.clone(),
type_name: entry.type_name.clone(),
})
.collect())
}

async fn list_services(&self) -> Result<Vec<ServiceInfo>> {
let services = self.services.read().await;
Ok(services
.iter()
.map(|(name, entry)| ServiceInfo {
name: name.clone(),
type_name: entry.type_name.clone(),
})
.collect())
}
}

/// The publisher type returned by calling [MockRos::advertise].
pub struct MockPublisher<T: RosMessageType> {
sender: Channel::Sender<Vec<u8>>,
Expand Down Expand Up @@ -353,6 +437,40 @@ mod tests {
assert_eq!(response.message, "You set my bool!");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_mock_graph_provider() {
let mock_ros = MockRos::new();

let _publisher = mock_ros
.advertise::<std_msgs::String>("/test_topic")
.await
.unwrap();
mock_ros
.advertise_service::<std_srvs::SetBool, _>("/test_service", |request| {
Ok(std_srvs::SetBoolResponse {
success: request.data,
message: String::new(),
})
})
.await
.unwrap();

assert_eq!(
mock_ros.list_topics().await.unwrap(),
vec![TopicInfo {
name: "/test_topic".to_string(),
type_name: "std_msgs/String".to_string(),
}]
);
assert_eq!(
mock_ros.list_services().await.unwrap(),
vec![ServiceInfo {
name: "/test_service".to_string(),
type_name: "std_srvs/SetBool".to_string(),
}]
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_mock_node() {
// Proves that MockRos impls the Ros trait (via auto impl in roslibrust_common)
Expand Down
8 changes: 8 additions & 0 deletions roslibrust_ros1/src/master_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ impl SystemState {
};
entry.nodes.iter().any(|name| name.as_str().eq(node))
}

/// Returns all service names currently registered with the master.
pub fn service_names(&self) -> Vec<String> {
self.service_providers
.iter()
.map(|entry| entry.topic.clone())
.collect()
}
}

impl MasterClient {
Expand Down
53 changes: 52 additions & 1 deletion roslibrust_ros1/src/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
names::Name, publisher::Publisher, publisher::PublisherAny, service_client::ServiceClient,
subscriber::Subscriber, subscriber::SubscriberAny, NodeError, ServiceServer,
};
use roslibrust_common::ServiceFn;
use roslibrust_common::{GraphProvider, ServiceFn, ServiceInfo, TopicInfo};

/// Represents a handle to an underlying Node. NodeHandle's can be freely cloned, moved, copied, etc.
/// This class provides the user facing API for interacting with ROS.
Expand Down Expand Up @@ -174,3 +174,54 @@ impl NodeHandle {
Ok(ServiceServer::new(service_name, weak_node))
}
}

impl GraphProvider for NodeHandle {
async fn list_topics(&self) -> roslibrust_common::Result<Vec<TopicInfo>> {
let client = {
let node = self.inner.node.lock().await;
node.client.clone()
};

let mut topics: Vec<_> = client
.get_topic_types()
.await
.map_err(NodeError::from)?
.into_iter()
.map(|(name, type_name)| TopicInfo { name, type_name })
.collect();
topics.sort_by(|a, b| a.name.cmp(&b.name));
Ok(topics)
}

async fn list_services(&self) -> roslibrust_common::Result<Vec<ServiceInfo>> {
let (client, caller_id) = {
let node = self.inner.node.lock().await;
(node.client.clone(), node.node_name.to_string())
};

let service_names = client
.get_system_state()
.await
.map_err(NodeError::from)?
.service_names();

let mut services = Vec::with_capacity(service_names.len());
for service_name in service_names {
let service_uri = client
.lookup_service(&service_name)
.await
.map_err(NodeError::from)?;
let type_name =
crate::tcpros::probe_service_type(&caller_id, &service_name, &service_uri)
.await
.map_err(NodeError::from)?;
services.push(ServiceInfo {
name: service_name,
type_name,
});
}

services.sort_by(|a, b| a.name.cmp(&b.name));
Ok(services)
}
}
43 changes: 43 additions & 0 deletions roslibrust_ros1/src/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,49 @@ pub async fn establish_connection(
.map_err(std::io::Error::from)
}

/// Connects to a ROS1 service server only long enough to discover its connection header.
pub async fn probe_service_type(
caller_id: &str,
service_name: &str,
service_uri: &str,
) -> Result<String, std::io::Error> {
use tokio::io::AsyncWriteExt;

let server_uri = service_uri.replace("rosrpc://", "");
let mut stream = TcpStream::connect(&server_uri).await.map_err(|err| {
log::error!(
"Failed to establish TCPROS probe connection to service {service_name} at {server_uri}: {err}"
);
err
})?;

let mut header_data = Vec::with_capacity(256);
WriteBytesExt::write_u32::<LittleEndian>(&mut header_data, 0)?;
for field in [
format!("callerid={caller_id}"),
"md5sum=*".to_string(),
format!("service={service_name}"),
"probe=1".to_string(),
] {
WriteBytesExt::write_u32::<LittleEndian>(&mut header_data, field.len() as u32)?;
std::io::Write::write_all(&mut header_data, field.as_bytes())?;
}

let total_length = (header_data.len() - 4) as u32;
header_data[..4].copy_from_slice(&total_length.to_le_bytes());
stream.write_all(&header_data).await?;

let responded_header = receive_header(&mut stream).await?;
if responded_header.topic_type.is_empty() {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Service {service_name} probe response did not include a type field"),
))
} else {
Ok(responded_header.topic_type)
}
}

pub async fn receive_header_bytes(stream: &mut TcpStream) -> Result<Vec<u8>, std::io::Error> {
// Bring trait def into scope
use tokio::io::AsyncReadExt;
Expand Down
Loading
Loading