Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish-to-pypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@v4

- name: Install uv
uses: astral-sh/setup-uv@v2
uses: astral-sh/setup-uv@v5

- name: Build Package
run: uv build
Expand Down
23 changes: 23 additions & 0 deletions src/pycbsdk/cbhw/device/nsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ def _register_basic_callbacks(self):
CBPacketType.SYSPROTOCOLMONITOR, self._handle_procmon
)
self.register_config_callback(CBPacketType.LOGREP, self._handle_log)
self.register_config_callback(CBPacketType.COMMENTREP, self._handle_comment)
# Register the _black_hole (do nothing) callback for packets we are aware of but do not handle yet
self.register_config_callback(CBPacketType.SYSHEARTBEAT, self._black_hole)
self.register_config_callback(CBPacketType.SS_MODELREP, self._black_hole)
Expand Down Expand Up @@ -501,6 +502,15 @@ def _handle_log(self, pkt):
log_lvl = log_lvls.get(pkt.mode, logging.INFO)
logger.log(log_lvl, f"Log from {pkt.name}:\t{pkt.desc}")

def _handle_comment(self, pkt):
# Note: Though we might receive a comment in response to one we just sent,
# we do not use events for comments because timing is not critical and
# there is no need to wait for a response when firing off a comment.
if hasattr(pkt, "timeStarted"):
logger.debug(f"At {pkt.timeStarted}, received comment:\t{pkt.comment}")
else:
logger.debug(f"Received comment:\t{pkt.comment}")

def _black_hole(self, pkt):
_old = len(g_debug_unhandled_packets)
g_debug_unhandled_packets.add(pkt.header.type)
Expand Down Expand Up @@ -1134,6 +1144,19 @@ def get_transport(self, force_refresh=False) -> int:
def get_monitor_state(self) -> dict:
return self._monitor_state.copy()

def send_comment(self, comment: str, timestamp: Optional[int] = None):
pkt = self.packet_factory.make_packet(
None, chid=CBSpecialChan.CONFIGURATION, pkt_type=CBPacketType.COMMENTSET
)
# Property setter should handle converting Python string to C string.
pkt.comment = comment
if hasattr(pkt, "timeStarted"):
pkt.timeStarted = timestamp or self.last_time
# pkt.comment = bytes(create_string_buffer(comment.encode("utf-8"), 256))
logger.debug(f"Sending comment (timeStarted: {pkt.timeStarted}): {pkt.comment}")
self._send_packet(pkt)
return 0

def reset(self) -> int:
print("TODO: reset NSP proctime to 0")
return 0
Expand Down
13 changes: 7 additions & 6 deletions src/pycbsdk/cbhw/packet/v40.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ class CBPacketComment(CBPacketVarLen):
("timeStarted", c_uint64),
("rgba", c_uint32), # depends on flags (see flags above)
]
_array = (
c_char * 0
)() # Supposed to be variable length, but seems like it is always padded out to 128.
_array = (c_char * 0)()
# Supposed to be variable length, but seems like it is always padded out to 128.

@property
def default_type(self):
Expand All @@ -107,9 +106,10 @@ def max_elements(self) -> int:

@property
def comment(self) -> str:
# codec = {0: 'ANSI', 1: 'UTF16', 255: 'ANSI'}[self.charset]
# codec = {0: 'ANSI', 1: 'UTF16', 255: 'ANSI'}[self.info.charset]
# ''.join([_.decode(codec) for _ in res[4:]]).rstrip('\x00')
return self._array.rstrip("\x00") # TODO: Decode?
# return self._array.rstrip("\x00") # TODO: Decode?
return self._array[: self.max_elements].decode("utf-8")

@comment.setter
def comment(self, incomment: str):
Expand All @@ -123,5 +123,6 @@ def comment(self, incomment: str):
) # TODO: encode?
else:
self._array = (self._array._type_ * len(incomment))()
memmove(self._array, incomment, len(incomment))
# memmove(self._array, incomment, len(incomment))
self._array[: len(incomment)] = incomment.encode("utf-8")
self._update_dlen()
16 changes: 16 additions & 0 deletions src/pycbsdk/cbsdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ def get_monitor_state(device: NSPDevice) -> dict:
return device.get_monitor_state()


def set_comment(
device: NSPDevice, comment: str, timestamp: Optional[int] = None
) -> int:
return device.send_comment(comment, timestamp)


def register_event_callback(
device: NSPDevice, channel_type: CBChannelType, func: Callable[[Structure], None]
):
Expand Down Expand Up @@ -220,3 +226,13 @@ def unregister_config_callback(
device: NSPDevice, packet_type: CBPacketType, func: Callable[[Structure], None]
) -> int:
return device.unregister_config_callback(packet_type, func)


def register_comment_callback(device: NSPDevice, func: Callable[[Structure], None]):
register_config_callback(device, CBPacketType.COMMENTREP, func)


def unregister_comment_callback(
device: NSPDevice, func: Callable[[Structure], None]
) -> int:
return unregister_config_callback(device, CBPacketType.COMMENTREP, func)
103 changes: 103 additions & 0 deletions src/pycbsdk/examples/comments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import sys
import logging
from pycbsdk import cbsdk


logger = logging.getLogger(__name__)


def handle_callback(comment_pkt):
print(
f"\nReceived comment {comment_pkt.comment} with timestamp {comment_pkt.timeStarted}\n"
)


def main(
inst_addr: str = "",
inst_port: int = 51002,
client_addr: str = "",
client_port: int = 51002,
recv_bufsize: int = (8 if sys.platform == "win32" else 6) * 1024 * 1024,
protocol: str = "4.1",
loglevel: str = "debug",
skip_startup: bool = False,
):
"""
Run the application:
- Set up the connection to the nsp.
- Normalize the device config (disable all continuous, activate spiking with man. thresh on all channels).
- Create a dummy application.
- Use the app to register a callback that handles the spikes and updates internal state.
- The app will render its internal state (summary spike rate statistics).
:param inst_addr: ipv4 address of device. pycbsdk will send control packets to this address.
Use 127.0.0.1 for use with nPlayServer (non-bcast).
Subnet OK, e.g. 192.168.137.255 well send control packets to all devices on subnet.
The default is 0.0.0.0 (IPADDR_ANY) on Mac and Linux. On Windows, known IPs will be searched.
:param inst_port: Network port to send control packets.
Use 51002 for Gemini and 51001 for Legacy NSP.
:param client_addr: ipv4 address of this machine's network adapter we will receive packets on.
Defaults to INADDR_ANY. If address is provided, assumes Cerebus Subnet.
:param client_port:
Network port to receive packets. This should always be 51002.
:param recv_bufsize: UDP socket recv buffer size.
:param protocol: Protocol Version. 3.11, 4.0, or 4.1 supported.
:param loglevel: debug, info, or warning
:param skip_startup: Skip the initial handshake as well as the attempt to set the device to RUNNING.
:return:
"""
# Handle logger arguments
loglevel = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
}[loglevel.lower()]
logger.setLevel(loglevel)

# Create connection to the device.
params_obj = cbsdk.create_params(
inst_addr=inst_addr,
inst_port=inst_port,
client_addr=client_addr,
client_port=client_port,
recv_bufsize=recv_bufsize,
protocol=protocol,
)
nsp_obj = cbsdk.get_device(params_obj)
if cbsdk.connect(nsp_obj, startup_sequence=not skip_startup) != 50:
logger.error(
f"Could not connect to device. Check params and try again: \n{params_obj}."
)
sys.exit(-1)

config = cbsdk.get_config(nsp_obj)
if not config:
sys.exit(-1)

cbsdk.register_comment_callback(nsp_obj, handle_callback)

try:
while True:
input("Press any key to begin entering comment...")
comment = input("Enter comment: ")
ts = nsp_obj.last_time
print(f"Sending comment {comment} with timestamp {ts}")
cbsdk.set_comment(nsp_obj, comment, ts)
except KeyboardInterrupt:
pass
finally:
_ = cbsdk.disconnect(nsp_obj)


if __name__ == "__main__":
b_try_with_defaults = False
try:
import typer

typer.run(main)
except ModuleNotFoundError:
print(
"`pip install typer` to pass command-line arguments. Trying with defaults."
)
b_try_with_defaults = True
if b_try_with_defaults:
main()
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.