opentelemetry_sdk/metrics/
meter_provider.rs
1use core::fmt;
2use std::{
3 borrow::Cow,
4 collections::HashMap,
5 sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc, Mutex,
8 },
9};
10
11use opentelemetry::{
12 global,
13 metrics::{noop::NoopMeterCore, Meter, MeterProvider, MetricsError, Result},
14 KeyValue,
15};
16
17use crate::{instrumentation::Scope, Resource};
18
19use super::{meter::SdkMeter, pipeline::Pipelines, reader::MetricReader, view::View};
20
21#[derive(Clone, Debug)]
29pub struct SdkMeterProvider {
30 inner: Arc<SdkMeterProviderInner>,
31}
32
33#[derive(Clone, Debug)]
34struct SdkMeterProviderInner {
35 pipes: Arc<Pipelines>,
36 meters: Arc<Mutex<HashMap<Scope, Arc<SdkMeter>>>>,
37 is_shutdown: Arc<AtomicBool>,
38}
39
40impl Default for SdkMeterProvider {
41 fn default() -> Self {
42 SdkMeterProvider::builder().build()
43 }
44}
45
46impl SdkMeterProvider {
47 pub fn builder() -> MeterProviderBuilder {
49 MeterProviderBuilder::default()
50 }
51
52 pub fn force_flush(&self) -> Result<()> {
92 self.inner.force_flush()
93 }
94
95 pub fn shutdown(&self) -> Result<()> {
108 self.inner.shutdown()
109 }
110}
111
112impl SdkMeterProviderInner {
113 fn force_flush(&self) -> Result<()> {
114 self.pipes.force_flush()
115 }
116
117 fn shutdown(&self) -> Result<()> {
118 if self
119 .is_shutdown
120 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
121 .is_ok()
122 {
123 self.pipes.shutdown()
124 } else {
125 Err(MetricsError::Other(
126 "metrics provider already shut down".into(),
127 ))
128 }
129 }
130}
131
132impl Drop for SdkMeterProviderInner {
133 fn drop(&mut self) {
134 if let Err(err) = self.shutdown() {
135 global::handle_error(err);
136 }
137 }
138}
139impl MeterProvider for SdkMeterProvider {
140 fn versioned_meter(
141 &self,
142 name: impl Into<Cow<'static, str>>,
143 version: Option<impl Into<Cow<'static, str>>>,
144 schema_url: Option<impl Into<Cow<'static, str>>>,
145 attributes: Option<Vec<KeyValue>>,
146 ) -> Meter {
147 if self.inner.is_shutdown.load(Ordering::Relaxed) {
148 return Meter::new(Arc::new(NoopMeterCore::new()));
149 }
150
151 let mut builder = Scope::builder(name);
152
153 if let Some(v) = version {
154 builder = builder.with_version(v);
155 }
156 if let Some(s) = schema_url {
157 builder = builder.with_schema_url(s);
158 }
159 if let Some(a) = attributes {
160 builder = builder.with_attributes(a);
161 }
162
163 let scope = builder.build();
164
165 if let Ok(mut meters) = self.inner.meters.lock() {
166 let meter = meters
167 .entry(scope)
168 .or_insert_with_key(|scope| {
169 Arc::new(SdkMeter::new(scope.clone(), self.inner.pipes.clone()))
170 })
171 .clone();
172 Meter::new(meter)
173 } else {
174 Meter::new(Arc::new(NoopMeterCore::new()))
175 }
176 }
177}
178
179#[derive(Default)]
181pub struct MeterProviderBuilder {
182 resource: Option<Resource>,
183 readers: Vec<Box<dyn MetricReader>>,
184 views: Vec<Arc<dyn View>>,
185}
186
187impl MeterProviderBuilder {
188 pub fn with_resource(mut self, resource: Resource) -> Self {
197 self.resource = Some(resource);
198 self
199 }
200
201 pub fn with_reader<T: MetricReader>(mut self, reader: T) -> Self {
206 self.readers.push(Box::new(reader));
207 self
208 }
209
210 pub fn with_view<T: View>(mut self, view: T) -> Self {
218 self.views.push(Arc::new(view));
219 self
220 }
221
222 pub fn build(self) -> SdkMeterProvider {
225 SdkMeterProvider {
226 inner: Arc::new(SdkMeterProviderInner {
227 pipes: Arc::new(Pipelines::new(
228 self.resource.unwrap_or_default(),
229 self.readers,
230 self.views,
231 )),
232 meters: Default::default(),
233 is_shutdown: Arc::new(AtomicBool::new(false)),
234 }),
235 }
236 }
237}
238
239impl fmt::Debug for MeterProviderBuilder {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 f.debug_struct("MeterProviderBuilder")
242 .field("resource", &self.resource)
243 .field("readers", &self.readers)
244 .field("views", &self.views.len())
245 .finish()
246 }
247}
248#[cfg(test)]
249mod tests {
250 use crate::resource::{
251 SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
252 };
253 use crate::testing::metrics::metric_reader::TestMetricReader;
254 use crate::Resource;
255 use opentelemetry::global;
256 use opentelemetry::metrics::MeterProvider;
257 use opentelemetry::{Key, KeyValue, Value};
258 use std::env;
259
260 #[test]
261 fn test_meter_provider_resource() {
262 let assert_resource = |provider: &super::SdkMeterProvider,
263 resource_key: &'static str,
264 expect: Option<&'static str>| {
265 assert_eq!(
266 provider.inner.pipes.0[0]
267 .resource
268 .get(Key::from_static_str(resource_key))
269 .map(|v| v.to_string()),
270 expect.map(|s| s.to_string())
271 );
272 };
273 let assert_telemetry_resource = |provider: &super::SdkMeterProvider| {
274 assert_eq!(
275 provider.inner.pipes.0[0]
276 .resource
277 .get(TELEMETRY_SDK_LANGUAGE.into()),
278 Some(Value::from("rust"))
279 );
280 assert_eq!(
281 provider.inner.pipes.0[0]
282 .resource
283 .get(TELEMETRY_SDK_NAME.into()),
284 Some(Value::from("opentelemetry"))
285 );
286 assert_eq!(
287 provider.inner.pipes.0[0]
288 .resource
289 .get(TELEMETRY_SDK_VERSION.into()),
290 Some(Value::from(env!("CARGO_PKG_VERSION")))
291 );
292 };
293
294 temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
296 let reader = TestMetricReader::new();
297 let default_meter_provider = super::SdkMeterProvider::builder()
298 .with_reader(reader)
299 .build();
300 assert_resource(
301 &default_meter_provider,
302 SERVICE_NAME,
303 Some("unknown_service"),
304 );
305 assert_telemetry_resource(&default_meter_provider);
306 });
307
308 let reader2 = TestMetricReader::new();
310 let custom_meter_provider = super::SdkMeterProvider::builder()
311 .with_reader(reader2)
312 .with_resource(Resource::new(vec![KeyValue::new(
313 SERVICE_NAME,
314 "test_service",
315 )]))
316 .build();
317 assert_resource(&custom_meter_provider, SERVICE_NAME, Some("test_service"));
318 assert_eq!(custom_meter_provider.inner.pipes.0[0].resource.len(), 1);
319
320 temp_env::with_var(
321 "OTEL_RESOURCE_ATTRIBUTES",
322 Some("key1=value1, k2, k3=value2"),
323 || {
324 let reader3 = TestMetricReader::new();
326 let env_resource_provider = super::SdkMeterProvider::builder()
327 .with_reader(reader3)
328 .build();
329 assert_resource(
330 &env_resource_provider,
331 SERVICE_NAME,
332 Some("unknown_service"),
333 );
334 assert_resource(&env_resource_provider, "key1", Some("value1"));
335 assert_resource(&env_resource_provider, "k3", Some("value2"));
336 assert_telemetry_resource(&env_resource_provider);
337 assert_eq!(env_resource_provider.inner.pipes.0[0].resource.len(), 6);
338 },
339 );
340
341 temp_env::with_var(
343 "OTEL_RESOURCE_ATTRIBUTES",
344 Some("my-custom-key=env-val,k2=value2"),
345 || {
346 let reader4 = TestMetricReader::new();
347 let user_provided_resource_config_provider = super::SdkMeterProvider::builder()
348 .with_reader(reader4)
349 .with_resource(Resource::default().merge(&mut Resource::new(vec![
350 KeyValue::new("my-custom-key", "my-custom-value"),
351 KeyValue::new("my-custom-key2", "my-custom-value2"),
352 ])))
353 .build();
354 assert_resource(
355 &user_provided_resource_config_provider,
356 SERVICE_NAME,
357 Some("unknown_service"),
358 );
359 assert_resource(
360 &user_provided_resource_config_provider,
361 "my-custom-key",
362 Some("my-custom-value"),
363 );
364 assert_resource(
365 &user_provided_resource_config_provider,
366 "my-custom-key2",
367 Some("my-custom-value2"),
368 );
369 assert_resource(
370 &user_provided_resource_config_provider,
371 "k2",
372 Some("value2"),
373 );
374 assert_telemetry_resource(&user_provided_resource_config_provider);
375 assert_eq!(
376 user_provided_resource_config_provider.inner.pipes.0[0]
377 .resource
378 .len(),
379 7
380 );
381 },
382 );
383
384 let reader5 = TestMetricReader::new();
386 let no_service_name = super::SdkMeterProvider::builder()
387 .with_reader(reader5)
388 .with_resource(Resource::empty())
389 .build();
390
391 assert_eq!(no_service_name.inner.pipes.0[0].resource.len(), 0)
392 }
393
394 #[test]
395 fn test_meter_provider_shutdown() {
396 let reader = TestMetricReader::new();
397 let provider = super::SdkMeterProvider::builder()
398 .with_reader(reader.clone())
399 .build();
400 global::set_meter_provider(provider.clone());
401 assert!(!reader.is_shutdown());
402 let meter = global::meter("test");
404 let counter = meter.u64_counter("test_counter").init();
405 let shutdown_res = provider.shutdown();
407 assert!(shutdown_res.is_ok());
408
409 let shutdown_res = provider.shutdown();
411 assert!(shutdown_res.is_err());
412 assert!(reader.is_shutdown());
413 counter.add(1, &[]);
416 }
417 #[test]
418 fn test_shutdown_invoked_on_last_drop() {
419 let reader = TestMetricReader::new();
420 let provider = super::SdkMeterProvider::builder()
421 .with_reader(reader.clone())
422 .build();
423 let clone1 = provider.clone();
424 let clone2 = provider.clone();
425
426 assert!(!reader.is_shutdown());
428
429 drop(clone1);
431 assert!(!reader.is_shutdown());
432
433 drop(clone2);
435 assert!(!reader.is_shutdown());
436
437 drop(provider);
439 assert!(reader.is_shutdown());
441 }
442
443 #[test]
444 fn same_meter_reused_same_scope() {
445 let provider = super::SdkMeterProvider::builder().build();
446 let _meter1 = provider.meter("test");
447 let _meter2 = provider.meter("test");
448 assert_eq!(provider.inner.meters.lock().unwrap().len(), 1);
449 let _meter3 =
450 provider.versioned_meter("test", Some("1.0.0"), Some("http://example.com"), None);
451 let _meter4 =
452 provider.versioned_meter("test", Some("1.0.0"), Some("http://example.com"), None);
453 let _meter5 =
454 provider.versioned_meter("test", Some("1.0.0"), Some("http://example.com"), None);
455 assert_eq!(provider.inner.meters.lock().unwrap().len(), 2);
456
457 let _meter6 =
459 provider.versioned_meter("ABC", Some("1.0.0"), Some("http://example.com"), None);
460 let _meter7 =
461 provider.versioned_meter("Abc", Some("1.0.0"), Some("http://example.com"), None);
462 let _meter8 =
463 provider.versioned_meter("abc", Some("1.0.0"), Some("http://example.com"), None);
464 assert_eq!(provider.inner.meters.lock().unwrap().len(), 5);
465 }
466}