//! Interfaces for reading and producing metrics

use std::{fmt, sync::Weak};

use opentelemetry::metrics::Result;

use super::{
    aggregation::Aggregation,
    data::{ResourceMetrics, ScopeMetrics, Temporality},
    instrument::InstrumentKind,
    pipeline::Pipeline,
};
/// The interface used between the SDK and an exporter.
///
/// Control flow is bi-directional through the `MetricReader`, since the SDK
/// initiates `force_flush` and `shutdown` while the reader initiates
/// collection. The `register_pipeline` method here informs the metric reader
/// that it can begin reading, signaling the start of bi-directional control
/// flow.
///
/// Typically, push-based exporters that are periodic will implement
/// `MetricExporter` themselves and construct a `PeriodicReader` to satisfy this
/// interface.
///
/// Pull-based exporters will typically implement `MetricReader` themselves,
/// since they read on demand.
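///
/// # Example
///
/// A minimal sketch of a pull-based reader. `OnDemandReader` is a
/// hypothetical type used only for illustration; imports and the required
/// [AggregationSelector] and [TemporalitySelector] implementations are
/// elided.
///
/// ```ignore
/// use std::sync::{Mutex, Weak};
///
/// #[derive(Debug)]
/// struct OnDemandReader {
///     // Set by the SDK when the reader is registered with a pipeline.
///     pipeline: Mutex<Option<Weak<Pipeline>>>,
/// }
///
/// impl MetricReader for OnDemandReader {
///     fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
///         *self.pipeline.lock().unwrap() = Some(pipeline);
///     }
///
///     fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
///         // A real reader would drive collection through the registered
///         // pipeline here, filling `rm` with aggregated data.
///         Ok(())
///     }
///
///     fn force_flush(&self) -> Result<()> {
///         // Pull-based readers typically hold nothing that needs flushing.
///         Ok(())
///     }
///
///     fn shutdown(&self) -> Result<()> {
///         // Release held resources; later `collect` calls should error.
///         Ok(())
///     }
/// }
/// ```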
pub trait MetricReader:
    AggregationSelector + TemporalitySelector + fmt::Debug + Send + Sync + 'static
{
    /// Registers a [MetricReader] with a [Pipeline].
    ///
    /// The pipeline argument allows the `MetricReader` to signal the SDK to collect
    /// and send aggregated metric measurements.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>);

    /// Gathers all metric data related to the [MetricReader] from the SDK and
    /// stores it in the provided [ResourceMetrics] reference.
    ///
    /// An error is returned if this is called after shutdown.
    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()>;

    /// Flushes all metric measurements held in an export pipeline.
    ///
    /// There is no guarantee that all telemetry will be flushed or that all
    /// resources will be released on error.
    fn force_flush(&self) -> Result<()>;

    /// Flushes all metric measurements held in an export pipeline and releases any
    /// held computational resources.
    ///
    /// There is no guarantee that all telemetry will be flushed or that all
    /// resources will be released on error.
    ///
    /// After `shutdown` is called, calls to `collect` will perform no operation and
    /// instead will return an error indicating the shutdown state.
    fn shutdown(&self) -> Result<()>;
}
/// Produces metrics for a [MetricReader].
pub(crate) trait SdkProducer: fmt::Debug + Send + Sync {
    /// Returns aggregated metrics from a single collection.
    fn produce(&self, rm: &mut ResourceMetrics) -> Result<()>;
}
/// Produces metrics for a [MetricReader] from an external source.
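///
/// # Example
///
/// A minimal sketch of a producer bridging in metrics gathered outside the
/// SDK. `ExternalSource` is a hypothetical type, and the example assumes a
/// default-constructed [ScopeMetrics] as a stand-in for real data.
///
/// ```ignore
/// #[derive(Debug)]
/// struct ExternalSource;
///
/// impl MetricProducer for ExternalSource {
///     fn produce(&self) -> Result<ScopeMetrics> {
///         // A real implementation would convert the externally gathered
///         // measurements into `ScopeMetrics` here.
///         Ok(ScopeMetrics::default())
///     }
/// }
/// ```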
pub trait MetricProducer: fmt::Debug + Send + Sync {
    /// Returns aggregated metrics from an external source.
    fn produce(&self) -> Result<ScopeMetrics>;
}
/// An interface for selecting the temporality for an [InstrumentKind].
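///
/// # Example
///
/// A sketch of a selector that reports deltas for counters and histograms and
/// keeps cumulative temporality for everything else
/// (`DeltaTemporalitySelector` is a hypothetical type):
///
/// ```ignore
/// #[derive(Debug)]
/// struct DeltaTemporalitySelector;
///
/// impl TemporalitySelector for DeltaTemporalitySelector {
///     fn temporality(&self, kind: InstrumentKind) -> Temporality {
///         match kind {
///             InstrumentKind::Counter | InstrumentKind::Histogram => Temporality::Delta,
///             _ => Temporality::Cumulative,
///         }
///     }
/// }
/// ```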
pub trait TemporalitySelector: Send + Sync {
    /// Selects the temporality to use based on the [InstrumentKind].
    fn temporality(&self, kind: InstrumentKind) -> Temporality;
}
/// The default temporality used if not specified for a given [InstrumentKind].
///
/// [Temporality::Cumulative] will be used for all instrument kinds if this
/// [TemporalitySelector] is used.
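///
/// # Example
///
/// A short usage sketch:
///
/// ```ignore
/// let selector = DefaultTemporalitySelector::new();
/// // Every instrument kind maps to cumulative temporality.
/// assert!(matches!(
///     selector.temporality(InstrumentKind::Counter),
///     Temporality::Cumulative
/// ));
/// ```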
#[derive(Clone, Default, Debug)]
pub struct DefaultTemporalitySelector {
    pub(crate) _private: (),
}
impl DefaultTemporalitySelector {
    /// Create a new default temporality selector.
    pub fn new() -> Self {
        Self::default()
    }
}
impl TemporalitySelector for DefaultTemporalitySelector {
    fn temporality(&self, _kind: InstrumentKind) -> Temporality {
        Temporality::Cumulative
    }
}
/// An interface for selecting the aggregation and the parameters for an
/// [InstrumentKind].
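///
/// # Example
///
/// A sketch of a selector that overrides the histogram bucket boundaries and
/// defers to [DefaultAggregationSelector] for every other instrument kind
/// (`LatencyBucketSelector` and its boundaries are illustrative only):
///
/// ```ignore
/// #[derive(Debug)]
/// struct LatencyBucketSelector;
///
/// impl AggregationSelector for LatencyBucketSelector {
///     fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
///         match kind {
///             InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
///                 // Boundaries tuned for request latencies in milliseconds.
///                 boundaries: vec![1.0, 5.0, 10.0, 50.0, 100.0, 500.0, 1000.0],
///                 record_min_max: true,
///             },
///             _ => DefaultAggregationSelector::new().aggregation(kind),
///         }
///     }
/// }
/// ```
///
/// Because of the blanket implementation below, a closure with the signature
/// `Fn(InstrumentKind) -> Aggregation + Send + Sync` can also be used
/// directly wherever an [AggregationSelector] is expected.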
pub trait AggregationSelector: Send + Sync {
    /// Selects the aggregation and the parameters to use for that aggregation based on
    /// the [InstrumentKind].
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation;
}
impl<T> AggregationSelector for T
where
    T: Fn(InstrumentKind) -> Aggregation + Send + Sync,
{
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
        self(kind)
    }
}
/// The default aggregation and parameters for an instrument of [InstrumentKind].
///
/// This [AggregationSelector] uses the following selection mapping per [the spec]:
///
/// * Counter ⇨ Sum
/// * Observable Counter ⇨ Sum
/// * UpDownCounter ⇨ Sum
/// * Observable UpDownCounter ⇨ Sum
/// * Gauge ⇨ LastValue
/// * Observable Gauge ⇨ LastValue
/// * Histogram ⇨ ExplicitBucketHistogram
///
/// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.19.0/specification/metrics/sdk.md#default-aggregation
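///
/// # Example
///
/// A short usage sketch:
///
/// ```ignore
/// let selector = DefaultAggregationSelector::new();
/// // Counters aggregate into sums under the default mapping.
/// assert!(matches!(
///     selector.aggregation(InstrumentKind::Counter),
///     Aggregation::Sum
/// ));
/// ```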
#[derive(Clone, Default, Debug)]
pub struct DefaultAggregationSelector {
    pub(crate) _private: (),
}
impl DefaultAggregationSelector {
    /// Create a new default aggregation selector.
    pub fn new() -> Self {
        Self::default()
    }
}
impl AggregationSelector for DefaultAggregationSelector {
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
        match kind {
            InstrumentKind::Counter
            | InstrumentKind::UpDownCounter
            | InstrumentKind::ObservableCounter
            | InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
            InstrumentKind::Gauge => Aggregation::LastValue,
            InstrumentKind::ObservableGauge => Aggregation::LastValue,
            InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
                boundaries: vec![
                    0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
                    5000.0, 7500.0, 10000.0,
                ],
                record_min_max: true,
            },
        }
    }
}