diff --git a/plugins/dogma/dogma/UdpTransport.h b/plugins/dogma/dogma/UdpTransport.h index 2c385642..8300e061 100644 --- a/plugins/dogma/dogma/UdpTransport.h +++ b/plugins/dogma/dogma/UdpTransport.h @@ -97,6 +97,7 @@ namespace dogma { dabc::BufferSize_t fBufferSize{0}; ///< available buffer size std::string fHostName; ///< host name used to create UDP socket int fRecvBufLen{100000}; ///< recv buf len + std::string fMcastAddr; ///< mcast address unsigned fMTU{0}; ///< maximal size of packet expected from DOG void* fMtuBuffer{nullptr}; ///< buffer used to skip packets when no normal buffer is available int fSkipCnt{0}; ///< counter used to control buffers skipping @@ -121,12 +122,12 @@ namespace dogma { bool CloseBuffer(); public: - UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, int mtu, bool debug, bool print, int maxloop, double reduce); - ~UdpAddon() override; + UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, const std::string &mcast, int mtu, bool debug, bool print, int maxloop, double reduce); + ~UdpAddon() override; bool HasBuffer() const { return !fTgtPtr.null(); } - static int OpenUdp(const std::string &host, int nport, int rcvbuflen); + static int OpenUdp(const std::string &host, int nport, int rcvbuflen, const std::string &mcast = ""); }; // ================================================================================== diff --git a/plugins/dogma/src/Factory.cxx b/plugins/dogma/src/Factory.cxx index fbd2649d..e7b6da9a 100644 --- a/plugins/dogma/src/Factory.cxx +++ b/plugins/dogma/src/Factory.cxx @@ -89,7 +89,9 @@ dabc::Module* dogma::Factory::CreateTransport(const dabc::Reference& port, const std::string host = url.GetHostName(); int rcvbuflen = url.GetOptionInt("udpbuf", 200000); - int fd = dogma::UdpAddon::OpenUdp(host, nport, rcvbuflen); + std::string mcast = url.GetOptionStr("mcast", ""); + + int fd = dogma::UdpAddon::OpenUdp(host, nport, rcvbuflen, mcast); if (fd <= 0) { EOUT("Cannot open UDP socket for %s", url.GetHostNameWithPort().c_str()); return nullptr; @@ -104,10 +106,11 @@ dabc::Module* dogma::Factory::CreateTransport(const dabc::Reference& port, const int udp_queue = url.GetOptionInt("upd_queue", 0); double heartbeat = url.GetOptionDouble("heartbeat", -1.); - if (udp_queue>0) cmd.SetInt("TransportQueue", udp_queue); + if (udp_queue > 0) + cmd.SetInt("TransportQueue", udp_queue); DOUT0("Start DOGMA UDP transport on %s", url.GetHostNameWithPort().c_str()); - auto addon = new dogma::UdpAddon(fd, host, nport, rcvbuflen, mtu, debug, print, maxloop, reduce); + auto addon = new dogma::UdpAddon(fd, host, nport, rcvbuflen, mcast, mtu, debug, print, maxloop, reduce); return new dogma::UdpTransport(cmd, portref, addon, flush, heartbeat); } diff --git a/plugins/dogma/src/UdpTransport.cxx b/plugins/dogma/src/UdpTransport.cxx index 8c08a16a..0eec9ba8 100644 --- a/plugins/dogma/src/UdpTransport.cxx +++ b/plugins/dogma/src/UdpTransport.cxx @@ -28,17 +28,19 @@ #include #include #include +#include // according to specification maximal UDP packet is 65,507 or 0xFFE3 #define DEFAULT_MTU 0xFFF0 -dogma::UdpAddon::UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, int mtu, bool debug, bool print, int maxloop, double reduce) : +dogma::UdpAddon::UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, const std::string &mcast, int mtu, bool debug, bool print, int maxloop, double reduce) : dabc::SocketAddon(fd), TransportInfo(nport), fTgtPtr(), fHostName(host), fRecvBufLen(rcvbuflen), + fMcastAddr(mcast), fMTU(mtu > 0 ? mtu : DEFAULT_MTU), fMtuBuffer(nullptr), fSkipCnt(0), @@ -237,10 +239,11 @@ bool dogma::UdpAddon::ReadUdp() return true; // indicate that buffer reading will be finished by callback } -int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen) +int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen, const std::string &mcast) { int fd = socket(PF_INET, SOCK_DGRAM, 0); - if (fd < 0) return -1; + if (fd < 0) + return -1; if (!dabc::SocketThread::SetNonBlockSocket(fd)) { EOUT("Cannot set non-blocking mode for UDP socket %d", fd); @@ -269,6 +272,17 @@ int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen) } } + if (mcast.length() > 0) { + struct in_addr localInterface; + memset(&localInterface, 0, sizeof(localInterface)); + localInterface.s_addr = inet_addr(mcast.c_str()); + + if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_IF, (char *)&localInterface, sizeof(localInterface)) < 0) + EOUT("Fail to set mcast addr %s to socket", mcast.c_str()); + else + EOUT("MCAST addr %s set OK", mcast.c_str()); + } + if ((host.length() > 0) && (host != "host")) { struct addrinfo hints, *info = nullptr; @@ -281,7 +295,8 @@ int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen) getaddrinfo(host.c_str(), service.c_str(), &hints, &info); - if (info && bind(fd, info->ai_addr, info->ai_addrlen) == 0) return fd; + if (info && bind(fd, info->ai_addr, info->ai_addrlen) == 0) + return fd; } sockaddr_in addr; @@ -289,7 +304,8 @@ int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen) addr.sin_family = AF_INET; addr.sin_port = htons(nport); - if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == 0) return fd; + if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == 0) + return fd; close(fd); return -1; @@ -340,7 +356,7 @@ int dogma::UdpTransport::ExecuteCommand(dabc::Command cmd) auto addon = static_cast(fAddon()); if (addon) { addon->CloseSocket(); - int fd = dogma::UdpAddon::OpenUdp(addon->fHostName, addon->fNPort, addon->fRecvBufLen); + int fd = dogma::UdpAddon::OpenUdp(addon->fHostName, addon->fNPort, addon->fRecvBufLen, addon->fMcastAddr); if (fd <= 0) { EOUT("Cannot recreate UDP socket for port %d", addon->fNPort); dabc::mgr.StopApplication();