diff --git a/src/main.rs b/src/main.rs index 9e68b99..31a9f90 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ use macp_runtime::storage::{ cleanup_temp_files, migrate_if_needed, FileBackend, MemoryBackend, StorageBackend, }; use server::MacpServer; +use std::io; use std::path::PathBuf; use std::sync::Arc; use tonic::transport::{Identity, Server, ServerTlsConfig}; @@ -31,6 +32,7 @@ async fn main() -> Result<(), Box> { let memory_only = std::env::var("MACP_MEMORY_ONLY").ok().as_deref() == Some("1"); let data_dir = PathBuf::from(std::env::var("MACP_DATA_DIR").unwrap_or_else(|_| ".macp-data".into())); + let strict_recovery = std::env::var("MACP_STRICT_RECOVERY").ok().as_deref() == Some("1"); let storage: Arc = if memory_only { Arc::new(MemoryBackend) @@ -50,22 +52,44 @@ async fn main() -> Result<(), Box> { // Enumerate session directories and replay from logs let sessions_dir = data_dir.join("sessions"); let mut recovered = 0usize; - if sessions_dir.exists() { - for entry in std::fs::read_dir(&sessions_dir)? { - let entry = entry?; - if !entry.file_type()?.is_dir() { + if tokio::fs::metadata(&sessions_dir).await.is_ok() { + let mut entries = tokio::fs::read_dir(&sessions_dir).await?; + while let Some(entry) = entries.next_entry().await? { + if !entry.file_type().await?.is_dir() { continue; } let session_id = entry.file_name().to_string_lossy().to_string(); - let log_entries = storage.load_log(&session_id).await?; + let log_entries = match storage.load_log(&session_id).await { + Ok(entries) => entries, + Err(e) if strict_recovery => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("strict recovery: failed to load log for {session_id}: {e}"), + ) + .into()); + } + Err(e) => { + tracing::warn!( + session_id = %session_id, + error = %e, + "failed to load session log; skipping" + ); + continue; + } + }; if log_entries.is_empty() { continue; } match replay_session(&session_id, &log_entries, &mode_registry) { Ok(session) => { - // Best-effort snapshot update if let Err(e) = storage.save_session(&session).await { + if strict_recovery { + return Err(io::Error::other(format!( + "strict recovery: failed to persist recovered session {session_id}: {e}" + )) + .into()); + } tracing::warn!( session_id = %session_id, error = %e, @@ -73,7 +97,6 @@ async fn main() -> Result<(), Box> { ); } - // Populate in-memory log store log_store.create_session_log(&session_id).await; for log_entry in &log_entries { log_store.append(&session_id, log_entry.clone()).await; @@ -82,6 +105,13 @@ async fn main() -> Result<(), Box> { registry.insert_recovered_session(session_id, session).await; recovered += 1; } + Err(e) if strict_recovery => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("strict recovery: failed to replay session {session_id}: {e}"), + ) + .into()); + } Err(e) => { tracing::warn!( session_id = %session_id, @@ -93,7 +123,11 @@ async fn main() -> Result<(), Box> { } } if recovered > 0 { - tracing::info!(count = recovered, "replayed sessions from log"); + tracing::info!( + count = recovered, + strict_recovery, + "replayed sessions from log" + ); } } diff --git a/src/mode/decision.rs b/src/mode/decision.rs index 307f59a..74e7667 100644 --- a/src/mode/decision.rs +++ b/src/mode/decision.rs @@ -8,8 +8,9 @@ use prost::Message; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] pub enum DecisionPhase { + #[default] Proposal, Evaluation, Voting, @@ -78,15 +79,62 @@ impl DecisionMode { fn decode_state(data: &[u8]) -> Result { serde_json::from_slice(data).map_err(|_| MacpError::InvalidModeState) } + + fn ensure_not_committed(state: &DecisionState) -> Result<(), MacpError> { + if state.phase == DecisionPhase::Committed { + Err(MacpError::SessionNotOpen) + } else { + Ok(()) + } + } + + fn ensure_known_proposal(state: &DecisionState, proposal_id: &str) -> Result<(), MacpError> { + if !state.proposals.contains_key(proposal_id) { + return Err(MacpError::InvalidPayload); + } + Ok(()) + } + + fn ensure_can_propose(state: &DecisionState) -> Result<(), MacpError> { + Self::ensure_not_committed(state)?; + if state.phase == DecisionPhase::Voting { + return Err(MacpError::InvalidPayload); + } + Ok(()) + } + + fn ensure_can_deliberate(state: &DecisionState) -> Result<(), MacpError> { + Self::ensure_not_committed(state)?; + if state.phase != DecisionPhase::Evaluation { + return Err(MacpError::InvalidPayload); + } + Ok(()) + } + + fn ensure_can_vote(state: &DecisionState) -> Result<(), MacpError> { + Self::ensure_not_committed(state)?; + if matches!( + state.phase, + DecisionPhase::Proposal | DecisionPhase::Committed + ) { + return Err(MacpError::InvalidPayload); + } + Ok(()) + } + + fn commitment_ready(state: &DecisionState) -> bool { + !state.proposals.is_empty() + } } impl Mode for DecisionMode { /// Authorize the sender for decision mode messages. /// - /// Implements the **coordinator authority model**: the session initiator - /// may emit `Proposal` and `Commitment` regardless of declared participants. - /// Declared participants may emit `Proposal`, `Evaluation`, `Objection`, - /// and `Vote`. Only the initiator may emit `Commitment`. + /// Implements the coordinator authority model reflected in the Decision RFC: + /// the session initiator may emit `Proposal` and `Commitment` regardless of + /// declared participants. Declared participants may emit `Proposal`, + /// `Evaluation`, `Objection`, and `Vote`. Only the initiator may emit + /// `Commitment`. fn authorize_sender(&self, session: &Session, env: &Envelope) -> Result<(), MacpError> { match env.message_type.as_str() { "Proposal" | "Commitment" if env.sender == session.initiator_sender => Ok(()), @@ -120,15 +168,11 @@ impl Mode for DecisionMode { Self::decode_state(&session.mode_state)? }; - if state.phase == DecisionPhase::Committed { - return Err(MacpError::SessionNotOpen); - } + Self::ensure_not_committed(&state)?; match env.message_type.as_str() { "Proposal" => { - if state.phase == DecisionPhase::Voting { - return Err(MacpError::InvalidPayload); - } + Self::ensure_can_propose(&state)?; let payload = ProposalPayload::decode(&*env.payload) .map_err(|_| MacpError::InvalidPayload)?; if payload.proposal_id.trim().is_empty() @@ -150,14 +194,10 @@ impl Mode for DecisionMode { Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Evaluation" => { - if state.phase == DecisionPhase::Proposal { - return Err(MacpError::InvalidPayload); - } let payload = EvaluationPayload::decode(&*env.payload) .map_err(|_| MacpError::InvalidPayload)?; - if !state.proposals.contains_key(&payload.proposal_id) { - return Err(MacpError::InvalidPayload); - } + Self::ensure_can_deliberate(&state)?; + Self::ensure_known_proposal(&state, &payload.proposal_id)?; state.evaluations.push(Evaluation { proposal_id: payload.proposal_id, recommendation: payload.recommendation, @@ -168,14 +208,10 @@ impl Mode for DecisionMode { Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Objection" => { - if state.phase == DecisionPhase::Proposal { - return Err(MacpError::InvalidPayload); - } let payload = ObjectionPayload::decode(&*env.payload) .map_err(|_| MacpError::InvalidPayload)?; - if !state.proposals.contains_key(&payload.proposal_id) { - return Err(MacpError::InvalidPayload); - } + Self::ensure_can_deliberate(&state)?; + Self::ensure_known_proposal(&state, &payload.proposal_id)?; state.objections.push(Objection { proposal_id: payload.proposal_id, reason: payload.reason, @@ -189,14 +225,10 @@ impl Mode for DecisionMode { Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Vote" => { - if state.phase == DecisionPhase::Proposal { - return Err(MacpError::InvalidPayload); - } let payload = VotePayload::decode(&*env.payload).map_err(|_| MacpError::InvalidPayload)?; - if !state.proposals.contains_key(&payload.proposal_id) { - return Err(MacpError::InvalidPayload); - } + Self::ensure_can_vote(&state)?; + Self::ensure_known_proposal(&state, &payload.proposal_id)?; let proposal_votes = state.votes.entry(payload.proposal_id.clone()).or_default(); if proposal_votes.contains_key(&env.sender) { return Err(MacpError::InvalidPayload); @@ -215,7 +247,7 @@ impl Mode for DecisionMode { } "Commitment" => { validate_commitment_payload_for_session(session, &env.payload)?; - if state.proposals.is_empty() { + if !Self::commitment_ready(&state) { return Err(MacpError::InvalidPayload); } state.phase = DecisionPhase::Committed; @@ -290,6 +322,25 @@ mod tests { .encode_to_vec() } + fn evaluation(proposal_id: &str) -> Vec { + EvaluationPayload { + proposal_id: proposal_id.into(), + recommendation: "proceed".into(), + confidence: 0.9, + reason: "good".into(), + } + .encode_to_vec() + } + + fn objection(proposal_id: &str) -> Vec { + ObjectionPayload { + proposal_id: proposal_id.into(), + reason: "risky".into(), + severity: "high".into(), + } + .encode_to_vec() + } + fn commitment(session: &Session) -> Vec { CommitmentPayload { commitment_id: "c1".into(), @@ -311,6 +362,10 @@ mod tests { } } + fn decode(session: &Session) -> DecisionState { + serde_json::from_slice(&session.mode_state).unwrap() + } + #[test] fn session_start_requires_declared_participants() { let mode = DecisionMode; @@ -411,25 +466,6 @@ mod tests { ); } - fn evaluation(proposal_id: &str) -> Vec { - EvaluationPayload { - proposal_id: proposal_id.into(), - recommendation: "proceed".into(), - confidence: 0.9, - reason: "good".into(), - } - .encode_to_vec() - } - - fn objection(proposal_id: &str) -> Vec { - ObjectionPayload { - proposal_id: proposal_id.into(), - reason: "risky".into(), - severity: "high".into(), - } - .encode_to_vec() - } - #[test] fn evaluation_before_any_proposal_rejected() { let mode = DecisionMode; @@ -532,6 +568,78 @@ mod tests { ); } + #[test] + fn evaluation_after_voting_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://orchestrator", "Proposal", proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://fraud", "Vote", vote("p1", "approve")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env("agent://growth", "Evaluation", evaluation("p1")) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn objection_after_voting_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://orchestrator", "Proposal", proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://fraud", "Vote", vote("p1", "approve")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env("agent://growth", "Objection", objection("p1")) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + #[test] fn commitment_from_non_initiator_rejected() { let mode = DecisionMode; @@ -590,6 +698,73 @@ mod tests { ); } + #[test] + fn malformed_vote_payload_rejected() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://orchestrator", "Proposal", proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message(&session, &env("agent://fraud", "Vote", vec![0xff, 0x00])) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } + + #[test] + fn phase_advances_from_proposal_to_voting_to_committed() { + let mode = DecisionMode; + let mut session = test_session(); + let resp = mode + .on_session_start( + &session, + &env("agent://orchestrator", "SessionStart", vec![]), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, DecisionPhase::Proposal); + + let resp = mode + .on_message( + &session, + &env("agent://orchestrator", "Proposal", proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, DecisionPhase::Evaluation); + + let resp = mode + .on_message( + &session, + &env("agent://fraud", "Vote", vote("p1", "approve")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, DecisionPhase::Voting); + + let resp = mode + .on_message( + &session, + &env("agent://orchestrator", "Commitment", commitment(&session)), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, DecisionPhase::Committed); + } + #[test] fn commitment_versions_must_match_session_bindings() { let mode = DecisionMode; diff --git a/src/mode/handoff.rs b/src/mode/handoff.rs index bc02744..9c100c3 100644 --- a/src/mode/handoff.rs +++ b/src/mode/handoff.rs @@ -142,6 +142,9 @@ impl Mode for HandoffMode { if offer.offered_by != env.sender { return Err(MacpError::Forbidden); } + if offer.disposition != HandoffDisposition::Offered { + return Err(MacpError::InvalidPayload); + } state .contexts .entry(payload.handoff_id) @@ -882,4 +885,37 @@ mod tests { .unwrap_err(); assert_eq!(err.to_string(), "InvalidPayload"); } + + #[test] + fn context_after_accept_is_rejected() { + let mode = HandoffMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("owner", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("owner", "HandoffOffer", make_offer("h1", "target")), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("target", "HandoffAccept", make_accept("h1", "target")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env("owner", "HandoffContext", make_context("h1")) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } } diff --git a/src/mode/proposal.rs b/src/mode/proposal.rs index 12eb02d..b3c933d 100644 --- a/src/mode/proposal.rs +++ b/src/mode/proposal.rs @@ -12,12 +12,21 @@ use prost::Message; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum ProposalDisposition { Live, Withdrawn, } +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +pub enum ProposalPhase { + #[default] + Negotiating, + Converged, + TerminalRejected, + Committed, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProposalRecord { pub proposal_id: String, @@ -42,6 +51,8 @@ pub struct ProposalState { pub proposals: BTreeMap, pub accepts: BTreeMap, pub terminal_rejections: Vec, + #[serde(default)] + pub phase: ProposalPhase, } pub struct ProposalMode; @@ -65,12 +76,10 @@ impl ProposalMode { .filter(|record| record.disposition == ProposalDisposition::Live) } - fn commitment_ready(session: &Session, state: &ProposalState) -> bool { - if !state.terminal_rejections.is_empty() { - return true; - } - - state + fn refresh_phase(session: &Session, state: &mut ProposalState) { + state.phase = if !state.terminal_rejections.is_empty() { + ProposalPhase::TerminalRejected + } else if state .proposals .values() .filter(|proposal| proposal.disposition == ProposalDisposition::Live) @@ -81,6 +90,26 @@ impl ProposalMode { &proposal.proposal_id, ) }) + { + ProposalPhase::Converged + } else { + ProposalPhase::Negotiating + }; + } + + fn commitment_ready(state: &ProposalState) -> bool { + matches!( + state.phase, + ProposalPhase::Converged | ProposalPhase::TerminalRejected + ) + } + + fn ensure_mutable(state: &ProposalState) -> Result<(), MacpError> { + if state.phase == ProposalPhase::Committed { + Err(MacpError::SessionNotOpen) + } else { + Ok(()) + } } } @@ -113,12 +142,13 @@ impl Mode for ProposalMode { } else { Self::decode_state(&session.mode_state)? }; + Self::ensure_mutable(&state)?; match env.message_type.as_str() { "Proposal" => { let payload = ProposalPayload::decode(&*env.payload) .map_err(|_| MacpError::InvalidPayload)?; - if payload.proposal_id.is_empty() + if payload.proposal_id.trim().is_empty() || state.proposals.contains_key(&payload.proposal_id) { return Err(MacpError::InvalidPayload); @@ -136,13 +166,14 @@ impl Mode for ProposalMode { disposition: ProposalDisposition::Live, }, ); + Self::refresh_phase(session, &mut state); Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "CounterProposal" => { let payload = CounterProposalPayload::decode(&*env.payload) .map_err(|_| MacpError::InvalidPayload)?; - if payload.proposal_id.is_empty() - || payload.supersedes_proposal_id.is_empty() + if payload.proposal_id.trim().is_empty() + || payload.supersedes_proposal_id.trim().is_empty() || state.proposals.contains_key(&payload.proposal_id) || !state .proposals @@ -163,6 +194,7 @@ impl Mode for ProposalMode { disposition: ProposalDisposition::Live, }, ); + Self::refresh_phase(session, &mut state); Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Accept" => { @@ -174,6 +206,7 @@ impl Mode for ProposalMode { state .accepts .insert(env.sender.clone(), payload.proposal_id); + Self::refresh_phase(session, &mut state); Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Reject" => { @@ -189,6 +222,7 @@ impl Mode for ProposalMode { reason: payload.reason, }); } + Self::refresh_phase(session, &mut state); Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Withdraw" => { @@ -209,6 +243,7 @@ impl Mode for ProposalMode { state .terminal_rejections .retain(|r| r.proposal_id != payload.proposal_id); + Self::refresh_phase(session, &mut state); Ok(ModeResponse::PersistState(Self::encode_state(&state))) } "Commitment" => { @@ -216,9 +251,11 @@ impl Mode for ProposalMode { return Err(MacpError::Forbidden); } validate_commitment_payload_for_session(session, &env.payload)?; - if !Self::commitment_ready(session, &state) { + Self::refresh_phase(session, &mut state); + if !Self::commitment_ready(&state) { return Err(MacpError::InvalidPayload); } + state.phase = ProposalPhase::Committed; Ok(ModeResponse::PersistAndResolve { state: Self::encode_state(&state), resolution: env.payload.clone(), @@ -258,6 +295,10 @@ mod tests { } } + fn decode(session: &Session) -> ProposalState { + serde_json::from_slice(&session.mode_state).unwrap() + } + fn env(sender: &str, message_type: &str, payload: Vec) -> Envelope { Envelope { macp_version: "1.0".into(), @@ -773,4 +814,83 @@ mod tests { ) .unwrap(); } + + #[test] + fn phase_becomes_converged_when_all_participants_accept_same_live_proposal() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, ProposalPhase::Negotiating); + + let resp = mode + .on_message( + &session, + &env("agent://seller", "Proposal", make_proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, ProposalPhase::Negotiating); + + let resp = mode + .on_message(&session, &env("agent://buyer", "Accept", make_accept("p1"))) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, ProposalPhase::Negotiating); + + let resp = mode + .on_message( + &session, + &env("agent://seller", "Accept", make_accept("p1")), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, ProposalPhase::Converged); + } + + #[test] + fn terminal_reject_sets_terminal_rejected_phase() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://seller", "Proposal", make_proposal("p1")), + ) + .unwrap(); + apply(&mut session, resp); + let resp = mode + .on_message( + &session, + &env("agent://buyer", "Reject", make_reject("p1", true)), + ) + .unwrap(); + apply(&mut session, resp); + assert_eq!(decode(&session).phase, ProposalPhase::TerminalRejected); + } + + #[test] + fn malformed_counterproposal_payload_rejected() { + let mode = ProposalMode; + let mut session = base_session(); + let resp = mode + .on_session_start(&session, &env("agent://buyer", "SessionStart", vec![])) + .unwrap(); + apply(&mut session, resp); + assert_eq!( + mode.on_message( + &session, + &env("agent://seller", "CounterProposal", vec![0xff, 0x00]) + ) + .unwrap_err() + .to_string(), + "InvalidPayload" + ); + } } diff --git a/src/mode_registry.rs b/src/mode_registry.rs index fa950e1..47f05cc 100644 --- a/src/mode_registry.rs +++ b/src/mode_registry.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; use tokio::sync::broadcast; use crate::mode::decision::DecisionMode; @@ -14,12 +14,169 @@ use crate::mode::{ }; use crate::pb::ModeDescriptor; +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct ModeConformanceCatalog { + pub fixture_set_name: String, + pub golden_transcript_paths: Vec, + pub conformance_fixture_paths: Vec, +} + +pub trait ModeFactory: Send + Sync { + fn create(&self) -> Box; +} + +pub trait ModeDescriptorProvider: Send + Sync { + fn descriptor(&self) -> Option; +} + +pub trait ModeSchemaProvider: Send + Sync { + fn schema_uris(&self) -> HashMap; +} + +pub trait ModeConformanceProvider: Send + Sync { + fn fixture_set_name(&self) -> &'static str; + fn golden_transcript_paths(&self) -> Vec<&'static str>; + fn conformance_fixture_paths(&self) -> Vec<&'static str>; +} + +pub struct StaticModeFactory { + constructor: fn() -> Box, +} + +impl StaticModeFactory { + pub fn new(constructor: fn() -> Box) -> Self { + Self { constructor } + } +} + +impl ModeFactory for StaticModeFactory { + fn create(&self) -> Box { + (self.constructor)() + } +} + +pub struct ClosureModeFactory { + constructor: Arc Box + Send + Sync>, +} + +impl ClosureModeFactory { + pub fn new(constructor: Arc Box + Send + Sync>) -> Self { + Self { constructor } + } +} + +impl ModeFactory for ClosureModeFactory { + fn create(&self) -> Box { + (self.constructor)() + } +} + +#[derive(Clone)] +pub struct StaticModeDescriptorProvider { + descriptor: Option, +} + +impl StaticModeDescriptorProvider { + pub fn new(descriptor: Option) -> Self { + Self { descriptor } + } +} + +impl ModeDescriptorProvider for StaticModeDescriptorProvider { + fn descriptor(&self) -> Option { + self.descriptor.clone() + } +} + +#[derive(Clone, Default)] +pub struct StaticModeSchemaProvider { + schema_uris: HashMap, +} + +impl StaticModeSchemaProvider { + pub fn new(schema_uris: HashMap) -> Self { + Self { schema_uris } + } +} + +impl ModeSchemaProvider for StaticModeSchemaProvider { + fn schema_uris(&self) -> HashMap { + self.schema_uris.clone() + } +} + +#[derive(Clone, Default)] +pub struct StaticModeConformanceProvider { + fixture_set_name: &'static str, + golden_transcript_paths: Vec<&'static str>, + conformance_fixture_paths: Vec<&'static str>, +} + +impl StaticModeConformanceProvider { + pub fn new( + fixture_set_name: &'static str, + golden_transcript_paths: Vec<&'static str>, + conformance_fixture_paths: Vec<&'static str>, + ) -> Self { + Self { + fixture_set_name, + golden_transcript_paths, + conformance_fixture_paths, + } + } +} + +impl ModeConformanceProvider for StaticModeConformanceProvider { + fn fixture_set_name(&self) -> &'static str { + self.fixture_set_name + } + + fn golden_transcript_paths(&self) -> Vec<&'static str> { + self.golden_transcript_paths.clone() + } + + fn conformance_fixture_paths(&self) -> Vec<&'static str> { + self.conformance_fixture_paths.clone() + } +} + pub struct ModeRegistration { pub mode_name: String, - pub mode: Box, - pub descriptor: Option, + pub factory: Arc, + pub descriptor_provider: Arc, + pub schema_provider: Arc, + pub conformance_provider: Arc, pub standards_track: bool, pub builtin: bool, + pub strict_session_start: bool, +} + +impl ModeRegistration { + pub fn descriptor(&self) -> Option { + self.descriptor_provider.descriptor().map(|mut descriptor| { + descriptor.mode = self.mode_name.clone(); + descriptor.schema_uris = self.schema_provider.schema_uris(); + descriptor + }) + } + + pub fn conformance_catalog(&self) -> ModeConformanceCatalog { + ModeConformanceCatalog { + fixture_set_name: self.conformance_provider.fixture_set_name().to_string(), + golden_transcript_paths: self + .conformance_provider + .golden_transcript_paths() + .into_iter() + .map(str::to_string) + .collect(), + conformance_fixture_paths: self + .conformance_provider + .conformance_fixture_paths() + .into_iter() + .map(str::to_string) + .collect(), + } + } } pub struct ModeRegistry { @@ -40,44 +197,102 @@ impl ModeRegistry { let mut entries = HashMap::new(); - // Standards-track modes - let standard_modes: Vec<(&str, Box)> = vec![ - ("macp.mode.decision.v1", Box::new(DecisionMode)), - ("macp.mode.proposal.v1", Box::new(ProposalMode)), - ("macp.mode.task.v1", Box::new(TaskMode)), - ("macp.mode.handoff.v1", Box::new(HandoffMode)), - ("macp.mode.quorum.v1", Box::new(QuorumMode)), - ]; - - for (name, mode) in standard_modes { - entries.insert( - name.to_string(), - ModeRegistration { - mode_name: name.to_string(), - mode, - descriptor: descriptor_map.remove(name), - standards_track: true, - builtin: true, - }, - ); - } - - // Built-in extension modes - let extension_modes: Vec<(&str, Box)> = - vec![("ext.multi_round.v1", Box::new(MultiRoundMode))]; - - for (name, mode) in extension_modes { - entries.insert( - name.to_string(), - ModeRegistration { - mode_name: name.to_string(), - mode, - descriptor: descriptor_map.remove(name), - standards_track: false, - builtin: true, - }, - ); - } + Self::insert_builtin( + &mut entries, + "macp.mode.decision.v1", + Arc::new(StaticModeFactory::new(|| { + Box::new(DecisionMode) as Box + })), + descriptor_map.remove("macp.mode.decision.v1"), + true, + true, + "decision", + vec!["tests/conformance/decision_happy_path.json"], + vec![ + "tests/conformance/decision_happy_path.json", + "tests/conformance/decision_reject_paths.json", + ], + ); + Self::insert_builtin( + &mut entries, + "macp.mode.proposal.v1", + Arc::new(StaticModeFactory::new(|| { + Box::new(ProposalMode) as Box + })), + descriptor_map.remove("macp.mode.proposal.v1"), + true, + true, + "proposal", + vec!["tests/conformance/proposal_happy_path.json"], + vec![ + "tests/conformance/proposal_happy_path.json", + "tests/conformance/proposal_reject_paths.json", + ], + ); + Self::insert_builtin( + &mut entries, + "macp.mode.task.v1", + Arc::new(StaticModeFactory::new(|| { + Box::new(TaskMode) as Box + })), + descriptor_map.remove("macp.mode.task.v1"), + true, + true, + "task", + vec!["tests/conformance/task_happy_path.json"], + vec![ + "tests/conformance/task_happy_path.json", + "tests/conformance/task_reject_paths.json", + ], + ); + Self::insert_builtin( + &mut entries, + "macp.mode.handoff.v1", + Arc::new(StaticModeFactory::new(|| { + Box::new(HandoffMode) as Box + })), + descriptor_map.remove("macp.mode.handoff.v1"), + true, + true, + "handoff", + vec!["tests/conformance/handoff_happy_path.json"], + vec![ + "tests/conformance/handoff_happy_path.json", + "tests/conformance/handoff_reject_paths.json", + ], + ); + Self::insert_builtin( + &mut entries, + "macp.mode.quorum.v1", + Arc::new(StaticModeFactory::new(|| { + Box::new(QuorumMode) as Box + })), + descriptor_map.remove("macp.mode.quorum.v1"), + true, + true, + "quorum", + vec!["tests/conformance/quorum_happy_path.json"], + vec![ + "tests/conformance/quorum_happy_path.json", + "tests/conformance/quorum_reject_paths.json", + ], + ); + Self::insert_builtin( + &mut entries, + "ext.multi_round.v1", + Arc::new(StaticModeFactory::new(|| { + Box::new(MultiRoundMode) as Box + })), + descriptor_map.remove("ext.multi_round.v1"), + false, + true, + "multi_round", + vec!["tests/conformance/multi_round_happy_path.json"], + vec![ + "tests/conformance/multi_round_happy_path.json", + "tests/conformance/multi_round_reject_paths.json", + ], + ); let (change_tx, _) = broadcast::channel(16); Self { @@ -86,6 +301,44 @@ impl ModeRegistry { } } + #[allow(clippy::too_many_arguments)] + fn insert_builtin( + entries: &mut HashMap, + name: &str, + factory: Arc, + descriptor: Option, + standards_track: bool, + strict_session_start: bool, + fixture_set_name: &'static str, + golden_transcripts: Vec<&'static str>, + conformance_fixtures: Vec<&'static str>, + ) { + let schema_uris = descriptor + .as_ref() + .map(|d| d.schema_uris.clone()) + .unwrap_or_default(); + let descriptor_provider = Arc::new(StaticModeDescriptorProvider::new(descriptor)); + let schema_provider = Arc::new(StaticModeSchemaProvider::new(schema_uris)); + let conformance_provider = Arc::new(StaticModeConformanceProvider::new( + fixture_set_name, + golden_transcripts, + conformance_fixtures, + )); + entries.insert( + name.to_string(), + ModeRegistration { + mode_name: name.to_string(), + factory, + descriptor_provider, + schema_provider, + conformance_provider, + standards_track, + builtin: true, + strict_session_start, + }, + ); + } + fn ordered_standard_names(entries: &HashMap) -> Vec { let mut names: Vec = STANDARD_MODE_NAMES .iter() @@ -122,15 +375,6 @@ impl ModeRegistry { } } - /// Execute a mode callback while holding the read lock. - pub fn with_mode(&self, name: &str, f: F) -> Option - where - F: FnOnce(&dyn Mode) -> R, - { - let guard = self.entries.read().expect("mode registry lock poisoned"); - guard.get(name).map(|e| f(e.mode.as_ref())) - } - pub fn standard_mode_names(&self) -> Vec { let guard = self.entries.read().expect("mode registry lock poisoned"); Self::ordered_standard_names(&guard) @@ -140,7 +384,7 @@ impl ModeRegistry { let guard = self.entries.read().expect("mode registry lock poisoned"); Self::ordered_standard_names(&guard) .into_iter() - .filter_map(|name| guard.get(&name).and_then(|entry| entry.descriptor.clone())) + .filter_map(|name| guard.get(&name).and_then(ModeRegistration::descriptor)) .collect() } @@ -160,7 +404,7 @@ impl ModeRegistry { let mut descriptors: Vec = guard .iter() .filter(|(_, e)| !e.standards_track) - .filter_map(|(_, e)| e.descriptor.clone()) + .filter_map(|(_, e)| e.descriptor()) .collect(); descriptors.sort_by(|a, b| a.mode.cmp(&b.mode)); descriptors @@ -177,36 +421,91 @@ impl ModeRegistry { let guard = self.entries.read().expect("mode registry lock poisoned"); let mut descriptors: Vec = guard .values() - .filter_map(|e| e.descriptor.clone()) + .filter_map(ModeRegistration::descriptor) .collect(); descriptors.sort_by(|a, b| a.mode.cmp(&b.mode)); descriptors } + pub fn all_mode_conformance(&self) -> Vec<(String, ModeConformanceCatalog)> { + let guard = self.entries.read().expect("mode registry lock poisoned"); + let mut conformance: Vec<(String, ModeConformanceCatalog)> = guard + .iter() + .map(|(name, entry)| (name.clone(), entry.conformance_catalog())) + .collect(); + conformance.sort_by(|a, b| a.0.cmp(&b.0)); + conformance + } + pub fn is_standard_mode(&self, name: &str) -> bool { let guard = self.entries.read().expect("mode registry lock poisoned"); guard.get(name).map(|e| e.standards_track).unwrap_or(false) } - /// Register a new extension mode dynamically. - pub fn register_extension(&self, descriptor: ModeDescriptor) -> Result<(), String> { - let name = descriptor.mode.clone(); - if name.is_empty() { + pub fn requires_strict_session_start(&self, name: &str) -> bool { + let guard = self.entries.read().expect("mode registry lock poisoned"); + guard + .get(name) + .map(|entry| entry.strict_session_start) + .unwrap_or(false) + } + + fn validate_extension_descriptor(descriptor: &ModeDescriptor) -> Result<(), String> { + if descriptor.mode.trim().is_empty() { return Err("mode name must not be empty".into()); } - if name.starts_with("macp.mode.") { + if descriptor.mode.starts_with("macp.mode.") { return Err("cannot register extension with reserved macp.mode.* namespace".into()); } + if descriptor.message_types.is_empty() { + return Err("extension descriptor must declare at least one message type".into()); + } + if descriptor.mode_version.trim().is_empty() { + return Err("extension descriptor must bind mode_version".into()); + } + for terminal in &descriptor.terminal_message_types { + if !descriptor + .message_types + .iter() + .any(|message_type| message_type == terminal) + { + return Err(format!( + "terminal message type '{}' must also appear in message_types", + terminal + )); + } + } + Ok(()) + } - let allowed_types = descriptor + /// Register a new descriptor-driven extension mode dynamically. + /// + /// Dynamically registered extensions currently use `PassthroughMode`, which + /// validates only the descriptor-declared message types and commitment + /// authority. This keeps runtime behavior explicit until a richer external + /// plugin mechanism is introduced. + pub fn register_extension(&self, descriptor: ModeDescriptor) -> Result<(), String> { + Self::validate_extension_descriptor(&descriptor)?; + let name = descriptor.mode.clone(); + let schema_uris = descriptor.schema_uris.clone(); + let allowed_types: Vec = descriptor .message_types .iter() .filter(|t| *t != "SessionStart") .cloned() .collect(); - let mode: Box = Box::new(PassthroughMode { - allowed_message_types: allowed_types, - }); + let allowed_types = Arc::new(allowed_types); + let factory: Arc = Arc::new(ClosureModeFactory::new(Arc::new({ + let allowed_types = Arc::clone(&allowed_types); + move || { + Box::new(PassthroughMode { + allowed_message_types: (*allowed_types).clone(), + }) as Box + } + }))); + let descriptor_provider = Arc::new(StaticModeDescriptorProvider::new(Some(descriptor))); + let schema_provider = Arc::new(StaticModeSchemaProvider::new(schema_uris)); + let conformance_provider = Arc::new(StaticModeConformanceProvider::default()); let mut guard = self.entries.write().expect("mode registry lock poisoned"); if guard.contains_key(&name) { @@ -216,10 +515,13 @@ impl ModeRegistry { name.clone(), ModeRegistration { mode_name: name, - mode, - descriptor: Some(descriptor), + factory, + descriptor_provider, + schema_provider, + conformance_provider, standards_track: false, builtin: false, + strict_session_start: false, }, ); drop(guard); @@ -265,15 +567,12 @@ impl ModeRegistry { )); } - // Remove old entry and re-insert with updated flags let mut registration = guard .remove(mode) .ok_or_else(|| format!("mode '{}' not found", mode))?; registration.standards_track = true; + registration.strict_session_start = true; registration.mode_name = final_name.clone(); - if let Some(ref mut desc) = registration.descriptor { - desc.mode = final_name.clone(); - } guard.insert(final_name.clone(), registration); drop(guard); let _ = self.change_tx.send(()); @@ -286,28 +585,33 @@ impl ModeRegistry { } } -/// A handle that allows calling mode methods while the registry read lock is not held -/// across await points. Callers use `with_mode` or obtain a `ModeRef` and call its methods. +/// A handle that allows calling mode methods without keeping the registry read +/// lock held across callback execution. pub struct ModeRef<'a> { registry: &'a ModeRegistry, name: String, } impl<'a> ModeRef<'a> { - pub fn on_session_start( - &self, - session: &crate::session::Session, - env: &crate::pb::Envelope, - ) -> Result { + fn factory(&self) -> Result, crate::error::MacpError> { let guard = self .registry .entries .read() .expect("mode registry lock poisoned"); - let entry = guard + guard .get(&self.name) - .ok_or(crate::error::MacpError::UnknownMode)?; - entry.mode.on_session_start(session, env) + .map(|entry| Arc::clone(&entry.factory)) + .ok_or(crate::error::MacpError::UnknownMode) + } + + pub fn on_session_start( + &self, + session: &crate::session::Session, + env: &crate::pb::Envelope, + ) -> Result { + let mode = self.factory()?.create(); + mode.on_session_start(session, env) } pub fn on_message( @@ -315,15 +619,8 @@ impl<'a> ModeRef<'a> { session: &crate::session::Session, env: &crate::pb::Envelope, ) -> Result { - let guard = self - .registry - .entries - .read() - .expect("mode registry lock poisoned"); - let entry = guard - .get(&self.name) - .ok_or(crate::error::MacpError::UnknownMode)?; - entry.mode.on_message(session, env) + let mode = self.factory()?.create(); + mode.on_message(session, env) } pub fn authorize_sender( @@ -331,15 +628,8 @@ impl<'a> ModeRef<'a> { session: &crate::session::Session, env: &crate::pb::Envelope, ) -> Result<(), crate::error::MacpError> { - let guard = self - .registry - .entries - .read() - .expect("mode registry lock poisoned"); - let entry = guard - .get(&self.name) - .ok_or(crate::error::MacpError::UnknownMode)?; - entry.mode.authorize_sender(session, env) + let mode = self.factory()?.create(); + mode.authorize_sender(session, env) } } @@ -354,6 +644,7 @@ mod tests { for name in STANDARD_MODE_NAMES { assert!(registry.get_mode(name).is_some(), "missing mode: {name}"); assert!(registry.is_standard_mode(name)); + assert!(registry.requires_strict_session_start(name)); } } @@ -362,6 +653,7 @@ mod tests { let registry = ModeRegistry::build_default(); assert!(registry.get_mode("ext.multi_round.v1").is_some()); assert!(!registry.is_standard_mode("ext.multi_round.v1")); + assert!(registry.requires_strict_session_start("ext.multi_round.v1")); } #[test] @@ -403,6 +695,7 @@ mod tests { let registry = ModeRegistry::build_default(); assert!(registry.get_mode("nonexistent").is_none()); assert!(!registry.is_standard_mode("nonexistent")); + assert!(!registry.requires_strict_session_start("nonexistent")); } #[test] @@ -433,6 +726,32 @@ mod tests { let registry = ModeRegistry::build_default(); let descriptor = ModeDescriptor { mode: "macp.mode.evil.v1".into(), + mode_version: "1.0.0".into(), + message_types: vec!["SessionStart".into(), "Commitment".into()], + ..Default::default() + }; + assert!(registry.register_extension(descriptor).is_err()); + } + + #[test] + fn register_rejects_empty_message_types() { + let registry = ModeRegistry::build_default(); + let descriptor = ModeDescriptor { + mode: "ext.invalid.v1".into(), + mode_version: "1.0.0".into(), + ..Default::default() + }; + assert!(registry.register_extension(descriptor).is_err()); + } + + #[test] + fn register_rejects_terminal_not_in_message_types() { + let registry = ModeRegistry::build_default(); + let descriptor = ModeDescriptor { + mode: "ext.invalid.v1".into(), + mode_version: "1.0.0".into(), + message_types: vec!["SessionStart".into(), "Custom".into()], + terminal_message_types: vec!["Commitment".into()], ..Default::default() }; assert!(registry.register_extension(descriptor).is_err()); @@ -443,6 +762,8 @@ mod tests { let registry = ModeRegistry::build_default(); let descriptor = ModeDescriptor { mode: "ext.multi_round.v1".into(), + mode_version: "1.0.0".into(), + message_types: vec!["SessionStart".into(), "Commitment".into()], ..Default::default() }; assert!(registry.register_extension(descriptor).is_err()); @@ -480,16 +801,19 @@ mod tests { mode: "ext.new.v1".into(), mode_version: "1.0.0".into(), message_types: vec!["SessionStart".into(), "Commitment".into()], + terminal_message_types: vec!["Commitment".into()], ..Default::default() }; registry.register_extension(descriptor).unwrap(); assert!(!registry.is_standard_mode("ext.new.v1")); + assert!(!registry.requires_strict_session_start("ext.new.v1")); let final_name = registry .promote_mode("ext.new.v1", Some("macp.mode.new.v1")) .unwrap(); assert_eq!(final_name, "macp.mode.new.v1"); assert!(registry.is_standard_mode("macp.mode.new.v1")); + assert!(registry.requires_strict_session_start("macp.mode.new.v1")); assert!(registry.get_mode("ext.new.v1").is_none()); assert!(registry.get_mode("macp.mode.new.v1").is_some()); } @@ -501,12 +825,14 @@ mod tests { mode: "ext.keep.v1".into(), mode_version: "1.0.0".into(), message_types: vec!["SessionStart".into(), "Commitment".into()], + terminal_message_types: vec!["Commitment".into()], ..Default::default() }; registry.register_extension(descriptor).unwrap(); let final_name = registry.promote_mode("ext.keep.v1", None).unwrap(); assert_eq!(final_name, "ext.keep.v1"); assert!(registry.is_standard_mode("ext.keep.v1")); + assert!(registry.requires_strict_session_start("ext.keep.v1")); } #[test] @@ -517,6 +843,7 @@ mod tests { mode_version: "1.0.0".into(), title: "Promoted".into(), message_types: vec!["SessionStart".into(), "Commitment".into()], + terminal_message_types: vec!["Commitment".into()], ..Default::default() }; registry.register_extension(descriptor).unwrap(); @@ -553,4 +880,20 @@ mod tests { ); } } + + #[test] + fn conformance_catalog_exposes_builtin_fixture_sets() { + let registry = ModeRegistry::build_default(); + let catalog = registry.all_mode_conformance(); + let decision = catalog + .into_iter() + .find(|(name, _)| name == "macp.mode.decision.v1") + .expect("decision catalog should exist"); + assert_eq!(decision.1.fixture_set_name, "decision"); + assert!(decision + .1 + .conformance_fixture_paths + .iter() + .any(|path| path.ends_with("decision_happy_path.json"))); + } } diff --git a/src/replay.rs b/src/replay.rs index 5d9e923..aa854a3 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -35,8 +35,7 @@ pub fn replay_session( let mode = registry.get_mode(mode_name).ok_or(MacpError::UnknownMode)?; // 2. Parse SessionStartPayload - let require_complete_start = - registry.is_standard_mode(mode_name) || mode_name == "ext.multi_round.v1"; + let require_complete_start = registry.requires_strict_session_start(mode_name); let start_payload = if start_entry.raw_payload.is_empty() && !require_complete_start { crate::pb::SessionStartPayload::default() } else { diff --git a/src/runtime.rs b/src/runtime.rs index f3e6f82..b82f876 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -204,8 +204,7 @@ impl Runtime { .ok_or(MacpError::UnknownMode)?; let start_payload = parse_session_start_payload(&env.payload)?; - let require_complete_start = - self.mode_registry.is_standard_mode(mode_name) || mode_name == "ext.multi_round.v1"; + let require_complete_start = self.mode_registry.requires_strict_session_start(mode_name); if require_complete_start { validate_canonical_session_start_payload(&start_payload)?; } diff --git a/src/storage.rs b/src/storage.rs index 3729d1b..684ccf6 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -3,8 +3,10 @@ use crate::registry::PersistedSession; use crate::session::Session; use std::collections::HashMap; use std::fs; -use std::io::{self, BufRead, Write}; +use std::io; use std::path::{Path, PathBuf}; +use tokio::fs as tfs; +use tokio::io::AsyncWriteExt; const STORAGE_VERSION: u32 = 3; @@ -81,32 +83,32 @@ impl FileBackend { self.session_dir(session_id).join("log.jsonl") } - fn atomic_write(path: &Path, data: &[u8]) -> io::Result<()> { + async fn atomic_write(path: &Path, data: &[u8]) -> io::Result<()> { let tmp_path = path.with_extension("json.tmp"); - fs::write(&tmp_path, data)?; - fs::rename(&tmp_path, path) + tfs::write(&tmp_path, data).await?; + tfs::rename(&tmp_path, path).await } } #[async_trait::async_trait] impl StorageBackend for FileBackend { async fn create_session_storage(&self, session_id: &str) -> io::Result<()> { - fs::create_dir_all(self.session_dir(session_id)) + tfs::create_dir_all(self.session_dir(session_id)).await } async fn save_session(&self, session: &Session) -> io::Result<()> { let persisted = PersistedSession::from(session); let bytes = serde_json::to_vec_pretty(&persisted) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - Self::atomic_write(&self.session_file(&session.session_id), &bytes) + Self::atomic_write(&self.session_file(&session.session_id), &bytes).await } async fn load_session(&self, session_id: &str) -> io::Result> { let path = self.session_file(session_id); - if !path.exists() { + if tfs::metadata(&path).await.is_err() { return Ok(None); } - let bytes = fs::read(&path)?; + let bytes = tfs::read(&path).await?; let persisted: PersistedSession = serde_json::from_slice(&bytes) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; Ok(Some(Session::from(persisted))) @@ -114,20 +116,20 @@ impl StorageBackend for FileBackend { async fn load_all_sessions(&self) -> io::Result> { let sessions_dir = self.base_dir.join("sessions"); - if !sessions_dir.exists() { + if tfs::metadata(&sessions_dir).await.is_err() { return Ok(vec![]); } let mut sessions = Vec::new(); - for entry in fs::read_dir(&sessions_dir)? { - let entry = entry?; - if !entry.file_type()?.is_dir() { + let mut entries = tfs::read_dir(&sessions_dir).await?; + while let Some(entry) = entries.next_entry().await? { + if !entry.file_type().await?.is_dir() { continue; } let session_file = entry.path().join("session.json"); - if !session_file.exists() { + if tfs::metadata(&session_file).await.is_err() { continue; } - let bytes = fs::read(&session_file)?; + let bytes = tfs::read(&session_file).await?; match serde_json::from_slice::(&bytes) { Ok(persisted) => sessions.push(Session::from(persisted)), Err(e) => { @@ -147,29 +149,28 @@ impl StorageBackend for FileBackend { .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; line.push('\n'); - let mut file = fs::OpenOptions::new() + let mut file = tfs::OpenOptions::new() .create(true) .append(true) - .open(&path)?; - file.write_all(line.as_bytes())?; - file.sync_data()?; + .open(&path) + .await?; + file.write_all(line.as_bytes()).await?; + file.sync_data().await?; Ok(()) } async fn load_log(&self, session_id: &str) -> io::Result> { let path = self.log_file(session_id); - if !path.exists() { + if tfs::metadata(&path).await.is_err() { return Ok(vec![]); } - let file = fs::File::open(&path)?; - let reader = io::BufReader::new(file); + let content = tfs::read_to_string(&path).await?; let mut entries = Vec::new(); - for (line_num, line) in reader.lines().enumerate() { - let line = line?; + for (line_num, line) in content.lines().enumerate() { if line.trim().is_empty() { continue; } - match serde_json::from_str::(&line) { + match serde_json::from_str::(line) { Ok(entry) => entries.push(entry), Err(e) => { eprintln!( diff --git a/tests/conformance/decision_happy_path.json b/tests/conformance/decision_happy_path.json index d882967..7d601cf 100644 --- a/tests/conformance/decision_happy_path.json +++ b/tests/conformance/decision_happy_path.json @@ -1,7 +1,10 @@ { "mode": "macp.mode.decision.v1", "initiator": "agent://orchestrator", - "participants": ["agent://a", "agent://b"], + "participants": [ + "agent://a", + "agent://b" + ], "mode_version": "1.0.0", "configuration_version": "cfg-1", "policy_version": "policy-1", @@ -11,14 +14,23 @@ "sender": "agent://orchestrator", "message_type": "Proposal", "payload_type": "decision.Proposal", - "payload": { "proposal_id": "p1", "option": "deploy", "rationale": "ready", "supporting_data": [] }, + "payload": { + "proposal_id": "p1", + "option": "deploy", + "rationale": "ready", + "supporting_data": [] + }, "expect": "accept" }, { "sender": "agent://a", "message_type": "Vote", "payload_type": "decision.Vote", - "payload": { "proposal_id": "p1", "vote": "approve", "reason": "good" }, + "payload": { + "proposal_id": "p1", + "vote": "approve", + "reason": "good" + }, "expect": "accept" }, { @@ -37,5 +49,27 @@ "expect": "accept" } ], - "expected_final_state": "Resolved" + "expected_final_state": "Resolved", + "expect_resolution_present": true, + "expected_resolution": { + "action": "decision.selected", + "mode_version": "1.0.0", + "configuration_version": "cfg-1" + }, + "expected_mode_state": { + "phase": "Committed", + "proposals": { + "p1": { + "proposal_id": "p1", + "sender": "agent://orchestrator" + } + }, + "votes": { + "p1": { + "agent://a": { + "vote": "approve" + } + } + } + } } diff --git a/tests/conformance/decision_reject_paths.json b/tests/conformance/decision_reject_paths.json index bd5e3ba..2268557 100644 --- a/tests/conformance/decision_reject_paths.json +++ b/tests/conformance/decision_reject_paths.json @@ -1,7 +1,10 @@ { "mode": "macp.mode.decision.v1", "initiator": "agent://orchestrator", - "participants": ["agent://a", "agent://b"], + "participants": [ + "agent://a", + "agent://b" + ], "mode_version": "1.0.0", "configuration_version": "cfg-1", "policy_version": "policy-1", @@ -11,14 +14,25 @@ "sender": "agent://outsider", "message_type": "Proposal", "payload_type": "decision.Proposal", - "payload": { "proposal_id": "p1", "option": "deploy", "rationale": "ready", "supporting_data": [] }, - "expect": "reject" + "payload": { + "proposal_id": "p1", + "option": "deploy", + "rationale": "ready", + "supporting_data": [] + }, + "expect": "reject", + "expected_error_code": "FORBIDDEN" }, { "sender": "agent://orchestrator", "message_type": "Proposal", "payload_type": "decision.Proposal", - "payload": { "proposal_id": "p1", "option": "deploy", "rationale": "ready", "supporting_data": [] }, + "payload": { + "proposal_id": "p1", + "option": "deploy", + "rationale": "ready", + "supporting_data": [] + }, "expect": "accept" }, { @@ -34,8 +48,44 @@ "policy_version": "policy-1", "configuration_version": "cfg-1" }, - "expect": "reject" + "expect": "reject", + "expected_error_code": "FORBIDDEN" + }, + { + "sender": "agent://a", + "message_type": "Vote", + "payload_type": "decision.Vote", + "payload": { + "proposal_id": "p1", + "vote": "approve", + "reason": "good" + }, + "expect": "accept" + }, + { + "sender": "agent://b", + "message_type": "Evaluation", + "payload_type": "decision.Evaluation", + "payload": { + "proposal_id": "p1", + "recommendation": "approve", + "confidence": 0.7, + "reason": "late" + }, + "expect": "reject", + "expected_error_code": "INVALID_ENVELOPE" } ], - "expected_final_state": "Open" + "expected_final_state": "Open", + "expected_mode_state": { + "phase": "Voting", + "votes": { + "p1": { + "agent://a": { + "vote": "approve" + } + } + } + }, + "expect_resolution_present": false } diff --git a/tests/conformance/handoff_reject_paths.json b/tests/conformance/handoff_reject_paths.json index 8d2e9a4..b40ba22 100644 --- a/tests/conformance/handoff_reject_paths.json +++ b/tests/conformance/handoff_reject_paths.json @@ -1,7 +1,10 @@ { "mode": "macp.mode.handoff.v1", "initiator": "agent://owner", - "participants": ["agent://owner", "agent://target"], + "participants": [ + "agent://owner", + "agent://target" + ], "mode_version": "1.0.0", "configuration_version": "cfg-1", "policy_version": "policy-1", @@ -11,16 +14,57 @@ "sender": "agent://target", "message_type": "HandoffAccept", "payload_type": "handoff.HandoffAccept", - "payload": { "handoff_id": "h1", "accepted_by": "agent://target", "reason": "ready" }, - "expect": "reject" + "payload": { + "handoff_id": "h1", + "accepted_by": "agent://target", + "reason": "ready" + }, + "expect": "reject", + "expected_error_code": "INVALID_ENVELOPE" }, { "sender": "agent://owner", "message_type": "HandoffOffer", "payload_type": "handoff.HandoffOffer", - "payload": { "handoff_id": "h1", "target_participant": "agent://target", "scope": "support", "reason": "escalate" }, + "payload": { + "handoff_id": "h1", + "target_participant": "agent://target", + "scope": "support", + "reason": "escalate" + }, "expect": "accept" + }, + { + "sender": "agent://target", + "message_type": "HandoffAccept", + "payload_type": "handoff.HandoffAccept", + "payload": { + "handoff_id": "h1", + "accepted_by": "agent://target", + "reason": "ready" + }, + "expect": "accept" + }, + { + "sender": "agent://owner", + "message_type": "HandoffContext", + "payload_type": "handoff.HandoffContext", + "payload": { + "handoff_id": "h1", + "content_type": "text/plain", + "context": "late context" + }, + "expect": "reject", + "expected_error_code": "INVALID_ENVELOPE" } ], - "expected_final_state": "Open" + "expected_final_state": "Open", + "expected_mode_state": { + "offers": { + "h1": { + "disposition": "Accepted" + } + } + }, + "expect_resolution_present": false } diff --git a/tests/conformance/proposal_happy_path.json b/tests/conformance/proposal_happy_path.json index 61956b5..824cf96 100644 --- a/tests/conformance/proposal_happy_path.json +++ b/tests/conformance/proposal_happy_path.json @@ -1,7 +1,10 @@ { "mode": "macp.mode.proposal.v1", "initiator": "agent://buyer", - "participants": ["agent://buyer", "agent://seller"], + "participants": [ + "agent://buyer", + "agent://seller" + ], "mode_version": "1.0.0", "configuration_version": "cfg-1", "policy_version": "policy-1", @@ -11,21 +14,33 @@ "sender": "agent://seller", "message_type": "Proposal", "payload_type": "proposal.Proposal", - "payload": { "proposal_id": "p1", "title": "offer", "summary": "terms", "details": [], "tags": [] }, + "payload": { + "proposal_id": "p1", + "title": "offer", + "summary": "terms", + "details": [], + "tags": [] + }, "expect": "accept" }, { "sender": "agent://buyer", "message_type": "Accept", "payload_type": "proposal.Accept", - "payload": { "proposal_id": "p1", "reason": "" }, + "payload": { + "proposal_id": "p1", + "reason": "" + }, "expect": "accept" }, { "sender": "agent://seller", "message_type": "Accept", "payload_type": "proposal.Accept", - "payload": { "proposal_id": "p1", "reason": "" }, + "payload": { + "proposal_id": "p1", + "reason": "" + }, "expect": "accept" }, { @@ -44,5 +59,25 @@ "expect": "accept" } ], - "expected_final_state": "Resolved" + "expected_final_state": "Resolved", + "expect_resolution_present": true, + "expected_resolution": { + "action": "proposal.accepted", + "mode_version": "1.0.0", + "configuration_version": "cfg-1" + }, + "expected_mode_state": { + "phase": "Committed", + "accepts": { + "agent://buyer": "p1", + "agent://seller": "p1" + }, + "proposals": { + "p1": { + "proposal_id": "p1", + "proposer": "agent://seller", + "disposition": "Live" + } + } + } } diff --git a/tests/conformance/proposal_reject_paths.json b/tests/conformance/proposal_reject_paths.json index 5820a82..5b1af5a 100644 --- a/tests/conformance/proposal_reject_paths.json +++ b/tests/conformance/proposal_reject_paths.json @@ -1,7 +1,10 @@ { "mode": "macp.mode.proposal.v1", "initiator": "agent://buyer", - "participants": ["agent://buyer", "agent://seller"], + "participants": [ + "agent://buyer", + "agent://seller" + ], "mode_version": "1.0.0", "configuration_version": "cfg-1", "policy_version": "policy-1", @@ -20,8 +23,26 @@ "policy_version": "policy-1", "configuration_version": "cfg-1" }, - "expect": "reject" + "expect": "reject", + "expected_error_code": "INVALID_ENVELOPE" + }, + { + "sender": "agent://seller", + "message_type": "CounterProposal", + "payload_type": "proposal.CounterProposal", + "payload": { + "proposal_id": "p2", + "supersedes_proposal_id": "", + "title": "counter", + "summary": "missing supersedes" + }, + "expect": "reject", + "expected_error_code": "INVALID_ENVELOPE" } ], - "expected_final_state": "Open" + "expected_final_state": "Open", + "expect_resolution_present": false, + "expected_mode_state": { + "phase": "Negotiating" + } } diff --git a/tests/conformance_loader.rs b/tests/conformance_loader.rs index 9801db4..e967b78 100644 --- a/tests/conformance_loader.rs +++ b/tests/conformance_loader.rs @@ -2,11 +2,13 @@ use chrono::Utc; use macp_runtime::log_store::LogStore; use macp_runtime::pb::{CommitmentPayload, Envelope, SessionStartPayload}; use macp_runtime::registry::SessionRegistry; +use macp_runtime::replay::replay_session; use macp_runtime::runtime::Runtime; -use macp_runtime::session::SessionState; +use macp_runtime::session::{Session, SessionState}; use macp_runtime::storage::MemoryBackend; use prost::Message; use serde::Deserialize; +use serde_json::{json, Value}; use std::path::Path; use std::sync::Arc; @@ -21,6 +23,14 @@ struct ConformanceFixture { ttl_ms: i64, messages: Vec, expected_final_state: String, + #[serde(default)] + expected_mode_state: Option, + #[serde(default)] + expected_resolution: Option, + #[serde(default)] + expect_resolution_present: Option, + #[serde(default = "default_true")] + verify_replay_equivalence: bool, } #[derive(Deserialize)] @@ -28,8 +38,14 @@ struct ConformanceMessage { sender: String, message_type: String, payload_type: String, - payload: serde_json::Value, + payload: Value, expect: String, + #[serde(default)] + expected_error_code: Option, +} + +fn default_true() -> bool { + true } fn new_sid() -> String { @@ -84,6 +100,19 @@ fn encode_decision_payload(msg: &ConformanceMessage) -> Vec { supporting_data: vec![], } .encode_to_vec(), + "Evaluation" => macp_runtime::decision_pb::EvaluationPayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + recommendation: p["recommendation"].as_str().unwrap_or_default().into(), + confidence: p["confidence"].as_f64().unwrap_or_default(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + "Objection" => macp_runtime::decision_pb::ObjectionPayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + severity: p["severity"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), "Vote" => macp_runtime::decision_pb::VotePayload { proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), vote: p["vote"].as_str().unwrap_or_default().into(), @@ -105,11 +134,33 @@ fn encode_proposal_payload(msg: &ConformanceMessage) -> Vec { tags: vec![], } .encode_to_vec(), + "CounterProposal" => macp_runtime::proposal_pb::CounterProposalPayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + supersedes_proposal_id: p["supersedes_proposal_id"] + .as_str() + .unwrap_or_default() + .into(), + title: p["title"].as_str().unwrap_or_default().into(), + summary: p["summary"].as_str().unwrap_or_default().into(), + details: vec![], + } + .encode_to_vec(), "Accept" => macp_runtime::proposal_pb::AcceptPayload { proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), reason: p["reason"].as_str().unwrap_or_default().into(), } .encode_to_vec(), + "Reject" => macp_runtime::proposal_pb::RejectPayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + terminal: p["terminal"].as_bool().unwrap_or(false), + } + .encode_to_vec(), + "Withdraw" => macp_runtime::proposal_pb::WithdrawPayload { + proposal_id: p["proposal_id"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), _ => panic!("Unhandled proposal message: {}", msg.message_type), } } @@ -159,6 +210,21 @@ fn encode_handoff_payload(msg: &ConformanceMessage) -> Vec { reason: p["reason"].as_str().unwrap_or_default().into(), } .encode_to_vec(), + "HandoffDecline" => macp_runtime::handoff_pb::HandoffDeclinePayload { + handoff_id: p["handoff_id"].as_str().unwrap_or_default().into(), + declined_by: p["declined_by"].as_str().unwrap_or_default().into(), + reason: p["reason"].as_str().unwrap_or_default().into(), + } + .encode_to_vec(), + "HandoffContext" => macp_runtime::handoff_pb::HandoffContextPayload { + handoff_id: p["handoff_id"].as_str().unwrap_or_default().into(), + content_type: p["content_type"].as_str().unwrap_or_default().into(), + context: p["context"] + .as_str() + .map(|s| s.as_bytes().to_vec()) + .unwrap_or_default(), + } + .encode_to_vec(), _ => panic!("Unhandled handoff message: {}", msg.message_type), } } @@ -186,16 +252,91 @@ fn encode_quorum_payload(msg: &ConformanceMessage) -> Vec { fn encode_multi_round_payload(msg: &ConformanceMessage) -> Vec { let p = &msg.payload; match msg.message_type.as_str() { - "Contribute" => { - // Multi-round uses JSON-encoded ContributePayload - serde_json::json!({"value": p["value"].as_str().unwrap_or_default()}) - .to_string() - .into_bytes() - } + "Contribute" => json!({"value": p["value"].as_str().unwrap_or_default()}) + .to_string() + .into_bytes(), _ => panic!("Unhandled multi_round message: {}", msg.message_type), } } +fn expected_state(name: &str) -> SessionState { + match name { + "Open" => SessionState::Open, + "Resolved" => SessionState::Resolved, + "Expired" => SessionState::Expired, + other => panic!("Unknown expected_final_state: {other}"), + } +} + +fn resolution_to_json(resolution: &[u8]) -> Option { + CommitmentPayload::decode(resolution) + .ok() + .map(|commitment| { + json!({ + "commitment_id": commitment.commitment_id, + "action": commitment.action, + "authority_scope": commitment.authority_scope, + "reason": commitment.reason, + "mode_version": commitment.mode_version, + "policy_version": commitment.policy_version, + "configuration_version": commitment.configuration_version, + }) + }) +} + +fn mode_state_to_json(session: &Session) -> Option { + if session.mode_state.is_empty() { + return None; + } + serde_json::from_slice(&session.mode_state).ok() +} + +fn assert_json_contains(actual: &Value, expected: &Value) { + match (actual, expected) { + (Value::Object(actual_map), Value::Object(expected_map)) => { + for (key, expected_value) in expected_map { + let actual_value = actual_map + .get(key) + .unwrap_or_else(|| panic!("missing key '{key}' in actual json {actual:?}")); + assert_json_contains(actual_value, expected_value); + } + } + (Value::Array(actual_items), Value::Array(expected_items)) => { + assert!( + actual_items.len() >= expected_items.len(), + "actual array shorter than expected: {actual_items:?} vs {expected_items:?}" + ); + for (idx, expected_item) in expected_items.iter().enumerate() { + assert_json_contains(&actual_items[idx], expected_item); + } + } + _ => assert_eq!(actual, expected), + } +} + +async fn assert_replay_equivalence(rt: &Runtime, sid: &str, live_session: &Session) { + let log = rt + .log_store + .get_log(sid) + .await + .unwrap_or_else(|| panic!("missing log entries for {sid}")); + let replayed = replay_session(sid, &log, rt.mode_registry().as_ref()) + .unwrap_or_else(|e| panic!("replay failed for {sid}: {e}")); + assert_eq!(replayed.state, live_session.state, "replay state mismatch"); + assert_eq!( + replayed.resolution, live_session.resolution, + "replay resolution mismatch" + ); + assert_eq!( + replayed.mode_state, live_session.mode_state, + "replay mode_state mismatch" + ); + assert_eq!( + replayed.seen_message_ids, live_session.seen_message_ids, + "replay dedup state mismatch" + ); +} + async fn run_conformance_fixture(path: &Path) { let content = std::fs::read_to_string(path) .unwrap_or_else(|e| panic!("Failed to read fixture {}: {e}", path.display())); @@ -205,7 +346,6 @@ async fn run_conformance_fixture(path: &Path) { let rt = make_runtime(); let sid = new_sid(); - // SessionStart let start_payload = SessionStartPayload { intent: "conformance".into(), participants: fixture.participants.clone(), @@ -234,7 +374,6 @@ async fn run_conformance_fixture(path: &Path) { .await .unwrap_or_else(|e| panic!("SessionStart failed for {}: {e}", path.display())); - // Process messages for (i, msg) in fixture.messages.iter().enumerate() { let payload = encode_payload(&fixture, msg); let env = Envelope { @@ -268,25 +407,59 @@ async fn run_conformance_fixture(path: &Path) { msg.message_type, path.display() ); + if let Some(expected_error_code) = &msg.expected_error_code { + let err = result.unwrap_err(); + assert_eq!( + err.error_code(), + expected_error_code, + "reject error code mismatch at message {} ({}) in {}", + i + 1, + msg.message_type, + path.display() + ); + } } other => panic!("Unknown expect value: {other}"), } } - // Verify final state let session = rt.get_session_checked(&sid).await.unwrap(); - let expected_state = match fixture.expected_final_state.as_str() { - "Open" => SessionState::Open, - "Resolved" => SessionState::Resolved, - "Expired" => SessionState::Expired, - other => panic!("Unknown expected_final_state: {other}"), - }; assert_eq!( session.state, - expected_state, + expected_state(&fixture.expected_final_state), "Final state mismatch for {}", path.display() ); + + if let Some(expect_resolution_present) = fixture.expect_resolution_present { + assert_eq!( + session.resolution.is_some(), + expect_resolution_present, + "resolution presence mismatch for {}", + path.display() + ); + } + + if let Some(expected_resolution) = &fixture.expected_resolution { + let actual_resolution = session + .resolution + .as_ref() + .and_then(|resolution| resolution_to_json(resolution)) + .unwrap_or_else(|| { + panic!("resolution missing or not decodable for {}", path.display()) + }); + assert_json_contains(&actual_resolution, expected_resolution); + } + + if let Some(expected_mode_state) = &fixture.expected_mode_state { + let actual_mode_state = mode_state_to_json(&session) + .unwrap_or_else(|| panic!("mode state missing or not json for {}", path.display())); + assert_json_contains(&actual_mode_state, expected_mode_state); + } + + if fixture.verify_replay_equivalence { + assert_replay_equivalence(&rt, &sid, &session).await; + } } macro_rules! conformance_test { @@ -310,8 +483,6 @@ conformance_test!( conformance_multi_round_happy_path, "multi_round_happy_path.json" ); - -// Negative-path (rejection) conformance tests conformance_test!( conformance_decision_reject_paths, "decision_reject_paths.json" diff --git a/tests/replay_round_trip.rs b/tests/replay_round_trip.rs index e7df68d..47c8123 100644 --- a/tests/replay_round_trip.rs +++ b/tests/replay_round_trip.rs @@ -57,6 +57,68 @@ fn commitment(action: &str) -> Vec { .encode_to_vec() } +#[test] +fn replay_decision_session() { + use macp_runtime::decision_pb::{ProposalPayload, VotePayload}; + + let registry = make_registry(); + let mode = "macp.mode.decision.v1"; + + let entries = vec![ + incoming( + "m1", + "SessionStart", + "agent://orchestrator", + start_payload(vec!["agent://a", "agent://b"]), + mode, + 1000, + ), + incoming( + "m2", + "Proposal", + "agent://orchestrator", + ProposalPayload { + proposal_id: "p1".into(), + option: "deploy".into(), + rationale: "ready".into(), + supporting_data: vec![], + } + .encode_to_vec(), + mode, + 2000, + ), + incoming( + "m3", + "Vote", + "agent://a", + VotePayload { + proposal_id: "p1".into(), + vote: "approve".into(), + reason: "good".into(), + } + .encode_to_vec(), + mode, + 3000, + ), + incoming( + "m4", + "Commitment", + "agent://orchestrator", + commitment("decision.selected"), + mode, + 4000, + ), + ]; + + let session = replay_session("s1", &entries, ®istry).unwrap(); + assert_eq!(session.state, SessionState::Resolved); + assert!(session.resolution.is_some()); + assert_eq!(session.seen_message_ids.len(), 4); + let mode_state: serde_json::Value = serde_json::from_slice(&session.mode_state).unwrap(); + assert_eq!(mode_state["phase"], "Committed"); + assert_eq!(mode_state["votes"]["p1"]["agent://a"]["vote"], "approve"); +} + #[test] fn replay_proposal_session() { use macp_runtime::proposal_pb::{AcceptPayload, ProposalPayload};