diff --git a/CHANGELOG.md b/CHANGELOG.md index 9303816..af16eb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,11 @@ # Labone Python API Changelog -## Version 3.1.1 +## Version 3.1.2 +* Fix bug which caused streaming errors to cancel the subscriptions +* Raise severity of errors during subscriptions to `FAILED` to cause a data server + log entry. +## Version 3.1.1 * Add support for Python 3.13 ## Version 3.1.0 diff --git a/src/labone/core/subscription.py b/src/labone/core/subscription.py index b7a5237..990a966 100644 --- a/src/labone/core/subscription.py +++ b/src/labone/core/subscription.py @@ -496,6 +496,7 @@ def _distribute_to_data_queues( # error should not be raised here since this would disconnect the # subscription. logger.exception(err.args[0]) + return except ValueError as err: # pragma: no cover self._data_queues = [ data_queue().disconnect() # type: ignore[union-attr] # supposed to throw @@ -539,4 +540,4 @@ async def capnp_callback( list(map(self._distribute_to_data_queues, call_input.values)) fulfiller.fulfill() except Exception as err: # noqa: BLE001 - fulfiller.reject(zhinst.comms.Fulfiller.DISCONNECTED, err.args[0]) + fulfiller.reject(zhinst.comms.Fulfiller.FAILED, err.args[0]) diff --git a/tests/core/test_subscription.py b/tests/core/test_subscription.py index 7d54810..81442e6 100644 --- a/tests/core/test_subscription.py +++ b/tests/core/test_subscription.py @@ -1,10 +1,12 @@ """Tests for the `labone.core.subscription` module.""" import asyncio +import logging +from unittest.mock import MagicMock import pytest -from labone.core import errors +from labone.core import errors, hpk_schema from labone.core.subscription import ( CircularDataQueue, DataQueue, @@ -257,6 +259,86 @@ def test_streaming_handle_with_parser_callback(): ) +@pytest.mark.asyncio +async def test_capnp_callback(caplog): + streaming_handle = StreamingHandle() + queue = DataQueue( + path="dummy", + handle=streaming_handle, + ) + call_param = hpk_schema.StreamingHandleSendValuesParams() + values = call_param.init_values(2) + + values[0].init_metadata(timestamp=0, path="dummy") + values[0].init_value(int64=42) + + values[1].init_metadata(timestamp=1, path="dummy") + values[1].init_value(double=22.0) + + fulfiller = MagicMock() + with caplog.at_level(logging.ERROR): + await streaming_handle.capnp_callback(0, 0, call_param, fulfiller) + assert "" in caplog.text + assert queue.qsize() == 2 + assert queue.get_nowait() == AnnotatedValue(value=42, path="dummy", timestamp=0) + assert queue.get_nowait() == AnnotatedValue(value=22.0, path="dummy", timestamp=1) + fulfiller.fulfill.assert_called_once() + + +@pytest.mark.asyncio +async def test_streaming_error(caplog): + streaming_handle = StreamingHandle() + queue = DataQueue( + path="dummy", + handle=streaming_handle, + ) + call_param = hpk_schema.StreamingHandleSendValuesParams() + values = call_param.init_values(1) + values[0].init_metadata(timestamp=0, path="dummy") + values[0].init_value().init_streamingError( + code=1, + message="test error", + category="unknown", + ) + fulfiller = MagicMock() + with caplog.at_level(logging.ERROR): + await streaming_handle.capnp_callback(0, 0, call_param, fulfiller) + assert "test error" in caplog.text + assert queue.qsize() == 0 + fulfiller.fulfill.assert_called_once() + + +@pytest.mark.asyncio +async def test_streaming_error_with_value(caplog): + streaming_handle = StreamingHandle() + queue = DataQueue( + path="dummy", + handle=streaming_handle, + ) + call_param = hpk_schema.StreamingHandleSendValuesParams() + values = call_param.init_values(2) + + # Fist value is a streaming error + values[0].init_metadata(timestamp=0, path="dummy") + values[0].init_value().init_streamingError( + code=1, + message="test error", + category="unknown", + ) + + # Second value is a normal value + values[1].init_metadata(timestamp=0, path="dummy") + values[1].init_value(int64=42) + + fulfiller = MagicMock() + with caplog.at_level(logging.ERROR): + await streaming_handle.capnp_callback(0, 0, call_param, fulfiller) + assert "test error" in caplog.text + assert queue.qsize() == 1 + assert queue.get_nowait() == AnnotatedValue(value=42, path="dummy", timestamp=0) + fulfiller.fulfill.assert_called_once() + + @pytest.mark.asyncio async def test_distinct_data_queue_put_no_wait_new_value(): subscription = FakeSubscription()