From 878a9373fe2c08e9a954e3a384f7553d6e987dc1 Mon Sep 17 00:00:00 2001 From: Jordan Shaw Date: Thu, 11 Jun 2026 14:58:51 -0400 Subject: [PATCH 1/2] Surface plan-limit errors and stop reconnecting after device_limit_reached The client previously reported itself connected as soon as the WebSocket opened, without waiting for the gateway handshake. A device-limited client would therefore reconnect in a backoff loop forever, and connect() would appear to succeed (or time out) with no explanation. - Wait for the gateway's {"type": "connected"} handshake (or its error) before reporting success; connect()/connect_sync() now raise a structured DataNetError for pre-handshake rejections - Treat device_limit_reached as fatal: stop the reconnect loop instead of hammering the gateway with retries that can only fail - Add DataNetError.limit (plan cap that was hit) for device_limit_reached and the gateway's new topic_limit_reached - Document all gateway error codes in PROTOCOL.md (when they fire, extra fields, retryability) Co-Authored-By: Claude Fable 5 --- PROTOCOL.md | 18 ++++++ datanet/client.py | 98 +++++++++++++++++++++++++++---- tests/test_client.py | 135 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 239 insertions(+), 12 deletions(-) diff --git a/PROTOCOL.md b/PROTOCOL.md index cd7d76f..47b0fdd 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -124,3 +124,21 @@ The Python SDK handles this by first attempting to decode binary WebSocket frames as UTF-8 JSON envelopes. If decoding or JSON parsing fails, it dispatches the bytes to registered binary subscribers and marks the metadata as `{"raw": True}`. + +## Error Codes + +Gateway errors arrive as `{"type": "error", "error": "<code>", ...}` envelopes: + +| `error` | When | Extra fields | Retryable? 
| +|---|---|---|---| +| `rate_limited` | Publish exceeded a per-connection, per-topic, or per-project msgs/sec or bytes/sec budget | `retry_ms`, `scope` (`"connection"` when the per-connection throttle fired) | Yes — back off for `retry_ms` | +| `device_limit_reached` | Connecting would exceed the plan's active-device cap; sent before the handshake, then the socket is closed | `limit` (the plan's device cap) | No — disconnect another device or upgrade | +| `topic_limit_reached` | The channel exists but is over the plan's channel cap (e.g. after a tier downgrade) | `limit` (the plan's channel cap) | No — remove channels or upgrade | +| `channel_not_provisioned` | The channel has not been created for this project | `channel`, `operation` | No — create the channel first | +| `channel_not_allowed` | The JWT's channel prefixes don't cover this channel | `channel`, `operation` | No | +| `insufficient_scope` | The API key lacks the `pub` or `sub` scope | `required` | No | + +The Python SDK surfaces these as `DataNetError` with `code`, `channel`, +`retry_ms`, `scope`, and `limit` attributes. `device_limit_reached` is fatal: +the client stops its reconnect loop and `connect()` / `connect_sync()` raise +the structured error instead of timing out. diff --git a/datanet/client.py b/datanet/client.py index 6b74468..3ab0307 100644 --- a/datanet/client.py +++ b/datanet/client.py @@ -119,6 +119,7 @@ def __init__( retry_ms: int | None = None, scope: str | None = None, status: int | None = None, + limit: int | None = None, ) -> None: super().__init__(message) self.code = code @@ -126,6 +127,28 @@ def __init__( self.retry_ms = retry_ms self.scope = scope self.status = status + #: Plan limit that was hit (device_limit_reached, topic_limit_reached) + self.limit = limit + + +#: Gateway errors after which the server closes the socket and a reconnect +#: with the same credentials can only fail again — do not retry these. 
+_FATAL_GATEWAY_ERRORS = frozenset({"device_limit_reached"}) + + +def _gateway_error(envelope: dict[str, Any]) -> DataNetError: + """Build a structured DataNetError from a gateway error envelope.""" + code = str(envelope.get("code") or envelope.get("error") or "gateway_error") + retry_ms = envelope.get("retry_ms") + limit = envelope.get("limit") + return DataNetError( + f"DataNet: {envelope.get('error')}", + code=code, + channel=envelope.get("channel") or envelope.get("ch"), + retry_ms=retry_ms if isinstance(retry_ms, int) else None, + scope=envelope.get("scope"), + limit=limit if isinstance(limit, int) else None, + ) def _to_bytes(data: BinaryData) -> bytes: @@ -299,6 +322,9 @@ def __init__( # Reconnect bookkeeping self._reconnect_attempt = 0 self._should_run = False + # Set when the gateway rejects us with a non-retryable error + # (e.g. device_limit_reached); stops the reconnect loop. + self._fatal_error: DataNetError | None = None # For sync usage self._thread: threading.Thread | None = None @@ -336,6 +362,7 @@ async def connect(self) -> None: """ self._loop = asyncio.get_running_loop() self._should_run = True + self._fatal_error = None self._run_task = asyncio.create_task(self._run_loop(), name="datanet-run") # Wait until actually connected (or the task fails) @@ -626,6 +653,7 @@ def connect_sync(self, timeout: float = 10.0) -> None: If the background thread fails to start. """ self._connected_event.clear() + self._fatal_error = None def _thread_target() -> None: loop = asyncio.new_event_loop() @@ -653,10 +681,15 @@ async def _run() -> None: ) self._thread.start() - if not self._connected_event.wait(timeout=timeout): - raise TimeoutError( - f"DataNet: could not connect within {timeout}s." - ) + deadline = time.monotonic() + timeout + while not self._connected_event.is_set(): + if self._fatal_error is not None: + raise self._fatal_error + if time.monotonic() > deadline: + raise TimeoutError( + f"DataNet: could not connect within {timeout}s." 
+ ) + self._connected_event.wait(timeout=0.05) # ── Async context manager ───────────────────────────────────────────────── @@ -681,7 +714,9 @@ async def _run_loop(self) -> None: except asyncio.CancelledError: break except Exception as exc: - await self._emit("error", exc) + # Fatal gateway errors were already emitted by the handshake. + if exc is not self._fatal_error: + await self._emit("error", exc) logger.warning("DataNet connection error: %s", exc) if not self._should_run: @@ -722,6 +757,10 @@ async def _connect_once(self) -> None: ping_interval=None, # We manage our own heartbeats ) as ws: self._ws = ws + # The gateway accepts the socket and then either sends + # {"type": "connected"} or an error (e.g. device_limit_reached) + # before closing. Don't report success until we know which. + await self._await_handshake(ws) logger.info("DataNet: connected.") # Signal sync callers that we're up self._connected_event.set() @@ -748,6 +787,38 @@ async def _connect_once(self) -> None: self._active_subs.clear() await self._emit("disconnect") + async def _await_handshake(self, ws: Any) -> None: + """Wait for the gateway's post-upgrade handshake message. + + The gateway sends ``{"type": "connected"}`` once the connection is + registered, or ``{"type": "error"}`` (e.g. ``device_limit_reached``) + followed by a close. Raising here keeps :meth:`connect` honest — the + caller only sees success after the gateway accepted us. 
+ """ + while True: + raw = await asyncio.wait_for(ws.recv(), timeout=10) + if isinstance(raw, bytes): + try: + raw = raw.decode("utf-8") + except UnicodeDecodeError: + continue + try: + envelope = json.loads(raw) + except json.JSONDecodeError: + continue + if not isinstance(envelope, dict): + continue + if envelope.get("type") == "connected": + return + if envelope.get("type") == "error" and envelope.get("error"): + error = _gateway_error(envelope) + if error.code in _FATAL_GATEWAY_ERRORS: + self._fatal_error = error + self._should_run = False + await self._emit("error", error) + raise error + # Anything else pre-handshake is unexpected; keep waiting. + # ── Internal: JWT ───────────────────────────────────────────────────────── async def _fetch_jwt(self) -> str: @@ -823,13 +894,12 @@ async def _handle_message(self, raw: str) -> None: op: str = envelope.get("op", "") if envelope.get("type") == "error" and envelope.get("error"): - error = DataNetError( - f"DataNet: {envelope.get('error')}", - code=str(envelope.get("code") or envelope.get("error")), - channel=envelope.get("channel") or envelope.get("ch"), - retry_ms=envelope.get("retry_ms"), - scope=envelope.get("scope"), - ) + error = _gateway_error(envelope) + if error.code in _FATAL_GATEWAY_ERRORS: + # The gateway will close the socket; reconnecting with the + # same credentials can only fail again. 
+ self._fatal_error = error + self._should_run = False await self._emit("error", error) return @@ -1087,10 +1157,14 @@ async def _wait_for_connection(self, timeout: float = 10.0) -> None: """Await until the WebSocket is open or the run task fails.""" deadline = time.monotonic() + timeout while not self.connected: + if self._fatal_error is not None: + raise self._fatal_error if self._run_task and self._run_task.done(): exc = self._run_task.exception() if exc: raise exc + if self._fatal_error is not None: + raise self._fatal_error return if time.monotonic() > deadline: raise TimeoutError(f"DataNet: did not connect within {timeout}s.") diff --git a/tests/test_client.py b/tests/test_client.py index 49a1e1b..2e61d73 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -369,3 +369,138 @@ async def handler(*_): client._send_sub.assert_any_await("demo.binary") client._send_sub.assert_any_await("demo.any") self.assertEqual(client._send_sub.await_count, 3) + + +class FakeHandshakeWebSocket: + """Minimal ws exposing recv() for handshake tests.""" + + def __init__(self, frames): + self.frames = list(frames) + + async def recv(self): + if self.frames: + return self.frames.pop(0) + raise AssertionError("handshake consumed all frames without resolving") + + +class GatewayLimitErrorTests(unittest.IsolatedAsyncioTestCase): + async def test_handshake_succeeds_on_connected_message(self): + client = DataNet("ak_test") + ws = FakeHandshakeWebSocket([json.dumps({"type": "connected", "userId": "dev-1"})]) + await client._await_handshake(ws) # must not raise + + async def test_handshake_raises_structured_device_limit_error(self): + client = DataNet("ak_test") + client._should_run = True + errors = [] + + async def on_error(exc): + errors.append(exc) + + client.on("error", on_error) + ws = FakeHandshakeWebSocket( + [json.dumps({"type": "error", "error": "device_limit_reached", "limit": 25})] + ) + + with self.assertRaises(client_module.DataNetError) as ctx: + await 
client._await_handshake(ws) + + self.assertEqual(ctx.exception.code, "device_limit_reached") + self.assertEqual(ctx.exception.limit, 25) + # Fatal: the reconnect loop must stop instead of hammering the gateway. + self.assertFalse(client._should_run) + self.assertIs(client._fatal_error, ctx.exception) + self.assertEqual(len(errors), 1) + + async def test_handshake_skips_binary_noise_before_connected(self): + client = DataNet("ak_test") + ws = FakeHandshakeWebSocket( + [b"\xff\xfe", "not json", json.dumps({"type": "connected"})] + ) + await client._await_handshake(ws) # must not raise + + async def test_rate_limited_error_carries_retry_ms_and_scope(self): + client = DataNet("ak_test") + errors = [] + + async def on_error(exc): + errors.append(exc) + + client.on("error", on_error) + await client._handle_message( + json.dumps( + { + "type": "error", + "error": "rate_limited", + "retry_ms": 250, + "scope": "connection", + "channel": "project.p1.sensor", + } + ) + ) + + self.assertEqual(len(errors), 1) + error = errors[0] + self.assertEqual(error.code, "rate_limited") + self.assertEqual(error.retry_ms, 250) + self.assertEqual(error.scope, "connection") + self.assertEqual(error.channel, "project.p1.sensor") + # Rate limiting is transient — must not stop the client. 
+ self.assertIsNone(client._fatal_error) + + async def test_topic_limit_reached_error_carries_plan_limit(self): + client = DataNet("ak_test") + errors = [] + + async def on_error(exc): + errors.append(exc) + + client.on("error", on_error) + await client._handle_message( + json.dumps( + { + "type": "error", + "error": "topic_limit_reached", + "limit": 240, + "channel": "project.p1.one-too-many", + "operation": "sub", + } + ) + ) + + self.assertEqual(len(errors), 1) + self.assertEqual(errors[0].code, "topic_limit_reached") + self.assertEqual(errors[0].limit, 240) + self.assertEqual(errors[0].channel, "project.p1.one-too-many") + self.assertIsNone(client._fatal_error) + + async def test_device_limit_error_mid_session_stops_reconnect_loop(self): + client = DataNet("ak_test") + client._should_run = True + errors = [] + + async def on_error(exc): + errors.append(exc) + + client.on("error", on_error) + await client._handle_message( + json.dumps({"type": "error", "error": "device_limit_reached", "limit": 5}) + ) + + self.assertEqual(len(errors), 1) + self.assertEqual(errors[0].limit, 5) + self.assertFalse(client._should_run) + self.assertIsNotNone(client._fatal_error) + + async def test_wait_for_connection_raises_fatal_error_immediately(self): + client = DataNet("ak_test") + client._fatal_error = client_module.DataNetError( + "DataNet: device_limit_reached", + code="device_limit_reached", + limit=25, + ) + + with self.assertRaises(client_module.DataNetError) as ctx: + await client._wait_for_connection(timeout=5) + + self.assertEqual(ctx.exception.code, "device_limit_reached") From b14d5b0ff0621dd93a0da78b0c850d44c938b167 Mon Sep 17 00:00:00 2001 From: Jordan Shaw Date: Thu, 11 Jun 2026 14:58:51 -0400 Subject: [PATCH 2/2] Add PyPI release workflow (Trusted Publishing on v* tags) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds wheel + sdist and publishes to PyPI via OIDC — no token to store or rotate. 
Requires a one-time pending-publisher registration on pypi.org and a "pypi" environment in repo settings (steps documented in the workflow). Co-Authored-By: Claude Fable 5 --- .github/workflows/release.yml | 60 +++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 .github/workflows/release.yml diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..1c17836 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,60 @@ +name: Release + +# Publishes datanet-sdk to PyPI on every v* tag. +# +# Uses PyPI Trusted Publishing (OIDC) — no token needed. +# Before the first release, register a Pending Publisher at: +# pypi.org → Publishing → Add a pending publisher +# PyPI project name : datanet-sdk +# GitHub owner : datanet-art +# Repository : datanet-python +# Workflow : release.yml +# Environment : pypi +# +# After the first publish, manage it under the project's +# Publishing tab on PyPI instead. + +on: + push: + tags: + - "v*" + +permissions: + contents: read + +jobs: + build: + name: Build distribution + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install build frontend + run: python -m pip install --upgrade pip build + + - name: Build wheel and sdist + run: python -m build + + - uses: actions/upload-artifact@v4 + with: + name: dist + path: dist/ + + publish: + name: Publish to PyPI + needs: build + runs-on: ubuntu-latest + environment: pypi + permissions: + id-token: write # OIDC — Trusted Publishing + steps: + - uses: actions/download-artifact@v4 + with: + name: dist + path: dist/ + + - uses: pypa/gh-action-pypi-publish@release/v1