Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/gossipsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
"release": "aegir release --no-types",
"build": "aegir build",
"generate": "protons ./src/message/rpc.proto",
"benchmark": "yarn benchmark:files test/benchmark/**/*.test.ts",
"benchmark:files": "NODE_OPTIONS='--max-old-space-size=4096 --loader=ts-node/esm' benchmark --config .benchrc.yaml --defaultBranch master",
"benchmark": "npm run benchmark:files -- 'dist/test/benchmark/**/*.test.js' -- --exit",
"benchmark:files": "NODE_OPTIONS='--max-old-space-size=4096' aegir test -t node --timeout 0 -f",
"test": "aegir test -f dist/test/*.spec.js",
"test:node": "aegir test -t node -f dist/test/*.spec.js -f dist/test/unit/*.spec.js -f dist/test/e2e/*.spec.js",
"test:chrome": "aegir test -f dist/test/*.spec.js -t browser",
Expand Down Expand Up @@ -96,14 +96,15 @@
},
"devDependencies": {
"@chainsafe/as-sha256": "^1.2.0",
"@dapplion/benchmark": "^1.0.0",
"@libp2p/floodsub": "^11.0.22",
"@libp2p/logger": "^6.2.7",
"@libp2p/peer-store": "^12.0.20",
"@types/benchmark": "^2.1.5",
"@types/node": "^22.18.1",
"@types/sinon": "^21.0.1",
"abortable-iterator": "^5.1.0",
"aegir": "^47.0.21",
"benchmark": "^2.1.4",
"datastore-core": "^11.0.1",
"delay": "^7.0.0",
"it-all": "^3.0.6",
Expand Down
48 changes: 20 additions & 28 deletions packages/gossipsub/test/benchmark/asyncIterable.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { itBench } from '@dapplion/benchmark'
import { abortableSource } from 'abortable-iterator'
import all from 'it-all'
import { pipe } from 'it-pipe'
import { runBenchmark } from '../utils/benchmark.js'

describe('abortableSource cost', function () {
const n = 10000
Expand All @@ -16,21 +16,18 @@ describe('abortableSource cost', function () {
}

for (let k = 0; k < 5; k++) {
itBench({
id: `async iterate abortable x${k} bytesSource ${n}`,
beforeEach: () => {
it(`async iterate abortable x${k} bytesSource ${n}`, async () => {
await runBenchmark(`async iterate abortable x${k} bytesSource ${n}`, async () => {
let source = bytesSource()
for (let i = 0; i < k; i++) {
source = abortableSource(source, controller.signal)
}
return source
},
fn: async (source) => {

for await (const chunk of source) {
// eslint-disable-next-line @typescript-eslint/no-unused-expressions
chunk
}
}
})
})
}
})
Expand All @@ -51,35 +48,31 @@ describe('pipe extra iterables cost', function () {
}
}

itBench({
id: `async iterate pipe x0 transforms ${n}`,
fn: async () => {
it(`async iterate pipe x0 transforms ${n}`, async () => {
await runBenchmark(`async iterate pipe x0 transforms ${n}`, async () => {
await pipe(numberSource, all)
}
})
})

itBench({
id: `async iterate pipe x1 transforms ${n}`,
fn: async () => {
it(`async iterate pipe x1 transforms ${n}`, async () => {
await runBenchmark(`async iterate pipe x1 transforms ${n}`, async () => {
await pipe(numberSource, numberTransform, all)
}
})
})

itBench({
id: `async iterate pipe x2 transforms ${n}`,
fn: async () => {
it(`async iterate pipe x2 transforms ${n}`, async () => {
await runBenchmark(`async iterate pipe x2 transforms ${n}`, async () => {
await pipe(
numberSource,
numberTransform,
numberTransform,
all
)
}
})
})

itBench({
id: `async iterate pipe x4 transforms ${n}`,
fn: async () => {
it(`async iterate pipe x4 transforms ${n}`, async () => {
await runBenchmark(`async iterate pipe x4 transforms ${n}`, async () => {
await pipe(
numberSource,
numberTransform,
Expand All @@ -88,12 +81,11 @@ describe('pipe extra iterables cost', function () {
numberTransform,
all
)
}
})
})

itBench({
id: `async iterate pipe x8 transforms ${n}`,
fn: async () => {
it(`async iterate pipe x8 transforms ${n}`, async () => {
await runBenchmark(`async iterate pipe x8 transforms ${n}`, async () => {
await pipe(
numberSource,
numberTransform,
Expand All @@ -106,6 +98,6 @@ describe('pipe extra iterables cost', function () {
numberTransform,
all
)
}
})
})
})
155 changes: 75 additions & 80 deletions packages/gossipsub/test/benchmark/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
import { itBench } from '@dapplion/benchmark'
import { expect } from 'aegir/chai'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { runBenchmark } from '../utils/benchmark.js'
import {
connectPubsubNodes,
createComponentsArray,
denseConnect

} from '../utils/create-pubsub.js'
import { awaitEvents, checkReceivedSubscriptions, checkReceivedSubscription } from '../utils/events.js'
import type { GossipSubAndComponents } from '../utils/create-pubsub.js'

describe('heartbeat', function () {
const topic = 'foobar'
const numTopic = 70
const numPeers = 50
const numPeersPerTopic = 30
let numLoop = 0

const getTopic = (i: number): string => {
return topic + String(i)
Expand Down Expand Up @@ -48,94 +46,91 @@ describe('heartbeat', function () {
* | 0 | 0, 1, 2, 3|
* | 1 | 0, 2, 3, 4|
*/
itBench({
id: 'heartbeat',
before: async () => {
const psubs = await createComponentsArray({
number: numPeers,
init: {
scoreParams: {
IPColocationFactorWeight: 0
},
floodPublish: true,
// TODO: why we need to configure this low score
// probably we should tweak topic score params
// is that why we don't have mesh peers?
scoreThresholds: {
gossipThreshold: -10,
publishThreshold: -100,
graylistThreshold: -1000
}
it('heartbeat', async () => {
const psubs = await createComponentsArray({
number: numPeers,
init: {
scoreParams: {
IPColocationFactorWeight: 0
},
floodPublish: true,
// TODO: why we need to configure this low score
// probably we should tweak topic score params
// is that why we don't have mesh peers?
scoreThresholds: {
gossipThreshold: -10,
publishThreshold: -100,
graylistThreshold: -1000
}
})
}
})

// build the star
await Promise.all(psubs.slice(1).map(async (ps) => connectPubsubNodes(psubs[0], ps)))
await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))
// build the star
await Promise.all(psubs.slice(1).map(async (ps) => connectPubsubNodes(psubs[0], ps)))
await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

await denseConnect(psubs)
await denseConnect(psubs)

// make sure psub 0 has `numPeers - 1` peers
expect(psubs[0].pubsub.getPeers().length).to.be.gte(
numPeers - 1,
`peer 0 should have at least ${numPeers - 1} peers`
)
// make sure psub 0 has `numPeers - 1` peers
expect(psubs[0].pubsub.getPeers().length).to.be.gte(
numPeers - 1,
`peer 0 should have at least ${numPeers - 1} peers`
)

const peerIds = psubs.map((psub) => psub.components.peerId.toString())
for (let topicIndex = 0; topicIndex < numTopic; topicIndex++) {
const topic = getTopic(topicIndex)
psubs.forEach((ps) => { ps.pubsub.subscribe(topic) })
const peerIndices = getTopicPeerIndices(topicIndex)
const peerIdsOnTopic = peerIndices.map((peerIndex) => peerIds[peerIndex])
// peer 0 see all subscriptions from other
const subscription = checkReceivedSubscriptions(psubs[0], peerIdsOnTopic, topic)
// other peers should see the subsription from peer 0 to prevent PublishError.InsufficientPeers error
const otherSubscriptions = peerIndices
.slice(1)
.map((peerIndex) => psubs[peerIndex])
.map(async (psub) => checkReceivedSubscription(psub, peerIds[0], topic, 0))
peerIndices.forEach((peerIndex) => { psubs[peerIndex].pubsub.subscribe(topic) })
await Promise.all([subscription, ...otherSubscriptions])
}
const peerIds = psubs.map((psub) => psub.components.peerId.toString())
for (let topicIndex = 0; topicIndex < numTopic; topicIndex++) {
const topic = getTopic(topicIndex)
psubs.forEach((ps) => { ps.pubsub.subscribe(topic) })
const peerIndices = getTopicPeerIndices(topicIndex)
const peerIdsOnTopic = peerIndices.map((peerIndex) => peerIds[peerIndex])
// peer 0 see all subscriptions from other
const subscription = checkReceivedSubscriptions(psubs[0], peerIdsOnTopic, topic)
// other peers should see the subsription from peer 0 to prevent PublishError.InsufficientPeers error
const otherSubscriptions = peerIndices
.slice(1)
.map((peerIndex) => psubs[peerIndex])
.map(async (psub) => checkReceivedSubscription(psub, peerIds[0], topic, 0))
peerIndices.forEach((peerIndex) => { psubs[peerIndex].pubsub.subscribe(topic) })
await Promise.all([subscription, ...otherSubscriptions])
}

// wait for heartbeats to build mesh
await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 3)))
// wait for heartbeats to build mesh
await Promise.all(psubs.map(async (ps) => awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 3)))

// make sure psubs 0 have at least 10 topic peers and 4 mesh peers for each topic
for (let i = 0; i < numTopic; i++) {
expect((psubs[0].pubsub).getSubscribers(getTopic(i)).length).to.be.gte(
10,
`psub 0: topic ${i} does not have enough topic peers`
)
// make sure psubs 0 have at least 10 topic peers and 4 mesh peers for each topic
for (let i = 0; i < numTopic; i++) {
expect((psubs[0].pubsub).getSubscribers(getTopic(i)).length).to.be.gte(
10,
`psub 0: topic ${i} does not have enough topic peers`
)

expect((psubs[0].pubsub).getMeshPeers(getTopic(i)).length).to.be.gte(
4,
`psub 0: topic ${i} does not have enough mesh peers`
)
}
expect((psubs[0].pubsub).getMeshPeers(getTopic(i)).length).to.be.gte(
4,
`psub 0: topic ${i} does not have enough mesh peers`
)
}

return psubs
},
beforeEach: async (psubs) => {
numLoop++
const msg = `its not a flooooood ${numLoop}`
const promises = []
for (let topicIndex = 0; topicIndex < numTopic; topicIndex++) {
for (const peerIndex of getTopicPeerIndices(topicIndex)) {
promises.push(
psubs[peerIndex].pubsub.publish(
getTopic(topicIndex),
uint8ArrayFromString(psubs[peerIndex].components.peerId.toString() + msg)
)
// publish one round of messages so the measured heartbeat has gossip to emit
const msg = 'its not a flooooood'
const promises = []
for (let topicIndex = 0; topicIndex < numTopic; topicIndex++) {
for (const peerIndex of getTopicPeerIndices(topicIndex)) {
promises.push(
psubs[peerIndex].pubsub.publish(
getTopic(topicIndex),
uint8ArrayFromString(psubs[peerIndex].components.peerId.toString() + msg)
)
}
)
}
await Promise.all(promises)

return psubs[0]
},
fn: async (firstPsub: GossipSubAndComponents) => {
return (firstPsub.pubsub).heartbeat()
}
await Promise.all(promises)

// measure heartbeat() in isolation — the publish round above is excluded from the timing
await runBenchmark('heartbeat', async () => {
await psubs[0].pubsub.heartbeat()
})

// stop the nodes so their heartbeat timers don't keep the process alive
await Promise.allSettled(psubs.map(async (ps) => ps.pubsub.stop()))
})
})
18 changes: 7 additions & 11 deletions packages/gossipsub/test/benchmark/protobuf.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import crypto from 'node:crypto'
import { itBench } from '@dapplion/benchmark'
import { RPC } from '../../src/message/rpc.js'
import { runBenchmark } from '../utils/benchmark.js'

describe('protobuf', function () {
const testCases: Array<{ name: string, length: number }> = [
Expand Down Expand Up @@ -30,24 +30,20 @@ describe('protobuf', function () {

const runsFactor = 1000

itBench({
id: `decode ${name} message ${length} bytes`,
fn: () => {
it(`decode ${name} message ${length} bytes`, async () => {
await runBenchmark(`decode ${name} message ${length} bytes`, () => {
for (let i = 0; i < runsFactor; i++) {
RPC.decode(bytes)
}
},
runsFactor
}, runsFactor)
})

itBench({
id: `encode ${name} message ${length} bytes`,
fn: () => {
it(`encode ${name} message ${length} bytes`, async () => {
await runBenchmark(`encode ${name} message ${length} bytes`, () => {
for (let i = 0; i < runsFactor; i++) {
RPC.encode(rpc)
}
},
runsFactor
}, runsFactor)
})
}
})
14 changes: 9 additions & 5 deletions packages/gossipsub/test/benchmark/time-cache.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { itBench } from '@dapplion/benchmark'
// @ts-expect-error no types
import TimeCache from 'time-cache'
import { SimpleTimeCache } from '../../src/utils/time-cache.js'
import { runBenchmark } from '../utils/benchmark.js'

// TODO: errors with "Error: root suite not found"
describe('npm TimeCache vs SimpleTimeCache', () => {
Expand All @@ -10,12 +10,16 @@ describe('npm TimeCache vs SimpleTimeCache', () => {
const simpleTimeCache = new SimpleTimeCache({ validityMs: 1000 })

for (const iteration of iterations) {
itBench(`npm TimeCache.put x${iteration}`, () => {
for (let j = 0; j < iteration; j++) { timeCache.put(String(j)) }
it(`npm TimeCache.put x${iteration}`, async () => {
await runBenchmark(`npm TimeCache.put x${iteration}`, () => {
for (let j = 0; j < iteration; j++) { timeCache.put(String(j)) }
})
})

itBench(`SimpleTimeCache.put x${iteration}`, () => {
for (let j = 0; j < iteration; j++) { simpleTimeCache.put(String(j), true) }
it(`SimpleTimeCache.put x${iteration}`, async () => {
await runBenchmark(`SimpleTimeCache.put x${iteration}`, () => {
for (let j = 0; j < iteration; j++) { simpleTimeCache.put(String(j), true) }
})
})
}
})
Loading
Loading