From b57dbae8a5eb3786f9390345d556629562653168 Mon Sep 17 00:00:00 2001 From: Sergey Linev Date: Tue, 27 Jan 2026 14:46:59 +0100 Subject: [PATCH 1/2] [dogma] support mcast parameter in the dogma --- plugins/dogma/dogma/UdpTransport.h | 4 ++-- plugins/dogma/src/Factory.cxx | 7 +++++-- plugins/dogma/src/UdpTransport.cxx | 23 +++++++++++++++++++---- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/plugins/dogma/dogma/UdpTransport.h b/plugins/dogma/dogma/UdpTransport.h index 2c385642..db14e856 100644 --- a/plugins/dogma/dogma/UdpTransport.h +++ b/plugins/dogma/dogma/UdpTransport.h @@ -122,11 +122,11 @@ namespace dogma { public: UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, int mtu, bool debug, bool print, int maxloop, double reduce); - ~UdpAddon() override; + ~UdpAddon() override; bool HasBuffer() const { return !fTgtPtr.null(); } - static int OpenUdp(const std::string &host, int nport, int rcvbuflen); + static int OpenUdp(const std::string &host, int nport, int rcvbuflen, const std::string &mcast = ""); }; // ================================================================================== diff --git a/plugins/dogma/src/Factory.cxx b/plugins/dogma/src/Factory.cxx index fbd2649d..84d5ce21 100644 --- a/plugins/dogma/src/Factory.cxx +++ b/plugins/dogma/src/Factory.cxx @@ -89,7 +89,9 @@ dabc::Module* dogma::Factory::CreateTransport(const dabc::Reference& port, const std::string host = url.GetHostName(); int rcvbuflen = url.GetOptionInt("udpbuf", 200000); - int fd = dogma::UdpAddon::OpenUdp(host, nport, rcvbuflen); + std::string mcast = url.GetOptionStr("mcast", ""); + + int fd = dogma::UdpAddon::OpenUdp(host, nport, rcvbuflen, mcast); if (fd <= 0) { EOUT("Cannot open UDP socket for %s", url.GetHostNameWithPort().c_str()); return nullptr; @@ -104,7 +106,8 @@ dabc::Module* dogma::Factory::CreateTransport(const dabc::Reference& port, const int udp_queue = url.GetOptionInt("upd_queue", 0); double heartbeat = url.GetOptionDouble("heartbeat", -1.); - if (udp_queue>0) cmd.SetInt("TransportQueue", udp_queue); + if (udp_queue > 0) + cmd.SetInt("TransportQueue", udp_queue); DOUT0("Start DOGMA UDP transport on %s", url.GetHostNameWithPort().c_str()); diff --git a/plugins/dogma/src/UdpTransport.cxx b/plugins/dogma/src/UdpTransport.cxx index 8c08a16a..46f11539 100644 --- a/plugins/dogma/src/UdpTransport.cxx +++ b/plugins/dogma/src/UdpTransport.cxx @@ -28,6 +28,7 @@ #include #include #include +#include // according to specification maximal UDP packet is 65,507 or 0xFFE3 @@ -237,10 +238,11 @@ bool dogma::UdpAddon::ReadUdp() return true; // indicate that buffer reading will be finished by callback } -int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen) +int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen, const std::string &mcast) { int fd = socket(PF_INET, SOCK_DGRAM, 0); - if (fd < 0) return -1; + if (fd < 0) + return -1; if (!dabc::SocketThread::SetNonBlockSocket(fd)) { EOUT("Cannot set non-blocking mode for UDP socket %d", fd); @@ -269,6 +271,17 @@ int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen) } } + if (mcast.length() > 0) { + struct in_addr localInterface; + memset(&localInterface, 0, sizeof(localInterface)); + localInterface.s_addr = inet_addr(mcast.c_str()); + + if(setsockopt(fd, IPPROTO_IP, IP_MULTICAST_IF, (char *)&localInterface, sizeof(localInterface)) < 0) + EOUT("Fail to set mcast addr %s to socket", mcast.c_str()); + else + EOUT("MCAST addr %s set OK", mcast.c_str()); + } + if ((host.length() > 0) && (host != "host")) { struct addrinfo hints, *info = nullptr; @@ -281,7 +294,8 @@ int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen) getaddrinfo(host.c_str(), service.c_str(), &hints, &info); - if (info && bind(fd, info->ai_addr, info->ai_addrlen) == 0) return fd; + if (info && bind(fd, info->ai_addr, info->ai_addrlen) == 0) + return fd; } sockaddr_in addr; @@ -289,7 +303,8 @@ int dogma::UdpAddon::OpenUdp(const std::string &host, int nport, int rcvbuflen) addr.sin_family = AF_INET; addr.sin_port = htons(nport); - if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == 0) return fd; + if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == 0) + return fd; close(fd); return -1; From 5d470e44124b1934a6514e4e3cc4b12a45bab410 Mon Sep 17 00:00:00 2001 From: Sergey Linev Date: Tue, 27 Jan 2026 15:57:17 +0100 Subject: [PATCH 2/2] Keep mcast addr in addon --- plugins/dogma/dogma/UdpTransport.h | 3 ++- plugins/dogma/src/Factory.cxx | 2 +- plugins/dogma/src/UdpTransport.cxx | 5 +++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/plugins/dogma/dogma/UdpTransport.h b/plugins/dogma/dogma/UdpTransport.h index db14e856..8300e061 100644 --- a/plugins/dogma/dogma/UdpTransport.h +++ b/plugins/dogma/dogma/UdpTransport.h @@ -97,6 +97,7 @@ namespace dogma { dabc::BufferSize_t fBufferSize{0}; ///< available buffer size std::string fHostName; ///< host name used to create UDP socket int fRecvBufLen{100000}; ///< recv buf len + std::string fMcastAddr; ///< mcast address unsigned fMTU{0}; ///< maximal size of packet expected from DOG void* fMtuBuffer{nullptr}; ///< buffer used to skip packets when no normal buffer is available int fSkipCnt{0}; ///< counter used to control buffers skipping @@ -121,7 +122,7 @@ namespace dogma { bool CloseBuffer(); public: - UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, int mtu, bool debug, bool print, int maxloop, double reduce); + UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, const std::string &mcast, int mtu, bool debug, bool print, int maxloop, double reduce); ~UdpAddon() override; bool HasBuffer() const { return !fTgtPtr.null(); } diff --git a/plugins/dogma/src/Factory.cxx b/plugins/dogma/src/Factory.cxx index 84d5ce21..e7b6da9a 100644 --- a/plugins/dogma/src/Factory.cxx +++ b/plugins/dogma/src/Factory.cxx @@ -111,6 +111,6 @@ dabc::Module* dogma::Factory::CreateTransport(const dabc::Reference& port, const DOUT0("Start DOGMA UDP transport on %s", url.GetHostNameWithPort().c_str()); - auto addon = new dogma::UdpAddon(fd, host, nport, rcvbuflen, mtu, debug, print, maxloop, reduce); + auto addon = new dogma::UdpAddon(fd, host, nport, rcvbuflen, mcast, mtu, debug, print, maxloop, reduce); return new dogma::UdpTransport(cmd, portref, addon, flush, heartbeat); } diff --git a/plugins/dogma/src/UdpTransport.cxx b/plugins/dogma/src/UdpTransport.cxx index 46f11539..0eec9ba8 100644 --- a/plugins/dogma/src/UdpTransport.cxx +++ b/plugins/dogma/src/UdpTransport.cxx @@ -34,12 +34,13 @@ // according to specification maximal UDP packet is 65,507 or 0xFFE3 #define DEFAULT_MTU 0xFFF0 -dogma::UdpAddon::UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, int mtu, bool debug, bool print, int maxloop, double reduce) : +dogma::UdpAddon::UdpAddon(int fd, const std::string &host, int nport, int rcvbuflen, const std::string &mcast, int mtu, bool debug, bool print, int maxloop, double reduce) : dabc::SocketAddon(fd), TransportInfo(nport), fTgtPtr(), fHostName(host), fRecvBufLen(rcvbuflen), + fMcastAddr(mcast), fMTU(mtu > 0 ? mtu : DEFAULT_MTU), fMtuBuffer(nullptr), fSkipCnt(0), @@ -355,7 +356,7 @@ int dogma::UdpTransport::ExecuteCommand(dabc::Command cmd) auto addon = static_cast(fAddon()); if (addon) { addon->CloseSocket(); - int fd = dogma::UdpAddon::OpenUdp(addon->fHostName, addon->fNPort, addon->fRecvBufLen); + int fd = dogma::UdpAddon::OpenUdp(addon->fHostName, addon->fNPort, addon->fRecvBufLen, addon->fMcastAddr); if (fd <= 0) { EOUT("Cannot recreate UDP socket for port %d", addon->fNPort); dabc::mgr.StopApplication();