Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d84b7a7
Add first-pass TPC-C module and Rust benchmark runner
joshua-spacetime Mar 26, 2026
1a74fe9
Start rewriting tpcc module to do IDC for cross-warehouse transactions
gefjon Mar 26, 2026
d35b491
Finish rewriting `new_order`, and commentary on non-conformance
gefjon Mar 27, 2026
9d3086c
Reorganize module, make tests work
gefjon Mar 27, 2026
8203605
abi
Shubham8287 Mar 27, 2026
f718db5
Update payment to connect to a remote database when necessary
gefjon Mar 27, 2026
d7776c7
Include content-type header
gefjon Mar 27, 2026
10f21f2
Use BSATN for remote calls instead of JSON
gefjon Mar 27, 2026
dd76795
Break the driver by making the runner support multiple DBs
gefjon Mar 27, 2026
1836f95
update tpcc driver for multiple databases
joshua-spacetime Mar 28, 2026
24b4e62
unreviewed code
Shubham8287 Mar 28, 2026
279d86f
run smoketests
Shubham8287 Mar 28, 2026
cd3e25b
token
Shubham8287 Mar 28, 2026
850ca80
fmt
Shubham8287 Mar 28, 2026
9360096
betetr error handling
Shubham8287 Mar 28, 2026
faccc62
fmt
Shubham8287 Mar 28, 2026
22eeae0
docs
Shubham8287 Mar 28, 2026
a58a4e3
metrics
Shubham8287 Mar 28, 2026
33e8ec0
Extend timeout on remote calls
gefjon Mar 28, 2026
311c1f2
Add a whole bunch of logging on start and end of operations
gefjon Mar 28, 2026
c089c3a
Hopefully fix a spurious timeout bug
gefjon Mar 28, 2026
07a71f0
Add parallel database loading
joshua-spacetime Mar 28, 2026
91ac6aa
fix delivery; auto-inc its scheduled_id
joshua-spacetime Mar 28, 2026
4781c22
Vibecode: reducer return values.
gefjon Mar 28, 2026
c59d370
`cargo fmt`
gefjon Mar 28, 2026
94ba2e9
insta
gefjon Mar 28, 2026
232b795
coordinator handles warehouse assignment for distributed runs
joshua-spacetime Mar 28, 2026
8ae04d0
Debug for timeout error
jdetter Mar 28, 2026
84737a6
`log_stopwatch` when loading
gefjon Mar 28, 2026
79b8c1a
Revert "`log_stopwatch` when loading"
gefjon Mar 28, 2026
a0eef62
`log_stopwatch` when loading
gefjon Mar 28, 2026
a017111
Merge branch 'phoebe/tpcc-distributed-naive-http-requests' of github.…
gefjon Mar 28, 2026
090c070
Vibecode: pipeline loading of TPC-C seed data
gefjon Mar 28, 2026
7d84ee3
Merge branch 'phoebe/tpcc-distributed-naive-http-requests' into phoeb…
gefjon Mar 28, 2026
c7550a1
Merge remote-tracking branch 'origin/shub/sync-reducer-call' into pho…
gefjon Mar 28, 2026
474ce45
server-side datagen
joshua-spacetime Mar 28, 2026
398cb8a
Add timers around server-side loaders
joshua-spacetime Mar 28, 2026
ed4c6e8
Add instructions for resuming a failed load
joshua-spacetime Mar 29, 2026
3050d7d
Revert default batch size for server side loader
joshua-spacetime Mar 29, 2026
e44225f
Rework for reducers & remote reducer calls
gefjon Mar 29, 2026
3219020
Merge remote-tracking branch 'origin/phoebe/tpcc-distributed-naive-ht…
gefjon Mar 29, 2026
1dd621d
Add timing, rename some spans
gefjon Mar 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ members = [
"modules/sdk-test-view",
"modules/sdk-test-view-pk",
"modules/sdk-test-event-table",
"modules/tpcc",
"sdks/rust/tests/test-client",
"sdks/rust/tests/test-counter",
"sdks/rust/tests/connect_disconnect_client",
Expand All @@ -62,6 +63,7 @@ members = [
"tools/upgrade-version",
"tools/license-check",
"tools/replace-spacetimedb",
"tools/tpcc-runner",
"tools/generate-client-api",
"tools/gen-bindings",
"tools/xtask-llm-benchmark",
Expand Down
18 changes: 11 additions & 7 deletions crates/bindings-macro/src/reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,11 @@ pub(crate) fn reducer_impl(args: ReducerArgs, original_function: &ItemFn) -> syn
let first_arg_ty = arg_tys.first().into_iter();
let rest_arg_tys = arg_tys.iter().skip(1);

// Extract the return type.
let ret_ty = match &original_function.sig.output {
syn::ReturnType::Default => None,
syn::ReturnType::Type(_, t) => Some(&**t),
}
.into_iter();
// Extract the return type (defaulting to `()`).
let ret_ty_for_info: syn::Type = match &original_function.sig.output {
syn::ReturnType::Default => syn::parse_quote!(()),
syn::ReturnType::Type(_, t) => (**t).clone(),
};

let register_describer_symbol = format!("__preinit__20_register_describer_{}", reducer_name.value());

Expand All @@ -151,7 +150,7 @@ pub(crate) fn reducer_impl(args: ReducerArgs, original_function: &ItemFn) -> syn
fn _assert_args #lt_params () #lt_where_clause {
#(let _ = <#first_arg_ty as spacetimedb::rt::ReducerContextArg>::_ITEM;)*
#(let _ = <#rest_arg_tys as spacetimedb::rt::ReducerArg>::_ITEM;)*
#(let _ = <#ret_ty as spacetimedb::rt::IntoReducerResult>::into_result;)*
let _ = <#ret_ty_for_info as spacetimedb::rt::IntoReducerResult>::into_result;
}
};
impl #func_name {
Expand All @@ -168,6 +167,11 @@ pub(crate) fn reducer_impl(args: ReducerArgs, original_function: &ItemFn) -> syn
#(const LIFECYCLE: Option<spacetimedb::rt::LifecycleReducer> = Some(#lifecycle);)*
const ARG_NAMES: &'static [Option<&'static str>] = &[#(#opt_arg_names),*];
const INVOKE: Self::Invoke = #func_name::invoke;
fn return_type(
ts: &mut impl spacetimedb::sats::typespace::TypespaceBuilder,
) -> Option<spacetimedb::sats::AlgebraicType> {
Some(<#ret_ty_for_info as spacetimedb::rt::IntoReducerResult>::ok_return_type(ts))
}
}

#generate_explicit_names
Expand Down
72 changes: 72 additions & 0 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,39 @@ pub mod raw {
) -> u16;
}

#[link(wasm_import_module = "spacetime_10.5")]
unsafe extern "C" {
/// Call a reducer on another SpacetimeDB database.
///
/// - `identity_ptr` must point to exactly 32 bytes — the BSATN (little-endian) encoding of
/// the target database `Identity`.
/// - `reducer_ptr[..reducer_len]` is the UTF-8 name of the reducer to call.
/// - `args_ptr[..args_len]` is the BSATN-encoded reducer arguments.
///
/// On transport success (any HTTP response received):
/// - Returns the HTTP status code (e.g. 200, 400, 530).
/// - Writes a [`BytesSource`] containing the response body bytes to `*out`.
///
/// On transport failure (connection refused, timeout, etc.):
/// - Returns `errno::HTTP_ERROR` (21).
/// - Writes a [`BytesSource`] containing a BSATN-encoded error [`String`] to `*out`.
///
/// Unlike `procedure_http_request`, this syscall may be called while a transaction
/// is open (i.e. from within a reducer body).
///
/// # Traps
///
/// Traps if any pointer is NULL or its range falls outside of linear memory.
pub fn call_reducer_on_db(
identity_ptr: *const u8, // exactly 32 bytes, BSATN-encoded Identity
reducer_ptr: *const u8,
reducer_len: u32,
args_ptr: *const u8,
args_len: u32,
out: *mut BytesSource,
) -> u16;
}

/// What strategy does the database index use?
///
/// See also: <https://www.postgresql.org/docs/current/sql-createindex.html>
Expand Down Expand Up @@ -1438,6 +1471,45 @@ pub fn identity() -> [u8; 32] {
buf
}

/// Call a reducer on a remote database identified by `identity` (little-endian 32-byte array).
///
/// On transport success (any HTTP response received):
/// - Returns `Ok((status, body_source))` where `status` is the HTTP status code and
/// `body_source` is a [`raw::BytesSource`] containing the raw response body bytes.
///
/// On transport failure (connection refused, timeout, etc.):
/// - Returns `Err(err_source)` where `err_source` is a [`raw::BytesSource`] containing
/// a BSATN-encoded error [`String`].
///
/// Unlike HTTP requests, this syscall may be called while a transaction is open.
#[inline]
pub fn call_reducer_on_db(
identity: [u8; 32],
reducer_name: &str,
args: &[u8],
) -> Result<(u16, raw::BytesSource), raw::BytesSource> {
let mut out = raw::BytesSource::INVALID;
let status = unsafe {
raw::call_reducer_on_db(
identity.as_ptr(),
reducer_name.as_ptr(),
reducer_name.len() as u32,
args.as_ptr(),
args.len() as u32,
&mut out,
)
};
// The raw ABI returns either the HTTP status code (100-599) or HTTP_ERROR errno
// on transport failure. Unlike other ABI functions, a non-zero return value here
// does NOT indicate a generic errno — it's the HTTP status code. Only HTTP_ERROR
// specifically signals a transport-level failure.
if status == Errno::HTTP_ERROR.code() {
Err(out)
} else {
Ok((status, out))
}
}

/// Finds the JWT payload associated with `connection_id`.
/// If nothing is found for the connection, this returns None.
/// If a payload is found, this will return a valid [`raw::BytesSource`].
Expand Down
3 changes: 2 additions & 1 deletion crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod client_visibility_filter;
pub mod http;
pub mod log_stopwatch;
mod logger;
pub mod remote_reducer;
#[cfg(feature = "rand08")]
mod rng;
#[doc(hidden)]
Expand Down Expand Up @@ -56,7 +57,7 @@ pub use table::{
UniqueColumn, UniqueColumnReadOnly, UniqueConstraintViolation,
};

pub type ReducerResult = core::result::Result<(), Box<str>>;
pub type ReducerResult = core::result::Result<Option<Vec<u8>>, Box<str>>;

pub type ProcedureResult = Vec<u8>;

Expand Down
92 changes: 92 additions & 0 deletions crates/bindings/src/remote_reducer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//! Binding for calling reducers on remote SpacetimeDB databases.
//!
//! Call a reducer on another database using [`call_reducer_on_db`].
//!
//! The args must be BSATN-encoded. Returns `Ok(())` when the remote reducer
//! ran and succeeded, or one of the [`RemoteCallError`] variants on failure.
//!
//! # Example
//!
//! ```no_run
//! use spacetimedb::{remote_reducer, Identity};
//!
//! #[spacetimedb::reducer]
//! fn call_remote(ctx: &spacetimedb::ReducerContext, target: Identity) {
//! // Empty BSATN args for a zero-argument reducer.
//! let args = spacetimedb::bsatn::to_vec(&()).unwrap();
//! match remote_reducer::call_reducer_on_db(target, "my_reducer", &args) {
//! Ok(()) => log::info!("remote reducer succeeded"),
//! Err(remote_reducer::RemoteCallError::Failed(msg)) => log::error!("reducer failed: {msg}"),
//! Err(remote_reducer::RemoteCallError::NotFound(msg)) => log::error!("not found: {msg}"),
//! Err(remote_reducer::RemoteCallError::Unreachable(msg)) => log::error!("unreachable: {msg}"),
//! }
//! }
//! ```

use crate::{rt::read_bytes_source_into, Identity, IterBuf};

/// Error returned by [`call_reducer_on_db`].
#[derive(Debug)]
pub enum RemoteCallError {
/// The remote reducer ran but returned an error. Contains the error message from the server.
Failed(String),
/// The target database or reducer does not exist (HTTP 404).
NotFound(String),
/// The call could not be delivered (connection refused, timeout, network error, etc.).
Unreachable(String),
}

impl core::fmt::Display for RemoteCallError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
RemoteCallError::Failed(msg) => write!(f, "remote reducer failed: {msg}"),
RemoteCallError::NotFound(msg) => write!(f, "remote database or reducer not found: {msg}"),
RemoteCallError::Unreachable(msg) => write!(f, "remote database unreachable: {msg}"),
}
}
}

/// Call a reducer on a remote database.
///
/// - `database_identity`: the target database.
/// - `reducer_name`: the name of the reducer to invoke (must be valid UTF-8).
/// - `args`: BSATN-encoded reducer arguments.
///
/// Returns `Ok(bytes)` when the remote reducer ran and succeeded, with `bytes` being the reducer's output.
/// Returns `Err(RemoteCallError::Failed(msg))` when the reducer ran but returned an error.
/// Returns `Err(RemoteCallError::NotFound(msg))` when the database or reducer does not exist.
/// Returns `Err(RemoteCallError::Unreachable(msg))` on transport failure (connection refused, timeout, …).
pub fn call_reducer_on_db(
database_identity: Identity,
reducer_name: &str,
args: &[u8],
) -> Result<Vec<u8>, RemoteCallError> {
let identity_bytes = database_identity.to_byte_array();
match spacetimedb_bindings_sys::call_reducer_on_db(identity_bytes, reducer_name, args) {
Ok((status, body_source)) => {
if status < 300 {
let mut out = Vec::new();
read_bytes_source_into(body_source, &mut out);
return Ok(out);
}
// Decode the response body as the error message.
let msg = if body_source == spacetimedb_bindings_sys::raw::BytesSource::INVALID {
String::new()
} else {
let mut buf = IterBuf::take();
read_bytes_source_into(body_source, &mut buf);
String::from_utf8_lossy(&buf).into_owned()
};
if status == 404 {
Err(RemoteCallError::NotFound(msg))
} else {
Err(RemoteCallError::Failed(msg))
}
}
Err(err_source) => {
use crate::rt::read_bytes_source_as;
let msg = read_bytes_source_as::<String>(err_source);
Err(RemoteCallError::Unreachable(msg))
}
}
}
Loading
Loading