Skip to content

Commit 3fec4df

Browse files
committed
Merge remote-tracking branch 'origin/shub/2pc-regular' into jdetter/tpcc
2 parents 3cec08f + f0c14b3 commit 3fec4df

File tree

23 files changed

+2227
-34
lines changed

23 files changed

+2227
-34
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ anymap = "0.12"
159159
arrayvec = "0.7.2"
160160
async-stream = "0.3.6"
161161
async-trait = "0.1.68"
162-
axum = { version = "0.7", features = ["tracing"] }
162+
axum = { version = "0.7", features = ["tracing", "http2"] }
163163
axum-extra = { version = "0.9", features = ["typed-header"] }
164164
backtrace = "0.3.66"
165165
base64 = "0.21.2"

crates/bindings-sys/src/lib.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,23 @@ pub mod raw {
896896
args_len: u32,
897897
out: *mut BytesSource,
898898
) -> u16;
899+
900+
/// 2PC variant of `call_reducer_on_db`.
901+
///
902+
/// Calls the target database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`.
903+
/// On success, the runtime stores the `prepare_id` internally.
904+
/// After the coordinator's reducer commits, all participants are committed.
905+
/// If the coordinator's reducer fails, all participants are aborted.
906+
///
907+
/// Returns and errors are identical to `call_reducer_on_db`.
908+
pub fn call_reducer_on_db_2pc(
909+
identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity
910+
reducer_ptr: *const u8,
911+
reducer_len: u32,
912+
args_ptr: *const u8,
913+
args_len: u32,
914+
out: *mut BytesSource,
915+
) -> u16;
899916
}
900917

901918
/// What strategy does the database index use?
@@ -1510,6 +1527,37 @@ pub fn call_reducer_on_db(
15101527
}
15111528
}
15121529

1530+
/// 2PC variant of [`call_reducer_on_db`].
1531+
///
1532+
/// Calls `/prepare/{reducer}` on the target database. On success, the runtime
1533+
/// stores the prepare_id internally. After the coordinator's reducer commits,
1534+
/// all participants are committed. On failure, all participants are aborted.
1535+
///
1536+
/// Returns and errors are identical to [`call_reducer_on_db`].
1537+
#[inline]
1538+
pub fn call_reducer_on_db_2pc(
1539+
identity: [u8; 32],
1540+
reducer_name: &str,
1541+
args: &[u8],
1542+
) -> Result<(u16, raw::BytesSource), raw::BytesSource> {
1543+
let mut out = raw::BytesSource::INVALID;
1544+
let status = unsafe {
1545+
raw::call_reducer_on_db_2pc(
1546+
identity.as_ptr(),
1547+
reducer_name.as_ptr(),
1548+
reducer_name.len() as u32,
1549+
args.as_ptr(),
1550+
args.len() as u32,
1551+
&mut out,
1552+
)
1553+
};
1554+
if status == Errno::HTTP_ERROR.code() {
1555+
Err(out)
1556+
} else {
1557+
Ok((status, out))
1558+
}
1559+
}
1560+
15131561
/// Finds the JWT payload associated with `connection_id`.
15141562
/// If nothing is found for the connection, this returns None.
15151563
/// If a payload is found, this will return a valid [`raw::BytesSource`].

crates/bindings/src/remote_reducer.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,43 @@ pub fn call_reducer_on_db(
9090
}
9191
}
9292
}
93+
94+
/// Call a reducer on a remote database using the 2PC prepare protocol.
95+
///
96+
/// This is the 2PC variant of [`call_reducer_on_db`]. It calls the target database's
97+
/// `/prepare/{reducer}` endpoint. On success, the runtime stores the prepare_id internally.
98+
/// After the coordinator's reducer commits, all participants are committed automatically.
99+
/// If the coordinator's reducer fails (panics or returns Err), all participants are aborted.
100+
///
101+
/// Returns and errors are identical to [`call_reducer_on_db`].
102+
pub fn call_reducer_on_db_2pc(
103+
database_identity: Identity,
104+
reducer_name: &str,
105+
args: &[u8],
106+
) -> Result<(), RemoteCallError> {
107+
let identity_bytes = database_identity.to_byte_array();
108+
match spacetimedb_bindings_sys::call_reducer_on_db_2pc(identity_bytes, reducer_name, args) {
109+
Ok((status, body_source)) => {
110+
if status < 300 {
111+
return Ok(());
112+
}
113+
let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID {
114+
String::new()
115+
} else {
116+
let mut buf = IterBuf::take();
117+
read_bytes_source_into(body_source, &mut buf);
118+
String::from_utf8_lossy(&buf).into_owned()
119+
};
120+
if status == 404 {
121+
Err(RemoteCallError::NotFound(msg))
122+
} else {
123+
Err(RemoteCallError::Failed(msg))
124+
}
125+
}
126+
Err(err_source) => {
127+
use crate::rt::read_bytes_source_as;
128+
let msg = read_bytes_source_as::<String>(err_source);
129+
Err(RemoteCallError::Unreachable(msg))
130+
}
131+
}
132+
}

crates/client-api/src/routes/database.rs

Lines changed: 178 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,163 @@ fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res
249249
}
250250
}
251251

252+
/// 2PC prepare endpoint: execute a reducer and return a prepare_id.
253+
///
254+
/// `POST /v1/database/:name_or_identity/prepare/:reducer`
255+
///
256+
/// On success, the response includes:
257+
/// - `X-Prepare-Id` header with the prepare_id
258+
/// - Body contains the reducer return value (if any)
259+
pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
260+
State(worker_ctx): State<S>,
261+
Extension(auth): Extension<SpacetimeAuth>,
262+
Path(CallParams {
263+
name_or_identity,
264+
reducer,
265+
}): Path<CallParams>,
266+
TypedHeader(content_type): TypedHeader<headers::ContentType>,
267+
headers: axum::http::HeaderMap,
268+
body: Bytes,
269+
) -> axum::response::Result<impl IntoResponse> {
270+
let args = parse_call_args(content_type, body)?;
271+
let caller_identity = auth.claims.identity;
272+
273+
// The coordinator sends its actual database identity in `X-Coordinator-Identity`.
274+
// Without this, `anon_auth_middleware` gives the HTTP caller an ephemeral random
275+
// identity, which gets stored in `st_2pc_state` and breaks recovery polling.
276+
let coordinator_identity = headers
277+
.get("X-Coordinator-Identity")
278+
.and_then(|v| v.to_str().ok())
279+
.and_then(|s| spacetimedb_lib::Identity::from_hex(s).ok());
280+
281+
let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;
282+
283+
// 2PC prepare is a server-to-server call; no client lifecycle management needed.
284+
// call_identity_connected/disconnected submit jobs to the module's executor, which
285+
// will be blocked holding the 2PC write lock after prepare_reducer returns — deadlock.
286+
let result = module
287+
.prepare_reducer(caller_identity, None, &reducer, args.0, coordinator_identity)
288+
.await;
289+
290+
match result {
291+
Ok((prepare_id, rcr, return_value)) => {
292+
let (status, body) =
293+
reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value, args.1)?;
294+
let mut response = (
295+
status,
296+
TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)),
297+
TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)),
298+
body,
299+
)
300+
.into_response();
301+
if !prepare_id.is_empty() {
302+
response
303+
.headers_mut()
304+
.insert("X-Prepare-Id", http::HeaderValue::from_str(&prepare_id).unwrap());
305+
}
306+
Ok(response)
307+
}
308+
Err(e) => Err(map_reducer_error(e, &reducer).into()),
309+
}
310+
}
311+
312+
#[derive(Deserialize)]
313+
pub struct TwoPcParams {
314+
name_or_identity: NameOrIdentity,
315+
prepare_id: String,
316+
}
317+
318+
/// 2PC commit endpoint: finalize a prepared transaction.
319+
///
320+
/// `POST /v1/database/:name_or_identity/2pc/commit/:prepare_id`
321+
pub async fn commit_2pc<S: ControlStateDelegate + NodeDelegate>(
322+
State(worker_ctx): State<S>,
323+
Extension(_auth): Extension<SpacetimeAuth>,
324+
Path(TwoPcParams {
325+
name_or_identity,
326+
prepare_id,
327+
}): Path<TwoPcParams>,
328+
) -> axum::response::Result<impl IntoResponse> {
329+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
330+
331+
module.commit_prepared(&prepare_id).map_err(|e| {
332+
log::error!("2PC commit failed: {e}");
333+
(StatusCode::NOT_FOUND, e).into_response()
334+
})?;
335+
336+
Ok(StatusCode::OK)
337+
}
338+
339+
/// 2PC abort endpoint: abort a prepared transaction.
340+
///
341+
/// `POST /v1/database/:name_or_identity/2pc/abort/:prepare_id`
342+
pub async fn abort_2pc<S: ControlStateDelegate + NodeDelegate>(
343+
State(worker_ctx): State<S>,
344+
Extension(_auth): Extension<SpacetimeAuth>,
345+
Path(TwoPcParams {
346+
name_or_identity,
347+
prepare_id,
348+
}): Path<TwoPcParams>,
349+
) -> axum::response::Result<impl IntoResponse> {
350+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
351+
352+
module.abort_prepared(&prepare_id).map_err(|e| {
353+
log::error!("2PC abort failed: {e}");
354+
(StatusCode::NOT_FOUND, e).into_response()
355+
})?;
356+
357+
Ok(StatusCode::OK)
358+
}
359+
360+
/// 2PC coordinator status endpoint.
361+
///
362+
/// Returns `"commit"` if the coordinator has durably decided COMMIT for `prepare_id`,
363+
/// or `"abort"` otherwise. Participant B polls this to recover from a timeout or crash.
364+
///
365+
/// `GET /v1/database/:name_or_identity/2pc/status/:prepare_id`
366+
pub async fn status_2pc<S: ControlStateDelegate + NodeDelegate>(
367+
State(worker_ctx): State<S>,
368+
Extension(_auth): Extension<SpacetimeAuth>,
369+
Path(TwoPcParams {
370+
name_or_identity,
371+
prepare_id,
372+
}): Path<TwoPcParams>,
373+
) -> axum::response::Result<impl IntoResponse> {
374+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
375+
376+
let decision = if module.has_2pc_coordinator_commit(&prepare_id) {
377+
"commit"
378+
} else {
379+
"abort"
380+
};
381+
382+
Ok((StatusCode::OK, decision))
383+
}
384+
385+
/// 2PC commit-ack endpoint.
386+
///
387+
/// Called by participant B after it commits via the status-poll recovery path,
388+
/// so that the coordinator can delete its `st_2pc_coordinator_log` entry.
389+
///
390+
/// `POST /v1/database/:name_or_identity/2pc/ack-commit/:prepare_id`
391+
pub async fn ack_commit_2pc<S: ControlStateDelegate + NodeDelegate>(
392+
State(worker_ctx): State<S>,
393+
Extension(_auth): Extension<SpacetimeAuth>,
394+
Path(TwoPcParams {
395+
name_or_identity,
396+
prepare_id,
397+
}): Path<TwoPcParams>,
398+
) -> axum::response::Result<impl IntoResponse> {
399+
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
400+
401+
module.ack_2pc_coordinator_commit(&prepare_id).map_err(|e| {
402+
log::error!("2PC ack-commit failed: {e}");
403+
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
404+
})?;
405+
406+
Ok(StatusCode::OK)
407+
}
408+
252409
/// Encode a reducer return value as an HTTP response.
253410
///
254411
/// If the outcome is an error, return a raw string with `application/text`. Ignore `want_bsatn` in this case.
@@ -1278,6 +1435,16 @@ pub struct DatabaseRoutes<S> {
12781435
pub db_reset: MethodRouter<S>,
12791436
/// GET: /database/: name_or_identity/unstable/timestamp
12801437
pub timestamp_get: MethodRouter<S>,
1438+
/// POST: /database/:name_or_identity/prepare/:reducer
1439+
pub prepare_post: MethodRouter<S>,
1440+
/// POST: /database/:name_or_identity/2pc/commit/:prepare_id
1441+
pub commit_2pc_post: MethodRouter<S>,
1442+
/// POST: /database/:name_or_identity/2pc/abort/:prepare_id
1443+
pub abort_2pc_post: MethodRouter<S>,
1444+
/// GET: /database/:name_or_identity/2pc/status/:prepare_id
1445+
pub status_2pc_get: MethodRouter<S>,
1446+
/// POST: /database/:name_or_identity/2pc/ack-commit/:prepare_id
1447+
pub ack_commit_2pc_post: MethodRouter<S>,
12811448
}
12821449

12831450
impl<S> Default for DatabaseRoutes<S>
@@ -1303,6 +1470,11 @@ where
13031470
pre_publish: post(pre_publish::<S>),
13041471
db_reset: put(reset::<S>),
13051472
timestamp_get: get(get_timestamp::<S>),
1473+
prepare_post: post(prepare::<S>),
1474+
commit_2pc_post: post(commit_2pc::<S>),
1475+
abort_2pc_post: post(abort_2pc::<S>),
1476+
status_2pc_get: get(status_2pc::<S>),
1477+
ack_commit_2pc_post: post(ack_commit_2pc::<S>),
13061478
}
13071479
}
13081480
}
@@ -1327,7 +1499,12 @@ where
13271499
.route("/sql", self.sql_post)
13281500
.route("/unstable/timestamp", self.timestamp_get)
13291501
.route("/pre_publish", self.pre_publish)
1330-
.route("/reset", self.db_reset);
1502+
.route("/reset", self.db_reset)
1503+
.route("/prepare/:reducer", self.prepare_post)
1504+
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
1505+
.route("/2pc/abort/:prepare_id", self.abort_2pc_post)
1506+
.route("/2pc/status/:prepare_id", self.status_2pc_get)
1507+
.route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post);
13311508

13321509
axum::Router::new()
13331510
.route("/", self.root_post)

crates/core/src/db/relational_db.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
1212
use spacetimedb_data_structures::map::HashSet;
1313
use spacetimedb_datastore::db_metrics::DB_METRICS;
1414
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
15-
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
15+
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
1616
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
@@ -454,6 +454,26 @@ impl RelationalDB {
454454
Ok(self.with_read_only(Workload::Internal, |tx| self.inner.program(tx))?)
455455
}
456456

457+
/// Read any 2PC participant transactions that were in PREPARE state when the database
458+
/// last shut down (or crashed).
459+
///
460+
/// Each returned row contains all the information needed to resume the transaction:
461+
/// the prepare_id, coordinator identity, reducer name/args, and caller context.
462+
/// B never aborts on its own — it polls the coordinator for a decision.
463+
pub fn pending_2pc_prepares(&self) -> Result<Vec<spacetimedb_datastore::system_tables::St2pcStateRow>, DBError> {
464+
self.with_auto_commit(Workload::Internal, |tx| tx.scan_st_2pc_state().map_err(DBError::from))
465+
}
466+
467+
/// Read any 2PC coordinator log entries that have not yet been acknowledged by their
468+
/// participants. Used on coordinator crash-recovery to retransmit COMMIT decisions.
469+
pub fn pending_2pc_coordinator_commits(
470+
&self,
471+
) -> Result<Vec<spacetimedb_datastore::system_tables::St2pcCoordinatorLogRow>, DBError> {
472+
self.with_auto_commit(Workload::Internal, |tx| {
473+
tx.scan_st_2pc_coordinator_log().map_err(DBError::from)
474+
})
475+
}
476+
457477
/// Read the set of clients currently connected to the database.
458478
pub fn connected_clients(&self) -> Result<ConnectedClients, DBError> {
459479
self.with_read_only(Workload::Internal, |tx| {
@@ -844,6 +864,16 @@ impl RelationalDB {
844864
(tx_data, tx_metrics, tx)
845865
}
846866

867+
/// Forward a pre-built `TxData` directly to the durability worker.
868+
///
869+
/// Used by the 2PC participant path to make the `st_2pc_state` PREPARE marker durable
870+
/// while the main write lock is still held (i.e. without going through a full commit).
871+
pub fn request_durability_for_tx_data(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
872+
if let Some(durability) = &self.durability {
873+
durability.request_durability(reducer_context, tx_data);
874+
}
875+
}
876+
847877
/// Get the [`DurableOffset`] of this database, or `None` if this is an
848878
/// in-memory instance.
849879
pub fn durable_tx_offset(&self) -> Option<DurableOffset> {

crates/core/src/host/host_controller.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,6 +1141,11 @@ impl Host {
11411141
module_host.clear_all_clients().await?;
11421142

11431143
scheduler_starter.start(&module_host)?;
1144+
1145+
// Crash recovery: retransmit any pending 2PC decisions from before the restart.
1146+
module_host.recover_2pc_coordinator();
1147+
module_host.recover_2pc_participant();
1148+
11441149
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
11451150
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone());
11461151

0 commit comments

Comments
 (0)