Skip to content

Commit 7874a54

Browse files
committed
ref: Per-bucket limits, fix envelope chunking
1 parent f2738ff commit 7874a54

1 file changed

Lines changed: 53 additions & 31 deletions

File tree

sentry_sdk/_span_batcher.py

Lines changed: 53 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515

1616

1717
class SpanBatcher(Batcher["StreamedSpan"]):
18-
# TODO[span-first]: size-based flushes
19-
# TODO[span-first]: adjust flush/drop defaults
18+
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
19+
# a bit of a buffer for spans that appear between setting the flush event
20+
# and actually flushing the buffer.
21+
#
22+
# The max limits are all per trace.
23+
MAX_ENVELOPE_SIZE = 1000 # spans
2024
MAX_BEFORE_FLUSH = 1000
21-
MAX_BEFORE_DROP = 5000
25+
MAX_BEFORE_DROP = 2000
26+
MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB
2227
FLUSH_WAIT_TIME = 5.0
2328

2429
TYPE = "span"
@@ -35,6 +40,7 @@ def __init__(
3540
# envelope.
3641
# trace_id -> span buffer
3742
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
43+
self._running_size: dict[str, int] = defaultdict(lambda: 0)
3844
self._capture_func = capture_func
3945
self._record_lost_func = record_lost_func
4046
self._running = True
@@ -45,16 +51,12 @@ def __init__(
4551
self._flusher: "Optional[threading.Thread]" = None
4652
self._flusher_pid: "Optional[int]" = None
4753

48-
def get_size(self) -> int:
49-
# caller is responsible for locking before checking this
50-
return sum(len(buffer) for buffer in self._span_buffer.values())
51-
5254
def add(self, span: "StreamedSpan") -> None:
5355
if not self._ensure_thread() or self._flusher is None:
5456
return None
5557

5658
with self._lock:
57-
size = self.get_size()
59+
size = len(self._span_buffer[span.trace_id])
5860
if size >= self.MAX_BEFORE_DROP:
5961
self._record_lost_func(
6062
reason="queue_overflow",
@@ -64,8 +66,22 @@ def add(self, span: "StreamedSpan") -> None:
6466
return None
6567

6668
self._span_buffer[span.trace_id].append(span)
69+
self._running_size[span.trace_id] += self._estimate_size(span)
70+
6771
if size + 1 >= self.MAX_BEFORE_FLUSH:
6872
self._flush_event.set()
73+
return
74+
75+
if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
76+
self._flush_event.set()
77+
return
78+
79+
@staticmethod
80+
def _estimate_size(item: "StreamedSpan") -> int:
81+
# Rough estimate of serialized span size that's quick to compute.
82+
# 210 is the rough size of the payload without attributes, and we
83+
# estimate additional 70 bytes on top of that per attribute.
84+
return 210 + 70 * len(item._attributes)
6985

7086
@staticmethod
7187
def _to_transport_format(item: "StreamedSpan") -> "Any":
@@ -95,34 +111,40 @@ def _flush(self) -> None:
95111
# dsc = spans[0].dynamic_sampling_context()
96112
dsc = None
97113

98-
envelope = Envelope(
99-
headers={
100-
"sent_at": format_timestamp(datetime.now(timezone.utc)),
101-
"trace": dsc,
102-
}
103-
)
104-
105-
envelope.add_item(
106-
Item(
107-
type="span",
108-
content_type="application/vnd.sentry.items.span.v2+json",
114+
# Max per envelope is 1000, so if we happen to have more than
115+
# 1000 spans in one bucket, we'll need to separate them.
116+
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
117+
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
118+
119+
envelope = Envelope(
109120
headers={
110-
"item_count": len(spans),
111-
},
112-
payload=PayloadRef(
113-
json={
114-
"items": [
115-
self._to_transport_format(span)
116-
for span in spans
117-
]
118-
}
119-
),
121+
"sent_at": format_timestamp(datetime.now(timezone.utc)),
122+
"trace": dsc,
123+
}
124+
)
125+
126+
envelope.add_item(
127+
Item(
128+
type=self.TYPE,
129+
content_type=self.CONTENT_TYPE,
130+
headers={
131+
"item_count": end - start,
132+
},
133+
payload=PayloadRef(
134+
json={
135+
"items": [
136+
self._to_transport_format(spans[j])
137+
for j in range(start, end)
138+
]
139+
}
140+
),
141+
)
120142
)
121-
)
122143

123-
envelopes.append(envelope)
144+
envelopes.append(envelope)
124145

125146
self._span_buffer.clear()
147+
self._running_size.clear()
126148

127149
for envelope in envelopes:
128150
self._capture_func(envelope)

0 commit comments

Comments (0)