From 10cd5d92303d05baca986b9bc686313705a7c56f Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 25 Mar 2026 19:09:49 -0700 Subject: [PATCH] Add pipelining option to distributed keynote benchmark --- templates/keynote-2/DEVELOP.md | 5 +- templates/keynote-2/README.md | 1 + .../keynote-2/src/distributed/control.ts | 4 +- .../keynote-2/src/distributed/coordinator.ts | 24 ++++- .../keynote-2/src/distributed/generator.ts | 7 +- .../keynote-2/src/distributed/loadSession.ts | 89 ++++++++++++++++--- .../keynote-2/src/distributed/protocol.ts | 7 ++ 7 files changed, 117 insertions(+), 20 deletions(-) diff --git a/templates/keynote-2/DEVELOP.md b/templates/keynote-2/DEVELOP.md index 391a13f7488..4449b884d50 100644 --- a/templates/keynote-2/DEVELOP.md +++ b/templates/keynote-2/DEVELOP.md @@ -264,6 +264,8 @@ Notes: - `--warmup-seconds` is the unmeasured warmup period. Generators submit requests during warmup, but those transactions are excluded from TPS. - `--window-seconds` is the measured interval. +- `--pipelined 1` enables request pipelining. Omit it or pass `--pipelined 0` to stay in closed-loop mode, one request at a time. +- `--max-inflight-per-connection` caps the number of in-flight requests each connection may have when pipelining is enabled. The default is `8`. - `--verify 1` preserves the existing benchmark semantics by running one verification pass centrally after the epoch completes. - The coordinator derives the HTTP metrics endpoint from `--stdb-url` by switching to `http://` or `https://` and appending `/v1/metrics`. - For a real multi-machine run, change `--bind 127.0.0.1` to `--bind 0.0.0.0` so remote generators can reach the coordinator. 
@@ -351,6 +353,7 @@ The result contains: - participating generator IDs - total participating connections +- whether the epoch used closed-loop or pipelined load, and the per-connection in-flight cap - committed transaction delta from the server metrics endpoint - measured window duration - computed TPS @@ -361,7 +364,7 @@ The result contains: - Start the coordinator before the generators. - Generators begin submitting requests when the coordinator enters `warmup`, not when the measured window begins. - Throughput is measured only from the committed transaction counter delta recorded after warmup, so warmup transactions are excluded. -- For this distributed TypeScript mode, each connection runs closed-loop with one request at a time. There is no pipelining in this flow. +- Distributed TypeScript mode defaults to closed-loop, one request at a time per connection. Enable pipelining on the coordinator with `--pipelined 1`, and all generators will follow that setting for the epoch. - Late generators are allowed to register and become ready while an epoch is already running, but they only participate in the next epoch. - The coordinator does not use heartbeats. It includes generators that most recently reported `ready`. - If a participating generator dies and never sends `/stopped`, the epoch result is written with an `error`, and that generator remains `running` in coordinator status until you restart it and let it register again. diff --git a/templates/keynote-2/README.md b/templates/keynote-2/README.md index 560a538215d..3fc4f66eaac 100644 --- a/templates/keynote-2/README.md +++ b/templates/keynote-2/README.md @@ -149,6 +149,7 @@ This architectural difference means SpacetimeDB can execute transactions in micr ### Client Pipelining The benchmark supports **pipelining** for all clients - sending multiple requests without waiting for responses. This maximizes throughput by keeping connections saturated. 
+The distributed TypeScript SpacetimeDB flow also supports coordinator-controlled pipelining via `bench-dist-coordinator -- --pipelined 1`, with `--max-inflight-per-connection` to cap outstanding requests per connection. ### Confirmed Reads (`withConfirmedReads`) diff --git a/templates/keynote-2/src/distributed/control.ts b/templates/keynote-2/src/distributed/control.ts index 3c7d1d3e370..dc75617cf1f 100644 --- a/templates/keynote-2/src/distributed/control.ts +++ b/templates/keynote-2/src/distributed/control.ts @@ -18,7 +18,7 @@ function printState(state: CoordinatorState): void { `phase=${state.phase} epoch=${state.currentEpoch ?? '-'} label=${state.currentLabel ?? '-'}`, ); console.log( - `test=${state.test} connector=${state.connector} participants=${state.participants.length}`, + `test=${state.test} connector=${state.connector} participants=${state.participants.length} mode=${state.loadOptions.pipelined ? `pipelined/${state.loadOptions.maxInflightPerConnection}` : 'closed-loop'}`, ); if (state.generators.length === 0) { @@ -35,7 +35,7 @@ function printState(state: CoordinatorState): void { if (state.lastResult) { console.log('last_result:'); console.log( - ` epoch=${state.lastResult.epoch} tps=${state.lastResult.tps.toFixed(2)} delta=${state.lastResult.committedDelta} verification=${state.lastResult.verification}${state.lastResult.error ? ` error=${state.lastResult.error}` : ''}`, + ` epoch=${state.lastResult.epoch} mode=${state.lastResult.loadOptions.pipelined ? `pipelined/${state.lastResult.loadOptions.maxInflightPerConnection}` : 'closed-loop'} tps=${state.lastResult.tps.toFixed(2)} delta=${state.lastResult.committedDelta} verification=${state.lastResult.verification}${state.lastResult.error ? 
` error=${state.lastResult.error}` : ''}`, ); } } diff --git a/templates/keynote-2/src/distributed/coordinator.ts b/templates/keynote-2/src/distributed/coordinator.ts index 0f4f3c2c03f..09c3166da97 100644 --- a/templates/keynote-2/src/distributed/coordinator.ts +++ b/templates/keynote-2/src/distributed/coordinator.ts @@ -20,6 +20,7 @@ import { import type { CoordinatorPhase, CoordinatorState, + DistributedLoadOptions, EpochResult, GeneratorLocalState, GeneratorSnapshot, @@ -103,6 +104,7 @@ async function runVerification(url: string, moduleName: string): Promise { class DistributedCoordinator { private readonly testName: string; private readonly connectorName: string; + private readonly loadOptions: DistributedLoadOptions; private readonly warmupMs: number; private readonly windowMs: number; private readonly verifyAfterEpoch: boolean; @@ -121,6 +123,7 @@ class DistributedCoordinator { constructor(opts: { testName: string; connectorName: string; + loadOptions: DistributedLoadOptions; warmupMs: number; windowMs: number; verifyAfterEpoch: boolean; @@ -131,6 +134,7 @@ class DistributedCoordinator { }) { this.testName = opts.testName; this.connectorName = opts.connectorName; + this.loadOptions = opts.loadOptions; this.warmupMs = opts.warmupMs; this.windowMs = opts.windowMs; this.verifyAfterEpoch = opts.verifyAfterEpoch; @@ -159,6 +163,7 @@ class DistributedCoordinator { participants: this.currentEpoch?.participantIds ?? 
[], test: this.testName, connector: this.connectorName, + loadOptions: this.loadOptions, generators, lastResult: this.lastResult, }; @@ -365,6 +370,7 @@ class DistributedCoordinator { label: activeEpoch.label, test: this.testName, connector: this.connectorName, + loadOptions: this.loadOptions, warmupSeconds: this.warmupMs / 1000, windowSeconds: this.windowMs / 1000, actualWindowSeconds, @@ -438,11 +444,23 @@ async function main(): Promise { const resultsDir = getStringFlag(flags, 'results-dir', defaultResultsDir); const warmupSeconds = getNumberFlag(flags, 'warmup-seconds', 15); const windowSeconds = getNumberFlag(flags, 'window-seconds', 60); + const pipelined = getBoolFlag(flags, 'pipelined', false); + const maxInflightPerConnection = getNumberFlag( + flags, + 'max-inflight-per-connection', + 8, + ); const stopAckTimeoutSeconds = getNumberFlag( flags, 'stop-ack-timeout-seconds', 60, ); + if ( + !Number.isInteger(maxInflightPerConnection) || + maxInflightPerConnection < 1 + ) { + throw new Error('--max-inflight-per-connection must be an integer >= 1'); + } const verifyAfterEpoch = getBoolFlag(flags, 'verify', false); const rawStdbUrl = getStringFlag( flags, @@ -460,6 +478,10 @@ async function main(): Promise { const coordinator = new DistributedCoordinator({ testName, connectorName, + loadOptions: { + pipelined, + maxInflightPerConnection, + }, warmupMs: warmupSeconds * 1000, windowMs: windowSeconds * 1000, verifyAfterEpoch, @@ -530,7 +552,7 @@ async function main(): Promise { }); console.log( - `[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl}`, + `[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} mode=${pipelined ? `pipelined/${maxInflightPerConnection}` : 'closed-loop'} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 
'on' : 'off'} stdb=${stdbUrl}`, ); } diff --git a/templates/keynote-2/src/distributed/generator.ts b/templates/keynote-2/src/distributed/generator.ts index 24ffde1ff47..21bb1366cf2 100644 --- a/templates/keynote-2/src/distributed/generator.ts +++ b/templates/keynote-2/src/distributed/generator.ts @@ -149,8 +149,11 @@ async function main(): Promise { state.currentEpoch != null && state.participants.includes(id) ) { - console.log(`[generator ${id}] starting epoch ${state.currentEpoch}`); - await session.startEpoch(state.currentEpoch); + const { pipelined, maxInflightPerConnection } = state.loadOptions; + console.log( + `[generator ${id}] starting epoch ${state.currentEpoch} mode=${pipelined ? `pipelined/${maxInflightPerConnection}` : 'closed-loop'}`, + ); + await session.startEpoch(state.currentEpoch, state.loadOptions); activeEpoch = state.currentEpoch; } } else if (!shouldKeepRunning) { diff --git a/templates/keynote-2/src/distributed/loadSession.ts b/templates/keynote-2/src/distributed/loadSession.ts index 90f7f870b80..5fc5b8be32f 100644 --- a/templates/keynote-2/src/distributed/loadSession.ts +++ b/templates/keynote-2/src/distributed/loadSession.ts @@ -1,6 +1,7 @@ import type { ReducerConnector } from '../core/connectors.ts'; import { performance } from 'node:perf_hooks'; import { pickTwoDistinct, zipfSampler } from '../core/zipf.ts'; +import type { DistributedLoadOptions } from './protocol.ts'; const OP_TIMEOUT_MS = Number(process.env.BENCH_OP_TIMEOUT_MS ?? 
'15000'); const DEFAULT_PRECOMPUTED_TRANSFER_PAIRS = 10_000_000; @@ -67,6 +68,7 @@ type RunState = { epoch: number; stopRequested: boolean; workerPromises: Promise[]; + loadOptions: DistributedLoadOptions; }; export class LoadSession { @@ -137,7 +139,7 @@ export class LoadSession { } } - async startEpoch(epoch: number): Promise { + async startEpoch(epoch: number, loadOptions: DistributedLoadOptions): Promise { if (this.openedConnections !== this.concurrency) { throw new Error( `Cannot start epoch ${epoch}: expected ${this.concurrency} open connections, got ${this.openedConnections}`, @@ -153,9 +155,15 @@ export class LoadSession { epoch, stopRequested: false, workerPromises: [], + loadOptions, }; this.runState = runState; + const mode = loadOptions.pipelined + ? `pipelined max_inflight=${loadOptions.maxInflightPerConnection}` + : 'closed-loop'; + console.log(`[distributed] starting epoch ${epoch} in ${mode} mode`); + runState.workerPromises = this.conns.map((conn, workerIndex) => { if (!conn) { throw new Error(`Connection ${workerIndex} not open`); @@ -213,13 +221,23 @@ export class LoadSession { workerIndex: number, runState: RunState, ): Promise { + const nextTransferPair = this.makeTransferPairPicker(workerIndex); + if (!runState.loadOptions.pipelined) { + await this.closedLoopWorker(conn, workerIndex, runState, nextTransferPair); + return; + } + + await this.pipelinedWorker(conn, workerIndex, runState, nextTransferPair); + } + + private makeTransferPairPicker(workerIndex: number): () => [number, number] { const pairsPerWorker = Math.max( 1, Math.floor(this.pairs.count / this.concurrency), ); let pairIndex = workerIndex * pairsPerWorker; - const nextTransferPair = (): [number, number] => { + return (): [number, number] => { if (pairIndex >= this.pairs.count) { pairIndex = 0; } @@ -229,22 +247,65 @@ export class LoadSession { pairIndex++; return [from, to]; }; + } + private async closedLoopWorker( + conn: ReducerConnector, + workerIndex: number, + runState: 
RunState, + nextTransferPair: () => [number, number], + ): Promise<void> { while (!runState.stopRequested) { - const [from, to] = nextTransferPair(); + await this.runTransfer(conn, workerIndex, nextTransferPair); + } + } - try { - await withOpTimeout( - this.scenario(conn, from, to, 1), - `[distributed] worker ${workerIndex} transfer ${from}->${to}`, + private async pipelinedWorker( + conn: ReducerConnector, + workerIndex: number, + runState: RunState, + nextTransferPair: () => [number, number], + ): Promise<void> { + const maxInflight = runState.loadOptions.maxInflightPerConnection; + const inflight = new Set<Promise<void>>(); + + const launchTransfer = () => { + const transfer = this.runTransfer(conn, workerIndex, nextTransferPair); + inflight.add(transfer); + transfer.finally(() => { + inflight.delete(transfer); + }); + }; + + while (!runState.stopRequested) { + if (inflight.size < maxInflight) { + launchTransfer(); + } else { + await Promise.race(inflight); + } + } + + await Promise.all(inflight); + } + + private async runTransfer( + conn: ReducerConnector, + workerIndex: number, + nextTransferPair: () => [number, number], + ): Promise<void> { + const [from, to] = nextTransferPair(); + + try { + await withOpTimeout( + this.scenario(conn, from, to, 1), + `[distributed] worker ${workerIndex} transfer ${from}->${to}`, + ); + } catch (err) { + if (process.env.LOG_ERRORS === '1') { + const msg = err instanceof Error ? err.message : String(err); + console.warn( + `[distributed] worker ${workerIndex} failed ${from}->${to}: ${msg}`, ); - } catch (err) { - if (process.env.LOG_ERRORS === '1') { - const msg = err instanceof Error ? 
err.message : String(err); - console.warn( - `[distributed] worker ${workerIndex} failed ${from}->${to}: ${msg}`, - ); - } } } } diff --git a/templates/keynote-2/src/distributed/protocol.ts b/templates/keynote-2/src/distributed/protocol.ts index 91b128f0ee0..5ebcc652463 100644 --- a/templates/keynote-2/src/distributed/protocol.ts +++ b/templates/keynote-2/src/distributed/protocol.ts @@ -2,6 +2,11 @@ export type GeneratorLocalState = 'registered' | 'ready' | 'running'; export type CoordinatorPhase = 'idle' | 'warmup' | 'measure' | 'stop'; +export type DistributedLoadOptions = { + pipelined: boolean; + maxInflightPerConnection: number; +}; + export type GeneratorSnapshot = { id: string; hostname: string; @@ -16,6 +21,7 @@ export type EpochResult = { label: string | null; test: string; connector: string; + loadOptions: DistributedLoadOptions; warmupSeconds: number; windowSeconds: number; actualWindowSeconds: number; @@ -39,6 +45,7 @@ export type CoordinatorState = { participants: string[]; test: string; connector: string; + loadOptions: DistributedLoadOptions; generators: GeneratorSnapshot[]; lastResult: EpochResult | null; };