Skip to content
48 changes: 48 additions & 0 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,23 @@ pub mod raw {
args_len: u32,
out: *mut BytesSource,
) -> u16;

/// 2PC variant of `call_reducer_on_db`.
///
/// Calls the target database's `/prepare/{reducer}` endpoint instead of `/call/{reducer}`.
/// On success, the runtime stores the `prepare_id` internally.
/// After the coordinator's reducer commits, all participants are committed.
/// If the coordinator's reducer fails, all participants are aborted.
///
/// Returns and errors are identical to `call_reducer_on_db`.
pub fn call_reducer_on_db_2pc(
identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity
reducer_ptr: *const u8,
reducer_len: u32,
args_ptr: *const u8,
args_len: u32,
out: *mut BytesSource,
) -> u16;
}

/// What strategy does the database index use?
Expand Down Expand Up @@ -1510,6 +1527,37 @@ pub fn call_reducer_on_db(
}
}

/// 2PC variant of [`call_reducer_on_db`].
///
/// Calls `/prepare/{reducer}` on the target database. On success, the runtime
/// stores the prepare_id internally. After the coordinator's reducer commits,
/// all participants are committed. On failure, all participants are aborted.
///
/// Returns and errors are identical to [`call_reducer_on_db`].
#[inline]
pub fn call_reducer_on_db_2pc(
identity: [u8; 32],
reducer_name: &str,
args: &[u8],
) -> Result<(u16, raw::BytesSource), raw::BytesSource> {
let mut out = raw::BytesSource::INVALID;
let status = unsafe {
raw::call_reducer_on_db_2pc(
identity.as_ptr(),
reducer_name.as_ptr(),
reducer_name.len() as u32,
args.as_ptr(),
args.len() as u32,
&mut out,
)
};
if status == Errno::HTTP_ERROR.code() {
Err(out)
} else {
Ok((status, out))
}
}

/// Finds the JWT payload associated with `connection_id`.
/// If nothing is found for the connection, this returns None.
/// If a payload is found, this will return a valid [`raw::BytesSource`].
Expand Down
40 changes: 40 additions & 0 deletions crates/bindings/src/remote_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,43 @@ pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args:
}
}
}

/// Call a reducer on a remote database using the 2PC prepare protocol.
///
/// This is the 2PC variant of [`call_reducer_on_db`]. It calls the target database's
/// `/prepare/{reducer}` endpoint. On success, the runtime stores the prepare_id internally.
/// After the coordinator's reducer commits, all participants are committed automatically.
/// If the coordinator's reducer fails (panics or returns Err), all participants are aborted.
///
/// Returns and errors are identical to [`call_reducer_on_db`].
pub fn call_reducer_on_db_2pc(
database_identity: Identity,
reducer_name: &str,
args: &[u8],
) -> Result<(), RemoteCallError> {
let identity_bytes = database_identity.to_byte_array();
match spacetimedb_bindings_sys::call_reducer_on_db_2pc(identity_bytes, reducer_name, args) {
Ok((status, body_source)) => {
if status < 300 {
return Ok(());
}
let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID {
String::new()
} else {
let mut buf = IterBuf::take();
read_bytes_source_into(body_source, &mut buf);
String::from_utf8_lossy(&buf).into_owned()
};
if status == 404 {
Err(RemoteCallError::NotFound(msg))
} else {
Err(RemoteCallError::Failed(msg))
}
}
Err(err_source) => {
use crate::rt::read_bytes_source_as;
let msg = read_bytes_source_as::<String>(err_source);
Err(RemoteCallError::Unreachable(msg))
}
}
}
123 changes: 122 additions & 1 deletion crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,115 @@ fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::res
}
}

/// 2PC prepare endpoint: execute a reducer and return a prepare_id.
///
/// `POST /v1/database/:name_or_identity/prepare/:reducer`
///
/// On success, the response includes:
/// - `X-Prepare-Id` header with the prepare_id
/// - Body contains the reducer return value (if any)
pub async fn prepare<S: ControlStateDelegate + NodeDelegate>(
State(worker_ctx): State<S>,
Extension(auth): Extension<SpacetimeAuth>,
Path(CallParams {
name_or_identity,
reducer,
}): Path<CallParams>,
TypedHeader(content_type): TypedHeader<headers::ContentType>,
body: Bytes,
) -> axum::response::Result<impl IntoResponse> {
let args = parse_call_args(content_type, body)?;
let caller_identity = auth.claims.identity;

let (module, Database { owner_identity, .. }) = find_module_and_database(&worker_ctx, name_or_identity).await?;

let connection_id = generate_random_connection_id();

module
.call_identity_connected(auth.into(), connection_id)
.await
.map_err(client_connected_error_to_response)?;

let result = module
.prepare_reducer(caller_identity, Some(connection_id), &reducer, args)
.await;

module
.call_identity_disconnected(caller_identity, connection_id)
.await
.map_err(client_disconnected_error_to_response)?;

match result {
Ok((prepare_id, rcr, return_value)) => {
let (status, body) =
reducer_outcome_response(&module, &owner_identity, &reducer, rcr.outcome, return_value)?;
let mut response = (
status,
TypedHeader(SpacetimeEnergyUsed(rcr.energy_used)),
TypedHeader(SpacetimeExecutionDurationMicros(rcr.execution_duration)),
body,
)
.into_response();
if !prepare_id.is_empty() {
response.headers_mut().insert(
"X-Prepare-Id",
http::HeaderValue::from_str(&prepare_id).unwrap(),
);
}
Ok(response)
}
Err(e) => Err(map_reducer_error(e, &reducer).into()),
}
}

#[derive(Deserialize)]
pub struct TwoPcParams {
name_or_identity: NameOrIdentity,
prepare_id: String,
}

/// 2PC commit endpoint: finalize a prepared transaction.
///
/// `POST /v1/database/:name_or_identity/2pc/commit/:prepare_id`
pub async fn commit_2pc<S: ControlStateDelegate + NodeDelegate>(
State(worker_ctx): State<S>,
Extension(_auth): Extension<SpacetimeAuth>,
Path(TwoPcParams {
name_or_identity,
prepare_id,
}): Path<TwoPcParams>,
) -> axum::response::Result<impl IntoResponse> {
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;

module.commit_prepared(&prepare_id).map_err(|e| {
log::error!("2PC commit failed: {e}");
(StatusCode::NOT_FOUND, e).into_response()
})?;

Ok(StatusCode::OK)
}

/// 2PC abort endpoint: abort a prepared transaction.
///
/// `POST /v1/database/:name_or_identity/2pc/abort/:prepare_id`
pub async fn abort_2pc<S: ControlStateDelegate + NodeDelegate>(
State(worker_ctx): State<S>,
Extension(_auth): Extension<SpacetimeAuth>,
Path(TwoPcParams {
name_or_identity,
prepare_id,
}): Path<TwoPcParams>,
) -> axum::response::Result<impl IntoResponse> {
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;

module.abort_prepared(&prepare_id).map_err(|e| {
log::error!("2PC abort failed: {e}");
(StatusCode::NOT_FOUND, e).into_response()
})?;

Ok(StatusCode::OK)
}

fn reducer_outcome_response(
module: &ModuleHost,
owner_identity: &Identity,
Expand Down Expand Up @@ -1247,6 +1356,12 @@ pub struct DatabaseRoutes<S> {
pub db_reset: MethodRouter<S>,
/// GET: /database/: name_or_identity/unstable/timestamp
pub timestamp_get: MethodRouter<S>,
/// POST: /database/:name_or_identity/prepare/:reducer
pub prepare_post: MethodRouter<S>,
/// POST: /database/:name_or_identity/2pc/commit/:prepare_id
pub commit_2pc_post: MethodRouter<S>,
/// POST: /database/:name_or_identity/2pc/abort/:prepare_id
pub abort_2pc_post: MethodRouter<S>,
}

impl<S> Default for DatabaseRoutes<S>
Expand All @@ -1272,6 +1387,9 @@ where
pre_publish: post(pre_publish::<S>),
db_reset: put(reset::<S>),
timestamp_get: get(get_timestamp::<S>),
prepare_post: post(prepare::<S>),
commit_2pc_post: post(commit_2pc::<S>),
abort_2pc_post: post(abort_2pc::<S>),
}
}
}
Expand All @@ -1296,7 +1414,10 @@ where
.route("/sql", self.sql_post)
.route("/unstable/timestamp", self.timestamp_get)
.route("/pre_publish", self.pre_publish)
.route("/reset", self.db_reset);
.route("/reset", self.db_reset)
.route("/prepare/:reducer", self.prepare_post)
.route("/2pc/commit/:prepare_id", self.commit_2pc_post)
.route("/2pc/abort/:prepare_id", self.abort_2pc_post);

axum::Router::new()
.route("/", self.root_post)
Expand Down
Loading
Loading