@@ -7,7 +7,11 @@ import { getClient, getCurrentScope } from '../currentScopes';
77import { DEBUG_BUILD } from '../debug-build' ;
88import { captureException } from '../exports' ;
99import { defineIntegration } from '../integration' ;
10- import { SEMANTIC_ATTRIBUTE_SENTRY_OP , SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../semanticAttributes' ;
10+ import {
11+ SEMANTIC_ATTRIBUTE_SENTRY_OP ,
12+ SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN ,
13+ SEMANTIC_ATTRIBUTE_SENTRY_SOURCE ,
14+ } from '../semanticAttributes' ;
1115import {
1216 setHttpStatus ,
1317 SPAN_STATUS_ERROR ,
@@ -27,7 +31,6 @@ import { debug } from '../utils/debug-logger';
2731import { isPlainObject } from '../utils/is' ;
2832import { addExceptionMechanism } from '../utils/misc' ;
2933import { spanToTraceContext , spanToTraceHeader } from '../utils/spanUtils' ;
30- import { timestampInSeconds } from '../utils/time' ;
3134import { extractTraceparentData } from '../utils/tracing' ;
3235
3336export interface SupabaseClientConstructorType {
@@ -714,6 +717,10 @@ function _processConsumerSpan(span: Span, res: SupabaseResponse, queueName: stri
714717 const messageId = _extractMessageIds ( data ) ;
715718
716719 // Set span attributes
720+ // Set batch message count for OTel semantic conventions compliance
721+ // This is set for all consumer operations since pop() returns an array (batch-oriented)
722+ span . setAttribute ( 'messaging.batch.message_count' , data . length ) ;
723+
717724 if ( messageId ) {
718725 span . setAttribute ( 'messaging.message.id' , messageId ) ;
719726 }
@@ -793,161 +800,129 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
793800 } ) ;
794801
795802 const spanName = `process ${ queueName || 'unknown' } ` ;
803+ // Use the Cloudflare pattern for queue spans:
804+ // - op: 'db.queue' makes the span a valid transaction when no parent exists
805+ // - SEMANTIC_ATTRIBUTE_SENTRY_OP: 'queue.process' ensures Queue Insights recognition
806+ // - SEMANTIC_ATTRIBUTE_SENTRY_SOURCE: 'task' marks it as a task source
807+ // This pattern allows queue spans to work both as child spans (when inside a transaction)
808+ // and as root spans (when called outside a transaction context).
796809 const spanAttributes = {
797810 [ SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN ] : 'auto.db.supabase.queue.consumer' ,
798811 [ SEMANTIC_ATTRIBUTE_SENTRY_OP ] : 'queue.process' ,
812+ [ SEMANTIC_ATTRIBUTE_SENTRY_SOURCE ] : 'task' ,
799813 'messaging.system' : 'supabase' ,
800814 'messaging.destination.name' : queueName ,
801815 'messaging.operation.name' : operationName ,
802816 'messaging.operation.type' : 'process' ,
803817 } as const ;
804- const spanStartTime = timestampInSeconds ( ) ;
805-
806- const rpcPromise = Reflect . apply (
807- target as ( ...args : unknown [ ] ) => Promise < unknown > ,
808- thisArg ,
809- argumentsList ,
810- ) as Promise < SupabaseResponse > ;
811-
812- return rpcPromise
813- . then ( res => {
814- DEBUG_BUILD && debug . log ( 'Consumer RPC call completed' , { queueName, hasData : ! ! res . data } ) ;
815-
816- // Skip span creation for empty/null responses when there's no error - no messages to process
817- // Still create span if there's an error to capture the failure
818- if ( ( ! res . data || ( Array . isArray ( res . data ) && res . data . length === 0 ) ) && ! res . error ) {
819- DEBUG_BUILD && debug . log ( 'Skipping consumer span for empty response' , { queueName } ) ;
820- return res ;
821- }
822818
823- // Extract trace context from message for distributed tracing
824- const { sentryTrace } = _extractTraceAndBaggageFromMessage ( res . data ?. [ 0 ] ?. message || { } ) ;
825-
826- // Clean up _sentry metadata from messages before returning to user
827- // Use immutable updates to avoid mutating the original response data
828- if ( Array . isArray ( res . data ) ) {
829- const hasMetadata = res . data . some (
830- item =>
831- item &&
832- typeof item === 'object' &&
833- item . message &&
834- typeof item . message === 'object' &&
835- '_sentry' in item . message ,
836- ) ;
837-
838- if ( hasMetadata ) {
839- res . data = res . data . map ( item => {
840- if ( item && typeof item === 'object' && item . message && typeof item . message === 'object' ) {
841- const messageCopy = { ...( item . message as Record < string , unknown > ) } ;
842- delete messageCopy . _sentry ;
843- return { ...item , message : messageCopy } ;
844- }
845- return item ;
846- } ) ;
847- }
848- }
849-
850- // Extract producer trace context for span link
851- let producerSpanContext : { traceId : string ; spanId : string ; traceFlags : number } | undefined ;
819+ // Wrap the entire RPC call with startSpan to ensure the span is created BEFORE the async operation.
820+ // This matches the producer pattern and ensures the span is properly attached to the current
821+ // transaction context. Previously, the span was created inside .then() which could cause
822+ // the span to be orphaned if the transaction ended before the callback executed.
823+ return startSpan (
824+ {
825+ name : spanName ,
826+ op : 'db.queue' ,
827+ attributes : spanAttributes ,
828+ } ,
829+ span => {
830+ const rpcPromise = Reflect . apply (
831+ target as ( ...args : unknown [ ] ) => Promise < unknown > ,
832+ thisArg ,
833+ argumentsList ,
834+ ) as Promise < SupabaseResponse > ;
852835
853- if ( sentryTrace ) {
854- const traceparentData = extractTraceparentData ( sentryTrace ) ;
855- if ( traceparentData ?. traceId && traceparentData ?. parentSpanId ) {
856- // Convert parentSampled boolean to traceFlags (W3C trace context spec)
857- // traceFlags bit 0 (LSB) = sampled flag: 1 if sampled, 0 if not sampled
858- const traceFlags = traceparentData . parentSampled ? 1 : 0 ;
836+ return rpcPromise . then (
837+ ( res : SupabaseResponse ) => {
838+ DEBUG_BUILD && debug . log ( 'Consumer RPC call completed' , { queueName, hasData : ! ! res . data } ) ;
839+
840+ // Handle empty responses - set span status but don't skip span creation
841+ // The span is already created, we just mark it appropriately
842+ if ( ( ! res . data || ( Array . isArray ( res . data ) && res . data . length === 0 ) ) && ! res . error ) {
843+ DEBUG_BUILD && debug . log ( 'Consumer received empty response' , { queueName } ) ;
844+ span . setStatus ( { code : SPAN_STATUS_OK } ) ;
845+ span . setAttribute ( 'messaging.batch.message_count' , 0 ) ;
846+ return res ;
847+ }
859848
860- producerSpanContext = {
861- traceId : traceparentData . traceId ,
862- spanId : traceparentData . parentSpanId ,
863- traceFlags,
864- } ;
865- }
866- }
849+ // Extract trace context from message for distributed tracing (before cleanup)
850+ const { sentryTrace } = _extractTraceAndBaggageFromMessage ( res . data ?. [ 0 ] ?. message || { } ) ;
851+
852+ // Clean up _sentry metadata from messages before returning to user
853+ // Use immutable updates to avoid mutating the original response data
854+ if ( Array . isArray ( res . data ) ) {
855+ const hasMetadata = res . data . some (
856+ item =>
857+ item &&
858+ typeof item === 'object' &&
859+ item . message &&
860+ typeof item . message === 'object' &&
861+ '_sentry' in item . message ,
862+ ) ;
867863
868- const runWithSpan = ( ) : SupabaseResponse => {
869- // Create consumer span as child of current transaction (e.g., HTTP request)
870- // Add span link to producer span for distributed tracing across async queue boundary
871- return startSpan (
872- {
873- name : spanName ,
874- op : 'queue.process' ,
875- startTime : spanStartTime ,
876- attributes : spanAttributes ,
877- // Add span link to producer span for distributed tracing across async queue boundary
878- links : producerSpanContext
879- ? [
880- {
881- context : producerSpanContext ,
882- attributes : { 'sentry.link.type' : 'queue.producer' } ,
883- } ,
884- ]
885- : undefined ,
886- } ,
887- span => {
888- try {
889- const processedResponse = _processConsumerSpan ( span , res , queueName ) ;
864+ if ( hasMetadata ) {
865+ res . data = res . data . map ( item => {
866+ if ( item && typeof item === 'object' && item . message && typeof item . message === 'object' ) {
867+ const messageCopy = { ...( item . message as Record < string , unknown > ) } ;
868+ delete messageCopy . _sentry ;
869+ return { ...item , message : messageCopy } ;
870+ }
871+ return item ;
872+ } ) ;
873+ }
874+ }
890875
891- DEBUG_BUILD && debug . log ( 'Consumer span processed successfully' , { queueName } ) ;
876+ // Extract producer trace context and add span link for distributed tracing
877+ if ( sentryTrace ) {
878+ const traceparentData = extractTraceparentData ( sentryTrace ) ;
879+ if ( traceparentData ?. traceId && traceparentData ?. parentSpanId ) {
880+ // Convert parentSampled boolean to traceFlags (W3C trace context spec)
881+ // traceFlags bit 0 (LSB) = sampled flag: 1 if sampled, 0 if not sampled
882+ const traceFlags = traceparentData . parentSampled ? 1 : 0 ;
883+
884+ span . addLink ( {
885+ context : {
886+ traceId : traceparentData . traceId ,
887+ spanId : traceparentData . parentSpanId ,
888+ traceFlags,
889+ } ,
890+ attributes : { 'sentry.link.type' : 'queue.producer' } ,
891+ } ) ;
892+ }
893+ }
892894
893- return processedResponse ;
894- } catch ( err : unknown ) {
895- // Handle span processing errors without re-throwing.
896- // The outer .catch() is for RPC promise rejections only.
897- // Re-throwing here would cause duplicate spans and duplicate captureException calls
898- // because the .then().catch() pattern catches errors from both promise rejections
899- // and errors thrown within the .then() callback.
900- DEBUG_BUILD && debug . log ( 'Consumer span processing failed' , { queueName, error : err } ) ;
895+ // Process the span with response data
896+ try {
897+ const processedResponse = _processConsumerSpan ( span , res , queueName ) ;
898+ DEBUG_BUILD && debug . log ( 'Consumer span processed successfully' , { queueName } ) ;
899+ return processedResponse ;
900+ } catch ( err : unknown ) {
901+ DEBUG_BUILD && debug . log ( 'Consumer span processing failed' , { queueName, error : err } ) ;
901902
902- captureException ( err , scope => {
903- scope . addEventProcessor ( e => {
904- addExceptionMechanism ( e , {
905- handled : false ,
906- type : 'auto.db.supabase.queue' ,
907- } ) ;
908- return e ;
903+ captureException ( err , scope => {
904+ scope . addEventProcessor ( e => {
905+ addExceptionMechanism ( e , {
906+ handled : false ,
907+ type : 'auto.db.supabase.queue' ,
909908 } ) ;
910- scope . setContext ( 'supabase' , { queueName } ) ;
911- return scope ;
909+ return e ;
912910 } ) ;
911+ scope . setContext ( 'supabase' , { queueName } ) ;
912+ return scope ;
913+ } ) ;
913914
914- span . setStatus ( { code : SPAN_STATUS_ERROR } ) ;
915-
916- // Return the original response since the RPC call itself succeeded.
917- // Only span processing failed, which we've already captured.
918- return res ;
919- }
920- } ,
921- ) ;
922- } ;
923-
924- // Create consumer span as child of current transaction with span links for distributed tracing
925- return runWithSpan ( ) ;
926- } )
927- . catch ( ( err : unknown ) => {
928- DEBUG_BUILD && debug . log ( 'Consumer RPC call failed' , { queueName, error : err } ) ;
929-
930- return startSpan (
931- {
932- name : spanName ,
933- op : 'queue.process' ,
934- startTime : spanStartTime ,
935- attributes : spanAttributes ,
936- } ,
937- span => {
938- if ( queueName ) {
939- span . setAttribute ( 'messaging.destination.name' , queueName ) ;
915+ span . setStatus ( { code : SPAN_STATUS_ERROR } ) ;
916+ return res ;
940917 }
918+ } ,
919+ ( err : unknown ) => {
920+ // Handle RPC promise rejection
921+ DEBUG_BUILD && debug . log ( 'Consumer RPC call failed' , { queueName, error : err } ) ;
941922
942- const breadcrumbData : Record < string , unknown > = { } ;
943- if ( queueName ) {
944- breadcrumbData [ 'messaging.destination.name' ] = queueName ;
945- }
946- _createQueueBreadcrumb (
947- 'queue.process' ,
948- queueName ,
949- Object . keys ( breadcrumbData ) . length ? breadcrumbData : undefined ,
950- ) ;
923+ _createQueueBreadcrumb ( 'queue.process' , queueName , {
924+ 'messaging.destination.name' : queueName ,
925+ } ) ;
951926
952927 captureException ( err , scope => {
953928 scope . addEventProcessor ( e => {
@@ -965,7 +940,8 @@ const _instrumentRpcConsumer = (target: unknown, thisArg: unknown, argumentsList
965940 throw err ;
966941 } ,
967942 ) ;
968- } ) ;
943+ } ,
944+ ) ;
969945} ;
970946
971947/**
@@ -1008,13 +984,20 @@ function _instrumentRpcProducer(target: unknown, thisArg: unknown, argumentsList
1008984 // Calculate message body size upfront for initial span attributes
1009985 const messageBodySize = _calculateMessageBodySize ( queueParams ?. message || queueParams ?. messages ) ;
1010986
987+ // Use the Cloudflare pattern for queue spans:
988+ // - op: 'db.queue' makes the span a valid transaction when no parent exists
989+ // - SEMANTIC_ATTRIBUTE_SENTRY_OP: 'queue.publish' ensures Queue Insights recognition
990+ // - SEMANTIC_ATTRIBUTE_SENTRY_SOURCE: 'task' marks it as a task source
991+ // This pattern allows queue spans to work both as child spans (when inside a transaction)
992+ // and as root spans (when called outside a transaction context).
1011993 return startSpan (
1012994 {
1013995 name : `publish ${ queueName || 'unknown' } ` ,
1014- op : 'queue.publish ' ,
996+ op : 'db.queue ' ,
1015997 attributes : {
1016998 [ SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN ] : 'auto.db.supabase.queue.producer' ,
1017999 [ SEMANTIC_ATTRIBUTE_SENTRY_OP ] : 'queue.publish' ,
1000+ [ SEMANTIC_ATTRIBUTE_SENTRY_SOURCE ] : 'task' ,
10181001 'messaging.system' : 'supabase' ,
10191002 'messaging.destination.name' : queueName ,
10201003 'messaging.operation.name' : operationName ,
0 commit comments