Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ name = "macp-runtime"
version = "0.4.0"
edition = "2021"

# Optional storage backends. Nothing is enabled by default; build with
# e.g. `cargo build --features rocksdb-backend` to opt in.
[features]
default = []
# Enables the RocksDB-backed store; activates the optional `rocksdb` dependency.
rocksdb-backend = ["dep:rocksdb"]
# Enables the Redis-backed store; activates the optional `redis` dependency.
redis-backend = ["dep:redis"]

[dependencies]
tokio = { version = "1", features = ["full"] }
tonic = { version = "0.14", features = ["transport", "tls-ring"] }
Expand All @@ -20,6 +25,8 @@ async-trait = "0.1"
uuid = { version = "1", features = ["v4"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rocksdb = { version = "0.22", optional = true }
redis = { version = "0.27", features = ["tokio-comp", "aio"], optional = true }

[dev-dependencies]
tempfile = "3"
Expand Down
1 change: 1 addition & 0 deletions src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio::sync::RwLock;
/// Classifies a session-log entry by its origin.
/// NOTE(review): semantics inferred from use during session replay/recovery
/// in `main.rs` — confirm against the log writers.
pub enum EntryKind {
/// Entry recorded from a message arriving from outside the runtime — TODO confirm.
Incoming,
/// Entry produced internally by the runtime itself — TODO confirm.
Internal,
/// Checkpoint marker (variant added in this change alongside the pluggable
/// storage backends). Presumably lets replay start from a saved point — verify.
Checkpoint,
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down
142 changes: 79 additions & 63 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
PathBuf::from(std::env::var("MACP_DATA_DIR").unwrap_or_else(|_| ".macp-data".into()));
let strict_recovery = std::env::var("MACP_STRICT_RECOVERY").ok().as_deref() == Some("1");

let backend_name = std::env::var("MACP_STORAGE_BACKEND").unwrap_or_else(|_| "file".into());
let storage: Arc<dyn StorageBackend> = if memory_only {
Arc::new(MemoryBackend)
} else {
std::fs::create_dir_all(&data_dir)?;
migrate_if_needed(&data_dir)?;
cleanup_temp_files(&data_dir);
Arc::new(FileBackend::new(data_dir.clone())?)
match backend_name.as_str() {
"file" => {
std::fs::create_dir_all(&data_dir)?;
migrate_if_needed(&data_dir)?;
cleanup_temp_files(&data_dir);
Arc::new(FileBackend::new(data_dir.clone())?)
}
#[cfg(feature = "rocksdb-backend")]
"rocksdb" => {
let path = std::env::var("MACP_ROCKSDB_PATH")
.unwrap_or_else(|_| data_dir.join("rocksdb").to_string_lossy().to_string());
Arc::new(macp_runtime::storage::RocksDbBackend::open(&path)?)
}
#[cfg(feature = "redis-backend")]
"redis" => {
let url = std::env::var("MACP_REDIS_URL")
.unwrap_or_else(|_| "redis://127.0.0.1:6379".into());
Arc::new(macp_runtime::storage::RedisBackend::connect(&url, "macp").await?)
}
other => {
return Err(format!(
"unknown storage backend: {other}. Valid: file, rocksdb, redis"
)
.into());
}
}
};

// Load persisted state into in-memory caches
Expand All @@ -49,76 +72,69 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mode_registry = Arc::new(ModeRegistry::build_default());

if !memory_only {
// Enumerate session directories and replay from logs
let sessions_dir = data_dir.join("sessions");
// Replay sessions from logs
let session_ids = storage.list_session_ids().await?;
let mut recovered = 0usize;
if tokio::fs::metadata(&sessions_dir).await.is_ok() {
let mut entries = tokio::fs::read_dir(&sessions_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if !entry.file_type().await?.is_dir() {
continue;
for session_id in session_ids {
let log_entries = match storage.load_log(&session_id).await {
Ok(entries) => entries,
Err(e) if strict_recovery => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("strict recovery: failed to load log for {session_id}: {e}"),
)
.into());
}
let session_id = entry.file_name().to_string_lossy().to_string();
let log_entries = match storage.load_log(&session_id).await {
Ok(entries) => entries,
Err(e) if strict_recovery => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("strict recovery: failed to load log for {session_id}: {e}"),
)
.into());
}
Err(e) => {
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to load session log; skipping"
);
continue;
}
};
if log_entries.is_empty() {
Err(e) => {
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to load session log; skipping"
);
continue;
}
};
if log_entries.is_empty() {
continue;
}

match replay_session(&session_id, &log_entries, &mode_registry) {
Ok(session) => {
if let Err(e) = storage.save_session(&session).await {
if strict_recovery {
return Err(io::Error::other(format!(
"strict recovery: failed to persist recovered session {session_id}: {e}"
))
.into());
}
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to persist recovered session"
);
}

log_store.create_session_log(&session_id).await;
for log_entry in &log_entries {
log_store.append(&session_id, log_entry.clone()).await;
match replay_session(&session_id, &log_entries, &mode_registry) {
Ok(session) => {
if let Err(e) = storage.save_session(&session).await {
if strict_recovery {
return Err(io::Error::other(format!(
"strict recovery: failed to persist recovered session {session_id}: {e}"
))
.into());
}

registry.insert_recovered_session(session_id, session).await;
recovered += 1;
}
Err(e) if strict_recovery => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("strict recovery: failed to replay session {session_id}: {e}"),
)
.into());
}
Err(e) => {
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to replay session; skipping"
"failed to persist recovered session"
);
}

log_store.create_session_log(&session_id).await;
for log_entry in &log_entries {
log_store.append(&session_id, log_entry.clone()).await;
}

registry.insert_recovered_session(session_id, session).await;
recovered += 1;
}
Err(e) if strict_recovery => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("strict recovery: failed to replay session {session_id}: {e}"),
)
.into());
}
Err(e) => {
tracing::warn!(
session_id = %session_id,
error = %e,
"failed to replay session; skipping"
);
}
}
}
Expand Down
Loading
Loading