From 88cb09cdfbc0f7c0d7fc5f830cd9f1089a4ac3fa Mon Sep 17 00:00:00 2001 From: arminvoid <3612498+arminvoid@users.noreply.github.com> Date: Fri, 8 Aug 2025 18:30:29 +0200 Subject: [PATCH 1/3] remove stream_in/out nodes --- synapse/client/__init__.py | 2 - synapse/client/nodes/__init__.py | 4 - synapse/client/nodes/stream_in.py | 93 ------------ synapse/client/nodes/stream_out.py | 182 ------------------------ synapse/examples/stream_out.py | 88 ------------ synapse/server/nodes/__init__.py | 5 - synapse/server/nodes/spectral_filter.py | 3 +- synapse/server/nodes/stream_in.py | 60 -------- synapse/server/nodes/stream_out.py | 79 ---------- synapse/simulator/__init__.py | 17 +-- synapse/tests/test_stream_out.py | 164 --------------------- 11 files changed, 8 insertions(+), 689 deletions(-) delete mode 100644 synapse/client/nodes/stream_in.py delete mode 100644 synapse/client/nodes/stream_out.py delete mode 100644 synapse/examples/stream_out.py delete mode 100644 synapse/server/nodes/stream_in.py delete mode 100644 synapse/server/nodes/stream_out.py delete mode 100644 synapse/tests/test_stream_out.py diff --git a/synapse/client/__init__.py b/synapse/client/__init__.py index 6a659920..7236d045 100644 --- a/synapse/client/__init__.py +++ b/synapse/client/__init__.py @@ -15,5 +15,3 @@ from synapse.client.nodes.spike_binner import SpikeBinner from synapse.client.nodes.spike_detector import SpikeDetector from synapse.client.nodes.spectral_filter import SpectralFilter -from synapse.client.nodes.stream_in import StreamIn -from synapse.client.nodes.stream_out import StreamOut diff --git a/synapse/client/nodes/__init__.py b/synapse/client/nodes/__init__.py index 101fb9cd..07229127 100644 --- a/synapse/client/nodes/__init__.py +++ b/synapse/client/nodes/__init__.py @@ -5,8 +5,6 @@ from synapse.client.nodes.spike_binner import SpikeBinner from synapse.client.nodes.spike_detector import SpikeDetector from synapse.client.nodes.spike_source import SpikeSource -from synapse.client.nodes.stream_in import StreamIn -from synapse.client.nodes.stream_out import StreamOut from synapse.client.nodes.disk_writer import DiskWriter from synapse.client.nodes.application_node import ApplicationNode @@ -21,7 +19,5 @@ NodeType.kSpikeBinner: SpikeBinner, NodeType.kSpikeDetector: SpikeDetector, NodeType.kSpikeSource: SpikeSource, - NodeType.kStreamIn: StreamIn, - NodeType.kStreamOut: StreamOut, NodeType.kApplication: ApplicationNode, } diff --git a/synapse/client/nodes/stream_in.py b/synapse/client/nodes/stream_in.py deleted file mode 100644 index d33b38c2..00000000 --- a/synapse/client/nodes/stream_in.py +++ /dev/null @@ -1,93 +0,0 @@ -import socket -import time -from typing import List, Optional - -from synapse.api.datatype_pb2 import DataType -from synapse.api.node_pb2 import NodeConfig, NodeType -from synapse.api.nodes.stream_in_pb2 import StreamInConfig -from synapse.client.node import Node -from synapse.utils.ndtp_types import ElectricalBroadbandData, SpiketrainData, SynapseData - -MULTICAST_TTL = 3 - - -class StreamIn(Node): - type = NodeType.kStreamIn - - def __init__(self, data_type: DataType, shape: List[int]): - self.__sequence_number = 0 - self.__socket = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP - ) - self.data_type = data_type - self.shape = shape - - def write(self, data: SynapseData): - if self.device is None: - return False - - node_socket = next( - (s for s in self.device.sockets if s.node_id == self.id), None - ) - - if node_socket is None: - return False - - bind = node_socket.bind.split(":") - if len(bind) != 2: - return False - - host = bind[0] - port = bind[1] - if host is None: - return False - port = int(port) - - packets = self._pack(data) - - for packet in packets: - try: - self.__socket.sendto(packet, (host, port)) - # https://stackoverflow.com/questions/21973661/os-x-udp-send-error-55-no-buffer-space-available - time.sleep(0.00001) - except Exception as e: - print(f"Error sending data: {e}") - return True - - def _pack(self, data: SynapseData) -> List[bytes]: - packets = [] - if type(data) == ElectricalBroadbandData: - packets.append(data.pack(self.__sequence_number)) - self.__sequence_number = (self.__sequence_number + 1) & 0xFFFF - - elif type(data) == SpiketrainData: - packets.append(data.pack(self.__sequence_number)) - self.__sequence_number = (self.__sequence_number + 1) & 0xFFFF - - elif type(data) == bytes: - packets.append(data) - self.__sequence_number = (self.__sequence_number + 1) & 0xFFFF - - else: - raise ValueError(f"Unknown data type: {type(data)}") - - return packets - - def _to_proto(self): - n = NodeConfig() - i = StreamInConfig() - i.data_type = self.data_type - i.shape = self.shape - - n.stream_in.CopyFrom(i) - return n - - @staticmethod - def _from_proto(proto: Optional[StreamInConfig]): - if proto is None: - return StreamIn() - - if not isinstance(proto, StreamInConfig): - raise ValueError("proto is not of type StreamInConfig") - - return StreamIn(data_type=proto.data_type, shape=proto.shape) diff --git a/synapse/client/nodes/stream_out.py b/synapse/client/nodes/stream_out.py deleted file mode 100644 index 626c5009..00000000 --- a/synapse/client/nodes/stream_out.py +++ /dev/null @@ -1,182 +0,0 @@ -import logging -import socket -import traceback -from typing import Optional, Tuple - -from synapse.api.datatype_pb2 import DataType -from synapse.api.node_pb2 import NodeConfig, NodeType -from synapse.api.nodes.stream_out_pb2 import StreamOutConfig -from synapse.client.node import Node -from synapse.utils.ndtp import NDTPMessage -from synapse.utils.ndtp_types import ( - ElectricalBroadbandData, - SpiketrainData, - SynapseData, - NDTPHeader, -) - -DEFAULT_STREAM_OUT_PORT = 50038 -STREAM_OUT_TIMEOUT_SEC = 1 # seconds - - -# Try to get the current user's ip for setting the destination address -def get_client_ip(): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - # This won't actually establish a connection, but helps us figure out the ip - s.connect(("8.8.8.8", 80)) - local_ip = s.getsockname()[0] - except Exception as e: - logging.error(f"Failed to get client IP: {e}") - return None - finally: - s.close() - return local_ip - - -class StreamOut(Node): - type = NodeType.kStreamOut - - def __init__( - self, - label=None, - destination_address=None, - destination_port=None, - read_timeout=STREAM_OUT_TIMEOUT_SEC, - ): - self.__socket = None - self.__label = label - self.__read_timeout = read_timeout - - # If we have been passed a None for destination address, try to resolve it - if not destination_address: - self.__destination_address = get_client_ip() - else: - self.__destination_address = destination_address - - if not destination_port: - self.__destination_port = DEFAULT_STREAM_OUT_PORT - else: - self.__destination_port = destination_port - - def read(self) -> Tuple[Optional[Tuple[NDTPHeader, SynapseData]], int]: - if self.__socket is None: - if self.open_socket() is None: - return None - try: - data, _ = self.__socket.recvfrom(8192) - bytes_read = len(data) - except socket.timeout: - logging.warning("StreamOut socket timed out.") - return None - return self._unpack(data), bytes_read - - def open_socket(self): - logging.info( - f"Opening socket at {self.__destination_address}:{self.__destination_port}" - ) - if self.device is None: - logging.error("Node has no device") - return None - - # UDP socket - self.__socket = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP - ) - - # Allow reuse for easy restart - self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - # Try to set a large recv buffer - SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024 # 5MB - self.__socket.setsockopt( - socket.SOL_SOCKET, socket.SO_RCVBUF, SOCKET_BUFSIZE_BYTES - ) - recvbuf = self.__socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) - if recvbuf < SOCKET_BUFSIZE_BYTES: - logging.warning( - f"Could not set socket buffer size to {SOCKET_BUFSIZE_BYTES}. Current size is {recvbuf}. Consider increasing the system limit." - ) - - # Set a timeout - self.__socket.settimeout(self.__read_timeout) - - # Bind to the destination address (our ip) and port - try: - self.__socket.bind((self.__destination_address, self.__destination_port)) - except Exception as e: - logging.error( - f"Failed to bind to {self.__destination_address}:{self.__destination_port}: {e}" - ) - return None - return self.__socket - - def _to_proto(self): - n = NodeConfig() - - o = StreamOutConfig() - if self.__label: - o.label = self.__label - else: - o.label = "Stream Out" - - if self.__destination_address: - o.udp_unicast.destination_address = self.__destination_address - - if self.__destination_port: - o.udp_unicast.destination_port = self.__destination_port - - n.stream_out.CopyFrom(o) - return n - - def _unpack(self, data: bytes) -> Tuple[NDTPHeader, SynapseData]: - u = None - try: - u = NDTPMessage.unpack(data) - - except Exception as e: - logging.error(f"Failed to unpack NDTPMessage: {e}") - traceback.print_exc() - - h = u.header - if h.data_type == DataType.kBroadband: - return h, ElectricalBroadbandData.from_ndtp_message(u) - elif h.data_type == DataType.kSpiketrain: - return h, SpiketrainData.from_ndtp_message(u) - else: - logging.error(f"Unknown data type: {h.data_type}") - return h, data - - @staticmethod - def _from_proto(proto: Optional[StreamOutConfig]): - if proto is None: - return StreamOut() - - if not isinstance(proto, StreamOutConfig): - raise ValueError("proto is not of type StreamOutConfig") - - # We currently only support udp unicast - selected_transport = proto.WhichOneof("transport") - - if selected_transport is None: - # Set the defaults - destination_address = get_client_ip() - destination_port = DEFAULT_STREAM_OUT_PORT - logging.info( - f"Requesting StreamOut to: {destination_address}:{destination_port}" - ) - return StreamOut(proto.label, destination_address, destination_port) - elif selected_transport == "udp_unicast": - dest_address = proto.udp_unicast.destination_address - dest_port = proto.udp_unicast.destination_port - if dest_address == "": - dest_address = get_client_ip() - if dest_port == 0: - dest_port = DEFAULT_STREAM_OUT_PORT - logging.info( - f"Using user provided StreamOut destination: {dest_address}:{dest_port}" - ) - return StreamOut(proto.label, dest_address, dest_port) - else: - logging.error(f"Unsupported transport: {selected_transport}") - return None diff --git a/synapse/examples/stream_out.py b/synapse/examples/stream_out.py deleted file mode 100644 index 82a9838d..00000000 --- a/synapse/examples/stream_out.py +++ /dev/null @@ -1,88 +0,0 @@ -import synapse as syn -import sys -import time - -SIMULATED_PERIPHERAL_ID = 100 - -if __name__ == "__main__": - uri = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1:647" - device = syn.Device(uri) - info = device.info() - assert info is not None, "Couldn't get device info" - - print("Device info:") - print(info) - - # The StreamOut node will automatically set your dest IP and port - stream_out = syn.StreamOut(label="my broadband") - - channels = [ - syn.Channel( - id=channel_num, - electrode_id=channel_num * 2, - reference_id=channel_num * 2 + 1, - ) - for channel_num in range(32) - ] - - broadband = syn.BroadbandSource( - # Use the simulated peripheral (100), or replace with your own - peripheral_id=SIMULATED_PERIPHERAL_ID, - sample_rate_hz=30000, - bit_width=12, - gain=20.0, - signal=syn.SignalConfig( - electrode=syn.ElectrodeConfig( - channels=channels, - low_cutoff_hz=500.0, - high_cutoff_hz=6000.0, - ) - ), - ) - - config = syn.Config() - config.add_node(stream_out) - config.add_node(broadband) - config.connect(broadband, stream_out) - - device.configure(config) - device.start() - - info = device.info() - assert info is not None, "Couldn't get device info" - print("Configured device info:") - print(info) - - should_run = True - total_bytes_read = 0 - start_time = time.time() - last_update_time = start_time - update_interval_sec = 1 - while should_run: - try: - # Wait for data - syn_data, bytes_read = stream_out.read() - if syn_data is None or bytes_read == 0: - print("Failed to read data from node") - continue - # Do something with the data - _header, _data = syn_data - total_bytes_read += bytes_read - - current_time = time.time() - if (current_time - last_update_time) >= update_interval_sec: - sys.stdout.write("\r") - sys.stdout.write( - f"{total_bytes_read} bytes in {time.time() - start_time:.2f} sec" - ) - last_update_time = current_time - - if current_time - start_time > 5: - should_run = False - - except KeyboardInterrupt: - print("Keyboard interrupt detected, stopping") - should_run = False - - print("Stopping device") - device.stop() diff --git a/synapse/server/nodes/__init__.py b/synapse/server/nodes/__init__.py index fe7d3207..f54686dc 100644 --- a/synapse/server/nodes/__init__.py +++ b/synapse/server/nodes/__init__.py @@ -1,11 +1,6 @@ from synapse.api.node_pb2 import NodeType -from synapse.server.nodes.base import BaseNode -from synapse.server.nodes.stream_in import StreamIn -from synapse.server.nodes.stream_out import StreamOut from synapse.server.nodes.spectral_filter import SpectralFilter SERVER_NODE_OBJECT_MAP = { - NodeType.kStreamIn: StreamIn, - NodeType.kStreamOut: StreamOut, NodeType.kSpectralFilter: SpectralFilter, } diff --git a/synapse/server/nodes/spectral_filter.py b/synapse/server/nodes/spectral_filter.py index 8f2decf4..3c1ad96b 100644 --- a/synapse/server/nodes/spectral_filter.py +++ b/synapse/server/nodes/spectral_filter.py @@ -1,4 +1,3 @@ -import queue from collections import defaultdict import numpy as np @@ -10,7 +9,7 @@ SpectralFilterConfig, SpectralFilterMethod, ) -from synapse.server.nodes import BaseNode +from synapse.server.nodes.base import BaseNode from synapse.server.status import Status from synapse.utils.ndtp_types import ElectricalBroadbandData, SynapseData diff --git a/synapse/server/nodes/stream_in.py b/synapse/server/nodes/stream_in.py deleted file mode 100644 index dbb76fa7..00000000 --- a/synapse/server/nodes/stream_in.py +++ /dev/null @@ -1,60 +0,0 @@ -import select -import socket - -from synapse.api.node_pb2 import NodeType -from synapse.api.nodes.stream_in_pb2 import StreamInConfig -from synapse.server.nodes.base import BaseNode -from synapse.server.status import Status - -from synapse.utils.ndtp_types import SynapseData - - -class StreamIn(BaseNode): - def __init__(self, id): - super().__init__(id, NodeType.kStreamIn) - - def config(self): - c = super().config() - - if self.__config: - c.stream_in.CopyFrom(self.__config) - - return c - - def configure(self, config: StreamInConfig) -> Status: - self.__config = config - - self.__socket = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP - ) - - self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - self.__socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 3) - - self.__socket.bind(("", 0)) - self.socket = self.__socket.getsockname() - self.logger.info(f"listening on {self.socket}") - - return Status() - - def configure_iface_ip(self, iface_ip): - pass - - async def run(self): - self.logger.info("starting to receive data...") - while self.running: - try: - ready = select.select([self.__socket], [], [], 1) - if ready[0]: - data, _ = self.__socket.recvfrom(1024) - unpacked = self._unpack(data) - await self.emit_data(unpacked) - except Exception as e: - self.logger.error(f"Error receiving data: {e}") - - self.logger.info("exited thread") - - def _unpack(self, data: bytes) -> SynapseData: - # TODO: fill this in using NDTP - return data diff --git a/synapse/server/nodes/stream_out.py b/synapse/server/nodes/stream_out.py deleted file mode 100644 index bb63f680..00000000 --- a/synapse/server/nodes/stream_out.py +++ /dev/null @@ -1,79 +0,0 @@ -import asyncio -import socket -from typing import List - -from synapse.api.node_pb2 import NodeType -from synapse.api.nodes.stream_out_pb2 import StreamOutConfig -from synapse.server.nodes.base import BaseNode -from synapse.server.status import Status -from synapse.utils.ndtp_types import SynapseData - - -class StreamOut(BaseNode): - __n = 0 - - def __init__(self, id): - super().__init__(id, NodeType.kStreamOut) - self.__i = StreamOut.__n - StreamOut.__n += 1 - self.__sequence_number = 0 - self.__config = None - self.socket_endpoint = None - - def config(self): - c = super().config() - - if self.__config: - c.stream_out.CopyFrom(self.__config) - - return c - - def configure(self, config: StreamOutConfig) -> Status: - self.__config = config - - if not config.udp_unicast: - self.logger.error( - "Cannot conifgure StreamOut, only udp unicast is supported" - ) - raise Exception("Only udp unicast is supported for streamout") - - self.__socket = socket.socket( - socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP - ) - self.__socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - dest_address = self.__config.udp_unicast.destination_address - dest_port = self.__config.udp_unicast.destination_port - self.socket_endpoint = (dest_address, dest_port) - self.logger.info(f"created stream out socket on {self.__socket}") - - return Status() - - async def run(self): - loop = asyncio.get_running_loop() - while self.running: - if not self.socket_endpoint: - self.logger.error("socket not configured") - return - - data = await self.data_queue.get() - packets = self._pack(data) - - for packet in packets: - await loop.run_in_executor( - None, - self.__socket.sendto, - packet, - self.socket_endpoint, - ) - - def _pack(self, data: SynapseData) -> List[bytes]: - packets = [] - - try: - packets, seq = data.pack(self.__sequence_number) - self.__sequence_number = seq - except Exception as e: - raise ValueError(f"Error packing data: {e}") - - return packets diff --git a/synapse/simulator/__init__.py b/synapse/simulator/__init__.py index 01b28821..f470e438 100644 --- a/synapse/simulator/__init__.py +++ b/synapse/simulator/__init__.py @@ -4,22 +4,19 @@ from synapse.simulator.nodes.optical_stimulation import OpticalStimulation from synapse.simulator.nodes.spike_source import SpikeSource from synapse.server.nodes.spectral_filter import SpectralFilter -from synapse.server.nodes.stream_in import StreamIn -from synapse.server.nodes.stream_out import StreamOut SIMULATOR_NODE_OBJECT_MAP = { - NodeType.kBroadbandSource: BroadbandSource, - NodeType.kSpectralFilter: SpectralFilter, - NodeType.kSpikeSource: SpikeSource, - NodeType.kStreamIn: StreamIn, - NodeType.kStreamOut: StreamOut, - NodeType.kOpticalStimulation: OpticalStimulation + NodeType.kBroadbandSource: BroadbandSource, + NodeType.kSpectralFilter: SpectralFilter, + NodeType.kSpikeSource: SpikeSource, + NodeType.kOpticalStimulation: OpticalStimulation, } SIMULATOR_PERIPHERALS = [] SIMULATOR_DEFAULTS = ENTRY_DEFAULTS.copy() -SIMULATOR_DEFAULTS["device_serial"] = "SFI-SIM001" +SIMULATOR_DEFAULTS["device_serial"] = "SFI-SIM001" + def main(): - server(SIMULATOR_NODE_OBJECT_MAP, SIMULATOR_PERIPHERALS, ENTRY_DEFAULTS) + server(SIMULATOR_NODE_OBJECT_MAP, SIMULATOR_PERIPHERALS, ENTRY_DEFAULTS) diff --git a/synapse/tests/test_stream_out.py b/synapse/tests/test_stream_out.py deleted file mode 100644 index 14ad5fd0..00000000 --- a/synapse/tests/test_stream_out.py +++ /dev/null @@ -1,164 +0,0 @@ -import numpy as np -import pytest - -from synapse.server.nodes.stream_out import StreamOut -from synapse.utils.ndtp import NDTPMessage -from synapse.utils.ndtp_types import ( - MAX_CH_PAYLOAD_SIZE_BYTES, - chunk_channel_data, - ElectricalBroadbandData, - SpiketrainData, -) - - -def test_packing_broadband_data(): - node = StreamOut(id=1) - - assert node.__sequence_number == 0 - - # Signed - sample_data = [ - (1, np.array([-1000, 2000, 1000], dtype=np.int16)), - (2, np.array([1234, -1234, 1234, 1234], dtype=np.int16)), - (3, np.array([2000, -2000, 1000, 2000, -2000], dtype=np.int16)), - ] - bdata = ElectricalBroadbandData( - bit_width=16, - sample_rate=3, - t0=1234567890, - samples=sample_data, - is_signed=True - ) - - packed = node._pack(bdata) - - assert node.__sequence_number == 1 - - for i, p in enumerate(packed): - unpacked = NDTPMessage.unpack(p) - - assert unpacked.header.timestamp == bdata.t0 - assert unpacked.header.seq_number == i - - assert unpacked.payload.bit_width == 16 - assert unpacked.payload.channels[0].channel_id == bdata.samples[i][0] - assert list(unpacked.payload.channels[0].channel_data) == list( - bdata.samples[i][1] - ) - - # Unsigned - sample_data = [ - (1, np.array([1000, 2000, 3000], dtype=np.uint16)), - (2, np.array([1234, 1234, 1234, 1234], dtype=np.uint16)), - (3, np.array([1000, 2000, 3000, 4000, 3000], dtype=np.uint16)), - ] - bdata = ElectricalBroadbandData( - bit_width=12, - sample_rate=3, - t0=1234567890, - samples=sample_data, - is_signed=False - ) - - packed = node._pack(bdata) - - assert node.__sequence_number == 2 - - for i, p in enumerate(packed): - unpacked = NDTPMessage.unpack(p) - - -def test_packing_broadband_data(): - node = StreamOut(id=1) - assert node._StreamOut__sequence_number == 0 - - n_samples = 10000 - bit_width = 16 - - sample_data = [ - (1, np.array([i for i in range(n_samples)], dtype=np.int16)), - ] - bdata = ElectricalBroadbandData( - bit_width=bit_width, - sample_rate=36000, - t0=1234567890, - samples=sample_data, - is_signed=True - ) - - packed = node._pack(bdata) - seq = 0 - n_samples = 0 - ch_data = sample_data[0] - chunks = chunk_channel_data(bit_width, ch_data[1], MAX_CH_PAYLOAD_SIZE_BYTES) - - for chunk in chunks: - t_chunk = bdata.t0 + round(n_samples * 1e6 / bdata.sample_rate) - - p = packed[seq] - unpacked = NDTPMessage.unpack(p) - assert unpacked.header.timestamp == t_chunk - assert unpacked.header.seq_number == seq - - assert unpacked.payload.bit_width == bit_width - assert unpacked.payload.sample_rate == bdata.sample_rate - assert unpacked.payload.is_signed == bdata.is_signed - assert unpacked.payload.channels[0].channel_id == ch_data[0] - assert list(unpacked.payload.channels[0].channel_data) == list(chunk) - - n_samples += len(chunk) - if chunk is list(chunks)[-1]: # If this is the last chunk for this channel - n_samples = j - seq += 1 - - sample_data = [ - (1, np.array([i for i in range(n_samples)], dtype=np.uint16)), - ] - bdata = ElectricalBroadbandData( - bit_width=bit_width, - sample_rate=36000, - t0=1234567890, - samples=sample_data, - ) - - node = StreamOut(id=1) - assert node._StreamOut__sequence_number == 0 - - packed = node._pack(bdata) - seq = 0 - ch_data = sample_data[0] - chunks = chunk_channel_data(bit_width, ch_data[1], MAX_CH_PAYLOAD_SIZE_BYTES) - - for chunk in chunks: - p = packed[seq] - unpacked = NDTPMessage.unpack(p) - - assert unpacked.header.timestamp == bdata.t0 - assert unpacked.header.seq_number == seq - - assert unpacked.payload.bit_width == 16 - assert unpacked.payload.sample_rate == bdata.sample_rate - assert unpacked.payload.is_signed == bdata.is_signed - assert unpacked.payload.channels[0].channel_id == ch_data[0] - assert list(unpacked.payload.channels[0].channel_data) == list(chunk) - - seq += 1 - - -def test_packing_spiketrain_data(): - node = StreamOut(id=1) - - sdata = SpiketrainData( - t0=1234567890, - bin_size_ms=10, - spike_counts=[0, 1, 2, 3, 4, 5, 6], - ) - - packed = node._pack(sdata)[0] - unpacked = NDTPMessage.unpack(packed) - - assert unpacked.header.timestamp == sdata.t0 - assert unpacked.payload.bin_size_ms == sdata.bin_size_ms - assert len(unpacked.payload.spike_counts) == len(sdata.spike_counts) - - assert list(unpacked.payload.spike_counts) == list(sdata.spike_counts) From 2fee92f29235cc3a3ed101de106f1029267bd983 Mon Sep 17 00:00:00 2001 From: arminvoid <3612498+arminvoid@users.noreply.github.com> Date: Sat, 9 Aug 2025 09:22:49 +0200 Subject: [PATCH 2/3] remove all other references --- .github/workflows/main.yml | 2 +- README.md | 11 ++---- synapse-api | 2 +- synapse/tests/blink_ostim.py | 41 -------------------- synapse/tests/doom_synapse.py | 58 ---------------------------- synapse/tests/test_input_config.json | 16 +------- test_config.json | 14 +------ 7 files changed, 8 insertions(+), 136 deletions(-) delete mode 100644 synapse/tests/blink_ostim.py delete mode 100755 synapse/tests/doom_synapse.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index ce303f25..3f6a71eb 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -40,6 +40,6 @@ jobs: sleep 5 - python synapse/examples/stream_out.py 127.0.0.1:50051 + python synapse/examples/tap_example.py 127.0.0.1:50051 kill $(jobs -p) diff --git a/README.md b/README.md index 15489d2a..b38445bf 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Includes `synapsectl` command line utility: % synapsectl --help usage: synapsectl [-h] [--uri URI] [--version] [--verbose] - {discover,info,query,start,stop,configure,logs,read,plot,file,taps,deploy} ... + {discover,info,query,start,stop,configure,logs,read,plot,file,taps,deploy,build,settings} ... Synapse Device Manager @@ -17,7 +17,7 @@ Includes `synapsectl` command line utility: --verbose, -v Enable verbose output Commands: - {discover,info,query,start,stop,configure,logs,read,plot,file,taps,deploy,build} + {discover,info,query,start,stop,configure,logs,read,plot,file,taps,deploy,build,settings} discover Discover Synapse devices on the network info Get device information query Execute a query on the device @@ -25,12 +25,13 @@ Includes `synapsectl` command line utility: stop Stop the device or an application configure Write a configuration to the device logs Get logs from the device - read Read from a device's StreamOut node + read Read from a device's Broadband Tap and save to HDF5 plot Plot recorded synapse data file File commands taps Interact with taps on the network deploy Deploy an application to a Synapse device build Cross-compile and package an application into a .deb without deploying + settings Manage the persistent device settings As well as the base for a device implementation (`synapse/server`), @@ -119,8 +120,6 @@ info = device.info() print("Device info: ", device.info()) -stream_out = syn.StreamOut(label="my broadband", multicast_group="224.0.0.1") - channels = [ syn.Channel( id=channel_num, @@ -144,9 +143,7 @@ broadband = syn.BroadbandSource( ) config = syn.Config() -config.add_node(stream_out) config.add_node(broadband) -config.connect(broadband, stream_out) device.configure(config) device.start() diff --git a/synapse-api b/synapse-api index 248ab7f6..d01d543f 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 248ab7f6eba47ea30392374befefb2084115abb7 +Subproject commit d01d543fb291cd8ff109ca0d0490bb2c1d0deda5 diff --git a/synapse/tests/blink_ostim.py b/synapse/tests/blink_ostim.py deleted file mode 100644 index d7a8b6d9..00000000 --- a/synapse/tests/blink_ostim.py +++ /dev/null @@ -1,41 +0,0 @@ -import time - - -from synapse.device import Device -from synapse.config import Config -from synapse.nodes.stream_in import StreamIn -from synapse.api.device_pb2 import DeviceConfiguration -from google.protobuf.json_format import Parse - -addr = "localhost:647" -dev = Device(addr) -with open("synapse/tests/test_input_config.json") as config_json: - config_proto = Parse(config_json.read(), DeviceConfiguration()) - print("Configuring device with the following configuration:") - dev_config = Config.from_proto(config_proto) - dev.configure(dev_config) - -input_node: StreamIn = dev_config.get_node(1) - -rows = 32 -cols = 64 - -print("Sending START...") -dev.start() - -on_frame = [0xFF for _ in range(rows * cols)] -off_frame = [0x00 for _ in range(rows * cols)] - -try: - while True: - # print(f"\nSending frame {frame_number} to MUX01:") - # print(f"len: {len(it)}. frame: {it}, type: {type(it[0])}") - - input_node.write(bytes(on_frame)) - time.sleep(0.5) - input_node.write(bytes(off_frame)) - time.sleep(0.5) - -except KeyboardInterrupt: - print("Stopping") - dev.stop() diff --git a/synapse/tests/doom_synapse.py b/synapse/tests/doom_synapse.py deleted file mode 100755 index 4027ddb4..00000000 --- a/synapse/tests/doom_synapse.py +++ /dev/null @@ -1,58 +0,0 @@ -import time - -import cv2 - -from synapse.device import Device -from synapse.config import Config -from synapse.nodes.stream_in import StreamIn -from synapse.api.device_pb2 import DeviceConfiguration -from google.protobuf.json_format import Parse - -addr = "localhost:647" -dev = Device(addr) -with open("synapse/tests/test_input_config.json") as config_json: - config_proto = Parse(config_json.read(), DeviceConfiguration()) - print("Configuring device with the following configuration:") - dev_config = Config.from_proto(config_proto) - dev.configure(dev_config) - -input_node: StreamIn = dev_config.get_node(1) - -rows = 32 -cols = 64 - -print("Sending START...") -dev.start() - - -# read .avi file frame by frame and send to mux01 -# filename is doom_clip_64x32.avi -video_path = "synapse/tests/doom_clip_64x32.avi" -cap = cv2.VideoCapture(video_path) -frame_number = 0 -try: - while True: - ret, frame = cap.read() - if not ret: - break - frame_number += 1 - - if frame_number == cap.get(cv2.CAP_PROP_FRAME_COUNT): - frame_number = 0 - cap.set(cv2.CAP_PROP_POS_FRAMES, 0) - # Ensure the frame is grayscale - if len(frame.shape) == 3 and frame.shape[2] == 3: - frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) - - it = frame.flatten().tolist() - # print(f"\nSending frame {frame_number} to MUX01:") - # print(f"len: {len(it)}. frame: {it}, type: {type(it[0])}") - - input_node.write(frame) - time.sleep(0.05) - -except KeyboardInterrupt: - cap.release() - - print("Stopping") - dev.stop() diff --git a/synapse/tests/test_input_config.json b/synapse/tests/test_input_config.json index 1215edd7..9bd1fd33 100644 --- a/synapse/tests/test_input_config.json +++ b/synapse/tests/test_input_config.json @@ -1,25 +1,11 @@ { "nodes": [ - { - "type": "kStreamIn", - "id": 1, - "stream_in": { - "shape": [2048], - "data_type": "kImage" - } - }, { "type": "kOpticalStimulation", - "id": 2, + "id": 1, "optical_stimulation": { "peripheral_id": 1 } } - ], - "connections": [ - { - "src_node_id": 1, - "dst_node_id": 2 - } ] } diff --git a/test_config.json b/test_config.json index ed32943d..30f00401 100644 --- a/test_config.json +++ b/test_config.json @@ -1,14 +1,8 @@ { "nodes": [ - { - "type": "kStreamOut", - "id": 1, - "stream_out": { - } - }, { "type": "kBroadbandSource", - "id": 2, + "id": 1, "broadband_source": { "peripheral_id": 1, "sample_rate_hz": 32000, @@ -27,11 +21,5 @@ } } } - ], - "connections": [ - { - "src_node_id": 2, - "dst_node_id": 1 - } ] } From 7745146a425e572ce27409df65fed0d4406d28e7 Mon Sep 17 00:00:00 2001 From: arminvoid <3612498+arminvoid@users.noreply.github.com> Date: Tue, 12 Aug 2025 07:52:38 +0200 Subject: [PATCH 3/3] remove reference to kStreamIn --- synapse/server/rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/server/rpc.py b/synapse/server/rpc.py index 14d678b4..aea0b033 100644 --- a/synapse/server/rpc.py +++ b/synapse/server/rpc.py @@ -327,7 +327,7 @@ def _reconfigure(self, configuration): "Creating %s node(%d)" % (NodeType.Name(node.type), node.id) ) node = self.node_object_map[node.type](node.id) - if node.type in [NodeType.kStreamIn, NodeType.kBroadbandSource]: + if node.type in [NodeType.kBroadbandSource]: node.configure_iface_ip(self.iface_ip) status = node.configure(config)