diff --git a/packages/gossipsub/package.json b/packages/gossipsub/package.json index e0caa03f2b..92ba1a9b19 100644 --- a/packages/gossipsub/package.json +++ b/packages/gossipsub/package.json @@ -52,8 +52,8 @@ "release": "aegir release --no-types", "build": "aegir build", "generate": "protons ./src/message/rpc.proto", - "benchmark": "yarn benchmark:files test/benchmark/**/*.test.ts", - "benchmark:files": "NODE_OPTIONS='--max-old-space-size=4096 --loader=ts-node/esm' benchmark --config .benchrc.yaml --defaultBranch master", + "benchmark": "npm run benchmark:files -- 'dist/test/benchmark/**/*.test.js' -- --exit", + "benchmark:files": "NODE_OPTIONS='--max-old-space-size=4096' aegir test -t node --timeout 0 -f", "test": "aegir test -f dist/test/*.spec.js", "test:node": "aegir test -t node -f dist/test/*.spec.js -f dist/test/unit/*.spec.js -f dist/test/e2e/*.spec.js", "test:chrome": "aegir test -f dist/test/*.spec.js -t browser", @@ -96,14 +96,15 @@ }, "devDependencies": { "@chainsafe/as-sha256": "^1.2.0", - "@dapplion/benchmark": "^1.0.0", "@libp2p/floodsub": "^11.0.22", "@libp2p/logger": "^6.2.7", "@libp2p/peer-store": "^12.0.20", + "@types/benchmark": "^2.1.5", "@types/node": "^22.18.1", "@types/sinon": "^21.0.1", "abortable-iterator": "^5.1.0", "aegir": "^47.0.21", + "benchmark": "^2.1.4", "datastore-core": "^11.0.1", "delay": "^7.0.0", "it-all": "^3.0.6", diff --git a/packages/gossipsub/test/benchmark/asyncIterable.test.ts b/packages/gossipsub/test/benchmark/asyncIterable.test.ts index 724c202559..48ac4beca5 100644 --- a/packages/gossipsub/test/benchmark/asyncIterable.test.ts +++ b/packages/gossipsub/test/benchmark/asyncIterable.test.ts @@ -1,7 +1,7 @@ -import { itBench } from '@dapplion/benchmark' import { abortableSource } from 'abortable-iterator' import all from 'it-all' import { pipe } from 'it-pipe' +import { runBenchmark } from '../utils/benchmark.js' describe('abortableSource cost', function () { const n = 10000 @@ -16,21 +16,18 @@ describe('abortableSource cost', function () { } for (let k = 0; k < 5; k++) { - itBench({ - id: `async iterate abortable x${k} bytesSource ${n}`, - beforeEach: () => { + it(`async iterate abortable x${k} bytesSource ${n}`, async () => { + await runBenchmark(`async iterate abortable x${k} bytesSource ${n}`, async () => { let source = bytesSource() for (let i = 0; i < k; i++) { source = abortableSource(source, controller.signal) } - return source - }, - fn: async (source) => { + for await (const chunk of source) { // eslint-disable-next-line @typescript-eslint/no-unused-expressions chunk } - } + }) }) } }) @@ -51,35 +48,31 @@ describe('pipe extra iterables cost', function () { } } - itBench({ - id: `async iterate pipe x0 transforms ${n}`, - fn: async () => { + it(`async iterate pipe x0 transforms ${n}`, async () => { + await runBenchmark(`async iterate pipe x0 transforms ${n}`, async () => { await pipe(numberSource, all) - } + }) }) - itBench({ - id: `async iterate pipe x1 transforms ${n}`, - fn: async () => { + it(`async iterate pipe x1 transforms ${n}`, async () => { + await runBenchmark(`async iterate pipe x1 transforms ${n}`, async () => { await pipe(numberSource, numberTransform, all) - } + }) }) - itBench({ - id: `async iterate pipe x2 transforms ${n}`, - fn: async () => { + it(`async iterate pipe x2 transforms ${n}`, async () => { + await runBenchmark(`async iterate pipe x2 transforms ${n}`, async () => { await pipe( numberSource, numberTransform, numberTransform, all ) - } + }) }) - itBench({ - id: `async iterate pipe x4 transforms ${n}`, - fn: async () => { + it(`async iterate pipe x4 transforms ${n}`, async () => { + await runBenchmark(`async iterate pipe x4 transforms ${n}`, async () => { await pipe( numberSource, numberTransform, @@ -88,12 +81,11 @@ describe('pipe extra iterables cost', function () { numberTransform, all ) - } + }) }) - itBench({ - id: `async iterate pipe x8 transforms ${n}`, - fn: async () => { + it(`async iterate pipe x8 transforms ${n}`, async () => { + await runBenchmark(`async iterate pipe x8 transforms ${n}`, async () => { await pipe( numberSource, numberTransform, @@ -106,6 +98,6 @@ describe('pipe extra iterables cost', function () { numberTransform, all ) - } + }) }) }) diff --git a/packages/gossipsub/test/benchmark/index.test.ts b/packages/gossipsub/test/benchmark/index.test.ts index e2a1daea9a..34a8be74f1 100644 --- a/packages/gossipsub/test/benchmark/index.test.ts +++ b/packages/gossipsub/test/benchmark/index.test.ts @@ -1,6 +1,6 @@ -import { itBench } from '@dapplion/benchmark' import { expect } from 'aegir/chai' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { runBenchmark } from '../utils/benchmark.js' import { connectPubsubNodes, createComponentsArray, @@ -8,14 +8,12 @@ import { } from '../utils/create-pubsub.js' import { awaitEvents, checkReceivedSubscriptions, checkReceivedSubscription } from '../utils/events.js' -import type { GossipSubAndComponents } from '../utils/create-pubsub.js' describe('heartbeat', function () { const topic = 'foobar' const numTopic = 70 const numPeers = 50 const numPeersPerTopic = 30 - let numLoop = 0 const getTopic = (i: number): string => { return topic + String(i) @@ -48,94 +46,91 @@ describe('heartbeat', function () { * | 0 | 0, 1, 2, 3| * | 1 | 0, 2, 3, 4| */ - itBench({ - id: 'heartbeat', - before: async () => { - const psubs = await createComponentsArray({ - number: numPeers, - init: { - scoreParams: { - IPColocationFactorWeight: 0 - }, - floodPublish: true, - // TODO: why we need to configure this low score - // probably we should tweak topic score params - // is that why we don't have mesh peers? - scoreThresholds: { - gossipThreshold: -10, - publishThreshold: -100, - graylistThreshold: -1000 - } + it('heartbeat', async () => { + const psubs = await createComponentsArray({ + number: numPeers, + init: { + scoreParams: { + IPColocationFactorWeight: 0 + }, + floodPublish: true, + // TODO: why we need to configure this low score + // probably we should tweak topic score params + // is that why we don't have mesh peers? + scoreThresholds: { + gossipThreshold: -10, + publishThreshold: -100, + graylistThreshold: -1000 } - }) + } + }) - // build the star - await Promise.all(psubs.slice(1).map(async (ps) => connectPubsubNodes(psubs[0], ps))) - await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2))) + // build the star + await Promise.all(psubs.slice(1).map(async (ps) => connectPubsubNodes(psubs[0], ps))) + await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2))) - await denseConnect(psubs) + await denseConnect(psubs) - // make sure psub 0 has `numPeers - 1` peers - expect(psubs[0].pubsub.getPeers().length).to.be.gte( - numPeers - 1, - `peer 0 should have at least ${numPeers - 1} peers` - ) + // make sure psub 0 has `numPeers - 1` peers + expect(psubs[0].pubsub.getPeers().length).to.be.gte( + numPeers - 1, + `peer 0 should have at least ${numPeers - 1} peers` + ) - const peerIds = psubs.map((psub) => psub.components.peerId.toString()) - for (let topicIndex = 0; topicIndex < numTopic; topicIndex++) { - const topic = getTopic(topicIndex) - psubs.forEach((ps) => { ps.pubsub.subscribe(topic) }) - const peerIndices = getTopicPeerIndices(topicIndex) - const peerIdsOnTopic = peerIndices.map((peerIndex) => peerIds[peerIndex]) - // peer 0 see all subscriptions from other - const subscription = checkReceivedSubscriptions(psubs[0], peerIdsOnTopic, topic) - // other peers should see the subsription from peer 0 to prevent PublishError.InsufficientPeers error - const otherSubscriptions = peerIndices - .slice(1) - .map((peerIndex) => psubs[peerIndex]) - .map(async (psub) => checkReceivedSubscription(psub, peerIds[0], topic, 0)) - peerIndices.forEach((peerIndex) => { psubs[peerIndex].pubsub.subscribe(topic) }) - await Promise.all([subscription, ...otherSubscriptions]) - } + const peerIds = psubs.map((psub) => psub.components.peerId.toString()) + for (let topicIndex = 0; topicIndex < numTopic; topicIndex++) { + const topic = getTopic(topicIndex) + psubs.forEach((ps) => { ps.pubsub.subscribe(topic) }) + const peerIndices = getTopicPeerIndices(topicIndex) + const peerIdsOnTopic = peerIndices.map((peerIndex) => peerIds[peerIndex]) + // peer 0 see all subscriptions from other + const subscription = checkReceivedSubscriptions(psubs[0], peerIdsOnTopic, topic) + // other peers should see the subsription from peer 0 to prevent PublishError.InsufficientPeers error + const otherSubscriptions = peerIndices + .slice(1) + .map((peerIndex) => psubs[peerIndex]) + .map(async (psub) => checkReceivedSubscription(psub, peerIds[0], topic, 0)) + peerIndices.forEach((peerIndex) => { psubs[peerIndex].pubsub.subscribe(topic) }) + await Promise.all([subscription, ...otherSubscriptions]) + } - // wait for heartbeats to build mesh - await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 3))) + // wait for heartbeats to build mesh + await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 3))) - // make sure psubs 0 have at least 10 topic peers and 4 mesh peers for each topic - for (let i = 0; i < numTopic; i++) { - expect((psubs[0].pubsub).getSubscribers(getTopic(i)).length).to.be.gte( - 10, - `psub 0: topic ${i} does not have enough topic peers` - ) + // make sure psubs 0 have at least 10 topic peers and 4 mesh peers for each topic + for (let i = 0; i < numTopic; i++) { + expect((psubs[0].pubsub).getSubscribers(getTopic(i)).length).to.be.gte( + 10, + `psub 0: topic ${i} does not have enough topic peers` + ) - expect((psubs[0].pubsub).getMeshPeers(getTopic(i)).length).to.be.gte( - 4, - `psub 0: topic ${i} does not have enough mesh peers` - ) - } + expect((psubs[0].pubsub).getMeshPeers(getTopic(i)).length).to.be.gte( + 4, + `psub 0: topic ${i} does not have enough mesh peers` + ) + } - return psubs - }, - beforeEach: async (psubs) => { - numLoop++ - const msg = `its not a flooooood ${numLoop}` - const promises = [] - for (let topicIndex = 0; topicIndex < numTopic; topicIndex++) { - for (const peerIndex of getTopicPeerIndices(topicIndex)) { - promises.push( - psubs[peerIndex].pubsub.publish( - getTopic(topicIndex), - uint8ArrayFromString(psubs[peerIndex].components.peerId.toString() + msg) - ) + // publish one round of messages so the measured heartbeat has gossip to emit + const msg = 'its not a flooooood' + const promises = [] + for (let topicIndex = 0; topicIndex < numTopic; topicIndex++) { + for (const peerIndex of getTopicPeerIndices(topicIndex)) { + promises.push( + psubs[peerIndex].pubsub.publish( + getTopic(topicIndex), + uint8ArrayFromString(psubs[peerIndex].components.peerId.toString() + msg) ) - } + ) } - await Promise.all(promises) - - return psubs[0] - }, - fn: async (firstPsub: GossipSubAndComponents) => { - return (firstPsub.pubsub).heartbeat() } + await Promise.all(promises) + + // measure heartbeat() in isolation — the publish round above is excluded from the timing + await runBenchmark('heartbeat', async () => { + await psubs[0].pubsub.heartbeat() + }) + + // stop the nodes so their heartbeat timers don't keep the process alive + await Promise.allSettled(psubs.map(async (ps) => ps.pubsub.stop())) }) }) diff --git a/packages/gossipsub/test/benchmark/protobuf.test.ts b/packages/gossipsub/test/benchmark/protobuf.test.ts index e94b369a8a..86522374b8 100644 --- a/packages/gossipsub/test/benchmark/protobuf.test.ts +++ b/packages/gossipsub/test/benchmark/protobuf.test.ts @@ -1,6 +1,6 @@ import crypto from 'node:crypto' -import { itBench } from '@dapplion/benchmark' import { RPC } from '../../src/message/rpc.js' +import { runBenchmark } from '../utils/benchmark.js' describe('protobuf', function () { const testCases: Array<{ name: string, length: number }> = [ @@ -30,24 +30,20 @@ describe('protobuf', function () { const runsFactor = 1000 - itBench({ - id: `decode ${name} message ${length} bytes`, - fn: () => { + it(`decode ${name} message ${length} bytes`, async () => { + await runBenchmark(`decode ${name} message ${length} bytes`, () => { for (let i = 0; i < runsFactor; i++) { RPC.decode(bytes) } - }, - runsFactor + }, runsFactor) }) - itBench({ - id: `encode ${name} message ${length} bytes`, - fn: () => { + it(`encode ${name} message ${length} bytes`, async () => { + await runBenchmark(`encode ${name} message ${length} bytes`, () => { for (let i = 0; i < runsFactor; i++) { RPC.encode(rpc) } - }, - runsFactor + }, runsFactor) }) } }) diff --git a/packages/gossipsub/test/benchmark/time-cache.test.ts b/packages/gossipsub/test/benchmark/time-cache.test.ts index ef06d8d2de..ac5d0d8e7f 100644 --- a/packages/gossipsub/test/benchmark/time-cache.test.ts +++ b/packages/gossipsub/test/benchmark/time-cache.test.ts @@ -1,7 +1,7 @@ -import { itBench } from '@dapplion/benchmark' // @ts-expect-error no types import TimeCache from 'time-cache' import { SimpleTimeCache } from '../../src/utils/time-cache.js' +import { runBenchmark } from '../utils/benchmark.js' // TODO: errors with "Error: root suite not found" describe('npm TimeCache vs SimpleTimeCache', () => { @@ -10,12 +10,16 @@ describe('npm TimeCache vs SimpleTimeCache', () => { const simpleTimeCache = new SimpleTimeCache({ validityMs: 1000 }) for (const iteration of iterations) { - itBench(`npm TimeCache.put x${iteration}`, () => { - for (let j = 0; j < iteration; j++) { timeCache.put(String(j)) } + it(`npm TimeCache.put x${iteration}`, async () => { + await runBenchmark(`npm TimeCache.put x${iteration}`, () => { + for (let j = 0; j < iteration; j++) { timeCache.put(String(j)) } + }) }) - itBench(`SimpleTimeCache.put x${iteration}`, () => { - for (let j = 0; j < iteration; j++) { simpleTimeCache.put(String(j), true) } + it(`SimpleTimeCache.put x${iteration}`, async () => { + await runBenchmark(`SimpleTimeCache.put x${iteration}`, () => { + for (let j = 0; j < iteration; j++) { simpleTimeCache.put(String(j), true) } + }) }) } }) diff --git a/packages/gossipsub/test/utils/benchmark.ts b/packages/gossipsub/test/utils/benchmark.ts new file mode 100644 index 0000000000..552868d116 --- /dev/null +++ b/packages/gossipsub/test/utils/benchmark.ts @@ -0,0 +1,26 @@ +import Benchmark from 'benchmark' + +export async function runBenchmark (name: string, fn: () => void | Promise, runsFactor = 1): Promise { + await new Promise((resolve, reject) => { + new Benchmark(name, { + defer: true, + initCount: 1, + maxTime: 1, + minSamples: 1, + minTime: 0.1, + fn (deferred: Benchmark.Deferred) { + Promise.resolve() + .then(fn) + .then(() => { deferred.resolve() }) + .catch(reject) + } + }) + .on('complete', function (this: Benchmark) { + const hz = this.hz * runsFactor + process.stdout.write(` ${name}: ${hz.toFixed(4)} ops/s ${(1000 / hz).toFixed(6)} ms/op ${this.count} runs\n`) + resolve() + }) + .on('error', reject) + .run({ async: true }) + }) +}