Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ serde_json = "1"
tokio-stream = "0.1"
futures-core = "0.3"
async-stream = "0.3"
async-trait = "0.1"

[dev-dependencies]
tempfile = "3"
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ This runtime implements the current MACP core/service surface, the five standard
- payload size limits
- rate limiting
- **Durable local persistence**
- session registry snapshots
- accepted-history log snapshots
- dedup state survives restart
- per-session append-only log files and session snapshots via `FileBackend`
- crash recovery with dedup state reconciliation
- atomic writes (tmp file + rename) prevent partial-write corruption
- **Unary freeze profile**
- `StreamSession` is intentionally disabled in this profile
- `WatchModeRegistry` and `WatchRoots` remain unimplemented
Expand Down Expand Up @@ -192,7 +192,8 @@ runtime/
│ ├── security.rs # auth config, sender derivation, rate limiting
│ ├── session.rs # canonical SessionStart validation and session model
│ ├── registry.rs # session store with optional persistence
│ ├── log_store.rs # accepted-history log store with optional persistence
│ ├── log_store.rs # in-memory accepted-history log cache
│ ├── storage.rs # storage backend trait, FileBackend persistence, crash recovery
│ ├── mode/ # mode implementations
│ └── bin/ # local development example clients
├── docs/
Expand Down
7 changes: 4 additions & 3 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ In dev mode, example clients attach `x-macp-agent-id` metadata and may use plain

## Persistence model

By default the runtime persists snapshots under `.macp-data/`:
By default the runtime persists state under `.macp-data/` via `FileBackend`:

- `sessions.json`
- `logs.json`
- per-session directories containing `session.json` and append-only `log.jsonl`
- crash recovery reconciles dedup state from the log on startup
- atomic writes (tmp file + rename) prevent partial-write corruption

If a snapshot file contains corrupt or incompatible JSON, the runtime logs a warning to stderr and starts with empty state.

Expand Down
23 changes: 11 additions & 12 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,33 @@ Implemented modes:

## 4. Storage layer

### Storage backend (`src/storage.rs`)

Provides the `StorageBackend` trait with two implementations:

- `FileBackend` — per-session directories containing `session.json` and append-only `log.jsonl`, with crash recovery and atomic writes
- `MemoryBackend` — no-op backend for `MACP_MEMORY_ONLY=1`

### Session registry (`src/registry.rs`)

Stores:
In-memory cache of all sessions, loaded from `FileBackend` on startup. Stores:

- session metadata
- bound versions
- participants
- dedup state
- current session state

Supports:

- in-memory mode
- file-backed snapshot persistence

Both stores log a warning and fall back to empty state if snapshot deserialization fails.
Supports optional file-backed snapshot persistence for backward compatibility.

### Log store (`src/log_store.rs`)

Stores:
In-memory cache of accepted-history logs. Stores:

- accepted incoming envelopes
- runtime-generated internal events such as TTL expiry and session cancellation

Supports:

- in-memory mode
- file-backed snapshot persistence
On-disk persistence is handled by `FileBackend`, not by LogStore.

## 5. Security layer (`src/security.rs`)

Expand Down
9 changes: 5 additions & 4 deletions docs/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ Local development profile:

## Persistence profile

By default the runtime persists snapshots of:
By default the runtime persists state via `FileBackend` under `MACP_DATA_DIR`:

- session registry
- accepted-history log store
- per-session `session.json` and append-only `log.jsonl` files
- crash recovery reconciles dedup state from the log on startup
- atomic writes (tmp file + rename) prevent partial-write corruption

This gives restart recovery for session metadata, dedup state, and accepted-history inspection. Corrupt or incompatible snapshot files produce a warning on stderr; the runtime falls back to empty state instead of refusing to start.
This gives restart recovery for session metadata, dedup state, and accepted-history inspection. Corrupt or incompatible files produce a warning on stderr; the runtime falls back to empty state instead of refusing to start.

## Commitment validation

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ pub mod mode;
pub mod registry;
pub mod runtime;
pub mod session;
pub mod storage;

pub mod security;
65 changes: 0 additions & 65 deletions src/log_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use tokio::sync::RwLock;

#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
Expand All @@ -21,7 +19,6 @@ pub struct LogEntry {

pub struct LogStore {
logs: RwLock<HashMap<String, Vec<LogEntry>>>,
persistence_path: Option<PathBuf>,
}

impl Default for LogStore {
Expand All @@ -34,60 +31,17 @@ impl LogStore {
pub fn new() -> Self {
Self {
logs: RwLock::new(HashMap::new()),
persistence_path: None,
}
}

pub fn with_persistence<P: AsRef<Path>>(dir: P) -> std::io::Result<Self> {
let dir = dir.as_ref().to_path_buf();
fs::create_dir_all(&dir)?;
let path = dir.join("logs.json");
let logs = if path.exists() {
match serde_json::from_slice(&fs::read(&path)?) {
Ok(v) => v,
Err(e) => {
eprintln!("warning: failed to deserialize logs from {}: {e}; starting with empty state", path.display());
HashMap::new()
}
}
} else {
HashMap::new()
};
Ok(Self {
logs: RwLock::new(logs),
persistence_path: Some(path),
})
}

fn persist_map(path: &Path, logs: &HashMap<String, Vec<LogEntry>>) -> std::io::Result<()> {
let bytes = serde_json::to_vec_pretty(logs)?;
let tmp_path = path.with_extension("json.tmp");
fs::write(&tmp_path, bytes)?;
fs::rename(&tmp_path, path)
}

async fn persist_locked(&self, logs: &HashMap<String, Vec<LogEntry>>) -> std::io::Result<()> {
if let Some(path) = &self.persistence_path {
Self::persist_map(path, logs)?;
}
Ok(())
}

pub async fn persist_snapshot(&self) -> std::io::Result<()> {
let guard = self.logs.read().await;
self.persist_locked(&guard).await
}

pub async fn create_session_log(&self, session_id: &str) {
let mut guard = self.logs.write().await;
guard.entry(session_id.to_string()).or_default();
let _ = self.persist_locked(&guard).await;
}

pub async fn append(&self, session_id: &str, entry: LogEntry) {
let mut guard = self.logs.write().await;
guard.entry(session_id.to_string()).or_default().push(entry);
let _ = self.persist_locked(&guard).await;
}

pub async fn get_log(&self, session_id: &str) -> Option<Vec<LogEntry>> {
Expand All @@ -99,7 +53,6 @@ impl LogStore {
#[cfg(test)]
mod tests {
use super::*;
use std::time::{SystemTime, UNIX_EPOCH};

fn entry(id: &str, kind: EntryKind) -> LogEntry {
LogEntry {
Expand All @@ -124,22 +77,4 @@ mod tests {
assert_eq!(log[0].message_id, "m1");
assert_eq!(log[1].message_id, "m2");
}

#[tokio::test]
async fn persistent_log_store_round_trip() {
let base = std::env::temp_dir().join(format!(
"macp-log-test-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos()
));
let store = LogStore::with_persistence(&base).unwrap();
store.append("s1", entry("m1", EntryKind::Incoming)).await;

let reopened = LogStore::with_persistence(&base).unwrap();
let log = reopened.get_log("s1").await.unwrap();
assert_eq!(log.len(), 1);
assert_eq!(log[0].message_id, "m1");
}
}
78 changes: 60 additions & 18 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use macp_runtime::pb;
use macp_runtime::registry::SessionRegistry;
use macp_runtime::runtime::Runtime;
use macp_runtime::security::SecurityLayer;
use macp_runtime::storage::{
cleanup_temp_files, migrate_if_needed, recover_session, FileBackend, MemoryBackend,
StorageBackend,
};
use server::MacpServer;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -20,21 +24,55 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let data_dir =
PathBuf::from(std::env::var("MACP_DATA_DIR").unwrap_or_else(|_| ".macp-data".into()));

let registry = Arc::new(if memory_only {
SessionRegistry::new()
let storage: Arc<dyn StorageBackend> = if memory_only {
Arc::new(MemoryBackend)
} else {
SessionRegistry::with_persistence(&data_dir)?
});
let log_store = Arc::new(if memory_only {
LogStore::new()
} else {
LogStore::with_persistence(&data_dir)?
});
std::fs::create_dir_all(&data_dir)?;
migrate_if_needed(&data_dir)?;
cleanup_temp_files(&data_dir);
Arc::new(FileBackend::new(data_dir.clone())?)
};

// Load persisted state into in-memory caches
let registry = Arc::new(SessionRegistry::new());
let log_store = Arc::new(LogStore::new());

if !memory_only {
let mut sessions = storage.load_all_sessions().await?;
for session in &mut sessions {
let log_entries = storage.load_log(&session.session_id).await?;

// Crash recovery: reconcile dedup state from log
recover_session(session, &log_entries);

// Populate in-memory log store
log_store.create_session_log(&session.session_id).await;
for entry in &log_entries {
log_store.append(&session.session_id, entry.clone()).await;
}

let registry_ref = Arc::clone(&registry);
let log_store_ref = Arc::clone(&log_store);
// Persist recovered session state if it changed
if let Err(e) = storage.save_session(session).await {
eprintln!(
"warning: failed to persist recovered session '{}': {e}",
session.session_id
);
}

let runtime = Arc::new(Runtime::new(registry, log_store));
registry
.insert_session_for_test(session.session_id.clone(), session.clone())
.await;
}
if !sessions.is_empty() {
println!("Loaded {} sessions from storage.", sessions.len());
}
}

let runtime = Arc::new(Runtime::new(
Arc::clone(&storage),
Arc::clone(&registry),
Arc::clone(&log_store),
));
let security = SecurityLayer::from_env()?;
let svc = MacpServer::new(runtime, security);

Expand Down Expand Up @@ -78,12 +116,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

// Persist final state on shutdown
if let Err(e) = registry_ref.persist_snapshot().await {
eprintln!("warning: failed to persist session registry: {}", e);
}
if let Err(e) = log_store_ref.persist_snapshot().await {
eprintln!("warning: failed to persist log store: {}", e);
// Final snapshot: persist all sessions to storage
if !memory_only {
for session in registry.get_all_sessions().await {
if let Err(e) = storage.save_session(&session).await {
eprintln!(
"warning: failed to persist session '{}' on shutdown: {e}",
session.session_id
);
}
}
}
println!("State persisted. Goodbye.");

Expand Down
1 change: 1 addition & 0 deletions src/mode/decision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ mod tests {
session_id: "s1".into(),
state: SessionState::Open,
ttl_expiry: i64::MAX,
ttl_ms: 60_000,
started_at_unix_ms: 0,
resolution: None,
mode: "macp.mode.decision.v1".into(),
Expand Down
19 changes: 19 additions & 0 deletions src/mode/handoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ impl Mode for HandoffMode {
if session.participants.len() < 2 {
return Err(MacpError::InvalidPayload);
}
if !session
.participants
.iter()
.any(|p| p == &session.initiator_sender)
{
return Err(MacpError::InvalidPayload);
}
Ok(ModeResponse::PersistState(Self::encode_state(
&HandoffState::default(),
)))
Expand Down Expand Up @@ -214,6 +221,7 @@ mod tests {
session_id: "s1".into(),
state: SessionState::Open,
ttl_expiry: i64::MAX,
ttl_ms: 60_000,
started_at_unix_ms: 0,
resolution: None,
mode: "macp.mode.handoff.v1".into(),
Expand Down Expand Up @@ -330,6 +338,17 @@ mod tests {
assert_eq!(err.to_string(), "InvalidPayload");
}

#[test]
fn session_start_rejects_when_initiator_not_participant() {
let mode = HandoffMode;
let mut session = base_session();
session.participants = vec!["target".into(), "other".into()]; // owner not included
let err = mode
.on_session_start(&session, &env("owner", "SessionStart", vec![]))
.unwrap_err();
assert_eq!(err.to_string(), "InvalidPayload");
}

// --- HandoffOffer ---

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/mode/multi_round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ mod tests {
session_id: "s1".into(),
state: SessionState::Open,
ttl_expiry: i64::MAX,
ttl_ms: 60_000,
started_at_unix_ms: 0,
resolution: None,
mode: "multi_round".into(),
Expand Down
1 change: 1 addition & 0 deletions src/mode/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ mod tests {
session_id: "s1".into(),
state: SessionState::Open,
ttl_expiry: i64::MAX,
ttl_ms: 60_000,
started_at_unix_ms: 0,
resolution: None,
mode: "macp.mode.proposal.v1".into(),
Expand Down
1 change: 1 addition & 0 deletions src/mode/quorum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ mod tests {
session_id: "s1".into(),
state: SessionState::Open,
ttl_expiry: i64::MAX,
ttl_ms: 60_000,
started_at_unix_ms: 0,
resolution: None,
mode: "macp.mode.quorum.v1".into(),
Expand Down
Loading
Loading