11import json
22import threading
3+ import math
34from collections import defaultdict
45from datetime import datetime , timezone
56from typing import TYPE_CHECKING
1516
class SpanBatcher(Batcher["StreamedSpan"]):
    # TODO[span-first]: adjust flush/drop defaults
    #
    # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
    # a bit of a buffer for spans that appear between setting the flush event
    # and actually flushing the buffer.
    #
    # The max limits are all per trace (i.e. per span-buffer bucket).
    MAX_ENVELOPE_SIZE = 1000  # spans allowed in a single envelope item
    MAX_BEFORE_FLUSH = 1000  # buffered spans per trace that trigger a flush
    MAX_BEFORE_DROP = 2000  # buffered spans per trace before new spans are dropped
    MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024  # 5 MB
    FLUSH_WAIT_TIME = 5.0  # seconds between periodic flushes
2229
@@ -45,16 +52,12 @@ def __init__(
4552 self ._flusher : "Optional[threading.Thread]" = None
4653 self ._flusher_pid : "Optional[int]" = None
4754
48- def get_size (self ) -> int :
49- # caller is responsible for locking before checking this
50- return sum (len (buffer ) for buffer in self ._span_buffer .values ())
51-
5255 def add (self , span : "StreamedSpan" ) -> None :
5356 if not self ._ensure_thread () or self ._flusher is None :
5457 return None
5558
5659 with self ._lock :
57- size = self .get_size ( )
60+ size = len ( self ._span_buffer [ span . trace_id ] )
5861 if size >= self .MAX_BEFORE_DROP :
5962 self ._record_lost_func (
6063 reason = "queue_overflow" ,
@@ -76,9 +79,8 @@ def add(self, span: "StreamedSpan") -> None:
7679
7780 @staticmethod
7881 def _estimate_size (item : "StreamedSpan" ) -> int :
79- # This is just a quick approximation
80- span_dict = SpanBatcher ._to_transport_format (item )
81- return len (str (span_dict ))
82+ # Rough estimate of serialized span size that's quick to compute
83+ return 210 + 70 * len (item ._attributes )
8284
8385 @staticmethod
8486 def _to_transport_format (item : "StreamedSpan" ) -> "Any" :
@@ -107,39 +109,45 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
107109 def _flush (self ) -> None :
108110 with self ._lock :
109111 if len (self ._span_buffer ) == 0 :
110- return None
112+ return
111113
112114 envelopes = []
113115 for spans in self ._span_buffer .values ():
114116 if spans :
115117 dsc = spans [0 ].dynamic_sampling_context ()
116118
117- envelope = Envelope (
118- headers = {
119- "sent_at" : format_timestamp (datetime .now (timezone .utc )),
120- "trace" : dsc ,
121- }
122- )
123-
124- envelope .add_item (
125- Item (
126- type = "span" ,
127- content_type = "application/vnd.sentry.items.span.v2+json" ,
119+ # Max per envelope is 1000, so if we happen to have more than
120+ # 1000 spans in one bucket, we'll need to separate them.
121+ for i in range (math .ceil (len (spans ) / self .MAX_ENVELOPE_SIZE )):
122+ envelope = Envelope (
128123 headers = {
129- "item_count" : len (spans ),
130- },
131- payload = PayloadRef (
132- json = {
133- "items" : [
134- self ._to_transport_format (span )
135- for span in spans
136- ]
137- }
138- ),
124+ "sent_at" : format_timestamp (datetime .now (timezone .utc )),
125+ "trace" : dsc ,
126+ }
127+ )
128+
129+ envelope .add_item (
130+ Item (
131+ type = "span" ,
132+ content_type = "application/vnd.sentry.items.span.v2+json" ,
133+ headers = {
134+ "item_count" : len (spans ),
135+ },
136+ payload = PayloadRef (
137+ json = {
138+ "items" : [
139+ self ._to_transport_format (span )
140+ for span in spans [
141+ i * self .MAX_ENVELOPE_SIZE : (i + 1 )
142+ * self .MAX_ENVELOPE_SIZE
143+ ]
144+ ]
145+ }
146+ ),
147+ )
139148 )
140- )
141149
142- envelopes .append (envelope )
150+ envelopes .append (envelope )
143151
144152 self ._span_buffer .clear ()
145153 self ._running_size .clear ()
0 commit comments