diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4324272..eb0f532 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -206,10 +206,67 @@ jobs: fi echo "All proto files match BSR." + audit: + name: Security Audit + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Install cargo-audit + run: cargo install cargo-audit + + - name: Run cargo audit + run: cargo audit + + coverage: + name: Coverage + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo registry and build + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo- + + - name: Install protoc + run: | + sudo apt-get update + sudo apt-get install -y protobuf-compiler + + - name: Install cargo-tarpaulin + run: cargo install cargo-tarpaulin + + - name: Generate coverage + run: cargo tarpaulin --all-targets --out xml + env: + MACP_MEMORY_ONLY: "1" + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v4 + with: + files: cobertura.xml + fail_ci_if_error: false + ci-pass: name: All Checks Passed runs-on: ubuntu-latest - needs: [check, fmt, clippy, test, build, lint-protobuf, proto-sync] + needs: [check, fmt, clippy, test, build, lint-protobuf, proto-sync, audit] steps: - name: Summary @@ -222,3 +279,4 @@ jobs: echo " - cargo build --release" echo " - protobuf lint" echo " - proto sync check" + echo " - cargo audit" diff --git a/Makefile b/Makefile index 6d0ecbe..0c6ed47 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: setup build test fmt clippy check sync-protos sync-protos-local check-protos +.PHONY: setup build test test-integration test-conformance test-all fmt clippy check audit coverage sync-protos sync-protos-local check-protos SPEC_PROTO_DIR := ../multiagentcoordinationprotocol/schemas/proto PROTO_FILES := macp/v1/envelope.proto macp/v1/core.proto macp/modes/decision/v1/decision.proto macp/modes/proposal/v1/proposal.proto macp/modes/task/v1/task.proto macp/modes/handoff/v1/handoff.proto macp/modes/quorum/v1/quorum.proto @@ -20,6 +20,20 @@ fmt: clippy: cargo clippy --all-targets -- -D warnings +test-integration: + cargo test --test '*' + +test-conformance: + cargo test conformance + +test-all: fmt clippy test test-integration test-conformance + +coverage: + cargo tarpaulin --all-targets --out html + +audit: + cargo audit + check: fmt clippy test ## Pull latest proto files from BSR diff --git a/README.md b/README.md index 08c7802..02f7fc9 100644 --- a/README.md +++ b/README.md @@ -190,27 +190,74 @@ cargo run --bin fuzz_client | `WatchModeRegistry` | unimplemented | | `WatchRoots` | unimplemented | +## Architecture + +``` +Client Request + | + [Transport/gRPC] -- server.rs, security.rs + | + [Coordination Kernel] -- runtime.rs + | + [Mode Registry] -- mode_registry.rs + | \ + [Mode Logic] [Discovery] + mode/*.rs ListModes, GetManifest + | + [Storage Layer] -- storage.rs, log_store.rs + | + [Replay] -- replay.rs +``` + +See `docs/architecture.md` for detailed layer descriptions. + ## Project structure ```text runtime/ -├── proto/ # protobuf schemas copied from the RFC/spec repository +├── proto/ # protobuf schemas copied from the RFC/spec repository ├── src/ -│ ├── main.rs # server startup, TLS, persistence, auth wiring -│ ├── server.rs # gRPC adapter and request authentication -│ ├── runtime.rs # coordination kernel and mode dispatch -│ ├── security.rs # auth config, sender derivation, rate limiting -│ ├── session.rs # canonical SessionStart validation and session model -│ ├── registry.rs # session store with optional persistence -│ ├── log_store.rs # in-memory accepted-history log cache -│ ├── storage.rs # storage backend trait, FileBackend persistence, crash recovery -│ ├── replay.rs # session rebuild from append-only log -│ ├── mode/ # mode implementations -│ └── bin/ # local development example clients +│ ├── main.rs # server startup, TLS, persistence, auth wiring +│ ├── server.rs # gRPC adapter and request authentication +│ ├── runtime.rs # coordination kernel and mode dispatch +│ ├── mode_registry.rs # single source of truth for mode registration +│ ├── security.rs # auth config, sender derivation, rate limiting +│ ├── session.rs # canonical SessionStart validation and session model +│ ├── registry.rs # session store with optional persistence +│ ├── log_store.rs # in-memory accepted-history log cache +│ ├── storage.rs # storage backend trait, FileBackend, crash recovery +│ ├── replay.rs # session rebuild from append-only log +│ ├── mode/ # mode implementations +│ └── bin/ # local development example clients +├── tests/ +│ ├── integration_mode_lifecycle.rs # full-stack integration tests +│ ├── replay_round_trip.rs # replay tests for all 5 modes +│ ├── conformance_loader.rs # JSON fixture runner +│ └── conformance/ # per-mode conformance fixtures ├── docs/ └── build.rs ``` +## Troubleshooting + +**TLS required error on startup** +Set `MACP_ALLOW_INSECURE=1` for local development, or provide `MACP_TLS_CERT_PATH` and `MACP_TLS_KEY_PATH` for production. + +**`InvalidSessionId` error** +Session IDs must be UUID v4/v7 in hyphenated lowercase form (36 chars) or base64url tokens (22+ chars). Short or human-readable IDs like `"s1"` or `"my-session"` are rejected. + +**`InvalidPayload` on `SessionStart`** +For standards-track modes, `SessionStartPayload` must include non-empty `participants`, `mode_version`, `configuration_version`, and a positive `ttl_ms`. Empty payloads are rejected. + +**`Forbidden` error** +Check that the sender identity matches the session's participant list. For `Commitment` messages, only the session initiator is authorized. Verify your bearer token maps to the correct sender. + +**`StorageFailed` error** +The runtime requires write access to `MACP_DATA_DIR`. Check directory permissions. Log append failures are fatal — the runtime will not acknowledge a message without a durable record. + +**Proto drift / `make check-protos` failure** +Run `make sync-protos` to update local proto files from BSR. + ## Development notes - The RFC/spec repository remains the normative source for protocol semantics. diff --git a/docs/README.md b/docs/README.md index dfefa44..8bf88ef 100644 --- a/docs/README.md +++ b/docs/README.md @@ -84,4 +84,5 @@ export MACP_MEMORY_ONLY=1 - `../README.md` — root-level quick start and configuration reference - `examples.md` — updated local-development examples and canonical message patterns - `protocol.md` — implementation notes and protocol surface summary -- `architecture.md` — runtime component layout +- `architecture.md` — runtime component layout and mode registry design +- `deployment.md` — production deployment guide, container notes, and environment reference diff --git a/docs/architecture.md b/docs/architecture.md index 085e87a..5e07086 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -23,7 +23,26 @@ Responsibilities: - enforce lazy TTL expiry on reads and writes - persist updated session snapshots -## 3. Mode layer (`src/mode/*`) +## 3. Mode Registry (`src/mode_registry.rs`) + +The `ModeRegistry` is the single source of truth for mode dispatch, replay, and discovery. It eliminates the previous pattern of hardcoded mode maps in `runtime.rs`, `main.rs`, and `replay.rs`. + +Responsibilities: + +- register all mode implementations (standards-track and experimental) +- provide mode lookup for dispatch and replay +- provide standards-track mode names for `ListModes` +- provide mode descriptors for `ListModes` and `GetManifest` +- classify modes as standards-track or experimental + +Key methods: + +- `build_default()` — constructs the canonical mode set +- `get_mode(name)` — mode lookup for dispatch +- `standard_mode_names()` — drives `ListModes` and `GetManifest` +- `standard_mode_descriptors()` — drives `ListModes` response + +## 4. Mode layer (`src/mode/*`) Responsibilities: @@ -34,14 +53,14 @@ Responsibilities: Implemented modes: -- Decision -- Proposal -- Task -- Handoff -- Quorum -- MultiRound (experimental) +- Decision — enforced phase transitions (Proposal -> Evaluation -> Voting -> Committed) +- Proposal — negotiation with counterproposals, acceptance convergence, terminal rejections +- Task — delegated task with serial assignment, progress tracking, terminal reports +- Handoff — serial handoff offers with accept/decline disposition +- Quorum — threshold approval with ballots +- MultiRound (experimental) — iterative value convergence -## 4. Storage layer +## 5. Storage layer ### Storage backend (`src/storage.rs`) @@ -71,7 +90,7 @@ In-memory cache of accepted-history logs. Stores: On-disk persistence is handled by `FileBackend`, not by LogStore. -## 5. Security layer (`src/security.rs`) +## 6. Security layer (`src/security.rs`) Responsibilities: @@ -81,6 +100,25 @@ Responsibilities: - enforce session-start policy - enforce per-sender rate limits +## Architecture diagram + +``` +Client Request + | + [Transport/gRPC] -- server.rs, security.rs + | + [Coordination Kernel] -- runtime.rs + | + [Mode Registry] -- mode_registry.rs + | \ + [Mode Logic] [Discovery] + mode/*.rs ListModes, GetManifest + | + [Storage Layer] -- storage.rs, log_store.rs + | + [Replay] -- replay.rs +``` + ## Request path summary 1. gRPC request arrives in `MacpServer` diff --git a/docs/deployment.md b/docs/deployment.md new file mode 100644 index 0000000..9b6b675 --- /dev/null +++ b/docs/deployment.md @@ -0,0 +1,76 @@ +# Deployment Guide + +## Production checklist + +1. **TLS certificates** — Set `MACP_TLS_CERT_PATH` and `MACP_TLS_KEY_PATH` to valid PEM files +2. **Auth tokens** — Create a `tokens.json` file mapping bearer tokens to agent identities and set `MACP_AUTH_TOKENS_FILE` +3. **Data directory** — Ensure `MACP_DATA_DIR` points to a directory with write permissions +4. **Bind address** — Set `MACP_BIND_ADDR` to the desired listen address (default: `127.0.0.1:50051`) + +## Environment variable reference + +| Variable | Required | Default | Description | +|---|---|---|---| +| `MACP_BIND_ADDR` | No | `127.0.0.1:50051` | gRPC listen address | +| `MACP_TLS_CERT_PATH` | Yes* | — | Path to TLS certificate PEM | +| `MACP_TLS_KEY_PATH` | Yes* | — | Path to TLS private key PEM | +| `MACP_AUTH_TOKENS_FILE` | No | — | Path to `tokens.json` for bearer token auth | +| `MACP_DATA_DIR` | No | `.macp-data` | Directory for session persistence | +| `MACP_MEMORY_ONLY` | No | — | Set to `1` to disable persistence | +| `MACP_ALLOW_INSECURE` | No | — | Set to `1` to allow plaintext (dev only) | +| `MACP_ALLOW_DEV_SENDER_HEADER` | No | — | Set to `1` to trust `x-macp-sender` header (dev only) | +| `MACP_MAX_OPEN_SESSIONS` | No | — | Per-initiator open session limit | +| `MACP_MAX_PAYLOAD_BYTES` | No | `1048576` | Maximum envelope payload size | + +*TLS is required unless `MACP_ALLOW_INSECURE=1`. + +## Persistence and crash recovery + +When `MACP_MEMORY_ONLY` is not set: + +- Each session gets a directory under `MACP_DATA_DIR/sessions//` +- An append-only `log.jsonl` records every accepted message and internal event +- A `session.json` snapshot is written on each state change (best-effort) +- On startup, all sessions are rebuilt from their `log.jsonl` files via `replay_session()` +- Log append failures are **fatal** — the runtime rejects the message rather than acknowledging without a durable record +- Atomic writes (tmp file + rename) prevent partial-write corruption + +## Monitoring + +- **stderr warnings** — Failed persistence operations, replay errors, and recovered session counts are logged to stderr +- **Session counts** — On startup, the runtime prints the number of replayed sessions +- **TTL expiry** — Sessions are lazily expired on next read or write; no background reaper +- **Rate limiting** — Per-sender rate limits for `SessionStart` and in-session messages + +## Container deployment + +```dockerfile +FROM rust:1.85 AS builder +WORKDIR /app +COPY . . +RUN apt-get update && apt-get install -y protobuf-compiler +RUN cargo build --release + +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* +COPY --from=builder /app/target/release/macp-runtime /usr/local/bin/ +EXPOSE 50051 +VOLUME /data +ENV MACP_DATA_DIR=/data +CMD ["macp-runtime"] +``` + +Key considerations: + +- Mount a persistent volume at `MACP_DATA_DIR` for session durability +- Expose port 50051 (or the configured `MACP_BIND_ADDR` port) +- Provide TLS certificates via mounted secrets +- Set `MACP_AUTH_TOKENS_FILE` to a mounted secrets file for production auth + +## Dev tool prerequisites + +For development, install these additional tools: + +- `cargo-tarpaulin` — Coverage reporting (`cargo install cargo-tarpaulin`) +- `cargo-audit` — Dependency security auditing (`cargo install cargo-audit`) +- `buf` — Protocol buffer tooling (for proto sync and lint) diff --git a/src/lib.rs b/src/lib.rs index 586697e..8060557 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ pub mod quorum_pb { pub mod error; pub mod log_store; pub mod mode; +pub mod mode_registry; pub mod registry; pub mod replay; pub mod runtime; diff --git a/src/main.rs b/src/main.rs index 3843256..584b9b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod server; use macp_runtime::log_store::LogStore; +use macp_runtime::mode_registry::ModeRegistry; use macp_runtime::pb; use macp_runtime::registry::SessionRegistry; use macp_runtime::replay::replay_session; @@ -36,37 +37,9 @@ async fn main() -> Result<(), Box> { // Load persisted state into in-memory caches let registry = Arc::new(SessionRegistry::new()); let log_store = Arc::new(LogStore::new()); + let mode_registry = Arc::new(ModeRegistry::build_default()); if !memory_only { - // Build mode map for replay - use macp_runtime::mode::Mode; - use std::collections::HashMap; - let mut modes: HashMap> = HashMap::new(); - modes.insert( - "macp.mode.decision.v1".into(), - Box::new(macp_runtime::mode::decision::DecisionMode), - ); - modes.insert( - "macp.mode.proposal.v1".into(), - Box::new(macp_runtime::mode::proposal::ProposalMode), - ); - modes.insert( - "macp.mode.task.v1".into(), - Box::new(macp_runtime::mode::task::TaskMode), - ); - modes.insert( - "macp.mode.handoff.v1".into(), - Box::new(macp_runtime::mode::handoff::HandoffMode), - ); - modes.insert( - "macp.mode.quorum.v1".into(), - Box::new(macp_runtime::mode::quorum::QuorumMode), - ); - modes.insert( - "macp.mode.multi_round.v1".into(), - Box::new(macp_runtime::mode::multi_round::MultiRoundMode), - ); - // Enumerate session directories and replay from logs let sessions_dir = data_dir.join("sessions"); let mut recovered = 0usize; @@ -82,7 +55,7 @@ async fn main() -> Result<(), Box> { continue; } - match replay_session(&session_id, &log_entries, &modes) { + match replay_session(&session_id, &log_entries, &mode_registry) { Ok(session) => { // Best-effort snapshot update if let Err(e) = storage.save_session(&session).await { @@ -98,7 +71,7 @@ async fn main() -> Result<(), Box> { log_store.append(&session_id, log_entry.clone()).await; } - registry.insert_session_for_test(session_id, session).await; + registry.insert_recovered_session(session_id, session).await; recovered += 1; } Err(e) => { @@ -115,10 +88,11 @@ async fn main() -> Result<(), Box> { } } - let runtime = Arc::new(Runtime::new( + let runtime = Arc::new(Runtime::with_mode_registry( Arc::clone(&storage), Arc::clone(®istry), Arc::clone(&log_store), + mode_registry, )); let security = SecurityLayer::from_env()?; let svc = MacpServer::new(runtime, security); diff --git a/src/mode/decision.rs b/src/mode/decision.rs index 14b42aa..307f59a 100644 --- a/src/mode/decision.rs +++ b/src/mode/decision.rs @@ -81,6 +81,12 @@ impl DecisionMode { } impl Mode for DecisionMode { + /// Authorize the sender for decision mode messages. + /// + /// Implements the **coordinator authority model**: the session initiator + /// may emit `Proposal` and `Commitment` regardless of declared participants. + /// Declared participants may emit `Proposal`, `Evaluation`, `Objection`, + /// and `Vote`. Only the initiator may emit `Commitment`. fn authorize_sender(&self, session: &Session, env: &Envelope) -> Result<(), MacpError> { match env.message_type.as_str() { "Proposal" | "Commitment" if env.sender == session.initiator_sender => Ok(()), @@ -120,6 +126,9 @@ impl Mode for DecisionMode { match env.message_type.as_str() { "Proposal" => { + if state.phase == DecisionPhase::Voting { + return Err(MacpError::InvalidPayload); + } let payload = ProposalPayload::decode(&*env.payload) .map_err(|_| MacpError::InvalidPayload)?; if payload.proposal_id.trim().is_empty() @@ -141,6 +150,9 @@ impl Mode for DecisionMode { Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Evaluation" => { + if state.phase == DecisionPhase::Proposal { + return Err(MacpError::InvalidPayload); + } let payload = EvaluationPayload::decode(&*env.payload) .map_err(|_| MacpError::InvalidPayload)?; if !state.proposals.contains_key(&payload.proposal_id) { @@ -156,6 +168,9 @@ impl Mode for DecisionMode { Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Objection" => { + if state.phase == DecisionPhase::Proposal { + return Err(MacpError::InvalidPayload); + } let payload = ObjectionPayload::decode(&*env.payload) .map_err(|_| MacpError::InvalidPayload)?; if !state.proposals.contains_key(&payload.proposal_id) { @@ -174,6 +189,9 @@ impl Mode for DecisionMode { Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Vote" => { + if state.phase == DecisionPhase::Proposal { + return Err(MacpError::InvalidPayload); + } let payload = VotePayload::decode(&*env.payload).map_err(|_| MacpError::InvalidPayload)?; if !state.proposals.contains_key(&payload.proposal_id) { @@ -393,6 +411,185 @@ mod tests { ); } + fn evaluation(proposal_id: &str) -> Vec { + EvaluationPayload { + proposal_id: proposal_id.into(), + recommendation: "proceed".into(), + confidence: 0.9, + reason: "good".into(), + } + .encode_to_vec() + } + + fn objection(proposal_id: &str) -> Vec { + ObjectionPayload { + proposal_id: proposal_id.into(), + reason: "risky".into(), + severity: "high".into(), + } + .encode_to_vec() + } + + #[test] + fn evaluation_before_any_proposal_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env("agent://fraud", "Evaluation", evaluation("p1")) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn objection_before_any_proposal_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env("agent://fraud", "Objection", objection("p1")) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn vote_before_any_proposal_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env("agent://fraud", "Vote", vote("p1", "approve")) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn proposal_after_voting_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://orchestrator", "Proposal", proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://fraud", "Vote", vote("p1", "approve")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env("agent://orchestrator", "Proposal", proposal("p2")) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn commitment_from_non_initiator_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://orchestrator", "Proposal", proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.authorize_sender( + &session, + &env("agent://fraud", "Commitment", commitment(&session)) + ) + .unwrap_err() + .to_string(), + "Forbidden" + ); + } + + #[test] + fn empty_proposal_id_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + let empty_proposal = ProposalPayload { + proposal_id: "".into(), + option: "option".into(), + rationale: "because".into(), + supporting_data: vec![], + } + .encode_to_vec(); + assert_eq!( + mode.on_message( + &session, + &env("agent://orchestrator", "Proposal", empty_proposal) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + #[test] fn commitment_versions_must_match_session_bindings() { let mode = DecisionMode; diff --git a/src/mode/handoff.rs b/src/mode/handoff.rs index 4fe2519..bc02744 100644 --- a/src/mode/handoff.rs +++ b/src/mode/handoff.rs @@ -109,6 +109,10 @@ impl Mode for HandoffMode { || state.offers.contains_key(&payload.handoff_id) || !is_declared_participant(&session.participants, &payload.target_participant) || payload.target_participant == env.sender + || state + .offers + .values() + .any(|o| o.disposition == HandoffDisposition::Offered) { return Err(MacpError::InvalidPayload); } @@ -736,6 +740,133 @@ mod tests { assert!(matches!(result, ModeResponse::PersistAndResolve { .. })); } + // --- Serial offer enforcement --- + + #[test] + fn second_offer_while_first_pending_rejected() { + let mode = HandoffMode; + let mut session = base_session(); + session.participants = vec!["owner".into(), "target".into(), "other".into()]; + let result = mode + .on_session_start(&session, &env("owner", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("owner", "HandoffOffer", make_offer("h1", "target")), + ) + .unwrap(); + apply(&mut session, result); + let err = mode + .on_message( + &session, + &env("owner", "HandoffOffer", make_offer("h2", "other")), + ) + .unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn second_offer_after_first_accepted_succeeds() { + let mode = HandoffMode; + let mut session = base_session(); + session.participants = vec!["owner".into(), "target".into(), "other".into()]; + let result = mode + .on_session_start(&session, &env("owner", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("owner", "HandoffOffer", make_offer("h1", "target")), + ) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("target", "HandoffAccept", make_accept("h1", "target")), + ) + .unwrap(); + apply(&mut session, result); + mode.on_message( + &session, + &env("owner", "HandoffOffer", make_offer("h2", "other")), + ) + .unwrap(); + } + + #[test] + fn second_offer_after_first_declined_succeeds() { + let mode = HandoffMode; + let mut session = base_session(); + session.participants = vec!["owner".into(), "target".into(), "other".into()]; + let result = mode + .on_session_start(&session, &env("owner", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("owner", "HandoffOffer", make_offer("h1", "target")), + ) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("target", "HandoffDecline", make_decline("h1", "target")), + ) + .unwrap(); + apply(&mut session, result); + mode.on_message( + &session, + &env("owner", "HandoffOffer", make_offer("h2", "other")), + ) + .unwrap(); + } + + // --- Commitment version mismatch --- + + #[test] + fn commitment_version_mismatch_rejected() { + let mode = HandoffMode; + let mut session = base_session(); + let result = mode + .on_session_start(&session, &env("owner", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("owner", "HandoffOffer", make_offer("h1", "target")), + ) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("target", "HandoffAccept", make_accept("h1", "target")), + ) + .unwrap(); + apply(&mut session, result); + let bad_commitment = CommitmentPayload { + commitment_id: "c1".into(), + action: "handoff.accepted".into(), + authority_scope: "support".into(), + reason: "accepted".into(), + mode_version: "wrong".into(), + policy_version: "policy".into(), + configuration_version: "config".into(), + } + .encode_to_vec(); + let err = mode + .on_message(&session, &env("owner", "Commitment", bad_commitment)) + .unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + // --- Unknown message type --- #[test] diff --git a/src/mode/proposal.rs b/src/mode/proposal.rs index b2632fd..12eb02d 100644 --- a/src/mode/proposal.rs +++ b/src/mode/proposal.rs @@ -480,6 +480,249 @@ mod tests { assert_eq!(err.to_string(), "InvalidPayload"); } + fn make_counter_proposal(id: &str, supersedes: &str) -> Vec { + CounterProposalPayload { + proposal_id: id.into(), + supersedes_proposal_id: supersedes.into(), + title: format!("counter-{id}"), + summary: "counter".into(), + details: vec![], + } + .encode_to_vec() + } + + #[test] + fn empty_proposal_id_rejected() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + let empty = ProposalPayload { + proposal_id: "".into(), + title: "title".into(), + summary: "summary".into(), + details: vec![], + tags: vec![], + } + .encode_to_vec(); + assert_eq!( + mode.on_message(&session, &env("agent://seller", "Proposal", empty)) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn counterproposal_requires_valid_supersedes() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://seller", "Proposal", make_proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + // Supersedes non-existent proposal + let bad_counter = make_counter_proposal("p2", "nonexistent"); + assert_eq!( + mode.on_message( + &session, + &env("agent://buyer", "CounterProposal", bad_counter) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn counterproposal_chain_works() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://seller", "Proposal", make_proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env( + "agent://buyer", + "CounterProposal", + make_counter_proposal("p2", "p1"), + ), + ) + .unwrap(); + apply(&mut session, resp); + // Chain: p3 supersedes p2 + mode.on_message( + &session, + &env( + "agent://seller", + "CounterProposal", + make_counter_proposal("p3", "p2"), + ), + ) + .unwrap(); + } + + #[test] + fn non_terminal_reject_does_not_enable_commitment() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://seller", "Proposal", make_proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + // Non-terminal reject + let resp = mode + .on_message( + &session, + &env("agent://buyer", "Reject", make_reject("p1", false)), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env( + "agent://buyer", + "Commitment", + commitment(&session, "proposal.rejected"), + ), + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn non_participant_cannot_propose() { + let mode = ProposalMode; + let session = base_session(); + let err = mode + .authorize_sender( + &session, + &env("agent://outsider", "Proposal", make_proposal("p1")), + ) + .unwrap_err(); + assert_eq!(err.to_string(), "Forbidden"); + } + + #[test] + fn commitment_version_mismatch_rejected() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://seller", "Proposal", make_proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message(&session, &env("agent://buyer", "Accept", make_accept("p1"))) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://seller", "Accept", make_accept("p1")), + ) + .unwrap(); + apply(&mut session, resp); + let bad = CommitmentPayload { + commitment_id: "c1".into(), + action: "proposal.accepted".into(), + authority_scope: "commercial".into(), + reason: "bound".into(), + mode_version: "wrong".into(), + policy_version: session.policy_version.clone(), + configuration_version: session.configuration_version.clone(), + } + .encode_to_vec(); + assert_eq!( + mode.on_message(&session, &env("agent://buyer", "Commitment", bad)) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn accept_on_withdrawn_proposal_rejected() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://seller", "Proposal", make_proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://seller", "Withdraw", make_withdraw("p1")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message(&session, &env("agent://buyer", "Accept", make_accept("p1"))) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn commitment_from_non_initiator_rejected() { + let mode = ProposalMode; + let session = base_session(); + let err = mode + .authorize_sender( + &session, + &env( + "agent://seller", + "Commitment", + commitment(&session, "proposal.accepted"), + ), + ) + .unwrap_err(); + assert_eq!(err.to_string(), "Forbidden"); + } + #[test] fn terminal_rejection_on_different_proposal_survives_withdraw() { let mode = ProposalMode; diff --git a/src/mode/quorum.rs b/src/mode/quorum.rs index 85ae90a..6af00ff 100644 --- a/src/mode/quorum.rs +++ b/src/mode/quorum.rs @@ -862,6 +862,54 @@ mod tests { assert!(matches!(result, ModeResponse::PersistAndResolve { .. })); } + // --- Commitment version mismatch --- + + #[test] + fn commitment_version_mismatch_rejected() { + let mode = QuorumMode; + let mut session = base_session(); + let result = mode + .on_session_start(&session, &env("coordinator", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env( + "coordinator", + "ApprovalRequest", + make_approval_request("r1", 2), + ), + ) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("alice", "Approve", make_approve("r1", "yes")), + ) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message(&session, &env("bob", "Approve", make_approve("r1", "yes"))) + .unwrap(); + apply(&mut session, result); + let bad_commitment = CommitmentPayload { + commitment_id: "c1".into(), + action: "quorum.approved".into(), + authority_scope: "deploy".into(), + reason: "threshold met".into(), + mode_version: "wrong".into(), + policy_version: "policy".into(), + configuration_version: "config".into(), + } + .encode_to_vec(); + let err = mode + .on_message(&session, &env("coordinator", "Commitment", bad_commitment)) + .unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + // --- Unknown message type --- #[test] diff --git a/src/mode/task.rs b/src/mode/task.rs index 493284e..f4e5ab1 100644 --- a/src/mode/task.rs +++ b/src/mode/task.rs @@ -1029,6 +1029,53 @@ mod tests { } } + // --- Commitment version mismatch --- + + #[test] + fn commitment_version_mismatch_rejected() { + let mode = TaskMode; + let mut session = base_session(); + let result = mode + .on_session_start(&session, &env("planner", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("planner", "TaskRequest", make_task_request("t1", "worker")), + ) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("worker", "TaskAccept", make_task_accept("t1", "worker")), + ) + .unwrap(); + apply(&mut session, result); + let result = mode + .on_message( + &session, + &env("worker", "TaskComplete", make_task_complete("t1", "worker")), + ) + .unwrap(); + apply(&mut session, result); + let bad_commitment = CommitmentPayload { + commitment_id: "c1".into(), + action: "task.completed".into(), + authority_scope: "ops".into(), + reason: "done".into(), + mode_version: "wrong".into(), + policy_version: "policy".into(), + configuration_version: "config".into(), + } + .encode_to_vec(); + let err = mode + .on_message(&session, &env("planner", "Commitment", bad_commitment)) + .unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + // --- Unknown message type --- #[test] diff --git a/src/mode_registry.rs b/src/mode_registry.rs new file mode 100644 index 0000000..d468b58 --- /dev/null +++ b/src/mode_registry.rs @@ -0,0 +1,133 @@ +use std::collections::HashMap; + +use crate::mode::decision::DecisionMode; +use crate::mode::handoff::HandoffMode; +use crate::mode::multi_round::MultiRoundMode; +use crate::mode::proposal::ProposalMode; +use crate::mode::quorum::QuorumMode; +use crate::mode::task::TaskMode; +use crate::mode::{standard_mode_descriptors, Mode, STANDARD_MODE_NAMES}; +use crate::pb::ModeDescriptor; + +pub struct ModeRegistration { + pub mode_name: String, + pub mode: Box, + pub descriptor: Option, + pub standards_track: bool, +} + +pub struct ModeRegistry { + entries: HashMap, +} + +impl ModeRegistry { + /// Build the default registry with all 5 standard + 1 experimental modes. + pub fn build_default() -> Self { + let descriptors = standard_mode_descriptors(); + let descriptor_map: HashMap = descriptors + .into_iter() + .map(|d| (d.mode.clone(), d)) + .collect(); + + let mut entries = HashMap::new(); + + let standard_modes: Vec<(&str, Box)> = vec![ + ("macp.mode.decision.v1", Box::new(DecisionMode)), + ("macp.mode.proposal.v1", Box::new(ProposalMode)), + ("macp.mode.task.v1", Box::new(TaskMode)), + ("macp.mode.handoff.v1", Box::new(HandoffMode)), + ("macp.mode.quorum.v1", Box::new(QuorumMode)), + ]; + + for (name, mode) in standard_modes { + entries.insert( + name.to_string(), + ModeRegistration { + mode_name: name.to_string(), + mode, + descriptor: descriptor_map.get(name).cloned(), + standards_track: true, + }, + ); + } + + // Experimental mode + entries.insert( + "macp.mode.multi_round.v1".to_string(), + ModeRegistration { + mode_name: "macp.mode.multi_round.v1".to_string(), + mode: Box::new(MultiRoundMode), + descriptor: None, + standards_track: false, + }, + ); + + Self { entries } + } + + pub fn get_mode(&self, name: &str) -> Option<&dyn Mode> { + self.entries.get(name).map(|e| e.mode.as_ref()) + } + + pub fn standard_mode_names(&self) -> Vec { + STANDARD_MODE_NAMES + .iter() + .filter(|name| self.entries.contains_key(**name)) + .map(|name| (*name).to_string()) + .collect() + } + + pub fn standard_mode_descriptors(&self) -> Vec { + STANDARD_MODE_NAMES + .iter() + .filter_map(|name| self.entries.get(*name).and_then(|e| e.descriptor.clone())) + .collect() + } + + pub fn is_standard_mode(&self, name: &str) -> bool { + self.entries + .get(name) + .map(|e| e.standards_track) + .unwrap_or(false) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn build_default_contains_all_standard_modes() { + let registry = ModeRegistry::build_default(); + for name in STANDARD_MODE_NAMES { + assert!(registry.get_mode(name).is_some(), "missing mode: {name}"); + assert!(registry.is_standard_mode(name)); + } + } + + #[test] + fn build_default_contains_experimental_mode() { + let registry = ModeRegistry::build_default(); + assert!(registry.get_mode("macp.mode.multi_round.v1").is_some()); + assert!(!registry.is_standard_mode("macp.mode.multi_round.v1")); + } + + #[test] + fn standard_mode_names_returns_five() { + let registry = ModeRegistry::build_default(); + assert_eq!(registry.standard_mode_names().len(), 5); + } + + #[test] + fn standard_mode_descriptors_returns_five() { + let registry = ModeRegistry::build_default(); + assert_eq!(registry.standard_mode_descriptors().len(), 5); + } + + #[test] + fn unknown_mode_returns_none() { + let registry = ModeRegistry::build_default(); + assert!(registry.get_mode("nonexistent").is_none()); + assert!(!registry.is_standard_mode("nonexistent")); + } +} diff --git a/src/registry.rs b/src/registry.rs index 3f5e43c..3e71911 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -193,7 +193,7 @@ impl SessionRegistry { guard.values().cloned().collect() } - pub async fn insert_session_for_test(&self, session_id: String, session: Session) { + pub async fn insert_recovered_session(&self, session_id: String, session: Session) { let mut guard = self.sessions.write().await; guard.insert(session_id, session); let _ = self.persist_locked(&guard).await; @@ -255,7 +255,7 @@ mod tests { expired.ttl_expiry = now - 1000; // expired 1 second ago expired.state = SessionState::Open; // still Open but TTL is past registry - .insert_session_for_test("expired-s1".into(), expired) + .insert_recovered_session("expired-s1".into(), expired) .await; // Should not count the expired-but-open session @@ -270,7 +270,7 @@ mod tests { active.ttl_expiry = now + 60_000; // expires in 60s active.state = SessionState::Open; registry - .insert_session_for_test("active-s1".into(), active) + .insert_recovered_session("active-s1".into(), active) .await; let count = registry @@ -291,7 +291,7 @@ mod tests { let registry = SessionRegistry::with_persistence(&base).unwrap(); registry - .insert_session_for_test("s1".into(), sample_session("s1")) + .insert_recovered_session("s1".into(), sample_session("s1")) .await; let reopened = SessionRegistry::with_persistence(&base).unwrap(); diff --git a/src/replay.rs b/src/replay.rs index 4130e66..6f1f90e 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -1,8 +1,6 @@ -use std::collections::HashMap; - use crate::error::MacpError; use crate::log_store::{EntryKind, LogEntry}; -use crate::mode::Mode; +use crate::mode_registry::ModeRegistry; use crate::pb::Envelope; use crate::session::{ extract_ttl_ms, is_standard_mode, parse_session_start_payload, @@ -19,7 +17,7 @@ const EXPERIMENTAL_DEFAULT_TTL_MS: i64 = 60_000; pub fn replay_session( session_id: &str, log_entries: &[LogEntry], - modes: &HashMap>, + registry: &ModeRegistry, ) -> Result { // 1. Find the SessionStart entry let start_entry = log_entries @@ -36,7 +34,7 @@ pub fn replay_session( &start_entry.mode }; - let mode = modes.get(mode_name).ok_or(MacpError::UnknownMode)?; + let mode = registry.get_mode(mode_name).ok_or(MacpError::UnknownMode)?; // 2. Parse SessionStartPayload let start_payload = if start_entry.raw_payload.is_empty() && !is_standard_mode(mode_name) { @@ -172,15 +170,11 @@ mod tests { use crate::decision_pb::ProposalPayload; use crate::decision_pb::VotePayload; use crate::log_store::EntryKind; - use crate::mode::decision::DecisionMode; - use crate::mode::Mode; use crate::pb::{CommitmentPayload, SessionStartPayload}; use prost::Message; - fn make_modes() -> HashMap> { - let mut modes: HashMap> = HashMap::new(); - modes.insert("macp.mode.decision.v1".into(), Box::new(DecisionMode)); - modes + fn make_registry() -> ModeRegistry { + ModeRegistry::build_default() } fn start_payload_bytes() -> Vec { @@ -233,7 +227,7 @@ mod tests { #[test] fn replay_rebuilds_decision_session() { - let modes = make_modes(); + let registry = make_registry(); let proposal = ProposalPayload { proposal_id: "p1".into(), option: "deploy".into(), @@ -271,7 +265,7 @@ mod tests { incoming_entry("m4", "Commitment", "agent://orchestrator", commitment, 4000), ]; - let session = replay_session("s1", &entries, &modes).unwrap(); + let session = replay_session("s1", &entries, ®istry).unwrap(); assert_eq!(session.state, SessionState::Resolved); assert_eq!(session.session_id, "s1"); assert!(session.seen_message_ids.contains("m1")); @@ -283,7 +277,7 @@ mod tests { #[test] fn replay_preserves_original_ttl() { - let modes = make_modes(); + let registry = make_registry(); let original_time = 1_700_000_000_000i64; let entries = vec![incoming_entry( "m1", @@ -293,7 +287,7 @@ mod tests { original_time, )]; - let session = replay_session("s1", &entries, &modes).unwrap(); + let session = replay_session("s1", &entries, ®istry).unwrap(); assert_eq!(session.started_at_unix_ms, original_time); assert_eq!(session.ttl_expiry, original_time + 60_000); assert_eq!(session.ttl_ms, 60_000); @@ -301,7 +295,7 @@ mod tests { #[test] fn replay_handles_ttl_expired() { - let modes = make_modes(); + let registry = make_registry(); let entries = vec![ incoming_entry( "m1", @@ -313,13 +307,13 @@ mod tests { internal_entry("TtlExpired", 61001), ]; - let session = replay_session("s1", &entries, &modes).unwrap(); + let session = replay_session("s1", &entries, ®istry).unwrap(); assert_eq!(session.state, SessionState::Expired); } #[test] fn replay_handles_session_cancel() { - let modes = make_modes(); + let registry = make_registry(); let entries = vec![ incoming_entry( "m1", @@ -331,14 +325,14 @@ mod tests { internal_entry("SessionCancel", 5000), ]; - let session = replay_session("s1", &entries, &modes).unwrap(); + let session = replay_session("s1", &entries, ®istry).unwrap(); assert_eq!(session.state, SessionState::Expired); } #[test] fn replay_empty_log_returns_error() { - let modes = make_modes(); - let result = replay_session("s1", &[], &modes); + let registry = make_registry(); + let result = replay_session("s1", &[], ®istry); assert!(result.is_err()); } diff --git a/src/runtime.rs b/src/runtime.rs index acd9c19..8b5fe0e 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,17 +1,10 @@ use chrono::Utc; -use std::collections::HashMap; use std::sync::Arc; use crate::error::MacpError; use crate::log_store::{EntryKind, LogEntry, LogStore}; -use crate::mode::decision::DecisionMode; -use crate::mode::handoff::HandoffMode; -use crate::mode::multi_round::MultiRoundMode; -use crate::mode::proposal::ProposalMode; -use crate::mode::quorum::QuorumMode; -use crate::mode::task::TaskMode; -use crate::mode::{standard_mode_names, Mode}; -use crate::pb::{Envelope, SessionStartPayload}; +use crate::mode_registry::ModeRegistry; +use crate::pb::{Envelope, ModeDescriptor, SessionStartPayload}; use crate::registry::SessionRegistry; use crate::session::{ extract_ttl_ms, is_standard_mode, parse_session_start_payload, @@ -34,7 +27,7 @@ pub struct Runtime { pub registry: Arc, pub log_store: Arc, stream_bus: Arc, - modes: HashMap>, + mode_registry: Arc, } impl Runtime { @@ -43,29 +36,39 @@ impl Runtime { registry: Arc, log_store: Arc, ) -> Self { - let mut modes: HashMap> = HashMap::new(); - modes.insert("macp.mode.decision.v1".into(), Box::new(DecisionMode)); - modes.insert("macp.mode.proposal.v1".into(), Box::new(ProposalMode)); - modes.insert("macp.mode.task.v1".into(), Box::new(TaskMode)); - modes.insert("macp.mode.handoff.v1".into(), Box::new(HandoffMode)); - modes.insert("macp.mode.quorum.v1".into(), Box::new(QuorumMode)); - modes.insert("macp.mode.multi_round.v1".into(), Box::new(MultiRoundMode)); + Self::with_mode_registry( + storage, + registry, + log_store, + Arc::new(ModeRegistry::build_default()), + ) + } + pub fn with_mode_registry( + storage: Arc, + registry: Arc, + log_store: Arc, + mode_registry: Arc, + ) -> Self { Self { storage, registry, log_store, stream_bus: Arc::new(SessionStreamBus::default()), - modes, + mode_registry, } } pub fn registered_mode_names(&self) -> Vec { - standard_mode_names() - .iter() - .filter(|mode_name| self.modes.contains_key(**mode_name)) - .map(|mode_name| (*mode_name).to_string()) - .collect() + self.mode_registry.standard_mode_names() + } + + pub fn standard_mode_descriptors(&self) -> Vec { + self.mode_registry.standard_mode_descriptors() + } + + pub fn mode_registry(&self) -> &Arc { + &self.mode_registry } pub fn subscribe_session_stream( @@ -164,7 +167,10 @@ impl Runtime { } validate_session_id_for_acceptance(&env.session_id)?; let mode_name = env.mode.as_str(); - let mode = self.modes.get(mode_name).ok_or(MacpError::UnknownMode)?; + let mode = self + .mode_registry + .get_mode(mode_name) + .ok_or(MacpError::UnknownMode)?; let start_payload = if env.payload.is_empty() && !is_standard_mode(mode_name) { SessionStartPayload::default() @@ -307,8 +313,8 @@ impl Runtime { } let mode = self - .modes - .get(&session.mode) + .mode_registry + .get_mode(&session.mode) .ok_or(MacpError::UnknownMode)?; mode.authorize_sender(session, env)?; let response = mode.on_message(session, env)?; diff --git a/src/server.rs b/src/server.rs index 7f2c9bc..f62766f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,4 @@ use macp_runtime::error::MacpError; -use macp_runtime::mode::standard_mode_descriptors; use macp_runtime::pb::macp_runtime_service_server::MacpRuntimeService; use macp_runtime::pb::{ Ack, CancelSessionRequest, CancelSessionResponse, CancellationCapability, Capabilities, @@ -418,7 +417,7 @@ impl MacpRuntimeService for MacpServer { experimental: None, }), supported_modes: self.runtime.registered_mode_names(), - instructions: "Authenticate requests with Authorization: Bearer . Use StreamSession for session-scoped bidirectional streaming of accepted envelopes. For local development only, x-macp-agent-id may be enabled by configuration.".into(), + instructions: "Authenticate requests with Authorization: Bearer . Use the unary Send RPC for all session messaging. For local development only, x-macp-agent-id may be enabled by configuration.".into(), })) } @@ -563,6 +562,7 @@ impl MacpRuntimeService for MacpServer { input_content_types: vec!["application/macp-envelope+proto".into()], output_content_types: vec!["application/macp-envelope+proto".into()], metadata: HashMap::new(), + // Empty: unary-first profile has no dedicated transport endpoints. transport_endpoints: vec![], }), })) @@ -573,7 +573,7 @@ impl MacpRuntimeService for MacpServer { _request: Request, ) -> Result, Status> { Ok(Response::new(ListModesResponse { - modes: standard_mode_descriptors(), + modes: self.runtime.standard_mode_descriptors(), })) } diff --git a/src/session.rs b/src/session.rs index 0e3f6f0..f7e1f07 100644 --- a/src/session.rs +++ b/src/session.rs @@ -121,12 +121,17 @@ pub fn validate_session_id_for_acceptance(session_id: &str) -> Result<(), MacpEr return Err(MacpError::InvalidSessionId); } - // Try UUID parse: must be valid UUID and canonical lowercase hyphenated form + // Try UUID parse: must be valid UUID v4 or v7, canonical lowercase hyphenated form if session_id.len() == 36 && session_id.contains('-') { if let Ok(parsed) = uuid::Uuid::parse_str(session_id) { // Verify it's the canonical lowercase hyphenated representation if parsed.as_hyphenated().to_string() == session_id { - return Ok(()); + match parsed.get_version() { + Some(uuid::Version::Random) | Some(uuid::Version::SortRand) => { + return Ok(()); + } + _ => {} + } } } return Err(MacpError::InvalidSessionId); @@ -309,4 +314,35 @@ mod tests { "InvalidSessionId" ); } + + #[test] + fn valid_uuid_v7_accepted() { + // Construct a v7 UUID by patching the version nibble of a v4 UUID + let v4 = uuid::Uuid::new_v4(); + let mut bytes = *v4.as_bytes(); + // Set version nibble (bits 48-51) to 0b0111 (v7) + bytes[6] = (bytes[6] & 0x0F) | 0x70; + // Keep variant bits valid (RFC 4122: 0b10xx) + bytes[8] = (bytes[8] & 0x3F) | 0x80; + let v7_id = uuid::Uuid::from_bytes(bytes).as_hyphenated().to_string(); + assert!(validate_session_id_for_acceptance(&v7_id).is_ok()); + } + + #[test] + fn uuid_v1_rejected() { + // Construct a v1 UUID by patching the version nibble of a v4 UUID + let v4 = uuid::Uuid::new_v4(); + let mut bytes = *v4.as_bytes(); + // Set version nibble (bits 48-51) to 0b0001 (v1) + bytes[6] = (bytes[6] & 0x0F) | 0x10; + // Keep variant bits valid (RFC 4122: 0b10xx) + bytes[8] = (bytes[8] & 0x3F) | 0x80; + let v1_id = uuid::Uuid::from_bytes(bytes).as_hyphenated().to_string(); + assert_eq!( + validate_session_id_for_acceptance(&v1_id) + .unwrap_err() + .to_string(), + "InvalidSessionId" + ); + } } diff --git a/tests/conformance/decision_happy_path.json b/tests/conformance/decision_happy_path.json new file mode 100644 index 0000000..d882967 --- /dev/null +++ b/tests/conformance/decision_happy_path.json @@ -0,0 +1,41 @@ +{ + "mode": "macp.mode.decision.v1", + "initiator": "agent://orchestrator", + "participants": ["agent://a", "agent://b"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://orchestrator", + "message_type": "Proposal", + "payload_type": "decision.Proposal", + "payload": { "proposal_id": "p1", "option": "deploy", "rationale": "ready", "supporting_data": [] }, + "expect": "accept" + }, + { + "sender": "agent://a", + "message_type": "Vote", + "payload_type": "decision.Vote", + "payload": { "proposal_id": "p1", "vote": "approve", "reason": "good" }, + "expect": "accept" + }, + { + "sender": "agent://orchestrator", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "decision.selected", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "accept" + } + ], + "expected_final_state": "Resolved" +} diff --git a/tests/conformance/handoff_happy_path.json b/tests/conformance/handoff_happy_path.json new file mode 100644 index 0000000..6e341cb --- /dev/null +++ b/tests/conformance/handoff_happy_path.json @@ -0,0 +1,41 @@ +{ + "mode": "macp.mode.handoff.v1", + "initiator": "agent://owner", + "participants": ["agent://owner", "agent://target"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://owner", + "message_type": "HandoffOffer", + "payload_type": "handoff.HandoffOffer", + "payload": { "handoff_id": "h1", "target_participant": "agent://target", "scope": "support", "reason": "escalate" }, + "expect": "accept" + }, + { + "sender": "agent://target", + "message_type": "HandoffAccept", + "payload_type": "handoff.HandoffAccept", + "payload": { "handoff_id": "h1", "accepted_by": "agent://target", "reason": "ready" }, + "expect": "accept" + }, + { + "sender": "agent://owner", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "handoff.accepted", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "accept" + } + ], + "expected_final_state": "Resolved" +} diff --git a/tests/conformance/proposal_happy_path.json b/tests/conformance/proposal_happy_path.json new file mode 100644 index 0000000..61956b5 --- /dev/null +++ b/tests/conformance/proposal_happy_path.json @@ -0,0 +1,48 @@ +{ + "mode": "macp.mode.proposal.v1", + "initiator": "agent://buyer", + "participants": ["agent://buyer", "agent://seller"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://seller", + "message_type": "Proposal", + "payload_type": "proposal.Proposal", + "payload": { "proposal_id": "p1", "title": "offer", "summary": "terms", "details": [], "tags": [] }, + "expect": "accept" + }, + { + "sender": "agent://buyer", + "message_type": "Accept", + "payload_type": "proposal.Accept", + "payload": { "proposal_id": "p1", "reason": "" }, + "expect": "accept" + }, + { + "sender": "agent://seller", + "message_type": "Accept", + "payload_type": "proposal.Accept", + "payload": { "proposal_id": "p1", "reason": "" }, + "expect": "accept" + }, + { + "sender": "agent://buyer", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "proposal.accepted", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "accept" + } + ], + "expected_final_state": "Resolved" +} diff --git a/tests/conformance/quorum_happy_path.json b/tests/conformance/quorum_happy_path.json new file mode 100644 index 0000000..acc38a3 --- /dev/null +++ b/tests/conformance/quorum_happy_path.json @@ -0,0 +1,48 @@ +{ + "mode": "macp.mode.quorum.v1", + "initiator": "agent://coordinator", + "participants": ["agent://alice", "agent://bob", "agent://carol"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://coordinator", + "message_type": "ApprovalRequest", + "payload_type": "quorum.ApprovalRequest", + "payload": { "request_id": "r1", "action": "deploy", "summary": "Deploy v2", "details": [], "required_approvals": 2 }, + "expect": "accept" + }, + { + "sender": "agent://alice", + "message_type": "Approve", + "payload_type": "quorum.Approve", + "payload": { "request_id": "r1", "reason": "lgtm" }, + "expect": "accept" + }, + { + "sender": "agent://bob", + "message_type": "Approve", + "payload_type": "quorum.Approve", + "payload": { "request_id": "r1", "reason": "ship it" }, + "expect": "accept" + }, + { + "sender": "agent://coordinator", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "quorum.approved", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "accept" + } + ], + "expected_final_state": "Resolved" +} diff --git a/tests/conformance/task_happy_path.json b/tests/conformance/task_happy_path.json new file mode 100644 index 0000000..a4ae447 --- /dev/null +++ b/tests/conformance/task_happy_path.json @@ -0,0 +1,48 @@ +{ + "mode": "macp.mode.task.v1", + "initiator": "agent://planner", + "participants": ["agent://planner", "agent://worker"], + "mode_version": "1.0.0", + "configuration_version": "cfg-1", + "policy_version": "policy-1", + "ttl_ms": 60000, + "messages": [ + { + "sender": "agent://planner", + "message_type": "TaskRequest", + "payload_type": "task.TaskRequest", + "payload": { "task_id": "t1", "title": "Build", "instructions": "Do it", "requested_assignee": "agent://worker", "input": [], "deadline_unix_ms": 0 }, + "expect": "accept" + }, + { + "sender": "agent://worker", + "message_type": "TaskAccept", + "payload_type": "task.TaskAccept", + "payload": { "task_id": "t1", "assignee": "agent://worker", "reason": "ready" }, + "expect": "accept" + }, + { + "sender": "agent://worker", + "message_type": "TaskComplete", + "payload_type": "task.TaskComplete", + "payload": { "task_id": "t1", "assignee": "agent://worker", "output": [], "summary": "done" }, + "expect": "accept" + }, + { + "sender": "agent://planner", + "message_type": "Commitment", + "payload_type": "Commitment", + "payload": { + "commitment_id": "c1", + "action": "task.completed", + "authority_scope": "test", + "reason": "done", + "mode_version": "1.0.0", + "policy_version": "policy-1", + "configuration_version": "cfg-1" + }, + "expect": "accept" + } + ], + "expected_final_state": "Resolved" +} diff --git a/tests/conformance_loader.rs b/tests/conformance_loader.rs new file mode 100644 index 0000000..fa78ac5 --- /dev/null +++ b/tests/conformance_loader.rs @@ -0,0 +1,294 @@ +use chrono::Utc; +use macp_runtime::log_store::LogStore; +use macp_runtime::pb::{CommitmentPayload, Envelope, SessionStartPayload}; +use macp_runtime::registry::SessionRegistry; +use macp_runtime::runtime::Runtime; +use macp_runtime::session::SessionState; +use macp_runtime::storage::MemoryBackend; +use prost::Message; +use serde::Deserialize; +use std::path::Path; +use std::sync::Arc; + +#[derive(Deserialize)] +struct ConformanceFixture { + mode: String, + initiator: String, + participants: Vec, + mode_version: String, + configuration_version: String, + policy_version: String, + ttl_ms: i64, + messages: Vec, + expected_final_state: String, +} + +#[derive(Deserialize)] +struct ConformanceMessage { + sender: String, + message_type: String, + payload_type: String, + payload: serde_json::Value, + expect: String, +} + +fn new_sid() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() +} + +fn make_runtime() -> Runtime { + let storage: Arc = Arc::new(MemoryBackend); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + Runtime::new(storage, registry, log_store) +} + +fn encode_payload(fixture: &ConformanceFixture, msg: &ConformanceMessage) -> Vec { + match msg.payload_type.as_str() { + "Commitment" => { + let p = &msg.payload; + CommitmentPayload { + commitment_id: p["commitment_id"].as_str().unwrap_or_default().into(), + action: p["action"].as_str().unwrap_or_default().into(), + authority_scope: p["authority_scope"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + mode_version: p["mode_version"].as_str().unwrap_or_default().into(), + policy_version: p["policy_version"].as_str().unwrap_or_default().into(), + configuration_version: p["configuration_version"] + .as_str() + .unwrap_or_default() + .into(), + } + .encode_to_vec() + } + t if t.starts_with("decision.") => encode_decision_payload(msg), + t if t.starts_with("proposal.") => encode_proposal_payload(msg), + t if t.starts_with("task.") => encode_task_payload(msg), + t if t.starts_with("handoff.") => encode_handoff_payload(msg), + t if t.starts_with("quorum.") => encode_quorum_payload(msg), + _ => panic!( + "Unknown payload_type: {} in fixture for mode {}", + msg.payload_type, fixture.mode + ), + } +} + +fn encode_decision_payload(msg: &ConformanceMessage) -> Vec { + let p = &msg.payload; + match msg.message_type.as_str() { + "Proposal" => macp_runtime::decision_pb::ProposalPayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + option: p["option"].as_str().unwrap_or_default().into(), + rationale: p["rationale"].as_str().unwrap_or_default().into(), + supporting_data: vec![], + } + .encode_to_vec(), + "Vote" => macp_runtime::decision_pb::VotePayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + vote: p["vote"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + _ => panic!("Unhandled decision message: {}", msg.message_type), + } +} + +fn encode_proposal_payload(msg: &ConformanceMessage) -> Vec { + let p = &msg.payload; + match msg.message_type.as_str() { + "Proposal" => macp_runtime::proposal_pb::ProposalPayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + title: p["title"].as_str().unwrap_or_default().into(), + summary: p["summary"].as_str().unwrap_or_default().into(), + details: vec![], + tags: vec![], + } + .encode_to_vec(), + "Accept" => macp_runtime::proposal_pb::AcceptPayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + _ => panic!("Unhandled proposal message: {}", msg.message_type), + } +} + +fn encode_task_payload(msg: &ConformanceMessage) -> Vec { + let p = &msg.payload; + match msg.message_type.as_str() { + "TaskRequest" => macp_runtime::task_pb::TaskRequestPayload { + task_id: p["task_id"].as_str().unwrap_or_default().into(), + title: p["title"].as_str().unwrap_or_default().into(), + instructions: p["instructions"].as_str().unwrap_or_default().into(), + requested_assignee: p["requested_assignee"].as_str().unwrap_or_default().into(), + input: vec![], + deadline_unix_ms: p["deadline_unix_ms"].as_i64().unwrap_or(0), + } + .encode_to_vec(), + "TaskAccept" => macp_runtime::task_pb::TaskAcceptPayload { + task_id: p["task_id"].as_str().unwrap_or_default().into(), + assignee: p["assignee"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + "TaskComplete" => macp_runtime::task_pb::TaskCompletePayload { + task_id: p["task_id"].as_str().unwrap_or_default().into(), + assignee: p["assignee"].as_str().unwrap_or_default().into(), + output: vec![], + summary: p["summary"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + _ => panic!("Unhandled task message: {}", msg.message_type), + } +} + +fn encode_handoff_payload(msg: &ConformanceMessage) -> Vec { + let p = &msg.payload; + match msg.message_type.as_str() { + "HandoffOffer" => macp_runtime::handoff_pb::HandoffOfferPayload { + handoff_id: p["handoff_id"].as_str().unwrap_or_default().into(), + target_participant: p["target_participant"].as_str().unwrap_or_default().into(), + scope: p["scope"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + "HandoffAccept" => macp_runtime::handoff_pb::HandoffAcceptPayload { + handoff_id: p["handoff_id"].as_str().unwrap_or_default().into(), + accepted_by: p["accepted_by"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + _ => panic!("Unhandled handoff message: {}", msg.message_type), + } +} + +fn encode_quorum_payload(msg: &ConformanceMessage) -> Vec { + let p = &msg.payload; + match msg.message_type.as_str() { + "ApprovalRequest" => macp_runtime::quorum_pb::ApprovalRequestPayload { + request_id: p["request_id"].as_str().unwrap_or_default().into(), + action: p["action"].as_str().unwrap_or_default().into(), + summary: p["summary"].as_str().unwrap_or_default().into(), + details: vec![], + required_approvals: p["required_approvals"].as_u64().unwrap_or(0) as u32, + } + .encode_to_vec(), + "Approve" => macp_runtime::quorum_pb::ApprovePayload { + request_id: p["request_id"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + _ => panic!("Unhandled quorum message: {}", msg.message_type), + } +} + +async fn run_conformance_fixture(path: &Path) { + let content = std::fs::read_to_string(path) + .unwrap_or_else(|e| panic!("Failed to read fixture {}: {e}", path.display())); + let fixture: ConformanceFixture = serde_json::from_str(&content) + .unwrap_or_else(|e| panic!("Failed to parse fixture {}: {e}", path.display())); + + let rt = make_runtime(); + let sid = new_sid(); + + // SessionStart + let start_payload = SessionStartPayload { + intent: "conformance".into(), + participants: fixture.participants.clone(), + mode_version: fixture.mode_version.clone(), + configuration_version: fixture.configuration_version.clone(), + policy_version: fixture.policy_version.clone(), + ttl_ms: fixture.ttl_ms, + context: vec![], + roots: vec![], + } + .encode_to_vec(); + + rt.process( + &Envelope { + macp_version: "1.0".into(), + mode: fixture.mode.clone(), + message_type: "SessionStart".into(), + message_id: "m0".into(), + session_id: sid.clone(), + sender: fixture.initiator.clone(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_payload, + }, + None, + ) + .await + .unwrap_or_else(|e| panic!("SessionStart failed for {}: {e}", path.display())); + + // Process messages + for (i, msg) in fixture.messages.iter().enumerate() { + let payload = encode_payload(&fixture, msg); + let env = Envelope { + macp_version: "1.0".into(), + mode: fixture.mode.clone(), + message_type: msg.message_type.clone(), + message_id: format!("m{}", i + 1), + session_id: sid.clone(), + sender: msg.sender.clone(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload, + }; + + let result = rt.process(&env, None).await; + match msg.expect.as_str() { + "accept" => { + result.unwrap_or_else(|e| { + panic!( + "Message {} ({}) expected accept but got error: {e} in {}", + i + 1, + msg.message_type, + path.display() + ) + }); + } + "reject" => { + assert!( + result.is_err(), + "Message {} ({}) expected reject but succeeded in {}", + i + 1, + msg.message_type, + path.display() + ); + } + other => panic!("Unknown expect value: {other}"), + } + } + + // Verify final state + let session = rt.get_session_checked(&sid).await.unwrap(); + let expected_state = match fixture.expected_final_state.as_str() { + "Open" => SessionState::Open, + "Resolved" => SessionState::Resolved, + "Expired" => SessionState::Expired, + other => panic!("Unknown expected_final_state: {other}"), + }; + assert_eq!( + session.state, + expected_state, + "Final state mismatch for {}", + path.display() + ); +} + +macro_rules! conformance_test { + ($name:ident, $file:expr) => { + #[tokio::test] + async fn $name() { + let path = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("tests/conformance") + .join($file); + run_conformance_fixture(&path).await; + } + }; +} + +conformance_test!(conformance_decision_happy_path, "decision_happy_path.json"); +conformance_test!(conformance_proposal_happy_path, "proposal_happy_path.json"); +conformance_test!(conformance_task_happy_path, "task_happy_path.json"); +conformance_test!(conformance_handoff_happy_path, "handoff_happy_path.json"); +conformance_test!(conformance_quorum_happy_path, "quorum_happy_path.json"); diff --git a/tests/integration_mode_lifecycle.rs b/tests/integration_mode_lifecycle.rs new file mode 100644 index 0000000..8219484 --- /dev/null +++ b/tests/integration_mode_lifecycle.rs @@ -0,0 +1,446 @@ +use chrono::Utc; +use macp_runtime::log_store::LogStore; +use macp_runtime::pb::{CommitmentPayload, Envelope, SessionStartPayload}; +use macp_runtime::registry::SessionRegistry; +use macp_runtime::runtime::Runtime; +use macp_runtime::session::SessionState; +use macp_runtime::storage::MemoryBackend; +use prost::Message; +use std::sync::Arc; + +fn new_sid() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() +} + +fn make_runtime() -> Runtime { + let storage: Arc = Arc::new(MemoryBackend); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + Runtime::new(storage, registry, log_store) +} + +fn session_start(participants: Vec) -> Vec { + SessionStartPayload { + intent: "integration-test".into(), + participants, + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), + policy_version: "policy-1".into(), + ttl_ms: 60_000, + context: vec![], + roots: vec![], + } + .encode_to_vec() +} + +fn envelope( + mode: &str, + message_type: &str, + message_id: &str, + session_id: &str, + sender: &str, + payload: Vec, +) -> Envelope { + Envelope { + macp_version: "1.0".into(), + mode: mode.into(), + message_type: message_type.into(), + message_id: message_id.into(), + session_id: session_id.into(), + sender: sender.into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload, + } +} + +fn commitment(action: &str) -> Vec { + CommitmentPayload { + commitment_id: "c1".into(), + action: action.into(), + authority_scope: "test".into(), + reason: "done".into(), + mode_version: "1.0.0".into(), + policy_version: "policy-1".into(), + configuration_version: "cfg-1".into(), + } + .encode_to_vec() +} + +#[tokio::test] +async fn decision_full_lifecycle_through_runtime() { + use macp_runtime::decision_pb::{ProposalPayload, VotePayload}; + + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.decision.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://a".into(), "agent://b".into()]), + ), + None, + ) + .await + .unwrap(); + + let proposal = ProposalPayload { + proposal_id: "p1".into(), + option: "deploy".into(), + rationale: "ready".into(), + supporting_data: vec![], + } + .encode_to_vec(); + rt.process( + &envelope( + mode, + "Proposal", + "m2", + &sid, + "agent://orchestrator", + proposal, + ), + None, + ) + .await + .unwrap(); + + let vote = VotePayload { + proposal_id: "p1".into(), + vote: "approve".into(), + reason: "good".into(), + } + .encode_to_vec(); + rt.process(&envelope(mode, "Vote", "m3", &sid, "agent://a", vote), None) + .await + .unwrap(); + + let result = rt + .process( + &envelope( + mode, + "Commitment", + "m4", + &sid, + "agent://orchestrator", + commitment("decision.selected"), + ), + None, + ) + .await + .unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); +} + +#[tokio::test] +async fn proposal_full_lifecycle_through_runtime() { + use macp_runtime::proposal_pb::{AcceptPayload, ProposalPayload}; + + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.proposal.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://buyer", + session_start(vec!["agent://buyer".into(), "agent://seller".into()]), + ), + None, + ) + .await + .unwrap(); + + let proposal = ProposalPayload { + proposal_id: "p1".into(), + title: "offer".into(), + summary: "terms".into(), + details: vec![], + tags: vec![], + } + .encode_to_vec(); + rt.process( + &envelope(mode, "Proposal", "m2", &sid, "agent://seller", proposal), + None, + ) + .await + .unwrap(); + + let accept = AcceptPayload { + proposal_id: "p1".into(), + reason: String::new(), + } + .encode_to_vec(); + rt.process( + &envelope(mode, "Accept", "m3", &sid, "agent://buyer", accept.clone()), + None, + ) + .await + .unwrap(); + rt.process( + &envelope(mode, "Accept", "m4", &sid, "agent://seller", accept), + None, + ) + .await + .unwrap(); + + let result = rt + .process( + &envelope( + mode, + "Commitment", + "m5", + &sid, + "agent://buyer", + commitment("proposal.accepted"), + ), + None, + ) + .await + .unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); +} + +#[tokio::test] +async fn task_full_lifecycle_through_runtime() { + use macp_runtime::task_pb::{TaskAcceptPayload, TaskCompletePayload, TaskRequestPayload}; + + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.task.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://planner", + session_start(vec!["agent://planner".into(), "agent://worker".into()]), + ), + None, + ) + .await + .unwrap(); + + let request = TaskRequestPayload { + task_id: "t1".into(), + title: "Build widget".into(), + instructions: "Do it".into(), + requested_assignee: "agent://worker".into(), + input: vec![], + deadline_unix_ms: 0, + } + .encode_to_vec(); + rt.process( + &envelope(mode, "TaskRequest", "m2", &sid, "agent://planner", request), + None, + ) + .await + .unwrap(); + + let accept = TaskAcceptPayload { + task_id: "t1".into(), + assignee: "agent://worker".into(), + reason: "ready".into(), + } + .encode_to_vec(); + rt.process( + &envelope(mode, "TaskAccept", "m3", &sid, "agent://worker", accept), + None, + ) + .await + .unwrap(); + + let complete = TaskCompletePayload { + task_id: "t1".into(), + assignee: "agent://worker".into(), + output: b"result".to_vec(), + summary: "done".into(), + } + .encode_to_vec(); + rt.process( + &envelope(mode, "TaskComplete", "m4", &sid, "agent://worker", complete), + None, + ) + .await + .unwrap(); + + let result = rt + .process( + &envelope( + mode, + "Commitment", + "m5", + &sid, + "agent://planner", + commitment("task.completed"), + ), + None, + ) + .await + .unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); +} + +#[tokio::test] +async fn handoff_full_lifecycle_through_runtime() { + use macp_runtime::handoff_pb::{HandoffAcceptPayload, HandoffOfferPayload}; + + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.handoff.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://owner", + session_start(vec!["agent://owner".into(), "agent://target".into()]), + ), + None, + ) + .await + .unwrap(); + + let offer = HandoffOfferPayload { + handoff_id: "h1".into(), + target_participant: "agent://target".into(), + scope: "support".into(), + reason: "escalate".into(), + } + .encode_to_vec(); + rt.process( + &envelope(mode, "HandoffOffer", "m2", &sid, "agent://owner", offer), + None, + ) + .await + .unwrap(); + + let accept = HandoffAcceptPayload { + handoff_id: "h1".into(), + accepted_by: "agent://target".into(), + reason: "ready".into(), + } + .encode_to_vec(); + rt.process( + &envelope(mode, "HandoffAccept", "m3", &sid, "agent://target", accept), + None, + ) + .await + .unwrap(); + + let result = rt + .process( + &envelope( + mode, + "Commitment", + "m4", + &sid, + "agent://owner", + commitment("handoff.accepted"), + ), + None, + ) + .await + .unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); +} + +#[tokio::test] +async fn quorum_full_lifecycle_through_runtime() { + use macp_runtime::quorum_pb::{ApprovalRequestPayload, ApprovePayload}; + + let rt = make_runtime(); + let sid = new_sid(); + let mode = "macp.mode.quorum.v1"; + + rt.process( + &envelope( + mode, + "SessionStart", + "m1", + &sid, + "agent://coordinator", + session_start(vec![ + "agent://alice".into(), + "agent://bob".into(), + "agent://carol".into(), + ]), + ), + None, + ) + .await + .unwrap(); + + let request = ApprovalRequestPayload { + request_id: "r1".into(), + action: "deploy.production".into(), + summary: "Deploy v2".into(), + details: vec![], + required_approvals: 2, + } + .encode_to_vec(); + rt.process( + &envelope( + mode, + "ApprovalRequest", + "m2", + &sid, + "agent://coordinator", + request, + ), + None, + ) + .await + .unwrap(); + + let approve1 = ApprovePayload { + request_id: "r1".into(), + reason: "lgtm".into(), + } + .encode_to_vec(); + rt.process( + &envelope(mode, "Approve", "m3", &sid, "agent://alice", approve1), + None, + ) + .await + .unwrap(); + + let approve2 = ApprovePayload { + request_id: "r1".into(), + reason: "ship it".into(), + } + .encode_to_vec(); + rt.process( + &envelope(mode, "Approve", "m4", &sid, "agent://bob", approve2), + None, + ) + .await + .unwrap(); + + let result = rt + .process( + &envelope( + mode, + "Commitment", + "m5", + &sid, + "agent://coordinator", + commitment("quorum.approved"), + ), + None, + ) + .await + .unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); +} diff --git a/tests/replay_round_trip.rs b/tests/replay_round_trip.rs new file mode 100644 index 0000000..7b6423e --- /dev/null +++ b/tests/replay_round_trip.rs @@ -0,0 +1,332 @@ +use macp_runtime::log_store::{EntryKind, LogEntry}; +use macp_runtime::mode_registry::ModeRegistry; +use macp_runtime::pb::{CommitmentPayload, SessionStartPayload}; +use macp_runtime::replay::replay_session; +use macp_runtime::session::SessionState; +use prost::Message; + +fn make_registry() -> ModeRegistry { + ModeRegistry::build_default() +} + +fn start_payload(participants: Vec<&str>) -> Vec { + SessionStartPayload { + intent: "replay-test".into(), + participants: participants.into_iter().map(String::from).collect(), + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), + policy_version: "policy-1".into(), + ttl_ms: 60_000, + context: vec![], + roots: vec![], + } + .encode_to_vec() +} + +fn incoming( + id: &str, + msg_type: &str, + sender: &str, + payload: Vec, + mode: &str, + ts: i64, +) -> LogEntry { + LogEntry { + message_id: id.into(), + received_at_ms: ts, + sender: sender.into(), + message_type: msg_type.into(), + raw_payload: payload, + entry_kind: EntryKind::Incoming, + session_id: "s1".into(), + mode: mode.into(), + macp_version: "1.0".into(), + } +} + +fn commitment(action: &str) -> Vec { + CommitmentPayload { + commitment_id: "c1".into(), + action: action.into(), + authority_scope: "test".into(), + reason: "done".into(), + mode_version: "1.0.0".into(), + policy_version: "policy-1".into(), + configuration_version: "cfg-1".into(), + } + .encode_to_vec() +} + +#[test] +fn replay_proposal_session() { + use macp_runtime::proposal_pb::{AcceptPayload, ProposalPayload}; + + let registry = make_registry(); + let mode = "macp.mode.proposal.v1"; + + let entries = vec![ + incoming( + "m1", + "SessionStart", + "agent://buyer", + start_payload(vec!["agent://buyer", "agent://seller"]), + mode, + 1000, + ), + incoming( + "m2", + "Proposal", + "agent://seller", + ProposalPayload { + proposal_id: "p1".into(), + title: "offer".into(), + summary: "terms".into(), + details: vec![], + tags: vec![], + } + .encode_to_vec(), + mode, + 2000, + ), + incoming( + "m3", + "Accept", + "agent://buyer", + AcceptPayload { + proposal_id: "p1".into(), + reason: String::new(), + } + .encode_to_vec(), + mode, + 3000, + ), + incoming( + "m4", + "Accept", + "agent://seller", + AcceptPayload { + proposal_id: "p1".into(), + reason: String::new(), + } + .encode_to_vec(), + mode, + 4000, + ), + incoming( + "m5", + "Commitment", + "agent://buyer", + commitment("proposal.accepted"), + mode, + 5000, + ), + ]; + + let session = replay_session("s1", &entries, ®istry).unwrap(); + assert_eq!(session.state, SessionState::Resolved); + assert!(session.resolution.is_some()); + assert_eq!(session.seen_message_ids.len(), 5); +} + +#[test] +fn replay_task_session() { + use macp_runtime::task_pb::{TaskAcceptPayload, TaskCompletePayload, TaskRequestPayload}; + + let registry = make_registry(); + let mode = "macp.mode.task.v1"; + + let entries = vec![ + incoming( + "m1", + "SessionStart", + "agent://planner", + start_payload(vec!["agent://planner", "agent://worker"]), + mode, + 1000, + ), + incoming( + "m2", + "TaskRequest", + "agent://planner", + TaskRequestPayload { + task_id: "t1".into(), + title: "Build".into(), + instructions: "Do it".into(), + requested_assignee: "agent://worker".into(), + input: vec![], + deadline_unix_ms: 0, + } + .encode_to_vec(), + mode, + 2000, + ), + incoming( + "m3", + "TaskAccept", + "agent://worker", + TaskAcceptPayload { + task_id: "t1".into(), + assignee: "agent://worker".into(), + reason: "ready".into(), + } + .encode_to_vec(), + mode, + 3000, + ), + incoming( + "m4", + "TaskComplete", + "agent://worker", + TaskCompletePayload { + task_id: "t1".into(), + assignee: "agent://worker".into(), + output: b"result".to_vec(), + summary: "done".into(), + } + .encode_to_vec(), + mode, + 4000, + ), + incoming( + "m5", + "Commitment", + "agent://planner", + commitment("task.completed"), + mode, + 5000, + ), + ]; + + let session = replay_session("s1", &entries, ®istry).unwrap(); + assert_eq!(session.state, SessionState::Resolved); + assert!(session.resolution.is_some()); +} + +#[test] +fn replay_handoff_session() { + use macp_runtime::handoff_pb::{HandoffAcceptPayload, HandoffOfferPayload}; + + let registry = make_registry(); + let mode = "macp.mode.handoff.v1"; + + let entries = vec![ + incoming( + "m1", + "SessionStart", + "agent://owner", + start_payload(vec!["agent://owner", "agent://target"]), + mode, + 1000, + ), + incoming( + "m2", + "HandoffOffer", + "agent://owner", + HandoffOfferPayload { + handoff_id: "h1".into(), + target_participant: "agent://target".into(), + scope: "support".into(), + reason: "escalate".into(), + } + .encode_to_vec(), + mode, + 2000, + ), + incoming( + "m3", + "HandoffAccept", + "agent://target", + HandoffAcceptPayload { + handoff_id: "h1".into(), + accepted_by: "agent://target".into(), + reason: "ready".into(), + } + .encode_to_vec(), + mode, + 3000, + ), + incoming( + "m4", + "Commitment", + "agent://owner", + commitment("handoff.accepted"), + mode, + 4000, + ), + ]; + + let session = replay_session("s1", &entries, ®istry).unwrap(); + assert_eq!(session.state, SessionState::Resolved); + assert!(session.resolution.is_some()); +} + +#[test] +fn replay_quorum_session() { + use macp_runtime::quorum_pb::{ApprovalRequestPayload, ApprovePayload}; + + let registry = make_registry(); + let mode = "macp.mode.quorum.v1"; + + let entries = vec![ + incoming( + "m1", + "SessionStart", + "agent://coordinator", + start_payload(vec!["agent://alice", "agent://bob", "agent://carol"]), + mode, + 1000, + ), + incoming( + "m2", + "ApprovalRequest", + "agent://coordinator", + ApprovalRequestPayload { + request_id: "r1".into(), + action: "deploy".into(), + summary: "Deploy v2".into(), + details: vec![], + required_approvals: 2, + } + .encode_to_vec(), + mode, + 2000, + ), + incoming( + "m3", + "Approve", + "agent://alice", + ApprovePayload { + request_id: "r1".into(), + reason: "lgtm".into(), + } + .encode_to_vec(), + mode, + 3000, + ), + incoming( + "m4", + "Approve", + "agent://bob", + ApprovePayload { + request_id: "r1".into(), + reason: "ship it".into(), + } + .encode_to_vec(), + mode, + 4000, + ), + incoming( + "m5", + "Commitment", + "agent://coordinator", + commitment("quorum.approved"), + mode, + 5000, + ), + ]; + + let session = replay_session("s1", &entries, ®istry).unwrap(); + assert_eq!(session.state, SessionState::Resolved); + assert!(session.resolution.is_some()); + assert_eq!(session.seen_message_ids.len(), 5); +}