diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 95dfbc7e600..8854ae393b6 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -865,6 +865,39 @@ pub mod raw { ) -> u16; } + #[link(wasm_import_module = "spacetime_10.5")] + unsafe extern "C" { + /// Call a reducer on another SpacetimeDB database. + /// + /// - `identity_ptr` must point to exactly 32 bytes — the BSATN (little-endian) encoding of + /// the target database `Identity`. + /// - `reducer_ptr[..reducer_len]` is the UTF-8 name of the reducer to call. + /// - `args_ptr[..args_len]` is the BSATN-encoded reducer arguments. + /// + /// On transport success (any HTTP response received): + /// - Returns the HTTP status code (e.g. 200, 400, 530). + /// - Writes a [`BytesSource`] containing the response body bytes to `*out`. + /// + /// On transport failure (connection refused, timeout, etc.): + /// - Returns `errno::HTTP_ERROR` (21). + /// - Writes a [`BytesSource`] containing a BSATN-encoded error [`String`] to `*out`. + /// + /// Unlike `procedure_http_request`, this syscall may be called while a transaction + /// is open (i.e. from within a reducer body). + /// + /// # Traps + /// + /// Traps if any pointer is NULL or its range falls outside of linear memory. + pub fn call_reducer_on_db( + identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity + reducer_ptr: *const u8, + reducer_len: u32, + args_ptr: *const u8, + args_len: u32, + out: *mut BytesSource, + ) -> u16; + } + /// What strategy does the database index use? /// /// See also: @@ -1438,6 +1471,45 @@ pub fn identity() -> [u8; 32] { buf } +/// Call a reducer on a remote database identified by `identity` (little-endian 32-byte array). +/// +/// On transport success (any HTTP response received): +/// - Returns `Ok((status, body_source))` where `status` is the HTTP status code and +/// `body_source` is a [`raw::BytesSource`] containing the raw response body bytes. 
+/// +/// On transport failure (connection refused, timeout, etc.): +/// - Returns `Err(err_source)` where `err_source` is a [`raw::BytesSource`] containing +/// a BSATN-encoded error [`String`]. +/// +/// Unlike HTTP requests, this syscall may be called while a transaction is open. +#[inline] +pub fn call_reducer_on_db( + identity: [u8; 32], + reducer_name: &str, + args: &[u8], +) -> Result<(u16, raw::BytesSource), raw::BytesSource> { + let mut out = raw::BytesSource::INVALID; + let status = unsafe { + raw::call_reducer_on_db( + identity.as_ptr(), + reducer_name.as_ptr(), + reducer_name.len() as u32, + args.as_ptr(), + args.len() as u32, + &mut out, + ) + }; + // The raw ABI returns either the HTTP status code (100-599) or HTTP_ERROR errno + // on transport failure. Unlike other ABI functions, a non-zero return value here + // does NOT indicate a generic errno — it's the HTTP status code. Only HTTP_ERROR + // specifically signals a transport-level failure. + if status == Errno::HTTP_ERROR.code() { + Err(out) + } else { + Ok((status, out)) + } +} + /// Finds the JWT payload associated with `connection_id`. /// If nothing is found for the connection, this returns None. /// If a payload is found, this will return a valid [`raw::BytesSource`]. diff --git a/crates/bindings/src/lib.rs b/crates/bindings/src/lib.rs index 9e02a3a97f0..f91151ffb88 100644 --- a/crates/bindings/src/lib.rs +++ b/crates/bindings/src/lib.rs @@ -12,6 +12,7 @@ mod client_visibility_filter; pub mod http; pub mod log_stopwatch; mod logger; +pub mod remote_reducer; #[cfg(feature = "rand08")] mod rng; #[doc(hidden)] diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs new file mode 100644 index 00000000000..bded8bc5ae7 --- /dev/null +++ b/crates/bindings/src/remote_reducer.rs @@ -0,0 +1,86 @@ +//! Binding for calling reducers on remote SpacetimeDB databases. +//! +//! Call a reducer on another database using [`call_reducer_on_db`]. +//! +//! 
/// Error returned by [`call_reducer_on_db`].
///
/// Every variant carries a human-readable message: the response body for
/// [`Failed`](Self::Failed) / [`NotFound`](Self::NotFound), or the transport error
/// text for [`Unreachable`](Self::Unreachable).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteCallError {
    /// The remote side answered with a non-success status other than 404.
    ///
    /// This covers a reducer that ran and returned an error, but also any other
    /// rejected request (e.g. arguments the server could not decode).
    /// Contains the response body sent by the server.
    Failed(String),
    /// The target database or reducer does not exist (HTTP 404).
    NotFound(String),
    /// The call could not be delivered (connection refused, timeout, network error, etc.).
    Unreachable(String),
}

impl core::fmt::Display for RemoteCallError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let (summary, detail) = match self {
            Self::Failed(msg) => ("remote reducer failed", msg),
            Self::NotFound(msg) => ("remote database or reducer not found", msg),
            Self::Unreachable(msg) => ("remote database unreachable", msg),
        };
        write!(f, "{summary}: {detail}")
    }
}

// Lets callers propagate the error with `?` into `Box<dyn Error>` and similar sinks.
impl std::error::Error for RemoteCallError {}
+/// - `args`: BSATN-encoded reducer arguments. +/// +/// Returns `Ok(())` when the remote reducer ran and succeeded. +/// Returns `Err(RemoteCallError::Failed(msg))` when the reducer ran but returned an error. +/// Returns `Err(RemoteCallError::NotFound(msg))` when the database or reducer does not exist. +/// Returns `Err(RemoteCallError::Unreachable(msg))` on transport failure (connection refused, timeout, …). +pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args: &[u8]) -> Result<(), RemoteCallError> { + let identity_bytes = database_identity.to_byte_array(); + match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) { + Ok((status, body_source)) => { + if status < 300 { + return Ok(()); + } + // Decode the response body as the error message. + let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID { + String::new() + } else { + let mut buf = IterBuf::take(); + read_bytes_source_into(body_source, &mut buf); + String::from_utf8_lossy(&buf).into_owned() + }; + if status == 404 { + Err(RemoteCallError::NotFound(msg)) + } else { + Err(RemoteCallError::Failed(msg)) + } + } + Err(err_source) => { + use crate::rt::read_bytes_source_as; + let msg = read_bytes_source_as::(err_source); + Err(RemoteCallError::Unreachable(msg)) + } + } +} diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 6b753a9c8fd..881abcc0e18 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -136,13 +136,11 @@ pub async fn call( reducer, }): Path, TypedHeader(content_type): TypedHeader, - ByteStringBody(body): ByteStringBody, + body: Bytes, ) -> axum::response::Result { - assert_content_type_json(content_type)?; - let caller_identity = auth.claims.identity; - let args = FunctionArgs::Json(body); + let args = parse_call_args(content_type, body)?; // HTTP callers always need a connection ID to provide to 
connect/disconnect, // so generate one. @@ -216,11 +214,19 @@ pub async fn call( } } -fn assert_content_type_json(content_type: headers::ContentType) -> axum::response::Result<()> { - if content_type != headers::ContentType::json() { - Err(axum::extract::rejection::MissingJsonContentType::default().into()) +/// Parse call arguments from an HTTP body based on content type. +/// +/// - `application/json` → [`FunctionArgs::Json`] (UTF-8 required). +/// - `application/octet-stream` → [`FunctionArgs::Bsatn`] (raw BSATN bytes). +fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::response::Result { + if content_type == headers::ContentType::json() { + let s = bytestring::ByteString::try_from(body) + .map_err(|_| (StatusCode::BAD_REQUEST, "request body is not valid UTF-8").into_response())?; + Ok(FunctionArgs::Json(s)) + } else if content_type == headers::ContentType::from(mime::APPLICATION_OCTET_STREAM) { + Ok(FunctionArgs::Bsatn(body)) } else { - Ok(()) + Err(axum::extract::rejection::MissingJsonContentType::default().into()) } } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index e67e67540eb..dd774111b86 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -9,11 +9,12 @@ use crate::db::persistence::PersistenceProvider; use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata}; use crate::db::{self, spawn_tx_metrics_recorder}; use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor}; +use crate::host::reducer_router::{LocalReducerRouter, ReducerCallRouter}; use crate::host::v8::V8Runtime; use crate::host::ProcedureCallError; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; -use crate::replica_context::ReplicaContext; +use crate::replica_context::{CallReducerOnDbConfig, ReplicaContext}; use 
crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset}; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; @@ -117,6 +118,26 @@ pub struct HostController { db_cores: JobCores, /// The pool of buffers used to build `BsatnRowList`s in subscriptions. pub bsatn_rlb_pool: BsatnRowListBuilderPool, + /// Warmed HTTP/2 client shared by all replicas on this host for + /// [`crate::host::instance_env::InstanceEnv::call_reducer_on_db`]. + /// + /// All per-replica clones share the same underlying connection pool. + pub call_reducer_client: reqwest::Client, + /// Router that resolves the HTTP base URL of the leader node for a given database. + /// + /// Set to [`LocalReducerRouter`] by default; replaced with `ClusterReducerRouter` + /// in cluster deployments via [`HostController::new`] receiving the router directly. + pub call_reducer_router: Arc, + /// A single node-level Bearer token included in all outgoing cross-DB reducer calls. + /// + /// Set once at node startup by the deployment layer (standalone / cluster) so that + /// `anon_auth_middleware` on the target node accepts the request without generating a + /// fresh ephemeral identity on every call. All replicas on this node share the same + /// token — the target only needs proof that the caller is a legitimate node, not which + /// specific database initiated the call. + /// + /// `None` in test/embedded contexts where no JWT signer is configured. 
+ pub call_reducer_auth_token: Option, } pub(crate) struct HostRuntimes { @@ -228,6 +249,9 @@ impl HostController { page_pool: PagePool::new(default_config.page_pool_max_size), bsatn_rlb_pool: BsatnRowListBuilderPool::new(), db_cores, + call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), + call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), + call_reducer_auth_token: None, } } @@ -658,12 +682,16 @@ fn stored_program_hash(db: &RelationalDB) -> anyhow::Result> { Ok(meta.map(|meta| meta.program_hash)) } +#[allow(clippy::too_many_arguments)] async fn make_replica_ctx( module_logs: Option, database: Database, replica_id: u64, relational_db: Arc, bsatn_rlb_pool: BsatnRowListBuilderPool, + call_reducer_client: reqwest::Client, + call_reducer_router: Arc, + call_reducer_auth_token: Option, ) -> anyhow::Result { let logger = match module_logs { Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await, @@ -696,6 +724,9 @@ async fn make_replica_ctx( replica_id, logger, subscriptions, + call_reducer_client, + call_reducer_router, + call_reducer_auth_token, }) } @@ -771,6 +802,9 @@ struct ModuleLauncher { runtimes: Arc, core: AllocatedJobCore, bsatn_rlb_pool: BsatnRowListBuilderPool, + call_reducer_client: reqwest::Client, + call_reducer_router: Arc, + call_reducer_auth_token: Option, } impl ModuleLauncher { @@ -790,6 +824,9 @@ impl ModuleLauncher { self.replica_id, self.relational_db, self.bsatn_rlb_pool, + self.call_reducer_client, + self.call_reducer_router, + self.call_reducer_auth_token, ) .await .map(Arc::new)?; @@ -991,6 +1028,9 @@ impl Host { runtimes: runtimes.clone(), core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), + call_reducer_client: host_controller.call_reducer_client.clone(), + call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), } .launch_module() 
.await? @@ -1020,6 +1060,9 @@ impl Host { runtimes: runtimes.clone(), core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), + call_reducer_client: host_controller.call_reducer_client.clone(), + call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), } .launch_module() .await; @@ -1043,6 +1086,9 @@ impl Host { runtimes: runtimes.clone(), core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), + call_reducer_client: host_controller.call_reducer_client.clone(), + call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), } .launch_module() .await; @@ -1150,6 +1196,10 @@ impl Host { runtimes: runtimes.clone(), core, bsatn_rlb_pool, + // Transient validation-only module; build its own client and router with defaults. + call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), + call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), + call_reducer_auth_token: None, } .launch_module() .await diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 311c34775fd..1fdc651414e 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -8,6 +8,7 @@ use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::{commit_and_broadcast_event, ModuleSubscriptions}; use crate::subscription::module_subscription_manager::{from_tx_offset, TransactionOffset}; use crate::util::prometheus_handle::IntGaugeExt; +use crate::worker_metrics::WORKER_METRICS; use chrono::{DateTime, Utc}; use core::mem; use futures::TryFutureExt; @@ -977,6 +978,73 @@ impl InstanceEnv { Ok((response, body)) }) } + + /// Call a reducer on a remote database. 
+ /// + /// Unlike [`Self::http_request`], this is explicitly allowed while a transaction is open — + /// the caller is responsible for understanding the consistency implications. + /// + /// Uses [`ReplicaContext::call_reducer_router`] to resolve the leader node for + /// `database_identity`, then sends the request via the warmed HTTP client in + /// [`ReplicaContext::call_reducer_client`]. + /// + /// Returns `(http_status, response_body)` on transport success, + /// or [`NodesError::HttpError`] if the connection itself fails. + pub fn call_reducer_on_db( + &self, + database_identity: Identity, + reducer_name: &str, + args: bytes::Bytes, + ) -> impl Future> + use<> { + let client = self.replica_ctx.call_reducer_client.clone(); + let router = self.replica_ctx.call_reducer_router.clone(); + let reducer_name = reducer_name.to_owned(); + // Node-level auth token: a single token minted at startup and shared by all replicas + // on this node. Passed as a Bearer token so `anon_auth_middleware` on the target node + // accepts the request without generating a fresh ephemeral identity per call. 
+ let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + let caller_identity = self.replica_ctx.database.database_identity; + + async move { + let start = Instant::now(); + + let base_url = router + .resolve_base_url(database_identity) + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; + let url = format!( + "{}/v1/database/{}/call/{}", + base_url, + database_identity.to_hex(), + reducer_name, + ); + let mut req = client + .post(&url) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .body(args); + if let Some(token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + let result = async { + let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + let status = response.status().as_u16(); + let body = response.bytes().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + Ok((status, body)) + } + .await; + + WORKER_METRICS + .cross_db_reducer_calls_total + .with_label_values(&caller_identity) + .inc(); + WORKER_METRICS + .cross_db_reducer_duration_seconds + .with_label_values(&caller_identity) + .observe(start.elapsed().as_secs_f64()); + + result + } + } } /// Default timeout for HTTP requests performed by [`InstanceEnv::http_request`]. 
@@ -1315,9 +1383,9 @@ mod test { tests_utils::{begin_mut_tx, with_auto_commit, with_read_only, TestDB}, RelationalDB, }, - host::Scheduler, + host::{reducer_router::LocalReducerRouter, Scheduler}, messages::control_db::{Database, HostType}, - replica_context::ReplicaContext, + replica_context::{CallReducerOnDbConfig, ReplicaContext}, subscription::module_subscription_actor::ModuleSubscriptions, }; use anyhow::{anyhow, Result}; @@ -1351,6 +1419,9 @@ mod test { replica_id: 0, logger, subscriptions: subs, + call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), + call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), + call_reducer_auth_token: None, }, runtime, )) diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 0daa9c359bc..25e56ca217e 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -20,6 +20,7 @@ pub mod wasmtime; // Visible for integration testing. pub mod instance_env; +pub mod reducer_router; pub mod v8; // only pub for testing mod wasm_common; @@ -194,4 +195,6 @@ pub enum AbiCall { ProcedureCommitMutTransaction, ProcedureAbortMutTransaction, ProcedureHttpRequest, + + CallReducerOnDb, } diff --git a/crates/core/src/host/reducer_router.rs b/crates/core/src/host/reducer_router.rs new file mode 100644 index 00000000000..dcbf20c51c8 --- /dev/null +++ b/crates/core/src/host/reducer_router.rs @@ -0,0 +1,63 @@ +/// Trait for resolving which node to contact when calling a reducer on another database. +/// +/// Implementations differ between deployment modes: +/// +/// - **Standalone / single-node** — [`LocalReducerRouter`] always returns the local node's +/// HTTP base URL. Every database is on the same node, so there is nothing to resolve. +/// +/// - **Cluster / multi-node** — `ClusterReducerRouter` (private crate) queries the control DB +/// to discover the leader replica's node and returns that node's advertise address. 
+/// +/// The trait is object-safe (futures are boxed) so it can be stored as `Arc` +/// in [`crate::replica_context::ReplicaContext`] and swapped at startup. +use spacetimedb_lib::Identity; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + +pub trait ReducerCallRouter: Send + Sync + 'static { + /// Return the HTTP base URL (e.g. `"http://10.0.0.5:3000"`) of the node that + /// is the leader for `database_identity`. + /// + /// The caller appends `/v1/database/{identity}/call/{reducer}` to produce the full URL. + /// + /// # Errors + /// + /// Returns an error string when the leader cannot be resolved + /// (database not found, no leader elected yet, node has no network address, etc.). + fn resolve_base_url<'a>(&'a self, database_identity: Identity) -> BoxFuture<'a, anyhow::Result>; +} + +// Arc is itself a ReducerCallRouter. +impl ReducerCallRouter for Arc { + fn resolve_base_url<'a>(&'a self, database_identity: Identity) -> BoxFuture<'a, anyhow::Result> { + (**self).resolve_base_url(database_identity) + } +} + +/// Single-node implementation of [`ReducerCallRouter`]. +/// +/// Always routes to the same fixed base URL regardless of which database is targeted. +/// Suitable for standalone (single-node) deployments where every database is local. +/// +/// For cluster deployments, replace this with `ClusterReducerRouter` from the private crate. 
+pub struct LocalReducerRouter { + pub base_url: String, +} + +impl LocalReducerRouter { + pub fn new(base_url: impl Into) -> Self { + Self { + base_url: base_url.into(), + } + } +} + +impl ReducerCallRouter for LocalReducerRouter { + fn resolve_base_url<'a>(&'a self, _database_identity: Identity) -> BoxFuture<'a, anyhow::Result> { + let url = self.base_url.clone(); + Box::pin(async move { Ok(url) }) + } +} diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index a5c737d54d6..b5bba032d7c 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -433,6 +433,13 @@ macro_rules! abi_funcs { } + $link_sync! { + // Call a reducer on another database while holding (or not holding) a transaction. + // Implemented as a sync host function (using block_in_place) so it can be called + // from within a reducer body where only synchronous host functions are allowed. + "spacetime_10.5"::call_reducer_on_db, + } + $link_async! { "spacetime_10.3"::procedure_sleep_until, "spacetime_10.3"::procedure_http_request, diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 74a57b35e92..f24b8a4f3dc 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -17,7 +17,7 @@ use crate::subscription::module_subscription_manager::TransactionOffset; use anyhow::{anyhow, Context as _}; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; -use spacetimedb_lib::{bsatn, ConnectionId, Timestamp}; +use spacetimedb_lib::{bsatn, ConnectionId, Identity, Timestamp}; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use spacetimedb_primitives::{errno, ColId}; use spacetimedb_schema::def::ModuleDef; @@ -1943,6 +1943,86 @@ impl WasmInstanceEnv { ) }) } + + /// Call a reducer on another SpacetimeDB database. 
+ /// + /// - `identity_ptr` must point to exactly 32 bytes — the BSATN (little-endian) encoding of the + /// target [`Identity`]. + /// - `reducer_ptr[..reducer_len]` is the UTF-8 name of the reducer to call. + /// - `args_ptr[..args_len]` is the BSATN-encoded reducer arguments. + /// + /// On transport success (any HTTP response received from the server): + /// - Returns the HTTP status code (e.g. 200, 400, 530). + /// - Writes a [`BytesSource`] containing the response body bytes to `*out`. + /// + /// On transport failure (connection refused, timeout, etc.): + /// - Returns [`errno::HTTP_ERROR`]. + /// - Writes a [`BytesSource`] containing a BSATN-encoded error [`String`] to `*out`. + /// + /// Unlike [`Self::procedure_http_request`], this ABI may be called while holding + /// an open transaction (i.e. from within a reducer body). + /// + /// # Traps + /// + /// Traps if any pointer is NULL or its range falls outside of linear memory. + pub fn call_reducer_on_db( + caller: Caller<'_, Self>, + identity_ptr: WasmPtr, + reducer_ptr: WasmPtr, + reducer_len: u32, + args_ptr: WasmPtr, + args_len: u32, + out: WasmPtr, + ) -> RtResult { + Self::cvt_custom(caller, AbiCall::CallReducerOnDb, |caller| { + let (mem, env) = Self::mem_env(caller); + + // Read the 32-byte BSATN-encoded Identity (little-endian). + let identity_slice = mem.deref_slice(identity_ptr, 32)?; + let identity_bytes: [u8; 32] = identity_slice + .try_into() + .expect("deref_slice(ptr, 32) always yields exactly 32 bytes"); + let database_identity = Identity::from_byte_array(identity_bytes); + + // Read the reducer name as a UTF-8 string. + let reducer_name = mem.deref_str(reducer_ptr, reducer_len)?.to_owned(); + + // Read the BSATN-encoded args as raw bytes. 
+ let args_buf = mem.deref_slice(args_ptr, args_len)?; + let args = bytes::Bytes::copy_from_slice(args_buf); + + // Reducers run inside a tokio LocalSet (single-threaded), so block_in_place + // is unavailable and futures::executor::block_on can't drive tokio I/O. + // Spawn a new OS thread and call Handle::block_on from there, which is + // designed to be called from synchronous (non-async) contexts. + let handle = tokio::runtime::Handle::current(); + let fut = env + .instance_env + .call_reducer_on_db(database_identity, &reducer_name, args); + let result = std::thread::scope(|s| { + s.spawn(|| handle.block_on(fut)) + .join() + .expect("call_reducer_on_db: worker thread panicked") + }); + + match result { + Ok((status, body)) => { + let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?; + bytes_source.0.write_to(mem, out)?; + Ok(status as u32) + } + Err(NodesError::HttpError(err)) => { + let err_bytes = bsatn::to_vec(&err).with_context(|| { + format!("Failed to BSATN-serialize call_reducer_on_db transport error: {err:?}") + })?; + let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?; + bytes_source.0.write_to(mem, out)?; + Ok(errno::HTTP_ERROR.get() as u32) + } + Err(e) => Err(WasmError::Db(e)), + } + }) + } } type Fut<'caller, T> = Box>; diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 48ac0fe80e2..0690120eb16 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -50,7 +50,7 @@ impl WasmtimeModule { WasmtimeModule { module } } - pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 4); + pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 5); pub(super) fn link_imports(linker: &mut Linker) -> anyhow::Result<()> { const { assert!(WasmtimeModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION) }; diff --git 
/// Tuning knobs for the HTTP client that calls reducers on remote databases.
///
/// Hand it to [`ReplicaContext::new_call_reducer_client`] to build the client;
/// [`Default`] supplies the values listed on each field.
#[derive(Debug, Clone)]
pub struct CallReducerOnDbConfig {
    /// How long an idle pooled connection is kept open. Default: 90 s.
    pub pool_idle_timeout: Duration,
    /// Upper bound on idle pooled connections per host. Default: 8.
    pub pool_max_idle_per_host: usize,
    /// TCP keepalive interval passed to the OS. Default: 20 s.
    pub tcp_keepalive: Duration,
    /// Timeout applied to each request. Default: 30 s.
    pub request_timeout: Duration,
}

impl Default for CallReducerOnDbConfig {
    fn default() -> Self {
        const POOL_IDLE_SECS: u64 = 90;
        const MAX_IDLE_PER_HOST: usize = 8;
        const KEEPALIVE_SECS: u64 = 20;
        const REQUEST_SECS: u64 = 30;

        CallReducerOnDbConfig {
            pool_idle_timeout: Duration::from_secs(POOL_IDLE_SECS),
            pool_max_idle_per_host: MAX_IDLE_PER_HOST,
            tcp_keepalive: Duration::from_secs(KEEPALIVE_SECS),
            request_timeout: Duration::from_secs(REQUEST_SECS),
        }
    }
}
+ pub call_reducer_client: reqwest::Client, + /// Resolves the HTTP base URL of the leader node for a given database identity. + /// + /// - Standalone: always returns the local node URL ([`crate::host::reducer_router::LocalReducerRouter`]). + /// - Cluster: queries the control DB to find the leader replica's node. + pub call_reducer_router: Arc, + /// `Authorization: Bearer ` value for outgoing cross-DB reducer calls. + /// + /// A single node-level token set once at startup and shared by all replicas on this node. + /// Passed as a Bearer token so `anon_auth_middleware` on the target node accepts the request + /// without generating a fresh ephemeral identity per call. + /// + /// `None` in contexts where no auth token is configured (e.g. unit tests). + pub call_reducer_auth_token: Option, +} + +impl ReplicaContext { + /// Build a warmed `reqwest::Client` from `config`. + pub fn new_call_reducer_client(config: &CallReducerOnDbConfig) -> reqwest::Client { + reqwest::Client::builder() + .tcp_keepalive(config.tcp_keepalive) + .pool_idle_timeout(config.pool_idle_timeout) + .pool_max_idle_per_host(config.pool_max_idle_per_host) + .timeout(config.request_timeout) + .build() + .expect("failed to build call_reducer_on_db HTTP client") + } } impl ReplicaContext { diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 6421a95d7cb..5b84e230045 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -43,6 +43,16 @@ metrics_group!( #[labels(database_identity: Identity, protocol: str)] pub websocket_request_msg_size: HistogramVec, + #[name = spacetime_cross_db_reducer_calls_total] + #[help = "Total number of cross-database reducer calls made by this database."] + #[labels(caller_identity: Identity)] + pub cross_db_reducer_calls_total: IntCounterVec, + + #[name = spacetime_cross_db_reducer_duration_seconds] + #[help = "Duration of cross-database reducer calls in seconds."] + 
#[labels(caller_identity: Identity)] + pub cross_db_reducer_duration_seconds: HistogramVec, + #[name = jemalloc_active_bytes] #[help = "Number of bytes in jemallocs heap"] #[labels(node_id: str)] diff --git a/crates/smoketests/tests/smoketests/cross_db_reducer.rs b/crates/smoketests/tests/smoketests/cross_db_reducer.rs new file mode 100644 index 00000000000..22d4a94b9f1 --- /dev/null +++ b/crates/smoketests/tests/smoketests/cross_db_reducer.rs @@ -0,0 +1,108 @@ +use spacetimedb_smoketests::Smoketest; + +/// Module code used for both the "receiver" and "caller" databases. +/// +/// - `record_ping(payload)` is called by the caller via `call_reducer_on_db` and stores the +/// payload fields in `ping_log`. +/// - `call_remote(target, payload)` is the entry point: it BSATN-encodes `payload` and invokes +/// `record_ping` on `target` over the cross-DB ABI. +const MODULE_CODE: &str = r#" +use spacetimedb::{log, ReducerContext, Table, Identity, SpacetimeType}; + +/// A structured ping payload — used to exercise BSATN encoding of a multi-field struct. +#[derive(SpacetimeType)] +pub struct PingPayload { + pub message: String, + pub priority: u32, +} + +#[spacetimedb::table(accessor = ping_log, public)] +pub struct PingLog { + #[primary_key] + #[auto_inc] + id: u64, + message: String, + priority: u32, +} + +/// Writes one row to `ping_log` from the payload. Called via the cross-DB ABI. +#[spacetimedb::reducer] +pub fn record_ping(ctx: &ReducerContext, payload: PingPayload) { + log::info!("record_ping: got message={} priority={}", payload.message, payload.priority); + ctx.db.ping_log().insert(PingLog { id: 0, message: payload.message, priority: payload.priority }); +} + +/// Calls `record_ping(payload)` on `target_hex` via the cross-database ABI. +/// +/// `target_hex` is the hex-encoded identity of the target database. +/// Args are BSATN-encoded as a 1-tuple `(payload,)`. 
+#[spacetimedb::reducer] +pub fn call_remote(ctx: &ReducerContext, target_hex: String, message: String, priority: u32) { + let target = Identity::from_hex(&target_hex).expect("invalid target identity hex"); + let payload = PingPayload { message, priority }; + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(payload,)).expect("failed to encode args"); + match spacetimedb::remote_reducer::call_reducer_on_db(target, "record_ping", &args) { + Ok(()) => { + log::info!("call_remote: remote reducer succeeded"); + } + Err(e) => { + log::error!("call_remote: {}", e); + panic!("call_reducer_on_db error: {e}"); + } + } +} +"#; + +/// Smoke test for the cross-database reducer call ABI. +/// +/// Publishes the same module twice on one server, then calls `call_remote` on the +/// "caller" database with the "receiver" database's identity as an argument. +/// Passes a structured `PingPayload` (message + priority) to exercise multi-field +/// BSATN encoding over the cross-DB boundary. +/// Verifies that `receiver` has the expected row in `ping_log`. +#[test] +fn test_cross_db_reducer_call() { + let pid = std::process::id(); + let receiver_name = format!("cross-db-receiver-{pid}"); + let caller_name = format!("cross-db-caller-{pid}"); + + // Build one server with the shared module code. + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); + + // Publish the receiver database first. + test.publish_module_named(&receiver_name, false) + .expect("failed to publish receiver module"); + let receiver_identity = test + .database_identity + .clone() + .expect("receiver database_identity not set after publish"); + + // Publish the caller database (same code, different name). + test.publish_module_named(&caller_name, false) + .expect("failed to publish caller module"); + // test.database_identity is now caller_name — calls/sql default to caller. + + // Invoke call_remote on the caller, passing the receiver's identity, message, and priority. 
+ test.call("call_remote", &[&receiver_identity, "hello from caller", "42"]) + .expect("call_remote failed"); + + // Verify that the receiver's ping_log has the expected row. + let result = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + &receiver_identity, + "SELECT message, priority FROM ping_log", + ]) + .expect("sql query failed"); + + assert!( + result.contains("hello from caller"), + "Expected ping_log to contain 'hello from caller' after cross-DB call, got:\n{result}" + ); + assert!( + result.contains("42"), + "Expected ping_log to contain priority 42 after cross-DB call, got:\n{result}" + ); +} diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index f5053652dd3..18ad7b51199 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -9,6 +9,7 @@ mod client_connection_errors; mod confirmed_reads; mod connect_disconnect_from_cli; mod create_project; +mod cross_db_reducer; mod csharp_module; mod default_module_clippy; mod delete_database; diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 3343a9292b1..3b97c6d46fd 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -14,7 +14,9 @@ use spacetimedb::config::{CertificateAuthority, MetadataFile, V8HeapPolicyConfig use spacetimedb::db; use spacetimedb::db::persistence::LocalPersistenceProvider; use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor}; -use spacetimedb::host::{DiskStorage, HostController, MigratePlanResult, UpdateDatabaseResult}; +use spacetimedb::host::{ + reducer_router::LocalReducerRouter, DiskStorage, HostController, MigratePlanResult, UpdateDatabaseResult, +}; use spacetimedb::identity::{AuthCtx, Identity}; use spacetimedb::messages::control_db::{Database, Node, Replica}; use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; @@ -38,11 +40,15 @@ use std::time::Duration; pub use 
spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL}; -#[derive(Clone, Copy)] +#[derive(Clone)] pub struct StandaloneOptions { pub db_config: db::Config, pub websocket: WebSocketOptions, pub v8_heap_policy: V8HeapPolicyConfig, + /// HTTP base URL of this node's API server (e.g. `"http://127.0.0.1:3000"`). + /// Used to configure the `LocalReducerRouter` so that cross-DB reducer calls + /// reach the correct address when the server listens on a dynamic port. + pub local_api_url: String, } pub struct StandaloneEnv { @@ -76,7 +82,7 @@ impl StandaloneEnv { let program_store = Arc::new(DiskStorage::new(data_dir.program_bytes().0).await?); let persistence_provider = Arc::new(LocalPersistenceProvider::new(data_dir.clone())); - let host_controller = HostController::new( + let mut host_controller = HostController::new( data_dir, config.db_config, config.v8_heap_policy, @@ -85,6 +91,7 @@ impl StandaloneEnv { persistence_provider, db_cores, ); + host_controller.call_reducer_router = Arc::new(LocalReducerRouter::new(config.local_api_url)); let client_actor_index = ClientActorIndex::new(); let jwt_keys = certs.get_or_create_keys()?; @@ -651,9 +658,10 @@ mod tests { }, websocket: WebSocketOptions::default(), v8_heap_policy: V8HeapPolicyConfig::default(), + local_api_url: "http://127.0.0.1:3000".to_owned(), }; - let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), JobCores::without_pinned_cores()).await?; + let _env = StandaloneEnv::init(config.clone(), &ca, data_dir.clone(), JobCores::without_pinned_cores()).await?; // Ensure that we have a lock. 
assert!( StandaloneEnv::init(config, &ca, data_dir.clone(), JobCores::without_pinned_cores()) diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index 399b7e9a350..30eb64c5a6c 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -177,29 +177,6 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { .or_else(|| cert_dir.map(CertificateAuthority::in_cli_config_dir)) .context("cannot omit --jwt-{pub,priv}-key-path when those options are not specified in config.toml")?; - let data_dir = Arc::new(data_dir.clone()); - let ctx = StandaloneEnv::init( - StandaloneOptions { - db_config, - websocket: config.websocket, - v8_heap_policy: config.common.v8_heap_policy, - }, - &certs, - data_dir, - db_cores, - ) - .await?; - worker_metrics::spawn_jemalloc_stats(listen_addr.clone()); - worker_metrics::spawn_tokio_stats(listen_addr.clone()); - worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone()); - worker_metrics::spawn_bsatn_rlb_pool_stats(listen_addr.clone(), ctx.bsatn_rlb_pool().clone()); - let mut db_routes = DatabaseRoutes::default(); - db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable()); - db_routes.db_put = db_routes.db_put.layer(DefaultBodyLimit::disable()); - db_routes.pre_publish = db_routes.pre_publish.layer(DefaultBodyLimit::disable()); - let extra = axum::Router::new().nest("/health", spacetimedb_client_api::routes::health::router()); - let service = router(&ctx, db_routes, IdentityRoutes::default(), extra).with_state(ctx.clone()); - // Check if the requested port is available on both IPv4 and IPv6. // If not, offer to find an available port by incrementing (unless non-interactive). 
let listen_addr = if let Some((host, port_str)) = listen_addr.rsplit_once(':') { @@ -251,6 +228,31 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { socket2::SockRef::from(&tcp).set_nodelay(true)?; log::info!("Starting SpacetimeDB listening on {}", tcp.local_addr()?); + let local_port = tcp.local_addr()?.port(); + let data_dir = Arc::new(data_dir.clone()); + let ctx = StandaloneEnv::init( + StandaloneOptions { + db_config, + websocket: config.websocket, + v8_heap_policy: config.common.v8_heap_policy, + local_api_url: format!("http://127.0.0.1:{local_port}"), + }, + &certs, + data_dir, + db_cores, + ) + .await?; + worker_metrics::spawn_jemalloc_stats(listen_addr.clone()); + worker_metrics::spawn_tokio_stats(listen_addr.clone()); + worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone()); + worker_metrics::spawn_bsatn_rlb_pool_stats(listen_addr.clone(), ctx.bsatn_rlb_pool().clone()); + let mut db_routes = DatabaseRoutes::default(); + db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable()); + db_routes.db_put = db_routes.db_put.layer(DefaultBodyLimit::disable()); + db_routes.pre_publish = db_routes.pre_publish.layer(DefaultBodyLimit::disable()); + let extra = axum::Router::new().nest("/health", spacetimedb_client_api::routes::health::router()); + let service = router(&ctx, db_routes, IdentityRoutes::default(), extra).with_state(ctx.clone()); + if let Some(pg_port) = pg_port { let server_addr = listen_addr.split(':').next().unwrap(); let tcp_pg = TcpListener::bind(format!("{server_addr}:{pg_port}")).await.context(format!( diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 318fff8d0cf..0721409c64b 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -202,6 +202,8 @@ impl CompiledModule { db_config: config, websocket: WebSocketOptions::default(), v8_heap_policy: Default::default(), + // Tests use internal routing; cross-DB HTTP 
calls aren't tested here. + local_api_url: "http://127.0.0.1:3000".to_owned(), }, &certs, paths.data_dir.into(),