From 466b8bc381675ebbdc161144b6dcf4d526aaf27a Mon Sep 17 00:00:00 2001 From: paschal533 Date: Thu, 9 Apr 2026 16:58:37 +0100 Subject: [PATCH] feat: rate limit inbound streams per protocol per connection Closes #2093 --- packages/interface/src/errors.ts | 12 ++ packages/interface/src/stream-handler.ts | 20 +++ packages/libp2p/src/connection.ts | 41 ++++- packages/libp2p/test/connection/index.spec.ts | 165 ++++++++++++++++++ 4 files changed, 237 insertions(+), 1 deletion(-) diff --git a/packages/interface/src/errors.ts b/packages/interface/src/errors.ts index 74584a9dc4..f7af218282 100644 --- a/packages/interface/src/errors.ts +++ b/packages/interface/src/errors.ts @@ -364,6 +364,18 @@ export class TooManyInboundProtocolStreamsError extends Error { } } +/** + * This error is thrown when an inbound protocol stream exceeds the configured rate limit + */ +export class InboundStreamRateLimitError extends Error { + static name = 'InboundStreamRateLimitError' + + constructor (message = 'Inbound stream rate limit exceeded') { + super(message) + this.name = 'InboundStreamRateLimitError' + } +} + /** * This error is thrown where there are too many outbound protocols streams open */ diff --git a/packages/interface/src/stream-handler.ts b/packages/interface/src/stream-handler.ts index 728fedfe40..ff174307aa 100644 --- a/packages/interface/src/stream-handler.ts +++ b/packages/interface/src/stream-handler.ts @@ -22,6 +22,26 @@ export interface StreamHandlerOptions extends AbortOptions { */ maxInboundStreams?: number + /** + * Limits how many new inbound streams can be opened for this protocol per + * connection within a sliding time window. Use this to prevent malicious + * peers from flooding built-in protocols (e.g. identify/push). + * + * @example + * // Allow at most 10 new streams per 60 seconds per connection + * inboundStreamRateLimit: { count: 10, interval: 60_000 } + */ + inboundStreamRateLimit?: { + /** + * Maximum number of new inbound streams allowed within the interval + */ + count: number + /** + * Length of the time window in milliseconds + */ + interval: number + } + /** * How many outgoing streams can be open for this protocol at the same time on each connection * diff --git a/packages/libp2p/src/connection.ts b/packages/libp2p/src/connection.ts index 6308e1e422..18f37586ce 100644 --- a/packages/libp2p/src/connection.ts +++ b/packages/libp2p/src/connection.ts @@ -1,4 +1,4 @@ -import { connectionSymbol, LimitedConnectionError, ConnectionClosedError, TooManyOutboundProtocolStreamsError, TooManyInboundProtocolStreamsError, StreamCloseEvent } from '@libp2p/interface' +import { connectionSymbol, LimitedConnectionError, ConnectionClosedError, TooManyOutboundProtocolStreamsError, TooManyInboundProtocolStreamsError, InboundStreamRateLimitError, StreamCloseEvent } from '@libp2p/interface' import * as mss from '@libp2p/multistream-select' import { CODE_P2P } from '@multiformats/multiaddr' import { setMaxListeners, TypedEventEmitter } from 'main-event' @@ -53,6 +53,12 @@ export class Connection extends TypedEventEmitter implement private readonly inboundStreamProtocolNegotiationTimeout: number private readonly closeTimeout: number + /** + * Tracks per-protocol inbound stream rate limit state for this connection. + * Key is protocol string; value is a fixed-window counter. + */ + private readonly inboundRateLimitState: Map + constructor (components: ConnectionComponents, init: ConnectionInit) { super() @@ -71,6 +77,7 @@ export class Connection extends TypedEventEmitter implement this.inboundStreamProtocolNegotiationTimeout = init.inboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT this.closeTimeout = init.closeTimeout ?? CONNECTION_CLOSE_TIMEOUT this.direct = isDirect(init.maConn.remoteAddr) + this.inboundRateLimitState = new Map() this.onIncomingStream = this.onIncomingStream.bind(this) @@ -221,6 +228,25 @@ export class Connection extends TypedEventEmitter implement throw new TooManyInboundProtocolStreamsError(`Too many inbound protocol streams for protocol "${muxedStream.protocol}" - limit ${incomingLimit}`) } + // Check per-protocol inbound stream rate limit + const rateLimit = findIncomingStreamRateLimit(muxedStream.protocol, this.components.registrar) + + if (rateLimit != null) { + const now = Date.now() + let state = this.inboundRateLimitState.get(muxedStream.protocol) + + if (state == null || now >= state.resetAt) { + state = { count: 0, resetAt: now + rateLimit.interval } + this.inboundRateLimitState.set(muxedStream.protocol, state) + } + + state.count++ + + if (state.count > rateLimit.count) { + throw new InboundStreamRateLimitError(`Inbound stream rate limit exceeded for protocol "${muxedStream.protocol}" - ${state.count}/${rateLimit.count} streams opened within ${rateLimit.interval}ms`) + } + } + // If a protocol stream has been successfully negotiated and is to be passed to the application, // the peer store should ensure that the peer is registered with that protocol await this.components.peerStore.merge(this.remotePeer, { @@ -340,6 +366,19 @@ function findOutgoingStreamLimit (protocol: string, registrar: Registrar, option return options.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS } +function findIncomingStreamRateLimit (protocol: string, registrar: Registrar): { count: number, interval: number } | undefined { + try { + const { options } = registrar.getHandler(protocol) + return options.inboundStreamRateLimit + } catch (err: any) { + if (err.name !== 'UnhandledProtocolError') { + throw err + } + } + + return undefined +} + function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection): number { let streamCount = 0 diff --git a/packages/libp2p/test/connection/index.spec.ts b/packages/libp2p/test/connection/index.spec.ts index df97511479..3edaf7bfac 100644 --- a/packages/libp2p/test/connection/index.spec.ts +++ b/packages/libp2p/test/connection/index.spec.ts @@ -251,6 +251,171 @@ describe('connection', () => { expect(muxer.streams[2]).to.have.property('status', 'aborted') }) + describe('inboundStreamRateLimit', () => { + it('should abort inbound streams that exceed the rate limit within the window', async () => { + const protocol = '/test/rate-limit' + const rateLimit = { count: 2, interval: 10_000 } + + registrar.getHandler.withArgs(protocol).returns({ + handler: Sinon.stub(), + options: { inboundStreamRateLimit: rateLimit } + }) + registrar.getProtocols.returns([protocol]) + registrar.getMiddleware.withArgs(protocol).returns([]) + + const connection = createConnection(components, init) + + // Open count streams — all should be allowed + for (let i = 0; i < rateLimit.count; i++) { + const [outboundStream, inboundStream] = await streamPair() + outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n'))) + outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`))) + muxer.streams.push(inboundStream) + muxer.safeDispatchEvent('stream', { detail: inboundStream }) + await delay(50) + } + + // Open one more stream — should be rate-limited and aborted + const [outboundStream, inboundStream] = await streamPair() + outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n'))) + outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`))) + muxer.streams.push(inboundStream) + muxer.safeDispatchEvent('stream', { detail: inboundStream }) + + await delay(100) + + const streams = muxer.streams + expect(streams).to.have.lengthOf(rateLimit.count + 1) + + // First `count` streams are open, the last is aborted + for (let i = 0; i < rateLimit.count; i++) { + expect(streams[i]).to.have.property('status', 'open') + } + expect(streams[rateLimit.count]).to.have.property('status', 'aborted') + }) + + it('should reset the rate limit counter after the interval expires', async () => { + const protocol = '/test/rate-limit-reset' + const rateLimit = { count: 2, interval: 200 } // short window for testing + + registrar.getHandler.withArgs(protocol).returns({ + handler: Sinon.stub(), + options: { inboundStreamRateLimit: rateLimit } + }) + registrar.getProtocols.returns([protocol]) + registrar.getMiddleware.withArgs(protocol).returns([]) + + const connection = createConnection(components, init) + + // Fill up the first window + for (let i = 0; i < rateLimit.count; i++) { + const [outboundStream, inboundStream] = await streamPair() + outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n'))) + outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`))) + muxer.streams.push(inboundStream) + muxer.safeDispatchEvent('stream', { detail: inboundStream }) + await delay(50) + } + + // Wait for the window to expire + await delay(rateLimit.interval + 50) + + // Open a new stream after the reset — should be allowed + const [outboundStream, inboundStream] = await streamPair() + outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n'))) + outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`))) + muxer.streams.push(inboundStream) + muxer.safeDispatchEvent('stream', { detail: inboundStream }) + + await delay(100) + + // All streams should be open — the counter was reset + const streams = muxer.streams + expect(streams).to.have.lengthOf(rateLimit.count + 1) + for (const stream of streams) { + expect(stream).to.have.property('status', 'open') + } + }) + + it('should apply rate limits per protocol independently', async () => { + const protocolA = '/test/rate-limit-a' + const protocolB = '/test/rate-limit-b' + const rateLimit = { count: 1, interval: 10_000 } + + for (const protocol of [protocolA, protocolB]) { + registrar.getHandler.withArgs(protocol).returns({ + handler: Sinon.stub(), + options: { inboundStreamRateLimit: rateLimit } + }) + registrar.getMiddleware.withArgs(protocol).returns([]) + } + registrar.getProtocols.returns([protocolA, protocolB]) + + createConnection(components, init) + + // Open 1 stream on protocol A (hits its limit) + const [outA1, inA1] = await streamPair() + outA1.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n'))) + outA1.send(encode.single(uint8ArrayFromString(`${protocolA}\n`))) + muxer.streams.push(inA1) + muxer.safeDispatchEvent('stream', { detail: inA1 }) + await delay(50) + + // Open 1 stream on protocol B — independent limit, should be allowed + const [outB1, inB1] = await streamPair() + outB1.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n'))) + outB1.send(encode.single(uint8ArrayFromString(`${protocolB}\n`))) + muxer.streams.push(inB1) + muxer.safeDispatchEvent('stream', { detail: inB1 }) + await delay(50) + + // Second stream on protocol A — should be rate-limited + const [outA2, inA2] = await streamPair() + outA2.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n'))) + outA2.send(encode.single(uint8ArrayFromString(`${protocolA}\n`))) + muxer.streams.push(inA2) + muxer.safeDispatchEvent('stream', { detail: inA2 }) + + await delay(100) + + expect(muxer.streams).to.have.lengthOf(3) + expect(muxer.streams[0]).to.have.property('status', 'open') // protocolA stream 1 + expect(muxer.streams[1]).to.have.property('status', 'open') // protocolB stream 1 + expect(muxer.streams[2]).to.have.property('status', 'aborted') // protocolA stream 2 (rate-limited) + }) + + it('should not rate limit protocols that have no inboundStreamRateLimit configured', async () => { + const protocol = '/test/no-rate-limit' + const maxInboundStreams = 10 + + registrar.getHandler.withArgs(protocol).returns({ + handler: Sinon.stub(), + options: { maxInboundStreams } + }) + registrar.getProtocols.returns([protocol]) + registrar.getMiddleware.withArgs(protocol).returns([]) + + createConnection(components, init) + + // Open many streams rapidly — none should be rate-limited + for (let i = 0; i < 5; i++) { + const [outboundStream, inboundStream] = await streamPair() + outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n'))) + outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`))) + muxer.streams.push(inboundStream) + muxer.safeDispatchEvent('stream', { detail: inboundStream }) + await delay(20) + } + + await delay(100) + + expect(muxer.streams).to.have.lengthOf(5) + for (const stream of muxer.streams) { + expect(stream).to.have.property('status', 'open') + } + }) + }) + it('should limit the number of outgoing streams that can be opened using a protocol', async () => { const protocol = '/test/protocol' const maxOutboundStreams = 2