Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions test_communication/test/test_publisher_subscriber_serialized.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

#include <gtest/gtest.h>
#include <memory>
#include <vector>

#include "rclcpp/rclcpp.hpp"
#include "rclcpp/serialization.hpp"

#include "rmw/types.h"

#include "test_msgs/message_fixtures.hpp"
#include "test_msgs/msg/unbounded_sequences.hpp"

#ifdef RMW_IMPLEMENTATION
# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX
Expand Down Expand Up @@ -90,3 +92,50 @@ TEST_F(CLASSNAME(TestMessageSerialization, RMW_IMPLEMENTATION), serialized_callb
}
EXPECT_GT(counter, 0u);
}

// Regression test for buffer-aware serialized take. Messages with `uint8[]`
// fields put rmw_fastrtps publishers on a separate "shared CPU channel"
// DataWriter for CPU-only subscribers; rmw_take_serialized_message must
// dispatch onto that path or generic/serialized subscribers (foxglove_bridge,
// ros2 bag record, rclcpp::GenericSubscription) silently get no data.
//
// Subscription options are intentionally left at defaults: the broken path
// is the one taken by every generic/serialized consumer today, which is
// `acceptable_buffer_backends` unset (=> CPU-only buffer-aware subscriber).
TEST_F(
CLASSNAME(TestMessageSerialization, RMW_IMPLEMENTATION),
serialized_callback_buffer_aware_message)
{
using MessageT = test_msgs::msg::UnboundedSequences;
const std::vector<uint8_t> expected_bytes{1, 2, 3, 4};

size_t received = 0;
auto serialized_callback =
[&received, &expected_bytes](
const std::shared_ptr<const rclcpp::SerializedMessage> serialized_msg) {
rclcpp::Serialization<MessageT> serializer;
MessageT decoded;
EXPECT_NO_THROW(serializer.deserialize_message(serialized_msg.get(), &decoded));
EXPECT_EQ(decoded.uint8_values, expected_bytes);
++received;
};

auto node = rclcpp::Node::make_shared("test_publisher_subscriber_serialized_buffer");
auto subscriber = node->create_subscription<MessageT>(
"test_publisher_subscriber_serialized_buffer_topic", 10, serialized_callback);
auto publisher = node->create_publisher<MessageT>(
"test_publisher_subscriber_serialized_buffer_topic", 10);
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);

MessageT msg;
msg.uint8_values = expected_bytes;

rclcpp::Rate loop_rate(10);
for (auto i = 0u; i < 10; ++i) {
publisher->publish(msg);
executor.spin_some();
loop_rate.sleep();
}
EXPECT_GT(received, 0u);
}