Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion templates/keynote-2/DEVELOP.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ Notes:

- `--warmup-seconds` is the unmeasured warmup period. Generators submit requests during warmup, but those transactions are excluded from TPS.
- `--window-seconds` is the measured interval.
- `--pipelined 1` enables request pipelining. Omit it or pass `--pipelined 0` to stay in closed-loop mode, one request at a time.
- `--max-inflight-per-connection` caps the number of in-flight requests each connection may have when pipelining is enabled. The default is `8`.
- `--verify 1` preserves the existing benchmark semantics by running one verification pass centrally after the epoch completes.
- The coordinator derives the HTTP metrics endpoint from `--stdb-url` by switching to `http://` or `https://` and appending `/v1/metrics`.
- For a real multi-machine run, change `--bind 127.0.0.1` to `--bind 0.0.0.0` so remote generators can reach the coordinator.
Expand Down Expand Up @@ -351,6 +353,7 @@ The result contains:

- participating generator IDs
- total participating connections
- whether the epoch used closed-loop or pipelined load, and the per-connection in-flight cap
- committed transaction delta from the server metrics endpoint
- measured window duration
- computed TPS
Expand All @@ -361,7 +364,7 @@ The result contains:
- Start the coordinator before the generators.
- Generators begin submitting requests when the coordinator enters `warmup`, not when the measured window begins.
- Throughput is measured only from the committed transaction counter delta recorded after warmup, so warmup transactions are excluded.
- For this distributed TypeScript mode, each connection runs closed-loop with one request at a time. There is no pipelining in this flow.
- Distributed TypeScript mode defaults to closed-loop, one request at a time per connection. Enable pipelining on the coordinator with `--pipelined 1`, and all generators will follow that setting for the epoch.
- Late generators are allowed to register and become ready while an epoch is already running, but they only participate in the next epoch.
- The coordinator does not use heartbeats. It includes generators that most recently reported `ready`.
- If a participating generator dies and never sends `/stopped`, the epoch result is written with an `error`, and that generator remains `running` in coordinator status until you restart it and let it register again.
Expand Down
1 change: 1 addition & 0 deletions templates/keynote-2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ This architectural difference means SpacetimeDB can execute transactions in micr
### Client Pipelining

The benchmark supports **pipelining** for all clients - sending multiple requests without waiting for responses. This maximizes throughput by keeping connections saturated.
The distributed TypeScript SpacetimeDB flow also supports coordinator-controlled pipelining via `bench-dist-coordinator -- --pipelined 1`, with `--max-inflight-per-connection` to cap outstanding requests per connection.

### Confirmed Reads (`withConfirmedReads`)

Expand Down
4 changes: 2 additions & 2 deletions templates/keynote-2/src/distributed/control.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ function printState(state: CoordinatorState): void {
`phase=${state.phase} epoch=${state.currentEpoch ?? '-'} label=${state.currentLabel ?? '-'}`,
);
console.log(
`test=${state.test} connector=${state.connector} participants=${state.participants.length}`,
`test=${state.test} connector=${state.connector} participants=${state.participants.length} mode=${state.loadOptions.pipelined ? `pipelined/${state.loadOptions.maxInflightPerConnection}` : 'closed-loop'}`,
);

if (state.generators.length === 0) {
Expand All @@ -35,7 +35,7 @@ function printState(state: CoordinatorState): void {
if (state.lastResult) {
console.log('last_result:');
console.log(
` epoch=${state.lastResult.epoch} tps=${state.lastResult.tps.toFixed(2)} delta=${state.lastResult.committedDelta} verification=${state.lastResult.verification}${state.lastResult.error ? ` error=${state.lastResult.error}` : ''}`,
` epoch=${state.lastResult.epoch} mode=${state.lastResult.loadOptions.pipelined ? `pipelined/${state.lastResult.loadOptions.maxInflightPerConnection}` : 'closed-loop'} tps=${state.lastResult.tps.toFixed(2)} delta=${state.lastResult.committedDelta} verification=${state.lastResult.verification}${state.lastResult.error ? ` error=${state.lastResult.error}` : ''}`,
);
}
}
Expand Down
24 changes: 23 additions & 1 deletion templates/keynote-2/src/distributed/coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
import type {
CoordinatorPhase,
CoordinatorState,
DistributedLoadOptions,
EpochResult,
GeneratorLocalState,
GeneratorSnapshot,
Expand Down Expand Up @@ -103,6 +104,7 @@ async function runVerification(url: string, moduleName: string): Promise<void> {
class DistributedCoordinator {
private readonly testName: string;
private readonly connectorName: string;
private readonly loadOptions: DistributedLoadOptions;
private readonly warmupMs: number;
private readonly windowMs: number;
private readonly verifyAfterEpoch: boolean;
Expand All @@ -121,6 +123,7 @@ class DistributedCoordinator {
constructor(opts: {
testName: string;
connectorName: string;
loadOptions: DistributedLoadOptions;
warmupMs: number;
windowMs: number;
verifyAfterEpoch: boolean;
Expand All @@ -131,6 +134,7 @@ class DistributedCoordinator {
}) {
this.testName = opts.testName;
this.connectorName = opts.connectorName;
this.loadOptions = opts.loadOptions;
this.warmupMs = opts.warmupMs;
this.windowMs = opts.windowMs;
this.verifyAfterEpoch = opts.verifyAfterEpoch;
Expand Down Expand Up @@ -159,6 +163,7 @@ class DistributedCoordinator {
participants: this.currentEpoch?.participantIds ?? [],
test: this.testName,
connector: this.connectorName,
loadOptions: this.loadOptions,
generators,
lastResult: this.lastResult,
};
Expand Down Expand Up @@ -365,6 +370,7 @@ class DistributedCoordinator {
label: activeEpoch.label,
test: this.testName,
connector: this.connectorName,
loadOptions: this.loadOptions,
warmupSeconds: this.warmupMs / 1000,
windowSeconds: this.windowMs / 1000,
actualWindowSeconds,
Expand Down Expand Up @@ -438,11 +444,23 @@ async function main(): Promise<void> {
const resultsDir = getStringFlag(flags, 'results-dir', defaultResultsDir);
const warmupSeconds = getNumberFlag(flags, 'warmup-seconds', 15);
const windowSeconds = getNumberFlag(flags, 'window-seconds', 60);
const pipelined = getBoolFlag(flags, 'pipelined', false);
const maxInflightPerConnection = getNumberFlag(
flags,
'max-inflight-per-connection',
8,
);
const stopAckTimeoutSeconds = getNumberFlag(
flags,
'stop-ack-timeout-seconds',
60,
);
if (
!Number.isInteger(maxInflightPerConnection) ||
maxInflightPerConnection < 1
) {
throw new Error('--max-inflight-per-connection must be an integer >= 1');
}
const verifyAfterEpoch = getBoolFlag(flags, 'verify', false);
const rawStdbUrl = getStringFlag(
flags,
Expand All @@ -460,6 +478,10 @@ async function main(): Promise<void> {
const coordinator = new DistributedCoordinator({
testName,
connectorName,
loadOptions: {
pipelined,
maxInflightPerConnection,
},
warmupMs: warmupSeconds * 1000,
windowMs: windowSeconds * 1000,
verifyAfterEpoch,
Expand Down Expand Up @@ -530,7 +552,7 @@ async function main(): Promise<void> {
});

console.log(
`[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl}`,
`[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} mode=${pipelined ? `pipelined/${maxInflightPerConnection}` : 'closed-loop'} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl}`,
);
}

Expand Down
7 changes: 5 additions & 2 deletions templates/keynote-2/src/distributed/generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,11 @@ async function main(): Promise<void> {
state.currentEpoch != null &&
state.participants.includes(id)
) {
console.log(`[generator ${id}] starting epoch ${state.currentEpoch}`);
await session.startEpoch(state.currentEpoch);
const { pipelined, maxInflightPerConnection } = state.loadOptions;
console.log(
`[generator ${id}] starting epoch ${state.currentEpoch} mode=${pipelined ? `pipelined/${maxInflightPerConnection}` : 'closed-loop'}`,
);
await session.startEpoch(state.currentEpoch, state.loadOptions);
activeEpoch = state.currentEpoch;
}
} else if (!shouldKeepRunning) {
Expand Down
89 changes: 75 additions & 14 deletions templates/keynote-2/src/distributed/loadSession.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { ReducerConnector } from '../core/connectors.ts';
import { performance } from 'node:perf_hooks';
import { pickTwoDistinct, zipfSampler } from '../core/zipf.ts';
import type { DistributedLoadOptions } from './protocol.ts';

const OP_TIMEOUT_MS = Number(process.env.BENCH_OP_TIMEOUT_MS ?? '15000');
const DEFAULT_PRECOMPUTED_TRANSFER_PAIRS = 10_000_000;
Expand Down Expand Up @@ -67,6 +68,7 @@ type RunState = {
epoch: number;
stopRequested: boolean;
workerPromises: Promise<void>[];
loadOptions: DistributedLoadOptions;
};

export class LoadSession {
Expand Down Expand Up @@ -137,7 +139,7 @@ export class LoadSession {
}
}

async startEpoch(epoch: number): Promise<void> {
async startEpoch(epoch: number, loadOptions: DistributedLoadOptions): Promise<void> {
if (this.openedConnections !== this.concurrency) {
throw new Error(
`Cannot start epoch ${epoch}: expected ${this.concurrency} open connections, got ${this.openedConnections}`,
Expand All @@ -153,9 +155,15 @@ export class LoadSession {
epoch,
stopRequested: false,
workerPromises: [],
loadOptions,
};
this.runState = runState;

const mode = loadOptions.pipelined
? `pipelined max_inflight=${loadOptions.maxInflightPerConnection}`
: 'closed-loop';
console.log(`[distributed] starting epoch ${epoch} in ${mode} mode`);

runState.workerPromises = this.conns.map((conn, workerIndex) => {
if (!conn) {
throw new Error(`Connection ${workerIndex} not open`);
Expand Down Expand Up @@ -213,13 +221,23 @@ export class LoadSession {
workerIndex: number,
runState: RunState,
): Promise<void> {
const nextTransferPair = this.makeTransferPairPicker(workerIndex);
if (!runState.loadOptions.pipelined) {
await this.closedLoopWorker(conn, workerIndex, runState, nextTransferPair);
return;
}

await this.pipelinedWorker(conn, workerIndex, runState, nextTransferPair);
}

private makeTransferPairPicker(workerIndex: number): () => [number, number] {
const pairsPerWorker = Math.max(
1,
Math.floor(this.pairs.count / this.concurrency),
);
let pairIndex = workerIndex * pairsPerWorker;

const nextTransferPair = (): [number, number] => {
return (): [number, number] => {
if (pairIndex >= this.pairs.count) {
pairIndex = 0;
}
Expand All @@ -229,22 +247,65 @@ export class LoadSession {
pairIndex++;
return [from, to];
};
}

private async closedLoopWorker(
conn: ReducerConnector,
workerIndex: number,
runState: RunState,
nextTransferPair: () => [number, number],
): Promise<void> {
while (!runState.stopRequested) {
const [from, to] = nextTransferPair();
await this.runTransfer(conn, workerIndex, nextTransferPair);
}
}

try {
await withOpTimeout(
this.scenario(conn, from, to, 1),
`[distributed] worker ${workerIndex} transfer ${from}->${to}`,
private async pipelinedWorker(
conn: ReducerConnector,
workerIndex: number,
runState: RunState,
nextTransferPair: () => [number, number],
): Promise<void> {
const maxInflight = runState.loadOptions.maxInflightPerConnection;
const inflight = new Set<Promise<void>>();

const launchTransfer = () => {
const transfer = this.runTransfer(conn, workerIndex, nextTransferPair);
inflight.add(transfer);
transfer.finally(() => {
inflight.delete(transfer);
});
};

while (!runState.stopRequested) {
if (inflight.size < maxInflight) {
launchTransfer();
} else {
await Promise.race(inflight);
}
}

await Promise.all(inflight);
}

private async runTransfer(
conn: ReducerConnector,
workerIndex: number,
nextTransferPair: () => [number, number],
): Promise<void> {
const [from, to] = nextTransferPair();

try {
await withOpTimeout(
this.scenario(conn, from, to, 1),
`[distributed] worker ${workerIndex} transfer ${from}->${to}`,
);
} catch (err) {
if (process.env.LOG_ERRORS === '1') {
const msg = err instanceof Error ? err.message : String(err);
console.warn(
`[distributed] worker ${workerIndex} failed ${from}->${to}: ${msg}`,
);
} catch (err) {
if (process.env.LOG_ERRORS === '1') {
const msg = err instanceof Error ? err.message : String(err);
console.warn(
`[distributed] worker ${workerIndex} failed ${from}->${to}: ${msg}`,
);
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions templates/keynote-2/src/distributed/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ export type GeneratorLocalState = 'registered' | 'ready' | 'running';

export type CoordinatorPhase = 'idle' | 'warmup' | 'measure' | 'stop';

export type DistributedLoadOptions = {
pipelined: boolean;
maxInflightPerConnection: number;
};

export type GeneratorSnapshot = {
id: string;
hostname: string;
Expand All @@ -16,6 +21,7 @@ export type EpochResult = {
label: string | null;
test: string;
connector: string;
loadOptions: DistributedLoadOptions;
warmupSeconds: number;
windowSeconds: number;
actualWindowSeconds: number;
Expand All @@ -39,6 +45,7 @@ export type CoordinatorState = {
participants: string[];
test: string;
connector: string;
loadOptions: DistributedLoadOptions;
generators: GeneratorSnapshot[];
lastResult: EpochResult | null;
};
Expand Down
Loading