The dispatch seam shares pipeline-or-callable types but not the sync/async normalisation, and the two normalisers have diverged #62

@OmarAlJarrah

Current design

pipeline/dispatch.py exists to give the paginator and the reconnecting SSE client a single definition of "accept a Pipeline or a bare send-callable" so neither has to import the other. It exports two @runtime_checkable structural Protocols and two callable aliases:

# pipeline/dispatch.py
type SendSync = Callable[["Request"], "Response"]
type SendAsync = Callable[["Request"], Awaitable["AsyncResponse"]]

@runtime_checkable
class SyncPipelineLike(Protocol):
    def run(self, request: Request, dispatch: DispatchContext) -> Response: ...

@runtime_checkable
class AsyncPipelineLike(Protocol):
    async def run(self, request: Request, dispatch: DispatchContext) -> AsyncResponse: ...

What the seam does not factor out is the runtime step that turns a source into a send callable. That step, _normalise, is implemented four times: Paginator._normalise and AsyncPaginator._normalise (pagination/paginator.py:139, :212), and SseConnection._normalise and AsyncSseConnection._normalise (http/sse/connection.py:158, :320).

The paginator and SSE copies have drifted. Both paginator normalisers carry a defensive cross-wiring guard:

# pagination/paginator.py — Paginator._normalise
if isinstance(source, SyncPipelineLike):
    pipeline = source
    if inspect.iscoroutinefunction(pipeline.run):
        raise TypeError(
            "Paginator was given an async pipeline; its run() is a "
            "coroutine function. Use AsyncPaginator for async pipelines.",
        )
    ...
if inspect.iscoroutinefunction(source):
    raise TypeError("Paginator was given an async send-callable; use AsyncPaginator.")

The SSE sync normaliser has no such guard:

# http/sse/connection.py — SseConnection._normalise
def _normalise(self, source: SyncPipelineLike | SendSync) -> SendSync:
    if isinstance(source, SyncPipelineLike):
        pipeline = source
        def send(request: Request) -> Response:
            return pipeline.run(request, self._dispatch_factory())
        return send
    return source

Trade-off / concern

The seam centralises the types but leaves its single most error-prone responsibility — telling a sync source from an async one at runtime — to each consumer, and the consumers now disagree.

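The structural Protocols cannot make that distinction. isinstance against a @runtime_checkable Protocol only checks that a run member exists; it does not look at the signature or at whether run is a coroutine function. An async pipeline therefore satisfies both Protocols. A quick check confirms it: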

async pipe isinstance SyncPipelineLike: True
async pipe isinstance AsyncPipelineLike: True
iscoroutinefunction(p.run): True
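For anyone who wants to reproduce the check, here is a minimal standalone sketch. The Protocols are re-declared with `object` in place of the real Request / DispatchContext / Response types, and `FakeAsyncPipeline` is a hypothetical stand-in, not a class from the codebase.

```python
import inspect
from typing import Protocol, runtime_checkable


@runtime_checkable
class SyncPipelineLike(Protocol):
    def run(self, request: object, dispatch: object) -> object: ...


@runtime_checkable
class AsyncPipelineLike(Protocol):
    async def run(self, request: object, dispatch: object) -> object: ...


class FakeAsyncPipeline:
    """Hypothetical async pipeline, defined only for this check."""

    async def run(self, request: object, dispatch: object) -> object:
        return "response"


p = FakeAsyncPipeline()

# runtime_checkable isinstance only asks "is there a run attribute?".
is_sync_like = isinstance(p, SyncPipelineLike)
is_async_like = isinstance(p, AsyncPipelineLike)
# Only this call actually distinguishes the two directions.
run_is_coroutine = inspect.iscoroutinefunction(p.run)

print("async pipe isinstance SyncPipelineLike:", is_sync_like)
print("async pipe isinstance AsyncPipelineLike:", is_async_like)
print("iscoroutinefunction(p.run):", run_is_coroutine)
```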

So isinstance(source, SyncPipelineLike) passing tells you nothing about sync-ness. The paginator compensates with the iscoroutinefunction guard above; the SSE client does not. The concrete consequence: hand an AsyncPipeline to SseConnection and it sails through the structural check, wraps the coroutine run in a sync send, and only fails later at stream time with un-awaited coroutines — whereas the same mistake into Paginator raises a clear TypeError at construction. Same seam, same mistake, two different outcomes.
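The silent mis-binding can be shown in isolation. The sketch below assumes a wrapper shaped like the SSE sync normaliser quoted earlier; `unguarded_normalise` and `FakeAsyncPipeline` are hypothetical names, and `None` stands in for the dispatch context.

```python
import inspect


class FakeAsyncPipeline:
    """Hypothetical async pipeline, defined only for this demonstration."""

    async def run(self, request, dispatch):
        return "response"


def unguarded_normalise(source):
    # Same shape as the sync SSE normaliser: wrap run() in a plain function,
    # with no check on whether run() is a coroutine function.
    def send(request):
        return source.run(request, None)

    return send


send = unguarded_normalise(FakeAsyncPipeline())  # no error at construction
result = send("request")

# The "response" is really a coroutine object that nobody will await.
got_coroutine = inspect.iscoroutine(result)
print("send() returned a coroutine:", got_coroutine)

result.close()  # close it explicitly so the interpreter does not warn
```

Nothing fails until some later code tries to use the return value as a Response, which is why the error surfaces far from the mistake.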

This is the predictable cohesion cost of a half-factored abstraction: four near-identical normalisers, three of which guard a mistake the fourth lets through, and nothing in the type system stops the next consumer of dispatch.py from forgetting the guard too. It also points at a deeper modelling gap — runtime_checkable structural Protocols are the wrong tool for a sync-vs-async discriminator, which is why every consumer falls back to iscoroutinefunction sniffing.

Proposed direction

Move normalisation into dispatch.py next to the types it depends on, as one helper per direction:

# pipeline/dispatch.py
def to_sync_send(source: SyncPipelineLike | SendSync,
                 dispatch_factory: Callable[[], DispatchContext]) -> SendSync: ...
def to_async_send(source: AsyncPipelineLike | SendAsync,
                  dispatch_factory: Callable[[], DispatchContext]) -> SendAsync: ...

Each performs the structural check and the coroutine-direction guard exactly once, so every consumer inherits identical cross-wiring protection. Paginator, AsyncPaginator, SseConnection, and AsyncSseConnection each collapse to a single call.
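A rough sketch of what the two helpers could look like, to make the proposal concrete. The type stand-ins, the error wording, and the `Demo*` classes are assumptions for illustration. So is the decision to guard bare callables only in the sync direction: an ordinary function that returns an awaitable is a legitimate SendAsync, so rejecting it there seemed too strict.

```python
import asyncio
import inspect
from collections.abc import Awaitable, Callable
from typing import Any, Protocol, runtime_checkable

# Stand-ins for the library's real types, only to keep the sketch runnable.
Request = Response = AsyncResponse = DispatchContext = Any
SendSync = Callable[[Request], Response]
SendAsync = Callable[[Request], Awaitable[AsyncResponse]]


@runtime_checkable
class SyncPipelineLike(Protocol):
    def run(self, request: Request, dispatch: DispatchContext) -> Response: ...


@runtime_checkable
class AsyncPipelineLike(Protocol):
    async def run(self, request: Request, dispatch: DispatchContext) -> AsyncResponse: ...


def to_sync_send(source, dispatch_factory: Callable[[], DispatchContext]) -> SendSync:
    if isinstance(source, SyncPipelineLike):
        # The structural check cannot see direction, so guard it here, once.
        if inspect.iscoroutinefunction(source.run):
            raise TypeError("expected a sync pipeline; run() is a coroutine function")
        pipeline = source

        def send(request: Request) -> Response:
            return pipeline.run(request, dispatch_factory())

        return send
    if inspect.iscoroutinefunction(source):
        raise TypeError("expected a sync send-callable; got a coroutine function")
    return source


def to_async_send(source, dispatch_factory: Callable[[], DispatchContext]) -> SendAsync:
    if isinstance(source, AsyncPipelineLike):
        if not inspect.iscoroutinefunction(source.run):
            raise TypeError("expected an async pipeline; run() is not a coroutine function")
        pipeline = source

        async def send(request: Request) -> AsyncResponse:
            return await pipeline.run(request, dispatch_factory())

        return send
    return source  # bare callable: trusted to return an awaitable


class DemoSyncPipeline:
    def run(self, request, dispatch):
        return f"sync:{request}"


class DemoAsyncPipeline:
    async def run(self, request, dispatch):
        return f"async:{request}"


sync_result = to_sync_send(DemoSyncPipeline(), dict)("r1")
async_result = asyncio.run(to_async_send(DemoAsyncPipeline(), dict)("r2"))

try:
    to_sync_send(DemoAsyncPipeline(), dict)
    cross_wired_error = None
except TypeError as exc:
    cross_wired_error = str(exc)

print(sync_result, async_result, cross_wired_error)
```

With this shape, each consumer's _normalise would reduce to something like `to_sync_send(source, self._dispatch_factory)`, and the cross-wired case fails at construction for all four.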

Worth considering alongside it: make the sync/async distinction explicit rather than sniffed. The runtime sniffing only exists because one structural Protocol can't encode "my run is a coroutine." Options: keep two clearly-named Protocols that the caller selects deliberately and document that the consumer trusts the static type (dropping the runtime guard entirely), or attach a small explicit marker the helper can read instead of inspecting run. Either removes the reliance on an iscoroutinefunction heuristic the Protocol can't enforce.
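To illustrate the explicit-marker option, here is one possible shape. The `dispatch_mode` attribute and `require_mode` helper are invented for this sketch and do not exist in the codebase; the point is only that the helper reads a declared value instead of inspecting run.

```python
from typing import Literal

Mode = Literal["sync", "async"]


class DemoSyncPipeline:
    dispatch_mode: Mode = "sync"  # hypothetical explicit marker

    def run(self, request, dispatch):
        return f"sync:{request}"


class DemoAsyncPipeline:
    dispatch_mode: Mode = "async"  # hypothetical explicit marker

    async def run(self, request, dispatch):
        return f"async:{request}"


def require_mode(source, expected: Mode):
    """Return source unchanged if it declares the expected direction."""
    actual = getattr(source, "dispatch_mode", None)
    if actual != expected:
        raise TypeError(
            f"expected a {expected} pipeline, got dispatch_mode={actual!r}"
        )
    return source


accepted = require_mode(DemoSyncPipeline(), "sync")

try:
    require_mode(DemoAsyncPipeline(), "sync")
    marker_error = None
except TypeError as exc:
    marker_error = str(exc)

print(type(accepted).__name__, "|", marker_error)
```

The cost is that every pipeline implementation has to declare the marker, including third-party ones that currently satisfy the Protocol by shape alone.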

Trade-off: the shared helpers add two package-private symbols to dispatch.py (one per direction) and couple the consumers to helpers rather than to bare types, slightly widening the seam's surface. In exchange, four near-duplicate normalisers collapse into two helpers and, more importantly, the divergence closes: no consumer can reject a wrong-direction source while another silently mis-binds it.

Acknowledging the current rationale

The CHANGELOG frames the seam intentionally: the reconnecting SSE client is "built on the shared dispatch seam (pipeline.dispatch), which lets both the SSE client and the paginator accept either a pipeline or a bare send-callable." So sharing the shape across the two consumers is a deliberate, reasonable goal — this proposal extends that goal rather than reversing it. What is neither stated nor justified anywhere in CLAUDE.md, the docs, or the CHANGELOG is why the normalisation behaviour was left per-consumer, or why the SSE path omits the guard the paginator considered worth adding. Given that the seam was created precisely to stop these two consumers from drifting apart, finishing the factoring so the runtime behaviour is shared too seems in keeping with the original intent.

Labels: enhancement
