From 9bf0de1345fc49f0bf42548c6201abc3657cbbf8 Mon Sep 17 00:00:00 2001 From: Eric Flumerfelt Date: Mon, 1 Dec 2025 16:11:13 -0600 Subject: [PATCH 1/5] Have the ZmqSender create multiple Zmq sockets for local endpoints, load balancing them by bytes sent. --- plugins/ZmqSender.cpp | 155 +++++++++++++++++++++++------------- test/apps/zmq_send.cpp | 2 +- unittest/ZmqPubSub_test.cxx | 17 ++-- unittest/ZmqSender_test.cxx | 29 ++++--- 4 files changed, 125 insertions(+), 78 deletions(-) diff --git a/plugins/ZmqSender.cpp b/plugins/ZmqSender.cpp index abcc2d1..4172b7d 100644 --- a/plugins/ZmqSender.cpp +++ b/plugins/ZmqSender.cpp @@ -11,6 +11,7 @@ #include "ipm/ZmqContext.hpp" #include "logging/Logging.hpp" +#include "utilities/ZmqUri.hpp" #include "zmq.hpp" #include @@ -20,70 +21,54 @@ namespace dunedaq::ipm { class ZmqSender : public Sender { public: - ZmqSender() - : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::push) + struct Sockinfo_t { - } + zmq::socket_t socket; + size_t bytes_sent{ 0 }; + std::string connection_string; + }; + + ZmqSender() {} ~ZmqSender() { // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit - if (m_connection_string != "" && m_socket_connected) { - try { - m_socket.disconnect(m_connection_string); - m_socket_connected = false; - } catch (zmq::error_t const& err) { - ers::error(ZmqOperationError(ERS_HERE, "disconnect", "send", err.what(), m_connection_string)); + if (can_send()) { + for (auto& sock : m_sockets) { + try { + sock->socket.disconnect(sock->connection_string); + sock->socket.close(); + } catch (zmq::error_t const& err) { + ers::error(ZmqOperationError(ERS_HERE, "disconnect", "send", err.what(), sock->connection_string)); + } } + m_sockets.clear(); } - m_socket.close(); } - bool can_send() const noexcept override { return m_socket_connected; } + bool can_send() const noexcept override { return m_sockets.size() > 0; } + std::string connect_for_sends(const nlohmann::json& connection_info) override { - try { - m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send - } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, - "set timeout", - "send", - err.what(), - connection_info.value("connection_string", "inproc://default")); - } - + auto send_endpoints = connection_info.value>("send_endpoints", {}); auto hwm = connection_info.value("capacity", 0); - if (hwm > 0) { - try { - m_socket.set(zmq::sockopt::sndhwm, hwm); - } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, - "set hwm", - "send", - err.what(), - connection_info.value("connection_string", "inproc://default")); + m_connection_string = connection_info.value("connection_string", "inproc://default"); + + if (send_endpoints.size() > 0) { + auto base_uri = utilities::ZmqUri(m_connection_string); + for (auto& endpoint : send_endpoints) { + auto connection_uri = base_uri; + + if (endpoint.find(":") != std::string::npos) { + connection_uri.endpoint_host = endpoint.substr(0, endpoint.find(":")); + connection_uri.endpoint_port = endpoint.substr(endpoint.find(":") + 1); + } else { + connection_uri.endpoint_host = endpoint; + } + create_and_connect_socket(connection_uri, hwm); } - } - - auto connection_string = connection_info.value("connection_string", "inproc://default"); - - TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string; - try { - m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections - } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, "set immediate mode", "send", err.what(), connection_string); - } - - try { - m_socket.connect(connection_string); - m_connection_string = m_socket.get(zmq::sockopt::last_endpoint); - m_socket_connected = true; - } catch (zmq::error_t const& err) { - ers::error(ZmqOperationError(ERS_HERE, "connect", "send", err.what(), connection_string)); - } - - if (!m_socket_connected) { - throw ZmqOperationError(ERS_HERE, "connect", "send", "Operation failed for all resolved connection strings", ""); + } else { + create_and_connect_socket(utilities::ZmqUri(m_connection_string), hwm); } return m_connection_string; } @@ -99,11 +84,14 @@ class ZmqSender : public Sender << "Endpoint " << m_connection_string << ": Starting send of " << N << " bytes"; auto start_time = std::chrono::steady_clock::now(); zmq::send_result_t res{}; + + auto socket = get_socket(); + do { zmq::message_t topic_msg(topic.c_str(), topic.size()); try { - res = m_socket.send(topic_msg, zmq::send_flags::sndmore); + res = socket->socket.send(topic_msg, zmq::send_flags::sndmore); } catch (zmq::error_t const& err) { throw ZmqSendError(ERS_HERE, err.what(), topic.size(), topic); } @@ -115,7 +103,8 @@ class ZmqSender : public Sender zmq::message_t msg(message, N); try { - res = m_socket.send(msg, zmq::send_flags::none); + res = socket->socket.send(msg, zmq::send_flags::none); + socket->bytes_sent += N; } catch (zmq::error_t const& err) { throw ZmqSendError(ERS_HERE, err.what(), N, topic); } @@ -135,9 +124,65 @@ class ZmqSender : public Sender } private: - zmq::socket_t m_socket; + void create_and_connect_socket(utilities::ZmqUri connection_uri, int capacity) + { + auto conn_uri = connection_uri.to_string(); + std::shared_ptr info = std::make_shared(); + info->socket = zmq::socket_t(ZmqContext::instance().GetContext(), zmq::socket_type::push); + info->bytes_sent = 0; + info->connection_string = conn_uri; + + try { + info->socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send + } catch (zmq::error_t const& err) { + throw ZmqOperationError(ERS_HERE, "set timeout", "send", err.what(), conn_uri); + } + + if (capacity > 0) { + try { + info->socket.set(zmq::sockopt::sndhwm, capacity); + } catch (zmq::error_t const& err) { + throw ZmqOperationError(ERS_HERE, "set hwm", "send", err.what(), conn_uri); + } + } + + TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << conn_uri; + try { + info->socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections + } catch (zmq::error_t const& err) { + throw ZmqOperationError(ERS_HERE, "set immediate mode", "send", err.what(), conn_uri); + } + + try { + info->socket.connect(conn_uri); + } catch (zmq::error_t const& err) { + throw ZmqOperationError(ERS_HERE, "connect", "send", err.what(), conn_uri); + } + + m_sockets.push_back(info); + } + + std::shared_ptr get_socket() + { + // Typical case, so don't iterate + if (m_sockets.size() == 1) { + return m_sockets.front(); + } + size_t min_sent = -1; + std::shared_ptr sock = nullptr; + + for (auto& sock_info : m_sockets) { + if (sock_info->bytes_sent < min_sent) { + min_sent = sock_info->bytes_sent; + sock = sock_info; + } + } + + return sock; + } + + std::vector> m_sockets; std::string m_connection_string; - bool m_socket_connected{ false }; }; } // namespace dunedaq::ipm diff --git a/test/apps/zmq_send.cpp b/test/apps/zmq_send.cpp index e490837..78787bb 100644 --- a/test/apps/zmq_send.cpp +++ b/test/apps/zmq_send.cpp @@ -26,7 +26,7 @@ main(int argc, char* argv[]) uint32_t npackets = 1; // NOLINT(build/unsigned) size_t packetSize = 100; size_t interval = 0; - std::string conString = "tcp://127.0.0.1:12345"; + std::string conString = "tcp://127.0.0.2:*;127.0.0.1:12345"; int nthreads = 1; uint32_t id = 0; // NOLINT(build/unsigned) diff --git a/unittest/ZmqPubSub_test.cxx b/unittest/ZmqPubSub_test.cxx index b0c7417..8230585 100644 --- a/unittest/ZmqPubSub_test.cxx +++ b/unittest/ZmqPubSub_test.cxx @@ -40,7 +40,7 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest) BOOST_REQUIRE(!the_sender->can_send()); nlohmann::json config_json; - config_json["connection_string"] = "inproc://default"; + config_json["connection_string"] = "inproc://sendreceive"; the_sender->connect_for_sends(config_json); the_receiver->connect_for_receives(config_json); @@ -59,7 +59,7 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest) [&](dunedaq::ipm::ReceiveTimeoutExpired) { return elapsed_time_milliseconds(before_recv) >= 100; }); the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic"); - auto response = the_receiver->receive(Receiver::s_block); + auto response = the_receiver->receive(std::chrono::milliseconds(100)); BOOST_REQUIRE_EQUAL(response.data.size(), 4); BOOST_REQUIRE_EQUAL(response.data[0], 'T'); BOOST_REQUIRE_EQUAL(response.data[1], 'E'); @@ -86,7 +86,7 @@ BOOST_AUTO_TEST_CASE(CallbackTest) BOOST_REQUIRE(!the_sender->can_send()); nlohmann::json config_json; - config_json["connection_string"] = "inproc://default"; + config_json["connection_string"] = "inproc://callback"; the_sender->connect_for_sends(config_json); the_receiver->connect_for_receives(config_json); @@ -115,9 +115,8 @@ BOOST_AUTO_TEST_CASE(CallbackTest) message_received = false; test_data = { 'A', 'N', 'O', 'T', 'H', 'E', 'R', ' ', 'T', 'E', 'S', 'T' }; the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic"); - while (!message_received.load()) { - usleep(15000); - } + usleep(100000); + BOOST_REQUIRE_EQUAL(message_received, true); message_received = false; test_data = { 'A', ' ', 'T', 'H', 'I', 'R', 'D', ' ', 'T', 'E', 'S', 'T' }; @@ -132,7 +131,7 @@ BOOST_AUTO_TEST_CASE(CallbackTest) usleep(100000); BOOST_REQUIRE_EQUAL(message_received, false); - auto response = the_receiver->receive(Receiver::s_block); + auto response = the_receiver->receive(std::chrono::milliseconds(1000)); BOOST_REQUIRE_EQUAL(response.data.size(), test_data.size()); } @@ -156,7 +155,7 @@ BOOST_AUTO_TEST_CASE(MultiplePublishers) std::vector test_data{ 'T', 'E', 'S', 'T' }; first_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic"); - auto response = the_subscriber->receive(Receiver::s_block); + auto response = the_subscriber->receive(std::chrono::milliseconds(1000)); BOOST_REQUIRE_EQUAL(response.data.size(), 4); BOOST_REQUIRE_EQUAL(response.data[0], 'T'); BOOST_REQUIRE_EQUAL(response.data[1], 'E'); @@ -164,7 +163,7 @@ BOOST_AUTO_TEST_CASE(MultiplePublishers) BOOST_REQUIRE_EQUAL(response.data[3], 'T'); second_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic"); - auto response2 = the_subscriber->receive(Receiver::s_block); + auto response2 = the_subscriber->receive(std::chrono::milliseconds(1000)); BOOST_REQUIRE_EQUAL(response2.data.size(), 4); BOOST_REQUIRE_EQUAL(response2.data[0], 'T'); BOOST_REQUIRE_EQUAL(response2.data[1], 'E'); diff --git a/unittest/ZmqSender_test.cxx b/unittest/ZmqSender_test.cxx index 3b2af34..b6407ed 100644 --- a/unittest/ZmqSender_test.cxx +++ b/unittest/ZmqSender_test.cxx @@ -9,6 +9,8 @@ #include "ipm/Sender.hpp" #include "ipm/ZmqContext.hpp" +#include "utilities/Issues.hpp" + #define BOOST_TEST_MODULE ZmqSender_test // NOLINT #include "boost/test/unit_test.hpp" @@ -23,34 +25,35 @@ BOOST_AUTO_TEST_SUITE(ZmqSender_test) BOOST_AUTO_TEST_CASE(BasicTests) { auto the_sender = make_ipm_sender("ZmqSender"); - BOOST_REQUIRE(the_sender != nullptr); - BOOST_REQUIRE(!the_sender->can_send()); + BOOST_CHECK(the_sender != nullptr); + BOOST_CHECK(!the_sender->can_send()); } BOOST_AUTO_TEST_CASE(Errors) { auto the_sender = make_ipm_sender("ZmqSender"); - BOOST_REQUIRE(the_sender != nullptr); - BOOST_REQUIRE(!the_sender->can_send()); + BOOST_CHECK(the_sender != nullptr); + BOOST_CHECK(!the_sender->can_send()); nlohmann::json config_json; config_json["connection_string"] = "not a uri"; - BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config_json), ZmqOperationError, [&](ZmqOperationError e) { - return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos; + BOOST_CHECK_EXCEPTION( + the_sender->connect_for_sends(config_json), dunedaq::utilities::InvalidUri, [&](dunedaq::utilities::InvalidUri e) { + return std::string(e.what()).find("not a uri") != std::string::npos; }); - BOOST_REQUIRE(!the_sender->can_send()); + BOOST_CHECK(!the_sender->can_send()); config_json["connection_string"] = "tcp://thishostddoesnotexist"; - BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config_json), ZmqOperationError, [&](ZmqOperationError e) { - return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos; + BOOST_CHECK_EXCEPTION(the_sender->connect_for_sends(config_json), ZmqOperationError, [&](ZmqOperationError e) { + return std::string(e.what()).find("thishostddoesnotexist") != std::string::npos; }); - BOOST_REQUIRE(!the_sender->can_send()); + BOOST_CHECK(!the_sender->can_send()); config_json["connection_string"] = "badproto://default"; - BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config_json), ZmqOperationError, [&](ZmqOperationError e) { - return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos; + BOOST_CHECK_EXCEPTION(the_sender->connect_for_sends(config_json), ZmqOperationError, [&](ZmqOperationError e) { + return std::string(e.what()).find("badproto") != std::string::npos; }); - BOOST_REQUIRE(!the_sender->can_send()); + BOOST_CHECK(!the_sender->can_send()); } BOOST_AUTO_TEST_SUITE_END() From 8638f697e4671e88e08184fc6953145b79250cf8 Mon Sep 17 00:00:00 2001 From: Eric Flumerfelt Date: Wed, 14 Jan 2026 14:25:12 -0600 Subject: [PATCH 2/5] Run dbt-clang-format --- include/ipm/ZmqContext.hpp | 9 +++++---- plugins/ZmqPublisher.cpp | 7 +++++-- unittest/ZmqSender_test.cxx | 8 ++++---- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/include/ipm/ZmqContext.hpp b/include/ipm/ZmqContext.hpp index 9cce11d..f112220 100644 --- a/include/ipm/ZmqContext.hpp +++ b/include/ipm/ZmqContext.hpp @@ -152,10 +152,11 @@ class ZmqContext set_context_maxsockets(s_minimum_sockets); } } - ~ZmqContext() { - TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Closing ZMQ Context"; - m_context.close(); - TLOG_DEBUG(TLVL_ZMQCONTEXT) << "ZMQ Context closed"; + ~ZmqContext() + { + TLOG_DEBUG(TLVL_ZMQCONTEXT) << "Closing ZMQ Context"; + m_context.close(); + TLOG_DEBUG(TLVL_ZMQCONTEXT) << "ZMQ Context closed"; } zmq::context_t m_context; static constexpr int s_minimum_sockets = 16636; diff --git a/plugins/ZmqPublisher.cpp b/plugins/ZmqPublisher.cpp index 0887d95..9ac2765 100644 --- a/plugins/ZmqPublisher.cpp +++ b/plugins/ZmqPublisher.cpp @@ -34,9 +34,12 @@ class ZmqPublisher : public Sender TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << "Setting socket HWM to zero"; m_socket.set(zmq::sockopt::sndhwm, 1); - TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) << "Waiting up to 10s for socket to become writable before disconnecting"; + TLOG_DEBUG(TLVL_ZMQPUBLISHER_DESTRUCTOR) + << "Waiting up to 10s for socket to become writable before disconnecting"; auto start_time = std::chrono::steady_clock::now(); - while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() < 10000) { + while ( + std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() < + 10000) { auto events = m_socket.get(zmq::sockopt::events); if ((events & ZMQ_POLLOUT) != 0) { break; diff --git a/unittest/ZmqSender_test.cxx b/unittest/ZmqSender_test.cxx index b6407ed..c7d7203 100644 --- a/unittest/ZmqSender_test.cxx +++ b/unittest/ZmqSender_test.cxx @@ -40,16 +40,16 @@ BOOST_AUTO_TEST_CASE(Errors) config_json["connection_string"] = "not a uri"; BOOST_CHECK_EXCEPTION( the_sender->connect_for_sends(config_json), dunedaq::utilities::InvalidUri, [&](dunedaq::utilities::InvalidUri e) { - return std::string(e.what()).find("not a uri") != std::string::npos; - }); + return std::string(e.what()).find("not a uri") != std::string::npos; + }); BOOST_CHECK(!the_sender->can_send()); - + config_json["connection_string"] = "tcp://thishostddoesnotexist"; BOOST_CHECK_EXCEPTION(the_sender->connect_for_sends(config_json), ZmqOperationError, [&](ZmqOperationError e) { return std::string(e.what()).find("thishostddoesnotexist") != std::string::npos; }); BOOST_CHECK(!the_sender->can_send()); - + config_json["connection_string"] = "badproto://default"; BOOST_CHECK_EXCEPTION(the_sender->connect_for_sends(config_json), ZmqOperationError, [&](ZmqOperationError e) { return std::string(e.what()).find("badproto") != std::string::npos; From e5f0f896cfa81b72b2145e63add2aa0b9fdd7904 Mon Sep 17 00:00:00 2001 From: Eric Flumerfelt Date: Tue, 10 Feb 2026 12:54:59 -0600 Subject: [PATCH 3/5] Fix disconnect for multiple sockets --- plugins/ZmqSender.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/ZmqSender.cpp b/plugins/ZmqSender.cpp index 5ab5055..fc884f8 100644 --- a/plugins/ZmqSender.cpp +++ b/plugins/ZmqSender.cpp @@ -37,25 +37,25 @@ class ZmqSender : public Sender for (auto& sock : m_sockets) { try { TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << "Setting socket HWM to zero"; - m_socket.set(zmq::sockopt::sndhwm, 1); + sock->socket.set(zmq::sockopt::sndhwm, 1); TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << "Waiting up to 10s for socket to become writable before disconnecting"; auto start_time = std::chrono::steady_clock::now(); while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time) .count() < 10000) { - auto events = m_socket.get(zmq::sockopt::events); + auto events = sock->socket.get(zmq::sockopt::events); if ((events & ZMQ_POLLOUT) != 0) { break; } usleep(1000); } - TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << "Disconnecting socket from " << m_connection_string; + TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << "Disconnecting socket from " << sock->connection_string; - m_socket.disconnect(m_connection_string); - m_socket_connected = false; + sock->socket.disconnect(sock->connection_string); + sock->socket.close(); } catch (zmq::error_t const& err) { - ers::error(ZmqOperationError(ERS_HERE, "disconnect", "send", err.what(), m_connection_string)); + ers::error(ZmqOperationError(ERS_HERE, "disconnect", "send", err.what(), sock->connection_string)); } } m_sockets.clear(); From e49785adf40125f8c53c13830f8ed76e71c31860 Mon Sep 17 00:00:00 2001 From: Eric Flumerfelt Date: Tue, 10 Feb 2026 14:30:27 -0600 Subject: [PATCH 4/5] Run dbt-clang-format.sh --- include/ipm/Receiver.hpp | 15 ++++++++------- include/ipm/Sender.hpp | 3 ++- include/ipm/ZmqContext.hpp | 6 +++--- plugins/ZmqReceiver.cpp | 11 ++++++----- unittest/ZmqReceiver_test.cxx | 5 +++-- 5 files changed, 22 insertions(+), 18 deletions(-) diff --git a/include/ipm/Receiver.hpp b/include/ipm/Receiver.hpp index 34090b5..91947c1 100644 --- a/include/ipm/Receiver.hpp +++ b/include/ipm/Receiver.hpp @@ -43,7 +43,8 @@ ERS_DECLARE_ISSUE(ipm, ((std::string)connection_name)((int)bytes1)((int)bytes2)) // NOLINT ERS_DECLARE_ISSUE(ipm, ReceiveTimeoutExpired, - connection_name << ": Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)", + connection_name << ": Unable to receive within timeout period (timeout period was " << timeout + << " milliseconds)", ((std::string)connection_name)((int)timeout)) // NOLINT // Reenable coverage collection LCOV_EXCL_STOP } // namespace dunedaq @@ -74,12 +75,12 @@ class Receiver : public opmonlib::MonitorableObject { public: - struct ConnectionInfo - { - std::string connection_name{ "" }; - std::string connection_string{ "" }; - std::vector connection_strings{}; - }; + struct ConnectionInfo + { + std::string connection_name{ "" }; + std::string connection_string{ "" }; + std::vector connection_strings{}; + }; using duration_t = std::chrono::milliseconds; static constexpr duration_t s_block = duration_t::max(); static constexpr duration_t s_no_block = duration_t::zero(); diff --git a/include/ipm/Sender.hpp b/include/ipm/Sender.hpp index bf5f1d6..1dbdd23 100644 --- a/include/ipm/Sender.hpp +++ b/include/ipm/Sender.hpp @@ -43,7 +43,8 @@ ERS_DECLARE_ISSUE(ipm, ((std::string)connection_name)) ERS_DECLARE_ISSUE(ipm, SendTimeoutExpired, - connection_name << ": Unable to send within timeout period (timeout period was " << timeout << " milliseconds)", + connection_name << ": Unable to send within timeout period (timeout period was " << timeout + << " milliseconds)", ((std::string)connection_name)((int)timeout)) // NOLINT // Reenable coverage collection LCOV_EXCL_STOP diff --git a/include/ipm/ZmqContext.hpp b/include/ipm/ZmqContext.hpp index defefcf..78cd201 100644 --- a/include/ipm/ZmqContext.hpp +++ b/include/ipm/ZmqContext.hpp @@ -65,10 +65,10 @@ ERS_DECLARE_ISSUE(ipm, */ ERS_DECLARE_ISSUE( ipm, - ZmqSendError, + ZmqSendError, connection_name << ": An exception occurred while sending " << N << " bytes to " << topic << ": " << what, ((std::string)connection_name)((const char*)what)((int)N)((std::string)topic)) // NOLINT - /// @endcond LCOV_EXCL_STOP + /// @endcond LCOV_EXCL_STOP /** * @brief An ERS Error indicating that an exception was thrown from ZMQ while receiving @@ -80,7 +80,7 @@ ERS_DECLARE_ISSUE(ipm, ZmqReceiveError, connection_name << ": An exception occured while receiving " << which << ": " << what, ((std::string)connection_name)((const char*)what)((const char*)which)) // NOLINT - /// @endcond LCOV_EXCL_STOP + /// @endcond LCOV_EXCL_STOP /** * @brief An ERS Error indicating that an exception was thrown from ZMQ during a subscribe diff --git a/plugins/ZmqReceiver.cpp b/plugins/ZmqReceiver.cpp index 902a4ce..1c3f5dd 100644 --- a/plugins/ZmqReceiver.cpp +++ b/plugins/ZmqReceiver.cpp @@ -53,7 +53,8 @@ class ZmqReceiver : public Receiver try { m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, m_connection_info.connection_name, + throw ZmqOperationError(ERS_HERE, + m_connection_info.connection_name, "set timeout", "receive", err.what(), @@ -94,7 +95,8 @@ class ZmqReceiver : public Receiver m_connection_info.connection_string); } for (auto& connection_string : resolved) { - TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is " << connection_string; + TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is " + << connection_string; try { m_socket.bind(connection_string); m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint); @@ -142,7 +144,7 @@ class ZmqReceiver : public Receiver TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR) << m_connection_info.connection_name << ": Going to receive header"; res = m_socket.recv(hdr); TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR_2) << m_connection_info.connection_name << ": Recv res=" << res.value_or(0) - << " for header (hdr.size() == " << hdr.size() << ")"; + << " for header (hdr.size() == " << hdr.size() << ")"; } catch (zmq::error_t const& err) { throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "header"); } @@ -174,8 +176,7 @@ class ZmqReceiver : public Receiver } TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_END) - << m_connection_info.connection_name << ": Returning output with metadata size " - << output.metadata.size() + << m_connection_info.connection_name << ": Returning output with metadata size " << output.metadata.size() << " and data size " << output.data.size(); return output; } diff --git a/unittest/ZmqReceiver_test.cxx b/unittest/ZmqReceiver_test.cxx index 8a822a6..9ccad89 100644 --- a/unittest/ZmqReceiver_test.cxx +++ b/unittest/ZmqReceiver_test.cxx @@ -64,9 +64,10 @@ BOOST_AUTO_TEST_CASE(Errors) config.connection_name = "timeout_test"; the_receiver->connect_for_receives(config); BOOST_REQUIRE(the_receiver->can_receive()); - BOOST_REQUIRE_EXCEPTION(the_receiver->receive(Receiver::s_no_block), ReceiveTimeoutExpired, [&](ReceiveTimeoutExpired e) { + BOOST_REQUIRE_EXCEPTION( + the_receiver->receive(Receiver::s_no_block), ReceiveTimeoutExpired, [&](ReceiveTimeoutExpired e) { TLOG() << e.what(); - return std::string(e.what()).find("Unable to receive within timeout period") != std::string::npos; + return std::string(e.what()).find("Unable to receive within timeout period") != std::string::npos; }); } BOOST_AUTO_TEST_SUITE_END() From 050dad0e243cae26e776c2f017d642f6f832fa47 Mon Sep 17 00:00:00 2001 From: Eric Flumerfelt Date: Fri, 20 Mar 2026 16:27:32 -0500 Subject: [PATCH 5/5] Refocus changes on binding sender to a specific interface if requested --- include/ipm/Sender.hpp | 2 +- plugins/ZmqSender.cpp | 193 +++++++++++++++++------------------------ 2 files changed, 82 insertions(+), 113 deletions(-) diff --git a/include/ipm/Sender.hpp b/include/ipm/Sender.hpp index 1dbdd23..c8fe5d3 100644 --- a/include/ipm/Sender.hpp +++ b/include/ipm/Sender.hpp @@ -81,7 +81,7 @@ class Sender : public opmonlib::MonitorableObject std::string connection_name{ "" }; std::string connection_string{ "inproc://default" }; int capacity{ 0 }; - std::vector send_endpoints{}; + std::string send_endpoint{ "" }; }; using duration_t = std::chrono::milliseconds; static constexpr duration_t s_block = duration_t::max(); diff --git a/plugins/ZmqSender.cpp b/plugins/ZmqSender.cpp index a7a6e84..2c38591 100644 --- a/plugins/ZmqSender.cpp +++ b/plugins/ZmqSender.cpp @@ -21,74 +21,104 @@ namespace dunedaq::ipm { class ZmqSender : public Sender { public: - struct Sockinfo_t + ZmqSender() + : m_socket(ZmqContext::instance().GetContext(), zmq::socket_type::push) { - zmq::socket_t socket; - size_t bytes_sent{ 0 }; - std::string connection_string; - }; - - ZmqSender() {} + } ~ZmqSender() { // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit - if (can_send()) { - for (auto& sock : m_sockets) { - try { - TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to one"; - sock->socket.set(zmq::sockopt::sndhwm, 1); - - TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) - << m_connection_info.connection_name - << ": Waiting up to 10s for socket to become writable before disconnecting"; - auto start_time = std::chrono::steady_clock::now(); - while (std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time) - .count() < 10000) { - auto events = sock->socket.get(zmq::sockopt::events); - if ((events & ZMQ_POLLOUT) != 0) { - break; - } - usleep(1000); + if (m_connection_info.connection_string != "" && m_socket_connected) { + try { + TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to one"; + m_socket.set(zmq::sockopt::sndhwm, 1); + + TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) + << m_connection_info.connection_name + << ": Waiting up to 10s for socket to become writable before disconnecting"; + auto start_time = std::chrono::steady_clock::now(); + while ( + std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() < + 10000) { + auto events = m_socket.get(zmq::sockopt::events); + if ((events & ZMQ_POLLOUT) != 0) { + break; } - - TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) - << m_connection_info.connection_name << ": Disconnecting socket from " << sock->connection_string; - - sock->socket.disconnect(sock->connection_string); - sock->socket.close(); - } catch (zmq::error_t const& err) { - ers::error(ZmqOperationError( - ERS_HERE, m_connection_info.connection_name, "disconnect", "send", err.what(), sock->connection_string)); + usleep(1000); } + TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) + << m_connection_info.connection_name << ": Disconnecting socket from " << m_connection_info.connection_string; + + m_socket.disconnect(m_connection_info.connection_string); + m_socket_connected = false; + } catch (zmq::error_t const& err) { + ers::error(ZmqOperationError(ERS_HERE, + m_connection_info.connection_name, + "disconnect", + "send", + err.what(), + m_connection_info.connection_string)); } - m_sockets.clear(); } + m_socket.close(); } - bool can_send() const noexcept override { return m_sockets.size() > 0; } - + bool can_send() const noexcept override { return m_socket_connected; } std::string connect_for_sends(const ConnectionInfo& connection_info) override { - m_connection_info = connection_info; + auto connection_string = m_connection_info.connection_string; + auto capacity = m_connection_info.capacity; + auto base_uri = utilities::ZmqUri(connection_info.connection_string); - if (connection_info.send_endpoints.size() > 0) { - auto base_uri = utilities::ZmqUri(connection_info.connection_string); - for (auto& endpoint : connection_info.send_endpoints) { + if (connection_info.send_endpoint != "") { auto connection_uri = base_uri; - if (endpoint.find(":") != std::string::npos) { - connection_uri.endpoint_host = endpoint.substr(0, endpoint.find(":")); - connection_uri.endpoint_port = endpoint.substr(endpoint.find(":") + 1); + if (connection_info.send_endpoint.find(":") != std::string::npos) { + connection_uri.endpoint_host = + connection_info.send_endpoint.substr(0, connection_info.send_endpoint.find(":")); + connection_uri.endpoint_port = + connection_info.send_endpoint.substr(connection_info.send_endpoint.find(":") + 1); } else { - connection_uri.endpoint_host = endpoint; + connection_uri.endpoint_host = connection_info.send_endpoint; } - create_and_connect_socket(connection_uri, connection_info.capacity); + connection_string = connection_uri.to_string(); + } + + TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string; + try { + m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send + } catch (zmq::error_t const& err) { + throw ZmqOperationError( + ERS_HERE, m_connection_info.connection_name, "set timeout", "send", err.what(), connection_string); + } + + if (capacity > 0) { + try { + m_socket.set(zmq::sockopt::sndhwm, capacity); + } catch (zmq::error_t const& err) { + throw ZmqOperationError( + ERS_HERE, m_connection_info.connection_name, "set hwm", "send", err.what(), connection_string); } - } else { - create_and_connect_socket(utilities::ZmqUri(connection_info.connection_string), connection_info.capacity); } + + try { + m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections + } catch (zmq::error_t const& err) { + throw ZmqOperationError( + ERS_HERE, m_connection_info.connection_name, "set immediate mode", "send", err.what(), connection_string); + } + + try { + m_socket.connect(connection_string); + m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint); + m_socket_connected = true; + } catch (zmq::error_t const& err) { + throw ZmqOperationError(ZmqOperationError( + ERS_HERE, m_connection_info.connection_name, "connect", "send", err.what(), connection_string)); + } + return m_connection_info.connection_string; } @@ -104,13 +134,11 @@ class ZmqSender : public Sender auto start_time = std::chrono::steady_clock::now(); zmq::send_result_t res{}; - auto socket = get_socket(); - do { zmq::message_t topic_msg(topic.c_str(), topic.size()); try { - res = socket->socket.send(topic_msg, zmq::send_flags::sndmore); + res = m_socket.send(topic_msg, zmq::send_flags::sndmore); } catch (zmq::error_t const& err) { throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), topic.size(), topic); } @@ -122,8 +150,7 @@ class ZmqSender : public Sender zmq::message_t msg(message, N); try { - res = socket->socket.send(msg, zmq::send_flags::none); - socket->bytes_sent += N; + res = m_socket.send(msg, zmq::send_flags::none); } catch (zmq::error_t const& err) { throw ZmqSendError(ERS_HERE, m_connection_info.connection_name, err.what(), N, topic); } @@ -142,66 +169,8 @@ class ZmqSender : public Sender } private: - void create_and_connect_socket(utilities::ZmqUri connection_uri, int capacity) - { - auto conn_uri = connection_uri.to_string(); - std::shared_ptr info = std::make_shared(); - info->socket = zmq::socket_t(ZmqContext::instance().GetContext(), zmq::socket_type::push); - info->bytes_sent = 0; - info->connection_string = conn_uri; - - try { - info->socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send - } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, m_connection_info.connection_name, "set timeout", "send", err.what(), conn_uri); - } - - if (capacity > 0) { - try { - info->socket.set(zmq::sockopt::sndhwm, capacity); - } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, m_connection_info.connection_name, "set hwm", "send", err.what(), conn_uri); - } - } - - TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << conn_uri; - try { - info->socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections - } catch (zmq::error_t const& err) { - throw ZmqOperationError( - ERS_HERE, m_connection_info.connection_name, "set immediate mode", "send", err.what(), conn_uri); - } - - try { - info->socket.connect(conn_uri); - } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, m_connection_info.connection_name, "connect", "send", err.what(), conn_uri); - } - - m_sockets.push_back(info); - } - - std::shared_ptr get_socket() - { - // Typical case, so don't iterate - if (m_sockets.size() == 1) { - return m_sockets.front(); - } - size_t min_sent = -1; - std::shared_ptr sock = nullptr; - - for (auto& sock_info : m_sockets) { - if (sock_info->bytes_sent < min_sent) { - min_sent = sock_info->bytes_sent; - sock = sock_info; - } - } - - return sock; - } - - std::vector> m_sockets; - std::string m_connection_string; + zmq::socket_t m_socket; + bool m_socket_connected{ false }; }; } // namespace dunedaq::ipm