opentelemetry_sdk/metrics/reader.rs
1//! Interfaces for reading and producing metrics
2use std::{fmt, sync::Weak};
3
4use opentelemetry::metrics::Result;
5
6use super::{
7 aggregation::Aggregation,
8 data::{ResourceMetrics, ScopeMetrics, Temporality},
9 instrument::InstrumentKind,
10 pipeline::Pipeline,
11};
12
/// The interface used between the SDK and an exporter.
///
/// Control flow is bi-directional through the `MetricReader`, since the SDK
/// initiates `force_flush` and `shutdown` while the reader initiates
/// collection. The `register_pipeline` method here informs the metric reader
/// that it can begin reading, signaling the start of bi-directional control
/// flow.
///
/// Typically, push-based exporters that are periodic will implement
/// `MetricExporter` themselves and construct a `PeriodicReader` to satisfy this
/// interface.
///
/// Pull-based exporters will typically implement `MetricReader` themselves,
/// since they read on demand.
///
/// Every implementor must also be an [AggregationSelector] and a
/// [TemporalitySelector], and must be `Debug + Send + Sync + 'static`.
pub trait MetricReader:
    AggregationSelector + TemporalitySelector + fmt::Debug + Send + Sync + 'static
{
    /// Registers a [MetricReader] with a [Pipeline].
    ///
    /// The pipeline argument allows the `MetricReader` to signal the SDK to collect
    /// and send aggregated metric measurements.
    ///
    /// The pipeline is passed as a [Weak] handle, which does not by itself keep
    /// the pipeline alive.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>);

    /// Gathers and returns all metric data related to the [MetricReader] from the
    /// SDK and stores it in the provided [ResourceMetrics] reference.
    ///
    /// # Errors
    ///
    /// An error is returned if this is called after shutdown.
    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()>;

    /// Flushes all metric measurements held in an export pipeline.
    ///
    /// # Errors
    ///
    /// There is no guarantee that all telemetry has been flushed or that all
    /// resources have been released when an error is returned.
    fn force_flush(&self) -> Result<()>;

    /// Flushes all metric measurements held in an export pipeline and releases any
    /// held computational resources.
    ///
    /// After `shutdown` is called, calls to `collect` will perform no operation and
    /// instead will return an error indicating the shutdown state.
    ///
    /// # Errors
    ///
    /// There is no guarantee that all telemetry has been flushed or that all
    /// resources have been released when an error is returned.
    fn shutdown(&self) -> Result<()>;
}
58
/// Produces metrics for a [MetricReader].
///
/// This trait is crate-private; implementors must be `Debug + Send + Sync`.
pub(crate) trait SdkProducer: fmt::Debug + Send + Sync {
    /// Returns aggregated metrics from a single collection.
    ///
    /// The result is delivered through the caller-supplied `rm` rather than
    /// the return value, which only reports success or failure.
    fn produce(&self, rm: &mut ResourceMetrics) -> Result<()>;
}
64
/// Produces metrics for a [MetricReader] from an external source.
///
/// Implementors must be `Debug + Send + Sync`.
pub trait MetricProducer: fmt::Debug + Send + Sync {
    /// Returns aggregated metrics from an external source.
    ///
    /// On success the metrics are returned as an owned [ScopeMetrics] value.
    fn produce(&self) -> Result<ScopeMetrics>;
}
70
/// An interface for selecting the temporality for an [InstrumentKind].
///
/// Implementors must be `Send + Sync`.
pub trait TemporalitySelector: Send + Sync {
    /// Selects the temporality to use based on the [InstrumentKind].
    ///
    /// Takes the instrument `kind` and returns the [Temporality] chosen for it.
    fn temporality(&self, kind: InstrumentKind) -> Temporality;
}
76
/// The [TemporalitySelector] used when no other selector is configured.
///
/// With this selector every instrument kind is reported with
/// [Temporality::Cumulative].
#[derive(Clone, Default, Debug)]
pub struct DefaultTemporalitySelector {
    // Zero-sized private marker: keeps the struct from being built with a
    // struct literal outside this crate.
    pub(crate) _private: (),
}

impl DefaultTemporalitySelector {
    /// Builds the default temporality selector.
    ///
    /// The result is the same value produced by `Default::default()`.
    pub fn new() -> Self {
        Self { _private: () }
    }
}
92
93impl TemporalitySelector for DefaultTemporalitySelector {
94 fn temporality(&self, _kind: InstrumentKind) -> Temporality {
95 Temporality::Cumulative
96 }
97}
98
/// An interface for selecting the aggregation and the parameters for an
/// [InstrumentKind].
///
/// Implementors must be `Send + Sync`.
pub trait AggregationSelector: Send + Sync {
    /// Selects the aggregation and the parameters to use for that aggregation based on
    /// the [InstrumentKind].
    ///
    /// Takes the instrument `kind` and returns the [Aggregation] chosen for it.
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation;
}
106
107impl<T> AggregationSelector for T
108where
109 T: Fn(InstrumentKind) -> Aggregation + Send + Sync,
110{
111 fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
112 self(kind)
113 }
114}
115
/// The [AggregationSelector] used when no other selector is configured.
///
/// Following [the spec], each [InstrumentKind] is mapped to an aggregation as
/// shown below:
///
/// * Counter ⇨ Sum
/// * Observable Counter ⇨ Sum
/// * UpDownCounter ⇨ Sum
/// * Observable UpDownCounter ⇨ Sum
/// * Gauge ⇨ LastValue
/// * Observable Gauge ⇨ LastValue
/// * Histogram ⇨ ExplicitBucketHistogram
///
/// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.19.0/specification/metrics/sdk.md#default-aggregation
#[derive(Clone, Default, Debug)]
pub struct DefaultAggregationSelector {
    // Zero-sized private marker: keeps the struct from being built with a
    // struct literal outside this crate.
    pub(crate) _private: (),
}

impl DefaultAggregationSelector {
    /// Builds the default aggregation selector.
    ///
    /// The result is the same value produced by `Default::default()`.
    pub fn new() -> Self {
        Self { _private: () }
    }
}
140
141impl AggregationSelector for DefaultAggregationSelector {
142 fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
143 match kind {
144 InstrumentKind::Counter
145 | InstrumentKind::UpDownCounter
146 | InstrumentKind::ObservableCounter
147 | InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
148 InstrumentKind::Gauge => Aggregation::LastValue,
149 InstrumentKind::ObservableGauge => Aggregation::LastValue,
150 InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
151 boundaries: vec![
152 0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
153 5000.0, 7500.0, 10000.0,
154 ],
155 record_min_max: true,
156 },
157 }
158 }
159}