Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,39 @@ pub mod raw {
) -> u16;
}

#[link(wasm_import_module = "spacetime_10.5")]
unsafe extern "C" {
/// Call a reducer on another SpacetimeDB database.
///
/// - `identity_ptr` must point to exactly 32 bytes — the BSATN (little-endian) encoding of
/// the target database `Identity`.
/// - `reducer_ptr[..reducer_len]` is the UTF-8 name of the reducer to call.
/// - `args_ptr[..args_len]` is the BSATN-encoded reducer arguments.
///
/// On transport success (any HTTP response received):
/// - Returns the HTTP status code (e.g. 200, 400, 530).
/// - Writes a [`BytesSource`] containing the response body bytes to `*out`.
///
/// On transport failure (connection refused, timeout, etc.):
/// - Returns `errno::HTTP_ERROR` (21).
/// - Writes a [`BytesSource`] containing a BSATN-encoded error [`String`] to `*out`.
///
/// Unlike `procedure_http_request`, this syscall may be called while a transaction
/// is open (i.e. from within a reducer body).
///
/// # Traps
///
/// Traps if any pointer is NULL or its range falls outside of linear memory.
pub fn call_reducer_on_db(
identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity
reducer_ptr: *const u8,
reducer_len: u32,
args_ptr: *const u8,
args_len: u32,
out: *mut BytesSource,
) -> u16;
}

/// What strategy does the database index use?
///
/// See also: <https://www.postgresql.org/docs/current/sql-createindex.html>
Expand Down Expand Up @@ -1438,6 +1471,45 @@ pub fn identity() -> [u8; 32] {
buf
}

/// Call a reducer on a remote database identified by `identity` (little-endian 32-byte array).
///
/// On transport success (any HTTP response received):
/// - Returns `Ok((status, body_source))` where `status` is the HTTP status code and
/// `body_source` is a [`raw::BytesSource`] containing the raw response body bytes.
///
/// On transport failure (connection refused, timeout, etc.):
/// - Returns `Err(err_source)` where `err_source` is a [`raw::BytesSource`] containing
/// a BSATN-encoded error [`String`].
///
/// Unlike HTTP requests, this syscall may be called while a transaction is open.
#[inline]
pub fn call_reducer_on_db(
identity: [u8; 32],
reducer_name: &str,
args: &[u8],
) -> Result<(u16, raw::BytesSource), raw::BytesSource> {
let mut out = raw::BytesSource::INVALID;
let status = unsafe {
raw::call_reducer_on_db(
identity.as_ptr(),
reducer_name.as_ptr(),
reducer_name.len() as u32,
args.as_ptr(),
args.len() as u32,
&mut out,
)
};
// The raw ABI returns either the HTTP status code (100-599) or HTTP_ERROR errno
// on transport failure. Unlike other ABI functions, a non-zero return value here
// does NOT indicate a generic errno — it's the HTTP status code. Only HTTP_ERROR
// specifically signals a transport-level failure.
if status == Errno::HTTP_ERROR.code() {
Err(out)
} else {
Ok((status, out))
}
}

/// Finds the JWT payload associated with `connection_id`.
/// If nothing is found for the connection, this returns None.
/// If a payload is found, this will return a valid [`raw::BytesSource`].
Expand Down
1 change: 1 addition & 0 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod client_visibility_filter;
pub mod http;
pub mod log_stopwatch;
mod logger;
pub mod remote_reducer;
#[cfg(feature = "rand08")]
mod rng;
#[doc(hidden)]
Expand Down
86 changes: 86 additions & 0 deletions crates/bindings/src/remote_reducer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! Binding for calling reducers on remote SpacetimeDB databases.
//!
//! Call a reducer on another database using [`call_reducer_on_db`].
//!
//! The args must be BSATN-encoded. Returns `Ok(())` when the remote reducer
//! ran and succeeded, or one of the [`RemoteCallError`] variants on failure.
//!
//! # Example
//!
//! ```no_run
//! use spacetimedb::{remote_reducer, Identity};
//!
//! #[spacetimedb::reducer]
//! fn call_remote(ctx: &spacetimedb::ReducerContext, target: Identity) {
//! // Empty BSATN args for a zero-argument reducer.
//! let args = spacetimedb::bsatn::to_vec(&()).unwrap();
//! match remote_reducer::call_reducer_on_db(target, "my_reducer", &args) {
//! Ok(()) => log::info!("remote reducer succeeded"),
//! Err(remote_reducer::RemoteCallError::Failed(msg)) => log::error!("reducer failed: {msg}"),
//! Err(remote_reducer::RemoteCallError::NotFound(msg)) => log::error!("not found: {msg}"),
//! Err(remote_reducer::RemoteCallError::Unreachable(msg)) => log::error!("unreachable: {msg}"),
//! }
//! }
//! ```

use crate::{rt::read_bytes_source_into, Identity, IterBuf};

/// Error returned by [`call_reducer_on_db`].
#[derive(Debug)]
pub enum RemoteCallError {
/// The remote reducer ran but returned an error. Contains the error message from the server.
Failed(String),
/// The target database or reducer does not exist (HTTP 404).
NotFound(String),
/// The call could not be delivered (connection refused, timeout, network error, etc.).
Unreachable(String),
}

impl core::fmt::Display for RemoteCallError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
RemoteCallError::Failed(msg) => write!(f, "remote reducer failed: {msg}"),
RemoteCallError::NotFound(msg) => write!(f, "remote database or reducer not found: {msg}"),
RemoteCallError::Unreachable(msg) => write!(f, "remote database unreachable: {msg}"),
}
}
}

/// Call a reducer on a remote database.
///
/// - `database_identity`: the target database.
/// - `reducer_name`: the name of the reducer to invoke (must be valid UTF-8).
/// - `args`: BSATN-encoded reducer arguments.
///
/// Returns `Ok(())` when the remote reducer ran and succeeded.
/// Returns `Err(RemoteCallError::Failed(msg))` when the reducer ran but returned an error.
/// Returns `Err(RemoteCallError::NotFound(msg))` when the database or reducer does not exist.
/// Returns `Err(RemoteCallError::Unreachable(msg))` on transport failure (connection refused, timeout, …).
pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args: &[u8]) -> Result<(), RemoteCallError> {
let identity_bytes = database_identity.to_byte_array();
match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) {
Ok((status, body_source)) => {
if status < 300 {
return Ok(());
}
// Decode the response body as the error message.
let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID {
String::new()
} else {
let mut buf = IterBuf::take();
read_bytes_source_into(body_source, &mut buf);
String::from_utf8_lossy(&buf).into_owned()
};
if status == 404 {
Err(RemoteCallError::NotFound(msg))
} else {
Err(RemoteCallError::Failed(msg))
}
}
Err(err_source) => {
use crate::rt::read_bytes_source_as;
let msg = read_bytes_source_as::<String>(err_source);
Err(RemoteCallError::Unreachable(msg))
}
}
}
22 changes: 14 additions & 8 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,11 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
reducer,
}): Path<CallParams>,
TypedHeader(content_type): TypedHeader<headers::ContentType>,
ByteStringBody(body): ByteStringBody,
body: Bytes,
) -> axum::response::Result<impl IntoResponse> {
assert_content_type_json(content_type)?;

let caller_identity = auth.claims.identity;

let args = FunctionArgs::Json(body);
let args = parse_call_args(content_type, body)?;

// HTTP callers always need a connection ID to provide to connect/disconnect,
// so generate one.
Expand Down Expand Up @@ -216,11 +214,19 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
}
}

fn assert_content_type_json(content_type: headers::ContentType) -> axum::response::Result<()> {
if content_type != headers::ContentType::json() {
Err(axum::extract::rejection::MissingJsonContentType::default().into())
/// Parse call arguments from an HTTP body based on content type.
///
/// - `application/json` → [`FunctionArgs::Json`] (UTF-8 required).
/// - `application/octet-stream` → [`FunctionArgs::Bsatn`] (raw BSATN bytes).
fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::response::Result<FunctionArgs> {
if content_type == headers::ContentType::json() {
let s = bytestring::ByteString::try_from(body)
.map_err(|_| (StatusCode::BAD_REQUEST, "request body is not valid UTF-8").into_response())?;
Ok(FunctionArgs::Json(s))
} else if content_type == headers::ContentType::from(mime::APPLICATION_OCTET_STREAM) {
Ok(FunctionArgs::Bsatn(body))
} else {
Ok(())
Err(axum::extract::rejection::MissingJsonContentType::default().into())
}
}

Expand Down
52 changes: 51 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use crate::db::persistence::PersistenceProvider;
use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata};
use crate::db::{self, spawn_tx_metrics_recorder};
use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
use crate::host::reducer_router::{LocalReducerRouter, ReducerCallRouter};
use crate::host::v8::V8Runtime;
use crate::host::ProcedureCallError;
use crate::messages::control_db::{Database, HostType};
use crate::module_host_context::ModuleCreationContext;
use crate::replica_context::ReplicaContext;
use crate::replica_context::{CallReducerOnDbConfig, ReplicaContext};
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset};
use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
Expand Down Expand Up @@ -117,6 +118,26 @@ pub struct HostController {
db_cores: JobCores,
/// The pool of buffers used to build `BsatnRowList`s in subscriptions.
pub bsatn_rlb_pool: BsatnRowListBuilderPool,
/// Warmed HTTP/2 client shared by all replicas on this host for
/// [`crate::host::instance_env::InstanceEnv::call_reducer_on_db`].
///
/// All per-replica clones share the same underlying connection pool.
pub call_reducer_client: reqwest::Client,
/// Router that resolves the HTTP base URL of the leader node for a given database.
///
/// Set to [`LocalReducerRouter`] by default; replaced with `ClusterReducerRouter`
/// in cluster deployments via [`HostController::new`] receiving the router directly.
pub call_reducer_router: Arc<dyn ReducerCallRouter>,
/// A single node-level Bearer token included in all outgoing cross-DB reducer calls.
///
/// Set once at node startup by the deployment layer (standalone / cluster) so that
/// `anon_auth_middleware` on the target node accepts the request without generating a
/// fresh ephemeral identity on every call. All replicas on this node share the same
/// token — the target only needs proof that the caller is a legitimate node, not which
/// specific database initiated the call.
///
/// `None` in test/embedded contexts where no JWT signer is configured.
pub call_reducer_auth_token: Option<String>,
}

pub(crate) struct HostRuntimes {
Expand Down Expand Up @@ -228,6 +249,9 @@ impl HostController {
page_pool: PagePool::new(default_config.page_pool_max_size),
bsatn_rlb_pool: BsatnRowListBuilderPool::new(),
db_cores,
call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()),
call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")),
call_reducer_auth_token: None,
}
}

Expand Down Expand Up @@ -658,12 +682,16 @@ fn stored_program_hash(db: &RelationalDB) -> anyhow::Result<Option<Hash>> {
Ok(meta.map(|meta| meta.program_hash))
}

#[allow(clippy::too_many_arguments)]
async fn make_replica_ctx(
module_logs: Option<ModuleLogsDir>,
database: Database,
replica_id: u64,
relational_db: Arc<RelationalDB>,
bsatn_rlb_pool: BsatnRowListBuilderPool,
call_reducer_client: reqwest::Client,
call_reducer_router: Arc<dyn ReducerCallRouter>,
call_reducer_auth_token: Option<String>,
) -> anyhow::Result<ReplicaContext> {
let logger = match module_logs {
Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await,
Expand Down Expand Up @@ -696,6 +724,9 @@ async fn make_replica_ctx(
replica_id,
logger,
subscriptions,
call_reducer_client,
call_reducer_router,
call_reducer_auth_token,
})
}

Expand Down Expand Up @@ -771,6 +802,9 @@ struct ModuleLauncher<F> {
runtimes: Arc<HostRuntimes>,
core: AllocatedJobCore,
bsatn_rlb_pool: BsatnRowListBuilderPool,
call_reducer_client: reqwest::Client,
call_reducer_router: Arc<dyn ReducerCallRouter>,
call_reducer_auth_token: Option<String>,
}

impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
Expand All @@ -790,6 +824,9 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
self.replica_id,
self.relational_db,
self.bsatn_rlb_pool,
self.call_reducer_client,
self.call_reducer_router,
self.call_reducer_auth_token,
)
.await
.map(Arc::new)?;
Expand Down Expand Up @@ -991,6 +1028,9 @@ impl Host {
runtimes: runtimes.clone(),
core: host_controller.db_cores.take(),
bsatn_rlb_pool: bsatn_rlb_pool.clone(),
call_reducer_client: host_controller.call_reducer_client.clone(),
call_reducer_router: host_controller.call_reducer_router.clone(),
call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(),
}
.launch_module()
.await?
Expand Down Expand Up @@ -1020,6 +1060,9 @@ impl Host {
runtimes: runtimes.clone(),
core: host_controller.db_cores.take(),
bsatn_rlb_pool: bsatn_rlb_pool.clone(),
call_reducer_client: host_controller.call_reducer_client.clone(),
call_reducer_router: host_controller.call_reducer_router.clone(),
call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(),
}
.launch_module()
.await;
Expand All @@ -1043,6 +1086,9 @@ impl Host {
runtimes: runtimes.clone(),
core: host_controller.db_cores.take(),
bsatn_rlb_pool: bsatn_rlb_pool.clone(),
call_reducer_client: host_controller.call_reducer_client.clone(),
call_reducer_router: host_controller.call_reducer_router.clone(),
call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(),
}
.launch_module()
.await;
Expand Down Expand Up @@ -1150,6 +1196,10 @@ impl Host {
runtimes: runtimes.clone(),
core,
bsatn_rlb_pool,
// Transient validation-only module; build its own client and router with defaults.
call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()),
call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")),
call_reducer_auth_token: None,
}
.launch_module()
.await
Expand Down
Loading
Loading