opentelemetry_otlp/
metric.rs

1//! OTEL metric exporter
2//!
3//! Defines a [MetricsExporter] to send metric data to backend via OTLP protocol.
4//!
5
6use crate::{NoExporterConfig, OtlpPipeline};
7use async_trait::async_trait;
8use core::fmt;
9use opentelemetry::metrics::Result;
10
11#[cfg(feature = "grpc-tonic")]
12use crate::exporter::tonic::TonicExporterBuilder;
13use opentelemetry_sdk::{
14    metrics::{
15        data::{ResourceMetrics, Temporality},
16        exporter::PushMetricsExporter,
17        reader::{
18            AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector,
19            TemporalitySelector,
20        },
21        Aggregation, InstrumentKind, PeriodicReader, SdkMeterProvider,
22    },
23    runtime::Runtime,
24    Resource,
25};
26use std::fmt::{Debug, Formatter};
27use std::time;
28
29#[cfg(feature = "http-proto")]
30use crate::exporter::http::HttpExporterBuilder;
31
/// Target to which the exporter is going to send metrics, defaults to
/// `https://localhost:4317/v1/metrics`.
///
/// Learn about the relationship between this constant and default/spans/logs at
/// <https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md#endpoint-urls-for-otlphttp>
pub const OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT";
/// Max waiting time for the backend to process each metrics batch, defaults to 10s.
pub const OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT";
/// Compression algorithm to use, defaults to none.
pub const OTEL_EXPORTER_OTLP_METRICS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION";
/// Key-value pairs to be used as headers associated with gRPC or HTTP requests
/// for sending metrics.
///
/// Example: `k1=v1,k2=v2`
///
/// Note: this is only supported for HTTP.
pub const OTEL_EXPORTER_OTLP_METRICS_HEADERS: &str = "OTEL_EXPORTER_OTLP_METRICS_HEADERS";
45impl OtlpPipeline {
46    /// Create a OTLP metrics pipeline.
47    pub fn metrics<RT>(self, rt: RT) -> OtlpMetricPipeline<RT, NoExporterConfig>
48    where
49        RT: Runtime,
50    {
51        OtlpMetricPipeline {
52            rt,
53            aggregator_selector: None,
54            temporality_selector: None,
55            exporter_pipeline: NoExporterConfig(()),
56            resource: None,
57            period: None,
58            timeout: None,
59        }
60    }
61}
62
/// OTLP metrics exporter builder.
///
/// Wraps one of the transport-specific exporter builders. Which variants
/// exist depends on the enabled cargo features (`grpc-tonic`, `http-proto`).
#[derive(Debug)]
#[non_exhaustive]
pub enum MetricsExporterBuilder {
    /// Tonic metrics exporter builder
    #[cfg(feature = "grpc-tonic")]
    Tonic(TonicExporterBuilder),
    /// Http metrics exporter builder
    #[cfg(feature = "http-proto")]
    Http(HttpExporterBuilder),

    /// Missing exporter builder
    ///
    /// Only present when neither `http-proto` nor `grpc-tonic` is enabled.
    #[doc(hidden)]
    #[cfg(not(any(feature = "http-proto", feature = "grpc-tonic")))]
    Unconfigured,
}
79
80impl MetricsExporterBuilder {
81    /// Build a OTLP metrics exporter with given configuration.
82    pub fn build_metrics_exporter(
83        self,
84        temporality_selector: Box<dyn TemporalitySelector>,
85        aggregation_selector: Box<dyn AggregationSelector>,
86    ) -> Result<MetricsExporter> {
87        match self {
88            #[cfg(feature = "grpc-tonic")]
89            MetricsExporterBuilder::Tonic(builder) => {
90                builder.build_metrics_exporter(aggregation_selector, temporality_selector)
91            }
92            #[cfg(feature = "http-proto")]
93            MetricsExporterBuilder::Http(builder) => {
94                builder.build_metrics_exporter(aggregation_selector, temporality_selector)
95            }
96            #[cfg(not(any(feature = "http-proto", feature = "grpc-tonic")))]
97            MetricsExporterBuilder::Unconfigured => {
98                drop(temporality_selector);
99                drop(aggregation_selector);
100                Err(opentelemetry::metrics::MetricsError::Other(
101                    "no configured metrics exporter, enable `http-proto` or `grpc-tonic` feature to configure a metrics exporter".into(),
102                ))
103            }
104        }
105    }
106}
107
#[cfg(feature = "grpc-tonic")]
impl From<TonicExporterBuilder> for MetricsExporterBuilder {
    /// Wrap a tonic exporter builder in the [`MetricsExporterBuilder::Tonic`] variant.
    fn from(builder: TonicExporterBuilder) -> Self {
        Self::Tonic(builder)
    }
}
114
#[cfg(feature = "http-proto")]
impl From<HttpExporterBuilder> for MetricsExporterBuilder {
    /// Wrap an HTTP exporter builder in the [`MetricsExporterBuilder::Http`] variant.
    fn from(builder: HttpExporterBuilder) -> Self {
        Self::Http(builder)
    }
}
121
/// Pipeline to build OTLP metrics exporter
///
/// Note that currently the OTLP metrics exporter only supports tonic as its grpc layer and tokio as
/// runtime.
pub struct OtlpMetricPipeline<RT, EB> {
    /// Runtime handed to the periodic reader when the pipeline is built.
    rt: RT,
    /// Aggregation selector; a `DefaultAggregationSelector` is used when `None`.
    aggregator_selector: Option<Box<dyn AggregationSelector>>,
    /// Temporality selector; a `DefaultTemporalitySelector` is used when `None`.
    temporality_selector: Option<Box<dyn TemporalitySelector>>,
    /// Exporter configuration: `NoExporterConfig` until `with_exporter` is
    /// called, a `MetricsExporterBuilder` afterwards.
    exporter_pipeline: EB,
    /// Resource attached to the meter provider, if any.
    resource: Option<Resource>,
    /// Export interval passed to the periodic reader, if set.
    period: Option<time::Duration>,
    /// Export timeout passed to the periodic reader, if set.
    timeout: Option<time::Duration>,
}
135
136impl<RT, EB> OtlpMetricPipeline<RT, EB>
137where
138    RT: Runtime,
139{
140    /// Build with resource key value pairs.
141    pub fn with_resource(self, resource: Resource) -> Self {
142        OtlpMetricPipeline {
143            resource: Some(resource),
144            ..self
145        }
146    }
147
148    /// Build with timeout
149    pub fn with_timeout(self, timeout: time::Duration) -> Self {
150        OtlpMetricPipeline {
151            timeout: Some(timeout),
152            ..self
153        }
154    }
155
156    /// Build with period, your metrics will be exported with this period
157    pub fn with_period(self, period: time::Duration) -> Self {
158        OtlpMetricPipeline {
159            period: Some(period),
160            ..self
161        }
162    }
163
164    /// Build with the given temporality selector
165    pub fn with_temporality_selector<T: TemporalitySelector + 'static>(self, selector: T) -> Self {
166        OtlpMetricPipeline {
167            temporality_selector: Some(Box::new(selector)),
168            ..self
169        }
170    }
171
172    /// Build with delta temporality selector.
173    ///
174    /// This temporality selector is equivalent to OTLP Metrics Exporter's
175    /// `Delta` temporality preference (see [its documentation][exporter-docs]).
176    ///
177    /// [exporter-docs]: https://github.com/open-telemetry/opentelemetry-specification/blob/a1c13d59bb7d0fb086df2b3e1eaec9df9efef6cc/specification/metrics/sdk_exporters/otlp.md#additional-configuration
178    pub fn with_delta_temporality(self) -> Self {
179        self.with_temporality_selector(DeltaTemporalitySelector)
180    }
181
182    /// Build with the given aggregation selector
183    pub fn with_aggregation_selector<T: AggregationSelector + 'static>(self, selector: T) -> Self {
184        OtlpMetricPipeline {
185            aggregator_selector: Some(Box::new(selector)),
186            ..self
187        }
188    }
189}
190
191impl<RT> OtlpMetricPipeline<RT, NoExporterConfig>
192where
193    RT: Runtime,
194{
195    /// Build with the exporter
196    pub fn with_exporter<B: Into<MetricsExporterBuilder>>(
197        self,
198        pipeline: B,
199    ) -> OtlpMetricPipeline<RT, MetricsExporterBuilder> {
200        OtlpMetricPipeline {
201            exporter_pipeline: pipeline.into(),
202            rt: self.rt,
203            aggregator_selector: self.aggregator_selector,
204            temporality_selector: self.temporality_selector,
205            resource: self.resource,
206            period: self.period,
207            timeout: self.timeout,
208        }
209    }
210}
211
212impl<RT> OtlpMetricPipeline<RT, MetricsExporterBuilder>
213where
214    RT: Runtime,
215{
216    /// Build MeterProvider
217    pub fn build(self) -> Result<SdkMeterProvider> {
218        let exporter = self.exporter_pipeline.build_metrics_exporter(
219            self.temporality_selector
220                .unwrap_or_else(|| Box::new(DefaultTemporalitySelector::new())),
221            self.aggregator_selector
222                .unwrap_or_else(|| Box::new(DefaultAggregationSelector::new())),
223        )?;
224
225        let mut builder = PeriodicReader::builder(exporter, self.rt);
226
227        if let Some(period) = self.period {
228            builder = builder.with_interval(period);
229        }
230        if let Some(timeout) = self.timeout {
231            builder = builder.with_timeout(timeout)
232        }
233
234        let reader = builder.build();
235
236        let mut provider = SdkMeterProvider::builder().with_reader(reader);
237
238        if let Some(resource) = self.resource {
239            provider = provider.with_resource(resource);
240        }
241
242        let provider = provider.build();
243        Ok(provider)
244    }
245}
246
247impl<RT, EB: Debug> Debug for OtlpMetricPipeline<RT, EB> {
248    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
249        f.debug_struct("OtlpMetricPipeline")
250            .field("exporter_pipeline", &self.exporter_pipeline)
251            .field("resource", &self.resource)
252            .field("period", &self.period)
253            .field("timeout", &self.timeout)
254            .finish()
255    }
256}
257
258/// A temporality selector that returns [`Delta`][Temporality::Delta] for all
259/// instruments except `UpDownCounter` and `ObservableUpDownCounter`.
260///
261/// This temporality selector is equivalent to OTLP Metrics Exporter's
262/// `Delta` temporality preference (see [its documentation][exporter-docs]).
263///
264/// [exporter-docs]: https://github.com/open-telemetry/opentelemetry-specification/blob/a1c13d59bb7d0fb086df2b3e1eaec9df9efef6cc/specification/metrics/sdk_exporters/otlp.md#additional-configuration
265#[derive(Debug)]
266struct DeltaTemporalitySelector;
267
268impl TemporalitySelector for DeltaTemporalitySelector {
269    #[rustfmt::skip]
270    fn temporality(&self, kind: InstrumentKind) -> Temporality {
271        match kind {
272            InstrumentKind::Counter
273            | InstrumentKind::Histogram
274            | InstrumentKind::ObservableCounter
275            | InstrumentKind::Gauge
276            | InstrumentKind::ObservableGauge => {
277                Temporality::Delta
278            }
279            InstrumentKind::UpDownCounter
280            | InstrumentKind::ObservableUpDownCounter => {
281                Temporality::Cumulative
282            }
283        }
284    }
285}
286
/// An interface for OTLP metrics clients
///
/// [`MetricsExporter`] forwards its `export` and `shutdown` calls to an
/// implementation of this trait.
#[async_trait]
pub trait MetricsClient: fmt::Debug + Send + Sync + 'static {
    /// Export the given resource metrics, returning an error if the export
    /// fails.
    async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()>;
    /// Shut the client down. What this releases is up to the implementation.
    fn shutdown(&self) -> Result<()>;
}
293
/// Export metrics in OTEL format.
pub struct MetricsExporter {
    /// Client that `export` and `shutdown` calls are delegated to.
    client: Box<dyn MetricsClient>,
    /// Selector that answers temporality queries made to this exporter.
    temporality_selector: Box<dyn TemporalitySelector>,
    /// Selector that answers aggregation queries made to this exporter.
    aggregation_selector: Box<dyn AggregationSelector>,
}
300
301impl Debug for MetricsExporter {
302    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
303        f.debug_struct("MetricsExporter").finish()
304    }
305}
306
307impl TemporalitySelector for MetricsExporter {
308    fn temporality(&self, kind: InstrumentKind) -> Temporality {
309        self.temporality_selector.temporality(kind)
310    }
311}
312
313impl AggregationSelector for MetricsExporter {
314    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
315        self.aggregation_selector.aggregation(kind)
316    }
317}
318
319#[async_trait]
320impl PushMetricsExporter for MetricsExporter {
321    async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
322        self.client.export(metrics).await
323    }
324
325    async fn force_flush(&self) -> Result<()> {
326        // this component is stateless
327        Ok(())
328    }
329
330    fn shutdown(&self) -> Result<()> {
331        self.client.shutdown()
332    }
333}
334
335impl MetricsExporter {
336    /// Create a new metrics exporter
337    pub fn new(
338        client: impl MetricsClient,
339        temporality_selector: Box<dyn TemporalitySelector>,
340        aggregation_selector: Box<dyn AggregationSelector>,
341    ) -> MetricsExporter {
342        MetricsExporter {
343            client: Box::new(client),
344            temporality_selector,
345            aggregation_selector,
346        }
347    }
348}