diff --git a/examples/stats/main.cpp b/examples/stats/main.cpp index 10a0b51..354d7b6 100644 --- a/examples/stats/main.cpp +++ b/examples/stats/main.cpp @@ -56,7 +56,8 @@ auto configure_stream(Device& device, std::shared_ptr* stream_out_ptr NodeConfig stream_out_config; auto* stream_out_proto = stream_out_config.mutable_stream_out(); - stream_out_proto->set_multicast_group("224.0.0.115"); + auto* udp_config = stream_out_proto->mutable_udp_unicast(); + udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT); std::shared_ptr stream_out_node; s = StreamOut::from_proto(stream_out_config, &stream_out_node); diff --git a/examples/stream_out/main.cpp b/examples/stream_out/main.cpp index b4f0f9b..b8fafe3 100644 --- a/examples/stream_out/main.cpp +++ b/examples/stream_out/main.cpp @@ -1,4 +1,5 @@ #include +#include #include "science/scipp/status.h" #include "science/synapse/channel.h" @@ -18,14 +19,16 @@ using synapse::NodeType; using synapse::Signal; using synapse::StreamOut; using synapse::SynapseData; +using synapse::NodeConfig; +using synapse::Node; auto stream_new(Device& device, std::shared_ptr* stream_out_ptr) -> science::Status { + const uint32_t N_CHANNELS = 10; if (stream_out_ptr == nullptr) { return { science::StatusCode::kInvalidArgument, "stream out pointer is null" }; } - std::string group = "224.0.0.10"; science::Status s; DeviceInfo info; s = device.info(&info); @@ -39,8 +42,8 @@ auto stream_new(Device& device, std::shared_ptr* stream_out_ptr) -> s } }; auto& electrodes = std::get(signal.signal); - electrodes.channels.reserve(19); - for (unsigned int i = 0; i < 19; i++) { + electrodes.channels.reserve(N_CHANNELS); + for (unsigned int i = 0; i < N_CHANNELS; i++) { electrodes.channels.push_back(Ch{ .id = i, .electrode_id = i * 2, @@ -50,7 +53,22 @@ auto stream_new(Device& device, std::shared_ptr* stream_out_ptr) -> s Config config; auto broadband_source = std::make_shared(100, 16, 30000, 20.0, signal); - *stream_out_ptr = std::make_shared("out", group); + + // Create StreamOut with explicit configuration + NodeConfig stream_out_config; + auto* stream_out_proto = stream_out_config.mutable_stream_out(); + auto* udp_config = stream_out_proto->mutable_udp_unicast(); + udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT); + stream_out_proto->set_label("Broadband Stream"); + + std::shared_ptr stream_out_node; + s = StreamOut::from_proto(stream_out_config, &stream_out_node); + if (!s.ok()) return s; + + *stream_out_ptr = std::dynamic_pointer_cast(stream_out_node); + if (!*stream_out_ptr) { + return { science::StatusCode::kInternal, "failed to cast stream out node" }; + } s = config.add_node(broadband_source); if (!s.ok()) return s; @@ -64,7 +82,10 @@ auto stream_new(Device& device, std::shared_ptr* stream_out_ptr) -> s s = device.configure(&config); if (!s.ok()) return s; + std::cout << "Configured device" << std::endl; + s = device.start(); + std::cout << "Started device" << std::endl; return s; } @@ -75,18 +96,17 @@ auto stream_existing(Device& device, std::shared_ptr* stream_out_ptr) } science::Status s; - DeviceInfo info; s = device.info(&info); if (!s.ok()) return s; - uint32_t stream_out_id = 0; // default id - std::string group; + uint32_t stream_out_id = 0; + NodeConfig stream_out_config; const auto& nodes = info.configuration().nodes(); for (const auto& node : nodes) { if (node.type() == NodeType::kStreamOut) { stream_out_id = node.id(); - group = node.stream_out().multicast_group(); + stream_out_config = node; break; } } @@ -95,9 +115,16 @@ auto stream_existing(Device& device, std::shared_ptr* stream_out_ptr) return { science::StatusCode::kNotFound, "no stream out node found" }; } - std::cout << "found stream out node with id " << stream_out_id << " and group " << group << std::endl; + std::shared_ptr stream_out_node; + s = StreamOut::from_proto(stream_out_config, &stream_out_node); + if (!s.ok()) return s; + + *stream_out_ptr = std::dynamic_pointer_cast(stream_out_node); + if (!*stream_out_ptr) { + return { science::StatusCode::kInternal, "failed to cast stream out node" }; + } - *stream_out_ptr = std::make_shared("out", group); + std::cout << "found stream out node with id " << stream_out_id << std::endl; Config config; s = config.add_node(*stream_out_ptr, stream_out_id); @@ -107,7 +134,7 @@ auto stream_existing(Device& device, std::shared_ptr* stream_out_ptr) if (!s.ok()) return s; return s; -} +} auto stream(const std::string& uri, bool configure) -> int { synapse::Device device(uri); @@ -121,7 +148,6 @@ auto stream(const std::string& uri, bool configure) -> int { << static_cast(s.code()) << ") " << s.message() << std::endl; return 1; } - } else { s = stream_existing(device, &stream_out); if (!s.ok()) { @@ -136,6 +162,7 @@ auto stream(const std::string& uri, bool configure) -> int { return 1; } + std::cout << "Reading..." << std::endl; while (true) { SynapseData out; s = stream_out->read(&out); @@ -185,16 +212,24 @@ auto stream(const std::string& uri, bool configure) -> int { std::cout << ss.str() << std::endl; } } else { - std::cout << "data type unknown" << std::endl; + std::cout << "received data of unknown type" << std::endl; } } - return 1; + return 0; } int main(int argc, char** argv) { - std::string uri = "192.168.0.1:647"; - stream(uri, false); + if (argc != 2 && argc != 3) { + std::cout << "Usage: " << argv[0] << " [--config]" << std::endl; + std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl; + std::cout << " --config: optional flag to configure a new stream" << std::endl; + std::cout << " if omitted, uses existing stream" << std::endl; + return 1; + } - return 0; + std::string uri = argv[1]; + bool configure = (argc == 3 && std::string(argv[2]) == "--config"); + + return stream(uri, configure); } diff --git a/external/sciencecorp/synapse-api b/external/sciencecorp/synapse-api index 6b02951..22f14a2 160000 --- a/external/sciencecorp/synapse-api +++ b/external/sciencecorp/synapse-api @@ -1 +1 @@ -Subproject commit 6b02951bcad82241719853487eceebe78eb6835f +Subproject commit 22f14a204e007fbbb7695aea84c344b41f5b47dc diff --git a/include/science/synapse/nodes/stream_out.h b/include/science/synapse/nodes/stream_out.h index 7006549..6df0121 100644 --- a/include/science/synapse/nodes/stream_out.h +++ b/include/science/synapse/nodes/stream_out.h @@ -8,14 +8,19 @@ #include "science/libndtp/types.h" #include "science/scipp/status.h" #include "science/synapse/api/nodes/stream_out.pb.h" -#include "science/synapse/nodes/udp_node.h" +#include "science/synapse/node.h" namespace synapse { -class StreamOut : public UdpNode { +class StreamOut : public Node { public: - StreamOut(const std::string& label, const std::string& multicast_group); + static constexpr uint16_t DEFAULT_STREAM_OUT_PORT = 50038; + StreamOut(const std::string& destination_address = "", + uint16_t destination_port = DEFAULT_STREAM_OUT_PORT, + const std::string& label = ""); + ~StreamOut(); + auto init() -> science::Status; auto read(science::libndtp::SynapseData* out, science::libndtp::NDTPHeader* header = nullptr, size_t* bytes_read = nullptr) -> science::Status; [[nodiscard]] static auto from_proto( @@ -27,11 +32,13 @@ class StreamOut : public UdpNode { auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override; private: - const std::string label_; - const std::string multicast_group_; + std::string destination_address_; + uint16_t destination_port_; + std::string label_; + int socket_ = 0; + std::optional addr_; - auto init() -> science::Status; - auto get_host(std::string* host) -> science::Status override; + static constexpr uint32_t SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024; // 5MB }; } // namespace synapse diff --git a/src/science/synapse/nodes/stream_out.cpp b/src/science/synapse/nodes/stream_out.cpp index 064810f..0b73cae 100644 --- a/src/science/synapse/nodes/stream_out.cpp +++ b/src/science/synapse/nodes/stream_out.cpp @@ -18,6 +18,43 @@ using science::libndtp::ElectricalBroadbandData; using science::libndtp::NDTPMessage; using science::libndtp::SynapseData; +const std::string LOCALHOST = "127.0.0.1"; + +static auto get_client_ip() -> std::string { + int sock = socket(AF_INET, SOCK_DGRAM, 0); + if (sock < 0) { + return LOCALHOST; + } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(80); + addr.sin_addr.s_addr = inet_addr("8.8.8.8"); + + if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + close(sock); + return LOCALHOST; + } + + struct sockaddr_in local_addr; + socklen_t len = sizeof(local_addr); + if (getsockname(sock, (struct sockaddr*)&local_addr, &len) < 0) { + close(sock); + return LOCALHOST; + } + + close(sock); + return inet_ntoa(local_addr.sin_addr); +} + +static auto sockaddr(const std::string& host, uint16_t port) -> sockaddr_in { + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + inet_pton(AF_INET, host.c_str(), &addr.sin_addr); + return addr; +} + auto unpack( const std::vector& bytes, SynapseData* data, @@ -28,7 +65,6 @@ auto unpack( try { msg = NDTPMessage::unpack(bytes); } catch (const std::exception& e) { - std::cout << "Stream Out | error unpacking NDTP message: " << e.what() << std::endl; return { science::StatusCode::kInternal, "error unpacking NDTP message: " + std::string(e.what()) }; } @@ -53,52 +89,49 @@ auto unpack( return {}; } -StreamOut::StreamOut(const std::string& label, const std::string& multicast_group) : UdpNode(NodeType::kStreamOut), - label_(label), - multicast_group_(multicast_group) {} +StreamOut::StreamOut(const std::string& destination_address, + uint16_t destination_port, + const std::string& label) + : Node(NodeType::kStreamOut), + destination_address_(destination_address.empty() ? get_client_ip() : destination_address), + destination_port_(destination_port ? destination_port : DEFAULT_STREAM_OUT_PORT), + label_(label) {} -auto StreamOut::from_proto(const synapse::NodeConfig& proto, std::shared_ptr* node) -> science::Status { - if (!proto.has_stream_out()) { - return { science::StatusCode::kInvalidArgument, "missing stream_out config" }; +StreamOut::~StreamOut() { + if (socket_ > 0) { + close(socket_); } +} - const auto& config = proto.stream_out(); - const auto& label = config.label(); - - if (config.multicast_group().empty()) { - return { science::StatusCode::kInvalidArgument, "multicast_group is required but not set" }; +auto StreamOut::init() -> science::Status { + socket_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (socket_ < 0) { + return { science::StatusCode::kInternal, "error creating socket (code: " + std::to_string(socket_) + ")" }; } - const auto& multicast_group = config.multicast_group(); - *node = std::make_shared(label, multicast_group); - return {}; -} - -auto StreamOut::get_host(std::string* host) -> science::Status { - if (multicast_group_.empty()) { - return { science::StatusCode::kInvalidArgument, "multicast_group required but not set" }; + // Allow reuse for easy restart + int reuse = 1; + auto rc = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + if (rc < 0) { + return { science::StatusCode::kInternal, "error configuring SO_REUSEADDR (code: " + std::to_string(rc) + ")" }; } - *host = multicast_group_; - return {}; -} -auto StreamOut::init() -> science::Status { - auto s = UdpNode::init(); - if (!s.ok()) { - return s; + #ifdef SO_REUSEPORT + rc = setsockopt(socket_, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse)); + if (rc < 0) { + return { science::StatusCode::kInternal, "error configuring SO_REUSEPORT (code: " + std::to_string(rc) + ")" }; } + #endif - int rc = 0; - int on = 1; - - int flags = fcntl(sock(), F_GETFL, 0); + // Set non-blocking mode + int flags = fcntl(socket_, F_GETFL, 0); if (flags < 0) { return { science::StatusCode::kInternal, "error getting socket flags (code: " + std::to_string(flags) + ")" }; } - rc = fcntl(sock(), F_SETFL, flags | O_NONBLOCK); + rc = fcntl(socket_, F_SETFL, flags | O_NONBLOCK); if (rc < 0) { return { science::StatusCode::kInternal, @@ -106,34 +139,41 @@ auto StreamOut::init() -> science::Status { }; } - auto saddr = addr().value(); - - rc = bind(sock(), reinterpret_cast(&saddr), sizeof(saddr)); + // Try to set a large recv buffer + int bufsize = SOCKET_BUFSIZE_BYTES; + rc = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)); if (rc < 0) { - return { science::StatusCode::kInternal, "error binding socket (code: " + std::to_string(rc) + ")" }; + // continue } - ip_mreq mreq; - mreq.imr_multiaddr.s_addr = inet_addr(multicast_group_.c_str()); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); + int actual_bufsize; + socklen_t size = sizeof(actual_bufsize); + rc = getsockopt(socket_, SOL_SOCKET, SO_RCVBUF, &actual_bufsize, &size); + if (rc == 0 && actual_bufsize < SOCKET_BUFSIZE_BYTES) { + // continue + } - rc = setsockopt(sock(), IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + addr_ = sockaddr(destination_address_, destination_port_); + + rc = bind(socket_, reinterpret_cast(&addr_.value()), sizeof(addr_.value())); if (rc < 0) { - return { science::StatusCode::kInternal, "error joining multicast group (code: " + std::to_string(rc) + ")" }; + return { science::StatusCode::kInternal, + "error binding socket to " + destination_address_ + ":" + std::to_string(destination_port_) + + " (code: " + std::to_string(rc) + ", errno: " + std::to_string(errno) + ")" }; } return {}; } auto StreamOut::read(SynapseData* data, science::libndtp::NDTPHeader* header, size_t* bytes_read) -> science::Status { - if (!sock() || !addr()) { + if (!socket_ || !addr_) { auto s = init(); if (!s.ok()) { return { s.code(), "error initializing socket: " + s.message() }; } } - auto saddr = addr().value(); + auto saddr = addr_.value(); socklen_t saddr_len = sizeof(saddr); fd_set readfds; @@ -142,11 +182,11 @@ auto StreamOut::read(SynapseData* data, science::libndtp::NDTPHeader* header, si tv.tv_usec = 1000; FD_ZERO(&readfds); - FD_SET(sock(), &readfds); - - int ready = select(sock() + 1, &readfds, nullptr, nullptr, &tv); + FD_SET(socket_, &readfds); + + int ready = select(socket_ + 1, &readfds, nullptr, nullptr, &tv); if (ready < 0) { - return { science::StatusCode::kInternal, "error in select (code: " + std::to_string(errno) + ")" }; + return { science::StatusCode::kInternal, "error waiting for data: " + std::string(strerror(errno)) }; } if (ready == 0) { return { science::StatusCode::kUnavailable, "no data available" }; @@ -154,7 +194,7 @@ auto StreamOut::read(SynapseData* data, science::libndtp::NDTPHeader* header, si std::vector buf; buf.resize(8192); - auto rc = recvfrom(sock(), buf.data(), buf.size(), 0, reinterpret_cast(&saddr), &saddr_len); + auto rc = recvfrom(socket_, buf.data(), buf.size(), 0, reinterpret_cast(&saddr), &saddr_len); if (rc < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { return { science::StatusCode::kUnavailable, "no data available" }; @@ -167,15 +207,45 @@ auto StreamOut::read(SynapseData* data, science::libndtp::NDTPHeader* header, si return unpack(buf, data, header, bytes_read); } +auto StreamOut::from_proto(const synapse::NodeConfig& proto, std::shared_ptr* node) -> science::Status { + if (!proto.has_stream_out()) { + return { science::StatusCode::kInvalidArgument, "missing stream_out config" }; + } + + const auto& config = proto.stream_out(); + const auto& label = config.label(); + + if (!config.has_udp_unicast()) { + // Use defaults + *node = std::make_shared("", DEFAULT_STREAM_OUT_PORT, label); + return {}; + } + + const auto& unicast = config.udp_unicast(); + std::string dest_addr = unicast.destination_address(); + uint16_t dest_port = unicast.destination_port(); + + if (dest_addr.empty()) { + dest_addr = get_client_ip(); + } + if (dest_port == 0) { + dest_port = DEFAULT_STREAM_OUT_PORT; + } + + *node = std::make_shared(dest_addr, dest_port, label); + return {}; +} + auto StreamOut::p_to_proto(synapse::NodeConfig* proto) -> science::Status { if (proto == nullptr) { return { science::StatusCode::kInvalidArgument, "proto ptr must not be null" }; } synapse::StreamOutConfig* config = proto->mutable_stream_out(); - + synapse::UDPUnicastConfig* unicast = config->mutable_udp_unicast(); + unicast->set_destination_address(destination_address_.empty() ? LOCALHOST : destination_address_); + unicast->set_destination_port(destination_port_); config->set_label(label_); - config->set_multicast_group(multicast_group_); return {}; } diff --git a/src/science/synapse/nodes/udp_node.cpp b/src/science/synapse/nodes/udp_node.cpp index 0245a8b..e3b20a8 100644 --- a/src/science/synapse/nodes/udp_node.cpp +++ b/src/science/synapse/nodes/udp_node.cpp @@ -10,7 +10,7 @@ namespace synapse { -auto sockaddr(const std::string& host, uint16_t port) -> sockaddr_in { +static auto sockaddr(const std::string& host, uint16_t port) -> sockaddr_in { sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); diff --git a/src/science/synapse/signal_config.cpp b/src/science/synapse/signal_config.cpp index f8dec5e..d963333 100644 --- a/src/science/synapse/signal_config.cpp +++ b/src/science/synapse/signal_config.cpp @@ -91,6 +91,7 @@ auto Signal::from_proto(const synapse::SignalConfig& proto, Signal* config) -> s config->signal = pixels; } else { + config->signal = {}; return { science::StatusCode::kInvalidArgument, "signal type not specified" }; } diff --git a/test/nodes/test_stream_out.cpp b/test/nodes/test_stream_out.cpp new file mode 100644 index 0000000..1724f6a --- /dev/null +++ b/test/nodes/test_stream_out.cpp @@ -0,0 +1,235 @@ +#include +#include +#include +#include +#include + +using namespace synapse; +using science::libndtp::NDTP_VERSION; +using science::libndtp::NDTPMessage; +using science::libndtp::NDTPHeader; +using science::libndtp::NDTPPayloadBroadband; +using science::libndtp::NDTPPayloadSpiketrain; +using science::libndtp::BinnedSpiketrainData; +using science::libndtp::ByteArray; +using science::libndtp::ElectricalBroadbandData; +using science::libndtp::SynapseData; + +class StreamOutTest : public ::testing::Test { +protected: + void SetUp() override { + } +}; + +TEST_F(StreamOutTest, Constructor) { + StreamOut stream_out("127.0.0.1", 12345, "test label"); + NodeConfig proto; + EXPECT_TRUE(stream_out.to_proto(&proto).ok()); + + EXPECT_TRUE(proto.has_stream_out()); + const auto& config = proto.stream_out(); + EXPECT_TRUE(config.has_udp_unicast()); + EXPECT_EQ(config.label(), "test label"); + EXPECT_EQ(config.udp_unicast().destination_address(), "127.0.0.1"); + EXPECT_EQ(config.udp_unicast().destination_port(), 12345); +} + +TEST_F(StreamOutTest, DefaultConstructor) { + StreamOut stream_out; + NodeConfig proto; + EXPECT_TRUE(stream_out.to_proto(&proto).ok()); + + EXPECT_TRUE(proto.has_stream_out()); + const auto& config = proto.stream_out(); + EXPECT_TRUE(config.has_udp_unicast()); + EXPECT_EQ(config.label(), ""); + EXPECT_EQ(config.udp_unicast().destination_port(), StreamOut::DEFAULT_STREAM_OUT_PORT); +} + +TEST_F(StreamOutTest, FromProto) { + NodeConfig proto; + auto* stream_out_config = proto.mutable_stream_out(); + auto* udp_config = stream_out_config->mutable_udp_unicast(); + udp_config->set_destination_address("192.168.1.1"); + udp_config->set_destination_port(9999); + stream_out_config->set_label("test label 123"); + + std::shared_ptr node; + EXPECT_TRUE(StreamOut::from_proto(proto, &node).ok()); + + // Convert back to proto to verify + NodeConfig result_proto; + EXPECT_TRUE(node->to_proto(&result_proto).ok()); + + EXPECT_TRUE(result_proto.has_stream_out()); + const auto& result_config = result_proto.stream_out(); + EXPECT_EQ(result_config.label(), "test label 123"); + EXPECT_EQ(result_config.udp_unicast().destination_address(), "192.168.1.1"); + EXPECT_EQ(result_config.udp_unicast().destination_port(), 9999); +} + +TEST_F(StreamOutTest, FromProtoDefaults) { + NodeConfig proto; + proto.mutable_stream_out(); // Empty config + + std::shared_ptr node; + EXPECT_TRUE(StreamOut::from_proto(proto, &node).ok()); + + NodeConfig result_proto; + EXPECT_TRUE(node->to_proto(&result_proto).ok()); + + EXPECT_TRUE(result_proto.has_stream_out()); + const auto& config = result_proto.stream_out(); + EXPECT_EQ(config.udp_unicast().destination_port(), StreamOut::DEFAULT_STREAM_OUT_PORT); +} + +TEST_F(StreamOutTest, FromProtoInvalid) { + NodeConfig proto; + // Don't set stream_out config + + std::shared_ptr node; + auto status = StreamOut::from_proto(proto, &node); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.code(), science::StatusCode::kInvalidArgument); +} + +TEST_F(StreamOutTest, ReadBroadbandData) { + StreamOut stream_out("127.0.0.1", 12345, "test"); + auto status = stream_out.init(); + EXPECT_TRUE(status.ok()) << status.message(); + + NDTPPayloadBroadband payload{ + .is_signed = true, + .bit_width = 16, + .ch_count = 2, + .sample_rate = 30000, + .channels = { + { + .channel_id = 1, + .channel_data = {1000, 2000, 3000} + }, + { + .channel_id = 2, + .channel_data = {4000, 5000, 6000} + } + } + }; + + NDTPMessage msg{ + .header = { + .version = NDTP_VERSION, + .data_type = synapse::DataType::kBroadband, + .timestamp = 123456789, + .seq_number = 1 + }, + .payload = payload + }; + + auto bytes = msg.pack(); + int sock = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_GT(sock, 0); + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(12345); + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + + auto rc = sendto(sock, bytes.data(), bytes.size(), 0, + reinterpret_cast(&addr), sizeof(addr)); + ASSERT_EQ(rc, bytes.size()); + + SynapseData received_data; + status = stream_out.read(&received_data); + EXPECT_TRUE(status.ok()) << status.message(); + + ASSERT_TRUE(std::holds_alternative(received_data)); + const auto& broadband = std::get(received_data); + + EXPECT_EQ(broadband.sample_rate, 30000); + ASSERT_EQ(broadband.channels.size(), 2); + EXPECT_EQ(broadband.channels[0].channel_id, 1); + EXPECT_EQ(broadband.channels[1].channel_id, 2); + + std::vector expected_ch1 = {1000, 2000, 3000}; + std::vector expected_ch2 = {4000, 5000, 6000}; + for (size_t i = 0; i < expected_ch1.size(); i++) { + EXPECT_EQ(broadband.channels[0].channel_data[i], expected_ch1[i]); + } + for (size_t i = 0; i < expected_ch2.size(); i++) { + EXPECT_EQ(broadband.channels[1].channel_data[i], expected_ch2[i]); + } + + close(sock); +} + +TEST_F(StreamOutTest, ReadSpiketrainData) { + StreamOut stream_out("127.0.0.1", 12346, "test"); + auto status = stream_out.init(); + EXPECT_TRUE(status.ok()) << status.message(); + + NDTPPayloadSpiketrain payload{ + .bin_size_ms = 1, + .spike_counts = {3, 0, 2, 1} + }; + + NDTPMessage msg{ + .header = { + .version = NDTP_VERSION, + .data_type = synapse::DataType::kSpiketrain, + .timestamp = 123456789, + .seq_number = 1 + }, + .payload = payload + }; + + auto bytes = msg.pack(); + int sock = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_GT(sock, 0); + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(12346); + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + + auto rc = sendto(sock, bytes.data(), bytes.size(), 0, + reinterpret_cast(&addr), sizeof(addr)); + ASSERT_EQ(rc, bytes.size()); + + SynapseData received_data; + status = stream_out.read(&received_data); + EXPECT_TRUE(status.ok()) << status.message(); + + ASSERT_TRUE(std::holds_alternative(received_data)); + const auto& spiketrain = std::get(received_data); + + EXPECT_EQ(spiketrain.bin_size_ms, 1); + EXPECT_EQ(spiketrain.spike_counts, std::vector({3, 0, 2, 1})); + + close(sock); +} + +TEST_F(StreamOutTest, ReadInvalidData) { + StreamOut stream_out("127.0.0.1", 12347, "test"); + auto status = stream_out.init(); + EXPECT_TRUE(status.ok()) << status.message(); + + std::vector invalid_data = {0x01, 0x02, 0x03}; + int sock = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_GT(sock, 0); + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(12347); + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + + auto rc = sendto(sock, invalid_data.data(), invalid_data.size(), 0, + reinterpret_cast(&addr), sizeof(addr)); + ASSERT_EQ(rc, invalid_data.size()); + + SynapseData received_data; + status = stream_out.read(&received_data); + EXPECT_FALSE(status.ok()); + EXPECT_EQ(status.code(), science::StatusCode::kInternal); + + close(sock); +} diff --git a/vcpkg.json b/vcpkg.json index 3591a77..a071352 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -3,7 +3,7 @@ "name": "synapse", "version": "0.1.0", "supports": "arm64 | x64 | linux | osx", - "default-features": [], + "default-features": ["examples", "tests"], "dependencies": ["grpc", "protobuf", "science-libndtp", "science-scipp"], "vcpkg-configuration": { "overlay-ports": ["./external/sciencecorp/vcpkg/ports"]