1use crate::{NoExporterConfig, OtlpPipeline};
7use async_trait::async_trait;
8use core::fmt;
9use opentelemetry::metrics::Result;
10
11#[cfg(feature = "grpc-tonic")]
12use crate::exporter::tonic::TonicExporterBuilder;
13use opentelemetry_sdk::{
14 metrics::{
15 data::{ResourceMetrics, Temporality},
16 exporter::PushMetricsExporter,
17 reader::{
18 AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector,
19 TemporalitySelector,
20 },
21 Aggregation, InstrumentKind, PeriodicReader, SdkMeterProvider,
22 },
23 runtime::Runtime,
24 Resource,
25};
26use std::fmt::{Debug, Formatter};
27use std::time;
28
29#[cfg(feature = "http-proto")]
30use crate::exporter::http::HttpExporterBuilder;
31
/// The string `"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"`.
///
/// NOTE(review): presumably the name of the environment variable that selects the
/// metrics endpoint; the lookup is not visible in this part of the file — confirm.
pub const OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: &str = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT";
/// The string `"OTEL_EXPORTER_OTLP_METRICS_TIMEOUT"`.
///
/// NOTE(review): presumably the name of the environment variable that sets the
/// metrics export timeout; units and default are not visible here — confirm.
pub const OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: &str = "OTEL_EXPORTER_OTLP_METRICS_TIMEOUT";
/// The string `"OTEL_EXPORTER_OTLP_METRICS_COMPRESSION"`.
///
/// NOTE(review): presumably the name of the environment variable that selects the
/// compression used for metrics export — confirm against the exporter builders.
pub const OTEL_EXPORTER_OTLP_METRICS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_METRICS_COMPRESSION";
/// The string `"OTEL_EXPORTER_OTLP_METRICS_HEADERS"`.
///
/// NOTE(review): presumably the name of the environment variable that supplies extra
/// request headers for metrics export; the expected format is not visible here — confirm.
pub const OTEL_EXPORTER_OTLP_METRICS_HEADERS: &str = "OTEL_EXPORTER_OTLP_METRICS_HEADERS";
45impl OtlpPipeline {
46 pub fn metrics<RT>(self, rt: RT) -> OtlpMetricPipeline<RT, NoExporterConfig>
48 where
49 RT: Runtime,
50 {
51 OtlpMetricPipeline {
52 rt,
53 aggregator_selector: None,
54 temporality_selector: None,
55 exporter_pipeline: NoExporterConfig(()),
56 resource: None,
57 period: None,
58 timeout: None,
59 }
60 }
61}
62
/// Transport-specific builder from which a [`MetricsExporter`] is created.
///
/// Which variants exist depends on the enabled cargo features.
#[derive(Debug)]
#[non_exhaustive]
pub enum MetricsExporterBuilder {
    /// Wraps a [`TonicExporterBuilder`]; only present with the `grpc-tonic` feature.
    #[cfg(feature = "grpc-tonic")]
    Tonic(TonicExporterBuilder),
    /// Wraps an [`HttpExporterBuilder`]; only present with the `http-proto` feature.
    #[cfg(feature = "http-proto")]
    Http(HttpExporterBuilder),

    /// Placeholder variant that exists only when neither `http-proto` nor
    /// `grpc-tonic` is enabled; building from it always returns an error.
    #[doc(hidden)]
    #[cfg(not(any(feature = "http-proto", feature = "grpc-tonic")))]
    Unconfigured,
}
79
80impl MetricsExporterBuilder {
81 pub fn build_metrics_exporter(
83 self,
84 temporality_selector: Box<dyn TemporalitySelector>,
85 aggregation_selector: Box<dyn AggregationSelector>,
86 ) -> Result<MetricsExporter> {
87 match self {
88 #[cfg(feature = "grpc-tonic")]
89 MetricsExporterBuilder::Tonic(builder) => {
90 builder.build_metrics_exporter(aggregation_selector, temporality_selector)
91 }
92 #[cfg(feature = "http-proto")]
93 MetricsExporterBuilder::Http(builder) => {
94 builder.build_metrics_exporter(aggregation_selector, temporality_selector)
95 }
96 #[cfg(not(any(feature = "http-proto", feature = "grpc-tonic")))]
97 MetricsExporterBuilder::Unconfigured => {
98 drop(temporality_selector);
99 drop(aggregation_selector);
100 Err(opentelemetry::metrics::MetricsError::Other(
101 "no configured metrics exporter, enable `http-proto` or `grpc-tonic` feature to configure a metrics exporter".into(),
102 ))
103 }
104 }
105 }
106}
107
/// Wraps a tonic exporter builder in the [`MetricsExporterBuilder::Tonic`] variant.
#[cfg(feature = "grpc-tonic")]
impl From<TonicExporterBuilder> for MetricsExporterBuilder {
    fn from(builder: TonicExporterBuilder) -> Self {
        Self::Tonic(builder)
    }
}
114
/// Wraps an HTTP exporter builder in the [`MetricsExporterBuilder::Http`] variant.
#[cfg(feature = "http-proto")]
impl From<HttpExporterBuilder> for MetricsExporterBuilder {
    fn from(builder: HttpExporterBuilder) -> Self {
        Self::Http(builder)
    }
}
121
/// Builder state for an OTLP metrics pipeline.
///
/// `RT` is the runtime type and `EB` is the exporter configuration: it starts as
/// `NoExporterConfig` and becomes [`MetricsExporterBuilder`] after `with_exporter`.
pub struct OtlpMetricPipeline<RT, EB> {
    /// Runtime passed to `PeriodicReader::builder` when the pipeline is built.
    rt: RT,
    /// Optional aggregation selector; `build` uses `DefaultAggregationSelector` when unset.
    aggregator_selector: Option<Box<dyn AggregationSelector>>,
    /// Optional temporality selector; `build` uses `DefaultTemporalitySelector` when unset.
    temporality_selector: Option<Box<dyn TemporalitySelector>>,
    /// Exporter configuration (placeholder or real builder, depending on `EB`).
    exporter_pipeline: EB,
    /// Optional resource attached to the meter provider in `build`.
    resource: Option<Resource>,
    /// Optional value forwarded to the periodic reader's `with_interval` in `build`.
    period: Option<time::Duration>,
    /// Optional value forwarded to the periodic reader's `with_timeout` in `build`.
    timeout: Option<time::Duration>,
}
135
136impl<RT, EB> OtlpMetricPipeline<RT, EB>
137where
138 RT: Runtime,
139{
140 pub fn with_resource(self, resource: Resource) -> Self {
142 OtlpMetricPipeline {
143 resource: Some(resource),
144 ..self
145 }
146 }
147
148 pub fn with_timeout(self, timeout: time::Duration) -> Self {
150 OtlpMetricPipeline {
151 timeout: Some(timeout),
152 ..self
153 }
154 }
155
156 pub fn with_period(self, period: time::Duration) -> Self {
158 OtlpMetricPipeline {
159 period: Some(period),
160 ..self
161 }
162 }
163
164 pub fn with_temporality_selector<T: TemporalitySelector + 'static>(self, selector: T) -> Self {
166 OtlpMetricPipeline {
167 temporality_selector: Some(Box::new(selector)),
168 ..self
169 }
170 }
171
172 pub fn with_delta_temporality(self) -> Self {
179 self.with_temporality_selector(DeltaTemporalitySelector)
180 }
181
182 pub fn with_aggregation_selector<T: AggregationSelector + 'static>(self, selector: T) -> Self {
184 OtlpMetricPipeline {
185 aggregator_selector: Some(Box::new(selector)),
186 ..self
187 }
188 }
189}
190
191impl<RT> OtlpMetricPipeline<RT, NoExporterConfig>
192where
193 RT: Runtime,
194{
195 pub fn with_exporter<B: Into<MetricsExporterBuilder>>(
197 self,
198 pipeline: B,
199 ) -> OtlpMetricPipeline<RT, MetricsExporterBuilder> {
200 OtlpMetricPipeline {
201 exporter_pipeline: pipeline.into(),
202 rt: self.rt,
203 aggregator_selector: self.aggregator_selector,
204 temporality_selector: self.temporality_selector,
205 resource: self.resource,
206 period: self.period,
207 timeout: self.timeout,
208 }
209 }
210}
211
212impl<RT> OtlpMetricPipeline<RT, MetricsExporterBuilder>
213where
214 RT: Runtime,
215{
216 pub fn build(self) -> Result<SdkMeterProvider> {
218 let exporter = self.exporter_pipeline.build_metrics_exporter(
219 self.temporality_selector
220 .unwrap_or_else(|| Box::new(DefaultTemporalitySelector::new())),
221 self.aggregator_selector
222 .unwrap_or_else(|| Box::new(DefaultAggregationSelector::new())),
223 )?;
224
225 let mut builder = PeriodicReader::builder(exporter, self.rt);
226
227 if let Some(period) = self.period {
228 builder = builder.with_interval(period);
229 }
230 if let Some(timeout) = self.timeout {
231 builder = builder.with_timeout(timeout)
232 }
233
234 let reader = builder.build();
235
236 let mut provider = SdkMeterProvider::builder().with_reader(reader);
237
238 if let Some(resource) = self.resource {
239 provider = provider.with_resource(resource);
240 }
241
242 let provider = provider.build();
243 Ok(provider)
244 }
245}
246
247impl<RT, EB: Debug> Debug for OtlpMetricPipeline<RT, EB> {
248 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
249 f.debug_struct("OtlpMetricPipeline")
250 .field("exporter_pipeline", &self.exporter_pipeline)
251 .field("resource", &self.resource)
252 .field("period", &self.period)
253 .field("timeout", &self.timeout)
254 .finish()
255 }
256}
257
258#[derive(Debug)]
266struct DeltaTemporalitySelector;
267
268impl TemporalitySelector for DeltaTemporalitySelector {
269 #[rustfmt::skip]
270 fn temporality(&self, kind: InstrumentKind) -> Temporality {
271 match kind {
272 InstrumentKind::Counter
273 | InstrumentKind::Histogram
274 | InstrumentKind::ObservableCounter
275 | InstrumentKind::Gauge
276 | InstrumentKind::ObservableGauge => {
277 Temporality::Delta
278 }
279 InstrumentKind::UpDownCounter
280 | InstrumentKind::ObservableUpDownCounter => {
281 Temporality::Cumulative
282 }
283 }
284 }
285}
286
/// Client that a [`MetricsExporter`] delegates its export and shutdown calls to.
///
/// Implementations must be `Debug`, `Send`, `Sync` and `'static`.
#[async_trait]
pub trait MetricsClient: fmt::Debug + Send + Sync + 'static {
    /// Exports `metrics`; invoked by [`MetricsExporter`]'s `export`.
    async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()>;
    /// Shuts the client down; invoked by [`MetricsExporter`]'s `shutdown`.
    fn shutdown(&self) -> Result<()>;
}
293
/// Metrics exporter that forwards exports to a boxed [`MetricsClient`] and answers
/// temporality/aggregation queries through the selectors it was built with.
pub struct MetricsExporter {
    /// Client that receives `export` and `shutdown` calls.
    client: Box<dyn MetricsClient>,
    /// Selector consulted by this type's `TemporalitySelector` implementation.
    temporality_selector: Box<dyn TemporalitySelector>,
    /// Selector consulted by this type's `AggregationSelector` implementation.
    aggregation_selector: Box<dyn AggregationSelector>,
}
300
301impl Debug for MetricsExporter {
302 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
303 f.debug_struct("MetricsExporter").finish()
304 }
305}
306
307impl TemporalitySelector for MetricsExporter {
308 fn temporality(&self, kind: InstrumentKind) -> Temporality {
309 self.temporality_selector.temporality(kind)
310 }
311}
312
313impl AggregationSelector for MetricsExporter {
314 fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
315 self.aggregation_selector.aggregation(kind)
316 }
317}
318
319#[async_trait]
320impl PushMetricsExporter for MetricsExporter {
321 async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
322 self.client.export(metrics).await
323 }
324
325 async fn force_flush(&self) -> Result<()> {
326 Ok(())
328 }
329
330 fn shutdown(&self) -> Result<()> {
331 self.client.shutdown()
332 }
333}
334
335impl MetricsExporter {
336 pub fn new(
338 client: impl MetricsClient,
339 temporality_selector: Box<dyn TemporalitySelector>,
340 aggregation_selector: Box<dyn AggregationSelector>,
341 ) -> MetricsExporter {
342 MetricsExporter {
343 client: Box::new(client),
344 temporality_selector,
345 aggregation_selector,
346 }
347 }
348}