From 168ab318bffa86986db47b0bdb89fec25af206f1 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Fri, 20 Mar 2026 19:50:52 -0700 Subject: [PATCH 1/2] Promote multi-round to standards-track, add tracing/metrics, implement WatchModeRegistry/WatchRoots MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Promote macp.mode.multi_round.v1 from experimental to the 6th standards-track mode. Multi-round now requires strict SessionStart validation (participants, mode_version, configuration_version, positive ttl_ms) and explicit Commitment after convergence — matching the pattern of all other standards-track modes. Auto-resolve on convergence is removed; convergence is tracked via a `converged` flag in mode state and the initiator must emit a validated Commitment to resolve the session. Runtime changes: - Remove experimental back-fill hack for multi-round participants - Remove experimental default TTL (60s) fallback - Add `tracing` + `tracing-subscriber` for structured logging - Replace all eprintln!/println! with tracing macros (info/warn/debug) - Add per-mode atomic metrics counters (src/metrics.rs) - Implement WatchModeRegistry (sends initial RegistryChanged, holds stream open) - Implement WatchRoots (sends initial RootsChanged, holds stream open) - Add Signal processing debug logging Test additions (240 total, up from 224): - 7 negative-path conformance fixtures (one per mode) - 1 multi-round happy-path conformance fixture - Multi-round integration lifecycle test - Multi-round replay round-trip test - FileBackend integration tests (lifecycle + crash recovery) - StreamSession integration tests (ordering, concurrent subscribers) - Concurrent message handling tests (N parallel, duplicate dedup) --- Cargo.toml | 2 + README.md | 25 +- docs/README.md | 10 +- docs/architecture.md | 5 +- docs/examples.md | 16 +- docs/protocol.md | 15 +- src/bin/multi_round_client.rs | 51 +++- src/lib.rs | 1 + src/main.rs | 40 ++- src/metrics.rs | 154 ++++++++++ src/mode/mod.rs | 16 + src/mode/multi_round.rs | 282 ++++++++++++++---- src/mode_registry.rs | 26 +- src/replay.rs | 20 +- src/runtime.rs | 125 +++++--- src/server.rs | 34 ++- src/session.rs | 7 +- tests/concurrent_messages.rs | 187 ++++++++++++ tests/conformance/decision_reject_paths.json | 41 +++ tests/conformance/handoff_reject_paths.json | 26 ++ tests/conformance/multi_round_happy_path.json | 48 +++ .../conformance/multi_round_reject_paths.json | 56 ++++ tests/conformance/proposal_reject_paths.json | 27 ++ tests/conformance/quorum_reject_paths.json | 48 +++ tests/conformance/task_reject_paths.json | 33 ++ tests/conformance_loader.rs | 38 +++ tests/file_backend_integration.rs | 200 +++++++++++++ tests/integration_mode_lifecycle.rs | 83 ++++++ tests/replay_round_trip.rs | 46 +++ tests/stream_integration.rs | 157 ++++++++++ 30 files changed, 1628 insertions(+), 191 deletions(-) create mode 100644 src/metrics.rs create mode 100644 tests/concurrent_messages.rs create mode 100644 tests/conformance/decision_reject_paths.json create mode 100644 tests/conformance/handoff_reject_paths.json create mode 100644 tests/conformance/multi_round_happy_path.json create mode 100644 tests/conformance/multi_round_reject_paths.json create mode 100644 tests/conformance/proposal_reject_paths.json create mode 100644 tests/conformance/quorum_reject_paths.json create mode 100644 tests/conformance/task_reject_paths.json create mode 100644 tests/file_backend_integration.rs create mode 100644 tests/stream_integration.rs diff --git a/Cargo.toml b/Cargo.toml index c6473ab..cafd299 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ futures-core = "0.3" async-stream = "0.3" async-trait = "0.1" uuid = { version = "1", features = ["v4"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies] tempfile = "3" diff --git a/README.md b/README.md index 7c02708..959ad12 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Reference runtime for the Multi-Agent Coordination Protocol (MACP). -This runtime implements the current MACP core/service surface, the five standards-track modes in the main RFC repository, and one experimental `multi_round` mode that remains available only by explicit canonical name. The focus of this release is freeze-readiness for SDKs and real-world unary and streaming integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery. +This runtime implements the current MACP core/service surface and all six standards-track modes in the main RFC repository. The focus of this release is freeze-readiness for SDKs and real-world unary and streaming integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery. ## What changed in v0.4.0 @@ -40,7 +40,11 @@ This runtime implements the current MACP core/service surface, the five standard - **StreamSession enabled** - `Initialize` advertises `stream: true` - `StreamSession` provides per-session bidirectional streaming of accepted envelopes - - `WatchModeRegistry` and `WatchRoots` remain unimplemented + - `WatchModeRegistry` and `WatchRoots` implemented (basic: send initial state, hold stream open) +- **Structured logging via `tracing`** + - use `RUST_LOG` env var to control log level (e.g. `RUST_LOG=info`) +- **Per-mode metrics** + - tracked via `src/metrics.rs` ## Implemented modes @@ -51,16 +55,13 @@ Standards-track modes: - `macp.mode.task.v1` - `macp.mode.handoff.v1` - `macp.mode.quorum.v1` - -Experimental mode: - - `macp.mode.multi_round.v1` ## Runtime behavior that SDKs should assume ### Session bootstrap -For the five standards-track modes, `SessionStartPayload` must include: +For all six standards-track modes, `SessionStartPayload` must include: - `participants` - `mode_version` @@ -95,6 +96,7 @@ Unless `MACP_MEMORY_ONLY=1` is set, the runtime persists session and log snapsho | `MACP_BIND_ADDR` | bind address | `127.0.0.1:50051` | | `MACP_DATA_DIR` | persistence directory | `.macp-data` | | `MACP_MEMORY_ONLY` | disable persistence when set to `1` | unset | +| `RUST_LOG` | `tracing` log level filter (e.g. `info`, `debug`) | unset | | `MACP_ALLOW_INSECURE` | allow plaintext transport when set to `1` | unset | | `MACP_TLS_CERT_PATH` | PEM certificate for TLS | unset | | `MACP_TLS_KEY_PATH` | PEM private key for TLS | unset | @@ -187,8 +189,8 @@ cargo run --bin fuzz_client | `ListModes` | implemented | | `ListRoots` | implemented | | `StreamSession` | implemented | -| `WatchModeRegistry` | unimplemented | -| `WatchRoots` | unimplemented | +| `WatchModeRegistry` | implemented | +| `WatchRoots` | implemented | ## Architecture @@ -227,11 +229,12 @@ runtime/ │ ├── log_store.rs # in-memory accepted-history log cache │ ├── storage.rs # storage backend trait, FileBackend, crash recovery │ ├── replay.rs # session rebuild from append-only log +│ ├── metrics.rs # per-mode metrics counters │ ├── mode/ # mode implementations │ └── bin/ # local development example clients ├── tests/ │ ├── integration_mode_lifecycle.rs # full-stack integration tests -│ ├── replay_round_trip.rs # replay tests for all 5 modes +│ ├── replay_round_trip.rs # replay tests for all 6 modes │ ├── conformance_loader.rs # JSON fixture runner │ └── conformance/ # per-mode conformance fixtures ├── docs/ @@ -261,8 +264,8 @@ Run `make sync-protos` to update local proto files from BSR. ## Development notes - The RFC/spec repository remains the normative source for protocol semantics. -- This runtime only accepts the canonical standards-track mode identifiers for the five main modes. -- `multi_round` remains experimental and is not advertised by discovery RPCs. +- This runtime only accepts the canonical standards-track mode identifiers for all six modes. +- `multi_round` is now standards-track and is advertised by discovery RPCs. - `StreamSession` is enabled and binds one gRPC stream to one session, emitting accepted envelopes in order. See `docs/README.md` and `docs/examples.md` for the updated local development and usage guidance. diff --git a/docs/README.md b/docs/README.md index 8bf88ef..3be980a 100644 --- a/docs/README.md +++ b/docs/README.md @@ -7,8 +7,7 @@ The RFC/spec repository is still the normative source for MACP semantics. These ## What is in this runtime profile - MACP server over gRPC with unary RPCs and per-session bidirectional streaming -- five standards-track modes from the main RFC repository -- one experimental `macp.mode.multi_round.v1` mode kept off discovery surfaces +- six standards-track modes from the main RFC repository - strict canonical `SessionStart` for standards-track modes - authenticated sender derivation - payload limits and rate limiting @@ -21,6 +20,7 @@ The RFC/spec repository is still the normative source for MACP semantics. These - `macp.mode.task.v1` - `macp.mode.handoff.v1` - `macp.mode.quorum.v1` +- `macp.mode.multi_round.v1` ## Freeze profile @@ -37,10 +37,10 @@ Implemented and supported: - `ListModes` - `ListRoots` -Not yet implemented: +Streaming watch RPCs (basic — send initial state, hold stream open): -- `WatchModeRegistry` is unimplemented -- `WatchRoots` is unimplemented +- `WatchModeRegistry` +- `WatchRoots` ## Security model diff --git a/docs/architecture.md b/docs/architecture.md index 5e07086..b5e2f00 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -29,11 +29,10 @@ The `ModeRegistry` is the single source of truth for mode dispatch, replay, and Responsibilities: -- register all mode implementations (standards-track and experimental) +- register all mode implementations - provide mode lookup for dispatch and replay - provide standards-track mode names for `ListModes` - provide mode descriptors for `ListModes` and `GetManifest` -- classify modes as standards-track or experimental Key methods: @@ -58,7 +57,7 @@ Implemented modes: - Task — delegated task with serial assignment, progress tracking, terminal reports - Handoff — serial handoff offers with accept/decline disposition - Quorum — threshold approval with ballots -- MultiRound (experimental) — iterative value convergence +- MultiRound — iterative value convergence with explicit Commitment ## 5. Storage layer diff --git a/docs/examples.md b/docs/examples.md index 2fe20ea..5d662a1 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -21,6 +21,7 @@ For these modes: - `macp.mode.task.v1` - `macp.mode.handoff.v1` - `macp.mode.quorum.v1` +- `macp.mode.multi_round.v1` `SessionStartPayload` must include all of the following: @@ -131,7 +132,7 @@ Flow: 3. participants send ballots 4. coordinator emits `Commitment` after threshold is satisfied -## Example 6: Experimental multi-round mode +## Example 6: Multi-round mode Run: @@ -139,7 +140,18 @@ Run: cargo run --bin multi_round_client ``` -This mode is still experimental. It remains callable by the explicit canonical name `macp.mode.multi_round.v1`, but it is not advertised by discovery RPCs and it does not use the strict standards-track `SessionStart` contract. +Flow: + +1. coordinator starts the session with strict `SessionStart` (participants, mode_version, configuration_version, ttl_ms) +2. participants exchange proposals across multiple rounds +3. convergence is tracked by the runtime +4. coordinator emits `Commitment` after convergence + +Important runtime behavior: + +- `macp.mode.multi_round.v1` is a standards-track mode and is advertised by discovery RPCs +- convergence is tracked but does not auto-resolve the session — an explicit `Commitment` is required +- uses the same strict `SessionStart` contract as all other standards-track modes ## Example 7: StreamSession diff --git a/docs/protocol.md b/docs/protocol.md index 1119c3f..3ce45a1 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -20,11 +20,13 @@ Clients should call `Initialize` before using the runtime. - `GetManifest` - `ListModes` - `ListRoots` +- `WatchModeRegistry` +- `WatchRoots` -## Still unimplemented +## Streaming watch RPCs -- `WatchModeRegistry` is unimplemented -- `WatchRoots` is unimplemented +- `WatchModeRegistry` — sends the current registry state, then holds the stream open +- `WatchRoots` — sends the current roots state, then holds the stream open ## StreamSession profile @@ -47,6 +49,7 @@ For these modes: - `macp.mode.task.v1` - `macp.mode.handoff.v1` - `macp.mode.quorum.v1` +- `macp.mode.multi_round.v1` `SessionStartPayload` must bind: @@ -57,9 +60,9 @@ For these modes: Empty payloads are rejected. Empty `mode` values are rejected. Duplicate participant IDs are rejected. -## Experimental mode rule +## Multi-round mode -`macp.mode.multi_round.v1` remains available as an explicit experimental mode. It is not advertised by discovery RPCs and retains a more permissive bootstrap path for backward compatibility. +`macp.mode.multi_round.v1` is a standards-track mode. It uses the same strict `SessionStart` contract as all other standards-track modes. Convergence is tracked but does not auto-resolve the session — an explicit `Commitment` is required after convergence. ## Security profile @@ -93,4 +96,4 @@ For standards-track modes, `CommitmentPayload` must carry version fields that ma ## Discovery notes -`ListModes` returns the five standards-track modes. `GetManifest` exposes a manifest that matches the implemented unary and streaming capabilities. +`ListModes` returns all six standards-track modes. `GetManifest` exposes a manifest that matches the implemented unary and streaming capabilities. diff --git a/src/bin/multi_round_client.rs b/src/bin/multi_round_client.rs index 4267fc1..3445f02 100644 --- a/src/bin/multi_round_client.rs +++ b/src/bin/multi_round_client.rs @@ -1,9 +1,10 @@ #[path = "support/common.rs"] mod common; -use common::{envelope, get_session_as, print_ack, send_as}; -use macp_runtime::pb::SessionStartPayload; -use prost::Message; +use common::{ + canonical_commitment_payload, canonical_start_payload, envelope, get_session_as, print_ack, + send_as, +}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -12,16 +13,7 @@ async fn main() -> Result<(), Box> { println!("=== Multi-Round Convergence Demo ===\n"); - let start_payload = SessionStartPayload { - intent: "convergence test".into(), - ttl_ms: 60_000, - participants: vec!["alice".into(), "bob".into()], - mode_version: "experimental".into(), - configuration_version: "legacy".into(), - policy_version: String::new(), - context: vec![], - roots: vec![], - }; + // Standards-track SessionStart with required fields let ack = send_as( &mut client, "coordinator", @@ -31,12 +23,13 @@ async fn main() -> Result<(), Box> { "m0", &session_id, "coordinator", - start_payload.encode_to_vec(), + canonical_start_payload("convergence test", &["alice", "bob"], 60_000), ), ) .await?; print_ack("session_start", &ack); + // Alice contributes let ack = send_as( &mut client, "alice", @@ -52,6 +45,7 @@ async fn main() -> Result<(), Box> { .await?; print_ack("alice_contributes", &ack); + // Bob contributes differently let ack = send_as( &mut client, "bob", @@ -71,6 +65,7 @@ async fn main() -> Result<(), Box> { let meta = session.metadata.expect("metadata"); println!("[get_session] state={} mode={}", meta.state, meta.mode); + // Bob revises to match Alice → convergence let ack = send_as( &mut client, "bob", @@ -86,6 +81,34 @@ async fn main() -> Result<(), Box> { .await?; print_ack("bob_revises", &ack); + let session = get_session_as(&mut client, "alice", &session_id).await?; + let meta = session.metadata.expect("metadata"); + println!( + "[get_session] state={} mode={} (converged, awaiting commitment)", + meta.state, meta.mode + ); + + // Coordinator commits after convergence + let ack = send_as( + &mut client, + "coordinator", + envelope( + "macp.mode.multi_round.v1", + "Commitment", + "m4", + &session_id, + "coordinator", + canonical_commitment_payload( + "c1", + "multi_round.converged", + "convergence", + "all participants agreed", + ), + ), + ) + .await?; + print_ack("commitment", &ack); + let session = get_session_as(&mut client, "alice", &session_id).await?; let meta = session.metadata.expect("metadata"); println!("[get_session] state={} mode={}", meta.state, meta.mode); diff --git a/src/lib.rs b/src/lib.rs index 8060557..01a495e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,7 @@ pub mod quorum_pb { pub mod error; pub mod log_store; +pub mod metrics; pub mod mode; pub mod mode_registry; pub mod registry; diff --git a/src/main.rs b/src/main.rs index 584b9b6..9e68b99 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,13 @@ use tonic::transport::{Identity, Server, ServerTlsConfig}; #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + let addr = std::env::var("MACP_BIND_ADDR") .unwrap_or_else(|_| "127.0.0.1:50051".into()) .parse()?; @@ -59,9 +66,10 @@ async fn main() -> Result<(), Box> { Ok(session) => { // Best-effort snapshot update if let Err(e) = storage.save_session(&session).await { - eprintln!( - "warning: failed to persist recovered session '{}': {e}", - session_id + tracing::warn!( + session_id = %session_id, + error = %e, + "failed to persist recovered session" ); } @@ -75,16 +83,17 @@ async fn main() -> Result<(), Box> { recovered += 1; } Err(e) => { - eprintln!( - "warning: failed to replay session '{}': {e}; skipping", - session_id + tracing::warn!( + session_id = %session_id, + error = %e, + "failed to replay session; skipping" ); } } } } if recovered > 0 { - println!("Replayed {} sessions from log.", recovered); + tracing::info!(count = recovered, "replayed sessions from log"); } } @@ -101,7 +110,7 @@ async fn main() -> Result<(), Box> { let tls_cert = std::env::var("MACP_TLS_CERT_PATH").ok(); let tls_key = std::env::var("MACP_TLS_KEY_PATH").ok(); - println!("macp-runtime v0.4.0 listening on {}", addr); + tracing::info!(%addr, "macp-runtime v0.4.0 listening"); let builder = Server::builder(); let mut builder = match (tls_cert, tls_key) { @@ -111,8 +120,8 @@ async fn main() -> Result<(), Box> { builder.tls_config(ServerTlsConfig::new().identity(Identity::from_pem(cert, key)))? } _ if allow_insecure => { - eprintln!( - "warning: starting without TLS because MACP_ALLOW_INSECURE=1; this is not RFC-compliant" + tracing::warn!( + "starting without TLS because MACP_ALLOW_INSECURE=1; this is not RFC-compliant" ); builder } @@ -133,7 +142,7 @@ async fn main() -> Result<(), Box> { result?; } _ = tokio::signal::ctrl_c() => { - println!("\nShutting down gracefully..."); + tracing::info!("shutting down gracefully..."); } } @@ -141,14 +150,15 @@ async fn main() -> Result<(), Box> { if !memory_only { for session in registry.get_all_sessions().await { if let Err(e) = storage.save_session(&session).await { - eprintln!( - "warning: failed to persist session '{}' on shutdown: {e}", - session.session_id + tracing::warn!( + session_id = %session.session_id, + error = %e, + "failed to persist session on shutdown" ); } } } - println!("State persisted. Goodbye."); + tracing::info!("state persisted, goodbye"); Ok(()) } diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..0b3273d --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,154 @@ +use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::RwLock; + +pub struct ModeMetrics { + pub messages_accepted: AtomicU64, + pub messages_rejected: AtomicU64, + pub sessions_started: AtomicU64, + pub sessions_resolved: AtomicU64, + pub sessions_expired: AtomicU64, + pub sessions_cancelled: AtomicU64, + pub commitments_accepted: AtomicU64, + pub commitments_rejected: AtomicU64, +} + +impl ModeMetrics { + pub fn new() -> Self { + Self { + messages_accepted: AtomicU64::new(0), + messages_rejected: AtomicU64::new(0), + sessions_started: AtomicU64::new(0), + sessions_resolved: AtomicU64::new(0), + sessions_expired: AtomicU64::new(0), + sessions_cancelled: AtomicU64::new(0), + commitments_accepted: AtomicU64::new(0), + commitments_rejected: AtomicU64::new(0), + } + } +} + +impl Default for ModeMetrics { + fn default() -> Self { + Self::new() + } +} + +pub struct RuntimeMetrics { + per_mode: RwLock>, +} + +impl RuntimeMetrics { + pub fn new() -> Self { + Self { + per_mode: RwLock::new(HashMap::new()), + } + } + + pub fn record_session_start(&self, mode: &str) { + self.get_or_create(mode) + .sessions_started + .fetch_add(1, Ordering::Relaxed); + } + + pub fn record_message_accepted(&self, mode: &str) { + self.get_or_create(mode) + .messages_accepted + .fetch_add(1, Ordering::Relaxed); + } + + pub fn record_message_rejected(&self, mode: &str) { + self.get_or_create(mode) + .messages_rejected + .fetch_add(1, Ordering::Relaxed); + } + + pub fn record_session_resolved(&self, mode: &str) { + self.get_or_create(mode) + .sessions_resolved + .fetch_add(1, Ordering::Relaxed); + } + + pub fn record_session_expired(&self, mode: &str) { + self.get_or_create(mode) + .sessions_expired + .fetch_add(1, Ordering::Relaxed); + } + + pub fn record_session_cancelled(&self, mode: &str) { + self.get_or_create(mode) + .sessions_cancelled + .fetch_add(1, Ordering::Relaxed); + } + + pub fn record_commitment_accepted(&self, mode: &str) { + self.get_or_create(mode) + .commitments_accepted + .fetch_add(1, Ordering::Relaxed); + } + + pub fn record_commitment_rejected(&self, mode: &str) { + self.get_or_create(mode) + .commitments_rejected + .fetch_add(1, Ordering::Relaxed); + } + + fn get_or_create(&self, mode: &str) -> &ModeMetrics { + // Fast path: read lock + { + let guard = self.per_mode.read().unwrap(); + if guard.contains_key(mode) { + // SAFETY: We never remove entries and HashMap doesn't move values + // on insert of other keys. The reference is valid for the lifetime + // of RuntimeMetrics. + let ptr = guard.get(mode).unwrap() as *const ModeMetrics; + return unsafe { &*ptr }; + } + } + // Slow path: write lock to insert + let mut guard = self.per_mode.write().unwrap(); + guard.entry(mode.to_string()).or_default(); + let ptr = guard.get(mode).unwrap() as *const ModeMetrics; + unsafe { &*ptr } + } + + pub fn snapshot(&self) -> Vec<(String, MetricsSnapshot)> { + let guard = self.per_mode.read().unwrap(); + guard + .iter() + .map(|(mode, m)| { + ( + mode.clone(), + MetricsSnapshot { + messages_accepted: m.messages_accepted.load(Ordering::Relaxed), + messages_rejected: m.messages_rejected.load(Ordering::Relaxed), + sessions_started: m.sessions_started.load(Ordering::Relaxed), + sessions_resolved: m.sessions_resolved.load(Ordering::Relaxed), + sessions_expired: m.sessions_expired.load(Ordering::Relaxed), + sessions_cancelled: m.sessions_cancelled.load(Ordering::Relaxed), + commitments_accepted: m.commitments_accepted.load(Ordering::Relaxed), + commitments_rejected: m.commitments_rejected.load(Ordering::Relaxed), + }, + ) + }) + .collect() + } +} + +impl Default for RuntimeMetrics { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug)] +pub struct MetricsSnapshot { + pub messages_accepted: u64, + pub messages_rejected: u64, + pub sessions_started: u64, + pub sessions_resolved: u64, + pub sessions_expired: u64, + pub sessions_cancelled: u64, + pub commitments_accepted: u64, + pub commitments_rejected: u64, +} diff --git a/src/mode/mod.rs b/src/mode/mod.rs index 40b252a..758cf0b 100644 --- a/src/mode/mod.rs +++ b/src/mode/mod.rs @@ -18,6 +18,7 @@ pub const STANDARD_MODE_NAMES: &[&str] = &[ "macp.mode.task.v1", "macp.mode.handoff.v1", "macp.mode.quorum.v1", + "macp.mode.multi_round.v1", ]; /// The result of a Mode processing a message. @@ -159,5 +160,20 @@ pub fn standard_mode_descriptors() -> Vec { terminal_message_types: vec!["Commitment".into()], schema_uris: schema_map("buf.build/multiagentcoordinationprotocol/macp"), }, + ModeDescriptor { + mode: "macp.mode.multi_round.v1".into(), + mode_version: "1.0.0".into(), + title: "Multi-Round Mode".into(), + description: "Iterative convergence through multiple contribution rounds until all participants agree, with a terminal Commitment.".into(), + determinism_class: "semantic-deterministic".into(), + participant_model: "peer".into(), + message_types: vec![ + "SessionStart".into(), + "Contribute".into(), + "Commitment".into(), + ], + terminal_message_types: vec!["Commitment".into()], + schema_uris: schema_map("buf.build/multiagentcoordinationprotocol/macp"), + }, ] } diff --git a/src/mode/multi_round.rs b/src/mode/multi_round.rs index aab684b..9524203 100644 --- a/src/mode/multi_round.rs +++ b/src/mode/multi_round.rs @@ -1,4 +1,5 @@ use crate::error::MacpError; +use crate::mode::util::validate_commitment_payload_for_session; use crate::mode::{Mode, ModeResponse}; use crate::pb::Envelope; use crate::session::Session; @@ -13,6 +14,8 @@ pub struct MultiRoundState { pub contributions: BTreeMap, #[serde(default)] pub convergence_type: String, + #[serde(default)] + pub converged: bool, } /// Payload for Contribute messages. @@ -40,6 +43,20 @@ impl MultiRoundMode { fn decode_state(data: &[u8]) -> Result { serde_json::from_slice(data).map_err(|_| MacpError::InvalidModeState) } + + fn check_convergence(state: &MultiRoundState) -> bool { + let all_contributed = state + .participants + .iter() + .all(|p| state.contributions.contains_key(p)); + + if !all_contributed { + return false; + } + + let values: Vec<&String> = state.contributions.values().collect(); + values.windows(2).all(|w| w[0] == w[1]) + } } impl Mode for MultiRoundMode { @@ -48,7 +65,6 @@ impl Mode for MultiRoundMode { session: &Session, _env: &Envelope, ) -> Result { - // Participants come from the runtime via SessionStartPayload let participants = session.participants.clone(); if participants.is_empty() { @@ -60,23 +76,52 @@ impl Mode for MultiRoundMode { participants, contributions: BTreeMap::new(), convergence_type: "all_equal".into(), + converged: false, }; Ok(ModeResponse::PersistState(Self::encode_state(&state))) } fn on_message(&self, session: &Session, env: &Envelope) -> Result { - if env.message_type != "Contribute" { - return Ok(ModeResponse::NoOp); + match env.message_type.as_str() { + "Contribute" => self.handle_contribute(session, env), + "Commitment" => self.handle_commitment(session, env), + _ => Ok(ModeResponse::NoOp), + } + } + + fn authorize_sender(&self, session: &Session, env: &Envelope) -> Result<(), MacpError> { + if env.message_type == "Commitment" { + // Only the initiator can emit Commitment + if env.sender != session.initiator_sender { + return Err(MacpError::Forbidden); + } + return Ok(()); + } + // Default: must be a declared participant + if !session.participants.is_empty() && !session.participants.contains(&env.sender) { + return Err(MacpError::Forbidden); } + Ok(()) + } +} +impl MultiRoundMode { + fn handle_contribute( + &self, + session: &Session, + env: &Envelope, + ) -> Result { let mut state = Self::decode_state(&session.mode_state)?; + if state.converged { + return Err(MacpError::InvalidPayload); + } + let text = std::str::from_utf8(&env.payload).map_err(|_| MacpError::InvalidPayload)?; let contribute: ContributePayload = serde_json::from_str(text).map_err(|_| MacpError::InvalidPayload)?; - // Check if the value changed from previous contribution let previous = state.contributions.get(&env.sender); let value_changed = previous.is_none_or(|prev| *prev != contribute.value); @@ -87,40 +132,53 @@ impl Mode for MultiRoundMode { .insert(env.sender.clone(), contribute.value); } - // Check convergence: all participants contributed + all values identical - let all_contributed = state - .participants - .iter() - .all(|p| state.contributions.contains_key(p)); - - if all_contributed { - let values: Vec<&String> = state.contributions.values().collect(); - let all_equal = values.windows(2).all(|w| w[0] == w[1]); - - if all_equal { - let converged_value = values[0].clone(); - let resolution = ResolutionPayload { - converged_value, - round: state.round, - final_values: state.contributions.clone(), - }; - let resolution_bytes = serde_json::to_vec(&resolution) - .expect("ResolutionPayload is always serializable"); - return Ok(ModeResponse::PersistAndResolve { - state: Self::encode_state(&state), - resolution: resolution_bytes, - }); - } + if Self::check_convergence(&state) { + state.converged = true; } Ok(ModeResponse::PersistState(Self::encode_state(&state))) } + + fn handle_commitment( + &self, + session: &Session, + env: &Envelope, + ) -> Result { + let state = Self::decode_state(&session.mode_state)?; + + if !state.converged { + return Err(MacpError::InvalidPayload); + } + + validate_commitment_payload_for_session(session, &env.payload)?; + + let converged_value = state + .contributions + .values() + .next() + .cloned() + .unwrap_or_default(); + let resolution = ResolutionPayload { + converged_value, + round: state.round, + final_values: state.contributions.clone(), + }; + let resolution_bytes = + serde_json::to_vec(&resolution).expect("ResolutionPayload is always serializable"); + + Ok(ModeResponse::PersistAndResolve { + state: Self::encode_state(&state), + resolution: resolution_bytes, + }) + } } #[cfg(test)] mod tests { use super::*; + use crate::pb::CommitmentPayload; use crate::session::SessionState; + use prost::Message; use std::collections::HashSet; fn base_session() -> Session { @@ -131,28 +189,28 @@ mod tests { ttl_ms: 60_000, started_at_unix_ms: 0, resolution: None, - mode: "multi_round".into(), + mode: "macp.mode.multi_round.v1".into(), mode_state: vec![], participants: vec![], seen_message_ids: HashSet::new(), intent: String::new(), - mode_version: String::new(), - configuration_version: String::new(), + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), policy_version: String::new(), context: vec![], roots: vec![], - initiator_sender: String::new(), + initiator_sender: "coordinator".into(), } } fn session_start_env() -> Envelope { Envelope { macp_version: "1.0".into(), - mode: "multi_round".into(), + mode: "macp.mode.multi_round.v1".into(), message_type: "SessionStart".into(), message_id: "m0".into(), session_id: "s1".into(), - sender: "creator".into(), + sender: "coordinator".into(), timestamp_unix_ms: 1_700_000_000_000, payload: vec![], } @@ -162,7 +220,7 @@ mod tests { let payload = serde_json::json!({"value": value}).to_string(); Envelope { macp_version: "1.0".into(), - mode: "multi_round".into(), + mode: "macp.mode.multi_round.v1".into(), message_type: "Contribute".into(), message_id: format!("m_{}", sender), session_id: "s1".into(), @@ -172,6 +230,29 @@ mod tests { } } + fn commitment_env(sender: &str) -> Envelope { + let payload = CommitmentPayload { + commitment_id: "c1".into(), + action: "multi_round.converged".into(), + authority_scope: "test".into(), + reason: "converged".into(), + mode_version: "1.0.0".into(), + policy_version: String::new(), + configuration_version: "cfg-1".into(), + } + .encode_to_vec(); + Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.multi_round.v1".into(), + message_type: "Commitment".into(), + message_id: "m_commit".into(), + session_id: "s1".into(), + sender: sender.into(), + timestamp_unix_ms: 1_700_000_000_000, + payload, + } + } + fn session_with_state(state: &MultiRoundState) -> Session { let mut s = base_session(); s.mode_state = MultiRoundMode::encode_state(state); @@ -193,6 +274,7 @@ mod tests { assert_eq!(state.round, 0); assert_eq!(state.participants, vec!["alice", "bob"]); assert!(state.contributions.is_empty()); + assert!(!state.converged); } _ => panic!("Expected PersistState"), } @@ -201,7 +283,7 @@ mod tests { #[test] fn session_start_rejects_empty_participants() { let mode = MultiRoundMode; - let session = base_session(); // empty participants + let session = base_session(); let env = session_start_env(); let err = mode.on_session_start(&session, &env).unwrap_err(); @@ -216,6 +298,7 @@ mod tests { participants: vec!["alice".into(), "bob".into()], contributions: BTreeMap::new(), convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = contribute_env("alice", "option_a"); @@ -226,6 +309,7 @@ mod tests { let new_state: MultiRoundState = serde_json::from_slice(&data).unwrap(); assert_eq!(new_state.round, 1); assert_eq!(new_state.contributions.get("alice").unwrap(), "option_a"); + assert!(!new_state.converged); } _ => panic!("Expected PersistState"), } @@ -241,6 +325,7 @@ mod tests { participants: vec!["alice".into(), "bob".into()], contributions, convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = contribute_env("alice", "option_a"); @@ -249,7 +334,7 @@ mod tests { match result { ModeResponse::PersistState(data) => { let new_state: MultiRoundState = serde_json::from_slice(&data).unwrap(); - assert_eq!(new_state.round, 1); // unchanged + assert_eq!(new_state.round, 1); } _ => panic!("Expected PersistState"), } @@ -265,6 +350,7 @@ mod tests { participants: vec!["alice".into(), "bob".into()], contributions, convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = contribute_env("alice", "option_b"); @@ -281,7 +367,7 @@ mod tests { } #[test] - fn convergence_when_all_equal() { + fn convergence_sets_converged_flag() { let mode = MultiRoundMode; let mut contributions = BTreeMap::new(); contributions.insert("alice".to_string(), "option_a".to_string()); @@ -290,29 +376,106 @@ mod tests { participants: vec!["alice".into(), "bob".into()], contributions, convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = contribute_env("bob", "option_a"); let result = mode.on_message(&session, &env).unwrap(); match result { - ModeResponse::PersistAndResolve { - state: state_bytes, - resolution, - } => { - let final_state: MultiRoundState = serde_json::from_slice(&state_bytes).unwrap(); - assert_eq!(final_state.round, 2); + ModeResponse::PersistState(data) => { + let new_state: MultiRoundState = serde_json::from_slice(&data).unwrap(); + assert_eq!(new_state.round, 2); + assert!(new_state.converged); + } + _ => panic!("Expected PersistState (convergence tracked, not auto-resolved)"), + } + } + + #[test] + fn commitment_after_convergence_resolves() { + let mode = MultiRoundMode; + let mut contributions = BTreeMap::new(); + contributions.insert("alice".to_string(), "option_a".to_string()); + contributions.insert("bob".to_string(), "option_a".to_string()); + let state = MultiRoundState { + round: 2, + participants: vec!["alice".into(), "bob".into()], + contributions, + convergence_type: "all_equal".into(), + converged: true, + }; + let session = session_with_state(&state); + let env = commitment_env("coordinator"); + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistAndResolve { resolution, .. } => { let res: serde_json::Value = serde_json::from_slice(&resolution).unwrap(); assert_eq!(res["converged_value"], "option_a"); assert_eq!(res["round"], 2); - assert_eq!(res["final"]["alice"], "option_a"); - assert_eq!(res["final"]["bob"], "option_a"); } _ => panic!("Expected PersistAndResolve"), } } + #[test] + fn commitment_before_convergence_rejected() { + let mode = MultiRoundMode; + let state = MultiRoundState { + round: 0, + participants: vec!["alice".into(), "bob".into()], + contributions: BTreeMap::new(), + convergence_type: "all_equal".into(), + converged: false, + }; + let session = session_with_state(&state); + let env = commitment_env("coordinator"); + + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn contribute_after_convergence_rejected() { + let mode = MultiRoundMode; + let mut contributions = BTreeMap::new(); + contributions.insert("alice".to_string(), "option_a".to_string()); + contributions.insert("bob".to_string(), "option_a".to_string()); + let state = MultiRoundState { + round: 2, + participants: vec!["alice".into(), "bob".into()], + contributions, + convergence_type: "all_equal".into(), + converged: true, + }; + let session = session_with_state(&state); + let env = contribute_env("alice", "option_b"); + + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn non_initiator_commitment_rejected() { + let mode = MultiRoundMode; + let mut contributions = BTreeMap::new(); + contributions.insert("alice".to_string(), "option_a".to_string()); + contributions.insert("bob".to_string(), "option_a".to_string()); + let state = MultiRoundState { + round: 2, + participants: vec!["alice".into(), "bob".into()], + contributions, + convergence_type: "all_equal".into(), + converged: true, + }; + let session = session_with_state(&state); + let env = commitment_env("alice"); // not the initiator + + let err = mode.authorize_sender(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "Forbidden"); + } + #[test] fn no_convergence_when_values_differ() { let mode = MultiRoundMode; @@ -323,12 +486,19 @@ mod tests { participants: vec!["alice".into(), "bob".into()], contributions, convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = contribute_env("bob", "option_b"); let result = mode.on_message(&session, &env).unwrap(); - assert!(matches!(result, ModeResponse::PersistState(_))); + match result { + ModeResponse::PersistState(data) => { + let new_state: MultiRoundState = serde_json::from_slice(&data).unwrap(); + assert!(!new_state.converged); + } + _ => panic!("Expected PersistState"), + } } #[test] @@ -339,6 +509,7 @@ mod tests { participants: vec!["alice".into(), "bob".into(), "carol".into()], contributions: BTreeMap::new(), convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = contribute_env("alice", "option_a"); @@ -355,11 +526,12 @@ mod tests { participants: vec!["alice".into()], contributions: BTreeMap::new(), convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = Envelope { macp_version: "1.0".into(), - mode: "multi_round".into(), + mode: "macp.mode.multi_round.v1".into(), message_type: "Message".into(), message_id: "m1".into(), session_id: "s1".into(), @@ -380,11 +552,12 @@ mod tests { participants: vec!["alice".into()], contributions: BTreeMap::new(), convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = Envelope { macp_version: "1.0".into(), - mode: "multi_round".into(), + mode: "macp.mode.multi_round.v1".into(), message_type: "Contribute".into(), message_id: "m1".into(), session_id: "s1".into(), @@ -406,6 +579,7 @@ mod tests { participants: vec!["alice".into(), "bob".into()], contributions, convergence_type: "all_equal".into(), + converged: true, }; let encoded = MultiRoundMode::encode_state(&original); @@ -414,6 +588,7 @@ mod tests { assert_eq!(decoded.round, original.round); assert_eq!(decoded.participants, original.participants); assert_eq!(decoded.contributions, original.contributions); + assert_eq!(decoded.converged, original.converged); } #[test] @@ -434,11 +609,18 @@ mod tests { participants: vec!["alice".into(), "bob".into(), "carol".into()], contributions, convergence_type: "all_equal".into(), + converged: false, }; let session = session_with_state(&state); let env = contribute_env("carol", "option_a"); let result = mode.on_message(&session, &env).unwrap(); - assert!(matches!(result, ModeResponse::PersistAndResolve { .. })); + match result { + ModeResponse::PersistState(data) => { + let new_state: MultiRoundState = serde_json::from_slice(&data).unwrap(); + assert!(new_state.converged); + } + _ => panic!("Expected PersistState with converged=true"), + } } } diff --git a/src/mode_registry.rs b/src/mode_registry.rs index d468b58..1553d2d 100644 --- a/src/mode_registry.rs +++ b/src/mode_registry.rs @@ -21,7 +21,7 @@ pub struct ModeRegistry { } impl ModeRegistry { - /// Build the default registry with all 5 standard + 1 experimental modes. + /// Build the default registry with all 6 standards-track modes. pub fn build_default() -> Self { let descriptors = standard_mode_descriptors(); let descriptor_map: HashMap = descriptors @@ -37,6 +37,7 @@ impl ModeRegistry { ("macp.mode.task.v1", Box::new(TaskMode)), ("macp.mode.handoff.v1", Box::new(HandoffMode)), ("macp.mode.quorum.v1", Box::new(QuorumMode)), + ("macp.mode.multi_round.v1", Box::new(MultiRoundMode)), ]; for (name, mode) in standard_modes { @@ -51,17 +52,6 @@ impl ModeRegistry { ); } - // Experimental mode - entries.insert( - "macp.mode.multi_round.v1".to_string(), - ModeRegistration { - mode_name: "macp.mode.multi_round.v1".to_string(), - mode: Box::new(MultiRoundMode), - descriptor: None, - standards_track: false, - }, - ); - Self { entries } } @@ -106,22 +96,22 @@ mod tests { } #[test] - fn build_default_contains_experimental_mode() { + fn build_default_contains_multi_round_as_standard() { let registry = ModeRegistry::build_default(); assert!(registry.get_mode("macp.mode.multi_round.v1").is_some()); - assert!(!registry.is_standard_mode("macp.mode.multi_round.v1")); + assert!(registry.is_standard_mode("macp.mode.multi_round.v1")); } #[test] - fn standard_mode_names_returns_five() { + fn standard_mode_names_returns_six() { let registry = ModeRegistry::build_default(); - assert_eq!(registry.standard_mode_names().len(), 5); + assert_eq!(registry.standard_mode_names().len(), 6); } #[test] - fn standard_mode_descriptors_returns_five() { + fn standard_mode_descriptors_returns_six() { let registry = ModeRegistry::build_default(); - assert_eq!(registry.standard_mode_descriptors().len(), 5); + assert_eq!(registry.standard_mode_descriptors().len(), 6); } #[test] diff --git a/src/replay.rs b/src/replay.rs index 6f1f90e..036c567 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -7,8 +7,6 @@ use crate::session::{ validate_standard_session_start_payload, Session, SessionState, }; -const EXPERIMENTAL_DEFAULT_TTL_MS: i64 = 60_000; - /// Rebuild a `Session` from its append-only log. /// /// This replays the same mode callbacks (`on_session_start`, `on_message`) in @@ -44,10 +42,9 @@ pub fn replay_session( }; validate_standard_session_start_payload(mode_name, &start_payload)?; - let ttl_ms = if is_standard_mode(mode_name) { - extract_ttl_ms(&start_payload)? - } else if start_payload.ttl_ms == 0 { - EXPERIMENTAL_DEFAULT_TTL_MS + let ttl_ms = if !is_standard_mode(mode_name) && start_payload.ttl_ms == 0 { + // Legacy experimental modes may have 0 ttl_ms + 60_000i64 } else { extract_ttl_ms(&start_payload)? }; @@ -96,16 +93,7 @@ pub fn replay_session( session.seen_message_ids.insert(env.message_id.clone()); session.apply_mode_response(response); - // 5. Multi-round participant back-fill - if session.participants.is_empty() && !session.mode_state.is_empty() { - if let Ok(state) = - serde_json::from_slice::(&session.mode_state) - { - session.participants = state.participants.clone(); - } - } - - // 6. Replay subsequent entries + // 5. Replay subsequent entries for entry in log_entries.iter().skip(1) { match entry.entry_kind { EntryKind::Incoming => { diff --git a/src/runtime.rs b/src/runtime.rs index 8b5fe0e..bbf14d9 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -3,19 +3,17 @@ use std::sync::Arc; use crate::error::MacpError; use crate::log_store::{EntryKind, LogEntry, LogStore}; +use crate::metrics::RuntimeMetrics; use crate::mode_registry::ModeRegistry; -use crate::pb::{Envelope, ModeDescriptor, SessionStartPayload}; +use crate::pb::{Envelope, ModeDescriptor}; use crate::registry::SessionRegistry; use crate::session::{ - extract_ttl_ms, is_standard_mode, parse_session_start_payload, - validate_session_id_for_acceptance, validate_standard_session_start_payload, Session, - SessionState, + extract_ttl_ms, parse_session_start_payload, validate_session_id_for_acceptance, + validate_standard_session_start_payload, Session, SessionState, }; use crate::storage::StorageBackend; use crate::stream_bus::SessionStreamBus; -const EXPERIMENTAL_DEFAULT_TTL_MS: i64 = 60_000; - #[derive(Debug)] pub struct ProcessResult { pub session_state: SessionState, @@ -28,6 +26,7 @@ pub struct Runtime { pub log_store: Arc, stream_bus: Arc, mode_registry: Arc, + metrics: Arc, } impl Runtime { @@ -56,6 +55,7 @@ impl Runtime { log_store, stream_bus: Arc::new(SessionStreamBus::default()), mode_registry, + metrics: Arc::new(RuntimeMetrics::new()), } } @@ -71,6 +71,10 @@ impl Runtime { &self.mode_registry } + pub fn metrics(&self) -> &Arc { + &self.metrics + } + pub fn subscribe_session_stream( &self, session_id: &str, @@ -119,9 +123,10 @@ impl Runtime { async fn save_session_to_storage(&self, session: &Session) { if let Err(err) = self.storage.save_session(session).await { - eprintln!( - "warning: failed to persist session '{}': {err}", - session.session_id + tracing::warn!( + session_id = %session.session_id, + error = %err, + "failed to persist session snapshot" ); } } @@ -140,6 +145,8 @@ impl Runtime { .map_err(|_| MacpError::StorageFailed)?; self.log_store.append(session_id, entry).await; session.state = SessionState::Expired; + self.metrics.record_session_expired(&session.mode); + tracing::info!(session_id, "session expired via TTL"); return Ok(true); } Ok(false) @@ -172,19 +179,9 @@ impl Runtime { .get_mode(mode_name) .ok_or(MacpError::UnknownMode)?; - let start_payload = if env.payload.is_empty() && !is_standard_mode(mode_name) { - SessionStartPayload::default() - } else { - parse_session_start_payload(&env.payload)? - }; + let start_payload = parse_session_start_payload(&env.payload)?; validate_standard_session_start_payload(mode_name, &start_payload)?; - let ttl_ms = if is_standard_mode(mode_name) { - extract_ttl_ms(&start_payload)? - } else if start_payload.ttl_ms == 0 { - EXPERIMENTAL_DEFAULT_TTL_MS - } else { - extract_ttl_ms(&start_payload)? - }; + let ttl_ms = extract_ttl_ms(&start_payload)?; let mut guard = self.registry.sessions.write().await; if let Some(existing) = guard.get(&env.session_id) { @@ -258,22 +255,16 @@ impl Runtime { session.seen_message_ids.insert(env.message_id.clone()); session.apply_mode_response(response); - // Multi-round mode stores participants in its own state rather than in - // the session-level field. This block back-fills session.participants so - // that authorization checks work uniformly. It is intentionally coupled - // to MultiRoundState; if new experimental modes adopt the same pattern - // this should be generalized. - if session.participants.is_empty() && !session.mode_state.is_empty() { - if let Ok(state) = serde_json::from_slice::( - &session.mode_state, - ) { - session.participants = state.participants.clone(); - } - } - let result_state = session.state.clone(); // 3. Best-effort session save self.save_session_to_storage(&session).await; + self.metrics.record_session_start(mode_name); + tracing::info!( + session_id = %env.session_id, + mode = mode_name, + sender = %env.sender, + "session started" + ); guard.insert(env.session_id.clone(), session); self.publish_accepted_envelope(env); @@ -332,6 +323,24 @@ impl Runtime { session.apply_mode_response(response); let result_state = session.state.clone(); + self.metrics.record_message_accepted(&session.mode); + if env.message_type == "Commitment" { + self.metrics.record_commitment_accepted(&session.mode); + } + + tracing::debug!( + session_id = %env.session_id, + message_type = %env.message_type, + sender = %env.sender, + state = ?result_state, + "message accepted" + ); + + if result_state == SessionState::Resolved { + self.metrics.record_session_resolved(&session.mode); + tracing::info!(session_id = %env.session_id, mode = %session.mode, "session resolved"); + } + // 3. Best-effort session save self.save_session_to_storage(session).await; self.publish_accepted_envelope(env); @@ -342,11 +351,14 @@ impl Runtime { }) } - /// Process a Signal envelope. Signals are defined in the MACP spec for - /// out-of-band notifications (progress, heartbeat, etc.) but their semantics - /// are not yet finalized. This stub accepts any Signal without side-effects - /// so that compliant clients can send them without error. - async fn process_signal(&self, _env: &Envelope) -> Result { + /// Process a Signal envelope. Signals are informational out-of-band + /// notifications (progress, heartbeat, etc.). Logged but no state mutation. + async fn process_signal(&self, env: &Envelope) -> Result { + tracing::debug!( + sender = %env.sender, + message_id = %env.message_id, + "signal received" + ); Ok(ProcessResult { session_state: SessionState::Open, duplicate: false, @@ -402,6 +414,8 @@ impl Runtime { self.log_store.append(session_id, cancel_entry).await; session.state = SessionState::Expired; self.save_session_to_storage(session).await; + self.metrics.record_session_cancelled(&session.mode); + tracing::info!(session_id, reason, "session cancelled"); Ok(ProcessResult { session_state: SessionState::Expired, @@ -604,21 +618,47 @@ mod tests { } #[tokio::test] - async fn experimental_mode_keeps_legacy_default_ttl() { + async fn multi_round_requires_standard_session_start() { let rt = make_runtime(); let sid = new_sid(); + // multi-round is now standards-track: empty mode_version should fail let payload = SessionStartPayload { participants: vec!["creator".into(), "other".into()], ..Default::default() } .encode_to_vec(); + let err = rt + .process( + &env( + "macp.mode.multi_round.v1", + "SessionStart", + "m1", + &sid, + "creator", + payload, + ), + None, + ) + .await + .unwrap_err(); + assert!(matches!( + err, + MacpError::InvalidPayload | MacpError::InvalidTtl + )); + } + + #[tokio::test] + async fn multi_round_valid_session_start() { + let rt = make_runtime(); + let sid = new_sid(); + let payload = session_start(vec!["alice".into(), "bob".into()]); rt.process( &env( "macp.mode.multi_round.v1", "SessionStart", "m1", &sid, - "creator", + "coordinator", payload, ), None, @@ -626,7 +666,8 @@ mod tests { .await .unwrap(); let session = rt.get_session_checked(&sid).await.unwrap(); - assert!(session.ttl_expiry > session.started_at_unix_ms); + assert_eq!(session.mode, "macp.mode.multi_round.v1"); + assert_eq!(session.participants, vec!["alice", "bob"]); } #[tokio::test] diff --git a/src/server.rs b/src/server.rs index a1c74e5..75f67d6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -405,11 +405,11 @@ impl MacpRuntimeService for MacpServer { manifest: Some(ManifestCapability { get_manifest: true }), mode_registry: Some(ModeRegistryCapability { list_modes: true, - list_changed: false, + list_changed: true, }), roots: Some(RootsCapability { list_roots: true, - list_changed: false, + list_changed: true, }), experimental: None, }), @@ -605,9 +605,18 @@ impl MacpRuntimeService for MacpServer { &self, _request: Request, ) -> Result, Status> { - Err(Status::unimplemented( - "WatchModeRegistry is not yet implemented", - )) + let initial = WatchModeRegistryResponse { + change: Some(macp_runtime::pb::RegistryChanged { + registry: "modes".into(), + observed_at_unix_ms: chrono::Utc::now().timestamp_millis(), + }), + }; + let stream = async_stream::try_stream! { + yield initial; + // Modes are static at runtime — keep the stream open but idle. + std::future::pending::<()>().await; + }; + Ok(Response::new(Box::pin(stream))) } type WatchRootsStream = std::pin::Pin< @@ -618,7 +627,17 @@ impl MacpRuntimeService for MacpServer { &self, _request: Request, ) -> Result, Status> { - Err(Status::unimplemented("WatchRoots is not yet implemented")) + let initial = WatchRootsResponse { + change: Some(macp_runtime::pb::RootsChanged { + observed_at_unix_ms: chrono::Utc::now().timestamp_millis(), + }), + }; + let stream = async_stream::try_stream! { + yield initial; + // Roots are static — keep the stream open but idle. + std::future::pending::<()>().await; + }; + Ok(Response::new(Box::pin(stream))) } } @@ -842,12 +861,13 @@ mod tests { .iter() .map(|m| m.mode.clone()) .collect(); - assert_eq!(names.len(), 5); + assert_eq!(names.len(), 6); assert!(names.contains(&"macp.mode.decision.v1".to_string())); assert!(names.contains(&"macp.mode.proposal.v1".to_string())); assert!(names.contains(&"macp.mode.task.v1".to_string())); assert!(names.contains(&"macp.mode.handoff.v1".to_string())); assert!(names.contains(&"macp.mode.quorum.v1".to_string())); + assert!(names.contains(&"macp.mode.multi_round.v1".to_string())); } #[tokio::test] diff --git a/src/session.rs b/src/session.rs index f7e1f07..7a7eceb 100644 --- a/src/session.rs +++ b/src/session.rs @@ -60,6 +60,7 @@ pub fn is_standard_mode(mode: &str) -> bool { | "macp.mode.task.v1" | "macp.mode.handoff.v1" | "macp.mode.quorum.v1" + | "macp.mode.multi_round.v1" ) } @@ -246,9 +247,11 @@ mod tests { } #[test] - fn experimental_modes_keep_legacy_flexibility() { + fn multi_round_now_requires_standard_validation() { let payload = SessionStartPayload::default(); - validate_standard_session_start_payload("macp.mode.multi_round.v1", &payload).unwrap(); + assert!( + validate_standard_session_start_payload("macp.mode.multi_round.v1", &payload).is_err() + ); } #[test] diff --git a/tests/concurrent_messages.rs b/tests/concurrent_messages.rs new file mode 100644 index 0000000..7ba3449 --- /dev/null +++ b/tests/concurrent_messages.rs @@ -0,0 +1,187 @@ +use macp_runtime::log_store::LogStore; +use macp_runtime::pb::{Envelope, SessionStartPayload}; +use macp_runtime::registry::SessionRegistry; +use macp_runtime::runtime::Runtime; +use macp_runtime::storage::MemoryBackend; +use prost::Message; +use std::sync::Arc; + +fn new_sid() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() +} + +fn make_runtime() -> Arc { + let storage: Arc = Arc::new(MemoryBackend); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + Arc::new(Runtime::new(storage, registry, log_store)) +} + +fn session_start(participants: Vec) -> Vec { + SessionStartPayload { + intent: "concurrent-test".into(), + participants, + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), + policy_version: "policy-1".into(), + ttl_ms: 60_000, + context: vec![], + roots: vec![], + } + .encode_to_vec() +} + +fn envelope( + mode: &str, + message_type: &str, + message_id: &str, + session_id: &str, + sender: &str, + payload: Vec, +) -> Envelope { + Envelope { + macp_version: "1.0".into(), + mode: mode.into(), + message_type: message_type.into(), + message_id: message_id.into(), + session_id: session_id.into(), + sender: sender.into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload, + } +} + +#[tokio::test] +async fn concurrent_contributes_all_accepted() { + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.multi_round.v1"; + + // Create participants + let n = 10; + let participants: Vec = (0..n).map(|i| format!("agent://p{i}")).collect(); + + rt.process( + &envelope( + mode, + "SessionStart", + "m0", + &sid, + "agent://coordinator", + session_start(participants.clone()), + ), + None, + ) + .await + .unwrap(); + + // Spawn N concurrent tasks, each sending a Contribute + let mut handles = Vec::new(); + for i in 0..n { + let rt = Arc::clone(&rt); + let sid = sid.clone(); + let sender = format!("agent://p{i}"); + handles.push(tokio::spawn(async move { + let payload = serde_json::json!({"value": format!("option_{i}")}) + .to_string() + .into_bytes(); + rt.process( + &envelope( + "macp.mode.multi_round.v1", + "Contribute", + &format!("m{}", i + 1), + &sid, + &sender, + payload, + ), + None, + ) + .await + })); + } + + let mut accepted = 0; + for handle in handles { + let result = handle.await.unwrap(); + if result.is_ok() { + accepted += 1; + } + } + + assert_eq!(accepted, n, "All contributions should be accepted"); + + // Verify dedup state + let session = rt.get_session_checked(&sid).await.unwrap(); + // m0 (SessionStart) + N contributes + assert_eq!(session.seen_message_ids.len(), n + 1); +} + +#[tokio::test] +async fn concurrent_duplicate_messages_handled() { + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.decision.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m0", + &sid, + "agent://orchestrator", + session_start(vec!["agent://a".into()]), + ), + None, + ) + .await + .unwrap(); + + let proposal = macp_runtime::decision_pb::ProposalPayload { + proposal_id: "p1".into(), + option: "deploy".into(), + rationale: "ready".into(), + supporting_data: vec![], + } + .encode_to_vec(); + + // Send same message concurrently from multiple tasks + let mut handles = Vec::new(); + for _ in 0..5 { + let rt = Arc::clone(&rt); + let sid = sid.clone(); + let proposal = proposal.clone(); + handles.push(tokio::spawn(async move { + rt.process( + &envelope( + "macp.mode.decision.v1", + "Proposal", + "m1", + &sid, + "agent://orchestrator", + proposal, + ), + None, + ) + .await + })); + } + + let mut accepted = 0; + let mut duplicates = 0; + for handle in handles { + match handle.await.unwrap() { + Ok(result) => { + if result.duplicate { + duplicates += 1; + } else { + accepted += 1; + } + } + Err(_) => {} + } + } + + // Exactly one should be accepted, rest should be duplicates + assert_eq!(accepted, 1, "Exactly one should be non-duplicate"); + assert_eq!(duplicates, 4, "Rest should be duplicates"); +} diff --git a/tests/conformance/decision_reject_paths.json b/tests/conformance/decision_reject_paths.json new file mode 100644 index 0000000..bd5e3ba --- /dev/null +++ b/tests/conformance/decision_reject_paths.json @@ -0,0 +1,41 @@ +{ + "mode": "macp.mode.decision.v1", + "initiator": "agent://orchestrator", + "participants": ["agent://a", "agent://b"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://outsider", + "message_type": "Proposal", + "payload_type": "decision.Proposal", + "payload": { "proposal_id": "p1", "option": "deploy", "rationale": "ready", "supporting_data": [] }, + "expect": "reject" + }, + { + "sender": "agent://orchestrator", + "message_type": "Proposal", + "payload_type": "decision.Proposal", + "payload": { "proposal_id": "p1", "option": "deploy", "rationale": "ready", "supporting_data": [] }, + "expect": "accept" + }, + { + "sender": "agent://a", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "decision.selected", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "reject" + } + ], + "expected_final_state": "Open" +} diff --git a/tests/conformance/handoff_reject_paths.json b/tests/conformance/handoff_reject_paths.json new file mode 100644 index 0000000..8d2e9a4 --- /dev/null +++ b/tests/conformance/handoff_reject_paths.json @@ -0,0 +1,26 @@ +{ + "mode": "macp.mode.handoff.v1", + "initiator": "agent://owner", + "participants": ["agent://owner", "agent://target"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://target", + "message_type": "HandoffAccept", + "payload_type": "handoff.HandoffAccept", + "payload": { "handoff_id": "h1", "accepted_by": "agent://target", "reason": "ready" }, + "expect": "reject" + }, + { + "sender": "agent://owner", + "message_type": "HandoffOffer", + "payload_type": "handoff.HandoffOffer", + "payload": { "handoff_id": "h1", "target_participant": "agent://target", "scope": "support", "reason": "escalate" }, + "expect": "accept" + } + ], + "expected_final_state": "Open" +} diff --git a/tests/conformance/multi_round_happy_path.json b/tests/conformance/multi_round_happy_path.json new file mode 100644 index 0000000..3a03b2d --- /dev/null +++ b/tests/conformance/multi_round_happy_path.json @@ -0,0 +1,48 @@ +{ + "mode": "macp.mode.multi_round.v1", + "initiator": "agent://coordinator", + "participants": ["agent://alice", "agent://bob"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://alice", + "message_type": "Contribute", + "payload_type": "multi_round.Contribute", + "payload": { "value": "option_a" }, + "expect": "accept" + }, + { + "sender": "agent://bob", + "message_type": "Contribute", + "payload_type": "multi_round.Contribute", + "payload": { "value": "option_b" }, + "expect": "accept" + }, + { + "sender": "agent://bob", + "message_type": "Contribute", + "payload_type": "multi_round.Contribute", + "payload": { "value": "option_a" }, + "expect": "accept" + }, + { + "sender": "agent://coordinator", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "multi_round.converged", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "accept" + } + ], + "expected_final_state": "Resolved" +} diff --git a/tests/conformance/multi_round_reject_paths.json b/tests/conformance/multi_round_reject_paths.json new file mode 100644 index 0000000..ba15e0b --- /dev/null +++ b/tests/conformance/multi_round_reject_paths.json @@ -0,0 +1,56 @@ +{ + "mode": "macp.mode.multi_round.v1", + "initiator": "agent://coordinator", + "participants": ["agent://alice", "agent://bob"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://coordinator", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "multi_round.converged", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "reject" + }, + { + "sender": "agent://alice", + "message_type": "Contribute", + "payload_type": "multi_round.Contribute", + "payload": { "value": "option_a" }, + "expect": "accept" + }, + { + "sender": "agent://bob", + "message_type": "Contribute", + "payload_type": "multi_round.Contribute", + "payload": { "value": "option_a" }, + "expect": "accept" + }, + { + "sender": "agent://alice", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "multi_round.converged", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "reject" + } + ], + "expected_final_state": "Open" +} diff --git a/tests/conformance/proposal_reject_paths.json b/tests/conformance/proposal_reject_paths.json new file mode 100644 index 0000000..5820a82 --- /dev/null +++ b/tests/conformance/proposal_reject_paths.json @@ -0,0 +1,27 @@ +{ + "mode": "macp.mode.proposal.v1", + "initiator": "agent://buyer", + "participants": ["agent://buyer", "agent://seller"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://buyer", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "proposal.accepted", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "reject" + } + ], + "expected_final_state": "Open" +} diff --git a/tests/conformance/quorum_reject_paths.json b/tests/conformance/quorum_reject_paths.json new file mode 100644 index 0000000..47e0a33 --- /dev/null +++ b/tests/conformance/quorum_reject_paths.json @@ -0,0 +1,48 @@ +{ + "mode": "macp.mode.quorum.v1", + "initiator": "agent://coordinator", + "participants": ["agent://alice", "agent://bob", "agent://carol"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://alice", + "message_type": "Approve", + "payload_type": "quorum.Approve", + "payload": { "request_id": "r1", "reason": "lgtm" }, + "expect": "reject" + }, + { + "sender": "agent://coordinator", + "message_type": "ApprovalRequest", + "payload_type": "quorum.ApprovalRequest", + "payload": { "request_id": "r1", "action": "deploy", "summary": "Deploy v2", "required_approvals": 2 }, + "expect": "accept" + }, + { + "sender": "agent://alice", + "message_type": "Approve", + "payload_type": "quorum.Approve", + "payload": { "request_id": "r1", "reason": "lgtm" }, + "expect": "accept" + }, + { + "sender": "agent://coordinator", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "quorum.approved", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "reject" + } + ], + "expected_final_state": "Open" +} diff --git a/tests/conformance/task_reject_paths.json b/tests/conformance/task_reject_paths.json new file mode 100644 index 0000000..eb3c9db --- /dev/null +++ b/tests/conformance/task_reject_paths.json @@ -0,0 +1,33 @@ +{ + "mode": "macp.mode.task.v1", + "initiator": "agent://planner", + "participants": ["agent://planner", "agent://worker"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://worker", + "message_type": "TaskRequest", + "payload_type": "task.TaskRequest", + "payload": { "task_id": "t1", "title": "Build", "instructions": "Do it", "requested_assignee": "agent://worker", "deadline_unix_ms": 0 }, + "expect": "reject" + }, + { + "sender": "agent://planner", + "message_type": "TaskRequest", + "payload_type": "task.TaskRequest", + "payload": { "task_id": "t1", "title": "Build", "instructions": "Do it", "requested_assignee": "agent://worker", "deadline_unix_ms": 0 }, + "expect": "accept" + }, + { + "sender": "agent://planner", + "message_type": "TaskRequest", + "payload_type": "task.TaskRequest", + "payload": { "task_id": "t2", "title": "Another", "instructions": "Do more", "requested_assignee": "agent://worker", "deadline_unix_ms": 0 }, + "expect": "reject" + } + ], + "expected_final_state": "Open" +} diff --git a/tests/conformance_loader.rs b/tests/conformance_loader.rs index fa78ac5..9801db4 100644 --- a/tests/conformance_loader.rs +++ b/tests/conformance_loader.rs @@ -66,6 +66,7 @@ fn encode_payload(fixture: &ConformanceFixture, msg: &ConformanceMessage) -> Vec t if t.starts_with("task.") => encode_task_payload(msg), t if t.starts_with("handoff.") => encode_handoff_payload(msg), t if t.starts_with("quorum.") => encode_quorum_payload(msg), + t if t.starts_with("multi_round.") => encode_multi_round_payload(msg), _ => panic!( "Unknown payload_type: {} in fixture for mode {}", msg.payload_type, fixture.mode @@ -182,6 +183,19 @@ fn encode_quorum_payload(msg: &ConformanceMessage) -> Vec { } } +fn encode_multi_round_payload(msg: &ConformanceMessage) -> Vec { + let p = &msg.payload; + match msg.message_type.as_str() { + "Contribute" => { + // Multi-round uses JSON-encoded ContributePayload + serde_json::json!({"value": p["value"].as_str().unwrap_or_default()}) + .to_string() + .into_bytes() + } + _ => panic!("Unhandled multi_round message: {}", msg.message_type), + } +} + async fn run_conformance_fixture(path: &Path) { let content = std::fs::read_to_string(path) .unwrap_or_else(|e| panic!("Failed to read fixture {}: {e}", path.display())); @@ -292,3 +306,27 @@ conformance_test!(conformance_proposal_happy_path, "proposal_happy_path.json"); conformance_test!(conformance_task_happy_path, "task_happy_path.json"); conformance_test!(conformance_handoff_happy_path, "handoff_happy_path.json"); conformance_test!(conformance_quorum_happy_path, "quorum_happy_path.json"); +conformance_test!( + conformance_multi_round_happy_path, + "multi_round_happy_path.json" +); + +// Negative-path (rejection) conformance tests +conformance_test!( + conformance_decision_reject_paths, + "decision_reject_paths.json" +); +conformance_test!( + conformance_proposal_reject_paths, + "proposal_reject_paths.json" +); +conformance_test!(conformance_task_reject_paths, "task_reject_paths.json"); +conformance_test!( + conformance_handoff_reject_paths, + "handoff_reject_paths.json" +); +conformance_test!(conformance_quorum_reject_paths, "quorum_reject_paths.json"); +conformance_test!( + conformance_multi_round_reject_paths, + "multi_round_reject_paths.json" +); diff --git a/tests/file_backend_integration.rs b/tests/file_backend_integration.rs new file mode 100644 index 0000000..b8d08e1 --- /dev/null +++ b/tests/file_backend_integration.rs @@ -0,0 +1,200 @@ +use macp_runtime::log_store::LogStore; +use macp_runtime::mode_registry::ModeRegistry; +use macp_runtime::pb::{CommitmentPayload, Envelope, SessionStartPayload}; +use macp_runtime::registry::SessionRegistry; +use macp_runtime::replay::replay_session; +use macp_runtime::runtime::Runtime; +use macp_runtime::session::SessionState; +use macp_runtime::storage::FileBackend; +use prost::Message; +use std::sync::Arc; + +fn new_sid() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() +} + +fn session_start(participants: Vec) -> Vec { + SessionStartPayload { + intent: "file-backend-test".into(), + participants, + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), + policy_version: "policy-1".into(), + ttl_ms: 60_000, + context: vec![], + roots: vec![], + } + .encode_to_vec() +} + +fn envelope( + mode: &str, + message_type: &str, + message_id: &str, + session_id: &str, + sender: &str, + payload: Vec, +) -> Envelope { + Envelope { + macp_version: "1.0".into(), + mode: mode.into(), + message_type: message_type.into(), + message_id: message_id.into(), + session_id: session_id.into(), + sender: sender.into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload, + } +} + +fn commitment(action: &str) -> Vec { + CommitmentPayload { + commitment_id: "c1".into(), + action: action.into(), + authority_scope: "test".into(), + reason: "done".into(), + mode_version: "1.0.0".into(), + policy_version: "policy-1".into(), + configuration_version: "cfg-1".into(), + } + .encode_to_vec() +} + +#[tokio::test] +async fn file_backend_full_lifecycle() { + let dir = tempfile::tempdir().unwrap(); + let storage: Arc = + Arc::new(FileBackend::new(dir.path().to_path_buf()).unwrap()); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + let rt = Runtime::new(Arc::clone(&storage), registry, log_store); + + let sid = new_sid(); + let mode = "macp.mode.decision.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://a".into()]), + ), + None, + ) + .await + .unwrap(); + + let proposal = macp_runtime::decision_pb::ProposalPayload { + proposal_id: "p1".into(), + option: "deploy".into(), + rationale: "ready".into(), + supporting_data: vec![], + } + .encode_to_vec(); + rt.process( + &envelope( + mode, + "Proposal", + "m2", + &sid, + "agent://orchestrator", + proposal, + ), + None, + ) + .await + .unwrap(); + + let vote = macp_runtime::decision_pb::VotePayload { + proposal_id: "p1".into(), + vote: "approve".into(), + reason: "good".into(), + } + .encode_to_vec(); + rt.process(&envelope(mode, "Vote", "m3", &sid, "agent://a", vote), None) + .await + .unwrap(); + + let result = rt + .process( + &envelope( + mode, + "Commitment", + "m4", + &sid, + "agent://orchestrator", + commitment("decision.selected"), + ), + None, + ) + .await + .unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); + + // Verify log was persisted + let log = storage.load_log(&sid).await.unwrap(); + assert_eq!(log.len(), 4); +} + +#[tokio::test] +async fn file_backend_crash_recovery_via_replay() { + let dir = tempfile::tempdir().unwrap(); + let storage: Arc = + Arc::new(FileBackend::new(dir.path().to_path_buf()).unwrap()); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + let rt = Runtime::new(Arc::clone(&storage), registry, log_store); + + let sid = new_sid(); + let mode = "macp.mode.decision.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://a".into()]), + ), + None, + ) + .await + .unwrap(); + + let proposal = macp_runtime::decision_pb::ProposalPayload { + proposal_id: "p1".into(), + option: "deploy".into(), + rationale: "ready".into(), + supporting_data: vec![], + } + .encode_to_vec(); + rt.process( + &envelope( + mode, + "Proposal", + "m2", + &sid, + "agent://orchestrator", + proposal, + ), + None, + ) + .await + .unwrap(); + + // "Crash": discard in-memory state, replay from disk + drop(rt); + + let log_entries = storage.load_log(&sid).await.unwrap(); + assert_eq!(log_entries.len(), 2); + + let mode_registry = ModeRegistry::build_default(); + let session = replay_session(&sid, &log_entries, &mode_registry).unwrap(); + assert_eq!(session.state, SessionState::Open); + assert_eq!(session.seen_message_ids.len(), 2); + assert!(session.seen_message_ids.contains("m1")); + assert!(session.seen_message_ids.contains("m2")); +} diff --git a/tests/integration_mode_lifecycle.rs b/tests/integration_mode_lifecycle.rs index 8219484..eb01912 100644 --- a/tests/integration_mode_lifecycle.rs +++ b/tests/integration_mode_lifecycle.rs @@ -444,3 +444,86 @@ async fn quorum_full_lifecycle_through_runtime() { .unwrap(); assert_eq!(result.session_state, SessionState::Resolved); } + +#[tokio::test] +async fn multi_round_full_lifecycle_through_runtime() { + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.multi_round.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://coordinator", + session_start(vec!["agent://alice".into(), "agent://bob".into()]), + ), + None, + ) + .await + .unwrap(); + + rt.process( + &envelope( + mode, + "Contribute", + "m2", + &sid, + "agent://alice", + br#"{"value":"option_a"}"#.to_vec(), + ), + None, + ) + .await + .unwrap(); + + rt.process( + &envelope( + mode, + "Contribute", + "m3", + &sid, + "agent://bob", + br#"{"value":"option_b"}"#.to_vec(), + ), + None, + ) + .await + .unwrap(); + + rt.process( + &envelope( + mode, + "Contribute", + "m4", + &sid, + "agent://bob", + br#"{"value":"option_a"}"#.to_vec(), + ), + None, + ) + .await + .unwrap(); + + // Session still Open — convergence tracked, requires Commitment + let session = rt.get_session_checked(&sid).await.unwrap(); + assert_eq!(session.state, SessionState::Open); + + let result = rt + .process( + &envelope( + mode, + "Commitment", + "m5", + &sid, + "agent://coordinator", + commitment("multi_round.converged"), + ), + None, + ) + .await + .unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); +} diff --git a/tests/replay_round_trip.rs b/tests/replay_round_trip.rs index 7b6423e..167fba7 100644 --- a/tests/replay_round_trip.rs +++ b/tests/replay_round_trip.rs @@ -330,3 +330,49 @@ fn replay_quorum_session() { assert!(session.resolution.is_some()); assert_eq!(session.seen_message_ids.len(), 5); } + +#[test] +fn replay_multi_round_session() { + let registry = make_registry(); + let mode = "macp.mode.multi_round.v1"; + + let entries = vec![ + incoming( + "m1", + "SessionStart", + "agent://coordinator", + start_payload(vec!["agent://alice", "agent://bob"]), + mode, + 1000, + ), + incoming( + "m2", + "Contribute", + "agent://alice", + br#"{"value":"option_a"}"#.to_vec(), + mode, + 2000, + ), + incoming( + "m3", + "Contribute", + "agent://bob", + br#"{"value":"option_a"}"#.to_vec(), + mode, + 3000, + ), + incoming( + "m4", + "Commitment", + "agent://coordinator", + commitment("multi_round.converged"), + mode, + 4000, + ), + ]; + + let session = replay_session("s1", &entries, ®istry).unwrap(); + assert_eq!(session.state, SessionState::Resolved); + assert!(session.resolution.is_some()); + assert_eq!(session.seen_message_ids.len(), 4); +} diff --git a/tests/stream_integration.rs b/tests/stream_integration.rs new file mode 100644 index 0000000..401b120 --- /dev/null +++ b/tests/stream_integration.rs @@ -0,0 +1,157 @@ +use macp_runtime::log_store::LogStore; +use macp_runtime::pb::{Envelope, SessionStartPayload}; +use macp_runtime::registry::SessionRegistry; +use macp_runtime::runtime::Runtime; +use macp_runtime::storage::MemoryBackend; +use prost::Message; +use std::sync::Arc; + +fn new_sid() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() +} + +fn make_runtime() -> Runtime { + let storage: Arc = Arc::new(MemoryBackend); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + Runtime::new(storage, registry, log_store) +} + +fn session_start(participants: Vec) -> Vec { + SessionStartPayload { + intent: "stream-test".into(), + participants, + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), + policy_version: "policy-1".into(), + ttl_ms: 60_000, + context: vec![], + roots: vec![], + } + .encode_to_vec() +} + +fn envelope( + mode: &str, + message_type: &str, + message_id: &str, + session_id: &str, + sender: &str, + payload: Vec, +) -> Envelope { + Envelope { + macp_version: "1.0".into(), + mode: mode.into(), + message_type: message_type.into(), + message_id: message_id.into(), + session_id: session_id.into(), + sender: sender.into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload, + } +} + +#[tokio::test] +async fn stream_receives_accepted_envelopes() { + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.decision.v1"; + + let mut rx = rt.subscribe_session_stream(&sid); + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://a".into()]), + ), + None, + ) + .await + .unwrap(); + + let env = rx.recv().await.unwrap(); + assert_eq!(env.message_id, "m1"); + assert_eq!(env.message_type, "SessionStart"); +} + +#[tokio::test] +async fn stream_ordering_matches_processing_order() { + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.decision.v1"; + + let mut rx = rt.subscribe_session_stream(&sid); + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://a".into()]), + ), + None, + ) + .await + .unwrap(); + + let proposal = macp_runtime::decision_pb::ProposalPayload { + proposal_id: "p1".into(), + option: "deploy".into(), + rationale: "ready".into(), + supporting_data: vec![], + } + .encode_to_vec(); + rt.process( + &envelope( + mode, + "Proposal", + "m2", + &sid, + "agent://orchestrator", + proposal, + ), + None, + ) + .await + .unwrap(); + + let first = rx.recv().await.unwrap(); + let second = rx.recv().await.unwrap(); + assert_eq!(first.message_id, "m1"); + assert_eq!(second.message_id, "m2"); +} + +#[tokio::test] +async fn concurrent_subscribers_both_receive_events() { + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.decision.v1"; + + let mut rx1 = rt.subscribe_session_stream(&sid); + let mut rx2 = rt.subscribe_session_stream(&sid); + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://a".into()]), + ), + None, + ) + .await + .unwrap(); + + let env1 = rx1.recv().await.unwrap(); + let env2 = rx2.recv().await.unwrap(); + assert_eq!(env1.message_id, "m1"); + assert_eq!(env2.message_id, "m1"); +} From 2f04362874fe78b94435f72591d591dba1be9076 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Fri, 20 Mar 2026 19:58:33 -0700 Subject: [PATCH 2/2] Fix Clippy Issue --- tests/concurrent_messages.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/tests/concurrent_messages.rs b/tests/concurrent_messages.rs index 7ba3449..8e231f3 100644 --- a/tests/concurrent_messages.rs +++ b/tests/concurrent_messages.rs @@ -169,15 +169,12 @@ async fn concurrent_duplicate_messages_handled() { let mut accepted = 0; let mut duplicates = 0; for handle in handles { - match handle.await.unwrap() { - Ok(result) => { - if result.duplicate { - duplicates += 1; - } else { - accepted += 1; - } + if let Ok(result) = handle.await.unwrap() { + if result.duplicate { + duplicates += 1; + } else { + accepted += 1; } - Err(_) => {} } }