Skip to content

Commit f54e45d

Browse files
Merge branch 'master' into webb/clickhouse-driver/span-first
2 parents 3dd6bc0 + baae950 commit f54e45d

4 files changed

Lines changed: 414 additions & 132 deletions

File tree

sentry_sdk/integrations/arq.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ def _capture_exception(exc_info: "ExcInfo") -> None:
118118

119119
scope.transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
120120

121+
if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS:
122+
return
123+
121124
event, hint = event_from_exception(
122125
exc_info,
123126
client_options=sentry_sdk.get_client().options,

sentry_sdk/integrations/rq.py

Lines changed: 60 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@
22

33
import sentry_sdk
44
from sentry_sdk.api import continue_trace
5-
from sentry_sdk.consts import OP
5+
from sentry_sdk.consts import OP, SPANDATA
66
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
77
from sentry_sdk.integrations.logging import ignore_logger
8-
from sentry_sdk.scope import should_send_default_pii
8+
from sentry_sdk.scope import Scope, should_send_default_pii
9+
from sentry_sdk.traces import SegmentSource
910
from sentry_sdk.tracing import TransactionSource
11+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
1012
from sentry_sdk.utils import (
1113
SENSITIVE_DATA_SUBSTITUTE,
1214
capture_internal_exceptions,
13-
ensure_integration_enabled,
1415
event_from_exception,
1516
format_timestamp,
1617
parse_version,
@@ -61,30 +62,59 @@ def setup_once() -> None:
6162

6263
old_perform_job = worker_cls.perform_job
6364

64-
@ensure_integration_enabled(RqIntegration, old_perform_job)
6565
def sentry_patched_perform_job(
6666
self: "Any", job: "Job", *args: "Queue", **kwargs: "Any"
6767
) -> bool:
68+
client = sentry_sdk.get_client()
69+
if client.get_integration(RqIntegration) is None:
70+
return old_perform_job(self, job, *args, **kwargs)
71+
6872
with sentry_sdk.new_scope() as scope:
6973
scope.clear_breadcrumbs()
7074
scope.add_event_processor(_make_event_processor(weakref.ref(job)))
7175

72-
transaction = continue_trace(
73-
job.meta.get("_sentry_trace_headers") or {},
74-
op=OP.QUEUE_TASK_RQ,
75-
name="unknown RQ task",
76-
source=TransactionSource.TASK,
77-
origin=RqIntegration.origin,
78-
)
79-
80-
with capture_internal_exceptions():
81-
transaction.name = job.func_name
82-
83-
with sentry_sdk.start_transaction(
84-
transaction,
85-
custom_sampling_context={"rq_job": job},
86-
):
87-
rv = old_perform_job(self, job, *args, **kwargs)
76+
if has_span_streaming_enabled(client.options):
77+
sentry_sdk.traces.continue_trace(
78+
job.meta.get("_sentry_trace_headers") or {}
79+
)
80+
81+
Scope.set_custom_sampling_context({"rq_job": job})
82+
83+
func_name = None
84+
with capture_internal_exceptions():
85+
func_name = job.func_name
86+
87+
with sentry_sdk.traces.start_span(
88+
name="unknown RQ task" if func_name is None else func_name,
89+
attributes={
90+
"sentry.op": OP.QUEUE_TASK_RQ,
91+
"sentry.origin": RqIntegration.origin,
92+
"sentry.span.source": SegmentSource.TASK,
93+
SPANDATA.MESSAGING_MESSAGE_ID: job.id,
94+
},
95+
parent_span=None,
96+
) as span:
97+
if func_name is not None:
98+
span.set_attribute(SPANDATA.CODE_FUNCTION_NAME, func_name)
99+
100+
rv = old_perform_job(self, job, *args, **kwargs)
101+
else:
102+
transaction = continue_trace(
103+
job.meta.get("_sentry_trace_headers") or {},
104+
op=OP.QUEUE_TASK_RQ,
105+
name="unknown RQ task",
106+
source=TransactionSource.TASK,
107+
origin=RqIntegration.origin,
108+
)
109+
110+
with capture_internal_exceptions():
111+
transaction.name = job.func_name
112+
113+
with sentry_sdk.start_transaction(
114+
transaction,
115+
custom_sampling_context={"rq_job": job},
116+
):
117+
rv = old_perform_job(self, job, *args, **kwargs)
88118

89119
if self.is_horse:
90120
# We're inside of a forked process and RQ is
@@ -116,12 +146,20 @@ def sentry_patched_handle_exception(
116146

117147
old_enqueue_job = Queue.enqueue_job
118148

119-
@ensure_integration_enabled(RqIntegration, old_enqueue_job)
120149
def sentry_patched_enqueue_job(
121150
self: "Queue", job: "Any", **kwargs: "Any"
122151
) -> "Any":
152+
client = sentry_sdk.get_client()
153+
if client.get_integration(RqIntegration) is None:
154+
return old_enqueue_job(self, job, **kwargs)
155+
123156
scope = sentry_sdk.get_current_scope()
124-
if scope.span is not None:
157+
span = (
158+
scope.streamed_span
159+
if has_span_streaming_enabled(client.options)
160+
else scope.span
161+
)
162+
if span is not None:
125163
job.meta["_sentry_trace_headers"] = dict(
126164
scope.iter_trace_propagation_headers()
127165
)

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,8 +466,8 @@ def append(envelope):
466466

467467
def flush(timeout=None, callback=None):
468468
real_flush(timeout=timeout, callback=callback)
469-
items_w.write(json.dumps(telemetry).encode("utf-8"))
470-
items_w.write(b"\n")
469+
items_w.write(json.dumps(telemetry).encode("utf-8") + b"\n")
470+
items_w.write(b"flush\n")
471471

472472
monkeypatch.setattr(test_client.transport, "capture_envelope", append)
473473
monkeypatch.setattr(test_client, "flush", flush)

0 commit comments

Comments
 (0)