Skip to content

Commit 6b94a7e

Browse files
Add pipelining option to distributed keynote benchmark
1 parent 36c416f commit 6b94a7e

7 files changed

Lines changed: 117 additions & 20 deletions

File tree

templates/keynote-2/DEVELOP.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,8 @@ Notes:
262262

263263
- `--warmup-seconds` is the unmeasured warmup period. Generators submit requests during warmup, but those transactions are excluded from TPS.
264264
- `--window-seconds` is the measured interval.
265+
- `--pipelined 1` enables request pipelining. Omit it or pass `--pipelined 0` to stay in closed-loop mode, one request at a time.
266+
- `--max-inflight-per-connection` caps the number of in-flight requests each connection may have when pipelining is enabled. The default is `8`.
265267
- `--verify 1` preserves the existing benchmark semantics by running one verification pass centrally after the epoch completes.
266268
- The coordinator derives the HTTP metrics endpoint from `--stdb-url` by switching to `http://` or `https://` and appending `/v1/metrics`.
267269
- For a real multi-machine run, change `--bind 127.0.0.1` to `--bind 0.0.0.0` so remote generators can reach the coordinator.
@@ -349,6 +351,7 @@ The result contains:
349351

350352
- participating generator IDs
351353
- total participating connections
354+
- whether the epoch used closed-loop or pipelined load, and the per-connection in-flight cap
352355
- committed transaction delta from the server metrics endpoint
353356
- measured window duration
354357
- computed TPS
@@ -359,7 +362,7 @@ The result contains:
359362
- Start the coordinator before the generators.
360363
- Generators begin submitting requests when the coordinator enters `warmup`, not when the measured window begins.
361364
- Throughput is measured only from the committed transaction counter delta recorded after warmup, so warmup transactions are excluded.
362-
- For this distributed TypeScript mode, each connection runs closed-loop with one request at a time. There is no pipelining in this flow.
365+
- Distributed TypeScript mode defaults to closed-loop, one request at a time per connection. Enable pipelining on the coordinator with `--pipelined 1`, and all generators will follow that setting for the epoch.
363366
- Late generators are allowed to register and become ready while an epoch is already running, but they only participate in the next epoch.
364367
- The coordinator does not use heartbeats. It includes generators that most recently reported `ready`.
365368
- If a participating generator dies and never sends `/stopped`, the epoch result is written with an `error`, and that generator remains `running` in coordinator status until you restart it and let it register again.

templates/keynote-2/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ This architectural difference means SpacetimeDB can execute transactions in micr
148148
### Client Pipelining
149149

150150
The benchmark supports **pipelining** for all clients - sending multiple requests without waiting for responses. This maximizes throughput by keeping connections saturated.
151+
The distributed TypeScript SpacetimeDB flow also supports coordinator-controlled pipelining via `bench-dist-coordinator -- --pipelined 1`, with `--max-inflight-per-connection` to cap outstanding requests per connection.
151152

152153
### Confirmed Reads (`withConfirmedReads`)
153154

templates/keynote-2/src/distributed/control.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ function printState(state: CoordinatorState): void {
1818
`phase=${state.phase} epoch=${state.currentEpoch ?? '-'} label=${state.currentLabel ?? '-'}`,
1919
);
2020
console.log(
21-
`test=${state.test} connector=${state.connector} participants=${state.participants.length}`,
21+
`test=${state.test} connector=${state.connector} participants=${state.participants.length} mode=${state.loadOptions.pipelined ? `pipelined/${state.loadOptions.maxInflightPerConnection}` : 'closed-loop'}`,
2222
);
2323

2424
if (state.generators.length === 0) {
@@ -35,7 +35,7 @@ function printState(state: CoordinatorState): void {
3535
if (state.lastResult) {
3636
console.log('last_result:');
3737
console.log(
38-
` epoch=${state.lastResult.epoch} tps=${state.lastResult.tps.toFixed(2)} delta=${state.lastResult.committedDelta} verification=${state.lastResult.verification}${state.lastResult.error ? ` error=${state.lastResult.error}` : ''}`,
38+
` epoch=${state.lastResult.epoch} mode=${state.lastResult.loadOptions.pipelined ? `pipelined/${state.lastResult.loadOptions.maxInflightPerConnection}` : 'closed-loop'} tps=${state.lastResult.tps.toFixed(2)} delta=${state.lastResult.committedDelta} verification=${state.lastResult.verification}${state.lastResult.error ? ` error=${state.lastResult.error}` : ''}`,
3939
);
4040
}
4141
}

templates/keynote-2/src/distributed/coordinator.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
import type {
1717
CoordinatorPhase,
1818
CoordinatorState,
19+
DistributedLoadOptions,
1920
EpochResult,
2021
GeneratorLocalState,
2122
GeneratorSnapshot,
@@ -91,6 +92,7 @@ async function runVerification(url: string, moduleName: string): Promise<void> {
9192
class DistributedCoordinator {
9293
private readonly testName: string;
9394
private readonly connectorName: string;
95+
private readonly loadOptions: DistributedLoadOptions;
9496
private readonly warmupMs: number;
9597
private readonly windowMs: number;
9698
private readonly verifyAfterEpoch: boolean;
@@ -109,6 +111,7 @@ class DistributedCoordinator {
109111
constructor(opts: {
110112
testName: string;
111113
connectorName: string;
114+
loadOptions: DistributedLoadOptions;
112115
warmupMs: number;
113116
windowMs: number;
114117
verifyAfterEpoch: boolean;
@@ -119,6 +122,7 @@ class DistributedCoordinator {
119122
}) {
120123
this.testName = opts.testName;
121124
this.connectorName = opts.connectorName;
125+
this.loadOptions = opts.loadOptions;
122126
this.warmupMs = opts.warmupMs;
123127
this.windowMs = opts.windowMs;
124128
this.verifyAfterEpoch = opts.verifyAfterEpoch;
@@ -147,6 +151,7 @@ class DistributedCoordinator {
147151
participants: this.currentEpoch?.participantIds ?? [],
148152
test: this.testName,
149153
connector: this.connectorName,
154+
loadOptions: this.loadOptions,
150155
generators,
151156
lastResult: this.lastResult,
152157
};
@@ -353,6 +358,7 @@ class DistributedCoordinator {
353358
label: activeEpoch.label,
354359
test: this.testName,
355360
connector: this.connectorName,
361+
loadOptions: this.loadOptions,
356362
warmupSeconds: this.warmupMs / 1000,
357363
windowSeconds: this.windowMs / 1000,
358364
actualWindowSeconds,
@@ -426,11 +432,23 @@ async function main(): Promise<void> {
426432
const resultsDir = getStringFlag(flags, 'results-dir', defaultResultsDir);
427433
const warmupSeconds = getNumberFlag(flags, 'warmup-seconds', 15);
428434
const windowSeconds = getNumberFlag(flags, 'window-seconds', 60);
435+
const pipelined = getBoolFlag(flags, 'pipelined', false);
436+
const maxInflightPerConnection = getNumberFlag(
437+
flags,
438+
'max-inflight-per-connection',
439+
8,
440+
);
429441
const stopAckTimeoutSeconds = getNumberFlag(
430442
flags,
431443
'stop-ack-timeout-seconds',
432444
60,
433445
);
446+
if (
447+
!Number.isInteger(maxInflightPerConnection) ||
448+
maxInflightPerConnection < 1
449+
) {
450+
throw new Error('--max-inflight-per-connection must be an integer >= 1');
451+
}
434452
const verifyAfterEpoch = getBoolFlag(flags, 'verify', false);
435453
const rawStdbUrl = getStringFlag(
436454
flags,
@@ -448,6 +466,10 @@ async function main(): Promise<void> {
448466
const coordinator = new DistributedCoordinator({
449467
testName,
450468
connectorName,
469+
loadOptions: {
470+
pipelined,
471+
maxInflightPerConnection,
472+
},
451473
warmupMs: warmupSeconds * 1000,
452474
windowMs: windowSeconds * 1000,
453475
verifyAfterEpoch,
@@ -518,7 +540,7 @@ async function main(): Promise<void> {
518540
});
519541

520542
console.log(
521-
`[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl}`,
543+
`[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} mode=${pipelined ? `pipelined/${maxInflightPerConnection}` : 'closed-loop'} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl}`,
522544
);
523545
}
524546

templates/keynote-2/src/distributed/generator.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,11 @@ async function main(): Promise<void> {
138138
state.currentEpoch != null &&
139139
state.participants.includes(id)
140140
) {
141-
console.log(`[generator ${id}] starting epoch ${state.currentEpoch}`);
142-
await session.startEpoch(state.currentEpoch);
141+
const { pipelined, maxInflightPerConnection } = state.loadOptions;
142+
console.log(
143+
`[generator ${id}] starting epoch ${state.currentEpoch} mode=${pipelined ? `pipelined/${maxInflightPerConnection}` : 'closed-loop'}`,
144+
);
145+
await session.startEpoch(state.currentEpoch, state.loadOptions);
143146
activeEpoch = state.currentEpoch;
144147
}
145148
} else if (!shouldKeepRunning) {

templates/keynote-2/src/distributed/loadSession.ts

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { ReducerConnector } from '../core/connectors.ts';
22
import { performance } from 'node:perf_hooks';
33
import { pickTwoDistinct, zipfSampler } from '../core/zipf.ts';
4+
import type { DistributedLoadOptions } from './protocol.ts';
45

56
const OP_TIMEOUT_MS = Number(process.env.BENCH_OP_TIMEOUT_MS ?? '15000');
67
const DEFAULT_PRECOMPUTED_TRANSFER_PAIRS = 10_000_000;
@@ -67,6 +68,7 @@ type RunState = {
6768
epoch: number;
6869
stopRequested: boolean;
6970
workerPromises: Promise<void>[];
71+
loadOptions: DistributedLoadOptions;
7072
};
7173

7274
export class LoadSession {
@@ -137,7 +139,7 @@ export class LoadSession {
137139
}
138140
}
139141

140-
async startEpoch(epoch: number): Promise<void> {
142+
async startEpoch(epoch: number, loadOptions: DistributedLoadOptions): Promise<void> {
141143
if (this.openedConnections !== this.concurrency) {
142144
throw new Error(
143145
`Cannot start epoch ${epoch}: expected ${this.concurrency} open connections, got ${this.openedConnections}`,
@@ -153,9 +155,15 @@ export class LoadSession {
153155
epoch,
154156
stopRequested: false,
155157
workerPromises: [],
158+
loadOptions,
156159
};
157160
this.runState = runState;
158161

162+
const mode = loadOptions.pipelined
163+
? `pipelined max_inflight=${loadOptions.maxInflightPerConnection}`
164+
: 'closed-loop';
165+
console.log(`[distributed] starting epoch ${epoch} in ${mode} mode`);
166+
159167
runState.workerPromises = this.conns.map((conn, workerIndex) => {
160168
if (!conn) {
161169
throw new Error(`Connection ${workerIndex} not open`);
@@ -213,13 +221,23 @@ export class LoadSession {
213221
workerIndex: number,
214222
runState: RunState,
215223
): Promise<void> {
224+
const nextTransferPair = this.makeTransferPairPicker(workerIndex);
225+
if (!runState.loadOptions.pipelined) {
226+
await this.closedLoopWorker(conn, workerIndex, runState, nextTransferPair);
227+
return;
228+
}
229+
230+
await this.pipelinedWorker(conn, workerIndex, runState, nextTransferPair);
231+
}
232+
233+
private makeTransferPairPicker(workerIndex: number): () => [number, number] {
216234
const pairsPerWorker = Math.max(
217235
1,
218236
Math.floor(this.pairs.count / this.concurrency),
219237
);
220238
let pairIndex = workerIndex * pairsPerWorker;
221239

222-
const nextTransferPair = (): [number, number] => {
240+
return (): [number, number] => {
223241
if (pairIndex >= this.pairs.count) {
224242
pairIndex = 0;
225243
}
@@ -229,22 +247,65 @@ export class LoadSession {
229247
pairIndex++;
230248
return [from, to];
231249
};
250+
}
232251

252+
private async closedLoopWorker(
253+
conn: ReducerConnector,
254+
workerIndex: number,
255+
runState: RunState,
256+
nextTransferPair: () => [number, number],
257+
): Promise<void> {
233258
while (!runState.stopRequested) {
234-
const [from, to] = nextTransferPair();
259+
await this.runTransfer(conn, workerIndex, nextTransferPair);
260+
}
261+
}
235262

236-
try {
237-
await withOpTimeout(
238-
this.scenario(conn, from, to, 1),
239-
`[distributed] worker ${workerIndex} transfer ${from}->${to}`,
263+
private async pipelinedWorker(
264+
conn: ReducerConnector,
265+
workerIndex: number,
266+
runState: RunState,
267+
nextTransferPair: () => [number, number],
268+
): Promise<void> {
269+
const maxInflight = runState.loadOptions.maxInflightPerConnection;
270+
const inflight = new Set<Promise<void>>();
271+
272+
const launchTransfer = () => {
273+
const transfer = this.runTransfer(conn, workerIndex, nextTransferPair);
274+
inflight.add(transfer);
275+
transfer.finally(() => {
276+
inflight.delete(transfer);
277+
});
278+
};
279+
280+
while (!runState.stopRequested) {
281+
if (inflight.size < maxInflight) {
282+
launchTransfer();
283+
} else {
284+
await Promise.race(inflight);
285+
}
286+
}
287+
288+
await Promise.all(inflight);
289+
}
290+
291+
private async runTransfer(
292+
conn: ReducerConnector,
293+
workerIndex: number,
294+
nextTransferPair: () => [number, number],
295+
): Promise<void> {
296+
const [from, to] = nextTransferPair();
297+
298+
try {
299+
await withOpTimeout(
300+
this.scenario(conn, from, to, 1),
301+
`[distributed] worker ${workerIndex} transfer ${from}->${to}`,
302+
);
303+
} catch (err) {
304+
if (process.env.LOG_ERRORS === '1') {
305+
const msg = err instanceof Error ? err.message : String(err);
306+
console.warn(
307+
`[distributed] worker ${workerIndex} failed ${from}->${to}: ${msg}`,
240308
);
241-
} catch (err) {
242-
if (process.env.LOG_ERRORS === '1') {
243-
const msg = err instanceof Error ? err.message : String(err);
244-
console.warn(
245-
`[distributed] worker ${workerIndex} failed ${from}->${to}: ${msg}`,
246-
);
247-
}
248309
}
249310
}
250311
}

templates/keynote-2/src/distributed/protocol.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ export type GeneratorLocalState = 'registered' | 'ready' | 'running';
22

33
export type CoordinatorPhase = 'idle' | 'warmup' | 'measure' | 'stop';
44

5+
export type DistributedLoadOptions = {
6+
pipelined: boolean;
7+
maxInflightPerConnection: number;
8+
};
9+
510
export type GeneratorSnapshot = {
611
id: string;
712
hostname: string;
@@ -16,6 +21,7 @@ export type EpochResult = {
1621
label: string | null;
1722
test: string;
1823
connector: string;
24+
loadOptions: DistributedLoadOptions;
1925
warmupSeconds: number;
2026
windowSeconds: number;
2127
actualWindowSeconds: number;
@@ -39,6 +45,7 @@ export type CoordinatorState = {
3945
participants: string[];
4046
test: string;
4147
connector: string;
48+
loadOptions: DistributedLoadOptions;
4249
generators: GeneratorSnapshot[];
4350
lastResult: EpochResult | null;
4451
};

0 commit comments

Comments
 (0)