From eaae4175e9e1ecbfe8efcddb783ffea098579405 Mon Sep 17 00:00:00 2001 From: dozyio Date: Tue, 31 Mar 2026 04:49:41 +0100 Subject: [PATCH 01/10] fix: kad getClosetPeer --- packages/kad-dht/src/query/manager.ts | 27 +++++++++++++----------- packages/kad-dht/src/query/query-path.ts | 27 ++++++++++-------------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index 681c682fcf..678591c9b8 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -220,23 +220,26 @@ export class QueryManager implements Startable { log.error('query error - %e', event.error) } + signal.throwIfAborted() + yield event + if (event.name === 'PEER_RESPONSE') { for (const peer of [...event.closer, ...event.providers]) { - // eslint-disable-next-line max-depth - if (!(await this.connectionManager.isDialable(peer.multiaddrs, { - signal - }))) { - continue - } - - await this.routingTable.add(peer.id, { - signal + void (async () => { + if (!(await this.connectionManager.isDialable(peer.multiaddrs, { + signal + }))) { + return + } + + await this.routingTable.add(peer.id, { + signal + }) + })().catch(err => { + log.error('could not update routing table from peer response - %e', err) }) } } - - signal.throwIfAborted() - yield event } queryFinished = true diff --git a/packages/kad-dht/src/query/query-path.ts b/packages/kad-dht/src/query/query-path.ts index 978d173bdc..951dd2a522 100644 --- a/packages/kad-dht/src/query/query-path.ts +++ b/packages/kad-dht/src/query/query-path.ts @@ -77,7 +77,7 @@ interface QueryQueueOptions extends AbortOptions { * every peer encountered that we have not seen before */ export async function * queryPath (options: QueryPathOptions): AsyncGenerator { - const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal } = options + const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, signal } = options const events = pushable({ objectMode: true }) @@ -146,6 +146,16 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator Date: Tue, 31 Mar 2026 04:57:12 +0100 Subject: [PATCH 02/10] dialability and routing-table updates --- packages/kad-dht/src/query/manager.ts | 48 ++++++++++++++++++------ packages/kad-dht/src/query/query-path.ts | 9 ++++- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index 678591c9b8..4c6c225ef5 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -1,4 +1,4 @@ -import { createScalableCuckooFilter } from '@libp2p/utils' +import { createScalableCuckooFilter, Queue } from '@libp2p/utils' import { anySignal } from 'any-signal' import merge from 'it-merge' import { setMaxListeners } from 'main-event' @@ -143,6 +143,10 @@ export class QueryManager implements Startable { // query a subset of peers up to `kBucketSize / 2` in length let queryFinished = false + const routingUpdateQueue = new Queue({ + concurrency: Math.max(1, Math.min(this.alpha * 2, 16)) + }) + const routingUpdatePeers = new Set() try { if (this.routingTable.size === 0 && !this.allowQueryWithZeroPeers) { @@ -225,17 +229,30 @@ export class QueryManager implements Startable { if (event.name === 'PEER_RESPONSE') { for (const peer of [...event.closer, ...event.providers]) { - void (async () => { - if (!(await this.connectionManager.isDialable(peer.multiaddrs, { - signal - }))) { - return + const peerId = peer.id.toString() + + if (routingUpdatePeers.has(peerId)) { + continue + } + + routingUpdatePeers.add(peerId) + + void routingUpdateQueue.add(async () => { + try { + if (!(await this.connectionManager.isDialable(peer.multiaddrs, { + signal + }))) { + return + } + + await this.routingTable.add(peer.id, { + signal + }) + } finally { + routingUpdatePeers.delete(peerId) } - - await this.routingTable.add(peer.id, { - signal - }) - })().catch(err => { + }).catch(err => { + routingUpdatePeers.delete(peerId) log.error('could not update routing table from peer response - %e', err) }) } @@ -252,6 +269,15 @@ export class QueryManager implements Startable { if (!queryFinished) { log('query exited early') queryEarlyExitController.abort() + routingUpdateQueue.abort() + } else { + try { + await routingUpdateQueue.onIdle({ + signal: AbortSignal.timeout(1000) + }) + } catch { + + } } signal.clear() diff --git a/packages/kad-dht/src/query/query-path.ts b/packages/kad-dht/src/query/query-path.ts index 951dd2a522..75a2061266 100644 --- a/packages/kad-dht/src/query/query-path.ts +++ b/packages/kad-dht/src/query/query-path.ts @@ -77,7 +77,7 @@ interface QueryQueueOptions extends AbortOptions { * every peer encountered that we have not seen before */ export async function * queryPath (options: QueryPathOptions): AsyncGenerator { - const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, signal } = options + const { key, startingPeers, ourPeerId, query, alpha, path, numPaths, log, peersSeen, connectionManager, signal } = options const events = pushable({ objectMode: true }) @@ -169,6 +169,13 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator Date: Tue, 31 Mar 2026 19:46:27 +0100 Subject: [PATCH 03/10] chore: fix linting issues --- packages/kad-dht/src/query/manager.ts | 2 +- packages/kad-dht/src/query/query-path.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index 4c6c225ef5..eabbeea2c4 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -231,7 +231,7 @@ export class QueryManager implements Startable { for (const peer of [...event.closer, ...event.providers]) { const peerId = peer.id.toString() - if (routingUpdatePeers.has(peerId)) { + if (routingUpdatePeers.has(peerId)) { // eslint-disable-line max-depth continue } diff --git a/packages/kad-dht/src/query/query-path.ts b/packages/kad-dht/src/query/query-path.ts index 75a2061266..18766cf1be 100644 --- a/packages/kad-dht/src/query/query-path.ts +++ b/packages/kad-dht/src/query/query-path.ts @@ -169,9 +169,9 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator Date: Fri, 3 Apr 2026 00:35:48 +0100 Subject: [PATCH 04/10] global routing update queue --- packages/kad-dht/src/index.ts | 15 ++ packages/kad-dht/src/kad-dht.ts | 13 +- packages/kad-dht/src/peer-routing/index.ts | 8 +- packages/kad-dht/src/query/manager.ts | 178 +++++++++---- packages/kad-dht/test/kad-dht.spec.ts | 52 ++++ packages/kad-dht/test/query.spec.ts | 277 +++++++++++++++++++++ 6 files changed, 491 insertions(+), 52 deletions(-) diff --git a/packages/kad-dht/src/index.ts b/packages/kad-dht/src/index.ts index 16c2409da1..30737ddc5b 100644 --- a/packages/kad-dht/src/index.ts +++ b/packages/kad-dht/src/index.ts @@ -500,6 +500,21 @@ export interface KadDHTInit { */ disjointPaths?: number + /** + * Concurrency for background routing table updates from query responses. + * + * @default min(alpha * 2, 16) + */ + routingUpdateQueueConcurrency?: number + + /** + * Minimum time in ms between background routing table update attempts for + * the same responding peer. + * + * @default 30_000 + */ + routingUpdatePeerTtl?: number + /** * How many bits of the KAD-ID of peers to use when creating the routing * table. diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index 5fb8b0d89b..126edf8939 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -228,6 +228,8 @@ export class KadDHT extends TypedEventEmitter implements Ka this.queryManager = new QueryManager(components, { disjointPaths: this.d, alpha: this.a, + routingUpdateQueueConcurrency: init.routingUpdateQueueConcurrency, + routingUpdatePeerTtl: init.routingUpdatePeerTtl, logPrefix, metricsPrefix, initialQuerySelfHasRun, @@ -404,14 +406,9 @@ export class KadDHT extends TypedEventEmitter implements Ka const signal = AbortSignal.timeout(this.onPeerConnectTimeout) setMaxListeners(Infinity, signal) - - try { - await this.routingTable.add(peerData.id, { - signal - }) - } catch (err: any) { - this.log.error('could not add %p to routing table - %e', peerData.id, err) - } + this.queryManager.queueRoutingTableUpdate(peerData.id, { + signal + }) } /** diff --git a/packages/kad-dht/src/peer-routing/index.ts b/packages/kad-dht/src/peer-routing/index.ts index 76e0196165..19d56ffeba 100644 --- a/packages/kad-dht/src/peer-routing/index.ts +++ b/packages/kad-dht/src/peer-routing/index.ts @@ -275,9 +275,15 @@ export class PeerRouting { continue } + if (!(await this.components.connectionManager.isDialable(peer.multiaddrs, { + signal: options.signal + }))) { + continue + } + yield finalPeerEvent({ from: this.components.peerId, - peer: await self.components.peerStore.getInfo(peer.id, options), + peer, path: { index: path.index, queued: 0, diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index eabbeea2c4..4b397c393e 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -13,7 +13,7 @@ import { queryPath } from './query-path.js' import type { QueryFunc } from './types.js' import type { QueryEvent } from '../index.js' import type { RoutingTable } from '../routing-table/index.js' -import type { ComponentLogger, Metrics, PeerId, RoutingOptions, Startable } from '@libp2p/interface' +import type { AbortOptions, ComponentLogger, Metrics, PeerId, RoutingOptions, Startable } from '@libp2p/interface' import type { ConnectionManager } from '@libp2p/interface-internal' import type { DeferredPromise } from 'p-defer' @@ -26,6 +26,8 @@ export interface QueryManagerInit { metricsPrefix: string disjointPaths?: number alpha?: number + routingUpdateQueueConcurrency?: number + routingUpdatePeerTtl?: number initialQuerySelfHasRun: DeferredPromise allowQueryWithZeroPeers?: boolean routingTable: RoutingTable @@ -54,9 +56,23 @@ export class QueryManager implements Startable { private readonly peerId: PeerId private readonly connectionManager: ConnectionManager private readonly routingTable: RoutingTable + private readonly routingUpdateQueueConcurrency: number + private readonly routingUpdatePeerTtl: number + private readonly routingUpdateRecent: Map + private readonly routingUpdateInFlight: Set private initialQuerySelfHasRun?: DeferredPromise private readonly logPrefix: string private readonly allowQueryWithZeroPeers: boolean + private routingUpdateQueue?: Queue + private routingUpdateStats: { + enqueued: number + deduped: number + completed: number + failed: number + aborted: number + cancelledBeforeStart: number + ttlSkipped: number + } constructor (components: QueryManagerComponents, init: QueryManagerInit) { this.logPrefix = init.logPrefix @@ -64,10 +80,24 @@ export class QueryManager implements Startable { this.alpha = init.alpha ?? ALPHA this.initialQuerySelfHasRun = init.initialQuerySelfHasRun this.routingTable = init.routingTable + this.routingUpdateQueueConcurrency = init.routingUpdateQueueConcurrency ?? Math.max(1, Math.min(this.alpha * 2, 16)) + this.routingUpdatePeerTtl = init.routingUpdatePeerTtl ?? 30_000 + this.routingUpdateRecent = new Map() + this.routingUpdateInFlight = new Set() this.logger = components.logger this.peerId = components.peerId this.connectionManager = components.connectionManager this.allowQueryWithZeroPeers = init.allowQueryWithZeroPeers ?? false + const routingUpdateStats = { + enqueued: 0, + deduped: 0, + completed: 0, + failed: 0, + aborted: 0, + cancelledBeforeStart: 0, + ttlSkipped: 0 + } + this.routingUpdateStats = routingUpdateStats // allow us to stop queries on shut down this.shutDownController = new AbortController() @@ -77,10 +107,102 @@ export class QueryManager implements Startable { this.running = false } + getRoutingUpdateQueueStats (): { + queued: number + running: number + total: number + enqueued: number + deduped: number + completed: number + failed: number + aborted: number + cancelledBeforeStart: number + ttlSkipped: number + } { + return { + queued: this.routingUpdateQueue?.queued ?? 0, + running: this.routingUpdateQueue?.running ?? 0, + total: this.routingUpdateQueue?.size ?? 0, + ...this.routingUpdateStats + } + } + isStarted (): boolean { return this.running } + queueRoutingTableUpdate (peerId: PeerId, options: AbortOptions = {}): void { + const queue = this.routingUpdateQueue + + if (queue == null) { + return + } + + const peerIdStr = peerId.toString() + const now = Date.now() + + this.pruneRoutingUpdateRecent(now) + + const updateAllowedAt = this.routingUpdateRecent.get(peerIdStr) + if (updateAllowedAt != null && updateAllowedAt > now) { + this.routingUpdateStats.ttlSkipped++ + return + } + + if (this.routingUpdateInFlight.has(peerIdStr)) { + this.routingUpdateStats.deduped++ + return + } + + this.routingUpdateInFlight.add(peerIdStr) + this.routingUpdateRecent.set(peerIdStr, now + this.routingUpdatePeerTtl) + this.routingUpdateStats.enqueued++ + + void queue.add(async () => { + const signal = options.signal == null + ? this.shutDownController.signal + : anySignal([this.shutDownController.signal, options.signal]) + + setMaxListeners(Infinity, signal) + + try { + await this.routingTable.add(peerId, { + signal + }) + this.routingUpdateStats.completed++ + } catch (err: any) { + if (signal.aborted || err?.name === 'AbortError') { + this.routingUpdateStats.aborted++ + return + } + + this.routingUpdateStats.failed++ + throw err + } finally { + this.routingUpdateInFlight.delete(peerIdStr) + + if (options.signal != null && 'clear' in signal) { + (signal as any).clear() + } + } + }).catch(err => { + this.routingUpdateInFlight.delete(peerIdStr) + this.logger.forComponent(`${this.logPrefix}:routing-update`).error('could not update routing table for peer %p - %e', peerId, err) + }) + } + + private pruneRoutingUpdateRecent (now: number): void { + if (this.routingUpdateRecent.size < 4096) { + return + } + + for (const [peerId, expiresAt] of this.routingUpdateRecent.entries()) { + if (expiresAt <= now) { + this.routingUpdateRecent.delete(peerId) + } + } + } + /** * Starts the query manager */ @@ -95,6 +217,10 @@ export class QueryManager implements Startable { this.shutDownController = new AbortController() // make sure we don't make a lot of noise in the logs setMaxListeners(Infinity, this.shutDownController.signal) + + this.routingUpdateQueue = new Queue({ + concurrency: this.routingUpdateQueueConcurrency + }) } /** @@ -103,6 +229,13 @@ export class QueryManager implements Startable { async stop (): Promise { this.running = false + if (this.routingUpdateQueue != null) { + this.routingUpdateStats.cancelledBeforeStart += this.routingUpdateQueue.queued + this.routingUpdateQueue.abort() + this.routingUpdateQueue = undefined + } + this.routingUpdateInFlight.clear() + this.shutDownController.abort() } @@ -143,11 +276,6 @@ export class QueryManager implements Startable { // query a subset of peers up to `kBucketSize / 2` in length let queryFinished = false - const routingUpdateQueue = new Queue({ - concurrency: Math.max(1, Math.min(this.alpha * 2, 16)) - }) - const routingUpdatePeers = new Set() - try { if (this.routingTable.size === 0 && !this.allowQueryWithZeroPeers) { log('routing table was empty, waiting for some peers before running%s query', options.isSelfQuery === true ? ' self' : '') @@ -228,34 +356,7 @@ export class QueryManager implements Startable { yield event if (event.name === 'PEER_RESPONSE') { - for (const peer of [...event.closer, ...event.providers]) { - const peerId = peer.id.toString() - - if (routingUpdatePeers.has(peerId)) { // eslint-disable-line max-depth - continue - } - - routingUpdatePeers.add(peerId) - - void routingUpdateQueue.add(async () => { - try { - if (!(await this.connectionManager.isDialable(peer.multiaddrs, { - signal - }))) { - return - } - - await this.routingTable.add(peer.id, { - signal - }) - } finally { - routingUpdatePeers.delete(peerId) - } - }).catch(err => { - routingUpdatePeers.delete(peerId) - log.error('could not update routing table from peer response - %e', err) - }) - } + this.queueRoutingTableUpdate(event.from) } } @@ -269,15 +370,6 @@ export class QueryManager implements Startable { if (!queryFinished) { log('query exited early') queryEarlyExitController.abort() - routingUpdateQueue.abort() - } else { - try { - await routingUpdateQueue.onIdle({ - signal: AbortSignal.timeout(1000) - }) - } catch { - - } } signal.clear() diff --git a/packages/kad-dht/test/kad-dht.spec.ts b/packages/kad-dht/test/kad-dht.spec.ts index 6afb0dbaf2..a32f1dcb11 100644 --- a/packages/kad-dht/test/kad-dht.spec.ts +++ b/packages/kad-dht/test/kad-dht.spec.ts @@ -3,11 +3,13 @@ import { Libp2pRecord } from '@libp2p/record' import { expect } from 'aegir/chai' +import delay from 'delay' import all from 'it-all' import drain from 'it-drain' import filter from 'it-filter' import last from 'it-last' import sinon from 'sinon' +import { multiaddr } from '@multiformats/multiaddr' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { MessageType } from '../src/index.js' import { peerResponseEvent } from '../src/query/events.js' @@ -111,6 +113,56 @@ describe('KadDHT', () => { await dht.dht.stop() }) + + it('should enqueue peer connect routing updates via query manager', async () => { + const dht = await testDHT.spawn(undefined, false) + + const queueRoutingTableUpdateSpy = sinon.spy((dht.dht as any).queryManager, 'queueRoutingTableUpdate') + + await dht.dht.onPeerConnect({ + id: peerIds[0].peerId, + multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/1234')] + }) + + expect(queueRoutingTableUpdateSpy).to.have.property('calledOnce', true) + expect(queueRoutingTableUpdateSpy.firstCall.args[0].toString()).to.equal(peerIds[0].peerId.toString()) + }) + + it('should dedupe onPeerConnect routing updates via ttl', async () => { + const dht = await testDHT.spawn({ + routingUpdatePeerTtl: 60_000, + routingUpdateQueueConcurrency: 1 + }, false) + + await dht.dht.start() + + const routingTableAddSpy = sinon.spy((dht.dht as any).routingTable, 'add') + + await dht.dht.onPeerConnect({ + id: peerIds[1].peerId, + multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/2234')] + }) + + await dht.dht.onPeerConnect({ + id: peerIds[1].peerId, + multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/2234')] + }) + + for (let i = 0; i < 40; i++) { + const stats = (dht.dht as any).queryManager.getRoutingUpdateQueueStats() + + if (stats.ttlSkipped >= 1 && stats.completed >= 1) { + break + } + + await delay(10) + } + + const stats = (dht.dht as any).queryManager.getRoutingUpdateQueueStats() + + expect(routingTableAddSpy.callCount).to.equal(1) + expect(stats.ttlSkipped).to.be.greaterThan(0) + }) }) describe('content fetching', () => { diff --git a/packages/kad-dht/test/query.spec.ts b/packages/kad-dht/test/query.spec.ts index d0c7c522f9..51fbf5f3ac 100644 --- a/packages/kad-dht/test/query.spec.ts +++ b/packages/kad-dht/test/query.spec.ts @@ -998,4 +998,281 @@ describe('QueryManager', () => { await manager.stop() }) + + it('should apply routing update ttl across queries', async () => { + routingTable.add.resetHistory() + routingTable.add.resetBehavior() + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager: stubInterface({ + isDialable: async () => true + }) + }, { + ...defaultInit(), + disjointPaths: 1, + alpha: 1, + routingUpdateQueueConcurrency: 1, + routingUpdatePeerTtl: 60_000 + }) + await manager.start() + + const queryFunc: QueryFunc = async function * ({ peer, path }) { + yield peerResponseEvent({ + from: peer.id, + messageType: MessageType.GET_VALUE, + path + }) + } + + routingTable.closestPeers.returns([peers[0].peerId]) + + await all(manager.run(key, queryFunc)) + await all(manager.run(key, queryFunc)) + + for (let i = 0; i < 40; i++) { + const stats = manager.getRoutingUpdateQueueStats() + + if (stats.completed >= 1) { + break + } + + await delay(10) + } + + const stats = manager.getRoutingUpdateQueueStats() + + expect(routingTable.add.calledOnce).to.be.true() + expect(stats.enqueued).to.equal(1) + expect(stats.ttlSkipped).to.equal(1) + + await manager.stop() + }) + + it('should dedupe routing table updates while a peer update is in flight', async () => { + routingTable.add.resetHistory() + routingTable.add.resetBehavior() + + routingTable.add.callsFake(async () => { + await delay(25) + }) + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager: stubInterface({ + isDialable: async () => true + }) + }, { + ...defaultInit(), + routingUpdateQueueConcurrency: 1, + routingUpdatePeerTtl: 0 + }) + await manager.start() + + manager.queueRoutingTableUpdate(peers[0].peerId) + manager.queueRoutingTableUpdate(peers[0].peerId) + + for (let i = 0; i < 40; i++) { + const stats = manager.getRoutingUpdateQueueStats() + + if (stats.completed >= 1) { + break + } + + await delay(10) + } + + const stats = manager.getRoutingUpdateQueueStats() + + expect(routingTable.add.calledOnce).to.be.true() + expect(stats.deduped).to.equal(1) + expect(stats.ttlSkipped).to.equal(0) + + await manager.stop() + }) + + it('should count queued routing updates cancelled on stop', async () => { + routingTable.add.resetHistory() + routingTable.add.resetBehavior() + + routingTable.add.callsFake(async () => { + await delay(100) + }) + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager: stubInterface({ + isDialable: async () => true + }) + }, { + ...defaultInit(), + routingUpdateQueueConcurrency: 1, + routingUpdatePeerTtl: 0 + }) + await manager.start() + + manager.queueRoutingTableUpdate(peers[0].peerId) + manager.queueRoutingTableUpdate(peers[1].peerId) + manager.queueRoutingTableUpdate(peers[2].peerId) + + await delay(10) + await manager.stop() + + const stats = manager.getRoutingUpdateQueueStats() + + expect(stats.cancelledBeforeStart).to.be.greaterThan(0) + }) + + it('should enqueue a routing update again after ttl expires', async () => { + routingTable.add.resetHistory() + routingTable.add.resetBehavior() + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager: stubInterface({ + isDialable: async () => true + }) + }, { + ...defaultInit(), + routingUpdateQueueConcurrency: 1, + routingUpdatePeerTtl: 5 + }) + await manager.start() + + manager.queueRoutingTableUpdate(peers[0].peerId) + + for (let i = 0; i < 40; i++) { + const stats = manager.getRoutingUpdateQueueStats() + + if (stats.completed >= 1) { + break + } + + await delay(10) + } + + await delay(10) + manager.queueRoutingTableUpdate(peers[0].peerId) + + for (let i = 0; i < 40; i++) { + const stats = manager.getRoutingUpdateQueueStats() + + if (stats.completed >= 2) { + break + } + + await delay(10) + } + + const stats = manager.getRoutingUpdateQueueStats() + + expect(routingTable.add.calledTwice).to.be.true() + expect(stats.enqueued).to.equal(2) + + await manager.stop() + }) + + it('should honor routing update queue concurrency', async () => { + routingTable.add.resetHistory() + routingTable.add.resetBehavior() + + let running = 0 + let maxRunning = 0 + + routingTable.add.callsFake(async () => { + running++ + maxRunning = Math.max(maxRunning, running) + + try { + await delay(20) + } finally { + running-- + } + }) + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager: stubInterface({ + isDialable: async () => true + }) + }, { + ...defaultInit(), + routingUpdateQueueConcurrency: 1, + routingUpdatePeerTtl: 0 + }) + await manager.start() + + manager.queueRoutingTableUpdate(peers[0].peerId) + manager.queueRoutingTableUpdate(peers[1].peerId) + manager.queueRoutingTableUpdate(peers[2].peerId) + + for (let i = 0; i < 80; i++) { + const stats = manager.getRoutingUpdateQueueStats() + + if (stats.completed >= 3) { + break + } + + await delay(10) + } + + expect(routingTable.add.callCount).to.equal(3) + expect(maxRunning).to.equal(1) + + await manager.stop() + }) + + it('should track aborted routing updates from update signal', async () => { + routingTable.add.resetHistory() + routingTable.add.resetBehavior() + + routingTable.add.callsFake(async (_peer, options: any) => { + await delay(10) + + if (options?.signal?.aborted === true) { + const err = new Error('aborted') + ;(err as any).name = 'AbortError' + throw err + } + }) + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager: stubInterface({ + isDialable: async () => true + }) + }, { + ...defaultInit(), + routingUpdateQueueConcurrency: 1, + routingUpdatePeerTtl: 0 + }) + await manager.start() + + manager.queueRoutingTableUpdate(peers[0].peerId, { + signal: AbortSignal.timeout(1) + }) + + for (let i = 0; i < 40; i++) { + const stats = manager.getRoutingUpdateQueueStats() + + if (stats.aborted >= 1) { + break + } + + await delay(10) + } + + const stats = manager.getRoutingUpdateQueueStats() + + expect(stats.aborted).to.equal(1) + expect(stats.failed).to.equal(0) + + await manager.stop() + }) }) From ac3a76a2c0e7e9f82dd1a444b643f4a779edcc84 Mon Sep 17 00:00:00 2001 From: dozyio Date: Fri, 3 Apr 2026 01:07:26 +0100 Subject: [PATCH 05/10] chore: lint fix --- packages/kad-dht/test/kad-dht.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kad-dht/test/kad-dht.spec.ts b/packages/kad-dht/test/kad-dht.spec.ts index a32f1dcb11..8861f172f2 100644 --- a/packages/kad-dht/test/kad-dht.spec.ts +++ b/packages/kad-dht/test/kad-dht.spec.ts @@ -2,6 +2,7 @@ /* eslint max-nested-callbacks: ["error", 8] */ import { Libp2pRecord } from '@libp2p/record' +import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import delay from 'delay' import all from 'it-all' @@ -9,7 +10,6 @@ import drain from 'it-drain' import filter from 'it-filter' import last from 'it-last' import sinon from 'sinon' -import { multiaddr } from '@multiformats/multiaddr' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { MessageType } from '../src/index.js' import { peerResponseEvent } from '../src/query/events.js' From c29afddd03d98068d22fc4995c9306c8bebcc96a Mon Sep 17 00:00:00 2001 From: dozyio Date: Fri, 3 Apr 2026 03:12:01 +0100 Subject: [PATCH 06/10] clear signal --- packages/kad-dht/src/kad-dht.ts | 4 +++- packages/kad-dht/src/query/manager.ts | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index 126edf8939..59680d893c 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -196,7 +196,8 @@ export class KadDHT extends TypedEventEmitter implements Ka this.network = new Network(components, { protocol: this.protocol, logPrefix, - metricsPrefix + metricsPrefix, + timeout: init.networkDialTimeout }) this.routingTable = new RoutingTable(components, { @@ -406,6 +407,7 @@ export class KadDHT extends TypedEventEmitter implements Ka const signal = AbortSignal.timeout(this.onPeerConnectTimeout) setMaxListeners(Infinity, signal) + this.queryManager.queueRoutingTableUpdate(peerData.id, { signal }) diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index 4b397c393e..d05377a4e5 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -181,7 +181,7 @@ export class QueryManager implements Startable { } finally { this.routingUpdateInFlight.delete(peerIdStr) - if (options.signal != null && 'clear' in signal) { + if ('clear' in signal) { (signal as any).clear() } } From 69ec4179f85a49fb8ee7a9412acb6ec6f1d5a8b6 Mon Sep 17 00:00:00 2001 From: dozyio Date: Fri, 10 Apr 2026 11:23:43 +0100 Subject: [PATCH 07/10] wip --- packages/kad-dht/src/index.ts | 4 +- packages/kad-dht/src/kad-dht.ts | 8 +- packages/kad-dht/src/peer-routing/index.ts | 40 +-- packages/kad-dht/src/query/manager.ts | 130 +-------- packages/kad-dht/src/query/query-path.ts | 20 +- packages/kad-dht/src/routing-table/index.ts | 124 +++++++++ packages/kad-dht/test/kad-dht.spec.ts | 8 +- packages/kad-dht/test/query.spec.ts | 276 +++----------------- 8 files changed, 197 insertions(+), 413 deletions(-) diff --git a/packages/kad-dht/src/index.ts b/packages/kad-dht/src/index.ts index 30737ddc5b..30c9c0a1d4 100644 --- a/packages/kad-dht/src/index.ts +++ b/packages/kad-dht/src/index.ts @@ -505,7 +505,7 @@ export interface KadDHTInit { * * @default min(alpha * 2, 16) */ - routingUpdateQueueConcurrency?: number + routingTableUpdateQueueConcurrency?: number /** * Minimum time in ms between background routing table update attempts for @@ -513,7 +513,7 @@ export interface KadDHTInit { * * @default 30_000 */ - routingUpdatePeerTtl?: number + routingTableUpdatePeerTtl?: number /** * How many bits of the KAD-ID of peers to use when creating the routing diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index 59680d893c..ad47b964de 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -213,7 +213,9 @@ export class KadDHT extends TypedEventEmitter implements Ka metricsPrefix, prefixLength: init.prefixLength, splitThreshold: init.kBucketSplitThreshold, - network: this.network + network: this.network, + routingTableUpdateQueueConcurrency: init.routingTableUpdateQueueConcurrency ?? Math.max(1, Math.min(this.a * 2, 16)), + routingTableUpdatePeerTtl: init.routingTableUpdatePeerTtl }) // all queries should wait for the initial query-self query to run so we have @@ -229,8 +231,6 @@ export class KadDHT extends TypedEventEmitter implements Ka this.queryManager = new QueryManager(components, { disjointPaths: this.d, alpha: this.a, - routingUpdateQueueConcurrency: init.routingUpdateQueueConcurrency, - routingUpdatePeerTtl: init.routingUpdatePeerTtl, logPrefix, metricsPrefix, initialQuerySelfHasRun, @@ -408,7 +408,7 @@ export class KadDHT extends TypedEventEmitter implements Ka const signal = AbortSignal.timeout(this.onPeerConnectTimeout) setMaxListeners(Infinity, signal) - this.queryManager.queueRoutingTableUpdate(peerData.id, { + this.routingTable.queueRoutingTableUpdate(peerData.id, { signal }) } diff --git a/packages/kad-dht/src/peer-routing/index.ts b/packages/kad-dht/src/peer-routing/index.ts index 19d56ffeba..ce10abb290 100644 --- a/packages/kad-dht/src/peer-routing/index.ts +++ b/packages/kad-dht/src/peer-routing/index.ts @@ -21,7 +21,6 @@ import type { QueryFunc } from '../query/types.js' import type { RoutingTable } from '../routing-table/index.js' import type { GetClosestPeersOptions } from '../routing-table/k-bucket.ts' import type { ComponentLogger, Logger, Metrics, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface' -import type { ConnectionManager } from '@libp2p/interface-internal' import type { AbortOptions } from 'it-pushable' export interface PeerRoutingComponents { @@ -29,7 +28,6 @@ export interface PeerRoutingComponents { peerStore: PeerStore logger: ComponentLogger metrics?: Metrics - connectionManager: ConnectionManager } export interface PeerRoutingInit { @@ -265,35 +263,17 @@ export class PeerRouting { this.log('found %d peers close to %b', peers.length, key) - for (let { peer, path } of peers.peers) { - try { - if (peer.multiaddrs.length === 0) { - peer = await self.components.peerStore.getInfo(peer.id, options) - } - - if (peer.multiaddrs.length === 0) { - continue - } - - if (!(await this.components.connectionManager.isDialable(peer.multiaddrs, { - signal: options.signal - }))) { - continue + for (const { peer, path } of peers.peers) { + yield finalPeerEvent({ + from: this.components.peerId, + peer, + path: { + index: path.index, + queued: 0, + running: 0, + total: 0 } - - yield finalPeerEvent({ - from: this.components.peerId, - peer, - path: { - index: path.index, - queued: 0, - running: 0, - total: 0 - } - }, options) - } catch { - continue - } + }, options) } } diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index d05377a4e5..dd0763f403 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -1,4 +1,4 @@ -import { createScalableCuckooFilter, Queue } from '@libp2p/utils' +import { createScalableCuckooFilter } from '@libp2p/utils' import { anySignal } from 'any-signal' import merge from 'it-merge' import { setMaxListeners } from 'main-event' @@ -26,8 +26,6 @@ export interface QueryManagerInit { metricsPrefix: string disjointPaths?: number alpha?: number - routingUpdateQueueConcurrency?: number - routingUpdatePeerTtl?: number initialQuerySelfHasRun: DeferredPromise allowQueryWithZeroPeers?: boolean routingTable: RoutingTable @@ -56,23 +54,9 @@ export class QueryManager implements Startable { private readonly peerId: PeerId private readonly connectionManager: ConnectionManager private readonly routingTable: RoutingTable - private readonly routingUpdateQueueConcurrency: number - private readonly routingUpdatePeerTtl: number - private readonly routingUpdateRecent: Map - private readonly routingUpdateInFlight: Set private initialQuerySelfHasRun?: DeferredPromise private readonly logPrefix: string private readonly allowQueryWithZeroPeers: boolean - private routingUpdateQueue?: Queue - private routingUpdateStats: { - enqueued: number - deduped: number - completed: number - failed: number - aborted: number - cancelledBeforeStart: number - ttlSkipped: number - } constructor (components: QueryManagerComponents, init: QueryManagerInit) { this.logPrefix = init.logPrefix @@ -80,24 +64,10 @@ export class QueryManager implements Startable { this.alpha = init.alpha ?? ALPHA this.initialQuerySelfHasRun = init.initialQuerySelfHasRun this.routingTable = init.routingTable - this.routingUpdateQueueConcurrency = init.routingUpdateQueueConcurrency ?? Math.max(1, Math.min(this.alpha * 2, 16)) - this.routingUpdatePeerTtl = init.routingUpdatePeerTtl ?? 30_000 - this.routingUpdateRecent = new Map() - this.routingUpdateInFlight = new Set() this.logger = components.logger this.peerId = components.peerId this.connectionManager = components.connectionManager this.allowQueryWithZeroPeers = init.allowQueryWithZeroPeers ?? false - const routingUpdateStats = { - enqueued: 0, - deduped: 0, - completed: 0, - failed: 0, - aborted: 0, - cancelledBeforeStart: 0, - ttlSkipped: 0 - } - this.routingUpdateStats = routingUpdateStats // allow us to stop queries on shut down this.shutDownController = new AbortController() @@ -119,90 +89,13 @@ export class QueryManager implements Startable { cancelledBeforeStart: number ttlSkipped: number } { - return { - queued: this.routingUpdateQueue?.queued ?? 0, - running: this.routingUpdateQueue?.running ?? 0, - total: this.routingUpdateQueue?.size ?? 0, - ...this.routingUpdateStats - } + return this.routingTable.getRoutingUpdateQueueStats() } isStarted (): boolean { return this.running } - queueRoutingTableUpdate (peerId: PeerId, options: AbortOptions = {}): void { - const queue = this.routingUpdateQueue - - if (queue == null) { - return - } - - const peerIdStr = peerId.toString() - const now = Date.now() - - this.pruneRoutingUpdateRecent(now) - - const updateAllowedAt = this.routingUpdateRecent.get(peerIdStr) - if (updateAllowedAt != null && updateAllowedAt > now) { - this.routingUpdateStats.ttlSkipped++ - return - } - - if (this.routingUpdateInFlight.has(peerIdStr)) { - this.routingUpdateStats.deduped++ - return - } - - this.routingUpdateInFlight.add(peerIdStr) - this.routingUpdateRecent.set(peerIdStr, now + this.routingUpdatePeerTtl) - this.routingUpdateStats.enqueued++ - - void queue.add(async () => { - const signal = options.signal == null - ? this.shutDownController.signal - : anySignal([this.shutDownController.signal, options.signal]) - - setMaxListeners(Infinity, signal) - - try { - await this.routingTable.add(peerId, { - signal - }) - this.routingUpdateStats.completed++ - } catch (err: any) { - if (signal.aborted || err?.name === 'AbortError') { - this.routingUpdateStats.aborted++ - return - } - - this.routingUpdateStats.failed++ - throw err - } finally { - this.routingUpdateInFlight.delete(peerIdStr) - - if ('clear' in signal) { - (signal as any).clear() - } - } - }).catch(err => { - this.routingUpdateInFlight.delete(peerIdStr) - this.logger.forComponent(`${this.logPrefix}:routing-update`).error('could not update routing table for peer %p - %e', peerId, err) - }) - } - - private pruneRoutingUpdateRecent (now: number): void { - if (this.routingUpdateRecent.size < 4096) { - return - } - - for (const [peerId, expiresAt] of this.routingUpdateRecent.entries()) { - if (expiresAt <= now) { - this.routingUpdateRecent.delete(peerId) - } - } - } - /** * Starts the query manager */ @@ -217,10 +110,6 @@ export class QueryManager implements Startable { this.shutDownController = new AbortController() // make sure we don't make a lot of noise in the logs setMaxListeners(Infinity, this.shutDownController.signal) - - this.routingUpdateQueue = new Queue({ - concurrency: this.routingUpdateQueueConcurrency - }) } /** @@ -229,13 +118,6 @@ export class QueryManager implements Startable { async stop (): Promise { this.running = false - if (this.routingUpdateQueue != null) { - this.routingUpdateStats.cancelledBeforeStart += this.routingUpdateQueue.queued - this.routingUpdateQueue.abort() - this.routingUpdateQueue = undefined - } - this.routingUpdateInFlight.clear() - this.shutDownController.abort() } @@ -352,12 +234,12 @@ export class QueryManager implements Startable { log.error('query error - %e', event.error) } - signal.throwIfAborted() - yield event - if (event.name === 'PEER_RESPONSE') { - this.queueRoutingTableUpdate(event.from) + this.routingTable.queueRoutingTableUpdate(event.from) } + + signal.throwIfAborted() + yield event } queryFinished = true diff --git a/packages/kad-dht/src/query/query-path.ts b/packages/kad-dht/src/query/query-path.ts index 18766cf1be..1329aa4e52 100644 --- a/packages/kad-dht/src/query/query-path.ts +++ b/packages/kad-dht/src/query/query-path.ts @@ -146,16 +146,6 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator implemen routingTableKadBucketMaxOccupancy: Metric kadBucketEvents: CounterGroup<'ping_old_contact' | 'ping_old_contact_error' | 'ping_new_contact' | 'ping_new_contact_error' | 'peer_added' | 'peer_removed'> } + private readonly routingUpdateQueue: PeerQueue + private readonly routingUpdatePeerTtl: number + private readonly routingUpdateRecent: Map + private readonly routingUpdateStats: RoutingTableUpdateQueueStats private shutdownController: AbortController @@ -116,9 +133,26 @@ export class RoutingTable extends TypedEventEmitter implemen this.peerRemoved = this.peerRemoved.bind(this) this.populateFromDatastoreOnStart = init.populateFromDatastoreOnStart ?? POPULATE_FROM_DATASTORE_ON_START this.populateFromDatastoreLimit = init.populateFromDatastoreLimit ?? POPULATE_FROM_DATASTORE_LIMIT + this.routingUpdatePeerTtl = init.routingTableUpdatePeerTtl ?? ROUTING_UPDATE_PEER_TTL + this.routingUpdateRecent = new Map() + this.routingUpdateStats = { + enqueued: 0, + deduped: 0, + completed: 0, + failed: 0, + aborted: 0, + cancelledBeforeStart: 0, + ttlSkipped: 0 + } this.shutdownController = new AbortController() setMaxListeners(Infinity, this.shutdownController.signal) + this.routingUpdateQueue = new PeerQueue({ + concurrency: init.routingTableUpdateQueueConcurrency ?? PING_NEW_CONTACT_CONCURRENCY, + metricName: `${init.metricsPrefix}_routing_update_queue`, + metrics: this.components.metrics + }) + this.pingOldContactQueue = new PeerQueue({ concurrency: init.pingOldContactConcurrency ?? PING_OLD_CONTACT_CONCURRENCY, metricName: `${init.metricsPrefix}_ping_old_contact_queue`, @@ -253,11 +287,101 @@ export class RoutingTable extends TypedEventEmitter implemen async stop (): Promise { this.running = false await stop(this.closestPeerTagger, this.kb) + this.routingUpdateStats.cancelledBeforeStart += this.routingUpdateQueue.queued + this.routingUpdateQueue.abort() this.pingOldContactQueue.abort() this.pingNewContactQueue.abort() this.shutdownController.abort() } + getRoutingUpdateQueueStats (): { + queued: number + running: number + total: number + enqueued: number + deduped: number + completed: number + failed: number + aborted: number + cancelledBeforeStart: number + ttlSkipped: number + } { + return { + queued: this.routingUpdateQueue.queued, + running: this.routingUpdateQueue.running, + total: this.routingUpdateQueue.size, + ...this.routingUpdateStats + } + } + + queueRoutingTableUpdate (peerId: PeerId, options: AbortOptions = {}): void { + const peerIdStr = peerId.toString() + const now = Date.now() + + this.pruneRoutingUpdateRecent(now) + + const updateAllowedAt = this.routingUpdateRecent.get(peerIdStr) + if (updateAllowedAt != null && updateAllowedAt > now) { + this.routingUpdateStats.ttlSkipped++ + return + } + + const existingJob = this.routingUpdateQueue.find(peerId) + + if (existingJob != null) { + this.routingUpdateStats.deduped++ + void existingJob.join(options).catch(() => {}) + return + } + + this.routingUpdateRecent.set(peerIdStr, now + this.routingUpdatePeerTtl) + this.routingUpdateStats.enqueued++ + + void this.routingUpdateQueue.add(async (jobOptions) => { + const signal = jobOptions.signal == null + ? this.shutdownController.signal + : anySignal([this.shutdownController.signal, jobOptions.signal]) + + setMaxListeners(Infinity, signal) + + try { + await this.add(peerId, { + signal + }) + this.routingUpdateStats.completed++ + } catch (err: any) { + if (signal.aborted || err?.name === 'AbortError') { + this.routingUpdateStats.aborted++ + return + } + + this.routingUpdateStats.failed++ + throw err + } finally { + if ('clear' in signal) { + (signal as any).clear() + } + } + }, { + peerId, + signal: options.signal + }).catch(err => { + this.log.error('could not update routing table for peer %p - %e', peerId, err) + }) + } + + private pruneRoutingUpdateRecent (now: number): void { + if (this.routingUpdateRecent.size < 4096) { + return + } + + for (const [peerId, expiresAt] of this.routingUpdateRecent.entries()) { + if (expiresAt <= now) { + this.routingUpdateRecent.delete(peerId) + } + } + } + private async peerAdded (peer: Peer, bucket: LeafBucket, options?: AbortOptions): Promise { if (!this.components.peerId.equals(peer.peerId)) { await this.components.peerStore.merge(peer.peerId, { diff --git a/packages/kad-dht/test/kad-dht.spec.ts b/packages/kad-dht/test/kad-dht.spec.ts index 8861f172f2..42d15d89c7 100644 --- a/packages/kad-dht/test/kad-dht.spec.ts +++ b/packages/kad-dht/test/kad-dht.spec.ts @@ -114,10 +114,10 @@ describe('KadDHT', () => { await dht.dht.stop() }) - it('should enqueue peer connect routing updates via query manager', async () => { + it('should enqueue peer connect routing updates via routing table', async () => { const dht = await testDHT.spawn(undefined, false) - const queueRoutingTableUpdateSpy = sinon.spy((dht.dht as any).queryManager, 'queueRoutingTableUpdate') + const queueRoutingTableUpdateSpy = sinon.spy((dht.dht as any).routingTable, 'queueRoutingTableUpdate') await dht.dht.onPeerConnect({ id: peerIds[0].peerId, @@ -130,8 +130,8 @@ describe('KadDHT', () => { it('should dedupe onPeerConnect routing updates via ttl', async () => { const dht = await testDHT.spawn({ - routingUpdatePeerTtl: 60_000, - routingUpdateQueueConcurrency: 1 + routingTableUpdatePeerTtl: 60_000, + routingTableUpdateQueueConcurrency: 1 }, false) await dht.dht.start() diff --git a/packages/kad-dht/test/query.spec.ts b/packages/kad-dht/test/query.spec.ts index 51fbf5f3ac..c43986efbc 100644 --- a/packages/kad-dht/test/query.spec.ts +++ b/packages/kad-dht/test/query.spec.ts @@ -152,6 +152,20 @@ describe('QueryManager', () => { beforeEach(async () => { routingTable.closestPeers.returns(peers.slice(0, K).map(p => p.peerId)) + routingTable.queueRoutingTableUpdate.resetHistory() + routingTable.getRoutingUpdateQueueStats.resetHistory() + routingTable.getRoutingUpdateQueueStats.returns({ + queued: 0, + running: 0, + total: 0, + enqueued: 0, + deduped: 0, + completed: 0, + failed: 0, + aborted: 0, + cancelledBeforeStart: 0, + ttlSkipped: 0 + }) }) it('does not run queries before start', async () => { @@ -999,10 +1013,7 @@ describe('QueryManager', () => { await manager.stop() }) - it('should apply routing update ttl across queries', async () => { - routingTable.add.resetHistory() - routingTable.add.resetBehavior() - + it('should queue routing table updates from peer response events', async () => { const manager = new QueryManager({ peerId: ourPeerId, logger: defaultLogger(), @@ -1010,12 +1021,10 @@ describe('QueryManager', () => { isDialable: async () => true }) }, { - ...defaultInit(), - disjointPaths: 1, - alpha: 1, - routingUpdateQueueConcurrency: 1, - routingUpdatePeerTtl: 60_000 + ...defaultInit() }) + + routingTable.closestPeers.returns([peers[0].peerId]) await manager.start() const queryFunc: QueryFunc = async function * ({ peer, path }) { @@ -1026,220 +1035,30 @@ describe('QueryManager', () => { }) } - routingTable.closestPeers.returns([peers[0].peerId]) - - await all(manager.run(key, queryFunc)) await all(manager.run(key, queryFunc)) - for (let i = 0; i < 40; i++) { - const stats = manager.getRoutingUpdateQueueStats() - - if (stats.completed >= 1) { - break - } - - await delay(10) - } - - const stats = manager.getRoutingUpdateQueueStats() - - expect(routingTable.add.calledOnce).to.be.true() - expect(stats.enqueued).to.equal(1) - expect(stats.ttlSkipped).to.equal(1) - - await manager.stop() - }) - - it('should dedupe routing table updates while a peer update is in flight', async () => { - routingTable.add.resetHistory() - routingTable.add.resetBehavior() - - routingTable.add.callsFake(async () => { - await delay(25) - }) - - const manager = new QueryManager({ - peerId: ourPeerId, - logger: defaultLogger(), - connectionManager: stubInterface({ - isDialable: async () => true - }) - }, { - ...defaultInit(), - routingUpdateQueueConcurrency: 1, - routingUpdatePeerTtl: 0 - }) - await manager.start() - - manager.queueRoutingTableUpdate(peers[0].peerId) - manager.queueRoutingTableUpdate(peers[0].peerId) - - for (let i = 0; i < 40; i++) { - const stats = manager.getRoutingUpdateQueueStats() - - if (stats.completed >= 1) { - break - } - - await delay(10) - } - - const stats = manager.getRoutingUpdateQueueStats() - - expect(routingTable.add.calledOnce).to.be.true() - expect(stats.deduped).to.equal(1) - expect(stats.ttlSkipped).to.equal(0) - - await manager.stop() - }) - - it('should count queued routing updates cancelled on stop', async () => { - routingTable.add.resetHistory() - routingTable.add.resetBehavior() - - routingTable.add.callsFake(async () => { - await delay(100) - }) - - const manager = new QueryManager({ - peerId: ourPeerId, - logger: defaultLogger(), - connectionManager: stubInterface({ - isDialable: async () => true - }) - }, { - ...defaultInit(), - routingUpdateQueueConcurrency: 1, - routingUpdatePeerTtl: 0 - }) - await manager.start() - - manager.queueRoutingTableUpdate(peers[0].peerId) - manager.queueRoutingTableUpdate(peers[1].peerId) - manager.queueRoutingTableUpdate(peers[2].peerId) - - await delay(10) - await manager.stop() - - const stats = manager.getRoutingUpdateQueueStats() - - expect(stats.cancelledBeforeStart).to.be.greaterThan(0) - }) - - it('should enqueue a routing update again after ttl expires', async () => { - routingTable.add.resetHistory() - routingTable.add.resetBehavior() - - const manager = new QueryManager({ - peerId: ourPeerId, - logger: defaultLogger(), - connectionManager: stubInterface({ - isDialable: async () => true - }) - }, { - ...defaultInit(), - routingUpdateQueueConcurrency: 1, - routingUpdatePeerTtl: 5 - }) - await manager.start() - - manager.queueRoutingTableUpdate(peers[0].peerId) - - for (let i = 0; i < 40; i++) { - const stats = manager.getRoutingUpdateQueueStats() - - if (stats.completed >= 1) { - break - } - - await delay(10) - } - - await delay(10) - manager.queueRoutingTableUpdate(peers[0].peerId) - - for (let i = 0; i < 40; i++) { - const stats = manager.getRoutingUpdateQueueStats() - - if (stats.completed >= 2) { - break - } - - await delay(10) - } - - const stats = manager.getRoutingUpdateQueueStats() - - expect(routingTable.add.calledTwice).to.be.true() - expect(stats.enqueued).to.equal(2) + expect(routingTable.queueRoutingTableUpdate.calledOnce).to.be.true() + expect(routingTable.queueRoutingTableUpdate.firstCall.args[0].toString()).to.equal(peers[0].peerId.toString()) + expect(routingTable.queueRoutingTableUpdate.firstCall.args).to.have.lengthOf(1) await manager.stop() }) - it('should honor routing update queue concurrency', async () => { - routingTable.add.resetHistory() - routingTable.add.resetBehavior() - - let running = 0 - let maxRunning = 0 - - routingTable.add.callsFake(async () => { - running++ - maxRunning = Math.max(maxRunning, running) - - try { - await delay(20) - } finally { - running-- - } - }) - - const manager = new QueryManager({ - peerId: ourPeerId, - logger: defaultLogger(), - connectionManager: stubInterface({ - isDialable: async () => true - }) - }, { - ...defaultInit(), - routingUpdateQueueConcurrency: 1, - routingUpdatePeerTtl: 0 - }) - await manager.start() - - manager.queueRoutingTableUpdate(peers[0].peerId) - manager.queueRoutingTableUpdate(peers[1].peerId) - manager.queueRoutingTableUpdate(peers[2].peerId) - - for (let i = 0; i < 80; i++) { - const stats = manager.getRoutingUpdateQueueStats() - - if (stats.completed >= 3) { - break - } - - await delay(10) + it('should delegate routing update queue stats to the routing table', async () => { + const expectedStats = { + queued: 1, + running: 2, + total: 3, + enqueued: 4, + deduped: 5, + completed: 6, + failed: 7, + aborted: 8, + cancelledBeforeStart: 9, + ttlSkipped: 10 } - expect(routingTable.add.callCount).to.equal(3) - expect(maxRunning).to.equal(1) - - await manager.stop() - }) - - it('should track aborted routing updates from update signal', async () => { - routingTable.add.resetHistory() - routingTable.add.resetBehavior() - - routingTable.add.callsFake(async (_peer, options: any) => { - await delay(10) - - if (options?.signal?.aborted === true) { - const err = new Error('aborted') - ;(err as any).name = 'AbortError' - throw err - } - }) + routingTable.getRoutingUpdateQueueStats.returns(expectedStats) const manager = new QueryManager({ peerId: ourPeerId, @@ -1248,31 +1067,10 @@ describe('QueryManager', () => { isDialable: async () => true }) }, { - ...defaultInit(), - routingUpdateQueueConcurrency: 1, - routingUpdatePeerTtl: 0 + ...defaultInit() }) - await manager.start() - - manager.queueRoutingTableUpdate(peers[0].peerId, { - signal: AbortSignal.timeout(1) - }) - - for (let i = 0; i < 40; i++) { - const stats = manager.getRoutingUpdateQueueStats() - if (stats.aborted >= 1) { - break - } - - await delay(10) - } - - const stats = manager.getRoutingUpdateQueueStats() - - expect(stats.aborted).to.equal(1) - expect(stats.failed).to.equal(0) - - await manager.stop() + expect(manager.getRoutingUpdateQueueStats()).to.deep.equal(expectedStats) + expect(routingTable.getRoutingUpdateQueueStats.calledOnce).to.be.true() }) }) From 373c3c272eaa847a3ddd979a055f17436aa58cd3 Mon Sep 17 00:00:00 2001 From: dozyio Date: Sat, 11 Apr 2026 13:28:40 +0100 Subject: [PATCH 08/10] fix: webrtc NoValidAddressesError - dial realAddr before relayPeer --- .../transport-circuit-relay-v2/src/transport/index.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/transport-circuit-relay-v2/src/transport/index.ts b/packages/transport-circuit-relay-v2/src/transport/index.ts index 3008ec71ec..676ae30018 100644 --- a/packages/transport-circuit-relay-v2/src/transport/index.ts +++ b/packages/transport-circuit-relay-v2/src/transport/index.ts @@ -165,7 +165,16 @@ export class CircuitRelayTransport implements Transport }) options.onProgress?.(new CustomProgressEvent('circuit-relay:open-connection')) - relayConnection = await this.components.connectionManager.openConnection(relayPeer, options) + + try { + // Dial the concrete relay address first. Peer-id dialing can fail with + // NoValidAddressesError in nested relay/WebRTC flows if candidate + // address discovery/filtering returns no dialable addresses. + relayConnection = await this.components.connectionManager.openConnection(relayAddr, options) + } catch (err: any) { + this.log.error('opening direct relay connection to %a failed, falling back to peer dial for %p - %e', relayAddr, relayPeer, err) + relayConnection = await this.components.connectionManager.openConnection(relayPeer, options) + } } else { options.onProgress?.(new CustomProgressEvent('circuit-relay:reuse-connection')) } From 8306cf167ac63dbb288bd85b0db77946dfbf85f9 Mon Sep 17 00:00:00 2001 From: dozyio Date: Sat, 11 Apr 2026 17:06:16 +0100 Subject: [PATCH 09/10] Revert "Merge branch 'fix/kad-getClosestPeers' into fix/webrtc-NoValidAddressesError" This reverts commit cc1ee710a41756eaf35a17ade098831b15617b19, reversing changes made to 373c3c272eaa847a3ddd979a055f17436aa58cd3. --- packages/kad-dht/src/index.ts | 15 --- packages/kad-dht/src/kad-dht.ts | 17 +-- packages/kad-dht/src/peer-routing/index.ts | 34 ++++-- packages/kad-dht/src/query/manager.ts | 31 +++-- packages/kad-dht/src/query/query-path.ts | 4 +- packages/kad-dht/src/routing-table/index.ts | 124 -------------------- packages/kad-dht/test/kad-dht.spec.ts | 52 -------- packages/kad-dht/test/query.spec.ts | 75 ------------ 8 files changed, 48 insertions(+), 304 deletions(-) diff --git a/packages/kad-dht/src/index.ts b/packages/kad-dht/src/index.ts index 30c9c0a1d4..16c2409da1 100644 --- a/packages/kad-dht/src/index.ts +++ b/packages/kad-dht/src/index.ts @@ -500,21 +500,6 @@ export interface KadDHTInit { */ disjointPaths?: number - /** - * Concurrency for background routing table updates from query responses. - * - * @default min(alpha * 2, 16) - */ - routingTableUpdateQueueConcurrency?: number - - /** - * Minimum time in ms between background routing table update attempts for - * the same responding peer. - * - * @default 30_000 - */ - routingTableUpdatePeerTtl?: number - /** * How many bits of the KAD-ID of peers to use when creating the routing * table. diff --git a/packages/kad-dht/src/kad-dht.ts b/packages/kad-dht/src/kad-dht.ts index ad47b964de..5fb8b0d89b 100644 --- a/packages/kad-dht/src/kad-dht.ts +++ b/packages/kad-dht/src/kad-dht.ts @@ -196,8 +196,7 @@ export class KadDHT extends TypedEventEmitter implements Ka this.network = new Network(components, { protocol: this.protocol, logPrefix, - metricsPrefix, - timeout: init.networkDialTimeout + metricsPrefix }) this.routingTable = new RoutingTable(components, { @@ -213,9 +212,7 @@ export class KadDHT extends TypedEventEmitter implements Ka metricsPrefix, prefixLength: init.prefixLength, splitThreshold: init.kBucketSplitThreshold, - network: this.network, - routingTableUpdateQueueConcurrency: init.routingTableUpdateQueueConcurrency ?? Math.max(1, Math.min(this.a * 2, 16)), - routingTableUpdatePeerTtl: init.routingTableUpdatePeerTtl + network: this.network }) // all queries should wait for the initial query-self query to run so we have @@ -408,9 +405,13 @@ export class KadDHT extends TypedEventEmitter implements Ka const signal = AbortSignal.timeout(this.onPeerConnectTimeout) setMaxListeners(Infinity, signal) - this.routingTable.queueRoutingTableUpdate(peerData.id, { - signal - }) + try { + await this.routingTable.add(peerData.id, { + signal + }) + } catch (err: any) { + this.log.error('could not add %p to routing table - %e', peerData.id, err) + } } /** diff --git a/packages/kad-dht/src/peer-routing/index.ts b/packages/kad-dht/src/peer-routing/index.ts index ce10abb290..76e0196165 100644 --- a/packages/kad-dht/src/peer-routing/index.ts +++ b/packages/kad-dht/src/peer-routing/index.ts @@ -21,6 +21,7 @@ import type { QueryFunc } from '../query/types.js' import type { RoutingTable } from '../routing-table/index.js' import type { GetClosestPeersOptions } from '../routing-table/k-bucket.ts' import type { ComponentLogger, Logger, Metrics, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' import type { AbortOptions } from 'it-pushable' export interface PeerRoutingComponents { @@ -28,6 +29,7 @@ export interface PeerRoutingComponents { peerStore: PeerStore logger: ComponentLogger metrics?: Metrics + connectionManager: ConnectionManager } export interface PeerRoutingInit { @@ -263,17 +265,29 @@ export class PeerRouting { this.log('found %d peers close to %b', peers.length, key) - for (const { peer, path } of peers.peers) { - yield finalPeerEvent({ - from: this.components.peerId, - peer, - path: { - index: path.index, - queued: 0, - running: 0, - total: 0 + for (let { peer, path } of peers.peers) { + try { + if (peer.multiaddrs.length === 0) { + peer = await self.components.peerStore.getInfo(peer.id, options) + } + + if (peer.multiaddrs.length === 0) { + continue } - }, options) + + yield finalPeerEvent({ + from: this.components.peerId, + peer: await self.components.peerStore.getInfo(peer.id, options), + path: { + index: path.index, + queued: 0, + running: 0, + total: 0 + } + }, options) + } catch { + continue + } } } diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index dd0763f403..681c682fcf 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -13,7 +13,7 @@ import { queryPath } from './query-path.js' import type { QueryFunc } from './types.js' import type { QueryEvent } from '../index.js' import type { RoutingTable } from '../routing-table/index.js' -import type { AbortOptions, ComponentLogger, Metrics, PeerId, RoutingOptions, Startable } from '@libp2p/interface' +import type { ComponentLogger, Metrics, PeerId, RoutingOptions, Startable } from '@libp2p/interface' import type { ConnectionManager } from '@libp2p/interface-internal' import type { DeferredPromise } from 'p-defer' @@ -77,21 +77,6 @@ export class QueryManager implements Startable { this.running = false } - getRoutingUpdateQueueStats (): { - queued: number - running: number - total: number - enqueued: number - deduped: number - completed: number - failed: number - aborted: number - cancelledBeforeStart: number - ttlSkipped: number - } { - return this.routingTable.getRoutingUpdateQueueStats() - } - isStarted (): boolean { return this.running } @@ -158,6 +143,7 @@ export class QueryManager implements Startable { // query a subset of peers up to `kBucketSize / 2` in length let queryFinished = false + try { if (this.routingTable.size === 0 && !this.allowQueryWithZeroPeers) { log('routing table was empty, waiting for some peers before running%s query', options.isSelfQuery === true ? ' self' : '') @@ -235,7 +221,18 @@ export class QueryManager implements Startable { } if (event.name === 'PEER_RESPONSE') { - this.routingTable.queueRoutingTableUpdate(event.from) + for (const peer of [...event.closer, ...event.providers]) { + // eslint-disable-next-line max-depth + if (!(await this.connectionManager.isDialable(peer.multiaddrs, { + signal + }))) { + continue + } + + await this.routingTable.add(peer.id, { + signal + }) + } } signal.throwIfAborted() diff --git a/packages/kad-dht/src/query/query-path.ts b/packages/kad-dht/src/query/query-path.ts index 1329aa4e52..978d173bdc 100644 --- a/packages/kad-dht/src/query/query-path.ts +++ b/packages/kad-dht/src/query/query-path.ts @@ -159,9 +159,7 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator implemen routingTableKadBucketMaxOccupancy: Metric kadBucketEvents: CounterGroup<'ping_old_contact' | 'ping_old_contact_error' | 'ping_new_contact' | 'ping_new_contact_error' | 'peer_added' | 'peer_removed'> } - private readonly routingUpdateQueue: PeerQueue - private readonly routingUpdatePeerTtl: number - private readonly routingUpdateRecent: Map - private readonly routingUpdateStats: RoutingTableUpdateQueueStats private shutdownController: AbortController @@ -133,26 +116,9 @@ export class RoutingTable extends TypedEventEmitter implemen this.peerRemoved = this.peerRemoved.bind(this) this.populateFromDatastoreOnStart = init.populateFromDatastoreOnStart ?? POPULATE_FROM_DATASTORE_ON_START this.populateFromDatastoreLimit = init.populateFromDatastoreLimit ?? POPULATE_FROM_DATASTORE_LIMIT - this.routingUpdatePeerTtl = init.routingTableUpdatePeerTtl ?? ROUTING_UPDATE_PEER_TTL - this.routingUpdateRecent = new Map() - this.routingUpdateStats = { - enqueued: 0, - deduped: 0, - completed: 0, - failed: 0, - aborted: 0, - cancelledBeforeStart: 0, - ttlSkipped: 0 - } this.shutdownController = new AbortController() setMaxListeners(Infinity, this.shutdownController.signal) - this.routingUpdateQueue = new PeerQueue({ - concurrency: init.routingTableUpdateQueueConcurrency ?? PING_NEW_CONTACT_CONCURRENCY, - metricName: `${init.metricsPrefix}_routing_update_queue`, - metrics: this.components.metrics - }) - this.pingOldContactQueue = new PeerQueue({ concurrency: init.pingOldContactConcurrency ?? PING_OLD_CONTACT_CONCURRENCY, metricName: `${init.metricsPrefix}_ping_old_contact_queue`, @@ -287,101 +253,11 @@ export class RoutingTable extends TypedEventEmitter implemen async stop (): Promise { this.running = false await stop(this.closestPeerTagger, this.kb) - this.routingUpdateStats.cancelledBeforeStart += this.routingUpdateQueue.queued - this.routingUpdateQueue.abort() this.pingOldContactQueue.abort() this.pingNewContactQueue.abort() this.shutdownController.abort() } - getRoutingUpdateQueueStats (): { - queued: number - running: number - total: number - enqueued: number - deduped: number - completed: number - failed: number - aborted: number - cancelledBeforeStart: number - ttlSkipped: number - } { - return { - queued: this.routingUpdateQueue.queued, - running: this.routingUpdateQueue.running, - total: this.routingUpdateQueue.size, - ...this.routingUpdateStats - } - } - - queueRoutingTableUpdate (peerId: PeerId, options: AbortOptions = {}): void { - const peerIdStr = peerId.toString() - const now = Date.now() - - this.pruneRoutingUpdateRecent(now) - - const updateAllowedAt = this.routingUpdateRecent.get(peerIdStr) - if (updateAllowedAt != null && updateAllowedAt > now) { - this.routingUpdateStats.ttlSkipped++ - return - } - - const existingJob = this.routingUpdateQueue.find(peerId) - - if (existingJob != null) { - this.routingUpdateStats.deduped++ - void existingJob.join(options).catch(() => {}) - return - } - - this.routingUpdateRecent.set(peerIdStr, now + this.routingUpdatePeerTtl) - this.routingUpdateStats.enqueued++ - - void this.routingUpdateQueue.add(async (jobOptions) => { - const signal = jobOptions.signal == null - ? this.shutdownController.signal - : anySignal([this.shutdownController.signal, jobOptions.signal]) - - setMaxListeners(Infinity, signal) - - try { - await this.add(peerId, { - signal - }) - this.routingUpdateStats.completed++ - } catch (err: any) { - if (signal.aborted || err?.name === 'AbortError') { - this.routingUpdateStats.aborted++ - return - } - - this.routingUpdateStats.failed++ - throw err - } finally { - if ('clear' in signal) { - (signal as any).clear() - } - } - }, { - peerId, - signal: options.signal - }).catch(err => { - this.log.error('could not update routing table for peer %p - %e', peerId, err) - }) - } - - private pruneRoutingUpdateRecent (now: number): void { - if (this.routingUpdateRecent.size < 4096) { - return - } - - for (const [peerId, expiresAt] of this.routingUpdateRecent.entries()) { - if (expiresAt <= now) { - this.routingUpdateRecent.delete(peerId) - } - } - } - private async peerAdded (peer: Peer, bucket: LeafBucket, options?: AbortOptions): Promise { if (!this.components.peerId.equals(peer.peerId)) { await this.components.peerStore.merge(peer.peerId, { diff --git a/packages/kad-dht/test/kad-dht.spec.ts b/packages/kad-dht/test/kad-dht.spec.ts index 42d15d89c7..6afb0dbaf2 100644 --- a/packages/kad-dht/test/kad-dht.spec.ts +++ b/packages/kad-dht/test/kad-dht.spec.ts @@ -2,9 +2,7 @@ /* eslint max-nested-callbacks: ["error", 8] */ import { Libp2pRecord } from '@libp2p/record' -import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' -import delay from 'delay' import all from 'it-all' import drain from 'it-drain' import filter from 'it-filter' @@ -113,56 +111,6 @@ describe('KadDHT', () => { await dht.dht.stop() }) - - it('should enqueue peer connect routing updates via routing table', async () => { - const dht = await testDHT.spawn(undefined, false) - - const queueRoutingTableUpdateSpy = sinon.spy((dht.dht as any).routingTable, 'queueRoutingTableUpdate') - - await dht.dht.onPeerConnect({ - id: peerIds[0].peerId, - multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/1234')] - }) - - expect(queueRoutingTableUpdateSpy).to.have.property('calledOnce', true) - expect(queueRoutingTableUpdateSpy.firstCall.args[0].toString()).to.equal(peerIds[0].peerId.toString()) - }) - - it('should dedupe onPeerConnect routing updates via ttl', async () => { - const dht = await testDHT.spawn({ - routingTableUpdatePeerTtl: 60_000, - routingTableUpdateQueueConcurrency: 1 - }, false) - - await dht.dht.start() - - const routingTableAddSpy = sinon.spy((dht.dht as any).routingTable, 'add') - - await dht.dht.onPeerConnect({ - id: peerIds[1].peerId, - multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/2234')] - }) - - await dht.dht.onPeerConnect({ - id: peerIds[1].peerId, - multiaddrs: [multiaddr('/ip4/127.0.0.1/tcp/2234')] - }) - - for (let i = 0; i < 40; i++) { - const stats = (dht.dht as any).queryManager.getRoutingUpdateQueueStats() - - if (stats.ttlSkipped >= 1 && stats.completed >= 1) { - break - } - - await delay(10) - } - - const stats = (dht.dht as any).queryManager.getRoutingUpdateQueueStats() - - expect(routingTableAddSpy.callCount).to.equal(1) - expect(stats.ttlSkipped).to.be.greaterThan(0) - }) }) describe('content fetching', () => { diff --git a/packages/kad-dht/test/query.spec.ts b/packages/kad-dht/test/query.spec.ts index c43986efbc..d0c7c522f9 100644 --- a/packages/kad-dht/test/query.spec.ts +++ b/packages/kad-dht/test/query.spec.ts @@ -152,20 +152,6 @@ describe('QueryManager', () => { beforeEach(async () => { routingTable.closestPeers.returns(peers.slice(0, K).map(p => p.peerId)) - routingTable.queueRoutingTableUpdate.resetHistory() - routingTable.getRoutingUpdateQueueStats.resetHistory() - routingTable.getRoutingUpdateQueueStats.returns({ - queued: 0, - running: 0, - total: 0, - enqueued: 0, - deduped: 0, - completed: 0, - failed: 0, - aborted: 0, - cancelledBeforeStart: 0, - ttlSkipped: 0 - }) }) it('does not run queries before start', async () => { @@ -1012,65 +998,4 @@ describe('QueryManager', () => { await manager.stop() }) - - it('should queue routing table updates from peer response events', async () => { - const manager = new QueryManager({ - peerId: ourPeerId, - logger: defaultLogger(), - connectionManager: stubInterface({ - isDialable: async () => true - }) - }, { - ...defaultInit() - }) - - routingTable.closestPeers.returns([peers[0].peerId]) - await manager.start() - - const queryFunc: QueryFunc = async function * ({ peer, path }) { - yield peerResponseEvent({ - from: peer.id, - messageType: MessageType.GET_VALUE, - path - }) - } - - await all(manager.run(key, queryFunc)) - - expect(routingTable.queueRoutingTableUpdate.calledOnce).to.be.true() - expect(routingTable.queueRoutingTableUpdate.firstCall.args[0].toString()).to.equal(peers[0].peerId.toString()) - expect(routingTable.queueRoutingTableUpdate.firstCall.args).to.have.lengthOf(1) - - await manager.stop() - }) - - it('should delegate routing update queue stats to the routing table', async () => { - const expectedStats = { - queued: 1, - running: 2, - total: 3, - enqueued: 4, - deduped: 5, - completed: 6, - failed: 7, - aborted: 8, - cancelledBeforeStart: 9, - ttlSkipped: 10 - } - - routingTable.getRoutingUpdateQueueStats.returns(expectedStats) - - const manager = new QueryManager({ - peerId: ourPeerId, - logger: defaultLogger(), - connectionManager: stubInterface({ - isDialable: async () => true - }) - }, { - ...defaultInit() - }) - - expect(manager.getRoutingUpdateQueueStats()).to.deep.equal(expectedStats) - expect(routingTable.getRoutingUpdateQueueStats.calledOnce).to.be.true() - }) }) From 230bfa8a7bb4b90f56a113f52968deae54a09d25 Mon Sep 17 00:00:00 2001 From: dozyio Date: Sat, 11 Apr 2026 17:19:38 +0100 Subject: [PATCH 10/10] chore: ircuit -> circuit --- packages/transport-circuit-relay-v2/src/transport/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/transport-circuit-relay-v2/src/transport/index.ts b/packages/transport-circuit-relay-v2/src/transport/index.ts index 676ae30018..e9ed99a726 100644 --- a/packages/transport-circuit-relay-v2/src/transport/index.ts +++ b/packages/transport-circuit-relay-v2/src/transport/index.ts @@ -148,7 +148,7 @@ export class CircuitRelayTransport implements Transport const destinationId = destinationAddr.getComponents().find(c => c.code === CODE_P2P)?.value if (relayId == null || destinationId == null) { - const errMsg = `ircuit relay dial to ${ma.toString()} failed as address did not have both relay and destination PeerIDs` + const errMsg = `circuit relay dial to ${ma.toString()} failed as address did not have both relay and destination PeerIDs` this.log.error(`c${errMsg}`) throw new DialError(`C${errMsg}`) }