diff --git a/Cargo.toml b/Cargo.toml index d199f39..efc1c8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,14 @@ required-features = ["test-utils"] name = "oversized_timeout_e2e" required-features = ["rmcp", "test-utils"] +[[test]] +name = "open_stream_e2e" +required-features = ["rmcp", "test-utils"] + +[[test]] +name = "open_stream_timeout_e2e" +required-features = ["rmcp", "test-utils"] + [dev-dependencies] # `start_paused` deterministic-time tests (tokio's "full" does not include test-util). tokio = { version = "1", features = ["test-util"] } diff --git a/contextvm-ffi/src/uniffi_types.rs b/contextvm-ffi/src/uniffi_types.rs index d2ae890..ce23fd8 100644 --- a/contextvm-ffi/src/uniffi_types.rs +++ b/contextvm-ffi/src/uniffi_types.rs @@ -96,6 +96,7 @@ pub struct PeerCapabilities { pub supports_encryption: bool, pub supports_ephemeral_encryption: bool, pub supports_oversized_transfer: bool, + pub supports_open_stream: bool, } /// A discovered MCP tool and provider metadata used by foreign clients. @@ -360,6 +361,7 @@ fn capabilities_to_uniffi(caps: contextvm_sdk::PeerCapabilities) -> PeerCapabili supports_encryption: caps.supports_encryption, supports_ephemeral_encryption: caps.supports_ephemeral_encryption, supports_oversized_transfer: caps.supports_oversized_transfer, + supports_open_stream: caps.supports_open_stream, } } diff --git a/src/core/types.rs b/src/core/types.rs index b790e8a..00fada0 100644 --- a/src/core/types.rs +++ b/src/core/types.rs @@ -203,7 +203,7 @@ pub struct ClientSession { /// Learned from client discovery tags: peer supports CEP-22 oversized transfer. pub supports_oversized_transfer: bool, /// Learned from client discovery tags: peer supports CEP-41 open streams - /// (learned, gated by server config; data only until PR2 activation). + /// (learned, gated by the server's open-stream config). pub supports_open_stream: bool, /// Last activity timestamp. 
pub last_activity: Instant, diff --git a/src/lib.rs b/src/lib.rs index c231863..0cad02b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,3 +97,9 @@ pub use rmcp_transport::progress::{ progress_aware_options, PeerRequestOptionsExt, DEFAULT_OVERSIZED_IDLE_TIMEOUT, DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT, }; + +// ── CEP-41 open-stream consumer API ───────────────────────────────── +#[cfg(feature = "rmcp")] +pub use rmcp_transport::open_stream::{call_tool_stream, ToolStreamCall}; +#[cfg(feature = "rmcp")] +pub use transport::client::ClientOpenStreamHandle; diff --git a/src/rmcp_transport/mod.rs b/src/rmcp_transport/mod.rs index 8cc1211..f7c870f 100644 --- a/src/rmcp_transport/mod.rs +++ b/src/rmcp_transport/mod.rs @@ -4,6 +4,7 @@ //! ContextVM transports plug directly into rmcp service APIs. pub mod convert; +pub mod open_stream; pub mod progress; pub mod transport; pub mod worker; @@ -15,6 +16,7 @@ pub use convert::{ internal_to_rmcp_client_rx, internal_to_rmcp_server_rx, rmcp_client_tx_to_internal, rmcp_server_tx_to_internal, }; +pub use open_stream::{call_tool_stream, ToolStreamCall}; pub use progress::{ progress_aware_options, PeerRequestOptionsExt, DEFAULT_OVERSIZED_IDLE_TIMEOUT, DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT, diff --git a/src/rmcp_transport/open_stream.rs b/src/rmcp_transport/open_stream.rs new file mode 100644 index 0000000..b611248 --- /dev/null +++ b/src/rmcp_transport/open_stream.rs @@ -0,0 +1,198 @@ +//! CEP-41: the ergonomic `call_tool_stream` consumer API. +//! +//! Ports `sdk/src/transport/call-tool-stream.ts`. Pairs a normal rmcp +//! `tools/call` with its CEP-41 open stream: the call is issued with +//! progress-aware request options (so rmcp stamps a `progressToken` and arms the +//! reset-on-progress watcher), and the transport binds the SDK-stamped token to a +//! reader [`OpenStreamSession`] the moment the request is published (via the +//! [`ClientOpenStreamHandle`] obtained before the transport is served). +//! +//! 
One `tools/call` yields two outputs: the live chunk [`stream`](ToolStreamCall::stream) +//! and the eventual final [`result`](ToolStreamCall::result) — exactly the CEP-41 +//! supplement-not-replace semantics. + +use std::time::Duration; + +use futures::future::BoxFuture; +use rmcp::model::{CallToolRequestParams, CallToolResult}; +use rmcp::service::{Peer, PeerRequestOptions, ServiceError}; +use rmcp::RoleClient; +use tokio::task::{JoinError, JoinHandle}; + +use crate::core::error::{Error, Result}; +use crate::transport::client::ClientOpenStreamHandle; +use crate::transport::open_stream::OpenStreamSession; + +use super::progress::PeerRequestOptionsExt; + +type AbortFn = Box) -> BoxFuture<'static, ()> + Send + Sync>; + +/// A live CEP-41 tool call: the incremental chunk [`stream`](Self::stream), the +/// eventual final [`result`](Self::result), and an [`abort`](Self::abort) handle. +pub struct ToolStreamCall { + /// The stringified `progressToken` correlating the call and its stream. + pub progress_token: String, + /// The async stream of payload chunks (`impl Stream>`). + pub stream: OpenStreamSession, + /// The final `CallToolResult`, resolving after the stream closes (deferral). + /// + /// A **flat** result: the spawned-task (`JoinError`) and rmcp (`ServiceError`) + /// failures are folded into [`crate::Error`], so consumers `await` once rather + /// than unwrapping a nested `Result`. + pub result: BoxFuture<'static, Result>, + abort_fn: AbortFn, +} + +impl ToolStreamCall { + /// Consumer cancel: publish an `abort` frame to the server (so its writer + /// aborts), finalize the local stream, and free the reader registry slot. 
+ pub async fn abort(&self, reason: Option) { + (self.abort_fn)(reason).await; + } +} + +impl std::fmt::Debug for ToolStreamCall { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ToolStreamCall") + .field("progress_token", &self.progress_token) + .finish_non_exhaustive() + } +} + +/// Build the progress-aware [`PeerRequestOptions`] for an open-stream call. +/// +/// Idle timeout covers a full keepalive cycle (idle → probe → close-grace) so the +/// rmcp request is never failed before the open-stream keepalive would have +/// aborted a genuinely-dead stream; `reset_timeout_on_progress` re-arms it on +/// every forwarded chunk/keepalive frame. A hard lifetime cap is applied **only** +/// when `max_total_timeout_ms` is set — never the CEP-22 oversized default (an +/// open stream may legitimately run unbounded). +fn open_stream_request_options(handle: &ClientOpenStreamHandle) -> PeerRequestOptions { + let config = handle.config(); + let idle_ms = config + .idle_timeout_ms + .saturating_add(config.probe_timeout_ms) + .saturating_add(config.close_grace_period_ms) + .max(1); + let mut options = PeerRequestOptions::with_timeout(Duration::from_millis(idle_ms)) + .reset_timeout_on_progress(); + if let Some(max_total_ms) = config.max_total_timeout_ms { + options = options.with_max_total_timeout(Duration::from_millis(max_total_ms)); + } + options +} + +/// Call an MCP tool and return the paired CEP-41 [`ToolStreamCall`]. +/// +/// The reader session is registered **before** the request is published (the +/// placeholder is bound synchronously inside the transport's `send`), so no +/// inbound chunk can race ahead of it. The call itself runs on a spawned task so +/// this returns as soon as the stream handle is available — long before the final +/// result settles. 
+pub async fn call_tool_stream(
+    peer: &Peer<RoleClient>,
+    transport: &ClientOpenStreamHandle,
+    params: CallToolRequestParams,
+) -> Result<ToolStreamCall> {
+    // Fail fast when open-stream is disabled on the transport this handle came
+    // from: the transport's `send` only binds placeholders while the feature is
+    // enabled, so the placeholder registered below could never resolve. Without
+    // this guard the tool would still be executed remotely, the bind lock would be
+    // held for the entire call (stalling every other `call_tool_stream`), and the
+    // (possibly successful) result would be discarded behind a generic
+    // "completed without establishing a stream" error.
+    if !transport.config().enabled {
+        return Err(Error::Other(
+            "call_tool_stream requires open-stream to be enabled on the client transport"
+                .to_string(),
+        ));
+    }
+
+    // Serialize the placeholder push→bind window so two concurrent
+    // `call_tool_stream` calls cannot cross their FIFO placeholders against the
+    // tokens rmcp stamps (the transport binds by FIFO order, but rmcp/worker order
+    // is independent). At most one placeholder is ever unbound while this is held;
+    // it is released the moment this call's session binds, so the streams
+    // themselves still run fully concurrently.
+    let bind_guard = transport.bind_lock().clone().lock_owned().await;
+
+    // 1. Register the placeholder for the reader session (resolved by `send`).
+    let pending = transport.prepare_outbound();
+
+    // 2. Build progress-aware options (rmcp stamps the token + arms the watcher).
+    let options = open_stream_request_options(transport);
+
+    // 3. Issue the call on a spawned task so we can hand back the stream first.
+    let peer = peer.clone();
+    let mut result_handle: JoinHandle<std::result::Result<CallToolResult, ServiceError>> =
+        tokio::spawn(async move { peer.call_tool_with_options(params, options).await });
+
+    // 4. Bind the reader session. Race `pending` against the call itself: if the
+    //    call settles BEFORE the transport binds the placeholder (e.g. rmcp
+    //    rejects the request before publishing), `pending` would never resolve and
+    //    the bind lock would deadlock every later `call_tool_stream`. `biased`
+    //    prefers the bind (a successful call always publishes — and binds — first,
+    //    so the call-settled arm fires only on a pre-publish failure).
+    let (progress_token, stream) = tokio::select! {
+        biased;
+        bound = pending => match bound {
+            Ok(Ok(pair)) => pair,
+            Ok(Err(error)) => {
+                // The transport refused to create the reader session.
+                drop(bind_guard);
+                result_handle.abort();
+                return Err(error);
+            }
+            Err(_) => {
+                // The transport closed (or dropped the placeholder) before binding.
+                drop(bind_guard);
+                result_handle.abort();
+                return Err(Error::Transport(
+                    "transport closed before the outbound open-stream session was bound"
+                        .to_string(),
+                ));
+            }
+        },
+        settled = &mut result_handle => {
+            // The call finished without ever binding a session — drop the orphaned
+            // placeholder so the next `tools/call` is not mis-bound to it, then
+            // surface the (flattened) error.
+            transport.cancel_outbound();
+            drop(bind_guard);
+            return Err(match flatten_call_result(settled) {
+                Err(error) => error,
+                Ok(_) => Error::Other(
+                    "open-stream tool call completed without establishing a stream".to_string(),
+                ),
+            });
+        }
+    };
+    drop(bind_guard);
+
+    // 5. Build the abort handle (publish abort + finalize + free the slot).
+    let registry = transport.registry();
+    let abort_session = stream.clone();
+    let abort_token = progress_token.clone();
+    let abort_fn: AbortFn = Box::new(move |reason: Option<String>| {
+        // Clone per invocation: the closure is `Fn`, so it may be called repeatedly.
+        let registry = registry.clone();
+        let session = abort_session.clone();
+        let token = abort_token.clone();
+        Box::pin(async move {
+            // Publish the `abort` frame to the server + finalize locally.
+            session.abort(reason.clone()).await;
+            // Free the concurrency slot + run any hook (idempotent re-finalize).
+            registry.lock().await.consumer_abort(&token, reason).await;
+        })
+    });
+
+    // 6. Flatten the spawned call into a single-`await` future (JoinError +
+    //    ServiceError folded into `crate::Error`).
+    let result: BoxFuture<'static, Result<CallToolResult>> =
+        Box::pin(async move { flatten_call_result(result_handle.await) });
+
+    Ok(ToolStreamCall {
+        progress_token,
+        stream,
+        result,
+        abort_fn,
+    })
+}
+
+/// Fold the doubly-nested `call_tool_with_options` outcome (`JoinError` outside,
+/// `ServiceError` inside) into a flat [`crate::Error`].
+fn flatten_call_result( + settled: std::result::Result, JoinError>, +) -> Result { + match settled { + Ok(Ok(result)) => Ok(result), + Ok(Err(service_error)) => Err(Error::Transport(service_error.to_string())), + Err(join_error) => Err(Error::Other(format!( + "call_tool_stream task failed: {join_error}" + ))), + } +} diff --git a/src/rmcp_transport/worker.rs b/src/rmcp_transport/worker.rs index da83b79..27d7dab 100644 --- a/src/rmcp_transport/worker.rs +++ b/src/rmcp_transport/worker.rs @@ -8,6 +8,7 @@ use crate::core::error::Result; use crate::core::types::{JsonRpcMessage, JsonRpcNotification, JsonRpcRequest}; use crate::transport::client::{NostrClientTransport, NostrClientTransportConfig}; use crate::transport::server::{NostrServerTransport, NostrServerTransportConfig}; +use rmcp::model::GetExtensions; use rmcp::transport::worker::{Worker, WorkerContext, WorkerQuitReason}; use std::collections::HashSet; @@ -204,7 +205,20 @@ impl Worker for NostrServerWorker { } } - if let Some(rmcp_msg) = internal_to_rmcp_server_rx(&message) { + if let Some(mut rmcp_msg) = internal_to_rmcp_server_rx(&message) { + // CEP-41: inject the open-stream writer into the + // request's `extensions` typemap so the tool handler can + // reach it via `ctx.extensions.get::()`. + // The rmcp service loop swaps these extensions straight into + // the handler's `RequestContext` before dispatch. No-op when + // open-stream is disabled or the request has no writer. 
+ if let rmcp::model::JsonRpcMessage::Request(ref mut jr) = rmcp_msg { + if let Some(writer) = + self.transport.get_open_stream_writer(&event_id) + { + jr.request.extensions_mut().insert(writer); + } + } if let Err(reason) = context.send_to_handler(rmcp_msg).await { break reason; } diff --git a/src/transport/client/mod.rs b/src/transport/client/mod.rs index 8e0abf2..0dd4f61 100644 --- a/src/transport/client/mod.rs +++ b/src/transport/client/mod.rs @@ -10,15 +10,16 @@ pub mod server_relay_discovery; pub use correlation_store::ClientCorrelationStore; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::num::NonZeroUsize; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; use lru::LruCache; use nostr_sdk::prelude::*; use tokio::sync::oneshot; +use tokio::sync::Mutex as AsyncMutex; use tokio_util::sync::CancellationToken; use crate::core::constants::*; @@ -30,7 +31,10 @@ use crate::encryption; use crate::relay::{RelayPool, RelayPoolTrait}; use crate::transport::base::BaseTransport; use crate::transport::discovery_tags::{parse_discovered_peer_capabilities, PeerCapabilities}; -use crate::transport::open_stream::OpenStreamConfig; +use crate::transport::open_stream::{ + FrameOutcome, KeepaliveAction, OpenStreamConfig, OpenStreamFrame, OpenStreamReceiver, + OpenStreamRegistry, OpenStreamSession, OpenStreamSessionInit, PublishFrame, +}; use crate::transport::oversized_transfer::{ build_oversized_frames, progress_token_string, resolve_safe_chunk_size, send_oversized_transfer, OversizedFrame, OversizedSenderOptions, OversizedTransferConfig, @@ -71,8 +75,9 @@ pub struct NostrClientTransportConfig { pub oversized_transfer: OversizedTransferConfig, /// CEP-41 open-stream configuration. Disabled by default (opt-in). 
/// - /// **Data only in PR1** — the event loop does not consult it yet; activation - /// (capability advertisement, learning, `call_tool_stream`) lands in PR2. + /// When enabled, drives capability advertisement/learning, the inbound reader + /// engine, the keepalive sweep, and `call_tool_stream`. Opt in with + /// `OpenStreamConfig::enabled()` / `with_enabled(true)`. pub open_stream: OpenStreamConfig, } @@ -144,9 +149,8 @@ impl NostrClientTransportConfig { self.oversized_transfer.enabled = enabled; self } - /// Set the full CEP-41 open-stream configuration. - /// - /// Data only in PR1: the event loop does not read this until PR2. + /// Set the full CEP-41 open-stream configuration (disabled by default; opt in + /// with `OpenStreamConfig::enabled()`). pub fn with_open_stream(mut self, config: OpenStreamConfig) -> Self { self.open_stream = config; self @@ -193,6 +197,26 @@ pub struct NostrClientTransport { /// (`Number(5)` ≠ `String("5")`). LRU-bounded; entries are dropped when /// their transfer concludes and cleared on [`close`](Self::close). original_progress_tokens: Arc>>, + /// CEP-41: inbound reader engine for server→client streams (single peer). + /// Outbound `call_tool_stream` sessions are created here too (each with a + /// publish closure for consumer abort). Disposed on [`close`](Self::close). + open_stream_registry: Arc>, + /// CEP-41: FIFO of `call_tool_stream` placeholders awaiting their SDK-stamped + /// progress token. [`send`](Self::send) binds the next one when a `tools/call` + /// carrying a token is published (mirrors TS `pendingOutboundOpenStreamResolvers`). + #[allow(clippy::type_complexity)] + pending_outbound_open_stream: + Arc>>>>, + /// CEP-41: monotonic `progress` for client→server control frames (`ping`/`pong` + /// from the keepalive sweep / inbound handler). The server does not validate + /// these, so one shared counter suffices. 
+ open_stream_control_progress: Arc, + /// CEP-41: serializes the placeholder push→bind window of `call_tool_stream`. + /// The placeholder FIFO is matched to a request by *order*, but rmcp stamps the + /// token (and the worker publishes) in its own order — so two concurrent + /// `call_tool_stream` calls could otherwise bind each other's tokens. Holding + /// this across one call's push→bind keeps at most one placeholder unbound. + open_stream_bind_lock: Arc>, /// Channel for receiving processed MCP messages from the event loop. message_tx: Option>, message_rx: Option>, @@ -202,6 +226,79 @@ pub struct NostrClientTransport { event_loop_handle: Option>, } +/// CEP-41: a cheap, shareable handle to a client transport's open-stream state, +/// passed to [`call_tool_stream`](crate::call_tool_stream). +/// +/// Obtained via [`NostrClientTransport::open_stream_handle`] before the transport +/// is moved into an rmcp service. It shares the transport's registry + placeholder +/// `Arc`s, so the served transport's `send` binds the placeholders this handle +/// pushes. +/// +/// Only consumed through `call_tool_stream` (the `rmcp` feature); without it the +/// handle is dead but harmless. +#[cfg_attr(not(feature = "rmcp"), allow(dead_code))] +#[derive(Clone)] +pub struct ClientOpenStreamHandle { + registry: Arc>, + #[allow(clippy::type_complexity)] + pending: Arc>>>>, + bind_lock: Arc>, + config: OpenStreamConfig, +} + +#[cfg_attr(not(feature = "rmcp"), allow(dead_code))] +impl ClientOpenStreamHandle { + /// Register a placeholder for the next outbound `call_tool_stream` session + /// (resolved by the served transport's `send`). + pub(crate) fn prepare_outbound( + &self, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + let mut pending = match self.pending.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + pending.push_back(tx); + rx + } + + /// Drop the most-recently registered (still-unbound) placeholder. 
+ /// + /// Called by `call_tool_stream` when its request fails before the transport's + /// `send` ever binds it — otherwise the orphaned placeholder would be popped + /// (and mis-bound) by the next outbound `tools/call`. The bind lock guarantees + /// only this call's placeholder can be unbound, so the back of the FIFO is it. + pub(crate) fn cancel_outbound(&self) { + let mut pending = match self.pending.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + pending.pop_back(); + } + + /// Clone the reader-registry handle (for the consumer `abort` path). + pub(crate) fn registry(&self) -> Arc> { + self.registry.clone() + } + + /// The placeholder push→bind serialization lock (see the transport field). + pub(crate) fn bind_lock(&self) -> &Arc> { + &self.bind_lock + } + + /// The open-stream config (for the request-timeout options). + pub(crate) fn config(&self) -> &OpenStreamConfig { + &self.config + } +} + +impl std::fmt::Debug for ClientOpenStreamHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ClientOpenStreamHandle") + .finish_non_exhaustive() + } +} + impl NostrClientTransport { /// Create a new client transport. 
pub async fn new(signer: T, config: NostrClientTransportConfig) -> Result @@ -258,6 +355,9 @@ impl NostrClientTransport { let original_progress_tokens = Arc::new(Mutex::new(LruCache::new( NonZeroUsize::new(DEFAULT_LRU_SIZE).expect("DEFAULT_LRU_SIZE must be non-zero"), ))); + let open_stream_registry = Arc::new(AsyncMutex::new(OpenStreamRegistry::with_policy( + (&config.open_stream).into(), + ))); Ok(Self { base: BaseTransport { @@ -268,6 +368,10 @@ impl NostrClientTransport { oversized_receiver, accept_waiters, original_progress_tokens, + open_stream_registry, + pending_outbound_open_stream: Arc::new(Mutex::new(VecDeque::new())), + open_stream_control_progress: Arc::new(AtomicU64::new(0)), + open_stream_bind_lock: Arc::new(AsyncMutex::new(())), config, server_pubkey, hinted_relay_urls, @@ -332,6 +436,9 @@ impl NostrClientTransport { let original_progress_tokens = Arc::new(Mutex::new(LruCache::new( NonZeroUsize::new(DEFAULT_LRU_SIZE).expect("DEFAULT_LRU_SIZE must be non-zero"), ))); + let open_stream_registry = Arc::new(AsyncMutex::new(OpenStreamRegistry::with_policy( + (&config.open_stream).into(), + ))); Ok(Self { base: BaseTransport { @@ -342,6 +449,10 @@ impl NostrClientTransport { oversized_receiver, accept_waiters, original_progress_tokens, + open_stream_registry, + pending_outbound_open_stream: Arc::new(Mutex::new(VecDeque::new())), + open_stream_control_progress: Arc::new(AtomicU64::new(0)), + open_stream_bind_lock: Arc::new(AsyncMutex::new(())), config, server_pubkey, hinted_relay_urls, @@ -435,6 +546,9 @@ impl NostrClientTransport { let accept_waiters = self.accept_waiters.clone(); let original_progress_tokens = self.original_progress_tokens.clone(); let oversized_enabled = self.config.oversized_transfer.enabled; + let open_stream_registry = self.open_stream_registry.clone(); + let open_stream_control_progress = self.open_stream_control_progress.clone(); + let open_stream_enabled = self.config.open_stream.enabled; let timeout = self.config.timeout; let 
token = self.cancellation_token.child_token(); @@ -454,6 +568,9 @@ impl NostrClientTransport { accept_waiters, original_progress_tokens, oversized_enabled, + open_stream_registry, + open_stream_control_progress, + open_stream_enabled, timeout, token, ) @@ -499,6 +616,16 @@ impl NostrClientTransport { }; originals.clear(); } + // CEP-41: dispose reader sessions and drop any unbound `call_tool_stream` + // placeholders so their awaiters unblock (cancelled) instead of hanging. + self.open_stream_registry.lock().await.clear(); + { + let mut pending = match self.pending_outbound_open_stream.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + pending.clear(); + } self.base.disconnect().await } @@ -520,6 +647,30 @@ impl NostrClientTransport { } let is_request = message.is_request(); + + // CEP-41: bind a pending `call_tool_stream` placeholder to this request's + // SDK-stamped progress token, synchronously before publish (mirrors TS), and + // record the token's original JSON type. Inbound stream progress is then + // forwarded with that type restored so rmcp's reset-on-progress watcher — + // keyed by the numeric token rmcp stamped — actually fires. 
+ if is_request && self.config.open_stream.enabled { + if let JsonRpcMessage::Request(req) = message { + if req.method == "tools/call" { + if let Some(original) = req + .params + .as_ref() + .and_then(|p| p.get("_meta")) + .and_then(|m| m.get("progressToken")) + { + if let Some(token) = progress_token_string(original) { + self.bind_pending_outbound_open_stream(&token, original) + .await; + } + } + } + } + } + let base_tags = BaseTransport::create_recipient_tags(&self.server_pubkey); let discovery_tags = if is_request { self.get_pending_client_discovery_tags() @@ -986,6 +1137,9 @@ impl NostrClientTransport { accept_waiters: Arc>>>, original_progress_tokens: Arc>>, oversized_enabled: bool, + open_stream_registry: Arc>, + open_stream_control_progress: Arc, + open_stream_enabled: bool, timeout: Duration, cancel: CancellationToken, ) { @@ -1031,6 +1185,9 @@ impl NostrClientTransport { &oversized_receiver, &accept_waiters, &original_progress_tokens, + &open_stream_registry, + &open_stream_control_progress, + open_stream_enabled, &relay_pool, ) .await; @@ -1067,6 +1224,24 @@ impl NostrClientTransport { ); } } + // CEP-41: drive the open-stream keepalive (idle → ping; probe / + // close-grace deadline → abort) for every active reader session. + if open_stream_enabled { + let gift_wrap_kind = outbound_gift_wrap_kind( + gift_wrap_mode, + server_supports_ephemeral.load(Ordering::Relaxed), + ); + Self::sweep_client_open_stream_sessions( + &open_stream_registry, + &open_stream_control_progress, + &relay_pool, + server_pubkey, + encryption_mode, + gift_wrap_kind, + Instant::now(), + ) + .await; + } } } } @@ -1096,6 +1271,13 @@ impl NostrClientTransport { Vec::::new(), )); } + // CEP-41: advertise open-stream support when enabled. 
+ if self.config.open_stream.enabled { + tags.push(Tag::custom( + TagKind::Custom(tags::SUPPORT_OPEN_STREAM.into()), + Vec::::new(), + )); + } tags } @@ -1129,6 +1311,8 @@ impl NostrClientTransport { caps.supports_ephemeral_encryption |= discovered.capabilities.supports_ephemeral_encryption; caps.supports_oversized_transfer |= discovered.capabilities.supports_oversized_transfer; + // CEP-41: OR-learn the server's open-stream support (never downgrades). + caps.supports_open_stream |= discovered.capabilities.supports_open_stream; } let mut stored = match init_event.lock() { @@ -1184,6 +1368,273 @@ impl NostrClientTransport { *guard } + // ── CEP-41 open-stream ──────────────────────────────────────── + + /// CEP-41: register a placeholder for the next outbound `call_tool_stream` + /// session. [`send`](Self::send) binds it to the paired `tools/call`'s + /// SDK-stamped progress token and resolves the returned receiver with + /// `(progress_token, OpenStreamSession)` (or an error on admission failure / + /// transport close). + pub fn prepare_outbound_open_stream_session( + &self, + ) -> oneshot::Receiver> { + let (tx, rx) = oneshot::channel(); + let mut pending = match self.pending_outbound_open_stream.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + pending.push_back(tx); + rx + } + + /// CEP-41: bind the oldest pending placeholder to `token`, creating its reader + /// session. A no-op when no placeholder is waiting (an ordinary `tools/call`). + /// + /// The original token JSON value is recorded **only when a placeholder is + /// actually bound** (i.e. for a `call_tool_stream`), so the inbound handler can + /// restore the numeric type for rmcp's reset-on-progress watcher — an ordinary + /// `tools/call` (no outbound stream) records nothing. 
+    async fn bind_pending_outbound_open_stream(&self, token: &str, original: &serde_json::Value) {
+        // Pop inside a block so the `std::sync::Mutex` guard is released before any
+        // `.await` below.
+        let waiter = {
+            let mut pending = match self.pending_outbound_open_stream.lock() {
+                Ok(g) => g,
+                Err(p) => p.into_inner(),
+            };
+            pending.pop_front()
+        };
+        if let Some(waiter) = waiter {
+            let result = self
+                .create_outbound_open_stream_session(token)
+                .await
+                .map(|session| {
+                    // Record the original token only once the session was actually
+                    // created: if creation fails no stream will be consumed for this
+                    // token, and recording first would leave a stale entry behind
+                    // until LRU eviction.
+                    self.record_original_progress_token(token, original);
+                    (token.to_string(), session)
+                });
+            // The receiver may already be gone (caller dropped); nothing to report to.
+            let _ = waiter.send(result);
+        }
+    }
+
+    /// CEP-41: get-or-create the reader session for `token`, injecting the publish
+    /// closure used by the consumer `abort` path.
+    async fn create_outbound_open_stream_session(&self, token: &str) -> Result<OpenStreamSession> {
+        let mut registry = self.open_stream_registry.lock().await;
+        if let Some(existing) = registry.get_session(token) {
+            return Ok(existing);
+        }
+        let init = OpenStreamSessionInit {
+            publish_frame: Some(self.open_stream_publish_closure()),
+            ..Default::default()
+        };
+        Ok(registry.create_session_with(token, init)?)
+    }
+
+    /// CEP-41: the reader session for `token`, if one exists.
+    pub async fn get_open_stream_session(&self, token: &str) -> Option<OpenStreamSession> {
+        self.open_stream_registry.lock().await.get_session(token)
+    }
+
+    /// CEP-41: a cheap, shareable handle to this transport's open-stream state for
+    /// [`call_tool_stream`](crate::call_tool_stream).
+    ///
+    /// Obtain it **before** moving the transport into an rmcp service (which
+    /// consumes it): the handle clones the registry + placeholder `Arc`s, so the
+    /// served transport's `send` still binds the placeholders the handle pushes.
+    pub fn open_stream_handle(&self) -> ClientOpenStreamHandle {
+        // Share the transport's own `Arc`s (not copies of the state) so the served
+        // transport and the handle operate on the same registry / FIFO / lock.
+        let registry = Arc::clone(&self.open_stream_registry);
+        let pending = Arc::clone(&self.pending_outbound_open_stream);
+        let bind_lock = Arc::clone(&self.open_stream_bind_lock);
+        let config = self.config.open_stream.clone();
+        ClientOpenStreamHandle {
+            registry,
+            pending,
+            bind_lock,
+            config,
+        }
+    }
+
+    /// CEP-41: consumer-side cancel of an outbound stream.
+    ///
+    /// If a reader session exists for `token`, its `abort` is awaited first (this
+    /// publishes the `abort` frame to the server and finalizes the local stream);
+    /// the registry's `consumer_abort` is then always invoked for `token` to free
+    /// the slot. Exposed to consumers via `ToolStreamCall::abort`.
+    pub async fn abort_open_stream(&self, token: &str, reason: Option<String>) {
+        // Look the session up in its own statement so the registry guard is
+        // released before `abort` is awaited.
+        let existing = self.open_stream_registry.lock().await.get_session(token);
+        if let Some(session) = existing {
+            // Publishes the `abort` frame to the server + finalizes locally.
+            session.abort(reason.clone()).await;
+        }
+        // Free the concurrency slot + run any hook (idempotent re-finalize).
+        self.open_stream_registry
+            .lock()
+            .await
+            .consumer_abort(token, reason)
+            .await;
+    }
+
+    /// CEP-41: build the outbound publish closure for an open-stream session —
+    /// publishes a `notifications/progress` frame to the server (recipient-tagged;
+    /// the server correlates by `progressToken`, so no `e` tag is needed).
+ fn open_stream_publish_closure(&self) -> PublishFrame { + let relay_pool = Arc::clone(&self.base.relay_pool); + let encryption_mode = self.config.encryption_mode; + let server_pubkey = self.server_pubkey; + let gift_wrap_kind = self.choose_outbound_gift_wrap_kind(); + Arc::new(move |notification: JsonRpcNotification| { + let relay_pool = Arc::clone(&relay_pool); + Box::pin(async move { + let base = BaseTransport { + relay_pool, + encryption_mode, + is_connected: true, + }; + let tags = BaseTransport::create_recipient_tags(&server_pubkey); + base.send_mcp_message( + &JsonRpcMessage::Notification(notification), + &server_pubkey, + CTXVM_MESSAGES_KIND, + tags, + None, + Some(gift_wrap_kind), + ) + .await + }) + }) + } + + /// CEP-41 inbound interception (beside the oversized branch). Feeds the frame + /// to the reader engine, publishes a `pong` on `SendPong`, forwards a stripped + /// progress with the original token restored (resets rmcp's idle timer), + /// and keeps the request correlation alive via `pending.touch`. + #[allow(clippy::too_many_arguments)] + async fn handle_inbound_open_stream_frame( + open_stream_registry: &Arc>, + open_stream_control_progress: &Arc, + original_progress_tokens: &Mutex>, + pending: &ClientCorrelationStore, + relay_pool: &Arc, + server_pubkey: PublicKey, + encryption_mode: EncryptionMode, + gift_wrap_kind: u16, + tx: &tokio::sync::mpsc::UnboundedSender, + notif: &JsonRpcNotification, + e_tag: Option<&str>, + ) { + let token = notif + .params + .as_ref() + .and_then(|p| p.get("progressToken")) + .and_then(progress_token_string); + + // Keep the request correlation alive — chunks don't otherwise refresh it. + if let Some(correlated) = e_tag { + pending.touch(correlated).await; + } + + // Feed the reader engine (delivers the chunk to the consumer's stream). + let outcome = { + open_stream_registry + .lock() + .await + .process_frame(Instant::now(), notif) + .await + }; + + // SendPong → answer the peer's keepalive ping. 
+ if let Ok(FrameOutcome::SendPong(nonce)) = &outcome { + if let Some(token) = token.as_deref() { + let progress = open_stream_control_progress.fetch_add(1, Ordering::SeqCst) + 1; + if let Ok(frame) = (OpenStreamFrame::Pong { + nonce: nonce.clone(), + }) + .into_progress_notification(token, progress, None) + { + let base = BaseTransport { + relay_pool: Arc::clone(relay_pool), + encryption_mode, + is_connected: true, + }; + let tags = BaseTransport::create_recipient_tags(&server_pubkey); + let _ = base + .send_mcp_message( + &JsonRpcMessage::Notification(frame), + &server_pubkey, + CTXVM_MESSAGES_KIND, + tags, + None, + Some(gift_wrap_kind), + ) + .await; + } + } + } + + // Reset rmcp's idle timer: forward a stripped progress carrying the ORIGINAL + // (numeric) token — the chunk itself already reached the consumer's stream. + // Done before the terminal cleanup so the token is still recorded. + if let Some(token) = token.as_deref() { + Self::forward_stripped_progress(notif, token, original_progress_tokens, tx); + } + + // Drop the recorded original token once the stream is terminal. + if matches!( + &outcome, + Ok(FrameOutcome::Closed) | Ok(FrameOutcome::Aborted(_)) | Err(_) + ) { + Self::remove_original_progress_token(original_progress_tokens, token.as_deref()); + } + } + + /// CEP-41 client keepalive sweep: drive each reader session's pure `tick(now)`; + /// publish a `ping` on `SendPing` (an `Abort` already finalized + removed the + /// session — the consumer sees the terminal error on the stream). `now` is a + /// parameter so tests can drive idle→ping→probe deterministically. 
+ async fn sweep_client_open_stream_sessions( + open_stream_registry: &Arc>, + open_stream_control_progress: &Arc, + relay_pool: &Arc, + server_pubkey: PublicKey, + encryption_mode: EncryptionMode, + gift_wrap_kind: u16, + now: Instant, + ) { + let actions = { open_stream_registry.lock().await.tick_all(now) }; + for (token, action) in actions { + if let KeepaliveAction::SendPing(nonce) = action { + let progress = open_stream_control_progress.fetch_add(1, Ordering::SeqCst) + 1; + if let Ok(frame) = (OpenStreamFrame::Ping { nonce }) + .into_progress_notification(&token, progress, None) + { + let base = BaseTransport { + relay_pool: Arc::clone(relay_pool), + encryption_mode, + is_connected: true, + }; + let tags = BaseTransport::create_recipient_tags(&server_pubkey); + let _ = base + .send_mcp_message( + &JsonRpcMessage::Notification(frame), + &server_pubkey, + CTXVM_MESSAGES_KIND, + tags, + None, + Some(gift_wrap_kind), + ) + .await; + } + } + } + } + + /// CEP-41: run one keepalive sweep at `now`. The event loop drives this on its + /// own timer with `Instant::now()`; it is also exposed so callers (and + /// deterministic tests) can drive idle→ping→probe→abort with an explicit + /// instant — the session clock is `std::time::Instant`, unaffected by + /// `tokio`'s `start_paused`. 
+ pub async fn run_open_stream_keepalive_sweep(&self, now: Instant) { + Self::sweep_client_open_stream_sessions( + &self.open_stream_registry, + &self.open_stream_control_progress, + &self.base.relay_pool, + self.server_pubkey, + self.config.encryption_mode, + self.choose_outbound_gift_wrap_kind(), + now, + ) + .await; + } + #[allow(clippy::too_many_arguments)] async fn handle_notification( notification: &RelayPoolNotification, @@ -1199,6 +1650,9 @@ impl NostrClientTransport { oversized_receiver: &Arc>, accept_waiters: &Arc>>>, original_progress_tokens: &Arc>>, + open_stream_registry: &Arc>, + open_stream_control_progress: &Arc, + open_stream_enabled: bool, relay_pool: &Arc, ) { let event = match notification { @@ -1342,6 +1796,39 @@ impl NostrClientTransport { server_supports_ephemeral.store(true, Ordering::Relaxed); } + // CEP-41: intercept open-stream frames before the correlation gate, beside + // the oversized branch. Type-disjoint (`is_open_stream_frame` vs + // `is_oversized_frame` claim distinct `cvm.type`s). The engine delivers + // chunks to the consumer's `OpenStreamSession`; here we also forward a + // stripped progress to reset rmcp's idle timer and keep correlation alive. + if open_stream_enabled { + if let Ok(notif) = serde_json::from_str::(&actual_event_content) { + if notif.method == NOTIFICATIONS_PROGRESS_METHOD + && OpenStreamReceiver::is_open_stream_frame(¬if) + { + let gift_wrap_kind = outbound_gift_wrap_kind( + gift_wrap_mode, + server_supports_ephemeral.load(Ordering::Relaxed), + ); + Self::handle_inbound_open_stream_frame( + open_stream_registry, + open_stream_control_progress, + original_progress_tokens, + pending, + relay_pool, + server_pubkey, + encryption_mode, + gift_wrap_kind, + tx, + ¬if, + e_tag.as_deref(), + ) + .await; + return; + } + } + } + // CEP-22: intercept oversized-transfer frames ABOVE the correlation gate // below. 
This is mandatory: an `accept` is e-tagged to the start frame // (not in `pending`), and chunk/end response frames must be reassembled @@ -1520,17 +2007,10 @@ impl NostrClientTransport { } fn choose_outbound_gift_wrap_kind(&self) -> u16 { - match self.config.gift_wrap_mode { - GiftWrapMode::Persistent => GIFT_WRAP_KIND, - GiftWrapMode::Ephemeral => EPHEMERAL_GIFT_WRAP_KIND, - GiftWrapMode::Optional => { - if self.server_supports_ephemeral.load(Ordering::Relaxed) { - EPHEMERAL_GIFT_WRAP_KIND - } else { - GIFT_WRAP_KIND - } - } - } + outbound_gift_wrap_kind( + self.config.gift_wrap_mode, + self.server_supports_ephemeral.load(Ordering::Relaxed), + ) } fn has_support_ephemeral_tag(tags: &Tags) -> bool { @@ -1564,6 +2044,23 @@ fn is_gift_wrap_kind(kind: &Kind) -> bool { *kind == Kind::Custom(GIFT_WRAP_KIND) || *kind == Kind::Custom(EPHEMERAL_GIFT_WRAP_KIND) } +/// CEP-19: the outbound gift-wrap kind for the client (free fn so the static +/// event-loop / sweep contexts can resolve it without `&self`). +#[inline] +fn outbound_gift_wrap_kind(mode: GiftWrapMode, server_supports_ephemeral: bool) -> u16 { + match mode { + GiftWrapMode::Persistent => GIFT_WRAP_KIND, + GiftWrapMode::Ephemeral => EPHEMERAL_GIFT_WRAP_KIND, + GiftWrapMode::Optional => { + if server_supports_ephemeral { + EPHEMERAL_GIFT_WRAP_KIND + } else { + GIFT_WRAP_KIND + } + } + } +} + /// Returns `true` when the inbound event kind violates the configured encryption /// policy and must be dropped before any further processing. 
#[inline] @@ -1837,6 +2334,10 @@ mod tests { original_progress_tokens: Arc::new(Mutex::new(LruCache::new( NonZeroUsize::new(10).unwrap(), ))), + open_stream_registry: Arc::new(AsyncMutex::new(OpenStreamRegistry::new())), + pending_outbound_open_stream: Arc::new(Mutex::new(VecDeque::new())), + open_stream_control_progress: Arc::new(AtomicU64::new(0)), + open_stream_bind_lock: Arc::new(AsyncMutex::new(())), message_tx: Some(tokio::sync::mpsc::unbounded_channel().0), message_rx: None, cancellation_token: CancellationToken::new(), @@ -1859,7 +2360,7 @@ mod tests { let t = make_transport_for_tags(EncryptionMode::Optional, GiftWrapMode::Optional); let tags = t.get_client_capability_tags(); let names = tag_names(&tags); - // The oversized tag (default-on) is pushed last. + // The oversized tag (default-on) is pushed last; open-stream is opt-in. assert_eq!( names, vec![ @@ -1874,7 +2375,7 @@ mod tests { fn client_capability_tags_encryption_disabled() { let t = make_transport_for_tags(EncryptionMode::Disabled, GiftWrapMode::Optional); let tags = t.get_client_capability_tags(); - // No encryption tags; the default-on oversized tag remains. + // No encryption tags; the default-on oversized tag remains (open-stream is opt-in). assert_eq!(tag_names(&tags), vec!["support_oversized_transfer"]); } @@ -1932,6 +2433,32 @@ mod tests { assert_eq!(names, vec!["support_oversized_transfer"]); } + #[test] + fn client_capability_tags_open_stream_gate() { + // Open-stream is opt-in: absent by default. + let mut t = make_transport_for_tags(EncryptionMode::Disabled, GiftWrapMode::Optional); + assert!( + !tag_names(&t.get_client_capability_tags()) + .contains(&"support_open_stream".to_string()), + "open-stream tag must be absent by default (opt-in)" + ); + // Enabling it advertises the single-element tag. 
+ t.config.open_stream = OpenStreamConfig::enabled(); + assert!( + tag_names(&t.get_client_capability_tags()).contains(&"support_open_stream".to_string()), + "open-stream tag must be advertised when enabled" + ); + } + + #[test] + fn client_learn_server_discovery_learns_open_stream() { + let caps = Mutex::new(PeerCapabilities::default()); + let init = Mutex::new(None); + let event = make_event_with_tags(&[&["support_open_stream"]]); + NostrClientTransport::learn_server_discovery(&caps, &init, &event); + assert!(caps.lock().unwrap().supports_open_stream); + } + #[test] fn client_config_oversized_builders() { let cfg = NostrClientTransportConfig::default().with_oversized_enabled(true); diff --git a/src/transport/open_stream/constants.rs b/src/transport/open_stream/constants.rs index 076cca5..e7d4047 100644 --- a/src/transport/open_stream/constants.rs +++ b/src/transport/open_stream/constants.rs @@ -24,3 +24,9 @@ pub const DEFAULT_OPEN_STREAM_PROBE_TIMEOUT_MS: u64 = 20_000; /// Default grace period after a `close` with unresolved gaps before aborting (milliseconds). pub const DEFAULT_OPEN_STREAM_CLOSE_GRACE_PERIOD_MS: u64 = 5_000; + +/// Maximum byte length of an inbound `ping` nonce a reader will echo in its +/// `pong`. CEP-41 makes a local nonce-size cap a SHOULD; bounding it here keeps +/// a peer from inducing an arbitrarily large reflected `pong`. Nonces this +/// engine *mints* are `{token}:{n}` and stay well under the cap. 
+pub const MAX_OPEN_STREAM_PING_NONCE_BYTES: usize = 64; diff --git a/src/transport/open_stream/mod.rs b/src/transport/open_stream/mod.rs index 64d7197..7ef2ed6 100644 --- a/src/transport/open_stream/mod.rs +++ b/src/transport/open_stream/mod.rs @@ -31,9 +31,14 @@ pub use constants::*; pub use errors::OpenStreamError; pub use frame::{open_stream_frame_from_notification, OpenStreamFrame}; pub use receiver::OpenStreamReceiver; -pub use registry::{OpenStreamRegistry, OpenStreamRegistryPolicy}; -pub use session::{FrameOutcome, KeepaliveAction, OpenStreamSession, PublishFrame}; -pub use writer::OpenStreamWriter; +pub use registry::{ + OpenStreamRegistry, OpenStreamRegistryPolicy, OpenStreamSessionInit, RegistryAbortHook, + RegistryCloseHook, +}; +pub use session::{ + FrameOutcome, KeepaliveAction, OpenStreamSession, OpenStreamSessionOptions, PublishFrame, +}; +pub use writer::{OnAbortHook, OnCloseHook, OpenStreamWriter, OpenStreamWriterOptions}; /// CEP-41 open-stream configuration shared by both transports. /// @@ -43,9 +48,11 @@ pub use writer::OpenStreamWriter; /// and [`NostrClientTransportConfig`](crate::transport::NostrClientTransportConfig) /// via their `with_open_stream` builders. /// -/// **Disabled by default** (opt-in): the capability is neither advertised nor -/// activated until a future default flip, so the field is inert data the event -/// loops do not consult yet. +/// **Disabled by default** (opt-in): open-stream is neither advertised nor +/// activated until enabled (matching the TS SDK's default). Opt in with +/// [`OpenStreamConfig::enabled`] or `with_enabled(true)`. Once on, it is safe for +/// non-CEP-41 peers — the server activates only for advertising clients, and a +/// writer is injected only when a request carries a `progressToken`. 
#[derive(Debug, Clone)] #[non_exhaustive] pub struct OpenStreamConfig { @@ -75,6 +82,8 @@ pub struct OpenStreamConfig { impl Default for OpenStreamConfig { fn default() -> Self { Self { + // Open-stream is opt-in (disabled by default), matching the TS SDK. + // Enable it with `OpenStreamConfig::enabled()` / `with_enabled(true)`. enabled: false, max_concurrent_streams: constants::DEFAULT_MAX_CONCURRENT_OPEN_STREAMS, max_buffered_chunks_per_stream: constants::DEFAULT_MAX_BUFFERED_CHUNKS_PER_STREAM, @@ -167,7 +176,11 @@ mod config_tests { #[test] fn default_is_disabled_with_ts_parity_knobs() { let config = OpenStreamConfig::default(); + // Open-stream is opt-in (disabled by default), matching the TS SDK. assert!(!config.enabled); + // Opting in is one call. + assert!(OpenStreamConfig::default().with_enabled(true).enabled); + assert!(OpenStreamConfig::enabled().enabled); assert_eq!(config.max_concurrent_streams, 64); assert_eq!(config.max_buffered_chunks_per_stream, 64); assert_eq!(config.max_buffered_bytes_per_stream, 512 * 1024); diff --git a/src/transport/open_stream/receiver.rs b/src/transport/open_stream/receiver.rs index 709e4f1..454d846 100644 --- a/src/transport/open_stream/receiver.rs +++ b/src/transport/open_stream/receiver.rs @@ -143,4 +143,39 @@ mod tests { receiver.process_frame(&close).await.unwrap(); assert_eq!(receiver.active_stream_count(), 0); } + + #[tokio::test] + async fn non_open_stream_progress_notification_is_not_intercepted() { + let mut receiver = OpenStreamReceiver::new(); + + // A CEP-22 oversized-transfer frame and a plain progress notification are + // both NOT open-stream frames — the two receivers are type-disjoint. 
+ let oversized = JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: "notifications/progress".to_string(), + params: Some(json!({ + "progressToken": "tok", + "progress": 1, + "cvm": { "type": "oversized-transfer", "frameType": "end" } + })), + }; + let plain = JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: "notifications/progress".to_string(), + params: Some(json!({ "progressToken": "tok", "progress": 1 })), + }; + + for notification in [&oversized, &plain] { + assert!(!OpenStreamReceiver::is_open_stream_frame(notification)); + // The dispatcher only feeds the registry when the predicate is true. + // Mirror that gate: a non-open-stream frame must never be processed. + if OpenStreamReceiver::is_open_stream_frame(notification) { + receiver.process_frame(notification).await.unwrap(); + } + } + + // Neither frame reached the registry — no session was created/tracked. + assert_eq!(receiver.active_stream_count(), 0); + assert!(receiver.get_session("tok").is_none()); + } } diff --git a/src/transport/open_stream/registry.rs b/src/transport/open_stream/registry.rs index 609591b..4515bb6 100644 --- a/src/transport/open_stream/registry.rs +++ b/src/transport/open_stream/registry.rs @@ -23,7 +23,9 @@ use super::constants::{ }; use super::errors::OpenStreamError; use super::frame::OpenStreamFrame; -use super::session::{FrameOutcome, OpenStreamSession, OpenStreamSessionOptions, PublishFrame}; +use super::session::{ + FrameOutcome, KeepaliveAction, OpenStreamSession, OpenStreamSessionOptions, PublishFrame, +}; const LOG_TARGET: &str = "contextvm_sdk::transport::open_stream"; @@ -240,12 +242,42 @@ impl OpenStreamRegistry { Ok(other) => Ok(other), Err(error) => { session.fail(error.clone()); - self.run_abort(&progress_token, None).await; + // Forward the failure reason to the `on_abort` hook (TS passes + // `onAbort(error.message)`); the inbound-abort path already forwards + // the peer's reason — this covers the local processing-failure + // branch so 
deferral/metrics hooks see why. + self.run_abort(&progress_token, Some(error.to_string())) + .await; Err(error) } } } + /// Drive the pure keepalive [`tick`](OpenStreamSession::tick) for every active + /// session, returning the `(progress_token, action)` pairs that need an + /// outbound send (`SendPing`) or signal a local abort (`Abort`). Sessions that + /// aborted on this tick are removed (their slot is freed); the transport sweep + /// performs the actual publish for each returned action. + pub fn tick_all(&mut self, now: Instant) -> Vec<(String, KeepaliveAction)> { + let mut actions = Vec::new(); + let mut aborted = Vec::new(); + for (token, entry) in self.sessions.iter() { + match entry.session.tick(now) { + KeepaliveAction::None => {} + action => { + if matches!(action, KeepaliveAction::Abort(_)) { + aborted.push(token.clone()); + } + actions.push((token.clone(), action)); + } + } + } + for token in aborted { + self.sessions.remove(&token); + } + actions + } + /// Dispose every session gracefully and drop them (runs no hooks). pub fn clear(&mut self) { for (_, entry) in self.sessions.drain() { @@ -253,6 +285,35 @@ impl OpenStreamRegistry { } } + /// Consumer-cancel cleanup: finalize the session locally, run its `on_abort` + /// hook, and **remove the entry** so the concurrency slot is freed. + /// + /// The session's `process_frame`/`tick` paths only remove an entry on an + /// *inbound* terminal frame; a consumer that cancels its own read + /// ([`OpenStreamSession::abort`]) finalizes + publishes an `abort` frame but + /// leaves the registry entry counting against `max_concurrent_streams`. The + /// transport calls this to close that gap when wiring cancel. The outbound + /// `abort` *frame* is published by the caller via + /// [`OpenStreamSession::abort`]; here `fail` only guarantees the local stream + /// is terminal (idempotent if already finalized). 
+ pub async fn consumer_abort(&mut self, progress_token: &str, reason: Option) { + if let Some(entry) = self.sessions.remove(progress_token) { + entry + .session + .fail(OpenStreamError::abort(progress_token, reason.clone())); + if let Some(hook) = entry.on_abort { + if let Err(error) = hook(reason).await { + tracing::debug!( + target: LOG_TARGET, + token = %progress_token, + %error, + "open-stream on_abort hook errored during consumer abort" + ); + } + } + } + } + /// Remove a closed session and run its `on_close` hook (errors swallowed). async fn run_close(&mut self, progress_token: &str) { if let Some(entry) = self.sessions.remove(progress_token) { @@ -656,6 +717,54 @@ mod tests { assert_eq!(registry.size(), 0); } + #[tokio::test] + async fn consumer_abort_frees_slot_and_runs_hook() { + // A consumer cancel must remove the registry entry (freeing the + // concurrency slot) and run the `on_abort` hook, even though no inbound + // terminal frame ever arrives. + let mut registry = OpenStreamRegistry::with_policy(small_policy(1)); + let fired = std::sync::Arc::new(std::sync::Mutex::new(Vec::::new())); + let f = fired.clone(); + let on_abort: RegistryAbortHook = Box::new(move |reason| { + let f = f.clone(); + Box::pin(async move { + f.lock().unwrap().push(reason.unwrap_or_default()); + Ok(()) + }) + }); + let mut session = registry + .create_session_with( + "token-consumer-abort", + OpenStreamSessionInit { + on_abort: Some(on_abort), + ..Default::default() + }, + ) + .unwrap(); + // The slot is occupied: a second admission is rejected by the cap. 
+ assert!(matches!( + registry.create_session("token-other").unwrap_err(), + OpenStreamError::Policy(_) + )); + + registry + .consumer_abort("token-consumer-abort", Some("user cancelled".to_string())) + .await; + + assert_eq!(registry.size(), 0); + assert!(registry.get_session("token-consumer-abort").is_none()); + assert_eq!(*fired.lock().unwrap(), vec!["user cancelled".to_string()]); + // The local stream surfaces the abort error on its next poll. + match session.next().await { + Some(Err(OpenStreamError::Abort { reason, .. })) => { + assert_eq!(reason.as_deref(), Some("user cancelled")); + } + other => panic!("expected abort error on the stream, got {other:?}"), + } + // Slot reclaimed: a fresh admission now succeeds. + registry.create_session("token-other").unwrap(); + } + #[tokio::test] async fn removes_session_even_when_on_abort_hook_errors() { let mut registry = OpenStreamRegistry::with_policy(small_policy(1)); @@ -701,4 +810,101 @@ mod tests { other => panic!("expected abort error on the stream, got {other:?}"), } } + + #[tokio::test] + async fn fires_only_on_close_on_graceful_close_and_only_on_abort_on_abort() { + use std::sync::{Arc as StdArc, Mutex as StdMutex}; + + // Build a (on_close, on_abort) pair that each record which hook fired, + // tagged by the session, into one shared log. + fn recording_hooks( + events: StdArc>>, + token: &'static str, + ) -> (RegistryCloseHook, RegistryAbortHook) { + let close_events = events.clone(); + let on_close: RegistryCloseHook = Box::new(move || { + Box::pin(async move { + close_events.lock().unwrap().push(format!("close:{token}")); + Ok(()) + }) + }); + let on_abort: RegistryAbortHook = Box::new(move |_reason| { + Box::pin(async move { + events.lock().unwrap().push(format!("abort:{token}")); + Ok(()) + }) + }); + (on_close, on_abort) + } + + let events = StdArc::new(StdMutex::new(Vec::::new())); + let mut registry = OpenStreamRegistry::with_policy(small_policy(4)); + + // (1) A graceful close fires only `on_close`. 
+ let (on_close, on_abort) = recording_hooks(events.clone(), "graceful"); + registry + .create_session_with( + "tok-graceful", + OpenStreamSessionInit { + on_close: Some(on_close), + on_abort: Some(on_abort), + ..Default::default() + }, + ) + .unwrap(); + registry + .process_frame(now(), ¬if("tok-graceful", 1, start())) + .await + .unwrap(); + registry + .process_frame( + now(), + ¬if( + "tok-graceful", + 2, + OpenStreamFrame::Close { + last_chunk_index: None, + }, + ), + ) + .await + .unwrap(); + + // (2) An abort fires only `on_abort`. + let (on_close, on_abort) = recording_hooks(events.clone(), "aborted"); + registry + .create_session_with( + "tok-aborted", + OpenStreamSessionInit { + on_close: Some(on_close), + on_abort: Some(on_abort), + ..Default::default() + }, + ) + .unwrap(); + registry + .process_frame(now(), ¬if("tok-aborted", 1, start())) + .await + .unwrap(); + registry + .process_frame( + now(), + ¬if( + "tok-aborted", + 2, + OpenStreamFrame::Abort { + reason: Some("bye".to_string()), + }, + ), + ) + .await + .unwrap(); + + // Exactly one hook fired per session, the right one each time. 
+ let fired = events.lock().unwrap().clone(); + assert_eq!( + fired, + vec!["close:graceful".to_string(), "abort:aborted".to_string()] + ); + } } diff --git a/src/transport/open_stream/session.rs b/src/transport/open_stream/session.rs index 581240c..50d3762 100644 --- a/src/transport/open_stream/session.rs +++ b/src/transport/open_stream/session.rs @@ -27,6 +27,7 @@ use nostr_sdk::prelude::EventId; use crate::core::types::JsonRpcNotification; +use super::constants::MAX_OPEN_STREAM_PING_NONCE_BYTES; use super::errors::OpenStreamError; use super::frame::OpenStreamFrame; @@ -430,6 +431,12 @@ impl OpenStreamSession { OpenStreamFrame::Accept => Ok(FrameOutcome::None), OpenStreamFrame::Ping { nonce } => { s.assert_started()?; + if nonce.len() > MAX_OPEN_STREAM_PING_NONCE_BYTES { + return Err(OpenStreamError::Sequence(format!( + "ping nonce exceeds {MAX_OPEN_STREAM_PING_NONCE_BYTES}-byte cap (got {} bytes)", + nonce.len() + ))); + } Ok(FrameOutcome::SendPong(nonce)) } OpenStreamFrame::Pong { nonce } => { @@ -1050,6 +1057,48 @@ mod tests { ); } + #[tokio::test] + async fn rejects_ping_nonce_exceeding_64_bytes() { + let now = Instant::now(); + let s = make_session("token-ping-cap", 8, 1024); + s.process_frame(now, 1, start()).unwrap(); + + // A 64-byte nonce is at the cap and is echoed. + let at_cap = "x".repeat(MAX_OPEN_STREAM_PING_NONCE_BYTES); + assert_eq!(at_cap.len(), 64); + assert_eq!( + s.process_frame( + now, + 2, + OpenStreamFrame::Ping { + nonce: at_cap.clone() + } + ) + .unwrap(), + FrameOutcome::SendPong(at_cap) + ); + + // A 65-byte nonce exceeds the cap and is rejected (Sequence), without + // finalizing the stream. + let over_cap = "x".repeat(MAX_OPEN_STREAM_PING_NONCE_BYTES + 1); + assert!(matches!( + s.process_frame(now, 3, OpenStreamFrame::Ping { nonce: over_cap }), + Err(OpenStreamError::Sequence(_)) + )); + // The stream stays active: a normal ping still earns a pong. 
+ assert_eq!( + s.process_frame( + now, + 4, + OpenStreamFrame::Ping { + nonce: "ok".to_string() + } + ) + .unwrap(), + FrameOutcome::SendPong("ok".to_string()) + ); + } + #[tokio::test] async fn idle_sends_ping_then_probe_timeout_aborts() { let t0 = Instant::now(); diff --git a/src/transport/server/mod.rs b/src/transport/server/mod.rs index 9c50b7f..4841be2 100644 --- a/src/transport/server/mod.rs +++ b/src/transport/server/mod.rs @@ -10,12 +10,14 @@ pub mod session_store; pub use correlation_store::{RouteEntry, ServerEventRouteStore}; pub use session_store::{SessionSnapshot, SessionStore}; +use tokio::sync::Mutex as AsyncMutex; use tokio::sync::RwLock; use std::collections::HashMap; use std::num::NonZeroUsize; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; use lru::LruCache; use nostr_sdk::prelude::*; @@ -29,7 +31,11 @@ use crate::encryption; use crate::relay::{RelayPool, RelayPoolTrait}; use crate::transport::base::BaseTransport; use crate::transport::discovery_tags::learn_peer_capabilities; -use crate::transport::open_stream::OpenStreamConfig; +use crate::transport::open_stream::{ + open_stream_frame_from_notification, FrameOutcome, KeepaliveAction, OnAbortHook, OnCloseHook, + OpenStreamConfig, OpenStreamFrame, OpenStreamReceiver, OpenStreamRegistryPolicy, + OpenStreamWriter, OpenStreamWriterOptions, PublishFrame, +}; use crate::transport::oversized_transfer::{ build_oversized_frames, progress_token_string, resolve_safe_chunk_size, OversizedFrame, OversizedSenderOptions, OversizedTransferConfig, OversizedTransferReceiver, TransferPolicy, @@ -51,6 +57,27 @@ fn oversized_support_tags(config: &NostrServerTransportConfig) -> Vec { } } +/// CEP-41: the `support_open_stream` capability tag to advertise, or empty when +/// open-stream is disabled. Mirrors [`oversized_support_tags`]. 
+fn open_stream_support_tags(config: &OpenStreamConfig) -> Vec { + if config.enabled { + vec![Tag::custom( + TagKind::Custom(tags::SUPPORT_OPEN_STREAM.into()), + Vec::::new(), + )] + } else { + Vec::new() + } +} + +/// CEP-22 + CEP-41: the internal capability tags advertised on announcements and +/// replayed on the first response to each client. +fn internal_common_capability_tags(config: &NostrServerTransportConfig) -> Vec { + let mut tags = oversized_support_tags(config); + tags.extend(open_stream_support_tags(&config.open_stream)); + tags +} + /// CEP-22: build the empty per-peer reassembly store, bounded to `max_sessions` /// peers (one [`OversizedTransferReceiver`] per client pubkey, inserted lazily by /// the inbound event loop). @@ -62,6 +89,135 @@ fn new_oversized_receiver_store( ))) } +/// CEP-41: build the empty per-peer open-stream reader store, bounded to +/// `max_sessions` peers. Mirrors [`new_oversized_receiver_store`]; one +/// [`OpenStreamReceiver`] per client pubkey is inserted lazily for inbound +/// (client→server) streams. +/// +/// Uses a [`tokio::sync::Mutex`] rather than an `RwLock`: the registry's +/// `FnOnce` lifecycle hooks are `Send` but not `Sync`, so a shared-read lock +/// could not be made `Sync`; the store is write-only anyway (`process_frame` +/// needs `&mut`), so exclusive access loses nothing. +fn new_open_stream_receiver_store( + max_sessions: usize, +) -> Arc>> { + Arc::new(AsyncMutex::new(LruCache::new( + NonZeroUsize::new(max_sessions).unwrap_or(NonZeroUsize::new(1).unwrap()), + ))) +} + +/// CEP-41: response-routing fields captured at writer creation, while the +/// request's event route is fresh. +/// +/// The deferred final response is delivered from this snapshot (via +/// [`NostrServerTransport::send_open_stream_deferred_response`]) rather than from +/// `event_routes`, so a stream that outlives `request_timeout` — after which the +/// route is swept — still delivers its response. 
`mirrored_wrap_kind` mirrors +/// the inbound gift-wrap kind for CEP-19, exactly as `send_response` does. +#[derive(Clone)] +struct RouteSnapshot { + client_pubkey: PublicKey, + original_request_id: serde_json::Value, + is_encrypted: bool, + mirrored_wrap_kind: Option, +} + +/// CEP-41: the per-stream coordination slot for a server→client writer, keyed by +/// request `event_id` in [`ServerOpenStreamState::slots`]. +/// +/// A single mutex over the whole map serializes the two writers of the deferred +/// final response — `send_response` (the worker task) and the writer's +/// close/abort hook (the tool task) — against the [`terminated`](Self::terminated) +/// flag, so the response is never both stashed *and* dropped under a race. +struct OpenStreamSlot { + writer: OpenStreamWriter, + snapshot: RouteSnapshot, + /// The final response, stashed by `send_response` when it arrives before the + /// stream closes (ordering A). + pending_response: Option, + /// Set by the writer's close/abort hook once the stream is terminal. When + /// `send_response` arrives after this (ordering B), it delivers immediately. + terminated: bool, +} + +/// CEP-41: the open-stream runtime state shared between the server transport and +/// its spawned event loop. Bundled so the event-loop signature stays manageable. +#[derive(Clone)] +struct ServerOpenStreamState { + /// Master gate (`config.open_stream.enabled`). + enabled: bool, + /// Reader admission/buffering/keepalive policy projected from config. + policy: OpenStreamRegistryPolicy, + /// Per-peer reader engines for inbound (client→server) streams. + receiver: Arc>>, + /// Per-stream writer + deferred-response slots, keyed by `event_id`. + slots: Arc>>, + /// `progress_token → event_id`, so inbound control frames and the keepalive + /// sweep resolve the writer/route without consulting the route store. 
+ token_to_event: Arc>>, + /// Monotonic `progress` source for server-*as-reader* control frames + /// (`accept`/`pong`/`ping` on inbound client→server streams, where no writer + /// owns the counter). Per-token monotonicity holds even though it is shared. + control_progress: Arc, +} + +impl ServerOpenStreamState { + fn new(config: &OpenStreamConfig, max_sessions: usize) -> Self { + Self { + enabled: config.enabled, + policy: config.into(), + receiver: new_open_stream_receiver_store(max_sessions), + slots: Arc::new(Mutex::new(HashMap::new())), + token_to_event: Arc::new(Mutex::new(HashMap::new())), + control_progress: Arc::new(AtomicU64::new(0)), + } + } + + /// Next monotonic control-frame `progress` (1, 2, 3, …) for the reader path. + fn next_control_progress(&self) -> u64 { + self.control_progress.fetch_add(1, Ordering::SeqCst) + 1 + } + + /// Lock-poison-tolerant access to the slots map. + fn lock_slots(&self) -> std::sync::MutexGuard<'_, HashMap> { + match self.slots.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + } + } + + fn lock_token_index(&self) -> std::sync::MutexGuard<'_, HashMap> { + match self.token_to_event.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + } + } + + /// Clone the active writer for `event_id`, if any (for inbound ping/abort + /// routing and the worker's extensions injection). + fn writer_for(&self, event_id: &str) -> Option { + self.lock_slots().get(event_id).map(|s| s.writer.clone()) + } + + /// Resolve `progress_token → event_id`. + fn event_id_for_token(&self, token: &str) -> Option { + self.lock_token_index().get(token).cloned() + } +} + +/// CEP-41: the outcome of the response-deferral decision in `send_response`. +enum OpenStreamDeferral { + /// The response was stashed; the writer's close/abort hook will flush it. + Deferred, + /// The stream is already terminal — deliver this response now from the snapshot. 
+ SendNow { + snapshot: RouteSnapshot, + response: JsonRpcMessage, + }, + /// No active stream for this event — send the response through the normal path. + Passthrough(JsonRpcMessage), +} + /// Configuration for the server transport. #[derive(Debug, Clone)] #[non_exhaustive] @@ -109,9 +265,9 @@ pub struct NostrServerTransportConfig { pub oversized_transfer: OversizedTransferConfig, /// CEP-41 open-stream configuration. Disabled by default (opt-in). /// - /// **Data only in PR1** — the event loop does not consult it yet; activation - /// (capability advertisement, learning, response deferral, the keepalive - /// sweep) lands in PR2. + /// When enabled, drives capability advertisement/learning, server→client + /// writers, response deferral, and the keepalive sweep. Opt in with + /// `OpenStreamConfig::enabled()` / `with_enabled(true)`. pub open_stream: OpenStreamConfig, } @@ -162,6 +318,10 @@ pub struct NostrServerTransport { /// enforces the configured per-peer admission policy. Populated by the inbound /// event loop; cleared on [`close`](Self::close). oversized_receiver: Arc>>, + /// CEP-41: open-stream runtime state (writers, deferred responses, per-peer + /// reader engines, `progress_token → event_id` index). Inert when + /// `open_stream.enabled` is `false`. + open_stream: ServerOpenStreamState, /// Channel for incoming MCP messages (consumed by the MCP server). message_tx: Option>, message_rx: Option>, @@ -257,9 +417,8 @@ impl NostrServerTransportConfig { self.oversized_transfer.enabled = enabled; self } - /// Set the full CEP-41 open-stream configuration. - /// - /// Data only in PR1: the event loop does not read this until PR2. + /// Set the full CEP-41 open-stream configuration (disabled by default; opt in + /// with `OpenStreamConfig::enabled()`). 
pub fn with_open_stream(mut self, config: OpenStreamConfig) -> Self { self.open_stream = config; self @@ -320,8 +479,9 @@ impl NostrServerTransport { config.publish_relay_list, config.profile_metadata.clone(), ); - // CEP-22: advertise oversized-transfer support in announcements + first responses. - announcement_manager.set_internal_common_tags(oversized_support_tags(&config)); + // CEP-22 + CEP-41: advertise oversized-transfer and open-stream support in + // announcements + first responses (each gated by its own config flag). + announcement_manager.set_internal_common_tags(internal_common_capability_tags(&config)); Ok(Self { announcement_manager, base: BaseTransport { @@ -331,6 +491,7 @@ impl NostrServerTransport { }, sessions: SessionStore::with_capacity(config.max_sessions), oversized_receiver: new_oversized_receiver_store(config.max_sessions), + open_stream: ServerOpenStreamState::new(&config.open_stream, config.max_sessions), config, event_routes: ServerEventRouteStore::new(), request_wrap_kinds: Arc::new(RwLock::new(HashMap::new())), @@ -371,8 +532,9 @@ impl NostrServerTransport { config.publish_relay_list, config.profile_metadata.clone(), ); - // CEP-22: advertise oversized-transfer support in announcements + first responses. - announcement_manager.set_internal_common_tags(oversized_support_tags(&config)); + // CEP-22 + CEP-41: advertise oversized-transfer and open-stream support in + // announcements + first responses (each gated by its own config flag). 
+ announcement_manager.set_internal_common_tags(internal_common_capability_tags(&config)); Ok(Self { announcement_manager, base: BaseTransport { @@ -382,6 +544,7 @@ impl NostrServerTransport { }, sessions: SessionStore::with_capacity(config.max_sessions), oversized_receiver: new_oversized_receiver_store(config.max_sessions), + open_stream: ServerOpenStreamState::new(&config.open_stream, config.max_sessions), config, request_wrap_kinds: Arc::new(RwLock::new(HashMap::new())), event_routes: ServerEventRouteStore::new(), @@ -454,6 +617,7 @@ impl NostrServerTransport { let transfer_policy: TransferPolicy = (&self.config.oversized_transfer).into(); let common_tags_snapshot = self.announcement_manager.common_tags_snapshot(); let seen_gift_wrap_ids = self.seen_gift_wrap_ids.clone(); + let open_stream = self.open_stream.clone(); let event_loop_token = self.cancellation_token.child_token(); let event_loop_handle = tokio::spawn(async move { @@ -473,6 +637,7 @@ impl NostrServerTransport { transfer_policy, common_tags_snapshot, seen_gift_wrap_ids, + open_stream, event_loop_token, ) .await; @@ -561,11 +726,40 @@ impl NostrServerTransport { self.sessions.clear().await; self.event_routes.clear().await; self.oversized_receiver.write().await.clear(); + // CEP-41: dispose every inbound reader session and drop all writer / + // deferred-response state. + { + let mut receivers = self.open_stream.receiver.lock().await; + for (_, receiver) in receivers.iter_mut() { + receiver.clear(); + } + receivers.clear(); + } + self.open_stream.lock_slots().clear(); + self.open_stream.lock_token_index().clear(); Ok(()) } /// Send a response back to the client that sent the original request. pub async fn send_response(&self, event_id: &str, mut response: JsonRpcMessage) -> Result<()> { + // CEP-41: response deferral. Decide BEFORE consuming the route — for a + // started stream the final response rides the captured snapshot, not the + // (possibly-swept) event route. 
+ if self.open_stream.enabled { + match self.try_defer_open_stream_response(event_id, response) { + // Stashed (ordering A) — the close/abort hook flushes it later. + OpenStreamDeferral::Deferred => return Ok(()), + // Stream already closed (ordering B) — deliver from the snapshot now. + OpenStreamDeferral::SendNow { snapshot, response } => { + return self + .send_open_stream_deferred_response(event_id, &snapshot, response) + .await; + } + // No active stream for this event — fall through to the normal path. + OpenStreamDeferral::Passthrough(returned) => response = returned, + } + } + // Consume the route up-front so only one concurrent responder can proceed // for a given event_id. let route = self.event_routes.pop(event_id).await.ok_or_else(|| { @@ -786,6 +980,169 @@ impl NostrServerTransport { Ok(()) } + /// CEP-41: clone the active writer for `event_id` so the rmcp worker can inject + /// it into the request's `extensions` typemap before dispatch. Returns + /// `None` when open-stream is disabled or no writer exists for this request. + #[cfg_attr(not(feature = "rmcp"), allow(dead_code))] + pub(crate) fn get_open_stream_writer(&self, event_id: &str) -> Option { + if !self.open_stream.enabled { + return None; + } + self.open_stream.writer_for(event_id) + } + + /// CEP-41: decide how `send_response` should handle the final response for a + /// (possibly) streaming request. Run under the slots lock so the stash/flush + /// decision is consistent against the close/abort hook's `terminated` flag. + fn try_defer_open_stream_response( + &self, + event_id: &str, + response: JsonRpcMessage, + ) -> OpenStreamDeferral { + let mut slots = self.open_stream.lock_slots(); + let Some(slot) = slots.get_mut(event_id) else { + return OpenStreamDeferral::Passthrough(response); + }; + + if !slot.writer.has_started() { + // The request carried a progressToken but the tool never streamed. 
+ // Drop the writer and send normally (progress-token-conflict guard — + // a deferred-but-never-closed stream would otherwise hang the response). + let token = slot.writer.progress_token().to_string(); + slots.remove(event_id); + drop(slots); + self.open_stream.lock_token_index().remove(&token); + return OpenStreamDeferral::Passthrough(response); + } + + if slot.terminated { + // Ordering B (the common case): the stream already closed/aborted — + // deliver now from the captured snapshot (the route may be swept). + let snapshot = slot.snapshot.clone(); + let token = slot.writer.progress_token().to_string(); + slots.remove(event_id); + drop(slots); + self.open_stream.lock_token_index().remove(&token); + OpenStreamDeferral::SendNow { snapshot, response } + } else { + // Ordering A: the stream is still open — hold the response; the + // close/abort hook flushes it from the snapshot when the stream ends. + slot.pending_response = Some(response); + OpenStreamDeferral::Deferred + } + } + + /// CEP-41: deliver a deferred final response from a captured [`RouteSnapshot`], + /// never consulting `event_routes` (route-lifetime-independent; the route may already be gone). + async fn send_open_stream_deferred_response( + &self, + event_id: &str, + snapshot: &RouteSnapshot, + response: JsonRpcMessage, + ) -> Result<()> { + Self::publish_open_stream_deferred_response( + &self.base, + self.config.gift_wrap_mode, + event_id, + snapshot, + response, + ) + .await + } + + /// CEP-41 (static): the actual deferred-response publish, callable from both the + /// `&self` path and the writer's close/abort hook (which has no `self`). + async fn publish_open_stream_deferred_response( + base: &BaseTransport, + gift_wrap_mode: GiftWrapMode, + event_id: &str, + snapshot: &RouteSnapshot, + mut response: JsonRpcMessage, + ) -> Result<()> { + // Restore the original request id (the normal path restores it from the + // popped route; here it comes from the snapshot). 
+ match &mut response { + JsonRpcMessage::Response(r) => r.id = snapshot.original_request_id.clone(), + JsonRpcMessage::ErrorResponse(r) => r.id = snapshot.original_request_id.clone(), + _ => {} + } + let event_id_parsed = EventId::from_hex(event_id).map_err(|error| { + Error::Other(format!("Invalid event id for deferred response: {error}")) + })?; + // Correlate via the `e` tag exactly like a normal response so the client's + // correlation gate accepts it. + let tags = BaseTransport::create_response_tags(&snapshot.client_pubkey, &event_id_parsed); + let gift_wrap_kind = Self::select_outbound_gift_wrap_kind( + gift_wrap_mode, + snapshot.is_encrypted, + snapshot.mirrored_wrap_kind, + ); + base.send_mcp_message( + &response, + &snapshot.client_pubkey, + CTXVM_MESSAGES_KIND, + tags, + Some(snapshot.is_encrypted), + gift_wrap_kind, + ) + .await + .map(|_| ()) + } + + /// CEP-41 (static): the writer's close/abort hook. Marks the stream terminal + /// and, when the response already arrived (ordering A), flushes it from the + /// snapshot. Ordering B leaves the terminal slot for `send_response`. + async fn flush_open_stream_response( + state: &ServerOpenStreamState, + base: &BaseTransport, + gift_wrap_mode: GiftWrapMode, + event_id: &str, + ) { + let ready = { + let mut slots = state.lock_slots(); + match slots.get_mut(event_id) { + Some(slot) => { + slot.terminated = true; + slot.pending_response.take().map(|response| { + ( + slot.snapshot.clone(), + slot.writer.progress_token().to_string(), + response, + ) + }) + } + None => None, + } + }; + + let Some((snapshot, token, response)) = ready else { + // Ordering B: the response has not arrived yet. Leave the terminal slot + // in place; `send_response` will deliver it from the snapshot. + return; + }; + + // Ordering A: the response was stashed before the stream closed — remove + // the slot and deliver it now. 
+ state.lock_slots().remove(event_id); + state.lock_token_index().remove(&token); + if let Err(error) = Self::publish_open_stream_deferred_response( + base, + gift_wrap_mode, + event_id, + &snapshot, + response, + ) + .await + { + tracing::error!( + target: LOG_TARGET, + error = %error, + event_id = %event_id, + "Failed to flush deferred open-stream response" + ); + } + } + /// CEP-22: publish a response as an ordered oversized-transfer frame sequence. /// /// Splits the post-restoration `serialized` string into `start → chunks… → @@ -1146,6 +1503,7 @@ impl NostrServerTransport { transfer_policy: TransferPolicy, common_tags_snapshot: announcement_manager::CommonTagsSnapshot, seen_gift_wrap_ids: Arc>>, + open_stream: ServerOpenStreamState, cancel: CancellationToken, ) { let mut notifications = relay_pool.notifications(); @@ -1159,6 +1517,19 @@ impl NostrServerTransport { let mut sweep_timer = tokio::time::interval_at(tokio::time::Instant::now() + sweep_interval, sweep_interval); + // CEP-41: keepalive sweep for server-as-reader sessions. Cadence = half the + // idle timeout, clamped to [1s, 30s] (the idle→probe→abort machine only + // needs sub-idle granularity). Armed only when open-stream is enabled. + let open_stream_sweep_enabled = + open_stream.enabled && open_stream.policy.idle_timeout_ms != 0; + let open_stream_sweep_interval = + (Duration::from_millis(open_stream.policy.idle_timeout_ms) / 2) + .clamp(Duration::from_secs(1), Duration::from_secs(30)); + let mut open_stream_sweep_timer = tokio::time::interval_at( + tokio::time::Instant::now() + open_stream_sweep_interval, + open_stream_sweep_interval, + ); + loop { let notification = tokio::select! 
{ _ = cancel.cancelled() => { @@ -1172,6 +1543,16 @@ impl NostrServerTransport { Self::sweep_oversized_receivers(&oversized_receiver).await; continue; } + _ = open_stream_sweep_timer.tick(), if open_stream_sweep_enabled => { + Self::sweep_open_stream_sessions( + &open_stream, + &relay_pool, + encryption_mode, + gift_wrap_mode, + ) + .await; + continue; + } result = notifications.recv() => { match result { Ok(n) => n, @@ -1445,6 +1826,12 @@ impl NostrServerTransport { // CEP-22: only learn oversized support if it is enabled on this server. session.supports_oversized_transfer |= oversized_enabled && discovered.supports_oversized_transfer; + // CEP-41: learn the client's open-stream support (gated on enabled). + // Captured AFTER the OR-learn so the very `start` frame that carries + // the support tag still elicits an `accept`. + session.supports_open_stream |= + open_stream.enabled && discovered.supports_open_stream; + let client_supports_open_stream = session.supports_open_stream; // CEP-22: intercept oversized-transfer frames before request // correlation/dispatch. A disabled server forwards raw progress @@ -1469,6 +1856,34 @@ impl NostrServerTransport { &event_routes, &request_wrap_kinds, &tx, + &open_stream, + ) + .await; + continue; + } + } + } + + // CEP-41: intercept open-stream frames beside the oversized branch. + // Type-disjoint from oversized (`is_open_stream_frame` vs + // `is_oversized_frame` claim distinct `cvm.type`s), so order is + // irrelevant. A disabled server forwards the raw notification. 
+ if open_stream.enabled { + if let JsonRpcMessage::Notification(ref n) = mcp_msg { + if OpenStreamReceiver::is_open_stream_frame(n) { + drop(sessions_w); + Self::handle_open_stream_frame( + &open_stream, + &relay_pool, + encryption_mode, + gift_wrap_mode, + n, + &sender_pubkey, + &event_id, + is_encrypted, + is_gift_wrap, + outer_kind, + client_supports_open_stream, ) .await; continue; @@ -1517,6 +1932,11 @@ impl NostrServerTransport { ); } + // CEP-41: capture the route fields for the writer's snapshot + // BEFORE they are moved into the route store. + let writer_request_id = original_id.clone(); + let writer_token = progress_token.clone(); + event_routes .register( event_id.clone(), @@ -1525,6 +1945,27 @@ impl NostrServerTransport { progress_token, ) .await; + + // CEP-41: a `tools/call` carrying a progressToken gets a + // server→client writer, captured with a route snapshot (so the + // deferred response survives a route sweep) and injected into + // the tool via the rmcp request extensions. + if open_stream.enabled && req.method == "tools/call" { + if let Some(token) = writer_token { + Self::create_open_stream_writer( + &open_stream, + &relay_pool, + encryption_mode, + gift_wrap_mode, + &event_id, + &sender_pubkey, + &token, + writer_request_id, + is_encrypted, + if is_gift_wrap { Some(outer_kind) } else { None }, + ); + } + } } else { drop(sessions_w); } @@ -1574,6 +2015,7 @@ impl NostrServerTransport { event_routes: &ServerEventRouteStore, request_wrap_kinds: &Arc>>>, tx: &tokio::sync::mpsc::UnboundedSender, + open_stream: &ServerOpenStreamState, ) { // The outer progressToken keys the transfer (needed for accept + route). // String or number — defensive only: every known sender stringifies @@ -1646,6 +2088,20 @@ impl NostrServerTransport { // The `end` frame: reassembled request ready to dispatch. 
Ok(Some(message)) => { let original_id = message.id().cloned().unwrap_or(serde_json::Value::Null); + // CEP-41: extract the writer info from the reassembled `tools/call` + // before `message` is moved. The oversized reassembly path bypasses + // the regular request path, so the writer must be created HERE too + // (mirrors TS `handleIncomingRequest`, which oversized re-enters). + let writer_token = match &message { + JsonRpcMessage::Request(req) if req.method == "tools/call" => req + .params + .as_ref() + .and_then(|p| p.get("_meta")) + .and_then(|m| m.get("progressToken")) + .and_then(progress_token_string), + _ => None, + }; + let writer_request_id = original_id.clone(); // Mirror the incoming wrap kind for the eventual response (CEP-19). { let mut kinds_w = request_wrap_kinds.write().await; @@ -1662,6 +2118,22 @@ impl NostrServerTransport { token, ) .await; + if open_stream.enabled { + if let Some(progress_token) = writer_token { + Self::create_open_stream_writer( + open_stream, + relay_pool, + encryption_mode, + gift_wrap_mode, + event_id, + sender_pubkey, + &progress_token, + writer_request_id, + is_encrypted, + if is_gift_wrap { Some(outer_kind) } else { None }, + ); + } + } let _ = tx.send(IncomingRequest { message, client_pubkey: sender_pubkey.to_string(), @@ -1746,6 +2218,447 @@ impl NostrServerTransport { } } + /// CEP-41: create a server→client [`OpenStreamWriter`] for a `tools/call` + /// carrying a `progressToken`, capture its [`RouteSnapshot`], register the + /// `progress_token → event_id` index, and store the slot. The writer is later + /// injected into the tool via the rmcp request `extensions`; its + /// close/abort hooks flush the deferred final response from the snapshot. 
+ #[allow(clippy::too_many_arguments)] + fn create_open_stream_writer( + state: &ServerOpenStreamState, + relay_pool: &Arc, + encryption_mode: EncryptionMode, + gift_wrap_mode: GiftWrapMode, + event_id: &str, + client_pubkey_hex: &str, + progress_token: &str, + original_request_id: serde_json::Value, + is_encrypted: bool, + mirrored_wrap_kind: Option, + ) { + let client_pubkey = match PublicKey::from_hex(client_pubkey_hex) { + Ok(pk) => pk, + Err(_) => return, + }; + let event_id_parsed = match EventId::from_hex(event_id) { + Ok(id) => id, + Err(_) => return, + }; + let gift_wrap_kind = + Self::select_outbound_gift_wrap_kind(gift_wrap_mode, is_encrypted, mirrored_wrap_kind); + + // Publish closure: every frame is e-tagged to the request event (so the + // client can keep its pending correlation alive) and mirrors the inbound + // gift-wrap kind (CEP-19). `send_notification`'s one-shot discovery tags + // are not replayed — they already rode the initialize response / stream + // start by the time a tool streams. + let publish_relay_pool = Arc::clone(relay_pool); + let publish_frame: PublishFrame = Arc::new(move |notification: JsonRpcNotification| { + let relay_pool = Arc::clone(&publish_relay_pool); + Box::pin(async move { + let base = BaseTransport { + relay_pool, + encryption_mode, + is_connected: true, + }; + let tags = BaseTransport::create_response_tags(&client_pubkey, &event_id_parsed); + let message = JsonRpcMessage::Notification(notification); + base.send_mcp_message( + &message, + &client_pubkey, + CTXVM_MESSAGES_KIND, + tags, + Some(is_encrypted), + gift_wrap_kind, + ) + .await + }) + }); + + // Terminal hooks flush any deferred final response from the snapshot. 
+ let on_close: OnCloseHook = { + let state = state.clone(); + let relay_pool = Arc::clone(relay_pool); + let event_id = event_id.to_string(); + Arc::new(move || { + let state = state.clone(); + let relay_pool = Arc::clone(&relay_pool); + let event_id = event_id.clone(); + Box::pin(async move { + let base = BaseTransport { + relay_pool, + encryption_mode, + is_connected: true, + }; + Self::flush_open_stream_response(&state, &base, gift_wrap_mode, &event_id) + .await; + }) + }) + }; + let on_abort: OnAbortHook = { + let state = state.clone(); + let relay_pool = Arc::clone(relay_pool); + let event_id = event_id.to_string(); + Arc::new(move |_reason| { + let state = state.clone(); + let relay_pool = Arc::clone(&relay_pool); + let event_id = event_id.clone(); + Box::pin(async move { + let base = BaseTransport { + relay_pool, + encryption_mode, + is_connected: true, + }; + Self::flush_open_stream_response(&state, &base, gift_wrap_mode, &event_id) + .await; + }) + }) + }; + + let writer = OpenStreamWriter::new(OpenStreamWriterOptions { + progress_token: progress_token.to_string(), + publish_frame, + content_type: None, + on_close: Some(on_close), + on_abort: Some(on_abort), + }); + let snapshot = RouteSnapshot { + client_pubkey, + original_request_id, + is_encrypted, + mirrored_wrap_kind, + }; + state.lock_slots().insert( + event_id.to_string(), + OpenStreamSlot { + writer, + snapshot, + pending_response: None, + terminated: false, + }, + ); + state + .lock_token_index() + .insert(progress_token.to_string(), event_id.to_string()); + } + + /// CEP-41 inbound interception (beside the oversized branch). Routes control + /// frames to the active writer (`ping → pong`, `abort → abort`) and otherwise + /// drives the server-as-reader engine (`start`/`pong`/`chunk`/`close`). 
+ #[allow(clippy::too_many_arguments)] + async fn handle_open_stream_frame( + state: &ServerOpenStreamState, + relay_pool: &Arc, + encryption_mode: EncryptionMode, + gift_wrap_mode: GiftWrapMode, + notification: &JsonRpcNotification, + sender_pubkey: &str, + event_id: &str, + is_encrypted: bool, + is_gift_wrap: bool, + outer_kind: u16, + client_supports_open_stream: bool, + ) { + let token = notification + .params + .as_ref() + .and_then(|p| p.get("progressToken")) + .and_then(progress_token_string); + // An active server→client writer owns this token's control frames. + let writer = token + .as_deref() + .and_then(|t| state.event_id_for_token(t)) + .and_then(|eid| state.writer_for(&eid)); + + match open_stream_frame_from_notification(notification) { + Some(OpenStreamFrame::Ping { nonce }) => { + if let Some(writer) = writer { + let _ = writer.pong(nonce).await; + } else { + Self::feed_open_stream_reader( + state, + relay_pool, + encryption_mode, + gift_wrap_mode, + notification, + sender_pubkey, + event_id, + is_encrypted, + is_gift_wrap, + outer_kind, + ) + .await; + } + } + Some(OpenStreamFrame::Abort { reason }) => { + if let Some(writer) = writer { + let _ = writer.abort(reason).await; + } else { + Self::feed_open_stream_reader( + state, + relay_pool, + encryption_mode, + gift_wrap_mode, + notification, + sender_pubkey, + event_id, + is_encrypted, + is_gift_wrap, + outer_kind, + ) + .await; + } + } + Some(OpenStreamFrame::Start { .. }) => { + Self::feed_open_stream_reader( + state, + relay_pool, + encryption_mode, + gift_wrap_mode, + notification, + sender_pubkey, + event_id, + is_encrypted, + is_gift_wrap, + outer_kind, + ) + .await; + // Stateless accept: only for clients that advertised support. 
+ if client_supports_open_stream { + if let Some(token) = token.as_deref() { + Self::publish_open_stream_control_frame( + state, + relay_pool, + encryption_mode, + gift_wrap_mode, + OpenStreamFrame::Accept, + token, + sender_pubkey, + Some(event_id), + is_encrypted, + is_gift_wrap, + outer_kind, + ) + .await; + } + } + } + // pong / chunk / close / accept → server-as-reader engine. + _ => { + Self::feed_open_stream_reader( + state, + relay_pool, + encryption_mode, + gift_wrap_mode, + notification, + sender_pubkey, + event_id, + is_encrypted, + is_gift_wrap, + outer_kind, + ) + .await; + } + } + } + + /// CEP-41 server-as-reader: feed an inbound frame to this peer's reader engine + /// (created on demand) and publish a `pong` if its session asks for one. + #[allow(clippy::too_many_arguments)] + async fn feed_open_stream_reader( + state: &ServerOpenStreamState, + relay_pool: &Arc, + encryption_mode: EncryptionMode, + gift_wrap_mode: GiftWrapMode, + notification: &JsonRpcNotification, + sender_pubkey: &str, + event_id: &str, + is_encrypted: bool, + is_gift_wrap: bool, + outer_kind: u16, + ) { + let outcome = { + let mut store = state.receiver.lock().await; + if !store.contains(sender_pubkey) { + store.put( + sender_pubkey.to_string(), + OpenStreamReceiver::with_policy(state.policy), + ); + } + let receiver = store + .get_mut(sender_pubkey) + .expect("open-stream receiver present after insert"); + receiver.process_frame(notification).await + }; + match outcome { + Ok(FrameOutcome::SendPong(nonce)) => { + if let Some(token) = notification + .params + .as_ref() + .and_then(|p| p.get("progressToken")) + .and_then(progress_token_string) + { + Self::publish_open_stream_control_frame( + state, + relay_pool, + encryption_mode, + gift_wrap_mode, + OpenStreamFrame::Pong { nonce }, + &token, + sender_pubkey, + Some(event_id), + is_encrypted, + is_gift_wrap, + outer_kind, + ) + .await; + } + } + Ok(_) => {} + Err(error) => { + tracing::warn!( + target: LOG_TARGET, + error = 
%error, + sender_pubkey = %sender_pubkey, + "Inbound open-stream frame rejected by server reader engine" + ); + } + } + } + + /// CEP-41: publish one server→client control frame (`accept`/`pong`/`ping`) on + /// the server-as-reader path, e-tagged to `correlated_event_id` and mirroring + /// the inbound gift-wrap kind. + #[allow(clippy::too_many_arguments)] + async fn publish_open_stream_control_frame( + state: &ServerOpenStreamState, + relay_pool: &Arc, + encryption_mode: EncryptionMode, + gift_wrap_mode: GiftWrapMode, + frame: OpenStreamFrame, + token: &str, + recipient_pubkey: &str, + correlated_event_id: Option<&str>, + is_encrypted: bool, + is_gift_wrap: bool, + outer_kind: u16, + ) { + let recipient = match PublicKey::from_hex(recipient_pubkey) { + Ok(pk) => pk, + Err(_) => return, + }; + let progress = state.next_control_progress(); + let notification = match frame.into_progress_notification(token, progress, None) { + Ok(n) => n, + Err(error) => { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Failed to build open-stream control frame" + ); + return; + } + }; + let mut tags = BaseTransport::create_recipient_tags(&recipient); + // The `e`-tag is present only when the frame correlates to a known request + // event (accept/pong reply to an inbound frame); the keepalive ping for a + // server-as-reader session has no correlation and is sent recipient-only. 
+ if let Some(eid) = correlated_event_id.and_then(|id| EventId::from_hex(id).ok()) { + tags.push(Tag::event(eid)); + } + let base = BaseTransport { + relay_pool: Arc::clone(relay_pool), + encryption_mode, + is_connected: true, + }; + let gift_wrap_kind = Self::select_outbound_gift_wrap_kind( + gift_wrap_mode, + is_encrypted, + if is_gift_wrap { Some(outer_kind) } else { None }, + ); + if let Err(error) = base + .send_mcp_message( + &JsonRpcMessage::Notification(notification), + &recipient, + CTXVM_MESSAGES_KIND, + tags, + Some(is_encrypted), + gift_wrap_kind, + ) + .await + { + tracing::warn!( + target: LOG_TARGET, + error = %error, + "Failed to publish open-stream control frame" + ); + } + } + + /// CEP-41: one keepalive sweep over the server-as-reader sessions (mirrors + /// [`sweep_oversized_receivers`]). Drives each session's pure `tick`: idle → + /// publish `ping`; probe/grace deadline → the reader aborted, so abort the + /// paired writer too if one exists. Drops now-empty peer receivers. 
+ async fn sweep_open_stream_sessions( + state: &ServerOpenStreamState, + relay_pool: &Arc, + encryption_mode: EncryptionMode, + gift_wrap_mode: GiftWrapMode, + ) { + let now = Instant::now(); + let mut actions: Vec<(String, String, KeepaliveAction)> = Vec::new(); + { + let mut store = state.receiver.lock().await; + let mut empty_peers = Vec::new(); + for (peer, receiver) in store.iter_mut() { + for (token, action) in receiver.registry_mut().tick_all(now) { + actions.push((peer.clone(), token, action)); + } + if receiver.active_stream_count() == 0 { + empty_peers.push(peer.clone()); + } + } + for peer in empty_peers { + store.pop(&peer); + } + } + + let probe_is_encrypted = encryption_mode != EncryptionMode::Disabled; + for (peer, token, action) in actions { + match action { + KeepaliveAction::SendPing(nonce) => { + // Server-as-reader sessions have no `token_to_event` entry + // (that index is only populated for server→client writers), so + // this ping is uncorrelated until bidirectional streaming wires + // a reader-side event id through. 
+ let correlated = state.event_id_for_token(&token); + Self::publish_open_stream_control_frame( + state, + relay_pool, + encryption_mode, + gift_wrap_mode, + OpenStreamFrame::Ping { nonce }, + &token, + &peer, + correlated.as_deref(), + probe_is_encrypted, + false, + 0, + ) + .await; + } + KeepaliveAction::Abort(reason) => { + if let Some(eid) = state.event_id_for_token(&token) { + if let Some(writer) = state.writer_for(&eid) { + let _ = writer.abort(Some(reason)).await; + } + } + } + KeepaliveAction::None => {} + } + } + } + async fn cleanup_sessions( sessions: &SessionStore, event_routes: &ServerEventRouteStore, @@ -2484,4 +3397,231 @@ mod tests { oversized_enabled && discovered.supports_oversized_transfer; assert!(session.supports_oversized_transfer); } + + // ── CEP-41 open-stream capability advertisement ───────────── + + #[test] + fn test_open_stream_support_tags_helper() { + // Disabled (the default) → no tag; enabled → the single-element tag. + assert!(open_stream_support_tags(&OpenStreamConfig::default()).is_empty()); + let names = first_tag_values(&open_stream_support_tags(&OpenStreamConfig::enabled())); + assert_eq!(names, vec!["support_open_stream"]); + } + + #[test] + fn test_internal_common_capability_tags_merges_both() { + let config = NostrServerTransportConfig::default() + .with_oversized_enabled(true) + .with_open_stream(OpenStreamConfig::enabled()); + let names = first_tag_values(&internal_common_capability_tags(&config)); + assert!(names.contains(&"support_oversized_transfer".to_string())); + assert!(names.contains(&"support_open_stream".to_string())); + } + + #[tokio::test] + async fn test_announcement_includes_open_stream_tag_when_enabled() { + let config = NostrServerTransportConfig { + open_stream: OpenStreamConfig::enabled(), + ..Default::default() + }; + let pool: Arc = Arc::new(crate::relay::mock::MockRelayPool::new()); + let server = NostrServerTransport::with_relay_pool(config, pool) + .await + .expect("server transport construction"); 
+ let names = first_tag_values(&server.announcement_manager.get_common_tags()); + assert!( + names.contains(&"support_open_stream".to_string()), + "announcement must advertise open-stream support when enabled" + ); + } + + #[tokio::test] + async fn test_announcement_omits_open_stream_tag_when_disabled() { + // The default config has open-stream disabled (opt-in). + let pool: Arc = Arc::new(crate::relay::mock::MockRelayPool::new()); + let server = + NostrServerTransport::with_relay_pool(NostrServerTransportConfig::default(), pool) + .await + .expect("server transport construction"); + let names = first_tag_values(&server.announcement_manager.get_common_tags()); + assert!(!names.contains(&"support_open_stream".to_string())); + } + + #[test] + fn test_server_learns_client_open_stream_only_when_enabled() { + let open_stream_tag = Tag::custom( + TagKind::Custom(tags::SUPPORT_OPEN_STREAM.into()), + Vec::::new(), + ); + let discovered = learn_peer_capabilities(&[open_stream_tag]); + assert!(discovered.supports_open_stream); + + // Disabled server: the client flag is ignored. + let mut session = ClientSession::new(false); + let open_stream_enabled = false; + session.supports_open_stream |= open_stream_enabled && discovered.supports_open_stream; + assert!(!session.supports_open_stream); + + // Enabled server: the client flag is learned. + let open_stream_enabled = true; + session.supports_open_stream |= open_stream_enabled && discovered.supports_open_stream; + assert!(session.supports_open_stream); + } + + // ── CEP-41 response deferral (try_defer_open_stream_response) ─────── + + /// A no-op writer (publishes nothing) for exercising the deferral decision. 
+ fn deferral_test_writer(token: &str) -> OpenStreamWriter { + let publish_frame: PublishFrame = Arc::new(|_frame: JsonRpcNotification| { + Box::pin(async move { Ok(EventId::all_zeros()) }) + }); + OpenStreamWriter::new(OpenStreamWriterOptions { + progress_token: token.to_string(), + publish_frame, + content_type: None, + on_close: None, + on_abort: None, + }) + } + + /// Install a writer slot + `token → event_id` index entry, mirroring + /// `create_open_stream_writer`. + fn install_slot( + state: &ServerOpenStreamState, + event_id: &str, + writer: OpenStreamWriter, + terminated: bool, + ) { + let token = writer.progress_token().to_string(); + let snapshot = RouteSnapshot { + client_pubkey: Keys::generate().public_key(), + original_request_id: serde_json::json!(1), + is_encrypted: false, + mirrored_wrap_kind: None, + }; + state.lock_slots().insert( + event_id.to_string(), + OpenStreamSlot { + writer, + snapshot, + pending_response: None, + terminated, + }, + ); + state.lock_token_index().insert(token, event_id.to_string()); + } + + fn dummy_response() -> JsonRpcMessage { + JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(1), + result: serde_json::json!({ "ok": true }), + }) + } + + #[tokio::test] + async fn try_defer_open_stream_response_branch_coverage() { + let config = NostrServerTransportConfig::default() + .with_open_stream(OpenStreamConfig::default().with_enabled(true)); + let pool: Arc = Arc::new(MockRelayPool::new()); + let transport = NostrServerTransport::with_relay_pool(config, pool) + .await + .expect("server transport"); + + // No slot for the event (`slots.get_mut` is None) → Passthrough. + assert!(matches!( + transport.try_defer_open_stream_response("evt-none", dummy_response()), + OpenStreamDeferral::Passthrough(_) + )); + + // `!writer.has_started()` — writer created (progressToken present) but the + // tool never streamed → drop the writer and Passthrough. 
The slot AND the + // token index entry must both be removed so the unused writer cannot leak. + install_slot( + &transport.open_stream, + "evt-unstarted", + deferral_test_writer("tok-unstarted"), + false, + ); + assert!(matches!( + transport.try_defer_open_stream_response("evt-unstarted", dummy_response()), + OpenStreamDeferral::Passthrough(_) + )); + assert!( + transport + .open_stream + .lock_slots() + .get("evt-unstarted") + .is_none(), + "unstarted writer slot must be removed (no leak)" + ); + assert!( + transport + .open_stream + .lock_token_index() + .get("tok-unstarted") + .is_none(), + "unstarted writer token index must be removed (no leak)" + ); + + // `slot.terminated` (the function's "Ordering B") — started writer whose + // stream already closed/aborted → deliver now from the snapshot (SendNow); + // the slot + token index are freed. + let terminal = deferral_test_writer("tok-terminal"); + terminal.start().await.expect("start"); + install_slot(&transport.open_stream, "evt-terminal", terminal, true); + assert!(matches!( + transport.try_defer_open_stream_response("evt-terminal", dummy_response()), + OpenStreamDeferral::SendNow { .. } + )); + assert!(transport + .open_stream + .lock_slots() + .get("evt-terminal") + .is_none()); + assert!(transport + .open_stream + .lock_token_index() + .get("tok-terminal") + .is_none()); + + // `else` of `slot.terminated` (the function's "Ordering A") — started + // writer, stream still open → Deferred. The response is stashed and the + // slot retained for the close/abort hook to flush. 
+ let open = deferral_test_writer("tok-open"); + open.start().await.expect("start"); + install_slot(&transport.open_stream, "evt-open", open, false); + assert!(matches!( + transport.try_defer_open_stream_response("evt-open", dummy_response()), + OpenStreamDeferral::Deferred + )); + { + let slots = transport.open_stream.lock_slots(); + let slot = slots.get("evt-open").expect("deferred slot retained"); + assert!( + slot.pending_response.is_some(), + "the deferred response must be stashed for the hook to flush" + ); + } + + // Disabled gate — a server with open-stream disabled never exposes a + // writer, so `send_response` never reaches the deferral decision at all. + let disabled = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_open_stream(OpenStreamConfig::default().with_enabled(false)), + Arc::new(MockRelayPool::new()) as Arc, + ) + .await + .expect("disabled server transport"); + install_slot( + &disabled.open_stream, + "evt-disabled", + deferral_test_writer("tok-disabled"), + false, + ); + assert!( + disabled.get_open_stream_writer("evt-disabled").is_none(), + "a disabled server must not expose writers (deferral never attempted)" + ); + } } diff --git a/tests/open_stream_e2e.rs b/tests/open_stream_e2e.rs new file mode 100644 index 0000000..849de31 --- /dev/null +++ b/tests/open_stream_e2e.rs @@ -0,0 +1,801 @@ +//! CEP-41 open-stream end-to-end tests — full rmcp client + server over the mock +//! relay, plus the CEP-22 (oversized request) + CEP-41 (streaming response) +//! composition. +//! +//! Declared in `Cargo.toml` with `required-features = ["rmcp", "test-utils"]` +//! (same as `e2e_happy_path`) so plain `cargo test` skips it and stays green. 
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use contextvm_sdk::core::types::EncryptionMode;
+use contextvm_sdk::relay::mock::MockRelayPool;
+use contextvm_sdk::transport::client::{NostrClientTransport, NostrClientTransportConfig};
+use contextvm_sdk::transport::open_stream::{OpenStreamConfig, OpenStreamWriter};
+use contextvm_sdk::transport::server::{NostrServerTransport, NostrServerTransportConfig};
+use contextvm_sdk::{
+    call_tool_stream, progress_aware_options, JsonRpcMessage, JsonRpcRequest,
+    PeerRequestOptionsExt, RelayPoolTrait, DEFAULT_OVERSIZED_IDLE_TIMEOUT,
+    DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT,
+};
+use futures::StreamExt;
+use nostr_sdk::prelude::*;
+use rmcp::handler::server::wrapper::Parameters;
+use rmcp::model::{
+    CallToolRequestParams, CallToolResult, Content, ErrorData, Implementation, RawContent,
+    ServerCapabilities,
+};
+use rmcp::service::RequestContext;
+use rmcp::{
+    schemars, tool, tool_handler, tool_router, ClientHandler, RoleServer, ServerHandler, ServiceExt,
+};
+use tokio::sync::Notify;
+
+// ── harness ────────────────────────────────────────────────────────────────
+
+/// Length, in bytes, of the `big_data` tool's response text (`"X"` repeated this
+/// many times). Chosen to exceed the default oversized threshold (48_000 bytes),
+/// forcing CEP-22 fragmentation.
+const BIG_RESPONSE_LEN: usize = 120_000;
+
+/// Wrap a mock relay pool in an `Arc` so it can be passed to a transport's
+/// `with_relay_pool` constructor.
+fn as_pool(pool: MockRelayPool) -> Arc {
+    Arc::new(pool)
+}
+
+/// Fixed 20 ms pause, awaited after spawning a server or starting a client so
+/// their background loops get a chance to run before the test proceeds.
+async fn let_event_loops_start() {
+    tokio::time::sleep(Duration::from_millis(20)).await;
+}
+
+/// Arguments accepted by every tool on the test server: a single `topic` string
+/// (the tests send it as `{ "topic": ... }`).
+#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
+struct TopicParams {
+    topic: String,
+}
+
+/// rmcp server exposing CEP-41 streaming tools. The streaming tools reach their
+/// injected [`OpenStreamWriter`] via `ctx.extensions`. `release` is the `Notify`
+/// that the `deferred` tool waits on before `close` (so a test can observe
+/// response deferral) and that `stream_pair` waits on before its second chunk.
+#[derive(Clone)] +struct StreamServer { + release: Arc, +} + +impl StreamServer { + fn new(release: Arc) -> Self { + Self { release } + } +} + +fn writer_of(ctx: &RequestContext) -> Option { + ctx.extensions.get::().cloned() +} + +#[tool_router] +impl StreamServer { + /// Stream three chunks then close; the final result must arrive after `close`. + #[tool(description = "Stream a, b, c then complete")] + async fn stream3( + &self, + Parameters(TopicParams { topic }): Parameters, + ctx: RequestContext, + ) -> Result { + if let Some(writer) = writer_of(&ctx) { + let _ = writer.start().await; + let _ = writer.write("a".to_string()).await; + let _ = writer.write("b".to_string()).await; + let _ = writer.write("c".to_string()).await; + let _ = writer.close().await; + } + Ok(CallToolResult::success(vec![Content::text(format!( + "completed:{topic}" + ))])) + } + + /// Stream a, b, c then return the *length* of the received topic (a small + /// response, so a large reassembled request does not force an oversized + /// final response). Used by the CEP-22 + CEP-41 composition test. + #[tool(description = "Stream a, b, c then return the received topic length")] + async fn stream_len( + &self, + Parameters(TopicParams { topic }): Parameters, + ctx: RequestContext, + ) -> Result { + if let Some(writer) = writer_of(&ctx) { + let _ = writer.start().await; + let _ = writer.write("a".to_string()).await; + let _ = writer.write("b".to_string()).await; + let _ = writer.write("c".to_string()).await; + let _ = writer.close().await; + } + Ok(CallToolResult::success(vec![Content::text(format!( + "len:{}", + topic.len() + ))])) + } + + /// Stream one chunk, block until released, then close — proves the final + /// result is held until the stream closes. 
+ #[tool(description = "Stream then block until released, then close")] + async fn deferred( + &self, + Parameters(TopicParams { topic }): Parameters, + ctx: RequestContext, + ) -> Result { + if let Some(writer) = writer_of(&ctx) { + let _ = writer.start().await; + let _ = writer.write(format!("{topic}:1")).await; + self.release.notified().await; + let _ = writer.close().await; + } + Ok(CallToolResult::success(vec![Content::text(format!( + "deferred:{topic}" + ))])) + } + + /// Carries a progress token but never streams — the response must be sent + /// normally (the unstarted-writer / progress-token-conflict guard). + #[tool(description = "Return without streaming")] + async fn no_stream( + &self, + Parameters(TopicParams { topic }): Parameters, + _ctx: RequestContext, + ) -> Result { + Ok(CallToolResult::success(vec![Content::text(format!( + "plain:{topic}" + ))])) + } + + /// Stream one chunk then stay open until the client aborts (the writer goes + /// inactive when the server applies the inbound `abort`). + #[tool(description = "Stream then wait for the client to abort")] + async fn client_abortable( + &self, + Parameters(TopicParams { topic }): Parameters, + ctx: RequestContext, + ) -> Result { + if let Some(writer) = writer_of(&ctx) { + let _ = writer.start().await; + let _ = writer.write(format!("{topic}:1")).await; + for _ in 0..1000 { + if !writer.is_active() { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + Ok(CallToolResult::success(vec![Content::text(format!( + "client-aborted:{topic}" + ))])) + } + + /// Stream two topic-specific chunks then close — chunks carry the topic so a + /// concurrent test can detect any token/stream crossing. 
+ #[tool(description = "Stream {topic}:1, {topic}:2 then complete")] + async fn stream_topic( + &self, + Parameters(TopicParams { topic }): Parameters, + ctx: RequestContext, + ) -> Result { + if let Some(writer) = writer_of(&ctx) { + let _ = writer.start().await; + let _ = writer.write(format!("{topic}:1")).await; + let _ = writer.write(format!("{topic}:2")).await; + let _ = writer.close().await; + } + Ok(CallToolResult::success(vec![Content::text(format!( + "completed:{topic}" + ))])) + } + + /// Stream `first:{topic}`, block until released, then stream `second:{topic}` + /// and close. Lets a test interleave another call between the two chunks. + #[tool(description = "Stream first, wait for release, stream second, close")] + async fn stream_pair( + &self, + Parameters(TopicParams { topic }): Parameters, + ctx: RequestContext, + ) -> Result { + if let Some(writer) = writer_of(&ctx) { + let _ = writer.start().await; + let _ = writer.write(format!("first:{topic}")).await; + self.release.notified().await; + let _ = writer.write(format!("second:{topic}")).await; + let _ = writer.close().await; + } + Ok(CallToolResult::success(vec![Content::text(format!( + "done:{topic}" + ))])) + } + + /// Return a large (> oversized threshold) response with no streaming — used to + /// exercise a CEP-22 oversized response while a separate CEP-41 stream is live. 
+ #[tool(description = "Return a large response payload without streaming")] + async fn big_data( + &self, + Parameters(TopicParams { topic: _ }): Parameters, + _ctx: RequestContext, + ) -> Result { + Ok(CallToolResult::success(vec![Content::text( + "X".repeat(BIG_RESPONSE_LEN), + )])) + } +} + +#[tool_handler] +impl ServerHandler for StreamServer { + fn get_info(&self) -> rmcp::model::ServerInfo { + rmcp::model::ServerInfo::new(ServerCapabilities::builder().enable_tools().build()) + .with_server_info(Implementation::new("open-stream-e2e-server", "0.1.0")) + } +} + +#[derive(Clone, Default)] +struct DemoClient; +impl ClientHandler for DemoClient {} + +fn first_text(result: &CallToolResult) -> String { + result + .content + .iter() + .find_map(|c| match &c.raw { + RawContent::Text(t) => Some(t.text.clone()), + _ => None, + }) + .unwrap_or_default() +} + +/// Whether any event stored on `relay` carries a `params.cvm.type == kind` frame. +/// The fixtures disable encryption, so `event.content` is plaintext JSON. 
+async fn relay_has_cvm_frame(relay: &MockRelayPool, kind: &str) -> bool {
+    // An event matches when its content parses as JSON and `params.cvm.type` is
+    // a string equal to `kind`; one match anywhere in the store is enough.
+    relay.stored_events().await.iter().any(|event| {
+        serde_json::from_str::(&event.content)
+            .ok()
+            .and_then(|v| {
+                v.get("params")
+                    .and_then(|p| p.get("cvm"))
+                    .and_then(|c| c.get("type"))
+                    .and_then(|t| t.as_str())
+                    .map(|t| t == kind)
+            })
+            // Unparseable content, or a missing / non-string field, is "no match".
+            .unwrap_or(false)
+    })
+}
+
+/// Build `tools/call` params for the tool `name` with the single argument
+/// `{ "topic": topic }`.
+///
+/// If the JSON value cannot be converted into the arguments type, the failure is
+/// ignored and the params are returned without arguments.
+fn call_params(name: &'static str, topic: &str) -> CallToolRequestParams {
+    let mut params = CallToolRequestParams::new(name);
+    if let Ok(v) = serde_json::from_value(serde_json::json!({ "topic": topic })) {
+        params = params.with_arguments(v);
+    }
+    params
+}
+
+/// Everything a test needs from a running client/server pair built by `fixture`.
+struct Fixture {
+    // Running rmcp client service (result of `DemoClient.serve(client_transport)`).
+    client: rmcp::service::RunningService,
+    // Client open-stream handle, taken from the transport before it was served.
+    handle: contextvm_sdk::ClientOpenStreamHandle,
+    // Task running the rmcp server; aborted by `shutdown`.
+    server_handle: tokio::task::JoinHandle<()>,
+    // The server-side mock relay pool, kept so tests can inspect stored events.
+    relay: Arc,
+    // Shared with the `StreamServer`; notifying it releases a blocked tool.
+    release: Arc,
+}
+
+/// Build a running rmcp server + a running rmcp client over a paired mock relay
+/// with encryption disabled. `server_enabled` / `client_enabled` toggle open-stream
+/// on each side, and `oversized` toggles oversized transfer on both. Returns a
+/// `Fixture` holding the running client and the client's open-stream handle
+/// (captured before the transport is moved into `serve`).
+async fn fixture(server_enabled: bool, client_enabled: bool, oversized: bool) -> Fixture { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey_hex = server_pool.mock_public_key().to_hex(); + let server_pool = Arc::new(server_pool); + let relay = server_pool.clone(); + let release = Arc::new(Notify::new()); + + let server_transport = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_enabled(oversized) + .with_open_stream(OpenStreamConfig::default().with_enabled(server_enabled)), + server_pool as Arc, + ) + .await + .expect("create server transport"); + + let client_transport = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_server_pubkey(server_pubkey_hex) + .with_encryption_mode(EncryptionMode::Disabled) + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_oversized_enabled(oversized) + .with_open_stream(OpenStreamConfig::default().with_enabled(client_enabled)), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + // Capture the open-stream handle BEFORE `serve` consumes the transport. 
+ let handle = client_transport.open_stream_handle(); + + let server = StreamServer::new(release.clone()); + let server_handle = tokio::spawn(async move { + server + .serve(server_transport) + .await + .expect("server serve failed") + .waiting() + .await + .expect("server error"); + }); + let_event_loops_start().await; + + let client = tokio::time::timeout(Duration::from_secs(5), DemoClient.serve(client_transport)) + .await + .expect("client startup timed out") + .expect("client init failed"); + + Fixture { + client, + handle, + server_handle, + relay, + release, + } +} + +async fn shutdown(fixture: Fixture) { + let _ = fixture.client.cancel().await; + fixture.server_handle.abort(); +} + +async fn collect_chunks( + stream: &mut contextvm_sdk::transport::open_stream::OpenStreamSession, +) -> Vec { + let mut out = Vec::new(); + while let Some(item) = stream.next().await { + match item { + Ok(value) => out.push(value), + Err(error) => panic!("stream yielded an error: {error}"), + } + } + out +} + +// ── tests ────────────────────────────────────────────────────────────────── + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_roundtrip_numeric_token() { + let fx = fixture(true, true, false).await; + + let mut call = call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("stream3", "orders"), + ) + .await + .expect("call_tool_stream"); + + // rmcp stamps a numeric progressToken (wire-stringified into the frames). 
+ assert!( + call.progress_token.parse::().is_ok(), + "expected a numeric (stringified) progress token, got {:?}", + call.progress_token + ); + + let chunks = collect_chunks(&mut call.stream).await; + assert_eq!(chunks, vec!["a", "b", "c"]); + + let result = tokio::time::timeout(Duration::from_secs(5), &mut call.result) + .await + .expect("result timed out") + .expect("tool call failed"); + assert_eq!(first_text(&result), "completed:orders"); + + shutdown(fx).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_concurrent_calls_stay_isolated_by_token() { + // Two `call_tool_stream` calls issued concurrently must not cross their + // tokens/streams (the placeholder push→bind window is serialized). Topic- + // specific chunks make any crossing observable. + let fx = fixture(true, true, false).await; + + let (orders, invoices) = tokio::join!( + call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("stream_topic", "orders"), + ), + call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("stream_topic", "invoices"), + ), + ); + let mut orders = orders.expect("orders call_tool_stream"); + let mut invoices = invoices.expect("invoices call_tool_stream"); + + assert_ne!( + orders.progress_token, invoices.progress_token, + "concurrent calls must get distinct tokens" + ); + + let order_chunks = collect_chunks(&mut orders.stream).await; + let invoice_chunks = collect_chunks(&mut invoices.stream).await; + assert_eq!(order_chunks, vec!["orders:1", "orders:2"]); + assert_eq!(invoice_chunks, vec!["invoices:1", "invoices:2"]); + + let order_result = tokio::time::timeout(Duration::from_secs(5), &mut orders.result) + .await + .expect("orders result timed out") + .expect("orders tool failed"); + let invoice_result = tokio::time::timeout(Duration::from_secs(5), &mut invoices.result) + .await + .expect("invoices result timed out") + .expect("invoices tool failed"); + assert_eq!(first_text(&order_result), 
"completed:orders"); + assert_eq!(first_text(&invoice_result), "completed:invoices"); + + shutdown(fx).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_deferred_response_after_close() { + let fx = fixture(true, true, false).await; + + let mut call = call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("deferred", "orders"), + ) + .await + .expect("call_tool_stream"); + + // First chunk arrives while the tool is blocked before `close`. + let first = call.stream.next().await.expect("first chunk").expect("ok"); + assert_eq!(first, "orders:1"); + + // The final result must NOT be ready while the stream is still open. + assert!( + tokio::time::timeout(Duration::from_millis(200), &mut call.result) + .await + .is_err(), + "the final response must be deferred until the stream closes" + ); + + // Release the tool → it closes the stream → the deferred response flushes. + fx.release.notify_one(); + + assert!( + call.stream.next().await.is_none(), + "stream must close after release" + ); + let result = tokio::time::timeout(Duration::from_secs(5), &mut call.result) + .await + .expect("result timed out") + .expect("tool call failed"); + assert_eq!(first_text(&result), "deferred:orders"); + + shutdown(fx).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_client_abort_propagates() { + let fx = fixture(true, true, false).await; + + let mut call = call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("client_abortable", "orders"), + ) + .await + .expect("call_tool_stream"); + + let first = call.stream.next().await.expect("first chunk").expect("ok"); + assert_eq!(first, "orders:1"); + + // Consumer cancels → publishes an `abort` to the server, whose writer aborts. + call.abort(Some("client cancelled".to_string())).await; + + // The local stream surfaces the terminal abort error. 
+ match call.stream.next().await { + Some(Err(error)) => assert!(error.to_string().contains("client cancelled")), + other => panic!("expected an abort error, got {other:?}"), + } + + // (Registry-slot freeing on consumer abort is unit-tested in + // `open_stream/registry.rs::consumer_abort_frees_slot_and_runs_hook`.) + + // The server tool observed the abort (its writer went inactive) and returned. + let result = tokio::time::timeout(Duration::from_secs(5), &mut call.result) + .await + .expect("result timed out") + .expect("tool call failed"); + assert_eq!(first_text(&result), "client-aborted:orders"); + + shutdown(fx).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_unstarted_writer_sends_normal_response() { + let fx = fixture(true, true, false).await; + + let mut call = call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("no_stream", "orders"), + ) + .await + .expect("call_tool_stream"); + + // The tool never streamed: the response is sent normally (no deferral hang). + let result = tokio::time::timeout(Duration::from_secs(5), &mut call.result) + .await + .expect("response must not hang when the writer never started") + .expect("tool call failed"); + assert_eq!(first_text(&result), "plain:orders"); + + drop(call.stream); + shutdown(fx).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_gate_off_server_disabled_streams_nothing() { + // Server has open-stream disabled: no writer is injected, so the tool cannot + // stream — the response is plain and no open-stream frame ever hits the relay. 
+ let fx = fixture(false, true, false).await; + + let mut call = call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("stream3", "orders"), + ) + .await + .expect("call_tool_stream"); + + let result = tokio::time::timeout(Duration::from_secs(5), &mut call.result) + .await + .expect("result timed out") + .expect("tool call failed"); + assert_eq!(first_text(&result), "completed:orders"); + + // No open-stream cvm frame was ever published. + let saw_open_stream_frame = fx.relay.stored_events().await.iter().any(|event| { + serde_json::from_str::(&event.content) + .ok() + .and_then(|v| { + v.get("params") + .and_then(|p| p.get("cvm")) + .and_then(|c| c.get("type")) + .and_then(|t| t.as_str()) + .map(|t| t == "open-stream") + }) + .unwrap_or(false) + }); + assert!( + !saw_open_stream_frame, + "a gated-off server must never publish open-stream frames" + ); + + drop(call.stream); + shutdown(fx).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_roundtrip_string_token_greybox() { + // A genuine STRING progressToken (rmcp always stamps numeric ones) driven + // greybox: a raw client crafts the `tools/call`, the real server streams. 
+ let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let release = Arc::new(Notify::new()); + + let server_transport = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_open_stream(OpenStreamConfig::enabled()), + as_pool(server_pool), + ) + .await + .expect("server transport"); + let server = StreamServer::new(release); + let server_handle = tokio::spawn(async move { + server + .serve(server_transport) + .await + .expect("server serve") + .waiting() + .await + .expect("server error"); + }); + let_event_loops_start().await; + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_open_stream(OpenStreamConfig::enabled()), + as_pool(client_pool), + ) + .await + .expect("client transport"); + let _client_rx = client.take_message_receiver().expect("client rx"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // Bind a reader session to a string token, then publish a crafted tools/call. 
+ let pending = client.prepare_outbound_open_stream_session(); + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(1), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ + "name": "stream3", + "arguments": { "topic": "orders" }, + "_meta": { "progressToken": "string-token-1" }, + })), + }); + client.send(&request).await.expect("send tools/call"); + + let (token, mut stream) = tokio::time::timeout(Duration::from_secs(5), pending) + .await + .expect("placeholder timed out") + .expect("placeholder dropped") + .expect("session admission"); + assert_eq!(token, "string-token-1"); + + let chunks = collect_chunks(&mut stream).await; + assert_eq!(chunks, vec!["a", "b", "c"]); + + let _ = client.close().await; + server_handle.abort(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_oversized_request_streaming_response_composition() { + // CEP-22 (oversized request) + CEP-41 (streaming response) over one token. + let fx = fixture(true, true, true).await; + + // A large topic forces the request over the oversized threshold (CEP-22), + // while the tool streams its response (CEP-41). `stream_len` echoes the + // received length so the final response stays small. + let big_len = 120_000usize; + let big_topic = "Z".repeat(big_len); + let mut call = call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("stream_len", &big_topic), + ) + .await + .expect("call_tool_stream"); + + // The streaming response is delivered in order (open-stream receiver), and is + // not cross-fed by the oversized receiver (type-disjoint predicates). + let chunks = collect_chunks(&mut call.stream).await; + assert_eq!(chunks, vec!["a", "b", "c"]); + + // The request reassembled (CEP-22): the tool saw the full topic, and the final + // response arrives after `close` (deferral). 
+ let result = tokio::time::timeout(Duration::from_secs(10), &mut call.result) + .await + .expect("result timed out") + .expect("tool call failed"); + assert_eq!(first_text(&result), format!("len:{big_len}")); + + shutdown(fx).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn open_stream_oversized_response_while_separate_stream_is_live() { + // CEP-22 × CEP-41 composition under concurrency: a live open stream (one + // token) coexists with a separate plain tool whose oversized response (a + // different token) is fragmented. The big tool's progress token creates an + // *unused* server writer that must NOT defer its response (Fix 1) and must + // NOT steal the live stream's session (Fix 2). + let fx = fixture(true, true, true).await; + + // (1) Start the long-lived stream and read its first chunk. + let mut call = call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("stream_pair", "orders"), + ) + .await + .expect("call_tool_stream"); + let first = call.stream.next().await.expect("first chunk").expect("ok"); + assert_eq!(first, "first:orders"); + + // (2) While the stream is live, a separate plain call returns an oversized + // response. Progress-aware options stamp a progressToken (the trigger for + // both the oversized routing and the unused-writer hazard). + let big_result = fx + .client + .peer() + .call_tool_with_options( + call_params("big_data", "ignored"), + progress_aware_options( + DEFAULT_OVERSIZED_IDLE_TIMEOUT, + DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT, + ), + ) + .await + .expect("big_data call"); + let big_text = first_text(&big_result); + assert_eq!( + big_text.len(), + BIG_RESPONSE_LEN, + "oversized response reassembled byte-exactly" + ); + assert!( + big_text.bytes().all(|b| b == b'X'), + "payload integrity preserved" + ); + + // (3) The oversized path must actually have fragmented the response. 
+ assert!( + relay_has_cvm_frame(&fx.relay, "oversized-transfer").await, + "the big response must have been fragmented via CEP-22" + ); + + // (4) The streaming session is unaffected: it delivers its remaining chunk, + // closes, and resolves its final result. + fx.release.notify_one(); + let second = call.stream.next().await.expect("second chunk").expect("ok"); + assert_eq!(second, "second:orders"); + assert!( + call.stream.next().await.is_none(), + "stream must close after release" + ); + let result = tokio::time::timeout(Duration::from_secs(10), &mut call.result) + .await + .expect("result timed out") + .expect("tool call failed"); + assert_eq!(first_text(&result), "done:orders"); + + shutdown(fx).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn plain_call_with_progress_token_does_not_interfere_with_live_stream() { + // Progress-token-conflict guard: a plain (non-streaming) call that carries a + // progressToken returns normally — its unused server writer must not defer + // the response — while a separate live stream keeps delivering uncrossed. + let fx = fixture(true, true, false).await; + + // (1) Start the long-lived stream and read its first chunk. + let mut call = call_tool_stream( + fx.client.peer(), + &fx.handle, + call_params("stream_pair", "orders"), + ) + .await + .expect("call_tool_stream"); + let first = call.stream.next().await.expect("first chunk").expect("ok"); + assert_eq!(first, "first:orders"); + + // (2) A plain call with progress-aware options (which inject a progressToken) + // must return its own result and must not be deferred forever. 
+ let plain = fx + .client + .peer() + .call_tool_with_options( + call_params("no_stream", "ping"), + progress_aware_options( + DEFAULT_OVERSIZED_IDLE_TIMEOUT, + DEFAULT_OVERSIZED_MAX_TOTAL_TIMEOUT, + ), + ) + .await + .expect("plain call must not hang"); + assert_eq!(first_text(&plain), "plain:ping"); + + // (3) The live stream received only its own chunks and still completes. + fx.release.notify_one(); + let second = call.stream.next().await.expect("second chunk").expect("ok"); + assert_eq!(second, "second:orders"); + assert!( + call.stream.next().await.is_none(), + "stream must close after release" + ); + let result = tokio::time::timeout(Duration::from_secs(5), &mut call.result) + .await + .expect("result timed out") + .expect("tool call failed"); + assert_eq!(first_text(&result), "done:orders"); + + shutdown(fx).await; +} diff --git a/tests/open_stream_timeout_e2e.rs b/tests/open_stream_timeout_e2e.rs new file mode 100644 index 0000000..1ae7d0a --- /dev/null +++ b/tests/open_stream_timeout_e2e.rs @@ -0,0 +1,265 @@ +//! CEP-41 keepalive timer e2e tests — the live client keepalive sweep driven +//! deterministically. +//! +//! The reader session clock is `std::time::Instant` (unaffected by tokio's +//! `start_paused`), so instead of advancing a paused clock we inject an explicit +//! future `now` into [`NostrClientTransport::run_open_stream_keepalive_sweep`] and +//! invoke it manually — no real sleeps for the timer logic. A greybox "server" +//! (a `BaseTransport` over the shared mock store) hand-publishes the inbound +//! frames the client reads. +//! +//! Declared in `Cargo.toml` with `required-features = ["rmcp", "test-utils"]`. 
+
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use contextvm_sdk::core::constants::CTXVM_MESSAGES_KIND;
+use contextvm_sdk::core::types::EncryptionMode;
+use contextvm_sdk::relay::mock::MockRelayPool;
+use contextvm_sdk::transport::base::BaseTransport;
+use contextvm_sdk::transport::client::{NostrClientTransport, NostrClientTransportConfig};
+use contextvm_sdk::transport::open_stream::{OpenStreamConfig, OpenStreamFrame, OpenStreamSession};
+use contextvm_sdk::{JsonRpcMessage, RelayPoolTrait};
+use futures::StreamExt;
+use nostr_sdk::prelude::*;
+
+// Keepalive timeouts (milliseconds) applied to the client's open-stream config in
+// `start_keepalive_client`: idle timeout, probe timeout, and close grace period.
+const IDLE_MS: u64 = 40;
+const PROBE_MS: u64 = 60;
+// Wide enough that the ~30 ms "let the close land" settle (after which `t0` is
+// captured) is negligible against the survive/abort sweep offsets below.
+const GRACE_MS: u64 = 400;
+
+// ── harness ────────────────────────────────────────────────────────────────
+
+/// Build the greybox "server": a bare `BaseTransport` over the shared server-side
+/// mock pool, with encryption disabled and marked as already connected.
+fn greybox_server_base(server_pool: &Arc) -> BaseTransport {
+    BaseTransport {
+        relay_pool: Arc::clone(server_pool) as Arc,
+        encryption_mode: EncryptionMode::Disabled,
+        is_connected: true,
+    }
+}
+
+/// Hand-publish one open-stream frame from the greybox server to the client.
+///
+/// The frame is wrapped in a progress notification for the fixed token `"tok"`
+/// with the given `progress` value, then sent to `client` as a
+/// `CTXVM_MESSAGES_KIND` event. Panics if the frame cannot be built or published.
+async fn publish_frame(
+    base: &BaseTransport,
+    client: &PublicKey,
+    progress: u64,
+    frame: OpenStreamFrame,
+) {
+    let notification = frame
+        .into_progress_notification("tok", progress, None)
+        .expect("build frame");
+    // NOTE(review): `Some(false)` / `None` are presumably the encryption override
+    // and wrap-kind arguments of `send_mcp_message` — confirm against its signature.
+    base.send_mcp_message(
+        &JsonRpcMessage::Notification(notification),
+        client,
+        CTXVM_MESSAGES_KIND,
+        BaseTransport::create_recipient_tags(client),
+        Some(false),
+        None,
+    )
+    .await
+    .expect("publish frame");
+}
+
+/// Start a client whose reader sessions use the short keepalive timeouts above.
+async fn start_keepalive_client(
+    client_pool: MockRelayPool,
+    server_pubkey: &PublicKey,
+) -> NostrClientTransport {
+    // Open-stream is enabled with the test's IDLE_MS / PROBE_MS / GRACE_MS
+    // timeouts; encryption is disabled so published frames stay plaintext.
+    let mut client = NostrClientTransport::with_relay_pool(
+        NostrClientTransportConfig::default()
+            .with_relay_urls(vec!["wss://mock.relay".to_string()])
+            .with_server_pubkey(server_pubkey.to_hex())
+            .with_encryption_mode(EncryptionMode::Disabled)
+            .with_open_stream(
+                OpenStreamConfig::enabled()
+                    .with_idle_timeout_ms(IDLE_MS)
+                    .with_probe_timeout_ms(PROBE_MS)
+                    .with_close_grace_period_ms(GRACE_MS),
+            ),
+        Arc::new(client_pool) as Arc,
+    )
+    .await
+    .expect("client transport");
+    // NOTE(review): `_rx` is a local, so the message receiver is dropped when this
+    // function returns — confirm the transport tolerates a closed receiver.
+    let _rx = client.take_message_receiver().expect("client rx");
+    client.start().await.expect("client start");
+    // Brief pause after `start` before handing the client to the test.
+    tokio::time::sleep(Duration::from_millis(20)).await;
+    client
+}
+
+/// Poll for the reader session to exist and have observed its `start`.
+///
+/// Looks up the session for the fixed token `"tok"` up to 200 times, sleeping
+/// 5 ms between attempts, and returns it once `has_started()` is true. Panics if
+/// the session never starts within those attempts.
+async fn wait_for_started_session(client: &NostrClientTransport) -> OpenStreamSession {
+    for _ in 0..200 {
+        if let Some(session) = client.get_open_stream_session("tok").await {
+            if session.has_started() {
+                return session;
+            }
+        }
+        tokio::time::sleep(Duration::from_millis(5)).await;
+    }
+    panic!("reader session for 'tok' never started");
+}
+
+/// Poll the shared store for a published frame whose `cvm.frameType == frame_type`,
+/// returning its `cvm.nonce` (or "" when absent).
+async fn wait_for_published_frame(pool: &MockRelayPool, frame_type: &str) -> String {
+    // Up to 200 scans of the store, 5 ms apart (about one second in total).
+    for _ in 0..200 {
+        for event in pool.stored_events().await {
+            // Events whose content is not valid JSON are skipped.
+            let Ok(v) = serde_json::from_str::<serde_json::Value>(&event.content) else {
+                continue;
+            };
+            let cvm = v.get("params").and_then(|p| p.get("cvm"));
+            // Only open-stream frames are of interest.
+            if cvm.and_then(|c| c.get("type")).and_then(|t| t.as_str()) != Some("open-stream") {
+                continue;
+            }
+            if cvm
+                .and_then(|c| c.get("frameType"))
+                .and_then(|t| t.as_str())
+                == Some(frame_type)
+            {
+                // A frame without a string `nonce` yields the empty string.
+                return cvm
+                    .and_then(|c| c.get("nonce"))
+                    .and_then(|n| n.as_str())
+                    .unwrap_or_default()
+                    .to_string();
+            }
+        }
+        tokio::time::sleep(Duration::from_millis(5)).await;
+    }
+    // Polling budget exhausted without seeing a matching frame.
+    panic!("no published {frame_type} frame observed");
+}
+
+/// Build a `start` frame with no content type.
+fn start_frame() -> OpenStreamFrame {
+    OpenStreamFrame::Start { content_type: None }
+}
+
+// ── tests ──────────────────────────────────────────────────────────────────
+
+/// Once the idle threshold has passed, a keepalive sweep must publish a `ping`
+/// with nonce `tok:1`; after a matching `pong`, a sweep past the probe deadline
+/// must leave the session active.
+///
+/// The sweeps are driven by hand with synthetic instants derived from `t0`.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn idle_elapses_ping_emitted_pong_resets_stream_survives() {
+    let (client_pool, server_pool) = MockRelayPool::create_pair();
+    let server_pubkey = server_pool.mock_public_key();
+    let client_pubkey = client_pool.mock_public_key();
+    let server_pool = Arc::new(server_pool);
+
+    let mut client = start_keepalive_client(client_pool, &server_pubkey).await;
+    let base = greybox_server_base(&server_pool);
+
+    publish_frame(&base, &client_pubkey, 1, start_frame()).await;
+    let session = wait_for_started_session(&client).await;
+    let t0 = Instant::now();
+
+    // Idle threshold crossed → the client probes with a `ping`.
+    client
+        .run_open_stream_keepalive_sweep(t0 + Duration::from_millis(IDLE_MS + 5))
+        .await;
+    let nonce = wait_for_published_frame(&server_pool, "ping").await;
+    assert_eq!(nonce, "tok:1", "ping nonce should be {{token}}:{{n}}");
+
+    // A matching pong clears the probe; the stream must NOT abort.
+    publish_frame(&base, &client_pubkey, 2, OpenStreamFrame::Pong { nonce }).await;
+    // Give the inbound pong time to land.
+    tokio::time::sleep(Duration::from_millis(30)).await;
+    client
+        .run_open_stream_keepalive_sweep(t0 + Duration::from_millis(IDLE_MS + PROBE_MS + 10))
+        .await;
+    assert!(
+        session.is_active(),
+        "a matching pong must keep the stream alive"
+    );
+
+    let _ = client.close().await;
+}
+
+/// With no `pong` answering the probe, a sweep past the probe deadline must
+/// abort the stream: the session yields an error mentioning "Probe timeout"
+/// and is no longer active.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn idle_plus_probe_elapse_no_pong_stream_aborts() {
+    let (client_pool, server_pool) = MockRelayPool::create_pair();
+    let server_pubkey = server_pool.mock_public_key();
+    let client_pubkey = client_pool.mock_public_key();
+    let server_pool = Arc::new(server_pool);
+
+    let mut client = start_keepalive_client(client_pool, &server_pubkey).await;
+    let base = greybox_server_base(&server_pool);
+
+    publish_frame(&base, &client_pubkey, 1, start_frame()).await;
+    let mut session = wait_for_started_session(&client).await;
+    let t0 = Instant::now();
+
+    // Probe, then let the probe deadline pass with no pong → abort.
+    client
+        .run_open_stream_keepalive_sweep(t0 + Duration::from_millis(IDLE_MS + 5))
+        .await;
+    // Only the ping's presence matters here; its nonce is discarded.
+    let _ = wait_for_published_frame(&server_pool, "ping").await;
+    client
+        .run_open_stream_keepalive_sweep(t0 + Duration::from_millis(IDLE_MS + PROBE_MS + 10))
+        .await;
+
+    match session.next().await {
+        Some(Err(error)) => assert!(
+            error.to_string().contains("Probe timeout"),
+            "expected a probe-timeout abort, got: {error}"
+        ),
+        other => panic!("expected a terminal abort, got {other:?}"),
+    }
+    assert!(!session.is_active());
+
+    let _ = client.close().await;
+}
+
+/// A `close` that arrives while chunk 0 is still missing must not abort at
+/// once: the session stays active on a sweep before the grace deadline and
+/// yields an error mentioning "Close grace period expired" on a sweep after it.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn close_with_missing_chunk_waits_grace_then_aborts() {
+    let (client_pool, server_pool) = MockRelayPool::create_pair();
+    let server_pubkey = server_pool.mock_public_key();
+    let client_pubkey = client_pool.mock_public_key();
+    let server_pool = Arc::new(server_pool);
+
+    let mut client = start_keepalive_client(client_pool, &server_pubkey).await;
+    let base = greybox_server_base(&server_pool);
+
+    // start, then an out-of-order chunk (index 1 — leaves a gap at 0), then a
+    // close: the gap is unresolved with a chunk buffered, so the grace timer arms.
+    publish_frame(&base, &client_pubkey, 1, start_frame()).await;
+    let mut session = wait_for_started_session(&client).await;
+    publish_frame(
+        &base,
+        &client_pubkey,
+        2,
+        OpenStreamFrame::Chunk {
+            chunk_index: 1,
+            data: "late".to_string(),
+        },
+    )
+    .await;
+    publish_frame(
+        &base,
+        &client_pubkey,
+        3,
+        OpenStreamFrame::Close {
+            last_chunk_index: None,
+        },
+    )
+    .await;
+    // Let the close land (arming the grace timer).
+    tokio::time::sleep(Duration::from_millis(30)).await;
+    let t0 = Instant::now();
+
+    // Before the grace deadline: nothing. After it: abort. (`t0` is ~30 ms past
+    // the close, so the deadline sits at ~`t0 + GRACE_MS - 30`.)
+    client
+        .run_open_stream_keepalive_sweep(t0 + Duration::from_millis(100))
+        .await;
+    assert!(session.is_active(), "must survive until the grace deadline");
+    client
+        .run_open_stream_keepalive_sweep(t0 + Duration::from_millis(GRACE_MS + 100))
+        .await;
+
+    match session.next().await {
+        Some(Err(error)) => assert!(
+            error.to_string().contains("Close grace period expired"),
+            "expected a close-grace abort, got: {error}"
+        ),
+        other => panic!("expected a terminal abort, got {other:?}"),
+    }
+
+    let _ = client.close().await;
+}