@@ -462,6 +462,101 @@ def _set_input_data(
462462 span .set_data (SPANDATA .GEN_AI_REQUEST_AVAILABLE_TOOLS , safe_serialize (tools ))
463463
464464
465+ def _wrap_synchronous_message_iterator (
466+ iterator : "Iterator[RawMessageStreamEvent]" ,
467+ span : "Span" ,
468+ integration : "AnthropicIntegration" ,
469+ ) -> "Iterator[RawMessageStreamEvent]" :
470+ """
471+ Sets information received while iterating the response stream on the AI Client Span.
472+ Responsible for closing the AI Client Span.
473+ """
474+
475+ model = None
476+ usage = _RecordedUsage ()
477+ content_blocks : "list[str]" = []
478+
479+ for event in iterator :
480+ (
481+ model ,
482+ usage ,
483+ content_blocks ,
484+ ) = _collect_ai_data (
485+ event ,
486+ model ,
487+ usage ,
488+ content_blocks ,
489+ )
490+ yield event
491+
492+ # Anthropic's input_tokens excludes cached/cache_write tokens.
493+ # Normalize to total input tokens for correct cost calculations.
494+ total_input = (
495+ usage .input_tokens
496+ + (usage .cache_read_input_tokens or 0 )
497+ + (usage .cache_write_input_tokens or 0 )
498+ )
499+
500+ _set_output_data (
501+ span = span ,
502+ integration = integration ,
503+ model = model ,
504+ input_tokens = total_input ,
505+ output_tokens = usage .output_tokens ,
506+ cache_read_input_tokens = usage .cache_read_input_tokens ,
507+ cache_write_input_tokens = usage .cache_write_input_tokens ,
508+ content_blocks = [{"text" : "" .join (content_blocks ), "type" : "text" }],
509+ finish_span = True ,
510+ )
511+
512+
513+ async def _wrap_asynchronous_message_iterator (
514+ iterator : "Iterator[RawMessageStreamEvent]" ,
515+ span : "Span" ,
516+ integration : "AnthropicIntegration" ,
517+ ) -> "Iterator[RawMessageStreamEvent]" :
518+ """
519+ Sets information received while iterating the response stream on the AI Client Span.
520+ Responsible for closing the AI Client Span.
521+ """
522+ model = None
523+ usage = _RecordedUsage ()
524+ content_blocks : "list[str]" = []
525+
526+ async for event in iterator :
527+ (
528+ model ,
529+ usage ,
530+ content_blocks ,
531+ ) = _collect_ai_data (
532+ event ,
533+ model ,
534+ usage ,
535+ content_blocks ,
536+ )
537+ yield event
538+
539+ # Anthropic's input_tokens excludes cached/cache_write tokens.
540+ # Normalize to total input tokens for correct cost calculations.
541+ total_input = (
542+ usage .input_tokens
543+ + (usage .cache_read_input_tokens or 0 )
544+ + (usage .cache_write_input_tokens or 0 )
545+ )
546+
547+ _set_output_data (
548+ span = span ,
549+ integration = integration ,
550+ model = model ,
551+ input_tokens = total_input ,
552+ output_tokens = usage .output_tokens ,
553+ cache_read_input_tokens = usage .cache_read_input_tokens ,
554+ cache_write_input_tokens = usage .cache_write_input_tokens ,
555+ content_blocks = [{"text" : "" .join (content_blocks ), "type" : "text" }],
556+ finish_span = True ,
557+ )
558+
559+
465560def _set_output_data (
466561 span : "Span" ,
467562 integration : "AnthropicIntegration" ,
@@ -539,9 +634,16 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
539634
540635 result = yield f , args , kwargs
541636
542- if isinstance (result , Stream ) or isinstance (result , AsyncStream ):
543- result ._sentry_span = span
544- result ._integration = integration
637+ if isinstance (result , Stream ):
638+ result ._iterator = _wrap_synchronous_message_iterator (
639+ result ._iterator , span , integration
640+ )
641+ return result
642+
643+ if isinstance (result , AsyncStream ):
644+ result ._iterator = _wrap_asynchronous_message_iterator (
645+ result ._iterator , span , integration
646+ )
545647 return result
546648
547649 with capture_internal_exceptions ():
0 commit comments