From 8203605e2507ecb620034efcc5548cc708cbe587 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 01:12:39 +0530 Subject: [PATCH 1/9] abi --- crates/bindings-sys/src/lib.rs | 33 ++++++++ crates/client-api/src/routes/database.rs | 22 +++-- crates/core/src/host/host_controller.rs | 17 +++- crates/core/src/host/instance_env.rs | 45 +++++++++- crates/core/src/host/mod.rs | 2 + crates/core/src/host/wasm_common.rs | 4 + .../src/host/wasmtime/wasm_instance_env.rs | 84 ++++++++++++++++++- .../core/src/host/wasmtime/wasmtime_module.rs | 2 +- crates/core/src/replica_context.rs | 46 ++++++++++ 9 files changed, 243 insertions(+), 12 deletions(-) diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 95dfbc7e600..79835e120a3 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -865,6 +865,39 @@ pub mod raw { ) -> u16; } + #[link(wasm_import_module = "spacetime_10.5")] + unsafe extern "C" { + /// Call a reducer on another SpacetimeDB database via the local reverse proxy at `localhost:80`. + /// + /// - `identity_ptr` must point to exactly 32 bytes — the BSATN (little-endian) encoding of + /// the target database `Identity`. + /// - `reducer_ptr[..reducer_len]` is the UTF-8 name of the reducer to call. + /// - `args_ptr[..args_len]` is the BSATN-encoded reducer arguments. + /// + /// On transport success (any HTTP response received): + /// - Returns the HTTP status code (e.g. 200, 400, 530). + /// - Writes a [`BytesSource`] containing the response body bytes to `*out`. + /// + /// On transport failure (connection refused, timeout, etc.): + /// - Returns [`errno::HTTP_ERROR`] (21). + /// - Writes a [`BytesSource`] containing a BSATN-encoded error [`String`] to `*out`. + /// + /// Unlike `procedure_http_request`, this syscall may be called while a transaction + /// is open (i.e. from within a reducer body). + /// + /// # Traps + /// + /// Traps if any pointer is NULL or its range falls outside of linear memory. + pub fn call_reducer_on_db( + identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity + reducer_ptr: *const u8, + reducer_len: u32, + args_ptr: *const u8, + args_len: u32, + out: *mut BytesSource, + ) -> u16; + } + /// What strategy does the database index use? /// /// See also: diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 6b753a9c8fd..881abcc0e18 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -136,13 +136,11 @@ pub async fn call( reducer, }): Path, TypedHeader(content_type): TypedHeader, - ByteStringBody(body): ByteStringBody, + body: Bytes, ) -> axum::response::Result { - assert_content_type_json(content_type)?; - let caller_identity = auth.claims.identity; - let args = FunctionArgs::Json(body); + let args = parse_call_args(content_type, body)?; // HTTP callers always need a connection ID to provide to connect/disconnect, // so generate one. @@ -216,11 +214,19 @@ pub async fn call( } } -fn assert_content_type_json(content_type: headers::ContentType) -> axum::response::Result<()> { - if content_type != headers::ContentType::json() { - Err(axum::extract::rejection::MissingJsonContentType::default().into()) +/// Parse call arguments from an HTTP body based on content type. +/// +/// - `application/json` → [`FunctionArgs::Json`] (UTF-8 required). +/// - `application/octet-stream` → [`FunctionArgs::Bsatn`] (raw BSATN bytes). +fn parse_call_args(content_type: headers::ContentType, body: Bytes) -> axum::response::Result { + if content_type == headers::ContentType::json() { + let s = bytestring::ByteString::try_from(body) + .map_err(|_| (StatusCode::BAD_REQUEST, "request body is not valid UTF-8").into_response())?; + Ok(FunctionArgs::Json(s)) + } else if content_type == headers::ContentType::from(mime::APPLICATION_OCTET_STREAM) { + Ok(FunctionArgs::Bsatn(body)) } else { - Ok(()) + Err(axum::extract::rejection::MissingJsonContentType::default().into()) } } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index e67e67540eb..a813a1cb426 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -13,7 +13,7 @@ use crate::host::v8::V8Runtime; use crate::host::ProcedureCallError; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; -use crate::replica_context::ReplicaContext; +use crate::replica_context::{CallReducerOnDbConfig, ReplicaContext}; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset}; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; @@ -117,6 +117,11 @@ pub struct HostController { db_cores: JobCores, /// The pool of buffers used to build `BsatnRowList`s in subscriptions. pub bsatn_rlb_pool: BsatnRowListBuilderPool, + /// Warmed HTTP/2 client shared by all replicas on this host for + /// [`crate::host::instance_env::InstanceEnv::call_reducer_on_db`]. + /// + /// All per-replica clones share the same underlying connection pool. + pub call_reducer_client: reqwest::Client, } pub(crate) struct HostRuntimes { @@ -228,6 +233,7 @@ impl HostController { page_pool: PagePool::new(default_config.page_pool_max_size), bsatn_rlb_pool: BsatnRowListBuilderPool::new(), db_cores, + call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), } } @@ -664,6 +670,7 @@ async fn make_replica_ctx( replica_id: u64, relational_db: Arc, bsatn_rlb_pool: BsatnRowListBuilderPool, + call_reducer_client: reqwest::Client, ) -> anyhow::Result { let logger = match module_logs { Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await, @@ -696,6 +703,7 @@ async fn make_replica_ctx( replica_id, logger, subscriptions, + call_reducer_client, }) } @@ -771,6 +779,7 @@ struct ModuleLauncher { runtimes: Arc, core: AllocatedJobCore, bsatn_rlb_pool: BsatnRowListBuilderPool, + call_reducer_client: reqwest::Client, } impl ModuleLauncher { @@ -790,6 +799,7 @@ impl ModuleLauncher { self.replica_id, self.relational_db, self.bsatn_rlb_pool, + self.call_reducer_client, ) .await .map(Arc::new)?; @@ -991,6 +1001,7 @@ impl Host { runtimes: runtimes.clone(), core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), + call_reducer_client: host_controller.call_reducer_client.clone(), } .launch_module() .await? @@ -1020,6 +1031,7 @@ impl Host { runtimes: runtimes.clone(), core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), + call_reducer_client: host_controller.call_reducer_client.clone(), } .launch_module() .await; @@ -1043,6 +1055,7 @@ impl Host { runtimes: runtimes.clone(), core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), + call_reducer_client: host_controller.call_reducer_client.clone(), } .launch_module() .await; @@ -1150,6 +1163,8 @@ impl Host { runtimes: runtimes.clone(), core, bsatn_rlb_pool, + // Transient validation-only module; build its own client with defaults. + call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), } .launch_module() .await diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 311c34775fd..641e1692485 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -977,6 +977,48 @@ impl InstanceEnv { Ok((response, body)) }) } + + /// Call a reducer on a remote database via the local reverse proxy (`localhost:80`). + /// + /// Unlike [`Self::http_request`], this is explicitly allowed while a transaction is open — + /// the caller is responsible for understanding the consistency implications. + /// + /// Uses the warmed HTTP/2 client stored in [`ReplicaContext::call_reducer_client`], + /// configured when the replica was constructed. + /// + /// Returns `(http_status, response_body)` on transport success, + /// or [`NodesError::HttpError`] if the connection itself fails. + pub fn call_reducer_on_db( + &self, + database_identity: Identity, + reducer_name: &str, + args: bytes::Bytes, + ) -> impl Future> + use<> { + let client = self.replica_ctx.call_reducer_client.clone(); + let url = format!( + "http://localhost/v1/database/{}/call/{}", + database_identity.to_hex(), + reducer_name, + ); + + async move { + let response = client + .post(&url) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .body(args) + .send() + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; + + let status = response.status().as_u16(); + let body = response + .bytes() + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; + + Ok((status, body)) + } + } } /// Default timeout for HTTP requests performed by [`InstanceEnv::http_request`]. @@ -1317,7 +1359,7 @@ mod test { }, host::Scheduler, messages::control_db::{Database, HostType}, - replica_context::ReplicaContext, + replica_context::{CallReducerOnDbConfig, ReplicaContext}, subscription::module_subscription_actor::ModuleSubscriptions, }; use anyhow::{anyhow, Result}; @@ -1351,6 +1393,7 @@ mod test { replica_id: 0, logger, subscriptions: subs, + call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), }, runtime, )) diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 0daa9c359bc..3123913fcf6 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -194,4 +194,6 @@ pub enum AbiCall { ProcedureCommitMutTransaction, ProcedureAbortMutTransaction, ProcedureHttpRequest, + + CallReducerOnDb, } diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index a5c737d54d6..2c0c5fa57e6 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -436,6 +436,10 @@ macro_rules! abi_funcs { $link_async! { "spacetime_10.3"::procedure_sleep_until, "spacetime_10.3"::procedure_http_request, + + // Call a reducer on another database while holding (or not holding) a transaction. + // Uses a warmed HTTP/2 connection pool to localhost:80. + "spacetime_10.5"::call_reducer_on_db, } }; } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 74a57b35e92..868aa8b2ea5 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -17,7 +17,7 @@ use crate::subscription::module_subscription_manager::TransactionOffset; use anyhow::{anyhow, Context as _}; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo}; -use spacetimedb_lib::{bsatn, ConnectionId, Timestamp}; +use spacetimedb_lib::{bsatn, ConnectionId, Identity, Timestamp}; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use spacetimedb_primitives::{errno, ColId}; use spacetimedb_schema::def::ModuleDef; @@ -1943,6 +1943,88 @@ impl WasmInstanceEnv { ) }) } + + /// Call a reducer on another SpacetimeDB database via the local reverse proxy at `localhost:80`. + /// + /// - `identity_ptr` must point to exactly 32 bytes — the BSATN (little-endian) encoding of the + /// target [`Identity`]. + /// - `reducer_ptr[..reducer_len]` is the UTF-8 name of the reducer to call. + /// - `args_ptr[..args_len]` is the BSATN-encoded reducer arguments. + /// + /// On transport success (any HTTP response received from the server): + /// - Returns the HTTP status code (e.g. 200, 400, 530). + /// - Writes a [`BytesSource`] containing the response body bytes to `*out`. + /// + /// On transport failure (connection refused, timeout, etc.): + /// - Returns [`errno::HTTP_ERROR`]. + /// - Writes a [`BytesSource`] containing a BSATN-encoded error [`String`] to `*out`. + /// + /// Unlike [`Self::procedure_http_request`], this ABI may be called while holding + /// an open transaction (i.e. from within a reducer body). + /// + /// # Traps + /// + /// Traps if any pointer is NULL or its range falls outside of linear memory. + pub fn call_reducer_on_db<'caller>( + caller: Caller<'caller, Self>, + (identity_ptr, reducer_ptr, reducer_len, args_ptr, args_len, out): ( + WasmPtr, + WasmPtr, + u32, + WasmPtr, + u32, + WasmPtr, + ), + ) -> Fut<'caller, RtResult> { + Self::async_with_span(caller, AbiCall::CallReducerOnDb, move |mut caller| async move { + let (mem, env) = Self::mem_env(&mut caller); + + #[allow(clippy::redundant_closure_call)] + let res = (async move || { + // Read the 32-byte BSATN-encoded Identity (little-endian). + let identity_slice = mem.deref_slice(identity_ptr, 32)?; + let identity_bytes: [u8; 32] = identity_slice + .try_into() + .expect("deref_slice(ptr, 32) always yields exactly 32 bytes"); + let database_identity = Identity::from_byte_array(identity_bytes); + + // Read the reducer name as a UTF-8 string. + let reducer_name = mem.deref_str(reducer_ptr, reducer_len)?; + + // Read the BSATN-encoded args as raw bytes. + let args_buf = mem.deref_slice(args_ptr, args_len)?; + let args = bytes::Bytes::copy_from_slice(args_buf); + + let result = env + .instance_env + .call_reducer_on_db(database_identity, reducer_name, args) + .await; + + match result { + Ok((status, body)) => { + let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?; + bytes_source.0.write_to(mem, out)?; + Ok(status as u32) + } + Err(NodesError::HttpError(err)) => { + let err_bytes = bsatn::to_vec(&err).with_context(|| { + format!("Failed to BSATN-serialize call_reducer_on_db transport error: {err:?}") + })?; + let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?; + bytes_source.0.write_to(mem, out)?; + Ok(errno::HTTP_ERROR.get() as u32) + } + Err(e) => Err(WasmError::Db(e)), + } + })() + .await; + + ( + caller, + res.or_else(|err| Self::convert_wasm_result(AbiCall::CallReducerOnDb, err)), + ) + }) + } } type Fut<'caller, T> = Box>; diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 48ac0fe80e2..0690120eb16 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -50,7 +50,7 @@ impl WasmtimeModule { WasmtimeModule { module } } - pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 4); + pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 5); pub(super) fn link_imports(linker: &mut Linker) -> anyhow::Result<()> { const { assert!(WasmtimeModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION) }; diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index 731288bb459..ede7fa44cdd 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -8,9 +8,37 @@ use crate::subscription::module_subscription_actor::ModuleSubscriptions; use std::io; use std::ops::Deref; use std::sync::Arc; +use std::time::Duration; pub type Result = anyhow::Result; +/// Configuration for the HTTP/2 client used to call reducers on remote databases. +/// +/// Pass to [`ReplicaContext::new_call_reducer_client`] or supply directly when +/// constructing [`ReplicaContext`]. +#[derive(Debug, Clone)] +pub struct CallReducerOnDbConfig { + /// How long idle connections are held open. Default: 90 s. + pub pool_idle_timeout: Duration, + /// Max idle connections per host. Default: 8. + pub pool_max_idle_per_host: usize, + /// TCP keepalive sent to the OS. Default: 20 s. + pub tcp_keepalive: Duration, + /// Per-request timeout. Default: 30 s. + pub request_timeout: Duration, +} + +impl Default for CallReducerOnDbConfig { + fn default() -> Self { + Self { + pool_idle_timeout: Duration::from_secs(90), + pool_max_idle_per_host: 8, + tcp_keepalive: Duration::from_secs(20), + request_timeout: Duration::from_secs(30), + } + } +} + /// A "live" database. #[derive(Clone)] pub struct ReplicaContext { @@ -18,6 +46,24 @@ pub struct ReplicaContext { pub replica_id: u64, pub logger: Arc, pub subscriptions: ModuleSubscriptions, + /// Warmed HTTP/2 client for [`crate::host::instance_env::InstanceEnv::call_reducer_on_db`]. + /// + /// `reqwest::Client` is internally an `Arc`, so cloning `ReplicaContext` shares the pool. + pub call_reducer_client: reqwest::Client, +} + +impl ReplicaContext { + /// Build a warmed `reqwest::Client` from `config`. + pub fn new_call_reducer_client(config: &CallReducerOnDbConfig) -> reqwest::Client { + reqwest::Client::builder() + .http2_prior_knowledge() + .tcp_keepalive(config.tcp_keepalive) + .pool_idle_timeout(config.pool_idle_timeout) + .pool_max_idle_per_host(config.pool_max_idle_per_host) + .timeout(config.request_timeout) + .build() + .expect("failed to build call_reducer_on_db HTTP/2 client") + } } impl ReplicaContext { From 24b4e62ec92640070aca39a6c5ded9cf50a1717a Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 11:44:20 +0530 Subject: [PATCH 2/9] unreviewed code --- crates/bindings-sys/src/lib.rs | 35 +++++++ crates/bindings/src/lib.rs | 1 + crates/bindings/src/remote_reducer.rs | 52 ++++++++++ crates/core/src/host/host_controller.rs | 17 +++- crates/core/src/host/instance_env.rs | 25 +++-- crates/core/src/host/mod.rs | 1 + crates/core/src/host/reducer_router.rs | 61 ++++++++++++ crates/core/src/replica_context.rs | 6 ++ .../tests/smoketests/cross_db_reducer.rs | 95 +++++++++++++++++++ crates/smoketests/tests/smoketests/mod.rs | 1 + 10 files changed, 285 insertions(+), 9 deletions(-) create mode 100644 crates/bindings/src/remote_reducer.rs create mode 100644 crates/core/src/host/reducer_router.rs create mode 100644 crates/smoketests/tests/smoketests/cross_db_reducer.rs diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 79835e120a3..c9fb1849fb6 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -1471,6 +1471,41 @@ pub fn identity() -> [u8; 32] { buf } +/// Call a reducer on a remote database identified by `identity` (little-endian 32-byte array). +/// +/// On transport success (any HTTP response received): +/// - Returns `Ok((status, body_source))` where `status` is the HTTP status code and +/// `body_source` is a [`raw::BytesSource`] containing the raw response body bytes. +/// +/// On transport failure (connection refused, timeout, etc.): +/// - Returns `Err(err_source)` where `err_source` is a [`raw::BytesSource`] containing +/// a BSATN-encoded error [`String`]. +/// +/// Unlike HTTP requests, this syscall may be called while a transaction is open. +#[inline] +pub fn call_reducer_on_db( + identity: [u8; 32], + reducer_name: &str, + args: &[u8], +) -> Result<(u16, raw::BytesSource), raw::BytesSource> { + let mut out = raw::BytesSource::INVALID; + let status = unsafe { + raw::call_reducer_on_db( + identity.as_ptr(), + reducer_name.as_ptr(), + reducer_name.len() as u32, + args.as_ptr(), + args.len() as u32, + &mut out, + ) + }; + match Errno::from_code(status) { + None => Ok((status, out)), + Some(errno) if errno == Errno::HTTP_ERROR => Err(out), + Some(errno) => panic!("call_reducer_on_db: unexpected errno {errno}"), + } +} + /// Finds the JWT payload associated with `connection_id`. /// If nothing is found for the connection, this returns None. /// If a payload is found, this will return a valid [`raw::BytesSource`]. diff --git a/crates/bindings/src/lib.rs b/crates/bindings/src/lib.rs index 9e02a3a97f0..f91151ffb88 100644 --- a/crates/bindings/src/lib.rs +++ b/crates/bindings/src/lib.rs @@ -12,6 +12,7 @@ mod client_visibility_filter; pub mod http; pub mod log_stopwatch; mod logger; +pub mod remote_reducer; #[cfg(feature = "rand08")] mod rng; #[doc(hidden)] diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs new file mode 100644 index 00000000000..80a71d3613f --- /dev/null +++ b/crates/bindings/src/remote_reducer.rs @@ -0,0 +1,52 @@ +//! Naive binding for calling reducers on remote SpacetimeDB databases. +//! +//! Call a reducer on another database using [`call_reducer_on_db`]. +//! +//! The args must be BSATN-encoded. The response body is raw bytes returned by +//! the remote database's HTTP handler. An HTTP status >= 400 does not cause an +//! `Err` return; only a transport failure (connection refused, timeout, …) does. +//! +//! # Example +//! +//! ```no_run +//! use spacetimedb::{remote_reducer, Identity}; +//! +//! #[spacetimedb::reducer] +//! fn call_remote(ctx: &spacetimedb::ReducerContext, target: Identity) { +//! // Empty BSATN args for a zero-argument reducer. +//! let args = spacetimedb::bsatn::to_vec(&()).unwrap(); +//! match remote_reducer::call_reducer_on_db(target, "my_reducer", &args) { +//! Ok((status, body)) => log::info!("status={status} body={body:?}"), +//! Err(msg) => log::error!("transport error: {msg}"), +//! } +//! } +//! ``` + +use crate::{rt::{read_bytes_source_as, read_bytes_source_into}, IterBuf, Identity}; + +/// Call a reducer on a remote database. +/// +/// - `database_identity`: the target database. +/// - `reducer_name`: the name of the reducer to invoke (must be valid UTF-8). +/// - `args`: BSATN-encoded reducer arguments. +/// +/// Returns `Ok((status, body))` on any transport success (including HTTP errors like 400/500). +/// Returns `Err(message)` on transport failure (connection refused, timeout, …). +pub fn call_reducer_on_db( + database_identity: Identity, + reducer_name: &str, + args: &[u8], +) -> Result<(u16, Vec), String> { + let identity_bytes = database_identity.to_byte_array(); + match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) { + Ok((status, body_source)) => { + let mut buf = IterBuf::take(); + read_bytes_source_into(body_source, &mut buf); + Ok((status, buf.to_vec())) + } + Err(err_source) => { + let message = read_bytes_source_as::(err_source); + Err(message) + } + } +} diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index a813a1cb426..a2a18606614 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -13,6 +13,7 @@ use crate::host::v8::V8Runtime; use crate::host::ProcedureCallError; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; +use crate::host::reducer_router::{LocalReducerRouter, ReducerCallRouter}; use crate::replica_context::{CallReducerOnDbConfig, ReplicaContext}; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset}; @@ -122,6 +123,11 @@ pub struct HostController { /// /// All per-replica clones share the same underlying connection pool. pub call_reducer_client: reqwest::Client, + /// Router that resolves the HTTP base URL of the leader node for a given database. + /// + /// Set to [`LocalReducerRouter`] by default; replaced with `ClusterReducerRouter` + /// in cluster deployments via [`HostController::new`] receiving the router directly. + pub call_reducer_router: Arc, } pub(crate) struct HostRuntimes { @@ -234,6 +240,7 @@ impl HostController { bsatn_rlb_pool: BsatnRowListBuilderPool::new(), db_cores, call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), + call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), } } @@ -671,6 +678,7 @@ async fn make_replica_ctx( relational_db: Arc, bsatn_rlb_pool: BsatnRowListBuilderPool, call_reducer_client: reqwest::Client, + call_reducer_router: Arc, ) -> anyhow::Result { let logger = match module_logs { Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await, @@ -704,6 +712,7 @@ async fn make_replica_ctx( logger, subscriptions, call_reducer_client, + call_reducer_router, }) } @@ -780,6 +789,7 @@ struct ModuleLauncher { core: AllocatedJobCore, bsatn_rlb_pool: BsatnRowListBuilderPool, call_reducer_client: reqwest::Client, + call_reducer_router: Arc, } impl ModuleLauncher { @@ -800,6 +810,7 @@ impl ModuleLauncher { self.relational_db, self.bsatn_rlb_pool, self.call_reducer_client, + self.call_reducer_router, ) .await .map(Arc::new)?; @@ -1002,6 +1013,7 @@ impl Host { core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), + call_reducer_router: host_controller.call_reducer_router.clone(), } .launch_module() .await? @@ -1032,6 +1044,7 @@ impl Host { core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), + call_reducer_router: host_controller.call_reducer_router.clone(), } .launch_module() .await; @@ -1056,6 +1069,7 @@ impl Host { core: host_controller.db_cores.take(), bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), + call_reducer_router: host_controller.call_reducer_router.clone(), } .launch_module() .await; @@ -1163,8 +1177,9 @@ impl Host { runtimes: runtimes.clone(), core, bsatn_rlb_pool, - // Transient validation-only module; build its own client with defaults. + // Transient validation-only module; build its own client and router with defaults. call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), + call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), } .launch_module() .await diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 641e1692485..f778621e470 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -983,8 +983,9 @@ impl InstanceEnv { /// Unlike [`Self::http_request`], this is explicitly allowed while a transaction is open — /// the caller is responsible for understanding the consistency implications. /// - /// Uses the warmed HTTP/2 client stored in [`ReplicaContext::call_reducer_client`], - /// configured when the replica was constructed. + /// Uses [`ReplicaContext::call_reducer_router`] to resolve the leader node for + /// `database_identity`, then sends the request via the warmed HTTP/2 client in + /// [`ReplicaContext::call_reducer_client`]. /// /// Returns `(http_status, response_body)` on transport success, /// or [`NodesError::HttpError`] if the connection itself fails. @@ -995,13 +996,20 @@ impl InstanceEnv { args: bytes::Bytes, ) -> impl Future> + use<> { let client = self.replica_ctx.call_reducer_client.clone(); - let url = format!( - "http://localhost/v1/database/{}/call/{}", - database_identity.to_hex(), - reducer_name, - ); + let router = self.replica_ctx.call_reducer_router.clone(); + let reducer_name = reducer_name.to_owned(); async move { + let base_url = router + .resolve_base_url(database_identity) + .await + .map_err(|e| NodesError::HttpError(e.to_string()))?; + let url = format!( + "{}/v1/database/{}/call/{}", + base_url, + database_identity.to_hex(), + reducer_name, + ); let response = client .post(&url) .header(http::header::CONTENT_TYPE, "application/octet-stream") @@ -1357,7 +1365,7 @@ mod test { tests_utils::{begin_mut_tx, with_auto_commit, with_read_only, TestDB}, RelationalDB, }, - host::Scheduler, + host::{reducer_router::LocalReducerRouter, Scheduler}, messages::control_db::{Database, HostType}, replica_context::{CallReducerOnDbConfig, ReplicaContext}, subscription::module_subscription_actor::ModuleSubscriptions, @@ -1394,6 +1402,7 @@ mod test { logger, subscriptions: subs, call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), + call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), }, runtime, )) diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 3123913fcf6..25e56ca217e 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -20,6 +20,7 @@ pub mod wasmtime; // Visible for integration testing. pub mod instance_env; +pub mod reducer_router; pub mod v8; // only pub for testing mod wasm_common; diff --git a/crates/core/src/host/reducer_router.rs b/crates/core/src/host/reducer_router.rs new file mode 100644 index 00000000000..93891ecc6e6 --- /dev/null +++ b/crates/core/src/host/reducer_router.rs @@ -0,0 +1,61 @@ +/// Trait for resolving which node to contact when calling a reducer on another database. +/// +/// Implementations differ between deployment modes: +/// +/// - **Standalone / single-node** — [`LocalReducerRouter`] always returns the local node's +/// HTTP base URL. Every database is on the same node, so there is nothing to resolve. +/// +/// - **Cluster / multi-node** — `ClusterReducerRouter` (private crate) queries the control DB +/// to discover the leader replica's node and returns that node's advertise address. +/// +/// The trait is object-safe (futures are boxed) so it can be stored as `Arc` +/// in [`crate::replica_context::ReplicaContext`] and swapped at startup. +use spacetimedb_lib::Identity; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +pub type BoxFuture<'a, T> = Pin + Send + 'a>>; + +pub trait ReducerCallRouter: Send + Sync + 'static { + /// Return the HTTP base URL (e.g. `"http://10.0.0.5:3000"`) of the node that + /// is the leader for `database_identity`. + /// + /// The caller appends `/v1/database/{identity}/call/{reducer}` to produce the full URL. + /// + /// # Errors + /// + /// Returns an error string when the leader cannot be resolved + /// (database not found, no leader elected yet, node has no network address, etc.). + fn resolve_base_url<'a>(&'a self, database_identity: Identity) -> BoxFuture<'a, anyhow::Result>; +} + +// Arc is itself a ReducerCallRouter. +impl ReducerCallRouter for Arc { + fn resolve_base_url<'a>(&'a self, database_identity: Identity) -> BoxFuture<'a, anyhow::Result> { + (**self).resolve_base_url(database_identity) + } +} + +/// Single-node implementation of [`ReducerCallRouter`]. +/// +/// Always routes to the same fixed base URL regardless of which database is targeted. +/// Suitable for standalone (single-node) deployments where every database is local. +/// +/// For cluster deployments, replace this with `ClusterReducerRouter` from the private crate. +pub struct LocalReducerRouter { + pub base_url: String, +} + +impl LocalReducerRouter { + pub fn new(base_url: impl Into) -> Self { + Self { base_url: base_url.into() } + } +} + +impl ReducerCallRouter for LocalReducerRouter { + fn resolve_base_url<'a>(&'a self, _database_identity: Identity) -> BoxFuture<'a, anyhow::Result> { + let url = self.base_url.clone(); + Box::pin(async move { Ok(url) }) + } +} diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index ede7fa44cdd..99208940d3a 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -3,6 +3,7 @@ use spacetimedb_commitlog::SizeOnDisk; use super::database_logger::DatabaseLogger; use crate::db::relational_db::RelationalDB; use crate::error::DBError; +use crate::host::reducer_router::ReducerCallRouter; use crate::messages::control_db::Database; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use std::io; @@ -50,6 +51,11 @@ pub struct ReplicaContext { /// /// `reqwest::Client` is internally an `Arc`, so cloning `ReplicaContext` shares the pool. pub call_reducer_client: reqwest::Client, + /// Resolves the HTTP base URL of the leader node for a given database identity. + /// + /// - Standalone: always returns the local node URL ([`crate::host::reducer_router::LocalReducerRouter`]). + /// - Cluster: queries the control DB to find the leader replica's node. + pub call_reducer_router: Arc, } impl ReplicaContext { diff --git a/crates/smoketests/tests/smoketests/cross_db_reducer.rs b/crates/smoketests/tests/smoketests/cross_db_reducer.rs new file mode 100644 index 00000000000..9fcd7f3f563 --- /dev/null +++ b/crates/smoketests/tests/smoketests/cross_db_reducer.rs @@ -0,0 +1,95 @@ +use spacetimedb_smoketests::Smoketest; + +/// Module code used for both the "receiver" and "caller" databases. +/// +/// - `record_ping(message)` is called by the caller via `call_reducer_on_db` and stores the +/// message in `ping_log`. +/// - `call_remote(target, message)` is the entry point: it BSATN-encodes `message` and invokes +/// `record_ping` on `target` over the cross-DB ABI. +const MODULE_CODE: &str = r#" +use spacetimedb::{log, ReducerContext, Table, Identity}; + +#[spacetimedb::table(name = ping_log, public)] +pub struct PingLog { + #[primary_key] + #[auto_inc] + id: u64, + message: String, +} + +/// Writes one row to `ping_log` with the given message. Called via the cross-DB ABI. +#[spacetimedb::reducer] +pub fn record_ping(ctx: &ReducerContext, message: String) { + log::info!("record_ping: got message: {}", message); + ctx.db.ping_log().insert(PingLog { id: 0, message }); +} + +/// Calls `record_ping(message)` on `target` via the cross-database ABI. +/// +/// Args are BSATN-encoded as a 1-tuple `(message,)` — the same layout the host-side +/// `invoke_reducer` expects when decoding a single-`String` reducer. +#[spacetimedb::reducer] +pub fn call_remote(ctx: &ReducerContext, target: Identity, message: String) { + let args = spacetimedb::bsatn::to_vec(&(message,)).expect("failed to encode args"); + match spacetimedb::remote_reducer::call_reducer_on_db(target, "record_ping", &args) { + Ok((status, _body)) => { + log::info!("call_remote: got HTTP status {}", status); + } + Err(err) => { + log::error!("call_remote: transport failure: {}", err); + panic!("call_reducer_on_db transport failure: {err}"); + } + } +} +"#; + +/// Smoke test for the cross-database reducer call ABI. +/// +/// Publishes the same module twice on one server, then calls `call_remote` on the +/// "caller" database with the "receiver" database's identity as an argument. +/// Verifies that `receiver` has a new row in `ping_log` written by the cross-DB call. +#[test] +fn test_cross_db_reducer_call() { + let pid = std::process::id(); + let receiver_name = format!("cross-db-receiver-{pid}"); + let caller_name = format!("cross-db-caller-{pid}"); + + // Build one server with the shared module code. + let mut test = Smoketest::builder() + .module_code(MODULE_CODE) + .autopublish(false) + .build(); + + // Publish the receiver database first. + test.publish_module_named(&receiver_name, false) + .expect("failed to publish receiver module"); + let receiver_identity = test + .database_identity + .clone() + .expect("receiver database_identity not set after publish"); + + // Publish the caller database (same code, different name). + test.publish_module_named(&caller_name, false) + .expect("failed to publish caller module"); + // test.database_identity is now caller_name — calls/sql default to caller. + + // Invoke call_remote on the caller, passing the receiver's identity and a test message. + test.call("call_remote", &[&receiver_identity, "hello from caller"]) + .expect("call_remote failed"); + + // Verify that the receiver's ping_log has the expected message row. + let result = test + .spacetime(&[ + "sql", + "--server", + &test.server_url, + &receiver_identity, + "SELECT message FROM ping_log", + ]) + .expect("sql query failed"); + + assert!( + result.contains("hello from caller"), + "Expected ping_log to contain 'hello from caller' after cross-DB call, got:\n{result}" + ); +} diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index f5053652dd3..d046c795d88 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -4,6 +4,7 @@ mod auto_inc; mod auto_migration; mod call; mod change_host_type; +mod cross_db_reducer; mod cli; mod client_connection_errors; mod confirmed_reads; From 279d86fa79e3e5649442b671737958c78bcb8460 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 13:01:19 +0530 Subject: [PATCH 3/9] run smoketests --- crates/bindings-sys/src/lib.rs | 12 +- crates/bindings/src/remote_reducer.rs | 12 +- crates/core/src/host/wasm_common.rs | 11 +- .../src/host/wasmtime/wasm_instance_env.rs | 108 +++++++++--------- crates/core/src/replica_context.rs | 3 +- .../tests/smoketests/cross_db_reducer.rs | 10 +- crates/standalone/src/lib.rs | 16 ++- crates/standalone/src/subcommands/start.rs | 48 ++++---- crates/testing/src/modules.rs | 2 + 9 files changed, 122 insertions(+), 100 deletions(-) diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index c9fb1849fb6..1bad2a8abff 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -1499,10 +1499,14 @@ pub fn call_reducer_on_db( &mut out, ) }; - match Errno::from_code(status) { - None => Ok((status, out)), - Some(errno) if errno == Errno::HTTP_ERROR => Err(out), - Some(errno) => panic!("call_reducer_on_db: unexpected errno {errno}"), + // The raw ABI returns either the HTTP status code (100-599) or HTTP_ERROR errno + // on transport failure. Unlike other ABI functions, a non-zero return value here + // does NOT indicate a generic errno — it's the HTTP status code. Only HTTP_ERROR + // specifically signals a transport-level failure. + if status == Errno::HTTP_ERROR.code() { + Err(out) + } else { + Ok((status, out)) } } diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs index 80a71d3613f..e0b68924458 100644 --- a/crates/bindings/src/remote_reducer.rs +++ b/crates/bindings/src/remote_reducer.rs @@ -40,9 +40,15 @@ pub fn call_reducer_on_db( let identity_bytes = database_identity.to_byte_array(); match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) { Ok((status, body_source)) => { - let mut buf = IterBuf::take(); - read_bytes_source_into(body_source, &mut buf); - Ok((status, buf.to_vec())) + // INVALID signals an empty body (host optimization to avoid allocation). + let body = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID { + Vec::new() + } else { + let mut buf = IterBuf::take(); + read_bytes_source_into(body_source, &mut buf); + buf.to_vec() + }; + Ok((status, body)) } Err(err_source) => { let message = read_bytes_source_as::(err_source); diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index 2c0c5fa57e6..b5bba032d7c 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -433,13 +433,16 @@ macro_rules! abi_funcs { } + $link_sync! { + // Call a reducer on another database while holding (or not holding) a transaction. + // Implemented as a sync host function (using block_in_place) so it can be called + // from within a reducer body where only synchronous host functions are allowed. + "spacetime_10.5"::call_reducer_on_db, + } + $link_async! { "spacetime_10.3"::procedure_sleep_until, "spacetime_10.3"::procedure_http_request, - - // Call a reducer on another database while holding (or not holding) a transaction. - // Uses a warmed HTTP/2 connection pool to localhost:80. - "spacetime_10.5"::call_reducer_on_db, } }; } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 868aa8b2ea5..87182c30be2 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1965,64 +1965,60 @@ impl WasmInstanceEnv { /// # Traps /// /// Traps if any pointer is NULL or its range falls outside of linear memory. - pub fn call_reducer_on_db<'caller>( - caller: Caller<'caller, Self>, - (identity_ptr, reducer_ptr, reducer_len, args_ptr, args_len, out): ( - WasmPtr, - WasmPtr, - u32, - WasmPtr, - u32, - WasmPtr, - ), - ) -> Fut<'caller, RtResult> { - Self::async_with_span(caller, AbiCall::CallReducerOnDb, move |mut caller| async move { - let (mem, env) = Self::mem_env(&mut caller); - - #[allow(clippy::redundant_closure_call)] - let res = (async move || { - // Read the 32-byte BSATN-encoded Identity (little-endian). - let identity_slice = mem.deref_slice(identity_ptr, 32)?; - let identity_bytes: [u8; 32] = identity_slice - .try_into() - .expect("deref_slice(ptr, 32) always yields exactly 32 bytes"); - let database_identity = Identity::from_byte_array(identity_bytes); - - // Read the reducer name as a UTF-8 string. - let reducer_name = mem.deref_str(reducer_ptr, reducer_len)?; - - // Read the BSATN-encoded args as raw bytes. - let args_buf = mem.deref_slice(args_ptr, args_len)?; - let args = bytes::Bytes::copy_from_slice(args_buf); - - let result = env - .instance_env - .call_reducer_on_db(database_identity, reducer_name, args) - .await; + pub fn call_reducer_on_db( + caller: Caller<'_, Self>, + identity_ptr: WasmPtr, + reducer_ptr: WasmPtr, + reducer_len: u32, + args_ptr: WasmPtr, + args_len: u32, + out: WasmPtr, + ) -> RtResult { + Self::cvt_custom(caller, AbiCall::CallReducerOnDb, |caller| { + let (mem, env) = Self::mem_env(caller); - match result { - Ok((status, body)) => { - let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?; - bytes_source.0.write_to(mem, out)?; - Ok(status as u32) - } - Err(NodesError::HttpError(err)) => { - let err_bytes = bsatn::to_vec(&err).with_context(|| { - format!("Failed to BSATN-serialize call_reducer_on_db transport error: {err:?}") - })?; - let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?; - bytes_source.0.write_to(mem, out)?; - Ok(errno::HTTP_ERROR.get() as u32) - } - Err(e) => Err(WasmError::Db(e)), + // Read the 32-byte BSATN-encoded Identity (little-endian). + let identity_slice = mem.deref_slice(identity_ptr, 32)?; + let identity_bytes: [u8; 32] = identity_slice + .try_into() + .expect("deref_slice(ptr, 32) always yields exactly 32 bytes"); + let database_identity = Identity::from_byte_array(identity_bytes); + + // Read the reducer name as a UTF-8 string. + let reducer_name = mem.deref_str(reducer_ptr, reducer_len)?.to_owned(); + + // Read the BSATN-encoded args as raw bytes. + let args_buf = mem.deref_slice(args_ptr, args_len)?; + let args = bytes::Bytes::copy_from_slice(args_buf); + + // Reducers run inside a tokio LocalSet (single-threaded), so block_in_place + // is unavailable and futures::executor::block_on can't drive tokio I/O. + // Spawn a new OS thread and call Handle::block_on from there, which is + // designed to be called from synchronous (non-async) contexts. + let handle = tokio::runtime::Handle::current(); + let fut = env.instance_env.call_reducer_on_db(database_identity, &reducer_name, args); + let result = std::thread::scope(|s| { + s.spawn(|| handle.block_on(fut)) + .join() + .expect("call_reducer_on_db: worker thread panicked") + }); + + match result { + Ok((status, body)) => { + let bytes_source = WasmInstanceEnv::create_bytes_source(env, body)?; + bytes_source.0.write_to(mem, out)?; + Ok(status as u32) } - })() - .await; - - ( - caller, - res.or_else(|err| Self::convert_wasm_result(AbiCall::CallReducerOnDb, err)), - ) + Err(NodesError::HttpError(err)) => { + let err_bytes = bsatn::to_vec(&err).with_context(|| { + format!("Failed to BSATN-serialize call_reducer_on_db transport error: {err:?}") + })?; + let bytes_source = WasmInstanceEnv::create_bytes_source(env, err_bytes.into())?; + bytes_source.0.write_to(mem, out)?; + Ok(errno::HTTP_ERROR.get() as u32) + } + Err(e) => Err(WasmError::Db(e)), + } }) } } diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index 99208940d3a..f4e20dcd4d7 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -62,13 +62,12 @@ impl ReplicaContext { /// Build a warmed `reqwest::Client` from `config`. pub fn new_call_reducer_client(config: &CallReducerOnDbConfig) -> reqwest::Client { reqwest::Client::builder() - .http2_prior_knowledge() .tcp_keepalive(config.tcp_keepalive) .pool_idle_timeout(config.pool_idle_timeout) .pool_max_idle_per_host(config.pool_max_idle_per_host) .timeout(config.request_timeout) .build() - .expect("failed to build call_reducer_on_db HTTP/2 client") + .expect("failed to build call_reducer_on_db HTTP client") } } diff --git a/crates/smoketests/tests/smoketests/cross_db_reducer.rs b/crates/smoketests/tests/smoketests/cross_db_reducer.rs index 9fcd7f3f563..8bfb88f34b7 100644 --- a/crates/smoketests/tests/smoketests/cross_db_reducer.rs +++ b/crates/smoketests/tests/smoketests/cross_db_reducer.rs @@ -9,7 +9,7 @@ use spacetimedb_smoketests::Smoketest; const MODULE_CODE: &str = r#" use spacetimedb::{log, ReducerContext, Table, Identity}; -#[spacetimedb::table(name = ping_log, public)] +#[spacetimedb::table(accessor = ping_log, public)] pub struct PingLog { #[primary_key] #[auto_inc] @@ -24,13 +24,15 @@ pub fn record_ping(ctx: &ReducerContext, message: String) { ctx.db.ping_log().insert(PingLog { id: 0, message }); } -/// Calls `record_ping(message)` on `target` via the cross-database ABI. +/// Calls `record_ping(message)` on `target_hex` via the cross-database ABI. /// +/// `target_hex` is the hex-encoded identity of the target database. /// Args are BSATN-encoded as a 1-tuple `(message,)` — the same layout the host-side /// `invoke_reducer` expects when decoding a single-`String` reducer. #[spacetimedb::reducer] -pub fn call_remote(ctx: &ReducerContext, target: Identity, message: String) { - let args = spacetimedb::bsatn::to_vec(&(message,)).expect("failed to encode args"); +pub fn call_remote(ctx: &ReducerContext, target_hex: String, message: String) { + let target = Identity::from_hex(&target_hex).expect("invalid target identity hex"); + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(message,)).expect("failed to encode args"); match spacetimedb::remote_reducer::call_reducer_on_db(target, "record_ping", &args) { Ok((status, _body)) => { log::info!("call_remote: got HTTP status {}", status); diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 3343a9292b1..3b97c6d46fd 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -14,7 +14,9 @@ use spacetimedb::config::{CertificateAuthority, MetadataFile, V8HeapPolicyConfig use spacetimedb::db; use spacetimedb::db::persistence::LocalPersistenceProvider; use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor}; -use spacetimedb::host::{DiskStorage, HostController, MigratePlanResult, UpdateDatabaseResult}; +use spacetimedb::host::{ + reducer_router::LocalReducerRouter, DiskStorage, HostController, MigratePlanResult, UpdateDatabaseResult, +}; use spacetimedb::identity::{AuthCtx, Identity}; use spacetimedb::messages::control_db::{Database, Node, Replica}; use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; @@ -38,11 +40,15 @@ use std::time::Duration; pub use spacetimedb_client_api::routes::subscribe::{BIN_PROTOCOL, TEXT_PROTOCOL}; -#[derive(Clone, Copy)] +#[derive(Clone)] pub struct StandaloneOptions { pub db_config: db::Config, pub websocket: WebSocketOptions, pub v8_heap_policy: V8HeapPolicyConfig, + /// HTTP base URL of this node's API server (e.g. `"http://127.0.0.1:3000"`). + /// Used to configure the `LocalReducerRouter` so that cross-DB reducer calls + /// reach the correct address when the server listens on a dynamic port. + pub local_api_url: String, } pub struct StandaloneEnv { @@ -76,7 +82,7 @@ impl StandaloneEnv { let program_store = Arc::new(DiskStorage::new(data_dir.program_bytes().0).await?); let persistence_provider = Arc::new(LocalPersistenceProvider::new(data_dir.clone())); - let host_controller = HostController::new( + let mut host_controller = HostController::new( data_dir, config.db_config, config.v8_heap_policy, @@ -85,6 +91,7 @@ impl StandaloneEnv { persistence_provider, db_cores, ); + host_controller.call_reducer_router = Arc::new(LocalReducerRouter::new(config.local_api_url)); let client_actor_index = ClientActorIndex::new(); let jwt_keys = certs.get_or_create_keys()?; @@ -651,9 +658,10 @@ mod tests { }, websocket: WebSocketOptions::default(), v8_heap_policy: V8HeapPolicyConfig::default(), + local_api_url: "http://127.0.0.1:3000".to_owned(), }; - let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), JobCores::without_pinned_cores()).await?; + let _env = StandaloneEnv::init(config.clone(), &ca, data_dir.clone(), JobCores::without_pinned_cores()).await?; // Ensure that we have a lock. assert!( StandaloneEnv::init(config, &ca, data_dir.clone(), JobCores::without_pinned_cores()) diff --git a/crates/standalone/src/subcommands/start.rs b/crates/standalone/src/subcommands/start.rs index 399b7e9a350..30eb64c5a6c 100644 --- a/crates/standalone/src/subcommands/start.rs +++ b/crates/standalone/src/subcommands/start.rs @@ -177,29 +177,6 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { .or_else(|| cert_dir.map(CertificateAuthority::in_cli_config_dir)) .context("cannot omit --jwt-{pub,priv}-key-path when those options are not specified in config.toml")?; - let data_dir = Arc::new(data_dir.clone()); - let ctx = StandaloneEnv::init( - StandaloneOptions { - db_config, - websocket: config.websocket, - v8_heap_policy: config.common.v8_heap_policy, - }, - &certs, - data_dir, - db_cores, - ) - .await?; - worker_metrics::spawn_jemalloc_stats(listen_addr.clone()); - worker_metrics::spawn_tokio_stats(listen_addr.clone()); - worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone()); - worker_metrics::spawn_bsatn_rlb_pool_stats(listen_addr.clone(), ctx.bsatn_rlb_pool().clone()); - let mut db_routes = DatabaseRoutes::default(); - db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable()); - db_routes.db_put = db_routes.db_put.layer(DefaultBodyLimit::disable()); - db_routes.pre_publish = db_routes.pre_publish.layer(DefaultBodyLimit::disable()); - let extra = axum::Router::new().nest("/health", spacetimedb_client_api::routes::health::router()); - let service = router(&ctx, db_routes, IdentityRoutes::default(), extra).with_state(ctx.clone()); - // Check if the requested port is available on both IPv4 and IPv6. // If not, offer to find an available port by incrementing (unless non-interactive). let listen_addr = if let Some((host, port_str)) = listen_addr.rsplit_once(':') { @@ -251,6 +228,31 @@ pub async fn exec(args: &ArgMatches, db_cores: JobCores) -> anyhow::Result<()> { socket2::SockRef::from(&tcp).set_nodelay(true)?; log::info!("Starting SpacetimeDB listening on {}", tcp.local_addr()?); + let local_port = tcp.local_addr()?.port(); + let data_dir = Arc::new(data_dir.clone()); + let ctx = StandaloneEnv::init( + StandaloneOptions { + db_config, + websocket: config.websocket, + v8_heap_policy: config.common.v8_heap_policy, + local_api_url: format!("http://127.0.0.1:{local_port}"), + }, + &certs, + data_dir, + db_cores, + ) + .await?; + worker_metrics::spawn_jemalloc_stats(listen_addr.clone()); + worker_metrics::spawn_tokio_stats(listen_addr.clone()); + worker_metrics::spawn_page_pool_stats(listen_addr.clone(), ctx.page_pool().clone()); + worker_metrics::spawn_bsatn_rlb_pool_stats(listen_addr.clone(), ctx.bsatn_rlb_pool().clone()); + let mut db_routes = DatabaseRoutes::default(); + db_routes.root_post = db_routes.root_post.layer(DefaultBodyLimit::disable()); + db_routes.db_put = db_routes.db_put.layer(DefaultBodyLimit::disable()); + db_routes.pre_publish = db_routes.pre_publish.layer(DefaultBodyLimit::disable()); + let extra = axum::Router::new().nest("/health", spacetimedb_client_api::routes::health::router()); + let service = router(&ctx, db_routes, IdentityRoutes::default(), extra).with_state(ctx.clone()); + if let Some(pg_port) = pg_port { let server_addr = listen_addr.split(':').next().unwrap(); let tcp_pg = TcpListener::bind(format!("{server_addr}:{pg_port}")).await.context(format!( diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 318fff8d0cf..0721409c64b 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -202,6 +202,8 @@ impl CompiledModule { db_config: config, websocket: WebSocketOptions::default(), v8_heap_policy: Default::default(), + // Tests use internal routing; cross-DB HTTP calls aren't tested here. + local_api_url: "http://127.0.0.1:3000".to_owned(), }, &certs, paths.data_dir.into(), From cd3e25bfe9d1a5b0ee8e9cc5d72f2e57b91fea91 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 15:14:47 +0530 Subject: [PATCH 4/9] token --- crates/core/src/host/host_controller.rs | 19 +++++++ crates/core/src/host/instance_env.rs | 16 ++++-- crates/core/src/replica_context.rs | 8 +++ .../tests/smoketests/cross_db_reducer.rs | 55 +++++++++++-------- crates/standalone/src/lib.rs | 2 +- 5 files changed, 72 insertions(+), 28 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index a2a18606614..5d8123ee427 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -128,6 +128,16 @@ pub struct HostController { /// Set to [`LocalReducerRouter`] by default; replaced with `ClusterReducerRouter` /// in cluster deployments via [`HostController::new`] receiving the router directly. pub call_reducer_router: Arc, + /// A single node-level Bearer token included in all outgoing cross-DB reducer calls. + /// + /// Set once at node startup by the deployment layer (standalone / cluster) so that + /// `anon_auth_middleware` on the target node accepts the request without generating a + /// fresh ephemeral identity on every call. All replicas on this node share the same + /// token — the target only needs proof that the caller is a legitimate node, not which + /// specific database initiated the call. + /// + /// `None` in test/embedded contexts where no JWT signer is configured. + pub call_reducer_auth_token: Option, } pub(crate) struct HostRuntimes { @@ -241,6 +251,7 @@ impl HostController { db_cores, call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), + call_reducer_auth_token: None, } } @@ -679,6 +690,7 @@ async fn make_replica_ctx( bsatn_rlb_pool: BsatnRowListBuilderPool, call_reducer_client: reqwest::Client, call_reducer_router: Arc, + call_reducer_auth_token: Option, ) -> anyhow::Result { let logger = match module_logs { Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await, @@ -713,6 +725,7 @@ async fn make_replica_ctx( subscriptions, call_reducer_client, call_reducer_router, + call_reducer_auth_token, }) } @@ -790,6 +803,7 @@ struct ModuleLauncher { bsatn_rlb_pool: BsatnRowListBuilderPool, call_reducer_client: reqwest::Client, call_reducer_router: Arc, + call_reducer_auth_token: Option, } impl ModuleLauncher { @@ -811,6 +825,7 @@ impl ModuleLauncher { self.bsatn_rlb_pool, self.call_reducer_client, self.call_reducer_router, + self.call_reducer_auth_token, ) .await .map(Arc::new)?; @@ -1014,6 +1029,7 @@ impl Host { bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), } .launch_module() .await? @@ -1045,6 +1061,7 @@ impl Host { bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), } .launch_module() .await; @@ -1070,6 +1087,7 @@ impl Host { bsatn_rlb_pool: bsatn_rlb_pool.clone(), call_reducer_client: host_controller.call_reducer_client.clone(), call_reducer_router: host_controller.call_reducer_router.clone(), + call_reducer_auth_token: host_controller.call_reducer_auth_token.clone(), } .launch_module() .await; @@ -1180,6 +1198,7 @@ impl Host { // Transient validation-only module; build its own client and router with defaults. call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), + call_reducer_auth_token: None, } .launch_module() .await diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index f778621e470..2684e60a85a 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -998,6 +998,10 @@ impl InstanceEnv { let client = self.replica_ctx.call_reducer_client.clone(); let router = self.replica_ctx.call_reducer_router.clone(); let reducer_name = reducer_name.to_owned(); + // Node-level auth token: a single token minted at startup and shared by all replicas + // on this node. Passed as a Bearer token so `anon_auth_middleware` on the target node + // accepts the request without generating a fresh ephemeral identity per call. + let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); async move { let base_url = router @@ -1010,13 +1014,14 @@ impl InstanceEnv { database_identity.to_hex(), reducer_name, ); - let response = client + let mut req = client .post(&url) .header(http::header::CONTENT_TYPE, "application/octet-stream") - .body(args) - .send() - .await - .map_err(|e| NodesError::HttpError(e.to_string()))?; + .body(args); + if let Some(token) = auth_token { + req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); + } + let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; let status = response.status().as_u16(); let body = response @@ -1403,6 +1408,7 @@ mod test { subscriptions: subs, call_reducer_client: ReplicaContext::new_call_reducer_client(&CallReducerOnDbConfig::default()), call_reducer_router: Arc::new(LocalReducerRouter::new("http://127.0.0.1:3000")), + call_reducer_auth_token: None, }, runtime, )) diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index f4e20dcd4d7..2ecec56273e 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -56,6 +56,14 @@ pub struct ReplicaContext { /// - Standalone: always returns the local node URL ([`crate::host::reducer_router::LocalReducerRouter`]). /// - Cluster: queries the control DB to find the leader replica's node. pub call_reducer_router: Arc, + /// Pre-signed `Authorization: Bearer ` value for outgoing cross-DB reducer calls. + /// + /// Set at replica launch time by the deployment layer (standalone / cluster) using the + /// local JWT signing key. The token identifies this database as the caller, so the target + /// reducer sees a stable, verifiable identity instead of an anonymous ephemeral one. + /// + /// `None` in contexts where no JWT signer is configured (e.g. unit tests). + pub call_reducer_auth_token: Option, } impl ReplicaContext { diff --git a/crates/smoketests/tests/smoketests/cross_db_reducer.rs b/crates/smoketests/tests/smoketests/cross_db_reducer.rs index 8bfb88f34b7..f4d31364333 100644 --- a/crates/smoketests/tests/smoketests/cross_db_reducer.rs +++ b/crates/smoketests/tests/smoketests/cross_db_reducer.rs @@ -2,12 +2,19 @@ use spacetimedb_smoketests::Smoketest; /// Module code used for both the "receiver" and "caller" databases. /// -/// - `record_ping(message)` is called by the caller via `call_reducer_on_db` and stores the -/// message in `ping_log`. -/// - `call_remote(target, message)` is the entry point: it BSATN-encodes `message` and invokes +/// - `record_ping(payload)` is called by the caller via `call_reducer_on_db` and stores the +/// payload fields in `ping_log`. +/// - `call_remote(target, payload)` is the entry point: it BSATN-encodes `payload` and invokes /// `record_ping` on `target` over the cross-DB ABI. const MODULE_CODE: &str = r#" -use spacetimedb::{log, ReducerContext, Table, Identity}; +use spacetimedb::{log, ReducerContext, Table, Identity, SpacetimeType}; + +/// A structured ping payload — used to exercise BSATN encoding of a multi-field struct. +#[derive(SpacetimeType)] +pub struct PingPayload { + pub message: String, + pub priority: u32, +} #[spacetimedb::table(accessor = ping_log, public)] pub struct PingLog { @@ -15,24 +22,25 @@ pub struct PingLog { #[auto_inc] id: u64, message: String, + priority: u32, } -/// Writes one row to `ping_log` with the given message. Called via the cross-DB ABI. +/// Writes one row to `ping_log` from the payload. Called via the cross-DB ABI. #[spacetimedb::reducer] -pub fn record_ping(ctx: &ReducerContext, message: String) { - log::info!("record_ping: got message: {}", message); - ctx.db.ping_log().insert(PingLog { id: 0, message }); +pub fn record_ping(ctx: &ReducerContext, payload: PingPayload) { + log::info!("record_ping: got message={} priority={}", payload.message, payload.priority); + ctx.db.ping_log().insert(PingLog { id: 0, message: payload.message, priority: payload.priority }); } -/// Calls `record_ping(message)` on `target_hex` via the cross-database ABI. +/// Calls `record_ping(payload)` on `target_hex` via the cross-database ABI. /// /// `target_hex` is the hex-encoded identity of the target database. -/// Args are BSATN-encoded as a 1-tuple `(message,)` — the same layout the host-side -/// `invoke_reducer` expects when decoding a single-`String` reducer. +/// Args are BSATN-encoded as a 1-tuple `(payload,)`. #[spacetimedb::reducer] -pub fn call_remote(ctx: &ReducerContext, target_hex: String, message: String) { +pub fn call_remote(ctx: &ReducerContext, target_hex: String, message: String, priority: u32) { let target = Identity::from_hex(&target_hex).expect("invalid target identity hex"); - let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(message,)).expect("failed to encode args"); + let payload = PingPayload { message, priority }; + let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(payload,)).expect("failed to encode args"); match spacetimedb::remote_reducer::call_reducer_on_db(target, "record_ping", &args) { Ok((status, _body)) => { log::info!("call_remote: got HTTP status {}", status); @@ -49,7 +57,9 @@ pub fn call_remote(ctx: &ReducerContext, target_hex: String, message: String) { /// /// Publishes the same module twice on one server, then calls `call_remote` on the /// "caller" database with the "receiver" database's identity as an argument. -/// Verifies that `receiver` has a new row in `ping_log` written by the cross-DB call. +/// Passes a structured `PingPayload` (message + priority) to exercise multi-field +/// BSATN encoding over the cross-DB boundary. +/// Verifies that `receiver` has the expected row in `ping_log`. #[test] fn test_cross_db_reducer_call() { let pid = std::process::id(); @@ -57,10 +67,7 @@ fn test_cross_db_reducer_call() { let caller_name = format!("cross-db-caller-{pid}"); // Build one server with the shared module code. - let mut test = Smoketest::builder() - .module_code(MODULE_CODE) - .autopublish(false) - .build(); + let mut test = Smoketest::builder().module_code(MODULE_CODE).autopublish(false).build(); // Publish the receiver database first. test.publish_module_named(&receiver_name, false) @@ -75,18 +82,18 @@ fn test_cross_db_reducer_call() { .expect("failed to publish caller module"); // test.database_identity is now caller_name — calls/sql default to caller. - // Invoke call_remote on the caller, passing the receiver's identity and a test message. - test.call("call_remote", &[&receiver_identity, "hello from caller"]) + // Invoke call_remote on the caller, passing the receiver's identity, message, and priority. + test.call("call_remote", &[&receiver_identity, "hello from caller", "42"]) .expect("call_remote failed"); - // Verify that the receiver's ping_log has the expected message row. + // Verify that the receiver's ping_log has the expected row. let result = test .spacetime(&[ "sql", "--server", &test.server_url, &receiver_identity, - "SELECT message FROM ping_log", + "SELECT message, priority FROM ping_log", ]) .expect("sql query failed"); @@ -94,4 +101,8 @@ fn test_cross_db_reducer_call() { result.contains("hello from caller"), "Expected ping_log to contain 'hello from caller' after cross-DB call, got:\n{result}" ); + assert!( + result.contains("42"), + "Expected ping_log to contain priority 42 after cross-DB call, got:\n{result}" + ); } diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 3b97c6d46fd..6772703c379 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -22,7 +22,7 @@ use spacetimedb::messages::control_db::{Database, Node, Replica}; use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use spacetimedb::util::jobs::JobCores; use spacetimedb::worker_metrics::WORKER_METRICS; -use spacetimedb_client_api::auth::{self, LOCALHOST}; +use spacetimedb_client_api::auth::{self, JwtAuthProvider, LOCALHOST}; use spacetimedb_client_api::routes::subscribe::{HasWebSocketOptions, WebSocketOptions}; use spacetimedb_client_api::{ControlStateReadAccess, DatabaseResetDef, Host, NodeDelegate}; use spacetimedb_client_api_messages::name::{ From 850ca801b3f025325b7eaa6664e2d5ef42c20778 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 17:23:52 +0530 Subject: [PATCH 5/9] fmt --- crates/bindings/src/remote_reducer.rs | 5 ++++- crates/core/src/host/host_controller.rs | 3 ++- crates/core/src/host/reducer_router.rs | 4 +++- crates/core/src/host/wasmtime/wasm_instance_env.rs | 4 +++- crates/smoketests/tests/smoketests/mod.rs | 2 +- crates/standalone/src/lib.rs | 2 +- 6 files changed, 14 insertions(+), 6 deletions(-) diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs index e0b68924458..e004f495aaa 100644 --- a/crates/bindings/src/remote_reducer.rs +++ b/crates/bindings/src/remote_reducer.rs @@ -22,7 +22,10 @@ //! } //! ``` -use crate::{rt::{read_bytes_source_as, read_bytes_source_into}, IterBuf, Identity}; +use crate::{ + rt::{read_bytes_source_as, read_bytes_source_into}, + Identity, IterBuf, +}; /// Call a reducer on a remote database. /// diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 5d8123ee427..dd774111b86 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -9,11 +9,11 @@ use crate::db::persistence::PersistenceProvider; use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata}; use crate::db::{self, spawn_tx_metrics_recorder}; use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor}; +use crate::host::reducer_router::{LocalReducerRouter, ReducerCallRouter}; use crate::host::v8::V8Runtime; use crate::host::ProcedureCallError; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; -use crate::host::reducer_router::{LocalReducerRouter, ReducerCallRouter}; use crate::replica_context::{CallReducerOnDbConfig, ReplicaContext}; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset}; @@ -682,6 +682,7 @@ fn stored_program_hash(db: &RelationalDB) -> anyhow::Result> { Ok(meta.map(|meta| meta.program_hash)) } +#[allow(clippy::too_many_arguments)] async fn make_replica_ctx( module_logs: Option, database: Database, diff --git a/crates/core/src/host/reducer_router.rs b/crates/core/src/host/reducer_router.rs index 93891ecc6e6..dcbf20c51c8 100644 --- a/crates/core/src/host/reducer_router.rs +++ b/crates/core/src/host/reducer_router.rs @@ -49,7 +49,9 @@ pub struct LocalReducerRouter { impl LocalReducerRouter { pub fn new(base_url: impl Into) -> Self { - Self { base_url: base_url.into() } + Self { + base_url: base_url.into(), + } } } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 87182c30be2..7ed23331d7e 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1996,7 +1996,9 @@ impl WasmInstanceEnv { // Spawn a new OS thread and call Handle::block_on from there, which is // designed to be called from synchronous (non-async) contexts. let handle = tokio::runtime::Handle::current(); - let fut = env.instance_env.call_reducer_on_db(database_identity, &reducer_name, args); + let fut = env + .instance_env + .call_reducer_on_db(database_identity, &reducer_name, args); let result = std::thread::scope(|s| { s.spawn(|| handle.block_on(fut)) .join() diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index d046c795d88..18ad7b51199 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -4,12 +4,12 @@ mod auto_inc; mod auto_migration; mod call; mod change_host_type; -mod cross_db_reducer; mod cli; mod client_connection_errors; mod confirmed_reads; mod connect_disconnect_from_cli; mod create_project; +mod cross_db_reducer; mod csharp_module; mod default_module_clippy; mod delete_database; diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 6772703c379..3b97c6d46fd 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -22,7 +22,7 @@ use spacetimedb::messages::control_db::{Database, Node, Replica}; use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use spacetimedb::util::jobs::JobCores; use spacetimedb::worker_metrics::WORKER_METRICS; -use spacetimedb_client_api::auth::{self, JwtAuthProvider, LOCALHOST}; +use spacetimedb_client_api::auth::{self, LOCALHOST}; use spacetimedb_client_api::routes::subscribe::{HasWebSocketOptions, WebSocketOptions}; use spacetimedb_client_api::{ControlStateReadAccess, DatabaseResetDef, Host, NodeDelegate}; use spacetimedb_client_api_messages::name::{ From 9360096e638518610473e59e85f21e0759271402 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 17:43:10 +0530 Subject: [PATCH 6/9] betetr error handling --- crates/bindings/src/remote_reducer.rs | 66 ++++++++++++++----- .../tests/smoketests/cross_db_reducer.rs | 10 +-- 2 files changed, 54 insertions(+), 22 deletions(-) diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs index e004f495aaa..6bfdf75414e 100644 --- a/crates/bindings/src/remote_reducer.rs +++ b/crates/bindings/src/remote_reducer.rs @@ -1,10 +1,9 @@ -//! Naive binding for calling reducers on remote SpacetimeDB databases. +//! Binding for calling reducers on remote SpacetimeDB databases. //! //! Call a reducer on another database using [`call_reducer_on_db`]. //! -//! The args must be BSATN-encoded. The response body is raw bytes returned by -//! the remote database's HTTP handler. An HTTP status >= 400 does not cause an -//! `Err` return; only a transport failure (connection refused, timeout, …) does. +//! The args must be BSATN-encoded. Returns `Ok(())` when the remote reducer +//! ran and succeeded, or one of the [`RemoteCallError`] variants on failure. //! //! # Example //! @@ -16,46 +15,79 @@ //! // Empty BSATN args for a zero-argument reducer. //! let args = spacetimedb::bsatn::to_vec(&()).unwrap(); //! match remote_reducer::call_reducer_on_db(target, "my_reducer", &args) { -//! Ok((status, body)) => log::info!("status={status} body={body:?}"), -//! Err(msg) => log::error!("transport error: {msg}"), +//! Ok(()) => log::info!("remote reducer succeeded"), +//! Err(remote_reducer::RemoteCallError::Failed(msg)) => log::error!("reducer failed: {msg}"), +//! Err(remote_reducer::RemoteCallError::NotFound(msg)) => log::error!("not found: {msg}"), +//! Err(remote_reducer::RemoteCallError::Unreachable(msg)) => log::error!("unreachable: {msg}"), //! } //! } //! ``` use crate::{ - rt::{read_bytes_source_as, read_bytes_source_into}, + rt::read_bytes_source_into, Identity, IterBuf, }; +/// Error returned by [`call_reducer_on_db`]. +#[derive(Debug)] +pub enum RemoteCallError { + /// The remote reducer ran but returned an error. Contains the error message from the server. + Failed(String), + /// The target database or reducer does not exist (HTTP 404). + NotFound(String), + /// The call could not be delivered (connection refused, timeout, network error, etc.). + Unreachable(String), +} + +impl core::fmt::Display for RemoteCallError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + RemoteCallError::Failed(msg) => write!(f, "remote reducer failed: {msg}"), + RemoteCallError::NotFound(msg) => write!(f, "remote database or reducer not found: {msg}"), + RemoteCallError::Unreachable(msg) => write!(f, "remote database unreachable: {msg}"), + } + } +} + /// Call a reducer on a remote database. /// /// - `database_identity`: the target database. /// - `reducer_name`: the name of the reducer to invoke (must be valid UTF-8). /// - `args`: BSATN-encoded reducer arguments. /// -/// Returns `Ok((status, body))` on any transport success (including HTTP errors like 400/500). -/// Returns `Err(message)` on transport failure (connection refused, timeout, …). +/// Returns `Ok(())` when the remote reducer ran and succeeded. +/// Returns `Err(RemoteCallError::Failed(msg))` when the reducer ran but returned an error. +/// Returns `Err(RemoteCallError::NotFound(msg))` when the database or reducer does not exist. +/// Returns `Err(RemoteCallError::Unreachable(msg))` on transport failure (connection refused, timeout, …). pub fn call_reducer_on_db( database_identity: Identity, reducer_name: &str, args: &[u8], -) -> Result<(u16, Vec), String> { +) -> Result<(), RemoteCallError> { let identity_bytes = database_identity.to_byte_array(); match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) { Ok((status, body_source)) => { - // INVALID signals an empty body (host optimization to avoid allocation). - let body = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID { - Vec::new() + if status < 300 { + return Ok(()); + } + // Decode the response body as the error message. + let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID { + String::new() } else { let mut buf = IterBuf::take(); read_bytes_source_into(body_source, &mut buf); - buf.to_vec() + String::from_utf8_lossy(&buf).into_owned() }; - Ok((status, body)) + if status == 404 { + Err(RemoteCallError::NotFound(msg)) + } else { + Err(RemoteCallError::Failed(msg)) + } } Err(err_source) => { - let message = read_bytes_source_as::(err_source); - Err(message) + use crate::rt::read_bytes_source_as; + let msg = read_bytes_source_as::(err_source); + Err(RemoteCallError::Unreachable(msg)) } } } diff --git a/crates/smoketests/tests/smoketests/cross_db_reducer.rs b/crates/smoketests/tests/smoketests/cross_db_reducer.rs index f4d31364333..22d4a94b9f1 100644 --- a/crates/smoketests/tests/smoketests/cross_db_reducer.rs +++ b/crates/smoketests/tests/smoketests/cross_db_reducer.rs @@ -42,12 +42,12 @@ pub fn call_remote(ctx: &ReducerContext, target_hex: String, message: String, pr let payload = PingPayload { message, priority }; let args = spacetimedb::spacetimedb_lib::bsatn::to_vec(&(payload,)).expect("failed to encode args"); match spacetimedb::remote_reducer::call_reducer_on_db(target, "record_ping", &args) { - Ok((status, _body)) => { - log::info!("call_remote: got HTTP status {}", status); + Ok(()) => { + log::info!("call_remote: remote reducer succeeded"); } - Err(err) => { - log::error!("call_remote: transport failure: {}", err); - panic!("call_reducer_on_db transport failure: {err}"); + Err(e) => { + log::error!("call_remote: {}", e); + panic!("call_reducer_on_db error: {e}"); } } } From faccc62ae9f0ded3bda1bf47a6e65a9b2b70e7ab Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 18:16:34 +0530 Subject: [PATCH 7/9] fmt --- crates/bindings/src/remote_reducer.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/crates/bindings/src/remote_reducer.rs b/crates/bindings/src/remote_reducer.rs index 6bfdf75414e..bded8bc5ae7 100644 --- a/crates/bindings/src/remote_reducer.rs +++ b/crates/bindings/src/remote_reducer.rs @@ -23,10 +23,7 @@ //! } //! ``` -use crate::{ - rt::read_bytes_source_into, - Identity, IterBuf, -}; +use crate::{rt::read_bytes_source_into, Identity, IterBuf}; /// Error returned by [`call_reducer_on_db`]. #[derive(Debug)] @@ -59,11 +56,7 @@ impl core::fmt::Display for RemoteCallError { /// Returns `Err(RemoteCallError::Failed(msg))` when the reducer ran but returned an error. /// Returns `Err(RemoteCallError::NotFound(msg))` when the database or reducer does not exist. /// Returns `Err(RemoteCallError::Unreachable(msg))` on transport failure (connection refused, timeout, …). -pub fn call_reducer_on_db( - database_identity: Identity, - reducer_name: &str, - args: &[u8], -) -> Result<(), RemoteCallError> { +pub fn call_reducer_on_db(database_identity: Identity, reducer_name: &str, args: &[u8]) -> Result<(), RemoteCallError> { let identity_bytes = database_identity.to_byte_array(); match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) { Ok((status, body_source)) => { From 22eeae0e5a1d047586af3385f0ba3c5bf6eda07d Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 19:04:11 +0530 Subject: [PATCH 8/9] docs --- crates/bindings-sys/src/lib.rs | 4 ++-- crates/core/src/host/wasmtime/wasm_instance_env.rs | 2 +- crates/core/src/replica_context.rs | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 1bad2a8abff..8854ae393b6 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -867,7 +867,7 @@ pub mod raw { #[link(wasm_import_module = "spacetime_10.5")] unsafe extern "C" { - /// Call a reducer on another SpacetimeDB database via the local reverse proxy at `localhost:80`. + /// Call a reducer on another SpacetimeDB database. /// /// - `identity_ptr` must point to exactly 32 bytes — the BSATN (little-endian) encoding of /// the target database `Identity`. @@ -879,7 +879,7 @@ pub mod raw { /// - Writes a [`BytesSource`] containing the response body bytes to `*out`. /// /// On transport failure (connection refused, timeout, etc.): - /// - Returns [`errno::HTTP_ERROR`] (21). + /// - Returns `errno::HTTP_ERROR` (21). /// - Writes a [`BytesSource`] containing a BSATN-encoded error [`String`] to `*out`. /// /// Unlike `procedure_http_request`, this syscall may be called while a transaction diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 7ed23331d7e..f24b8a4f3dc 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1944,7 +1944,7 @@ impl WasmInstanceEnv { }) } - /// Call a reducer on another SpacetimeDB database via the local reverse proxy at `localhost:80`. + /// Call a reducer on another SpacetimeDB database. /// /// - `identity_ptr` must point to exactly 32 bytes — the BSATN (little-endian) encoding of the /// target [`Identity`]. diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index 2ecec56273e..8c9f8804f24 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -56,13 +56,13 @@ pub struct ReplicaContext { /// - Standalone: always returns the local node URL ([`crate::host::reducer_router::LocalReducerRouter`]). /// - Cluster: queries the control DB to find the leader replica's node. pub call_reducer_router: Arc, - /// Pre-signed `Authorization: Bearer ` value for outgoing cross-DB reducer calls. + /// `Authorization: Bearer ` value for outgoing cross-DB reducer calls. /// - /// Set at replica launch time by the deployment layer (standalone / cluster) using the - /// local JWT signing key. The token identifies this database as the caller, so the target - /// reducer sees a stable, verifiable identity instead of an anonymous ephemeral one. + /// A single node-level token set once at startup and shared by all replicas on this node. + /// Passed as a Bearer token so `anon_auth_middleware` on the target node accepts the request + /// without generating a fresh ephemeral identity per call. /// - /// `None` in contexts where no JWT signer is configured (e.g. unit tests). + /// `None` in contexts where no auth token is configured (e.g. unit tests). pub call_reducer_auth_token: Option, } From a58a4e30dd7fbfa55864541d440e353005d6183b Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Sat, 28 Mar 2026 19:52:30 +0530 Subject: [PATCH 9/9] metrics --- crates/core/src/host/instance_env.rs | 35 ++++++++++++++++++--------- crates/core/src/worker_metrics/mod.rs | 10 ++++++++ 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 2684e60a85a..1fdc651414e 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -8,6 +8,7 @@ use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::{commit_and_broadcast_event, ModuleSubscriptions}; use crate::subscription::module_subscription_manager::{from_tx_offset, TransactionOffset}; use crate::util::prometheus_handle::IntGaugeExt; +use crate::worker_metrics::WORKER_METRICS; use chrono::{DateTime, Utc}; use core::mem; use futures::TryFutureExt; @@ -978,13 +979,13 @@ impl InstanceEnv { }) } - /// Call a reducer on a remote database via the local reverse proxy (`localhost:80`). + /// Call a reducer on a remote database. /// /// Unlike [`Self::http_request`], this is explicitly allowed while a transaction is open — /// the caller is responsible for understanding the consistency implications. /// /// Uses [`ReplicaContext::call_reducer_router`] to resolve the leader node for - /// `database_identity`, then sends the request via the warmed HTTP/2 client in + /// `database_identity`, then sends the request via the warmed HTTP client in /// [`ReplicaContext::call_reducer_client`]. /// /// Returns `(http_status, response_body)` on transport success, @@ -1002,8 +1003,11 @@ impl InstanceEnv { // on this node. Passed as a Bearer token so `anon_auth_middleware` on the target node // accepts the request without generating a fresh ephemeral identity per call. let auth_token = self.replica_ctx.call_reducer_auth_token.clone(); + let caller_identity = self.replica_ctx.database.database_identity; async move { + let start = Instant::now(); + let base_url = router .resolve_base_url(database_identity) .await @@ -1021,15 +1025,24 @@ impl InstanceEnv { if let Some(token) = auth_token { req = req.header(http::header::AUTHORIZATION, format!("Bearer {token}")); } - let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; - - let status = response.status().as_u16(); - let body = response - .bytes() - .await - .map_err(|e| NodesError::HttpError(e.to_string()))?; - - Ok((status, body)) + let result = async { + let response = req.send().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + let status = response.status().as_u16(); + let body = response.bytes().await.map_err(|e| NodesError::HttpError(e.to_string()))?; + Ok((status, body)) + } + .await; + + WORKER_METRICS + .cross_db_reducer_calls_total + .with_label_values(&caller_identity) + .inc(); + WORKER_METRICS + .cross_db_reducer_duration_seconds + .with_label_values(&caller_identity) + .observe(start.elapsed().as_secs_f64()); + + result } } } diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 6421a95d7cb..5b84e230045 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -43,6 +43,16 @@ metrics_group!( #[labels(database_identity: Identity, protocol: str)] pub websocket_request_msg_size: HistogramVec, + #[name = spacetime_cross_db_reducer_calls_total] + #[help = "Total number of cross-database reducer calls made by this database."] + #[labels(caller_identity: Identity)] + pub cross_db_reducer_calls_total: IntCounterVec, + + #[name = spacetime_cross_db_reducer_duration_seconds] + #[help = "Duration of cross-database reducer calls in seconds."] + #[labels(caller_identity: Identity)] + pub cross_db_reducer_duration_seconds: HistogramVec, + #[name = jemalloc_active_bytes] #[help = "Number of bytes in jemallocs heap"] #[labels(node_id: str)]