
Replace JS worker's rendezvous channel with unbounded queue#4704

Open
joshua-spacetime wants to merge 1 commit into master from joshua/js-worker-queue

Conversation

@joshua-spacetime
Collaborator

@joshua-spacetime joshua-spacetime commented Mar 24, 2026

Description of Changes

Previously, a module’s JS worker thread was fed through a zero-capacity channel. That made every request handoff a rendezvous between the async producer task and the single JS worker thread. Under high concurrency, that synchronous handoff showed up as hot flume/lock/wakeup stacks on the critical path: the JS worker thread.

This patch brings V8 execution in line with WASM, which also uses an unbounded request queue.

Changing that handoff to an unbounded queue decouples request producers from the JS worker. Producers can enqueue work without synchronizing directly with the worker on every request, and the worker can keep draining queued requests without paying the rendezvous cost each time. This shortens the critical path, reduces scheduler/locking overhead, and increases throughput.

API and ABI breaking changes

None

Expected complexity level and risk

2

Testing

Manual performance testing

@joshua-spacetime joshua-spacetime changed the title from "test: make js worker queue unbounded" to "Replace JS worker's rendezvous channel with bounded queue" Mar 25, 2026
@joshua-spacetime joshua-spacetime marked this pull request as ready for review March 25, 2026 06:49
@joshua-spacetime joshua-spacetime changed the title from "Replace JS worker's rendezvous channel with bounded queue" to "Replace JS worker's rendezvous channel with unbounded queue" Mar 25, 2026
Base automatically changed from joshua/v8-heap-metrics to master March 25, 2026 22:37
@Centril Centril self-requested a review March 27, 2026 11:31
/// remain single-consumer. Recovery waits on [`Self::wait_exited`] before spawning
/// the replacement worker, so there is no overlap where two workers can both drain
/// the queue.
struct JsWorkerState {
Contributor

Suggested change
struct JsWorkerState {
#[derive(Default)]
struct JsWorkerState {

Comment on lines +499 to +505
fn new() -> Arc<Self> {
Arc::new(Self {
trapped: AtomicBool::new(false),
exited: AtomicBool::new(false),
exited_notify: Notify::new(),
})
}
Contributor

All of these are Default.

Suggested change
fn new() -> Arc<Self> {
Arc::new(Self {
trapped: AtomicBool::new(false),
exited: AtomicBool::new(false),
exited_notify: Notify::new(),
})
}

let (result_tx, result_rx) = oneshot::channel();
let trapped = Arc::new(AtomicBool::new(false));
let worker_trapped = trapped.clone();
let worker_state = JsWorkerState::new();
Contributor

Suggested change
let worker_state = JsWorkerState::new();
let worker_state: Arc<JsWorkerState> = <_>::default();

Comment on lines +315 to +318
#[name = spacetime_worker_v8_instance_lane_queue_length]
#[help = "The number of queued requests waiting for a database's JS instance lane worker"]
#[labels(database_identity: Identity)]
pub v8_instance_lane_queue_length: IntGaugeVec,
Contributor

Now that you've discovered the issue and resolved it, I think we can remove these metrics as they mostly add (a little bit of) cost and complexity.

Comment on lines 269 to 315
pub async fn create_instance(&self) -> JsInstance {
// We use a rendezvous channel for pooled instances, because they are checked
// out one request at a time and subsequently returned to the pool, unlike the
// long lived instance used for executing reducers which isn't checked out but
// fed through a queue.
let request_queue = JsWorkerQueue::bounded(0);
let program = self.program.clone();
let common = self.common.clone();
let load_balance_guard = self.load_balance_guard.clone();
let core_pinner = self.core_pinner.clone();
let heap_policy = self.heap_policy;

// This has to be done in a blocking context because of `blocking_recv`.
let (_, instance) = spawn_instance_worker(
program,
Either::Left(common),
load_balance_guard,
core_pinner,
heap_policy,
request_queue,
)
.await
.expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`");
instance
}

async fn create_lane_instance(&self) -> JsInstance {
let program = self.program.clone();
let common = self.common.clone();
let load_balance_guard = self.load_balance_guard.clone();
let core_pinner = self.core_pinner.clone();
let heap_policy = self.heap_policy;
let request_queue = self.lane_queue.clone();

// This has to be done in a blocking context because of `blocking_recv`.
let (_, instance) = spawn_instance_worker(
program,
Either::Left(common),
load_balance_guard,
core_pinner,
heap_policy,
request_queue,
)
.await
.expect("`spawn_instance_worker` should succeed when passed `ModuleCommon`");
instance
}
Contributor

It would have made reviewing easier and the diff smaller if this code had not been duplicated. Please dedup these into a common base method taking request_queue.

Comment on lines +433 to +435
/// Async callers enqueue [`JsWorkerRequest`] values here and wait on their
/// per-request one-shot replies. The dedicated JS worker thread drains this
/// queue and executes those requests on the isolate.
Contributor

Suggested change
/// Async callers enqueue [`JsWorkerRequest`] values here and wait on their
/// per-request one-shot replies. The dedicated JS worker thread drains this
/// queue and executes those requests on the isolate.
/// Async callers enqueue [`JsWorkerRequest`] values here
/// and wait on their per-request one-shot replies sent back via [`JsReplyTx`].
/// The dedicated JS worker thread, spawned in [`spawn_instance_worker`]
/// drains this queue and executes those requests on the isolate.

Comment on lines +507 to +528
fn trapped(&self) -> bool {
self.trapped.load(Ordering::Relaxed)
}

fn exited(&self) -> bool {
self.exited.load(Ordering::Relaxed)
}

fn needs_recovery(&self) -> bool {
self.trapped() || self.exited()
}

fn mark_trapped(&self) {
self.trapped.store(true, Ordering::Relaxed);
}

fn mark_exited(&self) {
self.exited.store(true, Ordering::Relaxed);
self.exited_notify.notify_waiters();
}

async fn wait_exited(&self) {
Contributor

Please add doc comments to these.

id: u64,
request_tx: flume::Sender<JsWorkerRequest>,
trapped: Arc<AtomicBool>,
request_queue: Arc<JsWorkerQueue>,
Contributor

Why is the Arc necessary here? The inner type is Clone and consists of cheaply cloneable types where each is already wrapped in Arc.

Comment on lines +962 to +965
if active.needs_recovery() {
self.replace_active_if_current(&active).await;
active = self.active_instance();
}
Contributor

Can we enter this method with a bad instance now? Some commentary on this would be good in the code.

Comment on lines +1317 to +1319
if trapped {
worker_state_in_thread.mark_trapped();
}
Contributor

One thing that would be good to comment on is why we need to do worker_state_in_thread.mark_trapped(); when we already do that in send_request. Ostensibly, we are setting the flag twice now.

@Centril
Contributor

Centril commented Mar 27, 2026

On phoenix nap, using the rust client, I get:

  • TS module: 156.5k TPS
  • Rust module: 168.5k TPS

The difference with this PR is about 12k TPS 🚀
