opentelemetry_sdk/metrics/
reader.rs

1//! Interfaces for reading and producing metrics
2use std::{fmt, sync::Weak};
3
4use opentelemetry::metrics::Result;
5
6use super::{
7    aggregation::Aggregation,
8    data::{ResourceMetrics, ScopeMetrics, Temporality},
9    instrument::InstrumentKind,
10    pipeline::Pipeline,
11};
12
13/// The interface used between the SDK and an exporter.
14///
15/// Control flow is bi-directional through the `MetricReader`, since the SDK
16/// initiates `force_flush` and `shutdown` while the reader initiates
17/// collection. The `register_pipeline` method here informs the metric reader
18/// that it can begin reading, signaling the start of bi-directional control
19/// flow.
20///
21/// Typically, push-based exporters that are periodic will implement
22/// `MetricExporter` themselves and construct a `PeriodicReader` to satisfy this
23/// interface.
24///
25/// Pull-based exporters will typically implement `MetricReader` themselves,
26/// since they read on demand.
27pub trait MetricReader:
28    AggregationSelector + TemporalitySelector + fmt::Debug + Send + Sync + 'static
29{
30    /// Registers a [MetricReader] with a [Pipeline].
31    ///
32    /// The pipeline argument allows the `MetricReader` to signal the sdk to collect
33    /// and send aggregated metric measurements.
34    fn register_pipeline(&self, pipeline: Weak<Pipeline>);
35
36    /// Gathers and returns all metric data related to the [MetricReader] from the
37    /// SDK and stores it in the provided [ResourceMetrics] reference.
38    ///
39    /// An error is returned if this is called after shutdown.
40    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()>;
41
42    /// Flushes all metric measurements held in an export pipeline.
43    ///
44    /// There is no guaranteed that all telemetry be flushed or all resources have
45    /// been released on error.
46    fn force_flush(&self) -> Result<()>;
47
48    /// Flushes all metric measurements held in an export pipeline and releases any
49    /// held computational resources.
50    ///
51    /// There is no guaranteed that all telemetry be flushed or all resources have
52    /// been released on error.
53    ///
54    /// After `shutdown` is called, calls to `collect` will perform no operation and
55    /// instead will return an error indicating the shutdown state.
56    fn shutdown(&self) -> Result<()>;
57}
58
59/// Produces metrics for a [MetricReader].
60pub(crate) trait SdkProducer: fmt::Debug + Send + Sync {
61    /// Returns aggregated metrics from a single collection.
62    fn produce(&self, rm: &mut ResourceMetrics) -> Result<()>;
63}
64
65/// Produces metrics for a [MetricReader] from an external source.
66pub trait MetricProducer: fmt::Debug + Send + Sync {
67    /// Returns aggregated metrics from an external source.
68    fn produce(&self) -> Result<ScopeMetrics>;
69}
70
71/// An interface for selecting the temporality for an [InstrumentKind].
72pub trait TemporalitySelector: Send + Sync {
73    /// Selects the temporality to use based on the [InstrumentKind].
74    fn temporality(&self, kind: InstrumentKind) -> Temporality;
75}
76
77/// The default temporality used if not specified for a given [InstrumentKind].
78///
79/// [Temporality::Cumulative] will be used for all instrument kinds if this
80/// [TemporalitySelector] is used.
81#[derive(Clone, Default, Debug)]
82pub struct DefaultTemporalitySelector {
83    pub(crate) _private: (),
84}
85
86impl DefaultTemporalitySelector {
87    /// Create a new default temporality selector.
88    pub fn new() -> Self {
89        Self::default()
90    }
91}
92
93impl TemporalitySelector for DefaultTemporalitySelector {
94    fn temporality(&self, _kind: InstrumentKind) -> Temporality {
95        Temporality::Cumulative
96    }
97}
98
99/// An interface for selecting the aggregation and the parameters for an
100/// [InstrumentKind].
101pub trait AggregationSelector: Send + Sync {
102    /// Selects the aggregation and the parameters to use for that aggregation based on
103    /// the [InstrumentKind].
104    fn aggregation(&self, kind: InstrumentKind) -> Aggregation;
105}
106
107impl<T> AggregationSelector for T
108where
109    T: Fn(InstrumentKind) -> Aggregation + Send + Sync,
110{
111    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
112        self(kind)
113    }
114}
115
116/// The default aggregation and parameters for an instrument of [InstrumentKind].
117///
118/// This [AggregationSelector] uses the following selection mapping per [the spec]:
119///
120/// * Counter ⇨ Sum
121/// * Observable Counter ⇨ Sum
122/// * UpDownCounter ⇨ Sum
123/// * Observable UpDownCounter ⇨ Sum
124/// * Gauge ⇨ LastValue
125/// * Observable Gauge ⇨ LastValue
126/// * Histogram ⇨ ExplicitBucketHistogram
127///
128/// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.19.0/specification/metrics/sdk.md#default-aggregation
129#[derive(Clone, Default, Debug)]
130pub struct DefaultAggregationSelector {
131    pub(crate) _private: (),
132}
133
134impl DefaultAggregationSelector {
135    /// Create a new default aggregation selector.
136    pub fn new() -> Self {
137        Self::default()
138    }
139}
140
141impl AggregationSelector for DefaultAggregationSelector {
142    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
143        match kind {
144            InstrumentKind::Counter
145            | InstrumentKind::UpDownCounter
146            | InstrumentKind::ObservableCounter
147            | InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
148            InstrumentKind::Gauge => Aggregation::LastValue,
149            InstrumentKind::ObservableGauge => Aggregation::LastValue,
150            InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
151                boundaries: vec![
152                    0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
153                    5000.0, 7500.0, 10000.0,
154                ],
155                record_min_max: true,
156            },
157        }
158    }
159}