Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Reference runtime for the Multi-Agent Coordination Protocol (MACP).

This runtime implements the current MACP core/service surface, the five standards-track modes in the main RFC repository, and one experimental `multi_round` mode that remains available only by explicit canonical name. The focus of this release is freeze-readiness for SDKs and real-world unary integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery.
This runtime implements the current MACP core/service surface, the five standards-track modes in the main RFC repository, and one experimental `multi_round` mode that remains available only by explicit canonical name. The focus of this release is freeze-readiness for SDKs and real-world unary and streaming integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery.

## What changed in v0.4.0

Expand All @@ -28,8 +28,9 @@ This runtime implements the current MACP core/service surface, the five standard
- per-session append-only log files and session snapshots via `FileBackend`
- crash recovery with dedup state reconciliation
- atomic writes (tmp file + rename) prevent partial-write corruption
- **Unary freeze profile**
- `StreamSession` is intentionally disabled in this profile
- **Authoritative session streaming**
- `StreamSession` now emits accepted MACP envelopes for one bound session per stream
- mixed-session streams are rejected
- `WatchModeRegistry` and `WatchRoots` remain unimplemented

## Implemented modes
Expand Down Expand Up @@ -176,7 +177,7 @@ cargo run --bin fuzz_client
| `GetManifest` | implemented |
| `ListModes` | implemented |
| `ListRoots` | implemented |
| `StreamSession` | intentionally disabled in freeze profile |
| `StreamSession` | implemented (accepted-envelope session stream) |
| `WatchModeRegistry` | unimplemented |
| `WatchRoots` | unimplemented |

Expand Down Expand Up @@ -205,6 +206,6 @@ runtime/
- The RFC/spec repository remains the normative source for protocol semantics.
- This runtime only accepts the canonical standards-track mode identifiers for the five main modes.
- `multi_round` remains experimental and is not advertised by discovery RPCs.
- `StreamSession` is intentionally not part of the freeze surface for the first SDKs.
- `StreamSession` is available for bidirectional session-scoped coordination. Use `Send` when you need per-message negative acknowledgements. The current implementation streams future accepted envelopes from the time the stream binds; it does not backfill earlier accepted history.

See `docs/README.md` and `docs/examples.md` for the updated local development and usage guidance.
8 changes: 4 additions & 4 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The RFC/spec repository is still the normative source for MACP semantics. These

## What is in this runtime profile

- unary-first MACP server over gRPC
- MACP server over gRPC with unary RPCs and per-session bidirectional streaming
- five standards-track modes from the main RFC repository
- one experimental `macp.mode.multi_round.v1` mode kept off discovery surfaces
- strict canonical `SessionStart` for standards-track modes
Expand All @@ -24,21 +24,21 @@ The RFC/spec repository is still the normative source for MACP semantics. These

## Freeze profile

The current runtime is intended to be the freeze candidate for unary SDKs and reference examples.
The current runtime is intended to be the freeze candidate for unary and streaming SDKs and reference examples.

Implemented and supported:

- `Initialize`
- `Send`
- `StreamSession`
- `GetSession`
- `CancelSession`
- `GetManifest`
- `ListModes`
- `ListRoots`

Not part of the freeze surface:
Not yet implemented:

- `StreamSession` is intentionally disabled in this profile
- `WatchModeRegistry` is unimplemented
- `WatchRoots` is unimplemented

Expand Down
2 changes: 1 addition & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,4 @@ Responsibilities:

## Freeze-profile design choice

The runtime intentionally prioritizes unary correctness over streaming completeness. `StreamSession` is therefore disabled in this release profile so SDKs can target a stable, explicit surface.
The runtime now exposes `StreamSession` as a per-session accepted-envelope stream. Each gRPC stream binds to one session and receives canonical MACP envelopes in runtime acceptance order. Unary `Send` remains the path for explicit per-message acknowledgement semantics.
16 changes: 14 additions & 2 deletions docs/examples.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Examples and local development guide

These examples target `macp-runtime v0.4.0` and the unary freeze profile.
These examples target `macp-runtime v0.4.0` and the stream-capable freeze profile.

They intentionally use the local-development security shortcut:

Expand Down Expand Up @@ -141,7 +141,19 @@ cargo run --bin multi_round_client

This mode is still experimental. It remains callable by the explicit canonical name `macp.mode.multi_round.v1`, but it is not advertised by discovery RPCs and it does not use the strict standards-track `SessionStart` contract.

## Example 7: Freeze-check / error-path client
## Example 7: StreamSession

`StreamSession` emits only accepted canonical MACP envelopes. A single gRPC stream binds to one session. If a client needs negative per-message acknowledgements, it should continue to use `Send`.

Practical notes:

- bind a stream by sending a session-scoped envelope for the target session
- use `SessionStart` to create a new session over the stream
- stream attachment starts observing future accepted envelopes from the bind point; it does not replay earlier history
- use a session-scoped `Signal` envelope with the correct `session_id` and `mode` to attach to an existing session without mutating it
- mixed-session streams are rejected with `FAILED_PRECONDITION`

## Example 8: Freeze-check / error-path client

Run:

Expand Down
20 changes: 16 additions & 4 deletions docs/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,34 @@ The RFC/spec repository is the normative source for protocol semantics. This fil

Clients should call `Initialize` before using the runtime.

## Implemented unary RPCs
## Implemented RPCs

- `Initialize`
- `Send`
- `StreamSession`
- `GetSession`
- `CancelSession`
- `GetManifest`
- `ListModes`
- `ListRoots`

## Not in the freeze surface
## Still unimplemented

- `StreamSession` exists in the protobuf surface but is intentionally disabled in this runtime profile
- `WatchModeRegistry` is unimplemented
- `WatchRoots` is unimplemented

## StreamSession profile

`StreamSession` is session-scoped and authoritative for accepted envelopes:

- one gRPC stream binds to one non-empty `session_id`
- the server emits only accepted canonical MACP envelopes
- stream attachment observes future accepted envelopes from the bind point; it does not backfill earlier history
- accepted envelope order matches runtime admission order for that session
- mixed-session streams are rejected with `FAILED_PRECONDITION`
- stream-level validation failures terminate the stream with a gRPC status; use `Send` if you need explicit per-message negative acknowledgements
- to attach to an existing session without mutating it, send a session-scoped `Signal` envelope with the correct `session_id` and `mode`

## Standards-track mode rules

For these modes:
Expand Down Expand Up @@ -81,4 +93,4 @@ For standards-track modes, `CommitmentPayload` must carry version fields that ma

## Discovery notes

`ListModes` returns the five standards-track modes. `GetManifest` exposes a freeze-profile manifest that matches the implemented unary capabilities.
`ListModes` returns the five standards-track modes. `GetManifest` exposes a manifest that matches the implemented unary and streaming capabilities.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ pub mod registry;
pub mod runtime;
pub mod session;
pub mod storage;
pub mod stream_bus;

pub mod security;
57 changes: 57 additions & 0 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::session::{
validate_standard_session_start_payload, Session, SessionState,
};
use crate::storage::StorageBackend;
use crate::stream_bus::SessionStreamBus;

const EXPERIMENTAL_DEFAULT_TTL_MS: i64 = 60_000;

Expand All @@ -31,6 +32,7 @@ pub struct Runtime {
pub storage: Arc<dyn StorageBackend>,
pub registry: Arc<SessionRegistry>,
pub log_store: Arc<LogStore>,
stream_bus: Arc<SessionStreamBus>,
modes: HashMap<String, Box<dyn Mode>>,
}

Expand All @@ -52,6 +54,7 @@ impl Runtime {
storage,
registry,
log_store,
stream_bus: Arc::new(SessionStreamBus::default()),
modes,
}
}
Expand All @@ -64,6 +67,19 @@ impl Runtime {
.collect()
}

pub fn subscribe_session_stream(
&self,
session_id: &str,
) -> tokio::sync::broadcast::Receiver<Envelope> {
self.stream_bus.subscribe(session_id)
}

fn publish_accepted_envelope(&self, env: &Envelope) {
if !env.session_id.is_empty() {
self.stream_bus.publish(&env.session_id, env.clone());
}
}

fn make_incoming_entry(env: &Envelope) -> LogEntry {
LogEntry {
message_id: env.message_id.clone(),
Expand Down Expand Up @@ -259,6 +275,7 @@ impl Runtime {
// 3. Best-effort session save
self.save_session_to_storage(&session).await;
guard.insert(env.session_id.clone(), session);
self.publish_accepted_envelope(env);

Ok(ProcessResult {
session_state: result_state,
Expand Down Expand Up @@ -323,6 +340,7 @@ impl Runtime {

// 3. Best-effort session save
self.save_session_to_storage(session).await;
self.publish_accepted_envelope(env);

Ok(ProcessResult {
session_state: result_state,
Expand Down Expand Up @@ -714,6 +732,45 @@ mod tests {
assert_eq!(result.session_state, SessionState::Expired);
}

#[tokio::test]
async fn accepted_envelopes_are_published_in_order() {
let rt = make_runtime();
let mut events = rt.subscribe_session_stream("s1");

let start = env(
"macp.mode.decision.v1",
"SessionStart",
"m1",
"s1",
"agent://orchestrator",
session_start(vec!["agent://fraud".into()]),
);
rt.process(&start, None).await.unwrap();
let first = events.recv().await.unwrap();
assert_eq!(first.message_id, "m1");
assert_eq!(first.message_type, "SessionStart");

let proposal = ProposalPayload {
proposal_id: "p1".into(),
option: "step-up".into(),
rationale: "risk".into(),
supporting_data: vec![],
}
.encode_to_vec();
let proposal_env = env(
"macp.mode.decision.v1",
"Proposal",
"m2",
"s1",
"agent://orchestrator",
proposal,
);
rt.process(&proposal_env, None).await.unwrap();
let second = events.recv().await.unwrap();
assert_eq!(second.message_id, "m2");
assert_eq!(second.message_type, "Proposal");
}

#[tokio::test]
async fn commitment_versions_are_carried_into_resolution() {
let rt = make_runtime();
Expand Down
Loading
Loading