opentelemetry_sdk/trace/
provider.rs

1//! # Trace Provider SDK
2//!
3//! ## Tracer Creation
4//!
5//! New [`Tracer`] instances are always created through a [`TracerProvider`].
6//!
7//! All configuration objects and extension points (span processors,
8//! propagators) are provided by the [`TracerProvider`]. [`Tracer`] instances do
9//! not duplicate this data to avoid that different [`Tracer`] instances
10//! of the [`TracerProvider`] have different versions of these data.
11use crate::runtime::RuntimeChannel;
12use crate::trace::{
13    BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
14};
15use crate::{export::trace::SpanExporter, trace::SpanProcessor};
16use crate::{InstrumentationLibrary, Resource};
17use once_cell::sync::{Lazy, OnceCell};
18use opentelemetry::trace::TraceError;
19use opentelemetry::{global, trace::TraceResult};
20use std::borrow::Cow;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23
24/// Default tracer name if empty string is provided.
25const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/tracer";
26static PROVIDER_RESOURCE: OnceCell<Resource> = OnceCell::new();
27
28// a no nop tracer provider used as placeholder when the provider is shutdown
29static NOOP_TRACER_PROVIDER: Lazy<TracerProvider> = Lazy::new(|| TracerProvider {
30    inner: Arc::new(TracerProviderInner {
31        processors: Vec::new(),
32        config: Config {
33            // cannot use default here as the default resource is not empty
34            sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
35            id_generator: Box::<RandomIdGenerator>::default(),
36            span_limits: SpanLimits::default(),
37            resource: Cow::Owned(Resource::empty()),
38        },
39    }),
40    is_shutdown: Arc::new(AtomicBool::new(true)),
41});
42
43/// TracerProvider inner type
44#[derive(Debug)]
45pub(crate) struct TracerProviderInner {
46    processors: Vec<Box<dyn SpanProcessor>>,
47    config: crate::trace::Config,
48}
49
50impl Drop for TracerProviderInner {
51    fn drop(&mut self) {
52        for processor in &mut self.processors {
53            if let Err(err) = processor.shutdown() {
54                global::handle_error(err);
55            }
56        }
57    }
58}
59
60/// Creator and registry of named [`Tracer`] instances.
61///
62/// `TracerProvider` is lightweight container holding pointers to `SpanProcessor` and other components.
63/// Cloning and dropping them will not stop the span processing. To stop span processing, users
64/// must either call `shutdown` method explicitly, or drop every clone of `TracerProvider`.
65#[derive(Clone, Debug)]
66pub struct TracerProvider {
67    inner: Arc<TracerProviderInner>,
68    is_shutdown: Arc<AtomicBool>,
69}
70
71impl Default for TracerProvider {
72    fn default() -> Self {
73        TracerProvider::builder().build()
74    }
75}
76
77impl TracerProvider {
78    /// Build a new tracer provider
79    pub(crate) fn new(inner: TracerProviderInner) -> Self {
80        TracerProvider {
81            inner: Arc::new(inner),
82            is_shutdown: Arc::new(AtomicBool::new(false)),
83        }
84    }
85
86    /// Create a new [`TracerProvider`] builder.
87    pub fn builder() -> Builder {
88        Builder::default()
89    }
90
91    /// Span processors associated with this provider
92    pub(crate) fn span_processors(&self) -> &[Box<dyn SpanProcessor>] {
93        &self.inner.processors
94    }
95
96    /// Config associated with this tracer
97    pub(crate) fn config(&self) -> &crate::trace::Config {
98        &self.inner.config
99    }
100
101    /// true if the provider has been shutdown
102    /// Don't start span or export spans when provider is shutdown
103    pub(crate) fn is_shutdown(&self) -> bool {
104        self.is_shutdown.load(Ordering::Relaxed)
105    }
106
107    /// Force flush all remaining spans in span processors and return results.
108    ///
109    /// # Examples
110    ///
111    /// ```
112    /// use opentelemetry::global;
113    /// use opentelemetry_sdk::trace::TracerProvider;
114    ///
115    /// fn init_tracing() -> TracerProvider {
116    ///     let provider = TracerProvider::default();
117    ///
118    ///     // Set provider to be used as global tracer provider
119    ///     let _ = global::set_tracer_provider(provider.clone());
120    ///
121    ///     provider
122    /// }
123    ///
124    /// fn main() {
125    ///     let provider = init_tracing();
126    ///
127    ///     // create spans..
128    ///
129    ///     // force all spans to flush
130    ///     for result in provider.force_flush() {
131    ///         if let Err(err) = result {
132    ///             // .. handle flush error
133    ///         }
134    ///     }
135    ///
136    ///     // create more spans..
137    ///
138    ///     // dropping provider and shutting down global provider ensure all
139    ///     // remaining spans are exported
140    ///     drop(provider);
141    ///     global::shutdown_tracer_provider();
142    /// }
143    /// ```
144    pub fn force_flush(&self) -> Vec<TraceResult<()>> {
145        self.span_processors()
146            .iter()
147            .map(|processor| processor.force_flush())
148            .collect()
149    }
150
151    /// Shuts down the current `TracerProvider`.
152    ///
153    /// Note that shut down doesn't means the TracerProvider has dropped
154    pub fn shutdown(&self) -> TraceResult<()> {
155        if self
156            .is_shutdown
157            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
158            .is_ok()
159        {
160            // propagate the shutdown signal to processors
161            // it's up to the processor to properly block new spans after shutdown
162            let mut errs = vec![];
163            for processor in &self.inner.processors {
164                if let Err(err) = processor.shutdown() {
165                    errs.push(err);
166                }
167            }
168
169            if errs.is_empty() {
170                Ok(())
171            } else {
172                Err(TraceError::Other(format!("{errs:?}").into()))
173            }
174        } else {
175            Err(TraceError::Other(
176                "tracer provider already shut down".into(),
177            ))
178        }
179    }
180}
181
182impl opentelemetry::trace::TracerProvider for TracerProvider {
183    /// This implementation of `TracerProvider` produces `Tracer` instances.
184    type Tracer = Tracer;
185
186    /// Create a new versioned `Tracer` instance.
187    fn versioned_tracer(
188        &self,
189        name: impl Into<Cow<'static, str>>,
190        version: Option<impl Into<Cow<'static, str>>>,
191        schema_url: Option<impl Into<Cow<'static, str>>>,
192        attributes: Option<Vec<opentelemetry::KeyValue>>,
193    ) -> Self::Tracer {
194        // Use default value if name is invalid empty string
195        let name = name.into();
196        let component_name = if name.is_empty() {
197            Cow::Borrowed(DEFAULT_COMPONENT_NAME)
198        } else {
199            name
200        };
201
202        let mut builder = self.tracer_builder(component_name);
203
204        if let Some(v) = version {
205            builder = builder.with_version(v);
206        }
207        if let Some(s) = schema_url {
208            builder = builder.with_schema_url(s);
209        }
210        if let Some(a) = attributes {
211            builder = builder.with_attributes(a);
212        }
213
214        builder.build()
215    }
216
217    fn library_tracer(&self, library: Arc<InstrumentationLibrary>) -> Self::Tracer {
218        if self.is_shutdown.load(Ordering::Relaxed) {
219            return Tracer::new(library, NOOP_TRACER_PROVIDER.clone());
220        }
221        Tracer::new(library, self.clone())
222    }
223}
224
225/// Builder for provider attributes.
226#[derive(Debug, Default)]
227pub struct Builder {
228    processors: Vec<Box<dyn SpanProcessor>>,
229    config: crate::trace::Config,
230}
231
232impl Builder {
233    /// The `SpanExporter` that this provider should use.
234    pub fn with_simple_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
235        let mut processors = self.processors;
236        processors.push(Box::new(SimpleSpanProcessor::new(Box::new(exporter))));
237
238        Builder { processors, ..self }
239    }
240
241    /// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
242    pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
243        self,
244        exporter: T,
245        runtime: R,
246    ) -> Self {
247        let batch = BatchSpanProcessor::builder(exporter, runtime).build();
248        self.with_span_processor(batch)
249    }
250
251    /// The [`SpanProcessor`] that this provider should use.
252    pub fn with_span_processor<T: SpanProcessor + 'static>(self, processor: T) -> Self {
253        let mut processors = self.processors;
254        processors.push(Box::new(processor));
255
256        Builder { processors, ..self }
257    }
258
259    /// The sdk [`crate::trace::Config`] that this provider will use.
260    pub fn with_config(self, config: crate::trace::Config) -> Self {
261        Builder { config, ..self }
262    }
263
264    /// Create a new provider from this configuration.
265    pub fn build(self) -> TracerProvider {
266        let mut config = self.config;
267
268        // Standard config will contain an owned [`Resource`] (either sdk default or use supplied)
269        // we can optimize the common case with a static ref to avoid cloning the underlying
270        // resource data for each span.
271        //
272        // For the uncommon case where there are multiple tracer providers with different resource
273        // configurations, users can optionally provide their own borrowed static resource.
274        if matches!(config.resource, Cow::Owned(_)) {
275            config.resource = match PROVIDER_RESOURCE.try_insert(config.resource.into_owned()) {
276                Ok(static_resource) => Cow::Borrowed(static_resource),
277                Err((prev, new)) => {
278                    if prev == &new {
279                        Cow::Borrowed(prev)
280                    } else {
281                        Cow::Owned(new)
282                    }
283                }
284            }
285        }
286
287        // Create a new vector to hold the modified processors
288        let mut processors = self.processors;
289
290        // Set the resource for each processor
291        for p in &mut processors {
292            p.set_resource(config.resource.as_ref());
293        }
294
295        TracerProvider::new(TracerProviderInner { processors, config })
296    }
297}
298
299#[cfg(test)]
300mod tests {
301    use crate::export::trace::SpanData;
302    use crate::resource::{
303        SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
304    };
305    use crate::trace::provider::TracerProviderInner;
306    use crate::trace::{Config, Span, SpanProcessor};
307    use crate::Resource;
308    use opentelemetry::trace::{TraceError, TraceResult, Tracer, TracerProvider};
309    use opentelemetry::{Context, Key, KeyValue, Value};
310    use std::borrow::Cow;
311    use std::env;
312    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
313    use std::sync::Arc;
314
315    // fields below is wrapped with Arc so we can assert it
316    #[derive(Default, Debug)]
317    struct AssertInfo {
318        started_span: AtomicU32,
319        is_shutdown: AtomicBool,
320    }
321
322    #[derive(Default, Debug, Clone)]
323    struct SharedAssertInfo(Arc<AssertInfo>);
324
325    impl SharedAssertInfo {
326        fn started_span_count(&self, count: u32) -> bool {
327            self.0.started_span.load(Ordering::SeqCst) == count
328        }
329    }
330
331    #[derive(Debug)]
332    struct TestSpanProcessor {
333        success: bool,
334        assert_info: SharedAssertInfo,
335    }
336
337    impl TestSpanProcessor {
338        fn new(success: bool) -> TestSpanProcessor {
339            TestSpanProcessor {
340                success,
341                assert_info: SharedAssertInfo::default(),
342            }
343        }
344
345        // get handle to assert info
346        fn assert_info(&self) -> SharedAssertInfo {
347            self.assert_info.clone()
348        }
349    }
350
351    impl SpanProcessor for TestSpanProcessor {
352        fn on_start(&self, _span: &mut Span, _cx: &Context) {
353            self.assert_info
354                .0
355                .started_span
356                .fetch_add(1, Ordering::SeqCst);
357        }
358
359        fn on_end(&self, _span: SpanData) {
360            // ignore
361        }
362
363        fn force_flush(&self) -> TraceResult<()> {
364            if self.success {
365                Ok(())
366            } else {
367                Err(TraceError::from("cannot export"))
368            }
369        }
370
371        fn shutdown(&self) -> TraceResult<()> {
372            if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
373                Ok(())
374            } else {
375                let _ = self.assert_info.0.is_shutdown.compare_exchange(
376                    false,
377                    true,
378                    Ordering::SeqCst,
379                    Ordering::SeqCst,
380                );
381                self.force_flush()
382            }
383        }
384    }
385
386    #[test]
387    fn test_force_flush() {
388        let tracer_provider = super::TracerProvider::new(TracerProviderInner {
389            processors: vec![
390                Box::from(TestSpanProcessor::new(true)),
391                Box::from(TestSpanProcessor::new(false)),
392            ],
393            config: Default::default(),
394        });
395
396        let results = tracer_provider.force_flush();
397        assert_eq!(results.len(), 2);
398    }
399
400    #[test]
401    fn test_tracer_provider_default_resource() {
402        let assert_resource = |provider: &super::TracerProvider,
403                               resource_key: &'static str,
404                               expect: Option<&'static str>| {
405            assert_eq!(
406                provider
407                    .config()
408                    .resource
409                    .get(Key::from_static_str(resource_key))
410                    .map(|v| v.to_string()),
411                expect.map(|s| s.to_string())
412            );
413        };
414        let assert_telemetry_resource = |provider: &super::TracerProvider| {
415            assert_eq!(
416                provider
417                    .config()
418                    .resource
419                    .get(TELEMETRY_SDK_LANGUAGE.into()),
420                Some(Value::from("rust"))
421            );
422            assert_eq!(
423                provider.config().resource.get(TELEMETRY_SDK_NAME.into()),
424                Some(Value::from("opentelemetry"))
425            );
426            assert_eq!(
427                provider.config().resource.get(TELEMETRY_SDK_VERSION.into()),
428                Some(Value::from(env!("CARGO_PKG_VERSION")))
429            );
430        };
431
432        // If users didn't provide a resource and there isn't a env var set. Use default one.
433        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
434            let default_config_provider = super::TracerProvider::builder().build();
435            assert_resource(
436                &default_config_provider,
437                SERVICE_NAME,
438                Some("unknown_service"),
439            );
440            assert_telemetry_resource(&default_config_provider);
441        });
442
443        // If user provided a resource, use that.
444        let custom_config_provider = super::TracerProvider::builder()
445            .with_config(Config {
446                resource: Cow::Owned(Resource::new(vec![KeyValue::new(
447                    SERVICE_NAME,
448                    "test_service",
449                )])),
450                ..Default::default()
451            })
452            .build();
453        assert_resource(&custom_config_provider, SERVICE_NAME, Some("test_service"));
454        assert_eq!(custom_config_provider.config().resource.len(), 1);
455
456        // If `OTEL_RESOURCE_ATTRIBUTES` is set, read them automatically
457        temp_env::with_var(
458            "OTEL_RESOURCE_ATTRIBUTES",
459            Some("key1=value1, k2, k3=value2"),
460            || {
461                let env_resource_provider = super::TracerProvider::builder().build();
462                assert_resource(
463                    &env_resource_provider,
464                    SERVICE_NAME,
465                    Some("unknown_service"),
466                );
467                assert_resource(&env_resource_provider, "key1", Some("value1"));
468                assert_resource(&env_resource_provider, "k3", Some("value2"));
469                assert_telemetry_resource(&env_resource_provider);
470                assert_eq!(env_resource_provider.config().resource.len(), 6);
471            },
472        );
473
474        // When `OTEL_RESOURCE_ATTRIBUTES` is set and also user provided config
475        temp_env::with_var(
476            "OTEL_RESOURCE_ATTRIBUTES",
477            Some("my-custom-key=env-val,k2=value2"),
478            || {
479                let user_provided_resource_config_provider = super::TracerProvider::builder()
480                    .with_config(Config {
481                        resource: Cow::Owned(Resource::default().merge(&mut Resource::new(vec![
482                            KeyValue::new("my-custom-key", "my-custom-value"),
483                            KeyValue::new("my-custom-key2", "my-custom-value2"),
484                        ]))),
485                        ..Default::default()
486                    })
487                    .build();
488                assert_resource(
489                    &user_provided_resource_config_provider,
490                    SERVICE_NAME,
491                    Some("unknown_service"),
492                );
493                assert_resource(
494                    &user_provided_resource_config_provider,
495                    "my-custom-key",
496                    Some("my-custom-value"),
497                );
498                assert_resource(
499                    &user_provided_resource_config_provider,
500                    "my-custom-key2",
501                    Some("my-custom-value2"),
502                );
503                assert_resource(
504                    &user_provided_resource_config_provider,
505                    "k2",
506                    Some("value2"),
507                );
508                assert_telemetry_resource(&user_provided_resource_config_provider);
509                assert_eq!(
510                    user_provided_resource_config_provider
511                        .config()
512                        .resource
513                        .len(),
514                    7
515                );
516            },
517        );
518
519        // If user provided a resource, it takes priority during collision.
520        let no_service_name = super::TracerProvider::builder()
521            .with_config(Config {
522                resource: Cow::Owned(Resource::empty()),
523                ..Default::default()
524            })
525            .build();
526
527        assert_eq!(no_service_name.config().resource.len(), 0)
528    }
529
530    #[test]
531    fn test_shutdown_noops() {
532        let processor = TestSpanProcessor::new(false);
533        let assert_handle = processor.assert_info();
534        let tracer_provider = super::TracerProvider::new(TracerProviderInner {
535            processors: vec![Box::from(processor)],
536            config: Default::default(),
537        });
538
539        let test_tracer_1 = tracer_provider.tracer("test1");
540        let _ = test_tracer_1.start("test");
541
542        assert!(assert_handle.started_span_count(1));
543
544        let _ = test_tracer_1.start("test");
545
546        assert!(assert_handle.started_span_count(2));
547
548        let shutdown = |tracer_provider: super::TracerProvider| {
549            let _ = tracer_provider.shutdown(); // shutdown once
550        };
551
552        // assert tracer provider can be shutdown using on a cloned version
553        shutdown(tracer_provider.clone());
554
555        // after shutdown we should get noop tracer
556        let noop_tracer = tracer_provider.tracer("noop");
557        // noop tracer cannot start anything
558        let _ = noop_tracer.start("test");
559        assert!(assert_handle.started_span_count(2));
560        // noop tracer's tracer provider should be shutdown
561        assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst));
562
563        // existing tracer becomes noops after shutdown
564        let _ = test_tracer_1.start("test");
565        assert!(assert_handle.started_span_count(2));
566    }
567}