diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 8854ae393b6..927c444a38d 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -896,6 +896,23 @@ pub mod raw { args_len: u32, out: *mut BytesSource, ) -> u16; + + /// 2PC variant of `call_reducer_on_db`. + /// + /// Calls the target database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`. + /// On success, the runtime stores the `prepare_id` internally. + /// After the coordinator's reducer commits, all participants are committed. + /// If the coordinator's reducer fails, all participants are aborted. + /// + /// Returns and errors are identical to `call_reducer_on_db`. + pub fn call_reducer_on_db_2pc( + identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity + reducer_ptr: *const u8, + reducer_len: u32, + args_ptr: *const u8, + args_len: u32, + out: *mut BytesSource, + ) -> u16; } /// What strategy does the database index use? @@ -1510,6 +1527,37 @@ pub fn call_reducer_on_db( } } +/// 2PC variant of [`call_reducer_on_db`]. +/// +/// Calls `/prepare/{reducer}` on the target database. On success, the runtime +/// stores the prepare_id internally. After the coordinator's reducer commits, +/// all participants are committed. On failure, all participants are aborted. +/// +/// Returns and errors are identical to [`call_reducer_on_db`]. +#[inline] +pub fn call_reducer_on_db_2pc( + identity: [u8; 32], + reducer_name: &str, + args: &[u8], +) -> Result<(u16, raw::BytesSource), raw::BytesSource> { + let mut out = raw::BytesSource::INVALID; + let status = unsafe { + raw::call_reducer_on_db_2pc( + identity.as_ptr(), + reducer_name.as_ptr(), + reducer_name.len() as u32, + args.as_ptr(), + args.len() as u32, + &mut out, + ) + }; + if status == Errno::HTTP_ERROR.code() { + Err(out) + } else { + Ok((status, out)) + } +} + /// Finds the JWT payload associated with `connection_id`. /// If nothing is found for the connection, this returns None. 
/// If a payload is found, this will return a valid [`raw::BytesSource`]. diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs index bded8bc5ae7..8676a81203b 100644 --- a/crates/bindings/src/remote_reducer.rs +++ b/crates/bindings/src/remote_reducer.rs @@ -84,3 +84,43 @@ pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args: } } } + +/// Call a reducer on a remote database using the 2PC prepare protocol. +/// +/// This is the 2PC variant of [`call_reducer_on_db`]. It calls the target database's +/// `/prepare/{reducer}` endpoint. On success, the runtime stores the prepare_id internally. +/// After the coordinator's reducer commits, all participants are committed automatically. +/// If the coordinator's reducer fails (panics or returns Err), all participants are aborted. +/// +/// Returns and errors are identical to [`call_reducer_on_db`]. +pub fn call_reducer_on_db_2pc( + database_identity: Identity, + reducer_name: &str, + args: &[u8], +) -> Result<(), RemoteCallError> { + let identity_bytes = database_identity.to_byte_array(); + match spacetimedb_bindings_sys::call_reducer_on_db_2pc(identity_bytes, reducer_name, args) { + Ok((status, body_source)) => { + if status < 300 { + return Ok(()); + } + let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID { + String::new() + } else { + let mut buf = IterBuf::take(); + read_bytes_source_into(body_source, &mut buf); + String::from_utf8_lossy(&buf).into_owned() + }; + if status == 404 { + Err(RemoteCallError::NotFound(msg)) + } else { + Err(RemoteCallError::Failed(msg)) + } + } + Err(err_source) => { + use crate::rt::read_bytes_source_as; + let msg = read_bytes_source_as::(err_source); + Err(RemoteCallError::Unreachable(msg)) + } + } +} diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 29a49fe2c5c..7c82f5918de 100644 --- a/crates/client-api/src/routes/database.rs +++ 
b/crates/client-api/src/routes/database.rs @@ -241,6 +241,115 @@ fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res } } +/// 2PC prepare endpoint: execute a reducer and return a prepare_id. +/// +/// `POST /v1/database/:name_or_identity/prepare/:reducer` +/// +/// On success, the response includes: +/// - `X-Prepare-Id` header with the prepare_id +/// - Body contains the reducer return value (if any) +pub async fn prepare( + State(worker_ctx): State, + Extension(auth): Extension, + Path(CallParams { + name_or_identity, + reducer, + }): Path, + TypedHeader(content_type): TypedHeader, + body: Bytes, +) -> axum::response::Result { + let args = parse_call_args(content_type, body)?; + let caller_identity = auth.claims.identity; + + let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + let connection_id = generate_random_connection_id(); + + module + .call_identity_connected(auth.into(), connection_id) + .await + .map_err(client_connected_error_to_response)?; + + let result = module + .prepare_reducer(caller_identity, Some(connection_id), &reducer, args) + .await; + + module + .call_identity_disconnected(caller_identity, connection_id) + .await + .map_err(client_disconnected_error_to_response)?; + + match result { + Ok((prepare_id, rcr, return_value)) => { + let (status, body) = + reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value)?; + let mut response = ( + status, + TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)), + TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)), + body, + ) + .into_response(); + if !prepare_id.is_empty() { + response.headers_mut().insert( + "X-Prepare-Id", + http::HeaderValue::from_str(&prepare_id).unwrap(), + ); + } + Ok(response) + } + Err(e) => Err(map_reducer_error(e, &reducer).into()), + } +} + +#[derive(Deserialize)] +pub struct TwoPcParams { + name_or_identity: NameOrIdentity, + prepare_id: 
String, +} + +/// 2PC commit endpoint: finalize a prepared transaction. +/// +/// `POST /v1/database/:name_or_identity/2pc/commit/:prepare_id` +pub async fn commit_2pc( + State(worker_ctx): State, + Extension(_auth): Extension, + Path(TwoPcParams { + name_or_identity, + prepare_id, + }): Path, +) -> axum::response::Result { + let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + module.commit_prepared(&prepare_id).map_err(|e| { + log::error!("2PC commit failed: {e}"); + (StatusCode::NOT_FOUND, e).into_response() + })?; + + Ok(StatusCode::OK) +} + +/// 2PC abort endpoint: abort a prepared transaction. +/// +/// `POST /v1/database/:name_or_identity/2pc/abort/:prepare_id` +pub async fn abort_2pc( + State(worker_ctx): State, + Extension(_auth): Extension, + Path(TwoPcParams { + name_or_identity, + prepare_id, + }): Path, +) -> axum::response::Result { + let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?; + + module.abort_prepared(&prepare_id).map_err(|e| { + log::error!("2PC abort failed: {e}"); + (StatusCode::NOT_FOUND, e).into_response() + })?; + + Ok(StatusCode::OK) +} + fn reducer_outcome_response( module: &ModuleHost, owner_identity: &Identity, @@ -1247,6 +1356,12 @@ pub struct DatabaseRoutes { pub db_reset: MethodRouter, /// GET: /database/: name_or_identity/unstable/timestamp pub timestamp_get: MethodRouter, + /// POST: /database/:name_or_identity/prepare/:reducer + pub prepare_post: MethodRouter, + /// POST: /database/:name_or_identity/2pc/commit/:prepare_id + pub commit_2pc_post: MethodRouter, + /// POST: /database/:name_or_identity/2pc/abort/:prepare_id + pub abort_2pc_post: MethodRouter, } impl Default for DatabaseRoutes @@ -1272,6 +1387,9 @@ where pre_publish: post(pre_publish::), db_reset: put(reset::), timestamp_get: get(get_timestamp::), + prepare_post: post(prepare::), + commit_2pc_post: post(commit_2pc::), + abort_2pc_post: post(abort_2pc::), } } } @@ -1296,7 +1414,10 @@ 
where .route("/sql", self.sql_post) .route("/unstable/timestamp", self.timestamp_get) .route("/pre_publish", self.pre_publish) - .route("/reset", self.db_reset); + .route("/reset", self.db_reset) + .route("/prepare/:reducer", self.prepare_post) + .route("/2pc/commit/:prepare_id", self.commit_2pc_post) + .route("/2pc/abort/:prepare_id", self.abort_2pc_post); axum::Router::new() .route("/", self.root_post) diff --git a/crates/core/2PC-IMPLEMENTATION-PLAN.md b/crates/core/2PC-IMPLEMENTATION-PLAN.md new file mode 100644 index 00000000000..55d2ddbaa61 --- /dev/null +++ b/crates/core/2PC-IMPLEMENTATION-PLAN.md @@ -0,0 +1,182 @@ +# 2PC Implementation Plan (Pipelined) + +## Context + +The TPC-C benchmark on branch `origin/phoebe/tpcc/reducer-return-value` (public submodule) uses non-atomic HTTP calls for cross-database operations. We need 2PC so distributed transactions either commit on both databases or neither. Pipelined 2PC is chosen because it avoids blocking on persistence during lock-holding, and the codebase already separates in-memory commit from durability. + +## Protocol + +### Participant happy path: + +1. Receive CALL from coordinator (reducer name + args) +2. Execute reducer (write lock held) +3. Return result to coordinator (write lock still held, transaction still open) +4. Possibly receive more CALLs from coordinator (same transaction, same write lock) +5. Receive END_CALLS from coordinator ("no more reducer calls in this transaction") +6. Commit in-memory (release write lock) +7. Send PREPARE to durability worker +8. **Barrier up** -- no more durability requests go through +9. In background: wait for PREPARE to be durable +10. Once durable: send PREPARED to coordinator +11. Wait for COMMIT or ABORT from coordinator +12. Receive COMMIT +13. Send COMMIT to durability worker +14. **Barrier down** -- flush buffered requests + +### Coordinator happy path: + +1. 
Execute reducer, calling participant reducers along the way (participants hold write locks, return results, but don't commit) +2. Reducer succeeds +3. Send END_CALLS to all participants (they can now commit in-memory) +4. Commit coordinator in-memory (release write lock) +5. Send PREPARE to durability worker +6. **Barrier up** -- no more durability requests go through +7. Wait for coordinator's own PREPARE to be durable +8. Wait for all participants to report PREPARED +9. Send COMMIT to all participants +10. Send COMMIT to durability worker +11. **Barrier down** -- flush buffered requests + +## Key correctness properties + +- **Serializable isolation**: Participant holds write lock from CALL through END_CALLS. Multiple CALLs from the same coordinator transaction execute within the same MutTxId on the participant. The second call sees the first call's writes. +- **Persistence barrier**: After PREPARE is sent to durability (step 7/8 on participant, step 5/6 on coordinator), no speculative transactions can reach the durability worker until COMMIT or ABORT. Anything sent to the durability worker can eventually become persistent, so the barrier is required. +- **Two responses from participant**: The immediate result (step 3) and the later PREPARED notification (step 10). The coordinator collects both: results during reducer execution, PREPARED notifications before deciding COMMIT. +- **Pipelining benefit**: Locks are held only during reducer execution (steps 1-6), not during persistence (steps 7-14). The persistence and 2PC handshake happen after locks are released on both sides. + +## Holding MutTxId: reuse existing blocking pattern + +`MutTxId` is `!Send` (holds `SharedWriteGuard`). The participant must hold it across multiple CALL requests from the coordinator for serializable isolation. 
+ +The codebase already has a blocking pattern: on the coordinator side, `call_reducer_on_db` uses `std::thread::scope` + `Handle::block_on` to block the WASM thread while making an async HTTP call. The same pattern works for the participant: instead of returning from the reducer execution, the participant's thread blocks on a channel (`blocking_recv`) waiting for the next command. The `MutTxId` stays alive on that same thread. No new threading model is needed. + +``` +Coordinator thread Participant thread +(WASM reducer running, (holds MutTxId, holds WASM instance) + holds coordinator MutTxId) + +call_reducer_on_db_2pc() + | + |-- HTTP POST /2pc/begin/debit -> spawn thread, create MutTxId + | execute reducer + | send result via channel + | <-- HTTP response (result block on channel (blocking_recv) + | + session_id) | + | | [MutTxId held, write lock held] + | | +call_reducer_on_db_2pc() (2nd call) | + | | + |-- HTTP POST /2pc/{sid}/call/x -> send command via channel + | wake up, execute reducer + | send result via channel + | <-- HTTP response block on channel + | | +reducer finishes | + | | +[post-commit coordination] | + | | + |-- HTTP POST /2pc/{sid}/end ---> wake up, commit in-memory + | release write lock + | send PREPARE to durability + | barrier up + | wait for PREPARE durable... + | <-- HTTP response (PREPARED) block on channel + | | + |-- HTTP POST /2pc/{sid}/commit -> wake up + | send COMMIT to durability + | barrier down, flush + | <-- HTTP response thread exits +``` + +### Implementation + +On first CALL for a new 2PC transaction: +1. The async HTTP handler spawns a blocking thread (via `std::thread::scope` or `tokio::task::spawn_blocking`) +2. The blocking thread takes a WASM instance from the module's instance pool +3. The blocking thread creates `MutTxId` (acquires write lock) +4. The blocking thread executes the first reducer +5. The blocking thread sends the result back via a `oneshot` channel +6. 
The async HTTP handler receives the result and returns the HTTP response with a `session_id` +7. The blocking thread blocks on a `mpsc::Receiver` waiting for the next command +8. The async HTTP handler stores the `mpsc::Sender` in a session map keyed by `session_id` + +Subsequent CALLs and END_CALLS look up the `session_id`, send commands on the channel. The blocking thread processes them sequentially on the same `MutTxId`. + +When the thread exits (after COMMIT or ABORT), it returns the WASM instance to the pool. + +```rust +enum TxCommand { + Call { reducer: String, args: Bytes, reply: oneshot::Sender }, + EndCalls { reply: oneshot::Sender }, + Commit { reply: oneshot::Sender<()> }, + Abort { reply: oneshot::Sender<()> }, +} +``` + +## Abort paths + +**Coordinator's reducer fails (step 2):** +- Send ABORT to all participants (they still hold write locks) +- Participants rollback their MutTxId (release write lock, no changes) +- No PREPARE was sent, no barrier needed + +**Participant's reducer fails (step 2):** +- Participant returns error to coordinator +- Coordinator's reducer fails (propagates error) +- Coordinator sends ABORT to all other participants that succeeded +- Those participants rollback their MutTxId + +**Coordinator's PREPARE persists but a participant's PREPARE fails to persist:** +- Participant cannot send PREPARED +- Coordinator times out waiting for PREPARED +- Coordinator sends ABORT to all participants +- Coordinator inverts its own in-memory state, discards buffered durability requests + +**Crash during protocol:** +- See proposal in `proposals/00XX-inter-database-communication.md` section 8 for recovery rules + +## Commitlog format + +- PREPARE record: includes all row changes (inserts/deletes) +- COMMIT record: follows PREPARE, marks transaction as committed +- ABORT record: follows PREPARE, marks transaction as aborted +- No other records can appear between PREPARE and COMMIT/ABORT in the durable log (persistence barrier enforces this) + 
+## Replay semantics + +On replay, when encountering a PREPARE: +- Do not apply it to the datastore +- Read the next record: + - COMMIT: apply the PREPARE's changes + - ABORT: skip the PREPARE + - No next record (crash): transaction is still in progress, wait for coordinator or timeout and abort + +## Persistence barrier + +The barrier in `relational_db.rs` has two states: `Inactive` and `Active`. + +- **Inactive**: normal operation, durability requests go through. +- **Active**: all durability requests are buffered. + +No race is possible because the barrier is activated on the same thread that holds the write lock. The sequence on both coordinator and participant is: + +1. Commit in-memory (releases write lock) +2. Send PREPARE to durability worker (direct call, bypasses barrier) +3. Activate barrier + +Steps 1-3 happen sequentially on one thread. No other transaction can commit between 1 and 3 because steps 2 and 3 are immediate (no async, no lock release between them). By the time another transaction acquires the write lock and commits, the barrier is already active and its durability request is buffered. 
+ +- On COMMIT: deactivate, flush buffered requests +- On ABORT: deactivate, discard buffered requests + +## Key files + +- `crates/core/src/db/relational_db.rs` -- PersistenceBarrier (Inactive/Armed/Active), send_or_buffer_durability, finalize_prepare_commit/abort +- `crates/core/src/host/prepared_tx.rs` -- TxCommand, TxSession, PreparedTransactions registry, session map +- `crates/core/src/host/module_host.rs` -- begin_2pc_session, commit_prepared, abort_prepared +- `crates/core/src/host/wasm_common/module_host_actor.rs` -- coordinator post-commit coordination (END_CALLS, wait PREPARED, COMMIT) +- `crates/core/src/host/instance_env.rs` -- call_reducer_on_db_2pc, prepared_participants tracking +- `crates/core/src/host/wasmtime/wasm_instance_env.rs` -- WASM host function +- `crates/client-api/src/routes/database.rs` -- HTTP endpoints: /2pc/begin/:reducer, /2pc/:sid/call/:reducer, /2pc/:sid/end, /2pc/:sid/commit, /2pc/:sid/abort +- `crates/bindings-sys/src/lib.rs` -- FFI +- `crates/bindings/src/remote_reducer.rs` -- safe wrapper diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 27dfa826f4d..4fb1bfe0ec1 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk}; use spacetimedb_data_structures::map::HashSet; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError}; -use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; +use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView, @@ -76,6 +76,80 @@ type RowCountFn = Arc i64 + Send + Sync>; /// The type of transactions committed by [RelationalDB]. 
pub type Txdata = commitlog::payload::Txdata; +/// A buffered durability request, held behind the persistence barrier. +pub struct BufferedDurabilityRequest { + pub reducer_context: Option, + pub tx_data: Arc, +} + +/// The persistence barrier prevents durability requests from being sent to the +/// durability worker while a 2PC PREPARE is pending. +/// +/// When active: +/// - The PREPARE's own durability request has already been sent to the worker. +/// - All subsequent durability requests are buffered here. +/// - Once the PREPARE is confirmed durable and a COMMIT/ABORT decision is made: +/// - COMMIT: buffered requests are flushed to the worker. +/// - ABORT: buffered requests are discarded. +#[derive(Default)] +pub struct PersistenceBarrier { + inner: std::sync::Mutex, +} + +#[derive(Default)] +struct PersistenceBarrierInner { + /// Whether the barrier is active. When active, all durability requests + /// are buffered instead of being sent to the worker. + active: bool, + /// Buffered durability requests that arrived while the barrier was active. + buffered: Vec, +} + +impl PersistenceBarrier { + pub fn new() -> Self { + Self::default() + } + + /// Activate the barrier. All subsequent durability requests will be buffered. + /// + /// Called after committing in-memory and sending PREPARE to the durability + /// worker. No race is possible because this runs on the same thread that + /// just released the write lock, before any other transaction can commit. + pub fn activate(&self) { + let mut inner = self.inner.lock().unwrap(); + assert!(!inner.active, "persistence barrier already active"); + inner.active = true; + inner.buffered.clear(); + } + + /// If the barrier is active, buffer the durability request and return None. + /// If inactive, return the arguments back (caller should send normally). 
+ pub fn try_buffer( + &self, + reducer_context: Option, + tx_data: &Arc, + ) -> Option> { + let mut inner = self.inner.lock().unwrap(); + if inner.active { + inner.buffered.push(BufferedDurabilityRequest { + reducer_context, + tx_data: tx_data.clone(), + }); + None + } else { + Some(reducer_context) + } + } + + /// Deactivate the barrier and return the buffered requests. + /// Called on COMMIT (to flush them) or ABORT (to discard them). + pub fn deactivate(&self) -> Vec { + let mut inner = self.inner.lock().unwrap(); + inner.active = false; + std::mem::take(&mut inner.buffered) + } +} + /// We've added a module version field to the system tables, but we don't yet /// have the infrastructure to support multiple versions. /// All modules are currently locked to this version, but this will be @@ -111,6 +185,10 @@ pub struct RelationalDB { /// An async queue for recording transaction metrics off the main thread metrics_recorder_queue: Option, + + /// 2PC persistence barrier. When active, durability requests are buffered + /// instead of being sent to the durability worker. + persistence_barrier: PersistenceBarrier, } /// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions. @@ -175,6 +253,7 @@ impl RelationalDB { workload_type_to_exec_counters, metrics_recorder_queue, + persistence_barrier: PersistenceBarrier::new(), } } @@ -820,9 +899,7 @@ impl RelationalDB { self.maybe_do_snapshot(&tx_data); let tx_data = Arc::new(tx_data); - if let Some(durability) = &self.durability { - durability.request_durability(reducer_context, &tx_data); - } + self.send_or_buffer_durability(reducer_context, &tx_data); Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) } @@ -836,11 +913,64 @@ impl RelationalDB { self.maybe_do_snapshot(&tx_data); let tx_data = Arc::new(tx_data); + self.send_or_buffer_durability(tx.ctx.reducer_context().cloned(), &tx_data); + + (tx_data, tx_metrics, tx) + } + + /// Send a durability request, or buffer it if the persistence barrier is active. 
+ fn send_or_buffer_durability(&self, reducer_context: Option, tx_data: &Arc) { + match self.persistence_barrier.try_buffer(reducer_context, tx_data) { + None => { + // Buffered behind the persistence barrier. + } + Some(reducer_context) => { + // Barrier not active. Send to durability worker. + if let Some(durability) = &self.durability { + durability.request_durability(reducer_context, tx_data); + } + } + } + } + + /// Activate the persistence barrier for a 2PC PREPARE. + /// + /// Call this AFTER committing in-memory and sending PREPARE to the + /// durability worker. All subsequent durability requests will be buffered + /// until `finalize_prepare_commit()` or `finalize_prepare_abort()`. + pub fn activate_persistence_barrier(&self) { + self.persistence_barrier.activate(); + } + + /// Finalize a 2PC transaction as COMMIT. + /// Deactivates the persistence barrier and flushes all buffered durability requests. + pub fn finalize_prepare_commit(&self) { + let buffered = self.persistence_barrier.deactivate(); if let Some(durability) = &self.durability { - durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data); + for req in buffered { + durability.request_durability(req.reducer_context, &req.tx_data); + } } + } - (tx_data, tx_metrics, tx) + /// Finalize a 2PC transaction as ABORT. + /// Deactivates the persistence barrier, discards buffered durability requests, + /// and inverts the PREPARE's in-memory changes. + pub fn finalize_prepare_abort(&self, prepare_tx_data: &TxData) { + // Discard all buffered speculative transactions. + let _discarded = self.persistence_barrier.deactivate(); + // TODO: Invert in-memory state using prepare_tx_data. + // For now, log a warning. Full inversion requires: + // 1. Begin new MutTx + // 2. Delete rows from prepare_tx_data.persistent_inserts() + // 3. Re-insert rows from prepare_tx_data.persistent_deletes() + // 4. Commit without durability + // 5. 
Re-execute discarded speculative transactions + log::warn!( + "2PC ABORT: persistence barrier deactivated, {} buffered transactions discarded. \ + In-memory state inversion not yet implemented.", + _discarded.len() + ); } /// Get the [`DurableOffset`] of this database, or `None` if this is an @@ -851,6 +981,11 @@ impl RelationalDB { .map(|durability| durability.durable_tx_offset()) } + /// Get a reference to the persistence barrier (for 2PC). + pub fn persistence_barrier(&self) -> &PersistenceBarrier { + &self.persistence_barrier + } + /// Decide based on the `committed_state.next_tx_offset` /// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database. /// diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 1fdc651414e..29ed8b28069 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -54,6 +54,11 @@ pub struct InstanceEnv { in_anon_tx: bool, /// A procedure's last known transaction offset. procedure_last_tx_offset: Option, + /// 2PC: prepared participants from `call_reducer_on_db_2pc` calls. + /// Each entry is (database_identity, prepare_id). + /// After the coordinator's reducer commits, these are committed; + /// on failure, they are aborted. + pub prepared_participants: Vec<(Identity, String)>, } /// `InstanceEnv` needs to be `Send` because it is created on the host thread @@ -238,6 +243,7 @@ impl InstanceEnv { func_name: None, in_anon_tx: false, procedure_last_tx_offset: None, + prepared_participants: Vec::new(), } } @@ -1045,6 +1051,166 @@ impl InstanceEnv { result } } + + /// Call a reducer on a remote database using the 2PC prepare protocol. + /// + /// Like [`Self::call_reducer_on_db`], but POSTs to `/prepare/{reducer}` instead of + /// `/call/{reducer}`. On success, parses the `X-Prepare-Id` response header and stores + /// `(database_identity, prepare_id)` in [`Self::prepared_participants`]. 
+ /// + /// Returns `(http_status, response_body)` on transport success. + /// The caller (coordinator reducer) is responsible for checking the status; + /// if the coordinator's reducer commits, the runtime will commit all participants, + /// and if it fails, the runtime will abort them. + pub fn call_reducer_on_db_2pc( + &mut self, + database_identity: Identity, + reducer_name: &str, + args: bytes::Bytes, + ) -> impl Future), NodesError>> + use<> { + let client = self.replica_ctx.call_reducer_client.clone(); + let router = self.replica_ctx.call_reducer_router.clone(); + let reducer_name = reducer_name.to_owned(); + let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + let caller_identity = self.replica_ctx.database.database_identity; + + async move { + let start = Instant::now(); + + let base_url = router + .resolve_base_url(database_identity) + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; + let url = format!( + "{}/v1/database/{}/prepare/{}", + base_url, + database_identity.to_hex(), + reducer_name, + ); + let mut req = client + .post(&url) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .body(args); + if let Some(token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + let result = async { + let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + let status = response.status().as_u16(); + let prepare_id = response + .headers() + .get("X-Prepare-Id") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_owned()); + let body = response.bytes().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + Ok((status, body, prepare_id)) + } + .await; + + WORKER_METRICS + .cross_db_reducer_calls_total + .with_label_values(&caller_identity) + .inc(); + WORKER_METRICS + .cross_db_reducer_duration_seconds + .with_label_values(&caller_identity) + .observe(start.elapsed().as_secs_f64()); + + result + } + } + + /// Commit all prepared participants 
(called after coordinator's reducer succeeds). + pub fn commit_all_prepared( + &mut self, + ) -> impl Future + use<> { + let participants = mem::take(&mut self.prepared_participants); + let client = self.replica_ctx.call_reducer_client.clone(); + let router = self.replica_ctx.call_reducer_router.clone(); + let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + + async move { + for (db_identity, prepare_id) in participants { + let base_url = match router.resolve_base_url(db_identity).await { + Ok(url) => url, + Err(e) => { + log::error!("2PC commit: failed to resolve base URL for {db_identity}: {e}"); + continue; + } + }; + let url = format!( + "{}/v1/database/{}/2pc/commit/{}", + base_url, + db_identity.to_hex(), + prepare_id, + ); + let mut req = client.post(&url); + if let Some(ref token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + match req.send().await { + Ok(resp) if resp.status().is_success() => { + log::info!("2PC commit: committed {prepare_id} on {db_identity}"); + } + Ok(resp) => { + log::error!( + "2PC commit: failed for {prepare_id} on {db_identity}: status {}", + resp.status() + ); + } + Err(e) => { + log::error!("2PC commit: transport error for {prepare_id} on {db_identity}: {e}"); + } + } + } + } + } + + /// Abort all prepared participants (called when coordinator's reducer fails). 
+ pub fn abort_all_prepared( + &mut self, + ) -> impl Future + use<> { + let participants = mem::take(&mut self.prepared_participants); + let client = self.replica_ctx.call_reducer_client.clone(); + let router = self.replica_ctx.call_reducer_router.clone(); + let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + + async move { + for (db_identity, prepare_id) in participants { + let base_url = match router.resolve_base_url(db_identity).await { + Ok(url) => url, + Err(e) => { + log::error!("2PC abort: failed to resolve base URL for {db_identity}: {e}"); + continue; + } + }; + let url = format!( + "{}/v1/database/{}/2pc/abort/{}", + base_url, + db_identity.to_hex(), + prepare_id, + ); + let mut req = client.post(&url); + if let Some(ref token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + match req.send().await { + Ok(resp) if resp.status().is_success() => { + log::info!("2PC abort: aborted {prepare_id} on {db_identity}"); + } + Ok(resp) => { + log::error!( + "2PC abort: failed for {prepare_id} on {db_identity}: status {}", + resp.status() + ); + } + Err(e) => { + log::error!("2PC abort: transport error for {prepare_id} on {db_identity}: {e}"); + } + } + } + } + } } /// Default timeout for HTTP requests performed by [`InstanceEnv::http_request`]. 
diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 25e56ca217e..df6ec4d42f0 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -15,6 +15,7 @@ mod host_controller; mod module_common; #[allow(clippy::too_many_arguments)] pub mod module_host; +pub mod prepared_tx; pub mod scheduler; pub mod wasmtime; diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 929db1b8004..efdd236c775 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -890,6 +890,9 @@ pub struct ModuleHost { /// /// When this is true, most operations will fail with [`NoSuchModule`]. closed: Arc, + + /// Registry of prepared (but not yet finalized) 2PC transactions. + prepared_txs: super::prepared_tx::PreparedTransactions, } impl fmt::Debug for ModuleHost { @@ -906,6 +909,7 @@ pub struct WeakModuleHost { inner: Weak, on_panic: Weak, closed: Weak, + prepared_txs: super::prepared_tx::PreparedTransactions, } #[derive(Debug)] @@ -1093,6 +1097,7 @@ impl ModuleHost { inner, on_panic, closed: Arc::new(AtomicBool::new(false)), + prepared_txs: super::prepared_tx::PreparedTransactions::new(), } } @@ -1740,6 +1745,100 @@ impl ModuleHost { res } + /// Execute a reducer in 2PC prepare mode. + /// + /// Execute a reducer as a 2PC PREPARE. + /// + /// 1. Executes the reducer and commits in-memory (releasing the write lock). + /// 2. Sends the PREPARE to the durability worker. + /// 3. Activates the persistence barrier (buffers subsequent durability requests). + /// 4. Waits for the PREPARE to become durable. + /// 5. Returns the prepare_id, result, and return value. + /// + /// The caller should then send PREPARED to the coordinator. 
+ pub async fn prepare_reducer( + &self, + caller_identity: Identity, + caller_connection_id: Option, + reducer_name: &str, + args: FunctionArgs, + ) -> Result<(String, ReducerCallResult, Option), ReducerCallError> { + // Call the reducer using the 2PC prepare commit path. + // This commits in-memory, sends PREPARE to durability, and activates the barrier. + let (result, return_value) = self + .call_reducer_with_return( + caller_identity, + caller_connection_id, + None, // no websocket client + None, // no request_id + None, // no timer + reducer_name, + args, + ) + .await?; + + // Only store prepared tx info and activate barrier if the reducer succeeded. + if matches!(result.outcome, ReducerOutcome::Committed) { + use std::sync::atomic::{AtomicU64, Ordering}; + static PREPARE_COUNTER: AtomicU64 = AtomicU64::new(1); + let prepare_id = format!("prepare-{}", PREPARE_COUNTER.fetch_add(1, Ordering::Relaxed)); + + // Activate the persistence barrier. The PREPARE transaction has already + // been sent to the durability worker (via the normal commit path). + // The barrier prevents any subsequent transactions from being persisted + // until we finalize with COMMIT or ABORT. + self.relational_db().activate_persistence_barrier(); + + let info = super::prepared_tx::PreparedTxInfo { + tx_offset: 0, // TODO: thread TxOffset from commit path + tx_data: std::sync::Arc::new(spacetimedb_datastore::traits::TxData::default()), + reducer_context: None, + }; + self.prepared_txs.insert(prepare_id.clone(), info); + + // Wait for the PREPARE to become durable before returning. + // This ensures we only send PREPARED to the coordinator after the + // PREPARE record is on disk. + if let Some(mut durable_offset) = self.relational_db().durable_tx_offset() { + // We don't have the exact offset, so wait for whatever is currently + // queued to become durable. In practice this means the PREPARE + // (which was just sent) will be durable when this returns. 
+ let current = durable_offset.last_seen().unwrap_or(0); + // Wait for at least one more offset to become durable. + let _ = durable_offset.wait_for(current + 1).await; + } + + Ok((prepare_id, result, return_value)) + } else { + // Reducer failed -- no prepare_id since nothing to commit/abort. + Ok((String::new(), result, return_value)) + } + } + + /// Finalize a prepared transaction as COMMIT. + /// + /// Deactivates the persistence barrier and flushes all buffered durability + /// requests to the durability worker. + pub fn commit_prepared(&self, prepare_id: &str) -> Result<(), String> { + let _info = self.prepared_txs + .remove(prepare_id) + .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; + self.relational_db().finalize_prepare_commit(); + Ok(()) + } + + /// Abort a prepared transaction. + /// + /// Deactivates the persistence barrier, discards all buffered durability + /// requests, and inverts the PREPARE's in-memory changes. + pub fn abort_prepared(&self, prepare_id: &str) -> Result<(), String> { + let info = self.prepared_txs + .remove(prepare_id) + .ok_or_else(|| format!("no such prepared transaction: {prepare_id}"))?; + self.relational_db().finalize_prepare_abort(&info.tx_data); + Ok(()) + } + pub async fn call_view_add_single_subscription( &self, sender: Arc, @@ -2561,6 +2660,7 @@ impl ModuleHost { inner: Arc::downgrade(&self.inner), on_panic: Arc::downgrade(&self.on_panic), closed: Arc::downgrade(&self.closed), + prepared_txs: self.prepared_txs.clone(), } } @@ -2605,6 +2705,7 @@ impl WeakModuleHost { inner, on_panic, closed, + prepared_txs: self.prepared_txs.clone(), }) } } diff --git a/crates/core/src/host/prepared_tx.rs b/crates/core/src/host/prepared_tx.rs new file mode 100644 index 00000000000..cc40cada4e7 --- /dev/null +++ b/crates/core/src/host/prepared_tx.rs @@ -0,0 +1,37 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use spacetimedb_datastore::execution_context::ReducerContext; +use 
spacetimedb_datastore::traits::TxData;
+use spacetimedb_durability::TxOffset;
+
+/// Information about a transaction that has been prepared (committed in-memory,
+/// PREPARE sent to durability) but not yet finalized (COMMIT or ABORT).
+pub struct PreparedTxInfo {
+ /// The offset of the PREPARE record in the commitlog.
+ pub tx_offset: TxOffset,
+ /// The transaction data (row changes) for potential abort inversion.
+ pub tx_data: Arc<TxData>,
+ /// The reducer context for the prepared transaction.
+ pub reducer_context: Option<ReducerContext>,
+}
+
+/// Thread-safe registry of prepared transactions, keyed by prepare_id.
+#[derive(Clone, Default)]
+pub struct PreparedTransactions {
+ inner: Arc<Mutex<HashMap<String, PreparedTxInfo>>>,
+}
+
+impl PreparedTransactions {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn insert(&self, id: String, info: PreparedTxInfo) {
+ self.inner.lock().unwrap().insert(id, info);
+ }
+
+ pub fn remove(&self, id: &str) -> Option<PreparedTxInfo> {
+ self.inner.lock().unwrap().remove(id)
+ }
+}
diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs
index 332c9c89dd3..5810c67d7bd 100644
--- a/crates/core/src/host/v8/mod.rs
+++ b/crates/core/src/host/v8/mod.rs
@@ -1398,6 +1398,11 @@ impl WasmInstance for V8Instance<'_, '_, '_> {
 log_traceback(self.replica_ctx, func_type, func, trap)
 }
 
+ fn take_prepared_participants(&mut self) -> Vec<(Identity, String)> {
+ // V8/JS does not currently support 2PC, so always return empty.
+ Vec::new()
+ }
+
 async fn call_procedure(
 &mut self,
 op: ProcedureOp,
diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs
index b5bba032d7c..5d744bc2108 100644
--- a/crates/core/src/host/wasm_common.rs
+++ b/crates/core/src/host/wasm_common.rs
@@ -438,6 +438,10 @@ macro_rules! abi_funcs {
 // Implemented as a sync host function (using block_in_place) so it can be called
 // from within a reducer body where only synchronous host functions are allowed.
"spacetime_10.5"::call_reducer_on_db, + + // 2PC variant: calls /prepare/{reducer} instead of /call/{reducer}. + // Stores the prepare_id for post-commit coordination. + "spacetime_10.5"::call_reducer_on_db_2pc, } $link_async! { diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 7898a4f205a..96d19dc54a6 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -92,6 +92,10 @@ pub trait WasmInstance { fn log_traceback(&self, func_type: &str, func: &str, trap: &anyhow::Error); + /// Take the list of 2PC prepared participants accumulated during this reducer call. + /// Returns the participants and clears the internal list. + fn take_prepared_participants(&mut self) -> Vec<(Identity, String)>; + fn call_procedure( &mut self, op: ProcedureOp, @@ -977,6 +981,92 @@ impl InstanceCommon { }; let event = commit_and_broadcast_event(&info.subscriptions, client, event, out.tx).event; + // 2PC post-commit coordination: commit or abort all prepared participants. + let prepared_participants = inst.take_prepared_participants(); + if !prepared_participants.is_empty() { + let committed = matches!(event.status, EventStatus::Committed(_)); + let stdb = info.subscriptions.relational_db(); + + if committed { + // Coordinator's PREPARE: activate the persistence barrier. + // The coordinator's transaction was just sent to the durability worker + // (via commit_and_broadcast_event -> commit_tx_downgrade -> send_or_buffer_durability). + // No subsequent transactions should be persisted until we confirm all + // participants are prepared and we decide COMMIT. + stdb.activate_persistence_barrier(); + } + + let replica_ctx = inst.replica_ctx().clone(); + let handle = tokio::runtime::Handle::current(); + std::thread::scope(|s| { + s.spawn(|| { + handle.block_on(async { + if committed { + // Wait for coordinator's PREPARE to become durable. 
+ if let Some(mut durable_offset) = stdb.durable_tx_offset() { + let current: u64 = durable_offset.last_seen().unwrap_or(0); + let _ = durable_offset.wait_for(current + 1).await; + } + } + + let client = replica_ctx.call_reducer_client.clone(); + let router = replica_ctx.call_reducer_router.clone(); + let auth_token = replica_ctx.call_reducer_auth_token.clone(); + for (db_identity, prepare_id) in &prepared_participants { + let action = if committed { "commit" } else { "abort" }; + let base_url = match router.resolve_base_url(*db_identity).await { + Ok(url) => url, + Err(e) => { + log::error!("2PC {action}: failed to resolve base URL for {db_identity}: {e}"); + continue; + } + }; + let url = format!( + "{}/v1/database/{}/2pc/{}/{}", + base_url, + db_identity.to_hex(), + action, + prepare_id, + ); + let mut req = client.post(&url); + if let Some(ref token) = auth_token { + req = req.header( + http::header::AUTHORIZATION, + format!("Bearer {token}"), + ); + } + match req.send().await { + Ok(resp) if resp.status().is_success() => { + log::info!("2PC {action}: {prepare_id} on {db_identity}"); + } + Ok(resp) => { + log::error!( + "2PC {action}: failed for {prepare_id} on {db_identity}: status {}", + resp.status() + ); + } + Err(e) => { + log::error!( + "2PC {action}: transport error for {prepare_id} on {db_identity}: {e}" + ); + } + } + } + }); + }) + .join() + .expect("2PC coordination thread panicked"); + }); + + // Deactivate the barrier and flush buffered durability requests. + if committed { + stdb.finalize_prepare_commit(); + } else { + // On abort, discard buffered requests. No barrier was activated + // (we only activate on committed), so this is a no-op. 
+ } + } + let res = ReducerCallResult { outcome: ReducerOutcome::from(&event.status), energy_used: energy_quanta_used, diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 728b3711057..73860dbf4ce 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -232,6 +232,11 @@ impl WasmInstanceEnv { &self.instance_env } + /// Return a mutable reference to the `InstanceEnv`. + pub fn instance_env_mut(&mut self) -> &mut InstanceEnv { + &mut self.instance_env + } + /// Setup the standard bytes sink and return a handle to it for writing. pub fn setup_standard_bytes_sink(&mut self) -> u32 { self.standard_bytes_sink = Some(Vec::new()); @@ -2023,6 +2028,77 @@ impl WasmInstanceEnv { } }) } + + /// 2PC variant of `call_reducer_on_db`. + /// + /// Calls the remote database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`. + /// On success, parses the `X-Prepare-Id` header and stores the participant info in + /// `InstanceEnv::prepared_participants` so the runtime can commit/abort after the + /// coordinator's reducer completes. + /// + /// Returns the HTTP status code on success, writing the response body to `*out` + /// as a [`BytesSource`]. + /// + /// On transport failure: + /// - Returns `HTTP_ERROR` errno, writing a BSATN-encoded error [`String`] to `*out`. 
+ pub fn call_reducer_on_db_2pc( + caller: Caller<'_, Self>, + identity_ptr: WasmPtr, + reducer_ptr: WasmPtr, + reducer_len: u32, + args_ptr: WasmPtr, + args_len: u32, + out: WasmPtr, + ) -> RtResult { + Self::cvt_custom(caller, AbiCall::CallReducerOnDb, |caller| { + let (mem, env) = Self::mem_env(caller); + + let identity_slice = mem.deref_slice(identity_ptr, 32)?; + let identity_bytes: [u8; 32] = identity_slice + .try_into() + .expect("deref_slice(ptr, 32) always yields exactly 32 bytes"); + let database_identity = Identity::from_byte_array(identity_bytes); + + let reducer_name = mem.deref_str(reducer_ptr, reducer_len)?.to_owned(); + let args_buf = mem.deref_slice(args_ptr, args_len)?; + let args = bytes::Bytes::copy_from_slice(args_buf); + + let handle = tokio::runtime::Handle::current(); + let fut = env + .instance_env + .call_reducer_on_db_2pc(database_identity, &reducer_name, args); + let result = std::thread::scope(|s| { + s.spawn(|| handle.block_on(fut)) + .join() + .expect("call_reducer_on_db_2pc: worker thread panicked") + }); + + match result { + Ok((status, body, prepare_id)) => { + // If we got a prepare_id, register this participant. 
+ if let Some(pid) = prepare_id { + if status < 300 { + env.instance_env + .prepared_participants + .push((database_identity, pid)); + } + } + let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?; + bytes_source.0.write_to(mem, out)?; + Ok(status as u32) + } + Err(NodesError::HttpError(err)) => { + let err_bytes = bsatn::to_vec(&err).with_context(|| { + format!("Failed to BSATN-serialize call_reducer_on_db_2pc transport error: {err:?}") + })?; + let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?; + bytes_source.0.write_to(mem, out)?; + Ok(errno::HTTP_ERROR.get() as u32) + } + Err(e) => Err(WasmError::Db(e)), + } + }) + } } type Fut<'caller, T> = Box>; diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 2ea14a57be9..fb01e3a4763 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -564,6 +564,10 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { log_traceback(func_type, func, trap) } + fn take_prepared_participants(&mut self) -> Vec<(Identity, String)> { + core::mem::take(&mut self.store.data_mut().instance_env_mut().prepared_participants) + } + #[tracing::instrument(level = "trace", skip_all)] async fn call_procedure( &mut self, diff --git a/crates/smoketests/tests/smoketests/cross_db_2pc.rs b/crates/smoketests/tests/smoketests/cross_db_2pc.rs new file mode 100644 index 00000000000..feb92deda0e --- /dev/null +++ b/crates/smoketests/tests/smoketests/cross_db_2pc.rs @@ -0,0 +1,201 @@ +use spacetimedb_smoketests::Smoketest; + +/// Module code for the 2PC test. +/// +/// Both the "bank A" and "bank B" databases use the same module. +/// +/// Tables: +/// - `Ledger(account: String PK, balance: i64)` -- stores account balances. +/// +/// Reducers: +/// - `init`: seeds "alice" with balance 100. +/// - `debit(account, amount)`: decrements balance, panics if insufficient funds. 
+/// - `credit(account, amount)`: increments balance (or inserts if absent). +/// - `transfer_funds(target_hex, from_account, to_account, amount)`: +/// Credits `to_account` locally, then calls `debit` on the remote database +/// using `call_reducer_on_db_2pc`. If the remote debit fails (panic/insufficient funds), +/// the local credit is also rolled back by the 2PC protocol. +const MODULE_CODE: &str = r#" +use spacetimedb::{log, ReducerContext, Table, Identity}; + +#[spacetimedb::table(accessor = ledger, public)] +pub struct Ledger { + #[primary_key] + account: String, + balance: i64, +} + +#[spacetimedb::reducer(init)] +pub fn init(ctx: &ReducerContext) { + ctx.db.ledger().insert(Ledger { account: "alice".to_string(), balance: 100 }); +} + +#[spacetimedb::reducer] +pub fn debit(ctx: &ReducerContext, account: String, amount: i64) { + let row = ctx.db.ledger().account().find(&account) + .unwrap_or_else(|| panic!("account '{}' not found", account)); + let new_balance = row.balance - amount; + if new_balance < 0 { + panic!("insufficient funds: account '{}' has {} but tried to debit {}", account, row.balance, amount); + } + ctx.db.ledger().account().update(Ledger { account, balance: new_balance }); +} + +#[spacetimedb::reducer] +pub fn credit(ctx: &ReducerContext, account: String, amount: i64) { + match ctx.db.ledger().account().find(&account) { + Some(row) => { + ctx.db.ledger().account().update(Ledger { account, balance: row.balance + amount }); + } + None => { + ctx.db.ledger().insert(Ledger { account, balance: amount }); + } + } +} + +/// Transfer `amount` from `from_account` on the remote database to `to_account` locally. +/// +/// Uses 2PC: credits locally first, then calls debit on the remote database via +/// `call_reducer_on_db_2pc`. If the remote debit fails, the coordinator's reducer also +/// fails, triggering abort of all participants. 
+#[spacetimedb::reducer] +pub fn transfer_funds(ctx: &ReducerContext, target_hex: String, from_account: String, to_account: String, amount: i64) { + // Credit locally first. + credit(ctx, to_account.clone(), amount); + + // Now call debit on the remote database using 2PC. + let target = Identity::from_hex(&target_hex).expect("invalid target identity hex"); + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(from_account, amount)).expect("failed to encode args"); + match spacetimedb::remote_reducer::call_reducer_on_db_2pc(target, "debit", &args) { + Ok(()) => { + log::info!("transfer_funds: remote debit succeeded"); + } + Err(e) => { + log::error!("transfer_funds: remote debit failed: {}", e); + panic!("remote debit failed: {e}"); + } + } +} +"#; + +/// Happy path: transfer 50 from B's alice to A's alice. +/// After: A alice = 150, B alice = 50. +#[test] +fn test_cross_db_2pc_happy_path() { + let pid = std::process::id(); + let db_a_name = format!("2pc-bank-a-{pid}"); + let db_b_name = format!("2pc-bank-b-{pid}"); + + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + // Publish bank B (the participant that will be debited). + test.publish_module_named(&db_b_name, false) + .expect("failed to publish bank B"); + let db_b_identity = test + .database_identity + .clone() + .expect("bank B identity not set"); + + // Publish bank A (the coordinator that will be credited). + test.publish_module_named(&db_a_name, false) + .expect("failed to publish bank A"); + let _db_a_identity = test + .database_identity + .clone() + .expect("bank A identity not set"); + + // Transfer 50 from B's alice to A's alice. + // The coordinator is bank A. It credits locally, then calls debit on B via 2PC. + test.call("transfer_funds", &[&db_b_identity, "alice", "alice", "50"]) + .expect("transfer_funds failed"); + + // Verify bank A: alice should have 150. 
+ let result_a = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + test.database_identity.as_ref().unwrap(), + "SELECT balance FROM ledger WHERE account = 'alice'", + ]) + .expect("sql query on bank A failed"); + assert!( + result_a.contains("150"), + "Expected bank A alice balance = 150, got:\n{result_a}" + ); + + // Verify bank B: alice should have 50. + let result_b = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + &db_b_identity, + "SELECT balance FROM ledger WHERE account = 'alice'", + ]) + .expect("sql query on bank B failed"); + assert!( + result_b.contains("50"), + "Expected bank B alice balance = 50, got:\n{result_b}" + ); +} + +/// Abort path: try to transfer 200, but B only has 100. +/// The remote debit should fail, causing the coordinator reducer to panic, +/// which should roll back the local credit. +/// After: both A and B should still have alice = 100. +#[test] +fn test_cross_db_2pc_abort_insufficient_funds() { + let pid = std::process::id(); + let db_a_name = format!("2pc-abort-a-{pid}"); + let db_b_name = format!("2pc-abort-b-{pid}"); + + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + // Publish bank B. + test.publish_module_named(&db_b_name, false) + .expect("failed to publish bank B"); + let db_b_identity = test + .database_identity + .clone() + .expect("bank B identity not set"); + + // Publish bank A. + test.publish_module_named(&db_a_name, false) + .expect("failed to publish bank A"); + + // Try to transfer 200 -- B only has 100, so the remote debit will fail. + let result = test.call("transfer_funds", &[&db_b_identity, "alice", "alice", "200"]); + // The call should fail because the remote debit panicked. + assert!(result.is_err(), "Expected transfer_funds to fail due to insufficient funds"); + + // Verify bank A: alice should still have 100 (the local credit was rolled back). 
+ let result_a = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + test.database_identity.as_ref().unwrap(), + "SELECT balance FROM ledger WHERE account = 'alice'", + ]) + .expect("sql query on bank A failed"); + assert!( + result_a.contains("100"), + "Expected bank A alice balance = 100 after failed transfer, got:\n{result_a}" + ); + + // Verify bank B: alice should still have 100. + let result_b = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + &db_b_identity, + "SELECT balance FROM ledger WHERE account = 'alice'", + ]) + .expect("sql query on bank B failed"); + assert!( + result_b.contains("100"), + "Expected bank B alice balance = 100 after failed transfer, got:\n{result_b}" + ); +} diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index 18ad7b51199..52cf11c6107 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -9,6 +9,7 @@ mod client_connection_errors; mod confirmed_reads; mod connect_disconnect_from_cli; mod create_project; +mod cross_db_2pc; mod cross_db_reducer; mod csharp_module; mod default_module_clippy;