diff --git a/Cargo.toml b/Cargo.toml index a4604484441..2e8daf523d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -159,7 +159,7 @@ anymap = "0.12" arrayvec = "0.7.2" async-stream = "0.3.6" async-trait = "0.1.68" -axum = { version = "0.7", features = ["tracing"] } +axum = { version = "0.7", features = ["tracing", "http2"] } axum-extra = { version = "0.9", features = ["typed-header"] } backtrace = "0.3.66" base64 = "0.21.2" diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 8854ae393b6..927c444a38d 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -896,6 +896,23 @@ pub mod raw { args_len: u32, out: *mut BytesSource, ) -> u16; + + /// 2PC variant of `call_reducer_on_db`. + /// + /// Calls the target database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`. + /// On success, the runtime stores the `prepare_id` internally. + /// After the coordinator's reducer commits, all participants are committed. + /// If the coordinator's reducer fails, all participants are aborted. + /// + /// Returns and errors are identical to `call_reducer_on_db`. + pub fn call_reducer_on_db_2pc( + identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity + reducer_ptr: *const u8, + reducer_len: u32, + args_ptr: *const u8, + args_len: u32, + out: *mut BytesSource, + ) -> u16; } /// What strategy does the database index use? @@ -1510,6 +1527,37 @@ pub fn call_reducer_on_db( } } +/// 2PC variant of [`call_reducer_on_db`]. +/// +/// Calls `/prepare/{reducer}` on the target database. On success, the runtime +/// stores the prepare_id internally. After the coordinator's reducer commits, +/// all participants are committed. On failure, all participants are aborted. +/// +/// Returns and errors are identical to [`call_reducer_on_db`]. 
+#[inline] +pub fn call_reducer_on_db_2pc( + identity: [u8; 32], + reducer_name: &str, + args: &[u8], +) -> Result<(u16, raw::BytesSource), raw::BytesSource> { + let mut out = raw::BytesSource::INVALID; + let status = unsafe { + raw::call_reducer_on_db_2pc( + identity.as_ptr(), + reducer_name.as_ptr(), + reducer_name.len() as u32, + args.as_ptr(), + args.len() as u32, + &mut out, + ) + }; + if status == Errno::HTTP_ERROR.code() { + Err(out) + } else { + Ok((status, out)) + } +} + /// Finds the JWT payload associated with `connection_id`. /// If nothing is found for the connection, this returns None. /// If a payload is found, this will return a valid [`raw::BytesSource`]. diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs index 73bb13c7cd8..701282c66f6 100644 --- a/crates/bindings/src/remote_reducer.rs +++ b/crates/bindings/src/remote_reducer.rs @@ -90,3 +90,43 @@ pub fn call_reducer_on_db( } } } + +/// Call a reducer on a remote database using the 2PC prepare protocol. +/// +/// This is the 2PC variant of [`call_reducer_on_db`]. It calls the target database's +/// `/prepare/{reducer}` endpoint. On success, the runtime stores the prepare_id internally. +/// After the coordinator's reducer commits, all participants are committed automatically. +/// If the coordinator's reducer fails (panics or returns Err), all participants are aborted. +/// +/// Returns and errors are identical to [`call_reducer_on_db`]. 
+pub fn call_reducer_on_db_2pc( + database_identity: Identity, + reducer_name: &str, + args: &[u8], +) -> Result<(), RemoteCallError> { + let identity_bytes = database_identity.to_byte_array(); + match spacetimedb_bindings_sys::call_reducer_on_db_2pc(identity_bytes, reducer_name, args) { + Ok((status, body_source)) => { + if status < 300 { + return Ok(()); + } + let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID { + String::new() + } else { + let mut buf = IterBuf::take(); + read_bytes_source_into(body_source, &mut buf); + String::from_utf8_lossy(&buf).into_owned() + }; + if status == 404 { + Err(RemoteCallError::NotFound(msg)) + } else { + Err(RemoteCallError::Failed(msg)) + } + } + Err(err_source) => { + use crate::rt::read_bytes_source_as; + let msg = read_bytes_source_as::(err_source); + Err(RemoteCallError::Unreachable(msg)) + } + } +} diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 2dc62bb1734..d0088687498 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -249,6 +249,163 @@ fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res } } +/// 2PC prepare endpoint: execute a reducer and return a prepare_id. +/// +/// `POST /v1/database/:name_or_identity/prepare/:reducer` +/// +/// On success, the response includes: +/// - `X-Prepare-Id` header with the prepare_id +/// - Body contains the reducer return value (if any) +pub async fn prepare( + State(worker_ctx): State, + Extension(auth): Extension, + Path(CallParams { + name_or_identity, + reducer, + }): Path, + TypedHeader(content_type): TypedHeader, + headers: axum::http::HeaderMap, + body: Bytes, +) -> axum::response::Result { + let args = parse_call_args(content_type, body)?; + let caller_identity = auth.claims.identity; + + // The coordinator sends its actual database identity in `X-Coordinator-Identity`. 
+ // Without this, `anon_auth_middleware` gives the HTTP caller an ephemeral random + // identity, which gets stored in `st_2pc_state` and breaks recovery polling. + let coordinator_identity = headers + .get("X-Coordinator-Identity") + .and_then(|v| v.to_str().ok()) + .and_then(|s| spacetimedb_lib::Identity::from_hex(s).ok()); + + let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + // 2PC prepare is a server-to-server call; no client lifecycle management needed. + // call_identity_connected/disconnected submit jobs to the module's executor, which + // will be blocked holding the 2PC write lock after prepare_reducer returns — deadlock. + let result = module + .prepare_reducer(caller_identity, None, &reducer, args.0, coordinator_identity) + .await; + + match result { + Ok((prepare_id, rcr, return_value)) => { + let (status, body) = + reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value, args.1)?; + let mut response = ( + status, + TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)), + TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)), + body, + ) + .into_response(); + if !prepare_id.is_empty() { + response + .headers_mut() + .insert("X-Prepare-Id", http::HeaderValue::from_str(&prepare_id).unwrap()); + } + Ok(response) + } + Err(e) => Err(map_reducer_error(e, &reducer).into()), + } +} + +#[derive(Deserialize)] +pub struct TwoPcParams { + name_or_identity: NameOrIdentity, + prepare_id: String, +} + +/// 2PC commit endpoint: finalize a prepared transaction. 
+/// +/// `POST /v1/database/:name_or_identity/2pc/commit/:prepare_id` +pub async fn commit_2pc( + State(worker_ctx): State, + Extension(_auth): Extension, + Path(TwoPcParams { + name_or_identity, + prepare_id, + }): Path, +) -> axum::response::Result { + let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + module.commit_prepared(&prepare_id).map_err(|e| { + log::error!("2PC commit failed: {e}"); + (StatusCode::NOT_FOUND, e).into_response() + })?; + + Ok(StatusCode::OK) +} + +/// 2PC abort endpoint: abort a prepared transaction. +/// +/// `POST /v1/database/:name_or_identity/2pc/abort/:prepare_id` +pub async fn abort_2pc( + State(worker_ctx): State, + Extension(_auth): Extension, + Path(TwoPcParams { + name_or_identity, + prepare_id, + }): Path, +) -> axum::response::Result { + let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + module.abort_prepared(&prepare_id).map_err(|e| { + log::error!("2PC abort failed: {e}"); + (StatusCode::NOT_FOUND, e).into_response() + })?; + + Ok(StatusCode::OK) +} + +/// 2PC coordinator status endpoint. +/// +/// Returns `"commit"` if the coordinator has durably decided COMMIT for `prepare_id`, +/// or `"abort"` otherwise. Participant B polls this to recover from a timeout or crash. +/// +/// `GET /v1/database/:name_or_identity/2pc/status/:prepare_id` +pub async fn status_2pc( + State(worker_ctx): State, + Extension(_auth): Extension, + Path(TwoPcParams { + name_or_identity, + prepare_id, + }): Path, +) -> axum::response::Result { + let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + let decision = if module.has_2pc_coordinator_commit(&prepare_id) { + "commit" + } else { + "abort" + }; + + Ok((StatusCode::OK, decision)) +} + +/// 2PC commit-ack endpoint. +/// +/// Called by participant B after it commits via the status-poll recovery path, +/// so that the coordinator can delete its `st_2pc_coordinator_log` entry. 
+/// +/// `POST /v1/database/:name_or_identity/2pc/ack-commit/:prepare_id` +pub async fn ack_commit_2pc( + State(worker_ctx): State, + Extension(_auth): Extension, + Path(TwoPcParams { + name_or_identity, + prepare_id, + }): Path, +) -> axum::response::Result { + let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + module.ack_2pc_coordinator_commit(&prepare_id).map_err(|e| { + log::error!("2PC ack-commit failed: {e}"); + (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response() + })?; + + Ok(StatusCode::OK) +} + /// Encode a reducer return value as an HTTP response. /// /// If the outcome is an error, return a raw string with `application/text`. Ignore `want_bsatn` in this case. @@ -1280,6 +1437,16 @@ pub struct DatabaseRoutes { pub db_reset: MethodRouter, /// GET: /database/: name_or_identity/unstable/timestamp pub timestamp_get: MethodRouter, + /// POST: /database/:name_or_identity/prepare/:reducer + pub prepare_post: MethodRouter, + /// POST: /database/:name_or_identity/2pc/commit/:prepare_id + pub commit_2pc_post: MethodRouter, + /// POST: /database/:name_or_identity/2pc/abort/:prepare_id + pub abort_2pc_post: MethodRouter, + /// GET: /database/:name_or_identity/2pc/status/:prepare_id + pub status_2pc_get: MethodRouter, + /// POST: /database/:name_or_identity/2pc/ack-commit/:prepare_id + pub ack_commit_2pc_post: MethodRouter, } impl Default for DatabaseRoutes @@ -1305,6 +1472,11 @@ where pre_publish: post(pre_publish::), db_reset: put(reset::), timestamp_get: get(get_timestamp::), + prepare_post: post(prepare::), + commit_2pc_post: post(commit_2pc::), + abort_2pc_post: post(abort_2pc::), + status_2pc_get: get(status_2pc::), + ack_commit_2pc_post: post(ack_commit_2pc::), } } } @@ -1329,7 +1501,12 @@ where .route("/sql", self.sql_post) .route("/unstable/timestamp", self.timestamp_get) .route("/pre_publish", self.pre_publish) - .route("/reset", self.db_reset); + .route("/reset", self.db_reset) + 
.route("/prepare/:reducer", self.prepare_post) + .route("/2pc/commit/:prepare_id", self.commit_2pc_post) + .route("/2pc/abort/:prepare_id", self.abort_2pc_post) + .route("/2pc/status/:prepare_id", self.status_2pc_get) + .route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post); axum::Router::new() .route("/", self.root_post) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index e55bcbabf99..12837a34ad5 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError}; -use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; +use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView, @@ -454,6 +454,26 @@ impl RelationalDB { Ok(self.with_read_only(Workload::Internal, |tx| self.inner.program(tx))?) } + /// Read any 2PC participant transactions that were in PREPARE state when the database + /// last shut down (or crashed). + /// + /// Each returned row contains all the information needed to resume the transaction: + /// the prepare_id, coordinator identity, reducer name/args, and caller context. + /// B never aborts on its own — it polls the coordinator for a decision. + pub fn pending_2pc_prepares(&self) -> Result, DBError> { + self.with_auto_commit(Workload::Internal, |tx| tx.scan_st_2pc_state().map_err(DBError::from)) + } + + /// Read any 2PC coordinator log entries that have not yet been acknowledged by their + /// participants. Used on coordinator crash-recovery to retransmit COMMIT decisions. 
+ pub fn pending_2pc_coordinator_commits( + &self, + ) -> Result, DBError> { + self.with_auto_commit(Workload::Internal, |tx| { + tx.scan_st_2pc_coordinator_log().map_err(DBError::from) + }) + } + /// Read the set of clients currently connected to the database. pub fn connected_clients(&self) -> Result { self.with_read_only(Workload::Internal, |tx| { @@ -844,6 +864,16 @@ impl RelationalDB { (tx_data, tx_metrics, tx) } + /// Forward a pre-built `TxData` directly to the durability worker. + /// + /// Used by the 2PC participant path to make the `st_2pc_state` PREPARE marker durable + /// while the main write lock is still held (i.e. without going through a full commit). + pub fn request_durability_for_tx_data(&self, reducer_context: Option, tx_data: &Arc) { + if let Some(durability) = &self.durability { + durability.request_durability(reducer_context, tx_data); + } + } + /// Get the [`DurableOffset`] of this database, or `None` if this is an /// in-memory instance. pub fn durable_tx_offset(&self) -> Option { diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index dd774111b86..8ab53513314 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -1141,6 +1141,11 @@ impl Host { module_host.clear_all_clients().await?; scheduler_starter.start(&module_host)?; + + // Crash recovery: retransmit any pending 2PC decisions from before the restart. 
+ module_host.recover_2pc_coordinator(); + module_host.recover_2pc_participant(); + let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle(); let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone()); diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index b337e5bbb2e..46af8def3cc 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -54,6 +54,11 @@ pub struct InstanceEnv { in_anon_tx: bool, /// A procedure's last known transaction offset. procedure_last_tx_offset: Option, + /// 2PC: prepared participants from `call_reducer_on_db_2pc` calls. + /// Each entry is (database_identity, prepare_id). + /// After the coordinator's reducer commits, these are committed; + /// on failure, they are aborted. + pub prepared_participants: Vec<(Identity, String)>, } /// `InstanceEnv` needs to be `Send` because it is created on the host thread @@ -238,6 +243,7 @@ impl InstanceEnv { func_name: None, in_anon_tx: false, procedure_last_tx_offset: None, + prepared_participants: Vec::new(), } } @@ -1034,8 +1040,83 @@ impl InstanceEnv { let result = async { let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; let status = response.status().as_u16(); - let body = response.bytes().await.map_err(|e| NodesError::HttpError(e.to_string()))?; - Ok((status, body)) + let body = response + .bytes() + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; + Ok::<_, NodesError>((status, body)) + } + .await; + + WORKER_METRICS + .cross_db_reducer_calls_total + .with_label_values(&caller_identity) + .inc(); + WORKER_METRICS + .cross_db_reducer_duration_seconds + .with_label_values(&caller_identity) + .observe(start.elapsed().as_secs_f64()); + + result + } + } + + /// Call a reducer on a remote database using the 2PC prepare protocol. 
+ /// + /// Like [`Self::call_reducer_on_db`], but POSTs to `/prepare/{reducer}` instead of + /// `/call/{reducer}`. On success, parses the `X-Prepare-Id` response header and stores + /// `(database_identity, prepare_id)` in [`Self::prepared_participants`]. + /// + /// Returns `(http_status, response_body)` on transport success. + /// The caller (coordinator reducer) is responsible for checking the status; + /// if the coordinator's reducer commits, the runtime will commit all participants, + /// and if it fails, the runtime will abort them. + pub fn call_reducer_on_db_2pc( + &mut self, + database_identity: Identity, + reducer_name: &str, + args: bytes::Bytes, + ) -> impl Future<Output = Result<(u16, bytes::Bytes, Option<String>), NodesError>> + use<> { + let client = self.replica_ctx.call_reducer_client.clone(); + let router = self.replica_ctx.call_reducer_router.clone(); + let reducer_name = reducer_name.to_owned(); + let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + let caller_identity = self.replica_ctx.database.database_identity; + + async move { + let start = Instant::now(); + + let base_url = router + .resolve_base_url(database_identity) + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; + let url = format!( + "{}/v1/database/{}/prepare/{}", + base_url, + database_identity.to_hex(), + reducer_name, + ); + let mut req = client + .post(&url) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .header("X-Coordinator-Identity", caller_identity.to_hex().to_string()) + .body(args); + if let Some(token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + let result = async { + let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + let status = response.status().as_u16(); + let prepare_id = response + .headers() + .get("X-Prepare-Id") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_owned()); + let body = response + .bytes() + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; +
Ok((status, body, prepare_id)) } .await; diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 25e56ca217e..06e55de6444 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -1,6 +1,7 @@ use anyhow::Context; use bytes::Bytes; use bytestring::ByteString; +use core::future::Future; use derive_more::Display; use enum_map::Enum; use once_cell::sync::OnceCell; @@ -10,11 +11,37 @@ use spacetimedb_lib::ProductValue; use spacetimedb_schema::def::deserialize::{ArgsSeed, FunctionDef}; use spacetimedb_schema::def::ModuleDef; +/// Block on `fut` from a synchronous context that may be inside a Tokio runtime. +/// +/// `Handle::block_on` and `block_in_place` both panic when the calling thread is +/// a custom (`std::thread::spawn`) thread that has entered the runtime via +/// `Handle::enter()` — which is exactly the pattern used by `SingleCoreExecutor`. +/// +/// The fix (same as the non-2PC `call_reducer_on_db` path): spawn a **scoped** +/// OS thread. The scoped thread starts with no Tokio context, so `Handle::block_on` +/// works normally and drives the future using the **original** runtime's I/O reactor +/// and connection pools. +/// +/// Use this for every place in the 2PC / cross-DB call paths that needs to +/// synchronously drive a future from blocking (WASM executor) context. 
+pub(crate) fn block_on_scoped(handle: &tokio::runtime::Handle, fut: F) -> F::Output +where + F: Future + Send, + F::Output: Send, +{ + std::thread::scope(|s| { + s.spawn(|| handle.block_on(fut)) + .join() + .expect("block_on_scoped: thread panicked") + }) +} + mod disk_storage; mod host_controller; mod module_common; #[allow(clippy::too_many_arguments)] pub mod module_host; +pub mod prepared_tx; pub mod scheduler; pub mod wasmtime; diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 929db1b8004..b3ecf0c28c7 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -890,6 +890,9 @@ pub struct ModuleHost { /// /// When this is true, most operations will fail with [`NoSuchModule`]. closed: Arc, + + /// Registry of prepared (but not yet finalized) 2PC transactions. + prepared_txs: super::prepared_tx::PreparedTransactions, } impl fmt::Debug for ModuleHost { @@ -906,6 +909,7 @@ pub struct WeakModuleHost { inner: Weak, on_panic: Weak, closed: Weak, + prepared_txs: super::prepared_tx::PreparedTransactions, } #[derive(Debug)] @@ -1093,6 +1097,7 @@ impl ModuleHost { inner, on_panic, closed: Arc::new(AtomicBool::new(false)), + prepared_txs: super::prepared_tx::PreparedTransactions::new(), } } @@ -1740,6 +1745,396 @@ impl ModuleHost { res } + /// Execute a reducer as a 2PC PREPARE on behalf of a remote coordinator. + /// + /// Holds the write lock (exclusive tx) open until a COMMIT or ABORT is received. + /// The `st_2pc_state` marker is committed atomically with the reducer's data changes at + /// actual COMMIT time. This means no other transaction can interleave between PREPARE + /// and COMMIT/ABORT, and there is no separate persistence barrier needed. + pub async fn prepare_reducer( + &self, + caller_identity: Identity, + caller_connection_id: Option, + reducer_name: &str, + args: FunctionArgs, + // The actual coordinator database identity (from `X-Coordinator-Identity` header). 
+ // When `Some`, used for `prepare_id` namespacing and stored in `st_2pc_state` for + // recovery. Falls back to `caller_identity` when `None` (e.g., internal calls). + coordinator_identity_override: Option, + ) -> Result<(String, ReducerCallResult, Option), ReducerCallError> { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::OnceLock; + // Counter seeded from current time on first use so that restarts begin from a + // different value than any existing st_2pc_state entries (which hold IDs from + // previous sessions starting at much smaller counter values). + static PREPARE_COUNTER: AtomicU64 = AtomicU64::new(0); + static PREPARE_COUNTER_INIT: OnceLock<()> = OnceLock::new(); + PREPARE_COUNTER_INIT.get_or_init(|| { + let seed = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_micros() as u64; + PREPARE_COUNTER.store(seed, Ordering::Relaxed); + }); + + let (reducer_id, reducer_def) = self + .info + .module_def + .reducer_full(reducer_name) + .ok_or(ReducerCallError::NoSuchReducer)?; + if let Some(lifecycle) = reducer_def.lifecycle { + return Err(ReducerCallError::LifecycleReducer(lifecycle)); + } + if reducer_def.visibility.is_private() && !self.is_database_owner(caller_identity) { + return Err(ReducerCallError::NoSuchReducer); + } + + let args = args + .into_tuple_for_def(&self.info.module_def, reducer_def) + .map_err(InvalidReducerArguments)?; + let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); + let params = CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + client: None, + request_id: None, + timer: None, + reducer_id, + args, + }; + + // Resolve the effective coordinator identity before generating the prepare_id so + // the prefix is namespaced correctly even when called from the HTTP prepare handler. 
+ let coordinator_identity = coordinator_identity_override.unwrap_or(caller_identity); + + // Include the coordinator identity so prepare_ids from different coordinators + // cannot collide on the participant's st_2pc_state table. + let coordinator_hex = coordinator_identity.to_hex(); + let prepare_id = format!( + "prepare-{}-{}", + &coordinator_hex.to_string()[..16], + PREPARE_COUNTER.fetch_add(1, Ordering::Relaxed), + ); + + // Channel for signalling PREPARED result back to this task. + let (prepared_tx, prepared_rx) = tokio::sync::oneshot::channel::<(ReducerCallResult, Option)>(); + // Channel for sending the COMMIT/ABORT decision to the executor thread. + let (decision_tx, decision_rx) = std::sync::mpsc::channel::(); + + self.prepared_txs.insert( + prepare_id.clone(), + super::prepared_tx::PreparedTxInfo { + decision_sender: decision_tx, + }, + ); + + // Spawn a background task that runs the reducer and holds the write lock + // until we send a decision. The executor thread blocks inside + // `call_reducer_prepare_and_hold` on `decision_rx.recv()`. + let this = self.clone(); + let reducer_name_owned = reducer_def.name.clone(); + let prepare_id_clone = prepare_id.clone(); + tokio::spawn(async move { + let _ = this + .call( + &reducer_name_owned, + (params, prepare_id_clone, coordinator_identity, prepared_tx, decision_rx), + async |(p, pid, cid, ptx, drx), inst| { + inst.call_reducer_prepare_and_hold(p, pid, cid, ptx, drx); + Ok::<(), ReducerCallError>(()) + }, + // JS modules: no 2PC support yet. + async |(_p, _pid, _cid, _ptx, _drx), _inst| Err(ReducerCallError::NoSuchReducer), + ) + .await; + }); + + // Wait for the PREPARED result (or failure) from `call_reducer_prepare_and_hold`. + match prepared_rx.await { + Ok((result, return_value)) => { + if matches!(result.outcome, ReducerOutcome::Committed) { + Ok((prepare_id, result, return_value)) + } else { + // Reducer failed — remove the entry we registered (no hold in progress). 
+ self.prepared_txs.remove(&prepare_id); + Ok((String::new(), result, return_value)) + } + } + Err(_) => Err(ReducerCallError::NoSuchModule(NoSuchModule)), + } + } + + /// Finalize a prepared transaction as COMMIT. + pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> { + let info = self + .prepared_txs + .remove(prepare_id) + .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; + // Unblock the executor thread to commit. + let _ = info.decision_sender.send(true); + Ok(()) + } + + /// Abort a prepared transaction. + pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> { + let info = self + .prepared_txs + .remove(prepare_id) + .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; + // Unblock the executor thread to abort. + let _ = info.decision_sender.send(false); + Ok(()) + } + + /// Delete a coordinator log entry for `prepare_id`. + /// Called when B has confirmed it committed, so A can stop retransmitting. + pub fn ack_2pc_coordinator_commit(&self, prepare_id: &str) -> Result<(), anyhow::Error> { + let db = self.relational_db().clone(); + db.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |tx| { + tx.delete_st_2pc_coordinator_log(prepare_id) + .map_err(anyhow::Error::from) + }) + } + + /// Check whether `prepare_id` is present in the coordinator log of this database. + /// Used by participant B to ask coordinator A: "did you commit?" + pub fn has_2pc_coordinator_commit(&self, prepare_id: &str) -> bool { + let db = self.relational_db(); + db.pending_2pc_coordinator_commits() + .map(|rows| rows.iter().any(|r| r.participant_prepare_id == prepare_id)) + .unwrap_or(false) + } + + /// Crash recovery for the **coordinator** role. + /// + /// Scans `st_2pc_coordinator_log` for participants that have not yet acked + /// COMMIT and retransmits the HTTP commit call. Deletes the log entry on success. 
+ pub fn recover_2pc_coordinator(&self) { + let db = self.relational_db().clone(); + let rows = match db.pending_2pc_coordinator_commits() { + Ok(r) => r, + Err(e) => { + log::error!("recover_2pc_coordinator: scan failed: {e}"); + return; + } + }; + if rows.is_empty() { + return; + } + let replica_ctx = self.replica_ctx().clone(); + let db2 = db.clone(); + tokio::spawn(async move { + let client = replica_ctx.call_reducer_client.clone(); + let router = replica_ctx.call_reducer_router.clone(); + let auth_token = replica_ctx.call_reducer_auth_token.clone(); + for row in rows { + let prepare_id = row.participant_prepare_id.clone(); + let participant_identity = match Identity::from_hex(&row.participant_identity_hex) { + Ok(id) => id, + Err(e) => { + log::error!( + "recover_2pc_coordinator: invalid participant identity hex {}: {e}", + row.participant_identity_hex + ); + continue; + } + }; + let base_url = match router.resolve_base_url(participant_identity).await { + Ok(url) => url, + Err(e) => { + log::warn!("recover_2pc_coordinator: cannot resolve URL for {participant_identity}: {e}"); + continue; + } + }; + let url = format!( + "{}/v1/database/{}/2pc/commit/{}", + base_url, + participant_identity.to_hex(), + prepare_id, + ); + let mut req = client.post(&url); + if let Some(ref token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + match req.send().await { + Ok(resp) if resp.status().is_success() => { + log::info!("recover_2pc_coordinator: re-committed {prepare_id} on {participant_identity}"); + if let Err(e) = db2.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |tx| { + Ok(tx.delete_st_2pc_coordinator_log(&prepare_id)?) 
+                    }) {
+                        log::warn!("recover_2pc_coordinator: delete coordinator log failed for {prepare_id}: {e}");
+                    }
+                }
+                Ok(resp) => {
+                    log::warn!(
+                        "recover_2pc_coordinator: commit for {prepare_id} returned {}",
+                        resp.status()
+                    );
+                }
+                Err(e) => {
+                    log::warn!("recover_2pc_coordinator: transport error for {prepare_id}: {e}");
+                }
+            }
+        }
+    });
+}
+
+    /// Crash recovery for the **participant** role.
+    ///
+    /// Scans `st_2pc_state` for any prepared-but-not-decided transactions, re-runs
+    /// each reducer to reacquire the write lock, then polls the coordinator for a decision.
+    ///
+    /// **B never aborts on its own** — only the coordinator's response yields ABORT.
+    pub fn recover_2pc_participant(&self) {
+        let db = self.relational_db().clone();
+        let rows = match db.pending_2pc_prepares() {
+            Ok(r) => r,
+            Err(e) => {
+                log::error!("recover_2pc_participant: scan failed: {e}");
+                return;
+            }
+        };
+        if rows.is_empty() {
+            return;
+        }
+        let this = self.clone();
+        tokio::spawn(async move {
+            for row in rows {
+                let original_prepare_id = row.prepare_id.clone();
+                let coordinator_identity = match Identity::from_hex(&row.coordinator_identity_hex) {
+                    Ok(id) => id,
+                    Err(e) => {
+                        log::error!(
+                            "recover_2pc_participant: invalid coordinator identity hex for {original_prepare_id}: {e}"
+                        );
+                        continue;
+                    }
+                };
+                let caller_identity = match Identity::from_hex(&row.caller_identity_hex) {
+                    Ok(id) => id,
+                    Err(e) => {
+                        log::error!(
+                            "recover_2pc_participant: invalid caller identity hex for {original_prepare_id}: {e}"
+                        );
+                        continue;
+                    }
+                };
+                // NOTE(review): a malformed connection-id hex silently falls back to
+                // ConnectionId::ZERO — consider logging, as done for the identity fields above.
+                let caller_connection_id = u128::from_str_radix(&row.caller_connection_id_hex, 16)
+                    .map(ConnectionId::from_u128)
+                    .unwrap_or(ConnectionId::ZERO);
+                let args = FunctionArgs::Bsatn(row.args_bsatn.clone().into());
+
+                // Step 1: Re-run the reducer to reacquire the write lock.
+                let new_prepare_id = match this
+                    .prepare_reducer(caller_identity, Some(caller_connection_id), &row.reducer_name, args, Some(coordinator_identity))
+                    .await
+                {
+                    Ok((pid, result, _rv)) if !pid.is_empty() => {
+                        log::info!(
+                            "recover_2pc_participant: re-prepared {original_prepare_id} as {pid}: {:?}",
+                            result.outcome
+                        );
+                        pid
+                    }
+                    Ok(_) => {
+                        // Reducer failed — treat as abort, clean up old marker.
+                        log::warn!("recover_2pc_participant: reducer failed on re-run for {original_prepare_id}");
+                        let _ = db.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |tx| {
+                            Ok(tx.delete_st_2pc_state(&original_prepare_id)?)
+                        });
+                        continue;
+                    }
+                    Err(e) => {
+                        log::error!("recover_2pc_participant: prepare_reducer error for {original_prepare_id}: {e:?}");
+                        continue;
+                    }
+                };
+
+                // Step 2: Poll coordinator with the ORIGINAL prepare_id until we get a decision.
+                // We do this in a separate task so the loop can proceed to the next row.
+                let this2 = this.clone();
+                let db2 = db.clone();
+                let client = this.replica_ctx().call_reducer_client.clone();
+                let router = this.replica_ctx().call_reducer_router.clone();
+                let auth_token = this.replica_ctx().call_reducer_auth_token.clone();
+                tokio::spawn(async move {
+                    loop {
+                        let decision = Self::query_coordinator_status_with_client(
+                            &client,
+                            &router,
+                            auth_token.clone(),
+                            coordinator_identity,
+                            &original_prepare_id,
+                        )
+                        .await;
+                        match decision {
+                            Some(commit) => {
+                                if commit {
+                                    let _ = this2.commit_prepared(&new_prepare_id);
+                                    // The actor thread (call_reducer_prepare_and_hold) will wait
+                                    // for B's commit to be durable and then send the ack-commit
+                                    // to the coordinator. Nothing to do here.
+                                } else {
+                                    let _ = this2.abort_prepared(&new_prepare_id);
+                                }
+                                // Clean up the old st_2pc_state entry.
+                                let _ = db2.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |tx| {
+                                    Ok(tx.delete_st_2pc_state(&original_prepare_id)?)
+                                });
+                                break;
+                            }
+                            None => tokio::time::sleep(std::time::Duration::from_secs(5)).await,
+                        }
+                    }
+                });
+            }
+        });
+    }
+
+    /// Query `GET /v1/database/{coordinator}/2pc/status/{prepare_id}`.
+    ///
+    /// Returns `Some(true)` = COMMIT, `Some(false)` = ABORT, `None` = transient error (retry).
+    ///
+    /// NOTE(review): duplicates `InstanceCommon::query_coordinator_status` in
+    /// `wasm_common/module_host_actor.rs` almost byte-for-byte (only the log prefix
+    /// differs) — consider extracting a shared helper.
+    async fn query_coordinator_status_with_client(
+        client: &reqwest::Client,
+        // NOTE(review): the generic parameter was stripped in transit — restore the
+        // concrete router type stored in `ReplicaContext::call_reducer_router`; the
+        // name used here is a reconstruction to be confirmed.
+        router: &std::sync::Arc<CallReducerRouter>,
+        auth_token: Option<String>,
+        coordinator_identity: Identity,
+        prepare_id: &str,
+    ) -> Option<bool> {
+        let base_url = match router.resolve_base_url(coordinator_identity).await {
+            Ok(url) => url,
+            Err(e) => {
+                log::warn!("2PC recovery status poll: cannot resolve coordinator URL: {e}");
+                return None;
+            }
+        };
+        let url = format!(
+            "{}/v1/database/{}/2pc/status/{}",
+            base_url,
+            coordinator_identity.to_hex(),
+            prepare_id,
+        );
+        let mut req = client.get(&url);
+        if let Some(token) = &auth_token {
+            req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
+        }
+        match req.send().await {
+            Ok(resp) if resp.status().is_success() => {
+                let body = resp.text().await.unwrap_or_default();
+                Some(body.trim() == "commit")
+            }
+            Ok(resp) => {
+                log::warn!("2PC recovery status poll: coordinator returned {}", resp.status());
+                None
+            }
+            Err(e) => {
+                log::warn!("2PC recovery status poll: transport error: {e}");
+                None
+            }
+        }
+    }
+
     pub async fn call_view_add_single_subscription(
         &self,
         sender: Arc,
@@ -2561,6 +2956,7 @@ impl ModuleHost {
             inner: Arc::downgrade(&self.inner),
             on_panic: Arc::downgrade(&self.on_panic),
             closed: Arc::downgrade(&self.closed),
+            prepared_txs: self.prepared_txs.clone(),
         }
     }
 
@@ -2605,6 +3001,7 @@ impl WeakModuleHost {
             inner,
             on_panic,
             closed,
+            prepared_txs: self.prepared_txs.clone(),
         })
     }
 }
diff --git a/crates/core/src/host/prepared_tx.rs b/crates/core/src/host/prepared_tx.rs
new file mode 100644
index 00000000000..f3676779eb7
--- /dev/null
+++ b/crates/core/src/host/prepared_tx.rs
@@ -0,0 +1,28 @@
+use std::collections::HashMap;
+use std::sync::{Arc,
Mutex};
+
+/// Information about a prepared (but not yet committed or aborted) 2PC transaction.
+/// Sending `true` commits; sending `false` aborts.
+pub struct PreparedTxInfo {
+    /// Decision channel into the participant's held-open transaction:
+    /// `true` = COMMIT, `false` = ABORT.
+    pub decision_sender: std::sync::mpsc::Sender<bool>,
+}
+
+/// Thread-safe registry of prepared transactions, keyed by prepare_id.
+#[derive(Clone, Default)]
+pub struct PreparedTransactions {
+    inner: Arc<Mutex<HashMap<String, PreparedTxInfo>>>,
+}
+
+impl PreparedTransactions {
+    /// Create an empty registry (equivalent to `Default::default()`).
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Register a prepared transaction under its `prepare_id`.
+    /// A duplicate `id` silently replaces (and drops) the previous entry.
+    pub fn insert(&self, id: String, info: PreparedTxInfo) {
+        self.inner.lock().unwrap().insert(id, info);
+    }
+
+    /// Remove and return the prepared transaction for `id`, if one is registered.
+    pub fn remove(&self, id: &str) -> Option<PreparedTxInfo> {
+        self.inner.lock().unwrap().remove(id)
+    }
+}
diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs
index 332c9c89dd3..5810c67d7bd 100644
--- a/crates/core/src/host/v8/mod.rs
+++ b/crates/core/src/host/v8/mod.rs
@@ -1398,6 +1398,11 @@ impl WasmInstance for V8Instance<'_, '_, '_> {
         log_traceback(self.replica_ctx, func_type, func, trap)
     }
 
+    fn take_prepared_participants(&mut self) -> Vec<(Identity, String)> {
+        // V8/JS does not currently support 2PC, so always return empty.
+        Vec::new()
+    }
+
     async fn call_procedure(
         &mut self,
         op: ProcedureOp,
diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs
index b5bba032d7c..5d744bc2108 100644
--- a/crates/core/src/host/wasm_common.rs
+++ b/crates/core/src/host/wasm_common.rs
@@ -438,6 +438,10 @@ macro_rules! abi_funcs {
         // Implemented as a sync host function (using block_in_place) so it can be called
         // from within a reducer body where only synchronous host functions are allowed.
         "spacetime_10.5"::call_reducer_on_db,
+
+        // 2PC variant: calls /prepare/{reducer} instead of /call/{reducer}.
+        // Stores the prepare_id for post-commit coordination.
+        "spacetime_10.5"::call_reducer_on_db_2pc,
     }
     $link_async!
{ diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 7898a4f205a..1b0dd352641 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -4,6 +4,7 @@ use crate::client::ClientActorId; use crate::database_logger; use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint}; use crate::error::DBError; +use crate::host::block_on_scoped; use crate::host::host_controller::CallProcedureReturn; use crate::host::instance_env::{InstanceEnv, TxSlot}; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; @@ -92,6 +93,10 @@ pub trait WasmInstance { fn log_traceback(&self, func_type: &str, func: &str, trap: &anyhow::Error); + /// Take the list of 2PC prepared participants accumulated during this reducer call. + /// Returns the participants and clears the internal list. + fn take_prepared_participants(&mut self) -> Vec<(Identity, String)>; + fn call_procedure( &mut self, op: ProcedureOp, @@ -395,6 +400,50 @@ impl WasmModuleHostActor { } } +/// Notify coordinator A that B has committed, so A can delete its coordinator log entry. +/// +/// Called AFTER B's commit is durable. Fire-and-forget: failure is tolerated because +/// `recover_2pc_coordinator` on A will retransmit COMMIT on restart. 
+async fn send_ack_commit_to_coordinator(
+    client: reqwest::Client,
+    // NOTE(review): generic parameter stripped in transit — restore the concrete router
+    // type stored in `ReplicaContext::call_reducer_router`; name is a reconstruction.
+    router: std::sync::Arc<CallReducerRouter>,
+    auth_token: Option<String>,
+    coordinator_identity: crate::identity::Identity,
+    prepare_id: String,
+) {
+    let base_url = match router.resolve_base_url(coordinator_identity).await {
+        Ok(url) => url,
+        Err(e) => {
+            log::warn!("2PC ack-commit: cannot resolve coordinator URL: {e}");
+            return;
+        }
+    };
+    let url = format!(
+        "{}/v1/database/{}/2pc/ack-commit/{}",
+        base_url,
+        coordinator_identity.to_hex(),
+        prepare_id,
+    );
+    let mut req = client.post(&url);
+    if let Some(token) = &auth_token {
+        req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
+    }
+    match req.send().await {
+        Ok(resp) if resp.status().is_success() => {
+            log::info!("2PC ack-commit: notified coordinator for {prepare_id}");
+        }
+        Ok(resp) => {
+            log::warn!(
+                "2PC ack-commit: coordinator returned {} for {prepare_id}",
+                resp.status()
+            );
+        }
+        Err(e) => {
+            log::warn!("2PC ack-commit: transport error for {prepare_id}: {e}");
+        }
+    }
+}
+
 impl WasmModuleHostActor {
     fn make_from_instance(&self, mut instance: T::Instance) -> WasmModuleInstance {
         let common = InstanceCommon::new(&self.common);
@@ -562,6 +611,245 @@ impl WasmModuleInstance {
         })
     }
 
+    /// Run the reducer as a 2PC participant PREPARE.
+    ///
+    /// Holds the write lock (MutTxId) open until a COMMIT or ABORT decision arrives.
+    /// The flow:
+    /// 1. Extract recovery info from `params` (reducer name, args, caller context).
+    /// 2. Run reducer (no commit); hold open MutTxId (write lock).
+    /// 3. If reducer failed: send failure via `prepared_tx`; rollback; return.
+    /// 4. Flush `st_2pc_state` marker (with recovery fields) directly into committed state.
+    ///    The marker's `TxData` is forwarded to durability so PREPARE is durable.
+    /// 5. Signal PREPARED via `prepared_tx`.
+    /// 6. Wait for decision:
+    ///    - Fast path: `decision_rx.recv_timeout(60s)` delivers COMMIT or ABORT.
+ /// - Slow path: on timeout/disconnect, poll coordinator status endpoint every 5s. + /// **B never aborts on its own** — only A's response can yield ABORT. + /// - COMMIT: delete `st_2pc_state` in the same tx as reducer changes (atomic). + /// - ABORT: rollback, delete `st_2pc_state` in a new tx. + pub fn call_reducer_prepare_and_hold( + &mut self, + params: CallReducerParams, + prepare_id: String, + coordinator_identity: crate::identity::Identity, + prepared_tx: tokio::sync::oneshot::Sender<(ReducerCallResult, Option)>, + decision_rx: std::sync::mpsc::Receiver, + ) { + let stdb = self.instance.replica_ctx().relational_db().clone(); + let replica_ctx = self.instance.replica_ctx().clone(); + + // Extract recovery info before params are consumed. + let recovery_reducer_name = self + .common + .info + .module_def + .reducer_by_id(params.reducer_id) + .name + .to_string(); + let recovery_args_bsatn = params.args.get_bsatn().to_vec(); + let recovery_caller_identity_hex = params.caller_identity.to_hex().to_string(); + let recovery_caller_connection_id_hex = format!("{:x}", params.caller_connection_id.to_u128()); + let recovery_timestamp_micros = params.timestamp.to_micros_since_unix_epoch(); + + // Step 1: run the reducer and hold the write lock open. + let (mut tx, event, client, trapped) = crate::callgrind_flag::invoke_allowing_callgrind(|| { + self.common.run_reducer_no_commit(None, params, &mut self.instance) + }); + self.trapped = trapped; + + let energy_quanta_used = event.energy_quanta_used; + let total_duration = event.host_execution_duration; + + if !matches!(event.status, EventStatus::Committed(_)) { + // Reducer failed — roll back and signal failure; no marker was written. 
+ let res = ReducerCallResult { + outcome: ReducerOutcome::from(&event.status), + energy_used: energy_quanta_used, + execution_duration: total_duration, + }; + let return_value = event.reducer_return_value.clone(); + let _ = prepared_tx.send((res, return_value)); + let _ = stdb.rollback_mut_tx(tx); + return; + } + + // Step 2: flush the st_2pc_state marker with recovery fields directly into committed + // state, assign a tx_offset, and forward to durability — while holding the write lock. + let marker_tx_data = match tx.flush_2pc_prepare_marker( + &prepare_id, + coordinator_identity.to_hex().to_string(), + recovery_reducer_name, + recovery_args_bsatn, + recovery_caller_identity_hex, + recovery_caller_connection_id_hex, + recovery_timestamp_micros, + ) { + Ok(td) => std::sync::Arc::new(td), + Err(e) => { + log::error!("call_reducer_prepare_and_hold: flush_2pc_prepare_marker failed for {prepare_id}: {e}"); + let _ = stdb.rollback_mut_tx(tx); + return; + } + }; + stdb.request_durability_for_tx_data(None, &marker_tx_data); + + // Step 3: wait for the PREPARE marker to be durable before signalling PREPARED. + // B must not claim PREPARED until the marker is on disk — if B crashes after + // claiming PREPARED but before the marker is durable, recovery has nothing to recover. + if let Some(prepare_offset) = marker_tx_data.tx_offset() { + if let Some(mut durable) = stdb.durable_tx_offset() { + let handle = tokio::runtime::Handle::current(); + let _ = block_on_scoped(&handle, durable.wait_for(prepare_offset)); + } + } + + // Step 4: signal PREPARED. + let res = ReducerCallResult { + outcome: ReducerOutcome::from(&event.status), + energy_used: energy_quanta_used, + execution_duration: total_duration, + }; + let return_value = event.reducer_return_value.clone(); + let _ = prepared_tx.send((res, return_value)); + + // Step 4: wait for coordinator's decision (B never aborts on its own). 
+ let commit = Self::wait_for_2pc_decision(decision_rx, &prepare_id, coordinator_identity, &replica_ctx); + + if commit { + // Delete the marker in the same tx as the reducer changes (atomic commit). + if let Err(e) = tx.delete_st_2pc_state(&prepare_id) { + log::error!("call_reducer_prepare_and_hold: failed to delete st_2pc_state for {prepare_id}: {e}"); + } + let commit_result = commit_and_broadcast_event(&self.common.info.subscriptions, client, event, tx); + + // Wait for B's COMMIT to be durable before acking to coordinator. + // Without this, A could delete its coordinator log entry while B's commit + // is still in-memory — a B crash at that point would leave the tx uncommitted + // with no way to recover (A has already forgotten it committed). + if let Some(mut durable) = stdb.durable_tx_offset() { + let handle = tokio::runtime::Handle::current(); + block_on_scoped(&handle, async move { + if let Ok(offset) = commit_result.tx_offset.await { + let _ = durable.wait_for(offset).await; + } + }); + } + + // Notify coordinator that B has committed so it can delete its coordinator log entry. + // Fire-and-forget: if this fails, coordinator's recover_2pc_coordinator will retry on + // restart, and B's commit_prepared will then return a harmless "not found" error. + let router = replica_ctx.call_reducer_router.clone(); + let client_http = replica_ctx.call_reducer_client.clone(); + let auth_token = replica_ctx.call_reducer_auth_token.clone(); + tokio::runtime::Handle::current().spawn(send_ack_commit_to_coordinator( + client_http, + router, + auth_token, + coordinator_identity, + prepare_id, + )); + } else { + // ABORT: roll back reducer changes; clean up the already-committed marker. + let _ = stdb.rollback_mut_tx(tx); + if let Err(e) = stdb.with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| { + Ok(del_tx.delete_st_2pc_state(&prepare_id)?) 
+ }) { + log::error!( + "call_reducer_prepare_and_hold: abort: failed to delete st_2pc_state for {prepare_id}: {e}" + ); + } + } + } + + /// Wait for a 2PC COMMIT or ABORT decision for `prepare_id`. + /// + /// First waits on `decision_rx` for up to 60 seconds. If no decision arrives, + /// switches to polling the coordinator's `GET /2pc/status/{prepare_id}` endpoint + /// every 5 seconds until a definitive answer is received. + /// + /// **B never aborts on its own** — ABORT is only returned when A explicitly says so. + fn wait_for_2pc_decision( + decision_rx: std::sync::mpsc::Receiver, + prepare_id: &str, + coordinator_identity: crate::identity::Identity, + replica_ctx: &std::sync::Arc, + ) -> bool { + match decision_rx.recv_timeout(Duration::from_secs(60)) { + Ok(commit) => return commit, + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + log::warn!("2PC prepare_id={prepare_id}: no decision after 60s, polling coordinator"); + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { + log::warn!("2PC prepare_id={prepare_id}: decision channel closed, polling coordinator"); + } + } + + let handle = tokio::runtime::Handle::current(); + let client = replica_ctx.call_reducer_client.clone(); + let router = replica_ctx.call_reducer_router.clone(); + let auth_token = replica_ctx.call_reducer_auth_token.clone(); + let prepare_id_owned = prepare_id.to_owned(); + loop { + let decision = block_on_scoped( + &handle, + Self::query_coordinator_status( + &client, + &router, + auth_token.clone(), + coordinator_identity, + &prepare_id_owned, + ), + ); + match decision { + Some(commit) => return commit, + None => std::thread::sleep(Duration::from_secs(5)), + } + } + } + + /// Query `GET /v1/database/{coordinator}/2pc/status/{prepare_id}`. + /// + /// Returns `Some(true)` = COMMIT, `Some(false)` = ABORT, `None` = transient error (retry). 
+    async fn query_coordinator_status(
+        client: &reqwest::Client,
+        // NOTE(review): generic parameter stripped in transit — restore the concrete
+        // router type stored in `ReplicaContext::call_reducer_router`.
+        router: &std::sync::Arc<CallReducerRouter>,
+        auth_token: Option<String>,
+        coordinator_identity: crate::identity::Identity,
+        prepare_id: &str,
+    ) -> Option<bool> {
+        let base_url = match router.resolve_base_url(coordinator_identity).await {
+            Ok(url) => url,
+            Err(e) => {
+                log::warn!("2PC status poll: cannot resolve coordinator URL: {e}");
+                return None;
+            }
+        };
+        let url = format!(
+            "{}/v1/database/{}/2pc/status/{}",
+            base_url,
+            coordinator_identity.to_hex(),
+            prepare_id,
+        );
+        let mut req = client.get(&url);
+        if let Some(token) = &auth_token {
+            req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
+        }
+        match req.send().await {
+            Ok(resp) if resp.status().is_success() => {
+                let body = resp.text().await.unwrap_or_default();
+                Some(body.trim() == "commit")
+            }
+            Ok(resp) => {
+                log::warn!("2PC status poll: coordinator returned {}", resp.status());
+                None
+            }
+            Err(e) => {
+                log::warn!("2PC status poll: transport error: {e}");
+                None
+            }
+        }
+    }
+
     pub fn call_view(&mut self, cmd: ViewCommand) -> ViewCommandResult {
         let (res, trapped) = self.common.handle_cmd(cmd, &mut self.instance);
         self.trapped = trapped;
@@ -838,6 +1126,138 @@ impl InstanceCommon {
         params: CallReducerParams,
         inst: &mut I,
     ) -> (ReducerCallResult, Option, bool) {
+        let (mut tx, event, client, trapped) = self.run_reducer_no_commit(tx, params, inst);
+
+        let energy_quanta_used = event.energy_quanta_used;
+        let total_duration = event.host_execution_duration;
+
+        // Take participants before commit so we can write the coordinator log atomically.
+        let prepared_participants = inst.take_prepared_participants();
+
+        // If this coordinator tx is committed and has participants, write coordinator log
+        // entries into the still-open tx. They are committed atomically with the tx,
+        // making A's COMMIT decision durable before any HTTP is sent to B (scenario 2
+        // crash recovery).
+        if matches!(event.status, EventStatus::Committed(_)) && !prepared_participants.is_empty() {
+            for (db_identity, prepare_id) in &prepared_participants {
+                if let Err(e) = tx.insert_st_2pc_coordinator_log(prepare_id, &db_identity.to_hex().to_string()) {
+                    // NOTE(review): on failure we still fall through and commit A's tx below,
+                    // so the durable-COMMIT-record guarantee is void for this participant —
+                    // recovery could not retransmit COMMIT. Consider aborting instead; confirm.
+                    log::error!("insert_st_2pc_coordinator_log failed for {prepare_id}: {e}");
+                }
+            }
+        }
+
+        let commit_result = commit_and_broadcast_event(&self.info.subscriptions, client, event, tx);
+        let commit_tx_offset = commit_result.tx_offset;
+        let event = commit_result.event;
+
+        // 2PC post-commit coordination: send COMMIT or ABORT to each participant.
+        if !prepared_participants.is_empty() {
+            let committed = matches!(event.status, EventStatus::Committed(_));
+            let stdb = self.info.subscriptions.relational_db().clone();
+            let handle = tokio::runtime::Handle::current();
+
+            // Wait for A's coordinator log (committed atomically with the tx) to be
+            // durable before sending COMMIT to B. This guarantees that if A crashes
+            // after sending COMMIT, recovery can retransmit from the durable log.
+            // Only needed for COMMIT — ABORT carries no durability requirement.
+            if committed {
+                if let Some(mut durable_offset) = stdb.durable_tx_offset() {
+                    block_on_scoped(&handle, async move {
+                        if let Ok(offset) = commit_tx_offset.await {
+                            let _ = durable_offset.wait_for(offset).await;
+                        }
+                    });
+                }
+            }
+
+            // Fire-and-forget: send COMMIT/ABORT to each participant.
+            // The coordinator log (written atomically with A's tx above) is the
+            // durability guarantee — if a send fails, recover_2pc_coordinator retransmits
+            // on restart and recover_2pc_participant polls the status endpoint.
+            // Blocking the executor here would stall A's next reducer for up to
+            // 30 s × number of participants with no correctness benefit.
+            let replica_ctx = inst.replica_ctx().clone();
+            handle.spawn(async move {
+                let client = replica_ctx.call_reducer_client.clone();
+                let router = replica_ctx.call_reducer_router.clone();
+                let auth_token = replica_ctx.call_reducer_auth_token.clone();
+                for (db_identity, prepare_id) in &prepared_participants {
+                    let action = if committed { "commit" } else { "abort" };
+                    let base_url = match router.resolve_base_url(*db_identity).await {
+                        Ok(url) => url,
+                        Err(e) => {
+                            log::error!("2PC {action}: failed to resolve base URL for {db_identity}: {e}");
+                            continue;
+                        }
+                    };
+                    let url = format!(
+                        "{}/v1/database/{}/2pc/{}/{}",
+                        base_url,
+                        db_identity.to_hex(),
+                        action,
+                        prepare_id,
+                    );
+                    let mut req = client.post(&url);
+                    if let Some(ref token) = auth_token {
+                        req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}"));
+                    }
+                    match req.send().await {
+                        Ok(resp) if resp.status().is_success() => {
+                            log::info!("2PC {action}: {prepare_id} on {db_identity}");
+                            // B acknowledged COMMIT — remove coordinator log entry
+                            // (best-effort; recovery will clean up on restart if missed).
+                            // NOTE(review): `send_ack_commit_to_coordinator` also triggers this
+                            // deletion path via the /2pc/ack-commit endpoint — confirm the two
+                            // deletions are idempotent with respect to each other.
+                            if committed {
+                                if let Err(e) = stdb
+                                    .with_auto_commit::<_, _, anyhow::Error>(Workload::Internal, |del_tx| {
+                                        Ok(del_tx.delete_st_2pc_coordinator_log(prepare_id)?)
+                                    })
+                                {
+                                    log::warn!("delete_st_2pc_coordinator_log failed for {prepare_id}: {e}");
+                                }
+                            }
+                        }
+                        Ok(resp) => {
+                            log::error!(
+                                "2PC {action}: failed for {prepare_id} on {db_identity}: status {}",
+                                resp.status()
+                            );
+                        }
+                        Err(e) => {
+                            log::error!("2PC {action}: transport error for {prepare_id} on {db_identity}: {e}");
+                        }
+                    }
+                }
+            });
+        }
+
+        let res = ReducerCallResult {
+            outcome: ReducerOutcome::from(&event.status),
+            energy_used: energy_quanta_used,
+            execution_duration: total_duration,
+        };
+
+        (res, event.reducer_return_value.clone(), trapped)
+    }
+
+    /// Run the reducer and views, but do NOT commit or broadcast yet.
+    ///
+    /// Returns `(open_tx, event, client, trapped)`.
The `MutTxId` write lock is
+    /// still held. The caller is responsible for either committing (via
+    /// [`commit_and_broadcast_event`]) or rolling back.
+    ///
+    /// This is the building block for both the normal path and the 2PC participant
+    /// PREPARE path.
+    // NOTE(review): the generic parameters of `Option` in this signature were stripped
+    // in transit (patch sanitization); this will not compile as written — restore them.
+    pub(crate) fn run_reducer_no_commit(
+        &mut self,
+        tx: Option,
+        params: CallReducerParams,
+        inst: &mut I,
+    ) -> (
+        MutTxId,
+        ModuleEvent,
+        Option>,
+        bool,
+    ) {
         let CallReducerParams {
             timestamp,
             caller_identity,
@@ -947,9 +1367,9 @@ impl InstanceCommon {
         vm_metrics.report_total_duration(out.total_duration);
         vm_metrics.report_abi_duration(out.abi_duration);
 
-        let status = match out.outcome {
+        let status = match &out.outcome {
             ViewOutcome::BudgetExceeded => EventStatus::OutOfEnergy,
-            ViewOutcome::Failed(err) => EventStatus::FailedInternal(err),
+            ViewOutcome::Failed(err) => EventStatus::FailedInternal(err.clone()),
             ViewOutcome::Success => status,
         };
         if !matches!(status, EventStatus::Committed(_)) {
@@ -975,15 +1395,7 @@ impl InstanceCommon {
             request_id,
             timer,
         };
-        let event = commit_and_broadcast_event(&info.subscriptions, client, event, out.tx).event;
-
-        let res = ReducerCallResult {
-            outcome: ReducerOutcome::from(&event.status),
-            energy_used: energy_quanta_used,
-            execution_duration: total_duration,
-        };
-
-        (res, event.reducer_return_value.clone(), trapped)
+        (out.tx, event, client, trapped)
     }
 
     fn handle_outer_error(&mut self, energy: &EnergyStats, reducer_name: &str) -> EventStatus {
diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs
index 728b3711057..e23e3da01bb 100644
--- a/crates/core/src/host/wasmtime/wasm_instance_env.rs
+++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs
@@ -232,6 +232,11 @@ impl WasmInstanceEnv {
         &self.instance_env
     }
 
+    /// Return a mutable reference to the `InstanceEnv`.
+    pub fn instance_env_mut(&mut self) -> &mut InstanceEnv {
+        &mut self.instance_env
+    }
+
     /// Setup the standard bytes sink and return a handle to it for writing.
     pub fn setup_standard_bytes_sink(&mut self) -> u32 {
         self.standard_bytes_sink = Some(Vec::new());
@@ -1991,19 +1996,11 @@ impl WasmInstanceEnv {
             let args_buf = mem.deref_slice(args_ptr, args_len)?;
             let args = bytes::Bytes::copy_from_slice(args_buf);
 
-            // Reducers run inside a tokio LocalSet (single-threaded), so block_in_place
-            // is unavailable and futures::executor::block_on can't drive tokio I/O.
-            // Spawn a new OS thread and call Handle::block_on from there, which is
-            // designed to be called from synchronous (non-async) contexts.
             let handle = tokio::runtime::Handle::current();
             let fut = env
                 .instance_env
                 .call_reducer_on_db(database_identity, &reducer_name, args);
-            let result = std::thread::scope(|s| {
-                s.spawn(|| handle.block_on(fut))
-                    .join()
-                    .expect("call_reducer_on_db: worker thread panicked")
-            });
+            // `block_on_scoped` encapsulates the spawn-a-thread-and-block-on dance that
+            // was previously inlined here (see the removed comment for the rationale).
+            let result = super::super::block_on_scoped(&handle, fut);
 
             match result {
                 Ok((status, body)) => {
@@ -2023,6 +2020,71 @@ impl WasmInstanceEnv {
             }
         })
     }
+
+    /// 2PC variant of `call_reducer_on_db`.
+    ///
+    /// Calls the remote database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`.
+    /// On success, parses the `X-Prepare-Id` header and stores the participant info in
+    /// `InstanceEnv::prepared_participants` so the runtime can commit/abort after the
+    /// coordinator's reducer completes.
+    ///
+    /// Returns the HTTP status code on success, writing the response body to `*out`
+    /// as a [`BytesSource`].
+    ///
+    /// On transport failure:
+    /// - Returns `HTTP_ERROR` errno, writing a BSATN-encoded error [`String`] to `*out`.
+    pub fn call_reducer_on_db_2pc(
+        caller: Caller<'_, Self>,
+        identity_ptr: WasmPtr<u8>,
+        reducer_ptr: WasmPtr<u8>,
+        reducer_len: u32,
+        args_ptr: WasmPtr<u8>,
+        args_len: u32,
+        out: WasmPtr<BytesSource>,
+    ) -> RtResult<u32> {
+        // NOTE(review): this reuses `AbiCall::CallReducerOnDb`, so any error raised here
+        // will be attributed to the non-2PC ABI call in diagnostics — consider a
+        // dedicated `AbiCall` variant.
+        Self::cvt_custom(caller, AbiCall::CallReducerOnDb, |caller| {
+            let (mem, env) = Self::mem_env(caller);
+
+            let identity_slice = mem.deref_slice(identity_ptr, 32)?;
+            let identity_bytes: [u8; 32] = identity_slice
+                .try_into()
+                .expect("deref_slice(ptr, 32) always yields exactly 32 bytes");
+            let database_identity = Identity::from_byte_array(identity_bytes);
+
+            let reducer_name = mem.deref_str(reducer_ptr, reducer_len)?.to_owned();
+            let args_buf = mem.deref_slice(args_ptr, args_len)?;
+            let args = bytes::Bytes::copy_from_slice(args_buf);
+
+            let handle = tokio::runtime::Handle::current();
+            let fut = env
+                .instance_env
+                .call_reducer_on_db_2pc(database_identity, &reducer_name, args);
+            let result = super::super::block_on_scoped(&handle, fut);
+
+            match result {
+                Ok((status, body, prepare_id)) => {
+                    // If we got a prepare_id (and a non-error status), register this participant.
+                    if let Some(pid) = prepare_id
+                        && status < 300
+                    {
+                        env.instance_env.prepared_participants.push((database_identity, pid));
+                    }
+                    let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?;
+                    bytes_source.0.write_to(mem, out)?;
+                    Ok(status as u32)
+                }
+                Err(NodesError::HttpError(err)) => {
+                    let err_bytes = bsatn::to_vec(&err).with_context(|| {
+                        format!("Failed to BSATN-serialize call_reducer_on_db_2pc transport error: {err:?}")
+                    })?;
+                    let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?;
+                    bytes_source.0.write_to(mem, out)?;
+                    Ok(errno::HTTP_ERROR.get() as u32)
+                }
+                Err(e) => Err(WasmError::Db(e)),
+            }
+        })
+    }
+}
 
 type Fut<'caller, T> = Box>;
diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs
index 2ea14a57be9..fb01e3a4763 100644
--- a/crates/core/src/host/wasmtime/wasmtime_module.rs
+++ b/crates/core/src/host/wasmtime/wasmtime_module.rs
@@ -564,6 +564,10 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
         log_traceback(func_type, func, trap)
     }
 
+    fn take_prepared_participants(&mut self) -> Vec<(Identity, String)> {
+        core::mem::take(&mut self.store.data_mut().instance_env_mut().prepared_participants)
+    }
+
     #[tracing::instrument(level = "trace", skip_all)]
     async fn call_procedure(
         &mut self,
diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs
index 8c9f8804f24..307dcf9f70c 100644
--- a/crates/core/src/replica_context.rs
+++ b/crates/core/src/replica_context.rs
@@ -68,12 +68,16 @@ pub struct ReplicaContext {
 
 impl ReplicaContext {
     /// Build a warmed `reqwest::Client` from `config`.
+    ///
+    /// Uses HTTP/2 prior knowledge (h2c) for all connections.
+    /// The server must be configured to accept h2c (HTTP/2 cleartext) connections.
     pub fn new_call_reducer_client(config: &CallReducerOnDbConfig) -> reqwest::Client {
         reqwest::Client::builder()
             .tcp_keepalive(config.tcp_keepalive)
             .pool_idle_timeout(config.pool_idle_timeout)
             .pool_max_idle_per_host(config.pool_max_idle_per_host)
             .timeout(config.request_timeout)
+            // NOTE(review): `http2_prior_knowledge` forces cleartext HTTP/2 for EVERY
+            // connection this client makes; a peer that only speaks HTTP/1.1 (or that
+            // requires TLS + ALPN negotiation) will fail to connect. Confirm all
+            // inter-database endpoints accept h2c before shipping.
+            .http2_prior_knowledge()
             .build()
             .expect("failed to build call_reducer_on_db HTTP client")
     }
diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs
index 3472fae82d5..8663186c336 100644
--- a/crates/datastore/src/locking_tx_datastore/committed_state.rs
+++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs
@@ -29,6 +29,7 @@ use crate::{
 use crate::{
     locking_tx_datastore::ViewCallInfo,
     system_tables::{
+        ST_2PC_COORDINATOR_LOG_ID, ST_2PC_COORDINATOR_LOG_IDX, ST_2PC_STATE_ID, ST_2PC_STATE_IDX,
         ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ACCESSOR_IDX, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX,
         ST_EVENT_TABLE_ID, ST_EVENT_TABLE_IDX, ST_INDEX_ACCESSOR_ID, ST_INDEX_ACCESSOR_IDX,
         ST_TABLE_ACCESSOR_ID, ST_TABLE_ACCESSOR_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
         ST_VIEW_IDX, ST_VIEW_PARAM_ID,
@@ -477,6 +478,9 @@ impl CommittedState {
         self.create_table(ST_INDEX_ACCESSOR_ID, schemas[ST_INDEX_ACCESSOR_IDX].clone());
         self.create_table(ST_COLUMN_ACCESSOR_ID, schemas[ST_COLUMN_ACCESSOR_IDX].clone());
 
+        self.create_table(ST_2PC_STATE_ID, schemas[ST_2PC_STATE_IDX].clone());
+        self.create_table(ST_2PC_COORDINATOR_LOG_ID, schemas[ST_2PC_COORDINATOR_LOG_IDX].clone());
+
         // Insert the sequences into `st_sequences`
         let (st_sequences, blob_store, pool) =
             self.get_table_and_blob_store_or_create(ST_SEQUENCE_ID, &schemas[ST_SEQUENCE_IDX]);
@@ -1380,6 +1384,35 @@ impl CommittedState {
         self.tables.insert(table_id, Self::make_table(schema));
     }
 
+    /// Insert a single row directly into the committed state, bypassing `TxState`.
+ /// + /// Assigns the next `tx_offset` to the resulting `TxData` and increments the counter. + /// The write lock (and therefore the transaction) is **not** released. + /// + /// Used by the 2PC participant path to flush the `st_2pc_state` PREPARE marker to the + /// commitlog (via the durability worker) while keeping the reducer's write lock open, + /// so that no other transaction can interleave between PREPARE and COMMIT/ABORT. + pub(super) fn insert_row_and_consume_offset( + &mut self, + table_id: TableId, + schema: &Arc, + row: &ProductValue, + ) -> Result { + let (table, blob_store, pool) = self.get_table_and_blob_store_or_create(table_id, schema); + table.insert(pool, blob_store, row).map_err(|e| match e { + InsertError::Duplicate(e) => DatastoreError::from(TableError::Duplicate(e)), + InsertError::Bflatn(e) => DatastoreError::from(TableError::Bflatn(e)), + InsertError::IndexError(e) => DatastoreError::from(IndexError::UniqueConstraintViolation(e)), + })?; + + let row_arc: Arc<[ProductValue]> = Arc::from([row.clone()]); + let mut tx_data = TxData::default(); + tx_data.set_inserts_for_table(table_id, &schema.table_name, row_arc); + tx_data.set_tx_offset(self.next_tx_offset); + self.next_tx_offset += 1; + Ok(tx_data) + } + pub(super) fn get_table_and_blob_store_or_create<'this>( &'this mut self, table_id: TableId, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index f152f3306af..0a27ffa9388 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -19,13 +19,14 @@ use crate::{ use crate::{ error::{IndexError, SequenceError, TableError}, system_tables::{ - with_sys_table_buf, StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields, - StColumnRow, StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields, - StIndexAccessorRow, StIndexFields, StIndexRow, 
StRowLevelSecurityFields, StRowLevelSecurityRow, - StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow, - StTableFields, StTableRow, SystemTable, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, ST_CONSTRAINT_ID, - ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, - ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, + with_sys_table_buf, St2pcCoordinatorLogFields, St2pcCoordinatorLogRow, St2pcStateFields, St2pcStateRow, + StClientFields, StClientRow, StColumnAccessorFields, StColumnAccessorRow, StColumnFields, StColumnRow, + StConstraintFields, StConstraintRow, StEventTableRow, StFields as _, StIndexAccessorFields, StIndexAccessorRow, + StIndexFields, StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, + StSequenceFields, StSequenceRow, StTableAccessorFields, StTableAccessorRow, StTableFields, StTableRow, + SystemTable, ST_2PC_COORDINATOR_LOG_ID, ST_2PC_STATE_ID, ST_CLIENT_ID, ST_COLUMN_ACCESSOR_ID, ST_COLUMN_ID, + ST_CONSTRAINT_ID, ST_EVENT_TABLE_ID, ST_INDEX_ACCESSOR_ID, ST_INDEX_ID, ST_ROW_LEVEL_SECURITY_ID, + ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ACCESSOR_ID, ST_TABLE_ID, }, }; use crate::{execution_context::ExecutionContext, system_tables::StViewColumnRow}; @@ -2611,6 +2612,110 @@ impl MutTxId { .map(|row| row.pointer()) } + /// Write the `st_2pc_state` PREPARE marker directly to the committed state and allocate a + /// `tx_offset` for it, **without** releasing the write lock or committing the pending + /// reducer changes in `tx_state`. + /// + /// Stores all fields needed for crash recovery (coordinator identity, reducer name/args, + /// caller context) so that, on restart, the participant can re-run the reducer and poll + /// the coordinator for a COMMIT or ABORT decision. + /// + /// Returns the `TxData` for the marker row (with its own `tx_offset`). 
The caller must
+    /// forward this to the durability worker so the PREPARE record becomes durable.
+    ///
+    /// The write lock remains held after this call. On ABORT the caller must delete the
+    /// `st_2pc_state` row in a subsequent transaction.
+    #[allow(clippy::too_many_arguments)]
+    pub fn flush_2pc_prepare_marker(
+        &mut self,
+        prepare_id: &str,
+        coordinator_identity_hex: String,
+        reducer_name: String,
+        args_bsatn: Vec<u8>,
+        caller_identity_hex: String,
+        caller_connection_id_hex: String,
+        timestamp_micros: i64,
+    ) -> Result<TxData> {
+        let schema = self
+            .committed_state_write_lock
+            .get_schema(ST_2PC_STATE_ID)
+            .cloned()
+            .expect("st_2pc_state system table must exist in committed state");
+        let row = ProductValue::from(St2pcStateRow {
+            prepare_id: prepare_id.to_owned(),
+            coordinator_identity_hex,
+            reducer_name,
+            args_bsatn,
+            caller_identity_hex,
+            caller_connection_id_hex,
+            timestamp_micros,
+        });
+        self.committed_state_write_lock
+            .insert_row_and_consume_offset(ST_2PC_STATE_ID, &schema, &row)
+    }
+
+    /// Delete the `st_2pc_state` row for the given `prepare_id`, called on COMMIT or ABORT.
+    ///
+    /// Best-effort: a missing row is logged and ignored so COMMIT/ABORT stays idempotent.
+    pub fn delete_st_2pc_state(&mut self, prepare_id: &str) -> Result<()> {
+        if let Err(e) = self.delete_col_eq(
+            ST_2PC_STATE_ID,
+            St2pcStateFields::PrepareId.col_id(),
+            &AlgebraicValue::String(prepare_id.into()),
+        ) {
+            log::error!("delete_st_2pc_state: no row for prepare_id ({prepare_id}), error: {e}");
+        }
+        Ok(())
+    }
+
+    /// Return all rows in `st_2pc_state` (prepared but not yet committed/aborted).
+    /// Used on recovery: each row describes a transaction to resume.
+    pub fn scan_st_2pc_state(&self) -> Result<Vec<St2pcStateRow>> {
+        self.iter(ST_2PC_STATE_ID)?
+            .map(St2pcStateRow::try_from)
+            .collect()
+    }
+
+    /// Insert a row into `st_2pc_coordinator_log` recording that the coordinator has
+    /// decided COMMIT for `participant_prepare_id` on `participant_identity_hex`.
+    ///
+    /// Called inside the coordinator's open `MutTxId` so the entry is committed
+    /// atomically with the coordinator's own changes.
+    pub fn insert_st_2pc_coordinator_log(
+        &mut self,
+        participant_prepare_id: &str,
+        participant_identity_hex: &str,
+    ) -> Result<()> {
+        let row = &St2pcCoordinatorLogRow {
+            participant_prepare_id: participant_prepare_id.to_owned(),
+            participant_identity_hex: participant_identity_hex.to_owned(),
+        };
+        self.insert_via_serialize_bsatn(ST_2PC_COORDINATOR_LOG_ID, row)
+            .map(|_| ())
+            .inspect_err(|e| {
+                log::error!("insert_st_2pc_coordinator_log: failed for prepare_id ({participant_prepare_id}): {e}");
+            })
+    }
+
+    /// Delete the coordinator log entry for `participant_prepare_id` once the participant
+    /// has acknowledged COMMIT.
+    ///
+    /// Best-effort: a missing row is logged and ignored, mirroring [`Self::delete_st_2pc_state`].
+    pub fn delete_st_2pc_coordinator_log(&mut self, participant_prepare_id: &str) -> Result<()> {
+        if let Err(e) = self.delete_col_eq(
+            ST_2PC_COORDINATOR_LOG_ID,
+            St2pcCoordinatorLogFields::ParticipantPrepareId.col_id(),
+            &AlgebraicValue::String(participant_prepare_id.into()),
+        ) {
+            log::error!("delete_st_2pc_coordinator_log: no row for prepare_id ({participant_prepare_id}): {e}");
+        }
+        Ok(())
+    }
+
+    /// Return all entries in the coordinator log (COMMIT decisions not yet acknowledged).
+    /// Used on coordinator crash-recovery to retransmit COMMIT to participants.
+    pub fn scan_st_2pc_coordinator_log(&self) -> Result<Vec<St2pcCoordinatorLogRow>> {
+        self.iter(ST_2PC_COORDINATOR_LOG_ID)?
+            .map(St2pcCoordinatorLogRow::try_from)
+            .collect()
+    }
+
     pub fn insert_via_serialize_bsatn<'a, T: Serialize>(
         &'a mut self,
         table_id: TableId,
diff --git a/crates/datastore/src/system_tables.rs b/crates/datastore/src/system_tables.rs
index e75cc76b365..e19cc47fd74 100644
--- a/crates/datastore/src/system_tables.rs
+++ b/crates/datastore/src/system_tables.rs
@@ -88,6 +88,14 @@ pub const ST_TABLE_ACCESSOR_ID: TableId = TableId(18);
 pub const ST_INDEX_ACCESSOR_ID: TableId = TableId(19);
 /// The static ID of the table that maps canonical column names to accessor names
 pub const ST_COLUMN_ACCESSOR_ID: TableId = TableId(20);
+/// The static ID of the 2PC participant state table
+pub const ST_2PC_STATE_ID: TableId = TableId(21);
+pub(crate) const ST_2PC_STATE_NAME: &str = "st_2pc_state";
+/// The static ID of the 2PC coordinator log table.
+/// A row is written atomically with the coordinator's commit, before sending COMMIT to participants.
+/// Used on coordinator crash-recovery to retransmit COMMIT decisions.
+pub const ST_2PC_COORDINATOR_LOG_ID: TableId = TableId(22);
+pub(crate) const ST_2PC_COORDINATOR_LOG_NAME: &str = "st_2pc_coordinator_log";
 pub(crate) const ST_CONNECTION_CREDENTIALS_NAME: &str = "st_connection_credentials";
 pub const ST_TABLE_NAME: &str = "st_table";
@@ -205,7 +213,7 @@ pub enum SystemTable {
     st_table_accessor,
 }
-pub fn system_tables() -> [TableSchema; 20] {
+pub fn system_tables() -> [TableSchema; 22] {
     [
         // The order should match the `id` of the system table, that start with [ST_TABLE_IDX].
st_table_schema(), @@ -228,6 +236,8 @@ pub fn system_tables() -> [TableSchema; 20] { st_table_accessor_schema(), st_index_accessor_schema(), st_column_accessor_schema(), + st_2pc_state_schema(), + st_2pc_coordinator_log_schema(), ] } @@ -276,6 +286,8 @@ pub(crate) const ST_EVENT_TABLE_IDX: usize = 16; pub(crate) const ST_TABLE_ACCESSOR_IDX: usize = 17; pub(crate) const ST_INDEX_ACCESSOR_IDX: usize = 18; pub(crate) const ST_COLUMN_ACCESSOR_IDX: usize = 19; +pub(crate) const ST_2PC_STATE_IDX: usize = 20; +pub(crate) const ST_2PC_COORDINATOR_LOG_IDX: usize = 21; macro_rules! st_fields_enum { ($(#[$attr:meta])* enum $ty_name:ident { $($name:expr, $var:ident = $discr:expr,)* }) => { @@ -450,6 +462,23 @@ st_fields_enum!(enum StColumnAccessorFields { "accessor_name", AccessorName = 2, }); +// WARNING: For a stable schema, don't change the field names and discriminants. +st_fields_enum!(enum St2pcStateFields { + "prepare_id", PrepareId = 0, + "coordinator_identity_hex",CoordinatorIdentityHex = 1, + "reducer_name", ReducerName = 2, + "args_bsatn", ArgsBsatn = 3, + "caller_identity_hex", CallerIdentityHex = 4, + "caller_connection_id_hex", CallerConnectionIdHex = 5, + "timestamp_micros", TimestampMicros = 6, +}); + +// WARNING: For a stable schema, don't change the field names and discriminants. +st_fields_enum!(enum St2pcCoordinatorLogFields { + "participant_prepare_id", ParticipantPrepareId = 0, + "participant_identity_hex", ParticipantIdentityHex = 1, +}); + /// Helper method to check that a system table has the correct fields. /// Does not check field types since those aren't included in `StFields` types. /// If anything in here is not true, the system is completely broken, so it's fine to assert. 
@@ -668,6 +697,25 @@ fn system_module_def() -> ModuleDef { .with_unique_constraint(st_column_accessor_table_alias_cols) .with_index_no_accessor_name(btree(st_column_accessor_table_alias_cols)); + let st_2pc_state_type = builder.add_type::(); + builder + .build_table(ST_2PC_STATE_NAME, *st_2pc_state_type.as_ref().expect("should be ref")) + .with_type(TableType::System) + .with_unique_constraint(St2pcStateFields::PrepareId) + .with_index_no_accessor_name(btree(St2pcStateFields::PrepareId)) + .with_access(v9::TableAccess::Private); + + let st_2pc_coordinator_log_type = builder.add_type::(); + builder + .build_table( + ST_2PC_COORDINATOR_LOG_NAME, + *st_2pc_coordinator_log_type.as_ref().expect("should be ref"), + ) + .with_type(TableType::System) + .with_unique_constraint(St2pcCoordinatorLogFields::ParticipantPrepareId) + .with_index_no_accessor_name(btree(St2pcCoordinatorLogFields::ParticipantPrepareId)) + .with_access(v9::TableAccess::Private); + let result = builder .finish() .try_into() @@ -693,6 +741,8 @@ fn system_module_def() -> ModuleDef { validate_system_table::(&result, ST_TABLE_ACCESSOR_NAME); validate_system_table::(&result, ST_INDEX_ACCESSOR_NAME); validate_system_table::(&result, ST_COLUMN_ACCESSOR_NAME); + validate_system_table::(&result, ST_2PC_STATE_NAME); + validate_system_table::(&result, ST_2PC_COORDINATOR_LOG_NAME); result } @@ -741,6 +791,8 @@ lazy_static::lazy_static! { m.insert("st_index_accessor_accessor_name_key", ConstraintId(23)); m.insert("st_column_accessor_table_name_col_name_key", ConstraintId(24)); m.insert("st_column_accessor_table_name_accessor_name_key", ConstraintId(25)); + m.insert("st_2pc_state_prepare_id_key", ConstraintId(26)); + m.insert("st_2pc_coordinator_log_participant_prepare_id_key", ConstraintId(27)); m }; } @@ -779,6 +831,8 @@ lazy_static::lazy_static! 
{ m.insert("st_index_accessor_accessor_name_idx_btree", IndexId(27)); m.insert("st_column_accessor_table_name_col_name_idx_btree", IndexId(28)); m.insert("st_column_accessor_table_name_accessor_name_idx_btree", IndexId(29)); + m.insert("st_2pc_state_prepare_id_idx_btree", IndexId(30)); + m.insert("st_2pc_coordinator_log_participant_prepare_id_idx_btree", IndexId(31)); m }; } @@ -892,6 +946,14 @@ fn st_client_schema() -> TableSchema { st_schema(ST_CLIENT_NAME, ST_CLIENT_ID) } +fn st_2pc_state_schema() -> TableSchema { + st_schema(ST_2PC_STATE_NAME, ST_2PC_STATE_ID) +} + +fn st_2pc_coordinator_log_schema() -> TableSchema { + st_schema(ST_2PC_COORDINATOR_LOG_NAME, ST_2PC_COORDINATOR_LOG_ID) +} + fn st_connection_credential_schema() -> TableSchema { st_schema(ST_CONNECTION_CREDENTIALS_NAME, ST_CONNECTION_CREDENTIALS_ID) } @@ -968,6 +1030,8 @@ pub(crate) fn system_table_schema(table_id: TableId) -> Option { ST_TABLE_ACCESSOR_ID => Some(st_table_accessor_schema()), ST_INDEX_ACCESSOR_ID => Some(st_index_accessor_schema()), ST_COLUMN_ACCESSOR_ID => Some(st_column_accessor_schema()), + ST_2PC_STATE_ID => Some(st_2pc_state_schema()), + ST_2PC_COORDINATOR_LOG_ID => Some(st_2pc_coordinator_log_schema()), _ => None, } } @@ -1859,6 +1923,77 @@ impl From for ProductValue { } } +/// System table [ST_2PC_STATE_NAME] +/// +/// Tracks in-flight 2PC participant transactions. +/// A row is inserted when B enters PREPARE state and deleted on COMMIT or ABORT. +/// On recovery, any row here indicates a pending prepared transaction that must +/// be resumed: B re-runs the reducer with the stored args, then polls the coordinator +/// for a COMMIT or ABORT decision (B never aborts on its own). +#[derive(Clone, Debug, Eq, PartialEq, SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct St2pcStateRow { + /// The unique prepare ID for this transaction, generated by this participant. + pub prepare_id: String, + /// Hex-encoded identity of the coordinator database (A). 
Used on recovery to query + /// `GET /v1/database/{coordinator}/2pc/status/{prepare_id}` for the decision. + pub coordinator_identity_hex: String, + /// Name of the reducer that was prepared. + pub reducer_name: String, + /// BSATN-encoded reducer arguments. Re-used when replaying the reducer on recovery. + pub args_bsatn: Vec, + /// Hex-encoded identity of the original caller. + pub caller_identity_hex: String, + /// Hex-encoded connection ID of the original caller ("0" if none). + pub caller_connection_id_hex: String, + /// Timestamp of the original call (microseconds since Unix epoch). + pub timestamp_micros: i64, +} + +impl TryFrom> for St2pcStateRow { + type Error = DatastoreError; + fn try_from(row: RowRef<'_>) -> Result { + read_via_bsatn(row) + } +} + +impl From for ProductValue { + fn from(x: St2pcStateRow) -> Self { + to_product_value(&x) + } +} + +/// System table [ST_2PC_COORDINATOR_LOG_NAME] +/// +/// Written atomically with the coordinator's COMMIT transaction (one row per participant). +/// Used on coordinator crash-recovery to retransmit COMMIT decisions to participants. +/// Also serves as the authoritative answer for the status endpoint: +/// - present → COMMIT (coordinator decided COMMIT for this participant prepare_id) +/// - absent → ABORT (coordinator never committed, or already cleaned up) +/// +/// A row is deleted after the participant acknowledges the COMMIT. +#[derive(Clone, Debug, Eq, PartialEq, SpacetimeType)] +#[sats(crate = spacetimedb_lib)] +pub struct St2pcCoordinatorLogRow { + /// The participant's prepare_id (B's prepare_id). + pub participant_prepare_id: String, + /// Hex-encoded identity of the participant database (B). 
+ pub participant_identity_hex: String, +} + +impl TryFrom> for St2pcCoordinatorLogRow { + type Error = DatastoreError; + fn try_from(row: RowRef<'_>) -> Result { + read_via_bsatn(row) + } +} + +impl From for ProductValue { + fn from(x: St2pcCoordinatorLogRow) -> Self { + to_product_value(&x) + } +} + thread_local! { static READ_BUF: RefCell> = const { RefCell::new(Vec::new()) }; } diff --git a/crates/smoketests/tests/smoketests/cross_db_2pc.rs b/crates/smoketests/tests/smoketests/cross_db_2pc.rs new file mode 100644 index 00000000000..3359fb115bd --- /dev/null +++ b/crates/smoketests/tests/smoketests/cross_db_2pc.rs @@ -0,0 +1,193 @@ +use spacetimedb_smoketests::Smoketest; + +/// Module code for the 2PC test. +/// +/// All three databases (A = coordinator, B and C = participants) use the same module. +/// +/// Tables: +/// - `Ledger(account: String PK, balance: i64)` -- stores account balances. +/// +/// Reducers: +/// - `init`: seeds "alice" with balance 100. +/// - `balance(account) -> i64`: returns the current balance for an account. +/// - `debit(account, amount)`: decrements balance, panics if insufficient funds. +/// - `credit(account, amount)`: increments balance (or inserts if absent). +/// - `transfer_funds(b_hex, c_hex, from_account, to_account, amount) -> TransferResult`: +/// Credits `amount * 2` to `to_account` locally (collecting `amount` from each of B and C), +/// then calls `debit(from_account, amount)` on both B and C via `call_reducer_on_db_2pc`. +/// If either remote debit fails, all three databases are rolled back atomically. +/// On success, returns the new local balance so the caller can verify without a second query. 
+const MODULE_CODE: &str = r#"
+use spacetimedb::{log, ReducerContext, Table, Identity};
+
+#[spacetimedb::table(accessor = ledger, public)]
+pub struct Ledger {
+    #[primary_key]
+    account: String,
+    balance: i64,
+}
+
+#[spacetimedb::reducer(init)]
+pub fn init(ctx: &ReducerContext) {
+    ctx.db.ledger().insert(Ledger { account: "alice".to_string(), balance: 100 });
+}
+
+/// Returns the current balance for `account`.
+#[spacetimedb::reducer]
+pub fn balance(ctx: &ReducerContext, account: String) -> Result<i64, String> {
+    ctx.db.ledger().account().find(&account)
+        .map(|r| r.balance)
+        .ok_or_else(|| format!("account '{}' not found", account))
+}
+
+/// Decrements `account` by `amount`; panics if the account is missing or would go negative.
+#[spacetimedb::reducer]
+pub fn debit(ctx: &ReducerContext, account: String, amount: i64) {
+    let row = ctx.db.ledger().account().find(&account)
+        .unwrap_or_else(|| panic!("account '{}' not found", account));
+    let new_balance = row.balance - amount;
+    if new_balance < 0 {
+        panic!("insufficient funds: account '{}' has {} but tried to debit {}", account, row.balance, amount);
+    }
+    ctx.db.ledger().account().update(Ledger { account, balance: new_balance });
+}
+
+/// Increments `account` by `amount`, inserting the row if it does not exist.
+#[spacetimedb::reducer]
+pub fn credit(ctx: &ReducerContext, account: String, amount: i64) {
+    match ctx.db.ledger().account().find(&account) {
+        Some(row) => {
+            ctx.db.ledger().account().update(Ledger { account, balance: row.balance + amount });
+        }
+        None => {
+            ctx.db.ledger().insert(Ledger { account, balance: amount });
+        }
+    }
+}
+
+/// Transfer `amount` from `from_account` on both B and C to `to_account` on A (locally).
+///
+/// Returns the new local balance of `to_account` so the caller can verify correctness
+/// without issuing a separate query.
+///
+/// If either remote debit fails (insufficient funds), returns Err and the 2PC protocol
+/// rolls back all three databases atomically.
+#[spacetimedb::reducer]
+pub fn transfer_funds(ctx: &ReducerContext, b_hex: String, c_hex: String, from_account: String, to_account: String, amount: i64) -> Result<i64, String> {
+    credit(ctx, to_account.clone(), amount * 2);
+
+    let b = Identity::from_hex(&b_hex).map_err(|e| format!("invalid B identity: {e}"))?;
+    let args_b = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account.clone(), amount)).map_err(|e| format!("failed to encode args: {e}"))?;
+    spacetimedb::remote_reducer::call_reducer_on_db_2pc(b, "debit", &args_b)
+        .map_err(|e| format!("debit on B failed: {e}"))?;
+    log::info!("transfer_funds: debit on B succeeded");
+
+    let c = Identity::from_hex(&c_hex).map_err(|e| format!("invalid C identity: {e}"))?;
+    let args_c = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account, amount)).map_err(|e| format!("failed to encode args: {e}"))?;
+    spacetimedb::remote_reducer::call_reducer_on_db_2pc(c, "debit", &args_c)
+        .map_err(|e| format!("debit on C failed: {e}"))?;
+    log::info!("transfer_funds: debit on C succeeded");
+
+    // Return new local balance so the caller can assert correctness immediately.
+    ctx.db.ledger().account().find(&to_account)
+        .map(|r| r.balance)
+        .ok_or_else(|| format!("account '{}' not found after credit", to_account))
+}
+"#;
+
+/// Call `balance(account)` on `db_identity` via the HTTP API and return the i64 result.
+fn call_balance(test: &Smoketest, db_identity: &str, account: &str) -> i64 {
+    let resp = test
+        .api_call_json(
+            "POST",
+            &format!("/v1/database/{db_identity}/call/balance"),
+            &format!("[\"{account}\"]"),
+        )
+        .unwrap_or_else(|e| panic!("balance call failed for {db_identity}: {e}"));
+    assert!(resp.is_success(), "balance reducer returned {}", resp.status_code);
+    resp.json()
+        .unwrap_or_else(|e| panic!("failed to parse balance JSON: {e}"))
+        .as_i64()
+        .unwrap_or_else(|| panic!("balance JSON was not an integer"))
+}
+
+/// Happy path: transfer 30 from both B's alice and C's alice to A's alice.
+/// +/// The coordinator reducer returns the new local balance (160), which is used directly +/// to assert A's result. B and C balances are verified via `balance` reducer calls. +/// +/// Expected: A=160, B=70, C=70. +#[test] +fn test_cross_db_2pc_happy_path() { + let pid = std::process::id(); + let db_a_name = format!("2pc-bank-a-{pid}"); + let db_b_name = format!("2pc-bank-b-{pid}"); + let db_c_name = format!("2pc-bank-c-{pid}"); + + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + // Publish participants first, then coordinator. + test.publish_module_named(&db_b_name, false).expect("failed to publish bank B"); + let db_b_identity = test.database_identity.clone().expect("bank B identity not set"); + + test.publish_module_named(&db_c_name, false).expect("failed to publish bank C"); + let db_c_identity = test.database_identity.clone().expect("bank C identity not set"); + + test.publish_module_named(&db_a_name, false).expect("failed to publish bank A"); + let db_a_identity = test.database_identity.clone().expect("bank A identity not set"); + + // Call transfer_funds; the return value is A's new alice balance. + let resp = test + .api_call_json( + "POST", + &format!("/v1/database/{db_a_identity}/call/transfer_funds"), + &format!("[\"{db_b_identity}\", \"{db_c_identity}\", \"alice\", \"alice\", 30]"), + ) + .expect("transfer_funds call failed"); + assert!(resp.is_success(), "transfer_funds failed: {}", resp.status_code); + let new_a_balance = resp.json().expect("invalid JSON").as_i64().expect("not i64"); + assert_eq!(new_a_balance, 160, "transfer_funds return value: expected A alice=160"); + + // Verify B and C via balance reducer. + assert_eq!(call_balance(&test, &db_b_identity, "alice"), 70, "B alice should be 70"); + assert_eq!(call_balance(&test, &db_c_identity, "alice"), 70, "C alice should be 70"); +} + +/// Abort path: try to transfer 110 from B and C, but both only have 100. 
+/// +/// B's debit fails (insufficient funds), so the coordinator reducer panics and the +/// 2PC protocol rolls back all three databases. We verify via `balance` reducer calls +/// that every account is still at 100. +/// +/// Expected: A=100, B=100, C=100. +#[test] +fn test_cross_db_2pc_abort_insufficient_funds() { + let pid = std::process::id(); + let db_a_name = format!("2pc-abort-a-{pid}"); + let db_b_name = format!("2pc-abort-b-{pid}"); + let db_c_name = format!("2pc-abort-c-{pid}"); + + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + test.publish_module_named(&db_b_name, false).expect("failed to publish bank B"); + let db_b_identity = test.database_identity.clone().expect("bank B identity not set"); + + test.publish_module_named(&db_c_name, false).expect("failed to publish bank C"); + let db_c_identity = test.database_identity.clone().expect("bank C identity not set"); + + test.publish_module_named(&db_a_name, false).expect("failed to publish bank A"); + let db_a_identity = test.database_identity.clone().expect("bank A identity not set"); + + // Transfer 110 from each — both only have 100, so B's debit panics → 2PC aborts all. + let resp = test + .api_call_json( + "POST", + &format!("/v1/database/{db_a_identity}/call/transfer_funds"), + &format!("[\"{db_b_identity}\", \"{db_c_identity}\", \"alice\", \"alice\", 110]"), + ) + .expect("api_call failed"); + assert!(!resp.is_success(), "Expected transfer_funds to fail due to insufficient funds"); + + // All three accounts must still be at 100. 
+ assert_eq!(call_balance(&test, &db_a_identity, "alice"), 100, "A alice should still be 100"); + assert_eq!(call_balance(&test, &db_b_identity, "alice"), 100, "B alice should still be 100"); + assert_eq!(call_balance(&test, &db_c_identity, "alice"), 100, "C alice should still be 100"); +} diff --git a/crates/smoketests/tests/smoketests/cross_db_2pc_recovery.rs b/crates/smoketests/tests/smoketests/cross_db_2pc_recovery.rs new file mode 100644 index 00000000000..de1c06f79c1 --- /dev/null +++ b/crates/smoketests/tests/smoketests/cross_db_2pc_recovery.rs @@ -0,0 +1,401 @@ +use spacetimedb_guard::ensure_binaries_built; +use spacetimedb_smoketests::{require_local_server, Smoketest}; +use std::time::Duration; + +/// Module code used for all recovery tests. +/// +/// All three databases (A = coordinator, B and C = participants) use the same module. +/// +/// `transfer_funds_slow` calls `debit_slow` on B and regular `debit` on C, creating +/// a reliable ~2-3s window while B's slow reducer is executing — useful for crash tests. +const MODULE_CODE: &str = r#" +use spacetimedb::{log, ReducerContext, Table, Identity}; + +#[spacetimedb::table(accessor = ledger, public)] +pub struct Ledger { + #[primary_key] + account: String, + balance: i64, +} + +#[spacetimedb::reducer(init)] +pub fn init(ctx: &ReducerContext) { + ctx.db.ledger().insert(Ledger { account: "alice".to_string(), balance: 100 }); +} + +/// Returns the current balance for `account`. 
+#[spacetimedb::reducer]
+pub fn balance(ctx: &ReducerContext, account: String) -> Result<i64, String> {
+    ctx.db.ledger().account().find(&account)
+        .map(|r| r.balance)
+        .ok_or_else(|| format!("account '{}' not found", account))
+}
+
+/// Decrements `account` by `amount`; panics if the account is missing or would go negative.
+#[spacetimedb::reducer]
+pub fn debit(ctx: &ReducerContext, account: String, amount: i64) {
+    let row = ctx.db.ledger().account().find(&account)
+        .unwrap_or_else(|| panic!("account '{}' not found", account));
+    let new_balance = row.balance - amount;
+    if new_balance < 0 {
+        panic!("insufficient funds: account '{}' has {} but tried to debit {}", account, row.balance, amount);
+    }
+    ctx.db.ledger().account().update(Ledger { account, balance: new_balance });
+}
+
+/// Same as `debit` but wastes ~2-3 seconds of CPU first.
+/// This creates a reliable timing window for crash recovery tests:
+/// the server can be killed while this reducer is executing or just after.
+#[spacetimedb::reducer]
+pub fn debit_slow(ctx: &ReducerContext, account: String, amount: i64) {
+    // Busy-wait loop. ~100M multiply-add iterations ≈ 2-3s in WASM.
+    // Seeded from the timestamp and checked afterwards so the optimizer
+    // cannot eliminate the loop.
+    let mut x: u64 = ctx.timestamp.to_micros_since_unix_epoch() as u64;
+    for i in 0u64..100_000_000 {
+        x = x.wrapping_mul(6364136223846793005u64).wrapping_add(i | 1);
+    }
+    if x == 0 { panic!("impossible: loop result was zero"); }
+    debit(ctx, account, amount);
+}
+
+/// Increments `account` by `amount`, inserting the row if it does not exist.
+#[spacetimedb::reducer]
+pub fn credit(ctx: &ReducerContext, account: String, amount: i64) {
+    match ctx.db.ledger().account().find(&account) {
+        Some(row) => {
+            ctx.db.ledger().account().update(Ledger { account, balance: row.balance + amount });
+        }
+        None => {
+            ctx.db.ledger().insert(Ledger { account, balance: amount });
+        }
+    }
+}
+
+/// Transfer `amount` from `from_account` on both B and C to `to_account` on A.
+/// A credits `amount * 2` locally, then calls `debit(from_account, amount)` on each
+/// of B and C via 2PC. If either fails, all three roll back atomically.
+#[spacetimedb::reducer] +pub fn transfer_funds(ctx: &ReducerContext, b_hex: String, c_hex: String, from_account: String, to_account: String, amount: i64) { + credit(ctx, to_account.clone(), amount * 2); + + let b = Identity::from_hex(&b_hex).expect("invalid B identity"); + let args_b = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account.clone(), amount)).expect("failed to encode args"); + match spacetimedb::remote_reducer::call_reducer_on_db_2pc(b, "debit", &args_b) { + Ok(()) => log::info!("transfer_funds: debit on B succeeded"), + Err(e) => panic!("debit on B failed: {e}"), + } + + let c = Identity::from_hex(&c_hex).expect("invalid C identity"); + let args_c = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account, amount)).expect("failed to encode args"); + match spacetimedb::remote_reducer::call_reducer_on_db_2pc(c, "debit", &args_c) { + Ok(()) => log::info!("transfer_funds: debit on C succeeded"), + Err(e) => panic!("debit on C failed: {e}"), + } +} + +/// Same as `transfer_funds` but calls `debit_slow` on B and regular `debit` on C. +/// The slow call on B creates a ~2-3s window for crash recovery tests. 
+#[spacetimedb::reducer] +pub fn transfer_funds_slow(ctx: &ReducerContext, b_hex: String, c_hex: String, from_account: String, to_account: String, amount: i64) { + credit(ctx, to_account.clone(), amount * 2); + + let b = Identity::from_hex(&b_hex).expect("invalid B identity"); + let args_b = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account.clone(), amount)).expect("failed to encode args"); + match spacetimedb::remote_reducer::call_reducer_on_db_2pc(b, "debit_slow", &args_b) { + Ok(()) => log::info!("transfer_funds_slow: debit_slow on B succeeded"), + Err(e) => panic!("debit_slow on B failed: {e}"), + } + + let c = Identity::from_hex(&c_hex).expect("invalid C identity"); + let args_c = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account, amount)).expect("failed to encode args"); + match spacetimedb::remote_reducer::call_reducer_on_db_2pc(c, "debit", &args_c) { + Ok(()) => log::info!("transfer_funds_slow: debit on C succeeded"), + Err(e) => panic!("debit on C failed: {e}"), + } +} +"#; + +/// Spawn a background thread that fires `transfer_funds_slow` and ignores the result. +/// +/// Used to start a long-running 2PC in the background so the main thread can crash +/// the server mid-flight. The call is expected to fail with a connection error when +/// the server is restarted. +fn spawn_transfer_funds_slow( + server_url: String, + config_path: std::path::PathBuf, + db_a_identity: String, + db_b_identity: String, + db_c_identity: String, + amount: i64, +) -> std::thread::JoinHandle<()> { + std::thread::spawn(move || { + let cli = ensure_binaries_built(); + let _ = std::process::Command::new(&cli) + .arg("--config-path") + .arg(&config_path) + .args([ + "call", + "--server", + &server_url, + "--", + &db_a_identity, + "transfer_funds_slow", + &db_b_identity, + &db_c_identity, + "alice", + "alice", + &amount.to_string(), + ]) + .output(); + }) +} + +/// Call the `balance(account)` reducer on `db_identity` and return the i64 result. 
+fn alice_balance(test: &Smoketest, db_identity: &str) -> i64 { + let resp = test + .api_call_json( + "POST", + &format!("/v1/database/{db_identity}/call/balance"), + "[\"alice\"]", + ) + .unwrap_or_else(|e| panic!("balance call failed for {db_identity}: {e}")); + assert!(resp.is_success(), "balance reducer returned {}", resp.status_code); + resp.json() + .unwrap_or_else(|e| panic!("failed to parse balance JSON: {e}")) + .as_i64() + .unwrap_or_else(|| panic!("balance JSON was not an integer")) +} + +/// Set up three databases (A = coordinator, B and C = participants) on the same server. +/// Returns `(db_a_identity, db_b_identity, db_c_identity)`. `test.database_identity` points to A. +fn setup_three_banks(test: &mut Smoketest, pid: u32, suffix: &str) -> (String, String, String) { + let db_b_name = format!("2pc-rec-b-{pid}-{suffix}"); + let db_c_name = format!("2pc-rec-c-{pid}-{suffix}"); + let db_a_name = format!("2pc-rec-a-{pid}-{suffix}"); + + test.publish_module_named(&db_b_name, false) + .expect("failed to publish bank B"); + let db_b_identity = test.database_identity.clone().expect("bank B identity"); + + test.publish_module_named(&db_c_name, false) + .expect("failed to publish bank C"); + let db_c_identity = test.database_identity.clone().expect("bank C identity"); + + test.publish_module_named(&db_a_name, false) + .expect("failed to publish bank A"); + let db_a_identity = test.database_identity.clone().expect("bank A identity"); + + (db_a_identity, db_b_identity, db_c_identity) +} + +// ───────────────────────────────────────────────────────────────────────────── +// Test 1: committed data survives a full server restart. 
+// ───────────────────────────────────────────────────────────────────────────── +#[test] +fn test_2pc_committed_data_survives_restart() { + require_local_server!(); + let pid = std::process::id(); + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + let (db_a_identity, db_b_identity, db_c_identity) = setup_three_banks(&mut test, pid, "dur"); + + // Successful 2PC: transfer 30 from both B and C to A. + test.call("transfer_funds", &[&db_b_identity, &db_c_identity, "alice", "alice", "30"]) + .expect("transfer_funds failed"); + + assert_eq!(alice_balance(&test, &db_a_identity), 160, "A should have 160 before restart"); + assert_eq!(alice_balance(&test, &db_b_identity), 70, "B should have 70 before restart"); + assert_eq!(alice_balance(&test, &db_c_identity), 70, "C should have 70 before restart"); + + test.restart_server(); + + assert_eq!(alice_balance(&test, &db_a_identity), 160, "A's committed data should survive restart"); + assert_eq!(alice_balance(&test, &db_b_identity), 70, "B's committed data should survive restart"); + assert_eq!(alice_balance(&test, &db_c_identity), 70, "C's committed data should survive restart"); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Test 2: aborted 2PC rollback also survives a restart. +// ───────────────────────────────────────────────────────────────────────────── +#[test] +fn test_2pc_aborted_state_survives_restart() { + require_local_server!(); + let pid = std::process::id(); + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + let (db_a_identity, db_b_identity, db_c_identity) = setup_three_banks(&mut test, pid, "abort-dur"); + + // Transfer 110 from each — both only have 100, so B's debit panics → abort. 
+ let _ = test.call("transfer_funds", &[&db_b_identity, &db_c_identity, "alice", "alice", "110"]); + + assert_eq!(alice_balance(&test, &db_a_identity), 100, "A should still be 100 after abort"); + assert_eq!(alice_balance(&test, &db_b_identity), 100, "B should still be 100 after abort"); + assert_eq!(alice_balance(&test, &db_c_identity), 100, "C should still be 100 after abort"); + + test.restart_server(); + + assert_eq!(alice_balance(&test, &db_a_identity), 100, "A's aborted rollback should survive restart"); + assert_eq!(alice_balance(&test, &db_b_identity), 100, "B's aborted rollback should survive restart"); + assert_eq!(alice_balance(&test, &db_c_identity), 100, "C's aborted rollback should survive restart"); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Test 3: status endpoint returns "abort" for an unknown prepare_id. +// ───────────────────────────────────────────────────────────────────────────── +#[test] +fn test_2pc_status_endpoint_unknown_returns_abort() { + let pid = std::process::id(); + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + let (db_a_identity, _db_b_identity, _db_c_identity) = setup_three_banks(&mut test, pid, "status"); + + let resp = test + .api_call( + "GET", + &format!("/v1/database/{db_a_identity}/2pc/status/nonexistent-prepare-id"), + ) + .expect("api_call failed"); + + assert_eq!(resp.status_code, 200, "status endpoint should return 200"); + let body_text = resp.text().expect("response body is not UTF-8"); + assert_eq!( + body_text.trim(), + "abort", + "unknown prepare_id should return 'abort', got: {:?}", + body_text + ); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Test 4: 2PC atomicity is maintained when the server crashes mid-flight. +// +// `transfer_funds_slow` calls `debit_slow` on B (~2-3s) then `debit` on C. +// We crash after 1s (B is definitely mid-execution). 
After restart, all three
// databases must agree: either all committed (A=160, B=70, C=70) or all rolled
// back (A=100, B=100, C=100).
// ─────────────────────────────────────────────────────────────────────────────
#[test]
fn test_2pc_atomicity_under_crash() {
    require_local_server!();
    let pid = std::process::id();
    let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();

    let (db_a_identity, db_b_identity, db_c_identity) = setup_three_banks(&mut test, pid, "crash");

    // Kick the slow transfer off in the background; B's slow debit keeps the
    // 2PC in flight long enough for us to kill the server mid-transaction.
    let _call_thread = spawn_transfer_funds_slow(
        test.server_url.clone(),
        test.config_path.clone(),
        db_a_identity.clone(),
        db_b_identity.clone(),
        db_c_identity.clone(),
        30,
    );

    std::thread::sleep(Duration::from_millis(1000));
    test.restart_server();

    // Give recovery time to settle before inspecting balances.
    std::thread::sleep(Duration::from_secs(5));

    let bal_a = alice_balance(&test, &db_a_identity);
    let bal_b = alice_balance(&test, &db_b_identity);
    let bal_c = alice_balance(&test, &db_c_identity);

    // The only acceptable outcomes are "everything committed" or "everything
    // rolled back"; anything else is a torn cross-database transaction.
    let all_committed = (bal_a, bal_b, bal_c) == (160, 70, 70);
    let all_rolled_back = (bal_a, bal_b, bal_c) == (100, 100, 100);
    assert!(
        all_committed || all_rolled_back,
        "2PC atomicity violated after crash: A={bal_a}, B={bal_b}, C={bal_c}. \
        Expected either (160, 70, 70) or (100, 100, 100)."
    );
}

// ─────────────────────────────────────────────────────────────────────────────
// Test 5: coordinator recovery — A crashes after writing its coordinator log,
// before B and C commit.
//
// `transfer_funds_slow` calls `debit_slow` on B (~2-3s) then `debit` on C.
// We poll until A=160 (A committed, coordinator log written for both B and C),
// then crash. At this point B is still inside `debit_slow` awaiting COMMIT.
// Recovery must bring all three to the committed state: A=160, B=70, C=70.
// ─────────────────────────────────────────────────────────────────────────────
#[test]
fn test_2pc_coordinator_recovery() {
    require_local_server!();
    let pid = std::process::id();
    let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build();

    let (db_a_identity, db_b_identity, db_c_identity) = setup_three_banks(&mut test, pid, "coord-rec");

    let _call_thread = spawn_transfer_funds_slow(
        test.server_url.clone(),
        test.config_path.clone(),
        db_a_identity.clone(),
        db_b_identity.clone(),
        db_c_identity.clone(),
        30,
    );

    // Poll until A shows the committed balance (alice_a=160): at that point both
    // B and C have sent PREPARED and A's coordinator log entries for both are on
    // disk.
    let give_up_at = std::time::Instant::now() + Duration::from_secs(30);
    loop {
        std::thread::sleep(Duration::from_millis(100));
        if alice_balance(&test, &db_a_identity) == 160 {
            break;
        }
        if std::time::Instant::now() > give_up_at {
            panic!("timed out waiting for A to commit");
        }
    }

    // Crash here: A holds coordinator log entries for both B and C, while B is
    // still parked in decision_rx waiting for COMMIT.
    test.restart_server();

    // Let recovery run to completion before checking balances.
    std::thread::sleep(Duration::from_secs(5));

    assert_eq!(alice_balance(&test, &db_a_identity), 160, "A should remain committed");
    assert_eq!(alice_balance(&test, &db_b_identity), 70, "B should have committed via coordinator recovery");
    assert_eq!(alice_balance(&test, &db_c_identity), 70, "C should have committed via coordinator recovery");
}

// ─────────────────────────────────────────────────────────────────────────────
// Test 6: participant recovery — crash before A commits.
//
// We crash early (~500ms into the slow debit on B). A has not yet received
// PREPARED from B, so A has no coordinator log. After restart, B (and possibly
// C) recover by polling A's status endpoint, which returns "abort". All three
// databases must end up consistent.
+// ───────────────────────────────────────────────────────────────────────────── +#[test] +fn test_2pc_participant_recovery_polls_and_aborts() { + require_local_server!(); + let pid = std::process::id(); + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + let (db_a_identity, db_b_identity, db_c_identity) = setup_three_banks(&mut test, pid, "part-rec"); + + let _call_thread = spawn_transfer_funds_slow( + test.server_url.clone(), + test.config_path.clone(), + db_a_identity.clone(), + db_b_identity.clone(), + db_c_identity.clone(), + 30, + ); + + // Crash early: B's slow reducer is mid-execution, A has no coordinator log yet. + std::thread::sleep(Duration::from_millis(500)); + test.restart_server(); + + // Allow participant recovery to settle (polls status every 5s). + std::thread::sleep(Duration::from_secs(15)); + + let bal_a = alice_balance(&test, &db_a_identity); + let bal_b = alice_balance(&test, &db_b_identity); + let bal_c = alice_balance(&test, &db_c_identity); + + let both_committed = bal_a == 160 && bal_b == 70 && bal_c == 70; + let both_rolled_back = bal_a == 100 && bal_b == 100 && bal_c == 100; + assert!( + both_committed || both_rolled_back, + "Inconsistent state after participant recovery: A={bal_a}, B={bal_b}, C={bal_c}" + ); +} diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index 18ad7b51199..f6acc606dd2 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -9,6 +9,8 @@ mod client_connection_errors; mod confirmed_reads; mod connect_disconnect_from_cli; mod create_project; +mod cross_db_2pc; +mod cross_db_2pc_recovery; mod cross_db_reducer; mod csharp_module; mod default_module_clippy; diff --git a/tools/tpcc-runner/src/loader.rs b/tools/tpcc-runner/src/loader.rs index 14b15134056..559aa819d79 100644 --- a/tools/tpcc-runner/src/loader.rs +++ b/tools/tpcc-runner/src/loader.rs @@ -112,7 +112,7 @@ fn 
run_one_database(config: &LoadConfig, database_number: u32, topology: &Databa }); log::info!("tpcc load for database {database_identity} finished"); - Ok(()) + Ok(()) }) }