From 11e01e9c6900948b52f0e485f429960fc995a109 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 5 May 2026 15:38:30 -0400 Subject: [PATCH] Add in a test for unbounded serialized take. (#592) This is actually handled differently in rmw_fastrtps_cpp now, so we should have a separate test for it. Signed-off-by: Chris Lalancette (cherry picked from commit 1a0d6a0ade40f14d1425101fec024b20112c3656) --- .../test_publisher_subscriber_serialized.cpp | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/test_communication/test/test_publisher_subscriber_serialized.cpp b/test_communication/test/test_publisher_subscriber_serialized.cpp index fb147da5..379938f2 100644 --- a/test_communication/test/test_publisher_subscriber_serialized.cpp +++ b/test_communication/test/test_publisher_subscriber_serialized.cpp @@ -14,6 +14,7 @@ #include #include +#include #include "rclcpp/rclcpp.hpp" #include "rclcpp/serialization.hpp" @@ -21,6 +22,7 @@ #include "rmw/types.h" #include "test_msgs/message_fixtures.hpp" +#include "test_msgs/msg/unbounded_sequences.hpp" #ifdef RMW_IMPLEMENTATION # define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX @@ -90,3 +92,50 @@ TEST_F(CLASSNAME(TestMessageSerialization, RMW_IMPLEMENTATION), serialized_callb } EXPECT_GT(counter, 0u); } + +// Regression test for buffer-aware serialized take. Messages with `uint8[]` +// fields put rmw_fastrtps publishers on a separate "shared CPU channel" +// DataWriter for CPU-only subscribers; rmw_take_serialized_message must +// dispatch onto that path or generic/serialized subscribers (foxglove_bridge, +// ros2 bag record, rclcpp::GenericSubscription) silently get no data. +// +// Subscription options are intentionally left at defaults: the broken path +// is the one taken by every generic/serialized consumer today, which is +// `acceptable_buffer_backends` unset (=> CPU-only buffer-aware subscriber). +TEST_F( + CLASSNAME(TestMessageSerialization, RMW_IMPLEMENTATION), + serialized_callback_buffer_aware_message) +{ + using MessageT = test_msgs::msg::UnboundedSequences; + const std::vector expected_bytes{1, 2, 3, 4}; + + size_t received = 0; + auto serialized_callback = + [&received, &expected_bytes]( + const std::shared_ptr serialized_msg) { + rclcpp::Serialization serializer; + MessageT decoded; + EXPECT_NO_THROW(serializer.deserialize_message(serialized_msg.get(), &decoded)); + EXPECT_EQ(decoded.uint8_values, expected_bytes); + ++received; + }; + + auto node = rclcpp::Node::make_shared("test_publisher_subscriber_serialized_buffer"); + auto subscriber = node->create_subscription( + "test_publisher_subscriber_serialized_buffer_topic", 10, serialized_callback); + auto publisher = node->create_publisher( + "test_publisher_subscriber_serialized_buffer_topic", 10); + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + + MessageT msg; + msg.uint8_values = expected_bytes; + + rclcpp::Rate loop_rate(10); + for (auto i = 0u; i < 10; ++i) { + publisher->publish(msg); + executor.spin_some(); + loop_rate.sleep(); + } + EXPECT_GT(received, 0u); +}