From b36c84d9fea4b9849b0eaa784ddffe1995afb6c3 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Tue, 24 Mar 2026 14:53:39 -0700 Subject: [PATCH 1/3] Replace JS worker's rendezvous channel with unbounded queue --- crates/core/src/host/v8/mod.rs | 254 ++++++++++++++++++++++---- crates/core/src/worker_metrics/mod.rs | 5 + 2 files changed, 224 insertions(+), 35 deletions(-) diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 332c9c89dd3..e91cbb41776 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -54,7 +54,7 @@ use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, LazyLock}; use std::time::Instant; -use tokio::sync::{oneshot, Mutex as AsyncMutex}; +use tokio::sync::{oneshot, Mutex as AsyncMutex, Notify}; use tracing::Instrument; use v8::script_compiler::{compile_module, Source}; use v8::{ @@ -210,6 +210,12 @@ impl V8RuntimeInner { // Convert program to a string. let program: Arc = str::from_utf8(program_bytes)?.into(); + let database_identity = mcc.replica_ctx.database_identity; + let lane_queue = JsWorkerQueue::unbounded_with_metric( + WORKER_METRICS + .v8_instance_lane_queue_length + .with_label_values(&database_identity), + ); // Validate/create the module and spawn the first instance. let mcc = Either::Right(mcc); @@ -221,6 +227,7 @@ impl V8RuntimeInner { load_balance_guard.clone(), core_pinner.clone(), heap_policy, + lane_queue.clone(), ) .await?; let module = JsModule { @@ -229,6 +236,7 @@ impl V8RuntimeInner { load_balance_guard, core_pinner, heap_policy, + lane_queue, }; Ok(ModuleWithInstance::Js { module, init_inst }) @@ -242,6 +250,7 @@ pub struct JsModule { load_balance_guard: Arc, core_pinner: CorePinner, heap_policy: V8HeapPolicyConfig, + lane_queue: Arc, } impl JsModule { @@ -258,11 +267,38 @@ impl JsModule { } pub async fn create_instance(&self) -> JsInstance { + // Pooled instances are checked out to one caller at a time, so keep the + // private per-instance queue as a rendezvous channel. Unlike the shared + // instance lane, there is no replay path for buffered pooled requests + // across worker replacement. + let request_queue = JsWorkerQueue::bounded(0); + let program = self.program.clone(); + let common = self.common.clone(); + let load_balance_guard = self.load_balance_guard.clone(); + let core_pinner = self.core_pinner.clone(); + let heap_policy = self.heap_policy; + + // This has to be done in a blocking context because of `blocking_recv`. + let (_, instance) = spawn_instance_worker( + program, + Either::Left(common), + load_balance_guard, + core_pinner, + heap_policy, + request_queue, + ) + .await + .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`"); + instance + } + + async fn create_lane_instance(&self) -> JsInstance { let program = self.program.clone(); let common = self.common.clone(); let load_balance_guard = self.load_balance_guard.clone(); let core_pinner = self.core_pinner.clone(); let heap_policy = self.heap_policy; + let request_queue = self.lane_queue.clone(); // This has to be done in a blocking context because of `blocking_recv`. let (_, instance) = spawn_instance_worker( @@ -271,6 +307,7 @@ impl JsModule { load_balance_guard, core_pinner, heap_policy, + request_queue, ) .await .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`"); @@ -372,8 +409,8 @@ impl JsInstanceEnv { /// which the instance communicates with through channels. /// /// This handle is cloneable and shared by callers. Requests are queued FIFO -/// on the worker thread so the next reducer can start immediately after the -/// previous one finishes, without waiting for an outer task to hand the +/// on the backing worker queue so the next reducer can start immediately after +/// the previous one finishes, without waiting for an outer task to hand the /// instance back. /// /// When the last handle is dropped, the channels will hang up, @@ -387,8 +424,112 @@ pub struct JsInstance { /// it to tell whether the currently active worker has already been replaced /// after a trap or disconnect. id: u64, - request_tx: flume::Sender, - trapped: Arc, + request_queue: Arc, + worker_state: Arc, +} + +/// Shared request queue for a JS instance lane. +/// +/// Async callers enqueue [`JsWorkerRequest`] values here and wait on their +/// per-request one-shot replies. The dedicated JS worker thread drains this +/// queue and executes those requests on the isolate. +/// +/// Note, this queue outlives any single JS worker or V8 isolate. +/// When the active worker traps or is retired for heap growth, +/// [`JsInstanceLane::replace_active_if_current`] waits for it to fully exit +/// and spawns a fresh worker which then starts reading from the queue so that +/// buffered requests are not dropped. +#[derive(Clone)] +struct JsWorkerQueue { + tx: flume::Sender, + rx: flume::Receiver, + metric: Option, +} + +impl JsWorkerQueue { + fn bounded(capacity: usize) -> Arc { + let (tx, rx) = flume::bounded(capacity); + Arc::new(Self { tx, rx, metric: None }) + } + + fn unbounded_with_metric(metric: IntGauge) -> Arc { + let (tx, rx) = flume::unbounded(); + Arc::new(Self { + tx, + rx, + metric: Some(metric), + }) + } + + async fn send_async(&self, request: JsWorkerRequest) -> Result<(), flume::SendError> { + self.tx.send_async(request).await?; + if let Some(metric) = &self.metric { + metric.inc(); + } + Ok(()) + } + + fn receiver(&self) -> flume::Receiver { + self.rx.clone() + } +} + +impl Drop for JsWorkerQueue { + fn drop(&mut self) { + if let Some(metric) = &self.metric { + metric.sub(self.tx.len() as _); + } + } +} + +/// Lifecycle state for a single JS worker. +/// +/// The `trapped` bit tells the lane that this worker must not keep serving +/// future work. The `exited` bit and [`Notify`] are what make the [JsWorkerQueue] +/// remain single-consumer. Recovery waits on [`Self::wait_exited`] before spawning +/// the replacement worker, so there is no overlap where two workers can both drain +/// the queue. +struct JsWorkerState { + trapped: AtomicBool, + exited: AtomicBool, + exited_notify: Notify, +} + +impl JsWorkerState { + fn new() -> Arc { + Arc::new(Self { + trapped: AtomicBool::new(false), + exited: AtomicBool::new(false), + exited_notify: Notify::new(), + }) + } + + fn trapped(&self) -> bool { + self.trapped.load(Ordering::Relaxed) + } + + fn exited(&self) -> bool { + self.exited.load(Ordering::Relaxed) + } + + fn needs_recovery(&self) -> bool { + self.trapped() || self.exited() + } + + fn mark_trapped(&self) { + self.trapped.store(true, Ordering::Relaxed); + } + + fn mark_exited(&self) { + self.exited.store(true, Ordering::Relaxed); + self.exited_notify.notify_waiters(); + } + + async fn wait_exited(&self) { + while !self.exited() { + self.exited_notify.notified().await; + } + } } impl JsInstance { @@ -397,7 +538,15 @@ impl JsInstance { } pub fn trapped(&self) -> bool { - self.trapped.load(Ordering::Relaxed) + self.worker_state.trapped() + } + + fn needs_recovery(&self) -> bool { + self.worker_state.needs_recovery() + } + + async fn wait_exited(&self) { + self.worker_state.wait_exited().await; } async fn send_request( @@ -405,13 +554,13 @@ impl JsInstance { request: impl FnOnce(JsReplyTx) -> JsWorkerRequest, ) -> Result { let (reply_tx, reply_rx) = oneshot::channel(); - self.request_tx + self.request_queue .send_async(request(reply_tx)) .await .map_err(|_| WorkerDisconnected)?; let JsWorkerReply { value, trapped } = reply_rx.await.map_err(|_| WorkerDisconnected)?; if trapped { - self.trapped.store(true, Ordering::Relaxed); + self.worker_state.mark_trapped(); } Ok(value) } @@ -424,7 +573,7 @@ impl JsInstance { let span = tracing::Span::current(); let (tx, rx) = oneshot::channel(); - self.request_tx + self.request_queue .send_async(JsWorkerRequest::RunFunction(Box::new(move || { async move { let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await; @@ -764,7 +913,7 @@ impl JsInstanceLane { } } - async fn replace_active_if_current(&self, trapped: &JsInstance) { + async fn replace_active_if_current(&self, stale: &JsInstance) { // `replace_lock` intentionally serializes the rare recovery path. This // prevents a trap observed by many callers from spawning many replacement // workers and racing to install them. @@ -773,7 +922,17 @@ impl JsInstanceLane { // The same trapped instance can be observed by multiple callers at once. // We only want the first one to do the swap; everybody else should notice // that the active handle already changed and get out of the way. - if self.state.active.read().id() != trapped.id() { + if self.state.active.read().id() != stale.id() { + return; + } + + // Wait for the stale worker generation to finish exiting before we + // install a replacement. That keeps the shared queue single-consumer. + if stale.needs_recovery() { + stale.wait_exited().await; + } + + if self.state.active.read().id() != stale.id() { return; } @@ -782,7 +941,7 @@ impl JsInstanceLane { // Keep the awaited instance creation outside of any `parking_lot` guard. // The only lock held across this await is `replace_lock`, which is why it // has to be async. - let next = self.module.create_instance().await; + let next = self.module.create_lane_instance().await; *self.state.active.write() = next; } @@ -791,7 +950,7 @@ impl JsInstanceLane { /// If the worker disappears before replying, we replace it for future /// requests but surface the disconnect to the caller instead of retrying. /// This keeps instance-lane semantics closer to the old pooled-instance - /// model now that the worker queue is a rendezvous channel. + /// model while still preserving one-at-a-time execution on the worker. async fn run_once( &self, label: &'static str, @@ -799,7 +958,11 @@ impl JsInstanceLane { ) -> Result { assert_not_on_js_module_thread(label); - let active = self.active_instance(); + let mut active = self.active_instance(); + if active.needs_recovery() { + self.replace_active_if_current(&active).await; + active = self.active_instance(); + } let result = work(active.clone()).await; match result { Ok(value) => { @@ -828,7 +991,7 @@ impl JsInstanceLane { self.run_once("run_on_thread", async move |inst| { let (tx, rx) = oneshot::channel(); - inst.request_tx + inst.request_queue .send_async(JsWorkerRequest::RunFunction(Box::new(move || { async move { let _on_js_module_thread = EnteredJsModuleThread::new(); @@ -1037,21 +1200,22 @@ async fn spawn_instance_worker( load_balance_guard: Arc, mut core_pinner: CorePinner, heap_policy: V8HeapPolicyConfig, + request_queue: Arc, ) -> anyhow::Result<(ModuleCommon, JsInstance)> { - // Spawn a rendezvous queue for requests to the worker. - // Multiple callers can wait to hand work to the worker, but with - // `bounded(0)` there is no buffered backlog inside the channel itself. - // The worker still processes requests strictly one at a time. - let (request_tx, request_rx) = flume::bounded(0); - // This one-shot channel is used for initial startup error handling within the thread. let (result_tx, result_rx) = oneshot::channel(); - let trapped = Arc::new(AtomicBool::new(false)); - let worker_trapped = trapped.clone(); + let worker_state = JsWorkerState::new(); + let worker_state_in_thread = worker_state.clone(); let rt = tokio::runtime::Handle::current(); + let request_rx = request_queue.receiver(); + let worker_queue_metric = request_queue.metric.clone(); std::thread::spawn(move || { + scopeguard::defer! { + worker_state_in_thread.mark_exited(); + } + let _guard = load_balance_guard; core_pinner.pin_now(); @@ -1129,6 +1293,9 @@ async fn spawn_instance_worker( let mut requests_since_heap_check = 0u64; let mut last_heap_check_at = Instant::now(); for request in request_rx.iter() { + if let Some(metric) = &worker_queue_metric { + metric.dec(); + } let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst); let mut should_exit = false; @@ -1147,13 +1314,17 @@ async fn spawn_instance_worker( } JsWorkerRequest::CallReducer { reply_tx, params } => { let (res, trapped) = call_reducer(None, params); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_reducer", reply_tx, res, trapped); should_exit = trapped; } JsWorkerRequest::CallView { reply_tx, cmd } => { let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_view", reply_tx, res, trapped); should_exit = trapped; } @@ -1162,7 +1333,9 @@ async fn spawn_instance_worker( .call_procedure(params, &mut inst) .now_or_never() .expect("our call_procedure implementation is not actually async"); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_procedure", reply_tx, res, trapped); should_exit = trapped; } @@ -1178,7 +1351,9 @@ async fn spawn_instance_worker( let mut trapped = false; let res = call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_identity_connected", reply_tx, res, trapped); should_exit = trapped; } @@ -1195,21 +1370,27 @@ async fn spawn_instance_worker( call_reducer, &mut trapped, ); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_identity_disconnected", reply_tx, res, trapped); should_exit = trapped; } JsWorkerRequest::DisconnectClient { reply_tx, client_id } => { let mut trapped = false; let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("disconnect_client", reply_tx, res, trapped); should_exit = trapped; } JsWorkerRequest::InitDatabase { reply_tx, program } => { let (res, trapped): (Result, anyhow::Error>, bool) = init_database(replica_ctx, &module_common.info().module_def, program, call_reducer); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("init_database", reply_tx, res, trapped); should_exit = trapped; } @@ -1218,7 +1399,9 @@ async fn spawn_instance_worker( .call_scheduled_function(params, &mut inst) .now_or_never() .expect("our call_procedure implementation is not actually async"); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_scheduled_function", reply_tx, res, trapped); should_exit = trapped; } @@ -1236,7 +1419,7 @@ async fn spawn_instance_worker( requests_since_heap_check = 0; last_heap_check_at = Instant::now(); if let Some((used, limit)) = should_retire_worker_for_heap(inst.scope, &heap_metrics, heap_policy) { - worker_trapped.store(true, Ordering::Relaxed); + worker_state_in_thread.mark_trapped(); should_exit = true; log::warn!( "retiring JS worker after V8 heap stayed high post-GC: used={}MiB limit={}MiB", @@ -1250,7 +1433,8 @@ async fn spawn_instance_worker( // Once a JS instance traps, we must not let later queued work execute // on that poisoned isolate. We reply to the trapping request first so // the caller can observe the actual reducer/procedure error, and then - // shut the worker down so later callers retry on a fresh instance. + // shut the worker down so the shared queue can continue on a fresh + // instance. // // We also retire workers when they stay near the V8 heap limit after // a GC. The instance lane intentionally keeps a JS worker alive for @@ -1267,8 +1451,8 @@ async fn spawn_instance_worker( res.map(|opt_mc| { let inst = JsInstance { id: NEXT_JS_INSTANCE_ID.fetch_add(1, Ordering::Relaxed), - request_tx, - trapped, + request_queue, + worker_state, }; (opt_mc, inst) }) diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index faa283390f9..545bb96d62f 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -312,6 +312,11 @@ metrics_group!( #[labels(database_identity: Identity)] pub v8_heap_size_limit_bytes: IntGaugeVec, + #[name = spacetime_worker_v8_instance_lane_queue_length] + #[help = "The number of queued requests waiting for a database's JS instance lane worker"] + #[labels(database_identity: Identity)] + pub v8_instance_lane_queue_length: IntGaugeVec, + #[name = spacetime_worker_v8_external_memory_bytes] #[help = "The external memory tracked by V8 for a database's JS worker isolate"] #[labels(database_identity: Identity)] From 929decb15d91c2c1c1b65d587bc5f51a76a7115b Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 25 Mar 2026 00:30:46 -0700 Subject: [PATCH 2/3] test: no core pinning --- crates/standalone/src/main.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index 06114d63b8d..ed48cd6352c 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -1,6 +1,5 @@ use clap::Command; -use spacetimedb::startup; use spacetimedb::util::jobs::JobCores; use tokio::runtime::Builder; @@ -68,15 +67,11 @@ fn main() -> anyhow::Result<()> { process::exit(1); })); - let cores = startup::pin_threads(); - // Create a multi-threaded run loop let mut builder = Builder::new_multi_thread(); builder.enable_all(); - cores.tokio.configure(&mut builder); let rt = builder.build().unwrap(); - cores.rayon.configure(rt.handle()); - let database_cores = cores.databases.make_database_runners(); + let database_cores = JobCores::without_pinned_cores(); // Keep a handle on the `database_cores` alive outside of `async_main` // and explicitly drop it to avoid dropping it from an `async` context - From 535d9c97809de27e7a931bea47b245ffaefd78d6 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 25 Mar 2026 11:14:26 -0700 Subject: [PATCH 3/3] pin cores but give tokio workers more --- crates/standalone/src/main.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index ed48cd6352c..81cf794f852 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -1,5 +1,6 @@ use clap::Command; +use spacetimedb::startup; use spacetimedb::util::jobs::JobCores; use tokio::runtime::Builder; @@ -59,6 +60,8 @@ static GLOBAL: Jemalloc = Jemalloc; pub static _rjem_malloc_conf: &[u8] = b"prof:true,prof_active:false,lg_prof_sample:19\0"; fn main() -> anyhow::Result<()> { + const RESERVED_DATABASE_CORES: usize = 2; + // take_hook() returns the default hook in case when a custom one is not set let orig_hook = panic::take_hook(); panic::set_hook(Box::new(move |panic_info| { @@ -70,8 +73,22 @@ fn main() -> anyhow::Result<()> { // Create a multi-threaded run loop let mut builder = Builder::new_multi_thread(); builder.enable_all(); + let database_cores = if let Some(core_ids) = startup::Cores::get_core_ids() { + let reserved_database_cores = RESERVED_DATABASE_CORES.min(core_ids.len()); + let database_fraction = reserved_database_cores as f64 / core_ids.len() as f64; + let cores = startup::pin_threads_with_reservations(startup::CoreReservations { + databases: database_fraction, + tokio_workers: 1.0, + rayon: 0.0, + irq: 0, + reserved: 0, + }); + cores.tokio.configure(&mut builder); + cores.databases.make_database_runners() + } else { + JobCores::without_pinned_cores() + }; let rt = builder.build().unwrap(); - let database_cores = JobCores::without_pinned_cores(); // Keep a handle on the `database_cores` alive outside of `async_main` // and explicitly drop it to avoid dropping it from an `async` context -