From 0df5446f47063242240097e03a171a80fd7906d9 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Fri, 6 Mar 2026 10:52:42 -0800 Subject: [PATCH] Format the code --- Cargo.toml | 3 + build.rs | 11 +- proto/macp/modes/decision/v1/decision.proto | 31 + proto/macp/v1/core.proto | 241 ++++ proto/macp/v1/envelope.proto | 41 + proto/macp/v1/macp.proto | 42 - src/bin/client.rs | 105 +- src/bin/fuzz_client.rs | 339 +++++- src/bin/multi_round_client.rs | 64 +- src/error.rs | 86 ++ src/lib.rs | 4 + src/main.rs | 4 +- src/mode/decision.rs | 633 +++++++++- src/mode/multi_round.rs | 110 +- src/runtime.rs | 771 +++++++++++- src/server.rs | 1165 ++++++++++++++++++- src/session.rs | 185 ++- 17 files changed, 3433 insertions(+), 402 deletions(-) create mode 100644 proto/macp/modes/decision/v1/decision.proto create mode 100644 proto/macp/v1/core.proto create mode 100644 proto/macp/v1/envelope.proto delete mode 100644 proto/macp/v1/macp.proto diff --git a/Cargo.toml b/Cargo.toml index bd7a008..2ad501e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,9 @@ thiserror = "1" chrono = "0.4" serde = { version = "1", features = ["derive"] } serde_json = "1" +tokio-stream = "0.1" +futures-core = "0.3" +async-stream = "0.3" [build-dependencies] tonic-build = "0.11" diff --git a/build.rs b/build.rs index 1404cba..4e65bd0 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,11 @@ fn main() -> Result<(), Box> { - tonic_build::configure() - .build_server(true) - .compile(&["macp/v1/macp.proto"], &["proto"])?; + tonic_build::configure().build_server(true).compile( + &[ + "macp/v1/envelope.proto", + "macp/v1/core.proto", + "macp/modes/decision/v1/decision.proto", + ], + &["proto"], + )?; Ok(()) } diff --git a/proto/macp/modes/decision/v1/decision.proto b/proto/macp/modes/decision/v1/decision.proto new file mode 100644 index 0000000..82a28c6 --- /dev/null +++ b/proto/macp/modes/decision/v1/decision.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package macp.modes.decision.v1; + +message ProposalPayload { + string proposal_id = 1; + string option = 2; + string rationale = 3; + bytes supporting_data = 4; +} + +message EvaluationPayload { + string proposal_id = 1; + string recommendation = 2; // APPROVE | REVIEW | BLOCK | REJECT + double confidence = 3; + string reason = 4; +} + +message ObjectionPayload { + string proposal_id = 1; + string reason = 2; + string severity = 3; // low | medium | high | critical +} + +message VotePayload { + string proposal_id = 1; + string vote = 2; // approve | reject | abstain + string reason = 3; +} + +// Decision Mode typically reuses macp.v1.CommitmentPayload for terminal results. diff --git a/proto/macp/v1/core.proto b/proto/macp/v1/core.proto new file mode 100644 index 0000000..b1eff25 --- /dev/null +++ b/proto/macp/v1/core.proto @@ -0,0 +1,241 @@ +syntax = "proto3"; + +package macp.v1; + +import "macp/v1/envelope.proto"; + +message Root { + string uri = 1; + string name = 2; +} + +message ClientInfo { + string name = 1; + string title = 2; + string version = 3; + string description = 4; + string website_url = 5; +} + +message RuntimeInfo { + string name = 1; + string title = 2; + string version = 3; + string description = 4; + string website_url = 5; +} + +message SessionsCapability { + bool stream = 1; +} + +message CancellationCapability { + bool cancel_session = 1; +} + +message ProgressCapability { + bool progress = 1; +} + +message ManifestCapability { + bool get_manifest = 1; +} + +message ModeRegistryCapability { + bool list_modes = 1; + bool list_changed = 2; +} + +message RootsCapability { + bool list_roots = 1; + bool list_changed = 2; +} + +message ExperimentalCapabilities { + map features = 1; +} + +message Capabilities { + SessionsCapability sessions = 1; + CancellationCapability cancellation = 2; + ProgressCapability progress = 3; + ManifestCapability manifest = 4; + ModeRegistryCapability mode_registry = 5; + RootsCapability roots = 6; + ExperimentalCapabilities experimental = 100; +} + +message InitializeRequest { + repeated string supported_protocol_versions = 1; + ClientInfo client_info = 2; + Capabilities capabilities = 3; +} + +message InitializeResponse { + string selected_protocol_version = 1; + RuntimeInfo runtime_info = 2; + Capabilities capabilities = 3; + repeated string supported_modes = 4; + string instructions = 5; +} + +message SignalPayload { + string signal_type = 1; + bytes data = 2; + double confidence = 3; + string correlation_session_id = 4; +} + +message ProgressPayload { + string progress_token = 1; + double progress = 2; + double total = 3; + string message = 4; + string target_message_id = 5; +} + +message SessionStartPayload { + string intent = 1; + repeated string participants = 2; + string mode_version = 3; + string configuration_version = 4; + string policy_version = 5; + int64 ttl_ms = 6; + bytes context = 7; + repeated Root roots = 8; +} + +message SessionCancelPayload { + string reason = 1; + string cancelled_by = 2; +} + +message CommitmentPayload { + string commitment_id = 1; + string action = 2; + string authority_scope = 3; + string reason = 4; + string mode_version = 5; + string policy_version = 6; + string configuration_version = 7; +} + +message SessionMetadata { + string session_id = 1; + string mode = 2; + SessionState state = 3; + int64 started_at_unix_ms = 4; + int64 expires_at_unix_ms = 5; + string mode_version = 6; + string configuration_version = 7; + string policy_version = 8; +} + +message GetSessionRequest { + string session_id = 1; +} + +message CancelSessionRequest { + string session_id = 1; + string reason = 2; +} + +message GetManifestRequest { + string agent_id = 1; +} + +message AgentManifest { + string agent_id = 1; + string title = 2; + string description = 3; + repeated string supported_modes = 4; + repeated string input_content_types = 5; + repeated string output_content_types = 6; + map metadata = 7; +} + +message ModeDescriptor { + string mode = 1; + string mode_version = 2; + string title = 3; + string description = 4; + string determinism_class = 5; + string participant_model = 6; + repeated string message_types = 7; + repeated string terminal_message_types = 8; + map schema_uris = 9; +} + +message ListModesRequest {} + +message ListModesResponse { + repeated ModeDescriptor modes = 1; +} + +message ListRootsRequest {} + +message ListRootsResponse { + repeated Root roots = 1; +} + +message WatchModeRegistryRequest {} + +message WatchRootsRequest {} + +message RegistryChanged { + string registry = 1; + int64 observed_at_unix_ms = 2; +} + +message RootsChanged { + int64 observed_at_unix_ms = 1; +} + +message SendRequest { + Envelope envelope = 1; +} + +message SendResponse { + Ack ack = 1; +} + +message StreamSessionRequest { + Envelope envelope = 1; +} + +message StreamSessionResponse { + Envelope envelope = 1; +} + +message GetSessionResponse { + SessionMetadata metadata = 1; +} + +message CancelSessionResponse { + Ack ack = 1; +} + +message GetManifestResponse { + AgentManifest manifest = 1; +} + +message WatchModeRegistryResponse { + RegistryChanged change = 1; +} + +message WatchRootsResponse { + RootsChanged change = 1; +} + +service MACPRuntimeService { + rpc Initialize(InitializeRequest) returns (InitializeResponse); + rpc Send(SendRequest) returns (SendResponse); + rpc StreamSession(stream StreamSessionRequest) returns (stream StreamSessionResponse); + rpc GetSession(GetSessionRequest) returns (GetSessionResponse); + rpc CancelSession(CancelSessionRequest) returns (CancelSessionResponse); + rpc GetManifest(GetManifestRequest) returns (GetManifestResponse); + rpc ListModes(ListModesRequest) returns (ListModesResponse); + rpc WatchModeRegistry(WatchModeRegistryRequest) returns (stream WatchModeRegistryResponse); + rpc ListRoots(ListRootsRequest) returns (ListRootsResponse); + rpc WatchRoots(WatchRootsRequest) returns (stream WatchRootsResponse); +} diff --git a/proto/macp/v1/envelope.proto b/proto/macp/v1/envelope.proto new file mode 100644 index 0000000..df9b1dd --- /dev/null +++ b/proto/macp/v1/envelope.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +package macp.v1; + +// Canonical MACP envelope and generic acknowledgement/error shapes. + +message Envelope { + string macp_version = 1; + string mode = 2; + string message_type = 3; + string message_id = 4; + string session_id = 5; // empty for ambient messages + string sender = 6; + int64 timestamp_unix_ms = 7; // informational only + bytes payload = 8; +} + +message MACPError { + string code = 1; + string message = 2; + string session_id = 3; + string message_id = 4; + bytes details = 5; +} + +enum SessionState { + SESSION_STATE_UNSPECIFIED = 0; + SESSION_STATE_OPEN = 1; + SESSION_STATE_RESOLVED = 2; + SESSION_STATE_EXPIRED = 3; +} + +message Ack { + bool ok = 1; + bool duplicate = 2; + string message_id = 3; + string session_id = 4; + int64 accepted_at_unix_ms = 5; + SessionState session_state = 6; + MACPError error = 7; +} diff --git a/proto/macp/v1/macp.proto b/proto/macp/v1/macp.proto deleted file mode 100644 index 794156c..0000000 --- a/proto/macp/v1/macp.proto +++ /dev/null @@ -1,42 +0,0 @@ -syntax = "proto3"; - -package macp.v1; - -message Envelope { - string macp_version = 1; - string mode = 2; - string message_type = 3; - string message_id = 4; - string session_id = 5; - string sender = 6; - int64 timestamp_unix_ms = 7; - bytes payload = 8; -} - -message SendMessageRequest { - Envelope envelope = 1; -} - -message SendMessageResponse { - bool accepted = 1; - string error = 2; -} - -message GetSessionRequest { - string session_id = 1; -} - -message GetSessionResponse { - string session_id = 1; - string mode = 2; - string state = 3; - int64 ttl_expiry = 4; - bytes resolution = 5; - bytes mode_state = 6; - repeated string participants = 7; -} - -service MACPService { - rpc SendMessage(SendMessageRequest) returns (SendMessageResponse); - rpc GetSession(GetSessionRequest) returns (GetSessionResponse); -} diff --git a/src/bin/client.rs b/src/bin/client.rs index a1d1621..4024cd6 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -1,14 +1,41 @@ -use macp_runtime::pb::macp_service_client::MacpServiceClient; -use macp_runtime::pb::{Envelope, SendMessageRequest}; +use macp_runtime::pb::macp_runtime_service_client::MacpRuntimeServiceClient; +use macp_runtime::pb::{ + Envelope, GetSessionRequest, InitializeRequest, ListModesRequest, SendRequest, +}; #[tokio::main] async fn main() -> Result<(), Box> { - // Server address from main.rs - let mut client = MacpServiceClient::connect("http://127.0.0.1:50051").await?; + let mut client = MacpRuntimeServiceClient::connect("http://127.0.0.1:50051").await?; - // 1) SessionStart + // 1) Initialize — negotiate protocol version + let init_resp = client + .initialize(InitializeRequest { + supported_protocol_versions: vec!["1.0".into()], + client_info: None, + capabilities: None, + }) + .await? + .into_inner(); + println!( + "Initialize: version={} runtime={}", + init_resp.selected_protocol_version, + init_resp + .runtime_info + .as_ref() + .map(|r| r.name.as_str()) + .unwrap_or("?") + ); + + // 2) ListModes — discover available modes + let modes_resp = client.list_modes(ListModesRequest {}).await?.into_inner(); + println!( + "ListModes: {:?}", + modes_resp.modes.iter().map(|m| &m.mode).collect::>() + ); + + // 3) SessionStart let start = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "decision".into(), message_type: "SessionStart".into(), message_id: "m1".into(), @@ -19,19 +46,22 @@ async fn main() -> Result<(), Box> { }; let ack = client - .send_message(SendMessageRequest { + .send(SendRequest { envelope: Some(start), }) .await? - .into_inner(); + .into_inner() + .ack + .unwrap(); println!( - "SessionStart ack: accepted={} error='{}'", - ack.accepted, ack.error + "SessionStart ack: ok={} error={:?}", + ack.ok, + ack.error.as_ref().map(|e| &e.code) ); - // 2) Normal message + // 4) Normal message let msg = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "decision".into(), message_type: "Message".into(), message_id: "m2".into(), @@ -42,19 +72,22 @@ async fn main() -> Result<(), Box> { }; let ack = client - .send_message(SendMessageRequest { + .send(SendRequest { envelope: Some(msg), }) .await? - .into_inner(); + .into_inner() + .ack + .unwrap(); println!( - "Message ack: accepted={} error='{}'", - ack.accepted, ack.error + "Message ack: ok={} error={:?}", + ack.ok, + ack.error.as_ref().map(|e| &e.code) ); - // 3) Resolve message (DecisionMode resolves when payload == "resolve") + // 5) Resolve message (DecisionMode resolves when payload == "resolve") let resolve = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "decision".into(), message_type: "Message".into(), message_id: "m3".into(), @@ -65,19 +98,22 @@ async fn main() -> Result<(), Box> { }; let ack = client - .send_message(SendMessageRequest { + .send(SendRequest { envelope: Some(resolve), }) .await? - .into_inner(); + .into_inner() + .ack + .unwrap(); println!( - "Resolve ack: accepted={} error='{}'", - ack.accepted, ack.error + "Resolve ack: ok={} error={:?}", + ack.ok, + ack.error.as_ref().map(|e| &e.code) ); - // 4) Message after resolve (should be rejected: SessionNotOpen) + // 6) Message after resolve (should be rejected: SessionNotOpen) let after = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "decision".into(), message_type: "Message".into(), message_id: "m4".into(), @@ -88,15 +124,28 @@ async fn main() -> Result<(), Box> { }; let ack = client - .send_message(SendMessageRequest { + .send(SendRequest { envelope: Some(after), }) .await? - .into_inner(); + .into_inner() + .ack + .unwrap(); println!( - "After-resolve ack: accepted={} error='{}'", - ack.accepted, ack.error + "After-resolve ack: ok={} error={:?}", + ack.ok, + ack.error.as_ref().map(|e| &e.code) ); + // 7) GetSession — verify resolved state + let resp = client + .get_session(GetSessionRequest { + session_id: "s1".into(), + }) + .await? + .into_inner(); + let meta = resp.metadata.unwrap(); + println!("GetSession: state={} mode={}", meta.state, meta.mode); + Ok(()) } diff --git a/src/bin/fuzz_client.rs b/src/bin/fuzz_client.rs index 508deef..cd517fd 100644 --- a/src/bin/fuzz_client.rs +++ b/src/bin/fuzz_client.rs @@ -1,5 +1,9 @@ -use macp_runtime::pb::macp_service_client::MacpServiceClient; -use macp_runtime::pb::{Envelope, SendMessageRequest}; +use macp_runtime::pb::macp_runtime_service_client::MacpRuntimeServiceClient; +use macp_runtime::pb::{ + CancelSessionRequest, Envelope, GetManifestRequest, GetSessionRequest, InitializeRequest, + ListModesRequest, ListRootsRequest, SendRequest, SessionStartPayload, +}; +use prost::Message; use tokio::time::{sleep, Duration}; #[allow(clippy::too_many_arguments)] @@ -25,12 +29,33 @@ fn env( } } -async fn send(client: &mut MacpServiceClient, label: &str, e: Envelope) { - let req = SendMessageRequest { envelope: Some(e) }; - match client.send_message(req).await { +fn encode_session_start(ttl_ms: i64, participants: Vec) -> Vec { + let payload = SessionStartPayload { + intent: String::new(), + participants, + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), + ttl_ms, + context: vec![], + roots: vec![], + }; + payload.encode_to_vec() +} + +async fn send( + client: &mut MacpRuntimeServiceClient, + label: &str, + e: Envelope, +) { + match client.send(SendRequest { envelope: Some(e) }).await { Ok(resp) => { - let ack = resp.into_inner(); - println!("[{label}] accepted={} error='{}'", ack.accepted, ack.error); + let ack = resp.into_inner().ack.unwrap(); + let err_code = ack.error.as_ref().map(|e| e.code.as_str()).unwrap_or(""); + println!( + "[{label}] ok={} duplicate={} error='{}'", + ack.ok, ack.duplicate, err_code + ); } Err(status) => { println!("[{label}] grpc error: {status}"); @@ -40,9 +65,38 @@ async fn send(client: &mut MacpServiceClient, label: #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = MacpServiceClient::connect("http://127.0.0.1:50051").await?; + let mut client = MacpRuntimeServiceClient::connect("http://127.0.0.1:50051").await?; - // --- 1) Wrong MACP version (should reject InvalidMacpVersion) + // --- 0) Initialize + match client + .initialize(InitializeRequest { + supported_protocol_versions: vec!["1.0".into()], + client_info: None, + capabilities: None, + }) + .await + { + Ok(resp) => { + let init = resp.into_inner(); + println!("[initialize] version={}", init.selected_protocol_version); + } + Err(status) => println!("[initialize] error: {status}"), + } + + // --- 0b) Initialize with bad version + match client + .initialize(InitializeRequest { + supported_protocol_versions: vec!["2.0".into()], + client_info: None, + capabilities: None, + }) + .await + { + Ok(_) => println!("[initialize_bad_version] unexpected success"), + Err(status) => println!("[initialize_bad_version] error: {status}"), + } + + // --- 1) Wrong MACP version (should reject) send( &mut client, "wrong_version", @@ -60,12 +114,11 @@ async fn main() -> Result<(), Box> { .await; // --- 2) Missing required fields (should reject InvalidEnvelope) - // message_id empty + sender empty + ts <= 0 send( &mut client, "missing_fields", env( - "v1", + "1.0", "decision", "SessionStart", "", @@ -82,7 +135,7 @@ async fn main() -> Result<(), Box> { &mut client, "unknown_session_message", env( - "v1", + "1.0", "decision", "Message", "m_unknown", @@ -99,7 +152,7 @@ async fn main() -> Result<(), Box> { &mut client, "session_start_ok", env( - "v1", + "1.0", "decision", "SessionStart", "m1", @@ -111,12 +164,12 @@ async fn main() -> Result<(), Box> { ) .await; - // --- 5) Duplicate SessionStart (should reject DuplicateSession) + // --- 5) Duplicate SessionStart (should reject → INVALID_ENVELOPE) send( &mut client, "session_start_duplicate", env( - "v1", + "1.0", "decision", "SessionStart", "m1_dup", @@ -128,12 +181,29 @@ async fn main() -> Result<(), Box> { ) .await; + // --- 5b) Duplicate SessionStart with SAME message_id (idempotent) + send( + &mut client, + "session_start_idempotent", + env( + "1.0", + "decision", + "SessionStart", + "m1", + "s1", + "ajit", + 1_700_000_000_104, + b"", + ), + ) + .await; + // --- 6) Valid Message (should accept) send( &mut client, "message_ok", env( - "v1", + "1.0", "decision", "Message", "m2", @@ -145,12 +215,29 @@ async fn main() -> Result<(), Box> { ) .await; + // --- 6b) Duplicate message (same message_id) + send( + &mut client, + "message_duplicate", + env( + "1.0", + "decision", + "Message", + "m2", + "s1", + "ajit", + 1_700_000_000_105, + b"hello", + ), + ) + .await; + // --- 7) Resolve (payload == "resolve" => session becomes RESOLVED) send( &mut client, "resolve", env( - "v1", + "1.0", "decision", "Message", "m3", @@ -162,12 +249,12 @@ async fn main() -> Result<(), Box> { ) .await; - // --- 8) Message after resolved (should reject SessionNotOpen) + // --- 8) Message after resolved (should reject SESSION_NOT_OPEN) send( &mut client, "after_resolve", env( - "v1", + "1.0", "decision", "Message", "m4", @@ -179,19 +266,20 @@ async fn main() -> Result<(), Box> { ) .await; - // --- 9) TTL Expiry: SessionStart with short TTL, wait, then send message + // --- 9) TTL Expiry + let ttl_payload = encode_session_start(1000, vec![]); send( &mut client, "ttl_session_start", env( - "v1", + "1.0", "decision", "SessionStart", "m_ttl1", "s_ttl", "ajit", 1_700_000_000_200, - br#"{"ttl_ms":1000}"#, + &ttl_payload, ), ) .await; @@ -202,7 +290,7 @@ async fn main() -> Result<(), Box> { &mut client, "ttl_expired_message", env( - "v1", + "1.0", "decision", "Message", "m_ttl2", @@ -215,68 +303,54 @@ async fn main() -> Result<(), Box> { .await; // --- 10) Invalid TTL values - send( - &mut client, - "invalid_ttl_zero", - env( - "v1", - "decision", - "SessionStart", - "m_bad_ttl1", - "s_bad_ttl1", - "ajit", - 1_700_000_000_300, - br#"{"ttl_ms":0}"#, - ), - ) - .await; - + let bad_ttl = encode_session_start(-5000, vec![]); send( &mut client, "invalid_ttl_negative", env( - "v1", + "1.0", "decision", "SessionStart", "m_bad_ttl2", "s_bad_ttl2", "ajit", 1_700_000_000_301, - br#"{"ttl_ms":-5000}"#, + &bad_ttl, ), ) .await; + let bad_ttl = encode_session_start(86_400_001, vec![]); send( &mut client, "invalid_ttl_exceeds_max", env( - "v1", + "1.0", "decision", "SessionStart", "m_bad_ttl3", "s_bad_ttl3", "ajit", 1_700_000_000_302, - br#"{"ttl_ms":86400001}"#, + &bad_ttl, ), ) .await; // --- 11) Multi-round convergence test - let mr_payload = r#"{"participants":["alice","bob"],"convergence":{"type":"all_equal"}}"#; + let mr_payload = encode_session_start(0, vec!["alice".into(), "bob".into()]); send( &mut client, "multi_round_start", env( - "v1", + "1.0", "multi_round", "SessionStart", "m_mr0", "s_mr", "creator", 1_700_000_000_400, - mr_payload.as_bytes(), + &mr_payload, ), ) .await; @@ -285,7 +359,7 @@ async fn main() -> Result<(), Box> { &mut client, "multi_round_alice", env( - "v1", + "1.0", "multi_round", "Contribute", "m_mr1", @@ -296,12 +370,11 @@ async fn main() -> Result<(), Box> { ), ) .await; - send( &mut client, "multi_round_bob_diff", env( - "v1", + "1.0", "multi_round", "Contribute", "m_mr2", @@ -312,12 +385,11 @@ async fn main() -> Result<(), Box> { ), ) .await; - send( &mut client, "multi_round_bob_converge", env( - "v1", + "1.0", "multi_round", "Contribute", "m_mr3", @@ -328,12 +400,11 @@ async fn main() -> Result<(), Box> { ), ) .await; - send( &mut client, "multi_round_after_resolve", env( - "v1", + "1.0", "multi_round", "Contribute", "m_mr4", @@ -345,5 +416,167 @@ async fn main() -> Result<(), Box> { ) .await; + // --- 12) CancelSession + send( + &mut client, + "cancel_session_start", + env( + "1.0", + "decision", + "SessionStart", + "m_c1", + "s_cancel", + "ajit", + 1_700_000_000_500, + b"", + ), + ) + .await; + + match client + .cancel_session(CancelSessionRequest { + session_id: "s_cancel".into(), + reason: "test cancellation".into(), + }) + .await + { + Ok(resp) => { + let ack = resp.into_inner().ack.unwrap(); + println!("[cancel_session] ok={}", ack.ok); + } + Err(status) => println!("[cancel_session] error: {status}"), + } + + send( + &mut client, + "after_cancel", + env( + "1.0", + "decision", + "Message", + "m_c2", + "s_cancel", + "ajit", + 1_700_000_000_501, + b"should_fail", + ), + ) + .await; + + // --- 13) Participant validation + let p_payload = encode_session_start(0, vec!["alice".into(), "bob".into()]); + send( + &mut client, + "participant_session_start", + env( + "1.0", + "decision", + "SessionStart", + "m_p1", + "s_participant", + "alice", + 1_700_000_000_600, + &p_payload, + ), + ) + .await; + send( + &mut client, + "unauthorized_sender", + env( + "1.0", + "decision", + "Message", + "m_p2", + "s_participant", + "charlie", + 1_700_000_000_601, + b"hello", + ), + ) + .await; + send( + &mut client, + "authorized_sender", + env( + "1.0", + "decision", + "Message", + "m_p3", + "s_participant", + "alice", + 1_700_000_000_602, + b"hello", + ), + ) + .await; + + // --- 14) Signal + send( + &mut client, + "signal", + env( + "1.0", + "", + "Signal", + "sig1", + "", + "alice", + 1_700_000_000_700, + b"", + ), + ) + .await; + + // --- 15) GetSession + match client + .get_session(GetSessionRequest { + session_id: "s1".into(), + }) + .await + { + Ok(resp) => { + let meta = resp.into_inner().metadata.unwrap(); + println!("[get_session] state={} mode={}", meta.state, meta.mode); + } + Err(status) => println!("[get_session] error: {status}"), + } + + // --- 16) ListModes + match client.list_modes(ListModesRequest {}).await { + Ok(resp) => { + let modes = resp.into_inner().modes; + println!( + "[list_modes] count={} modes={:?}", + modes.len(), + modes.iter().map(|m| &m.mode).collect::>() + ); + } + Err(status) => println!("[list_modes] error: {status}"), + } + + // --- 17) GetManifest + match client + .get_manifest(GetManifestRequest { + agent_id: String::new(), + }) + .await + { + Ok(resp) => { + let manifest = resp.into_inner().manifest.unwrap(); + println!( + "[get_manifest] agent_id={} modes={:?}", + manifest.agent_id, manifest.supported_modes + ); + } + Err(status) => println!("[get_manifest] error: {status}"), + } + + // --- 18) ListRoots + match client.list_roots(ListRootsRequest {}).await { + Ok(resp) => println!("[list_roots] count={}", resp.into_inner().roots.len()), + Err(status) => println!("[list_roots] error: {status}"), + } + Ok(()) } diff --git a/src/bin/multi_round_client.rs b/src/bin/multi_round_client.rs index 00265ae..d8f4cd3 100644 --- a/src/bin/multi_round_client.rs +++ b/src/bin/multi_round_client.rs @@ -1,12 +1,17 @@ -use macp_runtime::pb::macp_service_client::MacpServiceClient; -use macp_runtime::pb::{Envelope, GetSessionRequest, SendMessageRequest}; +use macp_runtime::pb::macp_runtime_service_client::MacpRuntimeServiceClient; +use macp_runtime::pb::{Envelope, GetSessionRequest, SendRequest, SessionStartPayload}; +use prost::Message; -async fn send(client: &mut MacpServiceClient, label: &str, e: Envelope) { - let req = SendMessageRequest { envelope: Some(e) }; - match client.send_message(req).await { +async fn send( + client: &mut MacpRuntimeServiceClient, + label: &str, + e: Envelope, +) { + match client.send(SendRequest { envelope: Some(e) }).await { Ok(resp) => { - let ack = resp.into_inner(); - println!("[{label}] accepted={} error='{}'", ack.accepted, ack.error); + let ack = resp.into_inner().ack.unwrap(); + let err_code = ack.error.as_ref().map(|e| e.code.as_str()).unwrap_or(""); + println!("[{label}] ok={} error='{}'", ack.ok, err_code); } Err(status) => { println!("[{label}] grpc error: {status}"); @@ -16,28 +21,33 @@ async fn send(client: &mut MacpServiceClient, label: #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = MacpServiceClient::connect("http://127.0.0.1:50051").await?; + let mut client = MacpRuntimeServiceClient::connect("http://127.0.0.1:50051").await?; println!("=== Multi-Round Convergence Demo ===\n"); // 1) SessionStart with multi_round mode - let payload = serde_json::json!({ - "participants": ["alice", "bob"], - "convergence": {"type": "all_equal"}, - "ttl_ms": 60000 - }); + let start_payload = SessionStartPayload { + intent: "convergence test".into(), + ttl_ms: 60000, + participants: vec!["alice".into(), "bob".into()], + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), + context: vec![], + roots: vec![], + }; send( &mut client, "session_start", Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "SessionStart".into(), message_id: "m0".into(), session_id: "mr1".into(), sender: "coordinator".into(), timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), - payload: payload.to_string().into_bytes(), + payload: start_payload.encode_to_vec(), }, ) .await; @@ -47,7 +57,7 @@ async fn main() -> Result<(), Box> { &mut client, "alice_contributes_a", Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "Contribute".into(), message_id: "m1".into(), @@ -64,7 +74,7 @@ async fn main() -> Result<(), Box> { &mut client, "bob_contributes_b", Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "Contribute".into(), message_id: "m2".into(), @@ -84,21 +94,18 @@ async fn main() -> Result<(), Box> { .await { Ok(resp) => { - let info = resp.into_inner(); - println!( - "[get_session] state={} mode={} participants={:?}", - info.state, info.mode, info.participants - ); + let meta = resp.into_inner().metadata.unwrap(); + println!("[get_session] state={} mode={}", meta.state, meta.mode); } Err(status) => println!("[get_session] error: {status}"), } - // 4) Bob revises to "option_a" (convergence → auto-resolved) + // 4) Bob revises to "option_a" (convergence -> auto-resolved) send( &mut client, "bob_revises_to_a", Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "Contribute".into(), message_id: "m3".into(), @@ -118,11 +125,10 @@ async fn main() -> Result<(), Box> { .await { Ok(resp) => { - let info = resp.into_inner(); + let meta = resp.into_inner().metadata.unwrap(); println!( - "[get_session] state={} resolution={}", - info.state, - String::from_utf8_lossy(&info.resolution) + "[get_session] state={} mode_version={}", + meta.state, meta.mode_version ); } Err(status) => println!("[get_session] error: {status}"), @@ -133,7 +139,7 @@ async fn main() -> Result<(), Box> { &mut client, "after_convergence", Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "Contribute".into(), message_id: "m4".into(), diff --git a/src/error.rs b/src/error.rs index 2b4940c..bb3a364 100644 --- a/src/error.rs +++ b/src/error.rs @@ -22,4 +22,90 @@ pub enum MacpError { InvalidModeState, #[error("InvalidPayload")] InvalidPayload, + #[error("Forbidden")] + Forbidden, + #[error("Unauthenticated")] + Unauthenticated, + #[error("DuplicateMessage")] + DuplicateMessage, + #[error("PayloadTooLarge")] + PayloadTooLarge, + #[error("RateLimited")] + RateLimited, +} + +impl MacpError { + /// Returns the RFC error code string for this error variant. + pub fn error_code(&self) -> &'static str { + match self { + MacpError::InvalidMacpVersion => "UNSUPPORTED_PROTOCOL_VERSION", + MacpError::InvalidEnvelope => "INVALID_ENVELOPE", + MacpError::DuplicateSession => "INVALID_ENVELOPE", + MacpError::UnknownSession => "SESSION_NOT_FOUND", + MacpError::SessionNotOpen => "SESSION_NOT_OPEN", + MacpError::TtlExpired => "SESSION_NOT_OPEN", + MacpError::InvalidTtl => "INVALID_ENVELOPE", + MacpError::UnknownMode => "MODE_NOT_SUPPORTED", + MacpError::InvalidModeState => "INVALID_ENVELOPE", + MacpError::InvalidPayload => "INVALID_ENVELOPE", + MacpError::Forbidden => "FORBIDDEN", + MacpError::Unauthenticated => "UNAUTHENTICATED", + MacpError::DuplicateMessage => "DUPLICATE_MESSAGE", + MacpError::PayloadTooLarge => "PAYLOAD_TOO_LARGE", + MacpError::RateLimited => "RATE_LIMITED", + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn error_code_mapping_covers_all_variants() { + let cases: Vec<(MacpError, &str)> = vec![ + ( + MacpError::InvalidMacpVersion, + "UNSUPPORTED_PROTOCOL_VERSION", + ), + (MacpError::InvalidEnvelope, "INVALID_ENVELOPE"), + (MacpError::DuplicateSession, "INVALID_ENVELOPE"), + (MacpError::UnknownSession, "SESSION_NOT_FOUND"), + (MacpError::SessionNotOpen, "SESSION_NOT_OPEN"), + (MacpError::TtlExpired, "SESSION_NOT_OPEN"), + (MacpError::InvalidTtl, "INVALID_ENVELOPE"), + (MacpError::UnknownMode, "MODE_NOT_SUPPORTED"), + (MacpError::InvalidModeState, "INVALID_ENVELOPE"), + (MacpError::InvalidPayload, "INVALID_ENVELOPE"), + (MacpError::Forbidden, "FORBIDDEN"), + (MacpError::Unauthenticated, "UNAUTHENTICATED"), + (MacpError::DuplicateMessage, "DUPLICATE_MESSAGE"), + (MacpError::PayloadTooLarge, "PAYLOAD_TOO_LARGE"), + (MacpError::RateLimited, "RATE_LIMITED"), + ]; + + for (error, expected_code) in cases { + assert_eq!( + error.error_code(), + expected_code, + "error_code() mismatch for {:?}", + error + ); + assert!(!error.to_string().is_empty()); + } + } + + #[test] + fn display_matches_variant_name() { + assert_eq!( + MacpError::InvalidMacpVersion.to_string(), + "InvalidMacpVersion" + ); + assert_eq!(MacpError::Forbidden.to_string(), "Forbidden"); + assert_eq!(MacpError::TtlExpired.to_string(), "TtlExpired"); + assert_eq!(MacpError::Unauthenticated.to_string(), "Unauthenticated"); + assert_eq!(MacpError::DuplicateMessage.to_string(), "DuplicateMessage"); + assert_eq!(MacpError::PayloadTooLarge.to_string(), "PayloadTooLarge"); + assert_eq!(MacpError::RateLimited.to_string(), "RateLimited"); + } } diff --git a/src/lib.rs b/src/lib.rs index 57e9c24..85b47d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,10 @@ pub mod pb { tonic::include_proto!("macp.v1"); } +pub mod decision_pb { + tonic::include_proto!("macp.modes.decision.v1"); +} + pub mod error; pub mod log_store; pub mod mode; diff --git a/src/main.rs b/src/main.rs index 67901f8..6719ff6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,10 +17,10 @@ async fn main() -> Result<(), Box> { let runtime = Arc::new(Runtime::new(registry, log_store)); let svc = MacpServer::new(runtime); - println!("macp-runtime v0.1 listening on {}", addr); + println!("macp-runtime v0.2 (RFC-0001) listening on {}", addr); Server::builder() - .add_service(pb::macp_service_server::MacpServiceServer::new(svc)) + .add_service(pb::macp_runtime_service_server::MacpRuntimeServiceServer::new(svc)) .serve(addr) .await?; diff --git a/src/mode/decision.rs b/src/mode/decision.rs index 263dff4..4879a43 100644 --- a/src/mode/decision.rs +++ b/src/mode/decision.rs @@ -2,51 +2,262 @@ use crate::error::MacpError; use crate::mode::{Mode, ModeResponse}; use crate::pb::Envelope; use crate::session::Session; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; -/// DecisionMode wraps the original `payload == b"resolve"` behavior. -/// This preserves backward compatibility for existing clients. +/// Phase of the decision lifecycle. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum DecisionPhase { + Proposal, + Evaluation, + Voting, + Committed, +} + +/// Internal state tracked across the decision lifecycle. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DecisionState { + pub proposals: HashMap, + pub evaluations: Vec, + pub objections: Vec, + pub votes: HashMap, + pub phase: DecisionPhase, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Proposal { + pub proposal_id: String, + pub option: String, + pub rationale: String, + pub sender: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Evaluation { + pub proposal_id: String, + pub recommendation: String, + pub confidence: f64, + pub reason: String, + pub sender: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Objection { + pub proposal_id: String, + pub reason: String, + pub severity: String, + pub sender: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Vote { + pub proposal_id: String, + pub vote: String, + pub reason: String, + pub sender: String, +} + +/// DecisionMode implements the RFC-compliant Proposal -> Evaluation -> Vote -> Commitment lifecycle. +/// Also supports the legacy `payload == b"resolve"` behavior for backward compatibility. pub struct DecisionMode; +impl DecisionMode { + fn encode_state(state: &DecisionState) -> Vec { + serde_json::to_vec(state).expect("DecisionState is always serializable") + } + + fn decode_state(data: &[u8]) -> Result { + serde_json::from_slice(data).map_err(|_| MacpError::InvalidModeState) + } +} + impl Mode for DecisionMode { fn on_session_start( &self, _session: &Session, _env: &Envelope, ) -> Result { - Ok(ModeResponse::NoOp) + let state = DecisionState { + proposals: HashMap::new(), + evaluations: Vec::new(), + objections: Vec::new(), + votes: HashMap::new(), + phase: DecisionPhase::Proposal, + }; + Ok(ModeResponse::PersistState(Self::encode_state(&state))) } - fn on_message(&self, _session: &Session, env: &Envelope) -> Result { - if env.payload == b"resolve" { - Ok(ModeResponse::Resolve(env.payload.clone())) + fn on_message(&self, session: &Session, env: &Envelope) -> Result { + // Legacy backward compatibility: payload == "resolve" resolves immediately + if env.message_type == "Message" && env.payload == b"resolve" { + return Ok(ModeResponse::Resolve(env.payload.clone())); + } + + // For non-typed messages, just pass through + match env.message_type.as_str() { + "Proposal" | "Evaluation" | "Objection" | "Vote" | "Commitment" => {} + _ => return Ok(ModeResponse::NoOp), + } + + let mut state = if session.mode_state.is_empty() { + DecisionState { + proposals: HashMap::new(), + evaluations: Vec::new(), + objections: Vec::new(), + votes: HashMap::new(), + phase: DecisionPhase::Proposal, + } } else { - Ok(ModeResponse::NoOp) + Self::decode_state(&session.mode_state)? + }; + + if state.phase == DecisionPhase::Committed { + return Err(MacpError::SessionNotOpen); + } + + match env.message_type.as_str() { + "Proposal" => { + let payload: ProposalInput = + serde_json::from_slice(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + if payload.proposal_id.is_empty() { + return Err(MacpError::InvalidPayload); + } + state.proposals.insert( + payload.proposal_id.clone(), + Proposal { + proposal_id: payload.proposal_id, + option: payload.option, + rationale: payload.rationale.unwrap_or_default(), + sender: env.sender.clone(), + }, + ); + state.phase = DecisionPhase::Evaluation; + Ok(ModeResponse::PersistState(Self::encode_state(&state))) + } + "Evaluation" => { + let payload: EvaluationInput = + serde_json::from_slice(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + if !state.proposals.contains_key(&payload.proposal_id) { + return Err(MacpError::InvalidPayload); + } + state.evaluations.push(Evaluation { + proposal_id: payload.proposal_id, + recommendation: payload.recommendation, + confidence: payload.confidence.unwrap_or(0.0), + reason: payload.reason.unwrap_or_default(), + sender: env.sender.clone(), + }); + Ok(ModeResponse::PersistState(Self::encode_state(&state))) + } + "Objection" => { + let payload: ObjectionInput = + serde_json::from_slice(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + if !state.proposals.contains_key(&payload.proposal_id) { + return Err(MacpError::InvalidPayload); + } + state.objections.push(Objection { + proposal_id: payload.proposal_id, + reason: payload.reason, + severity: payload.severity.unwrap_or_else(|| "medium".into()), + sender: env.sender.clone(), + }); + Ok(ModeResponse::PersistState(Self::encode_state(&state))) + } + "Vote" => { + if state.proposals.is_empty() { + return Err(MacpError::InvalidPayload); + } + let payload: VoteInput = + serde_json::from_slice(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + if !state.proposals.contains_key(&payload.proposal_id) { + return Err(MacpError::InvalidPayload); + } + state.votes.insert( + env.sender.clone(), + Vote { + proposal_id: payload.proposal_id, + vote: payload.vote, + reason: payload.reason.unwrap_or_default(), + sender: env.sender.clone(), + }, + ); + state.phase = DecisionPhase::Voting; + Ok(ModeResponse::PersistState(Self::encode_state(&state))) + } + "Commitment" => { + if state.votes.is_empty() { + return Err(MacpError::InvalidPayload); + } + state.phase = DecisionPhase::Committed; + Ok(ModeResponse::PersistAndResolve { + state: Self::encode_state(&state), + resolution: env.payload.clone(), + }) + } + _ => Ok(ModeResponse::NoOp), } } } +// Input types for JSON deserialization from payload +#[derive(Deserialize)] +struct ProposalInput { + proposal_id: String, + option: String, + rationale: Option, +} + +#[derive(Deserialize)] +struct EvaluationInput { + proposal_id: String, + recommendation: String, + confidence: Option, + reason: Option, +} + +#[derive(Deserialize)] +struct ObjectionInput { + proposal_id: String, + reason: String, + severity: Option, +} + +#[derive(Deserialize)] +struct VoteInput { + proposal_id: String, + vote: String, + reason: Option, +} + #[cfg(test)] mod tests { use super::*; use crate::session::SessionState; + use std::collections::HashSet; fn test_session() -> Session { Session { session_id: "s1".into(), state: SessionState::Open, ttl_expiry: i64::MAX, + started_at_unix_ms: 0, resolution: None, mode: "decision".into(), mode_state: vec![], participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), } } - fn test_envelope(payload: &[u8]) -> Envelope { + fn test_envelope(message_type: &str, payload: &[u8]) -> Envelope { Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "decision".into(), - message_type: "Message".into(), + message_type: message_type.into(), message_id: "m1".into(), session_id: "s1".into(), sender: "test".into(), @@ -55,42 +266,412 @@ mod tests { } } + fn session_with_state(state: &DecisionState) -> Session { + let mut s = test_session(); + s.mode_state = DecisionMode::encode_state(state); + s + } + + fn empty_state() -> DecisionState { + DecisionState { + proposals: HashMap::new(), + evaluations: Vec::new(), + objections: Vec::new(), + votes: HashMap::new(), + phase: DecisionPhase::Proposal, + } + } + + fn state_with_proposal() -> DecisionState { + let mut state = empty_state(); + state.proposals.insert( + "p1".into(), + Proposal { + proposal_id: "p1".into(), + option: "option_a".into(), + rationale: "because".into(), + sender: "alice".into(), + }, + ); + state.phase = DecisionPhase::Evaluation; + state + } + + fn state_with_vote() -> DecisionState { + let mut state = state_with_proposal(); + state.votes.insert( + "alice".into(), + Vote { + proposal_id: "p1".into(), + vote: "approve".into(), + reason: String::new(), + sender: "alice".into(), + }, + ); + state.phase = DecisionPhase::Voting; + state + } + #[test] - fn session_start_returns_noop() { + fn session_start_initializes_state() { let mode = DecisionMode; let session = test_session(); - let env = Envelope { - macp_version: "v1".into(), - mode: "decision".into(), - message_type: "SessionStart".into(), - message_id: "m1".into(), - session_id: "s1".into(), - sender: "test".into(), - timestamp_unix_ms: 1_700_000_000_000, - payload: vec![], - }; + let env = test_envelope("SessionStart", b""); let result = mode.on_session_start(&session, &env).unwrap(); - assert!(matches!(result, ModeResponse::NoOp)); + match result { + ModeResponse::PersistState(data) => { + let state: DecisionState = serde_json::from_slice(&data).unwrap(); + assert_eq!(state.phase, DecisionPhase::Proposal); + assert!(state.proposals.is_empty()); + } + _ => panic!("Expected PersistState"), + } } + // --- Legacy backward compatibility --- + #[test] - fn resolve_payload_returns_resolve() { + fn legacy_resolve_payload_still_works() { let mode = DecisionMode; let session = test_session(); - let env = test_envelope(b"resolve"); + let env = test_envelope("Message", b"resolve"); let result = mode.on_message(&session, &env).unwrap(); assert!(matches!(result, ModeResponse::Resolve(_))); } #[test] - fn other_payload_returns_noop() { + fn other_message_payload_returns_noop() { let mode = DecisionMode; let session = test_session(); - let env = test_envelope(b"hello world"); + let env = test_envelope("Message", b"hello world"); let result = mode.on_message(&session, &env).unwrap(); assert!(matches!(result, ModeResponse::NoOp)); } + + // --- Proposal --- + + #[test] + fn proposal_creates_entry_and_advances_phase() { + let mode = DecisionMode; + let session = session_with_state(&empty_state()); + let payload = serde_json::json!({ + "proposal_id": "p1", + "option": "option_a", + "rationale": "it's the best" + }); + let env = test_envelope("Proposal", payload.to_string().as_bytes()); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let state: DecisionState = serde_json::from_slice(&data).unwrap(); + assert_eq!(state.phase, DecisionPhase::Evaluation); + assert!(state.proposals.contains_key("p1")); + let p = &state.proposals["p1"]; + assert_eq!(p.option, "option_a"); + assert_eq!(p.sender, "test"); + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn proposal_with_empty_id_rejected() { + let mode = DecisionMode; + let session = session_with_state(&empty_state()); + let payload = serde_json::json!({ + "proposal_id": "", + "option": "opt" + }); + let env = test_envelope("Proposal", payload.to_string().as_bytes()); + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn proposal_with_bad_json_rejected() { + let mode = DecisionMode; + let session = session_with_state(&empty_state()); + let env = test_envelope("Proposal", b"not json"); + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + // --- Evaluation --- + + #[test] + fn evaluation_for_existing_proposal() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + let payload = serde_json::json!({ + "proposal_id": "p1", + "recommendation": "APPROVE", + "confidence": 0.9, + "reason": "looks good" + }); + let env = test_envelope("Evaluation", payload.to_string().as_bytes()); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let state: DecisionState = serde_json::from_slice(&data).unwrap(); + assert_eq!(state.evaluations.len(), 1); + assert_eq!(state.evaluations[0].recommendation, "APPROVE"); + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn evaluation_for_nonexistent_proposal_rejected() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + let payload = serde_json::json!({ + "proposal_id": "nonexistent", + "recommendation": "APPROVE" + }); + let env = test_envelope("Evaluation", payload.to_string().as_bytes()); + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + // --- Objection --- + + #[test] + fn objection_for_existing_proposal() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + let payload = serde_json::json!({ + "proposal_id": "p1", + "reason": "too risky", + "severity": "high" + }); + let env = test_envelope("Objection", payload.to_string().as_bytes()); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let state: DecisionState = serde_json::from_slice(&data).unwrap(); + assert_eq!(state.objections.len(), 1); + assert_eq!(state.objections[0].severity, "high"); + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn objection_for_nonexistent_proposal_rejected() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + let payload = serde_json::json!({ + "proposal_id": "nope", + "reason": "bad" + }); + let env = test_envelope("Objection", payload.to_string().as_bytes()); + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + // --- Vote --- + + #[test] + fn vote_for_existing_proposal() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + let payload = serde_json::json!({ + "proposal_id": "p1", + "vote": "approve", + "reason": "I agree" + }); + let env = test_envelope("Vote", payload.to_string().as_bytes()); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let state: DecisionState = serde_json::from_slice(&data).unwrap(); + assert_eq!(state.phase, DecisionPhase::Voting); + assert!(state.votes.contains_key("test")); + assert_eq!(state.votes["test"].vote, "approve"); + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn vote_before_proposal_rejected() { + let mode = DecisionMode; + let session = session_with_state(&empty_state()); + let payload = serde_json::json!({ + "proposal_id": "p1", + "vote": "approve" + }); + let env = test_envelope("Vote", payload.to_string().as_bytes()); + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn vote_for_nonexistent_proposal_rejected() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + let payload = serde_json::json!({ + "proposal_id": "nope", + "vote": "approve" + }); + let env = test_envelope("Vote", payload.to_string().as_bytes()); + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn vote_overwrites_previous_vote_by_sender() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + let payload = serde_json::json!({ + "proposal_id": "p1", + "vote": "approve" + }); + let env = test_envelope("Vote", payload.to_string().as_bytes()); + let result = mode.on_message(&session, &env).unwrap(); + let data = match result { + ModeResponse::PersistState(d) => d, + _ => panic!("Expected PersistState"), + }; + + // Second vote by same sender + let mut session2 = test_session(); + session2.mode_state = data; + let payload2 = serde_json::json!({ + "proposal_id": "p1", + "vote": "reject" + }); + let env2 = test_envelope("Vote", payload2.to_string().as_bytes()); + let result2 = mode.on_message(&session2, &env2).unwrap(); + match result2 { + ModeResponse::PersistState(data) => { + let state: DecisionState = serde_json::from_slice(&data).unwrap(); + assert_eq!(state.votes.len(), 1); + assert_eq!(state.votes["test"].vote, "reject"); + } + _ => panic!("Expected PersistState"), + } + } + + // --- Commitment --- + + #[test] + fn commitment_resolves_session() { + let mode = DecisionMode; + let session = session_with_state(&state_with_vote()); + let payload = serde_json::json!({ + "commitment_id": "c1", + "action": "deploy option_a", + "authority_scope": "team-alpha" + }); + let env = test_envelope("Commitment", payload.to_string().as_bytes()); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistAndResolve { state, resolution } => { + let final_state: DecisionState = serde_json::from_slice(&state).unwrap(); + assert_eq!(final_state.phase, DecisionPhase::Committed); + assert!(!resolution.is_empty()); + } + _ => panic!("Expected PersistAndResolve"), + } + } + + #[test] + fn commitment_without_votes_rejected() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + let payload = serde_json::json!({ + "commitment_id": "c1", + "action": "deploy" + }); + let env = test_envelope("Commitment", payload.to_string().as_bytes()); + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + // --- Full lifecycle --- + + #[test] + fn full_decision_lifecycle() { + let mode = DecisionMode; + let mut session = test_session(); + + // SessionStart + let env = test_envelope("SessionStart", b""); + let result = mode.on_session_start(&session, &env).unwrap(); + if let ModeResponse::PersistState(data) = result { + session.mode_state = data; + } + + // Proposal + let payload = serde_json::json!({ + "proposal_id": "p1", + "option": "option_a", + "rationale": "best choice" + }); + let env = test_envelope("Proposal", payload.to_string().as_bytes()); + let result = mode.on_message(&session, &env).unwrap(); + if let ModeResponse::PersistState(data) = result { + session.mode_state = data; + } + + // Evaluation + let payload = serde_json::json!({ + "proposal_id": "p1", + "recommendation": "APPROVE", + "confidence": 0.95 + }); + let env = test_envelope("Evaluation", payload.to_string().as_bytes()); + let result = mode.on_message(&session, &env).unwrap(); + if let ModeResponse::PersistState(data) = result { + session.mode_state = data; + } + + // Vote + let payload = serde_json::json!({ + "proposal_id": "p1", + "vote": "approve", + "reason": "agreed" + }); + let env = test_envelope("Vote", payload.to_string().as_bytes()); + let result = mode.on_message(&session, &env).unwrap(); + if let ModeResponse::PersistState(data) = result { + session.mode_state = data; + } + + // Commitment + let payload = serde_json::json!({ + "commitment_id": "c1", + "action": "deploy option_a", + "authority_scope": "team", + "reason": "consensus reached" + }); + let env = test_envelope("Commitment", payload.to_string().as_bytes()); + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::PersistAndResolve { .. })); + } + + #[test] + fn message_after_commitment_rejected() { + let mode = DecisionMode; + let mut state = state_with_vote(); + state.phase = DecisionPhase::Committed; + let session = session_with_state(&state); + + let payload = serde_json::json!({ + "proposal_id": "p1", + "option": "option_b" + }); + let env = test_envelope("Proposal", payload.to_string().as_bytes()); + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "SessionNotOpen"); + } } diff --git a/src/mode/multi_round.rs b/src/mode/multi_round.rs index 9b06dd5..4329941 100644 --- a/src/mode/multi_round.rs +++ b/src/mode/multi_round.rs @@ -5,27 +5,14 @@ use crate::session::Session; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -/// Convergence strategy configuration. -#[derive(Debug, Clone, Deserialize)] -pub struct ConvergenceConfig { - #[serde(rename = "type")] - pub convergence_type: String, -} - -/// SessionStart payload for multi_round mode. -#[derive(Debug, Clone, Deserialize)] -pub struct MultiRoundConfig { - pub participants: Vec, - pub convergence: ConvergenceConfig, - pub ttl_ms: Option, -} - /// Internal state tracked across rounds. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MultiRoundState { pub round: u64, pub participants: Vec, pub contributions: BTreeMap, + #[serde(default)] + pub convergence_type: String, } /// Payload for Contribute messages. @@ -58,25 +45,21 @@ impl MultiRoundMode { impl Mode for MultiRoundMode { fn on_session_start( &self, - _session: &Session, - env: &Envelope, + session: &Session, + _env: &Envelope, ) -> Result { - let text = std::str::from_utf8(&env.payload).map_err(|_| MacpError::InvalidPayload)?; - let config: MultiRoundConfig = - serde_json::from_str(text).map_err(|_| MacpError::InvalidPayload)?; - - if config.participants.is_empty() { - return Err(MacpError::InvalidPayload); - } + // Participants come from the runtime via SessionStartPayload + let participants = session.participants.clone(); - if config.convergence.convergence_type != "all_equal" { + if participants.is_empty() { return Err(MacpError::InvalidPayload); } let state = MultiRoundState { round: 0, - participants: config.participants, + participants, contributions: BTreeMap::new(), + convergence_type: "all_equal".into(), }; Ok(ModeResponse::PersistState(Self::encode_state(&state))) @@ -138,36 +121,43 @@ impl Mode for MultiRoundMode { mod tests { use super::*; use crate::session::SessionState; + use std::collections::HashSet; fn base_session() -> Session { Session { session_id: "s1".into(), state: SessionState::Open, ttl_expiry: i64::MAX, + started_at_unix_ms: 0, resolution: None, mode: "multi_round".into(), mode_state: vec![], participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), } } - fn session_start_env(payload: &str) -> Envelope { + fn session_start_env() -> Envelope { Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "SessionStart".into(), message_id: "m0".into(), session_id: "s1".into(), sender: "creator".into(), timestamp_unix_ms: 1_700_000_000_000, - payload: payload.as_bytes().to_vec(), + payload: vec![], } } fn contribute_env(sender: &str, value: &str) -> Envelope { let payload = serde_json::json!({"value": value}).to_string(); Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "Contribute".into(), message_id: format!("m_{}", sender), @@ -181,15 +171,16 @@ mod tests { fn session_with_state(state: &MultiRoundState) -> Session { let mut s = base_session(); s.mode_state = MultiRoundMode::encode_state(state); + s.participants = state.participants.clone(); s } #[test] fn session_start_parses_valid_config() { let mode = MultiRoundMode; - let session = base_session(); - let payload = r#"{"participants":["alice","bob"],"convergence":{"type":"all_equal"}}"#; - let env = session_start_env(payload); + let mut session = base_session(); + session.participants = vec!["alice".into(), "bob".into()]; + let env = session_start_env(); let result = mode.on_session_start(&session, &env).unwrap(); match result { @@ -203,45 +194,11 @@ mod tests { } } - #[test] - fn session_start_with_ttl() { - let mode = MultiRoundMode; - let session = base_session(); - let payload = - r#"{"participants":["alice"],"convergence":{"type":"all_equal"},"ttl_ms":5000}"#; - let env = session_start_env(payload); - - let result = mode.on_session_start(&session, &env).unwrap(); - assert!(matches!(result, ModeResponse::PersistState(_))); - } - #[test] fn session_start_rejects_empty_participants() { let mode = MultiRoundMode; - let session = base_session(); - let payload = r#"{"participants":[],"convergence":{"type":"all_equal"}}"#; - let env = session_start_env(payload); - - let err = mode.on_session_start(&session, &env).unwrap_err(); - assert_eq!(err.to_string(), "InvalidPayload"); - } - - #[test] - fn session_start_rejects_unknown_convergence() { - let mode = MultiRoundMode; - let session = base_session(); - let payload = r#"{"participants":["alice"],"convergence":{"type":"majority"}}"#; - let env = session_start_env(payload); - - let err = mode.on_session_start(&session, &env).unwrap_err(); - assert_eq!(err.to_string(), "InvalidPayload"); - } - - #[test] - fn session_start_rejects_invalid_json() { - let mode = MultiRoundMode; - let session = base_session(); - let env = session_start_env("not json"); + let session = base_session(); // empty participants + let env = session_start_env(); let err = mode.on_session_start(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); @@ -254,6 +211,7 @@ mod tests { round: 0, participants: vec!["alice".into(), "bob".into()], contributions: BTreeMap::new(), + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = contribute_env("alice", "option_a"); @@ -278,6 +236,7 @@ mod tests { round: 1, participants: vec!["alice".into(), "bob".into()], contributions, + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = contribute_env("alice", "option_a"); @@ -301,6 +260,7 @@ mod tests { round: 1, participants: vec!["alice".into(), "bob".into()], contributions, + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = contribute_env("alice", "option_b"); @@ -325,6 +285,7 @@ mod tests { round: 1, participants: vec!["alice".into(), "bob".into()], contributions, + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = contribute_env("bob", "option_a"); @@ -357,6 +318,7 @@ mod tests { round: 1, participants: vec!["alice".into(), "bob".into()], contributions, + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = contribute_env("bob", "option_b"); @@ -372,6 +334,7 @@ mod tests { round: 0, participants: vec!["alice".into(), "bob".into(), "carol".into()], contributions: BTreeMap::new(), + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = contribute_env("alice", "option_a"); @@ -387,10 +350,11 @@ mod tests { round: 0, participants: vec!["alice".into()], contributions: BTreeMap::new(), + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "Message".into(), message_id: "m1".into(), @@ -411,10 +375,11 @@ mod tests { round: 0, participants: vec!["alice".into()], contributions: BTreeMap::new(), + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "multi_round".into(), message_type: "Contribute".into(), message_id: "m1".into(), @@ -436,6 +401,7 @@ mod tests { round: 5, participants: vec!["alice".into(), "bob".into()], contributions, + convergence_type: "all_equal".into(), }; let encoded = MultiRoundMode::encode_state(&original); @@ -456,7 +422,6 @@ mod tests { fn three_participant_convergence() { let mode = MultiRoundMode; - // Alice and Bob already contributed "option_a" let mut contributions = BTreeMap::new(); contributions.insert("alice".to_string(), "option_a".to_string()); contributions.insert("bob".to_string(), "option_a".to_string()); @@ -464,6 +429,7 @@ mod tests { round: 2, participants: vec!["alice".into(), "bob".into(), "carol".into()], contributions, + convergence_type: "all_equal".into(), }; let session = session_with_state(&state); let env = contribute_env("carol", "option_a"); diff --git a/src/runtime.rs b/src/runtime.rs index 797692b..ecccb85 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,5 +1,5 @@ use chrono::Utc; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::error::MacpError; @@ -9,7 +9,14 @@ use crate::mode::multi_round::MultiRoundMode; use crate::mode::{Mode, ModeResponse}; use crate::pb::Envelope; use crate::registry::SessionRegistry; -use crate::session::{parse_session_start_ttl_ms, Session, SessionState}; +use crate::session::{extract_ttl_ms, parse_session_start_payload, Session, SessionState}; + +/// Result of processing a message through the runtime. +#[derive(Debug)] +pub struct ProcessResult { + pub session_state: SessionState, + pub duplicate: bool, +} pub struct Runtime { pub registry: Arc, @@ -20,6 +27,10 @@ pub struct Runtime { impl Runtime { pub fn new(registry: Arc, log_store: Arc) -> Self { let mut modes: HashMap> = HashMap::new(); + // RFC-compliant names + modes.insert("macp.mode.decision.v1".into(), Box::new(DecisionMode)); + modes.insert("macp.mode.multi_round.v1".into(), Box::new(MultiRoundMode)); + // Short aliases for backward compatibility modes.insert("decision".into(), Box::new(DecisionMode)); modes.insert("multi_round".into(), Box::new(MultiRoundMode)); @@ -30,9 +41,18 @@ impl Runtime { } } + /// Returns the list of RFC-compliant mode names registered. + pub fn registered_mode_names(&self) -> Vec { + self.modes + .keys() + .filter(|k| k.starts_with("macp.mode.")) + .cloned() + .collect() + } + fn resolve_mode_name(mode_field: &str) -> &str { if mode_field.is_empty() { - "decision" + "macp.mode.decision.v1" } else { mode_field } @@ -78,27 +98,37 @@ impl Runtime { } } - pub async fn process(&self, env: &Envelope) -> Result<(), MacpError> { - if env.message_type == "SessionStart" { - self.process_session_start(env).await - } else { - self.process_message(env).await + pub async fn process(&self, env: &Envelope) -> Result { + match env.message_type.as_str() { + "SessionStart" => self.process_session_start(env).await, + "Signal" => self.process_signal(env).await, + _ => self.process_message(env).await, } } - async fn process_session_start(&self, env: &Envelope) -> Result<(), MacpError> { + async fn process_session_start(&self, env: &Envelope) -> Result { let mode_name = Self::resolve_mode_name(&env.mode); let mode = self.modes.get(mode_name).ok_or(MacpError::UnknownMode)?; - let ttl_ms = parse_session_start_ttl_ms(&env.payload)?; + // Parse protobuf SessionStartPayload + let start_payload = parse_session_start_payload(&env.payload)?; + let ttl_ms = extract_ttl_ms(&start_payload)?; let mut guard = self.registry.sessions.write().await; - if guard.contains_key(&env.session_id) { + // Check for duplicate session — idempotent if same message_id + if let Some(existing) = guard.get(&env.session_id) { + if existing.seen_message_ids.contains(&env.message_id) { + return Ok(ProcessResult { + session_state: existing.state.clone(), + duplicate: true, + }); + } return Err(MacpError::DuplicateSession); } - let ttl_expiry = Utc::now().timestamp_millis() + ttl_ms; + let now = Utc::now().timestamp_millis(); + let ttl_expiry = now + ttl_ms; // Create session log and append incoming entry self.log_store.create_session_log(&env.session_id).await; @@ -106,15 +136,27 @@ impl Runtime { .append(&env.session_id, Self::make_incoming_entry(env)) .await; - // Create session with initial state + // Extract participants from SessionStartPayload + let participants = start_payload.participants.clone(); + + let mut seen_message_ids = HashSet::new(); + seen_message_ids.insert(env.message_id.clone()); + + // Create session with initial state and RFC version fields let session = Session { session_id: env.session_id.clone(), state: SessionState::Open, ttl_expiry, + started_at_unix_ms: now, resolution: None, mode: mode_name.to_string(), mode_state: vec![], - participants: vec![], + participants, + seen_message_ids, + intent: start_payload.intent.clone(), + mode_version: start_payload.mode_version.clone(), + configuration_version: start_payload.configuration_version.clone(), + policy_version: start_payload.policy_version.clone(), }; // Call mode's on_session_start @@ -124,8 +166,8 @@ impl Runtime { let mut session = session; Self::apply_mode_response(&mut session, response); - // Extract participants from mode_state for multi_round - if mode_name == "multi_round" && !session.mode_state.is_empty() { + // For multi_round mode, extract participants from mode_state if not already set + if session.participants.is_empty() && !session.mode_state.is_empty() { if let Ok(state) = serde_json::from_slice::( &session.mode_state, ) { @@ -133,22 +175,33 @@ impl Runtime { } } + let result_state = session.state.clone(); guard.insert(env.session_id.clone(), session); - Ok(()) + Ok(ProcessResult { + session_state: result_state, + duplicate: false, + }) } - async fn process_message(&self, env: &Envelope) -> Result<(), MacpError> { + async fn process_message(&self, env: &Envelope) -> Result { let mut guard = self.registry.sessions.write().await; let session = guard .get_mut(&env.session_id) .ok_or(MacpError::UnknownSession)?; + // Message deduplication + if session.seen_message_ids.contains(&env.message_id) { + return Ok(ProcessResult { + session_state: session.state.clone(), + duplicate: true, + }); + } + // TTL check let now = Utc::now().timestamp_millis(); if session.state == SessionState::Open && now > session.ttl_expiry { - // Log internal TTL expiry event before state mutation self.log_store .append( &env.session_id, @@ -163,6 +216,14 @@ impl Runtime { return Err(MacpError::SessionNotOpen); } + // Participant validation + if !session.participants.is_empty() && !session.participants.contains(&env.sender) { + return Err(MacpError::Forbidden); + } + + // Record message_id + session.seen_message_ids.insert(env.message_id.clone()); + // Log incoming message before mode dispatch self.log_store .append(&env.session_id, Self::make_incoming_entry(env)) @@ -174,13 +235,60 @@ impl Runtime { let response = mode.on_message(session, env)?; Self::apply_mode_response(session, response); - Ok(()) + Ok(ProcessResult { + session_state: session.state.clone(), + duplicate: false, + }) + } + + /// Process a Signal message. Signals are non-binding and non-session-scoped. + async fn process_signal(&self, _env: &Envelope) -> Result { + Ok(ProcessResult { + session_state: SessionState::Open, + duplicate: false, + }) + } + + /// Cancel a session by ID. Idempotent for already-resolved/expired sessions. + pub async fn cancel_session( + &self, + session_id: &str, + reason: &str, + ) -> Result { + let mut guard = self.registry.sessions.write().await; + + let session = guard.get_mut(session_id).ok_or(MacpError::UnknownSession)?; + + // Idempotent: already resolved or expired + if session.state == SessionState::Resolved || session.state == SessionState::Expired { + return Ok(ProcessResult { + session_state: session.state.clone(), + duplicate: false, + }); + } + + // Log cancellation + self.log_store + .append( + session_id, + Self::make_internal_entry("SessionCancel", reason.as_bytes()), + ) + .await; + + session.state = SessionState::Expired; + + Ok(ProcessResult { + session_state: SessionState::Expired, + duplicate: false, + }) } } #[cfg(test)] mod tests { use super::*; + use crate::pb::SessionStartPayload; + use prost::Message; fn make_runtime() -> Runtime { let registry = Arc::new(SessionRegistry::new()); @@ -188,6 +296,20 @@ mod tests { Runtime::new(registry, log_store) } + fn encode_session_start(ttl_ms: i64, participants: Vec) -> Vec { + let payload = SessionStartPayload { + intent: String::new(), + participants, + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), + ttl_ms, + context: vec![], + roots: vec![], + }; + payload.encode_to_vec() + } + fn env( mode: &str, message_type: &str, @@ -197,7 +319,7 @@ mod tests { payload: &[u8], ) -> Envelope { Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: mode.into(), message_type: message_type.into(), message_id: message_id.into(), @@ -238,7 +360,7 @@ mod tests { rt.process(&e).await.unwrap(); let guard = rt.registry.sessions.read().await; - assert_eq!(guard["s1"].mode, "decision"); + assert_eq!(guard["s1"].mode, "macp.mode.decision.v1"); } #[tokio::test] @@ -254,14 +376,17 @@ mod tests { async fn multi_round_flow() { let rt = make_runtime(); - let payload = r#"{"participants":["alice","bob"],"convergence":{"type":"all_equal"}}"#; + let start_payload = encode_session_start( + 0, // default TTL + vec!["alice".into(), "bob".into()], + ); let e = env( "multi_round", "SessionStart", "m0", "s1", "creator", - payload.as_bytes(), + &start_payload, ); rt.process(&e).await.unwrap(); @@ -318,10 +443,16 @@ mod tests { session_id: "s".into(), state: SessionState::Open, ttl_expiry: i64::MAX, + started_at_unix_ms: 0, resolution: None, mode: "decision".into(), mode_state: vec![], participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), }; Runtime::apply_mode_response(&mut session, ModeResponse::NoOp); assert_eq!(session.state, SessionState::Open); @@ -334,10 +465,16 @@ mod tests { session_id: "s".into(), state: SessionState::Open, ttl_expiry: i64::MAX, + started_at_unix_ms: 0, resolution: None, mode: "multi_round".into(), mode_state: vec![], participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), }; Runtime::apply_mode_response( &mut session, @@ -368,28 +505,596 @@ mod tests { async fn ttl_expiry_logs_internal_entry() { let rt = make_runtime(); - // Create session with very short TTL + let payload = encode_session_start(1, vec![]); + let e = env("decision", "SessionStart", "m1", "s1", "alice", &payload); + rt.process(&e).await.unwrap(); + + // Wait for TTL to expire + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "TtlExpired"); + + let log = rt.log_store.get_log("s1").await.unwrap(); + assert_eq!(log.len(), 2); + assert_eq!(log[1].entry_kind, EntryKind::Internal); + assert_eq!(log[1].message_type, "TtlExpired"); + } + + // --- Phase 3: Deduplication tests --- + + #[tokio::test] + async fn duplicate_message_returns_duplicate_true() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let result = rt.process(&e).await.unwrap(); + assert!(!result.duplicate); + + // Same message_id again + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let result = rt.process(&e).await.unwrap(); + assert!(result.duplicate); + } + + #[tokio::test] + async fn duplicate_session_start_same_message_id_is_idempotent() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + // Same session_id and same message_id + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + let result = rt.process(&e).await.unwrap(); + assert!(result.duplicate); + } + + #[tokio::test] + async fn duplicate_session_start_different_message_id_errors() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + // Same session_id but different message_id + let e = env("decision", "SessionStart", "m2", "s1", "alice", b""); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "DuplicateSession"); + } + + #[tokio::test] + async fn duplicate_does_not_log() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + rt.process(&e).await.unwrap(); + + let log_before = rt.log_store.get_log("s1").await.unwrap().len(); + + // Duplicate + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let result = rt.process(&e).await.unwrap(); + assert!(result.duplicate); + + let log_after = rt.log_store.get_log("s1").await.unwrap().len(); + assert_eq!(log_before, log_after); // No new log entry + } + + // --- CancelSession tests --- + + #[tokio::test] + async fn cancel_open_session() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let result = rt.cancel_session("s1", "test cancel").await.unwrap(); + assert_eq!(result.session_state, SessionState::Expired); + + let s = rt.registry.get_session("s1").await.unwrap(); + assert_eq!(s.state, SessionState::Expired); + } + + #[tokio::test] + async fn cancel_resolved_session_is_idempotent() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"resolve"); + rt.process(&e).await.unwrap(); + + let result = rt.cancel_session("s1", "too late").await.unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); + } + + #[tokio::test] + async fn cancel_unknown_session_errors() { + let rt = make_runtime(); + let err = rt + .cancel_session("nonexistent", "reason") + .await + .unwrap_err(); + assert_eq!(err.to_string(), "UnknownSession"); + } + + #[tokio::test] + async fn message_after_cancel_rejected() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + rt.cancel_session("s1", "cancelled").await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "SessionNotOpen"); + } + + #[tokio::test] + async fn cancel_logs_entry() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + rt.cancel_session("s1", "test reason").await.unwrap(); + + let log = rt.log_store.get_log("s1").await.unwrap(); + assert_eq!(log.len(), 2); + assert_eq!(log[1].message_type, "SessionCancel"); + assert_eq!(log[1].entry_kind, EntryKind::Internal); + } + + // --- Participant validation tests --- + + #[tokio::test] + async fn forbidden_when_sender_not_in_participants() { + let rt = make_runtime(); + + let payload = encode_session_start(0, vec!["alice".into(), "bob".into()]); + let e = env("decision", "SessionStart", "m1", "s1", "alice", &payload); + rt.process(&e).await.unwrap(); + + // "charlie" is not a participant + let e = env("decision", "Message", "m2", "s1", "charlie", b"hello"); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "Forbidden"); + } + + #[tokio::test] + async fn allowed_when_participants_empty() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + // Any sender allowed when participants is empty + let e = env("decision", "Message", "m2", "s1", "charlie", b"hello"); + rt.process(&e).await.unwrap(); + } + + #[tokio::test] + async fn allowed_when_sender_is_participant() { + let rt = make_runtime(); + + let payload = encode_session_start(0, vec!["alice".into(), "bob".into()]); + let e = env("decision", "SessionStart", "m1", "s1", "alice", &payload); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + rt.process(&e).await.unwrap(); + } + + // --- Mode naming tests --- + + #[tokio::test] + async fn rfc_mode_name_works() { + let rt = make_runtime(); + let e = env( - "decision", + "macp.mode.decision.v1", "SessionStart", "m1", "s1", "alice", - br#"{"ttl_ms":1}"#, + b"", ); rt.process(&e).await.unwrap(); - // Wait for TTL to expire - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + let guard = rt.registry.sessions.read().await; + assert_eq!(guard["s1"].mode, "macp.mode.decision.v1"); + } + + #[tokio::test] + async fn short_alias_still_works() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let guard = rt.registry.sessions.read().await; + assert_eq!(guard["s1"].mode, "decision"); + } + + // --- Signal tests --- + + #[tokio::test] + async fn signal_with_empty_session_id_accepted() { + let rt = make_runtime(); + + let e = env("", "Signal", "sig1", "", "alice", b""); + let result = rt.process(&e).await.unwrap(); + assert!(!result.duplicate); + } + + #[tokio::test] + async fn signal_does_not_create_session() { + let rt = make_runtime(); + + let e = env("", "Signal", "sig1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let s = rt.registry.get_session("s1").await; + assert!(s.is_none()); + } + + // --- ProcessResult state field tests --- + + #[tokio::test] + async fn process_result_state_open_after_session_start() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + let result = rt.process(&e).await.unwrap(); + assert_eq!(result.session_state, SessionState::Open); + assert!(!result.duplicate); + } + #[tokio::test] + async fn process_result_state_resolved_after_resolve() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"resolve"); + let result = rt.process(&e).await.unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); + } + + #[tokio::test] + async fn process_result_state_open_for_normal_message() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let result = rt.process(&e).await.unwrap(); + assert_eq!(result.session_state, SessionState::Open); + } + + // --- started_at_unix_ms tests --- + + #[tokio::test] + async fn started_at_unix_ms_populated() { + let rt = make_runtime(); + let before = chrono::Utc::now().timestamp_millis(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let after = chrono::Utc::now().timestamp_millis(); + let s = rt.registry.get_session("s1").await.unwrap(); + assert!(s.started_at_unix_ms >= before); + assert!(s.started_at_unix_ms <= after); + } + + // --- Dedup across different sessions --- + + #[tokio::test] + async fn same_message_id_different_sessions_not_duplicate() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "SessionStart", "m1", "s2", "alice", b""); + let result = rt.process(&e).await.unwrap(); + assert!(!result.duplicate); + } + + // --- Cancel already-expired session is idempotent --- + + #[tokio::test] + async fn cancel_already_expired_session_is_idempotent() { + let rt = make_runtime(); + + let payload = encode_session_start(1, vec![]); + let e = env("decision", "SessionStart", "m1", "s1", "alice", &payload); + rt.process(&e).await.unwrap(); + + // Wait for TTL to expire, then trigger expiry + tokio::time::sleep(std::time::Duration::from_millis(10)).await; let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let _ = rt.process(&e).await; // triggers expiry + + // Cancel expired session — should be idempotent + let result = rt.cancel_session("s1", "already expired").await.unwrap(); + assert_eq!(result.session_state, SessionState::Expired); + } + + // --- Multi-round with protobuf participants --- + + #[tokio::test] + async fn multi_round_participants_from_protobuf_payload() { + let rt = make_runtime(); + + let payload = encode_session_start(0, vec!["alice".into(), "bob".into()]); + let e = env( + "multi_round", + "SessionStart", + "m0", + "s1", + "creator", + &payload, + ); + rt.process(&e).await.unwrap(); + + let s = rt.registry.get_session("s1").await.unwrap(); + assert_eq!(s.participants, vec!["alice", "bob"]); + } + + #[tokio::test] + async fn multi_round_with_participant_validation() { + let rt = make_runtime(); + + let payload = encode_session_start(0, vec!["alice".into(), "bob".into()]); + let e = env( + "multi_round", + "SessionStart", + "m0", + "s1", + "creator", + &payload, + ); + rt.process(&e).await.unwrap(); + + // Unauthorized participant + let e = env( + "multi_round", + "Contribute", + "m1", + "s1", + "charlie", + br#"{"value":"x"}"#, + ); let err = rt.process(&e).await.unwrap_err(); - assert_eq!(err.to_string(), "TtlExpired"); + assert_eq!(err.to_string(), "Forbidden"); + + // Authorized participant + let e = env( + "multi_round", + "Contribute", + "m2", + "s1", + "alice", + br#"{"value":"x"}"#, + ); + rt.process(&e).await.unwrap(); + } + + // --- RFC mode names for multi_round --- + + #[tokio::test] + async fn rfc_mode_name_multi_round_works() { + let rt = make_runtime(); + + let payload = encode_session_start(0, vec!["alice".into()]); + let e = env( + "macp.mode.multi_round.v1", + "SessionStart", + "m0", + "s1", + "alice", + &payload, + ); + rt.process(&e).await.unwrap(); + + let guard = rt.registry.sessions.read().await; + assert_eq!(guard["s1"].mode, "macp.mode.multi_round.v1"); + } + + // --- Signal with payload --- + + #[tokio::test] + async fn signal_with_payload_accepted() { + let rt = make_runtime(); + + let e = env("", "Signal", "sig1", "", "alice", b"some signal data"); + let result = rt.process(&e).await.unwrap(); + assert_eq!(result.session_state, SessionState::Open); + } + + // --- ModeResponse::PersistState --- + + #[tokio::test] + async fn mode_response_apply_persist_state() { + let mut session = Session { + session_id: "s".into(), + state: SessionState::Open, + ttl_expiry: i64::MAX, + started_at_unix_ms: 0, + resolution: None, + mode: "decision".into(), + mode_state: vec![], + participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), + }; + Runtime::apply_mode_response( + &mut session, + ModeResponse::PersistState(b"persisted".to_vec()), + ); + assert_eq!(session.state, SessionState::Open); + assert_eq!(session.mode_state, b"persisted"); + assert!(session.resolution.is_none()); + } + + #[tokio::test] + async fn mode_response_apply_resolve() { + let mut session = Session { + session_id: "s".into(), + state: SessionState::Open, + ttl_expiry: i64::MAX, + started_at_unix_ms: 0, + resolution: None, + mode: "decision".into(), + mode_state: vec![], + participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), + }; + Runtime::apply_mode_response(&mut session, ModeResponse::Resolve(b"resolved".to_vec())); + assert_eq!(session.state, SessionState::Resolved); + assert_eq!(session.resolution, Some(b"resolved".to_vec())); + assert!(session.mode_state.is_empty()); + } + + // --- Multiple sessions isolation --- + + #[tokio::test] + async fn multiple_sessions_independent() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "SessionStart", "m2", "s2", "bob", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m3", "s1", "alice", b"resolve"); + rt.process(&e).await.unwrap(); + + let s2 = rt.registry.get_session("s2").await.unwrap(); + assert_eq!(s2.state, SessionState::Open); + + let s1 = rt.registry.get_session("s1").await.unwrap(); + assert_eq!(s1.state, SessionState::Resolved); + } + + // --- Protobuf TTL from SessionStartPayload --- + + #[tokio::test] + async fn session_start_with_protobuf_ttl() { + let rt = make_runtime(); + + let payload = encode_session_start(30_000, vec![]); + let e = env("decision", "SessionStart", "m1", "s1", "alice", &payload); + rt.process(&e).await.unwrap(); + + let s = rt.registry.get_session("s1").await.unwrap(); + let now = chrono::Utc::now().timestamp_millis(); + assert!(s.ttl_expiry > now); + assert!(s.ttl_expiry <= now + 31_000); + } + + #[tokio::test] + async fn session_start_invalid_protobuf_ttl_rejected() { + let rt = make_runtime(); + + let payload = encode_session_start(-100, vec![]); + let e = env("decision", "SessionStart", "m1", "s1", "alice", &payload); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "InvalidTtl"); + } + + // --- Cancel preserves log integrity --- + + #[tokio::test] + async fn cancel_reason_recorded_in_log() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + rt.cancel_session("s1", "user requested").await.unwrap(); let log = rt.log_store.get_log("s1").await.unwrap(); - // Should have: SessionStart (incoming) + TtlExpired (internal) assert_eq!(log.len(), 2); - assert_eq!(log[1].entry_kind, EntryKind::Internal); - assert_eq!(log[1].message_type, "TtlExpired"); + assert_eq!(log[1].message_type, "SessionCancel"); + assert_eq!(log[1].raw_payload, b"user requested"); + assert_eq!(log[1].sender, "_runtime"); + } + + // --- Dedup does not invoke mode --- + + #[tokio::test] + async fn duplicate_resolve_does_not_double_resolve() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"resolve"); + let result = rt.process(&e).await.unwrap(); + assert_eq!(result.session_state, SessionState::Resolved); + assert!(!result.duplicate); + + let e = env("decision", "Message", "m2", "s1", "alice", b"resolve"); + let result = rt.process(&e).await.unwrap(); + assert!(result.duplicate); + assert_eq!(result.session_state, SessionState::Resolved); + } + + // --- Version fields stored on session --- + + #[tokio::test] + async fn session_stores_version_fields() { + let rt = make_runtime(); + + let payload = SessionStartPayload { + intent: "test coordination".into(), + participants: vec![], + mode_version: "1.0".into(), + configuration_version: "cfg-v2".into(), + policy_version: "pol-v1".into(), + ttl_ms: 0, + context: vec![], + roots: vec![], + }; + let e = env( + "decision", + "SessionStart", + "m1", + "s1", + "alice", + &payload.encode_to_vec(), + ); + rt.process(&e).await.unwrap(); + + let s = rt.registry.get_session("s1").await.unwrap(); + assert_eq!(s.intent, "test coordination"); + assert_eq!(s.mode_version, "1.0"); + assert_eq!(s.configuration_version, "cfg-v2"); + assert_eq!(s.policy_version, "pol-v1"); } } diff --git a/src/server.rs b/src/server.rs index 11bc534..73caf4a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,18 @@ use macp_runtime::error::MacpError; -use macp_runtime::pb::macp_service_server::MacpService; +use macp_runtime::pb::macp_runtime_service_server::MacpRuntimeService; use macp_runtime::pb::{ - Envelope, GetSessionRequest, GetSessionResponse, SendMessageRequest, SendMessageResponse, + Ack, CancelSessionRequest, CancelSessionResponse, CancellationCapability, Capabilities, + Envelope, GetManifestRequest, GetManifestResponse, GetSessionRequest, GetSessionResponse, + InitializeRequest, InitializeResponse, ListModesRequest, ListModesResponse, ListRootsRequest, + ListRootsResponse, MacpError as PbMacpError, ManifestCapability, ModeDescriptor, + ModeRegistryCapability, ProgressCapability, RootsCapability, RuntimeInfo, SendRequest, + SendResponse, SessionMetadata, SessionState as PbSessionState, SessionsCapability, + StreamSessionRequest, StreamSessionResponse, WatchModeRegistryRequest, + WatchModeRegistryResponse, WatchRootsRequest, WatchRootsResponse, }; use macp_runtime::runtime::Runtime; use macp_runtime::session::SessionState; +use std::collections::HashMap; use std::sync::Arc; use tonic::{Request, Response, Status}; @@ -18,26 +26,101 @@ impl MacpServer { } fn validate(env: &Envelope) -> Result<(), MacpError> { - if env.macp_version != "v1" { + if env.macp_version != "1.0" { return Err(MacpError::InvalidMacpVersion); } - if env.session_id.is_empty() || env.message_id.is_empty() { + // Signals may have empty session_id + if env.message_type != "Signal" && env.session_id.is_empty() { + return Err(MacpError::InvalidEnvelope); + } + if env.message_id.is_empty() { return Err(MacpError::InvalidEnvelope); } Ok(()) } + + fn session_state_to_pb(state: &SessionState) -> i32 { + match state { + SessionState::Open => PbSessionState::Open.into(), + SessionState::Resolved => PbSessionState::Resolved.into(), + SessionState::Expired => PbSessionState::Expired.into(), + } + } + + fn make_error_ack(e: &MacpError, env: &Envelope) -> Ack { + Ack { + ok: false, + duplicate: false, + message_id: env.message_id.clone(), + session_id: env.session_id.clone(), + accepted_at_unix_ms: chrono::Utc::now().timestamp_millis(), + session_state: PbSessionState::Unspecified.into(), + error: Some(PbMacpError { + code: e.error_code().into(), + message: e.to_string(), + session_id: env.session_id.clone(), + message_id: env.message_id.clone(), + details: vec![], + }), + } + } } #[tonic::async_trait] -impl MacpService for MacpServer { - async fn send_message( +impl MacpRuntimeService for MacpServer { + async fn initialize( &self, - request: Request, - ) -> Result, Status> { + request: Request, + ) -> Result, Status> { let req = request.into_inner(); - let env = req + + // Negotiate protocol version + let supported = &req.supported_protocol_versions; + if !supported.iter().any(|v| v == "1.0") { + return Err(Status::failed_precondition( + "UNSUPPORTED_PROTOCOL_VERSION: no mutually supported protocol version", + )); + } + + let mode_names = self.runtime.registered_mode_names(); + + Ok(Response::new(InitializeResponse { + selected_protocol_version: "1.0".into(), + runtime_info: Some(RuntimeInfo { + name: "macp-runtime".into(), + title: "MACP Reference Runtime".into(), + version: "0.2".into(), + description: "Reference implementation of the Multi-Agent Coordination Protocol" + .into(), + website_url: String::new(), + }), + capabilities: Some(Capabilities { + sessions: Some(SessionsCapability { stream: true }), + cancellation: Some(CancellationCapability { + cancel_session: true, + }), + progress: Some(ProgressCapability { progress: false }), + manifest: Some(ManifestCapability { get_manifest: true }), + mode_registry: Some(ModeRegistryCapability { + list_modes: true, + list_changed: false, + }), + roots: Some(RootsCapability { + list_roots: true, + list_changed: false, + }), + experimental: None, + }), + supported_modes: mode_names, + instructions: String::new(), + })) + } + + async fn send(&self, request: Request) -> Result, Status> { + let send_req = request.into_inner(); + let env = send_req .envelope - .ok_or_else(|| Status::invalid_argument("missing envelope"))?; + .ok_or_else(|| Status::invalid_argument("SendRequest must contain an envelope"))?; let result = async { Self::validate(&env)?; @@ -45,50 +128,241 @@ impl MacpService for MacpServer { } .await; - let resp = match result { - Ok(_) => SendMessageResponse { - accepted: true, - error: "".into(), - }, - Err(e) => SendMessageResponse { - accepted: false, - error: e.to_string(), + let ack = match result { + Ok(process_result) => Ack { + ok: true, + duplicate: process_result.duplicate, + message_id: env.message_id.clone(), + session_id: env.session_id.clone(), + accepted_at_unix_ms: chrono::Utc::now().timestamp_millis(), + session_state: Self::session_state_to_pb(&process_result.session_state), + error: None, }, + Err(e) => Self::make_error_ack(&e, &env), }; - Ok(Response::new(resp)) + Ok(Response::new(SendResponse { ack: Some(ack) })) } async fn get_session( &self, request: Request, ) -> Result, Status> { - let query = request.into_inner(); + let req = request.into_inner(); - match self.runtime.registry.get_session(&query.session_id).await { + match self.runtime.registry.get_session(&req.session_id).await { Some(session) => { - let state_str = match session.state { - SessionState::Open => "Open", - SessionState::Resolved => "Resolved", - SessionState::Expired => "Expired", - }; + let state = Self::session_state_to_pb(&session.state); Ok(Response::new(GetSessionResponse { - session_id: session.session_id.clone(), - mode: session.mode.clone(), - state: state_str.into(), - ttl_expiry: session.ttl_expiry, - resolution: session.resolution.clone().unwrap_or_default(), - mode_state: session.mode_state.clone(), - participants: session.participants.clone(), + metadata: Some(SessionMetadata { + session_id: session.session_id.clone(), + mode: session.mode.clone(), + state, + started_at_unix_ms: session.started_at_unix_ms, + expires_at_unix_ms: session.ttl_expiry, + mode_version: session.mode_version.clone(), + configuration_version: session.configuration_version.clone(), + policy_version: session.policy_version.clone(), + }), })) } None => Err(Status::not_found(format!( "Session '{}' not found", - query.session_id + req.session_id ))), } } + + async fn cancel_session( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + match self + .runtime + .cancel_session(&req.session_id, &req.reason) + .await + { + Ok(result) => Ok(Response::new(CancelSessionResponse { + ack: Some(Ack { + ok: true, + duplicate: false, + message_id: String::new(), + session_id: req.session_id, + accepted_at_unix_ms: chrono::Utc::now().timestamp_millis(), + session_state: Self::session_state_to_pb(&result.session_state), + error: None, + }), + })), + Err(e) => { + let ack = Ack { + ok: false, + duplicate: false, + message_id: String::new(), + session_id: req.session_id.clone(), + accepted_at_unix_ms: chrono::Utc::now().timestamp_millis(), + session_state: PbSessionState::Unspecified.into(), + error: Some(PbMacpError { + code: e.error_code().into(), + message: e.to_string(), + session_id: req.session_id, + message_id: String::new(), + details: vec![], + }), + }; + Ok(Response::new(CancelSessionResponse { ack: Some(ack) })) + } + } + } + + async fn get_manifest( + &self, + _request: Request, + ) -> Result, Status> { + let mode_names = self.runtime.registered_mode_names(); + + Ok(Response::new(GetManifestResponse { + manifest: Some(macp_runtime::pb::AgentManifest { + agent_id: "macp-runtime".into(), + title: "MACP Reference Runtime".into(), + description: "Reference implementation of MACP".into(), + supported_modes: mode_names, + input_content_types: vec!["application/protobuf".into()], + output_content_types: vec!["application/protobuf".into()], + metadata: HashMap::new(), + }), + })) + } + + async fn list_modes( + &self, + _request: Request, + ) -> Result, Status> { + let modes = vec![ + ModeDescriptor { + mode: "macp.mode.decision.v1".into(), + mode_version: "1.0".into(), + title: "Decision Mode".into(), + description: "Proposal-based decision making with voting".into(), + determinism_class: "semantic-deterministic".into(), + participant_model: "open".into(), + message_types: vec![ + "Proposal".into(), + "Evaluation".into(), + "Objection".into(), + "Vote".into(), + "Commitment".into(), + ], + terminal_message_types: vec!["Commitment".into()], + schema_uris: HashMap::new(), + }, + ModeDescriptor { + mode: "macp.mode.multi_round.v1".into(), + mode_version: "1.0".into(), + title: "Multi-Round Convergence Mode".into(), + description: "Participant-based convergence with all_equal strategy".into(), + determinism_class: "semantic-deterministic".into(), + participant_model: "closed".into(), + message_types: vec!["Contribute".into()], + terminal_message_types: vec![], + schema_uris: HashMap::new(), + }, + ]; + + Ok(Response::new(ListModesResponse { modes })) + } + + async fn list_roots( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(ListRootsResponse { roots: vec![] })) + } + + type StreamSessionStream = std::pin::Pin< + Box> + Send>, + >; + + async fn stream_session( + &self, + request: Request>, + ) -> Result, Status> { + use tokio_stream::StreamExt; + + let runtime = self.runtime.clone(); + let mut inbound = request.into_inner(); + + let output = async_stream::try_stream! { + while let Some(req) = inbound.next().await { + let req = req?; + let env = req.envelope.ok_or_else(|| { + Status::invalid_argument("StreamSessionRequest must contain an envelope") + })?; + + let result = async { + MacpServer::validate(&env)?; + runtime.process(&env).await + } + .await; + + match result { + Ok(_) => { + yield StreamSessionResponse { + envelope: Some(env), + }; + } + Err(e) => { + // Build an error envelope as a response + let error_ack = MacpServer::make_error_ack(&e, &env); + let error_env = Envelope { + macp_version: "1.0".into(), + mode: env.mode.clone(), + message_type: "Error".into(), + message_id: format!("err-{}", env.message_id), + session_id: env.session_id.clone(), + sender: "_runtime".into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload: serde_json::to_vec(&serde_json::json!({ + "code": error_ack.error.as_ref().map(|e| &e.code), + "message": error_ack.error.as_ref().map(|e| &e.message), + })).unwrap_or_default(), + }; + yield StreamSessionResponse { + envelope: Some(error_env), + }; + } + } + } + }; + + Ok(Response::new(Box::pin(output))) + } + + type WatchModeRegistryStream = std::pin::Pin< + Box> + Send>, + >; + + async fn watch_mode_registry( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented( + "WatchModeRegistry is not yet implemented", + )) + } + + type WatchRootsStream = std::pin::Pin< + Box> + Send>, + >; + + async fn watch_roots( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("WatchRoots is not yet implemented")) + } } #[cfg(test)] @@ -96,8 +370,11 @@ mod tests { use super::*; use chrono::Utc; use macp_runtime::log_store::LogStore; + use macp_runtime::pb::SessionStartPayload; use macp_runtime::registry::SessionRegistry; use macp_runtime::session::Session; + use prost::Message; + use std::collections::HashSet; fn make_server() -> (MacpServer, Arc) { let registry = Arc::new(SessionRegistry::new()); @@ -107,18 +384,24 @@ mod tests { (server, runtime) } - fn wrap(env: Envelope) -> SendMessageRequest { - SendMessageRequest { + fn send_req(env: Envelope) -> Request { + Request::new(SendRequest { envelope: Some(env), - } + }) + } + + async fn do_send(server: &MacpServer, env: Envelope) -> Ack { + let resp = server.send(send_req(env)).await.unwrap(); + resp.into_inner().ack.unwrap() } + // --- TTL/Session state tests --- + #[tokio::test] async fn expired_session_transitions_to_expired() { let (_, runtime) = make_server(); let server = MacpServer::new(runtime.clone()); - // Insert a session that expired 1 second ago let expired_ttl = Utc::now().timestamp_millis() - 1000; runtime .registry @@ -128,16 +411,22 @@ mod tests { session_id: "s_expired".into(), state: SessionState::Open, ttl_expiry: expired_ttl, + started_at_unix_ms: expired_ttl - 60_000, resolution: None, mode: "decision".into(), mode_state: vec![], participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), }, ) .await; let env = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "decision".into(), message_type: "Message".into(), message_id: "m1".into(), @@ -147,12 +436,10 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server.send_message(Request::new(wrap(env))).await.unwrap(); - let ack = resp.into_inner(); - assert!(!ack.accepted); - assert_eq!(ack.error, "TtlExpired"); + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "SESSION_NOT_OPEN"); - // Verify the session state was set to Expired let s = runtime.registry.get_session("s_expired").await.unwrap(); assert_eq!(s.state, SessionState::Expired); } @@ -162,7 +449,6 @@ mod tests { let (_, runtime) = make_server(); let server = MacpServer::new(runtime.clone()); - // Insert a session that expires far in the future let future_ttl = Utc::now().timestamp_millis() + 60_000; runtime .registry @@ -172,16 +458,22 @@ mod tests { session_id: "s_alive".into(), state: SessionState::Open, ttl_expiry: future_ttl, + started_at_unix_ms: future_ttl - 120_000, resolution: None, mode: "decision".into(), mode_state: vec![], participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), }, ) .await; let env = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "decision".into(), message_type: "Message".into(), message_id: "m1".into(), @@ -191,9 +483,8 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server.send_message(Request::new(wrap(env))).await.unwrap(); - let ack = resp.into_inner(); - assert!(ack.accepted); + let ack = do_send(&server, env).await; + assert!(ack.ok); let s = runtime.registry.get_session("s_alive").await.unwrap(); assert_eq!(s.state, SessionState::Open); @@ -204,7 +495,6 @@ mod tests { let (_, runtime) = make_server(); let server = MacpServer::new(runtime.clone()); - // Insert a resolved session with an expired TTL let expired_ttl = Utc::now().timestamp_millis() - 1000; runtime .registry @@ -214,16 +504,22 @@ mod tests { session_id: "s_resolved".into(), state: SessionState::Resolved, ttl_expiry: expired_ttl, + started_at_unix_ms: expired_ttl - 60_000, resolution: Some(b"resolve".to_vec()), mode: "decision".into(), mode_state: vec![], participants: vec![], + seen_message_ids: HashSet::new(), + intent: String::new(), + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), }, ) .await; let env = Envelope { - macp_version: "v1".into(), + macp_version: "1.0".into(), mode: "decision".into(), message_type: "Message".into(), message_id: "m1".into(), @@ -233,13 +529,770 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server.send_message(Request::new(wrap(env))).await.unwrap(); - let ack = resp.into_inner(); - assert!(!ack.accepted); - assert_eq!(ack.error, "SessionNotOpen"); + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "SESSION_NOT_OPEN"); - // State should still be Resolved, not overwritten to Expired let s = runtime.registry.get_session("s_resolved").await.unwrap(); assert_eq!(s.state, SessionState::Resolved); } + + // --- Version validation --- + + #[tokio::test] + async fn version_1_0_accepted() { + let (server, _) = make_server(); + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + } + + #[tokio::test] + async fn version_v1_rejected() { + let (server, _) = make_server(); + let env = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!( + ack.error.as_ref().unwrap().code, + "UNSUPPORTED_PROTOCOL_VERSION" + ); + } + + #[tokio::test] + async fn empty_version_rejected() { + let (server, _) = make_server(); + let env = Envelope { + macp_version: String::new(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!( + ack.error.as_ref().unwrap().code, + "UNSUPPORTED_PROTOCOL_VERSION" + ); + } + + // --- Envelope validation --- + + #[tokio::test] + async fn empty_session_id_rejected() { + let (server, _) = make_server(); + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: String::new(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "INVALID_ENVELOPE"); + } + + #[tokio::test] + async fn empty_message_id_rejected() { + let (server, _) = make_server(); + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: String::new(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "INVALID_ENVELOPE"); + } + + // --- Signal with empty session_id allowed --- + + #[tokio::test] + async fn signal_empty_session_id_allowed() { + let (server, _) = make_server(); + let env = Envelope { + macp_version: "1.0".into(), + mode: String::new(), + message_type: "Signal".into(), + message_id: "sig1".into(), + session_id: String::new(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + } + + // --- Ack fields populated correctly --- + + #[tokio::test] + async fn ack_fields_on_success() { + let (server, _) = make_server(); + let before = Utc::now().timestamp_millis(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "msg123".into(), + session_id: "sess456".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + + assert!(ack.ok); + assert!(!ack.duplicate); + assert_eq!(ack.message_id, "msg123"); + assert_eq!(ack.session_id, "sess456"); + assert!(ack.accepted_at_unix_ms >= before); + assert_eq!(ack.session_state, PbSessionState::Open as i32); + assert!(ack.error.is_none()); + } + + #[tokio::test] + async fn ack_fields_on_error() { + let (server, _) = make_server(); + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "msg789".into(), + session_id: "nonexistent".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + + assert!(!ack.ok); + assert!(!ack.duplicate); + assert_eq!(ack.message_id, "msg789"); + assert_eq!(ack.session_id, "nonexistent"); + let err = ack.error.unwrap(); + assert_eq!(err.code, "SESSION_NOT_FOUND"); + assert_eq!(err.message, "UnknownSession"); + assert_eq!(err.session_id, "nonexistent"); + assert_eq!(err.message_id, "msg789"); + } + + // --- Deduplication at server layer --- + + #[tokio::test] + async fn ack_duplicate_field_set() { + let (server, _) = make_server(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + assert!(!ack.duplicate); + + // Send a message + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m2".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"hello".to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(!ack.duplicate); + + // Duplicate + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m2".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"hello".to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + assert!(ack.duplicate); + } + + // --- GetSession --- + + #[tokio::test] + async fn get_session_success() { + let (server, _) = make_server(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s_get".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + do_send(&server, env).await; + + let resp = server + .get_session(Request::new(GetSessionRequest { + session_id: "s_get".into(), + })) + .await + .unwrap(); + let meta = resp.into_inner().metadata.unwrap(); + assert_eq!(meta.session_id, "s_get"); + assert_eq!(meta.mode, "decision"); + assert_eq!(meta.state, PbSessionState::Open as i32); + assert!(meta.started_at_unix_ms > 0); + assert!(meta.expires_at_unix_ms > meta.started_at_unix_ms); + } + + #[tokio::test] + async fn get_session_not_found() { + let (server, _) = make_server(); + + let result = server + .get_session(Request::new(GetSessionRequest { + session_id: "nonexistent".into(), + })) + .await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err().code(), tonic::Code::NotFound); + } + + #[tokio::test] + async fn get_session_shows_resolved_state() { + let (server, _) = make_server(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s_res".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + do_send(&server, env).await; + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m2".into(), + session_id: "s_res".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"resolve".to_vec(), + }; + do_send(&server, env).await; + + let resp = server + .get_session(Request::new(GetSessionRequest { + session_id: "s_res".into(), + })) + .await + .unwrap(); + let meta = resp.into_inner().metadata.unwrap(); + assert_eq!(meta.state, PbSessionState::Resolved as i32); + } + + #[tokio::test] + async fn get_session_with_version_fields() { + let (server, _) = make_server(); + + let start_payload = SessionStartPayload { + intent: "coordinate".into(), + participants: vec![], + mode_version: "1.0".into(), + configuration_version: "cfg-1".into(), + policy_version: "pol-1".into(), + ttl_ms: 0, + context: vec![], + roots: vec![], + }; + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s_ver".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_payload.encode_to_vec(), + }; + do_send(&server, env).await; + + let resp = server + .get_session(Request::new(GetSessionRequest { + session_id: "s_ver".into(), + })) + .await + .unwrap(); + let meta = resp.into_inner().metadata.unwrap(); + assert_eq!(meta.mode_version, "1.0"); + assert_eq!(meta.configuration_version, "cfg-1"); + assert_eq!(meta.policy_version, "pol-1"); + } + + // --- CancelSession at server layer --- + + #[tokio::test] + async fn cancel_session_success() { + let (server, _) = make_server(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s_cancel".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + do_send(&server, env).await; + + let resp = server + .cancel_session(Request::new(CancelSessionRequest { + session_id: "s_cancel".into(), + reason: "testing".into(), + })) + .await + .unwrap(); + let ack = resp.into_inner().ack.unwrap(); + assert!(ack.ok); + assert_eq!(ack.session_id, "s_cancel"); + assert_eq!(ack.session_state, PbSessionState::Expired as i32); + } + + #[tokio::test] + async fn cancel_session_not_found() { + let (server, _) = make_server(); + + let resp = server + .cancel_session(Request::new(CancelSessionRequest { + session_id: "nonexistent".into(), + reason: "testing".into(), + })) + .await + .unwrap(); + let ack = resp.into_inner().ack.unwrap(); + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "SESSION_NOT_FOUND"); + } + + #[tokio::test] + async fn cancel_then_message_rejected() { + let (server, _) = make_server(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s_cm".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + do_send(&server, env).await; + + server + .cancel_session(Request::new(CancelSessionRequest { + session_id: "s_cm".into(), + reason: "done".into(), + })) + .await + .unwrap(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m2".into(), + session_id: "s_cm".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"hello".to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "SESSION_NOT_OPEN"); + } + + // --- Participant validation at server layer --- + + #[tokio::test] + async fn forbidden_error_at_server_layer() { + let (server, _) = make_server(); + + let start_payload = SessionStartPayload { + intent: String::new(), + participants: vec!["alice".into()], + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), + ttl_ms: 0, + context: vec![], + roots: vec![], + }; + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s_auth".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_payload.encode_to_vec(), + }; + do_send(&server, env).await; + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m2".into(), + session_id: "s_auth".into(), + sender: "bob".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"hello".to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "FORBIDDEN"); + } + + // --- Full decision flow through server --- + + #[tokio::test] + async fn full_decision_flow_through_server() { + let (server, _) = make_server(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s_flow".into(), + sender: "alice".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + assert_eq!(ack.session_state, PbSessionState::Open as i32); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m2".into(), + session_id: "s_flow".into(), + sender: "alice".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"hello".to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + assert_eq!(ack.session_state, PbSessionState::Open as i32); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m3".into(), + session_id: "s_flow".into(), + sender: "alice".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"resolve".to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + assert_eq!(ack.session_state, PbSessionState::Resolved as i32); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m4".into(), + session_id: "s_flow".into(), + sender: "alice".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"too late".to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.unwrap().code, "SESSION_NOT_OPEN"); + } + + // --- Full multi-round flow through server --- + + #[tokio::test] + async fn full_multi_round_flow_through_server() { + let (server, _) = make_server(); + + let start_payload = SessionStartPayload { + intent: String::new(), + ttl_ms: 60_000, + participants: vec!["alice".into(), "bob".into()], + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), + context: vec![], + roots: vec![], + }; + + let env = Envelope { + macp_version: "1.0".into(), + mode: "multi_round".into(), + message_type: "SessionStart".into(), + message_id: "m0".into(), + session_id: "s_mr".into(), + sender: "coordinator".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: start_payload.encode_to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + assert_eq!(ack.session_state, PbSessionState::Open as i32); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: "m1".into(), + session_id: "s_mr".into(), + sender: "alice".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: br#"{"value":"option_a"}"#.to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + assert_eq!(ack.session_state, PbSessionState::Open as i32); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: "m2".into(), + session_id: "s_mr".into(), + sender: "bob".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: br#"{"value":"option_a"}"#.to_vec(), + }; + let ack = do_send(&server, env).await; + assert!(ack.ok); + assert_eq!(ack.session_state, PbSessionState::Resolved as i32); + + let resp = server + .get_session(Request::new(GetSessionRequest { + session_id: "s_mr".into(), + })) + .await + .unwrap(); + let meta = resp.into_inner().metadata.unwrap(); + assert_eq!(meta.state, PbSessionState::Resolved as i32); + } + + // --- Unknown mode error code --- + + #[tokio::test] + async fn unknown_mode_error_code() { + let (server, _) = make_server(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "nonexistent_mode".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "MODE_NOT_SUPPORTED"); + } + + // --- Duplicate session error code --- + + #[tokio::test] + async fn duplicate_session_error_code() { + let (server, _) = make_server(); + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + do_send(&server, env).await; + + let env = Envelope { + macp_version: "1.0".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m2".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }; + let ack = do_send(&server, env).await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "INVALID_ENVELOPE"); + } + + // --- Initialize RPC --- + + #[tokio::test] + async fn initialize_success() { + let (server, _) = make_server(); + + let resp = server + .initialize(Request::new(InitializeRequest { + supported_protocol_versions: vec!["1.0".into()], + client_info: None, + capabilities: None, + })) + .await + .unwrap(); + let init = resp.into_inner(); + assert_eq!(init.selected_protocol_version, "1.0"); + assert!(init.runtime_info.is_some()); + let info = init.runtime_info.unwrap(); + assert_eq!(info.name, "macp-runtime"); + assert_eq!(info.version, "0.2"); + assert!(init.capabilities.is_some()); + assert!(!init.supported_modes.is_empty()); + } + + #[tokio::test] + async fn initialize_no_common_version() { + let (server, _) = make_server(); + + let result = server + .initialize(Request::new(InitializeRequest { + supported_protocol_versions: vec!["2.0".into()], + client_info: None, + capabilities: None, + })) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn initialize_selects_1_0_from_multiple() { + let (server, _) = make_server(); + + let resp = server + .initialize(Request::new(InitializeRequest { + supported_protocol_versions: vec!["1.0".into(), "2.0".into()], + client_info: None, + capabilities: None, + })) + .await + .unwrap(); + assert_eq!(resp.into_inner().selected_protocol_version, "1.0"); + } + + // --- ListModes --- + + #[tokio::test] + async fn list_modes_returns_descriptors() { + let (server, _) = make_server(); + + let resp = server + .list_modes(Request::new(ListModesRequest {})) + .await + .unwrap(); + let modes = resp.into_inner().modes; + assert_eq!(modes.len(), 2); + assert_eq!(modes[0].mode, "macp.mode.decision.v1"); + assert_eq!(modes[1].mode, "macp.mode.multi_round.v1"); + } + + // --- GetManifest --- + + #[tokio::test] + async fn get_manifest_returns_runtime_manifest() { + let (server, _) = make_server(); + + let resp = server + .get_manifest(Request::new(GetManifestRequest { + agent_id: String::new(), + })) + .await + .unwrap(); + let manifest = resp.into_inner().manifest.unwrap(); + assert_eq!(manifest.agent_id, "macp-runtime"); + assert!(!manifest.supported_modes.is_empty()); + } + + // --- ListRoots --- + + #[tokio::test] + async fn list_roots_returns_empty() { + let (server, _) = make_server(); + + let resp = server + .list_roots(Request::new(ListRootsRequest {})) + .await + .unwrap(); + assert!(resp.into_inner().roots.is_empty()); + } } diff --git a/src/session.rs b/src/session.rs index a3ad16a..9a22744 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,5 +1,7 @@ use crate::error::MacpError; -use serde::Deserialize; +use crate::pb::SessionStartPayload; +use prost::Message; +use std::collections::HashSet; pub const DEFAULT_TTL_MS: i64 = 60_000; pub const MAX_TTL_MS: i64 = 24 * 60 * 60 * 1000; @@ -16,115 +18,182 @@ pub struct Session { pub session_id: String, pub state: SessionState, pub ttl_expiry: i64, + pub started_at_unix_ms: i64, pub resolution: Option>, pub mode: String, pub mode_state: Vec, pub participants: Vec, + pub seen_message_ids: HashSet, + // RFC version fields from SessionStartPayload + pub intent: String, + pub mode_version: String, + pub configuration_version: String, + pub policy_version: String, } -#[derive(Debug, Deserialize)] -struct SessionStartConfig { - ttl_ms: Option, +/// Parse a protobuf-encoded SessionStartPayload from raw bytes. +pub fn parse_session_start_payload(payload: &[u8]) -> Result { + if payload.is_empty() { + return Ok(SessionStartPayload::default()); + } + SessionStartPayload::decode(payload).map_err(|_| MacpError::InvalidPayload) } -pub fn parse_session_start_ttl_ms(payload: &[u8]) -> Result { - if payload.is_empty() { +/// Extract and validate TTL from a parsed SessionStartPayload. +pub fn extract_ttl_ms(payload: &SessionStartPayload) -> Result { + if payload.ttl_ms == 0 { return Ok(DEFAULT_TTL_MS); } - - let text = std::str::from_utf8(payload).map_err(|_| MacpError::InvalidEnvelope)?; - let config: SessionStartConfig = - serde_json::from_str(text).map_err(|_| MacpError::InvalidEnvelope)?; - - match config.ttl_ms { - None => Ok(DEFAULT_TTL_MS), - Some(ms) if !(1..=MAX_TTL_MS).contains(&ms) => Err(MacpError::InvalidTtl), - Some(ms) => Ok(ms), + if !(1..=MAX_TTL_MS).contains(&payload.ttl_ms) { + return Err(MacpError::InvalidTtl); } + Ok(payload.ttl_ms) } #[cfg(test)] mod tests { use super::*; + use prost::Message; + + fn encode_payload(ttl_ms: i64, participants: Vec) -> Vec { + let payload = SessionStartPayload { + intent: String::new(), + participants, + mode_version: String::new(), + configuration_version: String::new(), + policy_version: String::new(), + ttl_ms, + context: vec![], + roots: vec![], + }; + payload.encode_to_vec() + } #[test] - fn parse_ttl_empty_payload_returns_default() { - let result = parse_session_start_ttl_ms(b""); - assert_eq!(result.unwrap(), DEFAULT_TTL_MS); + fn parse_empty_payload_returns_default() { + let result = parse_session_start_payload(b"").unwrap(); + assert_eq!(result.ttl_ms, 0); + assert!(result.participants.is_empty()); } #[test] - fn parse_ttl_valid_json_with_ttl() { - let payload = br#"{"ttl_ms": 5000}"#; - assert_eq!(parse_session_start_ttl_ms(payload).unwrap(), 5000); + fn parse_valid_protobuf_payload() { + let bytes = encode_payload(5000, vec!["alice".into(), "bob".into()]); + let result = parse_session_start_payload(&bytes).unwrap(); + assert_eq!(result.ttl_ms, 5000); + assert_eq!(result.participants, vec!["alice", "bob"]); } #[test] - fn parse_ttl_json_missing_field_returns_default() { - let payload = br#"{}"#; - assert_eq!(parse_session_start_ttl_ms(payload).unwrap(), DEFAULT_TTL_MS); + fn parse_invalid_bytes_returns_error() { + let err = parse_session_start_payload(b"not protobuf").unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); } #[test] - fn parse_ttl_json_null_field_returns_default() { - let payload = br#"{"ttl_ms": null}"#; - assert_eq!(parse_session_start_ttl_ms(payload).unwrap(), DEFAULT_TTL_MS); + fn extract_ttl_default_when_zero() { + let payload = SessionStartPayload::default(); + assert_eq!(extract_ttl_ms(&payload).unwrap(), DEFAULT_TTL_MS); } #[test] - fn parse_ttl_boundary_min_valid() { - let payload = br#"{"ttl_ms": 1}"#; - assert_eq!(parse_session_start_ttl_ms(payload).unwrap(), 1); + fn extract_ttl_valid() { + let payload = SessionStartPayload { + ttl_ms: 5000, + ..Default::default() + }; + assert_eq!(extract_ttl_ms(&payload).unwrap(), 5000); } #[test] - fn parse_ttl_boundary_max_valid() { - let payload = format!(r#"{{"ttl_ms": {}}}"#, MAX_TTL_MS); - assert_eq!( - parse_session_start_ttl_ms(payload.as_bytes()).unwrap(), - MAX_TTL_MS - ); + fn extract_ttl_boundary_min() { + let payload = SessionStartPayload { + ttl_ms: 1, + ..Default::default() + }; + assert_eq!(extract_ttl_ms(&payload).unwrap(), 1); } #[test] - fn parse_ttl_zero_returns_invalid() { - let payload = br#"{"ttl_ms": 0}"#; - let err = parse_session_start_ttl_ms(payload).unwrap_err(); - assert_eq!(err.to_string(), "InvalidTtl"); + fn extract_ttl_boundary_max() { + let payload = SessionStartPayload { + ttl_ms: MAX_TTL_MS, + ..Default::default() + }; + assert_eq!(extract_ttl_ms(&payload).unwrap(), MAX_TTL_MS); } #[test] - fn parse_ttl_negative_returns_invalid() { - let payload = br#"{"ttl_ms": -5000}"#; - let err = parse_session_start_ttl_ms(payload).unwrap_err(); + fn extract_ttl_negative_returns_invalid() { + let payload = SessionStartPayload { + ttl_ms: -5000, + ..Default::default() + }; + let err = extract_ttl_ms(&payload).unwrap_err(); assert_eq!(err.to_string(), "InvalidTtl"); } #[test] - fn parse_ttl_exceeds_max_returns_invalid() { - let payload = format!(r#"{{"ttl_ms": {}}}"#, MAX_TTL_MS + 1); - let err = parse_session_start_ttl_ms(payload.as_bytes()).unwrap_err(); + fn extract_ttl_exceeds_max_returns_invalid() { + let payload = SessionStartPayload { + ttl_ms: MAX_TTL_MS + 1, + ..Default::default() + }; + let err = extract_ttl_ms(&payload).unwrap_err(); assert_eq!(err.to_string(), "InvalidTtl"); } #[test] - fn parse_ttl_invalid_utf8_returns_invalid_envelope() { - let payload: &[u8] = &[0xff, 0xfe, 0xfd]; - let err = parse_session_start_ttl_ms(payload).unwrap_err(); - assert_eq!(err.to_string(), "InvalidEnvelope"); + fn parse_payload_with_context_bytes() { + let payload = SessionStartPayload { + intent: "test intent".into(), + ttl_ms: 10_000, + participants: vec!["alice".into()], + mode_version: "1.0".into(), + configuration_version: String::new(), + policy_version: String::new(), + context: b"some context data".to_vec(), + roots: vec![], + }; + let bytes = payload.encode_to_vec(); + let result = parse_session_start_payload(&bytes).unwrap(); + assert_eq!(result.ttl_ms, 10_000); + assert_eq!(result.participants, vec!["alice"]); + assert_eq!(result.intent, "test intent"); + assert_eq!(result.mode_version, "1.0"); + assert_eq!(result.context, b"some context data"); } #[test] - fn parse_ttl_invalid_json_returns_invalid_envelope() { - let payload = b"not json at all"; - let err = parse_session_start_ttl_ms(payload).unwrap_err(); - assert_eq!(err.to_string(), "InvalidEnvelope"); + fn parse_payload_with_only_participants() { + let payload = SessionStartPayload { + ttl_ms: 0, + participants: vec!["a".into(), "b".into(), "c".into()], + ..Default::default() + }; + let bytes = payload.encode_to_vec(); + let result = parse_session_start_payload(&bytes).unwrap(); + assert_eq!(result.ttl_ms, 0); + assert_eq!(result.participants.len(), 3); + } + + #[test] + fn extract_ttl_at_minus_one_returns_invalid() { + let payload = SessionStartPayload { + ttl_ms: -1, + ..Default::default() + }; + let err = extract_ttl_ms(&payload).unwrap_err(); + assert_eq!(err.to_string(), "InvalidTtl"); } #[test] - fn parse_ttl_wrong_type_returns_invalid_envelope() { - let payload = br#"{"ttl_ms": "five thousand"}"#; - let err = parse_session_start_ttl_ms(payload).unwrap_err(); - assert_eq!(err.to_string(), "InvalidEnvelope"); + fn session_state_equality() { + assert_eq!(SessionState::Open, SessionState::Open); + assert_eq!(SessionState::Resolved, SessionState::Resolved); + assert_eq!(SessionState::Expired, SessionState::Expired); + assert_ne!(SessionState::Open, SessionState::Resolved); + assert_ne!(SessionState::Open, SessionState::Expired); + assert_ne!(SessionState::Resolved, SessionState::Expired); } }