6767LOWER_64_BITS = "LOWER_64_BITS"
6868
6969
70- def _dsm_set_checkpoint (context_json , event_type , arn ):
71- if not config .data_streams_enabled :
72- return
73-
74- if not arn :
75- return
76-
77- try :
78- from ddtrace .data_streams import set_consume_checkpoint
79-
80- carrier_get = lambda k : context_json and context_json .get (k ) # noqa: E731
81- set_consume_checkpoint (event_type , arn , carrier_get , manual_checkpoint = False )
82- except Exception as e :
83- logger .debug (
84- f"DSM:Failed to set consume checkpoint for { event_type } { arn } : { e } "
85- )
86-
87-
8870def _convert_xray_trace_id (xray_trace_id ):
8971 """
9072 Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
@@ -234,10 +216,7 @@ def extract_context_from_sqs_or_sns_event_or_context(
234216 Lambda Context.
235217
236218 Falls back to lambda context if no trace data is found in the SQS message attributes.
237- Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
238219 """
239- source_arn = ""
240- event_type = "sqs" if event_source .equals (EventTypes .SQS ) else "sns"
241220
242221 # EventBridge => SQS
243222 try :
@@ -249,59 +228,17 @@ def extract_context_from_sqs_or_sns_event_or_context(
249228
250229 try :
251230 first_record = event .get ("Records" )[0 ]
252- source_arn = first_record .get ("eventSourceARN" , "" )
253-
254- # logic to deal with SNS => SQS event
255- if "body" in first_record :
256- body_str = first_record .get ("body" )
257- try :
258- body = json .loads (body_str )
259- if body .get ("Type" , "" ) == "Notification" and "TopicArn" in body :
260- logger .debug ("Found SNS message inside SQS event" )
261- first_record = get_first_record (create_sns_event (body ))
262- except Exception :
263- pass
264-
265- msg_attributes = first_record .get ("messageAttributes" )
266- if msg_attributes is None :
267- sns_record = first_record .get ("Sns" ) or {}
268- # SNS->SQS event would extract SNS arn without this check
269- if event_source .equals (EventTypes .SNS ):
270- source_arn = sns_record .get ("TopicArn" , "" )
271- msg_attributes = sns_record .get ("MessageAttributes" ) or {}
272- dd_payload = msg_attributes .get ("_datadog" )
273- if dd_payload :
274- # SQS uses dataType and binaryValue/stringValue
275- # SNS uses Type and Value
276- dd_json_data = None
277- dd_json_data_type = dd_payload .get ("Type" ) or dd_payload .get ("dataType" )
278- if dd_json_data_type == "Binary" :
279- import base64
280-
281- dd_json_data = dd_payload .get ("binaryValue" ) or dd_payload .get ("Value" )
282- if dd_json_data :
283- dd_json_data = base64 .b64decode (dd_json_data )
284- elif dd_json_data_type == "String" :
285- dd_json_data = dd_payload .get ("stringValue" ) or dd_payload .get ("Value" )
286- else :
287- logger .debug (
288- "Datadog Lambda Python only supports extracting trace"
289- "context from String or Binary SQS/SNS message attributes"
290- )
291-
292- if dd_json_data :
293- dd_data = json .loads (dd_json_data )
231+ dd_data = extract_datadog_context_from_sqs_or_sns_event (first_record )
232+ if dd_data :
233+ if is_step_function_event (dd_data ):
234+ try :
235+ return extract_context_from_step_functions (dd_data , None )
236+ except Exception :
237+ logger .debug (
238+ "Failed to extract Step Functions context from SQS/SNS event."
239+ )
240+ return propagator .extract (dd_data )
294241
295- if is_step_function_event (dd_data ):
296- try :
297- return extract_context_from_step_functions (dd_data , None )
298- except Exception :
299- logger .debug (
300- "Failed to extract Step Functions context from SQS/SNS event."
301- )
302- context = propagator .extract (dd_data )
303- _dsm_set_checkpoint (dd_data , event_type , source_arn )
304- return context
305242 else :
306243 # Handle case where trace context is injected into attributes.AWSTraceHeader
307244 # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
@@ -324,16 +261,53 @@ def extract_context_from_sqs_or_sns_event_or_context(
324261 span_id = int (x_ray_context ["parent_id" ], 16 ),
325262 sampling_priority = float (x_ray_context ["sampled" ]),
326263 )
327- # Still want to set a DSM checkpoint even if DSM context not propagated
328- _dsm_set_checkpoint (None , event_type , source_arn )
329264 return extract_context_from_lambda_context (lambda_context )
330265 except Exception as e :
331266 logger .debug ("The trace extractor returned with error %s" , e )
332- # Still want to set a DSM checkpoint even if DSM context not propagated
333- _dsm_set_checkpoint (None , event_type , source_arn )
334267 return extract_context_from_lambda_context (lambda_context )
335268
336269
270+ def extract_datadog_context_from_sqs_or_sns_event (record ):
271+ if "body" in record :
272+ body_str = record .get ("body" )
273+ try :
274+ body = json .loads (body_str )
275+ if body .get ("Type" , "" ) == "Notification" and "TopicArn" in body :
276+ logger .debug ("Found SNS message inside SQS event" )
277+ record = get_first_record (create_sns_event (body ))
278+ except Exception :
279+ pass
280+
281+ msg_attributes = record .get ("messageAttributes" )
282+ if msg_attributes is None :
283+ sns_record = record .get ("Sns" ) or {}
284+ msg_attributes = sns_record .get ("MessageAttributes" ) or {}
285+ dd_payload = msg_attributes .get ("_datadog" )
286+ if dd_payload :
287+ # SQS uses dataType and binaryValue/stringValue
288+ # SNS uses Type and Value
289+ dd_json_data = None
290+ dd_json_data_type = dd_payload .get ("Type" ) or dd_payload .get ("dataType" )
291+ if dd_json_data_type == "Binary" :
292+ import base64
293+
294+ dd_json_data = dd_payload .get ("binaryValue" ) or dd_payload .get ("Value" )
295+ if dd_json_data :
296+ dd_json_data = base64 .b64decode (dd_json_data )
297+ elif dd_json_data_type == "String" :
298+ dd_json_data = dd_payload .get ("stringValue" ) or dd_payload .get ("Value" )
299+ else :
300+ logger .debug (
301+ "Datadog Lambda Python only supports extracting trace"
302+ "context from String or Binary SQS/SNS message attributes"
303+ )
304+
305+ if dd_json_data :
306+ dd_data = json .loads (dd_json_data )
307+ return dd_data
308+ return None
309+
310+
337311def _extract_context_from_eventbridge_sqs_event (event ):
338312 """
339313 Extracts Datadog trace context from an SQS event triggered by
@@ -389,35 +363,35 @@ def extract_context_from_eventbridge_event(event, lambda_context):
389363def extract_context_from_kinesis_event (event , lambda_context ):
390364 """
391365 Extract datadog trace context from a Kinesis Stream's base64 encoded data string
392- Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
393366 """
394- source_arn = ""
395367 try :
396368 record = get_first_record (event )
397- source_arn = record .get ("eventSourceARN" , "" )
398369 kinesis = record .get ("kinesis" )
399370 if not kinesis :
400371 return extract_context_from_lambda_context (lambda_context )
401- data = kinesis .get ("data" )
402- if data :
403- import base64
404-
405- b64_bytes = data .encode ("ascii" )
406- str_bytes = base64 .b64decode (b64_bytes )
407- data_str = str_bytes .decode ("ascii" )
408- data_obj = json .loads (data_str )
409- dd_ctx = data_obj .get ("_datadog" )
410- if dd_ctx :
411- context = propagator .extract (dd_ctx )
412- _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
413- return context
372+ dd_ctx = extract_datadog_context_from_kinesis_event (kinesis )
373+ if dd_ctx :
374+ return propagator .extract (dd_ctx )
414375 except Exception as e :
415376 logger .debug ("The trace extractor returned with error %s" , e )
416- # Still want to set a DSM checkpoint even if DSM context not propagated
417- _dsm_set_checkpoint (None , "kinesis" , source_arn )
418377 return extract_context_from_lambda_context (lambda_context )
419378
420379
380+ def extract_datadog_context_from_kinesis_event (record_kinesis_field ):
381+ data = record_kinesis_field .get ("data" )
382+ if data :
383+ import base64
384+
385+ b64_bytes = data .encode ("ascii" )
386+ str_bytes = base64 .b64decode (b64_bytes )
387+ data_str = str_bytes .decode ("ascii" )
388+ data_obj = json .loads (data_str )
389+ dd_ctx = data_obj .get ("_datadog" )
390+ if dd_ctx :
391+ return dd_ctx
392+ return None
393+
394+
421395def _deterministic_sha256_hash (s : str , part : str ) -> int :
422396 import hashlib
423397
0 commit comments