diff --git a/include/ipm/Receiver.hpp b/include/ipm/Receiver.hpp index 0df1e46..54b39b6 100644 --- a/include/ipm/Receiver.hpp +++ b/include/ipm/Receiver.hpp @@ -43,7 +43,8 @@ ERS_DECLARE_ISSUE(ipm, ((std::string)connection_name)((int)bytes1)((int)bytes2)) // NOLINT ERS_DECLARE_ISSUE(ipm, ReceiveTimeoutExpired, - connection_name << ": Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)", + connection_name << ": Unable to receive within timeout period (timeout period was " << timeout + << " milliseconds)", ((std::string)connection_name)((int)timeout)) // NOLINT // Reenable coverage collection LCOV_EXCL_STOP } // namespace dunedaq @@ -74,12 +75,12 @@ class Receiver : public opmonlib::MonitorableObject { public: - struct ConnectionInfo - { - std::string connection_name{ "" }; - std::string connection_string{ "" }; - std::vector connection_strings{}; - }; + struct ConnectionInfo + { + std::string connection_name{ "" }; + std::string connection_string{ "" }; + std::vector connection_strings{}; + }; using duration_t = std::chrono::milliseconds; static constexpr duration_t s_block = duration_t::max(); static constexpr duration_t s_no_block = duration_t::zero(); diff --git a/include/ipm/Sender.hpp b/include/ipm/Sender.hpp index b8d9bbd..c8fe5d3 100644 --- a/include/ipm/Sender.hpp +++ b/include/ipm/Sender.hpp @@ -43,7 +43,8 @@ ERS_DECLARE_ISSUE(ipm, ((std::string)connection_name)) ERS_DECLARE_ISSUE(ipm, SendTimeoutExpired, - connection_name << ": Unable to send within timeout period (timeout period was " << timeout << " milliseconds)", + connection_name << ": Unable to send within timeout period (timeout period was " << timeout + << " milliseconds)", ((std::string)connection_name)((int)timeout)) // NOLINT // Reenable coverage collection LCOV_EXCL_STOP @@ -80,6 +81,7 @@ class Sender : public opmonlib::MonitorableObject std::string connection_name{ "" }; std::string connection_string{ "inproc://default" }; int capacity{ 0 }; + std::string send_endpoint{ "" }; }; using duration_t = std::chrono::milliseconds; static constexpr duration_t s_block = duration_t::max(); diff --git a/plugins/ZmqReceiver.cpp b/plugins/ZmqReceiver.cpp index 94b7540..8c29d41 100644 --- a/plugins/ZmqReceiver.cpp +++ b/plugins/ZmqReceiver.cpp @@ -53,7 +53,8 @@ class ZmqReceiver : public Receiver try { m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, m_connection_info.connection_name, + throw ZmqOperationError(ERS_HERE, + m_connection_info.connection_name, "set timeout", "receive", err.what(), @@ -94,7 +95,8 @@ class ZmqReceiver : public Receiver m_connection_info.connection_string); } for (auto& connection_string : resolved) { - TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is " << connection_string; + TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is " + << connection_string; try { m_socket.bind(connection_string); m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint); @@ -153,7 +155,7 @@ class ZmqReceiver : public Receiver TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR) << m_connection_info.connection_name << ": Going to receive header"; res = m_socket.recv(hdr); TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR_2) << m_connection_info.connection_name << ": Recv res=" << res.value_or(0) - << " for header (hdr.size() == " << hdr.size() << ")"; + << " for header (hdr.size() == " << hdr.size() << ")"; } catch (zmq::error_t const& err) { throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "header"); } @@ -185,8 +187,7 @@ class ZmqReceiver : public Receiver } TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_END) - << m_connection_info.connection_name << ": Returning output with metadata size " - << output.metadata.size() + << m_connection_info.connection_name << ": Returning output with metadata size " << output.metadata.size() << " and data size " << output.data.size(); return output; } diff --git a/plugins/ZmqSender.cpp b/plugins/ZmqSender.cpp index 9b46e38..2c38591 100644 --- a/plugins/ZmqSender.cpp +++ b/plugins/ZmqSender.cpp @@ -11,6 +11,7 @@ #include "ipm/ZmqContext.hpp" #include "logging/Logging.hpp" +#include "utilities/ZmqUri.hpp" #include "zmq.hpp" #include @@ -30,10 +31,11 @@ class ZmqSender : public Sender // Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit if (m_connection_info.connection_string != "" && m_socket_connected) { try { - TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to zero"; + TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to one"; m_socket.set(zmq::sockopt::sndhwm, 1); - TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name + TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) + << m_connection_info.connection_name << ": Waiting up to 10s for socket to become writable before disconnecting"; auto start_time = std::chrono::steady_clock::now(); while ( @@ -48,7 +50,6 @@ class ZmqSender : public Sender TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Disconnecting socket from " << m_connection_info.connection_string; - m_socket.disconnect(m_connection_info.connection_string); m_socket_connected = false; } catch (zmq::error_t const& err) { @@ -67,34 +68,41 @@ class ZmqSender : public Sender std::string connect_for_sends(const ConnectionInfo& connection_info) override { m_connection_info = connection_info; + auto connection_string = m_connection_info.connection_string; + auto capacity = m_connection_info.capacity; + auto base_uri = utilities::ZmqUri(connection_info.connection_string); + + if (connection_info.send_endpoint != "") { + auto connection_uri = base_uri; + + if (connection_info.send_endpoint.find(":") != std::string::npos) { + connection_uri.endpoint_host = + connection_info.send_endpoint.substr(0, connection_info.send_endpoint.find(":")); + connection_uri.endpoint_port = + connection_info.send_endpoint.substr(connection_info.send_endpoint.find(":") + 1); + } else { + connection_uri.endpoint_host = connection_info.send_endpoint; + } + connection_string = connection_uri.to_string(); + } + + TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string; try { m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, - m_connection_info.connection_name, - "set timeout", - "send", - err.what(), - m_connection_info.connection_string); + throw ZmqOperationError( + ERS_HERE, m_connection_info.connection_name, "set timeout", "send", err.what(), connection_string); } - auto hwm = connection_info.capacity; - if (hwm > 0) { + if (capacity > 0) { try { - m_socket.set(zmq::sockopt::sndhwm, hwm); + m_socket.set(zmq::sockopt::sndhwm, capacity); } catch (zmq::error_t const& err) { - throw ZmqOperationError(ERS_HERE, - m_connection_info.connection_name, - "set hwm", - "send", - err.what(), - m_connection_info.connection_string); + throw ZmqOperationError( + ERS_HERE, m_connection_info.connection_name, "set hwm", "send", err.what(), connection_string); } } - auto connection_string = m_connection_info.connection_string; - - TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string; try { m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections } catch (zmq::error_t const& err) { @@ -107,18 +115,10 @@ class ZmqSender : public Sender m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint); m_socket_connected = true; } catch (zmq::error_t const& err) { - ers::error(ZmqOperationError( + throw ZmqOperationError(ZmqOperationError( ERS_HERE, m_connection_info.connection_name, "connect", "send", err.what(), connection_string)); } - if (!m_socket_connected) { - throw ZmqOperationError(ERS_HERE, - m_connection_info.connection_name, - "connect", - "send", - "Operation failed for all resolved connection strings", - ""); - } return m_connection_info.connection_string; } @@ -133,6 +133,7 @@ class ZmqSender : public Sender << m_connection_info.connection_name << ": Starting send of " << N << " bytes"; auto start_time = std::chrono::steady_clock::now(); zmq::send_result_t res{}; + do { zmq::message_t topic_msg(topic.c_str(), topic.size()); @@ -163,8 +164,7 @@ class ZmqSender : public Sender throw SendTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count()); } - TLOG_DEBUG(TLVL_ZMQSENDER_SEND_END) << m_connection_info.connection_name << ": Completed send of " << N - << " bytes"; + TLOG_DEBUG(TLVL_ZMQSENDER_SEND_END) << m_connection_info.connection_name << ": Completed send of " << N << " bytes"; return res && res == N; } diff --git a/test/apps/zmq_send.cpp b/test/apps/zmq_send.cpp index 7556ab4..c2cf742 100644 --- a/test/apps/zmq_send.cpp +++ b/test/apps/zmq_send.cpp @@ -26,7 +26,7 @@ main(int argc, char* argv[]) uint32_t npackets = 1; // NOLINT(build/unsigned) size_t packetSize = 100; size_t interval = 0; - std::string conString = "tcp://127.0.0.1:12345"; + std::string conString = "tcp://127.0.0.2:*;127.0.0.1:12345"; int nthreads = 1; uint32_t id = 0; // NOLINT(build/unsigned) diff --git a/unittest/ZmqPubSub_test.cxx b/unittest/ZmqPubSub_test.cxx index 89220fd..81196bb 100644 --- a/unittest/ZmqPubSub_test.cxx +++ b/unittest/ZmqPubSub_test.cxx @@ -39,9 +39,9 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest) BOOST_REQUIRE(the_sender != nullptr); BOOST_REQUIRE(!the_sender->can_send()); - Sender::ConnectionInfo sender_config("test_sender", "inproc://default"); - Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://default"); - + Sender::ConnectionInfo sender_config("test_sender", "inproc://sendreceive"); + Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://sendreceive"); + the_sender->connect_for_sends(sender_config); the_receiver->connect_for_receives(receiver_config); @@ -91,8 +91,8 @@ BOOST_AUTO_TEST_CASE(CallbackTest) BOOST_REQUIRE(the_sender != nullptr); BOOST_REQUIRE(!the_sender->can_send()); - Sender::ConnectionInfo sender_config("test_sender", "inproc://default"); - Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://default"); + Sender::ConnectionInfo sender_config("test_sender", "inproc://callback"); + Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://callback"); the_sender->connect_for_sends(sender_config); the_receiver->connect_for_receives(receiver_config); @@ -121,9 +121,8 @@ BOOST_AUTO_TEST_CASE(CallbackTest) message_received = false; test_data = { 'A', 'N', 'O', 'T', 'H', 'E', 'R', ' ', 'T', 'E', 'S', 'T' }; the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic"); - while (!message_received.load()) { - usleep(15000); - } + usleep(100000); + BOOST_REQUIRE_EQUAL(message_received, true); message_received = false; test_data = { 'A', ' ', 'T', 'H', 'I', 'R', 'D', ' ', 'T', 'E', 'S', 'T' }; @@ -138,7 +137,7 @@ BOOST_AUTO_TEST_CASE(CallbackTest) usleep(100000); BOOST_REQUIRE_EQUAL(message_received, false); - auto response = the_receiver->receive(Receiver::s_block); + auto response = the_receiver->receive(std::chrono::milliseconds(1000)); BOOST_REQUIRE_EQUAL(response.data.size(), test_data.size()); BOOST_REQUIRE(!the_receiver->data_pending()); } @@ -160,7 +159,7 @@ BOOST_AUTO_TEST_CASE(MultiplePublishers) std::vector test_data{ 'T', 'E', 'S', 'T' }; first_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic"); - auto response = the_subscriber->receive(Receiver::s_block); + auto response = the_subscriber->receive(std::chrono::milliseconds(1000)); BOOST_REQUIRE_EQUAL(response.data.size(), 4); BOOST_REQUIRE_EQUAL(response.data[0], 'T'); BOOST_REQUIRE_EQUAL(response.data[1], 'E'); @@ -168,7 +167,7 @@ BOOST_AUTO_TEST_CASE(MultiplePublishers) BOOST_REQUIRE_EQUAL(response.data[3], 'T'); second_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic"); - auto response2 = the_subscriber->receive(Receiver::s_block); + auto response2 = the_subscriber->receive(std::chrono::milliseconds(1000)); BOOST_REQUIRE_EQUAL(response2.data.size(), 4); BOOST_REQUIRE_EQUAL(response2.data[0], 'T'); BOOST_REQUIRE_EQUAL(response2.data[1], 'E'); diff --git a/unittest/ZmqReceiver_test.cxx b/unittest/ZmqReceiver_test.cxx index 8a822a6..9ccad89 100644 --- a/unittest/ZmqReceiver_test.cxx +++ b/unittest/ZmqReceiver_test.cxx @@ -64,9 +64,10 @@ BOOST_AUTO_TEST_CASE(Errors) config.connection_name = "timeout_test"; the_receiver->connect_for_receives(config); BOOST_REQUIRE(the_receiver->can_receive()); - BOOST_REQUIRE_EXCEPTION(the_receiver->receive(Receiver::s_no_block), ReceiveTimeoutExpired, [&](ReceiveTimeoutExpired e) { + BOOST_REQUIRE_EXCEPTION( + the_receiver->receive(Receiver::s_no_block), ReceiveTimeoutExpired, [&](ReceiveTimeoutExpired e) { TLOG() << e.what(); - return std::string(e.what()).find("Unable to receive within timeout period") != std::string::npos; + return std::string(e.what()).find("Unable to receive within timeout period") != std::string::npos; }); } BOOST_AUTO_TEST_SUITE_END() diff --git a/unittest/ZmqSender_test.cxx b/unittest/ZmqSender_test.cxx index e55b9cd..7a50977 100644 --- a/unittest/ZmqSender_test.cxx +++ b/unittest/ZmqSender_test.cxx @@ -9,6 +9,8 @@ #include "ipm/Sender.hpp" #include "ipm/ZmqContext.hpp" +#include "utilities/Issues.hpp" + #define BOOST_TEST_MODULE ZmqSender_test // NOLINT #include "boost/test/unit_test.hpp" @@ -23,34 +25,35 @@ BOOST_AUTO_TEST_SUITE(ZmqSender_test) BOOST_AUTO_TEST_CASE(BasicTests) { auto the_sender = make_ipm_sender("ZmqSender"); - BOOST_REQUIRE(the_sender != nullptr); - BOOST_REQUIRE(!the_sender->can_send()); + BOOST_CHECK(the_sender != nullptr); + BOOST_CHECK(!the_sender->can_send()); } BOOST_AUTO_TEST_CASE(Errors) { auto the_sender = make_ipm_sender("ZmqSender"); - BOOST_REQUIRE(the_sender != nullptr); - BOOST_REQUIRE(!the_sender->can_send()); + BOOST_CHECK(the_sender != nullptr); + BOOST_CHECK(!the_sender->can_send()); Sender::ConnectionInfo config("ZmqSenderTestConn"); config.connection_string = "not a uri"; - BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) { - return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos; - }); - BOOST_REQUIRE(!the_sender->can_send()); - + BOOST_CHECK_EXCEPTION( + the_sender->connect_for_sends(config), dunedaq::utilities::InvalidUri, [&](dunedaq::utilities::InvalidUri e) { + return std::string(e.what()).find("not a uri") != std::string::npos; + }); + BOOST_CHECK(!the_sender->can_send()); + config.connection_string = "tcp://thishostddoesnotexist"; - BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) { - return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos; + BOOST_CHECK_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) { + return std::string(e.what()).find("thishostddoesnotexist") != std::string::npos; }); - BOOST_REQUIRE(!the_sender->can_send()); - + BOOST_CHECK(!the_sender->can_send()); + config.connection_string = "badproto://default"; - BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) { - return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos; + BOOST_CHECK_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) { + return std::string(e.what()).find("badproto") != std::string::npos; }); - BOOST_REQUIRE(!the_sender->can_send()); + BOOST_CHECK(!the_sender->can_send()); } BOOST_AUTO_TEST_SUITE_END()