-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Expand file tree
/
Copy path_otel.py
More file actions
99 lines (82 loc) · 3.31 KB
/
_otel.py
File metadata and controls
99 lines (82 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""OpenTelemetry helpers for MCP."""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any
from opentelemetry.context import Context
from opentelemetry.metrics import get_meter
from opentelemetry.propagate import extract, inject
from opentelemetry.trace import SpanKind, get_tracer
# The tracer and the meter are both obtained under the same "mcp-python-sdk" name.
_tracer = get_tracer("mcp-python-sdk")
_meter = get_meter("mcp-python-sdk")

# Metrics as defined by the OTEL semconv:
# https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/mcp.md

# Advisory histogram bucket boundaries shared by both duration histograms below.
# Both histograms are declared with unit "s", so these values are in seconds.
_DURATION_BUCKETS = [
    0.01,
    0.02,
    0.05,
    0.1,
    0.2,
    0.5,
    1,
    2,
    5,
    10,
    30,
    60,
    120,
    300,
]

# Histogram fed by `record_server_operation_duration`.
_server_operation_duration = _meter.create_histogram(
    name="mcp.server.operation.duration",
    unit="s",
    description=(
        "MCP request or notification duration as observed on the receiver "
        "from the time it was received until the result or ack is sent."
    ),
    explicit_bucket_boundaries_advisory=_DURATION_BUCKETS,
)

# Histogram fed by `record_server_session_duration`.
_server_session_duration = _meter.create_histogram(
    name="mcp.server.session.duration",
    unit="s",
    description="The duration of the MCP session as observed on the MCP server.",
    explicit_bucket_boundaries_advisory=_DURATION_BUCKETS,
)
@contextmanager
def otel_span(
    name: str,
    *,
    kind: SpanKind,
    attributes: dict[str, Any] | None = None,
    context: Context | None = None,
) -> Iterator[Any]:
    """Start a span on the module tracer and yield it for the duration of the ``with`` body.

    All arguments are forwarded unchanged to ``_tracer.start_as_current_span``.

    Args:
        name: Span name.
        kind: Span kind passed to the tracer.
        attributes: Optional span attributes; ``None`` is forwarded as-is.
        context: Optional context passed to the tracer; ``None`` is forwarded as-is.

    Yields:
        The span object produced by the tracer's context manager.
    """
    span_manager = _tracer.start_as_current_span(
        name,
        kind=kind,
        attributes=attributes,
        context=context,
    )
    # Entering the tracer's own context manager here means exceptions raised in
    # the caller's ``with`` body propagate through it, exactly as in a direct ``with``.
    with span_manager as active_span:
        yield active_span
def inject_trace_context(meta: dict[str, Any]) -> None:
    """Write W3C trace context (traceparent/tracestate) into a `_meta` dict.

    Args:
        meta: The `_meta` dict used as the propagation carrier. It is handed to
            ``opentelemetry.propagate.inject`` and modified in place; nothing is returned.
    """
    inject(carrier=meta)
def extract_trace_context(meta: dict[str, Any]) -> Context:
    """Read W3C trace context out of a `_meta` dict.

    Args:
        meta: The `_meta` dict used as the propagation carrier.

    Returns:
        The ``Context`` returned by ``opentelemetry.propagate.extract`` for this carrier.
    """
    extracted_context = extract(carrier=meta)
    return extracted_context
def record_server_operation_duration(
    duration_s: float,
    method: str,
    *,
    error_type: str | None = None,
    rpc_response_status_code: str | None = None,
    tool_name: str | None = None,
    prompt_name: str | None = None,
    mcp_protocol_version: str | None = None,
) -> None:
    """Record one measurement on the ``mcp.server.operation.duration`` histogram.

    ``mcp.method.name`` is always attached. Every other attribute is attached
    only when its argument is not ``None``.

    Args:
        duration_s: Value to record (the histogram is declared with unit ``"s"``).
        method: Stored as ``mcp.method.name``.
        error_type: Stored as ``error.type``.
        rpc_response_status_code: Stored as ``rpc.response.status_code``.
        tool_name: Stored as ``gen_ai.tool.name``; when given,
            ``gen_ai.operation.name`` is also set to ``"execute_tool"``.
        prompt_name: Stored as ``gen_ai.prompt.name``.
        mcp_protocol_version: Stored as ``mcp.protocol.version``.
    """
    # Ordered (key, value) pairs; entries whose value is None are dropped below.
    optional_attrs: tuple[tuple[str, str | None], ...] = (
        ("error.type", error_type),
        ("rpc.response.status_code", rpc_response_status_code),
        ("gen_ai.tool.name", tool_name),
        # The operation name accompanies the tool name and is absent otherwise.
        ("gen_ai.operation.name", "execute_tool" if tool_name is not None else None),
        ("gen_ai.prompt.name", prompt_name),
        ("mcp.protocol.version", mcp_protocol_version),
    )

    attributes: dict[str, str] = {"mcp.method.name": method}
    attributes.update({key: value for key, value in optional_attrs if value is not None})

    _server_operation_duration.record(duration_s, attributes)
def record_server_session_duration(
    duration_s: float,
    *,
    error_type: str | None = None,
    mcp_protocol_version: str | None = None,
) -> None:
    """Record one measurement on the ``mcp.server.session.duration`` histogram.

    Args:
        duration_s: Value to record (the histogram is declared with unit ``"s"``).
        error_type: Stored as ``error.type`` when not ``None``.
        mcp_protocol_version: Stored as ``mcp.protocol.version`` when not ``None``.
    """
    candidates: dict[str, str | None] = {
        "error.type": error_type,
        "mcp.protocol.version": mcp_protocol_version,
    }
    # Keep only the attributes that were actually supplied; the dict may end up empty.
    attributes: dict[str, str] = {key: value for key, value in candidates.items() if value is not None}

    _server_session_duration.record(duration_s, attributes)