2222from os import environ , linesep
2323from sys import stdout
2424from threading import Event , Lock , RLock , Thread
25- from time import time_ns
25+ from time import perf_counter , time_ns
2626from typing import IO , Callable , Iterable , Optional
2727
2828from typing_extensions import final
3535 detach ,
3636 set_value ,
3737)
38+ from opentelemetry .metrics import MeterProvider , NoOpMeterProvider
3839from opentelemetry .sdk .environment_variables import (
3940 OTEL_METRIC_EXPORT_INTERVAL ,
4041 OTEL_METRIC_EXPORT_TIMEOUT ,
6162 _UpDownCounter ,
6263)
6364from opentelemetry .sdk .metrics ._internal .point import MetricsData
65+ from opentelemetry .semconv ._incubating .attributes .otel_attributes import (
66+ OtelComponentTypeValues ,
67+ )
6468from opentelemetry .util ._once import Once
6569
70+ from ._metric_reader_metrics import MetricReaderMetrics
71+
6672_logger = getLogger (__name__ )
6773
6874
@@ -220,6 +226,8 @@ def __init__(
220226 type , "opentelemetry.sdk.metrics.view.Aggregation"
221227 ]
222228 | None = None ,
229+ * ,
230+ otel_component_type : OtelComponentTypeValues | None = None ,
223231 ) -> None :
224232 self ._collect : Callable [
225233 [
@@ -318,6 +326,15 @@ def __init__(
318326 else :
319327 raise Exception (f"Invalid instrument class found { typ } " )
320328
329+ self ._otel_component_type = (
330+ otel_component_type .value
331+ if otel_component_type
332+ else type (self ).__qualname__
333+ )
334+ self ._metrics = MetricReaderMetrics (
335+ self ._otel_component_type , NoOpMeterProvider ()
336+ )
337+
321338 @final
322339 def collect (self , timeout_millis : float = 10_000 ) -> None :
323340 """Collects the metrics from the internal SDK state and
@@ -337,7 +354,11 @@ def collect(self, timeout_millis: float = 10_000) -> None:
337354 )
338355 return
339356
340- metrics = self ._collect (self , timeout_millis = timeout_millis )
357+ start_time = perf_counter ()
358+ try :
359+ metrics = self ._collect (self , timeout_millis = timeout_millis )
360+ finally :
361+ self ._metrics .record_collection (perf_counter () - start_time )
341362
342363 if metrics is not None :
343364 self ._receive_metrics (
@@ -368,6 +389,11 @@ def _receive_metrics(
368389 ) -> None :
369390 """Called by `MetricReader.collect` when it receives a batch of metrics"""
370391
392+ def _set_meter_provider (self , meter_provider : MeterProvider ) -> None :
393+ self ._metrics = MetricReaderMetrics (
394+ self ._otel_component_type , meter_provider
395+ )
396+
371397 def force_flush (self , timeout_millis : float = 10_000 ) -> bool :
372398 self .collect (timeout_millis = timeout_millis )
373399 return True
@@ -451,6 +477,7 @@ def __init__(
451477 super ().__init__ (
452478 preferred_temporality = exporter ._preferred_temporality ,
453479 preferred_aggregation = exporter ._preferred_aggregation ,
480+ otel_component_type = OtelComponentTypeValues .PERIODIC_METRIC_READER ,
454481 )
455482
456483 # This lock is held whenever calling self._exporter.export() to prevent concurrent
0 commit comments