opentelemetry_sdk/metrics/periodic_reader.rs

1use std::{
2    env, fmt, mem,
3    sync::{Arc, Mutex, Weak},
4    time::Duration,
5};
6
7use futures_channel::{mpsc, oneshot};
8use futures_util::{
9    future::{self, Either},
10    pin_mut,
11    stream::{self, FusedStream},
12    StreamExt,
13};
14use opentelemetry::{
15    global,
16    metrics::{MetricsError, Result},
17};
18
19use crate::runtime::Runtime;
20use crate::{
21    metrics::{
22        exporter::PushMetricsExporter,
23        reader::{MetricProducer, SdkProducer},
24    },
25    Resource,
26};
27
28use super::{
29    aggregation::Aggregation,
30    data::{ResourceMetrics, Temporality},
31    instrument::InstrumentKind,
32    reader::{AggregationSelector, MetricReader, TemporalitySelector},
33    Pipeline,
34};
35
/// Export timeout used when neither the builder nor the
/// `OTEL_METRIC_EXPORT_TIMEOUT` environment variable provides one (30 s).
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(30_000);
/// Interval between exports used when neither the builder nor the
/// `OTEL_METRIC_EXPORT_INTERVAL` environment variable provides one (60 s).
const DEFAULT_INTERVAL: Duration = Duration::from_millis(60_000);

/// Environment variable holding the export interval, in milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Environment variable holding the export timeout, in milliseconds.
const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";
41
/// Configuration options for [PeriodicReader].
///
/// A periodic reader is a [MetricReader] that collects and exports metric data
/// to the exporter at a defined interval.
///
/// By default, the returned [MetricReader] will collect and export data every
/// 60 seconds, and will cancel export attempts that exceed 30 seconds. The
/// export time is not counted towards the interval between attempts.
///
/// The [collect] method of the returned [MetricReader] continues to gather and
/// return metric data to the user. It will not automatically send that data to
/// the exporter outside of the predefined interval.
///
/// [collect]: MetricReader::collect
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E, RT> {
    /// Time between scheduled collect-and-export cycles.
    interval: Duration,
    /// How long a single export may run before it is abandoned.
    timeout: Duration,
    /// Destination for the collected metrics.
    exporter: E,
    /// External metric sources merged into every collection.
    producers: Vec<Box<dyn MetricProducer>>,
    /// Runtime used to drive the ticker, the export timeout, and the
    /// background worker task.
    runtime: RT,
}
64
65impl<E, RT> PeriodicReaderBuilder<E, RT>
66where
67    E: PushMetricsExporter,
68    RT: Runtime,
69{
70    fn new(exporter: E, runtime: RT) -> Self {
71        let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
72            .ok()
73            .and_then(|v| v.parse().map(Duration::from_millis).ok())
74            .unwrap_or(DEFAULT_INTERVAL);
75        let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME)
76            .ok()
77            .and_then(|v| v.parse().map(Duration::from_millis).ok())
78            .unwrap_or(DEFAULT_TIMEOUT);
79
80        PeriodicReaderBuilder {
81            interval,
82            timeout,
83            producers: vec![],
84            exporter,
85            runtime,
86        }
87    }
88
89    /// Configures the intervening time between exports for a [PeriodicReader].
90    ///
91    /// This option overrides any value set for the `OTEL_METRIC_EXPORT_INTERVAL`
92    /// environment variable.
93    ///
94    /// If this option is not used or `interval` is equal to zero, 60 seconds is
95    /// used as the default.
96    pub fn with_interval(mut self, interval: Duration) -> Self {
97        if !interval.is_zero() {
98            self.interval = interval;
99        }
100        self
101    }
102
103    /// Configures the time a [PeriodicReader] waits for an export to complete
104    /// before canceling it.
105    ///
106    /// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT`
107    /// environment variable.
108    ///
109    /// If this option is not used or `timeout` is equal to zero, 30 seconds is used
110    /// as the default.
111    pub fn with_timeout(mut self, timeout: Duration) -> Self {
112        if !timeout.is_zero() {
113            self.timeout = timeout;
114        }
115        self
116    }
117
118    /// Registers a an external [MetricProducer] with this reader.
119    ///
120    /// The producer is used as a source of aggregated metric data which is
121    /// incorporated into metrics collected from the SDK.
122    pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
123        self.producers.push(Box::new(producer));
124        self
125    }
126
127    /// Create a [PeriodicReader] with the given config.
128    pub fn build(self) -> PeriodicReader {
129        let (message_sender, message_receiver) = mpsc::channel(256);
130
131        let worker = move |reader: &PeriodicReader| {
132            let ticker = self
133                .runtime
134                .interval(self.interval)
135                .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
136                .map(|_| Message::Export);
137
138            let messages = Box::pin(stream::select(message_receiver, ticker));
139
140            let runtime = self.runtime.clone();
141            self.runtime.spawn(Box::pin(
142                PeriodicReaderWorker {
143                    reader: reader.clone(),
144                    timeout: self.timeout,
145                    runtime,
146                    rm: ResourceMetrics {
147                        resource: Resource::empty(),
148                        scope_metrics: Vec::new(),
149                    },
150                }
151                .run(messages),
152            ));
153        };
154
155        PeriodicReader {
156            exporter: Arc::new(self.exporter),
157            inner: Arc::new(Mutex::new(PeriodicReaderInner {
158                message_sender,
159                is_shutdown: false,
160                external_producers: self.producers,
161                sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
162            })),
163        }
164    }
165}
166
/// A [MetricReader] that continuously collects and exports metric data at a set
/// interval.
///
/// By default it will collect and export data every 60 seconds, and will cancel
/// export attempts that exceed 30 seconds. The export time is not counted
/// towards the interval between attempts.
///
/// The [collect] method of the reader continues to gather and
/// return metric data to the user. It will not automatically send that data to
/// the exporter outside of the predefined interval.
///
/// The [runtime] can be selected based on feature flags set for this crate.
///
/// The exporter can be any exporter that implements [PushMetricsExporter] such
/// as [opentelemetry-otlp].
///
/// [collect]: MetricReader::collect
/// [runtime]: crate::runtime
/// [opentelemetry-otlp]: https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/
///
/// # Example
///
/// ```no_run
/// use opentelemetry_sdk::metrics::PeriodicReader;
/// # fn example<E, R>(get_exporter: impl Fn() -> E, get_runtime: impl Fn() -> R)
/// # where
/// #     E: opentelemetry_sdk::metrics::exporter::PushMetricsExporter,
/// #     R: opentelemetry_sdk::runtime::Runtime,
/// # {
///
/// let exporter = get_exporter(); // set up a push exporter like OTLP
/// let runtime = get_runtime(); // select runtime: e.g. opentelemetry_sdk::runtime::Tokio
///
/// let reader = PeriodicReader::builder(exporter, runtime).build();
/// # drop(reader);
/// # }
/// ```
#[derive(Clone)]
pub struct PeriodicReader {
    /// Destination for collected metrics; also answers this reader's
    /// aggregation and temporality queries.
    exporter: Arc<dyn PushMetricsExporter>,
    /// State shared by every clone of this reader, including the clone held by
    /// the background worker.
    inner: Arc<Mutex<PeriodicReaderInner>>,
}
209
210impl PeriodicReader {
211    /// Configuration options for a periodic reader
212    pub fn builder<E, RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
213    where
214        E: PushMetricsExporter,
215        RT: Runtime,
216    {
217        PeriodicReaderBuilder::new(exporter, runtime)
218    }
219}
220
221impl fmt::Debug for PeriodicReader {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        f.debug_struct("PeriodicReader").finish()
224    }
225}
226
/// Mutable state shared (behind a mutex) by all clones of a [PeriodicReader].
///
/// Field order is kept as-is: it determines drop order.
struct PeriodicReaderInner {
    /// Sends flush/shutdown requests to the background worker.
    message_sender: mpsc::Sender<Message>,
    /// Set once a shutdown request has completed; later calls are rejected.
    is_shutdown: bool,
    /// Extra producers whose output is appended on every collection.
    external_producers: Vec<Box<dyn MetricProducer>>,
    /// The worker launcher before registration, the SDK pipeline after it.
    sdk_producer_or_worker: ProducerOrWorker,
}
233
234#[derive(Debug)]
235enum Message {
236    Export,
237    Flush(oneshot::Sender<Result<()>>),
238    Shutdown(oneshot::Sender<Result<()>>),
239}
240
241enum ProducerOrWorker {
242    Producer(Weak<dyn SdkProducer>),
243    Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
244}
245
/// Background task that services [Message]s for a single [PeriodicReader].
///
/// Field order is kept as-is: it determines drop order.
struct PeriodicReaderWorker<RT: Runtime> {
    /// Clone of the owning reader, used for collection and for the exporter.
    reader: PeriodicReader,
    /// Upper bound on how long one export may take.
    timeout: Duration,
    /// Runtime providing the timeout delay.
    runtime: RT,
    /// Buffer reused across collections to hold the gathered metrics.
    rm: ResourceMetrics,
}
252
253impl<RT: Runtime> PeriodicReaderWorker<RT> {
254    async fn collect_and_export(&mut self) -> Result<()> {
255        self.reader.collect(&mut self.rm)?;
256
257        let export = self.reader.exporter.export(&mut self.rm);
258        let timeout = self.runtime.delay(self.timeout);
259        pin_mut!(export);
260        pin_mut!(timeout);
261
262        match future::select(export, timeout).await {
263            Either::Left((res, _)) => res, // return the status of export.
264            Either::Right(_) => Err(MetricsError::Other("export timed out".into())),
265        }
266    }
267
268    async fn process_message(&mut self, message: Message) -> bool {
269        match message {
270            Message::Export => {
271                if let Err(err) = self.collect_and_export().await {
272                    global::handle_error(err)
273                }
274            }
275            Message::Flush(ch) => {
276                let res = self.collect_and_export().await;
277                if ch.send(res).is_err() {
278                    global::handle_error(MetricsError::Other("flush channel closed".into()))
279                }
280            }
281            Message::Shutdown(ch) => {
282                let res = self.collect_and_export().await;
283                let _ = self.reader.exporter.shutdown();
284                if ch.send(res).is_err() {
285                    global::handle_error(MetricsError::Other("shutdown channel closed".into()))
286                }
287                return false;
288            }
289        }
290
291        true
292    }
293
294    async fn run(mut self, mut messages: impl FusedStream<Item = Message> + Unpin) {
295        while let Some(message) = messages.next().await {
296            if !self.process_message(message).await {
297                break;
298            }
299        }
300    }
301}
302
303impl AggregationSelector for PeriodicReader {
304    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
305        self.exporter.aggregation(kind)
306    }
307}
308
309impl TemporalitySelector for PeriodicReader {
310    fn temporality(&self, kind: InstrumentKind) -> Temporality {
311        self.exporter.temporality(kind)
312    }
313}
314
315impl MetricReader for PeriodicReader {
316    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
317        let mut inner = match self.inner.lock() {
318            Ok(guard) => guard,
319            Err(_) => return,
320        };
321
322        let worker = match &mut inner.sdk_producer_or_worker {
323            ProducerOrWorker::Producer(_) => {
324                // Only register once. If producer is already set, do nothing.
325                global::handle_error(MetricsError::Other(
326                    "duplicate meter registration, did not register manual reader".into(),
327                ));
328                return;
329            }
330            ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})),
331        };
332
333        inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline);
334        worker(self);
335    }
336
337    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
338        let inner = self.inner.lock()?;
339        if inner.is_shutdown {
340            return Err(MetricsError::Other("reader is shut down".into()));
341        }
342
343        if let Some(producer) = match &inner.sdk_producer_or_worker {
344            ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
345            ProducerOrWorker::Worker(_) => None,
346        } {
347            producer.produce(rm)?;
348        } else {
349            return Err(MetricsError::Other("reader is not registered".into()));
350        }
351
352        let mut errs = vec![];
353        for producer in &inner.external_producers {
354            match producer.produce() {
355                Ok(metrics) => rm.scope_metrics.push(metrics),
356                Err(err) => errs.push(err),
357            }
358        }
359
360        if errs.is_empty() {
361            Ok(())
362        } else {
363            Err(MetricsError::Other(format!("{:?}", errs)))
364        }
365    }
366
367    fn force_flush(&self) -> Result<()> {
368        let mut inner = self.inner.lock()?;
369        if inner.is_shutdown {
370            return Err(MetricsError::Other("reader is shut down".into()));
371        }
372        let (sender, receiver) = oneshot::channel();
373        inner
374            .message_sender
375            .try_send(Message::Flush(sender))
376            .map_err(|e| MetricsError::Other(e.to_string()))?;
377
378        drop(inner); // don't hold lock when blocking on future
379
380        futures_executor::block_on(receiver)
381            .map_err(|err| MetricsError::Other(err.to_string()))
382            .and_then(|res| res)
383    }
384
385    fn shutdown(&self) -> Result<()> {
386        let mut inner = self.inner.lock()?;
387        if inner.is_shutdown {
388            return Err(MetricsError::Other("reader is already shut down".into()));
389        }
390
391        let (sender, receiver) = oneshot::channel();
392        inner
393            .message_sender
394            .try_send(Message::Shutdown(sender))
395            .map_err(|e| MetricsError::Other(e.to_string()))?;
396        drop(inner); // don't hold lock when blocking on future
397
398        let shutdown_result = futures_executor::block_on(receiver)
399            .map_err(|err| MetricsError::Other(err.to_string()))?;
400
401        // Acquire the lock again to set the shutdown flag
402        let mut inner = self.inner.lock()?;
403        inner.is_shutdown = true;
404
405        shutdown_result
406    }
407}
408
#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::PeriodicReader;
    use crate::{
        metrics::{data::ResourceMetrics, reader::MetricReader, SdkMeterProvider},
        runtime,
        testing::metrics::InMemoryMetricsExporter,
        Resource,
    };
    use opentelemetry::metrics::MeterProvider;
    use std::{sync::mpsc, time::Duration};

    /// Registering the reader with a meter provider wires it to the SDK
    /// pipeline, so a forced flush must invoke the observable callback.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn registration_triggers_collection() {
        // Arrange: a reader that ticks every millisecond.
        let tick = Duration::from_millis(1);
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
            .with_interval(tick)
            .build();
        let (tx, rx) = mpsc::channel();

        // Act: register, create an observable instrument, force a collection.
        let provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = provider.meter("test");
        let _counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |_| {
                tx.send(()).expect("channel should still be open");
            })
            .init();

        let _ = provider.force_flush();

        // Assert: the callback has signalled at least once.
        rx.try_recv()
            .expect("message should be available in channel, indicating a collection occurred");
    }

    /// Collecting from a reader that was never registered must fail.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn unregistered_collect() {
        // Arrange
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let mut rm = ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: vec![],
        };

        // Act + Assert
        reader
            .collect(&mut rm)
            .expect_err("error expected when reader is not registered");
    }
}