From bbc025e76165a7014eb0c1539a15b748a068d1aa Mon Sep 17 00:00:00 2001 From: Wiktor Muller Date: Mon, 1 Jun 2026 14:10:58 +0200 Subject: [PATCH 1/2] Importing udp and udp multicast sockets from srp-platform --- .../sockets/udp_multicast_socket.cc | 122 ++++++++++++--- .../sockets/udp_multicast_socket.h | 62 +++++--- communication-core/sockets/udp_socket.cc | 145 +++++++++++------- communication-core/sockets/udp_socket.h | 75 ++++++--- 4 files changed, 291 insertions(+), 113 deletions(-) diff --git a/communication-core/sockets/udp_multicast_socket.cc b/communication-core/sockets/udp_multicast_socket.cc index 846afbca..fb2ab112 100644 --- a/communication-core/sockets/udp_multicast_socket.cc +++ b/communication-core/sockets/udp_multicast_socket.cc @@ -11,18 +11,30 @@ #include "communication-core/sockets/udp_multicast_socket.h" +#include + #include +#include // NOLINT +#include #include #include -#include "iostream" #include "unistd.h" - namespace srp { namespace com { namespace soc { -srp::core::ErrorCode UdpMulticastSocket::Init(const SocketConfig &config) { +namespace { +constexpr uint32_t kBufforSize{255 * 2}; +} // namespace + +srp::core::ErrorCode UdpMulticastSocket::Init(const std::string &local_ip, + const std::string &multicast_ip, + const std::uint16_t port_id) { + local_ip_ = local_ip; + multicast_ip_ = multicast_ip; + port_id_ = port_id; + sd = socket(AF_INET, SOCK_DGRAM, 0); if (sd < 0) { return srp::core::ErrorCode::kError; @@ -30,32 +42,70 @@ srp::core::ErrorCode UdpMulticastSocket::Init(const SocketConfig &config) { memset((char *)&groupSock, 0, sizeof(groupSock)); // NOLINT groupSock.sin_family = AF_INET; - groupSock.sin_addr.s_addr = inet_addr("231.255.42.99"); - groupSock.sin_port = htons(config.GetRxPort()); + groupSock.sin_addr.s_addr = inet_addr(multicast_ip.c_str()); + groupSock.sin_port = htons(port_id); + // { + // char loopch = 0; + + // if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_LOOP, + // (char *)&loopch, // NOLINT + // sizeof(loopch)) < 0) { + // close(sd); + // return srp::core::ErrorCode::kError; + // } + // } + + srcaddr.sin_family = AF_INET; + srcaddr.sin_addr.s_addr = inet_addr(local_ip.c_str()); + srcaddr.sin_port = htons(port_id_); + + if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_IF, + (char *)&srcaddr, // NOLINT + sizeof(srcaddr)) < 0) { + return srp::core::ErrorCode::kError; + } { - char loopch = 0; + int reuse = 1; - if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_LOOP, - (char *)&loopch, // NOLINT - sizeof(loopch)) < 0) { - close(sd); + if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, // NOLINT + sizeof(reuse)) < 0) { return srp::core::ErrorCode::kError; } } - localInterface.s_addr = inet_addr(config.GetIp().c_str()); - if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_IF, - (char *)&localInterface, // NOLINT - sizeof(localInterface)) < 0) { + + struct timeval timeout; + timeout.tv_sec = 10; + timeout.tv_usec = 0; + + if (setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) < 0) { + return srp::core::ErrorCode::kError; + } + + memset((char *)&localSock, 0, sizeof(localSock)); // NOLINT + localSock.sin_family = AF_INET; + localSock.sin_port = htons(port_id_); + localSock.sin_addr.s_addr = INADDR_ANY; + + if (bind(sd, (struct sockaddr *)&localSock, sizeof(localSock))) { + close(sd); + return srp::core::ErrorCode::kError; + } + + group.imr_multiaddr.s_addr = inet_addr(multicast_ip_.c_str()); + group.imr_interface.s_addr = inet_addr(local_ip_.c_str()); + if (setsockopt(sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&group, // NOLINT + sizeof(group)) < 0) { + close(sd); return srp::core::ErrorCode::kError; } return srp::core::ErrorCode::kOk; } -void UdpMulticastSocket::SetRXCallback(RXCallback callback) {} +void UdpMulticastSocket::SetRXCallback(RXCallback callback) { + this->callback_ = callback; +} -srp::core::ErrorCode UdpMulticastSocket::Transmit( - const std::string &ip, const std::uint16_t port, - std::vector payload) { +srp::core::ErrorCode UdpMulticastSocket::Transmit(const std::vector &payload) { std::uint8_t *buffor = new std::uint8_t[payload.size()]; std::copy(payload.begin(), payload.end(), buffor); if (sendto(sd, buffor, payload.size(), 0, (struct sockaddr *)&groupSock, @@ -67,9 +117,39 @@ srp::core::ErrorCode UdpMulticastSocket::Transmit( return srp::core::ErrorCode::kOk; } -void UdpMulticastSocket::StartRXThread() {} +void UdpMulticastSocket::StartRXThread() { + if (rx_thread != nullptr) { + return; + } + this->rx_thread = std::make_unique( + [&](std::stop_token stoken) { this->Loop(stoken); }); + pthread_setname_np(this->rx_thread->native_handle(), + "UdpMulticastSocket_RX_Thread"); +} -void UdpMulticastSocket::Loop(std::stop_token stoken) {} +void UdpMulticastSocket::Loop(std::stop_token stoken) { + const std::stop_callback stop_wait{stoken, + [this]() { shutdown(this->sd, SHUT_RD); }}; + while (!stoken.stop_requested()) { + struct sockaddr_in si_other; + int slen = sizeof(si_other); + std::array buffor; + const int32_t bytes_rec = + recvfrom(sd, buffor.data(), kBufforSize, 0, + (struct sockaddr *)&si_other, (socklen_t *)&slen); // NOLINT + if (bytes_rec > 0) { + if (this->callback_) { + std::ignore = std::async( + std::launch::async, [this, &si_other, &bytes_rec, &buffor]() { + this->callback_(std::string(inet_ntoa(si_other.sin_addr)), + htons(si_other.sin_port), + std::vector{buffor.begin(), + buffor.begin() + bytes_rec}); + }); + } + } + } +} } // namespace soc -} // namespace com +} // namespace com } // namespace srp diff --git a/communication-core/sockets/udp_multicast_socket.h b/communication-core/sockets/udp_multicast_socket.h index 7897aed6..803fd31b 100644 --- a/communication-core/sockets/udp_multicast_socket.h +++ b/communication-core/sockets/udp_multicast_socket.h @@ -12,66 +12,90 @@ #define COMMUNICATION_CORE_SOCKETS_UDP_MULTICAST_SOCKET_H_ #include #include -#include -#include -#include +#include #include -#include #include #include +#include #include #include #include // NOLINT #include -#include "communication-core/sockets/Isocket.h" +#include "communication-core/sockets/socket_config.h" +#include "core/common/error_code.h" + namespace srp { namespace com { namespace soc { -class UdpMulticastSocket : public ISocket { +class UdpMulticastSocket { + public: + using RXCallback = + std::function&)>; + private: struct in_addr localInterface; struct sockaddr_in groupSock; + struct sockaddr_in localSock; + struct ip_mreq group; + struct sockaddr_in srcaddr; + struct sockaddr_in cliaddr; int sd; + std::string local_ip_; + std::string multicast_ip_; + std::uint16_t port_id_; + + int s; /* s = socket */ + struct sockaddr_in in_addr; /* Structure used for bind() */ + struct sockaddr_in sock_addr; /* Output structure from getsockname */ + struct sockaddr_in src_addr; /* Used to receive (addr,port) of sender */ + int src_addr_len; /* Length of src_addr */ + int len; /* Length of result from getsockname */ + int mc_addr, port; + struct ip_mreq mreq; + struct hostent* host_entry_ptr; + char line[100]; + // int server_sock, len; // int bytes_rec = 0; // struct sockaddr_in server_sockaddr, peer_sock; - // std::unique_ptr rx_thread; + std::unique_ptr rx_thread; void Loop(std::stop_token stoken); - // RXCallback callback_; + RXCallback callback_; public: /** * @brief Socket init function * - * @param config Config file + * @pplatformm config Config file * @return core::ErrorCode initialiaze status */ - srp::core::ErrorCode Init(const SocketConfig& config) override; + srp::core::ErrorCode Init(const std::string& local_ip, + const std::string& multicast_ip, + const std::uint16_t port_id); /** * @brief Setter for rx callback * - * @param callback + * @pplatformm callback */ - void SetRXCallback(RXCallback callback) override; + void SetRXCallback(RXCallback callback); /** * @brief Function to send data by socket * - * @param ip target ip or path - * @param port target port or 0 for ipcs - * @param payload payload to send + * @pplatformm ip target ip or path + * @pplatformm port target port or 0 for ipcs + * @pplatformm payload payload to send * @return core::ErrorCode status */ - srp::core::ErrorCode Transmit(const std::string& ip, - const std::uint16_t port, - std::vector payload) override; + srp::core::ErrorCode Transmit(const std::vector& payload); /** * @brief This function start RX thread * */ - void StartRXThread() override; + void StartRXThread(); UdpMulticastSocket() = default; }; } // namespace soc diff --git a/communication-core/sockets/udp_socket.cc b/communication-core/sockets/udp_socket.cc index 3dc13d64..5e343f4e 100644 --- a/communication-core/sockets/udp_socket.cc +++ b/communication-core/sockets/udp_socket.cc @@ -1,42 +1,78 @@ /** * @file udp_socket.cc - * @author Mateusz Krajewski (matikrajek42@gmail.com) + * @author Bartosz Snieg (snieg45@gmail.com) * @brief * @version 0.1 - * @date 2023-11-17 + * @date 2024-11-26 * - * @copyright Copyright (c) 2023 + * @copyright Copyright (c) 2024 * */ - #include "communication-core/sockets/udp_socket.h" +#include + #include +#include // NOLINT +#include #include #include -#include "iostream" #include "unistd.h" - namespace srp { namespace com { namespace soc { -srp::core::ErrorCode UdpSocket::Init(const SocketConfig& config) { - memset(&server_sockaddr, 0, sizeof(server_sockaddr)); - server_sock = socket(AF_INET, SOCK_DGRAM, 0); - if (server_sock == -1) { - return srp::core::ErrorCode::kInitializeError; +namespace { +constexpr uint32_t kBufforSize{255 * 2}; +} // namespace + +srp::core::ErrorCode UdpSocket::Init(const std::string &local_ip, + const std::uint16_t port_id) { + local_ip_ = local_ip; + port_id_ = port_id; + + sd = socket(AF_INET, SOCK_DGRAM, 0); + if (sd < 0) { + return srp::core::ErrorCode::kError; + } + + srcaddr.sin_family = AF_INET; + srcaddr.sin_addr.s_addr = inet_addr(local_ip.c_str()); + srcaddr.sin_port = htons(port_id_); + + if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_IF, + (char *)&srcaddr, // NOLINT + sizeof(srcaddr)) < 0) { + return srp::core::ErrorCode::kError; + } + { + int reuse = 1; + + if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, // NOLINT + sizeof(reuse)) < 0) { + return srp::core::ErrorCode::kError; + } } - server_sockaddr.sin_family = AF_INET; - server_sockaddr.sin_addr.s_addr = inet_addr(config.GetIp().c_str()); - server_sockaddr.sin_port = htons(config.GetRxPort()); - this->len = sizeof(server_sockaddr); - unlink(config.GetIp().c_str()); - int rc = bind(server_sock, (struct sockaddr*)&server_sockaddr, len); - if (rc == -1) { - return srp::core::ErrorCode::kInitializeError; + + struct timeval timeout; + timeout.tv_sec = 10; + timeout.tv_usec = 0; + + if (setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) < 0) { + return srp::core::ErrorCode::kError; } + + memset((char *)&localSock, 0, sizeof(localSock)); // NOLINT + localSock.sin_family = AF_INET; + localSock.sin_port = htons(port_id_); + localSock.sin_addr.s_addr = INADDR_ANY; + + if (bind(sd, (struct sockaddr *)&localSock, sizeof(localSock))) { + close(sd); + return srp::core::ErrorCode::kError; + } + return srp::core::ErrorCode::kOk; } @@ -44,29 +80,30 @@ void UdpSocket::SetRXCallback(RXCallback callback) { this->callback_ = callback; } -srp::core::ErrorCode UdpSocket::Transmit(const std::string& ip, - const std::uint16_t port, - std::vector payload) { - int client_socket, rc; +void UdpSocket::Transmit(const uint32_t &ip, const uint16_t port, + const std::vector &payload) { + std::lock_guard lock{sending_m_}; struct sockaddr_in remote; memset(&remote, 0, sizeof(struct sockaddr_in)); - client_socket = socket(AF_INET, SOCK_DGRAM, 0); - if (client_socket == -1) { - return srp::core::ErrorCode::kError; - } remote.sin_family = AF_INET; - remote.sin_addr.s_addr = inet_addr(ip.c_str()); + remote.sin_addr.s_addr = ip; remote.sin_port = htons(port); - std::uint8_t* buffor = new std::uint8_t[payload.size()]; + + std::uint8_t *buffor = new std::uint8_t[payload.size()]; std::copy(payload.begin(), payload.end(), buffor); - rc = sendto(client_socket, buffor, payload.size(), 0, - (struct sockaddr*)&remote, sizeof(remote)); - delete[] buffor; - close(client_socket); - if (rc == -1) { - return srp::core::ErrorCode::kError; + + if (sendto(sd, buffor, payload.size(), 0, (struct sockaddr *)&remote, + sizeof(remote)) < 0) { + // delete[] buffor; + // return srp::core::ErrorCode::kError; } - return srp::core::ErrorCode::kOk; + delete[] buffor; + // return srp::core::ErrorCode::kOk; +} + +void UdpSocket::Transmit(const std::string &ip, const uint16_t port, + const std::vector &payload) { + this->Transmit(inet_addr(ip.c_str()), port, payload); } void UdpSocket::StartRXThread() { @@ -75,30 +112,32 @@ void UdpSocket::StartRXThread() { } this->rx_thread = std::make_unique( [&](std::stop_token stoken) { this->Loop(stoken); }); + pthread_setname_np(this->rx_thread->native_handle(), "UdpSocket_RX_Thread"); } void UdpSocket::Loop(std::stop_token stoken) { - struct timeval tv; - tv.tv_sec = 2; - tv.tv_usec = 0; - setsockopt(server_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - const std::stop_callback stop_wait{ - stoken, [this]() { shutdown(this->server_sock, SHUT_RDWR); }}; - while (true) { - std::array buffor; - bytes_rec = - recvfrom(server_sock, reinterpret_cast(&buffor), 256 * 2, 0, - (struct sockaddr*)&peer_sock, (socklen_t*)&len); // NOLINT - if (bytes_rec >= 0) { + const std::stop_callback stop_wait{stoken, + [this]() { shutdown(this->sd, SHUT_RD); }}; + while (!stoken.stop_requested()) { + struct sockaddr_in si_other; + int slen = sizeof(si_other); + std::array buffor; + const int32_t bytes_rec = + recvfrom(sd, buffor.data(), kBufforSize, 0, + (struct sockaddr *)&si_other, (socklen_t *)&slen); // NOLINT + if (bytes_rec > 0) { if (this->callback_) { - this->callback_( - "UDP", 0, - std::vector{buffor.begin(), buffor.begin() + bytes_rec}); + std::ignore = std::async( + std::launch::async, [this, &si_other, &bytes_rec, &buffor]() { + this->callback_(std::string(inet_ntoa(si_other.sin_addr)), + htons(si_other.sin_port), + std::vector{buffor.begin(), + buffor.begin() + bytes_rec}); + }); } } } - close(server_sock); } } // namespace soc -} // namespace com +} // namespace com } // namespace srp diff --git a/communication-core/sockets/udp_socket.h b/communication-core/sockets/udp_socket.h index 8b0ab762..4ca3431e 100644 --- a/communication-core/sockets/udp_socket.h +++ b/communication-core/sockets/udp_socket.h @@ -8,10 +8,11 @@ * @copyright Copyright (c) 2023 * */ -#ifndef COMMUNICATION_CORE_SOCKETS_UDP_SOCKET_H_ -#define COMMUNICATION_CORE_SOCKETS_UDP_SOCKET_H_ +#ifndef APPS_COMMON_SOMEIP_DEMON_CODE_COMMON_UDP_SOCKET_H_ +#define APPS_COMMON_SOMEIP_DEMON_CODE_COMMON_UDP_SOCKET_H_ #include #include +#include #include #include #include @@ -20,20 +21,50 @@ #include #include -#include +#include +#include // NOLINT +#include // NOLINT #include #include // NOLINT #include -#include "communication-core/sockets/Isocket.h" +#include "core/common/error_code.h" + namespace srp { namespace com { namespace soc { -class UdpSocket : public ISocket { +class UdpSocket { + public: + using RXCallback = + std::function&)>; + private: - int server_sock, len; - int bytes_rec = 0; - struct sockaddr_in server_sockaddr, peer_sock; + struct in_addr localInterface; + struct sockaddr_in groupSock; + struct sockaddr_in localSock; + struct ip_mreq group; + struct sockaddr_in srcaddr; + struct sockaddr_in cliaddr; + int sd; + std::string local_ip_; + std::string multicast_ip_; + std::uint16_t port_id_; + std::mutex sending_m_{}; + int s; /* s = socket */ + struct sockaddr_in in_addr; /* Structure used for bind() */ + struct sockaddr_in sock_addr; /* Output structure from getsockname */ + struct sockaddr_in src_addr; /* Used to receive (addr,port) of sender */ + int src_addr_len; /* Length of src_addr */ + int len; /* Length of result from getsockname */ + int mc_addr, port; + struct ip_mreq mreq; + struct hostent* host_entry_ptr; + char line[100]; + + // int server_sock, len; + // int bytes_rec = 0; + // struct sockaddr_in server_sockaddr, peer_sock; std::unique_ptr rx_thread; void Loop(std::stop_token stoken); @@ -43,35 +74,39 @@ class UdpSocket : public ISocket { /** * @brief Socket init function * - * @param config Config file + * @pplatformm config Config file * @return core::ErrorCode initialiaze status */ - srp::core::ErrorCode Init(const SocketConfig& config) override; + srp::core::ErrorCode Init(const std::string& local_ip, + const std::uint16_t port_id); /** * @brief Setter for rx callback * - * @param callback + * @pplatformm callback */ - void SetRXCallback(RXCallback callback) override; + void SetRXCallback(RXCallback callback); /** * @brief Function to send data by socket * - * @param ip target ip or path - * @param port target port or 0 for ipcs - * @param payload payload to send + * @pplatformm ip target ip or path + * @pplatformm port target port or 0 for ipcs + * @pplatformm payload payload to send * @return core::ErrorCode status */ - srp::core::ErrorCode Transmit(const std::string& ip, - const std::uint16_t port, - std::vector payload) override; + void Transmit(const std::string& ip, const uint16_t port, + const std::vector& payload); + + void Transmit(const uint32_t& ip, const uint16_t port, + const std::vector& payload); /** * @brief This function start RX thread * */ - void StartRXThread() override; + void StartRXThread(); + UdpSocket() = default; }; } // namespace soc } // namespace com } // namespace srp -#endif // COMMUNICATION_CORE_SOCKETS_UDP_SOCKET_H_ +#endif // APPS_COMMON_SOMEIP_DEMON_CODE_COMMON_UDP_SOCKET_H_ From e026068aad741c8477781b93af32b8e8d142f974 Mon Sep 17 00:00:00 2001 From: Wiktor Muller Date: Mon, 1 Jun 2026 14:31:30 +0200 Subject: [PATCH 2/2] Namespace quickfix in udp socket --- communication-core/sockets/udp_socket.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/communication-core/sockets/udp_socket.h b/communication-core/sockets/udp_socket.h index 4ca3431e..7dc05af7 100644 --- a/communication-core/sockets/udp_socket.h +++ b/communication-core/sockets/udp_socket.h @@ -8,8 +8,8 @@ * @copyright Copyright (c) 2023 * */ -#ifndef APPS_COMMON_SOMEIP_DEMON_CODE_COMMON_UDP_SOCKET_H_ -#define APPS_COMMON_SOMEIP_DEMON_CODE_COMMON_UDP_SOCKET_H_ +#ifndef COMMUNICATION_CORE_SOCKETS_UDP_SOCKET_H_ +#define COMMUNICATION_CORE_SOCKETS_UDP_SOCKET_H_ #include #include #include @@ -109,4 +109,4 @@ class UdpSocket { } // namespace com } // namespace srp -#endif // APPS_COMMON_SOMEIP_DEMON_CODE_COMMON_UDP_SOCKET_H_ +#endif // COMMUNICATION_CORE_SOCKETS_UDP_SOCKET_H_