From 9e1bc59142908939d60fb11000f57bec4aa13d5c Mon Sep 17 00:00:00 2001 From: Tobias Ahrens Date: Mon, 9 Dec 2024 15:47:39 +0100 Subject: [PATCH 1/2] Register queue on streaming handle before sending the subscribe request This commit creates the subscription queue befor sending out the subscribe request. This ensures that if the data server sends an update event before the subscribe request returns, the data is still processed. --- CHANGELOG.md | 3 +++ src/labone/core/session.py | 19 ++++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b309afa..184a3a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Labone Python API Changelog +## Version 3.2.1 +* Fix bug that caused subscriptions to potentially miss value updates after the subscription was registered but before the subscribe functions returned. + ## Version 3.2.0 * `subscribe` accepts keyword arguments, which are forwarded to the data-server. This allows to configure the subscription to the data-server. diff --git a/src/labone/core/session.py b/src/labone/core/session.py index 0425a7f..df8f5e9 100644 --- a/src/labone/core/session.py +++ b/src/labone/core/session.py @@ -853,23 +853,28 @@ async def subscribe( }, } if get_initial_value: - _, initial_value = await asyncio.gather( - self._session.subscribe(subscription=subscription), - self.get(path), - ) new_queue_type = queue_type or DataQueue queue = new_queue_type( path=path, handle=streaming_handle, ) - queue.put_nowait(initial_value) + _, initial_value = await asyncio.gather( + self._session.subscribe(subscription=subscription), + self.get(path), + ) + # If the queue already has received a update event we do not + # need to put the initial value in the queue. As it may break the + # order. + if queue.empty(): + queue.put_nowait(initial_value) return queue - await self._session.subscribe(subscription=subscription) new_queue_type = queue_type or DataQueue - return new_queue_type( + queue = new_queue_type( path=path, handle=streaming_handle, ) + await self._session.subscribe(subscription=subscription) + return queue async def wait_for_state_change( self, From 94cc4f7e8a36ada9258ce370f8a17a2d4296a53e Mon Sep 17 00:00:00 2001 From: Tobias Ahrens Date: Mon, 9 Dec 2024 16:06:17 +0100 Subject: [PATCH 2/2] Update ruff to latest version --- pyproject.toml | 4 ---- src/labone/__init__.py | 4 ++-- src/labone/core/__init__.py | 14 +++++++------- src/labone/server/__init__.py | 2 +- 4 files changed, 10 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 865f969..18ce6a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,12 +103,8 @@ target-version = "py39" [tool.ruff.lint] select = ["ALL"] ignore = [ - # Type annotation for self - "ANN101", # Missing docstring in `__init__` "D107", - # Missing type annotation for `cls` in classmethod - "ANN102", # Missing docstring in magic method "D105", # Missing type annotation for `*args` diff --git a/src/labone/__init__.py b/src/labone/__init__.py index 00f3bce..0b8d74b 100644 --- a/src/labone/__init__.py +++ b/src/labone/__init__.py @@ -6,8 +6,8 @@ from labone.instrument import Instrument __all__ = [ - "__version__", - "Instrument", "DataServer", + "Instrument", "ListNodesFlags", + "__version__", ] diff --git a/src/labone/core/__init__.py b/src/labone/core/__init__.py index a6fe673..a0ff4e5 100644 --- a/src/labone/core/__init__.py +++ b/src/labone/core/__init__.py @@ -26,16 +26,16 @@ __all__ = [ "AnnotatedValue", - "ListNodesFlags", - "ListNodesInfoFlags", - "DataQueue", "CircularDataQueue", + "DataQueue", "DistinctConsecutiveDataQueue", - "Session", + "KernelInfo", "KernelSession", + "ListNodesFlags", + "ListNodesInfoFlags", "ServerInfo", - "KernelInfo", - "ZIContext", - "Value", + "Session", "ShfGeneratorWaveformVectorData", + "Value", + "ZIContext", ] diff --git a/src/labone/server/__init__.py b/src/labone/server/__init__.py index d69c592..b281d1f 100644 --- a/src/labone/server/__init__.py +++ b/src/labone/server/__init__.py @@ -8,7 +8,7 @@ from labone.mock.session import LabOneServerBase, MockSession, Subscription -__all__ = ["CapnpServer", "LabOneServerBase", "Subscription", "MockSession"] +__all__ = ["CapnpServer", "LabOneServerBase", "MockSession", "Subscription"] import warnings