Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 101 additions & 21 deletions communication-core/sockets/udp_multicast_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,101 @@

#include "communication-core/sockets/udp_multicast_socket.h"

#include <arpa/inet.h>

#include <algorithm>
#include <future> // NOLINT
#include <iostream>
#include <string>
#include <vector>

#include "iostream"
#include "unistd.h"

namespace srp {
namespace com {
namespace soc {

srp::core::ErrorCode UdpMulticastSocket::Init(const SocketConfig &config) {
namespace {
constexpr uint32_t kBufforSize{255 * 2};
} // namespace

srp::core::ErrorCode UdpMulticastSocket::Init(const std::string &local_ip,
const std::string &multicast_ip,
const std::uint16_t port_id) {
local_ip_ = local_ip;
multicast_ip_ = multicast_ip;
port_id_ = port_id;

sd = socket(AF_INET, SOCK_DGRAM, 0);
if (sd < 0) {
return srp::core::ErrorCode::kError;
}

memset((char *)&groupSock, 0, sizeof(groupSock)); // NOLINT
groupSock.sin_family = AF_INET;
groupSock.sin_addr.s_addr = inet_addr("231.255.42.99");
groupSock.sin_port = htons(config.GetRxPort());
groupSock.sin_addr.s_addr = inet_addr(multicast_ip.c_str());
groupSock.sin_port = htons(port_id);
// {
// char loopch = 0;

// if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_LOOP,
// (char *)&loopch, // NOLINT
// sizeof(loopch)) < 0) {
// close(sd);
// return srp::core::ErrorCode::kError;
// }
// }

srcaddr.sin_family = AF_INET;
srcaddr.sin_addr.s_addr = inet_addr(local_ip.c_str());
srcaddr.sin_port = htons(port_id_);

if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_IF,
(char *)&srcaddr, // NOLINT
sizeof(srcaddr)) < 0) {
return srp::core::ErrorCode::kError;
}
{
char loopch = 0;
int reuse = 1;

if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_LOOP,
(char *)&loopch, // NOLINT
sizeof(loopch)) < 0) {
close(sd);
if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, // NOLINT
sizeof(reuse)) < 0) {
return srp::core::ErrorCode::kError;
}
}
localInterface.s_addr = inet_addr(config.GetIp().c_str());
if (setsockopt(sd, IPPROTO_IP, IP_MULTICAST_IF,
(char *)&localInterface, // NOLINT
sizeof(localInterface)) < 0) {

struct timeval timeout;
timeout.tv_sec = 10;
timeout.tv_usec = 0;

if (setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) < 0) {
return srp::core::ErrorCode::kError;
}

memset((char *)&localSock, 0, sizeof(localSock)); // NOLINT
localSock.sin_family = AF_INET;
localSock.sin_port = htons(port_id_);
localSock.sin_addr.s_addr = INADDR_ANY;

if (bind(sd, (struct sockaddr *)&localSock, sizeof(localSock))) {
close(sd);
return srp::core::ErrorCode::kError;
}

group.imr_multiaddr.s_addr = inet_addr(multicast_ip_.c_str());
group.imr_interface.s_addr = inet_addr(local_ip_.c_str());
if (setsockopt(sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&group, // NOLINT
sizeof(group)) < 0) {
close(sd);
return srp::core::ErrorCode::kError;
}
return srp::core::ErrorCode::kOk;
}

void UdpMulticastSocket::SetRXCallback(RXCallback callback) {}
void UdpMulticastSocket::SetRXCallback(RXCallback callback) {
this->callback_ = callback;
}

srp::core::ErrorCode UdpMulticastSocket::Transmit(
const std::string &ip, const std::uint16_t port,
std::vector<std::uint8_t> payload) {
srp::core::ErrorCode UdpMulticastSocket::Transmit(const std::vector<std::uint8_t> &payload) {
std::uint8_t *buffor = new std::uint8_t[payload.size()];
std::copy(payload.begin(), payload.end(), buffor);
if (sendto(sd, buffor, payload.size(), 0, (struct sockaddr *)&groupSock,
Expand All @@ -67,9 +117,39 @@ srp::core::ErrorCode UdpMulticastSocket::Transmit(
return srp::core::ErrorCode::kOk;
}

void UdpMulticastSocket::StartRXThread() {}
void UdpMulticastSocket::StartRXThread() {
if (rx_thread != nullptr) {
return;
}
this->rx_thread = std::make_unique<std::jthread>(
[&](std::stop_token stoken) { this->Loop(stoken); });
pthread_setname_np(this->rx_thread->native_handle(),
"UdpMulticastSocket_RX_Thread");
}

void UdpMulticastSocket::Loop(std::stop_token stoken) {}
void UdpMulticastSocket::Loop(std::stop_token stoken) {
const std::stop_callback stop_wait{stoken,
[this]() { shutdown(this->sd, SHUT_RD); }};
while (!stoken.stop_requested()) {
struct sockaddr_in si_other;
int slen = sizeof(si_other);
std::array<char, kBufforSize> buffor;
const int32_t bytes_rec =
recvfrom(sd, buffor.data(), kBufforSize, 0,
(struct sockaddr *)&si_other, (socklen_t *)&slen); // NOLINT
if (bytes_rec > 0) {
if (this->callback_) {
std::ignore = std::async(
std::launch::async, [this, &si_other, &bytes_rec, &buffor]() {
this->callback_(std::string(inet_ntoa(si_other.sin_addr)),
htons(si_other.sin_port),
std::vector<uint8_t>{buffor.begin(),
buffor.begin() + bytes_rec});
});
}
}
}
}
} // namespace soc
} // namespace com
} // namespace com
} // namespace srp
62 changes: 43 additions & 19 deletions communication-core/sockets/udp_multicast_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,66 +12,90 @@
#define COMMUNICATION_CORE_SOCKETS_UDP_MULTICAST_SOCKET_H_
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>

#include <functional>
#include <memory>
#include <string>
#include <thread> // NOLINT
#include <vector>

#include "communication-core/sockets/Isocket.h"
#include "communication-core/sockets/socket_config.h"
#include "core/common/error_code.h"

namespace srp {
namespace com {
namespace soc {
class UdpMulticastSocket : public ISocket {
class UdpMulticastSocket {
public:
using RXCallback =
std::function<void(const std::string& ip, const std::uint16_t& port,
const std::vector<std::uint8_t>&)>;

private:
struct in_addr localInterface;
struct sockaddr_in groupSock;
struct sockaddr_in localSock;
struct ip_mreq group;
struct sockaddr_in srcaddr;
struct sockaddr_in cliaddr;
int sd;
std::string local_ip_;
std::string multicast_ip_;
std::uint16_t port_id_;

int s; /* s = socket */
struct sockaddr_in in_addr; /* Structure used for bind() */
struct sockaddr_in sock_addr; /* Output structure from getsockname */
struct sockaddr_in src_addr; /* Used to receive (addr,port) of sender */
int src_addr_len; /* Length of src_addr */
int len; /* Length of result from getsockname */
int mc_addr, port;
struct ip_mreq mreq;
struct hostent* host_entry_ptr;
char line[100];

// int server_sock, len;
// int bytes_rec = 0;
// struct sockaddr_in server_sockaddr, peer_sock;

// std::unique_ptr<std::jthread> rx_thread;
std::unique_ptr<std::jthread> rx_thread;
void Loop(std::stop_token stoken);
// RXCallback callback_;
RXCallback callback_;

public:
/**
* @brief Socket init function
*
* @param config Config file
* @pplatformm config Config file
* @return core::ErrorCode initialiaze status
*/
srp::core::ErrorCode Init(const SocketConfig& config) override;
srp::core::ErrorCode Init(const std::string& local_ip,
const std::string& multicast_ip,
const std::uint16_t port_id);
/**
* @brief Setter for rx callback
*
* @param callback
* @pplatformm callback
*/
void SetRXCallback(RXCallback callback) override;
void SetRXCallback(RXCallback callback);
/**
* @brief Function to send data by socket
*
* @param ip target ip or path
* @param port target port or 0 for ipcs
* @param payload payload to send
* @pplatformm ip target ip or path
* @pplatformm port target port or 0 for ipcs
* @pplatformm payload payload to send
* @return core::ErrorCode status
*/
srp::core::ErrorCode Transmit(const std::string& ip,
const std::uint16_t port,
std::vector<std::uint8_t> payload) override;
srp::core::ErrorCode Transmit(const std::vector<std::uint8_t>& payload);
/**
* @brief This function start RX thread
*
*/
void StartRXThread() override;
void StartRXThread();
UdpMulticastSocket() = default;
};
} // namespace soc
Expand Down
Loading
Loading