From fd4ffbc4d8f8e676bac9bdd366677f7589edc81b Mon Sep 17 00:00:00 2001 From: Lukas Stracke Date: Fri, 6 Feb 2026 12:24:28 +0100 Subject: [PATCH 1/3] feat(core): Add `SpanBuffer` implementation --- packages/core/src/index.ts | 3 + .../src/tracing/dynamicSamplingContext.ts | 5 +- .../core/src/tracing/spans/captureSpan.ts | 2 +- packages/core/src/tracing/spans/spanBuffer.ts | 164 +++++++++++ .../test/lib/tracing/spans/spanBuffer.test.ts | 262 ++++++++++++++++++ 5 files changed, 434 insertions(+), 2 deletions(-) create mode 100644 packages/core/src/tracing/spans/spanBuffer.ts create mode 100644 packages/core/test/lib/tracing/spans/spanBuffer.test.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index fd75b251a5ee..d4531a895056 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -174,6 +174,9 @@ export type { GoogleGenAIOptions, GoogleGenAIIstrumentedMethod, } from './tracing/google-genai/types'; + +export { SpanBuffer } from './tracing/spans/spanBuffer'; + export type { FeatureFlag } from './utils/featureFlags'; export { diff --git a/packages/core/src/tracing/dynamicSamplingContext.ts b/packages/core/src/tracing/dynamicSamplingContext.ts index 47d5657a7d87..219d7deddcd7 100644 --- a/packages/core/src/tracing/dynamicSamplingContext.ts +++ b/packages/core/src/tracing/dynamicSamplingContext.ts @@ -6,6 +6,7 @@ import { SEMANTIC_ATTRIBUTE_SENTRY_PREVIOUS_TRACE_SAMPLE_RATE, SEMANTIC_ATTRIBUTE_SENTRY_SAMPLE_RATE, SEMANTIC_ATTRIBUTE_SENTRY_SOURCE, + SEMANTIC_ATTRIBUTE_SENTRY_SPAN_SOURCE, } from '../semanticAttributes'; import type { DynamicSamplingContext } from '../types-hoist/envelope'; import type { Span } from '../types-hoist/span'; @@ -119,7 +120,9 @@ export function getDynamicSamplingContextFromSpan(span: Span): Readonly>; + + private _flushIntervalId: ReturnType | null; + private _client: Client; + private _maxSpanLimit: number; + private _flushInterval: number; + + public constructor(client: Client, options?: SpanBufferOptions) { + this._traceMap = new Map(); + this._client = client; + + const { maxSpanLimit, flushInterval } = options ?? {}; + + this._maxSpanLimit = + maxSpanLimit && maxSpanLimit > 0 && maxSpanLimit <= MAX_SPANS_PER_ENVELOPE + ? maxSpanLimit + : MAX_SPANS_PER_ENVELOPE; + this._flushInterval = flushInterval && flushInterval > 0 ? flushInterval : 5_000; + + this._flushIntervalId = null; + this._debounceFlushInterval(); + + this._client.on('flush', () => { + this.drain(); + }); + } + + /** + * Add a span to the buffer. + */ + public add(spanJSON: SerializedStreamedSpanWithSegmentSpan): void { + const traceId = spanJSON.trace_id; + let traceBucket = this._traceMap.get(traceId); + if (traceBucket) { + traceBucket.add(spanJSON); + } else { + traceBucket = new Set([spanJSON]); + this._traceMap.set(traceId, traceBucket); + } + + if (traceBucket.size >= this._maxSpanLimit) { + this.flush(traceId); + this._debounceFlushInterval(); + } + } + + /** + * Drain and flush all buffered traces. + */ + public drain(): void { + if (!this._traceMap.size) { + return; + } + + DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceMap.size} traces`); + + this._traceMap.forEach((_, traceId) => { + this.flush(traceId); + }); + this._debounceFlushInterval(); + } + + /** + * Flush spans of a specific trace. + * In contrast to {@link SpanBuffer.flush}, this method does not flush all traces, but only the one with the given traceId. + */ + public flush(traceId: string): void { + const traceBucket = this._traceMap.get(traceId); + if (!traceBucket) { + return; + } + + if (!traceBucket.size) { + // we should never get here, given we always add a span when we create a new bucket + // and delete the bucket once we flush out the trace + this._traceMap.delete(traceId); + return; + } + + const spans = Array.from(traceBucket); + + const segmentSpan = spans[0]?._segmentSpan; + if (!segmentSpan) { + DEBUG_BUILD && debug.warn('No segment span reference found on span JSON, cannot compute DSC'); + this._traceMap.delete(traceId); + return; + } + + const dsc = getDynamicSamplingContextFromSpan(segmentSpan); + + const cleanedSpans: SerializedStreamedSpan[] = Array.from(traceBucket).map(spanJSON => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { _segmentSpan, ...cleanSpanJSON } = spanJSON; + return cleanSpanJSON; + }); + + const envelope = createStreamedSpanEnvelope(cleanedSpans, dsc, this._client); + + DEBUG_BUILD && debug.log(`Sending span envelope for trace ${traceId} with ${cleanedSpans.length} spans`); + + this._client.sendEnvelope(envelope).then(null, reason => { + DEBUG_BUILD && debug.error('Error while sending streamed span envelope:', reason); + }); + + this._traceMap.delete(traceId); + } + + private _debounceFlushInterval(): void { + if (this._flushIntervalId) { + clearInterval(this._flushIntervalId); + } + this._flushIntervalId = safeUnref( + setInterval(() => { + this.drain(); + }, this._flushInterval), + ); + } +} diff --git a/packages/core/test/lib/tracing/spans/spanBuffer.test.ts b/packages/core/test/lib/tracing/spans/spanBuffer.test.ts new file mode 100644 index 000000000000..1b654cd400e6 --- /dev/null +++ b/packages/core/test/lib/tracing/spans/spanBuffer.test.ts @@ -0,0 +1,262 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { Client, StreamedSpanEnvelope } from '../../../../src'; +import { SentrySpan, setCurrentClient, SpanBuffer } from '../../../../src'; +import type { SerializedStreamedSpanWithSegmentSpan } from '../../../../src/tracing/spans/captureSpan'; +import { getDefaultTestClientOptions, TestClient } from '../../../mocks/client'; + +describe('SpanBuffer', () => { + let client: TestClient; + let sendEnvelopeSpy: ReturnType; + + let sentEnvelopes: Array = []; + + beforeEach(() => { + vi.useFakeTimers(); + sentEnvelopes = []; + sendEnvelopeSpy = vi.fn().mockImplementation(e => { + sentEnvelopes.push(e); + return Promise.resolve(); + }); + + client = new TestClient( + getDefaultTestClientOptions({ + dsn: 'https://username@domain/123', + tracesSampleRate: 1.0, + }), + ); + client.sendEnvelope = sendEnvelopeSpy; + client.init(); + setCurrentClient(client as Client); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + it('flushes all traces on drain()', () => { + const buffer = new SpanBuffer(client); + + const segmentSpan1 = new SentrySpan({ name: 'segment', sampled: true, traceId: 'trace123' }); + const segmentSpan2 = new SentrySpan({ name: 'segment', sampled: true, traceId: 'trace456' }); + + buffer.add({ + trace_id: 'trace123', + span_id: 'span1', + name: 'test span', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan1, + }); + + buffer.add({ + trace_id: 'trace456', + span_id: 'span2', + name: 'test span', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan2, + }); + + buffer.drain(); + + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(2); + expect(sentEnvelopes).toHaveLength(2); + expect(sentEnvelopes[0]?.[1]?.[0]?.[1]?.items[0]?.trace_id).toBe('trace123'); + expect(sentEnvelopes[1]?.[1]?.[0]?.[1]?.items[0]?.trace_id).toBe('trace456'); + }); + + it('drains on interval', () => { + const buffer = new SpanBuffer(client, { flushInterval: 1000 }); + + const segmentSpan1 = new SentrySpan({ name: 'segment', sampled: true }); + const span1 = { + trace_id: 'trace123', + span_id: 'span1', + name: 'test span', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan1, + }; + + const segmentSpan2 = new SentrySpan({ name: 'segment2', sampled: true }); + const span2 = { + trace_id: 'trace123', + span_id: 'span2', + name: 'test span', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan2, + }; + + buffer.add(span1 as SerializedStreamedSpanWithSegmentSpan); + buffer.add(span2 as SerializedStreamedSpanWithSegmentSpan); + + expect(sendEnvelopeSpy).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1000); + + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); + + // since the buffer is now empty, it should not send anything anymore + vi.advanceTimersByTime(1000); + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); + }); + + it('flushes when maxSpanLimit is reached', () => { + const buffer = new SpanBuffer(client, { maxSpanLimit: 2 }); + + const segmentSpan = new SentrySpan({ name: 'segment', sampled: true }); + + buffer.add({ + trace_id: 'trace123', + span_id: 'span1', + name: 'test span 1', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan, + }); + + expect(sendEnvelopeSpy).not.toHaveBeenCalled(); + + buffer.add({ + trace_id: 'trace123', + span_id: 'span2', + name: 'test span 2', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan, + }); + + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); + + buffer.add({ + trace_id: 'trace123', + span_id: 'span3', + name: 'test span 3', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan, + }); + + // we added another span after flushing but neither limit nor time interval should have been reached + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); + + // draining will flush out the remaining span + buffer.drain(); + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(2); + }); + + it('flushes on client flush event', () => { + const buffer = new SpanBuffer(client); + + const segmentSpan = new SentrySpan({ name: 'segment', sampled: true }); + + buffer.add({ + trace_id: 'trace123', + span_id: 'span1', + name: 'test span', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan, + }); + + expect(sendEnvelopeSpy).not.toHaveBeenCalled(); + + client.emit('flush'); + + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); + }); + + it('groups spans by traceId', () => { + const buffer = new SpanBuffer(client); + + const segmentSpan1 = new SentrySpan({ name: 'segment1', sampled: true }); + const segmentSpan2 = new SentrySpan({ name: 'segment2', sampled: true }); + + buffer.add({ + trace_id: 'trace1', + span_id: 'span1', + name: 'test span 1', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan1, + }); + + buffer.add({ + trace_id: 'trace2', + span_id: 'span2', + name: 'test span 2', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan2, + }); + + buffer.drain(); + + // Should send 2 envelopes, one for each trace + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(2); + }); + + it('flushes a specific trace on flush(traceId)', () => { + const buffer = new SpanBuffer(client); + + const segmentSpan1 = new SentrySpan({ name: 'segment1', sampled: true }); + const segmentSpan2 = new SentrySpan({ name: 'segment2', sampled: true }); + + buffer.add({ + trace_id: 'trace1', + span_id: 'span1', + name: 'test span 1', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan1, + }); + + buffer.add({ + trace_id: 'trace2', + span_id: 'span2', + name: 'test span 2', + start_timestamp: Date.now() / 1000, + end_timestamp: Date.now() / 1000, + status: 'ok', + is_segment: false, + _segmentSpan: segmentSpan2, + }); + + buffer.flush('trace1'); + + expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); + expect(sentEnvelopes[0]?.[1]?.[0]?.[1]?.items[0]?.trace_id).toBe('trace1'); + }); + + it('handles flushing a non-existing trace', () => { + const buffer = new SpanBuffer(client); + + buffer.flush('trace1'); + + expect(sendEnvelopeSpy).not.toHaveBeenCalled(); + }); +}); From 471048de869fad44b553ef20586159ec5679664d Mon Sep 17 00:00:00 2001 From: Lukas Stracke Date: Fri, 6 Feb 2026 14:37:59 +0100 Subject: [PATCH 2/3] adjust sentry.span.source to changes --- packages/core/src/tracing/dynamicSamplingContext.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/core/src/tracing/dynamicSamplingContext.ts b/packages/core/src/tracing/dynamicSamplingContext.ts index 219d7deddcd7..7cf79e53d07b 100644 --- a/packages/core/src/tracing/dynamicSamplingContext.ts +++ b/packages/core/src/tracing/dynamicSamplingContext.ts @@ -6,7 +6,6 @@ import { SEMANTIC_ATTRIBUTE_SENTRY_PREVIOUS_TRACE_SAMPLE_RATE, SEMANTIC_ATTRIBUTE_SENTRY_SAMPLE_RATE, SEMANTIC_ATTRIBUTE_SENTRY_SOURCE, - SEMANTIC_ATTRIBUTE_SENTRY_SPAN_SOURCE, } from '../semanticAttributes'; import type { DynamicSamplingContext } from '../types-hoist/envelope'; import type { Span } from '../types-hoist/span'; @@ -120,9 +119,8 @@ export function getDynamicSamplingContextFromSpan(span: Span): Readonly Date: Fri, 6 Feb 2026 16:33:40 +0100 Subject: [PATCH 3/3] listen to 'close' --- packages/core/src/tracing/spans/spanBuffer.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/core/src/tracing/spans/spanBuffer.ts b/packages/core/src/tracing/spans/spanBuffer.ts index 9285fe8a8403..761ba076e4d2 100644 --- a/packages/core/src/tracing/spans/spanBuffer.ts +++ b/packages/core/src/tracing/spans/spanBuffer.ts @@ -69,6 +69,15 @@ export class SpanBuffer { this._client.on('flush', () => { this.drain(); }); + + this._client.on('close', () => { + // No need to drain the buffer here as `Client.close()` internally already calls `Client.flush()` + // which already invokes the `flush` hook and thus drains the buffer. + if (this._flushIntervalId) { + clearInterval(this._flushIntervalId); + } + this._traceMap.clear(); + }); } /** @@ -134,7 +143,7 @@ export class SpanBuffer { const dsc = getDynamicSamplingContextFromSpan(segmentSpan); - const cleanedSpans: SerializedStreamedSpan[] = Array.from(traceBucket).map(spanJSON => { + const cleanedSpans: SerializedStreamedSpan[] = spans.map(spanJSON => { // eslint-disable-next-line @typescript-eslint/no-unused-vars const { _segmentSpan, ...cleanSpanJSON } = spanJSON; return cleanSpanJSON;