Skip to content

Commit 37c990e

Browse files
committed
feat(span-streaming): Add span batcher
1 parent bab4359 commit 37c990e

3 files changed

Lines changed: 866 additions & 1 deletion

File tree

sentry_sdk/_span_batcher.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import threading
2+
from collections import defaultdict
3+
from datetime import datetime, timezone
4+
from typing import TYPE_CHECKING
5+
6+
from sentry_sdk._batcher import Batcher
7+
from sentry_sdk.consts import SPANSTATUS
8+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
9+
from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr
10+
11+
if TYPE_CHECKING:
12+
from typing import Any, Callable, Optional
13+
from sentry_sdk.traces import SpanStatus, StreamedSpan
14+
from sentry_sdk._types import SerializedAttributeValue
15+
16+
17+
class SpanBatcher(Batcher["StreamedSpan"]):
18+
# TODO[span-first]: size-based flushes
19+
# TODO[span-first]: adjust flush/drop defaults
20+
MAX_BEFORE_FLUSH = 1000
21+
MAX_BEFORE_DROP = 5000
22+
FLUSH_WAIT_TIME = 5.0
23+
24+
TYPE = "span"
25+
CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"
26+
27+
def __init__(
28+
self,
29+
capture_func: "Callable[[Envelope], None]",
30+
record_lost_func: "Callable[..., None]",
31+
) -> None:
32+
# Spans from different traces cannot be emitted in the same envelope
33+
# since the envelope contains a shared trace header. That's why we bucket
34+
# by trace_id, so that we can then send the buckets each in its own
35+
# envelope.
36+
# trace_id -> span buffer
37+
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
38+
self._capture_func = capture_func
39+
self._record_lost_func = record_lost_func
40+
self._running = True
41+
self._lock = threading.Lock()
42+
43+
self._flush_event: "threading.Event" = threading.Event()
44+
45+
self._flusher: "Optional[threading.Thread]" = None
46+
self._flusher_pid: "Optional[int]" = None
47+
48+
def get_size(self) -> int:
49+
# caller is responsible for locking before checking this
50+
return sum(len(buffer) for buffer in self._span_buffer.values())
51+
52+
def add(self, span: "StreamedSpan") -> None:
53+
if not self._ensure_thread() or self._flusher is None:
54+
return None
55+
56+
with self._lock:
57+
size = self.get_size()
58+
if size >= self.MAX_BEFORE_DROP:
59+
self._record_lost_func(
60+
reason="queue_overflow",
61+
data_category="span",
62+
quantity=1,
63+
)
64+
return None
65+
66+
self._span_buffer[span.trace_id].append(span)
67+
if size + 1 >= self.MAX_BEFORE_FLUSH:
68+
self._flush_event.set()
69+
70+
@staticmethod
71+
def _to_transport_format(item: "StreamedSpan") -> "Any":
72+
res: "dict[str, Any]" = {
73+
"trace_id": item.trace_id,
74+
"span_id": item.span_id,
75+
"name": item.get_name(),
76+
"status": item.status,
77+
"is_segment": item.is_segment(),
78+
"start_timestamp": item.start_timestamp.timestamp(), # TODO[span-first]
79+
}
80+
81+
if item.timestamp:
82+
res["end_timestamp"] = item.timestamp.timestamp()
83+
84+
if item.parent_span_id:
85+
res["parent_span_id"] = item.parent_span_id
86+
87+
if item.attributes:
88+
res["attributes"] = {
89+
k: serialize_attribute(v) for (k, v) in item.attributes.items()
90+
}
91+
92+
return res
93+
94+
def _flush(self) -> None:
95+
with self._lock:
96+
if len(self._span_buffer) == 0:
97+
return None
98+
99+
envelopes = []
100+
for trace_id, spans in self._span_buffer.items():
101+
if spans:
102+
dsc = spans[0].dynamic_sampling_context()
103+
104+
envelope = Envelope(
105+
headers={
106+
"sent_at": format_timestamp(datetime.now(timezone.utc)),
107+
"trace": dsc,
108+
}
109+
)
110+
111+
envelope.add_item(
112+
Item(
113+
type="span",
114+
content_type="application/vnd.sentry.items.span.v2+json",
115+
headers={
116+
"item_count": len(spans),
117+
},
118+
payload=PayloadRef(
119+
json={
120+
"items": [
121+
self._to_transport_format(span)
122+
for span in spans
123+
]
124+
}
125+
),
126+
)
127+
)
128+
129+
envelopes.append(envelope)
130+
131+
self._span_buffer.clear()
132+
133+
for envelope in envelopes:
134+
self._capture_func(envelope)

sentry_sdk/client.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import sentry_sdk
1212
from sentry_sdk._compat import PY37, check_uwsgi_thread_support
1313
from sentry_sdk._metrics_batcher import MetricsBatcher
14+
from sentry_sdk._span_batcher import SpanBatcher
1415
from sentry_sdk.utils import (
1516
AnnotatedValue,
1617
ContextVar,
@@ -31,6 +32,7 @@
3132
)
3233
from sentry_sdk.serializer import serialize
3334
from sentry_sdk.tracing import trace
35+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
3436
from sentry_sdk.transport import BaseHttpTransport, make_transport
3537
from sentry_sdk.consts import (
3638
SPANDATA,
@@ -67,6 +69,7 @@
6769
from sentry_sdk.scope import Scope
6870
from sentry_sdk.session import Session
6971
from sentry_sdk.spotlight import SpotlightClient
72+
from sentry_sdk.traces import StreamedSpan
7073
from sentry_sdk.transport import Transport, Item
7174
from sentry_sdk._log_batcher import LogBatcher
7275
from sentry_sdk._metrics_batcher import MetricsBatcher
@@ -188,6 +191,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None:
188191
self.monitor: "Optional[Monitor]" = None
189192
self.log_batcher: "Optional[LogBatcher]" = None
190193
self.metrics_batcher: "Optional[MetricsBatcher]" = None
194+
self.span_batcher: "Optional[SpanBatcher]" = None
191195
self.integrations: "dict[str, Integration]" = {}
192196

193197
def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any":
@@ -224,6 +228,9 @@ def _capture_log(self, log: "Log", scope: "Scope") -> None:
224228
def _capture_metric(self, metric: "Metric", scope: "Scope") -> None:
225229
pass
226230

231+
def _capture_span(self, span: "StreamedSpan", scope: "Scope") -> None:
232+
pass
233+
227234
def capture_session(self, *args: "Any", **kwargs: "Any") -> None:
228235
return None
229236

@@ -399,6 +406,13 @@ def _record_lost_event(
399406
record_lost_func=_record_lost_event,
400407
)
401408

409+
self.span_batcher = None
410+
if has_span_streaming_enabled(self.options):
411+
self.span_batcher = SpanBatcher(
412+
capture_func=_capture_envelope,
413+
record_lost_func=_record_lost_event,
414+
)
415+
402416
max_request_body_size = ("always", "never", "small", "medium")
403417
if self.options["max_request_body_size"] not in max_request_body_size:
404418
raise ValueError(
@@ -909,7 +923,10 @@ def capture_event(
909923
return return_value
910924

911925
def _capture_telemetry(
912-
self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope"
926+
self,
927+
telemetry: "Optional[Union[Log, Metric, StreamedSpan]]",
928+
ty: str,
929+
scope: "Scope",
913930
) -> None:
914931
# Capture attributes-based telemetry (logs, metrics, spansV2)
915932
if telemetry is None:
@@ -934,6 +951,8 @@ def _capture_telemetry(
934951
batcher = self.log_batcher
935952
elif ty == "metric":
936953
batcher = self.metrics_batcher # type: ignore
954+
elif ty == "span":
955+
batcher = self.span_batcher # type: ignore
937956

938957
if batcher is not None:
939958
batcher.add(telemetry) # type: ignore
@@ -944,6 +963,9 @@ def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None:
944963
def _capture_metric(self, metric: "Optional[Metric]", scope: "Scope") -> None:
945964
self._capture_telemetry(metric, "metric", scope)
946965

966+
def _capture_span(self, span: "Optional[StreamedSpan]", scope: "Scope") -> None:
967+
self._capture_telemetry(span, "span", scope)
968+
947969
def capture_session(
948970
self,
949971
session: "Session",
@@ -993,6 +1015,8 @@ def close(
9931015
self.log_batcher.kill()
9941016
if self.metrics_batcher is not None:
9951017
self.metrics_batcher.kill()
1018+
if self.span_batcher is not None:
1019+
self.span_batcher.kill()
9961020
if self.monitor:
9971021
self.monitor.kill()
9981022
self.transport.kill()
@@ -1018,6 +1042,8 @@ def flush(
10181042
self.log_batcher.flush()
10191043
if self.metrics_batcher is not None:
10201044
self.metrics_batcher.flush()
1045+
if self.span_batcher is not None:
1046+
self.span_batcher.flush()
10211047
self.transport.flush(timeout=timeout, callback=callback)
10221048

10231049
def __enter__(self) -> "_Client":

0 commit comments

Comments
 (0)