diff --git a/Cargo.lock b/Cargo.lock index a28279f..7d9ce16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1082,9 +1082,9 @@ dependencies = [ [[package]] name = "macp-proto" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca7b5c6a2f0de7adb7c6159b00b11f8385eb216b0e68c74f2ef7a95fbd980ea" +checksum = "0b75392118927e66872d554e8ee365af4f054470bfe39f24cbf3070d59dd7d5e" [[package]] name = "macp-runtime" diff --git a/Cargo.toml b/Cargo.toml index d3538d2..6efe0fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ opentelemetry-otlp = { version = "0.15", features = ["tonic"], optional = true } tracing-opentelemetry = { version = "0.23", optional = true } rocksdb = { version = "0.22", optional = true } redis = { version = "0.27", features = ["tokio-comp", "aio"], optional = true } -macp-proto = "0.1.1" +macp-proto = "0.1.2" [dev-dependencies] tempfile = "3" diff --git a/README.md b/README.md index 7ac56cc..bd0abca 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ This runtime implements the current MACP core/service surface, five standards-tr - **StreamSession enabled** - `Initialize` advertises `stream: true` - `StreamSession` provides per-session bidirectional streaming of accepted envelopes + - Passive subscribe (RFC-MACP-0006-A1): a `subscribe_session_id` + `after_sequence` frame replays accepted history and then delivers live envelopes; allowed for declared participants, the initiator, or observer identities - `WatchModeRegistry` fires live `RegistryChanged` events on mode register/unregister/promote - `WatchRoots` implemented (basic: send initial state, hold stream open) - **Extension mode lifecycle** diff --git a/docs/API.md b/docs/API.md index 9323bb6..868e016 100644 --- a/docs/API.md +++ b/docs/API.md @@ -81,6 +81,8 @@ rpc StreamSession(stream StreamSessionRequest) returns (stream StreamSessionResp The first envelope on the stream binds it to a `session_id`. All subsequent envelopes must target the same session. Responses contain either an accepted `envelope` or an application-level `error` (the stream stays open for application errors). If the client falls behind the broadcast buffer, the stream terminates with `ResourceExhausted`. +**Passive subscribe** (RFC-MACP-0006-A1). A client may observe a session without sending envelopes by sending a request frame where `envelope` is absent and `subscribe_session_id` is set. The runtime replays the session's accepted history starting at log index `after_sequence` (0 = replay from session start) and then delivers live envelopes on the same stream. A single frame must not contain both an `envelope` and `subscribe_session_id` -- the stream terminates with `InvalidArgument` if both are set. Subscribes bind the stream to the given session just like a first envelope; mixing session IDs on the same stream is rejected. Authorization: the caller must be the session initiator, a declared participant, or hold the `is_observer` identity capability. Non-participants receive an inline `FORBIDDEN` error frame and the stream stays open. + ## Session Lifecycle ### GetSession diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md index 87db607..6eb4639 100644 --- a/docs/sdk-guide.md +++ b/docs/sdk-guide.md @@ -96,14 +96,14 @@ Use `Send` when you need an explicit acknowledgement per message, or for fire-an A `StreamSession` connection follows this pattern: 1. Open a bidirectional stream. -2. Send the first envelope, which binds the stream to that `session_id`. +2. Send the first envelope, which binds the stream to that `session_id`. Alternatively, send a **passive subscribe** frame (RFC-MACP-0006-A1) where `envelope` is absent and `subscribe_session_id` is set -- the runtime replays accepted history from log index `after_sequence` and then delivers live envelopes on the same stream. Set `after_sequence = 0` to replay from session start; use a higher value to resume after a known checkpoint. 3. Receive accepted envelopes from all participants in the session. -4. Send additional envelopes as needed. +4. Send additional envelopes as needed (not required for passive observers). 5. The stream closes on client disconnect, lag overflow, auth failure, or server shutdown. -All envelopes on a stream must target the same session. The stream only delivers envelopes accepted after the bind point -- there is no backfill of earlier history. +All envelopes on a stream must target the same session. A single frame must not set both `envelope` and `subscribe_session_id` -- the stream terminates with `InvalidArgument` if both are set. Passive subscribe is authorized for the session initiator, declared participants, and observer identities; non-participants receive an inline `FORBIDDEN` error frame without closing the stream. -Application-level errors (validation failures, authorization denials) are delivered as inline `MACPError` messages and the stream stays open. Transport-level errors (unauthenticated, internal) close the stream. +Application-level errors (validation failures, authorization denials) are delivered as inline `MACPError` messages and the stream stays open. Transport-level errors (unauthenticated, internal, unknown session on subscribe) close the stream. ### Handling stream lag diff --git a/docs/testing.md b/docs/testing.md index f80e8b1..02b11cc 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -37,7 +37,7 @@ integration_tests/ ### Three tiers -**Tier 1: Protocol tests** (47 tests) exercise every mode through scripted gRPC calls. These tests cover the full protocol surface: `Initialize` negotiation, happy-path flows for all five standard modes plus multi-round, signals, deduplication, version binding, cancellation authorization, session lifecycle, mode registry operations, and discovery RPCs. They run in under a second with no external dependencies. +**Tier 1: Protocol tests** exercise every mode through scripted gRPC calls. These tests cover the full protocol surface: `Initialize` negotiation, happy-path flows for all five standard modes plus multi-round, signals, deduplication, version binding, cancellation authorization, session lifecycle, `StreamSession` including RFC-MACP-0006-A1 passive subscribe (`test_passive_subscribe.rs` -- history replay with `after_sequence` offsets, unknown-session and non-participant rejection, late-joiner replay-then-live delivery), mode registry operations, JWT bearer auth, and discovery RPCs. They run in under a second with no external dependencies. **Tier 2: Rig agent tools** (5 tests) validate the MACP operations implemented as Rig `Tool` trait objects. These are called through `ToolSet::call()`, the same interface an LLM agent would use. They cover all five standard modes and verify that the tool abstraction correctly maps to gRPC operations. diff --git a/integration_tests/Cargo.lock b/integration_tests/Cargo.lock index ff2fb7c..a619b29 100644 --- a/integration_tests/Cargo.lock +++ b/integration_tests/Cargo.lock @@ -1058,6 +1058,7 @@ dependencies = [ "serde_json", "thiserror 1.0.69", "tokio", + "tokio-stream", "tonic", "tracing", "tracing-subscriber", @@ -1066,9 +1067,9 @@ dependencies = [ [[package]] name = "macp-proto" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca7b5c6a2f0de7adb7c6159b00b11f8385eb216b0e68c74f2ef7a95fbd980ea" +checksum = "0b75392118927e66872d554e8ee365af4f054470bfe39f24cbf3070d59dd7d5e" [[package]] name = "macp-runtime" diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 240459e..6974413 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -11,6 +11,7 @@ tonic = { version = "0.14", features = ["transport"] } prost = "0.14" tokio = { version = "1", features = ["full", "process"] } +tokio-stream = "0.1" rig-core = "0.34" diff --git a/integration_tests/tests/tier1_jwt.rs b/integration_tests/tests/tier1_jwt.rs index 3fd77f9..bfd337a 100644 --- a/integration_tests/tests/tier1_jwt.rs +++ b/integration_tests/tests/tier1_jwt.rs @@ -14,9 +14,7 @@ use base64::Engine; use jsonwebtoken::{encode, Algorithm, EncodingKey, Header}; use macp_integration_tests::server_manager::ServerManager; use macp_runtime::pb::macp_runtime_service_client::MacpRuntimeServiceClient; -use macp_runtime::pb::{ - Envelope, GetSessionRequest, SendRequest, SessionStartPayload, -}; +use macp_runtime::pb::{Envelope, GetSessionRequest, SendRequest, SessionStartPayload}; use prost::Message; use serde::Serialize; use std::sync::OnceLock; @@ -37,8 +35,7 @@ static JWT_ENDPOINT: OnceLock = OnceLock::new(); fn jwks_json() -> String { let k = base64::engine::general_purpose::STANDARD.encode(SECRET); - serde_json::json!({ "keys": [ { "kty": "oct", "alg": "HS256", "k": k } ] }) - .to_string() + serde_json::json!({ "keys": [ { "kty": "oct", "alg": "HS256", "k": k } ] }).to_string() } async fn endpoint() -> &'static str { @@ -98,10 +95,8 @@ fn sign(claims: &Claims) -> String { fn with_bearer(token: &str, inner: T) -> Request { let mut req = Request::new(inner); - req.metadata_mut().insert( - "authorization", - format!("Bearer {token}").parse().unwrap(), - ); + req.metadata_mut() + .insert("authorization", format!("Bearer {token}").parse().unwrap()); req } diff --git a/integration_tests/tests/tier1_protocol/mod.rs b/integration_tests/tests/tier1_protocol/mod.rs index 27ba4a7..05e7e1a 100644 --- a/integration_tests/tests/tier1_protocol/mod.rs +++ b/integration_tests/tests/tier1_protocol/mod.rs @@ -6,6 +6,7 @@ mod test_handoff_mode; mod test_initialize; mod test_mode_registry; mod test_multi_round_mode; +mod test_passive_subscribe; mod test_policy_registry; mod test_proposal_mode; mod test_quorum_mode; diff --git a/integration_tests/tests/tier1_protocol/test_passive_subscribe.rs b/integration_tests/tests/tier1_protocol/test_passive_subscribe.rs new file mode 100644 index 0000000..98d77f3 --- /dev/null +++ b/integration_tests/tests/tier1_protocol/test_passive_subscribe.rs @@ -0,0 +1,281 @@ +//! RFC-MACP-0006-A1: StreamSession passive subscribe. +//! +//! A client opens a StreamSession and sends a subscribe-only frame +//! (subscribe_session_id + after_sequence, no envelope) to replay accepted +//! history and then receive live envelopes. + +use std::time::Duration; + +use macp_integration_tests::helpers::*; +use macp_runtime::pb::macp_runtime_service_client::MacpRuntimeServiceClient; +use macp_runtime::pb::stream_session_response::Response as StreamResp; +use macp_runtime::pb::{Envelope, StreamSessionRequest, StreamSessionResponse}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::transport::Channel; +use tonic::Streaming; + +use crate::common; + +const COORD: &str = "agent://coordinator"; +const PEER: &str = "agent://voter"; +const OUTSIDER: &str = "agent://outsider"; + +fn subscribe_frame(session_id: &str, after_sequence: u64) -> StreamSessionRequest { + StreamSessionRequest { + subscribe_session_id: session_id.into(), + after_sequence, + envelope: None, + } +} + +fn envelope_frame(env: Envelope) -> StreamSessionRequest { + StreamSessionRequest { + subscribe_session_id: String::new(), + after_sequence: 0, + envelope: Some(env), + } +} + +/// Open a stream bound to `sender` and return the response stream plus the +/// request-sender channel. Dropping the sender closes the client side cleanly. +async fn open_stream( + client: &mut MacpRuntimeServiceClient, + sender: &str, +) -> ( + mpsc::Sender, + Streaming, +) { + let (tx, rx) = mpsc::channel::(8); + let request_stream = ReceiverStream::new(rx); + let mut req = tonic::Request::new(request_stream); + req.metadata_mut().insert( + "authorization", + format!("Bearer {sender}").parse().expect("valid auth"), + ); + let response = client + .stream_session(req) + .await + .expect("stream_session opened"); + (tx, response.into_inner()) +} + +async fn next_envelope(stream: &mut Streaming) -> Envelope { + let resp = tokio::time::timeout(Duration::from_secs(2), stream.message()) + .await + .expect("timed out waiting for envelope") + .expect("stream returned error") + .expect("stream ended unexpectedly"); + match resp.response.expect("response variant") { + StreamResp::Envelope(env) => env, + StreamResp::Error(err) => panic!("expected envelope, got error: {err:?}"), + } +} + +async fn next_error_code(stream: &mut Streaming) -> String { + let resp = tokio::time::timeout(Duration::from_secs(2), stream.message()) + .await + .expect("timed out waiting for error") + .expect("stream returned error") + .expect("stream ended unexpectedly"); + match resp.response.expect("response variant") { + StreamResp::Error(err) => err.code, + StreamResp::Envelope(env) => panic!("expected error, got envelope: {}", env.message_type), + } +} + +async fn start_decision_session_with_proposal( + client: &mut MacpRuntimeServiceClient, + sid: &str, + proposal_id: &str, + proposal_message_id: &str, +) { + let ack = send_as( + client, + COORD, + envelope( + MODE_DECISION, + "SessionStart", + &new_message_id(), + sid, + COORD, + session_start_payload("subscribe test", &[COORD, PEER], 60_000), + ), + ) + .await + .expect("SessionStart Send"); + assert!(ack.ok, "SessionStart rejected: {:?}", ack.error); + + let ack = send_as( + client, + COORD, + envelope( + MODE_DECISION, + "Proposal", + proposal_message_id, + sid, + COORD, + proposal_payload(proposal_id, "option-A", "initial"), + ), + ) + .await + .expect("Proposal Send"); + assert!(ack.ok, "Proposal rejected: {:?}", ack.error); +} + +#[tokio::test] +async fn subscribe_replays_full_history_from_zero() { + let mut client = common::grpc_client().await; + let sid = new_session_id(); + start_decision_session_with_proposal(&mut client, &sid, "p1", "msg-proposal").await; + + let (tx, mut stream) = open_stream(&mut client, PEER).await; + tx.send(subscribe_frame(&sid, 0)) + .await + .expect("send subscribe"); + + let first = next_envelope(&mut stream).await; + assert_eq!(first.message_type, "SessionStart"); + assert_eq!(first.session_id, sid); + + let second = next_envelope(&mut stream).await; + assert_eq!(second.message_type, "Proposal"); + assert_eq!(second.message_id, "msg-proposal"); + + // Drop the request sender — the server stream should drain and close. + drop(tx); + let trailing = tokio::time::timeout(Duration::from_secs(2), stream.message()).await; + assert!( + matches!(trailing, Ok(Ok(None)) | Err(_)), + "expected stream end after client close, got {trailing:?}" + ); +} + +#[tokio::test] +async fn subscribe_with_after_sequence_skips_replayed_entries() { + let mut client = common::grpc_client().await; + let sid = new_session_id(); + start_decision_session_with_proposal(&mut client, &sid, "p1", "msg-proposal").await; + + let (tx, mut stream) = open_stream(&mut client, PEER).await; + // SessionStart is at log index 0, Proposal is at log index 1 — skip the + // SessionStart by asking for entries after_sequence=1. + tx.send(subscribe_frame(&sid, 1)) + .await + .expect("send subscribe"); + + let env = next_envelope(&mut stream).await; + assert_eq!(env.message_type, "Proposal"); + assert_eq!(env.message_id, "msg-proposal"); + + drop(tx); +} + +#[tokio::test] +async fn subscribe_unknown_session_terminates_stream() { + let mut client = common::grpc_client().await; + let (tx, mut stream) = open_stream(&mut client, COORD).await; + tx.send(subscribe_frame("no-such-session", 0)) + .await + .expect("send subscribe"); + + // NOT_FOUND is a terminal error — the stream closes with a tonic Status. + let result = tokio::time::timeout(Duration::from_secs(2), stream.message()) + .await + .expect("timed out waiting for stream termination"); + let status = result.expect_err("expected NotFound status on unknown session"); + assert_eq!(status.code(), tonic::Code::NotFound, "{status:?}"); +} + +#[tokio::test] +async fn subscribe_as_non_participant_yields_inline_error() { + let mut client = common::grpc_client().await; + let sid = new_session_id(); + start_decision_session_with_proposal(&mut client, &sid, "p1", "msg-proposal").await; + + let (tx, mut stream) = open_stream(&mut client, OUTSIDER).await; + tx.send(subscribe_frame(&sid, 0)) + .await + .expect("send subscribe"); + + // PERMISSION_DENIED is non-terminal: the server sends an inline error and + // keeps the stream open. + let code = next_error_code(&mut stream).await; + assert!( + code.contains("FORBIDDEN") || code.to_uppercase().contains("PERMISSION"), + "expected forbidden error code, got {code}" + ); + + drop(tx); +} + +#[tokio::test] +async fn subscribe_replays_history_then_receives_live_envelope() { + let mut client = common::grpc_client().await; + let sid = new_session_id(); + start_decision_session_with_proposal(&mut client, &sid, "p1", "msg-proposal").await; + + let (tx, mut stream) = open_stream(&mut client, PEER).await; + tx.send(subscribe_frame(&sid, 0)) + .await + .expect("send subscribe"); + + // Drain replayed history (SessionStart + Proposal). + assert_eq!( + next_envelope(&mut stream).await.message_type, + "SessionStart" + ); + assert_eq!(next_envelope(&mut stream).await.message_type, "Proposal"); + + // Send an Evaluation via a unary Send from PEER. It should arrive over the + // live stream as well. + let eval_id = "msg-evaluation"; + let ack = send_as( + &mut client, + PEER, + envelope( + MODE_DECISION, + "Evaluation", + eval_id, + &sid, + PEER, + evaluation_payload("p1", "APPROVE", 0.9, "looks good"), + ), + ) + .await + .expect("Evaluation Send"); + assert!(ack.ok, "Evaluation rejected: {:?}", ack.error); + + let live = next_envelope(&mut stream).await; + assert_eq!(live.message_type, "Evaluation"); + assert_eq!(live.message_id, eval_id); + + drop(tx); +} + +#[tokio::test] +async fn stream_request_with_both_envelope_and_subscribe_is_rejected() { + let mut client = common::grpc_client().await; + let sid = new_session_id(); + let (tx, mut stream) = open_stream(&mut client, COORD).await; + + // Build a malformed request that sets both fields. + let env = envelope( + MODE_DECISION, + "SessionStart", + &new_message_id(), + &sid, + COORD, + session_start_payload("bad", &[COORD, PEER], 60_000), + ); + let mut bad = envelope_frame(env); + bad.subscribe_session_id = sid.clone(); + + tx.send(bad).await.expect("send frame"); + + let result = tokio::time::timeout(Duration::from_secs(2), stream.message()) + .await + .expect("timed out waiting for termination"); + let status = result.expect_err("expected InvalidArgument status"); + assert_eq!(status.code(), tonic::Code::InvalidArgument, "{status:?}"); +} diff --git a/integration_tests/tests/tier1_protocol/test_validation_gaps.rs b/integration_tests/tests/tier1_protocol/test_validation_gaps.rs index aed635a..5ff5c99 100644 --- a/integration_tests/tests/tier1_protocol/test_validation_gaps.rs +++ b/integration_tests/tests/tier1_protocol/test_validation_gaps.rs @@ -147,10 +147,7 @@ async fn objection_invalid_severity_rejected() { ) .await .unwrap(); - assert!( - !ack.ok, - "Objection with invalid severity must be rejected" - ); + assert!(!ack.ok, "Objection with invalid severity must be rejected"); } // ── Signal Payload Validation (RFC-MACP-0001 §4) ────────────────────── @@ -191,10 +188,7 @@ async fn signal_empty_signal_type_rejected() { .into_inner() .ack .unwrap(); - assert!( - !ack.ok, - "Signal with empty signal_type must be rejected" - ); + assert!(!ack.ok, "Signal with empty signal_type must be rejected"); } // ── Max Participants (Safety Limit) ─────────────────────────────────── diff --git a/src/log_store.rs b/src/log_store.rs index 0f3a03f..2d62fa7 100644 --- a/src/log_store.rs +++ b/src/log_store.rs @@ -58,6 +58,30 @@ impl LogStore { let guard = self.logs.read().await; guard.get(session_id).cloned() } + + /// Returns incoming log entries with 0-based index >= `after_sequence`. + /// Only `Incoming` entries are returned (Internal/Checkpoint are filtered). + /// RFC-MACP-0006-A1: used by StreamSession passive subscribe to replay + /// accepted session history to late-joining agents. + pub async fn get_incoming_after( + &self, + session_id: &str, + after_sequence: u64, + ) -> Vec<(u64, LogEntry)> { + let guard = self.logs.read().await; + guard + .get(session_id) + .map(|entries| { + entries + .iter() + .enumerate() + .filter(|(_, e)| e.entry_kind == EntryKind::Incoming) + .filter(|(idx, _)| *idx as u64 >= after_sequence) + .map(|(idx, e)| (idx as u64, e.clone())) + .collect() + }) + .unwrap_or_default() + } } #[cfg(test)] @@ -91,4 +115,34 @@ mod tests { assert_eq!(log[0].message_id, "m1"); assert_eq!(log[1].message_id, "m2"); } + + #[tokio::test] + async fn get_incoming_after_filters_by_sequence_and_kind() { + let store = LogStore::new(); + store.create_session_log("s1").await; + store.append("s1", entry("m0", EntryKind::Incoming)).await; + store.append("s1", entry("m1", EntryKind::Internal)).await; + store.append("s1", entry("m2", EntryKind::Incoming)).await; + store.append("s1", entry("m3", EntryKind::Incoming)).await; + store.append("s1", entry("m4", EntryKind::Checkpoint)).await; + + // after_sequence=0 returns all Incoming entries + let all = store.get_incoming_after("s1", 0).await; + assert_eq!(all.len(), 3); + assert_eq!(all[0].1.message_id, "m0"); + assert_eq!(all[1].1.message_id, "m2"); + assert_eq!(all[2].1.message_id, "m3"); + + // after_sequence=2 skips index 0 and 1 + let after2 = store.get_incoming_after("s1", 2).await; + assert_eq!(after2.len(), 2); + assert_eq!(after2[0].0, 2); // index + assert_eq!(after2[0].1.message_id, "m2"); + assert_eq!(after2[1].0, 3); + assert_eq!(after2[1].1.message_id, "m3"); + + // nonexistent session returns empty + let empty = store.get_incoming_after("nope", 0).await; + assert!(empty.is_empty()); + } } diff --git a/src/runtime.rs b/src/runtime.rs index 630d026..8658fee 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -185,6 +185,39 @@ impl Runtime { self.session_lifecycle_bus.subscribe() } + /// RFC-MACP-0006-A1: Replay accepted envelopes from the session log for + /// passive subscribe. Returns `Incoming` log entries as reconstructed + /// `Envelope` values, starting from `after_sequence` (0-based log index). + pub async fn get_session_envelopes_after( + &self, + session_id: &str, + after_sequence: u64, + ) -> Vec { + self.log_store + .get_incoming_after(session_id, after_sequence) + .await + .into_iter() + .map(|(_idx, entry)| Envelope { + macp_version: if entry.macp_version.is_empty() { + "1.0".into() + } else { + entry.macp_version + }, + mode: entry.mode, + message_type: entry.message_type, + message_id: entry.message_id, + session_id: entry.session_id, + sender: entry.sender, + timestamp_unix_ms: if entry.timestamp_unix_ms != 0 { + entry.timestamp_unix_ms + } else { + entry.received_at_ms + }, + payload: entry.raw_payload, + }) + .collect() + } + fn publish_accepted_envelope(&self, env: &Envelope) { if !env.session_id.is_empty() { self.stream_bus.publish(&env.session_id, env.clone()); diff --git a/src/server.rs b/src/server.rs index 064a56e..10c23fd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -235,15 +235,41 @@ impl MacpServer { } } + /// Process a single StreamSessionRequest frame. + /// + /// Returns `Ok(replay_envelopes)` — empty for normal sends, non-empty when + /// a subscribe frame triggers history replay (RFC-MACP-0006-A1). async fn process_stream_request( &self, identity: &AuthIdentity, req: StreamSessionRequest, bound_session_id: &mut Option, session_events: &mut Option>, - ) -> Result<(), Status> { + ) -> Result, Status> { + // RFC-MACP-0006-A1: Handle subscribe-only frame. + // When subscribe_session_id is set and envelope is absent, subscribe to + // the session's broadcast channel and replay accepted history. + if !req.subscribe_session_id.is_empty() { + if req.envelope.is_some() { + return Err(Status::invalid_argument( + "StreamSessionRequest must not contain both envelope and subscribe_session_id", + )); + } + return self + .process_subscribe_frame( + identity, + &req.subscribe_session_id, + req.after_sequence, + bound_session_id, + session_events, + ) + .await; + } + let envelope = req.envelope.ok_or_else(|| { - Status::invalid_argument("StreamSessionRequest must contain an envelope") + Status::invalid_argument( + "StreamSessionRequest must contain an envelope or subscribe_session_id", + ) })?; self.validate_envelope_shape(&envelope) @@ -310,7 +336,66 @@ impl MacpServer { .process(&envelope, max_open) .await .map_err(Self::status_from_error)?; - Ok(()) + Ok(vec![]) + } + + /// RFC-MACP-0006-A1: Process a subscribe-only frame. + /// Subscribes the stream to the session's broadcast channel and replays + /// accepted envelope history from `after_sequence` onwards. + async fn process_subscribe_frame( + &self, + identity: &AuthIdentity, + session_id: &str, + after_sequence: u64, + bound_session_id: &mut Option, + session_events: &mut Option>, + ) -> Result, Status> { + // Validate: only one session per stream + if let Some(bound) = bound_session_id.as_ref() { + if bound != session_id { + return Err(Status::invalid_argument( + "StreamSession may only carry envelopes for one session_id", + )); + } + } + + // Validate session exists + let session = self + .runtime + .get_session_checked(session_id) + .await + .ok_or_else(|| Status::not_found(format!("Session '{}' not found", session_id)))?; + + // Authorize: caller must be a declared participant, initiator, or observer + let allowed = identity.is_observer + || session.initiator_sender == identity.sender + || session.participants.iter().any(|p| p == &identity.sender); + if !allowed { + return Err(Status::permission_denied( + "FORBIDDEN: caller is not a declared participant or observer for this session", + )); + } + + // Subscribe to live broadcast (if not already subscribed) + if session_events.is_none() { + *bound_session_id = Some(session_id.to_string()); + *session_events = Some(self.runtime.subscribe_session_stream(session_id)); + } + + tracing::info!( + session_id = %session_id, + sender = %identity.sender, + after_sequence = after_sequence, + "passive subscribe: replaying session history" + ); + + // Replay accepted envelopes from LogStore + let replay = self + .runtime + .get_session_envelopes_after(session_id, after_sequence) + .await; + + Ok(replay) } fn build_stream_session_stream( @@ -375,7 +460,16 @@ impl MacpServer { ) .await { - Ok(()) => {} + Ok(replay) => { + // RFC-MACP-0006-A1: yield replayed envelopes from subscribe + for env in replay { + yield StreamSessionResponse { + response: Some( + macp_runtime::pb::stream_session_response::Response::Envelope(env), + ), + }; + } + } Err(status) if Self::is_stream_terminal_error(&status) => { Err(status)?; } @@ -446,7 +540,16 @@ impl MacpServer { ) .await { - Ok(()) => {} + Ok(replay) => { + // RFC-MACP-0006-A1: yield replayed envelopes from subscribe + for env in replay { + yield StreamSessionResponse { + response: Some( + macp_runtime::pb::stream_session_response::Response::Envelope(env), + ), + }; + } + } Err(status) if Self::is_stream_terminal_error(&status) => { Err(status)?; } @@ -1329,6 +1432,8 @@ mod tests { let (server, _) = make_server(); let sid = new_sid(); let requests = iter(vec![Ok(StreamSessionRequest { + subscribe_session_id: String::new(), + after_sequence: 0, envelope: Some(Envelope { macp_version: "1.0".into(), mode: "macp.mode.decision.v1".into(), @@ -1363,6 +1468,8 @@ mod tests { let sid2 = new_sid(); let requests = iter(vec![ Ok(StreamSessionRequest { + subscribe_session_id: String::new(), + after_sequence: 0, envelope: Some(Envelope { macp_version: "1.0".into(), mode: "macp.mode.decision.v1".into(), @@ -1375,6 +1482,8 @@ mod tests { }), }), Ok(StreamSessionRequest { + subscribe_session_id: String::new(), + after_sequence: 0, envelope: Some(Envelope { macp_version: "1.0".into(), mode: "macp.mode.decision.v1".into(), @@ -1732,4 +1841,345 @@ mod tests { .unwrap_err(); assert_eq!(err.code(), tonic::Code::FailedPrecondition); } + + // ── RFC-MACP-0006-A1: passive subscribe tests ────────────────────── + + fn observer_identity(sender: &str) -> AuthIdentity { + AuthIdentity { + sender: sender.into(), + allowed_modes: None, + can_start_sessions: false, + max_open_sessions: None, + can_manage_mode_registry: false, + is_observer: true, + } + } + + fn subscribe_frame(session_id: &str, after: u64) -> StreamSessionRequest { + StreamSessionRequest { + subscribe_session_id: session_id.into(), + after_sequence: after, + envelope: None, + } + } + + fn start_multi_participant(participants: Vec) -> Vec { + SessionStartPayload { + intent: "intent".into(), + participants, + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), + policy_version: String::new(), + ttl_ms: 60_000, + context_id: String::new(), + extensions: std::collections::HashMap::new(), + roots: vec![], + } + .encode_to_vec() + } + + async fn start_session( + server: &MacpServer, + initiator: &str, + sid: &str, + participants: Vec, + ) { + let ack = do_send( + server, + initiator, + Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.decision.v1".into(), + message_type: "SessionStart".into(), + message_id: "start".into(), + session_id: sid.into(), + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_multi_participant(participants), + }, + ) + .await; + assert!(ack.ok, "SessionStart failed: {:?}", ack.error); + } + + async fn send_proposal( + server: &MacpServer, + sender: &str, + sid: &str, + message_id: &str, + proposal_id: &str, + ) { + let payload = macp_runtime::decision_pb::ProposalPayload { + proposal_id: proposal_id.into(), + option: "opt".into(), + rationale: "r".into(), + supporting_data: vec![], + } + .encode_to_vec(); + let ack = do_send( + server, + sender, + Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.decision.v1".into(), + message_type: "Proposal".into(), + message_id: message_id.into(), + session_id: sid.into(), + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload, + }, + ) + .await; + assert!(ack.ok, "Proposal failed: {:?}", ack.error); + } + + #[tokio::test] + async fn subscribe_replays_session_history_from_zero() { + let (server, _) = make_server(); + let sid = new_sid(); + let initiator = "agent://orchestrator"; + let peer = "agent://fraud"; + start_session( + &server, + initiator, + &sid, + vec![initiator.into(), peer.into()], + ) + .await; + send_proposal(&server, peer, &sid, "m2", "p1").await; + + let mut bound = None; + let mut events = None; + let replay = server + .process_stream_request( + &stream_identity(peer), + subscribe_frame(&sid, 0), + &mut bound, + &mut events, + ) + .await + .unwrap(); + + assert_eq!(replay.len(), 2); + assert_eq!(replay[0].message_type, "SessionStart"); + assert_eq!(replay[0].message_id, "start"); + assert_eq!(replay[1].message_type, "Proposal"); + assert_eq!(replay[1].message_id, "m2"); + assert_eq!(bound.as_deref(), Some(sid.as_str())); + assert!(events.is_some()); + } + + #[tokio::test] + async fn subscribe_after_sequence_filters_history() { + let (server, _) = make_server(); + let sid = new_sid(); + let initiator = "agent://orchestrator"; + let peer = "agent://fraud"; + start_session( + &server, + initiator, + &sid, + vec![initiator.into(), peer.into()], + ) + .await; + send_proposal(&server, peer, &sid, "m2", "p1").await; + send_proposal(&server, peer, &sid, "m3", "p2").await; + + let mut bound = None; + let mut events = None; + let replay = server + .process_stream_request( + &stream_identity(peer), + subscribe_frame(&sid, 2), + &mut bound, + &mut events, + ) + .await + .unwrap(); + + assert_eq!(replay.len(), 1); + assert_eq!(replay[0].message_id, "m3"); + } + + #[tokio::test] + async fn subscribe_unknown_session_returns_not_found() { + let (server, _) = make_server(); + let mut bound = None; + let mut events = None; + let status = server + .process_stream_request( + &stream_identity("agent://orchestrator"), + subscribe_frame("missing-session", 0), + &mut bound, + &mut events, + ) + .await + .unwrap_err(); + assert_eq!(status.code(), tonic::Code::NotFound); + assert!(bound.is_none()); + assert!(events.is_none()); + } + + #[tokio::test] + async fn subscribe_non_participant_is_forbidden() { + let (server, _) = make_server(); + let sid = new_sid(); + start_session( + &server, + "agent://orchestrator", + &sid, + vec!["agent://orchestrator".into(), "agent://fraud".into()], + ) + .await; + + let mut bound = None; + let mut events = None; + let status = server + .process_stream_request( + &stream_identity("agent://outsider"), + subscribe_frame(&sid, 0), + &mut bound, + &mut events, + ) + .await + .unwrap_err(); + assert_eq!(status.code(), tonic::Code::PermissionDenied); + } + + #[tokio::test] + async fn subscribe_observer_identity_allowed() { + let (server, _) = make_server(); + let sid = new_sid(); + start_session( + &server, + "agent://orchestrator", + &sid, + vec!["agent://orchestrator".into(), "agent://fraud".into()], + ) + .await; + + let mut bound = None; + let mut events = None; + let replay = server + .process_stream_request( + &observer_identity("agent://auditor"), + subscribe_frame(&sid, 0), + &mut bound, + &mut events, + ) + .await + .unwrap(); + assert_eq!(replay.len(), 1); + assert_eq!(replay[0].message_type, "SessionStart"); + } + + #[tokio::test] + async fn subscribe_initiator_allowed_even_when_not_listed() { + // Per RFC-MACP-0007, the initiator is always authorized for session + // access, even if not present in the participants list. + let (server, _) = make_server(); + let sid = new_sid(); + start_session( + &server, + "agent://orchestrator", + &sid, + vec!["agent://fraud".into()], + ) + .await; + + let mut bound = None; + let mut events = None; + let replay = server + .process_stream_request( + &stream_identity("agent://orchestrator"), + subscribe_frame(&sid, 0), + &mut bound, + &mut events, + ) + .await + .unwrap(); + assert_eq!(replay.len(), 1); + } + + #[tokio::test] + async fn stream_request_with_envelope_and_subscribe_is_rejected() { + let (server, _) = make_server(); + let sid = new_sid(); + let req = StreamSessionRequest { + subscribe_session_id: sid.clone(), + after_sequence: 0, + envelope: Some(Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.decision.v1".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: sid, + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_payload(), + }), + }; + + let mut bound = None; + let mut events = None; + let status = server + .process_stream_request( + &stream_identity("agent://orchestrator"), + req, + &mut bound, + &mut events, + ) + .await + .unwrap_err(); + assert_eq!(status.code(), tonic::Code::InvalidArgument); + } + + #[tokio::test] + async fn subscribe_to_different_session_on_bound_stream_is_rejected() { + let (server, _) = make_server(); + let sid1 = new_sid(); + let sid2 = new_sid(); + start_session( + &server, + "agent://orchestrator", + &sid1, + vec!["agent://orchestrator".into(), "agent://fraud".into()], + ) + .await; + start_session( + &server, + "agent://orchestrator", + &sid2, + vec!["agent://orchestrator".into(), "agent://fraud".into()], + ) + .await; + + // First subscribe binds the stream to sid1 + let identity = stream_identity("agent://fraud"); + let mut bound = None; + let mut events = None; + server + .process_stream_request( + &identity, + subscribe_frame(&sid1, 0), + &mut bound, + &mut events, + ) + .await + .unwrap(); + assert_eq!(bound.as_deref(), Some(sid1.as_str())); + + // Second subscribe to sid2 on the same stream must be rejected + let status = server + .process_stream_request( + &identity, + subscribe_frame(&sid2, 0), + &mut bound, + &mut events, + ) + .await + .unwrap_err(); + assert_eq!(status.code(), tonic::Code::InvalidArgument); + } }