diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 332c9c89dd3..329222d6c34 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -54,7 +54,7 @@ use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, LazyLock}; use std::time::Instant; -use tokio::sync::{oneshot, Mutex as AsyncMutex}; +use tokio::sync::{oneshot, Mutex as AsyncMutex, Notify}; use tracing::Instrument; use v8::script_compiler::{compile_module, Source}; use v8::{ @@ -210,6 +210,12 @@ impl V8RuntimeInner { // Convert program to a string. let program: Arc = str::from_utf8(program_bytes)?.into(); + let database_identity = mcc.replica_ctx.database_identity; + let lane_queue = JsWorkerQueue::unbounded_with_metric( + WORKER_METRICS + .v8_instance_lane_queue_length + .with_label_values(&database_identity), + ); // Validate/create the module and spawn the first instance. let mcc = Either::Right(mcc); @@ -221,6 +227,7 @@ impl V8RuntimeInner { load_balance_guard.clone(), core_pinner.clone(), heap_policy, + lane_queue.clone(), ) .await?; let module = JsModule { @@ -229,6 +236,7 @@ impl V8RuntimeInner { load_balance_guard, core_pinner, heap_policy, + lane_queue, }; Ok(ModuleWithInstance::Js { module, init_inst }) @@ -242,6 +250,7 @@ pub struct JsModule { load_balance_guard: Arc, core_pinner: CorePinner, heap_policy: V8HeapPolicyConfig, + lane_queue: Arc, } impl JsModule { @@ -258,11 +267,38 @@ impl JsModule { } pub async fn create_instance(&self) -> JsInstance { + // We use a rendezvous channel for pooled instances, because they are checked + // out one request at a time and subsequently returned to the pool, unlike the + // long lived instance used for executing reducers which isn't checked out but + // fed through a queue. + let request_queue = JsWorkerQueue::bounded(0); + let program = self.program.clone(); + let common = self.common.clone(); + let load_balance_guard = self.load_balance_guard.clone(); + let core_pinner = self.core_pinner.clone(); + let heap_policy = self.heap_policy; + + // This has to be done in a blocking context because of `blocking_recv`. + let (_, instance) = spawn_instance_worker( + program, + Either::Left(common), + load_balance_guard, + core_pinner, + heap_policy, + request_queue, + ) + .await + .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`"); + instance + } + + async fn create_lane_instance(&self) -> JsInstance { let program = self.program.clone(); let common = self.common.clone(); let load_balance_guard = self.load_balance_guard.clone(); let core_pinner = self.core_pinner.clone(); let heap_policy = self.heap_policy; + let request_queue = self.lane_queue.clone(); // This has to be done in a blocking context because of `blocking_recv`. let (_, instance) = spawn_instance_worker( @@ -271,6 +307,7 @@ impl JsModule { load_balance_guard, core_pinner, heap_policy, + request_queue, ) .await .expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`"); @@ -372,8 +409,8 @@ impl JsInstanceEnv { /// which the instance communicates with through channels. /// /// This handle is cloneable and shared by callers. Requests are queued FIFO -/// on the worker thread so the next reducer can start immediately after the -/// previous one finishes, without waiting for an outer task to hand the +/// on the backing worker queue so the next reducer can start immediately after +/// the previous one finishes, without waiting for an outer task to hand the /// instance back. /// /// When the last handle is dropped, the channels will hang up, @@ -387,8 +424,112 @@ pub struct JsInstance { /// it to tell whether the currently active worker has already been replaced /// after a trap or disconnect. id: u64, - request_tx: flume::Sender, - trapped: Arc, + request_queue: Arc, + worker_state: Arc, +} + +/// Shared request queue for a JS instance lane. +/// +/// Async callers enqueue [`JsWorkerRequest`] values here and wait on their +/// per-request one-shot replies. The dedicated JS worker thread drains this +/// queue and executes those requests on the isolate. +/// +/// Note, this queue outlives any single JS worker or V8 isolate. +/// When the active worker traps or is retired for heap growth, +/// [`JsInstanceLane::replace_active_if_current`] waits for it to fully exit +/// and spawns a fresh worker which then starts reading from the queue so that +/// buffered requests are not dropped. +#[derive(Clone)] +struct JsWorkerQueue { + tx: flume::Sender, + rx: flume::Receiver, + metric: Option, +} + +impl JsWorkerQueue { + fn bounded(capacity: usize) -> Arc { + let (tx, rx) = flume::bounded(capacity); + Arc::new(Self { tx, rx, metric: None }) + } + + fn unbounded_with_metric(metric: IntGauge) -> Arc { + let (tx, rx) = flume::unbounded(); + Arc::new(Self { + tx, + rx, + metric: Some(metric), + }) + } + + async fn send_async(&self, request: JsWorkerRequest) -> Result<(), flume::SendError> { + self.tx.send_async(request).await?; + if let Some(metric) = &self.metric { + metric.inc(); + } + Ok(()) + } + + fn receiver(&self) -> flume::Receiver { + self.rx.clone() + } +} + +impl Drop for JsWorkerQueue { + fn drop(&mut self) { + if let Some(metric) = &self.metric { + metric.sub(self.tx.len() as _); + } + } +} + +/// Lifecycle state for a single JS worker. +/// +/// The `trapped` bit tells the lane that this worker must not keep serving +/// future work. The `exited` bit and [`Notify`] are what make the [JsWorkerQueue] +/// remain single-consumer. Recovery waits on [`Self::wait_exited`] before spawning +/// the replacement worker, so there is no overlap where two workers can both drain +/// the queue. +struct JsWorkerState { + trapped: AtomicBool, + exited: AtomicBool, + exited_notify: Notify, +} + +impl JsWorkerState { + fn new() -> Arc { + Arc::new(Self { + trapped: AtomicBool::new(false), + exited: AtomicBool::new(false), + exited_notify: Notify::new(), + }) + } + + fn trapped(&self) -> bool { + self.trapped.load(Ordering::Relaxed) + } + + fn exited(&self) -> bool { + self.exited.load(Ordering::Relaxed) + } + + fn needs_recovery(&self) -> bool { + self.trapped() || self.exited() + } + + fn mark_trapped(&self) { + self.trapped.store(true, Ordering::Relaxed); + } + + fn mark_exited(&self) { + self.exited.store(true, Ordering::Relaxed); + self.exited_notify.notify_waiters(); + } + + async fn wait_exited(&self) { + while !self.exited() { + self.exited_notify.notified().await; + } + } } impl JsInstance { @@ -397,7 +538,15 @@ impl JsInstance { } pub fn trapped(&self) -> bool { - self.trapped.load(Ordering::Relaxed) + self.worker_state.trapped() + } + + fn needs_recovery(&self) -> bool { + self.worker_state.needs_recovery() + } + + async fn wait_exited(&self) { + self.worker_state.wait_exited().await; } async fn send_request( @@ -405,13 +554,13 @@ impl JsInstance { request: impl FnOnce(JsReplyTx) -> JsWorkerRequest, ) -> Result { let (reply_tx, reply_rx) = oneshot::channel(); - self.request_tx + self.request_queue .send_async(request(reply_tx)) .await .map_err(|_| WorkerDisconnected)?; let JsWorkerReply { value, trapped } = reply_rx.await.map_err(|_| WorkerDisconnected)?; if trapped { - self.trapped.store(true, Ordering::Relaxed); + self.worker_state.mark_trapped(); } Ok(value) } @@ -424,7 +573,7 @@ impl JsInstance { let span = tracing::Span::current(); let (tx, rx) = oneshot::channel(); - self.request_tx + self.request_queue .send_async(JsWorkerRequest::RunFunction(Box::new(move || { async move { let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await; @@ -764,7 +913,7 @@ impl JsInstanceLane { } } - async fn replace_active_if_current(&self, trapped: &JsInstance) { + async fn replace_active_if_current(&self, stale: &JsInstance) { // `replace_lock` intentionally serializes the rare recovery path. This // prevents a trap observed by many callers from spawning many replacement // workers and racing to install them. @@ -773,7 +922,17 @@ impl JsInstanceLane { // The same trapped instance can be observed by multiple callers at once. // We only want the first one to do the swap; everybody else should notice // that the active handle already changed and get out of the way. - if self.state.active.read().id() != trapped.id() { + if self.state.active.read().id() != stale.id() { + return; + } + + // Wait for the stale worker generation to finish exiting before we + // install a replacement. That keeps the shared queue single-consumer. + if stale.needs_recovery() { + stale.wait_exited().await; + } + + if self.state.active.read().id() != stale.id() { return; } @@ -782,7 +941,7 @@ impl JsInstanceLane { // Keep the awaited instance creation outside of any `parking_lot` guard. // The only lock held across this await is `replace_lock`, which is why it // has to be async. - let next = self.module.create_instance().await; + let next = self.module.create_lane_instance().await; *self.state.active.write() = next; } @@ -791,7 +950,7 @@ impl JsInstanceLane { /// If the worker disappears before replying, we replace it for future /// requests but surface the disconnect to the caller instead of retrying. /// This keeps instance-lane semantics closer to the old pooled-instance - /// model now that the worker queue is a rendezvous channel. + /// model while still preserving one-at-a-time execution on the worker. async fn run_once( &self, label: &'static str, @@ -799,7 +958,11 @@ impl JsInstanceLane { ) -> Result { assert_not_on_js_module_thread(label); - let active = self.active_instance(); + let mut active = self.active_instance(); + if active.needs_recovery() { + self.replace_active_if_current(&active).await; + active = self.active_instance(); + } let result = work(active.clone()).await; match result { Ok(value) => { @@ -828,7 +991,7 @@ impl JsInstanceLane { self.run_once("run_on_thread", async move |inst| { let (tx, rx) = oneshot::channel(); - inst.request_tx + inst.request_queue .send_async(JsWorkerRequest::RunFunction(Box::new(move || { async move { let _on_js_module_thread = EnteredJsModuleThread::new(); @@ -1037,21 +1200,22 @@ async fn spawn_instance_worker( load_balance_guard: Arc, mut core_pinner: CorePinner, heap_policy: V8HeapPolicyConfig, + request_queue: Arc, ) -> anyhow::Result<(ModuleCommon, JsInstance)> { - // Spawn a rendezvous queue for requests to the worker. - // Multiple callers can wait to hand work to the worker, but with - // `bounded(0)` there is no buffered backlog inside the channel itself. - // The worker still processes requests strictly one at a time. - let (request_tx, request_rx) = flume::bounded(0); - // This one-shot channel is used for initial startup error handling within the thread. let (result_tx, result_rx) = oneshot::channel(); - let trapped = Arc::new(AtomicBool::new(false)); - let worker_trapped = trapped.clone(); + let worker_state = JsWorkerState::new(); + let worker_state_in_thread = worker_state.clone(); let rt = tokio::runtime::Handle::current(); + let request_rx = request_queue.receiver(); + let worker_queue_metric = request_queue.metric.clone(); std::thread::spawn(move || { + scopeguard::defer! { + worker_state_in_thread.mark_exited(); + } + let _guard = load_balance_guard; core_pinner.pin_now(); @@ -1129,6 +1293,9 @@ async fn spawn_instance_worker( let mut requests_since_heap_check = 0u64; let mut last_heap_check_at = Instant::now(); for request in request_rx.iter() { + if let Some(metric) = &worker_queue_metric { + metric.dec(); + } let mut call_reducer = |tx, params| instance_common.call_reducer_with_tx(tx, params, &mut inst); let mut should_exit = false; @@ -1147,13 +1314,17 @@ async fn spawn_instance_worker( } JsWorkerRequest::CallReducer { reply_tx, params } => { let (res, trapped) = call_reducer(None, params); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_reducer", reply_tx, res, trapped); should_exit = trapped; } JsWorkerRequest::CallView { reply_tx, cmd } => { let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_view", reply_tx, res, trapped); should_exit = trapped; } @@ -1162,7 +1333,9 @@ async fn spawn_instance_worker( .call_procedure(params, &mut inst) .now_or_never() .expect("our call_procedure implementation is not actually async"); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_procedure", reply_tx, res, trapped); should_exit = trapped; } @@ -1178,7 +1351,9 @@ async fn spawn_instance_worker( let mut trapped = false; let res = call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_identity_connected", reply_tx, res, trapped); should_exit = trapped; } @@ -1195,21 +1370,27 @@ async fn spawn_instance_worker( call_reducer, &mut trapped, ); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_identity_disconnected", reply_tx, res, trapped); should_exit = trapped; } JsWorkerRequest::DisconnectClient { reply_tx, client_id } => { let mut trapped = false; let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("disconnect_client", reply_tx, res, trapped); should_exit = trapped; } JsWorkerRequest::InitDatabase { reply_tx, program } => { let (res, trapped): (Result, anyhow::Error>, bool) = init_database(replica_ctx, &module_common.info().module_def, program, call_reducer); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("init_database", reply_tx, res, trapped); should_exit = trapped; } @@ -1218,7 +1399,9 @@ async fn spawn_instance_worker( .call_scheduled_function(params, &mut inst) .now_or_never() .expect("our call_procedure implementation is not actually async"); - worker_trapped.store(trapped, Ordering::Relaxed); + if trapped { + worker_state_in_thread.mark_trapped(); + } send_worker_reply("call_scheduled_function", reply_tx, res, trapped); should_exit = trapped; } @@ -1236,7 +1419,7 @@ async fn spawn_instance_worker( requests_since_heap_check = 0; last_heap_check_at = Instant::now(); if let Some((used, limit)) = should_retire_worker_for_heap(inst.scope, &heap_metrics, heap_policy) { - worker_trapped.store(true, Ordering::Relaxed); + worker_state_in_thread.mark_trapped(); should_exit = true; log::warn!( "retiring JS worker after V8 heap stayed high post-GC: used={}MiB limit={}MiB", @@ -1250,7 +1433,8 @@ async fn spawn_instance_worker( // Once a JS instance traps, we must not let later queued work execute // on that poisoned isolate. We reply to the trapping request first so // the caller can observe the actual reducer/procedure error, and then - // shut the worker down so later callers retry on a fresh instance. + // shut the worker down so the shared queue can continue on a fresh + // instance. // // We also retire workers when they stay near the V8 heap limit after // a GC. The instance lane intentionally keeps a JS worker alive for @@ -1267,8 +1451,8 @@ async fn spawn_instance_worker( res.map(|opt_mc| { let inst = JsInstance { id: NEXT_JS_INSTANCE_ID.fetch_add(1, Ordering::Relaxed), - request_tx, - trapped, + request_queue, + worker_state, }; (opt_mc, inst) }) diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 6421a95d7cb..c16c22ddbba 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -312,6 +312,11 @@ metrics_group!( #[labels(database_identity: Identity)] pub v8_heap_size_limit_bytes: IntGaugeVec, + #[name = spacetime_worker_v8_instance_lane_queue_length] + #[help = "The number of queued requests waiting for a database's JS instance lane worker"] + #[labels(database_identity: Identity)] + pub v8_instance_lane_queue_length: IntGaugeVec, + #[name = spacetime_worker_v8_external_memory_bytes] #[help = "The external memory tracked by V8 for a database's JS worker isolate"] #[labels(database_identity: Identity)]