From 18e14da6c0d62418fd596ad2383df5590083220b Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Tue, 9 Jun 2026 15:33:52 +0300 Subject: [PATCH 1/6] chore: share pipeline-dispatch protocols between pagination and sse Lift SendSync, SendAsync, SyncPipelineLike, and AsyncPipelineLike out of pagination/paginator.py into the new pipeline/dispatch.py module. The paginator re-exports all four so the public surface (pagination.__init__) is unchanged. The forthcoming SSE reconnection feature will import the same types from pipeline/dispatch without depending on pagination. Co-Authored-By: Claude Opus 4.8 --- .../dexpace/sdk/core/pagination/paginator.py | 36 ++++------------ .../src/dexpace/sdk/core/pipeline/dispatch.py | 43 +++++++++++++++++++ 2 files changed, 51 insertions(+), 28 deletions(-) create mode 100644 packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/dispatch.py diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pagination/paginator.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pagination/paginator.py index 8d2537c..442a744 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pagination/paginator.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pagination/paginator.py @@ -27,10 +27,16 @@ from __future__ import annotations import json -from collections.abc import AsyncIterator, Awaitable, Callable, Iterator -from typing import TYPE_CHECKING, Protocol, runtime_checkable +from collections.abc import AsyncIterator, Callable, Iterator +from typing import TYPE_CHECKING from ..http.context.dispatch_context import DispatchContext +from ..pipeline.dispatch import ( + AsyncPipelineLike, + SendAsync, + SendSync, + SyncPipelineLike, +) from .page import Page if TYPE_CHECKING: @@ -39,32 +45,6 @@ from ..http.response.response import Response from .strategy import PaginationStrategy -#: A callable that sends one request through the pipeline and returns its -#: response. The paginator builds one of these from a pipeline when given one, -#: or the caller passes their own for full dispatch control. -type SendSync = Callable[["Request"], "Response"] -type SendAsync = Callable[["Request"], Awaitable["AsyncResponse"]] - - -@runtime_checkable -class SyncPipelineLike(Protocol): - """Structural view of a sync pipeline: just the ``run`` entry point. - - ``Pipeline`` satisfies this; the paginator depends only on the structural - shape so it stays decoupled from the concrete pipeline class (and so test - doubles can stand in without subclassing). ``runtime_checkable`` so the - paginator can tell a pipeline from a bare send-callable at construction. - """ - - def run(self, request: Request, dispatch: DispatchContext) -> Response: ... - - -@runtime_checkable -class AsyncPipelineLike(Protocol): - """Structural view of an async pipeline: just its ``run`` coroutine.""" - - async def run(self, request: Request, dispatch: DispatchContext) -> AsyncResponse: ... - def _decode_body(raw: str) -> object: """Decode a JSON body string into a Python value (``None`` when empty).""" diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/dispatch.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/dispatch.py new file mode 100644 index 0000000..558878c --- /dev/null +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/pipeline/dispatch.py @@ -0,0 +1,43 @@ +# Copyright (c) 2026 dexpace and Omar Aljarrah. +# Licensed under the MIT License. See LICENSE.md in the repository root for details. + +"""Shared pipeline-dispatch abstractions for paginators and SSE connections. + +Both the paginator and the reconnecting SSE client accept either a pipeline +(run once per request with a fresh dispatch context) or a bare send-callable. +These structural Protocols and callable aliases capture that shape in one +place so neither consumer has to depend on the other. +""" + +from __future__ import annotations + +from collections.abc import Awaitable, Callable +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + from ..http.context.dispatch_context import DispatchContext + from ..http.request.request import Request + from ..http.response.async_response import AsyncResponse + from ..http.response.response import Response + +#: A callable that sends one request through the pipeline and returns its +#: response. Built from a pipeline when one is given, or passed directly. +type SendSync = Callable[["Request"], "Response"] +type SendAsync = Callable[["Request"], Awaitable["AsyncResponse"]] + + +@runtime_checkable +class SyncPipelineLike(Protocol): + """Structural view of a sync pipeline: just the ``run`` entry point.""" + + def run(self, request: Request, dispatch: DispatchContext) -> Response: ... + + +@runtime_checkable +class AsyncPipelineLike(Protocol): + """Structural view of an async pipeline: just its ``run`` coroutine.""" + + async def run(self, request: Request, dispatch: DispatchContext) -> AsyncResponse: ... + + +__all__ = ["AsyncPipelineLike", "SendAsync", "SendSync", "SyncPipelineLike"] From f6c65c5bc8dc0897de27ff1e31481149c78539ef Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Tue, 9 Jun 2026 15:43:31 +0300 Subject: [PATCH 2/6] feat: add synchronous reconnecting SSE connection Co-Authored-By: Claude Opus 4.8 --- .../dexpace/sdk/core/http/sse/connection.py | 237 ++++++++++++++++++ .../tests/sse/test_connection.py | 103 ++++++++ 2 files changed, 340 insertions(+) create mode 100644 packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py create mode 100644 packages/dexpace-sdk-core/tests/sse/test_connection.py diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py new file mode 100644 index 0000000..66248c5 --- /dev/null +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py @@ -0,0 +1,237 @@ +# Copyright (c) 2026 dexpace and Omar Aljarrah. +# Licensed under the MIT License. See LICENSE.md in the repository root for details. + +"""Reconnecting Server-Sent Events client built on the sans-io parser. + +``SseConnection`` / ``AsyncSseConnection`` drive a long-lived SSE stream +through the pipeline and transparently reconnect when it drops, replaying the +``Last-Event-ID`` header and honouring the server's ``retry:`` hint with +jittered backoff. Parsing is delegated to the existing sans-io parser; this +layer owns only the connection lifecycle. + +Semantics follow the browser ``EventSource``: a reconnect is attempted on both +a transient transport error and a clean end-of-stream, while a non-success HTTP +status is a permanent failure (raised, never retried). The caller ends the +stream by breaking the loop (for example on a ``data: [DONE]`` sentinel). +""" + +from __future__ import annotations + +import random +from typing import TYPE_CHECKING, cast + +from ...errors import HttpResponseError, ServiceRequestError, ServiceResponseError +from ...pipeline.dispatch import ( + SendSync, + SyncPipelineLike, +) +from ...util.clock import SYSTEM_CLOCK, Clock +from ..context.dispatch_context import DispatchContext +from .parser import parse_events + +if TYPE_CHECKING: + from collections.abc import Callable, Generator, Iterator + from types import TracebackType + from typing import Self + + from ..request.request import Request + from ..response.response import Response + from .parser import SseEvent + +_LAST_EVENT_ID: str = "Last-Event-ID" +_DEFAULT_RETRY_SECONDS: float = 3.0 +_DEFAULT_MAX_BACKOFF: float = 30.0 +_DEFAULT_JITTER: float = 0.1 + +#: Transient failures that trigger a reconnect rather than propagating. A +#: clean end-of-stream reconnects too (handled separately); anything outside +#: this set — including ``CancelledError`` and ``HttpResponseError`` — is fatal. +_TRANSIENT: tuple[type[BaseException], ...] = ( + ServiceRequestError, + ServiceResponseError, + OSError, +) + + +def _resume_request(initial: Request, last_event_id: str | None) -> Request: + """Return ``initial``, stamping ``Last-Event-ID`` when an id is known.""" + if last_event_id: + return initial.with_header(_LAST_EVENT_ID, last_event_id) + return initial + + +def _observe( + event: SseEvent, last_event_id: str | None, retry_base: float +) -> tuple[str | None, float]: + """Fold one event's ``id`` / ``retry`` into the connection state. + + An explicit empty ``id`` clears the stored id (per the SSE spec, so the + next reconnect omits the header). ``retry`` is given in milliseconds. + """ + if event.id is not None: + last_event_id = event.id or None + if event.retry is not None: + retry_base = event.retry / 1000.0 + return last_event_id, retry_base + + +def _next_backoff( + retry_base: float, failures: int, max_backoff: float, jitter: float, rand: random.Random +) -> float: + """Exponential backoff with an upward-only jitter, capped at ``max_backoff``. + + Jitter only lengthens the wait, so a fleet de-synchronises without any + client reconnecting sooner than the computed delay. + """ + bounded = min(max_backoff, retry_base * (2**failures)) + # cast required: random.Random.uniform is typed as returning Any in typeshed, + # so mypy raises [no-any-return] without it. + return cast(float, bounded * rand.uniform(1.0, 1.0 + jitter)) + + +class SseConnection: + """Synchronous reconnecting SSE stream. + + Iterate it (directly or via ``with``) to receive ``SseEvent``s across + reconnections. Each (re)connection is dispatched through ``source`` — a + ``Pipeline`` (run with a fresh dispatch context per connection) or a bare + ``Request -> Response`` callable — so retry, auth, and tracing apply per + connection. + + Args: + source: A sync pipeline or a send-callable. + initial_request: The request opening the stream; reused for every + reconnection with an updated ``Last-Event-ID``. + last_event_id: Seed id to resume a previously-interrupted stream. + default_retry: Backoff base (seconds) used until the server sends a + ``retry:`` value. + max_backoff: Ceiling on any single reconnect delay. + max_reconnects: Maximum consecutive failed reconnections before + iteration raises. ``None`` reconnects indefinitely. + jitter: Upward jitter fraction applied to the backoff. + clock: Time source for backoff sleeps (injected for tests). + rand: RNG for jitter (injected for tests). + dispatch_factory: Builds the dispatch context per connection when + ``source`` is a pipeline. Defaults to ``DispatchContext.noop``. + """ + + __slots__ = ( + "_clock", + "_dispatch_factory", + "_initial", + "_jitter", + "_last_event_id", + "_max_backoff", + "_max_reconnects", + "_rand", + "_response", + "_retry_base", + "_send", + ) + + def __init__( + self, + source: SyncPipelineLike | SendSync, + initial_request: Request, + *, + last_event_id: str | None = None, + default_retry: float = _DEFAULT_RETRY_SECONDS, + max_backoff: float = _DEFAULT_MAX_BACKOFF, + max_reconnects: int | None = None, + jitter: float = _DEFAULT_JITTER, + clock: Clock = SYSTEM_CLOCK, + rand: random.Random | None = None, + dispatch_factory: Callable[[], DispatchContext] | None = None, + ) -> None: + self._initial = initial_request + self._last_event_id = last_event_id + self._retry_base = default_retry + self._max_backoff = max_backoff + self._max_reconnects = max_reconnects + self._jitter = jitter + self._clock = clock + self._rand = rand if rand is not None else random.Random() + self._dispatch_factory = dispatch_factory or DispatchContext.noop + self._response: Response | None = None + self._send = self._normalise(source) + + def _normalise(self, source: SyncPipelineLike | SendSync) -> SendSync: + if isinstance(source, SyncPipelineLike): + pipeline = source + + def send(request: Request) -> Response: + return pipeline.run(request, self._dispatch_factory()) + + return send + return source + + def close(self) -> None: + """Close the current response, if any. Idempotent.""" + response = self._response + if response is None: + return + self._response = None + response.close() + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + self.close() + + def __iter__(self) -> Iterator[SseEvent]: + failures = 0 + try: + while True: + response = self._connect(_resume_request(self._initial, self._last_event_id)) + progressed = yield from self._stream(response) + self.close() + if progressed: + failures = 0 + if self._max_reconnects is not None and failures >= self._max_reconnects: + raise ServiceResponseError("SSE reconnect budget exhausted") + self._clock.sleep( + _next_backoff( + self._retry_base, failures, self._max_backoff, self._jitter, self._rand + ) + ) + failures += 1 + finally: + self.close() + + def _connect(self, request: Request) -> Response: + response = self._send(request) + self._response = response + if not response.status.is_success: + self.close() + raise HttpResponseError(response=response) + return response + + def _stream(self, response: Response) -> Generator[SseEvent, None, bool]: + """Yield one connection's events; return whether any were yielded. + + A transient transport error mid-stream ends the generator normally + (so the caller reconnects); other exceptions propagate. + """ + progressed = False + body = response.body + if body is None: + return progressed + try: + for event in parse_events(body.iter_bytes()): + self._last_event_id, self._retry_base = _observe( + event, self._last_event_id, self._retry_base + ) + progressed = True + yield event + except _TRANSIENT: + return progressed + return progressed + + +__all__ = ["SseConnection"] diff --git a/packages/dexpace-sdk-core/tests/sse/test_connection.py b/packages/dexpace-sdk-core/tests/sse/test_connection.py new file mode 100644 index 0000000..fc8b569 --- /dev/null +++ b/packages/dexpace-sdk-core/tests/sse/test_connection.py @@ -0,0 +1,103 @@ +# Copyright (c) 2026 dexpace and Omar Aljarrah. +# Licensed under the MIT License. See LICENSE.md in the repository root for details. + +"""Tests for the synchronous reconnecting SSE client.""" + +from __future__ import annotations + +import random +from collections.abc import Iterator + +from dexpace.sdk.core.http.common import MediaType, Protocol, Url +from dexpace.sdk.core.http.request import Method, Request +from dexpace.sdk.core.http.response import Response, ResponseBody, Status +from dexpace.sdk.core.http.sse.connection import SseConnection + + +def _request() -> Request: + return Request(method=Method.GET, url=Url.parse("https://api.example.com/stream")) + + +class _DropBody(ResponseBody): + """Yields scripted chunks, then raises to simulate a mid-stream drop.""" + + def __init__(self, chunks: list[bytes], *, error: BaseException | None = None) -> None: + self._chunks = chunks + self._error = error + self.closed = False + + def media_type(self) -> MediaType | None: + return MediaType.parse("text/event-stream") + + def content_length(self) -> int: + return -1 + + def iter_bytes(self, chunk_size: int = 64 * 1024) -> Iterator[bytes]: + yield from self._chunks + if self._error is not None: + raise self._error + + def close(self) -> None: + self.closed = True + + +def _response(body: ResponseBody | None, *, status: Status = Status.OK) -> Response: + return Response( + request=_request(), + protocol=Protocol.HTTP_1_1, + status=status, + body=body, + ) + + +class _Script: + """A send-callable returning scripted responses and recording requests.""" + + def __init__(self, responses: list[Response]) -> None: + self._responses = list(responses) + self.requests: list[Request] = [] + + def __call__(self, request: Request) -> Response: + self.requests.append(request) + if not self._responses: + raise AssertionError("send called more times than scripted") + return self._responses.pop(0) + + +class _RecordingClock: + """Sync Clock that records sleep durations instead of waiting.""" + + def __init__(self) -> None: + self.sleeps: list[float] = [] + + def now(self) -> float: + return 0.0 + + def monotonic(self) -> float: + return 0.0 + + def sleep(self, duration: float) -> None: + self.sleeps.append(duration) + + +class _LowJitter(random.Random): + """``uniform`` returns its low bound — deterministic backoff in tests.""" + + def uniform(self, a: float, b: float) -> float: + return a + + +def test_yields_events_and_closes_on_caller_stop() -> None: + body = _DropBody([b"data: one\n\n", b"data: two\n\n"]) + script = _Script([_response(body)]) + conn = SseConnection(script, _request(), clock=_RecordingClock(), rand=_LowJitter()) + + received: list[str] = [] + with conn as events: + for event in events: + received.append(event.data) + if len(received) == 2: + break + + assert received == ["one", "two"] + assert body.closed is True From 92f1f5eacf66c9d443fbef1c0d37b128026ba84c Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Tue, 9 Jun 2026 15:54:25 +0300 Subject: [PATCH 3/6] test: cover SSE reconnect, replay, backoff, and bounds Add five tests exercising the sync reconnect lifecycle: mid-stream-drop with Last-Event-ID replay, clean-EOF reconnect, server retry: hint with exponential backoff, failure-counter reset after progress, and non-2xx permanent-failure guard. Fix _stream to drive SseParser directly so retry: frames without accompanying data: lines still propagate the server hint to the connection's retry_base before the next sleep, honouring the server's reconnect delay on pure retry-hint frames. Co-Authored-By: Claude Opus 4.8 --- .../dexpace/sdk/core/http/sse/connection.py | 43 +++++---- .../src/dexpace/sdk/core/http/sse/parser.py | 11 +++ .../tests/sse/test_connection.py | 89 +++++++++++++++++++ 3 files changed, 127 insertions(+), 16 deletions(-) diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py index 66248c5..94c4eca 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py @@ -27,7 +27,7 @@ ) from ...util.clock import SYSTEM_CLOCK, Clock from ..context.dispatch_context import DispatchContext -from .parser import parse_events +from .parser import SseParser if TYPE_CHECKING: from collections.abc import Callable, Generator, Iterator @@ -60,19 +60,15 @@ def _resume_request(initial: Request, last_event_id: str | None) -> Request: return initial -def _observe( - event: SseEvent, last_event_id: str | None, retry_base: float -) -> tuple[str | None, float]: - """Fold one event's ``id`` / ``retry`` into the connection state. +def _last_event_id_of(event: SseEvent, current: str | None) -> str | None: + """Return the id to resume from after ``event``. - An explicit empty ``id`` clears the stored id (per the SSE spec, so the - next reconnect omits the header). ``retry`` is given in milliseconds. + An explicit empty id clears the stored id (per the SSE spec), so the next + reconnect omits the ``Last-Event-ID`` header. """ if event.id is not None: - last_event_id = event.id or None - if event.retry is not None: - retry_base = event.retry / 1000.0 - return last_event_id, retry_base + return event.id or None + return current def _next_backoff( @@ -217,20 +213,35 @@ def _stream(self, response: Response) -> Generator[SseEvent, None, bool]: A transient transport error mid-stream ends the generator normally (so the caller reconnects); other exceptions propagate. + + ``retry:`` directives that arrive without an accompanying ``data:`` + field (so no ``SseEvent`` is emitted) are still picked up from the + parser's accumulated state after iteration ends, ensuring the + server-supplied reconnect delay is honoured even on pure + retry-hint frames. """ progressed = False body = response.body if body is None: return progressed + parser = SseParser() try: - for event in parse_events(body.iter_bytes()): - self._last_event_id, self._retry_base = _observe( - event, self._last_event_id, self._retry_base - ) + for chunk in body.iter_bytes(): + parser.feed(chunk) + for event in parser.drain(): + self._last_event_id = _last_event_id_of(event, self._last_event_id) + progressed = True + yield event + for event in parser.end(): + self._last_event_id = _last_event_id_of(event, self._last_event_id) progressed = True yield event except _TRANSIENT: - return progressed + pass + # Capture the server's sticky reconnect hint (including a retry:-only frame + # that emitted no event) for the next reconnection's backoff base. + if parser.retry is not None: + self._retry_base = parser.retry / 1000.0 return progressed diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/parser.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/parser.py index 628e412..3823c9a 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/parser.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/parser.py @@ -126,6 +126,17 @@ class SseParser: _bom_stripped: bool = False max_line_bytes: int = 1 << 20 # 1 MiB + @property + def retry(self) -> int | None: + """The most recent ``retry:`` value seen, in milliseconds, or ``None``. + + Sticky across events for the life of the parser, mirroring the value the + parser stamps onto each emitted ``SseEvent``. Exposed so a reconnecting + client can read the server-supplied reconnect hint even from a + ``retry:``-only frame that emits no event. + """ + return self._retry + def feed(self, chunk: bytes) -> None: """Append ``chunk`` to the parser buffer and consume completed lines. diff --git a/packages/dexpace-sdk-core/tests/sse/test_connection.py b/packages/dexpace-sdk-core/tests/sse/test_connection.py index fc8b569..9c0d01b 100644 --- a/packages/dexpace-sdk-core/tests/sse/test_connection.py +++ b/packages/dexpace-sdk-core/tests/sse/test_connection.py @@ -8,6 +8,9 @@ import random from collections.abc import Iterator +import pytest + +from dexpace.sdk.core.errors import HttpResponseError, ServiceResponseError from dexpace.sdk.core.http.common import MediaType, Protocol, Url from dexpace.sdk.core.http.request import Method, Request from dexpace.sdk.core.http.response import Response, ResponseBody, Status @@ -101,3 +104,89 @@ def test_yields_events_and_closes_on_caller_stop() -> None: assert received == ["one", "two"] assert body.closed is True + + +def test_reconnects_after_mid_stream_drop_and_replays_last_event_id() -> None: + dropped = _DropBody([b"id: 7\ndata: one\n\n"], error=ServiceResponseError("dropped")) + resumed = _DropBody([b"data: two\n\n"]) + script = _Script([_response(dropped), _response(resumed)]) + conn = SseConnection(script, _request(), clock=_RecordingClock(), rand=_LowJitter()) + + received: list[str] = [] + with conn as events: + for event in events: + received.append(event.data) + if len(received) == 2: + break + + assert received == ["one", "two"] + # Second request resumed from the last seen id. + assert script.requests[1].headers.get("last-event-id") == "7" + + +def test_reconnects_after_clean_eof() -> None: + first = _DropBody([b"data: a\n\n"]) + second = _DropBody([b"data: b\n\n"]) + script = _Script([_response(first), _response(second)]) + conn = SseConnection(script, _request(), clock=_RecordingClock(), rand=_LowJitter()) + + received: list[str] = [] + with conn as events: + for event in events: + received.append(event.data) + if len(received) == 2: + break + + assert received == ["a", "b"] + assert first.closed is True + + +def test_honours_server_retry_then_exponential_backoff() -> None: + # First stream sets retry: 1000ms then drops with no events after it; each + # subsequent stream also yields nothing and drops, so failures accumulate. + def drop() -> Response: + return _response(_DropBody([], error=ServiceResponseError("x"))) + + retry_then_drop = _response(_DropBody([b"retry: 1000\n\n"], error=ServiceResponseError("x"))) + script = _Script([retry_then_drop, drop(), drop()]) + clock = _RecordingClock() + conn = SseConnection(script, _request(), clock=clock, rand=_LowJitter(), max_reconnects=2) + + with pytest.raises(ServiceResponseError, match="reconnect budget"): + for _ in conn: + pass + + # base = 1.0s (from retry:), failures 0 and 1 -> [1.0, 2.0]; then bound hit. + assert clock.sleeps == [1.0, 2.0] + + +def test_failure_counter_resets_after_progress() -> None: + # A stream that yields an event resets the backoff exponent: the next + # reconnect delay returns to the base rather than growing. + progress = _response(_DropBody([b"data: x\n\n"], error=ServiceResponseError("x"))) + again = _response(_DropBody([b"data: y\n\n"])) + script = _Script([progress, again]) + clock = _RecordingClock() + conn = SseConnection(script, _request(), clock=clock, rand=_LowJitter()) + + received: list[str] = [] + with conn as events: + for event in events: + received.append(event.data) + if len(received) == 2: + break + + # Both connections progressed, so the single reconnect slept the base 3.0s. + assert clock.sleeps == [3.0] + + +def test_non_success_status_raises_without_reconnect() -> None: + script = _Script([_response(None, status=Status.NOT_FOUND)]) + conn = SseConnection(script, _request(), clock=_RecordingClock(), rand=_LowJitter()) + + with pytest.raises(HttpResponseError): + for _ in conn: + pass + + # No reconnect attempted: send called exactly once. + assert len(script.requests) == 1 From a5c3200c0a8d2dff6988592f7b136e04ff6a2e84 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Tue, 9 Jun 2026 16:09:20 +0300 Subject: [PATCH 4/6] feat: add asynchronous reconnecting SSE connection Co-Authored-By: Claude Opus 4.8 --- .../dexpace/sdk/core/http/sse/connection.py | 153 +++++++++++++++++- .../src/dexpace/sdk/core/http/sse/parser.py | 8 + .../tests/sse/test_async_connection.py | 147 +++++++++++++++++ 3 files changed, 304 insertions(+), 4 deletions(-) create mode 100644 packages/dexpace-sdk-core/tests/sse/test_async_connection.py diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py index 94c4eca..5b82f86 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py @@ -22,19 +22,23 @@ from ...errors import HttpResponseError, ServiceRequestError, ServiceResponseError from ...pipeline.dispatch import ( + AsyncPipelineLike, + SendAsync, SendSync, SyncPipelineLike, ) -from ...util.clock import SYSTEM_CLOCK, Clock +from ...util.clock import ASYNC_SYSTEM_CLOCK, SYSTEM_CLOCK, AsyncClock, Clock from ..context.dispatch_context import DispatchContext -from .parser import SseParser +from ..response.async_response_body import _shielded_cleanup +from .parser import SseParser, parse_async_events if TYPE_CHECKING: - from collections.abc import Callable, Generator, Iterator + from collections.abc import AsyncIterator, Callable, Generator, Iterator from types import TracebackType from typing import Self from ..request.request import Request + from ..response.async_response import AsyncResponse from ..response.response import Response from .parser import SseEvent @@ -245,4 +249,145 @@ def _stream(self, response: Response) -> Generator[SseEvent, None, bool]: return progressed -__all__ = ["SseConnection"] +class AsyncSseConnection: + """Asynchronous twin of :class:`SseConnection`. + + Mirrors the sync client exactly with ``async`` iteration semantics. Each + (re)connection is dispatched through ``source`` — an ``AsyncPipeline`` (run + with a fresh dispatch context per connection) or an async + ``Request -> AsyncResponse`` callable. The response is closed through the + shielded-cleanup convention, so a cancelled consumer still releases the + transport handle before the cancellation continues to propagate. + + Args: + source: An async pipeline or an async send-callable. + initial_request: The request opening the stream; reused for every + reconnection with an updated ``Last-Event-ID``. + last_event_id: Seed id to resume a previously-interrupted stream. + default_retry: Backoff base (seconds) used until the server sends a + ``retry:`` value. + max_backoff: Ceiling on any single reconnect delay. + max_reconnects: Maximum consecutive failed reconnections before + iteration raises. ``None`` reconnects indefinitely. + jitter: Upward jitter fraction applied to the backoff. + clock: Async time source for backoff sleeps (injected for tests). + rand: RNG for jitter (injected for tests). + dispatch_factory: Builds the dispatch context per connection when + ``source`` is a pipeline. Defaults to ``DispatchContext.noop``. + """ + + __slots__ = ( + "_clock", + "_dispatch_factory", + "_initial", + "_jitter", + "_last_event_id", + "_max_backoff", + "_max_reconnects", + "_rand", + "_response", + "_retry_base", + "_send", + ) + + def __init__( + self, + source: AsyncPipelineLike | SendAsync, + initial_request: Request, + *, + last_event_id: str | None = None, + default_retry: float = _DEFAULT_RETRY_SECONDS, + max_backoff: float = _DEFAULT_MAX_BACKOFF, + max_reconnects: int | None = None, + jitter: float = _DEFAULT_JITTER, + clock: AsyncClock = ASYNC_SYSTEM_CLOCK, + rand: random.Random | None = None, + dispatch_factory: Callable[[], DispatchContext] | None = None, + ) -> None: + self._initial = initial_request + self._last_event_id = last_event_id + self._retry_base = default_retry + self._max_backoff = max_backoff + self._max_reconnects = max_reconnects + self._jitter = jitter + self._clock = clock + self._rand = rand if rand is not None else random.Random() + self._dispatch_factory = dispatch_factory or DispatchContext.noop + self._response: AsyncResponse | None = None + self._send = self._normalise(source) + + def _normalise(self, source: AsyncPipelineLike | SendAsync) -> SendAsync: + if isinstance(source, AsyncPipelineLike): + pipeline = source + + async def send(request: Request) -> AsyncResponse: + return await pipeline.run(request, self._dispatch_factory()) + + return send + return source + + async def aclose(self) -> None: + """Close the current response, if any. Idempotent and cancel-safe.""" + response = self._response + if response is None: + return + self._response = None + await _shielded_cleanup(response.close()) + + async def __aenter__(self) -> Self: + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + await self.aclose() + + def __aiter__(self) -> AsyncIterator[SseEvent]: + return self._events() + + async def _events(self) -> AsyncIterator[SseEvent]: + failures = 0 + try: + while True: + response = await self._connect(_resume_request(self._initial, self._last_event_id)) + progressed = False + body = response.body + if body is not None: + stream = parse_async_events(body.aiter_bytes()) + try: + async with stream: + async for event in stream: + self._last_event_id = _last_event_id_of(event, self._last_event_id) + progressed = True + yield event + except _TRANSIENT: + pass + if stream.retry is not None: + self._retry_base = stream.retry / 1000.0 + await self.aclose() + if progressed: + failures = 0 + if self._max_reconnects is not None and failures >= self._max_reconnects: + raise ServiceResponseError("SSE reconnect budget exhausted") + await self._clock.sleep( + _next_backoff( + self._retry_base, failures, self._max_backoff, self._jitter, self._rand + ) + ) + failures += 1 + finally: + await self.aclose() + + async def _connect(self, request: Request) -> AsyncResponse: + response = await self._send(request) + self._response = response + if not response.status.is_success: + await self.aclose() + raise HttpResponseError(response=response) + return response + + +__all__ = ["AsyncSseConnection", "SseConnection"] diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/parser.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/parser.py index 3823c9a..6692150 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/parser.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/parser.py @@ -331,6 +331,14 @@ async def __anext__(self) -> SseEvent: self._parser.feed(chunk) self._pending = self._parser.drain() + @property + def retry(self) -> int | None: + """The most recent ``retry:`` value seen by the underlying parser, in + milliseconds, or ``None``. Mirrors :attr:`SseParser.retry` so a + reconnecting client can read the server's sticky hint after iteration. + """ + return self._parser.retry + async def aclose(self) -> None: """Release the upstream byte stream. Idempotent. diff --git a/packages/dexpace-sdk-core/tests/sse/test_async_connection.py b/packages/dexpace-sdk-core/tests/sse/test_async_connection.py new file mode 100644 index 0000000..2c70909 --- /dev/null +++ b/packages/dexpace-sdk-core/tests/sse/test_async_connection.py @@ -0,0 +1,147 @@ +# Copyright (c) 2026 dexpace and Omar Aljarrah. +# Licensed under the MIT License. See LICENSE.md in the repository root for details. + +"""Tests for the asynchronous reconnecting SSE client.""" + +from __future__ import annotations + +import asyncio +import random +from collections.abc import AsyncIterator + +import pytest + +from dexpace.sdk.core.errors import HttpResponseError, ServiceResponseError +from dexpace.sdk.core.http.common import MediaType, Protocol, Url +from dexpace.sdk.core.http.request import Method, Request +from dexpace.sdk.core.http.response import AsyncResponse, AsyncResponseBody, Status +from dexpace.sdk.core.http.sse.connection import AsyncSseConnection + + +def _request() -> Request: + return Request(method=Method.GET, url=Url.parse("https://api.example.com/stream")) + + +class _AsyncDropBody(AsyncResponseBody): + """Yields scripted chunks, optionally then raises, optionally blocks.""" + + def __init__( + self, + chunks: list[bytes], + *, + error: BaseException | None = None, + block: bool = False, + ) -> None: + self._chunks = chunks + self._error = error + self._block = block + self.closed = False + + def media_type(self) -> MediaType | None: + return MediaType.parse("text/event-stream") + + def content_length(self) -> int: + return -1 + + async def aiter_bytes(self, chunk_size: int = 64 * 1024) -> AsyncIterator[bytes]: + for chunk in self._chunks: + yield chunk + if self._block: + await asyncio.Event().wait() # park until cancelled + if self._error is not None: + raise self._error + + async def close(self) -> None: + self.closed = True + + +def _response(body: AsyncResponseBody | None, *, status: Status = Status.OK) -> AsyncResponse: + return AsyncResponse( + request=_request(), + protocol=Protocol.HTTP_1_1, + status=status, + body=body, + ) + + +class _AsyncScript: + def __init__(self, responses: list[AsyncResponse]) -> None: + self._responses = list(responses) + self.requests: list[Request] = [] + + async def __call__(self, request: Request) -> AsyncResponse: + self.requests.append(request) + if not self._responses: + raise AssertionError("send called more times than scripted") + return self._responses.pop(0) + + +class _RecordingAsyncClock: + def __init__(self) -> None: + self.sleeps: list[float] = [] + + def now(self) -> float: + return 0.0 + + def monotonic(self) -> float: + return 0.0 + + async def sleep(self, duration: float) -> None: + self.sleeps.append(duration) + + +class _LowJitter(random.Random): + def uniform(self, a: float, b: float) -> float: + return a + + +async def test_async_yields_events_and_reconnects_with_replay() -> None: + dropped = _AsyncDropBody([b"id: 9\ndata: one\n\n"], error=ServiceResponseError("drop")) + resumed = _AsyncDropBody([b"data: two\n\n"]) + script = _AsyncScript([_response(dropped), _response(resumed)]) + conn = AsyncSseConnection(script, _request(), clock=_RecordingAsyncClock(), rand=_LowJitter()) + + received: list[str] = [] + async with conn as events: + async for event in events: + received.append(event.data) + if len(received) == 2: + break + + assert received == ["one", "two"] + assert script.requests[1].headers.get("last-event-id") == "9" + + +async def test_async_non_success_status_raises() -> None: + script = _AsyncScript([_response(None, status=Status.BAD_GATEWAY)]) + conn = AsyncSseConnection(script, _request(), clock=_RecordingAsyncClock(), rand=_LowJitter()) + + with pytest.raises(HttpResponseError): + async for _ in conn: + pass + assert len(script.requests) == 1 + + +async def test_async_cancellation_propagates_and_closes() -> None: + body = _AsyncDropBody([b"data: hi\n\n"], block=True) + script = _AsyncScript([_response(body)]) + conn = AsyncSseConnection(script, _request(), clock=_RecordingAsyncClock(), rand=_LowJitter()) + + seen: list[str] = [] + + async def consume() -> None: + async with conn as events: + async for event in events: + seen.append(event.data) + + task = asyncio.ensure_future(consume()) + for _ in range(10): + await asyncio.sleep(0) + if seen: + break + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + assert seen == ["hi"] + assert body.closed is True From 808f8663e7af6f946cd08363176165e3bb00bbc7 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Tue, 9 Jun 2026 16:14:34 +0300 Subject: [PATCH 5/6] feat: export reconnecting SSE client from http.sse Co-Authored-By: Claude Opus 4.8 --- .../src/dexpace/sdk/core/http/sse/__init__.py | 5 +- tools/surface_baseline.json | 56 +++++++++++++------ 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/__init__.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/__init__.py index db623ad..ab24b2f 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/__init__.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/__init__.py @@ -1,14 +1,17 @@ # Copyright (c) 2026 dexpace and Omar Aljarrah. # Licensed under the MIT License. See LICENSE.md in the repository root for details. -"""WHATWG-spec Server-Sent Events parsing.""" +"""WHATWG-spec Server-Sent Events parsing and reconnecting client.""" from __future__ import annotations +from .connection import AsyncSseConnection, SseConnection from .parser import AsyncSseStream, SseEvent, SseParser, parse_async_events, parse_events __all__ = [ + "AsyncSseConnection", "AsyncSseStream", + "SseConnection", "SseEvent", "SseParser", "parse_async_events", diff --git a/tools/surface_baseline.json b/tools/surface_baseline.json index 68f5564..39101ea 100644 --- a/tools/surface_baseline.json +++ b/tools/surface_baseline.json @@ -736,11 +736,26 @@ } } }, + "dexpace.sdk.core.http.sse.connection": { + "AsyncSseConnection": { + "bases": [], + "methods": { + "aclose": "async aclose(self) -> None" + } + }, + "SseConnection": { + "bases": [], + "methods": { + "close": "close(self) -> None" + } + } + }, "dexpace.sdk.core.http.sse.parser": { "AsyncSseStream": { "bases": [], "methods": { - "aclose": "async aclose(self) -> None" + "aclose": "async aclose(self) -> None", + "retry": "retry(self) -> int | None" } }, "SseEvent": { @@ -752,7 +767,8 @@ "methods": { "drain": "drain(self) -> Iterator[SseEvent]", "end": "end(self) -> Iterator[SseEvent]", - "feed": "feed(self, chunk: bytes) -> None" + "feed": "feed(self, chunk: bytes) -> None", + "retry": "retry(self) -> int | None" } }, "parse_async_events": "parse_async_events(chunks: AsyncIterable[bytes]) -> AsyncSseStream", @@ -968,27 +984,11 @@ "by_page": "async by_page(self) -> AsyncIterator[Page[T]]" } }, - "AsyncPipelineLike": { - "bases": [ - "Protocol" - ], - "methods": { - "run": "async run(self, request: Request, dispatch: DispatchContext) -> AsyncResponse" - } - }, "Paginator": { "bases": [], "methods": { "by_page": "by_page(self) -> Iterator[Page[T]]" } - }, - "SyncPipelineLike": { - "bases": [ - "Protocol" - ], - "methods": { - "run": "run(self, request: Request, dispatch: DispatchContext) -> Response" - } } }, "dexpace.sdk.core.pagination.strategy": { @@ -1070,6 +1070,24 @@ "default_async_pipeline": "default_async_pipeline(client: AsyncHttpClient, *, redirect: AsyncRedirectPolicy | None, idempotency: AsyncIdempotencyPolicy | None, retry: AsyncRetryPolicy | None, set_date: AsyncSetDatePolicy | None, client_identity: AsyncClientIdentityPolicy | None, auth: AsyncPolicy | None) -> AsyncStagedPipelineBuilder", "default_pipeline": "default_pipeline(client: HttpClient, *, redirect: RedirectPolicy | None, idempotency: IdempotencyPolicy | None, retry: RetryPolicy | None, set_date: SetDatePolicy | None, client_identity: ClientIdentityPolicy | None, auth: Policy | None, logging: LoggingPolicy | None, tracing: TracingPolicy | None) -> StagedPipelineBuilder" }, + "dexpace.sdk.core.pipeline.dispatch": { + "AsyncPipelineLike": { + "bases": [ + "Protocol" + ], + "methods": { + "run": "async run(self, request: Request, dispatch: DispatchContext) -> AsyncResponse" + } + }, + "SyncPipelineLike": { + "bases": [ + "Protocol" + ], + "methods": { + "run": "run(self, request: Request, dispatch: DispatchContext) -> Response" + } + } + }, "dexpace.sdk.core.pipeline.pipeline": { "Pipeline": { "bases": [], @@ -1492,7 +1510,9 @@ "Status" ], "dexpace.sdk.core.http.sse": [ + "AsyncSseConnection", "AsyncSseStream", + "SseConnection", "SseEvent", "SseParser", "parse_async_events", From 48a6a6345a2065e2da65bfa295012099e3d06555 Mon Sep 17 00:00:00 2001 From: OmarAlJarrah Date: Tue, 9 Jun 2026 16:23:10 +0300 Subject: [PATCH 6/6] fix(sse): chain the underlying transport error on reconnect-budget exhaustion When the reconnect budget was exhausted, both the sync and async SSE clients raised a bare ServiceResponseError("SSE reconnect budget exhausted"), discarding the last transient transport error that caused the final reconnect. The error was caught and swallowed mid-stream, leaving callers without the root cause. Capture the last transient error caught while streaming and chain it as the exception cause (raise ... from last_error) when the budget is hit. The message and exception type are unchanged, so existing matchers still pass; the cause now carries the original transport failure for diagnosis. Also adds the spec-listed tests the suite was missing: empty-id clearing of the replay header, upward-only backoff jitter, async clean-EOF reconnect, and async retry-hint-then-budget exhaustion. Co-Authored-By: Claude Opus 4.8 --- .../dexpace/sdk/core/http/sse/connection.py | 44 ++++++------- .../tests/sse/test_async_connection.py | 34 ++++++++++ .../tests/sse/test_connection.py | 63 +++++++++++++++++++ 3 files changed, 120 insertions(+), 21 deletions(-) diff --git a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py index 5b82f86..ecf41d7 100644 --- a/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py +++ b/packages/dexpace-sdk-core/src/dexpace/sdk/core/http/sse/connection.py @@ -186,15 +186,18 @@ def __exit__( def __iter__(self) -> Iterator[SseEvent]: failures = 0 + last_error: BaseException | None = None try: while True: response = self._connect(_resume_request(self._initial, self._last_event_id)) - progressed = yield from self._stream(response) + progressed, error = yield from self._stream(response) + if error is not None: + last_error = error self.close() if progressed: failures = 0 if self._max_reconnects is not None and failures >= self._max_reconnects: - raise ServiceResponseError("SSE reconnect budget exhausted") + raise ServiceResponseError("SSE reconnect budget exhausted") from last_error self._clock.sleep( _next_backoff( self._retry_base, failures, self._max_backoff, self._jitter, self._rand @@ -212,22 +215,22 @@ def _connect(self, request: Request) -> Response: raise HttpResponseError(response=response) return response - def _stream(self, response: Response) -> Generator[SseEvent, None, bool]: - """Yield one connection's events; return whether any were yielded. + def _stream( + self, response: Response + ) -> Generator[SseEvent, None, tuple[bool, BaseException | None]]: + """Yield one connection's events; return (progressed, transient_error). - A transient transport error mid-stream ends the generator normally - (so the caller reconnects); other exceptions propagate. - - ``retry:`` directives that arrive without an accompanying ``data:`` - field (so no ``SseEvent`` is emitted) are still picked up from the - parser's accumulated state after iteration ends, ensuring the - server-supplied reconnect delay is honoured even on pure - retry-hint frames. + A transient transport error mid-stream ends the generator normally (so + the caller reconnects) and is returned so the caller can chain it as the + cause if the reconnect budget is later exhausted; other exceptions + propagate. A ``retry:`` hint that arrived without a data event is picked + up from the parser's sticky state after iteration. """ progressed = False + error: BaseException | None = None body = response.body if body is None: - return progressed + return progressed, error parser = SseParser() try: for chunk in body.iter_bytes(): @@ -240,13 +243,11 @@ def _stream(self, response: Response) -> Generator[SseEvent, None, bool]: self._last_event_id = _last_event_id_of(event, self._last_event_id) progressed = True yield event - except _TRANSIENT: - pass - # Capture the server's sticky reconnect hint (including a retry:-only frame - # that emitted no event) for the next reconnection's backoff base. + except _TRANSIENT as exc: + error = exc if parser.retry is not None: self._retry_base = parser.retry / 1000.0 - return progressed + return progressed, error class AsyncSseConnection: @@ -350,6 +351,7 @@ def __aiter__(self) -> AsyncIterator[SseEvent]: async def _events(self) -> AsyncIterator[SseEvent]: failures = 0 + last_error: BaseException | None = None try: while True: response = await self._connect(_resume_request(self._initial, self._last_event_id)) @@ -363,15 +365,15 @@ async def _events(self) -> AsyncIterator[SseEvent]: self._last_event_id = _last_event_id_of(event, self._last_event_id) progressed = True yield event - except _TRANSIENT: - pass + except _TRANSIENT as exc: + last_error = exc if stream.retry is not None: self._retry_base = stream.retry / 1000.0 await self.aclose() if progressed: failures = 0 if self._max_reconnects is not None and failures >= self._max_reconnects: - raise ServiceResponseError("SSE reconnect budget exhausted") + raise ServiceResponseError("SSE reconnect budget exhausted") from last_error await self._clock.sleep( _next_backoff( self._retry_base, failures, self._max_backoff, self._jitter, self._rand diff --git a/packages/dexpace-sdk-core/tests/sse/test_async_connection.py b/packages/dexpace-sdk-core/tests/sse/test_async_connection.py index 2c70909..584ac6b 100644 --- a/packages/dexpace-sdk-core/tests/sse/test_async_connection.py +++ b/packages/dexpace-sdk-core/tests/sse/test_async_connection.py @@ -145,3 +145,37 @@ async def consume() -> None: assert seen == ["hi"] assert body.closed is True + + +async def test_async_reconnects_after_clean_eof() -> None: + first = _AsyncDropBody([b"data: a\n\n"]) + second = _AsyncDropBody([b"data: b\n\n"]) + script = _AsyncScript([_response(first), _response(second)]) + conn = AsyncSseConnection(script, _request(), clock=_RecordingAsyncClock(), rand=_LowJitter()) + + received: list[str] = [] + async with conn as events: + async for event in events: + received.append(event.data) + if len(received) == 2: + break + + assert received == ["a", "b"] + assert first.closed is True + + +async def test_async_honours_retry_then_budget() -> None: + retry_then_drop = _response( + _AsyncDropBody([b"retry: 1000\n\n"], error=ServiceResponseError("x")) + ) + drop1 = _response(_AsyncDropBody([], error=ServiceResponseError("x"))) + drop2 = _response(_AsyncDropBody([], error=ServiceResponseError("x"))) + script = _AsyncScript([retry_then_drop, drop1, drop2]) + clock = _RecordingAsyncClock() + conn = AsyncSseConnection(script, _request(), clock=clock, rand=_LowJitter(), max_reconnects=2) + + with pytest.raises(ServiceResponseError, match="reconnect budget"): + async for _ in conn: + pass + + assert clock.sleeps == [1.0, 2.0] diff --git a/packages/dexpace-sdk-core/tests/sse/test_connection.py b/packages/dexpace-sdk-core/tests/sse/test_connection.py index 9c0d01b..ad41d40 100644 --- a/packages/dexpace-sdk-core/tests/sse/test_connection.py +++ b/packages/dexpace-sdk-core/tests/sse/test_connection.py @@ -90,6 +90,13 @@ def uniform(self, a: float, b: float) -> float: return a +class _HighJitter(random.Random): + """``uniform`` returns its high bound — exercises upward jitter.""" + + def uniform(self, a: float, b: float) -> float: + return b + + def test_yields_events_and_closes_on_caller_stop() -> None: body = _DropBody([b"data: one\n\n", b"data: two\n\n"]) script = _Script([_response(body)]) @@ -190,3 +197,59 @@ def test_non_success_status_raises_without_reconnect() -> None: # No reconnect attempted: send called exactly once. assert len(script.requests) == 1 + + +def test_empty_id_clears_replay_header() -> None: + # id:5 then an explicit empty id (clears), so the reconnect omits the header. + first = _DropBody([b"id: 5\ndata: a\n\nid:\ndata: b\n\n"], error=ServiceResponseError("x")) + second = _DropBody([b"data: c\n\n"]) + script = _Script([_response(first), _response(second)]) + conn = SseConnection(script, _request(), clock=_RecordingClock(), rand=_LowJitter()) + + received: list[str] = [] + with conn as events: + for event in events: + received.append(event.data) + if len(received) == 3: + break + + assert received == ["a", "b", "c"] + assert script.requests[1].headers.get("last-event-id") is None + + +def test_backoff_jitter_is_upward() -> None: + first = _DropBody([b"data: a\n\n"], error=ServiceResponseError("x")) + second = _DropBody([b"data: b\n\n"]) + script = _Script([_response(first), _response(second)]) + clock = _RecordingClock() + conn = SseConnection(script, _request(), clock=clock, rand=_HighJitter(), jitter=0.1) + + received: list[str] = [] + with conn as events: + for event in events: + received.append(event.data) + if len(received) == 2: + break + + # First connection progressed -> failures reset to 0 -> base 3.0; high + # jitter multiplies by 1.0 + 0.1, never below the base. + assert clock.sleeps == [pytest.approx(3.3)] + assert clock.sleeps[0] > 3.0 + + +def test_budget_exhaustion_chains_last_transport_error() -> None: + boom = ServiceResponseError("connection reset") + script = _Script( + [ + _response(_DropBody([], error=boom)), + _response(_DropBody([], error=boom)), + ] + ) + conn = SseConnection( + script, _request(), clock=_RecordingClock(), rand=_LowJitter(), max_reconnects=1 + ) + + with pytest.raises(ServiceResponseError, match="reconnect budget") as exc_info: + for _ in conn: + pass + assert exc_info.value.__cause__ is boom