From e63a86ea406d10ae82bec4aeceae14902a2142b4 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Thu, 19 Mar 2026 18:57:26 -0700 Subject: [PATCH 1/2] Add Session Stream --- README.md | 11 +- docs/README.md | 8 +- docs/architecture.md | 2 +- docs/examples.md | 16 ++- docs/protocol.md | 20 ++- src/lib.rs | 1 + src/runtime.rs | 57 ++++++++ src/server.rs | 328 +++++++++++++++++++++++++++++++++++++++++-- src/stream_bus.rs | 80 +++++++++++ 9 files changed, 492 insertions(+), 31 deletions(-) create mode 100644 src/stream_bus.rs diff --git a/README.md b/README.md index b429142..d960dd7 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Reference runtime for the Multi-Agent Coordination Protocol (MACP). -This runtime implements the current MACP core/service surface, the five standards-track modes in the main RFC repository, and one experimental `multi_round` mode that remains available only by explicit canonical name. The focus of this release is freeze-readiness for SDKs and real-world unary integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery. +This runtime implements the current MACP core/service surface, the five standards-track modes in the main RFC repository, and one experimental `multi_round` mode that remains available only by explicit canonical name. The focus of this release is freeze-readiness for SDKs and real-world unary and streaming integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery. ## What changed in v0.4.0 @@ -28,8 +28,9 @@ This runtime implements the current MACP core/service surface, the five standard - per-session append-only log files and session snapshots via `FileBackend` - crash recovery with dedup state reconciliation - atomic writes (tmp file + rename) prevent partial-write corruption -- **Unary freeze profile** - - `StreamSession` is intentionally disabled in this profile +- **Authoritative session streaming** + - `StreamSession` now emits accepted MACP envelopes for one bound session per stream + - mixed-session streams are rejected - `WatchModeRegistry` and `WatchRoots` remain unimplemented ## Implemented modes @@ -176,7 +177,7 @@ cargo run --bin fuzz_client | `GetManifest` | implemented | | `ListModes` | implemented | | `ListRoots` | implemented | -| `StreamSession` | intentionally disabled in freeze profile | +| `StreamSession` | implemented (accepted-envelope session stream) | | `WatchModeRegistry` | unimplemented | | `WatchRoots` | unimplemented | @@ -205,6 +206,6 @@ runtime/ - The RFC/spec repository remains the normative source for protocol semantics. - This runtime only accepts the canonical standards-track mode identifiers for the five main modes. - `multi_round` remains experimental and is not advertised by discovery RPCs. -- `StreamSession` is intentionally not part of the freeze surface for the first SDKs. +- `StreamSession` is available for bidirectional session-scoped coordination. Use `Send` when you need per-message negative acknowledgements. The current implementation streams future accepted envelopes from the time the stream binds; it does not backfill earlier accepted history. See `docs/README.md` and `docs/examples.md` for the updated local development and usage guidance. diff --git a/docs/README.md b/docs/README.md index fb7a3e9..dfefa44 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,7 +6,7 @@ The RFC/spec repository is still the normative source for MACP semantics. These ## What is in this runtime profile -- unary-first MACP server over gRPC +- MACP server over gRPC with unary RPCs and per-session bidirectional streaming - five standards-track modes from the main RFC repository - one experimental `macp.mode.multi_round.v1` mode kept off discovery surfaces - strict canonical `SessionStart` for standards-track modes @@ -24,21 +24,21 @@ The RFC/spec repository is still the normative source for MACP semantics. These ## Freeze profile -The current runtime is intended to be the freeze candidate for unary SDKs and reference examples. +The current runtime is intended to be the freeze candidate for unary and streaming SDKs and reference examples. Implemented and supported: - `Initialize` - `Send` +- `StreamSession` - `GetSession` - `CancelSession` - `GetManifest` - `ListModes` - `ListRoots` -Not part of the freeze surface: +Not yet implemented: -- `StreamSession` is intentionally disabled in this profile - `WatchModeRegistry` is unimplemented - `WatchRoots` is unimplemented diff --git a/docs/architecture.md b/docs/architecture.md index c5ee203..085e87a 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -93,4 +93,4 @@ Responsibilities: ## Freeze-profile design choice -The runtime intentionally prioritizes unary correctness over streaming completeness. `StreamSession` is therefore disabled in this release profile so SDKs can target a stable, explicit surface. +The runtime now exposes `StreamSession` as a per-session accepted-envelope stream. Each gRPC stream binds to one session and receives canonical MACP envelopes in runtime acceptance order. Unary `Send` remains the path for explicit per-message acknowledgement semantics. diff --git a/docs/examples.md b/docs/examples.md index c019557..922124a 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -1,6 +1,6 @@ # Examples and local development guide -These examples target `macp-runtime v0.4.0` and the unary freeze profile. +These examples target `macp-runtime v0.4.0` and the stream-capable freeze profile. They intentionally use the local-development security shortcut: @@ -141,7 +141,19 @@ cargo run --bin multi_round_client This mode is still experimental. It remains callable by the explicit canonical name `macp.mode.multi_round.v1`, but it is not advertised by discovery RPCs and it does not use the strict standards-track `SessionStart` contract. -## Example 7: Freeze-check / error-path client +## Example 7: StreamSession + +`StreamSession` emits only accepted canonical MACP envelopes. A single gRPC stream binds to one session. If a client needs negative per-message acknowledgements, it should continue to use `Send`. + +Practical notes: + +- bind a stream by sending a session-scoped envelope for the target session +- use `SessionStart` to create a new session over the stream +- stream attachment starts observing future accepted envelopes from the bind point; it does not replay earlier history +- use a session-scoped `Signal` envelope with the correct `session_id` and `mode` to attach to an existing session without mutating it +- mixed-session streams are rejected with `FAILED_PRECONDITION` + +## Example 8: Freeze-check / error-path client Run: diff --git a/docs/protocol.md b/docs/protocol.md index 022301e..1119c3f 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -10,22 +10,34 @@ The RFC/spec repository is the normative source for protocol semantics. This fil Clients should call `Initialize` before using the runtime. -## Implemented unary RPCs +## Implemented RPCs - `Initialize` - `Send` +- `StreamSession` - `GetSession` - `CancelSession` - `GetManifest` - `ListModes` - `ListRoots` -## Not in the freeze surface +## Still unimplemented -- `StreamSession` exists in the protobuf surface but is intentionally disabled in this runtime profile - `WatchModeRegistry` is unimplemented - `WatchRoots` is unimplemented +## StreamSession profile + +`StreamSession` is session-scoped and authoritative for accepted envelopes: + +- one gRPC stream binds to one non-empty `session_id` +- the server emits only accepted canonical MACP envelopes +- stream attachment observes future accepted envelopes from the bind point; it does not backfill earlier history +- accepted envelope order matches runtime admission order for that session +- mixed-session streams are rejected with `FAILED_PRECONDITION` +- stream-level validation failures terminate the stream with a gRPC status; use `Send` if you need explicit per-message negative acknowledgements +- to attach to an existing session without mutating it, send a session-scoped `Signal` envelope with the correct `session_id` and `mode` + ## Standards-track mode rules For these modes: @@ -81,4 +93,4 @@ For standards-track modes, `CommitmentPayload` must carry version fields that ma ## Discovery notes -`ListModes` returns the five standards-track modes. `GetManifest` exposes a freeze-profile manifest that matches the implemented unary capabilities. +`ListModes` returns the five standards-track modes. `GetManifest` exposes a manifest that matches the implemented unary and streaming capabilities. diff --git a/src/lib.rs b/src/lib.rs index 689f39f..770c7f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,5 +29,6 @@ pub mod registry; pub mod runtime; pub mod session; pub mod storage; +pub mod stream_bus; pub mod security; diff --git a/src/runtime.rs b/src/runtime.rs index cbee836..af1cbaa 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -18,6 +18,7 @@ use crate::session::{ validate_standard_session_start_payload, Session, SessionState, }; use crate::storage::StorageBackend; +use crate::stream_bus::SessionStreamBus; const EXPERIMENTAL_DEFAULT_TTL_MS: i64 = 60_000; @@ -31,6 +32,7 @@ pub struct Runtime { pub storage: Arc, pub registry: Arc, pub log_store: Arc, + stream_bus: Arc, modes: HashMap>, } @@ -52,6 +54,7 @@ impl Runtime { storage, registry, log_store, + stream_bus: Arc::new(SessionStreamBus::default()), modes, } } @@ -64,6 +67,19 @@ impl Runtime { .collect() } + pub fn subscribe_session_stream( + &self, + session_id: &str, + ) -> tokio::sync::broadcast::Receiver { + self.stream_bus.subscribe(session_id) + } + + fn publish_accepted_envelope(&self, env: &Envelope) { + if !env.session_id.is_empty() { + self.stream_bus.publish(&env.session_id, env.clone()); + } + } + fn make_incoming_entry(env: &Envelope) -> LogEntry { LogEntry { message_id: env.message_id.clone(), @@ -259,6 +275,7 @@ impl Runtime { // 3. Best-effort session save self.save_session_to_storage(&session).await; guard.insert(env.session_id.clone(), session); + self.publish_accepted_envelope(env); Ok(ProcessResult { session_state: result_state, @@ -323,6 +340,7 @@ impl Runtime { // 3. Best-effort session save self.save_session_to_storage(session).await; + self.publish_accepted_envelope(env); Ok(ProcessResult { session_state: result_state, @@ -714,6 +732,45 @@ mod tests { assert_eq!(result.session_state, SessionState::Expired); } + #[tokio::test] + async fn accepted_envelopes_are_published_in_order() { + let rt = make_runtime(); + let mut events = rt.subscribe_session_stream("s1"); + + let start = env( + "macp.mode.decision.v1", + "SessionStart", + "m1", + "s1", + "agent://orchestrator", + session_start(vec!["agent://fraud".into()]), + ); + rt.process(&start, None).await.unwrap(); + let first = events.recv().await.unwrap(); + assert_eq!(first.message_id, "m1"); + assert_eq!(first.message_type, "SessionStart"); + + let proposal = ProposalPayload { + proposal_id: "p1".into(), + option: "step-up".into(), + rationale: "risk".into(), + supporting_data: vec![], + } + .encode_to_vec(); + let proposal_env = env( + "macp.mode.decision.v1", + "Proposal", + "m2", + "s1", + "agent://orchestrator", + proposal, + ); + rt.process(&proposal_env, None).await.unwrap(); + let second = events.recv().await.unwrap(); + assert_eq!(second.message_id, "m2"); + assert_eq!(second.message_type, "Proposal"); + } + #[tokio::test] async fn commitment_versions_are_carried_into_resolution() { let rt = make_runtime(); diff --git a/src/server.rs b/src/server.rs index aa64c99..b385ac3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -18,6 +18,11 @@ use std::collections::HashMap; use std::sync::Arc; use tonic::{Request, Response, Status}; +type SessionResponseStream = std::pin::Pin< + Box> + Send>, +>; + +#[derive(Clone)] pub struct MacpServer { runtime: Arc, security: SecurityLayer, @@ -97,9 +102,6 @@ impl MacpServer { self.security .enforce_rate_limit(&identity.sender, is_session_start) .await?; - // max_open_sessions is passed to runtime.process() where it is - // enforced atomically under the session write lock, avoiding a - // TOCTOU race between the count check and session insertion. let max_open = if is_session_start { identity.max_open_sessions } else { @@ -132,6 +134,225 @@ impl MacpServer { Ok(identity) } + fn try_next_stream_event( + receiver: &mut Option>, + ) -> Result, Status> { + use tokio::sync::broadcast::error::TryRecvError; + + let rx = match receiver.as_mut() { + Some(rx) => rx, + None => return Ok(None), + }; + + match rx.try_recv() { + Ok(envelope) => Ok(Some(envelope)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Closed) => { + *receiver = None; + Ok(None) + } + Err(TryRecvError::Lagged(skipped)) => Err(Status::resource_exhausted(format!( + "StreamSession receiver fell behind by {skipped} envelopes" + ))), + } + } + + async fn process_stream_request( + &self, + identity: &AuthIdentity, + req: StreamSessionRequest, + bound_session_id: &mut Option, + session_events: &mut Option>, + ) -> Result<(), Status> { + let envelope = req.envelope.ok_or_else(|| { + Status::invalid_argument("StreamSessionRequest must contain an envelope") + })?; + + self.validate_envelope_shape(&envelope) + .map_err(Self::status_from_error)?; + if envelope.session_id.trim().is_empty() { + return Err(Status::invalid_argument( + "StreamSession requires a non-empty session_id", + )); + } + if envelope.mode.trim().is_empty() { + return Err(Status::invalid_argument( + "StreamSession requires a non-empty mode", + )); + } + if let Some(bound) = bound_session_id.as_ref() { + if bound != &envelope.session_id { + return Err(Status::failed_precondition( + "StreamSession may only carry envelopes for one session_id", + )); + } + } + + let envelope = Self::apply_authenticated_sender(identity, envelope) + .map_err(Self::status_from_error)?; + let is_session_start = envelope.message_type == "SessionStart"; + + if !is_session_start { + if let Some(session) = self.runtime.get_session_checked(&envelope.session_id).await { + if envelope.mode != session.mode { + return Err(Status::failed_precondition( + "INVALID_ENVELOPE: envelope mode does not match the bound session mode", + )); + } + if session.state != SessionState::Open { + return Err(Status::failed_precondition("SESSION_NOT_OPEN")); + } + } else if envelope.message_type == "Signal" { + return Err(Status::not_found(format!( + "Session '{}' not found", + envelope.session_id + ))); + } + } + + self.security + .authorize_mode(identity, &envelope.mode, is_session_start) + .map_err(Self::status_from_error)?; + self.security + .enforce_rate_limit(&identity.sender, is_session_start) + .await + .map_err(Self::status_from_error)?; + + if session_events.is_none() { + *bound_session_id = Some(envelope.session_id.clone()); + *session_events = Some(self.runtime.subscribe_session_stream(&envelope.session_id)); + } + + let max_open = if is_session_start { + identity.max_open_sessions + } else { + None + }; + self.runtime + .process(&envelope, max_open) + .await + .map_err(Self::status_from_error)?; + Ok(()) + } + + fn build_stream_session_stream( + &self, + identity: AuthIdentity, + inbound: S, + ) -> SessionResponseStream + where + S: futures_core::Stream> + Send + 'static, + { + use tokio::sync::broadcast; + use tokio_stream::StreamExt; + + // Actions collected from tokio::select! arms to process outside the + // select scope, avoiding borrow and macro-expansion issues with `?` + // and `yield` inside select branches within try_stream!. + enum StreamAction { + ProcessRequest(StreamSessionRequest), + EmitEnvelope(Envelope), + ClientError(Status), + ClientDone, + EventsClosed, + Lagged(u64), + } + + let server = self.clone(); + let output = async_stream::try_stream! { + let mut inbound = Box::pin(inbound); + let mut bound_session_id: Option = None; + let mut session_events: Option> = None; + + loop { + if session_events.is_some() { + let action = { + let events = session_events.as_mut().unwrap(); + tokio::select! { + maybe_req = inbound.next() => { + match maybe_req { + Some(Ok(req)) => StreamAction::ProcessRequest(req), + Some(Err(status)) => StreamAction::ClientError(status), + None => StreamAction::ClientDone, + } + } + recv_result = events.recv() => { + match recv_result { + Ok(envelope) => StreamAction::EmitEnvelope(envelope), + Err(broadcast::error::RecvError::Closed) => StreamAction::EventsClosed, + Err(broadcast::error::RecvError::Lagged(n)) => StreamAction::Lagged(n), + } + } + } + }; + + match action { + StreamAction::ProcessRequest(req) => { + server + .process_stream_request( + &identity, + req, + &mut bound_session_id, + &mut session_events, + ) + .await?; + while let Some(envelope) = Self::try_next_stream_event(&mut session_events)? { + yield StreamSessionResponse { + envelope: Some(envelope), + }; + } + } + StreamAction::EmitEnvelope(envelope) => { + yield StreamSessionResponse { + envelope: Some(envelope), + }; + } + StreamAction::ClientError(status) => { + Err(status)?; + } + StreamAction::ClientDone => { + while let Some(envelope) = Self::try_next_stream_event(&mut session_events)? { + yield StreamSessionResponse { + envelope: Some(envelope), + }; + } + break; + } + StreamAction::EventsClosed => { + session_events = None; + } + StreamAction::Lagged(skipped) => { + Err(Status::resource_exhausted(format!( + "StreamSession receiver fell behind by {skipped} envelopes" + )))?; + } + } + } else { + match inbound.next().await { + Some(Ok(req)) => { + server + .process_stream_request( + &identity, + req, + &mut bound_session_id, + &mut session_events, + ) + .await?; + while let Some(envelope) = Self::try_next_stream_event(&mut session_events)? { + yield StreamSessionResponse { + envelope: Some(envelope), + }; + } + } + Some(Err(status)) => Err(status)?, + None => break, + } + } + } + }; + Box::pin(output) + } + fn status_from_error(err: MacpError) -> Status { match err { MacpError::Unauthenticated => Status::unauthenticated(err.to_string()), @@ -167,7 +388,7 @@ impl MacpRuntimeService for MacpServer { website_url: String::new(), }), capabilities: Some(Capabilities { - sessions: Some(SessionsCapability { stream: false }), + sessions: Some(SessionsCapability { stream: true }), cancellation: Some(CancellationCapability { cancel_session: true, }), @@ -184,7 +405,7 @@ impl MacpRuntimeService for MacpServer { experimental: None, }), supported_modes: self.runtime.registered_mode_names(), - instructions: "Authenticate requests with Authorization: Bearer . For local development only, x-macp-agent-id may be enabled by configuration.".into(), + instructions: "Authenticate requests with Authorization: Bearer . Use StreamSession for session-scoped bidirectional streaming of accepted envelopes. For local development only, x-macp-agent-id may be enabled by configuration.".into(), })) } @@ -350,17 +571,20 @@ impl MacpRuntimeService for MacpServer { Ok(Response::new(ListRootsResponse { roots: vec![] })) } - type StreamSessionStream = std::pin::Pin< - Box> + Send>, - >; + type StreamSessionStream = SessionResponseStream; async fn stream_session( &self, - _request: Request>, + request: Request>, ) -> Result, Status> { - Err(Status::unimplemented( - "StreamSession is intentionally disabled in the unary freeze profile", - )) + let identity = self + .security + .authenticate_metadata(request.metadata()) + .map_err(Self::status_from_error)?; + Ok(Response::new(self.build_stream_session_stream( + identity, + request.into_inner(), + ))) } type WatchModeRegistryStream = std::pin::Pin< @@ -509,9 +733,83 @@ mod tests { assert_eq!(err.code(), tonic::Code::PermissionDenied); } - // Note: stream_session cannot be unit-tested directly because - // tonic::Streaming requires an HTTP/2 body. The endpoint returns - // Status::unimplemented, verified via integration testing. + fn stream_identity(sender: &str) -> AuthIdentity { + AuthIdentity { + sender: sender.into(), + allowed_modes: None, + can_start_sessions: true, + max_open_sessions: None, + } + } + + #[tokio::test] + async fn stream_session_emits_accepted_envelopes_only() { + use tokio_stream::{iter, StreamExt}; + + let (server, _) = make_server(); + let requests = iter(vec![Ok(StreamSessionRequest { + envelope: Some(Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.decision.v1".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_payload(), + }), + })]); + + let mut stream = + server.build_stream_session_stream(stream_identity("agent://orchestrator"), requests); + + let response = stream.next().await.unwrap().unwrap(); + let envelope = response.envelope.unwrap(); + assert_eq!(envelope.message_type, "SessionStart"); + assert_eq!(envelope.message_id, "m1"); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn stream_session_rejects_mixed_session_ids() { + use tokio_stream::{iter, StreamExt}; + + let (server, _) = make_server(); + let requests = iter(vec![ + Ok(StreamSessionRequest { + envelope: Some(Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.decision.v1".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_payload(), + }), + }), + Ok(StreamSessionRequest { + envelope: Some(Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.decision.v1".into(), + message_type: "SessionStart".into(), + message_id: "m2".into(), + session_id: "s2".into(), + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_payload(), + }), + }), + ]); + + let mut stream = + server.build_stream_session_stream(stream_identity("agent://orchestrator"), requests); + + let first = stream.next().await.unwrap().unwrap(); + assert_eq!(first.envelope.unwrap().session_id, "s1"); + let err = stream.next().await.unwrap().unwrap_err(); + assert_eq!(err.code(), tonic::Code::FailedPrecondition); + } #[tokio::test] async fn list_modes_returns_standard_modes() { diff --git a/src/stream_bus.rs b/src/stream_bus.rs new file mode 100644 index 0000000..25ad852 --- /dev/null +++ b/src/stream_bus.rs @@ -0,0 +1,80 @@ +use crate::pb::Envelope; +use std::collections::HashMap; +use std::sync::Mutex; +use tokio::sync::broadcast; + +const DEFAULT_SESSION_STREAM_CAPACITY: usize = 256; + +pub struct SessionStreamBus { + channels: Mutex>>, + capacity: usize, +} + +impl Default for SessionStreamBus { + fn default() -> Self { + Self::new(DEFAULT_SESSION_STREAM_CAPACITY) + } +} + +impl SessionStreamBus { + pub fn new(capacity: usize) -> Self { + Self { + channels: Mutex::new(HashMap::new()), + capacity, + } + } + + pub fn subscribe(&self, session_id: &str) -> broadcast::Receiver { + let mut guard = self + .channels + .lock() + .expect("session stream bus lock poisoned"); + guard + .entry(session_id.to_string()) + .or_insert_with(|| { + let (sender, _receiver) = broadcast::channel(self.capacity); + sender + }) + .subscribe() + } + + pub fn publish(&self, session_id: &str, envelope: Envelope) { + let sender = { + let guard = self + .channels + .lock() + .expect("session stream bus lock poisoned"); + guard.get(session_id).cloned() + }; + if let Some(sender) = sender { + let _ = sender.send(envelope); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn env(message_id: &str) -> Envelope { + Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.decision.v1".into(), + message_type: "Proposal".into(), + message_id: message_id.into(), + session_id: "s1".into(), + sender: "agent://sender".into(), + timestamp_unix_ms: 1, + payload: vec![], + } + } + + #[test] + fn subscribe_then_publish_round_trip() { + let bus = SessionStreamBus::default(); + let mut rx = bus.subscribe("s1"); + bus.publish("s1", env("m1")); + let envelope = rx.try_recv().expect("stream event"); + assert_eq!(envelope.message_id, "m1"); + } +} From b979f9324f16a10266f42b9878bdda0326cafd67 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Thu, 19 Mar 2026 19:03:37 -0700 Subject: [PATCH 2/2] Fix Clippy Issue --- src/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.rs b/src/server.rs index b385ac3..3283770 100644 --- a/src/server.rs +++ b/src/server.rs @@ -134,6 +134,7 @@ impl MacpServer { Ok(identity) } + #[allow(clippy::result_large_err)] fn try_next_stream_event( receiver: &mut Option>, ) -> Result, Status> {