Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions include/ipm/Receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ ERS_DECLARE_ISSUE(ipm,
((std::string)connection_name)((int)bytes1)((int)bytes2)) // NOLINT
ERS_DECLARE_ISSUE(ipm,
ReceiveTimeoutExpired,
connection_name << ": Unable to receive within timeout period (timeout period was " << timeout << " milliseconds)",
connection_name << ": Unable to receive within timeout period (timeout period was " << timeout
<< " milliseconds)",
((std::string)connection_name)((int)timeout)) // NOLINT
// Reenable coverage collection LCOV_EXCL_STOP
} // namespace dunedaq
Expand Down Expand Up @@ -74,12 +75,12 @@ class Receiver : public opmonlib::MonitorableObject
{

public:
struct ConnectionInfo
{
std::string connection_name{ "" };
std::string connection_string{ "" };
std::vector<std::string> connection_strings{};
};
struct ConnectionInfo
{
std::string connection_name{ "" };
std::string connection_string{ "" };
std::vector<std::string> connection_strings{};
};
using duration_t = std::chrono::milliseconds;
static constexpr duration_t s_block = duration_t::max();
static constexpr duration_t s_no_block = duration_t::zero();
Expand Down
4 changes: 3 additions & 1 deletion include/ipm/Sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ ERS_DECLARE_ISSUE(ipm,
((std::string)connection_name))
ERS_DECLARE_ISSUE(ipm,
SendTimeoutExpired,
connection_name << ": Unable to send within timeout period (timeout period was " << timeout << " milliseconds)",
connection_name << ": Unable to send within timeout period (timeout period was " << timeout
<< " milliseconds)",
((std::string)connection_name)((int)timeout)) // NOLINT

// Reenable coverage collection LCOV_EXCL_STOP
Expand Down Expand Up @@ -80,6 +81,7 @@ class Sender : public opmonlib::MonitorableObject
std::string connection_name{ "" };
std::string connection_string{ "inproc://default" };
int capacity{ 0 };
std::string send_endpoint{ "" };
};
using duration_t = std::chrono::milliseconds;
static constexpr duration_t s_block = duration_t::max();
Expand Down
11 changes: 6 additions & 5 deletions plugins/ZmqReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class ZmqReceiver : public Receiver
try {
m_socket.set(zmq::sockopt::rcvtimeo, 0); // Return immediately if we can't receive
} catch (zmq::error_t const& err) {
throw ZmqOperationError(ERS_HERE, m_connection_info.connection_name,
throw ZmqOperationError(ERS_HERE,
m_connection_info.connection_name,
"set timeout",
"receive",
err.what(),
Expand Down Expand Up @@ -94,7 +95,8 @@ class ZmqReceiver : public Receiver
m_connection_info.connection_string);
}
for (auto& connection_string : resolved) {
TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is " << connection_string;
TLOG_DEBUG(TLVL_CONNECTIONSTRING) << m_connection_info.connection_name << ": Connection String is "
<< connection_string;
try {
m_socket.bind(connection_string);
m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint);
Expand Down Expand Up @@ -153,7 +155,7 @@ class ZmqReceiver : public Receiver
TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR) << m_connection_info.connection_name << ": Going to receive header";
res = m_socket.recv(hdr);
TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_HDR_2) << m_connection_info.connection_name << ": Recv res=" << res.value_or(0)
<< " for header (hdr.size() == " << hdr.size() << ")";
<< " for header (hdr.size() == " << hdr.size() << ")";
} catch (zmq::error_t const& err) {
throw ZmqReceiveError(ERS_HERE, m_connection_info.connection_name, err.what(), "header");
}
Expand Down Expand Up @@ -185,8 +187,7 @@ class ZmqReceiver : public Receiver
}

TLOG_DEBUG(TLVL_ZMQRECEIVER_RECV_END)
<< m_connection_info.connection_name << ": Returning output with metadata size "
<< output.metadata.size()
<< m_connection_info.connection_name << ": Returning output with metadata size " << output.metadata.size()
<< " and data size " << output.data.size();
return output;
}
Expand Down
64 changes: 32 additions & 32 deletions plugins/ZmqSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "ipm/ZmqContext.hpp"

#include "logging/Logging.hpp"
#include "utilities/ZmqUri.hpp"
#include "zmq.hpp"

#include <string>
Expand All @@ -30,10 +31,11 @@ class ZmqSender : public Sender
// Probably (cpp)zmq does this in the socket dtor anyway, but I guess it doesn't hurt to be explicit
if (m_connection_info.connection_string != "" && m_socket_connected) {
try {
TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to zero";
TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name << ": Setting socket HWM to one";
m_socket.set(zmq::sockopt::sndhwm, 1);

TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR) << m_connection_info.connection_name
TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR)
<< m_connection_info.connection_name
<< ": Waiting up to 10s for socket to become writable before disconnecting";
auto start_time = std::chrono::steady_clock::now();
while (
Expand All @@ -48,7 +50,6 @@ class ZmqSender : public Sender
TLOG_DEBUG(TLVL_ZMQSENDER_DESTRUCTOR)
<< m_connection_info.connection_name << ": Disconnecting socket from " << m_connection_info.connection_string;


m_socket.disconnect(m_connection_info.connection_string);
m_socket_connected = false;
} catch (zmq::error_t const& err) {
Expand All @@ -67,34 +68,41 @@ class ZmqSender : public Sender
std::string connect_for_sends(const ConnectionInfo& connection_info) override
{
m_connection_info = connection_info;
auto connection_string = m_connection_info.connection_string;
auto capacity = m_connection_info.capacity;
auto base_uri = utilities::ZmqUri(connection_info.connection_string);

if (connection_info.send_endpoint != "") {
auto connection_uri = base_uri;

if (connection_info.send_endpoint.find(":") != std::string::npos) {
connection_uri.endpoint_host =
connection_info.send_endpoint.substr(0, connection_info.send_endpoint.find(":"));
connection_uri.endpoint_port =
connection_info.send_endpoint.substr(connection_info.send_endpoint.find(":") + 1);
} else {
connection_uri.endpoint_host = connection_info.send_endpoint;
}
connection_string = connection_uri.to_string();
}

TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
try {
m_socket.set(zmq::sockopt::sndtimeo, 0); // Return immediately if we can't send
} catch (zmq::error_t const& err) {
throw ZmqOperationError(ERS_HERE,
m_connection_info.connection_name,
"set timeout",
"send",
err.what(),
m_connection_info.connection_string);
throw ZmqOperationError(
ERS_HERE, m_connection_info.connection_name, "set timeout", "send", err.what(), connection_string);
}

auto hwm = connection_info.capacity;
if (hwm > 0) {
if (capacity > 0) {
try {
m_socket.set(zmq::sockopt::sndhwm, hwm);
m_socket.set(zmq::sockopt::sndhwm, capacity);
} catch (zmq::error_t const& err) {
throw ZmqOperationError(ERS_HERE,
m_connection_info.connection_name,
"set hwm",
"send",
err.what(),
m_connection_info.connection_string);
throw ZmqOperationError(
ERS_HERE, m_connection_info.connection_name, "set hwm", "send", err.what(), connection_string);
}
}

auto connection_string = m_connection_info.connection_string;

TLOG_DEBUG(TLVL_CONNECTIONSTRING) << "Connection String is " << connection_string;
try {
m_socket.set(zmq::sockopt::immediate, 1); // Don't queue messages to incomplete connections
} catch (zmq::error_t const& err) {
Expand All @@ -107,18 +115,10 @@ class ZmqSender : public Sender
m_connection_info.connection_string = m_socket.get(zmq::sockopt::last_endpoint);
m_socket_connected = true;
} catch (zmq::error_t const& err) {
ers::error(ZmqOperationError(
throw ZmqOperationError(ZmqOperationError(
ERS_HERE, m_connection_info.connection_name, "connect", "send", err.what(), connection_string));
}

if (!m_socket_connected) {
throw ZmqOperationError(ERS_HERE,
m_connection_info.connection_name,
"connect",
"send",
"Operation failed for all resolved connection strings",
"");
}
return m_connection_info.connection_string;
}

Expand All @@ -133,6 +133,7 @@ class ZmqSender : public Sender
<< m_connection_info.connection_name << ": Starting send of " << N << " bytes";
auto start_time = std::chrono::steady_clock::now();
zmq::send_result_t res{};

do {

zmq::message_t topic_msg(topic.c_str(), topic.size());
Expand Down Expand Up @@ -163,8 +164,7 @@ class ZmqSender : public Sender
throw SendTimeoutExpired(ERS_HERE, m_connection_info.connection_name, timeout.count());
}

TLOG_DEBUG(TLVL_ZMQSENDER_SEND_END) << m_connection_info.connection_name << ": Completed send of " << N
<< " bytes";
TLOG_DEBUG(TLVL_ZMQSENDER_SEND_END) << m_connection_info.connection_name << ": Completed send of " << N << " bytes";
return res && res == N;
}

Expand Down
2 changes: 1 addition & 1 deletion test/apps/zmq_send.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ main(int argc, char* argv[])
uint32_t npackets = 1; // NOLINT(build/unsigned)
size_t packetSize = 100;
size_t interval = 0;
std::string conString = "tcp://127.0.0.1:12345";
std::string conString = "tcp://127.0.0.2:*;127.0.0.1:12345";
int nthreads = 1;
uint32_t id = 0; // NOLINT(build/unsigned)

Expand Down
21 changes: 10 additions & 11 deletions unittest/ZmqPubSub_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ BOOST_AUTO_TEST_CASE(SendReceiveTest)
BOOST_REQUIRE(the_sender != nullptr);
BOOST_REQUIRE(!the_sender->can_send());

Sender::ConnectionInfo sender_config("test_sender", "inproc://default");
Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://default");
Sender::ConnectionInfo sender_config("test_sender", "inproc://sendreceive");
Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://sendreceive");

the_sender->connect_for_sends(sender_config);
the_receiver->connect_for_receives(receiver_config);

Expand Down Expand Up @@ -91,8 +91,8 @@ BOOST_AUTO_TEST_CASE(CallbackTest)
BOOST_REQUIRE(the_sender != nullptr);
BOOST_REQUIRE(!the_sender->can_send());

Sender::ConnectionInfo sender_config("test_sender", "inproc://default");
Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://default");
Sender::ConnectionInfo sender_config("test_sender", "inproc://callback");
Receiver::ConnectionInfo receiver_config("test_receiver", "inproc://callback");
the_sender->connect_for_sends(sender_config);
the_receiver->connect_for_receives(receiver_config);

Expand Down Expand Up @@ -121,9 +121,8 @@ BOOST_AUTO_TEST_CASE(CallbackTest)
message_received = false;
test_data = { 'A', 'N', 'O', 'T', 'H', 'E', 'R', ' ', 'T', 'E', 'S', 'T' };
the_sender->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
while (!message_received.load()) {
usleep(15000);
}
usleep(100000);
BOOST_REQUIRE_EQUAL(message_received, true);

message_received = false;
test_data = { 'A', ' ', 'T', 'H', 'I', 'R', 'D', ' ', 'T', 'E', 'S', 'T' };
Expand All @@ -138,7 +137,7 @@ BOOST_AUTO_TEST_CASE(CallbackTest)

usleep(100000);
BOOST_REQUIRE_EQUAL(message_received, false);
auto response = the_receiver->receive(Receiver::s_block);
auto response = the_receiver->receive(std::chrono::milliseconds(1000));
BOOST_REQUIRE_EQUAL(response.data.size(), test_data.size());
BOOST_REQUIRE(!the_receiver->data_pending());
}
Expand All @@ -160,15 +159,15 @@ BOOST_AUTO_TEST_CASE(MultiplePublishers)

std::vector<char> test_data{ 'T', 'E', 'S', 'T' };
first_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
auto response = the_subscriber->receive(Receiver::s_block);
auto response = the_subscriber->receive(std::chrono::milliseconds(1000));
BOOST_REQUIRE_EQUAL(response.data.size(), 4);
BOOST_REQUIRE_EQUAL(response.data[0], 'T');
BOOST_REQUIRE_EQUAL(response.data[1], 'E');
BOOST_REQUIRE_EQUAL(response.data[2], 'S');
BOOST_REQUIRE_EQUAL(response.data[3], 'T');

second_publisher->send(test_data.data(), test_data.size(), Sender::s_no_block, "testTopic");
auto response2 = the_subscriber->receive(Receiver::s_block);
auto response2 = the_subscriber->receive(std::chrono::milliseconds(1000));
BOOST_REQUIRE_EQUAL(response2.data.size(), 4);
BOOST_REQUIRE_EQUAL(response2.data[0], 'T');
BOOST_REQUIRE_EQUAL(response2.data[1], 'E');
Expand Down
5 changes: 3 additions & 2 deletions unittest/ZmqReceiver_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ BOOST_AUTO_TEST_CASE(Errors)
config.connection_name = "timeout_test";
the_receiver->connect_for_receives(config);
BOOST_REQUIRE(the_receiver->can_receive());
BOOST_REQUIRE_EXCEPTION(the_receiver->receive(Receiver::s_no_block), ReceiveTimeoutExpired, [&](ReceiveTimeoutExpired e) {
BOOST_REQUIRE_EXCEPTION(
the_receiver->receive(Receiver::s_no_block), ReceiveTimeoutExpired, [&](ReceiveTimeoutExpired e) {
TLOG() << e.what();
return std::string(e.what()).find("Unable to receive within timeout period") != std::string::npos;
return std::string(e.what()).find("Unable to receive within timeout period") != std::string::npos;
});
}
BOOST_AUTO_TEST_SUITE_END()
35 changes: 19 additions & 16 deletions unittest/ZmqSender_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "ipm/Sender.hpp"
#include "ipm/ZmqContext.hpp"

#include "utilities/Issues.hpp"

#define BOOST_TEST_MODULE ZmqSender_test // NOLINT

#include "boost/test/unit_test.hpp"
Expand All @@ -23,34 +25,35 @@ BOOST_AUTO_TEST_SUITE(ZmqSender_test)
BOOST_AUTO_TEST_CASE(BasicTests)
{
auto the_sender = make_ipm_sender("ZmqSender");
BOOST_REQUIRE(the_sender != nullptr);
BOOST_REQUIRE(!the_sender->can_send());
BOOST_CHECK(the_sender != nullptr);
BOOST_CHECK(!the_sender->can_send());
}

BOOST_AUTO_TEST_CASE(Errors)
{
auto the_sender = make_ipm_sender("ZmqSender");
BOOST_REQUIRE(the_sender != nullptr);
BOOST_REQUIRE(!the_sender->can_send());
BOOST_CHECK(the_sender != nullptr);
BOOST_CHECK(!the_sender->can_send());

Sender::ConnectionInfo config("ZmqSenderTestConn");

config.connection_string = "not a uri";
BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) {
return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos;
});
BOOST_REQUIRE(!the_sender->can_send());

BOOST_CHECK_EXCEPTION(
the_sender->connect_for_sends(config), dunedaq::utilities::InvalidUri, [&](dunedaq::utilities::InvalidUri e) {
return std::string(e.what()).find("not a uri") != std::string::npos;
});
BOOST_CHECK(!the_sender->can_send());

config.connection_string = "tcp://thishostddoesnotexist";
BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) {
return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos;
BOOST_CHECK_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) {
return std::string(e.what()).find("thishostddoesnotexist") != std::string::npos;
});
BOOST_REQUIRE(!the_sender->can_send());
BOOST_CHECK(!the_sender->can_send());

config.connection_string = "badproto://default";
BOOST_REQUIRE_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) {
return std::string(e.what()).find("Operation failed for all resolved connection strings") != std::string::npos;
BOOST_CHECK_EXCEPTION(the_sender->connect_for_sends(config), ZmqOperationError, [&](ZmqOperationError e) {
return std::string(e.what()).find("badproto") != std::string::npos;
});
BOOST_REQUIRE(!the_sender->can_send());
BOOST_CHECK(!the_sender->can_send());
}
BOOST_AUTO_TEST_SUITE_END()
Loading