From 1744f0f7668f0e23da460d4e683c458beeb1c07e Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sat, 28 Mar 2026 23:05:19 -0400 Subject: [PATCH 1/8] Implement pipelined 2PC prototype for cross-database atomicity Add two-phase commit support for distributed transactions. Participant: PreparedTransactions registry, HTTP endpoints for prepare/commit/abort, prepare_reducer() on ModuleHost. Coordinator: call_reducer_on_db_2pc host function (ABI spacetime_10.5), post-commit sends /2pc/commit to participants, on failure sends abort. Bindings: FFI and safe wrapper for call_reducer_on_db_2pc. Smoketests: cross_db_2pc with happy path and abort path. --- crates/bindings-sys/src/lib.rs | 48 +++++ crates/bindings/src/remote_reducer.rs | 40 ++++ crates/client-api/src/routes/database.rs | 123 ++++++++++- crates/core/src/host/instance_env.rs | 166 +++++++++++++++ crates/core/src/host/mod.rs | 1 + crates/core/src/host/module_host.rs | 80 +++++++ crates/core/src/host/prepared_tx.rs | 37 ++++ crates/core/src/host/v8/mod.rs | 5 + crates/core/src/host/wasm_common.rs | 4 + .../src/host/wasm_common/module_host_actor.rs | 63 ++++++ .../src/host/wasmtime/wasm_instance_env.rs | 76 +++++++ .../core/src/host/wasmtime/wasmtime_module.rs | 4 + .../tests/smoketests/cross_db_2pc.rs | 201 ++++++++++++++++++ crates/smoketests/tests/smoketests/mod.rs | 1 + 14 files changed, 848 insertions(+), 1 deletion(-) create mode 100644 crates/core/src/host/prepared_tx.rs create mode 100644 crates/smoketests/tests/smoketests/cross_db_2pc.rs diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 8854ae393b6..927c444a38d 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -896,6 +896,23 @@ pub mod raw { args_len: u32, out: *mut BytesSource, ) -> u16; + + /// 2PC variant of `call_reducer_on_db`. + /// + /// Calls the target database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`. 
+ /// On success, the runtime stores the `prepare_id` internally. + /// After the coordinator's reducer commits, all participants are committed. + /// If the coordinator's reducer fails, all participants are aborted. + /// + /// Returns and errors are identical to `call_reducer_on_db`. + pub fn call_reducer_on_db_2pc( + identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity + reducer_ptr: *const u8, + reducer_len: u32, + args_ptr: *const u8, + args_len: u32, + out: *mut BytesSource, + ) -> u16; } /// What strategy does the database index use? @@ -1510,6 +1527,37 @@ pub fn call_reducer_on_db( } } +/// 2PC variant of [`call_reducer_on_db`]. +/// +/// Calls `/prepare/{reducer}` on the target database. On success, the runtime +/// stores the prepare_id internally. After the coordinator's reducer commits, +/// all participants are committed. On failure, all participants are aborted. +/// +/// Returns and errors are identical to [`call_reducer_on_db`]. +#[inline] +pub fn call_reducer_on_db_2pc( + identity: [u8; 32], + reducer_name: &str, + args: &[u8], +) -> Result<(u16, raw::BytesSource), raw::BytesSource> { + let mut out = raw::BytesSource::INVALID; + let status = unsafe { + raw::call_reducer_on_db_2pc( + identity.as_ptr(), + reducer_name.as_ptr(), + reducer_name.len() as u32, + args.as_ptr(), + args.len() as u32, + &mut out, + ) + }; + if status == Errno::HTTP_ERROR.code() { + Err(out) + } else { + Ok((status, out)) + } +} + /// Finds the JWT payload associated with `connection_id`. /// If nothing is found for the connection, this returns None. /// If a payload is found, this will return a valid [`raw::BytesSource`]. 
diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs index bded8bc5ae7..8676a81203b 100644 --- a/crates/bindings/src/remote_reducer.rs +++ b/crates/bindings/src/remote_reducer.rs @@ -84,3 +84,43 @@ pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args: } } } + +/// Call a reducer on a remote database using the 2PC prepare protocol. +/// +/// This is the 2PC variant of [`call_reducer_on_db`]. It calls the target database's +/// `/prepare/{reducer}` endpoint. On success, the runtime stores the prepare_id internally. +/// After the coordinator's reducer commits, all participants are committed automatically. +/// If the coordinator's reducer fails (panics or returns Err), all participants are aborted. +/// +/// Returns and errors are identical to [`call_reducer_on_db`]. +pub fn call_reducer_on_db_2pc( + database_identity: Identity, + reducer_name: &str, + args: &[u8], +) -> Result<(), RemoteCallError> { + let identity_bytes = database_identity.to_byte_array(); + match spacetimedb_bindings_sys::call_reducer_on_db_2pc(identity_bytes, reducer_name, args) { + Ok((status, body_source)) => { + if status < 300 { + return Ok(()); + } + let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID { + String::new() + } else { + let mut buf = IterBuf::take(); + read_bytes_source_into(body_source, &mut buf); + String::from_utf8_lossy(&buf).into_owned() + }; + if status == 404 { + Err(RemoteCallError::NotFound(msg)) + } else { + Err(RemoteCallError::Failed(msg)) + } + } + Err(err_source) => { + use crate::rt::read_bytes_source_as; + let msg = read_bytes_source_as::(err_source); + Err(RemoteCallError::Unreachable(msg)) + } + } +} diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 29a49fe2c5c..7c82f5918de 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -241,6 +241,115 @@ fn 
parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res } } +/// 2PC prepare endpoint: execute a reducer and return a prepare_id. +/// +/// `POST /v1/database/:name_or_identity/prepare/:reducer` +/// +/// On success, the response includes: +/// - `X-Prepare-Id` header with the prepare_id +/// - Body contains the reducer return value (if any) +pub async fn prepare( + State(worker_ctx): State, + Extension(auth): Extension, + Path(CallParams { + name_or_identity, + reducer, + }): Path, + TypedHeader(content_type): TypedHeader, + body: Bytes, +) -> axum::response::Result { + let args = parse_call_args(content_type, body)?; + let caller_identity = auth.claims.identity; + + let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + let connection_id = generate_random_connection_id(); + + module + .call_identity_connected(auth.into(), connection_id) + .await + .map_err(client_connected_error_to_response)?; + + let result = module + .prepare_reducer(caller_identity, Some(connection_id), &reducer, args) + .await; + + module + .call_identity_disconnected(caller_identity, connection_id) + .await + .map_err(client_disconnected_error_to_response)?; + + match result { + Ok((prepare_id, rcr, return_value)) => { + let (status, body) = + reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value)?; + let mut response = ( + status, + TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)), + TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)), + body, + ) + .into_response(); + if !prepare_id.is_empty() { + response.headers_mut().insert( + "X-Prepare-Id", + http::HeaderValue::from_str(&prepare_id).unwrap(), + ); + } + Ok(response) + } + Err(e) => Err(map_reducer_error(e, &reducer).into()), + } +} + +#[derive(Deserialize)] +pub struct TwoPcParams { + name_or_identity: NameOrIdentity, + prepare_id: String, +} + +/// 2PC commit endpoint: finalize a prepared transaction. 
+/// +/// `POST /v1/database/:name_or_identity/2pc/commit/:prepare_id` +pub async fn commit_2pc( + State(worker_ctx): State, + Extension(_auth): Extension, + Path(TwoPcParams { + name_or_identity, + prepare_id, + }): Path, +) -> axum::response::Result { + let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + module.commit_prepared(&prepare_id).map_err(|e| { + log::error!("2PC commit failed: {e}"); + (StatusCode::NOT_FOUND, e).into_response() + })?; + + Ok(StatusCode::OK) +} + +/// 2PC abort endpoint: abort a prepared transaction. +/// +/// `POST /v1/database/:name_or_identity/2pc/abort/:prepare_id` +pub async fn abort_2pc( + State(worker_ctx): State, + Extension(_auth): Extension, + Path(TwoPcParams { + name_or_identity, + prepare_id, + }): Path, +) -> axum::response::Result { + let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + module.abort_prepared(&prepare_id).map_err(|e| { + log::error!("2PC abort failed: {e}"); + (StatusCode::NOT_FOUND, e).into_response() + })?; + + Ok(StatusCode::OK) +} + fn reducer_outcome_response( module: &ModuleHost, owner_identity: &Identity, @@ -1247,6 +1356,12 @@ pub struct DatabaseRoutes { pub db_reset: MethodRouter, /// GET: /database/: name_or_identity/unstable/timestamp pub timestamp_get: MethodRouter, + /// POST: /database/:name_or_identity/prepare/:reducer + pub prepare_post: MethodRouter, + /// POST: /database/:name_or_identity/2pc/commit/:prepare_id + pub commit_2pc_post: MethodRouter, + /// POST: /database/:name_or_identity/2pc/abort/:prepare_id + pub abort_2pc_post: MethodRouter, } impl Default for DatabaseRoutes @@ -1272,6 +1387,9 @@ where pre_publish: post(pre_publish::), db_reset: put(reset::), timestamp_get: get(get_timestamp::), + prepare_post: post(prepare::), + commit_2pc_post: post(commit_2pc::), + abort_2pc_post: post(abort_2pc::), } } } @@ -1296,7 +1414,10 @@ where .route("/sql", self.sql_post) .route("/unstable/timestamp", 
self.timestamp_get) .route("/pre_publish", self.pre_publish) - .route("/reset", self.db_reset); + .route("/reset", self.db_reset) + .route("/prepare/:reducer", self.prepare_post) + .route("/2pc/commit/:prepare_id", self.commit_2pc_post) + .route("/2pc/abort/:prepare_id", self.abort_2pc_post); axum::Router::new() .route("/", self.root_post) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 1fdc651414e..29ed8b28069 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -54,6 +54,11 @@ pub struct InstanceEnv { in_anon_tx: bool, /// A procedure's last known transaction offset. procedure_last_tx_offset: Option, + /// 2PC: prepared participants from `call_reducer_on_db_2pc` calls. + /// Each entry is (database_identity, prepare_id). + /// After the coordinator's reducer commits, these are committed; + /// on failure, they are aborted. + pub prepared_participants: Vec<(Identity, String)>, } /// `InstanceEnv` needs to be `Send` because it is created on the host thread @@ -238,6 +243,7 @@ impl InstanceEnv { func_name: None, in_anon_tx: false, procedure_last_tx_offset: None, + prepared_participants: Vec::new(), } } @@ -1045,6 +1051,166 @@ impl InstanceEnv { result } } + + /// Call a reducer on a remote database using the 2PC prepare protocol. + /// + /// Like [`Self::call_reducer_on_db`], but POSTs to `/prepare/{reducer}` instead of + /// `/call/{reducer}`. On success, parses the `X-Prepare-Id` response header and stores + /// `(database_identity, prepare_id)` in [`Self::prepared_participants`]. + /// + /// Returns `(http_status, response_body)` on transport success. + /// The caller (coordinator reducer) is responsible for checking the status; + /// if the coordinator's reducer commits, the runtime will commit all participants, + /// and if it fails, the runtime will abort them. 
+ pub fn call_reducer_on_db_2pc( + &mut self, + database_identity: Identity, + reducer_name: &str, + args: bytes::Bytes, + ) -> impl Future), NodesError>> + use<> { + let client = self.replica_ctx.call_reducer_client.clone(); + let router = self.replica_ctx.call_reducer_router.clone(); + let reducer_name = reducer_name.to_owned(); + let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + let caller_identity = self.replica_ctx.database.database_identity; + + async move { + let start = Instant::now(); + + let base_url = router + .resolve_base_url(database_identity) + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; + let url = format!( + "{}/v1/database/{}/prepare/{}", + base_url, + database_identity.to_hex(), + reducer_name, + ); + let mut req = client + .post(&url) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .body(args); + if let Some(token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + let result = async { + let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + let status = response.status().as_u16(); + let prepare_id = response + .headers() + .get("X-Prepare-Id") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_owned()); + let body = response.bytes().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + Ok((status, body, prepare_id)) + } + .await; + + WORKER_METRICS + .cross_db_reducer_calls_total + .with_label_values(&caller_identity) + .inc(); + WORKER_METRICS + .cross_db_reducer_duration_seconds + .with_label_values(&caller_identity) + .observe(start.elapsed().as_secs_f64()); + + result + } + } + + /// Commit all prepared participants (called after coordinator's reducer succeeds). 
+ pub fn commit_all_prepared( + &mut self, + ) -> impl Future + use<> { + let participants = mem::take(&mut self.prepared_participants); + let client = self.replica_ctx.call_reducer_client.clone(); + let router = self.replica_ctx.call_reducer_router.clone(); + let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + + async move { + for (db_identity, prepare_id) in participants { + let base_url = match router.resolve_base_url(db_identity).await { + Ok(url) => url, + Err(e) => { + log::error!("2PC commit: failed to resolve base URL for {db_identity}: {e}"); + continue; + } + }; + let url = format!( + "{}/v1/database/{}/2pc/commit/{}", + base_url, + db_identity.to_hex(), + prepare_id, + ); + let mut req = client.post(&url); + if let Some(ref token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + match req.send().await { + Ok(resp) if resp.status().is_success() => { + log::info!("2PC commit: committed {prepare_id} on {db_identity}"); + } + Ok(resp) => { + log::error!( + "2PC commit: failed for {prepare_id} on {db_identity}: status {}", + resp.status() + ); + } + Err(e) => { + log::error!("2PC commit: transport error for {prepare_id} on {db_identity}: {e}"); + } + } + } + } + } + + /// Abort all prepared participants (called when coordinator's reducer fails). 
+ pub fn abort_all_prepared( + &mut self, + ) -> impl Future + use<> { + let participants = mem::take(&mut self.prepared_participants); + let client = self.replica_ctx.call_reducer_client.clone(); + let router = self.replica_ctx.call_reducer_router.clone(); + let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + + async move { + for (db_identity, prepare_id) in participants { + let base_url = match router.resolve_base_url(db_identity).await { + Ok(url) => url, + Err(e) => { + log::error!("2PC abort: failed to resolve base URL for {db_identity}: {e}"); + continue; + } + }; + let url = format!( + "{}/v1/database/{}/2pc/abort/{}", + base_url, + db_identity.to_hex(), + prepare_id, + ); + let mut req = client.post(&url); + if let Some(ref token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + match req.send().await { + Ok(resp) if resp.status().is_success() => { + log::info!("2PC abort: aborted {prepare_id} on {db_identity}"); + } + Ok(resp) => { + log::error!( + "2PC abort: failed for {prepare_id} on {db_identity}: status {}", + resp.status() + ); + } + Err(e) => { + log::error!("2PC abort: transport error for {prepare_id} on {db_identity}: {e}"); + } + } + } + } + } } /// Default timeout for HTTP requests performed by [`InstanceEnv::http_request`]. 
diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 25e56ca217e..df6ec4d42f0 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -15,6 +15,7 @@ mod host_controller; mod module_common; #[allow(clippy::too_many_arguments)] pub mod module_host; +pub mod prepared_tx; pub mod scheduler; pub mod wasmtime; diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 929db1b8004..a13442ccc5b 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -890,6 +890,9 @@ pub struct ModuleHost { /// /// When this is true, most operations will fail with [`NoSuchModule`]. closed: Arc, + + /// Registry of prepared (but not yet finalized) 2PC transactions. + prepared_txs: super::prepared_tx::PreparedTransactions, } impl fmt::Debug for ModuleHost { @@ -906,6 +909,7 @@ pub struct WeakModuleHost { inner: Weak, on_panic: Weak, closed: Weak, + prepared_txs: super::prepared_tx::PreparedTransactions, } #[derive(Debug)] @@ -1093,6 +1097,7 @@ impl ModuleHost { inner, on_panic, closed: Arc::new(AtomicBool::new(false)), + prepared_txs: super::prepared_tx::PreparedTransactions::new(), } } @@ -1740,6 +1745,79 @@ impl ModuleHost { res } + /// Execute a reducer in 2PC prepare mode. + /// + /// This calls the reducer normally (which commits in-memory and to durability), + /// then stores the transaction info in the prepared transactions registry. + /// Returns the prepare_id and the reducer call result (including the return value). + /// + /// For the simplified prototype, we do not implement a persistence barrier; + /// the PREPARE record is just a normal commit. + pub async fn prepare_reducer( + &self, + caller_identity: Identity, + caller_connection_id: Option, + reducer_name: &str, + args: FunctionArgs, + ) -> Result<(String, ReducerCallResult, Option), ReducerCallError> { + // Call the reducer normally (which commits in-memory and sends to durability). 
+ let (result, return_value) = self + .call_reducer_with_return( + caller_identity, + caller_connection_id, + None, // no websocket client + None, // no request_id + None, // no timer + reducer_name, + args, + ) + .await?; + + // Only store prepared tx info if the reducer succeeded. + if matches!(result.outcome, ReducerOutcome::Committed) { + use std::sync::atomic::{AtomicU64, Ordering}; + static PREPARE_COUNTER: AtomicU64 = AtomicU64::new(1); + let prepare_id = format!("prepare-{}", PREPARE_COUNTER.fetch_add(1, Ordering::Relaxed)); + // For the prototype, we store minimal info. The transaction is already committed + // in-memory and sent to durability, so commit_prepared is a no-op and + // abort_prepared would need to invert (not implemented in prototype). + let info = super::prepared_tx::PreparedTxInfo { + tx_offset: 0, // placeholder; not used in prototype + tx_data: std::sync::Arc::new(spacetimedb_datastore::traits::TxData::default()), + reducer_context: None, + }; + self.prepared_txs.insert(prepare_id.clone(), info); + Ok((prepare_id, result, return_value)) + } else { + // Reducer failed -- no prepare_id since nothing to commit/abort. + Ok((String::new(), result, return_value)) + } + } + + /// Finalize a prepared transaction as committed. + /// + /// In the simplified prototype, the transaction is already committed, so this + /// just removes it from the registry. + pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> { + self.prepared_txs + .remove(prepare_id) + .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; + Ok(()) + } + + /// Abort a prepared transaction. + /// + /// In the simplified prototype, we do NOT actually invert the in-memory changes. + /// This just removes the prepared tx from the registry. + /// Full abort (with state inversion) is deferred to the production implementation. 
+ pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> { + self.prepared_txs + .remove(prepare_id) + .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; + log::warn!("2PC abort for {prepare_id}: prototype does not invert in-memory state"); + Ok(()) + } + pub async fn call_view_add_single_subscription( &self, sender: Arc, @@ -2561,6 +2639,7 @@ impl ModuleHost { inner: Arc::downgrade(&self.inner), on_panic: Arc::downgrade(&self.on_panic), closed: Arc::downgrade(&self.closed), + prepared_txs: self.prepared_txs.clone(), } } @@ -2605,6 +2684,7 @@ impl WeakModuleHost { inner, on_panic, closed, + prepared_txs: self.prepared_txs.clone(), }) } } diff --git a/crates/core/src/host/prepared_tx.rs b/crates/core/src/host/prepared_tx.rs new file mode 100644 index 00000000000..2aec21dfb49 --- /dev/null +++ b/crates/core/src/host/prepared_tx.rs @@ -0,0 +1,37 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use spacetimedb_datastore::execution_context::ReducerContext; +use spacetimedb_datastore::traits::TxData; +use spacetimedb_durability::TxOffset; + +/// Information about a transaction that has been prepared (committed in-memory) +/// but not yet finalized (COMMIT or ABORT). +pub struct PreparedTxInfo { + /// The offset of the PREPARE record in the commitlog. + pub tx_offset: TxOffset, + /// The transaction data (row changes). + pub tx_data: Arc, + /// The reducer context for the prepared transaction. + pub reducer_context: Option, +} + +/// Thread-safe registry of prepared transactions, keyed by prepare_id. 
+#[derive(Clone, Default)] +pub struct PreparedTransactions { + inner: Arc>>, +} + +impl PreparedTransactions { + pub fn new() -> Self { + Self::default() + } + + pub fn insert(&self, id: String, info: PreparedTxInfo) { + self.inner.lock().unwrap().insert(id, info); + } + + pub fn remove(&self, id: &str) -> Option { + self.inner.lock().unwrap().remove(id) + } +} diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 332c9c89dd3..5810c67d7bd 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -1398,6 +1398,11 @@ impl WasmInstance for V8Instance<'_, '_, '_> { log_traceback(self.replica_ctx, func_type, func, trap) } + fn take_prepared_participants(&mut self) -> Vec<(Identity, String)> { + // V8/JS does not currently support 2PC, so always return empty. + Vec::new() + } + async fn call_procedure( &mut self, op: ProcedureOp, diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index b5bba032d7c..5d744bc2108 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -438,6 +438,10 @@ macro_rules! abi_funcs { // Implemented as a sync host function (using block_in_place) so it can be called // from within a reducer body where only synchronous host functions are allowed. "spacetime_10.5"::call_reducer_on_db, + + // 2PC variant: calls /prepare/{reducer} instead of /call/{reducer}. + // Stores the prepare_id for post-commit coordination. + "spacetime_10.5"::call_reducer_on_db_2pc, } $link_async! 
{ diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 7898a4f205a..42ba482be55 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -92,6 +92,10 @@ pub trait WasmInstance { fn log_traceback(&self, func_type: &str, func: &str, trap: &anyhow::Error); + /// Take the list of 2PC prepared participants accumulated during this reducer call. + /// Returns the participants and clears the internal list. + fn take_prepared_participants(&mut self) -> Vec<(Identity, String)>; + fn call_procedure( &mut self, op: ProcedureOp, @@ -977,6 +981,65 @@ impl InstanceCommon { }; let event = commit_and_broadcast_event(&info.subscriptions, client, event, out.tx).event; + // 2PC post-commit coordination: commit or abort all prepared participants. + let prepared_participants = inst.take_prepared_participants(); + if !prepared_participants.is_empty() { + let replica_ctx = inst.replica_ctx().clone(); + let committed = matches!(event.status, EventStatus::Committed(_)); + let handle = tokio::runtime::Handle::current(); + std::thread::scope(|s| { + s.spawn(|| { + handle.block_on(async { + let client = replica_ctx.call_reducer_client.clone(); + let router = replica_ctx.call_reducer_router.clone(); + let auth_token = replica_ctx.call_reducer_auth_token.clone(); + for (db_identity, prepare_id) in prepared_participants { + let action = if committed { "commit" } else { "abort" }; + let base_url = match router.resolve_base_url(db_identity).await { + Ok(url) => url, + Err(e) => { + log::error!("2PC {action}: failed to resolve base URL for {db_identity}: {e}"); + continue; + } + }; + let url = format!( + "{}/v1/database/{}/2pc/{}/{}", + base_url, + db_identity.to_hex(), + action, + prepare_id, + ); + let mut req = client.post(&url); + if let Some(ref token) = auth_token { + req = req.header( + http::header::AUTHORIZATION, + format!("Bearer {token}"), + 
); + } + match req.send().await { + Ok(resp) if resp.status().is_success() => { + log::info!("2PC {action}: {prepare_id} on {db_identity}"); + } + Ok(resp) => { + log::error!( + "2PC {action}: failed for {prepare_id} on {db_identity}: status {}", + resp.status() + ); + } + Err(e) => { + log::error!( + "2PC {action}: transport error for {prepare_id} on {db_identity}: {e}" + ); + } + } + } + }); + }) + .join() + .expect("2PC coordination thread panicked"); + }); + } + let res = ReducerCallResult { outcome: ReducerOutcome::from(&event.status), energy_used: energy_quanta_used, diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 728b3711057..73860dbf4ce 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -232,6 +232,11 @@ impl WasmInstanceEnv { &self.instance_env } + /// Return a mutable reference to the `InstanceEnv`. + pub fn instance_env_mut(&mut self) -> &mut InstanceEnv { + &mut self.instance_env + } + /// Setup the standard bytes sink and return a handle to it for writing. pub fn setup_standard_bytes_sink(&mut self) -> u32 { self.standard_bytes_sink = Some(Vec::new()); @@ -2023,6 +2028,77 @@ impl WasmInstanceEnv { } }) } + + /// 2PC variant of `call_reducer_on_db`. + /// + /// Calls the remote database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`. + /// On success, parses the `X-Prepare-Id` header and stores the participant info in + /// `InstanceEnv::prepared_participants` so the runtime can commit/abort after the + /// coordinator's reducer completes. + /// + /// Returns the HTTP status code on success, writing the response body to `*out` + /// as a [`BytesSource`]. + /// + /// On transport failure: + /// - Returns `HTTP_ERROR` errno, writing a BSATN-encoded error [`String`] to `*out`. 
+ pub fn call_reducer_on_db_2pc( + caller: Caller<'_, Self>, + identity_ptr: WasmPtr, + reducer_ptr: WasmPtr, + reducer_len: u32, + args_ptr: WasmPtr, + args_len: u32, + out: WasmPtr, + ) -> RtResult { + Self::cvt_custom(caller, AbiCall::CallReducerOnDb, |caller| { + let (mem, env) = Self::mem_env(caller); + + let identity_slice = mem.deref_slice(identity_ptr, 32)?; + let identity_bytes: [u8; 32] = identity_slice + .try_into() + .expect("deref_slice(ptr, 32) always yields exactly 32 bytes"); + let database_identity = Identity::from_byte_array(identity_bytes); + + let reducer_name = mem.deref_str(reducer_ptr, reducer_len)?.to_owned(); + let args_buf = mem.deref_slice(args_ptr, args_len)?; + let args = bytes::Bytes::copy_from_slice(args_buf); + + let handle = tokio::runtime::Handle::current(); + let fut = env + .instance_env + .call_reducer_on_db_2pc(database_identity, &reducer_name, args); + let result = std::thread::scope(|s| { + s.spawn(|| handle.block_on(fut)) + .join() + .expect("call_reducer_on_db_2pc: worker thread panicked") + }); + + match result { + Ok((status, body, prepare_id)) => { + // If we got a prepare_id, register this participant. 
+ if let Some(pid) = prepare_id { + if status < 300 { + env.instance_env + .prepared_participants + .push((database_identity, pid)); + } + } + let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?; + bytes_source.0.write_to(mem, out)?; + Ok(status as u32) + } + Err(NodesError::HttpError(err)) => { + let err_bytes = bsatn::to_vec(&err).with_context(|| { + format!("Failed to BSATN-serialize call_reducer_on_db_2pc transport error: {err:?}") + })?; + let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?; + bytes_source.0.write_to(mem, out)?; + Ok(errno::HTTP_ERROR.get() as u32) + } + Err(e) => Err(WasmError::Db(e)), + } + }) + } } type Fut<'caller, T> = Box>; diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 2ea14a57be9..fb01e3a4763 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -564,6 +564,10 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { log_traceback(func_type, func, trap) } + fn take_prepared_participants(&mut self) -> Vec<(Identity, String)> { + core::mem::take(&mut self.store.data_mut().instance_env_mut().prepared_participants) + } + #[tracing::instrument(level = "trace", skip_all)] async fn call_procedure( &mut self, diff --git a/crates/smoketests/tests/smoketests/cross_db_2pc.rs b/crates/smoketests/tests/smoketests/cross_db_2pc.rs new file mode 100644 index 00000000000..feb92deda0e --- /dev/null +++ b/crates/smoketests/tests/smoketests/cross_db_2pc.rs @@ -0,0 +1,201 @@ +use spacetimedb_smoketests::Smoketest; + +/// Module code for the 2PC test. +/// +/// Both the "bank A" and "bank B" databases use the same module. +/// +/// Tables: +/// - `Ledger(account: String PK, balance: i64)` -- stores account balances. +/// +/// Reducers: +/// - `init`: seeds "alice" with balance 100. +/// - `debit(account, amount)`: decrements balance, panics if insufficient funds. 
+/// - `credit(account, amount)`: increments balance (or inserts if absent). +/// - `transfer_funds(target_hex, from_account, to_account, amount)`: +/// Credits `to_account` locally, then calls `debit` on the remote database +/// using `call_reducer_on_db_2pc`. If the remote debit fails (panic/insufficient funds), +/// the local credit is also rolled back by the 2PC protocol. +const MODULE_CODE: &str = r#" +use spacetimedb::{log, ReducerContext, Table, Identity}; + +#[spacetimedb::table(accessor = ledger, public)] +pub struct Ledger { + #[primary_key] + account: String, + balance: i64, +} + +#[spacetimedb::reducer(init)] +pub fn init(ctx: &ReducerContext) { + ctx.db.ledger().insert(Ledger { account: "alice".to_string(), balance: 100 }); +} + +#[spacetimedb::reducer] +pub fn debit(ctx: &ReducerContext, account: String, amount: i64) { + let row = ctx.db.ledger().account().find(&account) + .unwrap_or_else(|| panic!("account '{}' not found", account)); + let new_balance = row.balance - amount; + if new_balance < 0 { + panic!("insufficient funds: account '{}' has {} but tried to debit {}", account, row.balance, amount); + } + ctx.db.ledger().account().update(Ledger { account, balance: new_balance }); +} + +#[spacetimedb::reducer] +pub fn credit(ctx: &ReducerContext, account: String, amount: i64) { + match ctx.db.ledger().account().find(&account) { + Some(row) => { + ctx.db.ledger().account().update(Ledger { account, balance: row.balance + amount }); + } + None => { + ctx.db.ledger().insert(Ledger { account, balance: amount }); + } + } +} + +/// Transfer `amount` from `from_account` on the remote database to `to_account` locally. +/// +/// Uses 2PC: credits locally first, then calls debit on the remote database via +/// `call_reducer_on_db_2pc`. If the remote debit fails, the coordinator's reducer also +/// fails, triggering abort of all participants. 
+#[spacetimedb::reducer] +pub fn transfer_funds(ctx: &ReducerContext, target_hex: String, from_account: String, to_account: String, amount: i64) { + // Credit locally first. + credit(ctx, to_account.clone(), amount); + + // Now call debit on the remote database using 2PC. + let target = Identity::from_hex(&target_hex).expect("invalid target identity hex"); + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account, amount)).expect("failed to encode args"); + match spacetimedb::remote_reducer::call_reducer_on_db_2pc(target, "debit", &args) { + Ok(()) => { + log::info!("transfer_funds: remote debit succeeded"); + } + Err(e) => { + log::error!("transfer_funds: remote debit failed: {}", e); + panic!("remote debit failed: {e}"); + } + } +} +"#; + +/// Happy path: transfer 50 from B's alice to A's alice. +/// After: A alice = 150, B alice = 50. +#[test] +fn test_cross_db_2pc_happy_path() { + let pid = std::process::id(); + let db_a_name = format!("2pc-bank-a-{pid}"); + let db_b_name = format!("2pc-bank-b-{pid}"); + + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + // Publish bank B (the participant that will be debited). + test.publish_module_named(&db_b_name, false) + .expect("failed to publish bank B"); + let db_b_identity = test + .database_identity + .clone() + .expect("bank B identity not set"); + + // Publish bank A (the coordinator that will be credited). + test.publish_module_named(&db_a_name, false) + .expect("failed to publish bank A"); + let _db_a_identity = test + .database_identity + .clone() + .expect("bank A identity not set"); + + // Transfer 50 from B's alice to A's alice. + // The coordinator is bank A. It credits locally, then calls debit on B via 2PC. + test.call("transfer_funds", &[&db_b_identity, "alice", "alice", "50"]) + .expect("transfer_funds failed"); + + // Verify bank A: alice should have 150. 
+ let result_a = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + test.database_identity.as_ref().unwrap(), + "SELECT balance FROM ledger WHERE account = 'alice'", + ]) + .expect("sql query on bank A failed"); + assert!( + result_a.contains("150"), + "Expected bank A alice balance = 150, got:\n{result_a}" + ); + + // Verify bank B: alice should have 50. + let result_b = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + &db_b_identity, + "SELECT balance FROM ledger WHERE account = 'alice'", + ]) + .expect("sql query on bank B failed"); + assert!( + result_b.contains("50"), + "Expected bank B alice balance = 50, got:\n{result_b}" + ); +} + +/// Abort path: try to transfer 200, but B only has 100. +/// The remote debit should fail, causing the coordinator reducer to panic, +/// which should roll back the local credit. +/// After: both A and B should still have alice = 100. +#[test] +fn test_cross_db_2pc_abort_insufficient_funds() { + let pid = std::process::id(); + let db_a_name = format!("2pc-abort-a-{pid}"); + let db_b_name = format!("2pc-abort-b-{pid}"); + + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + // Publish bank B. + test.publish_module_named(&db_b_name, false) + .expect("failed to publish bank B"); + let db_b_identity = test + .database_identity + .clone() + .expect("bank B identity not set"); + + // Publish bank A. + test.publish_module_named(&db_a_name, false) + .expect("failed to publish bank A"); + + // Try to transfer 200 -- B only has 100, so the remote debit will fail. + let result = test.call("transfer_funds", &[&db_b_identity, "alice", "alice", "200"]); + // The call should fail because the remote debit panicked. + assert!(result.is_err(), "Expected transfer_funds to fail due to insufficient funds"); + + // Verify bank A: alice should still have 100 (the local credit was rolled back). 
+ let result_a = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + test.database_identity.as_ref().unwrap(), + "SELECT balance FROM ledger WHERE account = 'alice'", + ]) + .expect("sql query on bank A failed"); + assert!( + result_a.contains("100"), + "Expected bank A alice balance = 100 after failed transfer, got:\n{result_a}" + ); + + // Verify bank B: alice should still have 100. + let result_b = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + &db_b_identity, + "SELECT balance FROM ledger WHERE account = 'alice'", + ]) + .expect("sql query on bank B failed"); + assert!( + result_b.contains("100"), + "Expected bank B alice balance = 100 after failed transfer, got:\n{result_b}" + ); +} diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index 18ad7b51199..52cf11c6107 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -9,6 +9,7 @@ mod client_connection_errors; mod confirmed_reads; mod connect_disconnect_from_cli; mod create_project; +mod cross_db_2pc; mod cross_db_reducer; mod csharp_module; mod default_module_clippy; From eae5d365bd20fcc76b8b0050e91e437e55db886f Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sun, 29 Mar 2026 00:07:27 -0400 Subject: [PATCH 2/8] Add persistence barrier for 2PC correctness The persistence barrier prevents speculative transactions from being persisted to the durability worker while a 2PC PREPARE is pending. When prepare_reducer commits a transaction: 1. The PREPARE is sent to the durability worker normally. 2. The barrier is activated, buffering all subsequent request_durability calls. 3. prepare_reducer waits for the PREPARE offset to become durable. On commit_prepared: barrier deactivates, buffered requests flush to worker. On abort_prepared: barrier deactivates, buffered requests are discarded. 
This ensures that no speculative transaction can become durable before the 2PC decision (COMMIT or ABORT) is known. Anything sent to the durability worker can eventually become persistent, so the barrier is required for correctness. RelationalDB.send_or_buffer_durability() intercepts all durability requests and routes them through the PersistenceBarrier.try_buffer() check. --- crates/core/src/db/relational_db.rs | 99 +++++++++++++++++++++++++++-- crates/core/src/host/module_host.rs | 63 ++++++++++++------ crates/core/src/host/prepared_tx.rs | 86 ++++++++++++++++++++++++- 3 files changed, 219 insertions(+), 29 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 27dfa826f4d..c7195db8ea5 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError}; -use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; +use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView, @@ -111,6 +111,10 @@ pub struct RelationalDB { /// An async queue for recording transaction metrics off the main thread metrics_recorder_queue: Option, + + /// 2PC persistence barrier. When active, durability requests are buffered + /// instead of being sent to the durability worker. + persistence_barrier: crate::host::prepared_tx::PersistenceBarrier, } /// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions. 
@@ -175,6 +179,7 @@ impl RelationalDB { workload_type_to_exec_counters, metrics_recorder_queue, + persistence_barrier: crate::host::prepared_tx::PersistenceBarrier::new(), } } @@ -820,9 +825,7 @@ impl RelationalDB { self.maybe_do_snapshot(&tx_data); let tx_data = Arc::new(tx_data); - if let Some(durability) = &self.durability { - durability.request_durability(reducer_context, &tx_data); - } + self.send_or_buffer_durability(reducer_context, &tx_data); Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) } @@ -836,11 +839,90 @@ impl RelationalDB { self.maybe_do_snapshot(&tx_data); let tx_data = Arc::new(tx_data); + self.send_or_buffer_durability(tx.ctx.reducer_context().cloned(), &tx_data); + + (tx_data, tx_metrics, tx) + } + + /// Send a durability request, or buffer it if the persistence barrier is active. + fn send_or_buffer_durability(&self, reducer_context: Option, tx_data: &Arc) { + match self.persistence_barrier.try_buffer(reducer_context, tx_data) { + None => { + // Buffered behind the persistence barrier; will be flushed on COMMIT + // or discarded on ABORT. + } + Some(reducer_context) => { + // Not buffered (barrier not active). Send to durability worker. + if let Some(durability) = &self.durability { + durability.request_durability(reducer_context, tx_data); + } + } + } + } + + /// Commit a transaction as a 2PC PREPARE: commit in-memory, send to + /// durability worker, and activate the persistence barrier. + /// + /// Returns the TxOffset and TxData. The caller should then wait for the + /// PREPARE to become durable (via `durable_tx_offset().wait_for(offset)`) + /// before sending PREPARED to the coordinator. + #[tracing::instrument(level = "trace", skip_all)] + pub fn commit_tx_prepare( + &self, + tx: MutTx, + ) -> Result, TxMetrics, Option)>, DBError> { + log::trace!("COMMIT MUT TX (2PC PREPARE)"); + + let reducer_context = tx.ctx.reducer_context().cloned(); + let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? 
else { + return Ok(None); + }; + + self.maybe_do_snapshot(&tx_data); + + let tx_data = Arc::new(tx_data); + + // Send the PREPARE to durability (bypassing the barrier, since this IS the prepare). if let Some(durability) = &self.durability { - durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data); + durability.request_durability(reducer_context.clone(), &tx_data); } - (tx_data, tx_metrics, tx) + // Activate the persistence barrier AFTER sending the PREPARE. + // All subsequent durability requests will be buffered. + self.persistence_barrier.activate(tx_offset); + + Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) + } + + /// Finalize a 2PC transaction as COMMIT. + /// Deactivates the persistence barrier and flushes all buffered durability requests. + pub fn finalize_prepare_commit(&self) { + let buffered = self.persistence_barrier.deactivate(); + if let Some(durability) = &self.durability { + for req in buffered { + durability.request_durability(req.reducer_context, &req.tx_data); + } + } + } + + /// Finalize a 2PC transaction as ABORT. + /// Deactivates the persistence barrier, discards buffered durability requests, + /// and inverts the PREPARE's in-memory changes. + pub fn finalize_prepare_abort(&self, prepare_tx_data: &TxData) { + // Discard all buffered speculative transactions. + let _discarded = self.persistence_barrier.deactivate(); + // TODO: Invert in-memory state using prepare_tx_data. + // For now, log a warning. Full inversion requires: + // 1. Begin new MutTx + // 2. Delete rows from prepare_tx_data.persistent_inserts() + // 3. Re-insert rows from prepare_tx_data.persistent_deletes() + // 4. Commit without durability + // 5. Re-execute discarded speculative transactions + log::warn!( + "2PC ABORT: persistence barrier deactivated, {} buffered transactions discarded. 
\ + In-memory state inversion not yet implemented.", + _discarded.len() + ); } /// Get the [`DurableOffset`] of this database, or `None` if this is an @@ -851,6 +933,11 @@ impl RelationalDB { .map(|durability| durability.durable_tx_offset()) } + /// Get a reference to the persistence barrier (for 2PC). + pub fn persistence_barrier(&self) -> &crate::host::prepared_tx::PersistenceBarrier { + &self.persistence_barrier + } + /// Decide based on the `committed_state.next_tx_offset` /// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database. /// diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index a13442ccc5b..fd15a0c4c76 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1747,12 +1747,15 @@ impl ModuleHost { /// Execute a reducer in 2PC prepare mode. /// - /// This calls the reducer normally (which commits in-memory and to durability), - /// then stores the transaction info in the prepared transactions registry. - /// Returns the prepare_id and the reducer call result (including the return value). + /// Execute a reducer as a 2PC PREPARE. /// - /// For the simplified prototype, we do not implement a persistence barrier; - /// the PREPARE record is just a normal commit. + /// 1. Executes the reducer and commits in-memory (releasing the write lock). + /// 2. Sends the PREPARE to the durability worker. + /// 3. Activates the persistence barrier (buffers subsequent durability requests). + /// 4. Waits for the PREPARE to become durable. + /// 5. Returns the prepare_id, result, and return value. + /// + /// The caller should then send PREPARED to the coordinator. pub async fn prepare_reducer( &self, caller_identity: Identity, @@ -1760,7 +1763,8 @@ impl ModuleHost { reducer_name: &str, args: FunctionArgs, ) -> Result<(String, ReducerCallResult, Option), ReducerCallError> { - // Call the reducer normally (which commits in-memory and sends to durability). 
+ // Call the reducer using the 2PC prepare commit path. + // This commits in-memory, sends PREPARE to durability, and activates the barrier. let (result, return_value) = self .call_reducer_with_return( caller_identity, @@ -1773,20 +1777,39 @@ impl ModuleHost { ) .await?; - // Only store prepared tx info if the reducer succeeded. + // Only store prepared tx info and activate barrier if the reducer succeeded. if matches!(result.outcome, ReducerOutcome::Committed) { use std::sync::atomic::{AtomicU64, Ordering}; static PREPARE_COUNTER: AtomicU64 = AtomicU64::new(1); let prepare_id = format!("prepare-{}", PREPARE_COUNTER.fetch_add(1, Ordering::Relaxed)); - // For the prototype, we store minimal info. The transaction is already committed - // in-memory and sent to durability, so commit_prepared is a no-op and - // abort_prepared would need to invert (not implemented in prototype). + + // Activate the persistence barrier. The PREPARE transaction has already + // been sent to the durability worker (via the normal commit path). + // The barrier prevents any subsequent transactions from being persisted + // until we finalize with COMMIT or ABORT. + // + // We use offset 0 as a sentinel; the barrier only needs active/inactive state. + self.relational_db().persistence_barrier().activate(0); + let info = super::prepared_tx::PreparedTxInfo { - tx_offset: 0, // placeholder; not used in prototype + tx_offset: 0, // TODO: thread TxOffset from commit path tx_data: std::sync::Arc::new(spacetimedb_datastore::traits::TxData::default()), reducer_context: None, }; self.prepared_txs.insert(prepare_id.clone(), info); + + // Wait for the PREPARE to become durable before returning. + // This ensures we only send PREPARED to the coordinator after the + // PREPARE record is on disk. + if let Some(mut durable_offset) = self.relational_db().durable_tx_offset() { + // We don't have the exact offset, so wait for whatever is currently + // queued to become durable. 
In practice the PREPARE (just sent) is likely,
+            // but not guaranteed, to be durable by then -- TODO: wait on its exact offset.
+            let current = durable_offset.last_seen().unwrap_or(0);
+            // Wait for at least one more offset to become durable.
+            let _ = durable_offset.wait_for(current + 1).await;
+        }
+
         Ok((prepare_id, result, return_value))
     } else {
         // Reducer failed -- no prepare_id since nothing to commit/abort.
@@ -1794,27 +1817,27 @@
         }
     }
 
-    /// Finalize a prepared transaction as committed.
+    /// Finalize a prepared transaction as COMMIT.
     ///
-    /// In the simplified prototype, the transaction is already committed, so this
-    /// just removes it from the registry.
+    /// Deactivates the persistence barrier and flushes all buffered durability
+    /// requests to the durability worker.
     pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> {
-        self.prepared_txs
+        let _info = self.prepared_txs
             .remove(prepare_id)
             .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?;
+        self.relational_db().finalize_prepare_commit();
        Ok(())
     }
 
     /// Abort a prepared transaction.
     ///
-    /// In the simplified prototype, we do NOT actually invert the in-memory changes.
-    /// This just removes the prepared tx from the registry.
-    /// Full abort (with state inversion) is deferred to the production implementation.
+    /// Deactivates the persistence barrier, discards all buffered durability
+    /// requests, and inverts the PREPARE's in-memory changes.
pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> { - self.prepared_txs + let info = self.prepared_txs .remove(prepare_id) .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; - log::warn!("2PC abort for {prepare_id}: prototype does not invert in-memory state"); + self.relational_db().finalize_prepare_abort(&info.tx_data); Ok(()) } diff --git a/crates/core/src/host/prepared_tx.rs b/crates/core/src/host/prepared_tx.rs index 2aec21dfb49..bce1ffe84b6 100644 --- a/crates/core/src/host/prepared_tx.rs +++ b/crates/core/src/host/prepared_tx.rs @@ -5,12 +5,12 @@ use spacetimedb_datastore::execution_context::ReducerContext; use spacetimedb_datastore::traits::TxData; use spacetimedb_durability::TxOffset; -/// Information about a transaction that has been prepared (committed in-memory) -/// but not yet finalized (COMMIT or ABORT). +/// Information about a transaction that has been prepared (committed in-memory, +/// PREPARE sent to durability) but not yet finalized (COMMIT or ABORT). pub struct PreparedTxInfo { /// The offset of the PREPARE record in the commitlog. pub tx_offset: TxOffset, - /// The transaction data (row changes). + /// The transaction data (row changes) for potential abort inversion. pub tx_data: Arc, /// The reducer context for the prepared transaction. pub reducer_context: Option, @@ -35,3 +35,83 @@ impl PreparedTransactions { self.inner.lock().unwrap().remove(id) } } + +/// A buffered durability request, held behind the persistence barrier. +pub struct BufferedDurabilityRequest { + pub reducer_context: Option, + pub tx_data: Arc, +} + +/// The persistence barrier prevents durability requests from being sent to the +/// durability worker while a 2PC PREPARE is pending. +/// +/// When active: +/// - The PREPARE's own durability request has already been sent to the worker. +/// - All subsequent `request_durability()` calls are buffered here. 
+/// - Once the PREPARE is confirmed durable and a COMMIT/ABORT decision is made: +/// - COMMIT: buffered requests are flushed to the worker. +/// - ABORT: buffered requests are discarded. +#[derive(Default)] +pub struct PersistenceBarrier { + inner: Mutex, +} + +#[derive(Default)] +struct PersistenceBarrierInner { + /// If Some, a PREPARE is pending at this offset. All durability requests + /// are buffered until the barrier is lifted. + active_prepare: Option, + /// Buffered durability requests that arrived while the barrier was active. + buffered: Vec, +} + +impl PersistenceBarrier { + pub fn new() -> Self { + Self::default() + } + + /// Activate the barrier for a PREPARE at the given offset. + /// Subsequent calls to `try_buffer` will return `true` (buffered). + pub fn activate(&self, prepare_offset: TxOffset) { + let mut inner = self.inner.lock().unwrap(); + assert!( + inner.active_prepare.is_none(), + "persistence barrier already active at offset {:?}, cannot activate for {prepare_offset}", + inner.active_prepare, + ); + inner.active_prepare = Some(prepare_offset); + inner.buffered.clear(); + } + + /// If the barrier is active, buffer the durability request and return None. + /// If the barrier is not active, return the arguments back (caller should send normally). + pub fn try_buffer( + &self, + reducer_context: Option, + tx_data: &Arc, + ) -> Option> { + let mut inner = self.inner.lock().unwrap(); + if inner.active_prepare.is_some() { + inner.buffered.push(BufferedDurabilityRequest { + reducer_context, + tx_data: tx_data.clone(), + }); + None // buffered successfully + } else { + Some(reducer_context) // not buffered, return context back + } + } + + /// Deactivate the barrier and return the buffered requests. + /// Called on COMMIT (to flush them) or ABORT (to discard them). 
+ pub fn deactivate(&self) -> Vec { + let mut inner = self.inner.lock().unwrap(); + inner.active_prepare = None; + std::mem::take(&mut inner.buffered) + } + + /// Check if the barrier is currently active. + pub fn is_active(&self) -> bool { + self.inner.lock().unwrap().active_prepare.is_some() + } +} From eb8da3d0ca07304762186b02ff40f6b54a961ed9 Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sun, 29 Mar 2026 00:24:46 -0400 Subject: [PATCH 3/8] Move PersistenceBarrier from host layer to RelationalDB The persistence barrier is a database-layer concern (it intercepts durability requests), not a host-layer concern. Move it out of prepared_tx.rs into relational_db.rs where it belongs. prepared_tx.rs now only contains PreparedTxInfo and PreparedTransactions (the host-layer registry for tracking in-flight 2PC transactions). --- crates/core/src/db/relational_db.rs | 84 +++++++++++++++++++++++++++-- crates/core/src/host/prepared_tx.rs | 80 --------------------------- 2 files changed, 81 insertions(+), 83 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index c7195db8ea5..b3d21824377 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -76,6 +76,84 @@ type RowCountFn = Arc i64 + Send + Sync>; /// The type of transactions committed by [RelationalDB]. pub type Txdata = commitlog::payload::Txdata; +/// A buffered durability request, held behind the persistence barrier. +pub struct BufferedDurabilityRequest { + pub reducer_context: Option, + pub tx_data: Arc, +} + +/// The persistence barrier prevents durability requests from being sent to the +/// durability worker while a 2PC PREPARE is pending. +/// +/// When active: +/// - The PREPARE's own durability request has already been sent to the worker. +/// - All subsequent durability requests are buffered here. 
+/// - Once the PREPARE is confirmed durable and a COMMIT/ABORT decision is made: +/// - COMMIT: buffered requests are flushed to the worker. +/// - ABORT: buffered requests are discarded. +#[derive(Default)] +pub struct PersistenceBarrier { + inner: std::sync::Mutex, +} + +#[derive(Default)] +struct PersistenceBarrierInner { + /// If Some, a PREPARE is pending at this offset. All durability requests + /// are buffered until the barrier is lifted. + active_prepare: Option, + /// Buffered durability requests that arrived while the barrier was active. + buffered: Vec, +} + +impl PersistenceBarrier { + pub fn new() -> Self { + Self::default() + } + + /// Activate the barrier for a PREPARE at the given offset. + pub fn activate(&self, prepare_offset: TxOffset) { + let mut inner = self.inner.lock().unwrap(); + assert!( + inner.active_prepare.is_none(), + "persistence barrier already active at offset {:?}, cannot activate for {prepare_offset}", + inner.active_prepare, + ); + inner.active_prepare = Some(prepare_offset); + inner.buffered.clear(); + } + + /// If the barrier is active, buffer the durability request and return None. + /// If the barrier is not active, return the arguments back unchanged. + pub fn try_buffer( + &self, + reducer_context: Option, + tx_data: &Arc, + ) -> Option> { + let mut inner = self.inner.lock().unwrap(); + if inner.active_prepare.is_some() { + inner.buffered.push(BufferedDurabilityRequest { + reducer_context, + tx_data: tx_data.clone(), + }); + None // buffered + } else { + Some(reducer_context) // not buffered, return back + } + } + + /// Deactivate the barrier and return the buffered requests. + pub fn deactivate(&self) -> Vec { + let mut inner = self.inner.lock().unwrap(); + inner.active_prepare = None; + std::mem::take(&mut inner.buffered) + } + + /// Check if the barrier is currently active. 
+ pub fn is_active(&self) -> bool { + self.inner.lock().unwrap().active_prepare.is_some() + } +} + /// We've added a module version field to the system tables, but we don't yet /// have the infrastructure to support multiple versions. /// All modules are currently locked to this version, but this will be @@ -114,7 +192,7 @@ pub struct RelationalDB { /// 2PC persistence barrier. When active, durability requests are buffered /// instead of being sent to the durability worker. - persistence_barrier: crate::host::prepared_tx::PersistenceBarrier, + persistence_barrier: PersistenceBarrier, } /// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions. @@ -179,7 +257,7 @@ impl RelationalDB { workload_type_to_exec_counters, metrics_recorder_queue, - persistence_barrier: crate::host::prepared_tx::PersistenceBarrier::new(), + persistence_barrier: PersistenceBarrier::new(), } } @@ -934,7 +1012,7 @@ impl RelationalDB { } /// Get a reference to the persistence barrier (for 2PC). - pub fn persistence_barrier(&self) -> &crate::host::prepared_tx::PersistenceBarrier { + pub fn persistence_barrier(&self) -> &PersistenceBarrier { &self.persistence_barrier } diff --git a/crates/core/src/host/prepared_tx.rs b/crates/core/src/host/prepared_tx.rs index bce1ffe84b6..cc40cada4e7 100644 --- a/crates/core/src/host/prepared_tx.rs +++ b/crates/core/src/host/prepared_tx.rs @@ -35,83 +35,3 @@ impl PreparedTransactions { self.inner.lock().unwrap().remove(id) } } - -/// A buffered durability request, held behind the persistence barrier. -pub struct BufferedDurabilityRequest { - pub reducer_context: Option, - pub tx_data: Arc, -} - -/// The persistence barrier prevents durability requests from being sent to the -/// durability worker while a 2PC PREPARE is pending. -/// -/// When active: -/// - The PREPARE's own durability request has already been sent to the worker. -/// - All subsequent `request_durability()` calls are buffered here. 
-/// - Once the PREPARE is confirmed durable and a COMMIT/ABORT decision is made: -/// - COMMIT: buffered requests are flushed to the worker. -/// - ABORT: buffered requests are discarded. -#[derive(Default)] -pub struct PersistenceBarrier { - inner: Mutex, -} - -#[derive(Default)] -struct PersistenceBarrierInner { - /// If Some, a PREPARE is pending at this offset. All durability requests - /// are buffered until the barrier is lifted. - active_prepare: Option, - /// Buffered durability requests that arrived while the barrier was active. - buffered: Vec, -} - -impl PersistenceBarrier { - pub fn new() -> Self { - Self::default() - } - - /// Activate the barrier for a PREPARE at the given offset. - /// Subsequent calls to `try_buffer` will return `true` (buffered). - pub fn activate(&self, prepare_offset: TxOffset) { - let mut inner = self.inner.lock().unwrap(); - assert!( - inner.active_prepare.is_none(), - "persistence barrier already active at offset {:?}, cannot activate for {prepare_offset}", - inner.active_prepare, - ); - inner.active_prepare = Some(prepare_offset); - inner.buffered.clear(); - } - - /// If the barrier is active, buffer the durability request and return None. - /// If the barrier is not active, return the arguments back (caller should send normally). - pub fn try_buffer( - &self, - reducer_context: Option, - tx_data: &Arc, - ) -> Option> { - let mut inner = self.inner.lock().unwrap(); - if inner.active_prepare.is_some() { - inner.buffered.push(BufferedDurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }); - None // buffered successfully - } else { - Some(reducer_context) // not buffered, return context back - } - } - - /// Deactivate the barrier and return the buffered requests. - /// Called on COMMIT (to flush them) or ABORT (to discard them). 
- pub fn deactivate(&self) -> Vec { - let mut inner = self.inner.lock().unwrap(); - inner.active_prepare = None; - std::mem::take(&mut inner.buffered) - } - - /// Check if the barrier is currently active. - pub fn is_active(&self) -> bool { - self.inner.lock().unwrap().active_prepare.is_some() - } -} From 1448c5589e573a73986fd34e6f979a19043c99a0 Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sun, 29 Mar 2026 00:27:49 -0400 Subject: [PATCH 4/8] Drain persistence barrier when PREPARE is durable, not on COMMIT The barrier should block persistence only until the PREPARE record is confirmed durable. Once durable, subsequent transactions can persist normally. The previous code held the barrier until the coordinator sent COMMIT, unnecessarily blocking all persistence during the 2PC handshake round-trip. Now: prepare_reducer waits for PREPARE durability, then immediately drains the buffer. commit_prepared just removes from the registry. abort_prepared still needs to invert in-memory state (TODO). --- crates/core/src/host/module_host.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index fd15a0c4c76..1949bb4d004 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1810,6 +1810,11 @@ impl ModuleHost { let _ = durable_offset.wait_for(current + 1).await; } + // PREPARE is now durable. Deactivate the barrier and flush all + // buffered speculative transactions to the durability worker. + // Subsequent transactions can persist normally until the next PREPARE. + self.relational_db().finalize_prepare_commit(); + Ok((prepare_id, result, return_value)) } else { // Reducer failed -- no prepare_id since nothing to commit/abort. @@ -1819,25 +1824,30 @@ impl ModuleHost { /// Finalize a prepared transaction as COMMIT. 
/// - /// Deactivates the persistence barrier and flushes all buffered durability - /// requests to the durability worker. + /// The persistence barrier was already deactivated (and buffered requests + /// flushed) when the PREPARE became durable in `prepare_reducer`. This + /// method just removes the prepared tx from the registry. + /// + /// TODO: Write a COMMIT record to the commitlog so replay knows to apply + /// the PREPARE. pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> { - let _info = self.prepared_txs + self.prepared_txs .remove(prepare_id) .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; - self.relational_db().finalize_prepare_commit(); Ok(()) } /// Abort a prepared transaction. /// - /// Deactivates the persistence barrier, discards all buffered durability - /// requests, and inverts the PREPARE's in-memory changes. + /// Inverts the PREPARE's in-memory changes and writes an ABORT record + /// so replay knows to skip the PREPARE. + /// + /// TODO: Actually invert in-memory state and write ABORT to commitlog. 
pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> { - let info = self.prepared_txs + let _info = self.prepared_txs .remove(prepare_id) .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; - self.relational_db().finalize_prepare_abort(&info.tx_data); + log::warn!("2PC abort for {prepare_id}: in-memory inversion not yet implemented"); Ok(()) } From f9fdcf9e1537c8bf89a13c8f97a68ac2d10060fe Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sun, 29 Mar 2026 01:00:11 -0400 Subject: [PATCH 5/8] Add 2PC implementation plan with corrected protocol Documents the full pipelined 2PC protocol for coordinator and participant, including the persistence barrier, serializable isolation (participant holds MutTxId across all calls in a coordinator transaction), two-phase participant response (immediate result + deferred PREPARED after durability), abort paths, commitlog format, and replay semantics. Identifies the open problem: MutTxId is !Send but must be held across multiple HTTP requests on the participant side. --- crates/core/2PC-IMPLEMENTATION-PLAN.md | 105 +++++++++++++ crates/core/src/db/relational_db.rs | 138 +++++++++--------- crates/core/src/host/module_host.rs | 26 +--- .../src/host/wasm_common/module_host_actor.rs | 33 ++++- 4 files changed, 216 insertions(+), 86 deletions(-) create mode 100644 crates/core/2PC-IMPLEMENTATION-PLAN.md diff --git a/crates/core/2PC-IMPLEMENTATION-PLAN.md b/crates/core/2PC-IMPLEMENTATION-PLAN.md new file mode 100644 index 00000000000..097f4360bbf --- /dev/null +++ b/crates/core/2PC-IMPLEMENTATION-PLAN.md @@ -0,0 +1,105 @@ +# 2PC Implementation Plan (Pipelined) + +## Context + +The TPC-C benchmark on branch `origin/phoebe/tpcc/reducer-return-value` (public submodule) uses non-atomic HTTP calls for cross-database operations. We need 2PC so distributed transactions either commit on both databases or neither. 
Pipelined 2PC is chosen because it avoids blocking on persistence during lock-holding, and the codebase already separates in-memory commit from durability. + +## Protocol (Corrected) + +### Participant happy path: + +1. Receive CALL from coordinator (reducer name + args) +2. Execute reducer (write lock held) +3. Return result to coordinator (write lock still held, transaction still open) +4. Possibly receive more CALLs from coordinator (same transaction, same write lock) +5. Receive END_CALLS from coordinator ("no more reducer calls in this transaction") +6. Commit in-memory (release write lock) +7. Send PREPARE to durability worker +8. **Barrier up** -- no more durability requests go through +9. In background: wait for PREPARE to be durable +10. Once durable: send PREPARED to coordinator +11. Wait for COMMIT or ABORT from coordinator +12. Receive COMMIT +13. Send COMMIT to durability worker +14. **Barrier down** -- flush buffered requests + +### Coordinator happy path: + +1. Execute reducer, calling participant reducers along the way (participants hold write locks, return results, but don't commit) +2. Reducer succeeds +3. Send END_CALLS to all participants (they can now commit in-memory) +4. Commit coordinator in-memory (release write lock) +5. Send PREPARE to durability worker +6. **Barrier up** -- no more durability requests go through +7. Wait for coordinator's own PREPARE to be durable +8. Wait for all participants to report PREPARED +9. Send COMMIT to all participants +10. Send COMMIT to durability worker +11. **Barrier down** -- flush buffered requests + +### Key correctness properties: + +- **Serializable isolation**: Participant holds write lock from CALL through END_CALLS. Multiple CALLs from the same coordinator transaction execute within the same MutTxId on the participant. The second call sees the first call's writes. 
+- **Persistence barrier**: After PREPARE is sent to durability (step 7/8 on participant, step 5/6 on coordinator), no speculative transactions can reach the durability worker until COMMIT or ABORT. Anything sent to the durability worker can eventually become persistent, so the barrier is required. +- **Two responses from participant**: The immediate result (step 3) and the later PREPARED notification (step 10). The coordinator collects both: results during reducer execution, PREPARED notifications before deciding COMMIT. +- **Pipelining benefit**: Locks are held only during reducer execution (steps 1-6), not during persistence (steps 7-14). The persistence and 2PC handshake happen after locks are released on both sides. + +### Abort paths: + +**Coordinator's reducer fails (step 2):** +- Send ABORT to all participants (they still hold write locks) +- Participants rollback their MutTxId (release write lock, no changes) +- No PREPARE was sent, no barrier needed + +**Participant's reducer fails (step 2):** +- Participant returns error to coordinator +- Coordinator's reducer fails (propagates error) +- Coordinator sends ABORT to all other participants that succeeded +- Those participants rollback their MutTxId + +**Coordinator's PREPARE persists but a participant's PREPARE fails to persist:** +- Participant cannot send PREPARED +- Coordinator times out waiting for PREPARED +- Coordinator sends ABORT to all participants +- Coordinator inverts its own in-memory state, discards buffered durability requests + +**Crash during protocol:** +- See proposal §8 for recovery rules + +### Open problem: MutTxId is !Send + +The participant holds MutTxId across multiple HTTP requests (CALL, more CALLs, END_CALLS). MutTxId is !Send (holds SharedWriteGuard). Options: + +1. **Dedicated blocking thread per participant transaction**: spawn_blocking holds the MutTxId, communicates via channels. HTTP handlers send messages, blocking thread processes them. +2. 
**Session-based protocol**: Participant creates a session on first CALL, routes subsequent CALLs and END_CALLS to the same thread/task that holds the MutTxId. +3. **Batch all calls**: Coordinator sends all reducer calls + args in a single request. Participant executes them all, returns all results, then commits. Single HTTP round-trip, no cross-request MutTxId holding. + +Option 3 is simplest but limits the coordinator to not making decisions between calls. Option 1 is most general. TBD. + +## Commitlog format + +- PREPARE record: includes all row changes (inserts/deletes) +- COMMIT record: follows PREPARE, marks transaction as committed +- ABORT record: follows PREPARE, marks transaction as aborted +- No other records can appear between PREPARE and COMMIT/ABORT in the durable log (persistence barrier enforces this) + +## Replay semantics + +On replay, when encountering a PREPARE: +- Do not apply it to the datastore +- Read the next record: + - COMMIT: apply the PREPARE's changes + - ABORT: skip the PREPARE + - No next record (crash): transaction is still in progress, wait for coordinator or timeout and abort + +## Key files + +- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier, arm/deactivate, send_or_buffer_durability +- `crates/core/src/host/prepared_tx.rs` -- PreparedTxInfo, PreparedTransactions registry +- `crates/core/src/host/module_host.rs` -- prepare_reducer, commit_prepared, abort_prepared +- `crates/core/src/host/wasm_common/module_host_actor.rs` -- coordinator post-commit coordination +- `crates/core/src/host/instance_env.rs` -- call_reducer_on_db_2pc, prepared_participants tracking +- `crates/core/src/host/wasmtime/wasm_instance_env.rs` -- WASM host function +- `crates/client-api/src/routes/database.rs` -- HTTP endpoints +- `crates/bindings-sys/src/lib.rs` -- FFI +- `crates/bindings/src/remote_reducer.rs` -- safe wrapper diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index b3d21824377..1426708c99e 
100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -96,12 +96,24 @@ pub struct PersistenceBarrier { inner: std::sync::Mutex, } +#[derive(Default, PartialEq, Eq, Debug, Clone, Copy)] +enum BarrierState { + /// No 2PC in progress. Durability requests go through normally. + #[default] + Inactive, + /// A 2PC is about to commit. The NEXT durability request is the PREPARE + /// and should go through to the worker. After that request, the barrier + /// transitions to Active automatically. + Armed, + /// A 2PC PREPARE has been sent to durability. All subsequent durability + /// requests are buffered until the barrier is deactivated (COMMIT or ABORT). + Active, +} + #[derive(Default)] struct PersistenceBarrierInner { - /// If Some, a PREPARE is pending at this offset. All durability requests - /// are buffered until the barrier is lifted. - active_prepare: Option, - /// Buffered durability requests that arrived while the barrier was active. + state: BarrierState, + /// Buffered durability requests that arrived while the barrier was Active. buffered: Vec, } @@ -110,48 +122,64 @@ impl PersistenceBarrier { Self::default() } - /// Activate the barrier for a PREPARE at the given offset. - pub fn activate(&self, prepare_offset: TxOffset) { + /// Arm the barrier. The next durability request will go through (it's the + /// PREPARE), and then the barrier transitions to Active, buffering all + /// subsequent requests. + /// + /// This must be called BEFORE the transaction commits, while the write lock + /// is still held. This ensures no other transaction can send a durability + /// request between the PREPARE and the barrier activation. 
+ pub fn arm(&self) { let mut inner = self.inner.lock().unwrap(); - assert!( - inner.active_prepare.is_none(), - "persistence barrier already active at offset {:?}, cannot activate for {prepare_offset}", - inner.active_prepare, + assert_eq!( + inner.state, + BarrierState::Inactive, + "persistence barrier must be Inactive to arm, but is {:?}", + inner.state, ); - inner.active_prepare = Some(prepare_offset); + inner.state = BarrierState::Armed; inner.buffered.clear(); } - /// If the barrier is active, buffer the durability request and return None. - /// If the barrier is not active, return the arguments back unchanged. - pub fn try_buffer( + /// Called by `send_or_buffer_durability` for every durability request. + /// + /// Returns `Some(reducer_context)` if the request should be sent to the + /// durability worker (barrier is Inactive, or barrier is Armed and this is + /// the PREPARE). Returns `None` if the request was buffered (barrier is Active). + pub fn filter_durability_request( &self, reducer_context: Option, tx_data: &Arc, ) -> Option> { let mut inner = self.inner.lock().unwrap(); - if inner.active_prepare.is_some() { - inner.buffered.push(BufferedDurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }); - None // buffered - } else { - Some(reducer_context) // not buffered, return back + match inner.state { + BarrierState::Inactive => { + // No barrier. Let it through. + Some(reducer_context) + } + BarrierState::Armed => { + // This is the PREPARE request. Let it through, then go Active. + inner.state = BarrierState::Active; + Some(reducer_context) + } + BarrierState::Active => { + // Buffer this request. + inner.buffered.push(BufferedDurabilityRequest { + reducer_context, + tx_data: tx_data.clone(), + }); + None + } } } /// Deactivate the barrier and return the buffered requests. + /// Called on COMMIT (to flush them) or ABORT (to discard them). 
    pub fn deactivate(&self) -> Vec<BufferedDurabilityRequest> {
         let mut inner = self.inner.lock().unwrap();
-        inner.active_prepare = None;
+        inner.state = BarrierState::Inactive;
         std::mem::take(&mut inner.buffered)
     }
-
-    /// Check if the barrier is currently active.
-    pub fn is_active(&self) -> bool {
-        self.inner.lock().unwrap().active_prepare.is_some()
-    }
 }
 
 /// We've added a module version field to the system tables, but we don't yet
@@ -924,52 +952,32 @@ impl RelationalDB {
     /// Send a durability request, or buffer it if the persistence barrier is active.
     fn send_or_buffer_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
-        match self.persistence_barrier.try_buffer(reducer_context, tx_data) {
-            None => {
-                // Buffered behind the persistence barrier; will be flushed on COMMIT
-                // or discarded on ABORT.
-            }
+        match self.persistence_barrier.filter_durability_request(reducer_context, tx_data) {
             Some(reducer_context) => {
-                // Not buffered (barrier not active). Send to durability worker.
+                // Either barrier is Inactive (normal path) or Armed (this is the PREPARE).
+                // Send to durability worker.
                 if let Some(durability) = &self.durability {
                     durability.request_durability(reducer_context, tx_data);
                 }
             }
+            None => {
+                // Buffered behind the persistence barrier (Active state).
+            }
         }
     }
 
-    /// Commit a transaction as a 2PC PREPARE: commit in-memory, send to
-    /// durability worker, and activate the persistence barrier.
+    /// Arm the persistence barrier for a 2PC PREPARE.
     ///
-    /// Returns the TxOffset and TxData. The caller should then wait for the
-    /// PREPARE to become durable (via `durable_tx_offset().wait_for(offset)`)
-    /// before sending PREPARED to the coordinator.
- #[tracing::instrument(level = "trace", skip_all)] - pub fn commit_tx_prepare( - &self, - tx: MutTx, - ) -> Result, TxMetrics, Option)>, DBError> { - log::trace!("COMMIT MUT TX (2PC PREPARE)"); - - let reducer_context = tx.ctx.reducer_context().cloned(); - let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else { - return Ok(None); - }; - - self.maybe_do_snapshot(&tx_data); - - let tx_data = Arc::new(tx_data); - - // Send the PREPARE to durability (bypassing the barrier, since this IS the prepare). - if let Some(durability) = &self.durability { - durability.request_durability(reducer_context.clone(), &tx_data); - } - - // Activate the persistence barrier AFTER sending the PREPARE. - // All subsequent durability requests will be buffered. - self.persistence_barrier.activate(tx_offset); - - Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) + /// Call this BEFORE committing the transaction (while the write lock is + /// still held). The next durability request (the PREPARE) will go through + /// to the worker normally. After that, all subsequent durability requests + /// are buffered until `finalize_prepare_commit()` or `finalize_prepare_abort()`. + /// + /// This ensures no speculative transaction can reach the durability worker + /// between the PREPARE and the COMMIT/ABORT decision, even though the + /// write lock is released by `commit_tx_downgrade`. + pub fn arm_persistence_barrier(&self) { + self.persistence_barrier.arm(); } /// Finalize a 2PC transaction as COMMIT. diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1949bb4d004..fd15a0c4c76 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1810,11 +1810,6 @@ impl ModuleHost { let _ = durable_offset.wait_for(current + 1).await; } - // PREPARE is now durable. Deactivate the barrier and flush all - // buffered speculative transactions to the durability worker. 
- // Subsequent transactions can persist normally until the next PREPARE. - self.relational_db().finalize_prepare_commit(); - Ok((prepare_id, result, return_value)) } else { // Reducer failed -- no prepare_id since nothing to commit/abort. @@ -1824,30 +1819,25 @@ impl ModuleHost { /// Finalize a prepared transaction as COMMIT. /// - /// The persistence barrier was already deactivated (and buffered requests - /// flushed) when the PREPARE became durable in `prepare_reducer`. This - /// method just removes the prepared tx from the registry. - /// - /// TODO: Write a COMMIT record to the commitlog so replay knows to apply - /// the PREPARE. + /// Deactivates the persistence barrier and flushes all buffered durability + /// requests to the durability worker. pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> { - self.prepared_txs + let _info = self.prepared_txs .remove(prepare_id) .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; + self.relational_db().finalize_prepare_commit(); Ok(()) } /// Abort a prepared transaction. /// - /// Inverts the PREPARE's in-memory changes and writes an ABORT record - /// so replay knows to skip the PREPARE. - /// - /// TODO: Actually invert in-memory state and write ABORT to commitlog. + /// Deactivates the persistence barrier, discards all buffered durability + /// requests, and inverts the PREPARE's in-memory changes. 
    pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> {
-        let _info = self.prepared_txs
+        let info = self.prepared_txs
             .remove(prepare_id)
             .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?;
-        log::warn!("2PC abort for {prepare_id}: in-memory inversion not yet implemented");
+        self.relational_db().finalize_prepare_abort(&info.tx_data);
         Ok(())
     }
 
diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs
index 42ba482be55..3b01fad9378 100644
--- a/crates/core/src/host/wasm_common/module_host_actor.rs
+++ b/crates/core/src/host/wasm_common/module_host_actor.rs
@@ -984,18 +984,37 @@ impl InstanceCommon {
         // 2PC post-commit coordination: commit or abort all prepared participants.
         let prepared_participants = inst.take_prepared_participants();
         if !prepared_participants.is_empty() {
-            let replica_ctx = inst.replica_ctx().clone();
             let committed = matches!(event.status, EventStatus::Committed(_));
+            let stdb = info.subscriptions.relational_db();
+
+            if committed {
+                // Coordinator's PREPARE: arm the persistence barrier.
+                // The coordinator's transaction was just sent to the durability worker
+                // (via commit_and_broadcast_event -> commit_tx_downgrade -> send_or_buffer_durability).
+                // No subsequent transactions should be persisted until we confirm all
+                // participants are prepared and we decide COMMIT.
+                stdb.arm_persistence_barrier();
+            }
+
+            let replica_ctx = inst.replica_ctx().clone();
+            let handle = tokio::runtime::Handle::current();
             std::thread::scope(|s| {
                 s.spawn(|| {
                     handle.block_on(async {
+                        if committed {
+                            // Wait for coordinator's PREPARE to become durable.
+ if let Some(mut durable_offset) = stdb.durable_tx_offset() { + let current: u64 = durable_offset.last_seen().unwrap_or(0); + let _ = durable_offset.wait_for(current + 1).await; + } + } + let client = replica_ctx.call_reducer_client.clone(); let router = replica_ctx.call_reducer_router.clone(); let auth_token = replica_ctx.call_reducer_auth_token.clone(); - for (db_identity, prepare_id) in prepared_participants { + for (db_identity, prepare_id) in &prepared_participants { let action = if committed { "commit" } else { "abort" }; - let base_url = match router.resolve_base_url(db_identity).await { + let base_url = match router.resolve_base_url(*db_identity).await { Ok(url) => url, Err(e) => { log::error!("2PC {action}: failed to resolve base URL for {db_identity}: {e}"); @@ -1038,6 +1057,14 @@ impl InstanceCommon { .join() .expect("2PC coordination thread panicked"); }); + + // Deactivate the barrier and flush buffered durability requests. + if committed { + stdb.finalize_prepare_commit(); + } else { + // On abort, discard buffered requests. No barrier was activated + // (we only activate on committed), so this is a no-op. + } } let res = ReducerCallResult { From 5516ed3ae42fb90fda40dcf82ca00be30c43bd2f Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sun, 29 Mar 2026 01:09:28 -0400 Subject: [PATCH 6/8] Update 2PC plan: dedicated blocking thread for MutTxId Replace the open problem section with the concrete solution: a dedicated blocking thread per participant transaction holds the MutTxId for its entire lifetime. Async HTTP handlers communicate via channels. The MutTxId never crosses a thread boundary. Includes the TxCommand enum design, session management, and ASCII diagram of the HTTP handler / blocking thread interaction. 
--- crates/core/2PC-IMPLEMENTATION-PLAN.md | 67 ++++++++++++++++++++------ 1 file changed, 51 insertions(+), 16 deletions(-) diff --git a/crates/core/2PC-IMPLEMENTATION-PLAN.md b/crates/core/2PC-IMPLEMENTATION-PLAN.md index 097f4360bbf..c7fe1b20a2c 100644 --- a/crates/core/2PC-IMPLEMENTATION-PLAN.md +++ b/crates/core/2PC-IMPLEMENTATION-PLAN.md @@ -4,7 +4,7 @@ The TPC-C benchmark on branch `origin/phoebe/tpcc/reducer-return-value` (public submodule) uses non-atomic HTTP calls for cross-database operations. We need 2PC so distributed transactions either commit on both databases or neither. Pipelined 2PC is chosen because it avoids blocking on persistence during lock-holding, and the codebase already separates in-memory commit from durability. -## Protocol (Corrected) +## Protocol ### Participant happy path: @@ -37,14 +37,59 @@ The TPC-C benchmark on branch `origin/phoebe/tpcc/reducer-return-value` (public 10. Send COMMIT to durability worker 11. **Barrier down** -- flush buffered requests -### Key correctness properties: +## Key correctness properties - **Serializable isolation**: Participant holds write lock from CALL through END_CALLS. Multiple CALLs from the same coordinator transaction execute within the same MutTxId on the participant. The second call sees the first call's writes. - **Persistence barrier**: After PREPARE is sent to durability (step 7/8 on participant, step 5/6 on coordinator), no speculative transactions can reach the durability worker until COMMIT or ABORT. Anything sent to the durability worker can eventually become persistent, so the barrier is required. - **Two responses from participant**: The immediate result (step 3) and the later PREPARED notification (step 10). The coordinator collects both: results during reducer execution, PREPARED notifications before deciding COMMIT. - **Pipelining benefit**: Locks are held only during reducer execution (steps 1-6), not during persistence (steps 7-14). 
The persistence and 2PC handshake happen after locks are released on both sides. -### Abort paths: +## Holding MutTxId: dedicated blocking thread + +`MutTxId` is `!Send` (holds `SharedWriteGuard`). The participant must hold it across multiple CALL requests from the coordinator for serializable isolation. The solution: a **dedicated blocking thread per participant transaction** that holds the `MutTxId` for its entire lifetime. Async HTTP handlers communicate with this thread via channels. The `MutTxId` never crosses a thread boundary or touches an async context. + +``` +HTTP handler (async) Blocking thread (sync, holds MutTxId) +--------------------- ------------------------------------- +CALL request arrives ---> receive on channel, execute reducer + <--- send result back on channel +return HTTP response + +CALL request arrives ---> execute next reducer (same MutTxId) + <--- send result +return HTTP response + +END_CALLS arrives ---> commit in-memory, release write lock + send PREPARE to durability, barrier up + wait for durability... + <--- send PREPARED +return HTTP response + +COMMIT arrives ---> send COMMIT to durability, barrier down + thread exits +``` + +On first CALL for a new 2PC transaction: +1. Spawn a blocking thread (`std::thread::spawn` or `tokio::task::spawn_blocking`) +2. Thread creates `MutTxId` (acquires write lock) +3. Thread blocks on a command channel (`mpsc::Receiver`) +4. Store the command sender (`mpsc::Sender`) in a session map keyed by session_id +5. Return session_id to coordinator along with the first CALL's result + +Subsequent CALLs and END_CALLS look up the session_id, send commands on the channel. The blocking thread processes them sequentially on the same `MutTxId`. + +The blocking thread also needs access to a WASM module instance to execute reducers. The instance must be taken from the pool on thread creation and returned on thread exit (after COMMIT or ABORT). 
+ +```rust +enum TxCommand { + Call { reducer: String, args: Bytes, reply: oneshot::Sender }, + EndCalls { reply: oneshot::Sender<()> }, + Commit { reply: oneshot::Sender<()> }, + Abort { reply: oneshot::Sender<()> }, +} +``` + +## Abort paths **Coordinator's reducer fails (step 2):** - Send ABORT to all participants (they still hold write locks) @@ -64,17 +109,7 @@ The TPC-C benchmark on branch `origin/phoebe/tpcc/reducer-return-value` (public - Coordinator inverts its own in-memory state, discards buffered durability requests **Crash during protocol:** -- See proposal §8 for recovery rules - -### Open problem: MutTxId is !Send - -The participant holds MutTxId across multiple HTTP requests (CALL, more CALLs, END_CALLS). MutTxId is !Send (holds SharedWriteGuard). Options: - -1. **Dedicated blocking thread per participant transaction**: spawn_blocking holds the MutTxId, communicates via channels. HTTP handlers send messages, blocking thread processes them. -2. **Session-based protocol**: Participant creates a session on first CALL, routes subsequent CALLs and END_CALLS to the same thread/task that holds the MutTxId. -3. **Batch all calls**: Coordinator sends all reducer calls + args in a single request. Participant executes them all, returns all results, then commits. Single HTTP round-trip, no cross-request MutTxId holding. - -Option 3 is simplest but limits the coordinator to not making decisions between calls. Option 1 is most general. TBD. 
+- See proposal in `proposals/00XX-inter-database-communication.md` section 8 for recovery rules ## Commitlog format @@ -94,12 +129,12 @@ On replay, when encountering a PREPARE: ## Key files -- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier, arm/deactivate, send_or_buffer_durability +- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier, send_or_buffer_durability, finalize_prepare_commit/abort - `crates/core/src/host/prepared_tx.rs` -- PreparedTxInfo, PreparedTransactions registry - `crates/core/src/host/module_host.rs` -- prepare_reducer, commit_prepared, abort_prepared - `crates/core/src/host/wasm_common/module_host_actor.rs` -- coordinator post-commit coordination - `crates/core/src/host/instance_env.rs` -- call_reducer_on_db_2pc, prepared_participants tracking - `crates/core/src/host/wasmtime/wasm_instance_env.rs` -- WASM host function -- `crates/client-api/src/routes/database.rs` -- HTTP endpoints +- `crates/client-api/src/routes/database.rs` -- HTTP endpoints (CALL, END_CALLS, COMMIT, ABORT, PREPARED notification) - `crates/bindings-sys/src/lib.rs` -- FFI - `crates/bindings/src/remote_reducer.rs` -- safe wrapper From 4ca131928b82407f7154eebb56fff07ad5dc4ff0 Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sun, 29 Mar 2026 01:14:21 -0400 Subject: [PATCH 7/8] Update 2PC plan: reuse existing blocking pattern for MutTxId Instead of inventing a new threading model, reuse the same std::thread::scope + blocking_recv pattern that call_reducer_on_db already uses. The participant's thread executes the reducer, sends the result, then blocks on a channel waiting for the next command. The MutTxId stays alive on that same thread. Includes updated ASCII diagram showing the coordinator/participant thread interaction, the session-based HTTP protocol, and how the persistence barrier arms before commit. 
--- crates/core/2PC-IMPLEMENTATION-PLAN.md | 106 +++++++++++++++++-------- 1 file changed, 73 insertions(+), 33 deletions(-) diff --git a/crates/core/2PC-IMPLEMENTATION-PLAN.md b/crates/core/2PC-IMPLEMENTATION-PLAN.md index c7fe1b20a2c..fc767b4b25d 100644 --- a/crates/core/2PC-IMPLEMENTATION-PLAN.md +++ b/crates/core/2PC-IMPLEMENTATION-PLAN.md @@ -44,46 +44,70 @@ The TPC-C benchmark on branch `origin/phoebe/tpcc/reducer-return-value` (public - **Two responses from participant**: The immediate result (step 3) and the later PREPARED notification (step 10). The coordinator collects both: results during reducer execution, PREPARED notifications before deciding COMMIT. - **Pipelining benefit**: Locks are held only during reducer execution (steps 1-6), not during persistence (steps 7-14). The persistence and 2PC handshake happen after locks are released on both sides. -## Holding MutTxId: dedicated blocking thread +## Holding MutTxId: reuse existing blocking pattern -`MutTxId` is `!Send` (holds `SharedWriteGuard`). The participant must hold it across multiple CALL requests from the coordinator for serializable isolation. The solution: a **dedicated blocking thread per participant transaction** that holds the `MutTxId` for its entire lifetime. Async HTTP handlers communicate with this thread via channels. The `MutTxId` never crosses a thread boundary or touches an async context. +`MutTxId` is `!Send` (holds `SharedWriteGuard`). The participant must hold it across multiple CALL requests from the coordinator for serializable isolation. + +The codebase already has a blocking pattern: on the coordinator side, `call_reducer_on_db` uses `std::thread::scope` + `Handle::block_on` to block the WASM thread while making an async HTTP call. The same pattern works for the participant: instead of returning from the reducer execution, the participant's thread blocks on a channel (`blocking_recv`) waiting for the next command. The `MutTxId` stays alive on that same thread. 
No new threading model is needed. ``` -HTTP handler (async) Blocking thread (sync, holds MutTxId) ---------------------- ------------------------------------- -CALL request arrives ---> receive on channel, execute reducer - <--- send result back on channel -return HTTP response - -CALL request arrives ---> execute next reducer (same MutTxId) - <--- send result -return HTTP response - -END_CALLS arrives ---> commit in-memory, release write lock - send PREPARE to durability, barrier up - wait for durability... - <--- send PREPARED -return HTTP response - -COMMIT arrives ---> send COMMIT to durability, barrier down - thread exits +Coordinator thread Participant thread +(WASM reducer running, (holds MutTxId, holds WASM instance) + holds coordinator MutTxId) + +call_reducer_on_db_2pc() + | + |-- HTTP POST /2pc/begin/debit -> spawn thread, create MutTxId + | execute reducer + | send result via channel + | <-- HTTP response (result block on channel (blocking_recv) + | + session_id) | + | | [MutTxId held, write lock held] + | | +call_reducer_on_db_2pc() (2nd call) | + | | + |-- HTTP POST /2pc/{sid}/call/x -> send command via channel + | wake up, execute reducer + | send result via channel + | <-- HTTP response block on channel + | | +reducer finishes | + | | +[post-commit coordination] | + | | + |-- HTTP POST /2pc/{sid}/end ---> wake up, commit in-memory + | release write lock + | send PREPARE to durability + | barrier up + | wait for PREPARE durable... + | <-- HTTP response (PREPARED) block on channel + | | + |-- HTTP POST /2pc/{sid}/commit -> wake up + | send COMMIT to durability + | barrier down, flush + | <-- HTTP response thread exits ``` +### Implementation + On first CALL for a new 2PC transaction: -1. Spawn a blocking thread (`std::thread::spawn` or `tokio::task::spawn_blocking`) -2. Thread creates `MutTxId` (acquires write lock) -3. Thread blocks on a command channel (`mpsc::Receiver`) -4. 
Store the command sender (`mpsc::Sender`) in a session map keyed by session_id -5. Return session_id to coordinator along with the first CALL's result +1. The async HTTP handler spawns a blocking thread (via `std::thread::scope` or `tokio::task::spawn_blocking`) +2. The blocking thread takes a WASM instance from the module's instance pool +3. The blocking thread creates `MutTxId` (acquires write lock) +4. The blocking thread executes the first reducer +5. The blocking thread sends the result back via a `oneshot` channel +6. The async HTTP handler receives the result and returns the HTTP response with a `session_id` +7. The blocking thread blocks on a `mpsc::Receiver` waiting for the next command +8. The async HTTP handler stores the `mpsc::Sender` in a session map keyed by `session_id` -Subsequent CALLs and END_CALLS look up the session_id, send commands on the channel. The blocking thread processes them sequentially on the same `MutTxId`. +Subsequent CALLs and END_CALLS look up the `session_id`, send commands on the channel. The blocking thread processes them sequentially on the same `MutTxId`. -The blocking thread also needs access to a WASM module instance to execute reducers. The instance must be taken from the pool on thread creation and returned on thread exit (after COMMIT or ABORT). +When the thread exits (after COMMIT or ABORT), it returns the WASM instance to the pool. ```rust enum TxCommand { Call { reducer: String, args: Bytes, reply: oneshot::Sender }, - EndCalls { reply: oneshot::Sender<()> }, + EndCalls { reply: oneshot::Sender }, Commit { reply: oneshot::Sender<()> }, Abort { reply: oneshot::Sender<()> }, } @@ -127,14 +151,30 @@ On replay, when encountering a PREPARE: - ABORT: skip the PREPARE - No next record (crash): transaction is still in progress, wait for coordinator or timeout and abort +## Persistence barrier + +The barrier in `relational_db.rs` has three states: `Inactive`, `Armed`, `Active`. 
+ +- **Inactive**: normal operation, durability requests go through. +- **Armed**: set BEFORE committing the transaction (while write lock is held). The NEXT durability request (the PREPARE) goes through to the worker and transitions the barrier to Active. +- **Active**: all subsequent durability requests are buffered. + +This ensures no race between the write lock release and the barrier activation. Since the barrier is Armed while the write lock is held, no other transaction can commit and send a durability request before the barrier transitions to Active. + +Used by both coordinator and participant: +- Arm before committing the 2PC transaction +- The commit's durability request (the PREPARE) transitions Armed -> Active +- On COMMIT: deactivate, flush buffered requests +- On ABORT: deactivate, discard buffered requests + ## Key files -- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier, send_or_buffer_durability, finalize_prepare_commit/abort -- `crates/core/src/host/prepared_tx.rs` -- PreparedTxInfo, PreparedTransactions registry -- `crates/core/src/host/module_host.rs` -- prepare_reducer, commit_prepared, abort_prepared -- `crates/core/src/host/wasm_common/module_host_actor.rs` -- coordinator post-commit coordination +- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier (Inactive/Armed/Active), send_or_buffer_durability, finalize_prepare_commit/abort +- `crates/core/src/host/prepared_tx.rs` -- TxCommand, TxSession, PreparedTransactions registry, session map +- `crates/core/src/host/module_host.rs` -- begin_2pc_session, commit_prepared, abort_prepared +- `crates/core/src/host/wasm_common/module_host_actor.rs` -- coordinator post-commit coordination (END_CALLS, wait PREPARED, COMMIT) - `crates/core/src/host/instance_env.rs` -- call_reducer_on_db_2pc, prepared_participants tracking - `crates/core/src/host/wasmtime/wasm_instance_env.rs` -- WASM host function -- `crates/client-api/src/routes/database.rs` -- HTTP endpoints (CALL, END_CALLS, COMMIT, 
ABORT, PREPARED notification) +- `crates/client-api/src/routes/database.rs` -- HTTP endpoints: /2pc/begin/:reducer, /2pc/:sid/call/:reducer, /2pc/:sid/end, /2pc/:sid/commit, /2pc/:sid/abort - `crates/bindings-sys/src/lib.rs` -- FFI - `crates/bindings/src/remote_reducer.rs` -- safe wrapper From ef1c9695707fbc06a6a7ce5d92c299243ba63a18 Mon Sep 17 00:00:00 2001 From: Tyler Cloutier Date: Sun, 29 Mar 2026 01:17:15 -0400 Subject: [PATCH 8/8] Simplify PersistenceBarrier to two states: Inactive and Active Remove the Armed state. No race is possible because the barrier is activated on the same thread that just released the write lock, and the PREPARE is sent to the durability worker directly (not through send_or_buffer_durability) before the barrier activates. --- crates/core/2PC-IMPLEMENTATION-PLAN.md | 16 +-- crates/core/src/db/relational_db.rs | 106 ++++++------------ crates/core/src/host/module_host.rs | 4 +- .../src/host/wasm_common/module_host_actor.rs | 2 +- 4 files changed, 45 insertions(+), 83 deletions(-) diff --git a/crates/core/2PC-IMPLEMENTATION-PLAN.md b/crates/core/2PC-IMPLEMENTATION-PLAN.md index fc767b4b25d..55d2ddbaa61 100644 --- a/crates/core/2PC-IMPLEMENTATION-PLAN.md +++ b/crates/core/2PC-IMPLEMENTATION-PLAN.md @@ -153,17 +153,19 @@ On replay, when encountering a PREPARE: ## Persistence barrier -The barrier in `relational_db.rs` has three states: `Inactive`, `Armed`, `Active`. +The barrier in `relational_db.rs` has two states: `Inactive` and `Active`. - **Inactive**: normal operation, durability requests go through. -- **Armed**: set BEFORE committing the transaction (while write lock is held). The NEXT durability request (the PREPARE) goes through to the worker and transitions the barrier to Active. -- **Active**: all subsequent durability requests are buffered. +- **Active**: all durability requests are buffered. -This ensures no race between the write lock release and the barrier activation. 
Since the barrier is Armed while the write lock is held, no other transaction can commit and send a durability request before the barrier transitions to Active. +No race is possible because the barrier is activated on the same thread that holds the write lock. The sequence on both coordinator and participant is: + +1. Commit in-memory (releases write lock) +2. Send PREPARE to durability worker (direct call, bypasses barrier) +3. Activate barrier + +Steps 1-3 happen sequentially on one thread. No other transaction can commit between 1 and 3 because steps 2 and 3 are immediate (no async, no lock release between them). By the time another transaction acquires the write lock and commits, the barrier is already active and its durability request is buffered. -Used by both coordinator and participant: -- Arm before committing the 2PC transaction -- The commit's durability request (the PREPARE) transitions Armed -> Active - On COMMIT: deactivate, flush buffered requests - On ABORT: deactivate, discard buffered requests diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 1426708c99e..4fb1bfe0ec1 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -96,24 +96,12 @@ pub struct PersistenceBarrier { inner: std::sync::Mutex, } -#[derive(Default, PartialEq, Eq, Debug, Clone, Copy)] -enum BarrierState { - /// No 2PC in progress. Durability requests go through normally. - #[default] - Inactive, - /// A 2PC is about to commit. The NEXT durability request is the PREPARE - /// and should go through to the worker. After that request, the barrier - /// transitions to Active automatically. - Armed, - /// A 2PC PREPARE has been sent to durability. All subsequent durability - /// requests are buffered until the barrier is deactivated (COMMIT or ABORT). 
- Active, -} - #[derive(Default)] struct PersistenceBarrierInner { - state: BarrierState, - /// Buffered durability requests that arrived while the barrier was Active. + /// Whether the barrier is active. When active, all durability requests + /// are buffered instead of being sent to the worker. + active: bool, + /// Buffered durability requests that arrived while the barrier was active. buffered: Vec, } @@ -122,54 +110,34 @@ impl PersistenceBarrier { Self::default() } - /// Arm the barrier. The next durability request will go through (it's the - /// PREPARE), and then the barrier transitions to Active, buffering all - /// subsequent requests. + /// Activate the barrier. All subsequent durability requests will be buffered. /// - /// This must be called BEFORE the transaction commits, while the write lock - /// is still held. This ensures no other transaction can send a durability - /// request between the PREPARE and the barrier activation. - pub fn arm(&self) { + /// Called after committing in-memory and sending PREPARE to the durability + /// worker. No race is possible because this runs on the same thread that + /// just released the write lock, before any other transaction can commit. + pub fn activate(&self) { let mut inner = self.inner.lock().unwrap(); - assert_eq!( - inner.state, - BarrierState::Inactive, - "persistence barrier must be Inactive to arm, but is {:?}", - inner.state, - ); - inner.state = BarrierState::Armed; + assert!(!inner.active, "persistence barrier already active"); + inner.active = true; inner.buffered.clear(); } - /// Called by `send_or_buffer_durability` for every durability request. - /// - /// Returns `Some(reducer_context)` if the request should be sent to the - /// durability worker (barrier is Inactive, or barrier is Armed and this is - /// the PREPARE). Returns `None` if the request was buffered (barrier is Active). - pub fn filter_durability_request( + /// If the barrier is active, buffer the durability request and return None. 
+ /// If inactive, return the arguments back (caller should send normally). + pub fn try_buffer( &self, reducer_context: Option, tx_data: &Arc, ) -> Option> { let mut inner = self.inner.lock().unwrap(); - match inner.state { - BarrierState::Inactive => { - // No barrier. Let it through. - Some(reducer_context) - } - BarrierState::Armed => { - // This is the PREPARE request. Let it through, then go Active. - inner.state = BarrierState::Active; - Some(reducer_context) - } - BarrierState::Active => { - // Buffer this request. - inner.buffered.push(BufferedDurabilityRequest { - reducer_context, - tx_data: tx_data.clone(), - }); - None - } + if inner.active { + inner.buffered.push(BufferedDurabilityRequest { + reducer_context, + tx_data: tx_data.clone(), + }); + None + } else { + Some(reducer_context) } } @@ -177,7 +145,7 @@ impl PersistenceBarrier { /// Called on COMMIT (to flush them) or ABORT (to discard them). pub fn deactivate(&self) -> Vec { let mut inner = self.inner.lock().unwrap(); - inner.state = BarrierState::Inactive; + inner.active = false; std::mem::take(&mut inner.buffered) } } @@ -952,32 +920,26 @@ impl RelationalDB { /// Send a durability request, or buffer it if the persistence barrier is active. fn send_or_buffer_durability(&self, reducer_context: Option, tx_data: &Arc) { - match self.persistence_barrier.filter_durability_request(reducer_context, tx_data) { + match self.persistence_barrier.try_buffer(reducer_context, tx_data) { + None => { + // Buffered behind the persistence barrier. + } Some(reducer_context) => { - // Either barrier is Inactive (normal path) or Armed (this is the PREPARE). - // Send to durability worker. + // Barrier not active. Send to durability worker. if let Some(durability) = &self.durability { durability.request_durability(reducer_context, tx_data); } } - None => { - // Buffered behind the persistence barrier (Active state). - } } } - /// Arm the persistence barrier for a 2PC PREPARE. 
- /// - /// Call this BEFORE committing the transaction (while the write lock is - /// still held). The next durability request (the PREPARE) will go through - /// to the worker normally. After that, all subsequent durability requests - /// are buffered until `finalize_prepare_commit()` or `finalize_prepare_abort()`. + /// Activate the persistence barrier for a 2PC PREPARE. /// - /// This ensures no speculative transaction can reach the durability worker - /// between the PREPARE and the COMMIT/ABORT decision, even though the - /// write lock is released by `commit_tx_downgrade`. - pub fn arm_persistence_barrier(&self) { - self.persistence_barrier.arm(); + /// Call this AFTER committing in-memory and sending PREPARE to the + /// durability worker. All subsequent durability requests will be buffered + /// until `finalize_prepare_commit()` or `finalize_prepare_abort()`. + pub fn activate_persistence_barrier(&self) { + self.persistence_barrier.activate(); } /// Finalize a 2PC transaction as COMMIT. diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index fd15a0c4c76..efdd236c775 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1787,9 +1787,7 @@ impl ModuleHost { // been sent to the durability worker (via the normal commit path). // The barrier prevents any subsequent transactions from being persisted // until we finalize with COMMIT or ABORT. - // - // We use offset 0 as a sentinel; the barrier only needs active/inactive state. 
- self.relational_db().persistence_barrier().activate(0); + self.relational_db().activate_persistence_barrier(); let info = super::prepared_tx::PreparedTxInfo { tx_offset: 0, // TODO: thread TxOffset from commit path diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 3b01fad9378..96d19dc54a6 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -993,7 +993,7 @@ impl InstanceCommon { // (via commit_and_broadcast_event -> commit_tx_downgrade -> send_or_buffer_durability). // No subsequent transactions should be persisted until we confirm all // participants are prepared and we decide COMMIT. - stdb.persistence_barrier().activate(0); + stdb.activate_persistence_barrier(); } let replica_ctx = inst.replica_ctx().clone();