Skip to content

feat(sse): reconnecting client with Last-Event-ID replay and backoff#4

Merged
OmarAlJarrah merged 6 commits into
mainfrom
feat/sse-reconnection
Jun 9, 2026
Merged

feat(sse): reconnecting client with Last-Event-ID replay and backoff#4
OmarAlJarrah merged 6 commits into
mainfrom
feat/sse-reconnection

Conversation

@OmarAlJarrah

Copy link
Copy Markdown
Member

Adds a reconnecting Server-Sent Events client to dexpace-sdk-core, layered on the existing sans-io SSE parser. The parser already turns a byte stream into events; this adds the connection lifecycle that long-lived SSE consumers need — transparent reconnection with Last-Event-ID replay and backoff — with no new runtime dependency.

What's added

  • SseConnection and AsyncSseConnection (http.sse) iterate events across reconnections. Each (re)connection is dispatched through a Pipeline / AsyncPipeline (or a bare send-callable), so retry, auth, redirect, and tracing apply per connection.
  • A public retry accessor on SseParser / AsyncSseStream, so a reconnecting client can read the server's sticky reconnect hint even from a retry:-only frame that emits no event.
  • The pipeline-dispatch protocols (SyncPipelineLike / AsyncPipelineLike and the send-callable aliases) are lifted from the paginator into pipeline.dispatch so the paginator and the SSE client share one definition. The pagination package keeps re-exporting them, so its public surface is unchanged.

Behavior

  • Reconnects on both a transient transport error and a clean end-of-stream (browser EventSource semantics); a non-success HTTP status is a permanent failure and raises HttpResponseError without reconnecting.
  • Replays the last seen event id via the Last-Event-ID header; an explicit empty id clears it.
  • Backoff uses the server retry: value as its base (falling back to a configurable default), grows exponentially across consecutive failures, is jittered upward only (so a reconnect never wakes before the suggested interval), and is capped. The failure counter resets once a connection yields an event.
  • max_reconnects bounds consecutive failures; on exhaustion it raises, chaining the last transport error as the cause.
  • Cancellation propagates while the response is released through the shielded-cleanup convention, so a cancelled consumer never leaks the transport handle.
  • The caller ends the stream by breaking the loop (for example on a data: [DONE] sentinel) or via the (async) context manager.

Verification

ruff check, ruff format --check, mypy --strict (216 source files), and the full pytest suite (1013 passing) are green. New tests cover reconnect-on-drop and clean-EOF, Last-Event-ID replay and empty-id clearing, server-retry: backoff, upward-jitter direction, exponential growth, failure-counter reset, max_reconnects exhaustion with cause chaining, non-success raises, and async cancellation with deterministic cleanup — for both the sync and async clients.

OmarAlJarrah and others added 6 commits June 9, 2026 17:47
Lift SendSync, SendAsync, SyncPipelineLike, and AsyncPipelineLike out of
pagination/paginator.py into the new pipeline/dispatch.py module. The
paginator re-exports all four so the public surface (pagination.__init__)
is unchanged. The forthcoming SSE reconnection feature will import the same
types from pipeline/dispatch without depending on pagination.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add five tests exercising the sync reconnect lifecycle:
mid-stream-drop with Last-Event-ID replay, clean-EOF reconnect,
server retry: hint with exponential backoff, failure-counter reset after
progress, and non-2xx permanent-failure guard.

Fix _stream to drive SseParser directly so retry: frames without
accompanying data: lines still propagate the server hint to the
connection's retry_base before the next sleep, honouring the server's
reconnect delay on pure retry-hint frames.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…haustion

When the reconnect budget was exhausted, both the sync and async SSE clients
raised a bare ServiceResponseError("SSE reconnect budget exhausted"), discarding
the last transient transport error that caused the final reconnect. The error
was caught and swallowed mid-stream, leaving callers without the root cause.

Capture the last transient error caught while streaming and chain it as the
exception cause (raise ... from last_error) when the budget is hit. The message
and exception type are unchanged, so existing matchers still pass; the cause now
carries the original transport failure for diagnosis.

Also adds the spec-listed tests the suite was missing: empty-id clearing of the
replay header, upward-only backoff jitter, async clean-EOF reconnect, and async
retry-hint-then-budget exhaustion.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@OmarAlJarrah OmarAlJarrah merged commit 03a5f44 into main Jun 9, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant