Skip to content

Commit ddbd34f

Browse files
committed
celery
1 parent c1271c1 commit ddbd34f

1 file changed

Lines changed: 123 additions & 55 deletions

File tree

sentry_sdk/integrations/celery/__init__.py

Lines changed: 123 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
)
1515
from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch
1616
from sentry_sdk.integrations.logging import ignore_logger
17+
from sentry_sdk.traces import StreamedSpan
1718
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TransactionSource
18-
from sentry_sdk.tracing_utils import Baggage
19+
from sentry_sdk.tracing_utils import Baggage, has_span_streaming_enabled
1920
from sentry_sdk.utils import (
2021
capture_internal_exceptions,
2122
ensure_integration_enabled,
@@ -247,7 +248,8 @@ def _wrap_task_run(f: "F") -> "F":
247248
def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
248249
# Note: kwargs can contain headers=None, so no setdefault!
249250
# Unsure which backend though.
250-
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
251+
client = sentry_sdk.get_client()
252+
integration = client.get_integration(CeleryIntegration)
251253
if integration is None:
252254
return f(*args, **kwargs)
253255

@@ -266,17 +268,23 @@ def apply_async(*args: "Any", **kwargs: "Any") -> "Any":
266268
else:
267269
task_name = "<unknown Celery task>"
268270

271+
span_streaming = has_span_streaming_enabled(client.options)
272+
269273
task_started_from_beat = sentry_sdk.get_isolation_scope()._name == "celery-beat"
270274

271-
span_mgr: "Union[Span, NoOpMgr]" = (
272-
sentry_sdk.start_span(
273-
op=OP.QUEUE_SUBMIT_CELERY,
274-
name=task_name,
275-
origin=CeleryIntegration.origin,
276-
)
277-
if not task_started_from_beat
278-
else NoOpMgr()
279-
)
275+
span_mgr: "Union[StreamedSpan, Span, NoOpMgr]" = NoOpMgr()
276+
if span_streaming:
277+
if not task_started_from_beat:
278+
span_mgr = sentry_sdk.traces.start_span(name=task_name)
279+
span_mgr.set_op(OP.QUEUE_SUBMIT_CELERY)
280+
span_mgr.set_origin(CeleryIntegration.origin)
281+
else:
282+
if not task_started_from_beat:
283+
span_mgr = sentry_sdk.start_span(
284+
op=OP.QUEUE_SUBMIT_CELERY,
285+
name=task_name,
286+
origin=CeleryIntegration.origin,
287+
)
280288

281289
with span_mgr as span:
282290
kwargs["headers"] = _update_celery_task_headers(
@@ -295,8 +303,13 @@ def _wrap_tracer(task: "Any", f: "F") -> "F":
295303
# Also because in Celery 3, signal dispatch returns early if one handler
296304
# crashes.
297305
@wraps(f)
298-
@ensure_integration_enabled(CeleryIntegration, f)
299306
def _inner(*args: "Any", **kwargs: "Any") -> "Any":
307+
client = sentry_sdk.get_client()
308+
if client.get_integration(CeleryIntegration) is None:
309+
return f(*args, **kwargs)
310+
311+
span_streaming = has_span_streaming_enabled(client.options)
312+
300313
with isolation_scope() as scope:
301314
scope._name = "celery"
302315
scope.clear_breadcrumbs()
@@ -308,37 +321,53 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
308321
# something such as attribute access can fail.
309322
with capture_internal_exceptions():
310323
headers = args[3].get("headers") or {}
311-
transaction = continue_trace(
312-
headers,
313-
op=OP.QUEUE_TASK_CELERY,
314-
name="unknown celery task",
315-
source=TransactionSource.TASK,
316-
origin=CeleryIntegration.origin,
317-
)
318-
transaction.name = task.name
319-
transaction.set_status(SPANSTATUS.OK)
324+
if span_streaming:
325+
sentry_sdk.traces.continue_trace(headers)
326+
transaction = sentry_sdk.traces.start_span(
327+
name="unknown celery task"
328+
)
329+
transaction.set_origin(CeleryIntegration.origin)
330+
transaction.set_source(TransactionSource.TASK)
331+
transaction.set_op(OP.QUEUE_TASK_CELERY)
332+
333+
span_ctx = transaction
334+
335+
else:
336+
transaction = continue_trace(
337+
headers,
338+
op=OP.QUEUE_TASK_CELERY,
339+
name="unknown celery task",
340+
source=TransactionSource.TASK,
341+
origin=CeleryIntegration.origin,
342+
)
343+
transaction.name = task.name
344+
transaction.set_status(SPANSTATUS.OK)
345+
346+
span_ctx = sentry_sdk.start_transaction(
347+
transaction,
348+
custom_sampling_context={
349+
"celery_job": {
350+
"task": task.name,
351+
# for some reason, args[1] is a list if non-empty but a
352+
# tuple if empty
353+
"args": list(args[1]),
354+
"kwargs": args[2],
355+
}
356+
},
357+
)
320358

321359
if transaction is None:
322360
return f(*args, **kwargs)
323361

324-
with sentry_sdk.start_transaction(
325-
transaction,
326-
custom_sampling_context={
327-
"celery_job": {
328-
"task": task.name,
329-
# for some reason, args[1] is a list if non-empty but a
330-
# tuple if empty
331-
"args": list(args[1]),
332-
"kwargs": args[2],
333-
}
334-
},
335-
):
362+
with span_ctx:
336363
return f(*args, **kwargs)
337364

338365
return _inner # type: ignore
339366

340367

341-
def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
368+
def _set_messaging_destination_name(
369+
task: "Any", span: "Union[StreamedSpan, Span]"
370+
) -> None:
342371
"""Set "messaging.destination.name" tag for span"""
343372
with capture_internal_exceptions():
344373
delivery_info = task.request.delivery_info
@@ -347,7 +376,10 @@ def _set_messaging_destination_name(task: "Any", span: "Span") -> None:
347376
if delivery_info.get("exchange") == "" and routing_key is not None:
348377
# Empty exchange indicates the default exchange, meaning the tasks
349378
# are sent to the queue with the same name as the routing key.
350-
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
379+
if isinstance(span, StreamedSpan):
380+
span.set_attribute(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
381+
else:
382+
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
351383

352384

353385
def _wrap_task_call(task: "Any", f: "F") -> "F":
@@ -359,14 +391,32 @@ def _wrap_task_call(task: "Any", f: "F") -> "F":
359391
# but if we ever remove the @ensure_integration_enabled decorator, we need
360392
# to add @functools.wraps(f) here.
361393
# https://github.com/getsentry/sentry-python/issues/421
362-
@ensure_integration_enabled(CeleryIntegration, f)
363394
def _inner(*args: "Any", **kwargs: "Any") -> "Any":
395+
client = sentry_sdk.get_client()
396+
if client.get_integration(CeleryIntegration) is None:
397+
return f(*args, **kwargs)
398+
399+
span_streaming = has_span_streaming_enabled(client.options)
400+
364401
try:
365-
with sentry_sdk.start_span(
366-
op=OP.QUEUE_PROCESS,
367-
name=task.name,
368-
origin=CeleryIntegration.origin,
369-
) as span:
402+
span: "Union[Span, StreamedSpan]"
403+
if span_streaming:
404+
span = sentry_sdk.traces.start_span(name=task.name)
405+
span.set_op(OP.QUEUE_PROCESS)
406+
span.set_origin(CeleryIntegration.origin)
407+
else:
408+
span = sentry_sdk.start_span(
409+
op=OP.QUEUE_PROCESS,
410+
name=task.name,
411+
origin=CeleryIntegration.origin,
412+
)
413+
414+
with span:
415+
if isinstance(span, StreamedSpan):
416+
set_on_span = span.set_attribute
417+
else:
418+
set_on_span = span.set_data
419+
370420
_set_messaging_destination_name(task, span)
371421

372422
latency = None
@@ -381,18 +431,18 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
381431

382432
if latency is not None:
383433
latency *= 1000 # milliseconds
384-
span.set_data(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
434+
set_on_span(SPANDATA.MESSAGING_MESSAGE_RECEIVE_LATENCY, latency)
385435

386436
with capture_internal_exceptions():
387-
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
437+
set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task.request.id)
388438

389439
with capture_internal_exceptions():
390-
span.set_data(
440+
set_on_span(
391441
SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
392442
)
393443

394444
with capture_internal_exceptions():
395-
span.set_data(
445+
set_on_span(
396446
SPANDATA.MESSAGING_SYSTEM,
397447
task.app.connection().transport.driver_type,
398448
)
@@ -467,8 +517,13 @@ def sentry_workloop(*args: "Any", **kwargs: "Any") -> "Any":
467517
def _patch_producer_publish() -> None:
468518
original_publish = Producer.publish
469519

470-
@ensure_integration_enabled(CeleryIntegration, original_publish)
471520
def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
521+
client = sentry_sdk.get_client()
522+
if client.get_integration(CeleryIntegration) is None:
523+
return original_publish(self, *args, **kwargs)
524+
525+
span_streaming = has_span_streaming_enabled(client.options)
526+
472527
kwargs_headers = kwargs.get("headers", {})
473528
if not isinstance(kwargs_headers, Mapping):
474529
# Ensure kwargs_headers is a Mapping, so we can safely call get().
@@ -485,24 +540,37 @@ def sentry_publish(self: "Producer", *args: "Any", **kwargs: "Any") -> "Any":
485540
routing_key = kwargs.get("routing_key")
486541
exchange = kwargs.get("exchange")
487542

488-
with sentry_sdk.start_span(
489-
op=OP.QUEUE_PUBLISH,
490-
name=task_name,
491-
origin=CeleryIntegration.origin,
492-
) as span:
543+
span: "Union[StreamedSpan, Span]"
544+
if span_streaming:
545+
span = sentry_sdk.traces.start_span(name=task_name)
546+
span.set_op(OP.QUEUE_PUBLISH)
547+
span.set_origin(CeleryIntegration.origin)
548+
else:
549+
span = sentry_sdk.start_span(
550+
op=OP.QUEUE_PUBLISH,
551+
name=task_name,
552+
origin=CeleryIntegration.origin,
553+
)
554+
555+
with span:
556+
if isinstance(span, StreamedSpan):
557+
set_on_span = span.set_attribute
558+
else:
559+
set_on_span = span.set_data
560+
493561
if task_id is not None:
494-
span.set_data(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
562+
set_on_span(SPANDATA.MESSAGING_MESSAGE_ID, task_id)
495563

496564
if exchange == "" and routing_key is not None:
497565
# Empty exchange indicates the default exchange, meaning messages are
498566
# routed to the queue with the same name as the routing key.
499-
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
567+
set_on_span(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
500568

501569
if retries is not None:
502-
span.set_data(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
570+
set_on_span(SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, retries)
503571

504572
with capture_internal_exceptions():
505-
span.set_data(
573+
set_on_span(
506574
SPANDATA.MESSAGING_SYSTEM, self.connection.transport.driver_type
507575
)
508576

0 commit comments

Comments
 (0)