diff --git a/README.md b/README.md index 959ad12..512476d 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Reference runtime for the Multi-Agent Coordination Protocol (MACP). -This runtime implements the current MACP core/service surface and all six standards-track modes in the main RFC repository. The focus of this release is freeze-readiness for SDKs and real-world unary and streaming integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, and durable restart recovery. +This runtime implements the current MACP core/service surface, five standards-track modes, and one built-in extension mode. The focus of this release is freeze-readiness for SDKs and real-world unary and streaming integrations: strict `SessionStart`, mode-semantic correctness, authenticated senders, bounded resources, durable restart recovery, and extension mode lifecycle management. ## What changed in v0.4.0 @@ -40,7 +40,14 @@ This runtime implements the current MACP core/service surface and all six standa - **StreamSession enabled** - `Initialize` advertises `stream: true` - `StreamSession` provides per-session bidirectional streaming of accepted envelopes - - `WatchModeRegistry` and `WatchRoots` implemented (basic: send initial state, hold stream open) + - `WatchModeRegistry` fires live `RegistryChanged` events on mode register/unregister/promote + - `WatchRoots` implemented (basic: send initial state, hold stream open) +- **Extension mode lifecycle** + - `multi_round` demoted from standards-track to built-in extension (`ext.multi_round.v1`) + - `ListExtModes` returns extension mode descriptors + - `RegisterExtMode` dynamically registers new extension modes with a passthrough handler + - `UnregisterExtMode` removes dynamically registered extensions (built-in modes protected) + - `PromoteMode` promotes extensions to standards-track with optional identifier rename - **Structured logging via `tracing`** - use `RUST_LOG` env var to control log level (e.g. `RUST_LOG=info`) - **Per-mode metrics** @@ -55,13 +62,16 @@ Standards-track modes: - `macp.mode.task.v1` - `macp.mode.handoff.v1` - `macp.mode.quorum.v1` -- `macp.mode.multi_round.v1` + +Built-in extension modes: + +- `ext.multi_round.v1` ## Runtime behavior that SDKs should assume ### Session bootstrap -For all six standards-track modes, `SessionStartPayload` must include: +For all standards-track modes and built-in extensions, `SessionStartPayload` must include: - `participants` - `mode_version` @@ -191,6 +201,10 @@ cargo run --bin fuzz_client | `StreamSession` | implemented | | `WatchModeRegistry` | implemented | | `WatchRoots` | implemented | +| `ListExtModes` | implemented | +| `RegisterExtMode` | implemented | +| `UnregisterExtMode` | implemented | +| `PromoteMode` | implemented | ## Architecture @@ -203,8 +217,9 @@ Client Request | [Mode Registry] -- mode_registry.rs | \ - [Mode Logic] [Discovery] - mode/*.rs ListModes, GetManifest + [Mode Logic] [Discovery + Extension Lifecycle] + mode/*.rs ListModes, ListExtModes, GetManifest, + RegisterExtMode, UnregisterExtMode, PromoteMode | [Storage Layer] -- storage.rs, log_store.rs | @@ -230,11 +245,13 @@ runtime/ │ ├── storage.rs # storage backend trait, FileBackend, crash recovery │ ├── replay.rs # session rebuild from append-only log │ ├── metrics.rs # per-mode metrics counters -│ ├── mode/ # mode implementations +│ ├── mode/ # mode implementations (standards-track + extensions) +│ │ ├── passthrough.rs # generic handler for dynamically registered extensions +│ │ └── ... │ └── bin/ # local development example clients ├── tests/ │ ├── integration_mode_lifecycle.rs # full-stack integration tests -│ ├── replay_round_trip.rs # replay tests for all 6 modes +│ ├── replay_round_trip.rs # replay tests for all modes │ ├── conformance_loader.rs # JSON fixture runner │ └── conformance/ # per-mode conformance fixtures ├── docs/ @@ -250,7 +267,7 @@ Set `MACP_ALLOW_INSECURE=1` for local development, or provide `MACP_TLS_CERT_PAT Session IDs must be UUID v4/v7 in hyphenated lowercase form (36 chars) or base64url tokens (22+ chars). Short or human-readable IDs like `"s1"` or `"my-session"` are rejected. **`InvalidPayload` on `SessionStart`** -For standards-track modes, `SessionStartPayload` must include non-empty `participants`, `mode_version`, `configuration_version`, and a positive `ttl_ms`. Empty payloads are rejected. +For standards-track modes and built-in extensions (including `ext.multi_round.v1`), `SessionStartPayload` must include non-empty `participants`, `mode_version`, `configuration_version`, and a positive `ttl_ms`. Empty payloads are rejected. **`Forbidden` error** Check that the sender identity matches the session's participant list. For `Commitment` messages, only the session initiator is authorized. Verify your bearer token maps to the correct sender. @@ -264,8 +281,9 @@ Run `make sync-protos` to update local proto files from BSR. ## Development notes - The RFC/spec repository remains the normative source for protocol semantics. -- This runtime only accepts the canonical standards-track mode identifiers for all six modes. -- `multi_round` is now standards-track and is advertised by discovery RPCs. +- Five standards-track modes use the canonical `macp.mode.*` identifiers. +- `multi_round` is a built-in extension (`ext.multi_round.v1`) — not standards-track, but ships with the runtime and enforces strict `SessionStart`. +- Extension modes can be dynamically registered, unregistered, and promoted via `RegisterExtMode`, `UnregisterExtMode`, and `PromoteMode` RPCs. - `StreamSession` is enabled and binds one gRPC stream to one session, emitting accepted envelopes in order. See `docs/README.md` and `docs/examples.md` for the updated local development and usage guidance. diff --git a/docs/README.md b/docs/README.md index 3be980a..a0f96fe 100644 --- a/docs/README.md +++ b/docs/README.md @@ -7,11 +7,12 @@ The RFC/spec repository is still the normative source for MACP semantics. These ## What is in this runtime profile - MACP server over gRPC with unary RPCs and per-session bidirectional streaming -- six standards-track modes from the main RFC repository -- strict canonical `SessionStart` for standards-track modes +- five standards-track modes from the main RFC repository and one built-in extension +- strict canonical `SessionStart` for standards-track modes and qualifying extensions - authenticated sender derivation - payload limits and rate limiting - optional file-backed persistence for sessions and accepted-history logs +- extension mode lifecycle management (register, unregister, promote) ## Standards-track modes @@ -20,7 +21,10 @@ The RFC/spec repository is still the normative source for MACP semantics. These - `macp.mode.task.v1` - `macp.mode.handoff.v1` - `macp.mode.quorum.v1` -- `macp.mode.multi_round.v1` + +## Built-in extension modes + +- `ext.multi_round.v1` ## Freeze profile @@ -37,10 +41,17 @@ Implemented and supported: - `ListModes` - `ListRoots` -Streaming watch RPCs (basic — send initial state, hold stream open): +Extension mode lifecycle: + +- `ListExtModes` +- `RegisterExtMode` +- `UnregisterExtMode` +- `PromoteMode` + +Streaming watch RPCs: -- `WatchModeRegistry` -- `WatchRoots` +- `WatchModeRegistry` — sends initial state, then fires on register/unregister/promote changes +- `WatchRoots` — sends initial state, holds stream open ## Security model diff --git a/docs/architecture.md b/docs/architecture.md index b5e2f00..a13347c 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -34,12 +34,20 @@ Responsibilities: - provide standards-track mode names for `ListModes` - provide mode descriptors for `ListModes` and `GetManifest` +The registry uses `RwLock` for thread-safe dynamic mode registration. + Key methods: -- `build_default()` — constructs the canonical mode set +- `build_default()` — constructs the canonical mode set (5 standards-track + 1 built-in extension) - `get_mode(name)` — mode lookup for dispatch -- `standard_mode_names()` — drives `ListModes` and `GetManifest` +- `standard_mode_names()` — drives `ListModes` - `standard_mode_descriptors()` — drives `ListModes` response +- `all_mode_names()` — drives `GetManifest` and `Initialize` (all modes) +- `extension_mode_descriptors()` — drives `ListExtModes` +- `register_extension(descriptor)` — dynamic extension registration +- `unregister_extension(mode)` — dynamic extension removal (built-in modes cannot be removed) +- `promote_mode(mode, new_name)` — promote extension to standards-track +- `subscribe_changes()` — broadcast channel for `WatchModeRegistry` ## 4. Mode layer (`src/mode/*`) @@ -57,7 +65,8 @@ Implemented modes: - Task — delegated task with serial assignment, progress tracking, terminal reports - Handoff — serial handoff offers with accept/decline disposition - Quorum — threshold approval with ballots -- MultiRound — iterative value convergence with explicit Commitment +- MultiRound (`ext.multi_round.v1`) — built-in extension: iterative value convergence with explicit Commitment +- PassthroughMode — generic handler for dynamically registered extension modes ## 5. Storage layer @@ -110,8 +119,9 @@ Client Request | [Mode Registry] -- mode_registry.rs | \ - [Mode Logic] [Discovery] - mode/*.rs ListModes, GetManifest + [Mode Logic] [Discovery + Extension Lifecycle] + mode/*.rs ListModes, ListExtModes, GetManifest, + RegisterExtMode, UnregisterExtMode, PromoteMode | [Storage Layer] -- storage.rs, log_store.rs | diff --git a/docs/examples.md b/docs/examples.md index 5d662a1..2e6c706 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -12,16 +12,16 @@ cargo run The example binaries in `src/bin` attach `x-macp-agent-id` metadata so the runtime can derive the authenticated sender. -## Ground rules for every standards-track session +## Ground rules for sessions with strict validation -For these modes: +For these standards-track modes and built-in extensions: - `macp.mode.decision.v1` - `macp.mode.proposal.v1` - `macp.mode.task.v1` - `macp.mode.handoff.v1` - `macp.mode.quorum.v1` -- `macp.mode.multi_round.v1` +- `ext.multi_round.v1` (built-in extension) `SessionStartPayload` must include all of the following: @@ -132,7 +132,7 @@ Flow: 3. participants send ballots 4. coordinator emits `Commitment` after threshold is satisfied -## Example 6: Multi-round mode +## Example 6: Multi-round mode (built-in extension) Run: @@ -142,16 +142,16 @@ cargo run --bin multi_round_client Flow: -1. coordinator starts the session with strict `SessionStart` (participants, mode_version, configuration_version, ttl_ms) -2. participants exchange proposals across multiple rounds +1. coordinator starts the session with mode `ext.multi_round.v1` and strict `SessionStart` (participants, mode_version, configuration_version, ttl_ms) +2. participants exchange contributions across multiple rounds 3. convergence is tracked by the runtime 4. coordinator emits `Commitment` after convergence Important runtime behavior: -- `macp.mode.multi_round.v1` is a standards-track mode and is advertised by discovery RPCs +- `ext.multi_round.v1` is a built-in extension mode, discoverable via `ListExtModes` - convergence is tracked but does not auto-resolve the session — an explicit `Commitment` is required -- uses the same strict `SessionStart` contract as all other standards-track modes +- uses the same strict `SessionStart` contract as standards-track modes ## Example 7: StreamSession @@ -167,7 +167,23 @@ Practical notes: - use a session-scoped `Signal` envelope with the correct `session_id` and `mode` to attach to an existing session without mutating it - mixed-session streams are rejected with `FAILED_PRECONDITION` -## Example 8: Freeze-check / error-path client +## Example 8: Extension mode lifecycle + +The runtime supports dynamic extension mode management via gRPC: + +1. **`ListExtModes`** — discover available extension modes (e.g. `ext.multi_round.v1`) +2. **`RegisterExtMode`** — register a new extension mode by providing a `ModeDescriptor` with the mode name, message types, and terminal message types; the runtime creates a passthrough handler +3. **`UnregisterExtMode`** — remove a dynamically registered extension (built-in modes like `ext.multi_round.v1` cannot be removed) +4. **`PromoteMode`** — promote an extension to standards-track, optionally renaming the identifier (e.g. `ext.foo.v1` to `macp.mode.foo.v1`) + +Important runtime behavior: + +- extension mode names must not use the reserved `macp.mode.*` namespace +- dynamically registered modes use a passthrough handler that accepts any listed message type and requires explicit `Commitment` from the initiator to resolve +- `WatchModeRegistry` subscribers receive `RegistryChanged` events on every register, unregister, or promote operation +- `GetManifest` and `Initialize` always include all modes (standards-track + extensions); `ListModes` only returns standards-track + +## Example 9: Freeze-check / error-path client Run: @@ -208,7 +224,7 @@ and ensure the client sets `x-macp-agent-id`. ### `INVALID_ENVELOPE` on `SessionStart` -For a standards-track mode, check that: +For a standards-track mode or built-in extension, check that: - the mode name is canonical - the payload is not empty diff --git a/docs/protocol.md b/docs/protocol.md index 3ce45a1..6e9d1b5 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -22,12 +22,23 @@ Clients should call `Initialize` before using the runtime. - `ListRoots` - `WatchModeRegistry` - `WatchRoots` +- `ListExtModes` +- `RegisterExtMode` +- `UnregisterExtMode` +- `PromoteMode` ## Streaming watch RPCs -- `WatchModeRegistry` — sends the current registry state, then holds the stream open +- `WatchModeRegistry` — sends the current registry state, then fires `RegistryChanged` on register/unregister/promote - `WatchRoots` — sends the current roots state, then holds the stream open +## Extension mode lifecycle RPCs + +- `ListExtModes` — returns `ModeDescriptor` entries for all extension modes +- `RegisterExtMode` — registers a new extension mode from a `ModeDescriptor`; the runtime creates a passthrough handler that accepts message types listed in the descriptor and requires explicit `Commitment` to resolve +- `UnregisterExtMode` — removes a dynamically registered extension; built-in and standards-track modes cannot be removed +- `PromoteMode` — promotes an extension to standards-track; optionally renames the mode identifier (e.g. `ext.foo.v1` to `macp.mode.foo.v1`) + ## StreamSession profile `StreamSession` is session-scoped and authoritative for accepted envelopes: @@ -40,16 +51,16 @@ Clients should call `Initialize` before using the runtime. - stream-level validation failures terminate the stream with a gRPC status; use `Send` if you need explicit per-message negative acknowledgements - to attach to an existing session without mutating it, send a session-scoped `Signal` envelope with the correct `session_id` and `mode` -## Standards-track mode rules +## Strict session start rules -For these modes: +For these standards-track modes and built-in extensions: - `macp.mode.decision.v1` - `macp.mode.proposal.v1` - `macp.mode.task.v1` - `macp.mode.handoff.v1` - `macp.mode.quorum.v1` -- `macp.mode.multi_round.v1` +- `ext.multi_round.v1` (built-in extension) `SessionStartPayload` must bind: @@ -62,7 +73,7 @@ Empty payloads are rejected. Empty `mode` values are rejected. Duplicate partici ## Multi-round mode -`macp.mode.multi_round.v1` is a standards-track mode. It uses the same strict `SessionStart` contract as all other standards-track modes. Convergence is tracked but does not auto-resolve the session — an explicit `Commitment` is required after convergence. +`ext.multi_round.v1` is a built-in extension mode. It uses the same strict `SessionStart` contract as standards-track modes. Convergence is tracked but does not auto-resolve the session — an explicit `Commitment` is required after convergence. ## Security profile @@ -92,8 +103,8 @@ This gives restart recovery for session metadata, dedup state, and accepted-hist ## Commitment validation -For standards-track modes, `CommitmentPayload` must carry version fields that match the session-bound values. +For standards-track modes and built-in extensions, `CommitmentPayload` must carry version fields that match the session-bound values. Dynamically registered extension modes use a passthrough handler that also validates commitment version fields. ## Discovery notes -`ListModes` returns all six standards-track modes. `GetManifest` exposes a manifest that matches the implemented unary and streaming capabilities. +`ListModes` returns five standards-track modes. `ListExtModes` returns extension mode descriptors. `GetManifest` exposes all supported modes (standards-track + extensions). `RegisterExtMode`, `UnregisterExtMode`, and `PromoteMode` manage extension lifecycle at runtime. diff --git a/proto/macp/v1/core.proto b/proto/macp/v1/core.proto index 3964b17..55a9487 100644 --- a/proto/macp/v1/core.proto +++ b/proto/macp/v1/core.proto @@ -239,6 +239,44 @@ message WatchRootsResponse { RootsChanged change = 1; } +// Extension mode lifecycle management +message ListExtModesRequest {} + +message ListExtModesResponse { + repeated ModeDescriptor modes = 1; +} + +message RegisterExtModeRequest { + ModeDescriptor descriptor = 1; +} + +message RegisterExtModeResponse { + bool ok = 1; + string error = 2; +} + +message UnregisterExtModeRequest { + string mode = 1; +} + +message UnregisterExtModeResponse { + bool ok = 1; + string error = 2; +} + +message PromoteModeRequest { + string mode = 1; + // Optional new identifier for the promoted mode (e.g., "macp.mode.foo.v1"). + // If empty, the existing identifier is kept. + string promoted_mode_name = 2; +} + +message PromoteModeResponse { + bool ok = 1; + string error = 2; + string mode = 3; +} + service MACPRuntimeService { rpc Initialize(InitializeRequest) returns (InitializeResponse); rpc Send(SendRequest) returns (SendResponse); @@ -250,4 +288,9 @@ service MACPRuntimeService { rpc WatchModeRegistry(WatchModeRegistryRequest) returns (stream WatchModeRegistryResponse); rpc ListRoots(ListRootsRequest) returns (ListRootsResponse); rpc WatchRoots(WatchRootsRequest) returns (stream WatchRootsResponse); + // Extension mode lifecycle + rpc ListExtModes(ListExtModesRequest) returns (ListExtModesResponse); + rpc RegisterExtMode(RegisterExtModeRequest) returns (RegisterExtModeResponse); + rpc UnregisterExtMode(UnregisterExtModeRequest) returns (UnregisterExtModeResponse); + rpc PromoteMode(PromoteModeRequest) returns (PromoteModeResponse); } diff --git a/proto/macp/v1/envelope.proto b/proto/macp/v1/envelope.proto index df9b1dd..b1a7c54 100644 --- a/proto/macp/v1/envelope.proto +++ b/proto/macp/v1/envelope.proto @@ -6,7 +6,7 @@ package macp.v1; message Envelope { string macp_version = 1; - string mode = 2; + string mode = 2; // empty for ambient Signals string message_type = 3; string message_id = 4; string session_id = 5; // empty for ambient messages diff --git a/src/bin/multi_round_client.rs b/src/bin/multi_round_client.rs index 3445f02..7a93546 100644 --- a/src/bin/multi_round_client.rs +++ b/src/bin/multi_round_client.rs @@ -18,7 +18,7 @@ async fn main() -> Result<(), Box> { &mut client, "coordinator", envelope( - "macp.mode.multi_round.v1", + "ext.multi_round.v1", "SessionStart", "m0", &session_id, @@ -34,7 +34,7 @@ async fn main() -> Result<(), Box> { &mut client, "alice", envelope( - "macp.mode.multi_round.v1", + "ext.multi_round.v1", "Contribute", "m1", &session_id, @@ -50,7 +50,7 @@ async fn main() -> Result<(), Box> { &mut client, "bob", envelope( - "macp.mode.multi_round.v1", + "ext.multi_round.v1", "Contribute", "m2", &session_id, @@ -70,7 +70,7 @@ async fn main() -> Result<(), Box> { &mut client, "bob", envelope( - "macp.mode.multi_round.v1", + "ext.multi_round.v1", "Contribute", "m3", &session_id, @@ -93,7 +93,7 @@ async fn main() -> Result<(), Box> { &mut client, "coordinator", envelope( - "macp.mode.multi_round.v1", + "ext.multi_round.v1", "Commitment", "m4", &session_id, diff --git a/src/mode/mod.rs b/src/mode/mod.rs index 758cf0b..16647ec 100644 --- a/src/mode/mod.rs +++ b/src/mode/mod.rs @@ -1,6 +1,7 @@ pub mod decision; pub mod handoff; pub mod multi_round; +pub mod passthrough; pub mod proposal; pub mod quorum; pub mod task; @@ -18,9 +19,11 @@ pub const STANDARD_MODE_NAMES: &[&str] = &[ "macp.mode.task.v1", "macp.mode.handoff.v1", "macp.mode.quorum.v1", - "macp.mode.multi_round.v1", ]; +/// Built-in extension modes shipped with this runtime but not yet standards-track. +pub const EXTENSION_MODE_NAMES: &[&str] = &["ext.multi_round.v1"]; + /// The result of a Mode processing a message. /// The runtime applies this response to mutate session state. #[derive(Debug)] @@ -61,11 +64,15 @@ pub fn standard_mode_names() -> &'static [&'static str] { STANDARD_MODE_NAMES } -pub fn standard_mode_descriptors() -> Vec { - fn schema_map(path: &str) -> HashMap { - HashMap::from([("protobuf".to_string(), path.to_string())]) - } +pub fn extension_mode_names() -> &'static [&'static str] { + EXTENSION_MODE_NAMES +} + +fn schema_map(path: &str) -> HashMap { + HashMap::from([("protobuf".to_string(), path.to_string())]) +} +pub fn standard_mode_descriptors() -> Vec { vec![ ModeDescriptor { mode: "macp.mode.decision.v1".into(), @@ -160,20 +167,29 @@ pub fn standard_mode_descriptors() -> Vec { terminal_message_types: vec!["Commitment".into()], schema_uris: schema_map("buf.build/multiagentcoordinationprotocol/macp"), }, - ModeDescriptor { - mode: "macp.mode.multi_round.v1".into(), - mode_version: "1.0.0".into(), - title: "Multi-Round Mode".into(), - description: "Iterative convergence through multiple contribution rounds until all participants agree, with a terminal Commitment.".into(), - determinism_class: "semantic-deterministic".into(), - participant_model: "peer".into(), - message_types: vec![ - "SessionStart".into(), - "Contribute".into(), - "Commitment".into(), - ], - terminal_message_types: vec!["Commitment".into()], - schema_uris: schema_map("buf.build/multiagentcoordinationprotocol/macp"), - }, ] } + +pub fn extension_mode_descriptors() -> Vec { + vec![ModeDescriptor { + mode: "ext.multi_round.v1".into(), + mode_version: "1.0.0".into(), + title: "Multi-Round Mode".into(), + description: "Iterative convergence through multiple contribution rounds until all participants agree, with a terminal Commitment.".into(), + determinism_class: "semantic-deterministic".into(), + participant_model: "peer".into(), + message_types: vec![ + "SessionStart".into(), + "Contribute".into(), + "Commitment".into(), + ], + terminal_message_types: vec!["Commitment".into()], + schema_uris: schema_map("buf.build/multiagentcoordinationprotocol/macp"), + }] +} + +pub fn all_mode_descriptors() -> Vec { + let mut all = standard_mode_descriptors(); + all.extend(extension_mode_descriptors()); + all +} diff --git a/src/mode/multi_round.rs b/src/mode/multi_round.rs index 9524203..43a4cef 100644 --- a/src/mode/multi_round.rs +++ b/src/mode/multi_round.rs @@ -189,7 +189,7 @@ mod tests { ttl_ms: 60_000, started_at_unix_ms: 0, resolution: None, - mode: "macp.mode.multi_round.v1".into(), + mode: "ext.multi_round.v1".into(), mode_state: vec![], participants: vec![], seen_message_ids: HashSet::new(), @@ -206,7 +206,7 @@ mod tests { fn session_start_env() -> Envelope { Envelope { macp_version: "1.0".into(), - mode: "macp.mode.multi_round.v1".into(), + mode: "ext.multi_round.v1".into(), message_type: "SessionStart".into(), message_id: "m0".into(), session_id: "s1".into(), @@ -220,7 +220,7 @@ mod tests { let payload = serde_json::json!({"value": value}).to_string(); Envelope { macp_version: "1.0".into(), - mode: "macp.mode.multi_round.v1".into(), + mode: "ext.multi_round.v1".into(), message_type: "Contribute".into(), message_id: format!("m_{}", sender), session_id: "s1".into(), @@ -243,7 +243,7 @@ mod tests { .encode_to_vec(); Envelope { macp_version: "1.0".into(), - mode: "macp.mode.multi_round.v1".into(), + mode: "ext.multi_round.v1".into(), message_type: "Commitment".into(), message_id: "m_commit".into(), session_id: "s1".into(), @@ -531,7 +531,7 @@ mod tests { let session = session_with_state(&state); let env = Envelope { macp_version: "1.0".into(), - mode: "macp.mode.multi_round.v1".into(), + mode: "ext.multi_round.v1".into(), message_type: "Message".into(), message_id: "m1".into(), session_id: "s1".into(), @@ -557,7 +557,7 @@ mod tests { let session = session_with_state(&state); let env = Envelope { macp_version: "1.0".into(), - mode: "macp.mode.multi_round.v1".into(), + mode: "ext.multi_round.v1".into(), message_type: "Contribute".into(), message_id: "m1".into(), session_id: "s1".into(), diff --git a/src/mode/passthrough.rs b/src/mode/passthrough.rs new file mode 100644 index 0000000..210cc37 --- /dev/null +++ b/src/mode/passthrough.rs @@ -0,0 +1,156 @@ +use crate::error::MacpError; +use crate::mode::{Mode, ModeResponse}; +use crate::pb::Envelope; +use crate::session::Session; + +/// Generic extension mode handler for dynamically registered modes. +/// +/// Accepts any message type listed in the mode descriptor. Commitment messages +/// from the initiator resolve the session. All other messages are accepted and +/// the payload is persisted as mode state. +pub struct PassthroughMode { + pub allowed_message_types: Vec, +} + +impl Mode for PassthroughMode { + fn on_session_start( + &self, + _session: &Session, + _env: &Envelope, + ) -> Result { + Ok(ModeResponse::NoOp) + } + + fn on_message(&self, session: &Session, env: &Envelope) -> Result { + if !self.allowed_message_types.is_empty() + && !self + .allowed_message_types + .iter() + .any(|t| t == &env.message_type) + { + return Err(MacpError::InvalidPayload); + } + + if env.message_type == "Commitment" { + let commitment = + crate::mode::util::validate_commitment_payload_for_session(session, &env.payload)?; + let resolution = serde_json::json!({ + "action": commitment.action, + "commitment_id": commitment.commitment_id, + }) + .to_string() + .into_bytes(); + return Ok(ModeResponse::Resolve(resolution)); + } + + Ok(ModeResponse::PersistState(env.payload.clone())) + } + + fn authorize_sender(&self, session: &Session, env: &Envelope) -> Result<(), MacpError> { + if env.message_type == "Commitment" { + if env.sender != session.initiator_sender { + return Err(MacpError::Forbidden); + } + return Ok(()); + } + if !session.participants.is_empty() && !session.participants.contains(&env.sender) { + return Err(MacpError::Forbidden); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::pb::CommitmentPayload; + use crate::session::SessionState; + use prost::Message; + + fn make_session() -> Session { + Session { + session_id: "s1".into(), + state: SessionState::Open, + ttl_expiry: i64::MAX, + ttl_ms: 60_000, + started_at_unix_ms: 1000, + resolution: None, + mode: "ext.test.v1".into(), + mode_state: vec![], + participants: vec!["alice".into(), "bob".into()], + seen_message_ids: std::collections::HashSet::new(), + intent: String::new(), + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), + policy_version: String::new(), + context: vec![], + roots: vec![], + initiator_sender: "alice".into(), + } + } + + fn make_env(sender: &str, message_type: &str, payload: Vec) -> Envelope { + Envelope { + macp_version: "1.0".into(), + mode: "ext.test.v1".into(), + message_type: message_type.into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: sender.into(), + timestamp_unix_ms: 1000, + payload, + } + } + + #[test] + fn accepts_any_message_when_no_filter() { + let mode = PassthroughMode { + allowed_message_types: vec![], + }; + let session = make_session(); + let env = make_env("alice", "CustomMessage", b"data".to_vec()); + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::PersistState(_))); + } + + #[test] + fn rejects_unlisted_message_type() { + let mode = PassthroughMode { + allowed_message_types: vec!["Allowed".into()], + }; + let session = make_session(); + let env = make_env("alice", "NotAllowed", vec![]); + assert!(mode.on_message(&session, &env).is_err()); + } + + #[test] + fn commitment_resolves_session() { + let mode = PassthroughMode { + allowed_message_types: vec!["Commitment".into()], + }; + let session = make_session(); + let payload = CommitmentPayload { + commitment_id: "c1".into(), + action: "test.done".into(), + authority_scope: "test".into(), + reason: "done".into(), + mode_version: "1.0.0".into(), + policy_version: String::new(), + configuration_version: "cfg-1".into(), + } + .encode_to_vec(); + let env = make_env("alice", "Commitment", payload); + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::Resolve(_))); + } + + #[test] + fn non_initiator_commitment_forbidden() { + let mode = PassthroughMode { + allowed_message_types: vec!["Commitment".into()], + }; + let session = make_session(); + let env = make_env("bob", "Commitment", vec![]); + assert!(mode.authorize_sender(&session, &env).is_err()); + } +} diff --git a/src/mode_registry.rs b/src/mode_registry.rs index 1553d2d..de44174 100644 --- a/src/mode_registry.rs +++ b/src/mode_registry.rs @@ -1,12 +1,17 @@ use std::collections::HashMap; +use std::sync::RwLock; +use tokio::sync::broadcast; use crate::mode::decision::DecisionMode; use crate::mode::handoff::HandoffMode; use crate::mode::multi_round::MultiRoundMode; +use crate::mode::passthrough::PassthroughMode; use crate::mode::proposal::ProposalMode; use crate::mode::quorum::QuorumMode; use crate::mode::task::TaskMode; -use crate::mode::{standard_mode_descriptors, Mode, STANDARD_MODE_NAMES}; +use crate::mode::{ + extension_mode_descriptors, standard_mode_descriptors, Mode, STANDARD_MODE_NAMES, +}; use crate::pb::ModeDescriptor; pub struct ModeRegistration { @@ -14,30 +19,34 @@ pub struct ModeRegistration { pub mode: Box, pub descriptor: Option, pub standards_track: bool, + pub builtin: bool, } pub struct ModeRegistry { - entries: HashMap, + entries: RwLock>, + change_tx: broadcast::Sender<()>, } impl ModeRegistry { - /// Build the default registry with all 6 standards-track modes. + /// Build the default registry with 5 standards-track modes and 1 built-in extension. pub fn build_default() -> Self { - let descriptors = standard_mode_descriptors(); - let descriptor_map: HashMap = descriptors + let std_descriptors = standard_mode_descriptors(); + let ext_descriptors = extension_mode_descriptors(); + let mut descriptor_map: HashMap = std_descriptors .into_iter() + .chain(ext_descriptors) .map(|d| (d.mode.clone(), d)) .collect(); let mut entries = HashMap::new(); + // Standards-track modes let standard_modes: Vec<(&str, Box)> = vec![ ("macp.mode.decision.v1", Box::new(DecisionMode)), ("macp.mode.proposal.v1", Box::new(ProposalMode)), ("macp.mode.task.v1", Box::new(TaskMode)), ("macp.mode.handoff.v1", Box::new(HandoffMode)), ("macp.mode.quorum.v1", Box::new(QuorumMode)), - ("macp.mode.multi_round.v1", Box::new(MultiRoundMode)), ]; for (name, mode) in standard_modes { @@ -46,45 +55,270 @@ impl ModeRegistry { ModeRegistration { mode_name: name.to_string(), mode, - descriptor: descriptor_map.get(name).cloned(), + descriptor: descriptor_map.remove(name), standards_track: true, + builtin: true, }, ); } - Self { entries } + // Built-in extension modes + let extension_modes: Vec<(&str, Box)> = + vec![("ext.multi_round.v1", Box::new(MultiRoundMode))]; + + for (name, mode) in extension_modes { + entries.insert( + name.to_string(), + ModeRegistration { + mode_name: name.to_string(), + mode, + descriptor: descriptor_map.remove(name), + standards_track: false, + builtin: true, + }, + ); + } + + let (change_tx, _) = broadcast::channel(16); + Self { + entries: RwLock::new(entries), + change_tx, + } + } + + pub fn get_mode(&self, name: &str) -> Option> { + let guard = self.entries.read().expect("mode registry lock poisoned"); + if guard.contains_key(name) { + Some(ModeRef { + registry: self, + name: name.to_string(), + }) + } else { + None + } } - pub fn get_mode(&self, name: &str) -> Option<&dyn Mode> { - self.entries.get(name).map(|e| e.mode.as_ref()) + /// Execute a mode callback while holding the read lock. + pub fn with_mode(&self, name: &str, f: F) -> Option + where + F: FnOnce(&dyn Mode) -> R, + { + let guard = self.entries.read().expect("mode registry lock poisoned"); + guard.get(name).map(|e| f(e.mode.as_ref())) } pub fn standard_mode_names(&self) -> Vec { + let guard = self.entries.read().expect("mode registry lock poisoned"); STANDARD_MODE_NAMES .iter() - .filter(|name| self.entries.contains_key(**name)) + .filter(|name| guard.contains_key(**name)) .map(|name| (*name).to_string()) .collect() } pub fn standard_mode_descriptors(&self) -> Vec { + let guard = self.entries.read().expect("mode registry lock poisoned"); STANDARD_MODE_NAMES .iter() - .filter_map(|name| self.entries.get(*name).and_then(|e| e.descriptor.clone())) + .filter_map(|name| guard.get(*name).and_then(|e| e.descriptor.clone())) + .collect() + } + + pub fn extension_mode_names(&self) -> Vec { + let guard = self.entries.read().expect("mode registry lock poisoned"); + guard + .iter() + .filter(|(_, e)| !e.standards_track) + .map(|(name, _)| name.clone()) + .collect() + } + + pub fn extension_mode_descriptors(&self) -> Vec { + let guard = self.entries.read().expect("mode registry lock poisoned"); + guard + .iter() + .filter(|(_, e)| !e.standards_track) + .filter_map(|(_, e)| e.descriptor.clone()) + .collect() + } + + pub fn all_mode_names(&self) -> Vec { + let guard = self.entries.read().expect("mode registry lock poisoned"); + guard.keys().cloned().collect() + } + + pub fn all_mode_descriptors(&self) -> Vec { + let guard = self.entries.read().expect("mode registry lock poisoned"); + guard + .values() + .filter_map(|e| e.descriptor.clone()) .collect() } pub fn is_standard_mode(&self, name: &str) -> bool { - self.entries - .get(name) - .map(|e| e.standards_track) - .unwrap_or(false) + let guard = self.entries.read().expect("mode registry lock poisoned"); + guard.get(name).map(|e| e.standards_track).unwrap_or(false) + } + + /// Register a new extension mode dynamically. + pub fn register_extension(&self, descriptor: ModeDescriptor) -> Result<(), String> { + let name = descriptor.mode.clone(); + if name.is_empty() { + return Err("mode name must not be empty".into()); + } + if name.starts_with("macp.mode.") { + return Err("cannot register extension with reserved macp.mode.* namespace".into()); + } + + let allowed_types = descriptor + .message_types + .iter() + .filter(|t| *t != "SessionStart") + .cloned() + .collect(); + let mode: Box = Box::new(PassthroughMode { + allowed_message_types: allowed_types, + }); + + let mut guard = self.entries.write().expect("mode registry lock poisoned"); + if guard.contains_key(&name) { + return Err(format!("mode '{}' is already registered", name)); + } + guard.insert( + name.clone(), + ModeRegistration { + mode_name: name, + mode, + descriptor: Some(descriptor), + standards_track: false, + builtin: false, + }, + ); + drop(guard); + let _ = self.change_tx.send(()); + Ok(()) + } + + /// Unregister a dynamically registered extension mode. + pub fn unregister_extension(&self, mode: &str) -> Result<(), String> { + let mut guard = self.entries.write().expect("mode registry lock poisoned"); + match guard.get(mode) { + None => return Err(format!("mode '{}' not found", mode)), + Some(entry) if entry.builtin => { + return Err(format!("cannot unregister built-in mode '{}'", mode)) + } + Some(entry) if entry.standards_track => { + return Err(format!("cannot unregister standards-track mode '{}'", mode)) + } + _ => {} + } + guard.remove(mode); + drop(guard); + let _ = self.change_tx.send(()); + Ok(()) + } + + /// Promote an extension mode to standards-track. + /// Optionally re-keys the entry with a new identifier. + pub fn promote_mode(&self, mode: &str, new_name: Option<&str>) -> Result { + let mut guard = self.entries.write().expect("mode registry lock poisoned"); + let entry = guard + .get(mode) + .ok_or_else(|| format!("mode '{}' not found", mode))?; + if entry.standards_track { + return Err(format!("mode '{}' is already standards-track", mode)); + } + + let final_name = new_name.unwrap_or(mode).to_string(); + if final_name != mode && guard.contains_key(&final_name) { + return Err(format!( + "cannot promote: target name '{}' already exists", + final_name + )); + } + + // Remove old entry and re-insert with updated flags + let mut registration = guard + .remove(mode) + .ok_or_else(|| format!("mode '{}' not found", mode))?; + registration.standards_track = true; + registration.mode_name = final_name.clone(); + if let Some(ref mut desc) = registration.descriptor { + desc.mode = final_name.clone(); + } + guard.insert(final_name.clone(), registration); + drop(guard); + let _ = self.change_tx.send(()); + Ok(final_name) + } + + /// Subscribe to mode registry change notifications. + pub fn subscribe_changes(&self) -> broadcast::Receiver<()> { + self.change_tx.subscribe() + } +} + +/// A handle that allows calling mode methods while the registry read lock is not held +/// across await points. Callers use `with_mode` or obtain a `ModeRef` and call its methods. +pub struct ModeRef<'a> { + registry: &'a ModeRegistry, + name: String, +} + +impl<'a> ModeRef<'a> { + pub fn on_session_start( + &self, + session: &crate::session::Session, + env: &crate::pb::Envelope, + ) -> Result { + let guard = self + .registry + .entries + .read() + .expect("mode registry lock poisoned"); + let entry = guard + .get(&self.name) + .ok_or(crate::error::MacpError::UnknownMode)?; + entry.mode.on_session_start(session, env) + } + + pub fn on_message( + &self, + session: &crate::session::Session, + env: &crate::pb::Envelope, + ) -> Result { + let guard = self + .registry + .entries + .read() + .expect("mode registry lock poisoned"); + let entry = guard + .get(&self.name) + .ok_or(crate::error::MacpError::UnknownMode)?; + entry.mode.on_message(session, env) + } + + pub fn authorize_sender( + &self, + session: &crate::session::Session, + env: &crate::pb::Envelope, + ) -> Result<(), crate::error::MacpError> { + let guard = self + .registry + .entries + .read() + .expect("mode registry lock poisoned"); + let entry = guard + .get(&self.name) + .ok_or(crate::error::MacpError::UnknownMode)?; + entry.mode.authorize_sender(session, env) } } #[cfg(test)] mod tests { use super::*; + use crate::mode::EXTENSION_MODE_NAMES; #[test] fn build_default_contains_all_standard_modes() { @@ -96,22 +330,44 @@ mod tests { } #[test] - fn build_default_contains_multi_round_as_standard() { + fn build_default_contains_multi_round_as_extension() { + let registry = ModeRegistry::build_default(); + assert!(registry.get_mode("ext.multi_round.v1").is_some()); + assert!(!registry.is_standard_mode("ext.multi_round.v1")); + } + + #[test] + fn standard_mode_names_returns_five() { let registry = ModeRegistry::build_default(); - assert!(registry.get_mode("macp.mode.multi_round.v1").is_some()); - assert!(registry.is_standard_mode("macp.mode.multi_round.v1")); + assert_eq!(registry.standard_mode_names().len(), 5); } #[test] - fn standard_mode_names_returns_six() { + fn standard_mode_descriptors_returns_five() { let registry = ModeRegistry::build_default(); - assert_eq!(registry.standard_mode_names().len(), 6); + assert_eq!(registry.standard_mode_descriptors().len(), 5); } #[test] - fn standard_mode_descriptors_returns_six() { + fn all_mode_names_returns_six() { let registry = ModeRegistry::build_default(); - assert_eq!(registry.standard_mode_descriptors().len(), 6); + assert_eq!(registry.all_mode_names().len(), 6); + } + + #[test] + fn extension_mode_names_returns_one() { + let registry = ModeRegistry::build_default(); + let ext = registry.extension_mode_names(); + assert_eq!(ext.len(), 1); + assert!(ext.contains(&"ext.multi_round.v1".to_string())); + } + + #[test] + fn extension_mode_descriptors_returns_one() { + let registry = ModeRegistry::build_default(); + let descs = registry.extension_mode_descriptors(); + assert_eq!(descs.len(), 1); + assert_eq!(descs[0].mode, "ext.multi_round.v1"); } #[test] @@ -120,4 +376,127 @@ mod tests { assert!(registry.get_mode("nonexistent").is_none()); assert!(!registry.is_standard_mode("nonexistent")); } + + #[test] + fn register_extension_mode() { + let registry = ModeRegistry::build_default(); + let descriptor = ModeDescriptor { + mode: "ext.custom.v1".into(), + mode_version: "1.0.0".into(), + title: "Custom Mode".into(), + description: "Test custom mode".into(), + message_types: vec![ + "SessionStart".into(), + "CustomMsg".into(), + "Commitment".into(), + ], + terminal_message_types: vec!["Commitment".into()], + ..Default::default() + }; + registry.register_extension(descriptor).unwrap(); + assert!(registry.get_mode("ext.custom.v1").is_some()); + assert!(!registry.is_standard_mode("ext.custom.v1")); + assert_eq!(registry.all_mode_names().len(), 7); + assert_eq!(registry.extension_mode_names().len(), 2); + } + + #[test] + fn register_rejects_macp_namespace() { + let registry = ModeRegistry::build_default(); + let descriptor = ModeDescriptor { + mode: "macp.mode.evil.v1".into(), + ..Default::default() + }; + assert!(registry.register_extension(descriptor).is_err()); + } + + #[test] + fn register_rejects_duplicate() { + let registry = ModeRegistry::build_default(); + let descriptor = ModeDescriptor { + mode: "ext.multi_round.v1".into(), + ..Default::default() + }; + assert!(registry.register_extension(descriptor).is_err()); + } + + #[test] + fn unregister_extension_mode() { + let registry = ModeRegistry::build_default(); + let descriptor = ModeDescriptor { + mode: "ext.temp.v1".into(), + mode_version: "1.0.0".into(), + message_types: vec!["SessionStart".into(), "Commitment".into()], + ..Default::default() + }; + registry.register_extension(descriptor).unwrap(); + assert_eq!(registry.all_mode_names().len(), 7); + registry.unregister_extension("ext.temp.v1").unwrap(); + assert_eq!(registry.all_mode_names().len(), 6); + assert!(registry.get_mode("ext.temp.v1").is_none()); + } + + #[test] + fn cannot_unregister_builtin() { + let registry = ModeRegistry::build_default(); + assert!(registry.unregister_extension("ext.multi_round.v1").is_err()); + assert!(registry + .unregister_extension("macp.mode.decision.v1") + .is_err()); + } + + #[test] + fn promote_extension_to_standard() { + let registry = ModeRegistry::build_default(); + let descriptor = ModeDescriptor { + mode: "ext.new.v1".into(), + mode_version: "1.0.0".into(), + message_types: vec!["SessionStart".into(), "Commitment".into()], + ..Default::default() + }; + registry.register_extension(descriptor).unwrap(); + assert!(!registry.is_standard_mode("ext.new.v1")); + + let final_name = registry + .promote_mode("ext.new.v1", Some("macp.mode.new.v1")) + .unwrap(); + assert_eq!(final_name, "macp.mode.new.v1"); + assert!(registry.is_standard_mode("macp.mode.new.v1")); + assert!(registry.get_mode("ext.new.v1").is_none()); + assert!(registry.get_mode("macp.mode.new.v1").is_some()); + } + + #[test] + fn promote_without_rename() { + let registry = ModeRegistry::build_default(); + let descriptor = ModeDescriptor { + mode: "ext.keep.v1".into(), + mode_version: "1.0.0".into(), + message_types: vec!["SessionStart".into(), "Commitment".into()], + ..Default::default() + }; + registry.register_extension(descriptor).unwrap(); + let final_name = registry.promote_mode("ext.keep.v1", None).unwrap(); + assert_eq!(final_name, "ext.keep.v1"); + assert!(registry.is_standard_mode("ext.keep.v1")); + } + + #[test] + fn promote_already_standard_fails() { + let registry = ModeRegistry::build_default(); + assert!(registry + .promote_mode("macp.mode.decision.v1", None) + .is_err()); + } + + #[test] + fn extension_names_constant_matches() { + for name in EXTENSION_MODE_NAMES { + let registry = ModeRegistry::build_default(); + assert!( + registry.get_mode(name).is_some(), + "EXTENSION_MODE_NAMES entry missing from registry: {name}" + ); + } + } } diff --git a/src/replay.rs b/src/replay.rs index 036c567..ddc7e04 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -3,8 +3,8 @@ use crate::log_store::{EntryKind, LogEntry}; use crate::mode_registry::ModeRegistry; use crate::pb::Envelope; use crate::session::{ - extract_ttl_ms, is_standard_mode, parse_session_start_payload, - validate_standard_session_start_payload, Session, SessionState, + extract_ttl_ms, parse_session_start_payload, requires_strict_session_start, + validate_strict_session_start_payload, Session, SessionState, }; /// Rebuild a `Session` from its append-only log. @@ -35,14 +35,15 @@ pub fn replay_session( let mode = registry.get_mode(mode_name).ok_or(MacpError::UnknownMode)?; // 2. Parse SessionStartPayload - let start_payload = if start_entry.raw_payload.is_empty() && !is_standard_mode(mode_name) { - crate::pb::SessionStartPayload::default() - } else { - parse_session_start_payload(&start_entry.raw_payload)? - }; - validate_standard_session_start_payload(mode_name, &start_payload)?; + let start_payload = + if start_entry.raw_payload.is_empty() && !requires_strict_session_start(mode_name) { + crate::pb::SessionStartPayload::default() + } else { + parse_session_start_payload(&start_entry.raw_payload)? + }; + validate_strict_session_start_payload(mode_name, &start_payload)?; - let ttl_ms = if !is_standard_mode(mode_name) && start_payload.ttl_ms == 0 { + let ttl_ms = if !requires_strict_session_start(mode_name) && start_payload.ttl_ms == 0 { // Legacy experimental modes may have 0 ttl_ms 60_000i64 } else { diff --git a/src/runtime.rs b/src/runtime.rs index bbf14d9..5b630dc 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -9,7 +9,7 @@ use crate::pb::{Envelope, ModeDescriptor}; use crate::registry::SessionRegistry; use crate::session::{ extract_ttl_ms, parse_session_start_payload, validate_session_id_for_acceptance, - validate_standard_session_start_payload, Session, SessionState, + validate_strict_session_start_payload, Session, SessionState, }; use crate::storage::StorageBackend; use crate::stream_bus::SessionStreamBus; @@ -59,14 +59,38 @@ impl Runtime { } } + /// Returns all mode names the runtime can handle (standards-track + extensions). + /// Used by Initialize and GetManifest to advertise full capability. pub fn registered_mode_names(&self) -> Vec { - self.mode_registry.standard_mode_names() + self.mode_registry.all_mode_names() } + /// Returns only standards-track mode descriptors for ListModes. pub fn standard_mode_descriptors(&self) -> Vec { self.mode_registry.standard_mode_descriptors() } + /// Returns only extension mode descriptors for ListExtModes. + pub fn extension_mode_descriptors(&self) -> Vec { + self.mode_registry.extension_mode_descriptors() + } + + pub fn register_extension(&self, descriptor: ModeDescriptor) -> Result<(), String> { + self.mode_registry.register_extension(descriptor) + } + + pub fn unregister_extension(&self, mode: &str) -> Result<(), String> { + self.mode_registry.unregister_extension(mode) + } + + pub fn promote_mode(&self, mode: &str, new_name: Option<&str>) -> Result { + self.mode_registry.promote_mode(mode, new_name) + } + + pub fn subscribe_mode_changes(&self) -> tokio::sync::broadcast::Receiver<()> { + self.mode_registry.subscribe_changes() + } + pub fn mode_registry(&self) -> &Arc { &self.mode_registry } @@ -180,7 +204,7 @@ impl Runtime { .ok_or(MacpError::UnknownMode)?; let start_payload = parse_session_start_payload(&env.payload)?; - validate_standard_session_start_payload(mode_name, &start_payload)?; + validate_strict_session_start_payload(mode_name, &start_payload)?; let ttl_ms = extract_ttl_ms(&start_payload)?; let mut guard = self.registry.sessions.write().await; @@ -630,7 +654,7 @@ mod tests { let err = rt .process( &env( - "macp.mode.multi_round.v1", + "ext.multi_round.v1", "SessionStart", "m1", &sid, @@ -654,7 +678,7 @@ mod tests { let payload = session_start(vec!["alice".into(), "bob".into()]); rt.process( &env( - "macp.mode.multi_round.v1", + "ext.multi_round.v1", "SessionStart", "m1", &sid, @@ -666,7 +690,7 @@ mod tests { .await .unwrap(); let session = rt.get_session_checked(&sid).await.unwrap(); - assert_eq!(session.mode, "macp.mode.multi_round.v1"); + assert_eq!(session.mode, "ext.multi_round.v1"); assert_eq!(session.participants, vec!["alice", "bob"]); } diff --git a/src/server.rs b/src/server.rs index 75f67d6..b256f47 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,12 +3,14 @@ use macp_runtime::pb::macp_runtime_service_server::MacpRuntimeService; use macp_runtime::pb::{ Ack, CancelSessionRequest, CancelSessionResponse, CancellationCapability, Capabilities, Envelope, GetManifestRequest, GetManifestResponse, GetSessionRequest, GetSessionResponse, - InitializeRequest, InitializeResponse, ListModesRequest, ListModesResponse, ListRootsRequest, - ListRootsResponse, MacpError as PbMacpError, ManifestCapability, ModeRegistryCapability, - ProgressCapability, RootsCapability, RuntimeInfo, SendRequest, SendResponse, SessionMetadata, + InitializeRequest, InitializeResponse, ListExtModesRequest, ListExtModesResponse, + ListModesRequest, ListModesResponse, ListRootsRequest, ListRootsResponse, + MacpError as PbMacpError, ManifestCapability, ModeRegistryCapability, ProgressCapability, + PromoteModeRequest, PromoteModeResponse, RegisterExtModeRequest, RegisterExtModeResponse, + RootsCapability, RuntimeInfo, SendRequest, SendResponse, SessionMetadata, SessionState as PbSessionState, SessionsCapability, StreamSessionRequest, - StreamSessionResponse, WatchModeRegistryRequest, WatchModeRegistryResponse, WatchRootsRequest, - WatchRootsResponse, + StreamSessionResponse, UnregisterExtModeRequest, UnregisterExtModeResponse, + WatchModeRegistryRequest, WatchModeRegistryResponse, WatchRootsRequest, WatchRootsResponse, }; use macp_runtime::runtime::Runtime; use macp_runtime::security::{AuthIdentity, SecurityLayer}; @@ -411,7 +413,11 @@ impl MacpRuntimeService for MacpServer { list_roots: true, list_changed: true, }), - experimental: None, + experimental: Some(macp_runtime::pb::ExperimentalCapabilities { + features: HashMap::from([ + ("ext_mode_lifecycle".into(), "true".into()), + ]), + }), }), supported_modes: self.runtime.registered_mode_names(), instructions: "Authenticate requests with Authorization: Bearer . Use the unary Send RPC for all session messaging. For local development only, x-macp-agent-id may be enabled by configuration.".into(), @@ -605,16 +611,24 @@ impl MacpRuntimeService for MacpServer { &self, _request: Request, ) -> Result, Status> { - let initial = WatchModeRegistryResponse { - change: Some(macp_runtime::pb::RegistryChanged { - registry: "modes".into(), - observed_at_unix_ms: chrono::Utc::now().timestamp_millis(), - }), - }; + let mut rx = self.runtime.subscribe_mode_changes(); let stream = async_stream::try_stream! { - yield initial; - // Modes are static at runtime — keep the stream open but idle. - std::future::pending::<()>().await; + // Send initial state + yield WatchModeRegistryResponse { + change: Some(macp_runtime::pb::RegistryChanged { + registry: "modes".into(), + observed_at_unix_ms: chrono::Utc::now().timestamp_millis(), + }), + }; + // Wait for changes from register/unregister/promote + while rx.recv().await.is_ok() { + yield WatchModeRegistryResponse { + change: Some(macp_runtime::pb::RegistryChanged { + registry: "modes".into(), + observed_at_unix_ms: chrono::Utc::now().timestamp_millis(), + }), + }; + } }; Ok(Response::new(Box::pin(stream))) } @@ -639,6 +653,78 @@ impl MacpRuntimeService for MacpServer { }; Ok(Response::new(Box::pin(stream))) } + + // Extension mode lifecycle RPCs + + async fn list_ext_modes( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(ListExtModesResponse { + modes: self.runtime.extension_mode_descriptors(), + })) + } + + async fn register_ext_mode( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let descriptor = req + .descriptor + .ok_or_else(|| Status::invalid_argument("descriptor required"))?; + match self.runtime.register_extension(descriptor) { + Ok(()) => Ok(Response::new(RegisterExtModeResponse { + ok: true, + error: String::new(), + })), + Err(e) => Ok(Response::new(RegisterExtModeResponse { + ok: false, + error: e, + })), + } + } + + async fn unregister_ext_mode( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + match self.runtime.unregister_extension(&req.mode) { + Ok(()) => Ok(Response::new(UnregisterExtModeResponse { + ok: true, + error: String::new(), + })), + Err(e) => Ok(Response::new(UnregisterExtModeResponse { + ok: false, + error: e, + })), + } + } + + async fn promote_mode( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let new_name = if req.promoted_mode_name.is_empty() { + None + } else { + Some(req.promoted_mode_name.as_str()) + }; + match self.runtime.promote_mode(&req.mode, new_name) { + Ok(final_name) => Ok(Response::new(PromoteModeResponse { + ok: true, + error: String::new(), + mode: final_name, + })), + Err(e) => Ok(Response::new(PromoteModeResponse { + ok: false, + error: e, + mode: String::new(), + })), + } + } } #[cfg(test)] @@ -861,13 +947,47 @@ mod tests { .iter() .map(|m| m.mode.clone()) .collect(); - assert_eq!(names.len(), 6); + assert_eq!(names.len(), 5); assert!(names.contains(&"macp.mode.decision.v1".to_string())); assert!(names.contains(&"macp.mode.proposal.v1".to_string())); assert!(names.contains(&"macp.mode.task.v1".to_string())); assert!(names.contains(&"macp.mode.handoff.v1".to_string())); assert!(names.contains(&"macp.mode.quorum.v1".to_string())); - assert!(names.contains(&"macp.mode.multi_round.v1".to_string())); + // multi_round is now an extension, not in ListModes + assert!(!names.contains(&"ext.multi_round.v1".to_string())); + } + + #[tokio::test] + async fn list_ext_modes_returns_extensions() { + let (server, _) = make_server(); + let resp = server + .list_ext_modes(Request::new(ListExtModesRequest {})) + .await + .unwrap(); + let names: Vec = resp + .into_inner() + .modes + .iter() + .map(|m| m.mode.clone()) + .collect(); + assert_eq!(names.len(), 1); + assert!(names.contains(&"ext.multi_round.v1".to_string())); + } + + #[tokio::test] + async fn get_manifest_includes_all_modes() { + let (server, _) = make_server(); + let resp = server + .get_manifest(Request::new(macp_runtime::pb::GetManifestRequest { + agent_id: String::new(), + })) + .await + .unwrap(); + let manifest = resp.into_inner().manifest.unwrap(); + assert_eq!(manifest.supported_modes.len(), 6); + assert!(manifest + .supported_modes + .contains(&"ext.multi_round.v1".to_string())); } #[tokio::test] diff --git a/src/session.rs b/src/session.rs index 7a7eceb..957ecf8 100644 --- a/src/session.rs +++ b/src/session.rs @@ -52,7 +52,7 @@ impl Session { } } -pub fn is_standard_mode(mode: &str) -> bool { +pub fn requires_strict_session_start(mode: &str) -> bool { matches!( mode, "macp.mode.decision.v1" @@ -60,7 +60,7 @@ pub fn is_standard_mode(mode: &str) -> bool { | "macp.mode.task.v1" | "macp.mode.handoff.v1" | "macp.mode.quorum.v1" - | "macp.mode.multi_round.v1" + | "ext.multi_round.v1" ) } @@ -80,12 +80,12 @@ pub fn extract_ttl_ms(payload: &SessionStartPayload) -> Result { Ok(payload.ttl_ms) } -/// Enforce the canonical SessionStart binding contract for standards-track modes. -pub fn validate_standard_session_start_payload( +/// Enforce the strict SessionStart binding contract for standards-track and qualifying extension modes. +pub fn validate_strict_session_start_payload( mode: &str, payload: &SessionStartPayload, ) -> Result<(), MacpError> { - if !is_standard_mode(mode) { + if !requires_strict_session_start(mode) { return Ok(()); } @@ -208,7 +208,7 @@ mod tests { ..Default::default() }; assert_eq!( - validate_standard_session_start_payload("macp.mode.decision.v1", &payload) + validate_strict_session_start_payload("macp.mode.decision.v1", &payload) .unwrap_err() .to_string(), "InvalidPayload" @@ -222,7 +222,7 @@ mod tests { ..Default::default() }; assert_eq!( - validate_standard_session_start_payload("macp.mode.decision.v1", &payload) + validate_strict_session_start_payload("macp.mode.decision.v1", &payload) .unwrap_err() .to_string(), "InvalidPayload" @@ -239,7 +239,7 @@ mod tests { ..Default::default() }; assert_eq!( - validate_standard_session_start_payload("macp.mode.proposal.v1", &payload) + validate_strict_session_start_payload("macp.mode.proposal.v1", &payload) .unwrap_err() .to_string(), "InvalidPayload" @@ -247,11 +247,9 @@ mod tests { } #[test] - fn multi_round_now_requires_standard_validation() { + fn multi_round_requires_strict_session_start() { let payload = SessionStartPayload::default(); - assert!( - validate_standard_session_start_payload("macp.mode.multi_round.v1", &payload).is_err() - ); + assert!(validate_strict_session_start_payload("ext.multi_round.v1", &payload).is_err()); } #[test] diff --git a/tests/concurrent_messages.rs b/tests/concurrent_messages.rs index 8e231f3..a1f327b 100644 --- a/tests/concurrent_messages.rs +++ b/tests/concurrent_messages.rs @@ -55,7 +55,7 @@ fn envelope( async fn concurrent_contributes_all_accepted() { let rt = make_runtime(); let sid = new_sid(); - let mode = "macp.mode.multi_round.v1"; + let mode = "ext.multi_round.v1"; // Create participants let n = 10; @@ -87,7 +87,7 @@ async fn concurrent_contributes_all_accepted() { .into_bytes(); rt.process( &envelope( - "macp.mode.multi_round.v1", + "ext.multi_round.v1", "Contribute", &format!("m{}", i + 1), &sid, diff --git a/tests/conformance/multi_round_happy_path.json b/tests/conformance/multi_round_happy_path.json index 3a03b2d..4296f52 100644 --- a/tests/conformance/multi_round_happy_path.json +++ b/tests/conformance/multi_round_happy_path.json @@ -1,5 +1,5 @@ { - "mode": "macp.mode.multi_round.v1", + "mode": "ext.multi_round.v1", "initiator": "agent://coordinator", "participants": ["agent://alice", "agent://bob"], "mode_version": "1.0.0", diff --git a/tests/conformance/multi_round_reject_paths.json b/tests/conformance/multi_round_reject_paths.json index ba15e0b..e916c1e 100644 --- a/tests/conformance/multi_round_reject_paths.json +++ b/tests/conformance/multi_round_reject_paths.json @@ -1,5 +1,5 @@ { - "mode": "macp.mode.multi_round.v1", + "mode": "ext.multi_round.v1", "initiator": "agent://coordinator", "participants": ["agent://alice", "agent://bob"], "mode_version": "1.0.0", diff --git a/tests/integration_mode_lifecycle.rs b/tests/integration_mode_lifecycle.rs index eb01912..08879b7 100644 --- a/tests/integration_mode_lifecycle.rs +++ b/tests/integration_mode_lifecycle.rs @@ -449,7 +449,7 @@ async fn quorum_full_lifecycle_through_runtime() { async fn multi_round_full_lifecycle_through_runtime() { let rt = make_runtime(); let sid = new_sid(); - let mode = "macp.mode.multi_round.v1"; + let mode = "ext.multi_round.v1"; rt.process( &envelope( diff --git a/tests/replay_round_trip.rs b/tests/replay_round_trip.rs index 167fba7..e7df68d 100644 --- a/tests/replay_round_trip.rs +++ b/tests/replay_round_trip.rs @@ -334,7 +334,7 @@ fn replay_quorum_session() { #[test] fn replay_multi_round_session() { let registry = make_registry(); - let mode = "macp.mode.multi_round.v1"; + let mode = "ext.multi_round.v1"; let entries = vec![ incoming(