Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tokio-stream = "0.1"
futures-core = "0.3"
async-stream = "0.3"
async-trait = "0.1"
uuid = { version = "1", features = ["v4"] }

[dev-dependencies]
tempfile = "3"
Expand Down
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,18 @@ This runtime implements the current MACP core/service surface, the five standard
- per-session append-only log files and session snapshots via `FileBackend`
- crash recovery with dedup state reconciliation
- atomic writes (tmp file + rename) prevent partial-write corruption
- **Authoritative session streaming**
- `StreamSession` now emits accepted MACP envelopes for one bound session per stream
- mixed-session streams are rejected
- **Authoritative accepted history**
- log append failures are now fatal — messages are not acknowledged without a durable record
- session state is rebuilt from append-only logs on startup via replay (no snapshot dependency)
- `LogEntry` enriched with `session_id`, `mode`, `macp_version` for self-describing replay
- **Session ID security policy**
- session IDs must be UUID v4/v7 (hyphenated lowercase) or base64url tokens (22+ chars)
- weak/human-readable IDs are rejected with `INVALID_SESSION_ID`
- **Signal enforcement**
- Signals are strictly ambient — non-empty `session_id` or `mode` is rejected
- **StreamSession disabled in freeze profile**
- `Initialize` advertises `stream: false`
- `StreamSession` RPC returns `UNIMPLEMENTED` (implementation retained for future activation)
- `WatchModeRegistry` and `WatchRoots` remain unimplemented

## Implemented modes
Expand Down Expand Up @@ -177,7 +186,7 @@ cargo run --bin fuzz_client
| `GetManifest` | implemented |
| `ListModes` | implemented |
| `ListRoots` | implemented |
| `StreamSession` | implemented (accepted-envelope session stream) |
| `StreamSession` | disabled (unary-first freeze profile) |
| `WatchModeRegistry` | unimplemented |
| `WatchRoots` | unimplemented |

Expand All @@ -195,6 +204,7 @@ runtime/
│ ├── registry.rs # session store with optional persistence
│ ├── log_store.rs # in-memory accepted-history log cache
│ ├── storage.rs # storage backend trait, FileBackend persistence, crash recovery
│ ├── replay.rs # session rebuild from append-only log
│ ├── mode/ # mode implementations
│ └── bin/ # local development example clients
├── docs/
Expand All @@ -206,6 +216,6 @@ runtime/
- The RFC/spec repository remains the normative source for protocol semantics.
- This runtime only accepts the canonical standards-track mode identifiers for the five main modes.
- `multi_round` remains experimental and is not advertised by discovery RPCs.
- `StreamSession` is available for bidirectional session-scoped coordination. Use `Send` when you need per-message negative acknowledgements. The current implementation streams future accepted envelopes from the time the stream binds; it does not backfill earlier accepted history.
- `StreamSession` is disabled in the freeze profile. The implementation is retained for future activation.

See `docs/README.md` and `docs/examples.md` for the updated local development and usage guidance.
17 changes: 14 additions & 3 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Flow:
Important runtime behavior:

- initiator/coordinator may emit `Proposal` and `Commitment`
- participants emit `Evaluation`, `Objection`, and `Vote`
- declared participants may also emit `Proposal`, `Evaluation`, `Objection`, and `Vote`
- duplicate proposal IDs are rejected
- votes are tracked per proposal, per sender
- `CommitmentPayload` version fields must match the bound session versions
Expand Down Expand Up @@ -141,9 +141,11 @@ cargo run --bin multi_round_client

This mode is still experimental. It remains callable by the explicit canonical name `macp.mode.multi_round.v1`, but it is not advertised by discovery RPCs and it does not use the strict standards-track `SessionStart` contract.

## Example 7: StreamSession
## Example 7: StreamSession (disabled in freeze profile)

`StreamSession` emits only accepted canonical MACP envelopes. A single gRPC stream binds to one session. If a client needs negative per-message acknowledgements, it should continue to use `Send`.
`StreamSession` is disabled in the unary-first freeze profile. The `Initialize` response advertises `stream: false` and the RPC returns `UNIMPLEMENTED`. The implementation is retained for future activation.

When enabled, `StreamSession` emits only accepted canonical MACP envelopes. A single gRPC stream binds to one session. If a client needs negative per-message acknowledgements, it should continue to use `Send`.

Practical notes:

Expand Down Expand Up @@ -171,6 +173,15 @@ This client exercises common failure paths for the freeze profile, including:
- payload too large
- session access without membership

## Session ID policy

Session IDs must be either:

- **UUID v4/v7** in hyphenated lowercase canonical form (36 characters, e.g. `550e8400-e29b-41d4-a716-446655440000`)
- **Base64url token** of at least 22 characters using only `[A-Za-z0-9_-]`

Human-readable or short IDs (e.g. `"my-session"`, `"s1"`) are rejected with `INVALID_SESSION_ID`. The example clients generate UUID v4 session IDs automatically.

## Common troubleshooting

### `UNAUTHENTICATED`
Expand Down
13 changes: 7 additions & 6 deletions src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use prost::Message;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = common::connect_client().await?;
let session_id = common::new_session_id();

let init = initialize(&mut client).await?;
println!(
Expand All @@ -36,7 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.decision.v1",
"SessionStart",
"m1",
"decision-demo-1",
&session_id,
"coordinator",
canonical_start_payload("select the deployment plan", &["alice", "bob"], 60_000),
);
Expand All @@ -56,7 +57,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.decision.v1",
"Proposal",
"m2",
"decision-demo-1",
&session_id,
"coordinator",
proposal.encode_to_vec(),
),
Expand All @@ -77,7 +78,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.decision.v1",
"Evaluation",
"m3",
"decision-demo-1",
&session_id,
"alice",
evaluation.encode_to_vec(),
),
Expand All @@ -97,7 +98,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.decision.v1",
"Vote",
"m4",
"decision-demo-1",
&session_id,
"bob",
vote.encode_to_vec(),
),
Expand All @@ -112,7 +113,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.decision.v1",
"Commitment",
"m5",
"decision-demo-1",
&session_id,
"coordinator",
canonical_commitment_payload(
"c1",
Expand All @@ -125,7 +126,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;
print_ack("commitment", &ack);

let session = get_session_as(&mut client, "alice", "decision-demo-1").await?;
let session = get_session_as(&mut client, "alice", &session_id).await?;
let meta = session.metadata.expect("metadata");
println!("GetSession: state={} mode={}", meta.state, meta.mode);

Expand Down
13 changes: 7 additions & 6 deletions src/bin/fuzz_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use prost::Message;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = common::connect_client().await?;
let session_id = common::new_session_id();

println!("=== Freeze-Profile Error Path Demo ===\n");

Expand Down Expand Up @@ -72,7 +73,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.task.v1",
"SessionStart",
"m0",
"freeze-task-1",
&session_id,
"planner",
canonical_start_payload("freeze checks", &["planner", "worker"], 60_000),
);
Expand All @@ -91,7 +92,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.task.v1",
"TaskRequest",
"dup-1",
"freeze-task-1",
&session_id,
"planner",
duplicate_request.encode_to_vec(),
);
Expand All @@ -104,7 +105,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.task.v1",
"TaskRequest",
"spoof-1",
"freeze-task-1",
&session_id,
"mallory",
vec![1, 2, 3],
);
Expand All @@ -115,22 +116,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.task.v1",
"TaskUpdate",
"big-1",
"freeze-task-1",
&session_id,
"worker",
vec![b'x'; 2_000_000],
);
let ack = send_as(&mut client, "worker", oversized).await?;
print_ack("payload_too_large", &ack);

match get_session_as(&mut client, "outsider", "freeze-task-1").await {
match get_session_as(&mut client, "outsider", &session_id).await {
Ok(resp) => println!(
"[forbidden_get_session] unexpected success: {:?}",
resp.metadata
),
Err(status) => println!("[forbidden_get_session] grpc error: {status}"),
}

let cancelled = cancel_session_as(&mut client, "planner", "freeze-task-1", "end demo").await?;
let cancelled = cancel_session_as(&mut client, "planner", &session_id, "end demo").await?;
if let Some(ack) = cancelled.ack.as_ref() {
print_ack("cancel_session", ack);
}
Expand Down
13 changes: 7 additions & 6 deletions src/bin/handoff_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use prost::Message;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = common::connect_client().await?;
let session_id = common::new_session_id();

println!("=== Handoff Mode Demo ===\n");

Expand All @@ -21,7 +22,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.handoff.v1",
"SessionStart",
"m0",
"handoff-demo-1",
&session_id,
"owner",
canonical_start_payload("escalate support ticket", &["owner", "target"], 60_000),
),
Expand All @@ -42,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.handoff.v1",
"HandoffOffer",
"m1",
"handoff-demo-1",
&session_id,
"owner",
offer.encode_to_vec(),
),
Expand All @@ -62,7 +63,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.handoff.v1",
"HandoffContext",
"m2",
"handoff-demo-1",
&session_id,
"owner",
context.encode_to_vec(),
),
Expand All @@ -82,7 +83,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.handoff.v1",
"HandoffAccept",
"m3",
"handoff-demo-1",
&session_id,
"target",
accept.encode_to_vec(),
),
Expand All @@ -97,7 +98,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.handoff.v1",
"Commitment",
"m4",
"handoff-demo-1",
&session_id,
"owner",
canonical_commitment_payload(
"c1",
Expand All @@ -110,7 +111,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;
print_ack("commitment", &ack);

let session = get_session_as(&mut client, "target", "handoff-demo-1").await?;
let session = get_session_as(&mut client, "target", &session_id).await?;
let meta = session.metadata.expect("metadata");
println!("[get_session] state={} mode={}", meta.state, meta.mode);

Expand Down
13 changes: 7 additions & 6 deletions src/bin/multi_round_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use prost::Message;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = common::connect_client().await?;
let session_id = common::new_session_id();

println!("=== Multi-Round Convergence Demo ===\n");

Expand All @@ -28,7 +29,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.multi_round.v1",
"SessionStart",
"m0",
"multi-round-demo-1",
&session_id,
"coordinator",
start_payload.encode_to_vec(),
),
Expand All @@ -43,7 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.multi_round.v1",
"Contribute",
"m1",
"multi-round-demo-1",
&session_id,
"alice",
br#"{"value":"option_a"}"#.to_vec(),
),
Expand All @@ -58,15 +59,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.multi_round.v1",
"Contribute",
"m2",
"multi-round-demo-1",
&session_id,
"bob",
br#"{"value":"option_b"}"#.to_vec(),
),
)
.await?;
print_ack("bob_contributes_b", &ack);

let session = get_session_as(&mut client, "alice", "multi-round-demo-1").await?;
let session = get_session_as(&mut client, "alice", &session_id).await?;
let meta = session.metadata.expect("metadata");
println!("[get_session] state={} mode={}", meta.state, meta.mode);

Expand All @@ -77,15 +78,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"macp.mode.multi_round.v1",
"Contribute",
"m3",
"multi-round-demo-1",
&session_id,
"bob",
br#"{"value":"option_a"}"#.to_vec(),
),
)
.await?;
print_ack("bob_revises", &ack);

let session = get_session_as(&mut client, "alice", "multi-round-demo-1").await?;
let session = get_session_as(&mut client, "alice", &session_id).await?;
let meta = session.metadata.expect("metadata");
println!("[get_session] state={} mode={}", meta.state, meta.mode);

Expand Down
Loading
Loading