Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions src/discovery/discovery_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,6 @@ impl DiscoveryDB {
pub fn update_subscription(&mut self, data: &DiscoveredReaderData) -> DiscoveredReaderData {
let guid = data.reader_proxy.remote_reader_guid;

self.external_topic_readers.insert(guid, data.clone());

// fill in the default locators from participant, in case DRD did not provide
// any
let default_locator_lists = self
Expand All @@ -449,7 +447,20 @@ impl DiscoveryDB {
}
(Vec::default(), Vec::default())
});
debug!("External reader: {data:?}");

// Build enriched data with participant's default locators filled in, so that
// later lookups via readers_on_topic() return usable locator information.
let enriched = DiscoveredReaderData {
reader_proxy: ReaderProxy::from(RtpsReaderProxy::from_discovered_reader_data(
data,
&default_locator_lists.0,
&default_locator_lists.1,
)),
..data.clone()
};

self.external_topic_readers.insert(guid, enriched.clone());
debug!("External reader: {enriched:?}");

// Now the topic update:
let dtd = data.subscription_topic_data.to_topic_data();
Expand All @@ -463,31 +474,19 @@ impl DiscoveryDB {
// reader update. If there is a DiscoveredVia::Topic record, use QosPolicies
// from that record and modify by QoS given in the DRD.

// Return DiscoveredReaderData with possibly updated locators.
DiscoveredReaderData {
reader_proxy: ReaderProxy::from(RtpsReaderProxy::from_discovered_reader_data(
data,
&default_locator_lists.0,
&default_locator_lists.1,
)),
..data.clone()
}
enriched
}

// TODO: This is silly. Returns one of the parameters cloned, or None
pub fn update_publication(&mut self, data: &DiscoveredWriterData) -> DiscoveredWriterData {
let guid = data.writer_proxy.remote_writer_guid;

self
.external_topic_writers
.insert(data.writer_proxy.remote_writer_guid, data.clone());

// fill in the default locators from participant, in case DRD did not provide
// any
let default_locator_lists = self
.find_participant_proxy(guid.prefix)
.map(|pp| {
debug!("Added participant locators to Reader {guid:?}");
debug!("Added participant locators to Writer {guid:?}");
(
pp.default_unicast_locators.clone(),
pp.default_multicast_locators.clone(),
Expand All @@ -506,7 +505,19 @@ impl DiscoveryDB {
(Vec::default(), Vec::default())
});

debug!("External writer: {data:?}");
// Build enriched data with participant's default locators filled in, so that
// later lookups via writers_on_topic() return usable locator information.
let enriched = DiscoveredWriterData {
writer_proxy: WriterProxy::from(RtpsWriterProxy::from_discovered_writer_data(
data,
&default_locator_lists.0,
&default_locator_lists.1,
)),
..data.clone()
};

self.external_topic_writers.insert(guid, enriched.clone());
debug!("External writer: {enriched:?}");

// Now the topic update:
let dtd = data.publication_topic_data.to_topic_data();
Expand All @@ -516,14 +527,7 @@ impl DiscoveryDB {
DiscoveredVia::Publication,
);

DiscoveredWriterData {
writer_proxy: WriterProxy::from(RtpsWriterProxy::from_discovered_writer_data(
data,
&default_locator_lists.0,
&default_locator_lists.1,
)),
..data.clone()
}
enriched
}

// This is for local participant updating the topic table
Expand Down
8 changes: 3 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,12 @@ pub use dds::{
/// CDR.
pub use serialization::RepresentationIdentifier;
#[doc(inline)]
pub use serialization::{
CDRDeserializerAdapter, CDRSerializerAdapter, CdrDeserializer, CdrSerializer,
};
pub use serialization::{CDRDeserializerAdapter, CDRSerializerAdapter, CdrDeserializer, CdrSerializer};
/// Part of RTPS DATA submessage: 4-byte header + serialized data
pub use messages::submessages::elements::serialized_payload::SerializedPayload;
pub use structure::{
duration::Duration, entity::RTPSEntity, guid::GUID, sequence_number::SequenceNumber,
time::Timestamp,
duration::Duration, entity::RTPSEntity, guid::GUID, rpc::SampleIdentity,
sequence_number::SequenceNumber, time::Timestamp,
};
// re-export from a helper crate
/// Helper pacakge to compute the CDR-serialized size of data
Expand Down
136 changes: 136 additions & 0 deletions tests/late_endpoint_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/// Regression test: endpoints created after SEDP discovery must be able to
/// exchange data. Previously, `discovery_db` stored endpoint data without
/// filling in the participant's default locators, so late-created endpoints
/// would get reader/writer proxies with empty locator lists and silently
/// drop all data.
use std::time::{Duration, Instant};

use rustdds::{policy, DomainParticipant, QosPolicyBuilder, TopicKind};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct Ping {
seq: u32,
}

#[test]
fn late_writer_can_reach_early_reader() {
// Participant A: creates a reader immediately.
let participant_a = DomainParticipant::new(51).unwrap();
let qos = QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: rustdds::Duration::from_secs(1),
})
.durability(policy::Durability::Volatile)
.history(policy::History::KeepAll)
.build();

let topic_a = participant_a
.create_topic(
"late_endpoint_test_topic".to_string(),
"Ping".to_string(),
&qos,
TopicKind::NoKey,
)
.unwrap();
let subscriber = participant_a.create_subscriber(&qos).unwrap();
let mut reader = subscriber
.create_datareader_no_key_cdr::<Ping>(&topic_a, None)
.unwrap();

// Participant B: wait for SEDP discovery to complete with A, then create a
// late writer.
let participant_b = DomainParticipant::new(51).unwrap();
std::thread::sleep(Duration::from_secs(3));

let topic_b = participant_b
.create_topic(
"late_endpoint_test_topic".to_string(),
"Ping".to_string(),
&qos,
TopicKind::NoKey,
)
.unwrap();
let publisher = participant_b.create_publisher(&qos).unwrap();
let writer = publisher
.create_datawriter_no_key_cdr::<Ping>(&topic_b, None)
.unwrap();

// Wait for the late writer to be matched.
std::thread::sleep(Duration::from_secs(2));

// Write data from the late writer.
writer.write(Ping { seq: 42 }, None).unwrap();

// Read from A.
let deadline = Instant::now() + Duration::from_secs(5);
while Instant::now() < deadline {
if let Ok(Some(sample)) = reader.take_next_sample() {
assert_eq!(sample.into_value().seq, 42);
return; // success
}
std::thread::sleep(Duration::from_millis(50));
}
panic!("late writer's data never arrived at the early reader within 5 seconds");
}

#[test]
fn late_reader_can_receive_from_early_writer() {
// Participant A: creates a writer immediately.
let participant_a = DomainParticipant::new(52).unwrap();
let qos = QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: rustdds::Duration::from_secs(1),
})
.durability(policy::Durability::Volatile)
.history(policy::History::KeepAll)
.build();

let topic_a = participant_a
.create_topic(
"late_endpoint_test_topic_2".to_string(),
"Ping".to_string(),
&qos,
TopicKind::NoKey,
)
.unwrap();
let publisher = participant_a.create_publisher(&qos).unwrap();
let writer = publisher
.create_datawriter_no_key_cdr::<Ping>(&topic_a, None)
.unwrap();

// Participant B: wait for SEDP discovery to complete with A, then create a
// late reader.
let participant_b = DomainParticipant::new(52).unwrap();
std::thread::sleep(Duration::from_secs(3));

let topic_b = participant_b
.create_topic(
"late_endpoint_test_topic_2".to_string(),
"Ping".to_string(),
&qos,
TopicKind::NoKey,
)
.unwrap();
let subscriber = participant_b.create_subscriber(&qos).unwrap();
let mut reader = subscriber
.create_datareader_no_key_cdr::<Ping>(&topic_b, None)
.unwrap();

// Wait for the late reader to be matched.
std::thread::sleep(Duration::from_secs(2));

// Write data from the early writer (after matching).
writer.write(Ping { seq: 99 }, None).unwrap();

// Read from B's late reader.
let deadline = Instant::now() + Duration::from_secs(5);
while Instant::now() < deadline {
if let Ok(Some(sample)) = reader.take_next_sample() {
assert_eq!(sample.into_value().seq, 99);
return; // success
}
std::thread::sleep(Duration::from_millis(50));
}
panic!("early writer's data never arrived at the late reader within 5 seconds");
}
Loading