Skip to content

Commit 49fafc3

Browse files
feat(ray): Support span streaming
1 parent 9e54e14 commit 49fafc3

2 files changed

Lines changed: 363 additions & 120 deletions

File tree

sentry_sdk/integrations/ray.py

Lines changed: 91 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
import sentry_sdk
66
from sentry_sdk.consts import OP, SPANSTATUS
77
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
8+
from sentry_sdk.traces import SegmentSource
89
from sentry_sdk.tracing import TransactionSource
10+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
911
from sentry_sdk.utils import (
1012
event_from_exception,
1113
logger,
@@ -88,25 +90,49 @@ def new_func(
8890
) -> "Any":
8991
_check_sentry_initialized()
9092

91-
transaction = sentry_sdk.continue_trace(
92-
_sentry_tracing or {},
93-
op=OP.QUEUE_TASK_RAY,
94-
name=qualname_from_function(user_f),
95-
origin=RayIntegration.origin,
96-
source=TransactionSource.TASK,
93+
span_streaming = has_span_streaming_enabled(
94+
sentry_sdk.get_client().options
9795
)
98-
99-
with sentry_sdk.start_transaction(transaction) as transaction:
100-
try:
101-
result = user_f(*f_args, **f_kwargs)
102-
transaction.set_status(SPANSTATUS.OK)
103-
except Exception:
104-
transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
105-
exc_info = sys.exc_info()
106-
_capture_exception(exc_info)
107-
reraise(*exc_info)
108-
109-
return result
96+
if span_streaming:
97+
sentry_sdk.traces.continue_trace(_sentry_tracing or {})
98+
99+
with sentry_sdk.traces.start_span(
100+
name=qualname_from_function(user_f),
101+
attributes={
102+
"sentry.op": OP.QUEUE_TASK_RAY,
103+
"sentry.origin": RayIntegration.origin,
104+
"sentry.span.source": SegmentSource.TASK,
105+
},
106+
parent_span=None,
107+
):
108+
try:
109+
result = user_f(*f_args, **f_kwargs)
110+
except Exception:
111+
exc_info = sys.exc_info()
112+
_capture_exception(exc_info)
113+
reraise(*exc_info)
114+
115+
return result
116+
else:
117+
transaction = sentry_sdk.continue_trace(
118+
_sentry_tracing or {},
119+
op=OP.QUEUE_TASK_RAY,
120+
name=qualname_from_function(user_f),
121+
origin=RayIntegration.origin,
122+
source=TransactionSource.TASK,
123+
)
124+
125+
with sentry_sdk.start_transaction(transaction) as transaction:
126+
try:
127+
result = user_f(*f_args, **f_kwargs)
128+
transaction.set_status(SPANSTATUS.OK)
129+
except Exception:
130+
transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
131+
exc_info = sys.exc_info()
132+
_capture_exception(exc_info)
133+
reraise(*exc_info)
134+
135+
return result
110136

111137
_insert_sentry_tracing_in_signature(new_func)
112138

@@ -122,27 +148,53 @@ def _remote_method_with_header_propagation(
122148
"""
123149
Ray Client
124150
"""
125-
with sentry_sdk.start_span(
126-
op=OP.QUEUE_SUBMIT_RAY,
127-
name=qualname_from_function(user_f),
128-
origin=RayIntegration.origin,
129-
) as span:
130-
tracing = {
131-
k: v
132-
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
133-
}
134-
try:
135-
result = old_remote_method(
136-
*args, **kwargs, _sentry_tracing=tracing
137-
)
138-
span.set_status(SPANSTATUS.OK)
139-
except Exception:
140-
span.set_status(SPANSTATUS.INTERNAL_ERROR)
141-
exc_info = sys.exc_info()
142-
_capture_exception(exc_info)
143-
reraise(*exc_info)
144-
145-
return result
151+
span_streaming = has_span_streaming_enabled(
152+
sentry_sdk.get_client().options
153+
)
154+
if span_streaming:
155+
with sentry_sdk.traces.start_span(
156+
name=qualname_from_function(user_f),
157+
attributes={
158+
"sentry.op": OP.QUEUE_SUBMIT_RAY,
159+
"sentry.origin": RayIntegration.origin,
160+
},
161+
):
162+
tracing = {
163+
k: v
164+
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
165+
}
166+
try:
167+
result = old_remote_method(
168+
*args, **kwargs, _sentry_tracing=tracing
169+
)
170+
except Exception:
171+
exc_info = sys.exc_info()
172+
_capture_exception(exc_info)
173+
reraise(*exc_info)
174+
175+
return result
176+
else:
177+
with sentry_sdk.start_span(
178+
op=OP.QUEUE_SUBMIT_RAY,
179+
name=qualname_from_function(user_f),
180+
origin=RayIntegration.origin,
181+
) as span:
182+
tracing = {
183+
k: v
184+
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
185+
}
186+
try:
187+
result = old_remote_method(
188+
*args, **kwargs, _sentry_tracing=tracing
189+
)
190+
span.set_status(SPANSTATUS.OK)
191+
except Exception:
192+
span.set_status(SPANSTATUS.INTERNAL_ERROR)
193+
exc_info = sys.exc_info()
194+
_capture_exception(exc_info)
195+
reraise(*exc_info)
196+
197+
return result
146198

147199
rv.remote = _remote_method_with_header_propagation
148200

0 commit comments

Comments
 (0)