Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/interface/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,18 @@ export class TooManyInboundProtocolStreamsError extends Error {
}
}

/**
* This error is thrown when an inbound protocol stream exceeds the configured rate limit
*/
export class InboundStreamRateLimitError extends Error {
static name = 'InboundStreamRateLimitError'

constructor (message = 'Inbound stream rate limit exceeded') {
super(message)
this.name = 'InboundStreamRateLimitError'
}
}

/**
* This error is thrown where there are too many outbound protocols streams open
*/
Expand Down
20 changes: 20 additions & 0 deletions packages/interface/src/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,26 @@ export interface StreamHandlerOptions extends AbortOptions {
*/
maxInboundStreams?: number

/**
* Limits how many new inbound streams can be opened for this protocol per
* connection within a sliding time window. Use this to prevent malicious
* peers from flooding built-in protocols (e.g. identify/push).
*
* @example
* // Allow at most 10 new streams per 60 seconds per connection
* inboundStreamRateLimit: { count: 10, interval: 60_000 }
*/
inboundStreamRateLimit?: {
/**
* Maximum number of new inbound streams allowed within the interval
*/
count: number
/**
* Length of the time window in milliseconds
*/
interval: number
}

/**
* How many outgoing streams can be open for this protocol at the same time on each connection
*
Expand Down
41 changes: 40 additions & 1 deletion packages/libp2p/src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { connectionSymbol, LimitedConnectionError, ConnectionClosedError, TooManyOutboundProtocolStreamsError, TooManyInboundProtocolStreamsError, StreamCloseEvent } from '@libp2p/interface'
import { connectionSymbol, LimitedConnectionError, ConnectionClosedError, TooManyOutboundProtocolStreamsError, TooManyInboundProtocolStreamsError, InboundStreamRateLimitError, StreamCloseEvent } from '@libp2p/interface'
import * as mss from '@libp2p/multistream-select'
import { CODE_P2P } from '@multiformats/multiaddr'
import { setMaxListeners, TypedEventEmitter } from 'main-event'
Expand Down Expand Up @@ -54,6 +54,12 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
private readonly inboundStreamProtocolNegotiationTimeout: number
private readonly closeTimeout: number

/**
* Tracks per-protocol inbound stream rate limit state for this connection.
* Key is protocol string; value is a fixed-window counter.
*/
private readonly inboundRateLimitState: Map<string, { count: number, resetAt: number }>

constructor (components: ConnectionComponents, init: ConnectionInit) {
super()

Expand All @@ -72,6 +78,7 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
this.inboundStreamProtocolNegotiationTimeout = init.inboundStreamProtocolNegotiationTimeout ?? PROTOCOL_NEGOTIATION_TIMEOUT
this.closeTimeout = init.closeTimeout ?? CONNECTION_CLOSE_TIMEOUT
this.direct = isDirect(init.maConn.remoteAddr)
this.inboundRateLimitState = new Map()

this.onIncomingStream = this.onIncomingStream.bind(this)

Expand Down Expand Up @@ -234,6 +241,25 @@ export class Connection extends TypedEventEmitter<MessageStreamEvents> implement
throw new TooManyInboundProtocolStreamsError(`Too many inbound protocol streams for protocol "${muxedStream.protocol}" - limit ${incomingLimit}`)
}

// Check per-protocol inbound stream rate limit
const rateLimit = findIncomingStreamRateLimit(muxedStream.protocol, this.components.registrar)

if (rateLimit != null) {
const now = Date.now()
let state = this.inboundRateLimitState.get(muxedStream.protocol)

if (state == null || now >= state.resetAt) {
state = { count: 0, resetAt: now + rateLimit.interval }
this.inboundRateLimitState.set(muxedStream.protocol, state)
}

state.count++

if (state.count > rateLimit.count) {
throw new InboundStreamRateLimitError(`Inbound stream rate limit exceeded for protocol "${muxedStream.protocol}" - ${state.count}/${rateLimit.count} streams opened within ${rateLimit.interval}ms`)
}
}

// If a protocol stream has been successfully negotiated and is to be passed to the application,
// the peer store should ensure that the peer is registered with that protocol
await this.components.peerStore.merge(this.remotePeer, {
Expand Down Expand Up @@ -353,6 +379,19 @@ function findOutgoingStreamLimit (protocol: string, registrar: Registrar, option
return options.maxOutboundStreams ?? DEFAULT_MAX_OUTBOUND_STREAMS
}

function findIncomingStreamRateLimit (protocol: string, registrar: Registrar): { count: number, interval: number } | undefined {
try {
const { options } = registrar.getHandler(protocol)
return options.inboundStreamRateLimit
} catch (err: any) {
if (err.name !== 'UnhandledProtocolError') {
throw err
}
}

return undefined
}

function countStreams (protocol: string, direction: 'inbound' | 'outbound', connection: Connection): number {
let streamCount = 0

Expand Down
165 changes: 165 additions & 0 deletions packages/libp2p/test/connection/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,171 @@ describe('connection', () => {
expect(muxer.streams[2]).to.have.property('status', 'aborted')
})

describe('inboundStreamRateLimit', () => {
it('should abort inbound streams that exceed the rate limit within the window', async () => {
const protocol = '/test/rate-limit'
const rateLimit = { count: 2, interval: 10_000 }

registrar.getHandler.withArgs(protocol).returns({
handler: Sinon.stub(),
options: { inboundStreamRateLimit: rateLimit }
})
registrar.getProtocols.returns([protocol])
registrar.getMiddleware.withArgs(protocol).returns([])

const connection = createConnection(components, init)

// Open count streams — all should be allowed
for (let i = 0; i < rateLimit.count; i++) {
const [outboundStream, inboundStream] = await streamPair()
outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n')))
outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`)))
muxer.streams.push(inboundStream)
muxer.safeDispatchEvent('stream', { detail: inboundStream })
await delay(50)
}

// Open one more stream — should be rate-limited and aborted
const [outboundStream, inboundStream] = await streamPair()
outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n')))
outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`)))
muxer.streams.push(inboundStream)
muxer.safeDispatchEvent('stream', { detail: inboundStream })

await delay(100)

const streams = muxer.streams
expect(streams).to.have.lengthOf(rateLimit.count + 1)

// First `count` streams are open, the last is aborted
for (let i = 0; i < rateLimit.count; i++) {
expect(streams[i]).to.have.property('status', 'open')
}
expect(streams[rateLimit.count]).to.have.property('status', 'aborted')
})

it('should reset the rate limit counter after the interval expires', async () => {
const protocol = '/test/rate-limit-reset'
const rateLimit = { count: 2, interval: 200 } // short window for testing

registrar.getHandler.withArgs(protocol).returns({
handler: Sinon.stub(),
options: { inboundStreamRateLimit: rateLimit }
})
registrar.getProtocols.returns([protocol])
registrar.getMiddleware.withArgs(protocol).returns([])

const connection = createConnection(components, init)

// Fill up the first window
for (let i = 0; i < rateLimit.count; i++) {
const [outboundStream, inboundStream] = await streamPair()
outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n')))
outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`)))
muxer.streams.push(inboundStream)
muxer.safeDispatchEvent('stream', { detail: inboundStream })
await delay(50)
}

// Wait for the window to expire
await delay(rateLimit.interval + 50)

// Open a new stream after the reset — should be allowed
const [outboundStream, inboundStream] = await streamPair()
outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n')))
outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`)))
muxer.streams.push(inboundStream)
muxer.safeDispatchEvent('stream', { detail: inboundStream })

await delay(100)

// All streams should be open — the counter was reset
const streams = muxer.streams
expect(streams).to.have.lengthOf(rateLimit.count + 1)
for (const stream of streams) {
expect(stream).to.have.property('status', 'open')
}
})

it('should apply rate limits per protocol independently', async () => {
const protocolA = '/test/rate-limit-a'
const protocolB = '/test/rate-limit-b'
const rateLimit = { count: 1, interval: 10_000 }

for (const protocol of [protocolA, protocolB]) {
registrar.getHandler.withArgs(protocol).returns({
handler: Sinon.stub(),
options: { inboundStreamRateLimit: rateLimit }
})
registrar.getMiddleware.withArgs(protocol).returns([])
}
registrar.getProtocols.returns([protocolA, protocolB])

createConnection(components, init)

// Open 1 stream on protocol A (hits its limit)
const [outA1, inA1] = await streamPair()
outA1.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n')))
outA1.send(encode.single(uint8ArrayFromString(`${protocolA}\n`)))
muxer.streams.push(inA1)
muxer.safeDispatchEvent('stream', { detail: inA1 })
await delay(50)

// Open 1 stream on protocol B — independent limit, should be allowed
const [outB1, inB1] = await streamPair()
outB1.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n')))
outB1.send(encode.single(uint8ArrayFromString(`${protocolB}\n`)))
muxer.streams.push(inB1)
muxer.safeDispatchEvent('stream', { detail: inB1 })
await delay(50)

// Second stream on protocol A — should be rate-limited
const [outA2, inA2] = await streamPair()
outA2.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n')))
outA2.send(encode.single(uint8ArrayFromString(`${protocolA}\n`)))
muxer.streams.push(inA2)
muxer.safeDispatchEvent('stream', { detail: inA2 })

await delay(100)

expect(muxer.streams).to.have.lengthOf(3)
expect(muxer.streams[0]).to.have.property('status', 'open') // protocolA stream 1
expect(muxer.streams[1]).to.have.property('status', 'open') // protocolB stream 1
expect(muxer.streams[2]).to.have.property('status', 'aborted') // protocolA stream 2 (rate-limited)
})

it('should not rate limit protocols that have no inboundStreamRateLimit configured', async () => {
const protocol = '/test/no-rate-limit'
const maxInboundStreams = 10

registrar.getHandler.withArgs(protocol).returns({
handler: Sinon.stub(),
options: { maxInboundStreams }
})
registrar.getProtocols.returns([protocol])
registrar.getMiddleware.withArgs(protocol).returns([])

createConnection(components, init)

// Open many streams rapidly — none should be rate-limited
for (let i = 0; i < 5; i++) {
const [outboundStream, inboundStream] = await streamPair()
outboundStream.send(encode.single(uint8ArrayFromString('/multistream/1.0.0\n')))
outboundStream.send(encode.single(uint8ArrayFromString(`${protocol}\n`)))
muxer.streams.push(inboundStream)
muxer.safeDispatchEvent('stream', { detail: inboundStream })
await delay(20)
}

await delay(100)

expect(muxer.streams).to.have.lengthOf(5)
for (const stream of muxer.streams) {
expect(stream).to.have.property('status', 'open')
}
})
})

it('should limit the number of outgoing streams that can be opened using a protocol', async () => {
const protocol = '/test/protocol'
const maxOutboundStreams = 2
Expand Down
Loading