Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions data_tamer_cpp/include/data_tamer/sinks/ros2_publisher_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,61 @@
#include "data_tamer_msgs/msg/schemas.hpp"
#include "data_tamer_msgs/msg/snapshot.hpp"
#include <unordered_map>
#include <type_traits>
#include <rclcpp/rclcpp.hpp>
#include <rclcpp_lifecycle/lifecycle_node.hpp>
#include <rclcpp/node_interfaces/node_interfaces.hpp>
#include <rclcpp/node_interfaces/node_topics_interface.hpp>

namespace DataTamer
{

using PublisherNodeInterfaces =
rclcpp::node_interfaces::NodeInterfaces<rclcpp::node_interfaces::NodeTopicsInterface>;

class ROS2PublisherSink : public DataSinkBase
{
public:
ROS2PublisherSink(std::shared_ptr<rclcpp::Node> node, const std::string& topic_prefix);

ROS2PublisherSink(std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node,
const std::string& topic_prefix);

template <typename NodeT>
void create_publishers(NodeT& node, const std::string& topic_prefix)
ROS2PublisherSink(NodeT&& nodelike, const std::string& topic_prefix)
: node_interface_(normalize_node(nodelike))
{
create_publishers(topic_prefix);
}

void create_publishers(const std::string& topic_prefix)
{
rclcpp::QoS schemas_qos{ rclcpp::KeepAll() };
schemas_qos.reliable();
schemas_qos.transient_local(); // latch

const rclcpp::QoS data_qos{ rclcpp::KeepAll() };

schema_publisher_ = node->template create_publisher<data_tamer_msgs::msg::Schemas>(
topic_prefix + "/schemas", schemas_qos);
data_publisher_ = node->template create_publisher<data_tamer_msgs::msg::Snapshot>(
topic_prefix + "/data", data_qos);
schema_publisher_ = rclcpp::create_publisher<data_tamer_msgs::msg::Schemas>(
node_interface_, topic_prefix + "/schemas", schemas_qos);
data_publisher_ = rclcpp::create_publisher<data_tamer_msgs::msg::Snapshot>(
node_interface_, topic_prefix + "/data", data_qos);
}

void addChannel(const std::string& name, const Schema& schema) override;

bool storeSnapshot(const Snapshot& snapshot) override;

private:
template <typename NodeT>
static PublisherNodeInterfaces normalize_node(NodeT&& nodelike)
{
using D = std::decay_t<NodeT>;

if constexpr(std::is_same_v<D, PublisherNodeInterfaces>)
return nodelike;
else if constexpr(std::is_same_v<D, std::shared_ptr<rclcpp::Node>> ||
std::is_same_v<D, std::shared_ptr<rclcpp_lifecycle::LifecycleNode>>)
return PublisherNodeInterfaces(*nodelike);
else
return PublisherNodeInterfaces(nodelike);
}

std::unordered_map<std::string, Schema> schemas_;
Mutex schema_mutex_;

Expand All @@ -47,6 +68,9 @@ class ROS2PublisherSink : public DataSinkBase

bool schema_changed_ = true;
data_tamer_msgs::msg::Snapshot data_msg_;

// ---- Stored node façade ----
PublisherNodeInterfaces node_interface_;
};

} // namespace DataTamer
12 changes: 0 additions & 12 deletions data_tamer_cpp/src/sinks/ros2_publisher_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,6 @@

namespace DataTamer
{
ROS2PublisherSink::ROS2PublisherSink(std::shared_ptr<rclcpp::Node> node,
const std::string& topic_prefix)
{
create_publishers(node, topic_prefix);
}

ROS2PublisherSink::ROS2PublisherSink(
std::shared_ptr<rclcpp_lifecycle::LifecycleNode> node,
const std::string& topic_prefix)
{
create_publishers(node, topic_prefix);
}

void ROS2PublisherSink::addChannel(const std::string& channel_name, const Schema& schema)
{
Expand Down
2 changes: 2 additions & 0 deletions data_tamer_cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ if(gtest_vendor_FOUND AND ament_cmake_gtest_FOUND)
dt_tests.cpp
custom_types_tests.cpp
parser_tests.cpp
ros2_publisher_tests.cpp
trait_tests.cpp)

target_include_directories(datatamer_test
Expand All @@ -28,6 +29,7 @@ else()
add_executable(datatamer_test
dt_tests.cpp
custom_types_tests.cpp
ros2_publisher_tests.cpp
parser_tests.cpp)
gtest_discover_tests(datatamer_test DISCOVERY_MODE PRE_TEST)

Expand Down
93 changes: 93 additions & 0 deletions data_tamer_cpp/tests/ros2_publisher_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#include "data_tamer/data_tamer.hpp"
#include "data_tamer/sinks/dummy_sink.hpp"
#include "data_tamer/sinks/ros2_publisher_sink.hpp"

#include <gtest/gtest.h>

#include <filesystem>
#include <variant>
#include <string>
#include <thread>

using namespace DataTamer;

TEST(DataTamerROS2Publisher, SharedPointer)
{
auto node = std::make_shared<rclcpp::Node>("test_datatamer_shared_pointer");
auto ros2_sink = std::make_shared<ROS2PublisherSink>(node, "test_shared_pointer");

auto channel = ChannelsRegistry::Global().getChannel("channel_shared_pointer");

channel->addDataSink(ros2_sink);

double const value = 1.;
auto id_value = channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());
}

TEST(DataTamerROS2Publisher, SharedPointerLifeCycle)
{
auto lifecycle_node = std::make_shared<rclcpp_lifecycle::LifecycleNode>("test_"
"datatamer_"
"shared_"
"pointer_"
"lifecycle");
auto ros2_sink = std::make_shared<ROS2PublisherSink>(lifecycle_node, "test_shared_"
"pointer_"
"lifecycle");

auto channel = ChannelsRegistry::Global().getChannel("channel_shared_pointer_"
"lifecycle");

channel->addDataSink(ros2_sink);

double const value = 1.;
auto id_value = channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());
}

TEST(DataTamerROS2Publisher, Dereference)
{
auto node = std::make_shared<rclcpp::Node>("test_datatamer_dereference");
auto ros2_sink = std::make_shared<ROS2PublisherSink>(*node, "test_dereference");

auto channel = ChannelsRegistry::Global().getChannel("channel_dereference");

channel->addDataSink(ros2_sink);

double const value = 1.;
auto id_value = channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());
}

TEST(DataTamerROS2Publisher, DereferenceLifeCycle)
{
auto lifecycle_node = std::make_shared<rclcpp_lifecycle::LifecycleNode>("test_"
"datatamer_"
"dereference_"
"lifecycle");
auto ros2_sink = std::make_shared<ROS2PublisherSink>(*lifecycle_node, "test_"
"dereference_"
"lifecycle");

auto channel = ChannelsRegistry::Global().getChannel("channel_dereference_lifecycle");

channel->addDataSink(ros2_sink);

double const value = 1.;
auto id_value = channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());
}

int main(int argc, char** argv)
{
rclcpp::init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
rclcpp::shutdown();
return ret;
}
Loading