From bcd405c961a87296dc50521aa60a9db2319cab31 Mon Sep 17 00:00:00 2001 From: Alvaro Gaona Date: Fri, 13 Feb 2026 22:49:51 +0100 Subject: [PATCH 1/2] fix: store enriched endpoint data with participant default locators in discovery_db When a remote DDS implementation (e.g. CycloneDDS) omits per-endpoint locators in SEDP announcements and relies on the participant's default locators from SPDP, update_subscription() and update_publication() would store the raw data with empty locator lists. The enriched copy (with default locators filled in) was only returned for the immediate notification but never stored back. This caused endpoints created after initial SEDP discovery to get reader/writer proxies with empty locator lists, silently dropping all data. Service calls (single request/reply) failed 100% of the time. The fix builds the enriched data first, then stores the enriched version in external_topic_readers/external_topic_writers so that later lookups via readers_on_topic()/writers_on_topic() return usable locator information. --- src/discovery/discovery_db.rs | 56 +++++++------- tests/late_endpoint_test.rs | 136 ++++++++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+), 26 deletions(-) create mode 100644 tests/late_endpoint_test.rs diff --git a/src/discovery/discovery_db.rs b/src/discovery/discovery_db.rs index 7f8ff2f7..b6753acb 100644 --- a/src/discovery/discovery_db.rs +++ b/src/discovery/discovery_db.rs @@ -424,8 +424,6 @@ impl DiscoveryDB { pub fn update_subscription(&mut self, data: &DiscoveredReaderData) -> DiscoveredReaderData { let guid = data.reader_proxy.remote_reader_guid; - self.external_topic_readers.insert(guid, data.clone()); - // fill in the default locators from participant, in case DRD did not provide // any let default_locator_lists = self @@ -449,7 +447,20 @@ impl DiscoveryDB { } (Vec::default(), Vec::default()) }); - debug!("External reader: {data:?}"); + + // Build enriched data with participant's default locators filled in, so that + // later lookups via readers_on_topic() return usable locator information. + let enriched = DiscoveredReaderData { + reader_proxy: ReaderProxy::from(RtpsReaderProxy::from_discovered_reader_data( + data, + &default_locator_lists.0, + &default_locator_lists.1, + )), + ..data.clone() + }; + + self.external_topic_readers.insert(guid, enriched.clone()); + debug!("External reader: {enriched:?}"); // Now the topic update: let dtd = data.subscription_topic_data.to_topic_data(); @@ -463,31 +474,19 @@ impl DiscoveryDB { // reader update. If there is a DiscoveredVia::Topic record, use QosPolicies // from that record and modify by QoS given in the DRD. - // Return DiscoveredReaderData with possibly updated locators. - DiscoveredReaderData { - reader_proxy: ReaderProxy::from(RtpsReaderProxy::from_discovered_reader_data( - data, - &default_locator_lists.0, - &default_locator_lists.1, - )), - ..data.clone() - } + enriched } // TODO: This is silly. Returns one of the parameters cloned, or None pub fn update_publication(&mut self, data: &DiscoveredWriterData) -> DiscoveredWriterData { let guid = data.writer_proxy.remote_writer_guid; - self - .external_topic_writers - .insert(data.writer_proxy.remote_writer_guid, data.clone()); - // fill in the default locators from participant, in case DRD did not provide // any let default_locator_lists = self .find_participant_proxy(guid.prefix) .map(|pp| { - debug!("Added participant locators to Reader {guid:?}"); + debug!("Added participant locators to Writer {guid:?}"); ( pp.default_unicast_locators.clone(), pp.default_multicast_locators.clone(), @@ -506,7 +505,19 @@ impl DiscoveryDB { (Vec::default(), Vec::default()) }); - debug!("External writer: {data:?}"); + // Build enriched data with participant's default locators filled in, so that + // later lookups via writers_on_topic() return usable locator information. + let enriched = DiscoveredWriterData { + writer_proxy: WriterProxy::from(RtpsWriterProxy::from_discovered_writer_data( + data, + &default_locator_lists.0, + &default_locator_lists.1, + )), + ..data.clone() + }; + + self.external_topic_writers.insert(guid, enriched.clone()); + debug!("External writer: {enriched:?}"); // Now the topic update: let dtd = data.publication_topic_data.to_topic_data(); @@ -516,14 +527,7 @@ impl DiscoveryDB { DiscoveredVia::Publication, ); - DiscoveredWriterData { - writer_proxy: WriterProxy::from(RtpsWriterProxy::from_discovered_writer_data( - data, - &default_locator_lists.0, - &default_locator_lists.1, - )), - ..data.clone() - } + enriched } // This is for local participant updating the topic table diff --git a/tests/late_endpoint_test.rs b/tests/late_endpoint_test.rs new file mode 100644 index 00000000..4ff64d30 --- /dev/null +++ b/tests/late_endpoint_test.rs @@ -0,0 +1,136 @@ +/// Regression test: endpoints created after SEDP discovery must be able to +/// exchange data. Previously, `discovery_db` stored endpoint data without +/// filling in the participant's default locators, so late-created endpoints +/// would get reader/writer proxies with empty locator lists and silently +/// drop all data. +use std::time::{Duration, Instant}; + +use rustdds::{policy, DomainParticipant, QosPolicyBuilder, TopicKind}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Ping { + seq: u32, +} + +#[test] +fn late_writer_can_reach_early_reader() { + // Participant A: creates a reader immediately. + let participant_a = DomainParticipant::new(51).unwrap(); + let qos = QosPolicyBuilder::new() + .reliability(policy::Reliability::Reliable { + max_blocking_time: rustdds::Duration::from_secs(1), + }) + .durability(policy::Durability::Volatile) + .history(policy::History::KeepAll) + .build(); + + let topic_a = participant_a + .create_topic( + "late_endpoint_test_topic".to_string(), + "Ping".to_string(), + &qos, + TopicKind::NoKey, + ) + .unwrap(); + let subscriber = participant_a.create_subscriber(&qos).unwrap(); + let mut reader = subscriber + .create_datareader_no_key_cdr::(&topic_a, None) + .unwrap(); + + // Participant B: wait for SEDP discovery to complete with A, then create a + // late writer. + let participant_b = DomainParticipant::new(51).unwrap(); + std::thread::sleep(Duration::from_secs(3)); + + let topic_b = participant_b + .create_topic( + "late_endpoint_test_topic".to_string(), + "Ping".to_string(), + &qos, + TopicKind::NoKey, + ) + .unwrap(); + let publisher = participant_b.create_publisher(&qos).unwrap(); + let writer = publisher + .create_datawriter_no_key_cdr::(&topic_b, None) + .unwrap(); + + // Wait for the late writer to be matched. + std::thread::sleep(Duration::from_secs(2)); + + // Write data from the late writer. + writer.write(Ping { seq: 42 }, None).unwrap(); + + // Read from A. + let deadline = Instant::now() + Duration::from_secs(5); + while Instant::now() < deadline { + if let Ok(Some(sample)) = reader.take_next_sample() { + assert_eq!(sample.into_value().seq, 42); + return; // success + } + std::thread::sleep(Duration::from_millis(50)); + } + panic!("late writer's data never arrived at the early reader within 5 seconds"); +} + +#[test] +fn late_reader_can_receive_from_early_writer() { + // Participant A: creates a writer immediately. + let participant_a = DomainParticipant::new(52).unwrap(); + let qos = QosPolicyBuilder::new() + .reliability(policy::Reliability::Reliable { + max_blocking_time: rustdds::Duration::from_secs(1), + }) + .durability(policy::Durability::Volatile) + .history(policy::History::KeepAll) + .build(); + + let topic_a = participant_a + .create_topic( + "late_endpoint_test_topic_2".to_string(), + "Ping".to_string(), + &qos, + TopicKind::NoKey, + ) + .unwrap(); + let publisher = participant_a.create_publisher(&qos).unwrap(); + let writer = publisher + .create_datawriter_no_key_cdr::(&topic_a, None) + .unwrap(); + + // Participant B: wait for SEDP discovery to complete with A, then create a + // late reader. + let participant_b = DomainParticipant::new(52).unwrap(); + std::thread::sleep(Duration::from_secs(3)); + + let topic_b = participant_b + .create_topic( + "late_endpoint_test_topic_2".to_string(), + "Ping".to_string(), + &qos, + TopicKind::NoKey, + ) + .unwrap(); + let subscriber = participant_b.create_subscriber(&qos).unwrap(); + let mut reader = subscriber + .create_datareader_no_key_cdr::(&topic_b, None) + .unwrap(); + + // Wait for the late reader to be matched. + std::thread::sleep(Duration::from_secs(2)); + + // Write data from the early writer (after matching). + writer.write(Ping { seq: 99 }, None).unwrap(); + + // Read from B's late reader. + let deadline = Instant::now() + Duration::from_secs(5); + while Instant::now() < deadline { + if let Ok(Some(sample)) = reader.take_next_sample() { + assert_eq!(sample.into_value().seq, 99); + return; // success + } + std::thread::sleep(Duration::from_millis(50)); + } + panic!("early writer's data never arrived at the late reader within 5 seconds"); +} From 6c495aedf7d190f9ef891d4e46fd10058e87b917 Mon Sep 17 00:00:00 2001 From: Alvaro Gaona Date: Fri, 13 Feb 2026 22:49:57 +0100 Subject: [PATCH 2/2] feat: re-export SampleIdentity from crate root SampleIdentity is required to call WriteOptionsBuilder::related_sample_identity() but was not publicly accessible. It lived in structure::rpc which is pub(crate), making the public API method impossible to call from downstream crates. This is needed for ROS 2 service interop with FastDDS, which uses RTPS Inline QoS PID_RELATED_SAMPLE_IDENTITY for service request/reply correlation. --- src/lib.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1905ec2e..f3ab955c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -220,14 +220,12 @@ pub use dds::{ /// CDR. pub use serialization::RepresentationIdentifier; #[doc(inline)] -pub use serialization::{ - CDRDeserializerAdapter, CDRSerializerAdapter, CdrDeserializer, CdrSerializer, -}; +pub use serialization::{CDRDeserializerAdapter, CDRSerializerAdapter, CdrDeserializer, CdrSerializer}; /// Part of RTPS DATA submessage: 4-byte header + serialized data pub use messages::submessages::elements::serialized_payload::SerializedPayload; pub use structure::{ - duration::Duration, entity::RTPSEntity, guid::GUID, sequence_number::SequenceNumber, - time::Timestamp, + duration::Duration, entity::RTPSEntity, guid::GUID, rpc::SampleIdentity, + sequence_number::SequenceNumber, time::Timestamp, }; // re-export from a helper crate /// Helper pacakge to compute the CDR-serialized size of data