Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ anymap = "0.12"
arrayvec = "0.7.2"
async-stream = "0.3.6"
async-trait = "0.1.68"
axum = { version = "0.7", features = ["tracing"] }
axum = { version = "0.7", features = ["tracing", "http2"] }
axum-extra = { version = "0.9", features = ["typed-header"] }
backtrace = "0.3.66"
base64 = "0.21.2"
Expand Down
48 changes: 48 additions & 0 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,23 @@ pub mod raw {
args_len: u32,
out: *mut BytesSource,
) -> u16;

/// 2PC variant of `call_reducer_on_db`.
///
/// Calls the target database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`.
/// On success, the runtime stores the `prepare_id` internally.
/// After the coordinator's reducer commits, all participants are committed.
/// If the coordinator's reducer fails, all participants are aborted.
///
/// # Parameters
///
/// - `identity_ptr`: points to the 32-byte identity of the target database.
/// - `reducer_ptr` / `reducer_len`: pointer to, and byte length of, the reducer name.
/// - `args_ptr` / `args_len`: pointer to, and byte length of, the reducer arguments.
/// - `out`: location the host writes a `BytesSource` handle to.
///
/// Returns and errors are identical to `call_reducer_on_db`.
pub fn call_reducer_on_db_2pc(
identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity
reducer_ptr: *const u8,
reducer_len: u32,
args_ptr: *const u8,
args_len: u32,
out: *mut BytesSource,
) -> u16;
}

/// What strategy does the database index use?
Expand Down Expand Up @@ -1510,6 +1527,37 @@ pub fn call_reducer_on_db(
}
}

/// Safe wrapper around [`raw::call_reducer_on_db_2pc`], the 2PC flavour of
/// [`call_reducer_on_db`].
///
/// The host calls `/prepare/{reducer}` on the database identified by `identity`.
/// On success, the runtime stores the prepare_id internally. After the coordinator's
/// reducer commits, all participants are committed. On failure, all participants
/// are aborted.
///
/// # Returns
///
/// - `Ok((status, source))` when the host reports any status other than
///   `Errno::HTTP_ERROR`; `source` is whatever handle the host wrote to the out-pointer.
/// - `Err(source)` when the host reports `Errno::HTTP_ERROR`.
///
/// Returns and errors are identical to [`call_reducer_on_db`].
#[inline]
pub fn call_reducer_on_db_2pc(
    identity: [u8; 32],
    reducer_name: &str,
    args: &[u8],
) -> Result<(u16, raw::BytesSource), raw::BytesSource> {
    let (name_ptr, name_len) = (reducer_name.as_ptr(), reducer_name.len() as u32);
    let (args_ptr, args_len) = (args.as_ptr(), args.len() as u32);
    let mut source = raw::BytesSource::INVALID;

    // SAFETY: every pointer is derived from a live borrow (or a local) that outlives
    // the call, and each length is taken from the very slice its pointer refers to.
    let code = unsafe {
        raw::call_reducer_on_db_2pc(
            identity.as_ptr(),
            name_ptr,
            name_len,
            args_ptr,
            args_len,
            &mut source,
        )
    };

    if code != Errno::HTTP_ERROR.code() {
        Ok((code, source))
    } else {
        Err(source)
    }
}

/// Finds the JWT payload associated with `connection_id`.
/// If nothing is found for the connection, this returns None.
/// If a payload is found, this will return a valid [`raw::BytesSource`].
Expand Down
40 changes: 40 additions & 0 deletions crates/bindings/src/remote_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,43 @@ pub fn call_reducer_on_db(
}
}
}

/// Call a reducer on a remote database using the 2PC prepare protocol.
///
/// This is the 2PC variant of [`call_reducer_on_db`]. It calls the target database's
/// `/prepare/{reducer}` endpoint. On success, the runtime stores the prepare_id internally.
/// After the coordinator's reducer commits, all participants are committed automatically.
/// If the coordinator's reducer fails (panics or returns Err), all participants are aborted.
///
/// Returns and errors are identical to [`call_reducer_on_db`].
pub fn call_reducer_on_db_2pc(
database_identity: Identity,
reducer_name: &str,
args: &[u8],
) -> Result<(), RemoteCallError> {
let identity_bytes = database_identity.to_byte_array();
match spacetimedb_bindings_sys::call_reducer_on_db_2pc(identity_bytes, reducer_name, args) {
Ok((status, body_source)) => {
if status < 300 {
return Ok(());
}
let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID {
String::new()
} else {
let mut buf = IterBuf::take();
read_bytes_source_into(body_source, &mut buf);
String::from_utf8_lossy(&buf).into_owned()
};
if status == 404 {
Err(RemoteCallError::NotFound(msg))
} else {
Err(RemoteCallError::Failed(msg))
}
}
Err(err_source) => {
use crate::rt::read_bytes_source_as;
let msg = read_bytes_source_as::<String>(err_source);
Err(RemoteCallError::Unreachable(msg))
}
}
}
179 changes: 178 additions & 1 deletion crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,163 @@ fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res
}
}

/// 2PC prepare endpoint: execute a reducer and return a prepare_id.
///
/// `POST /v1/database/:name_or_identity/prepare/:reducer`
///
/// On success, the response includes:
/// - `X-Prepare-Id` header with the prepare_id
/// - Body contains the reducer return value (if any)
pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
State(worker_ctx): State<S>,
Extension(auth): Extension<SpacetimeAuth>,
Path(CallParams {
name_or_identity,
reducer,
}): Path<CallParams>,
TypedHeader(content_type): TypedHeader<headers::ContentType>,
headers: axum::http::HeaderMap,
body: Bytes,
) -> axum::response::Result<impl IntoResponse> {
let args = parse_call_args(content_type, body)?;
let caller_identity = auth.claims.identity;

// The coordinator sends its actual database identity in `X-Coordinator-Identity`.
// Without this, `anon_auth_middleware` gives the HTTP caller an ephemeral random
// identity, which gets stored in `st_2pc_state` and breaks recovery polling.
let coordinator_identity = headers
.get("X-Coordinator-Identity")
.and_then(|v| v.to_str().ok())
.and_then(|s| spacetimedb_lib::Identity::from_hex(s).ok());

let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;

// 2PC prepare is a server-to-server call; no client lifecycle management needed.
// call_identity_connected/disconnected submit jobs to the module's executor, which
// will be blocked holding the 2PC write lock after prepare_reducer returns — deadlock.
let result = module
.prepare_reducer(caller_identity, None, &reducer, args.0, coordinator_identity)
.await;

match result {
Ok((prepare_id, rcr, return_value)) => {
let (status, body) =
reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value, args.1)?;
let mut response = (
status,
TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)),
TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)),
body,
)
.into_response();
if !prepare_id.is_empty() {
response
.headers_mut()
.insert("X-Prepare-Id", http::HeaderValue::from_str(&prepare_id).unwrap());
}
Ok(response)
}
Err(e) => Err(map_reducer_error(e, &reducer).into()),
}
}

/// Path parameters extracted by the `/2pc/...` routes that address a prepared
/// transaction (`commit`, `abort`, `status`, `ack-commit`).
#[derive(Deserialize)]
pub struct TwoPcParams {
/// The `:name_or_identity` path segment selecting the database.
name_or_identity: NameOrIdentity,
/// The `:prepare_id` path segment naming the prepared transaction.
prepare_id: String,
}

/// 2PC commit endpoint: finalize the prepared transaction named by `prepare_id`.
///
/// `POST /v1/database/:name_or_identity/2pc/commit/:prepare_id`
///
/// Responds `200 OK` once the module has committed the prepared transaction.
/// Any error reported by the module is logged and returned as `404 Not Found`
/// with the error as the response body.
pub async fn commit_2pc<S: ControlStateDelegate + NodeDelegate>(
    State(worker_ctx): State<S>,
    Extension(_auth): Extension<SpacetimeAuth>,
    Path(TwoPcParams {
        name_or_identity,
        prepare_id,
    }): Path<TwoPcParams>,
) -> axum::response::Result<impl IntoResponse> {
    let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;

    match module.commit_prepared(&prepare_id) {
        Ok(_) => Ok(StatusCode::OK),
        Err(e) => {
            log::error!("2PC commit failed: {e}");
            Err((StatusCode::NOT_FOUND, e).into_response().into())
        }
    }
}

/// 2PC abort endpoint: discard the prepared transaction named by `prepare_id`.
///
/// `POST /v1/database/:name_or_identity/2pc/abort/:prepare_id`
///
/// Responds `200 OK` once the module has aborted the prepared transaction.
/// Any error reported by the module is logged and returned as `404 Not Found`
/// with the error as the response body.
pub async fn abort_2pc<S: ControlStateDelegate + NodeDelegate>(
    State(worker_ctx): State<S>,
    Extension(_auth): Extension<SpacetimeAuth>,
    Path(TwoPcParams {
        name_or_identity,
        prepare_id,
    }): Path<TwoPcParams>,
) -> axum::response::Result<impl IntoResponse> {
    let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;

    match module.abort_prepared(&prepare_id) {
        Ok(_) => Ok(StatusCode::OK),
        Err(e) => {
            log::error!("2PC abort failed: {e}");
            Err((StatusCode::NOT_FOUND, e).into_response().into())
        }
    }
}

/// 2PC coordinator status endpoint.
///
/// `GET /v1/database/:name_or_identity/2pc/status/:prepare_id`
///
/// The response is always `200 OK`, with a plain-text body of `"commit"` when the
/// coordinator has durably decided COMMIT for `prepare_id` and `"abort"` in every
/// other case. Participant B polls this to recover from a timeout or crash.
pub async fn status_2pc<S: ControlStateDelegate + NodeDelegate>(
    State(worker_ctx): State<S>,
    Extension(_auth): Extension<SpacetimeAuth>,
    Path(TwoPcParams {
        name_or_identity,
        prepare_id,
    }): Path<TwoPcParams>,
) -> axum::response::Result<impl IntoResponse> {
    let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;

    let committed = module.has_2pc_coordinator_commit(&prepare_id);
    let verdict: &'static str = if committed { "commit" } else { "abort" };

    Ok((StatusCode::OK, verdict))
}

/// 2PC commit-ack endpoint.
///
/// `POST /v1/database/:name_or_identity/2pc/ack-commit/:prepare_id`
///
/// Participant B calls this after committing through the status-poll recovery path,
/// which lets the coordinator delete its `st_2pc_coordinator_log` entry.
///
/// Responds `200 OK` on success. A failure to record the acknowledgement is logged
/// and returned as `500 Internal Server Error` with the error text as the body.
pub async fn ack_commit_2pc<S: ControlStateDelegate + NodeDelegate>(
    State(worker_ctx): State<S>,
    Extension(_auth): Extension<SpacetimeAuth>,
    Path(TwoPcParams {
        name_or_identity,
        prepare_id,
    }): Path<TwoPcParams>,
) -> axum::response::Result<impl IntoResponse> {
    let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;

    match module.ack_2pc_coordinator_commit(&prepare_id) {
        Ok(_) => Ok(StatusCode::OK),
        Err(e) => {
            log::error!("2PC ack-commit failed: {e}");
            Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response().into())
        }
    }
}

/// Encode a reducer return value as an HTTP response.
///
/// If the outcome is an error, return a raw string with `application/text`. Ignore `want_bsatn` in this case.
Expand Down Expand Up @@ -1280,6 +1437,16 @@ pub struct DatabaseRoutes<S> {
pub db_reset: MethodRouter<S>,
/// GET: /database/: name_or_identity/unstable/timestamp
pub timestamp_get: MethodRouter<S>,
/// POST: /database/:name_or_identity/prepare/:reducer
pub prepare_post: MethodRouter<S>,
/// POST: /database/:name_or_identity/2pc/commit/:prepare_id
pub commit_2pc_post: MethodRouter<S>,
/// POST: /database/:name_or_identity/2pc/abort/:prepare_id
pub abort_2pc_post: MethodRouter<S>,
/// GET: /database/:name_or_identity/2pc/status/:prepare_id
pub status_2pc_get: MethodRouter<S>,
/// POST: /database/:name_or_identity/2pc/ack-commit/:prepare_id
pub ack_commit_2pc_post: MethodRouter<S>,
}

impl<S> Default for DatabaseRoutes<S>
Expand All @@ -1305,6 +1472,11 @@ where
pre_publish: post(pre_publish::<S>),
db_reset: put(reset::<S>),
timestamp_get: get(get_timestamp::<S>),
prepare_post: post(prepare::<S>),
commit_2pc_post: post(commit_2pc::<S>),
abort_2pc_post: post(abort_2pc::<S>),
status_2pc_get: get(status_2pc::<S>),
ack_commit_2pc_post: post(ack_commit_2pc::<S>),
}
}
}
Expand All @@ -1329,7 +1501,12 @@ where
.route("/sql", self.sql_post)
.route("/unstable/timestamp", self.timestamp_get)
.route("/pre_publish", self.pre_publish)
.route("/reset", self.db_reset);
.route("/reset", self.db_reset)
.route("/prepare/:reducer", self.prepare_post)
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
.route("/2pc/abort/:prepare_id", self.abort_2pc_post)
.route("/2pc/status/:prepare_id", self.status_2pc_get)
.route("/2pc/ack-commit/:prepare_id", self.ack_commit_2pc_post);

axum::Router::new()
.route("/", self.root_post)
Expand Down
32 changes: 31 additions & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
use spacetimedb_data_structures::map::HashSet;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
Expand Down Expand Up @@ -454,6 +454,26 @@ impl RelationalDB {
Ok(self.with_read_only(Workload::Internal, |tx| self.inner.program(tx))?)
}

/// Return the 2PC participant transactions that were still in PREPARE state when the
/// database last shut down (or crashed).
///
/// Every row carries what is needed to resume the transaction: the prepare_id, the
/// coordinator identity, the reducer name/args, and the caller context.
/// B never aborts on its own — it polls the coordinator for a decision.
///
/// # Errors
///
/// Any failure while scanning `st_2pc_state` is converted into a [`DBError`].
pub fn pending_2pc_prepares(&self) -> Result<Vec<spacetimedb_datastore::system_tables::St2pcStateRow>, DBError> {
    // NOTE(review): neighbouring read-only accessors use `with_read_only`; presumably the
    // scan helper is only available on the mutable transaction — confirm.
    self.with_auto_commit(Workload::Internal, |tx| {
        let rows = tx.scan_st_2pc_state();
        rows.map_err(DBError::from)
    })
}

/// Return the 2PC coordinator log entries whose participants have not yet acknowledged
/// them. Coordinator crash-recovery uses these to retransmit COMMIT decisions.
///
/// # Errors
///
/// Any failure while scanning `st_2pc_coordinator_log` is converted into a [`DBError`].
pub fn pending_2pc_coordinator_commits(
    &self,
) -> Result<Vec<spacetimedb_datastore::system_tables::St2pcCoordinatorLogRow>, DBError> {
    // NOTE(review): neighbouring read-only accessors use `with_read_only`; presumably the
    // scan helper is only available on the mutable transaction — confirm.
    self.with_auto_commit(Workload::Internal, |tx| {
        let entries = tx.scan_st_2pc_coordinator_log();
        entries.map_err(DBError::from)
    })
}

/// Read the set of clients currently connected to the database.
pub fn connected_clients(&self) -> Result<ConnectedClients, DBError> {
self.with_read_only(Workload::Internal, |tx| {
Expand Down Expand Up @@ -844,6 +864,16 @@ impl RelationalDB {
(tx_data, tx_metrics, tx)
}

/// Hand an already-built `TxData` straight to the durability worker.
///
/// The 2PC participant path uses this to make the `st_2pc_state` PREPARE marker durable
/// while the main write lock is still held (i.e. without going through a full commit).
///
/// Does nothing when this database has no durability handle configured.
pub fn request_durability_for_tx_data(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
    let Some(worker) = self.durability.as_ref() else {
        return;
    };
    worker.request_durability(reducer_context, tx_data);
}

/// Get the [`DurableOffset`] of this database, or `None` if this is an
/// in-memory instance.
pub fn durable_tx_offset(&self) -> Option<DurableOffset> {
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,11 @@ impl Host {
module_host.clear_all_clients().await?;

scheduler_starter.start(&module_host)?;

// Crash recovery: retransmit any pending 2PC decisions from before the restart.
module_host.recover_2pc_coordinator();
module_host.recover_2pc_participant();

let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone());

Expand Down
Loading
Loading