feat: CEP-41 - open-stream transport wiring, call_tool_stream, e2e#98
Conversation
|
The core streaming path (server→client via Should fix before merge1. 2. FFI Worth correcting in the PR body3. "Enabled by default … matching the TS SDK" is inaccurate. The TS SDK defaults open-stream to Non-blocking follow-ups (file or defer)4. 5. 6. Server-as-reader keepalive Nothing here blocks the core feature. 1 and 2 are quick; 3 is a doc/wording fix. Happy to see this land. |
|
addressed all 5 review points - FFI mirror fixed (Python/Swift/Kotlin consumers now see supports_open_stream), default reverted to false (genuine TS parity), result flattened to BoxFuture so consumers await once, bind_lock deadlock fixed with tokio::select! + cancel_outbound, and the e-tag correlated_event_id cleaned up to Option<&str> with a comment marking the bidirectional streaming breadcrumb. 737 tests passing, all gates green including --no-default-features. changelog will be updated in this PR #95 as a new commit |
Second and final PR for CEP-41. Wires the PR1 engine into both transports and makes open-ended streaming live end-to-end.
What's in this PR:
Capability advertisement + learning - client advertises support_open_stream when enabled, OR-learns server support. Server advertises via announcements and first responses, OR-learns from inbound client tags. Disabled by default (opt-in via with_open_stream(OpenStreamConfig::enabled())), matching the TS SDK.
Server wiring - writer created on tools/call with a progressToken, captures a RouteSnapshot (client_pubkey, request_id, wrap_kind) at creation so the deferred final response can be sent even after the route store sweeps the entry. Response deferral sits at the top of send_response: a started writer stashes the response and returns immediately, an unstarted writer drops and sends normally. Inbound interception routes start/ping/pong/abort/close to the right handler (writer or server-as-reader). Keepalive sweep drives tick(now) on all sessions.
Client wiring - inbound interception feeds the reader session, forwards stripped+token-restored progress for rmcp's reset_timeout_on_progress, and touches the correlation entry to keep it alive. A bind_lock serializes the push→bind window so concurrent call_tool_stream calls don't cross tokens.
call_tool_stream - free function returning ToolStreamCall { progress_token, stream, result, abort }. Builds its own PeerRequestOptions with no hard lifetime cap unless max_total_timeout_ms is explicitly set.
rmcp worker injection - OpenStreamWriter is injected into request extensions after conversion, before dispatch, so tool handlers reach it via ctx.extensions.
Also fixes a bug: the oversized reassembly path dispatched tools/call directly, bypassing writer creation, so an oversized request with a progressToken never got a writer and never streamed. Fixed by creating the writer on the reassembled end frame.
10 e2e tests covering roundtrip (string + numeric token), deferred response after close, client abort, gate off, unstarted writer, concurrent stream isolation, CEP-22 + CEP-41 composition (oversized request + streaming response), oversized response alongside a separate live stream, and plain call with progressToken not interfering with a live stream. 737 tests passing.
After review: FFI mirror updated (supports_open_stream now visible to Python/Swift/Kotlin consumers), default reverted to false (TS parity), ToolStreamCall::result flattened to BoxFuture, bind_lock deadlock fixed with tokio::select!, e-tag correlated_event_id cleaned up to Option<&str>.
Closes #96