opentelemetry_sdk/metrics/
manual_reader.rs

1use std::{
2    fmt,
3    sync::{Mutex, Weak},
4};
5
6use opentelemetry::{
7    global,
8    metrics::{MetricsError, Result},
9};
10
11use super::{
12    data::{ResourceMetrics, Temporality},
13    instrument::InstrumentKind,
14    pipeline::Pipeline,
15    reader::{
16        AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector,
17        MetricProducer, MetricReader, SdkProducer, TemporalitySelector,
18    },
19};
20
/// A simple [MetricReader] that allows an application to read metrics on demand.
///
/// See [ManualReaderBuilder] for configuration options.
///
/// # Example
///
/// ```
/// use opentelemetry_sdk::metrics::ManualReader;
///
/// // can specify additional reader configuration
/// let reader = ManualReader::builder().build();
/// # drop(reader)
/// ```
pub struct ManualReader {
    /// Mutable reader state (registered SDK producer, shutdown flag and
    /// external producers), guarded by a mutex.
    inner: Box<Mutex<ManualReaderInner>>,
    /// Selector this reader delegates to in its [TemporalitySelector] impl.
    temporality_selector: Box<dyn TemporalitySelector>,
    /// Selector this reader delegates to in its [AggregationSelector] impl.
    aggregation_selector: Box<dyn AggregationSelector>,
}
39
40impl Default for ManualReader {
41    fn default() -> Self {
42        ManualReader::builder().build()
43    }
44}
45
46impl fmt::Debug for ManualReader {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        f.write_str("ManualReader")
49    }
50}
51
/// Mutable state of a [ManualReader], kept behind the reader's mutex.
#[derive(Debug)]
struct ManualReaderInner {
    /// Weak handle to the SDK producer stored by `register_pipeline`; `None`
    /// until a pipeline is registered and reset to `None` by `shutdown`.
    sdk_producer: Option<Weak<dyn SdkProducer>>,
    /// Set to `true` by `shutdown`.
    is_shutdown: bool,
    /// Additional producers supplied at construction; `collect` queries them
    /// after the SDK producer, and `shutdown` replaces the list with an empty one.
    external_producers: Vec<Box<dyn MetricProducer>>,
}
58
59impl ManualReader {
60    /// Configuration for this reader
61    pub fn builder() -> ManualReaderBuilder {
62        ManualReaderBuilder::default()
63    }
64
65    /// A [MetricReader] which is directly called to collect metrics.
66    pub(crate) fn new(
67        temporality_selector: Box<dyn TemporalitySelector>,
68        aggregation_selector: Box<dyn AggregationSelector>,
69        producers: Vec<Box<dyn MetricProducer>>,
70    ) -> Self {
71        ManualReader {
72            inner: Box::new(Mutex::new(ManualReaderInner {
73                sdk_producer: None,
74                is_shutdown: false,
75                external_producers: producers,
76            })),
77            temporality_selector,
78            aggregation_selector,
79        }
80    }
81}
82
83impl TemporalitySelector for ManualReader {
84    fn temporality(&self, kind: InstrumentKind) -> Temporality {
85        self.temporality_selector.temporality(kind)
86    }
87}
88
89impl AggregationSelector for ManualReader {
90    fn aggregation(&self, kind: InstrumentKind) -> super::aggregation::Aggregation {
91        self.aggregation_selector.aggregation(kind)
92    }
93}
94
95impl MetricReader for ManualReader {
96    ///  Register a pipeline which enables the caller to read metrics from the SDK
97    ///  on demand.
98    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
99        let _ = self.inner.lock().map(|mut inner| {
100            // Only register once. If producer is already set, do nothing.
101            if inner.sdk_producer.is_none() {
102                inner.sdk_producer = Some(pipeline);
103            } else {
104                global::handle_error(MetricsError::Config(
105                    "duplicate reader registration, did not register manual reader".into(),
106                ))
107            }
108        });
109    }
110
111    /// Gathers all metrics from the SDK and other [MetricProducer]s, calling any
112    /// callbacks necessary and returning the results.
113    ///
114    /// Returns an error if called after shutdown.
115    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
116        let inner = self.inner.lock()?;
117        match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
118            Some(producer) => producer.produce(rm)?,
119            None => {
120                return Err(MetricsError::Other(
121                    "reader is shut down or not registered".into(),
122                ))
123            }
124        };
125
126        let mut errs = vec![];
127        for producer in &inner.external_producers {
128            match producer.produce() {
129                Ok(metrics) => rm.scope_metrics.push(metrics),
130                Err(err) => errs.push(err),
131            }
132        }
133
134        if errs.is_empty() {
135            Ok(())
136        } else {
137            Err(MetricsError::Other(format!("{:?}", errs)))
138        }
139    }
140
141    /// ForceFlush is a no-op, it always returns nil.
142    fn force_flush(&self) -> Result<()> {
143        Ok(())
144    }
145
146    /// Closes any connections and frees any resources used by the reader.
147    fn shutdown(&self) -> Result<()> {
148        let mut inner = self.inner.lock()?;
149
150        // Any future call to collect will now return an error.
151        inner.sdk_producer = None;
152        inner.is_shutdown = true;
153        inner.external_producers = Vec::new();
154
155        Ok(())
156    }
157}
158
/// Configuration for a [ManualReader]
pub struct ManualReaderBuilder {
    /// Temporality selector handed to the built reader.
    temporality_selector: Box<dyn TemporalitySelector>,
    /// Aggregation selector handed to the built reader.
    aggregation_selector: Box<dyn AggregationSelector>,
    /// External producers handed to the built reader, in the order they were
    /// added with `with_producer`.
    producers: Vec<Box<dyn MetricProducer>>,
}
165
166impl fmt::Debug for ManualReaderBuilder {
167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168        f.write_str("ManualReaderBuilder")
169    }
170}
171
172impl Default for ManualReaderBuilder {
173    fn default() -> Self {
174        ManualReaderBuilder {
175            temporality_selector: Box::new(DefaultTemporalitySelector { _private: () }),
176            aggregation_selector: Box::new(DefaultAggregationSelector { _private: () }),
177            producers: vec![],
178        }
179    }
180}
181
182impl ManualReaderBuilder {
183    /// New manual builder configuration
184    pub fn new() -> Self {
185        Default::default()
186    }
187
188    /// Sets the [TemporalitySelector] a reader will use to determine the [Temporality] of
189    /// an instrument based on its kind. If this option is not used, the reader will use
190    /// the default temporality selector.
191    pub fn with_temporality_selector(
192        mut self,
193        temporality_selector: impl TemporalitySelector + 'static,
194    ) -> Self {
195        self.temporality_selector = Box::new(temporality_selector);
196        self
197    }
198
199    /// Sets the [AggregationSelector] a reader will use to determine the
200    /// aggregation to use for an instrument based on its kind.
201    ///
202    /// If this option is not used, the reader will use the default aggregation
203    /// selector or the aggregation explicitly passed for a view matching an
204    /// instrument.
205    pub fn with_aggregation_selector(
206        mut self,
207        aggregation_selector: impl AggregationSelector + 'static,
208    ) -> Self {
209        self.aggregation_selector = Box::new(aggregation_selector);
210        self
211    }
212
213    /// Registers a an external [MetricProducer] with this reader.
214    ///
215    /// The producer is used as a source of aggregated metric data which is
216    /// incorporated into metrics collected from the SDK.
217    pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
218        self.producers.push(Box::new(producer));
219        self
220    }
221
222    /// Create a new [ManualReader] from this configuration.
223    pub fn build(self) -> ManualReader {
224        ManualReader::new(
225            self.temporality_selector,
226            self.aggregation_selector,
227            self.producers,
228        )
229    }
230}