diff --git a/Cargo.toml b/Cargo.toml index 163db37..8dd0cbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ tokio-stream = "0.1" futures-core = "0.3" async-stream = "0.3" async-trait = "0.1" +uuid = { version = "1", features = ["v4"] } [dev-dependencies] tempfile = "3" diff --git a/README.md b/README.md index d960dd7..08c7802 100644 --- a/README.md +++ b/README.md @@ -28,9 +28,18 @@ This runtime implements the current MACP core/service surface, the five standard - per-session append-only log files and session snapshots via `FileBackend` - crash recovery with dedup state reconciliation - atomic writes (tmp file + rename) prevent partial-write corruption -- **Authoritative session streaming** - - `StreamSession` now emits accepted MACP envelopes for one bound session per stream - - mixed-session streams are rejected +- **Authoritative accepted history** + - log append failures are now fatal — messages are not acknowledged without a durable record + - session state is rebuilt from append-only logs on startup via replay (no snapshot dependency) + - `LogEntry` enriched with `session_id`, `mode`, `macp_version` for self-describing replay +- **Session ID security policy** + - session IDs must be UUID v4/v7 (hyphenated lowercase) or base64url tokens (22+ chars) + - weak/human-readable IDs are rejected with `INVALID_SESSION_ID` +- **Signal enforcement** + - Signals are strictly ambient — non-empty `session_id` or `mode` is rejected +- **StreamSession disabled in freeze profile** + - `Initialize` advertises `stream: false` + - `StreamSession` RPC returns `UNIMPLEMENTED` (implementation retained for future activation) - `WatchModeRegistry` and `WatchRoots` remain unimplemented ## Implemented modes @@ -177,7 +186,7 @@ cargo run --bin fuzz_client | `GetManifest` | implemented | | `ListModes` | implemented | | `ListRoots` | implemented | -| `StreamSession` | implemented (accepted-envelope session stream) | +| `StreamSession` | disabled (unary-first freeze profile) | | `WatchModeRegistry` | unimplemented | | `WatchRoots` | unimplemented | @@ -195,6 +204,7 @@ runtime/ │ ├── registry.rs # session store with optional persistence │ ├── log_store.rs # in-memory accepted-history log cache │ ├── storage.rs # storage backend trait, FileBackend persistence, crash recovery +│ ├── replay.rs # session rebuild from append-only log │ ├── mode/ # mode implementations │ └── bin/ # local development example clients ├── docs/ @@ -206,6 +216,6 @@ runtime/ - The RFC/spec repository remains the normative source for protocol semantics. - This runtime only accepts the canonical standards-track mode identifiers for the five main modes. - `multi_round` remains experimental and is not advertised by discovery RPCs. -- `StreamSession` is available for bidirectional session-scoped coordination. Use `Send` when you need per-message negative acknowledgements. The current implementation streams future accepted envelopes from the time the stream binds; it does not backfill earlier accepted history. +- `StreamSession` is disabled in the freeze profile. The implementation is retained for future activation. See `docs/README.md` and `docs/examples.md` for the updated local development and usage guidance. diff --git a/docs/examples.md b/docs/examples.md index 922124a..fc57bce 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -57,7 +57,7 @@ Flow: Important runtime behavior: - initiator/coordinator may emit `Proposal` and `Commitment` -- participants emit `Evaluation`, `Objection`, and `Vote` +- declared participants may also emit `Proposal`, `Evaluation`, `Objection`, and `Vote` - duplicate proposal IDs are rejected - votes are tracked per proposal, per sender - `CommitmentPayload` version fields must match the bound session versions @@ -141,9 +141,11 @@ cargo run --bin multi_round_client This mode is still experimental. It remains callable by the explicit canonical name `macp.mode.multi_round.v1`, but it is not advertised by discovery RPCs and it does not use the strict standards-track `SessionStart` contract. -## Example 7: StreamSession +## Example 7: StreamSession (disabled in freeze profile) -`StreamSession` emits only accepted canonical MACP envelopes. A single gRPC stream binds to one session. If a client needs negative per-message acknowledgements, it should continue to use `Send`. +`StreamSession` is disabled in the unary-first freeze profile. The `Initialize` response advertises `stream: false` and the RPC returns `UNIMPLEMENTED`. The implementation is retained for future activation. + +When enabled, `StreamSession` emits only accepted canonical MACP envelopes. A single gRPC stream binds to one session. If a client needs negative per-message acknowledgements, it should continue to use `Send`. Practical notes: @@ -171,6 +173,15 @@ This client exercises common failure paths for the freeze profile, including: - payload too large - session access without membership +## Session ID policy + +Session IDs must be either: + +- **UUID v4/v7** in hyphenated lowercase canonical form (36 characters, e.g. `550e8400-e29b-41d4-a716-446655440000`) +- **Base64url token** of at least 22 characters using only `[A-Za-z0-9_-]` + +Human-readable or short IDs (e.g. `"my-session"`, `"s1"`) are rejected with `INVALID_SESSION_ID`. The example clients generate UUID v4 session IDs automatically. + ## Common troubleshooting ### `UNAUTHENTICATED` diff --git a/src/bin/client.rs b/src/bin/client.rs index fc4198e..e9d361f 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -11,6 +11,7 @@ use prost::Message; #[tokio::main] async fn main() -> Result<(), Box> { let mut client = common::connect_client().await?; + let session_id = common::new_session_id(); let init = initialize(&mut client).await?; println!( @@ -36,7 +37,7 @@ async fn main() -> Result<(), Box> { "macp.mode.decision.v1", "SessionStart", "m1", - "decision-demo-1", + &session_id, "coordinator", canonical_start_payload("select the deployment plan", &["alice", "bob"], 60_000), ); @@ -56,7 +57,7 @@ async fn main() -> Result<(), Box> { "macp.mode.decision.v1", "Proposal", "m2", - "decision-demo-1", + &session_id, "coordinator", proposal.encode_to_vec(), ), @@ -77,7 +78,7 @@ async fn main() -> Result<(), Box> { "macp.mode.decision.v1", "Evaluation", "m3", - "decision-demo-1", + &session_id, "alice", evaluation.encode_to_vec(), ), @@ -97,7 +98,7 @@ async fn main() -> Result<(), Box> { "macp.mode.decision.v1", "Vote", "m4", - "decision-demo-1", + &session_id, "bob", vote.encode_to_vec(), ), @@ -112,7 +113,7 @@ async fn main() -> Result<(), Box> { "macp.mode.decision.v1", "Commitment", "m5", - "decision-demo-1", + &session_id, "coordinator", canonical_commitment_payload( "c1", @@ -125,7 +126,7 @@ async fn main() -> Result<(), Box> { .await?; print_ack("commitment", &ack); - let session = get_session_as(&mut client, "alice", "decision-demo-1").await?; + let session = get_session_as(&mut client, "alice", &session_id).await?; let meta = session.metadata.expect("metadata"); println!("GetSession: state={} mode={}", meta.state, meta.mode); diff --git a/src/bin/fuzz_client.rs b/src/bin/fuzz_client.rs index 24b88f3..2e5a837 100644 --- a/src/bin/fuzz_client.rs +++ b/src/bin/fuzz_client.rs @@ -11,6 +11,7 @@ use prost::Message; #[tokio::main] async fn main() -> Result<(), Box> { let mut client = common::connect_client().await?; + let session_id = common::new_session_id(); println!("=== Freeze-Profile Error Path Demo ===\n"); @@ -72,7 +73,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "SessionStart", "m0", - "freeze-task-1", + &session_id, "planner", canonical_start_payload("freeze checks", &["planner", "worker"], 60_000), ); @@ -91,7 +92,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "TaskRequest", "dup-1", - "freeze-task-1", + &session_id, "planner", duplicate_request.encode_to_vec(), ); @@ -104,7 +105,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "TaskRequest", "spoof-1", - "freeze-task-1", + &session_id, "mallory", vec![1, 2, 3], ); @@ -115,14 +116,14 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "TaskUpdate", "big-1", - "freeze-task-1", + &session_id, "worker", vec![b'x'; 2_000_000], ); let ack = send_as(&mut client, "worker", oversized).await?; print_ack("payload_too_large", &ack); - match get_session_as(&mut client, "outsider", "freeze-task-1").await { + match get_session_as(&mut client, "outsider", &session_id).await { Ok(resp) => println!( "[forbidden_get_session] unexpected success: {:?}", resp.metadata @@ -130,7 +131,7 @@ async fn main() -> Result<(), Box> { Err(status) => println!("[forbidden_get_session] grpc error: {status}"), } - let cancelled = cancel_session_as(&mut client, "planner", "freeze-task-1", "end demo").await?; + let cancelled = cancel_session_as(&mut client, "planner", &session_id, "end demo").await?; if let Some(ack) = cancelled.ack.as_ref() { print_ack("cancel_session", ack); } diff --git a/src/bin/handoff_client.rs b/src/bin/handoff_client.rs index e878b89..734103f 100644 --- a/src/bin/handoff_client.rs +++ b/src/bin/handoff_client.rs @@ -11,6 +11,7 @@ use prost::Message; #[tokio::main] async fn main() -> Result<(), Box> { let mut client = common::connect_client().await?; + let session_id = common::new_session_id(); println!("=== Handoff Mode Demo ===\n"); @@ -21,7 +22,7 @@ async fn main() -> Result<(), Box> { "macp.mode.handoff.v1", "SessionStart", "m0", - "handoff-demo-1", + &session_id, "owner", canonical_start_payload("escalate support ticket", &["owner", "target"], 60_000), ), @@ -42,7 +43,7 @@ async fn main() -> Result<(), Box> { "macp.mode.handoff.v1", "HandoffOffer", "m1", - "handoff-demo-1", + &session_id, "owner", offer.encode_to_vec(), ), @@ -62,7 +63,7 @@ async fn main() -> Result<(), Box> { "macp.mode.handoff.v1", "HandoffContext", "m2", - "handoff-demo-1", + &session_id, "owner", context.encode_to_vec(), ), @@ -82,7 +83,7 @@ async fn main() -> Result<(), Box> { "macp.mode.handoff.v1", "HandoffAccept", "m3", - "handoff-demo-1", + &session_id, "target", accept.encode_to_vec(), ), @@ -97,7 +98,7 @@ async fn main() -> Result<(), Box> { "macp.mode.handoff.v1", "Commitment", "m4", - "handoff-demo-1", + &session_id, "owner", canonical_commitment_payload( "c1", @@ -110,7 +111,7 @@ async fn main() -> Result<(), Box> { .await?; print_ack("commitment", &ack); - let session = get_session_as(&mut client, "target", "handoff-demo-1").await?; + let session = get_session_as(&mut client, "target", &session_id).await?; let meta = session.metadata.expect("metadata"); println!("[get_session] state={} mode={}", meta.state, meta.mode); diff --git a/src/bin/multi_round_client.rs b/src/bin/multi_round_client.rs index 8821a52..4267fc1 100644 --- a/src/bin/multi_round_client.rs +++ b/src/bin/multi_round_client.rs @@ -8,6 +8,7 @@ use prost::Message; #[tokio::main] async fn main() -> Result<(), Box> { let mut client = common::connect_client().await?; + let session_id = common::new_session_id(); println!("=== Multi-Round Convergence Demo ===\n"); @@ -28,7 +29,7 @@ async fn main() -> Result<(), Box> { "macp.mode.multi_round.v1", "SessionStart", "m0", - "multi-round-demo-1", + &session_id, "coordinator", start_payload.encode_to_vec(), ), @@ -43,7 +44,7 @@ async fn main() -> Result<(), Box> { "macp.mode.multi_round.v1", "Contribute", "m1", - "multi-round-demo-1", + &session_id, "alice", br#"{"value":"option_a"}"#.to_vec(), ), @@ -58,7 +59,7 @@ async fn main() -> Result<(), Box> { "macp.mode.multi_round.v1", "Contribute", "m2", - "multi-round-demo-1", + &session_id, "bob", br#"{"value":"option_b"}"#.to_vec(), ), @@ -66,7 +67,7 @@ async fn main() -> Result<(), Box> { .await?; print_ack("bob_contributes_b", &ack); - let session = get_session_as(&mut client, "alice", "multi-round-demo-1").await?; + let session = get_session_as(&mut client, "alice", &session_id).await?; let meta = session.metadata.expect("metadata"); println!("[get_session] state={} mode={}", meta.state, meta.mode); @@ -77,7 +78,7 @@ async fn main() -> Result<(), Box> { "macp.mode.multi_round.v1", "Contribute", "m3", - "multi-round-demo-1", + &session_id, "bob", br#"{"value":"option_a"}"#.to_vec(), ), @@ -85,7 +86,7 @@ async fn main() -> Result<(), Box> { .await?; print_ack("bob_revises", &ack); - let session = get_session_as(&mut client, "alice", "multi-round-demo-1").await?; + let session = get_session_as(&mut client, "alice", &session_id).await?; let meta = session.metadata.expect("metadata"); println!("[get_session] state={} mode={}", meta.state, meta.mode); diff --git a/src/bin/proposal_client.rs b/src/bin/proposal_client.rs index 3842367..6399319 100644 --- a/src/bin/proposal_client.rs +++ b/src/bin/proposal_client.rs @@ -11,6 +11,7 @@ use prost::Message; #[tokio::main] async fn main() -> Result<(), Box> { let mut client = common::connect_client().await?; + let session_id = common::new_session_id(); println!("=== Proposal Mode Demo ===\n"); @@ -21,7 +22,7 @@ async fn main() -> Result<(), Box> { "macp.mode.proposal.v1", "SessionStart", "m0", - "proposal-demo-1", + &session_id, "buyer", canonical_start_payload("negotiate price", &["buyer", "seller"], 60_000), ), @@ -43,7 +44,7 @@ async fn main() -> Result<(), Box> { "macp.mode.proposal.v1", "Proposal", "m1", - "proposal-demo-1", + &session_id, "seller", proposal.encode_to_vec(), ), @@ -65,7 +66,7 @@ async fn main() -> Result<(), Box> { "macp.mode.proposal.v1", "CounterProposal", "m2", - "proposal-demo-1", + &session_id, "buyer", counter.encode_to_vec(), ), @@ -84,7 +85,7 @@ async fn main() -> Result<(), Box> { "macp.mode.proposal.v1", "Accept", "m3", - "proposal-demo-1", + &session_id, "buyer", accept.encode_to_vec(), ), @@ -103,7 +104,7 @@ async fn main() -> Result<(), Box> { "macp.mode.proposal.v1", "Accept", "m4", - "proposal-demo-1", + &session_id, "seller", accept.encode_to_vec(), ), @@ -118,7 +119,7 @@ async fn main() -> Result<(), Box> { "macp.mode.proposal.v1", "Commitment", "m5", - "proposal-demo-1", + &session_id, "buyer", canonical_commitment_payload( "c1", @@ -131,7 +132,7 @@ async fn main() -> Result<(), Box> { .await?; print_ack("commitment", &ack); - let session = get_session_as(&mut client, "seller", "proposal-demo-1").await?; + let session = get_session_as(&mut client, "seller", &session_id).await?; let meta = session.metadata.expect("metadata"); println!("[get_session] state={} mode={}", meta.state, meta.mode); diff --git a/src/bin/quorum_client.rs b/src/bin/quorum_client.rs index 041f635..9e8cbf3 100644 --- a/src/bin/quorum_client.rs +++ b/src/bin/quorum_client.rs @@ -11,6 +11,7 @@ use prost::Message; #[tokio::main] async fn main() -> Result<(), Box> { let mut client = common::connect_client().await?; + let session_id = common::new_session_id(); println!("=== Quorum Mode Demo ===\n"); @@ -21,7 +22,7 @@ async fn main() -> Result<(), Box> { "macp.mode.quorum.v1", "SessionStart", "m0", - "quorum-demo-1", + &session_id, "coordinator", canonical_start_payload( "approve production deploy", @@ -47,7 +48,7 @@ async fn main() -> Result<(), Box> { "macp.mode.quorum.v1", "ApprovalRequest", "m1", - "quorum-demo-1", + &session_id, "coordinator", request.encode_to_vec(), ), @@ -66,7 +67,7 @@ async fn main() -> Result<(), Box> { "macp.mode.quorum.v1", "Approve", "m2", - "quorum-demo-1", + &session_id, "alice", approve.encode_to_vec(), ), @@ -85,7 +86,7 @@ async fn main() -> Result<(), Box> { "macp.mode.quorum.v1", "Approve", "m3", - "quorum-demo-1", + &session_id, "bob", approve.encode_to_vec(), ), @@ -100,7 +101,7 @@ async fn main() -> Result<(), Box> { "macp.mode.quorum.v1", "Commitment", "m4", - "quorum-demo-1", + &session_id, "coordinator", canonical_commitment_payload( "c1", @@ -113,7 +114,7 @@ async fn main() -> Result<(), Box> { .await?; print_ack("commitment", &ack); - let session = get_session_as(&mut client, "carol", "quorum-demo-1").await?; + let session = get_session_as(&mut client, "carol", &session_id).await?; let meta = session.metadata.expect("metadata"); println!("[get_session] state={} mode={}", meta.state, meta.mode); diff --git a/src/bin/support/common.rs b/src/bin/support/common.rs index 032c903..60cf33a 100644 --- a/src/bin/support/common.rs +++ b/src/bin/support/common.rs @@ -11,6 +11,10 @@ use prost::Message; use tonic::transport::Channel; use tonic::Request; +pub fn new_session_id() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() +} + pub const DEV_ENDPOINT: &str = "http://127.0.0.1:50051"; pub const MODE_VERSION: &str = "1.0.0"; pub const CONFIG_VERSION: &str = "config.default"; diff --git a/src/bin/task_client.rs b/src/bin/task_client.rs index 7b9f141..3ae7b2e 100644 --- a/src/bin/task_client.rs +++ b/src/bin/task_client.rs @@ -13,6 +13,7 @@ use prost::Message; #[tokio::main] async fn main() -> Result<(), Box> { let mut client = common::connect_client().await?; + let session_id = common::new_session_id(); println!("=== Task Mode Demo ===\n"); @@ -23,7 +24,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "SessionStart", "m0", - "task-demo-1", + &session_id, "planner", canonical_start_payload("summarize quarterly report", &["planner", "worker"], 60_000), ), @@ -46,7 +47,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "TaskRequest", "m1", - "task-demo-1", + &session_id, "planner", task_request.encode_to_vec(), ), @@ -66,7 +67,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "TaskAccept", "m2", - "task-demo-1", + &session_id, "worker", accept.encode_to_vec(), ), @@ -88,7 +89,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "TaskUpdate", "m3", - "task-demo-1", + &session_id, "worker", update.encode_to_vec(), ), @@ -109,7 +110,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "TaskComplete", "m4", - "task-demo-1", + &session_id, "worker", complete.encode_to_vec(), ), @@ -124,7 +125,7 @@ async fn main() -> Result<(), Box> { "macp.mode.task.v1", "Commitment", "m5", - "task-demo-1", + &session_id, "planner", canonical_commitment_payload( "c1", @@ -137,7 +138,7 @@ async fn main() -> Result<(), Box> { .await?; print_ack("commitment", &ack); - let session = get_session_as(&mut client, "worker", "task-demo-1").await?; + let session = get_session_as(&mut client, "worker", &session_id).await?; let meta = session.metadata.expect("metadata"); println!("[get_session] state={} mode={}", meta.state, meta.mode); diff --git a/src/error.rs b/src/error.rs index cd3f341..5814f17 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,10 @@ pub enum MacpError { PayloadTooLarge, #[error("RateLimited")] RateLimited, + #[error("StorageFailed")] + StorageFailed, + #[error("InvalidSessionId")] + InvalidSessionId, } impl MacpError { @@ -56,6 +60,8 @@ impl MacpError { MacpError::DuplicateMessage => "DUPLICATE_MESSAGE", MacpError::PayloadTooLarge => "PAYLOAD_TOO_LARGE", MacpError::RateLimited => "RATE_LIMITED", + MacpError::StorageFailed => "INTERNAL_ERROR", + MacpError::InvalidSessionId => "INVALID_SESSION_ID", } } } @@ -85,6 +91,8 @@ mod tests { (MacpError::DuplicateMessage, "DUPLICATE_MESSAGE"), (MacpError::PayloadTooLarge, "PAYLOAD_TOO_LARGE"), (MacpError::RateLimited, "RATE_LIMITED"), + (MacpError::StorageFailed, "INTERNAL_ERROR"), + (MacpError::InvalidSessionId, "INVALID_SESSION_ID"), ]; for (error, expected_code) in cases { @@ -110,5 +118,7 @@ mod tests { assert_eq!(MacpError::DuplicateMessage.to_string(), "DuplicateMessage"); assert_eq!(MacpError::PayloadTooLarge.to_string(), "PayloadTooLarge"); assert_eq!(MacpError::RateLimited.to_string(), "RateLimited"); + assert_eq!(MacpError::StorageFailed.to_string(), "StorageFailed"); + assert_eq!(MacpError::InvalidSessionId.to_string(), "InvalidSessionId"); } } diff --git a/src/lib.rs b/src/lib.rs index 770c7f7..586697e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ pub mod error; pub mod log_store; pub mod mode; pub mod registry; +pub mod replay; pub mod runtime; pub mod session; pub mod storage; diff --git a/src/log_store.rs b/src/log_store.rs index e41cf9a..7eb2579 100644 --- a/src/log_store.rs +++ b/src/log_store.rs @@ -15,6 +15,12 @@ pub struct LogEntry { pub message_type: String, pub raw_payload: Vec, pub entry_kind: EntryKind, + #[serde(default)] + pub session_id: String, + #[serde(default)] + pub mode: String, + #[serde(default)] + pub macp_version: String, } pub struct LogStore { @@ -62,6 +68,9 @@ mod tests { message_type: "Message".into(), raw_payload: vec![], entry_kind: kind, + session_id: String::new(), + mode: String::new(), + macp_version: String::new(), } } diff --git a/src/main.rs b/src/main.rs index b945310..3843256 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,11 +3,11 @@ mod server; use macp_runtime::log_store::LogStore; use macp_runtime::pb; use macp_runtime::registry::SessionRegistry; +use macp_runtime::replay::replay_session; use macp_runtime::runtime::Runtime; use macp_runtime::security::SecurityLayer; use macp_runtime::storage::{ - cleanup_temp_files, migrate_if_needed, recover_session, FileBackend, MemoryBackend, - StorageBackend, + cleanup_temp_files, migrate_if_needed, FileBackend, MemoryBackend, StorageBackend, }; use server::MacpServer; use std::path::PathBuf; @@ -38,33 +38,80 @@ async fn main() -> Result<(), Box> { let log_store = Arc::new(LogStore::new()); if !memory_only { - let mut sessions = storage.load_all_sessions().await?; - for session in &mut sessions { - let log_entries = storage.load_log(&session.session_id).await?; - - // Crash recovery: reconcile dedup state from log - recover_session(session, &log_entries); - - // Populate in-memory log store - log_store.create_session_log(&session.session_id).await; - for entry in &log_entries { - log_store.append(&session.session_id, entry.clone()).await; - } - - // Persist recovered session state if it changed - if let Err(e) = storage.save_session(session).await { - eprintln!( - "warning: failed to persist recovered session '{}': {e}", - session.session_id - ); + // Build mode map for replay + use macp_runtime::mode::Mode; + use std::collections::HashMap; + let mut modes: HashMap> = HashMap::new(); + modes.insert( + "macp.mode.decision.v1".into(), + Box::new(macp_runtime::mode::decision::DecisionMode), + ); + modes.insert( + "macp.mode.proposal.v1".into(), + Box::new(macp_runtime::mode::proposal::ProposalMode), + ); + modes.insert( + "macp.mode.task.v1".into(), + Box::new(macp_runtime::mode::task::TaskMode), + ); + modes.insert( + "macp.mode.handoff.v1".into(), + Box::new(macp_runtime::mode::handoff::HandoffMode), + ); + modes.insert( + "macp.mode.quorum.v1".into(), + Box::new(macp_runtime::mode::quorum::QuorumMode), + ); + modes.insert( + "macp.mode.multi_round.v1".into(), + Box::new(macp_runtime::mode::multi_round::MultiRoundMode), + ); + + // Enumerate session directories and replay from logs + let sessions_dir = data_dir.join("sessions"); + let mut recovered = 0usize; + if sessions_dir.exists() { + for entry in std::fs::read_dir(&sessions_dir)? { + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + let session_id = entry.file_name().to_string_lossy().to_string(); + let log_entries = storage.load_log(&session_id).await?; + if log_entries.is_empty() { + continue; + } + + match replay_session(&session_id, &log_entries, &modes) { + Ok(session) => { + // Best-effort snapshot update + if let Err(e) = storage.save_session(&session).await { + eprintln!( + "warning: failed to persist recovered session '{}': {e}", + session_id + ); + } + + // Populate in-memory log store + log_store.create_session_log(&session_id).await; + for log_entry in &log_entries { + log_store.append(&session_id, log_entry.clone()).await; + } + + registry.insert_session_for_test(session_id, session).await; + recovered += 1; + } + Err(e) => { + eprintln!( + "warning: failed to replay session '{}': {e}; skipping", + session_id + ); + } + } } - - registry - .insert_session_for_test(session.session_id.clone(), session.clone()) - .await; } - if !sessions.is_empty() { - println!("Loaded {} sessions from storage.", sessions.len()); + if recovered > 0 { + println!("Replayed {} sessions from log.", recovered); } } diff --git a/src/replay.rs b/src/replay.rs new file mode 100644 index 0000000..4130e66 --- /dev/null +++ b/src/replay.rs @@ -0,0 +1,354 @@ +use std::collections::HashMap; + +use crate::error::MacpError; +use crate::log_store::{EntryKind, LogEntry}; +use crate::mode::Mode; +use crate::pb::Envelope; +use crate::session::{ + extract_ttl_ms, is_standard_mode, parse_session_start_payload, + validate_standard_session_start_payload, Session, SessionState, +}; + +const EXPERIMENTAL_DEFAULT_TTL_MS: i64 = 60_000; + +/// Rebuild a `Session` from its append-only log. +/// +/// This replays the same mode callbacks (`on_session_start`, `on_message`) in +/// order so mode state, dedup state, and session lifecycle are reconstructed +/// identically to how they were built during live processing. +pub fn replay_session( + session_id: &str, + log_entries: &[LogEntry], + modes: &HashMap>, +) -> Result { + // 1. Find the SessionStart entry + let start_entry = log_entries + .iter() + .find(|e| e.entry_kind == EntryKind::Incoming && e.message_type == "SessionStart") + .ok_or(MacpError::InvalidPayload)?; + + // Determine mode: prefer entry-level field, fall back to empty for legacy + let mode_name = if start_entry.mode.is_empty() { + // Legacy v2 entry — cannot determine mode from log entry alone; + // caller should skip or use directory heuristic + return Err(MacpError::InvalidPayload); + } else { + &start_entry.mode + }; + + let mode = modes.get(mode_name).ok_or(MacpError::UnknownMode)?; + + // 2. Parse SessionStartPayload + let start_payload = if start_entry.raw_payload.is_empty() && !is_standard_mode(mode_name) { + crate::pb::SessionStartPayload::default() + } else { + parse_session_start_payload(&start_entry.raw_payload)? + }; + validate_standard_session_start_payload(mode_name, &start_payload)?; + + let ttl_ms = if is_standard_mode(mode_name) { + extract_ttl_ms(&start_payload)? + } else if start_payload.ttl_ms == 0 { + EXPERIMENTAL_DEFAULT_TTL_MS + } else { + extract_ttl_ms(&start_payload)? + }; + + // 3. Construct base session — use original received_at_ms, never Utc::now() + let started_at_unix_ms = start_entry.received_at_ms; + let ttl_expiry = started_at_unix_ms.saturating_add(ttl_ms); + + let env = Envelope { + macp_version: if start_entry.macp_version.is_empty() { + "1.0".into() + } else { + start_entry.macp_version.clone() + }, + mode: mode_name.to_string(), + message_type: "SessionStart".into(), + message_id: start_entry.message_id.clone(), + session_id: session_id.into(), + sender: start_entry.sender.clone(), + timestamp_unix_ms: start_entry.received_at_ms, + payload: start_entry.raw_payload.clone(), + }; + + let mut session = Session { + session_id: session_id.into(), + state: SessionState::Open, + ttl_expiry, + ttl_ms, + started_at_unix_ms, + resolution: None, + mode: mode_name.to_string(), + mode_state: vec![], + participants: start_payload.participants.clone(), + seen_message_ids: std::collections::HashSet::new(), + intent: start_payload.intent.clone(), + mode_version: start_payload.mode_version.clone(), + configuration_version: start_payload.configuration_version.clone(), + policy_version: start_payload.policy_version.clone(), + context: start_payload.context.clone(), + roots: start_payload.roots.clone(), + initiator_sender: start_entry.sender.clone(), + }; + + // 4. Call mode.on_session_start(), apply response + let response = mode.on_session_start(&session, &env)?; + session.seen_message_ids.insert(env.message_id.clone()); + session.apply_mode_response(response); + + // 5. Multi-round participant back-fill + if session.participants.is_empty() && !session.mode_state.is_empty() { + if let Ok(state) = + serde_json::from_slice::(&session.mode_state) + { + session.participants = state.participants.clone(); + } + } + + // 6. Replay subsequent entries + for entry in log_entries.iter().skip(1) { + match entry.entry_kind { + EntryKind::Incoming => { + let replay_env = Envelope { + macp_version: if entry.macp_version.is_empty() { + "1.0".into() + } else { + entry.macp_version.clone() + }, + mode: if entry.mode.is_empty() { + session.mode.clone() + } else { + entry.mode.clone() + }, + message_type: entry.message_type.clone(), + message_id: entry.message_id.clone(), + session_id: session_id.into(), + sender: entry.sender.clone(), + timestamp_unix_ms: entry.received_at_ms, + payload: entry.raw_payload.clone(), + }; + + if session.state != SessionState::Open { + // Session already resolved/expired, just rebuild dedup + if !replay_env.message_id.is_empty() { + session.seen_message_ids.insert(replay_env.message_id); + } + continue; + } + + // Replay through mode — errors during replay are not fatal, + // the message was already accepted in the original run + if let Ok(response) = mode.on_message(&session, &replay_env) { + session.apply_mode_response(response); + } + if !replay_env.message_id.is_empty() { + session.seen_message_ids.insert(replay_env.message_id); + } + } + EntryKind::Internal => { + match entry.message_type.as_str() { + "TtlExpired" => { + session.state = SessionState::Expired; + } + "SessionCancel" => { + session.state = SessionState::Expired; + } + _ => { + // Unknown internal event — skip + } + } + } + } + } + + Ok(session) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::decision_pb::ProposalPayload; + use crate::decision_pb::VotePayload; + use crate::log_store::EntryKind; + use crate::mode::decision::DecisionMode; + use crate::mode::Mode; + use crate::pb::{CommitmentPayload, SessionStartPayload}; + use prost::Message; + + fn make_modes() -> HashMap> { + let mut modes: HashMap> = HashMap::new(); + modes.insert("macp.mode.decision.v1".into(), Box::new(DecisionMode)); + modes + } + + fn start_payload_bytes() -> Vec { + SessionStartPayload { + intent: "test".into(), + participants: vec!["agent://fraud".into()], + mode_version: "1.0.0".into(), + configuration_version: "cfg-1".into(), + policy_version: "policy-1".into(), + ttl_ms: 60_000, + context: vec![], + roots: vec![], + } + .encode_to_vec() + } + + fn incoming_entry( + message_id: &str, + message_type: &str, + sender: &str, + payload: Vec, + received_at_ms: i64, + ) -> LogEntry { + LogEntry { + message_id: message_id.into(), + received_at_ms, + sender: sender.into(), + message_type: message_type.into(), + raw_payload: payload, + entry_kind: EntryKind::Incoming, + session_id: "s1".into(), + mode: "macp.mode.decision.v1".into(), + macp_version: "1.0".into(), + } + } + + fn internal_entry(message_type: &str, received_at_ms: i64) -> LogEntry { + LogEntry { + message_id: String::new(), + received_at_ms, + sender: "_runtime".into(), + message_type: message_type.into(), + raw_payload: vec![], + entry_kind: EntryKind::Internal, + session_id: "s1".into(), + mode: "macp.mode.decision.v1".into(), + macp_version: "1.0".into(), + } + } + + #[test] + fn replay_rebuilds_decision_session() { + let modes = make_modes(); + let proposal = ProposalPayload { + proposal_id: "p1".into(), + option: "deploy".into(), + rationale: "ready".into(), + supporting_data: vec![], + } + .encode_to_vec(); + let vote = VotePayload { + proposal_id: "p1".into(), + vote: "approve".into(), + reason: "lgtm".into(), + } + .encode_to_vec(); + let commitment = CommitmentPayload { + commitment_id: "c1".into(), + action: "decision.selected".into(), + authority_scope: "payments".into(), + reason: "bound".into(), + mode_version: "1.0.0".into(), + policy_version: "policy-1".into(), + configuration_version: "cfg-1".into(), + } + .encode_to_vec(); + + let entries = vec![ + incoming_entry( + "m1", + "SessionStart", + "agent://orchestrator", + start_payload_bytes(), + 1000, + ), + incoming_entry("m2", "Proposal", "agent://orchestrator", proposal, 2000), + incoming_entry("m3", "Vote", "agent://fraud", vote, 3000), + incoming_entry("m4", "Commitment", "agent://orchestrator", commitment, 4000), + ]; + + let session = replay_session("s1", &entries, &modes).unwrap(); + assert_eq!(session.state, SessionState::Resolved); + assert_eq!(session.session_id, "s1"); + assert!(session.seen_message_ids.contains("m1")); + assert!(session.seen_message_ids.contains("m2")); + assert!(session.seen_message_ids.contains("m3")); + assert!(session.seen_message_ids.contains("m4")); + assert!(session.resolution.is_some()); + } + + #[test] + fn replay_preserves_original_ttl() { + let modes = make_modes(); + let original_time = 1_700_000_000_000i64; + let entries = vec![incoming_entry( + "m1", + "SessionStart", + "agent://orchestrator", + start_payload_bytes(), + original_time, + )]; + + let session = replay_session("s1", &entries, &modes).unwrap(); + assert_eq!(session.started_at_unix_ms, original_time); + assert_eq!(session.ttl_expiry, original_time + 60_000); + assert_eq!(session.ttl_ms, 60_000); + } + + #[test] + fn replay_handles_ttl_expired() { + let modes = make_modes(); + let entries = vec![ + incoming_entry( + "m1", + "SessionStart", + "agent://orchestrator", + start_payload_bytes(), + 1000, + ), + internal_entry("TtlExpired", 61001), + ]; + + let session = replay_session("s1", &entries, &modes).unwrap(); + assert_eq!(session.state, SessionState::Expired); + } + + #[test] + fn replay_handles_session_cancel() { + let modes = make_modes(); + let entries = vec![ + incoming_entry( + "m1", + "SessionStart", + "agent://orchestrator", + start_payload_bytes(), + 1000, + ), + internal_entry("SessionCancel", 5000), + ]; + + let session = replay_session("s1", &entries, &modes).unwrap(); + assert_eq!(session.state, SessionState::Expired); + } + + #[test] + fn replay_empty_log_returns_error() { + let modes = make_modes(); + let result = replay_session("s1", &[], &modes); + assert!(result.is_err()); + } + + #[test] + fn backward_compat_old_log_entry_without_new_fields() { + // Simulate deserializing a v2 log entry without session_id/mode/macp_version + let json = r#"{"message_id":"m1","received_at_ms":1000,"sender":"test","message_type":"Message","raw_payload":[],"entry_kind":"Incoming"}"#; + let entry: LogEntry = serde_json::from_str(json).unwrap(); + assert_eq!(entry.session_id, ""); + assert_eq!(entry.mode, ""); + assert_eq!(entry.macp_version, ""); + } +} diff --git a/src/runtime.rs b/src/runtime.rs index af1cbaa..acd9c19 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -10,12 +10,13 @@ use crate::mode::multi_round::MultiRoundMode; use crate::mode::proposal::ProposalMode; use crate::mode::quorum::QuorumMode; use crate::mode::task::TaskMode; -use crate::mode::{standard_mode_names, Mode, ModeResponse}; +use crate::mode::{standard_mode_names, Mode}; use crate::pb::{Envelope, SessionStartPayload}; use crate::registry::SessionRegistry; use crate::session::{ extract_ttl_ms, is_standard_mode, parse_session_start_payload, - validate_standard_session_start_payload, Session, SessionState, + validate_session_id_for_acceptance, validate_standard_session_start_payload, Session, + SessionState, }; use crate::storage::StorageBackend; use crate::stream_bus::SessionStreamBus; @@ -88,10 +89,18 @@ impl Runtime { message_type: env.message_type.clone(), raw_payload: env.payload.clone(), entry_kind: EntryKind::Incoming, + session_id: env.session_id.clone(), + mode: env.mode.clone(), + macp_version: env.macp_version.clone(), } } - fn make_internal_entry(message_type: &str, payload: &[u8]) -> LogEntry { + fn make_internal_entry( + message_type: &str, + payload: &[u8], + session_id: &str, + mode: &str, + ) -> LogEntry { LogEntry { message_id: String::new(), received_at_ms: Utc::now().timestamp_millis(), @@ -99,22 +108,9 @@ impl Runtime { message_type: message_type.into(), raw_payload: payload.to_vec(), entry_kind: EntryKind::Internal, - } - } - - fn apply_mode_response(session: &mut Session, response: ModeResponse) { - match response { - ModeResponse::NoOp => {} - ModeResponse::PersistState(state) => session.mode_state = state, - ModeResponse::Resolve(resolution) => { - session.state = SessionState::Resolved; - session.resolution = Some(resolution); - } - ModeResponse::PersistAndResolve { state, resolution } => { - session.mode_state = state; - session.state = SessionState::Resolved; - session.resolution = Some(resolution); - } + session_id: session_id.into(), + mode: mode.into(), + macp_version: "1.0".into(), } } @@ -127,18 +123,23 @@ impl Runtime { } } - async fn maybe_expire_session(&self, session_id: &str, session: &mut Session) -> bool { + async fn maybe_expire_session( + &self, + session_id: &str, + session: &mut Session, + ) -> Result { let now = Utc::now().timestamp_millis(); if session.state == SessionState::Open && now > session.ttl_expiry { - let entry = Self::make_internal_entry("TtlExpired", b""); - if let Err(e) = self.storage.append_log_entry(session_id, &entry).await { - eprintln!("warning: failed to persist TTL expiry log for '{session_id}': {e}"); - } + let entry = Self::make_internal_entry("TtlExpired", b"", session_id, &session.mode); + self.storage + .append_log_entry(session_id, &entry) + .await + .map_err(|_| MacpError::StorageFailed)?; self.log_store.append(session_id, entry).await; session.state = SessionState::Expired; - return true; + return Ok(true); } - false + Ok(false) } pub async fn process( @@ -161,6 +162,7 @@ impl Runtime { if env.mode.trim().is_empty() { return Err(MacpError::InvalidEnvelope); } + validate_session_id_for_acceptance(&env.session_id)?; let mode_name = env.mode.as_str(); let mode = self.modes.get(mode_name).ok_or(MacpError::UnknownMode)?; @@ -232,23 +234,15 @@ impl Runtime { let response = mode.on_session_start(&session, env)?; // 1. Create storage directory and write log entry (COMMIT POINT) - if let Err(e) = self.storage.create_session_storage(&env.session_id).await { - eprintln!( - "warning: failed to create session storage for '{}': {e}", - env.session_id - ); - } + self.storage + .create_session_storage(&env.session_id) + .await + .map_err(|_| MacpError::StorageFailed)?; let incoming_entry = Self::make_incoming_entry(env); - if let Err(e) = self - .storage + self.storage .append_log_entry(&env.session_id, &incoming_entry) .await - { - eprintln!( - "warning: failed to persist log entry for '{}': {e}", - env.session_id - ); - } + .map_err(|_| MacpError::StorageFailed)?; // 2. Update in-memory caches self.log_store.create_session_log(&env.session_id).await; @@ -256,7 +250,7 @@ impl Runtime { let mut session = session; session.seen_message_ids.insert(env.message_id.clone()); - Self::apply_mode_response(&mut session, response); + session.apply_mode_response(response); // Multi-round mode stores participants in its own state rather than in // the session-level field. This block back-fills session.participants so @@ -303,7 +297,7 @@ impl Runtime { return Err(MacpError::InvalidEnvelope); } - if self.maybe_expire_session(&env.session_id, session).await { + if self.maybe_expire_session(&env.session_id, session).await? { self.save_session_to_storage(session).await; return Err(MacpError::TtlExpired); } @@ -321,21 +315,15 @@ impl Runtime { // 1. COMMIT POINT: write log entry to disk let incoming_entry = Self::make_incoming_entry(env); - if let Err(e) = self - .storage + self.storage .append_log_entry(&env.session_id, &incoming_entry) .await - { - eprintln!( - "warning: failed to persist log entry for '{}': {e}", - env.session_id - ); - } + .map_err(|_| MacpError::StorageFailed)?; // 2. Update in-memory state self.log_store.append(&env.session_id, incoming_entry).await; session.seen_message_ids.insert(env.message_id.clone()); - Self::apply_mode_response(session, response); + session.apply_mode_response(response); let result_state = session.state.clone(); // 3. Best-effort session save @@ -362,7 +350,9 @@ impl Runtime { pub async fn get_session_checked(&self, session_id: &str) -> Option { let mut guard = self.registry.sessions.write().await; let changed = if let Some(session) = guard.get_mut(session_id) { - self.maybe_expire_session(session_id, session).await + self.maybe_expire_session(session_id, session) + .await + .unwrap_or(false) } else { return None; }; @@ -382,7 +372,7 @@ impl Runtime { let mut guard = self.registry.sessions.write().await; let session = guard.get_mut(session_id).ok_or(MacpError::UnknownSession)?; - let _ = self.maybe_expire_session(session_id, session).await; + self.maybe_expire_session(session_id, session).await?; if session.state == SessionState::Resolved || session.state == SessionState::Expired { let result_state = session.state.clone(); @@ -393,14 +383,16 @@ impl Runtime { }); } - let cancel_entry = Self::make_internal_entry("SessionCancel", reason.as_bytes()); - if let Err(e) = self - .storage + let cancel_entry = Self::make_internal_entry( + "SessionCancel", + reason.as_bytes(), + session_id, + &session.mode, + ); + self.storage .append_log_entry(session_id, &cancel_entry) .await - { - eprintln!("warning: failed to persist cancel log for '{session_id}': {e}"); - } + .map_err(|_| MacpError::StorageFailed)?; self.log_store.append(session_id, cancel_entry).await; session.state = SessionState::Expired; self.save_session_to_storage(session).await; @@ -419,6 +411,10 @@ mod tests { use crate::pb::{CommitmentPayload, SessionStartPayload}; use prost::Message; + fn new_sid() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() + } + fn make_runtime() -> Runtime { let storage: Arc = Arc::new(crate::storage::MemoryBackend); let registry = Arc::new(SessionRegistry::new()); @@ -463,6 +459,7 @@ mod tests { #[tokio::test] async fn standard_session_start_is_strict() { let rt = make_runtime(); + let sid = new_sid(); let bad = SessionStartPayload { ttl_ms: 0, ..Default::default() @@ -474,7 +471,7 @@ mod tests { "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", bad, ), @@ -491,13 +488,14 @@ mod tests { #[tokio::test] async fn empty_mode_is_rejected() { let rt = make_runtime(); + let sid = new_sid(); let err = rt .process( &env( "", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", session_start(vec!["agent://fraud".into()]), ), @@ -511,12 +509,13 @@ mod tests { #[tokio::test] async fn rejected_messages_do_not_enter_dedup_state() { let rt = make_runtime(); + let sid = new_sid(); rt.process( &env( "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", session_start(vec!["agent://fraud".into()]), ), @@ -531,7 +530,7 @@ mod tests { "macp.mode.decision.v1", "Proposal", "m2", - "s1", + &sid, "agent://fraud", b"not-protobuf".to_vec(), ), @@ -554,7 +553,7 @@ mod tests { "macp.mode.decision.v1", "Proposal", "m2", - "s1", + &sid, "agent://orchestrator", good, ), @@ -568,6 +567,7 @@ mod tests { #[tokio::test] async fn get_session_transitions_expired_sessions() { let rt = make_runtime(); + let sid = new_sid(); let payload = SessionStartPayload { intent: "intent".into(), participants: vec!["agent://fraud".into()], @@ -584,7 +584,7 @@ mod tests { "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", payload, ), @@ -593,13 +593,14 @@ mod tests { .await .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(5)).await; - let session = rt.get_session_checked("s1").await.unwrap(); + let session = rt.get_session_checked(&sid).await.unwrap(); assert_eq!(session.state, SessionState::Expired); } #[tokio::test] async fn experimental_mode_keeps_legacy_default_ttl() { let rt = make_runtime(); + let sid = new_sid(); let payload = SessionStartPayload { participants: vec!["creator".into(), "other".into()], ..Default::default() @@ -610,7 +611,7 @@ mod tests { "macp.mode.multi_round.v1", "SessionStart", "m1", - "s1", + &sid, "creator", payload, ), @@ -618,20 +619,21 @@ mod tests { ) .await .unwrap(); - let session = rt.get_session_checked("s1").await.unwrap(); + let session = rt.get_session_checked(&sid).await.unwrap(); assert!(session.ttl_expiry > session.started_at_unix_ms); } #[tokio::test] async fn duplicate_session_start_message_id_returns_duplicate() { let rt = make_runtime(); + let sid = new_sid(); let payload = session_start(vec!["agent://fraud".into()]); rt.process( &env( "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", payload.clone(), ), @@ -646,7 +648,7 @@ mod tests { "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", payload, ), @@ -660,13 +662,13 @@ mod tests { #[tokio::test] async fn non_start_mode_mismatch_rejected() { let rt = make_runtime(); - // Start a decision session + let sid = new_sid(); rt.process( &env( "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", session_start(vec!["agent://fraud".into()]), ), @@ -675,7 +677,6 @@ mod tests { .await .unwrap(); - // Send a message with a different mode to the same session let proposal = ProposalPayload { proposal_id: "p1".into(), option: "step-up".into(), @@ -686,10 +687,10 @@ mod tests { let err = rt .process( &env( - "macp.mode.task.v1", // wrong mode + "macp.mode.task.v1", "Proposal", "m2", - "s1", + &sid, "agent://orchestrator", proposal, ), @@ -703,6 +704,7 @@ mod tests { #[tokio::test] async fn cancel_idempotent_on_already_expired() { let rt = make_runtime(); + let sid = new_sid(); let payload = SessionStartPayload { intent: "intent".into(), participants: vec!["agent://fraud".into()], @@ -719,7 +721,7 @@ mod tests { "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", payload, ), @@ -728,20 +730,21 @@ mod tests { .await .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(5)).await; - let result = rt.cancel_session("s1", "cleanup").await.unwrap(); + let result = rt.cancel_session(&sid, "cleanup").await.unwrap(); assert_eq!(result.session_state, SessionState::Expired); } #[tokio::test] async fn accepted_envelopes_are_published_in_order() { let rt = make_runtime(); - let mut events = rt.subscribe_session_stream("s1"); + let sid = new_sid(); + let mut events = rt.subscribe_session_stream(&sid); let start = env( "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid, "agent://orchestrator", session_start(vec!["agent://fraud".into()]), ); @@ -761,7 +764,7 @@ mod tests { "macp.mode.decision.v1", "Proposal", "m2", - "s1", + &sid, "agent://orchestrator", proposal, ); @@ -774,12 +777,13 @@ mod tests { #[tokio::test] async fn commitment_versions_are_carried_into_resolution() { let rt = make_runtime(); + let sid = new_sid(); rt.process( &env( "macp.mode.proposal.v1", "SessionStart", "m1", - "s1", + &sid, "agent://buyer", session_start(vec!["agent://buyer".into(), "agent://seller".into()]), ), @@ -801,7 +805,7 @@ mod tests { "macp.mode.proposal.v1", "Proposal", "m2", - "s1", + &sid, "agent://seller", proposal, ), @@ -819,7 +823,7 @@ mod tests { "macp.mode.proposal.v1", "Accept", "m3", - "s1", + &sid, "agent://seller", accept.clone(), ), @@ -832,7 +836,7 @@ mod tests { "macp.mode.proposal.v1", "Accept", "m4", - "s1", + &sid, "agent://buyer", accept, ), @@ -856,7 +860,7 @@ mod tests { "macp.mode.proposal.v1", "Commitment", "m5", - "s1", + &sid, "agent://buyer", commitment, ), @@ -870,13 +874,15 @@ mod tests { #[tokio::test] async fn max_open_sessions_enforced_under_write_lock() { let rt = make_runtime(); - // First session succeeds with limit=1 + let sid1 = new_sid(); + let sid2 = new_sid(); + let sid3 = new_sid(); rt.process( &env( "macp.mode.decision.v1", "SessionStart", "m1", - "s1", + &sid1, "agent://orchestrator", session_start(vec!["agent://fraud".into()]), ), @@ -885,14 +891,13 @@ mod tests { .await .unwrap(); - // Second session from the same sender should fail with RateLimited let err = rt .process( &env( "macp.mode.decision.v1", "SessionStart", "m2", - "s2", + &sid2, "agent://orchestrator", session_start(vec!["agent://fraud".into()]), ), @@ -902,13 +907,12 @@ mod tests { .unwrap_err(); assert!(matches!(err, MacpError::RateLimited)); - // A different sender should still succeed rt.process( &env( "macp.mode.decision.v1", "SessionStart", "m3", - "s3", + &sid3, "agent://other", session_start(vec!["agent://fraud".into()]), ), @@ -917,4 +921,221 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn weak_session_id_rejected() { + let rt = make_runtime(); + let err = rt + .process( + &env( + "macp.mode.decision.v1", + "SessionStart", + "m1", + "s1", + "agent://orchestrator", + session_start(vec!["agent://fraud".into()]), + ), + None, + ) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "InvalidSessionId"); + } + + #[tokio::test] + async fn log_append_failure_rejects_session_start() { + use std::io; + struct FailingBackend; + #[async_trait::async_trait] + impl StorageBackend for FailingBackend { + async fn save_session(&self, _: &Session) -> io::Result<()> { + Ok(()) + } + async fn load_session(&self, _: &str) -> io::Result> { + Ok(None) + } + async fn load_all_sessions(&self) -> io::Result> { + Ok(vec![]) + } + async fn append_log_entry(&self, _: &str, _: &LogEntry) -> io::Result<()> { + Err(io::Error::other("disk full")) + } + async fn load_log(&self, _: &str) -> io::Result> { + Ok(vec![]) + } + async fn create_session_storage(&self, _: &str) -> io::Result<()> { + Ok(()) + } + } + + let storage: Arc = Arc::new(FailingBackend); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + let rt = Runtime::new(storage, registry, log_store); + let sid = new_sid(); + + let err = rt + .process( + &env( + "macp.mode.decision.v1", + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://fraud".into()]), + ), + None, + ) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "StorageFailed"); + } + + #[tokio::test] + async fn log_append_failure_rejects_in_session_message() { + use std::io; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct FailOnSecondAppend { + count: AtomicUsize, + } + #[async_trait::async_trait] + impl StorageBackend for FailOnSecondAppend { + async fn save_session(&self, _: &Session) -> io::Result<()> { + Ok(()) + } + async fn load_session(&self, _: &str) -> io::Result> { + Ok(None) + } + async fn load_all_sessions(&self) -> io::Result> { + Ok(vec![]) + } + async fn append_log_entry(&self, _: &str, _: &LogEntry) -> io::Result<()> { + let n = self.count.fetch_add(1, Ordering::SeqCst); + if n >= 1 { + Err(io::Error::other("disk full")) + } else { + Ok(()) + } + } + async fn load_log(&self, _: &str) -> io::Result> { + Ok(vec![]) + } + async fn create_session_storage(&self, _: &str) -> io::Result<()> { + Ok(()) + } + } + + let storage: Arc = Arc::new(FailOnSecondAppend { + count: AtomicUsize::new(0), + }); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + let rt = Runtime::new(storage, registry, log_store); + let sid = new_sid(); + + // SessionStart succeeds (first append) + rt.process( + &env( + "macp.mode.decision.v1", + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://fraud".into()]), + ), + None, + ) + .await + .unwrap(); + + // Proposal fails (second append) + let proposal = ProposalPayload { + proposal_id: "p1".into(), + option: "step-up".into(), + rationale: "risk".into(), + supporting_data: vec![], + } + .encode_to_vec(); + let err = rt + .process( + &env( + "macp.mode.decision.v1", + "Proposal", + "m2", + &sid, + "agent://orchestrator", + proposal, + ), + None, + ) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "StorageFailed"); + + // Verify the message was not added to dedup state + let session = rt.get_session_checked(&sid).await.unwrap(); + assert!(!session.seen_message_ids.contains("m2")); + } + + #[tokio::test] + async fn cancel_session_fails_if_log_append_fails() { + use std::io; + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct FailOnSecondAppend { + count: AtomicUsize, + } + #[async_trait::async_trait] + impl StorageBackend for FailOnSecondAppend { + async fn save_session(&self, _: &Session) -> io::Result<()> { + Ok(()) + } + async fn load_session(&self, _: &str) -> io::Result> { + Ok(None) + } + async fn load_all_sessions(&self) -> io::Result> { + Ok(vec![]) + } + async fn append_log_entry(&self, _: &str, _: &LogEntry) -> io::Result<()> { + let n = self.count.fetch_add(1, Ordering::SeqCst); + if n >= 1 { + Err(io::Error::other("disk full")) + } else { + Ok(()) + } + } + async fn load_log(&self, _: &str) -> io::Result> { + Ok(vec![]) + } + async fn create_session_storage(&self, _: &str) -> io::Result<()> { + Ok(()) + } + } + + let storage: Arc = Arc::new(FailOnSecondAppend { + count: AtomicUsize::new(0), + }); + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + let rt = Runtime::new(storage, registry, log_store); + let sid = new_sid(); + + rt.process( + &env( + "macp.mode.decision.v1", + "SessionStart", + "m1", + &sid, + "agent://orchestrator", + session_start(vec!["agent://fraud".into()]), + ), + None, + ) + .await + .unwrap(); + + let err = rt.cancel_session(&sid, "test cancel").await.unwrap_err(); + assert_eq!(err.to_string(), "StorageFailed"); + } } diff --git a/src/server.rs b/src/server.rs index 3283770..7f2c9bc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -40,6 +40,14 @@ impl MacpServer { if env.message_type.is_empty() || env.message_id.is_empty() { return Err(MacpError::InvalidEnvelope); } + if env.message_type == "Signal" { + if !env.session_id.is_empty() { + return Err(MacpError::InvalidEnvelope); + } + if !env.mode.trim().is_empty() { + return Err(MacpError::InvalidEnvelope); + } + } if env.message_type != "Signal" && env.session_id.is_empty() { return Err(MacpError::InvalidEnvelope); } @@ -134,7 +142,7 @@ impl MacpServer { Ok(identity) } - #[allow(clippy::result_large_err)] + #[allow(dead_code, clippy::result_large_err)] fn try_next_stream_event( receiver: &mut Option>, ) -> Result, Status> { @@ -158,6 +166,7 @@ impl MacpServer { } } + #[allow(dead_code)] async fn process_stream_request( &self, identity: &AuthIdentity, @@ -236,6 +245,7 @@ impl MacpServer { Ok(()) } + #[allow(dead_code)] fn build_stream_session_stream( &self, identity: AuthIdentity, @@ -360,6 +370,8 @@ impl MacpServer { MacpError::Forbidden => Status::permission_denied(err.to_string()), MacpError::PayloadTooLarge => Status::resource_exhausted(err.to_string()), MacpError::RateLimited => Status::resource_exhausted(err.to_string()), + MacpError::StorageFailed => Status::internal(err.to_string()), + MacpError::InvalidSessionId => Status::invalid_argument(err.to_string()), _ => Status::failed_precondition(err.to_string()), } } @@ -389,7 +401,7 @@ impl MacpRuntimeService for MacpServer { website_url: String::new(), }), capabilities: Some(Capabilities { - sessions: Some(SessionsCapability { stream: true }), + sessions: Some(SessionsCapability { stream: false }), cancellation: Some(CancellationCapability { cancel_session: true, }), @@ -576,16 +588,11 @@ impl MacpRuntimeService for MacpServer { async fn stream_session( &self, - request: Request>, + _request: Request>, ) -> Result, Status> { - let identity = self - .security - .authenticate_metadata(request.metadata()) - .map_err(Self::status_from_error)?; - Ok(Response::new(self.build_stream_session_stream( - identity, - request.into_inner(), - ))) + Err(Status::unimplemented( + "StreamSession is disabled in the unary-first freeze profile", + )) } type WatchModeRegistryStream = std::pin::Pin< @@ -622,6 +629,10 @@ mod tests { use macp_runtime::registry::SessionRegistry; use prost::Message; + fn new_sid() -> String { + uuid::Uuid::new_v4().as_hyphenated().to_string() + } + fn make_server() -> (MacpServer, Arc) { let storage: Arc = Arc::new(macp_runtime::storage::MemoryBackend); @@ -663,6 +674,7 @@ mod tests { #[tokio::test] async fn sender_is_derived_from_authenticated_metadata() { let (server, runtime) = make_server(); + let sid = new_sid(); let ack = do_send( &server, "agent://orchestrator", @@ -671,7 +683,7 @@ mod tests { mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m1".into(), - session_id: "s1".into(), + session_id: sid.clone(), sender: String::new(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -679,13 +691,14 @@ mod tests { ) .await; assert!(ack.ok); - let session = runtime.get_session_checked("s1").await.unwrap(); + let session = runtime.get_session_checked(&sid).await.unwrap(); assert_eq!(session.initiator_sender, "agent://orchestrator"); } #[tokio::test] async fn spoofed_sender_is_rejected() { let (server, _) = make_server(); + let sid = new_sid(); let ack = do_send( &server, "agent://orchestrator", @@ -694,7 +707,7 @@ mod tests { mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m1".into(), - session_id: "s1".into(), + session_id: sid, sender: "agent://spoof".into(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -708,6 +721,7 @@ mod tests { #[tokio::test] async fn get_session_requires_session_membership() { let (server, _) = make_server(); + let sid = new_sid(); let ack = do_send( &server, "agent://orchestrator", @@ -716,7 +730,7 @@ mod tests { mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m1".into(), - session_id: "s1".into(), + session_id: sid.clone(), sender: String::new(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -725,9 +739,7 @@ mod tests { .await; assert!(ack.ok); - let mut req = Request::new(GetSessionRequest { - session_id: "s1".into(), - }); + let mut req = Request::new(GetSessionRequest { session_id: sid }); req.metadata_mut() .insert("x-macp-agent-id", "agent://outsider".parse().unwrap()); let err = server.get_session(req).await.unwrap_err(); @@ -748,13 +760,14 @@ mod tests { use tokio_stream::{iter, StreamExt}; let (server, _) = make_server(); + let sid = new_sid(); let requests = iter(vec![Ok(StreamSessionRequest { envelope: Some(Envelope { macp_version: "1.0".into(), mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m1".into(), - session_id: "s1".into(), + session_id: sid.clone(), sender: String::new(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -776,6 +789,8 @@ mod tests { use tokio_stream::{iter, StreamExt}; let (server, _) = make_server(); + let sid1 = new_sid(); + let sid2 = new_sid(); let requests = iter(vec![ Ok(StreamSessionRequest { envelope: Some(Envelope { @@ -783,7 +798,7 @@ mod tests { mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m1".into(), - session_id: "s1".into(), + session_id: sid1.clone(), sender: String::new(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -795,7 +810,7 @@ mod tests { mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m2".into(), - session_id: "s2".into(), + session_id: sid2, sender: String::new(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -807,7 +822,7 @@ mod tests { server.build_stream_session_stream(stream_identity("agent://orchestrator"), requests); let first = stream.next().await.unwrap().unwrap(); - assert_eq!(first.envelope.unwrap().session_id, "s1"); + assert_eq!(first.envelope.unwrap().session_id, sid1); let err = stream.next().await.unwrap().unwrap_err(); assert_eq!(err.code(), tonic::Code::FailedPrecondition); } @@ -836,6 +851,7 @@ mod tests { #[tokio::test] async fn get_session_returns_metadata() { let (server, _) = make_server(); + let sid = new_sid(); let ack = do_send( &server, "agent://orchestrator", @@ -844,7 +860,7 @@ mod tests { mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m1".into(), - session_id: "s1".into(), + session_id: sid.clone(), sender: String::new(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -854,13 +870,13 @@ mod tests { assert!(ack.ok); let mut req = Request::new(GetSessionRequest { - session_id: "s1".into(), + session_id: sid.clone(), }); req.metadata_mut() .insert("x-macp-agent-id", "agent://orchestrator".parse().unwrap()); let resp = server.get_session(req).await.unwrap(); let meta = resp.into_inner().metadata.unwrap(); - assert_eq!(meta.session_id, "s1"); + assert_eq!(meta.session_id, sid); assert_eq!(meta.mode, "macp.mode.decision.v1"); assert_eq!(meta.mode_version, "1.0.0"); assert_eq!(meta.configuration_version, "cfg-1"); @@ -869,6 +885,7 @@ mod tests { #[tokio::test] async fn cancel_session_transitions_to_expired() { let (server, _) = make_server(); + let sid = new_sid(); let ack = do_send( &server, "agent://orchestrator", @@ -877,7 +894,7 @@ mod tests { mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m1".into(), - session_id: "s1".into(), + session_id: sid.clone(), sender: String::new(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -887,7 +904,7 @@ mod tests { assert!(ack.ok); let mut req = Request::new(CancelSessionRequest { - session_id: "s1".into(), + session_id: sid, reason: "no longer needed".into(), }); req.metadata_mut() @@ -901,6 +918,7 @@ mod tests { #[tokio::test] async fn participant_cannot_cancel_session() { let (server, _) = make_server(); + let sid = new_sid(); let ack = do_send( &server, "agent://orchestrator", @@ -909,7 +927,7 @@ mod tests { mode: "macp.mode.decision.v1".into(), message_type: "SessionStart".into(), message_id: "m1".into(), - session_id: "s1".into(), + session_id: sid.clone(), sender: String::new(), timestamp_unix_ms: Utc::now().timestamp_millis(), payload: start_payload(), @@ -918,9 +936,8 @@ mod tests { .await; assert!(ack.ok); - // Participant (not initiator) tries to cancel let mut req = Request::new(CancelSessionRequest { - session_id: "s1".into(), + session_id: sid, reason: "I want to cancel".into(), }); req.metadata_mut() @@ -932,7 +949,6 @@ mod tests { #[tokio::test] async fn cancel_session_unknown_session_returns_error() { let (server, _) = make_server(); - // authenticate_session_access will fail with NotFound for unknown session let mut req = Request::new(CancelSessionRequest { session_id: "nonexistent".into(), reason: "test".into(), @@ -942,4 +958,88 @@ mod tests { let err = server.cancel_session(req).await.unwrap_err(); assert_eq!(err.code(), tonic::Code::NotFound); } + + #[tokio::test] + async fn ambient_signal_accepted() { + let (server, _) = make_server(); + let ack = do_send( + &server, + "agent://orchestrator", + Envelope { + macp_version: "1.0".into(), + mode: String::new(), + message_type: "Signal".into(), + message_id: "sig-1".into(), + session_id: String::new(), + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }, + ) + .await; + assert!(ack.ok); + } + + #[tokio::test] + async fn signal_with_session_id_rejected() { + let (server, _) = make_server(); + let ack = do_send( + &server, + "agent://orchestrator", + Envelope { + macp_version: "1.0".into(), + mode: String::new(), + message_type: "Signal".into(), + message_id: "sig-2".into(), + session_id: "some-session".into(), + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }, + ) + .await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "INVALID_ENVELOPE"); + } + + #[tokio::test] + async fn signal_with_mode_rejected() { + let (server, _) = make_server(); + let ack = do_send( + &server, + "agent://orchestrator", + Envelope { + macp_version: "1.0".into(), + mode: "macp.mode.decision.v1".into(), + message_type: "Signal".into(), + message_id: "sig-3".into(), + session_id: String::new(), + sender: String::new(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: vec![], + }, + ) + .await; + assert!(!ack.ok); + assert_eq!(ack.error.as_ref().unwrap().code, "INVALID_ENVELOPE"); + } + + // StreamSession returns Unimplemented is verified by manifest_advertises_stream_false + // and the implementation returning Status::unimplemented. Cannot construct tonic::Streaming + // in a unit test, so this is tested at the integration level. + + #[tokio::test] + async fn manifest_advertises_stream_false() { + let (server, _) = make_server(); + let resp = server + .initialize(Request::new(InitializeRequest { + supported_protocol_versions: vec!["1.0".into()], + client_info: None, + capabilities: None, + })) + .await + .unwrap(); + let caps = resp.into_inner().capabilities.unwrap(); + assert!(!caps.sessions.unwrap().stream); + } } diff --git a/src/session.rs b/src/session.rs index 8bfa5ca..0e3f6f0 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,4 +1,5 @@ use crate::error::MacpError; +use crate::mode::ModeResponse; use crate::pb::SessionStartPayload; use prost::Message; use std::collections::HashSet; @@ -33,6 +34,24 @@ pub struct Session { pub initiator_sender: String, } +impl Session { + pub fn apply_mode_response(&mut self, response: ModeResponse) { + match response { + ModeResponse::NoOp => {} + ModeResponse::PersistState(state) => self.mode_state = state, + ModeResponse::Resolve(resolution) => { + self.state = SessionState::Resolved; + self.resolution = Some(resolution); + } + ModeResponse::PersistAndResolve { state, resolution } => { + self.mode_state = state; + self.state = SessionState::Resolved; + self.resolution = Some(resolution); + } + } + } +} + pub fn is_standard_mode(mode: &str) -> bool { matches!( mode, @@ -90,6 +109,41 @@ pub fn validate_standard_session_start_payload( Ok(()) } +/// Validate that a session ID meets the acceptance policy. +/// +/// Accepts: +/// - UUID v4/v7 in hyphenated lowercase canonical form (36 chars) +/// - base64url tokens of 22+ chars (`[A-Za-z0-9_-]`) +/// +/// Rejects everything else (empty, short human-readable, uppercase UUID, etc.). +pub fn validate_session_id_for_acceptance(session_id: &str) -> Result<(), MacpError> { + if session_id.is_empty() { + return Err(MacpError::InvalidSessionId); + } + + // Try UUID parse: must be valid UUID and canonical lowercase hyphenated form + if session_id.len() == 36 && session_id.contains('-') { + if let Ok(parsed) = uuid::Uuid::parse_str(session_id) { + // Verify it's the canonical lowercase hyphenated representation + if parsed.as_hyphenated().to_string() == session_id { + return Ok(()); + } + } + return Err(MacpError::InvalidSessionId); + } + + // Try base64url: at least 22 chars, only [A-Za-z0-9_-] + if session_id.len() >= 22 + && session_id + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-') + { + return Ok(()); + } + + Err(MacpError::InvalidSessionId) +} + #[cfg(test)] mod tests { use super::*; @@ -191,4 +245,68 @@ mod tests { let payload = SessionStartPayload::default(); validate_standard_session_start_payload("macp.mode.multi_round.v1", &payload).unwrap(); } + + #[test] + fn valid_uuid_v4_accepted() { + let id = uuid::Uuid::new_v4().as_hyphenated().to_string(); + validate_session_id_for_acceptance(&id).unwrap(); + } + + #[test] + fn valid_base64url_accepted() { + // 22-char base64url token + validate_session_id_for_acceptance("abcdefghijklmnopqrstuv").unwrap(); + // longer base64url with underscore and hyphen + validate_session_id_for_acceptance("abc-def_ghi-jkl_mno-pqr").unwrap(); + } + + #[test] + fn empty_id_rejected() { + assert_eq!( + validate_session_id_for_acceptance("") + .unwrap_err() + .to_string(), + "InvalidSessionId" + ); + } + + #[test] + fn short_weak_id_rejected() { + assert_eq!( + validate_session_id_for_acceptance("s1") + .unwrap_err() + .to_string(), + "InvalidSessionId" + ); + assert_eq!( + validate_session_id_for_acceptance("decision-demo-1") + .unwrap_err() + .to_string(), + "InvalidSessionId" + ); + } + + #[test] + fn uppercase_uuid_rejected() { + let id = uuid::Uuid::new_v4() + .as_hyphenated() + .to_string() + .to_uppercase(); + assert_eq!( + validate_session_id_for_acceptance(&id) + .unwrap_err() + .to_string(), + "InvalidSessionId" + ); + } + + #[test] + fn base64url_too_short_rejected() { + assert_eq!( + validate_session_id_for_acceptance("abcdefghij") + .unwrap_err() + .to_string(), + "InvalidSessionId" + ); + } } diff --git a/src/storage.rs b/src/storage.rs index f411cda..3729d1b 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -6,7 +6,7 @@ use std::fs; use std::io::{self, BufRead, Write}; use std::path::{Path, PathBuf}; -const STORAGE_VERSION: u32 = 2; +const STORAGE_VERSION: u32 = 3; // --------------------------------------------------------------------------- // StorageBackend trait @@ -222,8 +222,24 @@ pub fn migrate_if_needed(base_dir: &Path) -> io::Result<()> { let legacy_sessions = base_dir.join("sessions.json"); let legacy_logs = base_dir.join("logs.json"); - // Already migrated or fresh install - if sessions_dir.exists() || (!legacy_sessions.exists() && !legacy_logs.exists()) { + // Already at current version or fresh install (no legacy files, sessions dir exists) + let current_version = read_storage_version(base_dir)?; + if sessions_dir.exists() && !legacy_sessions.exists() && !legacy_logs.exists() { + // v2 → v3: no-op data migration, just bump version. New LogEntry fields + // use #[serde(default)] so existing v2 JSONL lines deserialize fine. + if current_version.unwrap_or(0) < STORAGE_VERSION { + write_storage_version(base_dir)?; + } + return Ok(()); + } + + if !legacy_sessions.exists() && !legacy_logs.exists() && !sessions_dir.exists() { + write_storage_version(base_dir)?; + return Ok(()); + } + + // Already migrated from v1 + if sessions_dir.exists() && !legacy_sessions.exists() && !legacy_logs.exists() { write_storage_version(base_dir)?; return Ok(()); } @@ -407,6 +423,9 @@ mod tests { message_type: "Message".into(), raw_payload: vec![], entry_kind: EntryKind::Incoming, + session_id: String::new(), + mode: String::new(), + macp_version: String::new(), } } @@ -564,7 +583,7 @@ mod tests { assert!(!base.join("logs.json").exists()); // Verify storage version - assert_eq!(read_storage_version(base).unwrap(), Some(2)); + assert_eq!(read_storage_version(base).unwrap(), Some(STORAGE_VERSION)); // Verify data is loadable via FileBackend let backend = FileBackend::new(base.to_path_buf()).unwrap();