Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
name: Release

# Publishes datanet-sdk to PyPI on every v* tag.
#
# Uses PyPI Trusted Publishing (OIDC) — no token needed.
# Before the first release, register a Pending Publisher at:
# pypi.org → Publishing → Add a pending publisher
# PyPI project name : datanet-sdk
# GitHub owner : datanet-art
# Repository : datanet-python
# Workflow : release.yml
# Environment : pypi
#
# After the first publish, manage it under the project's
# Publishing tab on PyPI instead.

# Trigger: run only when a tag matching "v*" is pushed.
on:
push:
tags:
- "v*"

# Workflow-level token permissions: read access to repository contents only.
permissions:
contents: read

jobs:
# Job 1: build the wheel and sdist with `python -m build` and upload the
# resulting dist/ directory as the "dist" artifact for the publish job.
build:
name: Build distribution
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-python@v5
with:
python-version: "3.11"

- name: Install build frontend
run: python -m pip install --upgrade pip build

- name: Build wheel and sdist
run: python -m build

- uses: actions/upload-artifact@v4
with:
name: dist
path: dist/

# Job 2: runs after `build`, inside the "pypi" environment named in the
# Trusted Publishing setup notes at the top of this file.
publish:
name: Publish to PyPI
needs: build
runs-on: ubuntu-latest
environment: pypi
permissions:
id-token: write # OIDC — Trusted Publishing
steps:
# Fetch the "dist" artifact uploaded by the build job into dist/.
- uses: actions/download-artifact@v4
with:
name: dist
path: dist/

# No `with:` inputs are passed to the publish action.
# NOTE(review): presumably it uploads the contents of dist/ by default —
# confirm against the action's documentation.
- uses: pypa/gh-action-pypi-publish@release/v1
18 changes: 18 additions & 0 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,21 @@ The Python SDK handles this by first attempting to decode binary WebSocket
frames as UTF-8 JSON envelopes. If decoding or JSON parsing fails, it dispatches
the bytes to registered binary subscribers and marks the metadata as
`{"raw": True}`.

## Error Codes

Gateway errors arrive as `{"type": "error", "error": "<code>", ...}` envelopes:

| `error` | When | Extra fields | Retryable? |
|---|---|---|---|
| `rate_limited` | Publish exceeded a per-connection, per-topic, or per-project msgs/sec or bytes/sec budget | `retry_ms`, `scope` (`"connection"` when the per-connection throttle fired) | Yes — back off for `retry_ms` |
| `device_limit_reached` | Connecting would exceed the plan's active-device cap; sent before the handshake, then the socket is closed | `limit` (the plan's device cap) | No — disconnect another device or upgrade |
| `topic_limit_reached` | The channel exists but is over the plan's channel cap (e.g. after a tier downgrade) | `limit` (the plan's channel cap) | No — remove channels or upgrade |
| `channel_not_provisioned` | The channel has not been created for this project | `channel`, `operation` | No — create the channel first |
| `channel_not_allowed` | The JWT's channel prefixes don't cover this channel | `channel`, `operation` | No |
| `insufficient_scope` | The API key lacks the `pub` or `sub` scope | `required` | No |

The Python SDK surfaces these as `DataNetError` with `code`, `channel`,
`retry_ms`, `scope`, and `limit` attributes. `device_limit_reached` is fatal:
the client stops its reconnect loop and `connect()` / `connect_sync()` raise
the structured error instead of timing out.
98 changes: 86 additions & 12 deletions datanet/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,36 @@ def __init__(
retry_ms: int | None = None,
scope: str | None = None,
status: int | None = None,
limit: int | None = None,
) -> None:
super().__init__(message)
self.code = code
self.channel = channel
self.retry_ms = retry_ms
self.scope = scope
self.status = status
#: Plan limit that was hit (device_limit_reached, topic_limit_reached)
self.limit = limit


#: Gateway error codes after which the server closes the socket and a reconnect
#: with the same credentials can only fail again — do not retry these.
#: The client compares ``DataNetError.code`` against this set; on a match it
#: records the error as fatal and clears ``_should_run`` so the reconnect loop
#: stops.
_FATAL_GATEWAY_ERRORS = frozenset({"device_limit_reached"})


def _gateway_error(envelope: dict[str, Any]) -> DataNetError:
    """Build a structured DataNetError from a gateway error envelope.

    Args:
        envelope: Decoded gateway message, e.g.
            ``{"type": "error", "error": "rate_limited", "retry_ms": 250}``.

    Returns:
        A ``DataNetError`` whose ``code`` comes from the envelope's ``code``
        field, falling back to ``error`` and finally to ``"gateway_error"``.
        ``channel`` is read from ``channel`` or ``ch``; ``scope`` is passed
        through unchanged. ``retry_ms`` and ``limit`` are kept only when they
        are genuine integers; any other value becomes ``None``.
    """

    def _int_or_none(value: Any) -> int | None:
        # bool is a subclass of int, so a JSON ``true`` would otherwise be
        # surfaced as retry_ms=True / limit=True.
        if isinstance(value, int) and not isinstance(value, bool):
            return value
        return None

    code = str(envelope.get("code") or envelope.get("error") or "gateway_error")
    # Use the resolved code when ``error`` is absent so the message never
    # reads "DataNet: None" for an envelope that only carries ``code``.
    reason = envelope.get("error") or code
    return DataNetError(
        f"DataNet: {reason}",
        code=code,
        channel=envelope.get("channel") or envelope.get("ch"),
        retry_ms=_int_or_none(envelope.get("retry_ms")),
        scope=envelope.get("scope"),
        limit=_int_or_none(envelope.get("limit")),
    )


def _to_bytes(data: BinaryData) -> bytes:
Expand Down Expand Up @@ -299,6 +322,9 @@ def __init__(
# Reconnect bookkeeping
self._reconnect_attempt = 0
self._should_run = False
# Set when the gateway rejects us with a non-retryable error
# (e.g. device_limit_reached); stops the reconnect loop.
self._fatal_error: DataNetError | None = None

# For sync usage
self._thread: threading.Thread | None = None
Expand Down Expand Up @@ -336,6 +362,7 @@ async def connect(self) -> None:
"""
self._loop = asyncio.get_running_loop()
self._should_run = True
self._fatal_error = None
self._run_task = asyncio.create_task(self._run_loop(), name="datanet-run")

# Wait until actually connected (or the task fails)
Expand Down Expand Up @@ -626,6 +653,7 @@ def connect_sync(self, timeout: float = 10.0) -> None:
If the background thread fails to start.
"""
self._connected_event.clear()
self._fatal_error = None

def _thread_target() -> None:
loop = asyncio.new_event_loop()
Expand Down Expand Up @@ -653,10 +681,15 @@ async def _run() -> None:
)
self._thread.start()

if not self._connected_event.wait(timeout=timeout):
raise TimeoutError(
f"DataNet: could not connect within {timeout}s."
)
deadline = time.monotonic() + timeout
while not self._connected_event.is_set():
if self._fatal_error is not None:
raise self._fatal_error
if time.monotonic() > deadline:
raise TimeoutError(
f"DataNet: could not connect within {timeout}s."
)
self._connected_event.wait(timeout=0.05)

# ── Async context manager ─────────────────────────────────────────────────

Expand All @@ -681,7 +714,9 @@ async def _run_loop(self) -> None:
except asyncio.CancelledError:
break
except Exception as exc:
await self._emit("error", exc)
# Fatal gateway errors were already emitted by the handshake.
if exc is not self._fatal_error:
await self._emit("error", exc)
logger.warning("DataNet connection error: %s", exc)

if not self._should_run:
Expand Down Expand Up @@ -722,6 +757,10 @@ async def _connect_once(self) -> None:
ping_interval=None, # We manage our own heartbeats
) as ws:
self._ws = ws
# The gateway accepts the socket and then either sends
# {"type": "connected"} or an error (e.g. device_limit_reached)
# before closing. Don't report success until we know which.
await self._await_handshake(ws)
logger.info("DataNet: connected.")
# Signal sync callers that we're up
self._connected_event.set()
Expand All @@ -748,6 +787,38 @@ async def _connect_once(self) -> None:
self._active_subs.clear()
await self._emit("disconnect")

async def _await_handshake(self, ws: Any) -> None:
"""Wait for the gateway's post-upgrade handshake message.

The gateway sends ``{"type": "connected"}`` once the connection is
registered, or ``{"type": "error"}`` (e.g. ``device_limit_reached``)
followed by a close. Raising here keeps :meth:`connect` honest — the
caller only sees success after the gateway accepted us.
"""
while True:
raw = await asyncio.wait_for(ws.recv(), timeout=10)
if isinstance(raw, bytes):
try:
raw = raw.decode("utf-8")
except UnicodeDecodeError:
continue
try:
envelope = json.loads(raw)
except json.JSONDecodeError:
continue
if not isinstance(envelope, dict):
continue
if envelope.get("type") == "connected":
return
if envelope.get("type") == "error" and envelope.get("error"):
error = _gateway_error(envelope)
if error.code in _FATAL_GATEWAY_ERRORS:
self._fatal_error = error
self._should_run = False
await self._emit("error", error)
raise error
# Anything else pre-handshake is unexpected; keep waiting.

# ── Internal: JWT ─────────────────────────────────────────────────────────

async def _fetch_jwt(self) -> str:
Expand Down Expand Up @@ -823,13 +894,12 @@ async def _handle_message(self, raw: str) -> None:

op: str = envelope.get("op", "")
if envelope.get("type") == "error" and envelope.get("error"):
error = DataNetError(
f"DataNet: {envelope.get('error')}",
code=str(envelope.get("code") or envelope.get("error")),
channel=envelope.get("channel") or envelope.get("ch"),
retry_ms=envelope.get("retry_ms"),
scope=envelope.get("scope"),
)
error = _gateway_error(envelope)
if error.code in _FATAL_GATEWAY_ERRORS:
# The gateway will close the socket; reconnecting with the
# same credentials can only fail again.
self._fatal_error = error
self._should_run = False
await self._emit("error", error)
return

Expand Down Expand Up @@ -1087,10 +1157,14 @@ async def _wait_for_connection(self, timeout: float = 10.0) -> None:
"""Await until the WebSocket is open or the run task fails."""
deadline = time.monotonic() + timeout
while not self.connected:
if self._fatal_error is not None:
raise self._fatal_error
if self._run_task and self._run_task.done():
exc = self._run_task.exception()
if exc:
raise exc
if self._fatal_error is not None:
raise self._fatal_error
return
if time.monotonic() > deadline:
raise TimeoutError(f"DataNet: did not connect within {timeout}s.")
Expand Down
Loading
Loading