Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions plugins/dogma/dogma/UdpTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ namespace dogma {
dabc::BufferSize_t fBufferSize{0}; ///< available buffer size
std::string fHostName; ///< host name used to create UDP socket
int fRecvBufLen{100000}; ///< recv buf len
std::string fMcastAddr; ///< mcast address
unsigned fMTU{0}; ///< maximal size of packet expected from DOG
void* fMtuBuffer{nullptr}; ///< buffer used to skip packets when no normal buffer is available
int fSkipCnt{0}; ///< counter used to control buffers skipping
Expand All @@ -121,12 +122,12 @@ namespace dogma {
bool CloseBuffer();

public:
UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, int mtu, bool debug, bool print, int maxloop, double reduce);
~UdpAddon() override;
UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, const std::string &mcast, int mtu, bool debug, bool print, int maxloop, double reduce);
~UdpAddon() override;

bool HasBuffer() const { return !fTgtPtr.null(); }

static int OpenUdp(const std::string &host, int nport, int rcvbuflen);
static int OpenUdp(const std::string &host, int nport, int rcvbuflen, const std::string &mcast = "");
};

// ==================================================================================
Expand Down
9 changes: 6 additions & 3 deletions plugins/dogma/src/Factory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ dabc::Module* dogma::Factory::CreateTransport(const dabc::Reference& port, const

std::string host = url.GetHostName();
int rcvbuflen = url.GetOptionInt("udpbuf", 200000);
int fd = dogma::UdpAddon::OpenUdp(host, nport, rcvbuflen);
std::string mcast = url.GetOptionStr("mcast", "");

int fd = dogma::UdpAddon::OpenUdp(host, nport, rcvbuflen, mcast);
if (fd <= 0) {
EOUT("Cannot open UDP socket for %s", url.GetHostNameWithPort().c_str());
return nullptr;
Expand All @@ -104,10 +106,11 @@ dabc::Module* dogma::Factory::CreateTransport(const dabc::Reference& port, const
int udp_queue = url.GetOptionInt("upd_queue", 0);
double heartbeat = url.GetOptionDouble("heartbeat", -1.);

if (udp_queue>0) cmd.SetInt("TransportQueue", udp_queue);
if (udp_queue > 0)
cmd.SetInt("TransportQueue", udp_queue);

DOUT0("Start DOGMA UDP transport on %s", url.GetHostNameWithPort().c_str());

auto addon = new dogma::UdpAddon(fd, host, nport, rcvbuflen, mtu, debug, print, maxloop, reduce);
auto addon = new dogma::UdpAddon(fd, host, nport, rcvbuflen, mcast, mtu, debug, print, maxloop, reduce);
return new dogma::UdpTransport(cmd, portref, addon, flush, heartbeat);
}
28 changes: 22 additions & 6 deletions plugins/dogma/src/UdpTransport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/syscall.h>
#include <arpa/inet.h>


// according to specification maximal UDP packet is 65,507 or 0xFFE3
#define DEFAULT_MTU 0xFFF0

dogma::UdpAddon::UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, int mtu, bool debug, bool print, int maxloop, double reduce) :
dogma::UdpAddon::UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, const std::string &mcast, int mtu, bool debug, bool print, int maxloop, double reduce) :
dabc::SocketAddon(fd),
TransportInfo(nport),
fTgtPtr(),
fHostName(host),
fRecvBufLen(rcvbuflen),
fMcastAddr(mcast),
fMTU(mtu > 0 ? mtu : DEFAULT_MTU),
fMtuBuffer(nullptr),
fSkipCnt(0),
Expand Down Expand Up @@ -237,10 +239,11 @@ bool dogma::UdpAddon::ReadUdp()
return true; // indicate that buffer reading will be finished by callback
}

int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen)
int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen, const std::string &mcast)
{
int fd = socket(PF_INET, SOCK_DGRAM, 0);
if (fd < 0) return -1;
if (fd < 0)
return -1;

if (!dabc::SocketThread::SetNonBlockSocket(fd)) {
EOUT("Cannot set non-blocking mode for UDP socket %d", fd);
Expand Down Expand Up @@ -269,6 +272,17 @@ int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen)
}
}

if (mcast.length() > 0) {
struct in_addr localInterface;
memset(&localInterface, 0, sizeof(localInterface));
localInterface.s_addr = inet_addr(mcast.c_str());

if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_IF, (char *)&localInterface, sizeof(localInterface)) < 0)
EOUT("Fail to set mcast addr %s to socket", mcast.c_str());
else
EOUT("MCAST addr %s set OK", mcast.c_str());
}

if ((host.length() > 0) && (host != "host")) {
struct addrinfo hints, *info = nullptr;

Expand All @@ -281,15 +295,17 @@ int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen)

getaddrinfo(host.c_str(), service.c_str(), &hints, &info);

if (info && bind(fd, info->ai_addr, info->ai_addrlen) == 0) return fd;
if (info && bind(fd, info->ai_addr, info->ai_addrlen) == 0)
return fd;
}

sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(nport);

if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == 0) return fd;
if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == 0)
return fd;

close(fd);
return -1;
Expand Down Expand Up @@ -340,7 +356,7 @@ int dogma::UdpTransport::ExecuteCommand(dabc::Command cmd)
auto addon = static_cast<UdpAddon *>(fAddon());
if (addon) {
addon->CloseSocket();
int fd = dogma::UdpAddon::OpenUdp(addon->fHostName, addon->fNPort, addon->fRecvBufLen);
int fd = dogma::UdpAddon::OpenUdp(addon->fHostName, addon->fNPort, addon->fRecvBufLen, addon->fMcastAddr);
if (fd <= 0) {
EOUT("Cannot recreate UDP socket for port %d", addon->fNPort);
dabc::mgr.StopApplication();
Expand Down