Skip to content

Commit 8bcf2ee

Browse files
authored
Merge pull request #9 from multiagentcoordinationprotocol/add-session-stream
Add Session Stream
2 parents 2011d31 + b979f93 commit 8bcf2ee

File tree

9 files changed

+493
-31
lines changed

9 files changed

+493
-31
lines changed

README.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
Reference runtime for the Multi-Agent Coordination Protocol (MACP).
44

5-
This runtime implements the current MACP core/service surface, the five standards-track modes in the main RFC repository, and one experimental `multi_round` mode that remains available only by explicit canonical name. The focus of this release is freeze-readiness for SDKs and real-world unary integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery.
5+
This runtime implements the current MACP core/service surface, the five standards-track modes in the main RFC repository, and one experimental `multi_round` mode that remains available only by explicit canonical name. The focus of this release is freeze-readiness for SDKs and real-world unary and streaming integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery.
66

77
## What changed in v0.4.0
88

@@ -28,8 +28,9 @@ This runtime implements the current MACP core/service surface, the five standard
2828
- per-session append-only log files and session snapshots via `FileBackend`
2929
- crash recovery with dedup state reconciliation
3030
- atomic writes (tmp file + rename) prevent partial-write corruption
31-
- **Unary freeze profile**
32-
- `StreamSession` is intentionally disabled in this profile
31+
- **Authoritative session streaming**
32+
- `StreamSession` now emits accepted MACP envelopes for one bound session per stream
33+
- mixed-session streams are rejected
3334
- `WatchModeRegistry` and `WatchRoots` remain unimplemented
3435

3536
## Implemented modes
@@ -176,7 +177,7 @@ cargo run --bin fuzz_client
176177
| `GetManifest` | implemented |
177178
| `ListModes` | implemented |
178179
| `ListRoots` | implemented |
179-
| `StreamSession` | intentionally disabled in freeze profile |
180+
| `StreamSession` | implemented (accepted-envelope session stream) |
180181
| `WatchModeRegistry` | unimplemented |
181182
| `WatchRoots` | unimplemented |
182183

@@ -205,6 +206,6 @@ runtime/
205206
- The RFC/spec repository remains the normative source for protocol semantics.
206207
- This runtime only accepts the canonical standards-track mode identifiers for the five main modes.
207208
- `multi_round` remains experimental and is not advertised by discovery RPCs.
208-
- `StreamSession` is intentionally not part of the freeze surface for the first SDKs.
209+
- `StreamSession` is available for bidirectional session-scoped coordination. Use `Send` when you need per-message negative acknowledgements. The current implementation streams future accepted envelopes from the time the stream binds; it does not backfill earlier accepted history.
209210

210211
See `docs/README.md` and `docs/examples.md` for the updated local development and usage guidance.

docs/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ The RFC/spec repository is still the normative source for MACP semantics. These
66

77
## What is in this runtime profile
88

9-
- unary-first MACP server over gRPC
9+
- MACP server over gRPC with unary RPCs and per-session bidirectional streaming
1010
- five standards-track modes from the main RFC repository
1111
- one experimental `macp.mode.multi_round.v1` mode kept off discovery surfaces
1212
- strict canonical `SessionStart` for standards-track modes
@@ -24,21 +24,21 @@ The RFC/spec repository is still the normative source for MACP semantics. These
2424

2525
## Freeze profile
2626

27-
The current runtime is intended to be the freeze candidate for unary SDKs and reference examples.
27+
The current runtime is intended to be the freeze candidate for unary and streaming SDKs and reference examples.
2828

2929
Implemented and supported:
3030

3131
- `Initialize`
3232
- `Send`
33+
- `StreamSession`
3334
- `GetSession`
3435
- `CancelSession`
3536
- `GetManifest`
3637
- `ListModes`
3738
- `ListRoots`
3839

39-
Not part of the freeze surface:
40+
Not yet implemented:
4041

41-
- `StreamSession` is intentionally disabled in this profile
4242
- `WatchModeRegistry` is unimplemented
4343
- `WatchRoots` is unimplemented
4444

docs/architecture.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,4 @@ Responsibilities:
9393

9494
## Freeze-profile design choice
9595

96-
The runtime intentionally prioritizes unary correctness over streaming completeness. `StreamSession` is therefore disabled in this release profile so SDKs can target a stable, explicit surface.
96+
The runtime now exposes `StreamSession` as a per-session accepted-envelope stream. Each gRPC stream binds to one session and receives canonical MACP envelopes in runtime acceptance order. Unary `Send` remains the path for explicit per-message acknowledgement semantics.

docs/examples.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Examples and local development guide
22

3-
These examples target `macp-runtime v0.4.0` and the unary freeze profile.
3+
These examples target `macp-runtime v0.4.0` and the stream-capable freeze profile.
44

55
They intentionally use the local-development security shortcut:
66

@@ -141,7 +141,19 @@ cargo run --bin multi_round_client
141141

142142
This mode is still experimental. It remains callable by the explicit canonical name `macp.mode.multi_round.v1`, but it is not advertised by discovery RPCs and it does not use the strict standards-track `SessionStart` contract.
143143

144-
## Example 7: Freeze-check / error-path client
144+
## Example 7: StreamSession
145+
146+
`StreamSession` emits only accepted canonical MACP envelopes. A single gRPC stream binds to one session. If a client needs negative per-message acknowledgements, it should continue to use `Send`.
147+
148+
Practical notes:
149+
150+
- bind a stream by sending a session-scoped envelope for the target session
151+
- use `SessionStart` to create a new session over the stream
152+
- stream attachment starts observing future accepted envelopes from the bind point; it does not replay earlier history
153+
- use a session-scoped `Signal` envelope with the correct `session_id` and `mode` to attach to an existing session without mutating it
154+
- mixed-session streams are rejected with `FAILED_PRECONDITION`
155+
156+
## Example 8: Freeze-check / error-path client
145157

146158
Run:
147159

docs/protocol.md

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,34 @@ The RFC/spec repository is the normative source for protocol semantics. This fil
1010

1111
Clients should call `Initialize` before using the runtime.
1212

13-
## Implemented unary RPCs
13+
## Implemented RPCs
1414

1515
- `Initialize`
1616
- `Send`
17+
- `StreamSession`
1718
- `GetSession`
1819
- `CancelSession`
1920
- `GetManifest`
2021
- `ListModes`
2122
- `ListRoots`
2223

23-
## Not in the freeze surface
24+
## Still unimplemented
2425

25-
- `StreamSession` exists in the protobuf surface but is intentionally disabled in this runtime profile
2626
- `WatchModeRegistry` is unimplemented
2727
- `WatchRoots` is unimplemented
2828

29+
## StreamSession profile
30+
31+
`StreamSession` is session-scoped and authoritative for accepted envelopes:
32+
33+
- one gRPC stream binds to one non-empty `session_id`
34+
- the server emits only accepted canonical MACP envelopes
35+
- stream attachment observes future accepted envelopes from the bind point; it does not backfill earlier history
36+
- accepted envelope order matches runtime admission order for that session
37+
- mixed-session streams are rejected with `FAILED_PRECONDITION`
38+
- stream-level validation failures terminate the stream with a gRPC status; use `Send` if you need explicit per-message negative acknowledgements
39+
- to attach to an existing session without mutating it, send a session-scoped `Signal` envelope with the correct `session_id` and `mode`
40+
2941
## Standards-track mode rules
3042

3143
For these modes:
@@ -81,4 +93,4 @@ For standards-track modes, `CommitmentPayload` must carry version fields that ma
8193

8294
## Discovery notes
8395

84-
`ListModes` returns the five standards-track modes. `GetManifest` exposes a freeze-profile manifest that matches the implemented unary capabilities.
96+
`ListModes` returns the five standards-track modes. `GetManifest` exposes a manifest that matches the implemented unary and streaming capabilities.

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ pub mod registry;
2929
pub mod runtime;
3030
pub mod session;
3131
pub mod storage;
32+
pub mod stream_bus;
3233

3334
pub mod security;

src/runtime.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::session::{
1818
validate_standard_session_start_payload, Session, SessionState,
1919
};
2020
use crate::storage::StorageBackend;
21+
use crate::stream_bus::SessionStreamBus;
2122

2223
const EXPERIMENTAL_DEFAULT_TTL_MS: i64 = 60_000;
2324

@@ -31,6 +32,7 @@ pub struct Runtime {
3132
pub storage: Arc<dyn StorageBackend>,
3233
pub registry: Arc<SessionRegistry>,
3334
pub log_store: Arc<LogStore>,
35+
stream_bus: Arc<SessionStreamBus>,
3436
modes: HashMap<String, Box<dyn Mode>>,
3537
}
3638

@@ -52,6 +54,7 @@ impl Runtime {
5254
storage,
5355
registry,
5456
log_store,
57+
stream_bus: Arc::new(SessionStreamBus::default()),
5558
modes,
5659
}
5760
}
@@ -64,6 +67,19 @@ impl Runtime {
6467
.collect()
6568
}
6669

70+
pub fn subscribe_session_stream(
71+
&self,
72+
session_id: &str,
73+
) -> tokio::sync::broadcast::Receiver<Envelope> {
74+
self.stream_bus.subscribe(session_id)
75+
}
76+
77+
fn publish_accepted_envelope(&self, env: &Envelope) {
78+
if !env.session_id.is_empty() {
79+
self.stream_bus.publish(&env.session_id, env.clone());
80+
}
81+
}
82+
6783
fn make_incoming_entry(env: &Envelope) -> LogEntry {
6884
LogEntry {
6985
message_id: env.message_id.clone(),
@@ -259,6 +275,7 @@ impl Runtime {
259275
// 3. Best-effort session save
260276
self.save_session_to_storage(&session).await;
261277
guard.insert(env.session_id.clone(), session);
278+
self.publish_accepted_envelope(env);
262279

263280
Ok(ProcessResult {
264281
session_state: result_state,
@@ -323,6 +340,7 @@ impl Runtime {
323340

324341
// 3. Best-effort session save
325342
self.save_session_to_storage(session).await;
343+
self.publish_accepted_envelope(env);
326344

327345
Ok(ProcessResult {
328346
session_state: result_state,
@@ -714,6 +732,45 @@ mod tests {
714732
assert_eq!(result.session_state, SessionState::Expired);
715733
}
716734

735+
#[tokio::test]
736+
async fn accepted_envelopes_are_published_in_order() {
737+
let rt = make_runtime();
738+
let mut events = rt.subscribe_session_stream("s1");
739+
740+
let start = env(
741+
"macp.mode.decision.v1",
742+
"SessionStart",
743+
"m1",
744+
"s1",
745+
"agent://orchestrator",
746+
session_start(vec!["agent://fraud".into()]),
747+
);
748+
rt.process(&start, None).await.unwrap();
749+
let first = events.recv().await.unwrap();
750+
assert_eq!(first.message_id, "m1");
751+
assert_eq!(first.message_type, "SessionStart");
752+
753+
let proposal = ProposalPayload {
754+
proposal_id: "p1".into(),
755+
option: "step-up".into(),
756+
rationale: "risk".into(),
757+
supporting_data: vec![],
758+
}
759+
.encode_to_vec();
760+
let proposal_env = env(
761+
"macp.mode.decision.v1",
762+
"Proposal",
763+
"m2",
764+
"s1",
765+
"agent://orchestrator",
766+
proposal,
767+
);
768+
rt.process(&proposal_env, None).await.unwrap();
769+
let second = events.recv().await.unwrap();
770+
assert_eq!(second.message_id, "m2");
771+
assert_eq!(second.message_type, "Proposal");
772+
}
773+
717774
#[tokio::test]
718775
async fn commitment_versions_are_carried_into_resolution() {
719776
let rt = make_runtime();

0 commit comments

Comments
 (0)