Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,22 @@ install(

if ("examples" IN_LIST VCPKG_MANIFEST_FEATURES)
add_executable(stream_out examples/stream_out/main.cpp)
file(GLOB_RECURSE TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/examples/stream_out/*.cpp")
target_sources(stream_out PRIVATE ${TEST_SOURCES})
target_include_directories(stream_out PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples)
target_link_libraries(stream_out PRIVATE ${PROJECT_NAME} science::scipp)
set_target_properties(stream_out PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/examples"
)

add_executable(stats examples/stats/main.cpp)
file(GLOB_RECURSE TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/examples/stats/*.cpp")
target_sources(stats PRIVATE ${TEST_SOURCES})
target_include_directories(stats PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples)
target_link_libraries(stats PRIVATE ${PROJECT_NAME} science::scipp)
set_target_properties(stats PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/examples"
)
endif()

if ("tests" IN_LIST VCPKG_MANIFEST_FEATURES)
Expand Down
151 changes: 151 additions & 0 deletions examples/stats/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#include <memory>
#include <chrono>

#include "science/scipp/status.h"
#include "science/synapse/channel.h"
#include "science/synapse/data.h"
#include "science/synapse/device.h"
#include "science/synapse/nodes/broadband_source.h"
#include "science/synapse/nodes/stream_out.h"
#include "packet_monitoring.h"

using science::libndtp::NDTPHeader;
using synapse::Ch;
using synapse::Config;
using synapse::Device;
using synapse::DeviceInfo;
using synapse::Electrodes;
using synapse::NodeType;
using synapse::Signal;
using synapse::StreamOut;
using synapse::SynapseData;
using synapse::NodeConfig;
using synapse::Node;

auto configure_stream(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> science::Status {
const uint32_t N_CHANNELS = 32; // Using more channels for stats testing
if (stream_out_ptr == nullptr) {
return { science::StatusCode::kInvalidArgument, "stream out pointer is null" };
}

science::Status s;
DeviceInfo info;
s = device.info(&info);
if (!s.ok()) return s;

// Configure signal with more channels for statistics gathering
Signal signal{
Electrodes{
.channels = {},
.low_cutoff_hz = 500,
.high_cutoff_hz = 6000
}
};
auto& electrodes = std::get<Electrodes>(signal.signal);
electrodes.channels.reserve(N_CHANNELS);
for (unsigned int i = 0; i < N_CHANNELS; i++) {
electrodes.channels.push_back(Ch{
.id = i,
.electrode_id = i * 2,
.reference_id = i * 2 + 1
});
}

Config config;
auto broadband_source = std::make_shared<synapse::BroadbandSource>(100, 16, 30000, 20.0, signal);

NodeConfig stream_out_config;
auto* stream_out_proto = stream_out_config.mutable_stream_out();
stream_out_proto->set_multicast_group("224.0.0.115");

std::shared_ptr<Node> stream_out_node;
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
if (!s.ok()) return s;

*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
if (!*stream_out_ptr) {
return { science::StatusCode::kInternal, "failed to cast stream out node" };
}

s = config.add_node(broadband_source);
if (!s.ok()) return s;

s = config.add_node(*stream_out_ptr);
if (!s.ok()) return s;

s = config.connect(broadband_source, *stream_out_ptr);
if (!s.ok()) return s;

s = device.configure(&config);
if (!s.ok()) return s;

std::cout << "Configured device..." << std::endl;

s = device.start();
if (!s.ok()) return s;

std::cout << "Started device..." << std::endl;
return s;
}

auto stream(const std::string& uri) -> int {
synapse::Device device(uri);
science::Status s;

std::shared_ptr<synapse::StreamOut> stream_out;
s = configure_stream(device, &stream_out);
if (!s.ok()) {
std::cout << "error configuring stream: ("
<< static_cast<int>(s.code()) << ") " << s.message() << std::endl;
return 1;
}

if (stream_out == nullptr) {
std::cout << "stream out node not initialized" << std::endl;
return 1;
}

std::cout << "Monitoring packet statistics..." << std::endl;

// Initialize packet monitor
PacketMonitor monitor;
monitor.start_monitoring();
auto last_stats_time = std::chrono::steady_clock::now();

while (true) {
size_t bytes_read;
NDTPHeader header;
SynapseData out;
s = stream_out->read(&out, &header, &bytes_read);
if (s.code() == science::StatusCode::kUnavailable) {
continue;
}

if (!s.ok()) {
std::cout << "error reading from stream out node: ("
<< static_cast<int>(s.code()) << ") " << s.message() << std::endl;
continue;
}

monitor.process_packet(header.seq_number, bytes_read);

auto now = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::seconds>(now - last_stats_time).count() >= 1) {
monitor.print_stats();
last_stats_time = now;
}
}

return 0;
}

int main(int argc, char** argv) {
if (argc != 2) {
std::cout << "Usage: " << argv[0] << " <uri>" << std::endl;
std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl;
return 1;
}

std::string uri = argv[1];
return stream(uri);
}
102 changes: 102 additions & 0 deletions examples/stats/packet_monitoring.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include "stats/packet_monitoring.h"
#include <iomanip>
#include <sstream>

PacketMonitor::PacketMonitor()
: packet_count_(0)
, last_seq_number_(0)
, dropped_packets_(0)
, out_of_order_packets_(0)
, bytes_received_(0)
, bytes_received_in_interval_(0)
, last_jitter_(0)
, avg_jitter_(0) {}

void PacketMonitor::start_monitoring() {
start_time_ = std::chrono::steady_clock::now();
last_stats_time_ = start_time_;
last_bandwidth_time_ = start_time_;
}

bool PacketMonitor::process_packet(uint16_t seq_number, size_t bytes_read) {
auto now = std::chrono::steady_clock::now();

if (packet_count_ == 0) {
first_packet_time_ = now;
last_packet_time_ = now;
auto elapsed = std::chrono::duration<double>(now - start_time_).count();
std::cout << "First packet received after " << std::fixed << std::setprecision(3)
<< elapsed << " seconds\n\n";
} else {
// Calculate jitter
auto interval = std::chrono::duration<double>(now - last_packet_time_).count();
if (packet_count_ > 1) {
double jitter_diff = std::abs(interval - last_jitter_);
avg_jitter_ += (jitter_diff - avg_jitter_) / 16.0; // RFC 3550 algorithm
}
last_jitter_ = interval;
last_packet_time_ = now;

// Check for dropped or out-of-order packets
uint16_t expected = (last_seq_number_ + 1) % (1 << 16);
if (seq_number != expected) {
if (seq_number > expected) {
dropped_packets_ += (seq_number - expected) % (1 << 16);
} else {
out_of_order_packets_++;
}
}
}

packet_count_++;
bytes_received_ += bytes_read;
bytes_received_in_interval_ += bytes_read;
last_seq_number_ = seq_number;

return true;
}

void PacketMonitor::clear_line() const {
// Move to start of line and clear it
std::cout << "\r" << std::string(80, ' ') << "\r";
}

std::string PacketMonitor::format_stats() const {
auto now = std::chrono::steady_clock::now();
std::stringstream ss;

// Runtime
auto runtime = std::chrono::duration<double>(now - start_time_).count();
ss << "Runtime " << std::fixed << std::setprecision(1) << runtime << "s | ";

// Drop calculation
double drop_percent = (static_cast<double>(dropped_packets_) / std::max<uint64_t>(1, packet_count_)) * 100.0;
ss << "Dropped: " << dropped_packets_ << "/" << packet_count_
<< " (" << std::setprecision(1) << drop_percent << "%) | ";

// Bandwidth calculation
auto dt_sec = std::chrono::duration<double>(now - last_bandwidth_time_).count();
if (dt_sec > 0) {
double bytes_per_second = bytes_received_in_interval_ / dt_sec;
double megabits_per_second = (bytes_per_second * 8) / 1'000'000;
ss << "Mbit/sec: " << std::setprecision(1) << megabits_per_second << " | ";
}

// Jitter (in milliseconds)
double jitter_ms = avg_jitter_ * 1000;
ss << "Jitter: " << std::setprecision(2) << jitter_ms << " ms | ";

// Out of order packets
ss << "Out of Order: " << out_of_order_packets_;

return ss.str();
}

void PacketMonitor::print_stats() {
clear_line();
std::cout << format_stats() << std::flush;

// Reset interval counters
bytes_received_in_interval_ = 0;
last_bandwidth_time_ = std::chrono::steady_clock::now();
}
41 changes: 41 additions & 0 deletions examples/stats/packet_monitoring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <chrono>
#include <cstdint>
#include <iostream>
#include <string>

class PacketMonitor {
public:
PacketMonitor();

void start_monitoring();
void print_stats();
bool process_packet(uint16_t seq_number, size_t bytes_read);

private:
// Packet tracking
uint64_t packet_count_;
uint16_t last_seq_number_;
uint64_t dropped_packets_;
uint64_t out_of_order_packets_;

// Timing metrics
std::chrono::steady_clock::time_point start_time_;
std::chrono::steady_clock::time_point first_packet_time_;
std::chrono::steady_clock::time_point last_packet_time_;
std::chrono::steady_clock::time_point last_stats_time_;

// Bandwidth tracking
uint64_t bytes_received_;
uint64_t bytes_received_in_interval_;
std::chrono::steady_clock::time_point last_bandwidth_time_;

// Jitter tracking
double last_jitter_;
double avg_jitter_;

// Helper methods
void clear_line() const;
std::string format_stats() const;
};
2 changes: 1 addition & 1 deletion examples/stream_out/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
}

Config config;
auto broadband_source = std::make_shared<synapse::BroadbandSource>(1, 16, 30000, 20.0, signal);
auto broadband_source = std::make_shared<synapse::BroadbandSource>(100, 16, 30000, 20.0, signal);
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);

s = config.add_node(broadband_source);
Expand Down
2 changes: 1 addition & 1 deletion include/science/synapse/nodes/stream_out.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class StreamOut : public UdpNode {
public:
StreamOut(const std::string& label, const std::string& multicast_group);

auto read(science::libndtp::SynapseData* out) -> science::Status;
auto read(science::libndtp::SynapseData* out, science::libndtp::NDTPHeader* header = nullptr, size_t* bytes_read = nullptr) -> science::Status;

[[nodiscard]] static auto from_proto(
const synapse::NodeConfig& proto,
Expand Down
15 changes: 12 additions & 3 deletions src/science/synapse/nodes/stream_out.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ using science::libndtp::SynapseData;

auto unpack(
const std::vector<uint8_t>& bytes,
SynapseData* data
SynapseData* data,
science::libndtp::NDTPHeader* header,
size_t* bytes_read
) -> science::Status {
NDTPMessage msg;
try {
Expand All @@ -30,6 +32,13 @@ auto unpack(
return { science::StatusCode::kInternal, "error unpacking NDTP message: " + std::string(e.what()) };
}

if (header != nullptr) {
*header = msg.header;
}
if (bytes_read != nullptr) {
*bytes_read = bytes.size();
}

switch (msg.header.data_type) {
case DataType::kBroadband:
*data = ElectricalBroadbandData::unpack(msg);
Expand Down Expand Up @@ -116,7 +125,7 @@ auto StreamOut::init() -> science::Status {
return {};
}

auto StreamOut::read(SynapseData* data) -> science::Status {
auto StreamOut::read(SynapseData* data, science::libndtp::NDTPHeader* header, size_t* bytes_read) -> science::Status {
if (!sock() || !addr()) {
auto s = init();
if (!s.ok()) {
Expand Down Expand Up @@ -155,7 +164,7 @@ auto StreamOut::read(SynapseData* data) -> science::Status {
}

buf.resize(rc);
return unpack(buf, data);
return unpack(buf, data, header, bytes_read);
}

auto StreamOut::p_to_proto(synapse::NodeConfig* proto) -> science::Status {
Expand Down
Loading
Loading