From 29de59149d6c9b3592223de30790f645abde3da1 Mon Sep 17 00:00:00 2001 From: Antonia Elsen Date: Tue, 11 Mar 2025 17:23:07 -0700 Subject: [PATCH 1/2] feat: add packet monitoring 'stats' example --- CMakeLists.txt | 12 ++ examples/stats/main.cpp | 151 +++++++++++++++++++++ examples/stats/packet_monitoring.cpp | 102 ++++++++++++++ examples/stats/packet_monitoring.h | 41 ++++++ include/science/synapse/nodes/stream_out.h | 2 +- src/science/synapse/nodes/stream_out.cpp | 15 +- src/science/synapse/signal_config.cpp | 37 +++++ 7 files changed, 356 insertions(+), 4 deletions(-) create mode 100644 examples/stats/main.cpp create mode 100644 examples/stats/packet_monitoring.cpp create mode 100644 examples/stats/packet_monitoring.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f668e2..3414420 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,10 +112,22 @@ install( if ("examples" IN_LIST VCPKG_MANIFEST_FEATURES) add_executable(stream_out examples/stream_out/main.cpp) + file(GLOB_RECURSE TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/examples/stream_out/*.cpp") + target_sources(stream_out PRIVATE ${TEST_SOURCES}) + target_include_directories(stream_out PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples) target_link_libraries(stream_out PRIVATE ${PROJECT_NAME} science::scipp) set_target_properties(stream_out PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/examples" ) + + add_executable(stats examples/stats/main.cpp) + file(GLOB_RECURSE TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/examples/stats/*.cpp") + target_sources(stats PRIVATE ${TEST_SOURCES}) + target_include_directories(stats PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/examples) + target_link_libraries(stats PRIVATE ${PROJECT_NAME} science::scipp) + set_target_properties(stats PROPERTIES + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/examples" + ) endif() if ("tests" IN_LIST VCPKG_MANIFEST_FEATURES) diff --git a/examples/stats/main.cpp b/examples/stats/main.cpp new file mode 100644 index 0000000..7217b61 --- /dev/null +++ b/examples/stats/main.cpp @@ -0,0 +1,151 @@ +#include +#include + +#include "science/scipp/status.h" +#include "science/synapse/channel.h" +#include "science/synapse/data.h" +#include "science/synapse/device.h" +#include "science/synapse/nodes/broadband_source.h" +#include "science/synapse/nodes/stream_out.h" +#include "packet_monitoring.h" + +using science::libndtp::NDTPHeader; +using synapse::Ch; +using synapse::Config; +using synapse::Device; +using synapse::DeviceInfo; +using synapse::Electrodes; +using synapse::NodeType; +using synapse::Signal; +using synapse::StreamOut; +using synapse::SynapseData; +using synapse::NodeConfig; +using synapse::Node; + +auto configure_stream(Device& device, std::shared_ptr* stream_out_ptr) -> science::Status { + const uint32_t N_CHANNELS = 32; // Using more channels for stats testing + if (stream_out_ptr == nullptr) { + return { science::StatusCode::kInvalidArgument, "stream out pointer is null" }; + } + + science::Status s; + DeviceInfo info; + s = device.info(&info); + if (!s.ok()) return s; + + // Configure signal with more channels for statistics gathering + Signal signal{ + Electrodes{ + .channels = {}, + .low_cutoff_hz = 500, + .high_cutoff_hz = 6000 + } + }; + auto& electrodes = std::get(signal.signal); + electrodes.channels.reserve(N_CHANNELS); + for (unsigned int i = 0; i < N_CHANNELS; i++) { + electrodes.channels.push_back(Ch{ + .id = i, + .electrode_id = i * 2, + .reference_id = i * 2 + 1 + }); + } + + Config config; + auto broadband_source = std::make_shared(1, 16, 30000, 20.0, signal); + + NodeConfig stream_out_config; + auto* stream_out_proto = stream_out_config.mutable_stream_out(); + stream_out_proto->set_multicast_group("224.0.0.115"); + + std::shared_ptr stream_out_node; + s = StreamOut::from_proto(stream_out_config, &stream_out_node); + if (!s.ok()) return s; + + *stream_out_ptr = std::dynamic_pointer_cast(stream_out_node); + if (!*stream_out_ptr) { + return { science::StatusCode::kInternal, "failed to cast stream out node" }; + } + + s = config.add_node(broadband_source); + if (!s.ok()) return s; + + s = config.add_node(*stream_out_ptr); + if (!s.ok()) return s; + + s = config.connect(broadband_source, *stream_out_ptr); + if (!s.ok()) return s; + + s = device.configure(&config); + if (!s.ok()) return s; + + std::cout << "Configured device..." << std::endl; + + s = device.start(); + if (!s.ok()) return s; + + std::cout << "Started device..." << std::endl; + return s; +} + +auto stream(const std::string& uri) -> int { + synapse::Device device(uri); + science::Status s; + + std::shared_ptr stream_out; + s = configure_stream(device, &stream_out); + if (!s.ok()) { + std::cout << "error configuring stream: (" + << static_cast(s.code()) << ") " << s.message() << std::endl; + return 1; + } + + if (stream_out == nullptr) { + std::cout << "stream out node not initialized" << std::endl; + return 1; + } + + std::cout << "Monitoring packet statistics..." << std::endl; + + // Initialize packet monitor + PacketMonitor monitor; + monitor.start_monitoring(); + auto last_stats_time = std::chrono::steady_clock::now(); + + while (true) { + size_t bytes_read; + NDTPHeader header; + SynapseData out; + s = stream_out->read(&out, &header, &bytes_read); + if (s.code() == science::StatusCode::kUnavailable) { + continue; + } + + if (!s.ok()) { + std::cout << "error reading from stream out node: (" + << static_cast(s.code()) << ") " << s.message() << std::endl; + continue; + } + + monitor.process_packet(header.seq_number, bytes_read); + + auto now = std::chrono::steady_clock::now(); + if (std::chrono::duration_cast(now - last_stats_time).count() >= 1) { + monitor.print_stats(); + last_stats_time = now; + } + } + + return 0; +} + +int main(int argc, char** argv) { + if (argc != 2) { + std::cout << "Usage: " << argv[0] << " " << std::endl; + std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl; + return 1; + } + + std::string uri = argv[1]; + return stream(uri); +} diff --git a/examples/stats/packet_monitoring.cpp b/examples/stats/packet_monitoring.cpp new file mode 100644 index 0000000..cf1d001 --- /dev/null +++ b/examples/stats/packet_monitoring.cpp @@ -0,0 +1,102 @@ +#include "stats/packet_monitoring.h" +#include +#include + +PacketMonitor::PacketMonitor() + : packet_count_(0) + , last_seq_number_(0) + , dropped_packets_(0) + , out_of_order_packets_(0) + , bytes_received_(0) + , bytes_received_in_interval_(0) + , last_jitter_(0) + , avg_jitter_(0) {} + +void PacketMonitor::start_monitoring() { + start_time_ = std::chrono::steady_clock::now(); + last_stats_time_ = start_time_; + last_bandwidth_time_ = start_time_; +} + +bool PacketMonitor::process_packet(uint16_t seq_number, size_t bytes_read) { + auto now = std::chrono::steady_clock::now(); + + if (packet_count_ == 0) { + first_packet_time_ = now; + last_packet_time_ = now; + auto elapsed = std::chrono::duration(now - start_time_).count(); + std::cout << "First packet received after " << std::fixed << std::setprecision(3) + << elapsed << " seconds\n\n"; + } else { + // Calculate jitter + auto interval = std::chrono::duration(now - last_packet_time_).count(); + if (packet_count_ > 1) { + double jitter_diff = std::abs(interval - last_jitter_); + avg_jitter_ += (jitter_diff - avg_jitter_) / 16.0; // RFC 3550 algorithm + } + last_jitter_ = interval; + last_packet_time_ = now; + + // Check for dropped or out-of-order packets + uint16_t expected = (last_seq_number_ + 1) % (1 << 16); + if (seq_number != expected) { + if (seq_number > expected) { + dropped_packets_ += (seq_number - expected) % (1 << 16); + } else { + out_of_order_packets_++; + } + } + } + + packet_count_++; + bytes_received_ += bytes_read; + bytes_received_in_interval_ += bytes_read; + last_seq_number_ = seq_number; + + return true; +} + +void PacketMonitor::clear_line() const { + // Move to start of line and clear it + std::cout << "\r" << std::string(80, ' ') << "\r"; +} + +std::string PacketMonitor::format_stats() const { + auto now = std::chrono::steady_clock::now(); + std::stringstream ss; + + // Runtime + auto runtime = std::chrono::duration(now - start_time_).count(); + ss << "Runtime " << std::fixed << std::setprecision(1) << runtime << "s | "; + + // Drop calculation + double drop_percent = (static_cast(dropped_packets_) / std::max(1, packet_count_)) * 100.0; + ss << "Dropped: " << dropped_packets_ << "/" << packet_count_ + << " (" << std::setprecision(1) << drop_percent << "%) | "; + + // Bandwidth calculation + auto dt_sec = std::chrono::duration(now - last_bandwidth_time_).count(); + if (dt_sec > 0) { + double bytes_per_second = bytes_received_in_interval_ / dt_sec; + double megabits_per_second = (bytes_per_second * 8) / 1'000'000; + ss << "Mbit/sec: " << std::setprecision(1) << megabits_per_second << " | "; + } + + // Jitter (in milliseconds) + double jitter_ms = avg_jitter_ * 1000; + ss << "Jitter: " << std::setprecision(2) << jitter_ms << " ms | "; + + // Out of order packets + ss << "Out of Order: " << out_of_order_packets_; + + return ss.str(); +} + +void PacketMonitor::print_stats() { + clear_line(); + std::cout << format_stats() << std::flush; + + // Reset interval counters + bytes_received_in_interval_ = 0; + last_bandwidth_time_ = std::chrono::steady_clock::now(); +} diff --git a/examples/stats/packet_monitoring.h b/examples/stats/packet_monitoring.h new file mode 100644 index 0000000..2d1c01a --- /dev/null +++ b/examples/stats/packet_monitoring.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include +#include +#include + +class PacketMonitor { + public: + PacketMonitor(); + + void start_monitoring(); + void print_stats(); + bool process_packet(uint16_t seq_number, size_t bytes_read); + + private: + // Packet tracking + uint64_t packet_count_; + uint16_t last_seq_number_; + uint64_t dropped_packets_; + uint64_t out_of_order_packets_; + + // Timing metrics + std::chrono::steady_clock::time_point start_time_; + std::chrono::steady_clock::time_point first_packet_time_; + std::chrono::steady_clock::time_point last_packet_time_; + std::chrono::steady_clock::time_point last_stats_time_; + + // Bandwidth tracking + uint64_t bytes_received_; + uint64_t bytes_received_in_interval_; + std::chrono::steady_clock::time_point last_bandwidth_time_; + + // Jitter tracking + double last_jitter_; + double avg_jitter_; + + // Helper methods + void clear_line() const; + std::string format_stats() const; +}; diff --git a/include/science/synapse/nodes/stream_out.h b/include/science/synapse/nodes/stream_out.h index 6f2d2db..7006549 100644 --- a/include/science/synapse/nodes/stream_out.h +++ b/include/science/synapse/nodes/stream_out.h @@ -16,7 +16,7 @@ class StreamOut : public UdpNode { public: StreamOut(const std::string& label, const std::string& multicast_group); - auto read(science::libndtp::SynapseData* out) -> science::Status; + auto read(science::libndtp::SynapseData* out, science::libndtp::NDTPHeader* header = nullptr, size_t* bytes_read = nullptr) -> science::Status; [[nodiscard]] static auto from_proto( const synapse::NodeConfig& proto, diff --git a/src/science/synapse/nodes/stream_out.cpp b/src/science/synapse/nodes/stream_out.cpp index a6bf57c..064810f 100644 --- a/src/science/synapse/nodes/stream_out.cpp +++ b/src/science/synapse/nodes/stream_out.cpp @@ -20,7 +20,9 @@ using science::libndtp::SynapseData; auto unpack( const std::vector& bytes, - SynapseData* data + SynapseData* data, + science::libndtp::NDTPHeader* header, + size_t* bytes_read ) -> science::Status { NDTPMessage msg; try { @@ -30,6 +32,13 @@ auto unpack( return { science::StatusCode::kInternal, "error unpacking NDTP message: " + std::string(e.what()) }; } + if (header != nullptr) { + *header = msg.header; + } + if (bytes_read != nullptr) { + *bytes_read = bytes.size(); + } + switch (msg.header.data_type) { case DataType::kBroadband: *data = ElectricalBroadbandData::unpack(msg); @@ -116,7 +125,7 @@ auto StreamOut::init() -> science::Status { return {}; } -auto StreamOut::read(SynapseData* data) -> science::Status { +auto StreamOut::read(SynapseData* data, science::libndtp::NDTPHeader* header, size_t* bytes_read) -> science::Status { if (!sock() || !addr()) { auto s = init(); if (!s.ok()) { @@ -155,7 +164,7 @@ auto StreamOut::read(SynapseData* data) -> science::Status { } buf.resize(rc); - return unpack(buf, data); + return unpack(buf, data, header, bytes_read); } auto StreamOut::p_to_proto(synapse::NodeConfig* proto) -> science::Status { diff --git a/src/science/synapse/signal_config.cpp b/src/science/synapse/signal_config.cpp index 3d9e716..f8dec5e 100644 --- a/src/science/synapse/signal_config.cpp +++ b/src/science/synapse/signal_config.cpp @@ -36,6 +36,10 @@ auto Pixels::to_proto(synapse::PixelConfig* proto) -> science::Status { if (proto == nullptr) { return { science::StatusCode::kInvalidArgument, "proto ptr must not be null" }; } + + for (const auto& pixel : pixel_mask) { + proto->add_pixel_mask(pixel); + } return {}; } @@ -43,6 +47,11 @@ auto Pixels::from_proto(const synapse::PixelConfig& proto, Pixels* config) -> sc if (!config) { return { science::StatusCode::kInvalidArgument, "missing config" }; } + + config->pixel_mask.clear(); + for (const auto& pixel : proto.pixel_mask()) { + config->pixel_mask.push_back(pixel); + } return {}; } @@ -50,6 +59,15 @@ auto Signal::to_proto(synapse::SignalConfig* proto) -> science::Status { if (proto == nullptr) { return { science::StatusCode::kInvalidArgument, "proto ptr must not be null" }; } + + if (std::holds_alternative(signal)) { + auto s = std::get(signal).to_proto(proto->mutable_electrode()); + if (!s.ok()) return s; + } else if (std::holds_alternative(signal)) { + auto s = std::get(signal).to_proto(proto->mutable_pixel()); + if (!s.ok()) return s; + } + return {}; } @@ -57,6 +75,25 @@ auto Signal::from_proto(const synapse::SignalConfig& proto, Signal* config) -> s if (!config) { return { science::StatusCode::kInvalidArgument, "missing config" }; } + + if (proto.has_electrode()) { + Electrodes electrodes; + auto s = Electrodes::from_proto(proto.electrode(), &electrodes); + + if (!s.ok()) return s; + config->signal = electrodes; + + } else if (proto.has_pixel()) { + Pixels pixels; + auto s = Pixels::from_proto(proto.pixel(), &pixels); + + if (!s.ok()) return s; + config->signal = pixels; + + } else { + return { science::StatusCode::kInvalidArgument, "signal type not specified" }; + } + return {}; } From 7778cbeae63fcbb682dbb4ba566112243b950a6a Mon Sep 17 00:00:00 2001 From: Antonia Elsen Date: Tue, 11 Mar 2025 18:34:42 -0700 Subject: [PATCH 2/2] chore: update testing peripheral --- examples/stats/main.cpp | 2 +- examples/stream_out/main.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/stats/main.cpp b/examples/stats/main.cpp index 7217b61..10a0b51 100644 --- a/examples/stats/main.cpp +++ b/examples/stats/main.cpp @@ -52,7 +52,7 @@ auto configure_stream(Device& device, std::shared_ptr* stream_out_ptr } Config config; - auto broadband_source = std::make_shared(1, 16, 30000, 20.0, signal); + auto broadband_source = std::make_shared(100, 16, 30000, 20.0, signal); NodeConfig stream_out_config; auto* stream_out_proto = stream_out_config.mutable_stream_out(); diff --git a/examples/stream_out/main.cpp b/examples/stream_out/main.cpp index 091a138..b4f0f9b 100644 --- a/examples/stream_out/main.cpp +++ b/examples/stream_out/main.cpp @@ -49,7 +49,7 @@ auto stream_new(Device& device, std::shared_ptr* stream_out_ptr) -> s } Config config; - auto broadband_source = std::make_shared(1, 16, 30000, 20.0, signal); + auto broadband_source = std::make_shared(100, 16, 30000, 20.0, signal); *stream_out_ptr = std::make_shared("out", group); s = config.add_node(broadband_source);