Skip to content

Commit db134a3

Browse files
anuraagaxrmx
andauthored
fix(http-metric-exporter): use consistent protobuf for export request (#5015)
* fix(http-metric-exporter): use consistent protobuf for export request * changelog * Update exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py --------- Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 9ed343a commit db134a3

3 files changed

Lines changed: 45 additions & 41 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4141
([#4910](https://github.com/open-telemetry/opentelemetry-python/pull/4910))
4242
- Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter
4343
([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576))
44+
- `opentelemetry-exporter-otlp-proto-http`: use consistent protobuf for export request
45+
([#5015](https://github.com/open-telemetry/opentelemetry-python/pull/5015))
4446
- `opentelemetry-sdk`: cache TracerConfig into the tracer, this changes an internal interface. Only one Tracer with the same instrumentation scope will be created
4547
([#5007](https://github.com/open-telemetry/opentelemetry-python/pull/5007))
4648

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
_is_retryable,
5151
_load_session_from_envvar,
5252
)
53-
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401
53+
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
5454
ExportMetricsServiceRequest,
5555
)
5656
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
@@ -60,7 +60,7 @@
6060
KeyValue,
6161
KeyValueList,
6262
)
63-
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2 # noqa: F401
63+
from opentelemetry.proto.metrics.v1 import metrics_pb2 as pb2
6464
from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401
6565
from opentelemetry.proto.resource.v1.resource_pb2 import (
6666
Resource as PB2Resource,
@@ -243,18 +243,19 @@ def _export(
243243

244244
def _export_with_retries(
245245
self,
246-
serialized_data: bytes,
246+
export_request: ExportMetricsServiceRequest,
247247
deadline_sec: float,
248248
) -> MetricExportResult:
249249
"""Export serialized data with retry logic until success, non-transient error, or exponential backoff maxed out.
250250
251251
Args:
252-
serialized_data: serialized metrics data to export
252+
export_request: ExportMetricsServiceRequest object containing metrics data to export
253253
deadline_sec: timestamp deadline for the export
254254
255255
Returns:
256256
MetricExportResult: SUCCESS if export succeeded, FAILURE otherwise
257257
"""
258+
serialized_data = export_request.SerializeToString()
258259
for retry_num in range(_MAX_RETRYS):
259260
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
260261
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
@@ -310,23 +311,21 @@ def export(
310311
_logger.warning("Exporter already shutdown, ignoring batch")
311312
return MetricExportResult.FAILURE
312313

313-
serialized_data = encode_metrics(metrics_data)
314+
export_request = encode_metrics(metrics_data)
314315
deadline_sec = time() + self._timeout
315316

316317
# If no batch size configured, export as single batch with retries as configured
317318
if self._max_export_batch_size is None:
318-
return self._export_with_retries(
319-
serialized_data.SerializeToString(), deadline_sec
320-
)
319+
return self._export_with_retries(export_request, deadline_sec)
321320

322321
# Else, export in batches of configured size
323-
split_metrics_batches = list(
324-
_split_metrics_data(serialized_data, self._max_export_batch_size)
322+
batched_export_requests = _split_metrics_data(
323+
export_request, self._max_export_batch_size
325324
)
326325

327-
for split_metrics_data in split_metrics_batches:
326+
for split_metrics_data in batched_export_requests:
328327
export_result = self._export_with_retries(
329-
split_metrics_data.SerializeToString(),
328+
split_metrics_data,
330329
deadline_sec,
331330
)
332331
if export_result != MetricExportResult.SUCCESS:
@@ -353,18 +352,18 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
353352

354353

355354
def _split_metrics_data(
356-
metrics_data: pb2.MetricsData,
355+
metrics_data: ExportMetricsServiceRequest,
357356
max_export_batch_size: int | None = None,
358-
) -> Iterable[pb2.MetricsData]:
359-
"""Splits metrics data into several MetricsData (copies protobuf originals),
357+
) -> Iterable[ExportMetricsServiceRequest]:
358+
"""Splits metrics data into several ExportMetricsServiceRequest (copies protobuf originals),
360359
based on configured data point max export batch size.
361360
362361
Args:
363362
metrics_data: metrics object based on HTTP protocol buffer definition
364363
365364
Returns:
366-
Iterable[pb2.MetricsData]: An iterable of pb2.MetricsData objects containing
367-
pb2.ResourceMetrics, pb2.ScopeMetrics, pb2.Metrics, and data points
365+
Iterable[ExportMetricsServiceRequest]: An iterable of ExportMetricsServiceRequest objects containing
366+
ExportMetricsServiceRequest.ResourceMetrics, ExportMetricsServiceRequest.ScopeMetrics, ExportMetricsServiceRequest.Metrics, and data points
368367
"""
369368
if not max_export_batch_size:
370369
return metrics_data
@@ -430,7 +429,7 @@ def _split_metrics_data(
430429
batch_size += 1
431430

432431
if batch_size >= max_export_batch_size:
433-
yield pb2.MetricsData(
432+
yield ExportMetricsServiceRequest(
434433
resource_metrics=_get_split_resource_metrics_pb2(
435434
split_resource_metrics
436435
)
@@ -491,7 +490,7 @@ def _split_metrics_data(
491490
split_resource_metrics.pop()
492491

493492
if batch_size > 0:
494-
yield pb2.MetricsData(
493+
yield ExportMetricsServiceRequest(
495494
resource_metrics=_get_split_resource_metrics_pb2(
496495
split_resource_metrics
497496
)

exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
_split_metrics_data,
4141
)
4242
from opentelemetry.exporter.otlp.proto.http.version import __version__
43+
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
44+
ExportMetricsServiceRequest,
45+
)
4346
from opentelemetry.proto.common.v1.common_pb2 import (
4447
InstrumentationScope,
4548
KeyValue,
@@ -374,7 +377,7 @@ def test_serialization(self, mock_post):
374377
)
375378

376379
def test_split_metrics_data_many_data_points(self):
377-
metrics_data = pb2.MetricsData(
380+
metrics_data = ExportMetricsServiceRequest(
378381
resource_metrics=[
379382
_resource_metrics(
380383
index=1,
@@ -396,7 +399,7 @@ def test_split_metrics_data_many_data_points(self):
396399
),
397400
]
398401
)
399-
split_metrics_data: List[MetricsData] = list(
402+
split_metrics_data: List[ExportMetricsServiceRequest] = list(
400403
# pylint: disable=protected-access
401404
_split_metrics_data(
402405
metrics_data=metrics_data,
@@ -406,7 +409,7 @@ def test_split_metrics_data_many_data_points(self):
406409

407410
self.assertEqual(
408411
[
409-
pb2.MetricsData(
412+
ExportMetricsServiceRequest(
410413
resource_metrics=[
411414
_resource_metrics(
412415
index=1,
@@ -427,7 +430,7 @@ def test_split_metrics_data_many_data_points(self):
427430
),
428431
]
429432
),
430-
pb2.MetricsData(
433+
ExportMetricsServiceRequest(
431434
resource_metrics=[
432435
_resource_metrics(
433436
index=1,
@@ -452,7 +455,7 @@ def test_split_metrics_data_many_data_points(self):
452455
)
453456

454457
def test_split_metrics_data_nb_data_points_equal_batch_size(self):
455-
metrics_data = pb2.MetricsData(
458+
metrics_data = ExportMetricsServiceRequest(
456459
resource_metrics=[
457460
_resource_metrics(
458461
index=1,
@@ -475,7 +478,7 @@ def test_split_metrics_data_nb_data_points_equal_batch_size(self):
475478
]
476479
)
477480

478-
split_metrics_data: List[MetricsData] = list(
481+
split_metrics_data: List[ExportMetricsServiceRequest] = list(
479482
# pylint: disable=protected-access
480483
_split_metrics_data(
481484
metrics_data=metrics_data,
@@ -485,7 +488,7 @@ def test_split_metrics_data_nb_data_points_equal_batch_size(self):
485488

486489
self.assertEqual(
487490
[
488-
pb2.MetricsData(
491+
ExportMetricsServiceRequest(
489492
resource_metrics=[
490493
_resource_metrics(
491494
index=1,
@@ -513,7 +516,7 @@ def test_split_metrics_data_nb_data_points_equal_batch_size(self):
513516

514517
def test_split_metrics_data_many_resources_scopes_metrics(self):
515518
# GIVEN
516-
metrics_data = pb2.MetricsData(
519+
metrics_data = ExportMetricsServiceRequest(
517520
resource_metrics=[
518521
_resource_metrics(
519522
index=1,
@@ -567,7 +570,7 @@ def test_split_metrics_data_many_resources_scopes_metrics(self):
567570
]
568571
)
569572

570-
split_metrics_data: List[MetricsData] = list(
573+
split_metrics_data: List[ExportMetricsServiceRequest] = list(
571574
# pylint: disable=protected-access
572575
_split_metrics_data(
573576
metrics_data=metrics_data,
@@ -577,7 +580,7 @@ def test_split_metrics_data_many_resources_scopes_metrics(self):
577580

578581
self.assertEqual(
579582
[
580-
pb2.MetricsData(
583+
ExportMetricsServiceRequest(
581584
resource_metrics=[
582585
_resource_metrics(
583586
index=1,
@@ -603,7 +606,7 @@ def test_split_metrics_data_many_resources_scopes_metrics(self):
603606
),
604607
]
605608
),
606-
pb2.MetricsData(
609+
ExportMetricsServiceRequest(
607610
resource_metrics=[
608611
_resource_metrics(
609612
index=1,
@@ -858,7 +861,7 @@ def test_export_retries_with_batching_success(
858861
MagicMock(ok=True),
859862
MagicMock(ok=True),
860863
]
861-
mock_encode_metrics.return_value = pb2.MetricsData(
864+
mock_encode_metrics.return_value = ExportMetricsServiceRequest(
862865
resource_metrics=[
863866
_resource_metrics(
864867
index=1,
@@ -880,7 +883,7 @@ def test_export_retries_with_batching_success(
880883
),
881884
]
882885
)
883-
batch_1 = pb2.MetricsData(
886+
batch_1 = ExportMetricsServiceRequest(
884887
resource_metrics=[
885888
_resource_metrics(
886889
index=1,
@@ -901,7 +904,7 @@ def test_export_retries_with_batching_success(
901904
),
902905
]
903906
)
904-
batch_2 = pb2.MetricsData(
907+
batch_2 = ExportMetricsServiceRequest(
905908
resource_metrics=[
906909
_resource_metrics(
907910
index=1,
@@ -954,7 +957,7 @@ def test_export_retries_with_batching_failure_first(
954957
MagicMock(ok=True),
955958
MagicMock(ok=True),
956959
]
957-
mock_encode_metrics.return_value = pb2.MetricsData(
960+
mock_encode_metrics.return_value = ExportMetricsServiceRequest(
958961
resource_metrics=[
959962
_resource_metrics(
960963
index=1,
@@ -976,7 +979,7 @@ def test_export_retries_with_batching_failure_first(
976979
),
977980
]
978981
)
979-
batch_1 = pb2.MetricsData(
982+
batch_1 = ExportMetricsServiceRequest(
980983
resource_metrics=[
981984
_resource_metrics(
982985
index=1,
@@ -1031,7 +1034,7 @@ def test_export_retries_with_batching_failure_last(
10311034
# Non-retryable
10321035
MagicMock(ok=False, status_code=400, reason="bad request"),
10331036
]
1034-
mock_encode_metrics.return_value = pb2.MetricsData(
1037+
mock_encode_metrics.return_value = ExportMetricsServiceRequest(
10351038
resource_metrics=[
10361039
_resource_metrics(
10371040
index=1,
@@ -1053,7 +1056,7 @@ def test_export_retries_with_batching_failure_last(
10531056
),
10541057
]
10551058
)
1056-
batch_1 = pb2.MetricsData(
1059+
batch_1 = ExportMetricsServiceRequest(
10571060
resource_metrics=[
10581061
_resource_metrics(
10591062
index=1,
@@ -1074,7 +1077,7 @@ def test_export_retries_with_batching_failure_last(
10741077
),
10751078
]
10761079
)
1077-
batch_2 = pb2.MetricsData(
1080+
batch_2 = ExportMetricsServiceRequest(
10781081
resource_metrics=[
10791082
_resource_metrics(
10801083
index=1,
@@ -1131,7 +1134,7 @@ def test_export_retries_with_batching_failure_retryable(
11311134
# Then success
11321135
MagicMock(ok=True),
11331136
]
1134-
mock_encode_metrics.return_value = pb2.MetricsData(
1137+
mock_encode_metrics.return_value = ExportMetricsServiceRequest(
11351138
resource_metrics=[
11361139
_resource_metrics(
11371140
index=1,
@@ -1153,7 +1156,7 @@ def test_export_retries_with_batching_failure_retryable(
11531156
),
11541157
]
11551158
)
1156-
batch_1 = pb2.MetricsData(
1159+
batch_1 = ExportMetricsServiceRequest(
11571160
resource_metrics=[
11581161
_resource_metrics(
11591162
index=1,
@@ -1174,7 +1177,7 @@ def test_export_retries_with_batching_failure_retryable(
11741177
),
11751178
]
11761179
)
1177-
batch_2 = pb2.MetricsData(
1180+
batch_2 = ExportMetricsServiceRequest(
11781181
resource_metrics=[
11791182
_resource_metrics(
11801183
index=1,

0 commit comments

Comments
 (0)