From 286f00b807442b3519f8c3e7e5995ead648524d3 Mon Sep 17 00:00:00 2001 From: Neha Kumari Date: Sat, 14 Mar 2026 17:48:25 +0530 Subject: [PATCH 1/5] feat: add protocol stream opened/closed counter metrics --- packages/libp2p/test/connection/index.spec.ts | 62 +++++++++- packages/metrics-prometheus/src/index.ts | 26 ++++- packages/metrics-simple/src/index.ts | 22 +++- .../test/track-protocol-stream.spec.ts | 106 ++++++++++++++++++ 4 files changed, 212 insertions(+), 4 deletions(-) create mode 100644 packages/metrics-simple/test/track-protocol-stream.spec.ts diff --git a/packages/libp2p/test/connection/index.spec.ts b/packages/libp2p/test/connection/index.spec.ts index df97511479..6284a8b740 100644 --- a/packages/libp2p/test/connection/index.spec.ts +++ b/packages/libp2p/test/connection/index.spec.ts @@ -12,7 +12,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { createConnection } from '../../src/connection.js' import { UnhandledProtocolError } from '../../src/errors.ts' import type { ConnectionComponents, ConnectionInit } from '../../src/connection.js' -import type { MultiaddrConnection, PeerStore, Stream, StreamMuxer } from '@libp2p/interface' +import type { MultiaddrConnection, PeerStore, Stream, StreamMuxer, Metrics } from '@libp2p/interface' import type { Registrar } from '@libp2p/interface-internal' import type { StubbedInstance } from 'sinon-ts' @@ -469,4 +469,64 @@ describe('connection', () => { expect(middleware2.called).to.be.false() expect(incomingStream).to.have.nested.property('abort.called', true) }) + + it('should call trackProtocolStream when a new outbound stream is opened', async () => { + const metrics = stubInterface() + + const connection = createConnection({ + ...components, + metrics + }, init) + + await connection.newStream([ECHO_PROTOCOL]) + + // trackProtocolStream must be called exactly once with the negotiated stream + expect(metrics.trackProtocolStream.callCount).to.equal(1) + expect(metrics.trackProtocolStream.firstCall.args[0]).to.have.property('protocol', ECHO_PROTOCOL) + expect(metrics.trackProtocolStream.firstCall.args[0]).to.have.property('direction', 'outbound') + }) + + it('should call trackProtocolStream when an inbound stream is opened', async () => { + const streamProtocol = '/test/protocol' + const metrics = stubInterface() + + registrar.getHandler.withArgs(streamProtocol).returns({ + handler: () => {}, + options: {} + }) + registrar.getMiddleware.withArgs(streamProtocol).returns([]) + registrar.getProtocols.returns([streamProtocol]) + + const stubbedMuxer = stubInterface({ streams: [] }) + + createConnection({ + ...components, + metrics + }, { + ...init, + muxer: stubbedMuxer + }) + + // grab the inbound stream listener registered on the muxer + const onIncomingStream = stubbedMuxer.addEventListener.getCall(0).args[1] + + if (onIncomingStream == null || typeof onIncomingStream !== 'function') { + throw new Error('No incoming stream handler registered') + } + + const incomingStream = stubInterface({ + log: defaultLogger().forComponent('stream'), + protocol: streamProtocol, + direction: 'inbound' + }) + + onIncomingStream(new CustomEvent('stream', { detail: incomingStream })) + + // inbound stream handling is async + await delay(100) + + expect(metrics.trackProtocolStream.callCount).to.equal(1) + expect(metrics.trackProtocolStream.firstCall.args[0]).to.have.property('protocol', streamProtocol) + expect(metrics.trackProtocolStream.firstCall.args[0]).to.have.property('direction', 'inbound') + }) }) diff --git a/packages/metrics-prometheus/src/index.ts b/packages/metrics-prometheus/src/index.ts index aaa84c8bf4..2b51fad85f 100644 --- a/packages/metrics-prometheus/src/index.ts +++ b/packages/metrics-prometheus/src/index.ts @@ -141,6 +141,8 @@ class PrometheusMetrics implements Metrics { private readonly log: Logger private transferStats: Map private readonly registry?: Registry + private readonly streamsOpened: CounterGroup + private readonly streamsClosed: CounterGroup constructor (components: PrometheusMetricsComponents, init?: Partial) { this.log = components.logger.forComponent('libp2p:prometheus-metrics') @@ -205,6 +207,16 @@ class PrometheusMetrics implements Metrics { } } }) + + this.log('Collecting protocol stream open/close metrics') + this.streamsOpened = this.registerCounterGroup('libp2p_protocol_streams_opened_total', { + label: 'protocol', + help: 'Total number of protocol streams opened, by direction and protocol' + }) + this.streamsClosed = this.registerCounterGroup('libp2p_protocol_streams_closed_total', { + label: 'protocol', + help: 'Total number of protocol streams closed, by direction and protocol' + }) } readonly [Symbol.toStringTag] = '@libp2p/metrics-prometheus' @@ -272,8 +284,20 @@ class PrometheusMetrics implements Metrics { // calls this handler after protocol negotiation return } - + + // existing byte-transfer tracking — unchanged this._track(stream, stream.protocol) + + // Label format: "direction protocol" e.g. "inbound /identify/1.0.0" + // Matches the format of the existing libp2p_protocol_streams_total gauge. + const label = `${stream.direction} ${stream.protocol}` + + // Stream is now open — increment the opened counter immediately. + this.streamsOpened.increment({ [label]: 1 }) + + stream.addEventListener('close', () => { + this.streamsClosed.increment({ [label]: 1 }) + }, { once: true }) } registerMetric (name: string, opts: PrometheusCalculatedMetricOptions): void diff --git a/packages/metrics-simple/src/index.ts b/packages/metrics-simple/src/index.ts index adf26194b9..84a3d17f9b 100644 --- a/packages/metrics-simple/src/index.ts +++ b/packages/metrics-simple/src/index.ts @@ -344,6 +344,8 @@ export interface SimpleMetricsComponents { class SimpleMetrics implements Metrics, Startable { public metrics = new Map() private readonly transferStats: Map + private readonly streamsOpened: CounterGroup + private readonly streamsClosed: CounterGroup private started: boolean private interval?: ReturnType private readonly intervalMs: number @@ -361,6 +363,16 @@ class SimpleMetrics implements Metrics, Startable { // holds global and per-protocol sent/received stats this.transferStats = new Map() + + this.streamsOpened = this.registerCounterGroup('libp2p_protocol_streams_opened_total', { + label: 'protocol', + help: 'Total number of protocol streams opened, by direction and protocol' + }) + + this.streamsClosed = this.registerCounterGroup('libp2p_protocol_streams_closed_total', { + label: 'protocol', + help: 'Total number of protocol streams closed, by direction and protocol' + }) } readonly [Symbol.toStringTag] = '@libp2p/metrics-simple' @@ -446,12 +458,18 @@ class SimpleMetrics implements Metrics, Startable { trackProtocolStream (stream: Stream): void { if (stream.protocol == null) { - // protocol not negotiated yet, should not happen as the upgrader - // calls this handler after protocol negotiation return } this._track(stream, stream.protocol) + + const label = `${stream.direction} ${stream.protocol}` + + this.streamsOpened.increment({ [label]: 1 }) + + stream.addEventListener('close', () => { + this.streamsClosed.increment({ [label]: 1 }) + }, { once: true }) } registerMetric (name: string, opts: CalculatedMetricOptions): void diff --git a/packages/metrics-simple/test/track-protocol-stream.spec.ts b/packages/metrics-simple/test/track-protocol-stream.spec.ts new file mode 100644 index 0000000000..e257eb6a53 --- /dev/null +++ b/packages/metrics-simple/test/track-protocol-stream.spec.ts @@ -0,0 +1,106 @@ +import { defaultLogger } from '@libp2p/logger' +import { expect } from 'aegir/chai' +import { stubInterface } from 'sinon-ts' +import { simpleMetrics } from '../src/index.js' +import type { Stream } from '@libp2p/interface' + +describe('SimpleMetrics - protocol stream counters', () => { + function makeMetrics (): ReturnType> { + return simpleMetrics({ onMetrics: () => {} })({ + logger: defaultLogger() + }) + } + + function makeStream (direction: 'inbound' | 'outbound', protocol: string): Stream { + const stub = stubInterface({ + direction, + protocol, + log: defaultLogger().forComponent('stream') + }) + const target = new EventTarget() + ;(stub as any).addEventListener = target.addEventListener.bind(target) + ;(stub as any).removeEventListener = target.removeEventListener.bind(target) + ;(stub as any).dispatchEvent = target.dispatchEvent.bind(target) + return stub + } + + it('increments opened counter when trackProtocolStream is called', async () => { + const metrics = makeMetrics() as any + const stream = makeStream('outbound', '/identify/1.0.0') + + metrics.trackProtocolStream(stream) + + const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() + const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + + expect(opened).to.deep.include({ 'outbound /identify/1.0.0': 1 }) + // closed counter must NOT have fired yet + expect(closed).to.deep.equal({}) + }) + + it('increments closed counter on graceful stream close', async () => { + const metrics = makeMetrics() as any + const stream = makeStream('inbound', '/ping/1.0.0') + + metrics.trackProtocolStream(stream) + stream.dispatchEvent(new Event('close')) + + const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() + const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + + expect(opened).to.deep.include({ 'inbound /ping/1.0.0': 1 }) + expect(closed).to.deep.include({ 'inbound /ping/1.0.0': 1 }) + }) + + it('increments closed counter exactly once because listener is registered with once:true', async () => { + const metrics = makeMetrics() as any + const stream = makeStream('outbound', '/test/1.0.0') + + metrics.trackProtocolStream(stream) + stream.dispatchEvent(new Event('close')) + stream.dispatchEvent(new Event('close')) + + const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + expect(closed['outbound /test/1.0.0']).to.equal(1) + }) + + it('accumulates counts correctly across multiple streams', async () => { + const metrics = makeMetrics() as any + + const s1 = makeStream('outbound', '/identify/1.0.0') + const s2 = makeStream('outbound', '/identify/1.0.0') + const s3 = makeStream('inbound', '/identify/1.0.0') + + metrics.trackProtocolStream(s1) + metrics.trackProtocolStream(s2) + metrics.trackProtocolStream(s3) + + // close only s1 and s3 + s1.dispatchEvent(new Event('close')) + s3.dispatchEvent(new Event('close')) + + const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() + const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + + // 2 outbound + 1 inbound opened + expect(opened['outbound /identify/1.0.0']).to.equal(2) + expect(opened['inbound /identify/1.0.0']).to.equal(1) + + // 1 outbound + 1 inbound closed (s2 still open) + expect(closed['outbound /identify/1.0.0']).to.equal(1) + expect(closed['inbound /identify/1.0.0']).to.equal(1) + }) + + it('does not track a stream with no protocol', async () => { + const metrics = makeMetrics() as any + const stream = stubInterface({ + protocol: undefined, + log: defaultLogger().forComponent('stream') + }) + + metrics.trackProtocolStream(stream) + + const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() + expect(opened).to.deep.equal({}) + }) +}) From 6a2517efe48d79bbed61ade36e1aab9f80a3d8ec Mon Sep 17 00:00:00 2001 From: Neha Kumari Date: Thu, 19 Mar 2026 15:47:18 +0530 Subject: [PATCH 2/5] chore: lint and polish for protocol stream metrics - Remove trailing spaces (libp2p, metrics-prometheus, metrics-simple) - Use TypedEventEmitter in track-protocol-stream spec instead of EventTarget + cast - Fix import order and add early-return comment in metrics-simple --- packages/libp2p/test/connection/index.spec.ts | 26 ++++----- packages/metrics-prometheus/src/index.ts | 9 ++- packages/metrics-simple/src/index.ts | 2 + .../test/track-protocol-stream.spec.ts | 55 ++++++++++--------- 4 files changed, 47 insertions(+), 45 deletions(-) diff --git a/packages/libp2p/test/connection/index.spec.ts b/packages/libp2p/test/connection/index.spec.ts index 6284a8b740..e759fbecb8 100644 --- a/packages/libp2p/test/connection/index.spec.ts +++ b/packages/libp2p/test/connection/index.spec.ts @@ -472,33 +472,33 @@ describe('connection', () => { it('should call trackProtocolStream when a new outbound stream is opened', async () => { const metrics = stubInterface() - + const connection = createConnection({ ...components, metrics }, init) - + await connection.newStream([ECHO_PROTOCOL]) - + // trackProtocolStream must be called exactly once with the negotiated stream expect(metrics.trackProtocolStream.callCount).to.equal(1) expect(metrics.trackProtocolStream.firstCall.args[0]).to.have.property('protocol', ECHO_PROTOCOL) expect(metrics.trackProtocolStream.firstCall.args[0]).to.have.property('direction', 'outbound') }) - + it('should call trackProtocolStream when an inbound stream is opened', async () => { const streamProtocol = '/test/protocol' const metrics = stubInterface() - + registrar.getHandler.withArgs(streamProtocol).returns({ handler: () => {}, options: {} }) registrar.getMiddleware.withArgs(streamProtocol).returns([]) registrar.getProtocols.returns([streamProtocol]) - + const stubbedMuxer = stubInterface({ streams: [] }) - + createConnection({ ...components, metrics @@ -506,25 +506,25 @@ describe('connection', () => { ...init, muxer: stubbedMuxer }) - + // grab the inbound stream listener registered on the muxer const onIncomingStream = stubbedMuxer.addEventListener.getCall(0).args[1] - + if (onIncomingStream == null || typeof onIncomingStream !== 'function') { throw new Error('No incoming stream handler registered') } - + const incomingStream = stubInterface({ log: defaultLogger().forComponent('stream'), protocol: streamProtocol, direction: 'inbound' }) - + onIncomingStream(new CustomEvent('stream', { detail: incomingStream })) - + // inbound stream handling is async await delay(100) - + expect(metrics.trackProtocolStream.callCount).to.equal(1) expect(metrics.trackProtocolStream.firstCall.args[0]).to.have.property('protocol', streamProtocol) expect(metrics.trackProtocolStream.firstCall.args[0]).to.have.property('direction', 'inbound') diff --git a/packages/metrics-prometheus/src/index.ts b/packages/metrics-prometheus/src/index.ts index 2b51fad85f..171c77d549 100644 --- a/packages/metrics-prometheus/src/index.ts +++ b/packages/metrics-prometheus/src/index.ts @@ -284,17 +284,16 @@ class PrometheusMetrics implements Metrics { // calls this handler after protocol negotiation return } - - // existing byte-transfer tracking — unchanged + this._track(stream, stream.protocol) - + // Label format: "direction protocol" e.g. "inbound /identify/1.0.0" // Matches the format of the existing libp2p_protocol_streams_total gauge. const label = `${stream.direction} ${stream.protocol}` - + // Stream is now open — increment the opened counter immediately. this.streamsOpened.increment({ [label]: 1 }) - + stream.addEventListener('close', () => { this.streamsClosed.increment({ [label]: 1 }) }, { once: true }) diff --git a/packages/metrics-simple/src/index.ts b/packages/metrics-simple/src/index.ts index 84a3d17f9b..710072e7c2 100644 --- a/packages/metrics-simple/src/index.ts +++ b/packages/metrics-simple/src/index.ts @@ -458,6 +458,8 @@ class SimpleMetrics implements Metrics, Startable { trackProtocolStream (stream: Stream): void { if (stream.protocol == null) { + // protocol not negotiated yet, should not happen as the upgrader + // calls this handler after protocol negotiation return } diff --git a/packages/metrics-simple/test/track-protocol-stream.spec.ts b/packages/metrics-simple/test/track-protocol-stream.spec.ts index e257eb6a53..a2e90d74d9 100644 --- a/packages/metrics-simple/test/track-protocol-stream.spec.ts +++ b/packages/metrics-simple/test/track-protocol-stream.spec.ts @@ -1,96 +1,97 @@ +import { TypedEventEmitter } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { expect } from 'aegir/chai' import { stubInterface } from 'sinon-ts' import { simpleMetrics } from '../src/index.js' -import type { Stream } from '@libp2p/interface' - +import type { MessageStreamEvents, Stream } from '@libp2p/interface' + describe('SimpleMetrics - protocol stream counters', () => { function makeMetrics (): ReturnType> { return simpleMetrics({ onMetrics: () => {} })({ logger: defaultLogger() }) } - + function makeStream (direction: 'inbound' | 'outbound', protocol: string): Stream { + const target = new TypedEventEmitter() const stub = stubInterface({ direction, protocol, - log: defaultLogger().forComponent('stream') + log: defaultLogger().forComponent('stream'), + addEventListener: target.addEventListener.bind(target), + removeEventListener: target.removeEventListener.bind(target), + dispatchEvent: target.dispatchEvent.bind(target) }) - const target = new EventTarget() - ;(stub as any).addEventListener = target.addEventListener.bind(target) - ;(stub as any).removeEventListener = target.removeEventListener.bind(target) - ;(stub as any).dispatchEvent = target.dispatchEvent.bind(target) return stub } - + it('increments opened counter when trackProtocolStream is called', async () => { const metrics = makeMetrics() as any const stream = makeStream('outbound', '/identify/1.0.0') - + metrics.trackProtocolStream(stream) - + const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() - + expect(opened).to.deep.include({ 'outbound /identify/1.0.0': 1 }) // closed counter must NOT have fired yet expect(closed).to.deep.equal({}) }) - + it('increments closed counter on graceful stream close', async () => { const metrics = makeMetrics() as any const stream = makeStream('inbound', '/ping/1.0.0') - + metrics.trackProtocolStream(stream) stream.dispatchEvent(new Event('close')) - + const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() - + expect(opened).to.deep.include({ 'inbound /ping/1.0.0': 1 }) expect(closed).to.deep.include({ 'inbound /ping/1.0.0': 1 }) }) - + it('increments closed counter exactly once because listener is registered with once:true', async () => { const metrics = makeMetrics() as any const stream = makeStream('outbound', '/test/1.0.0') - + metrics.trackProtocolStream(stream) stream.dispatchEvent(new Event('close')) stream.dispatchEvent(new Event('close')) - + const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() expect(closed['outbound /test/1.0.0']).to.equal(1) }) - + it('accumulates counts correctly across multiple streams', async () => { const metrics = makeMetrics() as any - + const s1 = makeStream('outbound', '/identify/1.0.0') const s2 = makeStream('outbound', '/identify/1.0.0') const s3 = makeStream('inbound', '/identify/1.0.0') - + metrics.trackProtocolStream(s1) metrics.trackProtocolStream(s2) metrics.trackProtocolStream(s3) - + // close only s1 and s3 s1.dispatchEvent(new Event('close')) s3.dispatchEvent(new Event('close')) - + const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() - + // 2 outbound + 1 inbound opened expect(opened['outbound /identify/1.0.0']).to.equal(2) expect(opened['inbound /identify/1.0.0']).to.equal(1) - + // 1 outbound + 1 inbound closed (s2 still open) expect(closed['outbound /identify/1.0.0']).to.equal(1) expect(closed['inbound /identify/1.0.0']).to.equal(1) }) - + it('does not track a stream with no protocol', async () => { const metrics = makeMetrics() as any const stream = stubInterface({ From 786b1dc2716928269ffe75aef636987ece686aed Mon Sep 17 00:00:00 2001 From: Neha Kumari Date: Sun, 29 Mar 2026 14:55:10 +0530 Subject: [PATCH 3/5] chore(simple-metrics): add sinon-ts devDependency for dep-check --- packages/metrics-simple/package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/metrics-simple/package.json b/packages/metrics-simple/package.json index db396c6af7..a7f43197f4 100644 --- a/packages/metrics-simple/package.json +++ b/packages/metrics-simple/package.json @@ -49,7 +49,8 @@ "devDependencies": { "@types/tdigest": "^0.1.5", "aegir": "^47.0.22", - "p-defer": "^4.0.1" + "p-defer": "^4.0.1", + "sinon-ts": "^2.0.0" }, "sideEffects": false } From 8f909e3d875143a75b7e99c7bd00668057189a02 Mon Sep 17 00:00:00 2001 From: Neha Kumari Date: Wed, 1 Apr 2026 03:30:54 +0530 Subject: [PATCH 4/5] feat(metrics): protocol stream counters in OTEL; split clean vs error closes; add tests --- packages/metrics-opentelemetry/package.json | 1 + packages/metrics-opentelemetry/src/index.ts | 31 ++++- .../test/track-protocol-stream.spec.ts | 129 ++++++++++++++++++ packages/metrics-prometheus/src/index.ts | 16 ++- .../test/track-protocol-stream.spec.ts | 51 +++++++ packages/metrics-simple/src/index.ts | 17 ++- .../test/track-protocol-stream.spec.ts | 25 +++- 7 files changed, 261 insertions(+), 9 deletions(-) create mode 100644 packages/metrics-opentelemetry/test/track-protocol-stream.spec.ts create mode 100644 packages/metrics-prometheus/test/track-protocol-stream.spec.ts diff --git a/packages/metrics-opentelemetry/package.json b/packages/metrics-opentelemetry/package.json index 68171fa0de..c6db2d7c05 100644 --- a/packages/metrics-opentelemetry/package.json +++ b/packages/metrics-opentelemetry/package.json @@ -46,6 +46,7 @@ }, "devDependencies": { "@libp2p/logger": "^6.2.3", + "@opentelemetry/sdk-metrics": "^1.30.1", "aegir": "^47.0.22" }, "browser": { diff --git a/packages/metrics-opentelemetry/src/index.ts b/packages/metrics-opentelemetry/src/index.ts index ce6556bc7b..320909cf2a 100644 --- a/packages/metrics-opentelemetry/src/index.ts +++ b/packages/metrics-opentelemetry/src/index.ts @@ -45,7 +45,7 @@ import { OpenTelemetryMetric } from './metric.js' import { OpenTelemetrySummaryGroup } from './summary-group.js' import { OpenTelemetrySummary } from './summary.js' import { collectSystemMetrics } from './system-metrics.js' -import type { MultiaddrConnection, Stream, Metric, MetricGroup, Metrics, CalculatedMetricOptions, MetricOptions, Counter, CounterGroup, Histogram, HistogramOptions, HistogramGroup, Summary, SummaryOptions, SummaryGroup, CalculatedHistogramOptions, CalculatedSummaryOptions, NodeInfo, TraceFunctionOptions, TraceGeneratorFunctionOptions, TraceAttributes, ComponentLogger, Logger, MessageStream } from '@libp2p/interface' +import type { MultiaddrConnection, Stream, StreamCloseEvent, Metric, MetricGroup, Metrics, CalculatedMetricOptions, MetricOptions, Counter, CounterGroup, Histogram, HistogramOptions, HistogramGroup, Summary, SummaryOptions, SummaryGroup, CalculatedHistogramOptions, CalculatedSummaryOptions, NodeInfo, TraceFunctionOptions, TraceGeneratorFunctionOptions, TraceAttributes, ComponentLogger, Logger, MessageStream } from '@libp2p/interface' import type { Span, Attributes, Meter, Observable } from '@opentelemetry/api' // see https://betterstack.com/community/guides/observability/opentelemetry-metrics-nodejs/#prerequisites @@ -93,6 +93,9 @@ class OpenTelemetryMetrics implements Metrics { private readonly log: Logger private metrics: Map private observables: Map + private readonly streamsOpened: CounterGroup + private readonly streamsClosed: CounterGroup + private readonly streamsCloseErrors: CounterGroup constructor (components: OpenTelemetryComponents, init?: OpenTelemetryMetricsInit) { this.log = components.logger.forComponent('libp2p:open-telemetry-metrics') @@ -120,6 +123,19 @@ class OpenTelemetryMetrics implements Metrics { } }) + this.streamsOpened = this.registerCounterGroup('libp2p_protocol_streams_opened_total', { + label: 'protocol', + help: 'Total number of protocol streams opened, by direction and protocol' + }) + this.streamsClosed = this.registerCounterGroup('libp2p_protocol_streams_closed_total', { + label: 'protocol', + help: 'Total number of protocol streams closed, by direction and protocol' + }) + this.streamsCloseErrors = this.registerCounterGroup('libp2p_protocol_streams_close_errors_total', { + label: 'protocol', + help: 'Total number of protocol streams that ended with an error (abort, reset, etc.), by direction and protocol' + }) + collectSystemMetrics(this, init) } @@ -188,6 +204,19 @@ class OpenTelemetryMetrics implements Metrics { } this._track(stream, stream.protocol) + + const label = `${stream.direction} ${stream.protocol}` + + this.streamsOpened.increment({ [label]: 1 }) + + stream.addEventListener('close', (evt: Event) => { + const e = evt as StreamCloseEvent + if (e.error != null) { + this.streamsCloseErrors.increment({ [label]: 1 }) + } else { + this.streamsClosed.increment({ [label]: 1 }) + } + }, { once: true }) } registerMetric (name: string, opts: CalculatedMetricOptions): void diff --git a/packages/metrics-opentelemetry/test/track-protocol-stream.spec.ts b/packages/metrics-opentelemetry/test/track-protocol-stream.spec.ts new file mode 100644 index 0000000000..a45ff5cb36 --- /dev/null +++ b/packages/metrics-opentelemetry/test/track-protocol-stream.spec.ts @@ -0,0 +1,129 @@ +import { StreamAbortEvent, TypedEventEmitter } from '@libp2p/interface' +import { defaultLogger } from '@libp2p/logger' +import { metrics as otelApi } from '@opentelemetry/api' +import { + AggregationTemporality, + DataPointType, + InMemoryMetricExporter, + MeterProvider, + PeriodicExportingMetricReader +} from '@opentelemetry/sdk-metrics' +import { expect } from 'aegir/chai' +import { openTelemetryMetrics } from '../src/index.js' +import type { MessageStreamEvents, Stream } from '@libp2p/interface' +import type { ResourceMetrics, ScopeMetrics } from '@opentelemetry/sdk-metrics' + +function sumScopeMetrics (sm: ScopeMetrics, metricName: string, protocolValue: string): number { + let sub = 0 + for (const metric of sm.metrics) { + if (metric.descriptor.name !== metricName || metric.dataPointType !== DataPointType.SUM) { + continue + } + for (const dp of metric.dataPoints) { + if (dp.attributes.protocol === protocolValue) { + sub += dp.value as number + } + } + } + return sub +} + +function sumForProtocol (batches: ResourceMetrics[], metricName: string, protocolValue: string): number { + let total = 0 + for (const batch of batches) { + for (const sm of batch.scopeMetrics) { + total += sumScopeMetrics(sm, metricName, protocolValue) + } + } + return total +} + +describe('opentelemetry protocol stream counters', () => { + let previousProvider: ReturnType + let reader: PeriodicExportingMetricReader + let exporter: InMemoryMetricExporter + let provider: MeterProvider + let meterId: number + + before(() => { + previousProvider = otelApi.getMeterProvider() + exporter = new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE) + reader = new PeriodicExportingMetricReader({ + exporter, + exportIntervalMillis: 3_600_000 + }) + provider = new MeterProvider({ readers: [reader] }) + otelApi.setGlobalMeterProvider(provider) + }) + + after(async () => { + await provider.shutdown() + otelApi.setGlobalMeterProvider(previousProvider) + }) + + beforeEach(() => { + meterId = Date.now() + Math.floor(Math.random() * 1e6) + exporter.reset() + }) + + function makeStream (direction: 'inbound' | 'outbound', protocol: string): Stream { + const target = new TypedEventEmitter() + return { + direction, + protocol, + log: defaultLogger().forComponent('stream'), + addEventListener: target.addEventListener.bind(target), + removeEventListener: target.removeEventListener.bind(target), + dispatchEvent: target.dispatchEvent.bind(target), + send: () => true + } as unknown as Stream + } + + it('increments opened and clean closed counters', async () => { + const metrics = openTelemetryMetrics()({ + nodeInfo: { + name: `otel-ps-test-${meterId}`, + version: '1.0.0', + userAgent: 'test/1.0.0' + }, + logger: defaultLogger() + }) + + const stream = makeStream('outbound', '/identify/1.0.0') + const label = `${stream.direction} ${stream.protocol}` + + metrics.trackProtocolStream(stream) + stream.dispatchEvent(new Event('close')) + + await reader.forceFlush() + const batches = exporter.getMetrics() + + expect(sumForProtocol(batches, 'libp2p_protocol_streams_opened_total', label)).to.equal(1) + expect(sumForProtocol(batches, 'libp2p_protocol_streams_closed_total', label)).to.equal(1) + expect(sumForProtocol(batches, 'libp2p_protocol_streams_close_errors_total', label)).to.equal(0) + }) + + it('increments close-errors counter on StreamAbortEvent', async () => { + const metrics = openTelemetryMetrics()({ + nodeInfo: { + name: `otel-ps-test-${meterId}`, + version: '1.0.0', + userAgent: 'test/1.0.0' + }, + logger: defaultLogger() + }) + + const stream = makeStream('inbound', '/ping/1.0.0') + const label = `${stream.direction} ${stream.protocol}` + + metrics.trackProtocolStream(stream) + stream.dispatchEvent(new StreamAbortEvent(new Error('aborted'))) + + await reader.forceFlush() + const batches = exporter.getMetrics() + + expect(sumForProtocol(batches, 'libp2p_protocol_streams_opened_total', label)).to.equal(1) + expect(sumForProtocol(batches, 'libp2p_protocol_streams_closed_total', label)).to.equal(0) + expect(sumForProtocol(batches, 'libp2p_protocol_streams_close_errors_total', label)).to.equal(1) + }) +}) diff --git a/packages/metrics-prometheus/src/index.ts b/packages/metrics-prometheus/src/index.ts index 171c77d549..314ece4b0f 100644 --- a/packages/metrics-prometheus/src/index.ts +++ b/packages/metrics-prometheus/src/index.ts @@ -78,7 +78,7 @@ import { PrometheusMetricGroup } from './metric-group.js' import { PrometheusMetric } from './metric.js' import { PrometheusSummaryGroup } from './summary-group.js' import { PrometheusSummary } from './summary.js' -import type { ComponentLogger, Logger, MultiaddrConnection, Stream, CalculatedMetricOptions, Counter, CounterGroup, Metric, MetricGroup, MetricOptions, Metrics, CalculatedHistogramOptions, CalculatedSummaryOptions, HistogramOptions, Histogram, HistogramGroup, SummaryOptions, Summary, SummaryGroup, MessageStream } from '@libp2p/interface' +import type { ComponentLogger, Logger, MultiaddrConnection, Stream, StreamCloseEvent, CalculatedMetricOptions, Counter, CounterGroup, Metric, MetricGroup, MetricOptions, Metrics, CalculatedHistogramOptions, CalculatedSummaryOptions, HistogramOptions, Histogram, HistogramGroup, SummaryOptions, Summary, SummaryGroup, MessageStream } from '@libp2p/interface' import type { DefaultMetricsCollectorConfiguration, Registry, RegistryContentType } from 'prom-client' // export helper functions for creating buckets @@ -143,6 +143,7 @@ class PrometheusMetrics implements Metrics { private readonly registry?: Registry private readonly streamsOpened: CounterGroup private readonly streamsClosed: CounterGroup + private readonly streamsCloseErrors: CounterGroup constructor (components: PrometheusMetricsComponents, init?: Partial) { this.log = components.logger.forComponent('libp2p:prometheus-metrics') @@ -217,6 +218,10 @@ class PrometheusMetrics implements Metrics { label: 'protocol', help: 'Total number of protocol streams closed, by direction and protocol' }) + this.streamsCloseErrors = this.registerCounterGroup('libp2p_protocol_streams_close_errors_total', { + label: 'protocol', + help: 'Total number of protocol streams that ended with an error (abort, reset, etc.), by direction and protocol' + }) } readonly [Symbol.toStringTag] = '@libp2p/metrics-prometheus' @@ -294,8 +299,13 @@ class PrometheusMetrics implements Metrics { // Stream is now open — increment the opened counter immediately. this.streamsOpened.increment({ [label]: 1 }) - stream.addEventListener('close', () => { - this.streamsClosed.increment({ [label]: 1 }) + stream.addEventListener('close', (evt: Event) => { + const e = evt as StreamCloseEvent + if (e.error != null) { + this.streamsCloseErrors.increment({ [label]: 1 }) + } else { + this.streamsClosed.increment({ [label]: 1 }) + } }, { once: true }) } diff --git a/packages/metrics-prometheus/test/track-protocol-stream.spec.ts b/packages/metrics-prometheus/test/track-protocol-stream.spec.ts new file mode 100644 index 0000000000..deeabc68f3 --- /dev/null +++ b/packages/metrics-prometheus/test/track-protocol-stream.spec.ts @@ -0,0 +1,51 @@ +import { defaultLogger } from '@libp2p/logger' +import { streamPair } from '@libp2p/utils' +import { expect } from 'aegir/chai' +import { pEvent } from 'p-event' +import client from 'prom-client' +import { prometheusMetrics } from '../src/index.js' + +describe('prometheus protocol stream counters', () => { + it('records opened and clean closed counters', async () => { + const [outbound, inbound] = await streamPair() + + const metrics = prometheusMetrics({ + collectDefaultMetrics: false + })({ + logger: defaultLogger() + }) + + metrics.trackProtocolStream(outbound) + + await Promise.all([ + pEvent(inbound, 'close'), + outbound.close(), + inbound.close() + ]) + + const scraped = await client.register.metrics() + const label = `protocol="${outbound.direction} ${outbound.protocol}"` + + expect(scraped).to.include(`libp2p_protocol_streams_opened_total{${label}} 1`) + expect(scraped).to.include(`libp2p_protocol_streams_closed_total{${label}} 1`) + }) + + it('records close-errors counter when stream aborts', async () => { + const [outbound] = await streamPair() + + const metrics = prometheusMetrics({ + collectDefaultMetrics: false + })({ + logger: defaultLogger() + }) + + metrics.trackProtocolStream(outbound) + outbound.abort(new Error('test abort')) + + const scraped = await client.register.metrics() + const label = `protocol="${outbound.direction} ${outbound.protocol}"` + + expect(scraped).to.include(`libp2p_protocol_streams_opened_total{${label}} 1`) + expect(scraped).to.include(`libp2p_protocol_streams_close_errors_total{${label}} 1`) + }) +}) diff --git a/packages/metrics-simple/src/index.ts b/packages/metrics-simple/src/index.ts index 710072e7c2..89ae1625b2 100644 --- a/packages/metrics-simple/src/index.ts +++ b/packages/metrics-simple/src/index.ts @@ -26,7 +26,7 @@ import { serviceCapabilities } from '@libp2p/interface' import { logger } from '@libp2p/logger' import { TDigest } from 'tdigest' -import type { Startable, MultiaddrConnection, Stream, Metric, MetricGroup, StopTimer, Metrics, CalculatedMetricOptions, MetricOptions, Counter, CounterGroup, CalculateMetric, Histogram, HistogramOptions, HistogramGroup, Summary, SummaryOptions, SummaryGroup, CalculatedHistogramOptions, CalculatedSummaryOptions, ComponentLogger, Logger, MessageStream } from '@libp2p/interface' +import type { Startable, MultiaddrConnection, Stream, StreamCloseEvent, Metric, MetricGroup, StopTimer, Metrics, CalculatedMetricOptions, MetricOptions, Counter, CounterGroup, CalculateMetric, Histogram, HistogramOptions, HistogramGroup, Summary, SummaryOptions, SummaryGroup, CalculatedHistogramOptions, CalculatedSummaryOptions, ComponentLogger, Logger, MessageStream } from '@libp2p/interface' const log = logger('libp2p:simple-metrics') @@ -346,6 +346,7 @@ class SimpleMetrics implements Metrics, Startable { private readonly transferStats: Map private readonly streamsOpened: CounterGroup private readonly streamsClosed: CounterGroup + private readonly streamsCloseErrors: CounterGroup private started: boolean private interval?: ReturnType private readonly intervalMs: number @@ -373,6 +374,11 @@ class SimpleMetrics implements Metrics, Startable { label: 'protocol', help: 'Total number of protocol streams closed, by direction and protocol' }) + + this.streamsCloseErrors = this.registerCounterGroup('libp2p_protocol_streams_close_errors_total', { + label: 'protocol', + help: 'Total number of protocol streams that ended with an error (abort, reset, etc.), by direction and protocol' + }) } readonly [Symbol.toStringTag] = '@libp2p/metrics-simple' @@ -469,8 +475,13 @@ class SimpleMetrics implements Metrics, Startable { this.streamsOpened.increment({ [label]: 1 }) - stream.addEventListener('close', () => { - this.streamsClosed.increment({ [label]: 1 }) + stream.addEventListener('close', (evt: Event) => { + const e = evt as StreamCloseEvent + if (e.error != null) { + this.streamsCloseErrors.increment({ [label]: 1 }) + } else { + this.streamsClosed.increment({ [label]: 1 }) + } }, { once: true }) } diff --git a/packages/metrics-simple/test/track-protocol-stream.spec.ts b/packages/metrics-simple/test/track-protocol-stream.spec.ts index a2e90d74d9..1a98489c98 100644 --- a/packages/metrics-simple/test/track-protocol-stream.spec.ts +++ b/packages/metrics-simple/test/track-protocol-stream.spec.ts @@ -1,4 +1,4 @@ -import { TypedEventEmitter } from '@libp2p/interface' +import { StreamAbortEvent, TypedEventEmitter } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { expect } from 'aegir/chai' import { stubInterface } from 'sinon-ts' @@ -33,10 +33,11 @@ describe('SimpleMetrics - protocol stream counters', () => { const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + const closeErrors = await metrics.metrics.get('libp2p_protocol_streams_close_errors_total')?.collect() expect(opened).to.deep.include({ 'outbound /identify/1.0.0': 1 }) - // closed counter must NOT have fired yet expect(closed).to.deep.equal({}) + expect(closeErrors).to.deep.equal({}) }) it('increments closed counter on graceful stream close', async () => { @@ -48,9 +49,11 @@ describe('SimpleMetrics - protocol stream counters', () => { const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + const closeErrors = await metrics.metrics.get('libp2p_protocol_streams_close_errors_total')?.collect() expect(opened).to.deep.include({ 'inbound /ping/1.0.0': 1 }) expect(closed).to.deep.include({ 'inbound /ping/1.0.0': 1 }) + expect(closeErrors).to.deep.equal({}) }) it('increments closed counter exactly once because listener is registered with once:true', async () => { @@ -62,7 +65,23 @@ describe('SimpleMetrics - protocol stream counters', () => { stream.dispatchEvent(new Event('close')) const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + const closeErrors = await metrics.metrics.get('libp2p_protocol_streams_close_errors_total')?.collect() expect(closed['outbound /test/1.0.0']).to.equal(1) + expect(closeErrors).to.deep.equal({}) + }) + + it('increments close-errors counter when close event carries an error', async () => { + const metrics = makeMetrics() as any + const stream = makeStream('outbound', '/ping/1.0.0') + + metrics.trackProtocolStream(stream) + stream.dispatchEvent(new StreamAbortEvent(new Error('aborted'))) + + const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + const closeErrors = await metrics.metrics.get('libp2p_protocol_streams_close_errors_total')?.collect() + + expect(closed).to.deep.equal({}) + expect(closeErrors).to.deep.include({ 'outbound /ping/1.0.0': 1 }) }) it('accumulates counts correctly across multiple streams', async () => { @@ -82,6 +101,7 @@ describe('SimpleMetrics - protocol stream counters', () => { const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() + const closeErrors = await metrics.metrics.get('libp2p_protocol_streams_close_errors_total')?.collect() // 2 outbound + 1 inbound opened expect(opened['outbound /identify/1.0.0']).to.equal(2) @@ -90,6 +110,7 @@ describe('SimpleMetrics - protocol stream counters', () => { // 1 outbound + 1 inbound closed (s2 still open) expect(closed['outbound /identify/1.0.0']).to.equal(1) expect(closed['inbound /identify/1.0.0']).to.equal(1) + expect(closeErrors).to.deep.equal({}) }) it('does not track a stream with no protocol', async () => { From f5c70ecc011eaf4a8c4a84b977615482008deb0f Mon Sep 17 00:00:00 2001 From: Neha Kumari Date: Sun, 12 Apr 2026 18:40:41 +0530 Subject: [PATCH 5/5] chore(metrics): cover protocol stream counters (simple, prometheus, OTEL) --- .../test/track-protocol-stream.spec.ts | 4 ++-- .../test/track-protocol-stream.spec.ts | 3 ++- .../test/track-protocol-stream.spec.ts | 12 ++++++------ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/packages/metrics-opentelemetry/test/track-protocol-stream.spec.ts b/packages/metrics-opentelemetry/test/track-protocol-stream.spec.ts index a45ff5cb36..d9695e841e 100644 --- a/packages/metrics-opentelemetry/test/track-protocol-stream.spec.ts +++ b/packages/metrics-opentelemetry/test/track-protocol-stream.spec.ts @@ -1,4 +1,4 @@ -import { StreamAbortEvent, TypedEventEmitter } from '@libp2p/interface' +import { StreamAbortEvent, TypedEventEmitter, StreamCloseEvent } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { metrics as otelApi } from '@opentelemetry/api' import { @@ -93,7 +93,7 @@ describe('opentelemetry protocol stream counters', () => { const label = `${stream.direction} ${stream.protocol}` metrics.trackProtocolStream(stream) - stream.dispatchEvent(new Event('close')) + stream.dispatchEvent(new StreamCloseEvent()) await reader.forceFlush() const batches = exporter.getMetrics() diff --git a/packages/metrics-prometheus/test/track-protocol-stream.spec.ts b/packages/metrics-prometheus/test/track-protocol-stream.spec.ts index deeabc68f3..75d7d5baa0 100644 --- a/packages/metrics-prometheus/test/track-protocol-stream.spec.ts +++ b/packages/metrics-prometheus/test/track-protocol-stream.spec.ts @@ -30,7 +30,7 @@ describe('prometheus protocol stream counters', () => { expect(scraped).to.include(`libp2p_protocol_streams_closed_total{${label}} 1`) }) - it('records close-errors counter when stream aborts', async () => { + it('records opened and close-errors but not clean closed when stream aborts', async () => { const [outbound] = await streamPair() const metrics = prometheusMetrics({ @@ -47,5 +47,6 @@ describe('prometheus protocol stream counters', () => { expect(scraped).to.include(`libp2p_protocol_streams_opened_total{${label}} 1`) expect(scraped).to.include(`libp2p_protocol_streams_close_errors_total{${label}} 1`) + expect(scraped).not.to.include(`libp2p_protocol_streams_closed_total{${label}}`) }) }) diff --git a/packages/metrics-simple/test/track-protocol-stream.spec.ts b/packages/metrics-simple/test/track-protocol-stream.spec.ts index 1a98489c98..48b559f97f 100644 --- a/packages/metrics-simple/test/track-protocol-stream.spec.ts +++ b/packages/metrics-simple/test/track-protocol-stream.spec.ts @@ -1,4 +1,4 @@ -import { StreamAbortEvent, TypedEventEmitter } from '@libp2p/interface' +import { StreamAbortEvent, TypedEventEmitter, StreamCloseEvent } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' import { expect } from 'aegir/chai' import { stubInterface } from 'sinon-ts' @@ -45,7 +45,7 @@ describe('SimpleMetrics - protocol stream counters', () => { const stream = makeStream('inbound', '/ping/1.0.0') metrics.trackProtocolStream(stream) - stream.dispatchEvent(new Event('close')) + stream.dispatchEvent(new StreamCloseEvent()) const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() @@ -61,8 +61,8 @@ describe('SimpleMetrics - protocol stream counters', () => { const stream = makeStream('outbound', '/test/1.0.0') metrics.trackProtocolStream(stream) - stream.dispatchEvent(new Event('close')) - stream.dispatchEvent(new Event('close')) + stream.dispatchEvent(new StreamCloseEvent()) + stream.dispatchEvent(new StreamCloseEvent()) const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect() const closeErrors = await metrics.metrics.get('libp2p_protocol_streams_close_errors_total')?.collect() @@ -96,8 +96,8 @@ describe('SimpleMetrics - protocol stream counters', () => { metrics.trackProtocolStream(s3) // close only s1 and s3 - s1.dispatchEvent(new Event('close')) - s3.dispatchEvent(new Event('close')) + s1.dispatchEvent(new StreamCloseEvent()) + s3.dispatchEvent(new StreamCloseEvent()) const opened = await metrics.metrics.get('libp2p_protocol_streams_opened_total')?.collect() const closed = await metrics.metrics.get('libp2p_protocol_streams_closed_total')?.collect()