diff --git a/test_communication/test/test_publisher_subscriber_serialized.cpp b/test_communication/test/test_publisher_subscriber_serialized.cpp index fb147da5..379938f2 100644 --- a/test_communication/test/test_publisher_subscriber_serialized.cpp +++ b/test_communication/test/test_publisher_subscriber_serialized.cpp @@ -14,6 +14,7 @@ #include #include +#include #include "rclcpp/rclcpp.hpp" #include "rclcpp/serialization.hpp" @@ -21,6 +22,7 @@ #include "rmw/types.h" #include "test_msgs/message_fixtures.hpp" +#include "test_msgs/msg/unbounded_sequences.hpp" #ifdef RMW_IMPLEMENTATION # define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX @@ -90,3 +92,50 @@ TEST_F(CLASSNAME(TestMessageSerialization, RMW_IMPLEMENTATION), serialized_callb } EXPECT_GT(counter, 0u); } + +// Regression test for buffer-aware serialized take. Messages with `uint8[]` +// fields put rmw_fastrtps publishers on a separate "shared CPU channel" +// DataWriter for CPU-only subscribers; rmw_take_serialized_message must +// dispatch onto that path or generic/serialized subscribers (foxglove_bridge, +// ros2 bag record, rclcpp::GenericSubscription) silently get no data. +// +// Subscription options are intentionally left at defaults: the broken path +// is the one taken by every generic/serialized consumer today, which is +// `acceptable_buffer_backends` unset (=> CPU-only buffer-aware subscriber). +TEST_F( + CLASSNAME(TestMessageSerialization, RMW_IMPLEMENTATION), + serialized_callback_buffer_aware_message) +{ + using MessageT = test_msgs::msg::UnboundedSequences; + const std::vector expected_bytes{1, 2, 3, 4}; + + size_t received = 0; + auto serialized_callback = + [&received, &expected_bytes]( + const std::shared_ptr serialized_msg) { + rclcpp::Serialization serializer; + MessageT decoded; + EXPECT_NO_THROW(serializer.deserialize_message(serialized_msg.get(), &decoded)); + EXPECT_EQ(decoded.uint8_values, expected_bytes); + ++received; + }; + + auto node = rclcpp::Node::make_shared("test_publisher_subscriber_serialized_buffer"); + auto subscriber = node->create_subscription( + "test_publisher_subscriber_serialized_buffer_topic", 10, serialized_callback); + auto publisher = node->create_publisher( + "test_publisher_subscriber_serialized_buffer_topic", 10); + rclcpp::executors::SingleThreadedExecutor executor; + executor.add_node(node); + + MessageT msg; + msg.uint8_values = expected_bytes; + + rclcpp::Rate loop_rate(10); + for (auto i = 0u; i < 10; ++i) { + publisher->publish(msg); + executor.spin_some(); + loop_rate.sleep(); + } + EXPECT_GT(received, 0u); +}