diff --git a/src/discovery/discovery_db.rs b/src/discovery/discovery_db.rs index 7f8ff2f7..b6753acb 100644 --- a/src/discovery/discovery_db.rs +++ b/src/discovery/discovery_db.rs @@ -424,8 +424,6 @@ impl DiscoveryDB { pub fn update_subscription(&mut self, data: &DiscoveredReaderData) -> DiscoveredReaderData { let guid = data.reader_proxy.remote_reader_guid; - self.external_topic_readers.insert(guid, data.clone()); - // fill in the default locators from participant, in case DRD did not provide // any let default_locator_lists = self @@ -449,7 +447,20 @@ impl DiscoveryDB { } (Vec::default(), Vec::default()) }); - debug!("External reader: {data:?}"); + + // Build enriched data with participant's default locators filled in, so that + // later lookups via readers_on_topic() return usable locator information. + let enriched = DiscoveredReaderData { + reader_proxy: ReaderProxy::from(RtpsReaderProxy::from_discovered_reader_data( + data, + &default_locator_lists.0, + &default_locator_lists.1, + )), + ..data.clone() + }; + + self.external_topic_readers.insert(guid, enriched.clone()); + debug!("External reader: {enriched:?}"); // Now the topic update: let dtd = data.subscription_topic_data.to_topic_data(); @@ -463,31 +474,19 @@ impl DiscoveryDB { // reader update. If there is a DiscoveredVia::Topic record, use QosPolicies // from that record and modify by QoS given in the DRD. - // Return DiscoveredReaderData with possibly updated locators. - DiscoveredReaderData { - reader_proxy: ReaderProxy::from(RtpsReaderProxy::from_discovered_reader_data( - data, - &default_locator_lists.0, - &default_locator_lists.1, - )), - ..data.clone() - } + enriched } // TODO: This is silly. Returns one of the parameters cloned, or None pub fn update_publication(&mut self, data: &DiscoveredWriterData) -> DiscoveredWriterData { let guid = data.writer_proxy.remote_writer_guid; - self - .external_topic_writers - .insert(data.writer_proxy.remote_writer_guid, data.clone()); - // fill in the default locators from participant, in case DRD did not provide // any let default_locator_lists = self .find_participant_proxy(guid.prefix) .map(|pp| { - debug!("Added participant locators to Reader {guid:?}"); + debug!("Added participant locators to Writer {guid:?}"); ( pp.default_unicast_locators.clone(), pp.default_multicast_locators.clone(), @@ -506,7 +505,19 @@ impl DiscoveryDB { (Vec::default(), Vec::default()) }); - debug!("External writer: {data:?}"); + // Build enriched data with participant's default locators filled in, so that + // later lookups via writers_on_topic() return usable locator information. + let enriched = DiscoveredWriterData { + writer_proxy: WriterProxy::from(RtpsWriterProxy::from_discovered_writer_data( + data, + &default_locator_lists.0, + &default_locator_lists.1, + )), + ..data.clone() + }; + + self.external_topic_writers.insert(guid, enriched.clone()); + debug!("External writer: {enriched:?}"); // Now the topic update: let dtd = data.publication_topic_data.to_topic_data(); @@ -516,14 +527,7 @@ impl DiscoveryDB { DiscoveredVia::Publication, ); - DiscoveredWriterData { - writer_proxy: WriterProxy::from(RtpsWriterProxy::from_discovered_writer_data( - data, - &default_locator_lists.0, - &default_locator_lists.1, - )), - ..data.clone() - } + enriched } // This is for local participant updating the topic table diff --git a/src/lib.rs b/src/lib.rs index 1905ec2e..f3ab955c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -220,14 +220,12 @@ pub use dds::{ /// CDR. pub use serialization::RepresentationIdentifier; #[doc(inline)] -pub use serialization::{ - CDRDeserializerAdapter, CDRSerializerAdapter, CdrDeserializer, CdrSerializer, -}; +pub use serialization::{CDRDeserializerAdapter, CDRSerializerAdapter, CdrDeserializer, CdrSerializer}; /// Part of RTPS DATA submessage: 4-byte header + serialized data pub use messages::submessages::elements::serialized_payload::SerializedPayload; pub use structure::{ - duration::Duration, entity::RTPSEntity, guid::GUID, sequence_number::SequenceNumber, - time::Timestamp, + duration::Duration, entity::RTPSEntity, guid::GUID, rpc::SampleIdentity, + sequence_number::SequenceNumber, time::Timestamp, }; // re-export from a helper crate /// Helper pacakge to compute the CDR-serialized size of data diff --git a/tests/late_endpoint_test.rs b/tests/late_endpoint_test.rs new file mode 100644 index 00000000..4ff64d30 --- /dev/null +++ b/tests/late_endpoint_test.rs @@ -0,0 +1,136 @@ +/// Regression test: endpoints created after SEDP discovery must be able to +/// exchange data. Previously, `discovery_db` stored endpoint data without +/// filling in the participant's default locators, so late-created endpoints +/// would get reader/writer proxies with empty locator lists and silently +/// drop all data. +use std::time::{Duration, Instant}; + +use rustdds::{policy, DomainParticipant, QosPolicyBuilder, TopicKind}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +struct Ping { + seq: u32, +} + +#[test] +fn late_writer_can_reach_early_reader() { + // Participant A: creates a reader immediately. + let participant_a = DomainParticipant::new(51).unwrap(); + let qos = QosPolicyBuilder::new() + .reliability(policy::Reliability::Reliable { + max_blocking_time: rustdds::Duration::from_secs(1), + }) + .durability(policy::Durability::Volatile) + .history(policy::History::KeepAll) + .build(); + + let topic_a = participant_a + .create_topic( + "late_endpoint_test_topic".to_string(), + "Ping".to_string(), + &qos, + TopicKind::NoKey, + ) + .unwrap(); + let subscriber = participant_a.create_subscriber(&qos).unwrap(); + let mut reader = subscriber + .create_datareader_no_key_cdr::(&topic_a, None) + .unwrap(); + + // Participant B: wait for SEDP discovery to complete with A, then create a + // late writer. + let participant_b = DomainParticipant::new(51).unwrap(); + std::thread::sleep(Duration::from_secs(3)); + + let topic_b = participant_b + .create_topic( + "late_endpoint_test_topic".to_string(), + "Ping".to_string(), + &qos, + TopicKind::NoKey, + ) + .unwrap(); + let publisher = participant_b.create_publisher(&qos).unwrap(); + let writer = publisher + .create_datawriter_no_key_cdr::(&topic_b, None) + .unwrap(); + + // Wait for the late writer to be matched. + std::thread::sleep(Duration::from_secs(2)); + + // Write data from the late writer. + writer.write(Ping { seq: 42 }, None).unwrap(); + + // Read from A. + let deadline = Instant::now() + Duration::from_secs(5); + while Instant::now() < deadline { + if let Ok(Some(sample)) = reader.take_next_sample() { + assert_eq!(sample.into_value().seq, 42); + return; // success + } + std::thread::sleep(Duration::from_millis(50)); + } + panic!("late writer's data never arrived at the early reader within 5 seconds"); +} + +#[test] +fn late_reader_can_receive_from_early_writer() { + // Participant A: creates a writer immediately. + let participant_a = DomainParticipant::new(52).unwrap(); + let qos = QosPolicyBuilder::new() + .reliability(policy::Reliability::Reliable { + max_blocking_time: rustdds::Duration::from_secs(1), + }) + .durability(policy::Durability::Volatile) + .history(policy::History::KeepAll) + .build(); + + let topic_a = participant_a + .create_topic( + "late_endpoint_test_topic_2".to_string(), + "Ping".to_string(), + &qos, + TopicKind::NoKey, + ) + .unwrap(); + let publisher = participant_a.create_publisher(&qos).unwrap(); + let writer = publisher + .create_datawriter_no_key_cdr::(&topic_a, None) + .unwrap(); + + // Participant B: wait for SEDP discovery to complete with A, then create a + // late reader. + let participant_b = DomainParticipant::new(52).unwrap(); + std::thread::sleep(Duration::from_secs(3)); + + let topic_b = participant_b + .create_topic( + "late_endpoint_test_topic_2".to_string(), + "Ping".to_string(), + &qos, + TopicKind::NoKey, + ) + .unwrap(); + let subscriber = participant_b.create_subscriber(&qos).unwrap(); + let mut reader = subscriber + .create_datareader_no_key_cdr::(&topic_b, None) + .unwrap(); + + // Wait for the late reader to be matched. + std::thread::sleep(Duration::from_secs(2)); + + // Write data from the early writer (after matching). + writer.write(Ping { seq: 99 }, None).unwrap(); + + // Read from B's late reader. + let deadline = Instant::now() + Duration::from_secs(5); + while Instant::now() < deadline { + if let Ok(Some(sample)) = reader.take_next_sample() { + assert_eq!(sample.into_value().seq, 99); + return; // success + } + std::thread::sleep(Duration::from_millis(50)); + } + panic!("early writer's data never arrived at the late reader within 5 seconds"); +}