opentelemetry_sdk/metrics/
manual_reader.rs
1use std::{
2 fmt,
3 sync::{Mutex, Weak},
4};
5
6use opentelemetry::{
7 global,
8 metrics::{MetricsError, Result},
9};
10
11use super::{
12 data::{ResourceMetrics, Temporality},
13 instrument::InstrumentKind,
14 pipeline::Pipeline,
15 reader::{
16 AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector,
17 MetricProducer, MetricReader, SdkProducer, TemporalitySelector,
18 },
19};
20
21pub struct ManualReader {
35 inner: Box<Mutex<ManualReaderInner>>,
36 temporality_selector: Box<dyn TemporalitySelector>,
37 aggregation_selector: Box<dyn AggregationSelector>,
38}
39
40impl Default for ManualReader {
41 fn default() -> Self {
42 ManualReader::builder().build()
43 }
44}
45
46impl fmt::Debug for ManualReader {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 f.write_str("ManualReader")
49 }
50}
51
52#[derive(Debug)]
53struct ManualReaderInner {
54 sdk_producer: Option<Weak<dyn SdkProducer>>,
55 is_shutdown: bool,
56 external_producers: Vec<Box<dyn MetricProducer>>,
57}
58
59impl ManualReader {
60 pub fn builder() -> ManualReaderBuilder {
62 ManualReaderBuilder::default()
63 }
64
65 pub(crate) fn new(
67 temporality_selector: Box<dyn TemporalitySelector>,
68 aggregation_selector: Box<dyn AggregationSelector>,
69 producers: Vec<Box<dyn MetricProducer>>,
70 ) -> Self {
71 ManualReader {
72 inner: Box::new(Mutex::new(ManualReaderInner {
73 sdk_producer: None,
74 is_shutdown: false,
75 external_producers: producers,
76 })),
77 temporality_selector,
78 aggregation_selector,
79 }
80 }
81}
82
83impl TemporalitySelector for ManualReader {
84 fn temporality(&self, kind: InstrumentKind) -> Temporality {
85 self.temporality_selector.temporality(kind)
86 }
87}
88
89impl AggregationSelector for ManualReader {
90 fn aggregation(&self, kind: InstrumentKind) -> super::aggregation::Aggregation {
91 self.aggregation_selector.aggregation(kind)
92 }
93}
94
95impl MetricReader for ManualReader {
96 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
99 let _ = self.inner.lock().map(|mut inner| {
100 if inner.sdk_producer.is_none() {
102 inner.sdk_producer = Some(pipeline);
103 } else {
104 global::handle_error(MetricsError::Config(
105 "duplicate reader registration, did not register manual reader".into(),
106 ))
107 }
108 });
109 }
110
111 fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
116 let inner = self.inner.lock()?;
117 match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
118 Some(producer) => producer.produce(rm)?,
119 None => {
120 return Err(MetricsError::Other(
121 "reader is shut down or not registered".into(),
122 ))
123 }
124 };
125
126 let mut errs = vec![];
127 for producer in &inner.external_producers {
128 match producer.produce() {
129 Ok(metrics) => rm.scope_metrics.push(metrics),
130 Err(err) => errs.push(err),
131 }
132 }
133
134 if errs.is_empty() {
135 Ok(())
136 } else {
137 Err(MetricsError::Other(format!("{:?}", errs)))
138 }
139 }
140
141 fn force_flush(&self) -> Result<()> {
143 Ok(())
144 }
145
146 fn shutdown(&self) -> Result<()> {
148 let mut inner = self.inner.lock()?;
149
150 inner.sdk_producer = None;
152 inner.is_shutdown = true;
153 inner.external_producers = Vec::new();
154
155 Ok(())
156 }
157}
158
159pub struct ManualReaderBuilder {
161 temporality_selector: Box<dyn TemporalitySelector>,
162 aggregation_selector: Box<dyn AggregationSelector>,
163 producers: Vec<Box<dyn MetricProducer>>,
164}
165
166impl fmt::Debug for ManualReaderBuilder {
167 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168 f.write_str("ManualReaderBuilder")
169 }
170}
171
172impl Default for ManualReaderBuilder {
173 fn default() -> Self {
174 ManualReaderBuilder {
175 temporality_selector: Box::new(DefaultTemporalitySelector { _private: () }),
176 aggregation_selector: Box::new(DefaultAggregationSelector { _private: () }),
177 producers: vec![],
178 }
179 }
180}
181
182impl ManualReaderBuilder {
183 pub fn new() -> Self {
185 Default::default()
186 }
187
188 pub fn with_temporality_selector(
192 mut self,
193 temporality_selector: impl TemporalitySelector + 'static,
194 ) -> Self {
195 self.temporality_selector = Box::new(temporality_selector);
196 self
197 }
198
199 pub fn with_aggregation_selector(
206 mut self,
207 aggregation_selector: impl AggregationSelector + 'static,
208 ) -> Self {
209 self.aggregation_selector = Box::new(aggregation_selector);
210 self
211 }
212
213 pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
218 self.producers.push(Box::new(producer));
219 self
220 }
221
222 pub fn build(self) -> ManualReader {
224 ManualReader::new(
225 self.temporality_selector,
226 self.aggregation_selector,
227 self.producers,
228 )
229 }
230}