opentelemetry_sdk/metrics/
meter_provider.rs

1use core::fmt;
2use std::{
3    borrow::Cow,
4    collections::HashMap,
5    sync::{
6        atomic::{AtomicBool, Ordering},
7        Arc, Mutex,
8    },
9};
10
11use opentelemetry::{
12    global,
13    metrics::{noop::NoopMeterCore, Meter, MeterProvider, MetricsError, Result},
14    KeyValue,
15};
16
17use crate::{instrumentation::Scope, Resource};
18
19use super::{meter::SdkMeter, pipeline::Pipelines, reader::MetricReader, view::View};
20
/// Handles the creation and coordination of [Meter]s.
///
/// All `Meter`s created by a `MeterProvider` will be associated with the same
/// [Resource], have the same [View]s applied to them, and have their produced
/// metric telemetry passed to the configured [MetricReader]s.
///
/// Cloning is cheap: a clone only copies the reference-counted pointer to the
/// shared inner state, so every clone hands out meters from the same registry.
/// The inner state shuts itself down when it is dropped, i.e. once the last
/// clone has gone away.
///
/// [Meter]: opentelemetry::metrics::Meter
#[derive(Clone, Debug)]
pub struct SdkMeterProvider {
    // Shared state (pipelines, meter registry, shutdown flag) of all clones.
    inner: Arc<SdkMeterProviderInner>,
}
32
33#[derive(Clone, Debug)]
34struct SdkMeterProviderInner {
35    pipes: Arc<Pipelines>,
36    meters: Arc<Mutex<HashMap<Scope, Arc<SdkMeter>>>>,
37    is_shutdown: Arc<AtomicBool>,
38}
39
40impl Default for SdkMeterProvider {
41    fn default() -> Self {
42        SdkMeterProvider::builder().build()
43    }
44}
45
46impl SdkMeterProvider {
47    /// Return default [MeterProviderBuilder]
48    pub fn builder() -> MeterProviderBuilder {
49        MeterProviderBuilder::default()
50    }
51
52    /// Flushes all pending telemetry.
53    ///
54    /// There is no guaranteed that all telemetry be flushed or all resources have
55    /// been released on error.
56    ///
57    /// # Examples
58    ///
59    /// ```
60    /// use opentelemetry::{global, Context};
61    /// use opentelemetry_sdk::metrics::SdkMeterProvider;
62    ///
63    /// fn init_metrics() -> SdkMeterProvider {
64    ///     // Setup metric pipelines with readers + views, default has no
65    ///     // readers so nothing is exported.
66    ///     let provider = SdkMeterProvider::default();
67    ///
68    ///     // Set provider to be used as global meter provider
69    ///     let _ = global::set_meter_provider(provider.clone());
70    ///
71    ///     provider
72    /// }
73    ///
74    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
75    ///     let provider = init_metrics();
76    ///
77    ///     // create instruments + record measurements
78    ///
79    ///     // force all instruments to flush
80    ///     provider.force_flush()?;
81    ///
82    ///     // record more measurements..
83    ///
84    ///     // shutdown ensures any cleanup required by the provider is done,
85    ///     // and also invokes shutdown on the readers.
86    ///     provider.shutdown()?;
87    ///
88    ///     Ok(())
89    /// }
90    /// ```
91    pub fn force_flush(&self) -> Result<()> {
92        self.inner.force_flush()
93    }
94
95    /// Shuts down the meter provider flushing all pending telemetry and releasing
96    /// any held computational resources.
97    ///
98    /// This call is idempotent. The first call will perform all flush and releasing
99    /// operations. Subsequent calls will perform no action and will return an error
100    /// stating this.
101    ///
102    /// Measurements made by instruments from meters this MeterProvider created will
103    /// not be exported after Shutdown is called.
104    ///
105    /// There is no guaranteed that all telemetry be flushed or all resources have
106    /// been released on error.
107    pub fn shutdown(&self) -> Result<()> {
108        self.inner.shutdown()
109    }
110}
111
112impl SdkMeterProviderInner {
113    fn force_flush(&self) -> Result<()> {
114        self.pipes.force_flush()
115    }
116
117    fn shutdown(&self) -> Result<()> {
118        if self
119            .is_shutdown
120            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
121            .is_ok()
122        {
123            self.pipes.shutdown()
124        } else {
125            Err(MetricsError::Other(
126                "metrics provider already shut down".into(),
127            ))
128        }
129    }
130}
131
132impl Drop for SdkMeterProviderInner {
133    fn drop(&mut self) {
134        if let Err(err) = self.shutdown() {
135            global::handle_error(err);
136        }
137    }
138}
139impl MeterProvider for SdkMeterProvider {
140    fn versioned_meter(
141        &self,
142        name: impl Into<Cow<'static, str>>,
143        version: Option<impl Into<Cow<'static, str>>>,
144        schema_url: Option<impl Into<Cow<'static, str>>>,
145        attributes: Option<Vec<KeyValue>>,
146    ) -> Meter {
147        if self.inner.is_shutdown.load(Ordering::Relaxed) {
148            return Meter::new(Arc::new(NoopMeterCore::new()));
149        }
150
151        let mut builder = Scope::builder(name);
152
153        if let Some(v) = version {
154            builder = builder.with_version(v);
155        }
156        if let Some(s) = schema_url {
157            builder = builder.with_schema_url(s);
158        }
159        if let Some(a) = attributes {
160            builder = builder.with_attributes(a);
161        }
162
163        let scope = builder.build();
164
165        if let Ok(mut meters) = self.inner.meters.lock() {
166            let meter = meters
167                .entry(scope)
168                .or_insert_with_key(|scope| {
169                    Arc::new(SdkMeter::new(scope.clone(), self.inner.pipes.clone()))
170                })
171                .clone();
172            Meter::new(meter)
173        } else {
174            Meter::new(Arc::new(NoopMeterCore::new()))
175        }
176    }
177}
178
/// Configuration options for a [MeterProvider].
///
/// Start from `MeterProviderBuilder::default()` (or
/// `SdkMeterProvider::builder()`), chain the `with_*` methods and finish with
/// `build`.
#[derive(Default)]
pub struct MeterProviderBuilder {
    // Resource set by the user; `build` falls back to the default resource
    // when this is `None`.
    resource: Option<Resource>,
    // Readers registered so far, in the order they were added.
    readers: Vec<Box<dyn MetricReader>>,
    // Views registered so far, in the order they were added.
    views: Vec<Arc<dyn View>>,
}
186
187impl MeterProviderBuilder {
188    /// Associates a [Resource] with a [MeterProvider].
189    ///
190    /// This [Resource] represents the entity producing telemetry and is associated
191    /// with all [Meter]s the [MeterProvider] will create.
192    ///
193    /// By default, if this option is not used, the default [Resource] will be used.
194    ///
195    /// [Meter]: opentelemetry::metrics::Meter
196    pub fn with_resource(mut self, resource: Resource) -> Self {
197        self.resource = Some(resource);
198        self
199    }
200
201    /// Associates a [MetricReader] with a [MeterProvider].
202    ///
203    /// By default, if this option is not used, the [MeterProvider] will perform no
204    /// operations; no data will be exported without a [MetricReader].
205    pub fn with_reader<T: MetricReader>(mut self, reader: T) -> Self {
206        self.readers.push(Box::new(reader));
207        self
208    }
209
210    /// Associates a [View] with a [MeterProvider].
211    ///
212    /// [View]s are appended to existing ones in a [MeterProvider] if this option is
213    /// used multiple times.
214    ///
215    /// By default, if this option is not used, the [MeterProvider] will use the
216    /// default view.
217    pub fn with_view<T: View>(mut self, view: T) -> Self {
218        self.views.push(Arc::new(view));
219        self
220    }
221
222    /// Construct a new [MeterProvider] with this configuration.
223
224    pub fn build(self) -> SdkMeterProvider {
225        SdkMeterProvider {
226            inner: Arc::new(SdkMeterProviderInner {
227                pipes: Arc::new(Pipelines::new(
228                    self.resource.unwrap_or_default(),
229                    self.readers,
230                    self.views,
231                )),
232                meters: Default::default(),
233                is_shutdown: Arc::new(AtomicBool::new(false)),
234            }),
235        }
236    }
237}
238
239impl fmt::Debug for MeterProviderBuilder {
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        f.debug_struct("MeterProviderBuilder")
242            .field("resource", &self.resource)
243            .field("readers", &self.readers)
244            .field("views", &self.views.len())
245            .finish()
246    }
247}
248#[cfg(test)]
249mod tests {
250    use crate::resource::{
251        SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
252    };
253    use crate::testing::metrics::metric_reader::TestMetricReader;
254    use crate::Resource;
255    use opentelemetry::global;
256    use opentelemetry::metrics::MeterProvider;
257    use opentelemetry::{Key, KeyValue, Value};
258    use std::env;
259
    // Checks which resource ends up on the provider's first pipeline for each
    // combination of user-supplied resource and `OTEL_RESOURCE_ATTRIBUTES`.
    #[test]
    fn test_meter_provider_resource() {
        // Asserts that `resource_key` on the first pipeline's resource has the
        // expected string value (`None` = the key must be absent).
        let assert_resource = |provider: &super::SdkMeterProvider,
                               resource_key: &'static str,
                               expect: Option<&'static str>| {
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(Key::from_static_str(resource_key))
                    .map(|v| v.to_string()),
                expect.map(|s| s.to_string())
            );
        };
        // Asserts that the telemetry SDK language, name and version attributes
        // are present with the values expected for this crate.
        let assert_telemetry_resource = |provider: &super::SdkMeterProvider| {
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(TELEMETRY_SDK_LANGUAGE.into()),
                Some(Value::from("rust"))
            );
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(TELEMETRY_SDK_NAME.into()),
                Some(Value::from("opentelemetry"))
            );
            assert_eq!(
                provider.inner.pipes.0[0]
                    .resource
                    .get(TELEMETRY_SDK_VERSION.into()),
                Some(Value::from(env!("CARGO_PKG_VERSION")))
            );
        };

        // If the user did not provide a resource and the env var is not set,
        // the default resource is used.
        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
            let reader = TestMetricReader::new();
            let default_meter_provider = super::SdkMeterProvider::builder()
                .with_reader(reader)
                .build();
            assert_resource(
                &default_meter_provider,
                SERVICE_NAME,
                Some("unknown_service"),
            );
            assert_telemetry_resource(&default_meter_provider);
        });

        // If the user provided a resource, exactly that resource is used.
        let reader2 = TestMetricReader::new();
        let custom_meter_provider = super::SdkMeterProvider::builder()
            .with_reader(reader2)
            .with_resource(Resource::new(vec![KeyValue::new(
                SERVICE_NAME,
                "test_service",
            )]))
            .build();
        assert_resource(&custom_meter_provider, SERVICE_NAME, Some("test_service"));
        assert_eq!(custom_meter_provider.inner.pipes.0[0].resource.len(), 1);

        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("key1=value1, k2, k3=value2"),
            || {
                // If `OTEL_RESOURCE_ATTRIBUTES` is set, read them automatically
                let reader3 = TestMetricReader::new();
                let env_resource_provider = super::SdkMeterProvider::builder()
                    .with_reader(reader3)
                    .build();
                assert_resource(
                    &env_resource_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(&env_resource_provider, "key1", Some("value1"));
                assert_resource(&env_resource_provider, "k3", Some("value2"));
                assert_telemetry_resource(&env_resource_provider);
                // 6 = service name + three telemetry SDK attributes + `key1`
                // + `k3`; the value-less `k2` entry is not counted.
                assert_eq!(env_resource_provider.inner.pipes.0[0].resource.len(), 6);
            },
        );

        // When `OTEL_RESOURCE_ATTRIBUTES` is set and the user also provides a
        // resource merged onto the default one: on a key collision
        // (`my-custom-key`) the user-provided value is expected to win.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("my-custom-key=env-val,k2=value2"),
            || {
                let reader4 = TestMetricReader::new();
                let user_provided_resource_config_provider = super::SdkMeterProvider::builder()
                    .with_reader(reader4)
                    .with_resource(Resource::default().merge(&mut Resource::new(vec![
                        KeyValue::new("my-custom-key", "my-custom-value"),
                        KeyValue::new("my-custom-key2", "my-custom-value2"),
                    ])))
                    .build();
                assert_resource(
                    &user_provided_resource_config_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key",
                    Some("my-custom-value"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key2",
                    Some("my-custom-value2"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "k2",
                    Some("value2"),
                );
                assert_telemetry_resource(&user_provided_resource_config_provider);
                // 7 = service name + three telemetry SDK attributes
                // + `my-custom-key` + `my-custom-key2` + `k2`.
                assert_eq!(
                    user_provided_resource_config_provider.inner.pipes.0[0]
                        .resource
                        .len(),
                    7
                );
            },
        );

        // If the user explicitly provided an empty resource, it is used as is:
        // no default attributes are added.
        let reader5 = TestMetricReader::new();
        let no_service_name = super::SdkMeterProvider::builder()
            .with_reader(reader5)
            .with_resource(Resource::empty())
            .build();

        assert_eq!(no_service_name.inner.pipes.0[0].resource.len(), 0)
    }
393
394    #[test]
395    fn test_meter_provider_shutdown() {
396        let reader = TestMetricReader::new();
397        let provider = super::SdkMeterProvider::builder()
398            .with_reader(reader.clone())
399            .build();
400        global::set_meter_provider(provider.clone());
401        assert!(!reader.is_shutdown());
402        // create a meter and an instrument
403        let meter = global::meter("test");
404        let counter = meter.u64_counter("test_counter").init();
405        // no need to drop a meter for meter_provider shutdown
406        let shutdown_res = provider.shutdown();
407        assert!(shutdown_res.is_ok());
408
409        // shutdown once more should return an error
410        let shutdown_res = provider.shutdown();
411        assert!(shutdown_res.is_err());
412        assert!(reader.is_shutdown());
413        // TODO Fix: the instrument is still available, and can be used.
414        // While the reader is shutdown, and no collect is happening
415        counter.add(1, &[]);
416    }
417    #[test]
418    fn test_shutdown_invoked_on_last_drop() {
419        let reader = TestMetricReader::new();
420        let provider = super::SdkMeterProvider::builder()
421            .with_reader(reader.clone())
422            .build();
423        let clone1 = provider.clone();
424        let clone2 = provider.clone();
425
426        // Initially, shutdown should not be called
427        assert!(!reader.is_shutdown());
428
429        // Drop the first clone
430        drop(clone1);
431        assert!(!reader.is_shutdown());
432
433        // Drop the second clone
434        drop(clone2);
435        assert!(!reader.is_shutdown());
436
437        // Drop the last original provider
438        drop(provider);
439        // Now the shutdown should be invoked
440        assert!(reader.is_shutdown());
441    }
442
443    #[test]
444    fn same_meter_reused_same_scope() {
445        let provider = super::SdkMeterProvider::builder().build();
446        let _meter1 = provider.meter("test");
447        let _meter2 = provider.meter("test");
448        assert_eq!(provider.inner.meters.lock().unwrap().len(), 1);
449        let _meter3 =
450            provider.versioned_meter("test", Some("1.0.0"), Some("http://example.com"), None);
451        let _meter4 =
452            provider.versioned_meter("test", Some("1.0.0"), Some("http://example.com"), None);
453        let _meter5 =
454            provider.versioned_meter("test", Some("1.0.0"), Some("http://example.com"), None);
455        assert_eq!(provider.inner.meters.lock().unwrap().len(), 2);
456
457        // the below are different meters, as meter names are case sensitive
458        let _meter6 =
459            provider.versioned_meter("ABC", Some("1.0.0"), Some("http://example.com"), None);
460        let _meter7 =
461            provider.versioned_meter("Abc", Some("1.0.0"), Some("http://example.com"), None);
462        let _meter8 =
463            provider.versioned_meter("abc", Some("1.0.0"), Some("http://example.com"), None);
464        assert_eq!(provider.inner.meters.lock().unwrap().len(), 5);
465    }
466}