diff --git a/proto/buf.lock b/proto/buf.lock new file mode 100644 index 0000000..4f98143 --- /dev/null +++ b/proto/buf.lock @@ -0,0 +1,2 @@ +# Generated by buf. DO NOT EDIT. +version: v2 diff --git a/src/main.rs b/src/main.rs index 6719ff6..36a97e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ async fn main() -> Result<(), Box> { let runtime = Arc::new(Runtime::new(registry, log_store)); let svc = MacpServer::new(runtime); - println!("macp-runtime v0.2 (RFC-0001) listening on {}", addr); + println!("macp-runtime v0.3 (RFC-0001) listening on {}", addr); Server::builder() .add_service(pb::macp_runtime_service_server::MacpRuntimeServiceServer::new(svc)) diff --git a/src/mode/decision.rs b/src/mode/decision.rs index 4879a43..67fcc00 100644 --- a/src/mode/decision.rs +++ b/src/mode/decision.rs @@ -1,9 +1,11 @@ +use crate::decision_pb::{EvaluationPayload, ObjectionPayload, ProposalPayload, VotePayload}; use crate::error::MacpError; use crate::mode::{Mode, ModeResponse}; use crate::pb::Envelope; use crate::session::Session; +use prost::Message; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::BTreeMap; /// Phase of the decision lifecycle. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -17,10 +19,10 @@ pub enum DecisionPhase { /// Internal state tracked across the decision lifecycle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DecisionState { - pub proposals: HashMap, + pub proposals: BTreeMap, pub evaluations: Vec, pub objections: Vec, - pub votes: HashMap, + pub votes: BTreeMap, pub phase: DecisionPhase, } @@ -58,6 +60,7 @@ pub struct Vote { } /// DecisionMode implements the RFC-compliant Proposal -> Evaluation -> Vote -> Commitment lifecycle. +/// Payloads are protobuf-encoded using types from `decision.proto`. /// Also supports the legacy `payload == b"resolve"` behavior for backward compatibility. pub struct DecisionMode; @@ -74,14 +77,18 @@ impl DecisionMode { impl Mode for DecisionMode { fn on_session_start( &self, - _session: &Session, + session: &Session, _env: &Envelope, ) -> Result { + // Enforce declared participant model for canonical mode name + if session.mode == "macp.mode.decision.v1" && session.participants.is_empty() { + return Err(MacpError::InvalidPayload); + } let state = DecisionState { - proposals: HashMap::new(), + proposals: BTreeMap::new(), evaluations: Vec::new(), objections: Vec::new(), - votes: HashMap::new(), + votes: BTreeMap::new(), phase: DecisionPhase::Proposal, }; Ok(ModeResponse::PersistState(Self::encode_state(&state))) @@ -101,10 +108,10 @@ impl Mode for DecisionMode { let mut state = if session.mode_state.is_empty() { DecisionState { - proposals: HashMap::new(), + proposals: BTreeMap::new(), evaluations: Vec::new(), objections: Vec::new(), - votes: HashMap::new(), + votes: BTreeMap::new(), phase: DecisionPhase::Proposal, } } else { @@ -117,8 +124,8 @@ impl Mode for DecisionMode { match env.message_type.as_str() { "Proposal" => { - let payload: ProposalInput = - serde_json::from_slice(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + let payload = ProposalPayload::decode(&*env.payload) + .map_err(|_| MacpError::InvalidPayload)?; if payload.proposal_id.is_empty() { return Err(MacpError::InvalidPayload); } @@ -127,7 +134,7 @@ impl Mode for DecisionMode { Proposal { proposal_id: payload.proposal_id, option: payload.option, - rationale: payload.rationale.unwrap_or_default(), + rationale: payload.rationale, sender: env.sender.clone(), }, ); @@ -135,30 +142,34 @@ impl Mode for DecisionMode { Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Evaluation" => { - let payload: EvaluationInput = - serde_json::from_slice(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + let payload = EvaluationPayload::decode(&*env.payload) + .map_err(|_| MacpError::InvalidPayload)?; if !state.proposals.contains_key(&payload.proposal_id) { return Err(MacpError::InvalidPayload); } state.evaluations.push(Evaluation { proposal_id: payload.proposal_id, recommendation: payload.recommendation, - confidence: payload.confidence.unwrap_or(0.0), - reason: payload.reason.unwrap_or_default(), + confidence: payload.confidence, + reason: payload.reason, sender: env.sender.clone(), }); Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Objection" => { - let payload: ObjectionInput = - serde_json::from_slice(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + let payload = ObjectionPayload::decode(&*env.payload) + .map_err(|_| MacpError::InvalidPayload)?; if !state.proposals.contains_key(&payload.proposal_id) { return Err(MacpError::InvalidPayload); } state.objections.push(Objection { proposal_id: payload.proposal_id, reason: payload.reason, - severity: payload.severity.unwrap_or_else(|| "medium".into()), + severity: if payload.severity.is_empty() { + "medium".into() + } else { + payload.severity + }, sender: env.sender.clone(), }); Ok(ModeResponse::PersistState(Self::encode_state(&state))) @@ -167,8 +178,8 @@ impl Mode for DecisionMode { if state.proposals.is_empty() { return Err(MacpError::InvalidPayload); } - let payload: VoteInput = - serde_json::from_slice(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + let payload = + VotePayload::decode(&*env.payload).map_err(|_| MacpError::InvalidPayload)?; if !state.proposals.contains_key(&payload.proposal_id) { return Err(MacpError::InvalidPayload); } @@ -177,7 +188,7 @@ impl Mode for DecisionMode { Vote { proposal_id: payload.proposal_id, vote: payload.vote, - reason: payload.reason.unwrap_or_default(), + reason: payload.reason, sender: env.sender.clone(), }, ); @@ -199,36 +210,6 @@ impl Mode for DecisionMode { } } -// Input types for JSON deserialization from payload -#[derive(Deserialize)] -struct ProposalInput { - proposal_id: String, - option: String, - rationale: Option, -} - -#[derive(Deserialize)] -struct EvaluationInput { - proposal_id: String, - recommendation: String, - confidence: Option, - reason: Option, -} - -#[derive(Deserialize)] -struct ObjectionInput { - proposal_id: String, - reason: String, - severity: Option, -} - -#[derive(Deserialize)] -struct VoteInput { - proposal_id: String, - vote: String, - reason: Option, -} - #[cfg(test)] mod tests { use super::*; @@ -274,10 +255,10 @@ mod tests { fn empty_state() -> DecisionState { DecisionState { - proposals: HashMap::new(), + proposals: BTreeMap::new(), evaluations: Vec::new(), objections: Vec::new(), - votes: HashMap::new(), + votes: BTreeMap::new(), phase: DecisionPhase::Proposal, } } @@ -312,6 +293,50 @@ mod tests { state } + // Helper to encode protobuf payloads + fn encode_proposal(proposal_id: &str, option: &str, rationale: &str) -> Vec { + ProposalPayload { + proposal_id: proposal_id.into(), + option: option.into(), + rationale: rationale.into(), + supporting_data: vec![], + } + .encode_to_vec() + } + + fn encode_evaluation( + proposal_id: &str, + recommendation: &str, + confidence: f64, + reason: &str, + ) -> Vec { + EvaluationPayload { + proposal_id: proposal_id.into(), + recommendation: recommendation.into(), + confidence, + reason: reason.into(), + } + .encode_to_vec() + } + + fn encode_objection(proposal_id: &str, reason: &str, severity: &str) -> Vec { + ObjectionPayload { + proposal_id: proposal_id.into(), + reason: reason.into(), + severity: severity.into(), + } + .encode_to_vec() + } + + fn encode_vote(proposal_id: &str, vote: &str, reason: &str) -> Vec { + VotePayload { + proposal_id: proposal_id.into(), + vote: vote.into(), + reason: reason.into(), + } + .encode_to_vec() + } + #[test] fn session_start_initializes_state() { let mode = DecisionMode; @@ -357,12 +382,8 @@ mod tests { fn proposal_creates_entry_and_advances_phase() { let mode = DecisionMode; let session = session_with_state(&empty_state()); - let payload = serde_json::json!({ - "proposal_id": "p1", - "option": "option_a", - "rationale": "it's the best" - }); - let env = test_envelope("Proposal", payload.to_string().as_bytes()); + let payload = encode_proposal("p1", "option_a", "it's the best"); + let env = test_envelope("Proposal", &payload); let result = mode.on_message(&session, &env).unwrap(); match result { @@ -382,20 +403,17 @@ mod tests { fn proposal_with_empty_id_rejected() { let mode = DecisionMode; let session = session_with_state(&empty_state()); - let payload = serde_json::json!({ - "proposal_id": "", - "option": "opt" - }); - let env = test_envelope("Proposal", payload.to_string().as_bytes()); + let payload = encode_proposal("", "opt", ""); + let env = test_envelope("Proposal", &payload); let err = mode.on_message(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); } #[test] - fn proposal_with_bad_json_rejected() { + fn proposal_with_bad_payload_rejected() { let mode = DecisionMode; let session = session_with_state(&empty_state()); - let env = test_envelope("Proposal", b"not json"); + let env = test_envelope("Proposal", b"not protobuf"); let err = mode.on_message(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); } @@ -406,13 +424,8 @@ mod tests { fn evaluation_for_existing_proposal() { let mode = DecisionMode; let session = session_with_state(&state_with_proposal()); - let payload = serde_json::json!({ - "proposal_id": "p1", - "recommendation": "APPROVE", - "confidence": 0.9, - "reason": "looks good" - }); - let env = test_envelope("Evaluation", payload.to_string().as_bytes()); + let payload = encode_evaluation("p1", "APPROVE", 0.9, "looks good"); + let env = test_envelope("Evaluation", &payload); let result = mode.on_message(&session, &env).unwrap(); match result { @@ -429,11 +442,8 @@ mod tests { fn evaluation_for_nonexistent_proposal_rejected() { let mode = DecisionMode; let session = session_with_state(&state_with_proposal()); - let payload = serde_json::json!({ - "proposal_id": "nonexistent", - "recommendation": "APPROVE" - }); - let env = test_envelope("Evaluation", payload.to_string().as_bytes()); + let payload = encode_evaluation("nonexistent", "APPROVE", 0.9, ""); + let env = test_envelope("Evaluation", &payload); let err = mode.on_message(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); } @@ -444,12 +454,8 @@ mod tests { fn objection_for_existing_proposal() { let mode = DecisionMode; let session = session_with_state(&state_with_proposal()); - let payload = serde_json::json!({ - "proposal_id": "p1", - "reason": "too risky", - "severity": "high" - }); - let env = test_envelope("Objection", payload.to_string().as_bytes()); + let payload = encode_objection("p1", "too risky", "high"); + let env = test_envelope("Objection", &payload); let result = mode.on_message(&session, &env).unwrap(); match result { @@ -466,11 +472,8 @@ mod tests { fn objection_for_nonexistent_proposal_rejected() { let mode = DecisionMode; let session = session_with_state(&state_with_proposal()); - let payload = serde_json::json!({ - "proposal_id": "nope", - "reason": "bad" - }); - let env = test_envelope("Objection", payload.to_string().as_bytes()); + let payload = encode_objection("nope", "bad", "medium"); + let env = test_envelope("Objection", &payload); let err = mode.on_message(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); } @@ -481,12 +484,8 @@ mod tests { fn vote_for_existing_proposal() { let mode = DecisionMode; let session = session_with_state(&state_with_proposal()); - let payload = serde_json::json!({ - "proposal_id": "p1", - "vote": "approve", - "reason": "I agree" - }); - let env = test_envelope("Vote", payload.to_string().as_bytes()); + let payload = encode_vote("p1", "approve", "I agree"); + let env = test_envelope("Vote", &payload); let result = mode.on_message(&session, &env).unwrap(); match result { @@ -504,11 +503,8 @@ mod tests { fn vote_before_proposal_rejected() { let mode = DecisionMode; let session = session_with_state(&empty_state()); - let payload = serde_json::json!({ - "proposal_id": "p1", - "vote": "approve" - }); - let env = test_envelope("Vote", payload.to_string().as_bytes()); + let payload = encode_vote("p1", "approve", ""); + let env = test_envelope("Vote", &payload); let err = mode.on_message(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); } @@ -517,11 +513,8 @@ mod tests { fn vote_for_nonexistent_proposal_rejected() { let mode = DecisionMode; let session = session_with_state(&state_with_proposal()); - let payload = serde_json::json!({ - "proposal_id": "nope", - "vote": "approve" - }); - let env = test_envelope("Vote", payload.to_string().as_bytes()); + let payload = encode_vote("nope", "approve", ""); + let env = test_envelope("Vote", &payload); let err = mode.on_message(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); } @@ -530,11 +523,8 @@ mod tests { fn vote_overwrites_previous_vote_by_sender() { let mode = DecisionMode; let session = session_with_state(&state_with_proposal()); - let payload = serde_json::json!({ - "proposal_id": "p1", - "vote": "approve" - }); - let env = test_envelope("Vote", payload.to_string().as_bytes()); + let payload = encode_vote("p1", "approve", ""); + let env = test_envelope("Vote", &payload); let result = mode.on_message(&session, &env).unwrap(); let data = match result { ModeResponse::PersistState(d) => d, @@ -544,11 +534,8 @@ mod tests { // Second vote by same sender let mut session2 = test_session(); session2.mode_state = data; - let payload2 = serde_json::json!({ - "proposal_id": "p1", - "vote": "reject" - }); - let env2 = test_envelope("Vote", payload2.to_string().as_bytes()); + let payload2 = encode_vote("p1", "reject", ""); + let env2 = test_envelope("Vote", &payload2); let result2 = mode.on_message(&session2, &env2).unwrap(); match result2 { ModeResponse::PersistState(data) => { @@ -566,12 +553,8 @@ mod tests { fn commitment_resolves_session() { let mode = DecisionMode; let session = session_with_state(&state_with_vote()); - let payload = serde_json::json!({ - "commitment_id": "c1", - "action": "deploy option_a", - "authority_scope": "team-alpha" - }); - let env = test_envelope("Commitment", payload.to_string().as_bytes()); + let payload = b"commitment-data"; + let env = test_envelope("Commitment", payload); let result = mode.on_message(&session, &env).unwrap(); match result { @@ -588,11 +571,7 @@ mod tests { fn commitment_without_votes_rejected() { let mode = DecisionMode; let session = session_with_state(&state_with_proposal()); - let payload = serde_json::json!({ - "commitment_id": "c1", - "action": "deploy" - }); - let env = test_envelope("Commitment", payload.to_string().as_bytes()); + let env = test_envelope("Commitment", b"commit-data"); let err = mode.on_message(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); } @@ -612,49 +591,31 @@ mod tests { } // Proposal - let payload = serde_json::json!({ - "proposal_id": "p1", - "option": "option_a", - "rationale": "best choice" - }); - let env = test_envelope("Proposal", payload.to_string().as_bytes()); + let payload = encode_proposal("p1", "option_a", "best choice"); + let env = test_envelope("Proposal", &payload); let result = mode.on_message(&session, &env).unwrap(); if let ModeResponse::PersistState(data) = result { session.mode_state = data; } // Evaluation - let payload = serde_json::json!({ - "proposal_id": "p1", - "recommendation": "APPROVE", - "confidence": 0.95 - }); - let env = test_envelope("Evaluation", payload.to_string().as_bytes()); + let payload = encode_evaluation("p1", "APPROVE", 0.95, ""); + let env = test_envelope("Evaluation", &payload); let result = mode.on_message(&session, &env).unwrap(); if let ModeResponse::PersistState(data) = result { session.mode_state = data; } // Vote - let payload = serde_json::json!({ - "proposal_id": "p1", - "vote": "approve", - "reason": "agreed" - }); - let env = test_envelope("Vote", payload.to_string().as_bytes()); + let payload = encode_vote("p1", "approve", "agreed"); + let env = test_envelope("Vote", &payload); let result = mode.on_message(&session, &env).unwrap(); if let ModeResponse::PersistState(data) = result { session.mode_state = data; } // Commitment - let payload = serde_json::json!({ - "commitment_id": "c1", - "action": "deploy option_a", - "authority_scope": "team", - "reason": "consensus reached" - }); - let env = test_envelope("Commitment", payload.to_string().as_bytes()); + let env = test_envelope("Commitment", b"final-commitment"); let result = mode.on_message(&session, &env).unwrap(); assert!(matches!(result, ModeResponse::PersistAndResolve { .. })); } @@ -666,12 +627,113 @@ mod tests { state.phase = DecisionPhase::Committed; let session = session_with_state(&state); - let payload = serde_json::json!({ - "proposal_id": "p1", - "option": "option_b" - }); - let env = test_envelope("Proposal", payload.to_string().as_bytes()); + let payload = encode_proposal("p1", "option_b", ""); + let env = test_envelope("Proposal", &payload); let err = mode.on_message(&session, &env).unwrap_err(); assert_eq!(err.to_string(), "SessionNotOpen"); } + + #[test] + fn objection_default_severity() { + let mode = DecisionMode; + let session = session_with_state(&state_with_proposal()); + // Encode with empty severity -- should default to "medium" + let payload = encode_objection("p1", "bad idea", ""); + let env = test_envelope("Objection", &payload); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let state: DecisionState = serde_json::from_slice(&data).unwrap(); + assert_eq!(state.objections[0].severity, "medium"); + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn btreemap_deterministic_serialization() { + // Verify BTreeMap produces deterministic output + let mut state1 = empty_state(); + state1.proposals.insert( + "z".into(), + Proposal { + proposal_id: "z".into(), + option: "z".into(), + rationale: "".into(), + sender: "".into(), + }, + ); + state1.proposals.insert( + "a".into(), + Proposal { + proposal_id: "a".into(), + option: "a".into(), + rationale: "".into(), + sender: "".into(), + }, + ); + + let mut state2 = empty_state(); + state2.proposals.insert( + "a".into(), + Proposal { + proposal_id: "a".into(), + option: "a".into(), + rationale: "".into(), + sender: "".into(), + }, + ); + state2.proposals.insert( + "z".into(), + Proposal { + proposal_id: "z".into(), + option: "z".into(), + rationale: "".into(), + sender: "".into(), + }, + ); + + let enc1 = DecisionMode::encode_state(&state1); + let enc2 = DecisionMode::encode_state(&state2); + assert_eq!(enc1, enc2); + } + + // --- Participant enforcement tests --- + + #[test] + fn canonical_mode_requires_participants() { + let mode = DecisionMode; + let mut session = test_session(); + session.mode = "macp.mode.decision.v1".into(); + session.participants = vec![]; // empty + let env = test_envelope("SessionStart", b""); + + let err = mode.on_session_start(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn canonical_mode_with_participants_succeeds() { + let mode = DecisionMode; + let mut session = test_session(); + session.mode = "macp.mode.decision.v1".into(); + session.participants = vec!["alice".into(), "bob".into()]; + let env = test_envelope("SessionStart", b""); + + let result = mode.on_session_start(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::PersistState(_))); + } + + #[test] + fn legacy_alias_allows_empty_participants() { + let mode = DecisionMode; + let mut session = test_session(); + session.mode = "decision".into(); + session.participants = vec![]; // empty -- should be allowed for legacy alias + let env = test_envelope("SessionStart", b""); + + let result = mode.on_session_start(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::PersistState(_))); + } } diff --git a/src/runtime.rs b/src/runtime.rs index ecccb85..663ad77 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -130,18 +130,9 @@ impl Runtime { let now = Utc::now().timestamp_millis(); let ttl_expiry = now + ttl_ms; - // Create session log and append incoming entry - self.log_store.create_session_log(&env.session_id).await; - self.log_store - .append(&env.session_id, Self::make_incoming_entry(env)) - .await; - // Extract participants from SessionStartPayload let participants = start_payload.participants.clone(); - let mut seen_message_ids = HashSet::new(); - seen_message_ids.insert(env.message_id.clone()); - // Create session with initial state and RFC version fields let session = Session { session_id: env.session_id.clone(), @@ -152,18 +143,24 @@ impl Runtime { mode: mode_name.to_string(), mode_state: vec![], participants, - seen_message_ids, + seen_message_ids: HashSet::new(), intent: start_payload.intent.clone(), mode_version: start_payload.mode_version.clone(), configuration_version: start_payload.configuration_version.clone(), policy_version: start_payload.policy_version.clone(), }; - // Call mode's on_session_start + // Call mode's on_session_start BEFORE recording side effects let response = mode.on_session_start(&session, env)?; - // Insert session and apply response + // Only on success: create log and record message_id + self.log_store.create_session_log(&env.session_id).await; + self.log_store + .append(&env.session_id, Self::make_incoming_entry(env)) + .await; + let mut session = session; + session.seen_message_ids.insert(env.message_id.clone()); Self::apply_mode_response(&mut session, response); // For multi_round mode, extract participants from mode_state if not already set @@ -221,18 +218,18 @@ impl Runtime { return Err(MacpError::Forbidden); } - // Record message_id - session.seen_message_ids.insert(env.message_id.clone()); + let mode_name = session.mode.clone(); + let mode = self.modes.get(&mode_name).ok_or(MacpError::UnknownMode)?; + + // Dispatch to mode BEFORE recording side effects + let response = mode.on_message(session, env)?; - // Log incoming message before mode dispatch + // Only on success: record message_id and log + session.seen_message_ids.insert(env.message_id.clone()); self.log_store .append(&env.session_id, Self::make_incoming_entry(env)) .await; - let mode_name = session.mode.clone(); - let mode = self.modes.get(&mode_name).ok_or(MacpError::UnknownMode)?; - - let response = mode.on_message(session, env)?; Self::apply_mode_response(session, response); Ok(ProcessResult { @@ -249,6 +246,23 @@ impl Runtime { }) } + /// Get a session with TTL check. Transitions expired sessions to Expired state. + pub async fn get_session_checked(&self, session_id: &str) -> Option { + let mut guard = self.registry.sessions.write().await; + if let Some(session) = guard.get_mut(session_id) { + let now = Utc::now().timestamp_millis(); + if session.state == SessionState::Open && now > session.ttl_expiry { + self.log_store + .append(session_id, Self::make_internal_entry("TtlExpired", b"")) + .await; + session.state = SessionState::Expired; + } + Some(session.clone()) + } else { + None + } + } + /// Cancel a session by ID. Idempotent for already-resolved/expired sessions. pub async fn cancel_session( &self, @@ -259,6 +273,15 @@ impl Runtime { let session = guard.get_mut(session_id).ok_or(MacpError::UnknownSession)?; + // TTL check: transition expired sessions + let now = Utc::now().timestamp_millis(); + if session.state == SessionState::Open && now > session.ttl_expiry { + self.log_store + .append(session_id, Self::make_internal_entry("TtlExpired", b"")) + .await; + session.state = SessionState::Expired; + } + // Idempotent: already resolved or expired if session.state == SessionState::Resolved || session.state == SessionState::Expired { return Ok(ProcessResult { @@ -287,6 +310,7 @@ impl Runtime { #[cfg(test)] mod tests { use super::*; + use crate::decision_pb::ProposalPayload; use crate::pb::SessionStartPayload; use prost::Message; @@ -356,7 +380,9 @@ mod tests { async fn empty_mode_defaults_to_decision() { let rt = make_runtime(); - let e = env("", "SessionStart", "m1", "s1", "alice", b""); + // Empty mode defaults to macp.mode.decision.v1 which requires participants + let payload = encode_session_start(0, vec!["alice".into()]); + let e = env("", "SessionStart", "m1", "s1", "alice", &payload); rt.process(&e).await.unwrap(); let guard = rt.registry.sessions.read().await; @@ -703,13 +729,14 @@ mod tests { async fn rfc_mode_name_works() { let rt = make_runtime(); + let payload = encode_session_start(0, vec!["alice".into()]); let e = env( "macp.mode.decision.v1", "SessionStart", "m1", "s1", "alice", - b"", + &payload, ); rt.process(&e).await.unwrap(); @@ -1097,4 +1124,164 @@ mod tests { assert_eq!(s.configuration_version, "cfg-v2"); assert_eq!(s.policy_version, "pol-v1"); } + + // --- PR #1a: Admission pipeline bug tests --- + + #[tokio::test] + async fn rejected_message_does_not_burn_message_id() { + let rt = make_runtime(); + + // Create a decision session with a proposal so we can test invalid payloads + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + // Send a Proposal with invalid payload — should be rejected + let e = env("decision", "Proposal", "m2", "s1", "alice", b"not protobuf"); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + + // Retry the same message_id with valid payload — should succeed (not duplicate) + let valid_payload = ProposalPayload { + proposal_id: "p1".into(), + option: "option_a".into(), + rationale: "test".into(), + supporting_data: vec![], + } + .encode_to_vec(); + let e = env("decision", "Proposal", "m2", "s1", "alice", &valid_payload); + let result = rt.process(&e).await.unwrap(); + assert!(!result.duplicate); + } + + #[tokio::test] + async fn rejected_message_not_in_log() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let log_before = rt.log_store.get_log("s1").await.unwrap().len(); + + // Send a Proposal with invalid payload — should be rejected + let e = env("decision", "Proposal", "m2", "s1", "alice", b"not protobuf"); + let _ = rt.process(&e).await; + + let log_after = rt.log_store.get_log("s1").await.unwrap().len(); + assert_eq!(log_before, log_after); // No new log entry for rejected message + } + + #[tokio::test] + async fn accepted_messages_still_dedup_correctly() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let result = rt.process(&e).await.unwrap(); + assert!(!result.duplicate); + + // Same message_id again — should be duplicate + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let result = rt.process(&e).await.unwrap(); + assert!(result.duplicate); + } + + #[tokio::test] + async fn session_start_mode_rejection_no_side_effects() { + let rt = make_runtime(); + + // MultiRound requires participants — empty participants should fail + let e = env("multi_round", "SessionStart", "m1", "s1", "creator", b""); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + + // Session should not exist + assert!(rt.registry.get_session("s1").await.is_none()); + // Log should not exist + assert!(rt.log_store.get_log("s1").await.is_none()); + } + + // --- PR #1b: TTL on GetSession/CancelSession tests --- + + #[tokio::test] + async fn get_session_checked_transitions_expired() { + let rt = make_runtime(); + + let payload = encode_session_start(1, vec![]); + let e = env("decision", "SessionStart", "m1", "s1", "alice", &payload); + rt.process(&e).await.unwrap(); + + // Wait for TTL to expire + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let session = rt.get_session_checked("s1").await.unwrap(); + assert_eq!(session.state, SessionState::Expired); + } + + #[tokio::test] + async fn cancel_expired_session_returns_expired_idempotent() { + let rt = make_runtime(); + + let payload = encode_session_start(1, vec![]); + let e = env("decision", "SessionStart", "m1", "s1", "alice", &payload); + rt.process(&e).await.unwrap(); + + // Wait for TTL to expire + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + // Cancel should detect TTL expiry and return Expired idempotently + let result = rt.cancel_session("s1", "cancel attempt").await.unwrap(); + assert_eq!(result.session_state, SessionState::Expired); + } + + // --- PR #5: Participant enforcement tests --- + + #[tokio::test] + async fn canonical_decision_mode_requires_participants() { + let rt = make_runtime(); + + // macp.mode.decision.v1 with no participants should fail + let e = env( + "macp.mode.decision.v1", + "SessionStart", + "m1", + "s1", + "alice", + b"", + ); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[tokio::test] + async fn canonical_decision_mode_with_participants_succeeds() { + let rt = make_runtime(); + + let payload = encode_session_start(0, vec!["alice".into(), "bob".into()]); + let e = env( + "macp.mode.decision.v1", + "SessionStart", + "m1", + "s1", + "alice", + &payload, + ); + rt.process(&e).await.unwrap(); + + let s = rt.registry.get_session("s1").await.unwrap(); + assert_eq!(s.mode, "macp.mode.decision.v1"); + assert_eq!(s.participants, vec!["alice", "bob"]); + } + + #[tokio::test] + async fn legacy_decision_alias_allows_empty_participants() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let s = rt.registry.get_session("s1").await.unwrap(); + assert_eq!(s.mode, "decision"); + } } diff --git a/src/server.rs b/src/server.rs index 73caf4a..a979f57 100644 --- a/src/server.rs +++ b/src/server.rs @@ -89,7 +89,7 @@ impl MacpRuntimeService for MacpServer { runtime_info: Some(RuntimeInfo { name: "macp-runtime".into(), title: "MACP Reference Runtime".into(), - version: "0.2".into(), + version: "0.3".into(), description: "Reference implementation of the Multi-Agent Coordination Protocol" .into(), website_url: String::new(), @@ -150,7 +150,7 @@ impl MacpRuntimeService for MacpServer { ) -> Result, Status> { let req = request.into_inner(); - match self.runtime.registry.get_session(&req.session_id).await { + match self.runtime.get_session_checked(&req.session_id).await { Some(session) => { let state = Self::session_state_to_pb(&session.state); @@ -247,8 +247,9 @@ impl MacpRuntimeService for MacpServer { title: "Decision Mode".into(), description: "Proposal-based decision making with voting".into(), determinism_class: "semantic-deterministic".into(), - participant_model: "open".into(), + participant_model: "declared".into(), message_types: vec![ + "SessionStart".into(), "Proposal".into(), "Evaluation".into(), "Objection".into(), @@ -262,10 +263,11 @@ impl MacpRuntimeService for MacpServer { mode: "macp.mode.multi_round.v1".into(), mode_version: "1.0".into(), title: "Multi-Round Convergence Mode".into(), - description: "Participant-based convergence with all_equal strategy".into(), + description: "Participant-based convergence with all_equal strategy (experimental)" + .into(), determinism_class: "semantic-deterministic".into(), - participant_model: "closed".into(), - message_types: vec!["Contribute".into()], + participant_model: "declared".into(), + message_types: vec!["SessionStart".into(), "Contribute".into()], terminal_message_types: vec![], schema_uris: HashMap::new(), }, @@ -308,9 +310,24 @@ impl MacpRuntimeService for MacpServer { .await; match result { - Ok(_) => { + Ok(result) => { + let ack_env = Envelope { + macp_version: "1.0".into(), + mode: env.mode.clone(), + message_type: "Ack".into(), + message_id: format!("ack-{}", env.message_id), + session_id: env.session_id.clone(), + sender: "_runtime".into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload: serde_json::to_vec(&serde_json::json!({ + "ok": true, + "duplicate": result.duplicate, + "session_state": format!("{:?}", result.session_state), + })) + .unwrap_or_default(), + }; yield StreamSessionResponse { - envelope: Some(env), + envelope: Some(ack_env), }; } Err(e) => { @@ -1216,7 +1233,7 @@ mod tests { assert!(init.runtime_info.is_some()); let info = init.runtime_info.unwrap(); assert_eq!(info.name, "macp-runtime"); - assert_eq!(info.version, "0.2"); + assert_eq!(info.version, "0.3"); assert!(init.capabilities.is_some()); assert!(!init.supported_modes.is_empty()); }