Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ opentelemetry-otlp = { version = "0.15", features = ["tonic"], optional = true }
tracing-opentelemetry = { version = "0.23", optional = true }
rocksdb = { version = "0.22", optional = true }
redis = { version = "0.27", features = ["tokio-comp", "aio"], optional = true }
macp-proto = "0.1.1"
macp-proto = "0.1.2"

[dev-dependencies]
tempfile = "3"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ This runtime implements the current MACP core/service surface, five standards-tr
- **StreamSession enabled**
- `Initialize` advertises `stream: true`
- `StreamSession` provides per-session bidirectional streaming of accepted envelopes
- Passive subscribe (RFC-MACP-0006-A1): a `subscribe_session_id` + `after_sequence` frame replays accepted history and then delivers live envelopes; allowed for declared participants, the initiator, or observer identities
- `WatchModeRegistry` fires live `RegistryChanged` events on mode register/unregister/promote
- `WatchRoots` implemented (basic: send initial state, hold stream open)
- **Extension mode lifecycle**
Expand Down
2 changes: 2 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ rpc StreamSession(stream StreamSessionRequest) returns (stream StreamSessionResp

The first envelope on the stream binds it to a `session_id`. All subsequent envelopes must target the same session. Responses contain either an accepted `envelope` or an application-level `error` (the stream stays open for application errors). If the client falls behind the broadcast buffer, the stream terminates with `ResourceExhausted`.

**Passive subscribe** (RFC-MACP-0006-A1). A client may observe a session without sending envelopes by sending a request frame where `envelope` is absent and `subscribe_session_id` is set. The runtime replays the session's accepted history starting at log index `after_sequence` (0 = replay from session start) and then delivers live envelopes on the same stream. A single frame must not contain both an `envelope` and `subscribe_session_id` -- the stream terminates with `InvalidArgument` if both are set. Subscribes bind the stream to the given session just like a first envelope; mixing session IDs on the same stream is rejected. Authorization: the caller must be the session initiator, a declared participant, or hold the `is_observer` identity capability. Non-participants receive an inline `FORBIDDEN` error frame and the stream stays open.

## Session Lifecycle

### GetSession
Expand Down
8 changes: 4 additions & 4 deletions docs/sdk-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ Use `Send` when you need an explicit acknowledgement per message, or for fire-an
A `StreamSession` connection follows this pattern:

1. Open a bidirectional stream.
2. Send the first envelope, which binds the stream to that `session_id`.
2. Send the first envelope, which binds the stream to that `session_id`. Alternatively, send a **passive subscribe** frame (RFC-MACP-0006-A1) where `envelope` is absent and `subscribe_session_id` is set -- the runtime replays accepted history from log index `after_sequence` and then delivers live envelopes on the same stream. Set `after_sequence = 0` to replay from session start; use a higher value to resume after a known checkpoint.
3. Receive accepted envelopes from all participants in the session.
4. Send additional envelopes as needed.
4. Send additional envelopes as needed (not required for passive observers).
5. The stream closes on client disconnect, lag overflow, auth failure, or server shutdown.

All envelopes on a stream must target the same session. The stream only delivers envelopes accepted after the bind point -- there is no backfill of earlier history.
All envelopes on a stream must target the same session. A single frame must not set both `envelope` and `subscribe_session_id` -- the stream terminates with `InvalidArgument` if both are set. Passive subscribe is authorized for the session initiator, declared participants, and observer identities; non-participants receive an inline `FORBIDDEN` error frame without closing the stream.

Application-level errors (validation failures, authorization denials) are delivered as inline `MACPError` messages and the stream stays open. Transport-level errors (unauthenticated, internal) close the stream.
Application-level errors (validation failures, authorization denials) are delivered as inline `MACPError` messages and the stream stays open. Transport-level errors (unauthenticated, internal, unknown session on subscribe) close the stream.

### Handling stream lag

Expand Down
2 changes: 1 addition & 1 deletion docs/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ integration_tests/

### Three tiers

**Tier 1: Protocol tests** (47 tests) exercise every mode through scripted gRPC calls. These tests cover the full protocol surface: `Initialize` negotiation, happy-path flows for all five standard modes plus multi-round, signals, deduplication, version binding, cancellation authorization, session lifecycle, mode registry operations, and discovery RPCs. They run in under a second with no external dependencies.
**Tier 1: Protocol tests** exercise every mode through scripted gRPC calls. These tests cover the full protocol surface: `Initialize` negotiation, happy-path flows for all five standard modes plus multi-round, signals, deduplication, version binding, cancellation authorization, session lifecycle, `StreamSession` including RFC-MACP-0006-A1 passive subscribe (`test_passive_subscribe.rs` -- history replay with `after_sequence` offsets, unknown-session and non-participant rejection, late-joiner replay-then-live delivery), mode registry operations, JWT bearer auth, and discovery RPCs. They run in under a second with no external dependencies.

**Tier 2: Rig agent tools** (5 tests) validate the MACP operations implemented as Rig `Tool` trait objects. These are called through `ToolSet::call()`, the same interface an LLM agent would use. They cover all five standard modes and verify that the tool abstraction correctly maps to gRPC operations.

Expand Down
5 changes: 3 additions & 2 deletions integration_tests/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ tonic = { version = "0.14", features = ["transport"] }
prost = "0.14"

tokio = { version = "1", features = ["full", "process"] }
tokio-stream = "0.1"

rig-core = "0.34"

Expand Down
13 changes: 4 additions & 9 deletions integration_tests/tests/tier1_jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use base64::Engine;
use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
use macp_integration_tests::server_manager::ServerManager;
use macp_runtime::pb::macp_runtime_service_client::MacpRuntimeServiceClient;
use macp_runtime::pb::{
Envelope, GetSessionRequest, SendRequest, SessionStartPayload,
};
use macp_runtime::pb::{Envelope, GetSessionRequest, SendRequest, SessionStartPayload};
use prost::Message;
use serde::Serialize;
use std::sync::OnceLock;
Expand All @@ -37,8 +35,7 @@ static JWT_ENDPOINT: OnceLock<String> = OnceLock::new();

fn jwks_json() -> String {
let k = base64::engine::general_purpose::STANDARD.encode(SECRET);
serde_json::json!({ "keys": [ { "kty": "oct", "alg": "HS256", "k": k } ] })
.to_string()
serde_json::json!({ "keys": [ { "kty": "oct", "alg": "HS256", "k": k } ] }).to_string()
}

async fn endpoint() -> &'static str {
Expand Down Expand Up @@ -98,10 +95,8 @@ fn sign(claims: &Claims) -> String {

fn with_bearer<T>(token: &str, inner: T) -> Request<T> {
let mut req = Request::new(inner);
req.metadata_mut().insert(
"authorization",
format!("Bearer {token}").parse().unwrap(),
);
req.metadata_mut()
.insert("authorization", format!("Bearer {token}").parse().unwrap());
req
}

Expand Down
1 change: 1 addition & 0 deletions integration_tests/tests/tier1_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod test_handoff_mode;
mod test_initialize;
mod test_mode_registry;
mod test_multi_round_mode;
mod test_passive_subscribe;
mod test_policy_registry;
mod test_proposal_mode;
mod test_quorum_mode;
Expand Down
Loading
Loading