opentelemetry_sdk/metrics/
periodic_reader.rs
1use std::{
2 env, fmt, mem,
3 sync::{Arc, Mutex, Weak},
4 time::Duration,
5};
6
7use futures_channel::{mpsc, oneshot};
8use futures_util::{
9 future::{self, Either},
10 pin_mut,
11 stream::{self, FusedStream},
12 StreamExt,
13};
14use opentelemetry::{
15 global,
16 metrics::{MetricsError, Result},
17};
18
19use crate::runtime::Runtime;
20use crate::{
21 metrics::{
22 exporter::PushMetricsExporter,
23 reader::{MetricProducer, SdkProducer},
24 },
25 Resource,
26};
27
28use super::{
29 aggregation::Aggregation,
30 data::{ResourceMetrics, Temporality},
31 instrument::InstrumentKind,
32 reader::{AggregationSelector, MetricReader, TemporalitySelector},
33 Pipeline,
34};
35
/// Fallback export timeout used by the builder when the environment does not
/// provide a usable value.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Fallback export interval used by the builder when the environment does not
/// provide a usable value.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);

/// Name of the environment variable the builder reads (as a number of
/// milliseconds) to obtain the export interval.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Name of the environment variable the builder reads (as a number of
/// milliseconds) to obtain the export timeout.
const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";
41
/// Collects the settings needed to construct a [`PeriodicReader`].
///
/// `E` is the exporter the reader pushes metrics to and `RT` is the runtime
/// used to spawn the background worker and to create timers.
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E, RT> {
    /// Period passed to the runtime's interval stream that triggers exports.
    interval: Duration,
    /// Maximum time a single export is allowed to take before it is reported
    /// as timed out.
    timeout: Duration,
    /// Exporter that receives the collected metrics.
    exporter: E,
    /// Additional producers whose output is appended on every collection.
    producers: Vec<Box<dyn MetricProducer>>,
    /// Runtime used for spawning the worker and for interval/delay timers.
    runtime: RT,
}
64
65impl<E, RT> PeriodicReaderBuilder<E, RT>
66where
67 E: PushMetricsExporter,
68 RT: Runtime,
69{
70 fn new(exporter: E, runtime: RT) -> Self {
71 let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
72 .ok()
73 .and_then(|v| v.parse().map(Duration::from_millis).ok())
74 .unwrap_or(DEFAULT_INTERVAL);
75 let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME)
76 .ok()
77 .and_then(|v| v.parse().map(Duration::from_millis).ok())
78 .unwrap_or(DEFAULT_TIMEOUT);
79
80 PeriodicReaderBuilder {
81 interval,
82 timeout,
83 producers: vec![],
84 exporter,
85 runtime,
86 }
87 }
88
89 pub fn with_interval(mut self, interval: Duration) -> Self {
97 if !interval.is_zero() {
98 self.interval = interval;
99 }
100 self
101 }
102
103 pub fn with_timeout(mut self, timeout: Duration) -> Self {
112 if !timeout.is_zero() {
113 self.timeout = timeout;
114 }
115 self
116 }
117
118 pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
123 self.producers.push(Box::new(producer));
124 self
125 }
126
127 pub fn build(self) -> PeriodicReader {
129 let (message_sender, message_receiver) = mpsc::channel(256);
130
131 let worker = move |reader: &PeriodicReader| {
132 let ticker = self
133 .runtime
134 .interval(self.interval)
135 .skip(1) .map(|_| Message::Export);
137
138 let messages = Box::pin(stream::select(message_receiver, ticker));
139
140 let runtime = self.runtime.clone();
141 self.runtime.spawn(Box::pin(
142 PeriodicReaderWorker {
143 reader: reader.clone(),
144 timeout: self.timeout,
145 runtime,
146 rm: ResourceMetrics {
147 resource: Resource::empty(),
148 scope_metrics: Vec::new(),
149 },
150 }
151 .run(messages),
152 ));
153 };
154
155 PeriodicReader {
156 exporter: Arc::new(self.exporter),
157 inner: Arc::new(Mutex::new(PeriodicReaderInner {
158 message_sender,
159 is_shutdown: false,
160 external_producers: self.producers,
161 sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
162 })),
163 }
164 }
165}
166
/// A metric reader that collects and pushes metrics to an exporter from a
/// background worker, either on a timer tick or on an explicit flush/shutdown
/// request.
///
/// Cloning is cheap: both fields are reference-counted handles, so clones
/// share the same exporter and state.
#[derive(Clone)]
pub struct PeriodicReader {
    /// Exporter that receives every collected batch.
    exporter: Arc<dyn PushMetricsExporter>,
    /// Mutable state shared between the reader handles and the worker.
    inner: Arc<Mutex<PeriodicReaderInner>>,
}
209
210impl PeriodicReader {
211 pub fn builder<E, RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
213 where
214 E: PushMetricsExporter,
215 RT: Runtime,
216 {
217 PeriodicReaderBuilder::new(exporter, runtime)
218 }
219}
220
221impl fmt::Debug for PeriodicReader {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 f.debug_struct("PeriodicReader").finish()
224 }
225}
226
/// Mutable state of a [`PeriodicReader`], guarded by the reader's mutex.
struct PeriodicReaderInner {
    /// Sending half of the channel used to deliver flush/shutdown requests to
    /// the worker.
    message_sender: mpsc::Sender<Message>,
    /// Set once the reader has been shut down; later operations are rejected.
    is_shutdown: bool,
    /// Extra producers whose output is appended during `collect`.
    external_producers: Vec<Box<dyn MetricProducer>>,
    /// Holds the not-yet-started worker before registration and the SDK
    /// producer afterwards.
    sdk_producer_or_worker: ProducerOrWorker,
}
233
/// Requests handled by the background worker.
#[derive(Debug)]
enum Message {
    /// Collect and export; produced by the interval ticker.
    Export,
    /// Collect and export, then report the outcome on the enclosed channel.
    Flush(oneshot::Sender<Result<()>>),
    /// Collect and export one last time, shut the exporter down, report the
    /// outcome on the enclosed channel, and stop the worker.
    Shutdown(oneshot::Sender<Result<()>>),
}
240
/// Registration state of the reader.
enum ProducerOrWorker {
    /// The reader has been registered; holds a weak handle to the SDK
    /// producer that `collect` upgrades on each call.
    Producer(Weak<dyn SdkProducer>),
    /// The reader has not been registered yet; holds the closure that starts
    /// the background worker when registration happens.
    Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
}
245
/// State owned by the background task that performs collection and export.
struct PeriodicReaderWorker<RT: Runtime> {
    /// Handle to the reader used for collecting and for reaching the exporter.
    reader: PeriodicReader,
    /// Maximum time a single export may take.
    timeout: Duration,
    /// Runtime used to create the timeout delay.
    runtime: RT,
    /// Buffer that each collection writes into and each export reads from;
    /// kept across iterations.
    rm: ResourceMetrics,
}
252
253impl<RT: Runtime> PeriodicReaderWorker<RT> {
254 async fn collect_and_export(&mut self) -> Result<()> {
255 self.reader.collect(&mut self.rm)?;
256
257 let export = self.reader.exporter.export(&mut self.rm);
258 let timeout = self.runtime.delay(self.timeout);
259 pin_mut!(export);
260 pin_mut!(timeout);
261
262 match future::select(export, timeout).await {
263 Either::Left((res, _)) => res, Either::Right(_) => Err(MetricsError::Other("export timed out".into())),
265 }
266 }
267
268 async fn process_message(&mut self, message: Message) -> bool {
269 match message {
270 Message::Export => {
271 if let Err(err) = self.collect_and_export().await {
272 global::handle_error(err)
273 }
274 }
275 Message::Flush(ch) => {
276 let res = self.collect_and_export().await;
277 if ch.send(res).is_err() {
278 global::handle_error(MetricsError::Other("flush channel closed".into()))
279 }
280 }
281 Message::Shutdown(ch) => {
282 let res = self.collect_and_export().await;
283 let _ = self.reader.exporter.shutdown();
284 if ch.send(res).is_err() {
285 global::handle_error(MetricsError::Other("shutdown channel closed".into()))
286 }
287 return false;
288 }
289 }
290
291 true
292 }
293
294 async fn run(mut self, mut messages: impl FusedStream<Item = Message> + Unpin) {
295 while let Some(message) = messages.next().await {
296 if !self.process_message(message).await {
297 break;
298 }
299 }
300 }
301}
302
impl AggregationSelector for PeriodicReader {
    /// Delegates the aggregation choice for `kind` to the exporter.
    fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
        self.exporter.aggregation(kind)
    }
}
308
impl TemporalitySelector for PeriodicReader {
    /// Delegates the temporality choice for `kind` to the exporter.
    fn temporality(&self, kind: InstrumentKind) -> Temporality {
        self.exporter.temporality(kind)
    }
}
314
315impl MetricReader for PeriodicReader {
316 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
317 let mut inner = match self.inner.lock() {
318 Ok(guard) => guard,
319 Err(_) => return,
320 };
321
322 let worker = match &mut inner.sdk_producer_or_worker {
323 ProducerOrWorker::Producer(_) => {
324 global::handle_error(MetricsError::Other(
326 "duplicate meter registration, did not register manual reader".into(),
327 ));
328 return;
329 }
330 ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})),
331 };
332
333 inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline);
334 worker(self);
335 }
336
337 fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
338 let inner = self.inner.lock()?;
339 if inner.is_shutdown {
340 return Err(MetricsError::Other("reader is shut down".into()));
341 }
342
343 if let Some(producer) = match &inner.sdk_producer_or_worker {
344 ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
345 ProducerOrWorker::Worker(_) => None,
346 } {
347 producer.produce(rm)?;
348 } else {
349 return Err(MetricsError::Other("reader is not registered".into()));
350 }
351
352 let mut errs = vec![];
353 for producer in &inner.external_producers {
354 match producer.produce() {
355 Ok(metrics) => rm.scope_metrics.push(metrics),
356 Err(err) => errs.push(err),
357 }
358 }
359
360 if errs.is_empty() {
361 Ok(())
362 } else {
363 Err(MetricsError::Other(format!("{:?}", errs)))
364 }
365 }
366
367 fn force_flush(&self) -> Result<()> {
368 let mut inner = self.inner.lock()?;
369 if inner.is_shutdown {
370 return Err(MetricsError::Other("reader is shut down".into()));
371 }
372 let (sender, receiver) = oneshot::channel();
373 inner
374 .message_sender
375 .try_send(Message::Flush(sender))
376 .map_err(|e| MetricsError::Other(e.to_string()))?;
377
378 drop(inner); futures_executor::block_on(receiver)
381 .map_err(|err| MetricsError::Other(err.to_string()))
382 .and_then(|res| res)
383 }
384
385 fn shutdown(&self) -> Result<()> {
386 let mut inner = self.inner.lock()?;
387 if inner.is_shutdown {
388 return Err(MetricsError::Other("reader is already shut down".into()));
389 }
390
391 let (sender, receiver) = oneshot::channel();
392 inner
393 .message_sender
394 .try_send(Message::Shutdown(sender))
395 .map_err(|e| MetricsError::Other(e.to_string()))?;
396 drop(inner); let shutdown_result = futures_executor::block_on(receiver)
399 .map_err(|err| MetricsError::Other(err.to_string()))?;
400
401 let mut inner = self.inner.lock()?;
403 inner.is_shutdown = true;
404
405 shutdown_result
406 }
407}
408
// Tests for `PeriodicReader`; compiled only for test builds with the
// `testing` feature enabled.
#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::PeriodicReader;
    use crate::{
        metrics::data::ResourceMetrics, metrics::reader::MetricReader, metrics::SdkMeterProvider,
        runtime, testing::metrics::InMemoryMetricsExporter, Resource,
    };
    use opentelemetry::metrics::MeterProvider;
    use std::sync::mpsc;

    /// A reader attached to a meter provider must run instrument callbacks
    /// when a flush is requested.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn registration_triggers_collection() {
        // Build a reader with a very short export interval.
        let interval = std::time::Duration::from_millis(1);
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
            .with_interval(interval)
            .build();
        // The observable callback signals through this channel when invoked.
        let (sender, receiver) = mpsc::channel();

        // Attach the reader to a provider and create an observable counter.
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let _counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |_| {
                sender.send(()).expect("channel should still be open");
            })
            .init();

        // The flush result itself is not inspected by this test.
        _ = meter_provider.force_flush();

        // Non-blocking receive: a message must already be present.
        receiver
            .try_recv()
            .expect("message should be available in channel, indicating a collection occurred");
    }

    /// Collecting from a reader that was never registered must fail.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn unregistered_collect() {
        let exporter = InMemoryMetricsExporter::default();
        // The reader is built but never handed to a meter provider.
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let mut rm = ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: Vec::new(),
        };

        let result = reader.collect(&mut rm);

        result.expect_err("error expected when reader is not registered");
    }
}