Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/stats/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ auto configure_stream(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr

NodeConfig stream_out_config;
auto* stream_out_proto = stream_out_config.mutable_stream_out();
stream_out_proto->set_multicast_group("224.0.0.115");
auto* udp_config = stream_out_proto->mutable_udp_unicast();
udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT);

std::shared_ptr<Node> stream_out_node;
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
Expand Down
69 changes: 52 additions & 17 deletions examples/stream_out/main.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <memory>
#include <chrono>

#include "science/scipp/status.h"
#include "science/synapse/channel.h"
Expand All @@ -18,14 +19,16 @@ using synapse::NodeType;
using synapse::Signal;
using synapse::StreamOut;
using synapse::SynapseData;
using synapse::NodeConfig;
using synapse::Node;


auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> science::Status {
const uint32_t N_CHANNELS = 10;
if (stream_out_ptr == nullptr) {
return { science::StatusCode::kInvalidArgument, "stream out pointer is null" };
}

std::string group = "224.0.0.10";
science::Status s;
DeviceInfo info;
s = device.info(&info);
Expand All @@ -39,8 +42,8 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
}
};
auto& electrodes = std::get<Electrodes>(signal.signal);
electrodes.channels.reserve(19);
for (unsigned int i = 0; i < 19; i++) {
electrodes.channels.reserve(N_CHANNELS);
for (unsigned int i = 0; i < N_CHANNELS; i++) {
electrodes.channels.push_back(Ch{
.id = i,
.electrode_id = i * 2,
Expand All @@ -50,7 +53,22 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s

Config config;
auto broadband_source = std::make_shared<synapse::BroadbandSource>(100, 16, 30000, 20.0, signal);
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);

// Create StreamOut with explicit configuration
NodeConfig stream_out_config;
auto* stream_out_proto = stream_out_config.mutable_stream_out();
auto* udp_config = stream_out_proto->mutable_udp_unicast();
udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT);
stream_out_proto->set_label("Broadband Stream");

std::shared_ptr<Node> stream_out_node;
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
if (!s.ok()) return s;

*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
if (!*stream_out_ptr) {
return { science::StatusCode::kInternal, "failed to cast stream out node" };
}

s = config.add_node(broadband_source);
if (!s.ok()) return s;
Expand All @@ -64,7 +82,10 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
s = device.configure(&config);
if (!s.ok()) return s;

std::cout << "Configured device" << std::endl;

s = device.start();
std::cout << "Started device" << std::endl;

return s;
}
Expand All @@ -75,18 +96,17 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
}

science::Status s;

DeviceInfo info;
s = device.info(&info);
if (!s.ok()) return s;

uint32_t stream_out_id = 0; // default id
std::string group;
uint32_t stream_out_id = 0;
NodeConfig stream_out_config;
const auto& nodes = info.configuration().nodes();
for (const auto& node : nodes) {
if (node.type() == NodeType::kStreamOut) {
stream_out_id = node.id();
group = node.stream_out().multicast_group();
stream_out_config = node;
break;
}
}
Expand All @@ -95,9 +115,16 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
return { science::StatusCode::kNotFound, "no stream out node found" };
}

std::cout << "found stream out node with id " << stream_out_id << " and group " << group << std::endl;
std::shared_ptr<Node> stream_out_node;
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
if (!s.ok()) return s;

*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
if (!*stream_out_ptr) {
return { science::StatusCode::kInternal, "failed to cast stream out node" };
}

*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);
std::cout << "found stream out node with id " << stream_out_id << std::endl;

Config config;
s = config.add_node(*stream_out_ptr, stream_out_id);
Expand All @@ -107,7 +134,7 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
if (!s.ok()) return s;

return s;
}
}

auto stream(const std::string& uri, bool configure) -> int {
synapse::Device device(uri);
Expand All @@ -121,7 +148,6 @@ auto stream(const std::string& uri, bool configure) -> int {
<< static_cast<int>(s.code()) << ") " << s.message() << std::endl;
return 1;
}

} else {
s = stream_existing(device, &stream_out);
if (!s.ok()) {
Expand All @@ -136,6 +162,7 @@ auto stream(const std::string& uri, bool configure) -> int {
return 1;
}

std::cout << "Reading..." << std::endl;
while (true) {
SynapseData out;
s = stream_out->read(&out);
Expand Down Expand Up @@ -185,16 +212,24 @@ auto stream(const std::string& uri, bool configure) -> int {
std::cout << ss.str() << std::endl;
}
} else {
std::cout << "data type unknown" << std::endl;
std::cout << "received data of unknown type" << std::endl;
}
}

return 1;
return 0;
}

int main(int argc, char** argv) {
std::string uri = "192.168.0.1:647";
stream(uri, false);
if (argc != 2 && argc != 3) {
std::cout << "Usage: " << argv[0] << " <uri> [--config]" << std::endl;
std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl;
std::cout << " --config: optional flag to configure a new stream" << std::endl;
std::cout << " if omitted, uses existing stream" << std::endl;
return 1;
}

return 0;
std::string uri = argv[1];
bool configure = (argc == 3 && std::string(argv[2]) == "--config");

return stream(uri, configure);
}
21 changes: 14 additions & 7 deletions include/science/synapse/nodes/stream_out.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@
#include "science/libndtp/types.h"
#include "science/scipp/status.h"
#include "science/synapse/api/nodes/stream_out.pb.h"
#include "science/synapse/nodes/udp_node.h"
#include "science/synapse/node.h"

namespace synapse {

class StreamOut : public UdpNode {
class StreamOut : public Node {
public:
StreamOut(const std::string& label, const std::string& multicast_group);
static constexpr uint16_t DEFAULT_STREAM_OUT_PORT = 50038;
StreamOut(const std::string& destination_address = "",
uint16_t destination_port = DEFAULT_STREAM_OUT_PORT,
const std::string& label = "");
~StreamOut();

auto init() -> science::Status;
auto read(science::libndtp::SynapseData* out, science::libndtp::NDTPHeader* header = nullptr, size_t* bytes_read = nullptr) -> science::Status;

[[nodiscard]] static auto from_proto(
Expand All @@ -27,11 +32,13 @@ class StreamOut : public UdpNode {
auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override;

private:
const std::string label_;
const std::string multicast_group_;
std::string destination_address_;
uint16_t destination_port_;
std::string label_;
int socket_ = 0;
std::optional<sockaddr_in> addr_;

auto init() -> science::Status;
auto get_host(std::string* host) -> science::Status override;
static constexpr uint32_t SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024; // 5MB
};

} // namespace synapse
Loading