Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Labone Python API Changelog

## Version 3.1.1
## Version 3.1.2
* Fix bug which caused streaming errors to cancel the subscriptions
* Raise severity of errors during subscriptions to `FAILED` to cause a data server
log entry.

## Version 3.1.1
* Add support for Python 3.13

## Version 3.1.0
Expand Down
3 changes: 2 additions & 1 deletion src/labone/core/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ def _distribute_to_data_queues(
# error should not be raised here since this would disconnect the
# subscription.
logger.exception(err.args[0])
return
except ValueError as err: # pragma: no cover
self._data_queues = [
data_queue().disconnect() # type: ignore[union-attr] # supposed to throw
Expand Down Expand Up @@ -539,4 +540,4 @@ async def capnp_callback(
list(map(self._distribute_to_data_queues, call_input.values))
fulfiller.fulfill()
except Exception as err: # noqa: BLE001
fulfiller.reject(zhinst.comms.Fulfiller.DISCONNECTED, err.args[0])
fulfiller.reject(zhinst.comms.Fulfiller.FAILED, err.args[0])
84 changes: 83 additions & 1 deletion tests/core/test_subscription.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Tests for the `labone.core.subscription` module."""

import asyncio
import logging
from unittest.mock import MagicMock

import pytest

from labone.core import errors
from labone.core import errors, hpk_schema
from labone.core.subscription import (
CircularDataQueue,
DataQueue,
Expand Down Expand Up @@ -257,6 +259,86 @@ def test_streaming_handle_with_parser_callback():
)


@pytest.mark.asyncio
async def test_capnp_callback(caplog):
streaming_handle = StreamingHandle()
queue = DataQueue(
path="dummy",
handle=streaming_handle,
)
call_param = hpk_schema.StreamingHandleSendValuesParams()
values = call_param.init_values(2)

values[0].init_metadata(timestamp=0, path="dummy")
values[0].init_value(int64=42)

values[1].init_metadata(timestamp=1, path="dummy")
values[1].init_value(double=22.0)

fulfiller = MagicMock()
with caplog.at_level(logging.ERROR):
await streaming_handle.capnp_callback(0, 0, call_param, fulfiller)
assert "" in caplog.text
assert queue.qsize() == 2
assert queue.get_nowait() == AnnotatedValue(value=42, path="dummy", timestamp=0)
assert queue.get_nowait() == AnnotatedValue(value=22.0, path="dummy", timestamp=1)
fulfiller.fulfill.assert_called_once()


@pytest.mark.asyncio
async def test_streaming_error(caplog):
streaming_handle = StreamingHandle()
queue = DataQueue(
path="dummy",
handle=streaming_handle,
)
call_param = hpk_schema.StreamingHandleSendValuesParams()
values = call_param.init_values(1)
values[0].init_metadata(timestamp=0, path="dummy")
values[0].init_value().init_streamingError(
code=1,
message="test error",
category="unknown",
)
fulfiller = MagicMock()
with caplog.at_level(logging.ERROR):
await streaming_handle.capnp_callback(0, 0, call_param, fulfiller)
assert "test error" in caplog.text
assert queue.qsize() == 0
fulfiller.fulfill.assert_called_once()


@pytest.mark.asyncio
async def test_streaming_error_with_value(caplog):
streaming_handle = StreamingHandle()
queue = DataQueue(
path="dummy",
handle=streaming_handle,
)
call_param = hpk_schema.StreamingHandleSendValuesParams()
values = call_param.init_values(2)

# Fist value is a streaming error
values[0].init_metadata(timestamp=0, path="dummy")
values[0].init_value().init_streamingError(
code=1,
message="test error",
category="unknown",
)

# Second value is a normal value
values[1].init_metadata(timestamp=0, path="dummy")
values[1].init_value(int64=42)

fulfiller = MagicMock()
with caplog.at_level(logging.ERROR):
await streaming_handle.capnp_callback(0, 0, call_param, fulfiller)
assert "test error" in caplog.text
assert queue.qsize() == 1
assert queue.get_nowait() == AnnotatedValue(value=42, path="dummy", timestamp=0)
fulfiller.fulfill.assert_called_once()


@pytest.mark.asyncio
async def test_distinct_data_queue_put_no_wait_new_value():
subscription = FakeSubscription()
Expand Down