opentelemetry_sdk/metrics/
reader.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
//! Interfaces for reading and producing metrics
use std::{fmt, sync::Weak};

use opentelemetry::metrics::Result;

use super::{
    aggregation::Aggregation,
    data::{ResourceMetrics, ScopeMetrics, Temporality},
    instrument::InstrumentKind,
    pipeline::Pipeline,
};

/// The interface used between the SDK and an exporter.
///
/// Control flow is bi-directional through the `MetricReader`, since the SDK
/// initiates `force_flush` and `shutdown` while the reader initiates
/// collection. The `register_pipeline` method here informs the metric reader
/// that it can begin reading, signaling the start of bi-directional control
/// flow.
///
/// Typically, push-based exporters that are periodic will implement
/// `MetricExporter` themselves and construct a `PeriodicReader` to satisfy this
/// interface.
///
/// Pull-based exporters will typically implement `MetricReader` themselves,
/// since they read on demand.
///
/// Every implementor must also be an [AggregationSelector] and a
/// [TemporalitySelector], and must be `Debug + Send + Sync + 'static`.
pub trait MetricReader:
    AggregationSelector + TemporalitySelector + fmt::Debug + Send + Sync + 'static
{
    /// Registers a [MetricReader] with a [Pipeline].
    ///
    /// The pipeline argument allows the `MetricReader` to signal the SDK to collect
    /// and send aggregated metric measurements. It is handed over as a [Weak]
    /// reference, which by itself does not keep the pipeline alive.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>);

    /// Gathers all metric data related to the [MetricReader] from the SDK and
    /// stores it in the provided [ResourceMetrics] reference.
    ///
    /// # Errors
    ///
    /// An error is returned if this is called after shutdown.
    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()>;

    /// Flushes all metric measurements held in an export pipeline.
    ///
    /// # Errors
    ///
    /// If an error is returned, there is no guarantee that all telemetry has
    /// been flushed or that all resources have been released.
    fn force_flush(&self) -> Result<()>;

    /// Flushes all metric measurements held in an export pipeline and releases any
    /// held computational resources.
    ///
    /// After `shutdown` is called, calls to `collect` will perform no operation and
    /// instead will return an error indicating the shutdown state.
    ///
    /// # Errors
    ///
    /// If an error is returned, there is no guarantee that all telemetry has
    /// been flushed or that all resources have been released.
    fn shutdown(&self) -> Result<()>;
}

/// Produces metrics for a [MetricReader].
///
/// This trait is crate-private; implementors must be `Debug + Send + Sync`.
pub(crate) trait SdkProducer: fmt::Debug + Send + Sync {
    /// Returns aggregated metrics from a single collection.
    ///
    /// The result is written into the caller-provided [ResourceMetrics]
    /// rather than returned by value; the `Result` only reports success or
    /// failure.
    fn produce(&self, rm: &mut ResourceMetrics) -> Result<()>;
}

/// Produces metrics for a [MetricReader] from an external source.
///
/// Implementors must be `Debug + Send + Sync`.
pub trait MetricProducer: fmt::Debug + Send + Sync {
    /// Returns aggregated metrics from an external source.
    ///
    /// On success the metrics are returned by value as a [ScopeMetrics].
    fn produce(&self) -> Result<ScopeMetrics>;
}

/// An interface for selecting the temporality for an [InstrumentKind].
///
/// Implementors must be `Send + Sync`.
pub trait TemporalitySelector: Send + Sync {
    /// Selects the temporality to use based on the [InstrumentKind].
    ///
    /// Takes the instrument `kind` by value and returns the [Temporality]
    /// chosen for it.
    fn temporality(&self, kind: InstrumentKind) -> Temporality;
}

/// A [TemporalitySelector] that applies the default temporality when none is
/// specified for a given [InstrumentKind].
///
/// With this selector in place, every instrument kind is reported using
/// [Temporality::Cumulative].
#[derive(Debug, Default, Clone)]
pub struct DefaultTemporalitySelector {
    // Crate-visible unit field: code outside this crate cannot build the
    // struct with a literal and has to go through `new` or `Default`.
    pub(crate) _private: (),
}

impl DefaultTemporalitySelector {
    /// Builds the default temporality selector.
    ///
    /// The result is the same value that [Default::default] produces.
    pub fn new() -> Self {
        Self { _private: () }
    }
}

impl TemporalitySelector for DefaultTemporalitySelector {
    /// Always selects [Temporality::Cumulative]; the instrument kind passed
    /// in is not inspected.
    fn temporality(&self, _: InstrumentKind) -> Temporality {
        Temporality::Cumulative
    }
}

/// An interface for selecting the aggregation and the parameters for an
/// [InstrumentKind].
///
/// Implementors must be `Send + Sync`. Any
/// `Fn(InstrumentKind) -> Aggregation + Send + Sync` callable also implements
/// this trait through a blanket impl in this module.
pub trait AggregationSelector: Send + Sync {
    /// Selects the aggregation and the parameters to use for that aggregation based on
    /// the [InstrumentKind].
    ///
    /// Takes the instrument `kind` by value and returns the [Aggregation]
    /// chosen for it.
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation;
}

/// Lets any `Send + Sync` callable of shape `Fn(InstrumentKind) -> Aggregation`
/// be used directly as an [AggregationSelector].
impl<F: Fn(InstrumentKind) -> Aggregation + Send + Sync> AggregationSelector for F {
    /// Delegates the selection to the callable itself.
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
        (self)(kind)
    }
}

/// An [AggregationSelector] providing the default aggregation and parameters
/// for each [InstrumentKind].
///
/// The selection mapping follows [the spec]:
///
/// * Counter ⇨ Sum
/// * Observable Counter ⇨ Sum
/// * UpDownCounter ⇨ Sum
/// * Observable UpDownCounter ⇨ Sum
/// * Gauge ⇨ LastValue
/// * Observable Gauge ⇨ LastValue
/// * Histogram ⇨ ExplicitBucketHistogram
///
/// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.19.0/specification/metrics/sdk.md#default-aggregation
#[derive(Debug, Default, Clone)]
pub struct DefaultAggregationSelector {
    // Crate-visible unit field: code outside this crate cannot build the
    // struct with a literal and has to go through `new` or `Default`.
    pub(crate) _private: (),
}

impl DefaultAggregationSelector {
    /// Builds the default aggregation selector.
    ///
    /// The result is the same value that [Default::default] produces.
    pub fn new() -> Self {
        Self { _private: () }
    }
}

impl AggregationSelector for DefaultAggregationSelector {
    /// Returns the default [Aggregation] for `kind`: explicit-bucket
    /// histogram for histograms, last value for both gauge kinds, and sum for
    /// every counter kind.
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
        match kind {
            InstrumentKind::Histogram => {
                // Bucket boundaries used for histograms, with min/max
                // recording switched on.
                let boundaries = vec![
                    0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
                    5000.0, 7500.0, 10000.0,
                ];
                Aggregation::ExplicitBucketHistogram {
                    boundaries,
                    record_min_max: true,
                }
            }
            InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Aggregation::LastValue,
            InstrumentKind::Counter
            | InstrumentKind::ObservableCounter
            | InstrumentKind::UpDownCounter
            | InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
        }
    }
}