Skip to content

Commit 81d0767

Browse files
committed
Use span links for consumer distributed tracing, fix prototype instrumentation
1 parent 23c89bb commit 81d0767

File tree

5 files changed

+171
-148
lines changed

5 files changed

+171
-148
lines changed

dev-packages/browser-integration-tests/suites/integrations/supabase/queues-rpc/init.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ async function performQueueOperations() {
2020
try {
2121
await supabaseClient.rpc('send', {
2222
queue_name: 'todos',
23-
msg: { title: 'Test Todo' },
23+
message: { title: 'Test Todo' },
2424
});
2525

2626
await supabaseClient.rpc('pop', {

dev-packages/browser-integration-tests/suites/integrations/supabase/queues-schema/init.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ async function performQueueOperations() {
2020
try {
2121
await supabaseClient.schema('pgmq_public').rpc('send', {
2222
queue_name: 'todos',
23-
msg: { title: 'Test Todo' },
23+
message: { title: 'Test Todo' },
2424
});
2525

2626
await supabaseClient.schema('pgmq_public').rpc('pop', {

dev-packages/e2e-tests/test-applications/supabase-nextjs/tests/performance.test.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,10 @@ test('Sends queue publish spans with `schema(...).rpc(...)`', async ({ page, bas
271271
const result = await fetch(`${baseURL}/api/queue/producer-schema`);
272272

273273
expect(result.status).toBe(200);
274-
expect(await result.json()).toEqual({ data: [1] });
274+
const responseData = await result.json();
275+
expect(responseData.data).toHaveLength(1);
276+
expect(typeof responseData.data[0]).toBe('number');
277+
const messageId = responseData.data[0];
275278

276279
const transactionEvent = await httpTransactionPromise;
277280

@@ -280,7 +283,7 @@ test('Sends queue publish spans with `schema(...).rpc(...)`', async ({ page, bas
280283
data: {
281284
'messaging.destination.name': 'todos',
282285
'messaging.system': 'supabase',
283-
'messaging.message.id': '1',
286+
'messaging.message.id': String(messageId),
284287
'messaging.operation.type': 'publish',
285288
'messaging.operation.name': 'send',
286289
'messaging.message.body.size': expect.any(Number),
@@ -305,7 +308,7 @@ test('Sends queue publish spans with `schema(...).rpc(...)`', async ({ page, bas
305308
message: 'queue.publish(todos)',
306309
data: {
307310
'messaging.destination.name': 'todos',
308-
'messaging.message.id': '1',
311+
'messaging.message.id': String(messageId),
309312
'messaging.message.body.size': expect.any(Number),
310313
},
311314
});
@@ -323,14 +326,17 @@ test('Sends queue publish spans with `rpc(...)`', async ({ page, baseURL }) => {
323326
const transactionEvent = await httpTransactionPromise;
324327

325328
expect(result.status).toBe(200);
326-
expect(await result.json()).toEqual({ data: [2] });
329+
const responseData = await result.json();
330+
expect(responseData.data).toHaveLength(1);
331+
expect(typeof responseData.data[0]).toBe('number');
332+
const messageId = responseData.data[0];
327333

328334
expect(transactionEvent.spans).toHaveLength(2);
329335
expect(transactionEvent.spans).toContainEqual({
330336
data: {
331337
'messaging.destination.name': 'todos',
332338
'messaging.system': 'supabase',
333-
'messaging.message.id': '2',
339+
'messaging.message.id': String(messageId),
334340
'messaging.operation.type': 'publish',
335341
'messaging.operation.name': 'send',
336342
'messaging.message.body.size': expect.any(Number),
@@ -355,7 +361,7 @@ test('Sends queue publish spans with `rpc(...)`', async ({ page, baseURL }) => {
355361
message: 'queue.publish(todos)',
356362
data: {
357363
'messaging.destination.name': 'todos',
358-
'messaging.message.id': '2',
364+
'messaging.message.id': String(messageId),
359365
'messaging.message.body.size': expect.any(Number),
360366
},
361367
});
@@ -453,6 +459,11 @@ test('Sends queue process spans with `schema(...).rpc(...)`', async ({ page, bas
453459
},
454460
});
455461

462+
// CRITICAL: Verify the link actually points to the producer span from the first request
463+
// This ensures distributed tracing works correctly across separate HTTP transactions
464+
expect(producerLink?.trace_id).toBe(producerSpan?.trace_id);
465+
expect(producerLink?.span_id).toBe(producerSpan?.span_id);
466+
456467
expect(transactionEvent.breadcrumbs).toContainEqual({
457468
timestamp: expect.any(Number),
458469
type: 'supabase',
@@ -558,6 +569,11 @@ test('Sends queue process spans with `rpc(...)`', async ({ page, baseURL }) => {
558569
},
559570
});
560571

572+
// CRITICAL: Verify the link actually points to the producer span from the first request
573+
// This ensures distributed tracing works correctly across separate HTTP transactions
574+
expect(producerLink?.trace_id).toBe(producerSpan?.trace_id);
575+
expect(producerLink?.span_id).toBe(producerSpan?.span_id);
576+
561577
expect(transactionEvent.breadcrumbs).toContainEqual({
562578
timestamp: expect.any(Number),
563579
type: 'supabase',

packages/core/src/integrations/supabase.ts

Lines changed: 64 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
/* eslint-disable max-lines */
55
import { addBreadcrumb } from '../breadcrumbs';
6-
import { getClient, getCurrentScope, withIsolationScope } from '../currentScopes';
6+
import { getClient, getCurrentScope } from '../currentScopes';
77
import { DEBUG_BUILD } from '../debug-build';
88
import { captureException } from '../exports';
99
import { defineIntegration } from '../integration';
@@ -15,7 +15,7 @@ import {
1515
} from '../tracing/dynamicSamplingContext';
1616
import type { IntegrationFn } from '../types-hoist/integration';
1717
import type { Span, SpanAttributes } from '../types-hoist/span';
18-
import { dynamicSamplingContextToSentryBaggageHeader, parseBaggageHeader } from '../utils/baggage';
18+
import { dynamicSamplingContextToSentryBaggageHeader } from '../utils/baggage';
1919
import { debug } from '../utils/debug-logger';
2020
import { isPlainObject } from '../utils/is';
2121
import { addExceptionMechanism } from '../utils/misc';
@@ -27,6 +27,7 @@ export interface SupabaseClientConstructorType {
2727
prototype: {
2828
from: (table: string) => PostgRESTQueryBuilder;
2929
schema: (schema: string) => { rpc: (...args: unknown[]) => Promise<unknown> };
30+
rpc: (...args: unknown[]) => Promise<unknown>;
3031
};
3132
rpc: (fn: string, params: Record<string, unknown>) => Promise<unknown>;
3233
}
@@ -648,7 +649,7 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
648649
DEBUG_BUILD && debug.log('Consumer RPC call completed', { queueName, hasData: !!res.data });
649650

650651
// Extract trace context from message for distributed tracing
651-
const { sentryTrace, baggage } = _extractTraceAndBaggageFromMessage(res.data?.[0]?.message || {});
652+
const { sentryTrace } = _extractTraceAndBaggageFromMessage(res.data?.[0]?.message || {});
652653

653654
if (Array.isArray(res.data)) {
654655
res.data.forEach(item => {
@@ -658,11 +659,8 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
658659
});
659660
}
660661

661-
// Extract producer trace context for span link and propagation
662+
// Extract producer trace context for span link
662663
let producerSpanContext: { traceId: string; spanId: string; traceFlags: number } | undefined;
663-
let producerPropagationContext:
664-
| { traceId: string; parentSpanId: string; sampled: boolean; dsc?: Record<string, string> }
665-
| undefined;
666664

667665
if (sentryTrace) {
668666
const traceparentData = extractTraceparentData(sentryTrace);
@@ -676,117 +674,58 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
676674
spanId: traceparentData.parentSpanId,
677675
traceFlags,
678676
};
679-
680-
// Prepare propagation context for isolated scope
681-
producerPropagationContext = {
682-
traceId: traceparentData.traceId,
683-
parentSpanId: traceparentData.parentSpanId,
684-
sampled: traceparentData.parentSampled ?? false,
685-
dsc: baggage ? parseBaggageHeader(baggage) : undefined,
686-
};
687677
}
688678
}
689679

690680
const runWithSpan = (): SupabaseResponse => {
691-
// If we have producer trace context, use isolated scope to prevent pollution
692-
if (producerPropagationContext) {
693-
return withIsolationScope(isolatedScope => {
694-
// Set producer's propagation context in isolated scope
695-
// This ensures the consumer span continues the producer's trace
696-
isolatedScope.setPropagationContext({
697-
...producerPropagationContext,
698-
sampleRand: Math.random(), // Generate new sample rand for current execution context
699-
});
700-
701-
// Force transaction to make it a root span (not child of current span)
702-
// This is critical to prevent the consumer span from becoming a child of
703-
// an active HTTP request or other unrelated transaction
704-
return startSpan(
705-
{
706-
name: spanName,
707-
op: 'queue.process',
708-
startTime: spanStartTime,
709-
attributes: spanAttributes,
710-
forceTransaction: true, // Makes this a root span, not a child
711-
// Add span link to producer span for distributed tracing across async queue boundary
712-
links: producerSpanContext
713-
? [
714-
{
715-
context: producerSpanContext,
716-
attributes: { 'sentry.link.type': 'queue.producer' },
717-
},
718-
]
719-
: undefined,
720-
},
721-
span => {
722-
try {
723-
const processedResponse = _processConsumerSpan(span, res, queueName);
724-
725-
DEBUG_BUILD && debug.log('Consumer span processed successfully', { queueName });
726-
727-
return processedResponse;
728-
} catch (err: unknown) {
729-
DEBUG_BUILD && debug.log('Consumer span processing failed', { queueName, error: err });
730-
731-
captureException(err, scope => {
732-
scope.addEventProcessor(e => {
733-
addExceptionMechanism(e, {
734-
handled: false,
735-
type: 'auto.db.supabase.queue',
736-
});
737-
return e;
738-
});
739-
scope.setContext('supabase', { queueName });
740-
return scope;
741-
});
742-
743-
span.setStatus({ code: SPAN_STATUS_ERROR });
744-
throw err;
745-
}
746-
},
747-
);
748-
});
749-
// Isolated scope automatically discarded here, original scope restored
750-
} else {
751-
// No producer context, create regular span without isolation
752-
return startSpan(
753-
{
754-
name: spanName,
755-
op: 'queue.process',
756-
startTime: spanStartTime,
757-
attributes: spanAttributes,
758-
},
759-
span => {
760-
try {
761-
const processedResponse = _processConsumerSpan(span, res, queueName);
681+
// Create consumer span as child of current transaction (e.g., HTTP request)
682+
// Add span link to producer span for distributed tracing across async queue boundary
683+
return startSpan(
684+
{
685+
name: spanName,
686+
op: 'queue.process',
687+
startTime: spanStartTime,
688+
attributes: spanAttributes,
689+
// Add span link to producer span for distributed tracing across async queue boundary
690+
links: producerSpanContext
691+
? [
692+
{
693+
context: producerSpanContext,
694+
attributes: { 'sentry.link.type': 'queue.producer' },
695+
},
696+
]
697+
: undefined,
698+
},
699+
span => {
700+
try {
701+
const processedResponse = _processConsumerSpan(span, res, queueName);
762702

763-
DEBUG_BUILD && debug.log('Consumer span processed successfully', { queueName });
703+
DEBUG_BUILD && debug.log('Consumer span processed successfully', { queueName });
764704

765-
return processedResponse;
766-
} catch (err: unknown) {
767-
DEBUG_BUILD && debug.log('Consumer span processing failed', { queueName, error: err });
705+
return processedResponse;
706+
} catch (err: unknown) {
707+
DEBUG_BUILD && debug.log('Consumer span processing failed', { queueName, error: err });
768708

769-
captureException(err, scope => {
770-
scope.addEventProcessor(e => {
771-
addExceptionMechanism(e, {
772-
handled: false,
773-
type: 'auto.db.supabase.queue',
774-
});
775-
return e;
709+
captureException(err, scope => {
710+
scope.addEventProcessor(e => {
711+
addExceptionMechanism(e, {
712+
handled: false,
713+
type: 'auto.db.supabase.queue',
776714
});
777-
scope.setContext('supabase', { queueName });
778-
return scope;
715+
return e;
779716
});
717+
scope.setContext('supabase', { queueName });
718+
return scope;
719+
});
780720

781-
span.setStatus({ code: SPAN_STATUS_ERROR });
782-
throw err;
783-
}
784-
},
785-
);
786-
}
721+
span.setStatus({ code: SPAN_STATUS_ERROR });
722+
throw err;
723+
}
724+
},
725+
);
787726
};
788727

789-
// Create consumer span with isolated scope and forced transaction
728+
// Create consumer span as child of current transaction with span links for distributed tracing
790729
return runWithSpan();
791730
})
792731
.catch((err: unknown) => {
@@ -1026,20 +965,32 @@ function _instrumentRpcProducer(target: unknown, thisArg: unknown, argumentsList
1026965
}
1027966

1028967
/**
1029-
* Instruments direct RPC calls on a Supabase client.
968+
* Instruments direct RPC calls on a Supabase client's constructor prototype.
1030969
* This handles the pattern: `client.rpc('function_name', params)`
1031970
* Uses the shared proxy handler to route queue operations.
1032971
*
1033-
* @param SupabaseClient - The Supabase client instance to instrument
972+
* We instrument the prototype rather than individual instances to ensure consistent
973+
* behavior across all clients sharing the same constructor and to avoid issues with
974+
* Proxy property forwarding affecting the instrumentation marker on the original function.
975+
*
976+
* @param SupabaseClientConstructor - The Supabase client constructor to instrument
1034977
*/
1035-
function _instrumentRpc(SupabaseClient: unknown): void {
1036-
const client = SupabaseClient as SupabaseClientInstance;
978+
function _instrumentRpc(SupabaseClientConstructor: unknown): void {
979+
const prototype = (SupabaseClientConstructor as SupabaseClientConstructorType).prototype;
980+
981+
if (!prototype?.rpc) {
982+
return;
983+
}
1037984

1038-
if (!client.rpc) {
985+
// Prevent double-wrapping if instrumentSupabaseClient is called multiple times
986+
if (_isInstrumented(prototype.rpc)) {
1039987
return;
1040988
}
1041989

1042-
client.rpc = new Proxy(client.rpc, _createRpcProxyHandler());
990+
const wrappedRpc = new Proxy(prototype.rpc, _createRpcProxyHandler());
991+
prototype.rpc = wrappedRpc;
992+
993+
_markAsInstrumented(prototype.rpc);
1043994
}
1044995

1045996
/**
@@ -1454,7 +1405,7 @@ export const instrumentSupabaseClient = (supabaseClient: unknown): void => {
14541405

14551406
_instrumentSupabaseClientConstructor(SupabaseClientConstructor);
14561407
_instrumentRpcReturnedFromSchemaCall(SupabaseClientConstructor);
1457-
_instrumentRpc(supabaseClient as SupabaseClientInstance);
1408+
_instrumentRpc(SupabaseClientConstructor);
14581409
_instrumentSupabaseAuthClient(supabaseClient as SupabaseClientInstance);
14591410
};
14601411

0 commit comments

Comments
 (0)