1use crate::runtime::RuntimeChannel;
12use crate::trace::{
13 BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
14};
15use crate::{export::trace::SpanExporter, trace::SpanProcessor};
16use crate::{InstrumentationLibrary, Resource};
17use once_cell::sync::{Lazy, OnceCell};
18use opentelemetry::trace::TraceError;
19use opentelemetry::{global, trace::TraceResult};
20use std::borrow::Cow;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23
/// Instrumentation name substituted by `versioned_tracer` when the caller supplies an
/// empty name.
const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/tracer";

/// Process-wide slot for an owned [`Resource`] passed through `Builder::build`.
///
/// `Builder::build` tries to store its owned resource here. When the insert succeeds, or
/// when an equal resource is already stored, the provider borrows the stored value instead
/// of keeping its own copy.
static PROVIDER_RESOURCE: OnceCell<Resource> = OnceCell::new();
27
28static NOOP_TRACER_PROVIDER: Lazy<TracerProvider> = Lazy::new(|| TracerProvider {
30 inner: Arc::new(TracerProviderInner {
31 processors: Vec::new(),
32 config: Config {
33 sampler: Box::new(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
35 id_generator: Box::<RandomIdGenerator>::default(),
36 span_limits: SpanLimits::default(),
37 resource: Cow::Owned(Resource::empty()),
38 },
39 }),
40 is_shutdown: Arc::new(AtomicBool::new(true)),
41});
42
/// State held behind the `Arc` inside a `TracerProvider`: the span processors and the
/// trace configuration.
#[derive(Debug)]
pub(crate) struct TracerProviderInner {
    /// Span processors driven by the provider (flushed and shut down through it).
    processors: Vec<Box<dyn SpanProcessor>>,
    /// Trace configuration exposed through `TracerProvider::config`.
    config: crate::trace::Config,
}
49
50impl Drop for TracerProviderInner {
51 fn drop(&mut self) {
52 for processor in &mut self.processors {
53 if let Err(err) = processor.shutdown() {
54 global::handle_error(err);
55 }
56 }
57 }
58}
59
/// Creator of `Tracer` instances backed by a set of span processors and a trace
/// configuration.
///
/// Cloning is cheap: both fields are `Arc`s, so every clone shares the same processors,
/// configuration and shutdown flag.
#[derive(Clone, Debug)]
pub struct TracerProvider {
    /// Shared processors and configuration.
    inner: Arc<TracerProviderInner>,
    /// Shared flag that is set once `shutdown` has been called on any clone.
    is_shutdown: Arc<AtomicBool>,
}
70
71impl Default for TracerProvider {
72 fn default() -> Self {
73 TracerProvider::builder().build()
74 }
75}
76
77impl TracerProvider {
78 pub(crate) fn new(inner: TracerProviderInner) -> Self {
80 TracerProvider {
81 inner: Arc::new(inner),
82 is_shutdown: Arc::new(AtomicBool::new(false)),
83 }
84 }
85
86 pub fn builder() -> Builder {
88 Builder::default()
89 }
90
91 pub(crate) fn span_processors(&self) -> &[Box<dyn SpanProcessor>] {
93 &self.inner.processors
94 }
95
96 pub(crate) fn config(&self) -> &crate::trace::Config {
98 &self.inner.config
99 }
100
101 pub(crate) fn is_shutdown(&self) -> bool {
104 self.is_shutdown.load(Ordering::Relaxed)
105 }
106
107 pub fn force_flush(&self) -> Vec<TraceResult<()>> {
145 self.span_processors()
146 .iter()
147 .map(|processor| processor.force_flush())
148 .collect()
149 }
150
151 pub fn shutdown(&self) -> TraceResult<()> {
155 if self
156 .is_shutdown
157 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
158 .is_ok()
159 {
160 let mut errs = vec![];
163 for processor in &self.inner.processors {
164 if let Err(err) = processor.shutdown() {
165 errs.push(err);
166 }
167 }
168
169 if errs.is_empty() {
170 Ok(())
171 } else {
172 Err(TraceError::Other(format!("{errs:?}").into()))
173 }
174 } else {
175 Err(TraceError::Other(
176 "tracer provider already shut down".into(),
177 ))
178 }
179 }
180}
181
182impl opentelemetry::trace::TracerProvider for TracerProvider {
183 type Tracer = Tracer;
185
186 fn versioned_tracer(
188 &self,
189 name: impl Into<Cow<'static, str>>,
190 version: Option<impl Into<Cow<'static, str>>>,
191 schema_url: Option<impl Into<Cow<'static, str>>>,
192 attributes: Option<Vec<opentelemetry::KeyValue>>,
193 ) -> Self::Tracer {
194 let name = name.into();
196 let component_name = if name.is_empty() {
197 Cow::Borrowed(DEFAULT_COMPONENT_NAME)
198 } else {
199 name
200 };
201
202 let mut builder = self.tracer_builder(component_name);
203
204 if let Some(v) = version {
205 builder = builder.with_version(v);
206 }
207 if let Some(s) = schema_url {
208 builder = builder.with_schema_url(s);
209 }
210 if let Some(a) = attributes {
211 builder = builder.with_attributes(a);
212 }
213
214 builder.build()
215 }
216
217 fn library_tracer(&self, library: Arc<InstrumentationLibrary>) -> Self::Tracer {
218 if self.is_shutdown.load(Ordering::Relaxed) {
219 return Tracer::new(library, NOOP_TRACER_PROVIDER.clone());
220 }
221 Tracer::new(library, self.clone())
222 }
223}
224
/// Builder that collects span processors and a trace configuration for a
/// [`TracerProvider`].
#[derive(Debug, Default)]
pub struct Builder {
    /// Span processors added so far, in the order they were added.
    processors: Vec<Box<dyn SpanProcessor>>,
    /// Trace configuration the provider will be built with.
    config: crate::trace::Config,
}
231
232impl Builder {
233 pub fn with_simple_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
235 let mut processors = self.processors;
236 processors.push(Box::new(SimpleSpanProcessor::new(Box::new(exporter))));
237
238 Builder { processors, ..self }
239 }
240
241 pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
243 self,
244 exporter: T,
245 runtime: R,
246 ) -> Self {
247 let batch = BatchSpanProcessor::builder(exporter, runtime).build();
248 self.with_span_processor(batch)
249 }
250
251 pub fn with_span_processor<T: SpanProcessor + 'static>(self, processor: T) -> Self {
253 let mut processors = self.processors;
254 processors.push(Box::new(processor));
255
256 Builder { processors, ..self }
257 }
258
259 pub fn with_config(self, config: crate::trace::Config) -> Self {
261 Builder { config, ..self }
262 }
263
264 pub fn build(self) -> TracerProvider {
266 let mut config = self.config;
267
268 if matches!(config.resource, Cow::Owned(_)) {
275 config.resource = match PROVIDER_RESOURCE.try_insert(config.resource.into_owned()) {
276 Ok(static_resource) => Cow::Borrowed(static_resource),
277 Err((prev, new)) => {
278 if prev == &new {
279 Cow::Borrowed(prev)
280 } else {
281 Cow::Owned(new)
282 }
283 }
284 }
285 }
286
287 let mut processors = self.processors;
289
290 for p in &mut processors {
292 p.set_resource(config.resource.as_ref());
293 }
294
295 TracerProvider::new(TracerProviderInner { processors, config })
296 }
297}
298
#[cfg(test)]
mod tests {
    use crate::export::trace::SpanData;
    use crate::resource::{
        SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
    };
    use crate::trace::provider::TracerProviderInner;
    use crate::trace::{Config, Span, SpanProcessor};
    use crate::Resource;
    use opentelemetry::trace::{TraceError, TraceResult, Tracer, TracerProvider};
    use opentelemetry::{Context, Key, KeyValue, Value};
    use std::borrow::Cow;
    use std::env;
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use std::sync::Arc;

    /// Counters a test can inspect after handing its processor over to a provider.
    #[derive(Default, Debug)]
    struct AssertInfo {
        /// Number of `on_start` calls the processor has received.
        started_span: AtomicU32,
        /// Set once the processor's `shutdown` has run.
        is_shutdown: AtomicBool,
    }

    /// Cheaply clonable handle to an [`AssertInfo`].
    #[derive(Default, Debug, Clone)]
    struct SharedAssertInfo(Arc<AssertInfo>);

    impl SharedAssertInfo {
        /// Returns `true` when exactly `count` spans have been started so far.
        fn started_span_count(&self, count: u32) -> bool {
            self.0.started_span.load(Ordering::SeqCst) == count
        }
    }

    /// Span processor that records what happens to it and whose flush outcome is fixed at
    /// construction time.
    #[derive(Debug)]
    struct TestSpanProcessor {
        /// Whether `force_flush` succeeds.
        success: bool,
        assert_info: SharedAssertInfo,
    }

    impl TestSpanProcessor {
        /// Creates a processor whose `force_flush` succeeds only if `success` is `true`.
        fn new(success: bool) -> TestSpanProcessor {
            TestSpanProcessor {
                success,
                assert_info: SharedAssertInfo::default(),
            }
        }

        /// Returns a handle for inspecting this processor after it has been moved into a
        /// provider.
        fn assert_info(&self) -> SharedAssertInfo {
            self.assert_info.clone()
        }
    }

    impl SpanProcessor for TestSpanProcessor {
        fn on_start(&self, _span: &mut Span, _cx: &Context) {
            self.assert_info
                .0
                .started_span
                .fetch_add(1, Ordering::SeqCst);
        }

        fn on_end(&self, _span: SpanData) {
            // Ended spans are not tracked by these tests.
        }

        fn force_flush(&self) -> TraceResult<()> {
            if self.success {
                Ok(())
            } else {
                Err(TraceError::from("cannot export"))
            }
        }

        fn shutdown(&self) -> TraceResult<()> {
            // A repeated shutdown succeeds; the first one reports the flush outcome.
            if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
                Ok(())
            } else {
                let _ = self.assert_info.0.is_shutdown.compare_exchange(
                    false,
                    true,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                self.force_flush()
            }
        }
    }

    /// `force_flush` must report one result per processor, in processor order, and must
    /// surface a failing processor instead of hiding it.
    #[test]
    fn test_force_flush() {
        let tracer_provider = super::TracerProvider::new(TracerProviderInner {
            processors: vec![
                Box::from(TestSpanProcessor::new(true)),
                Box::from(TestSpanProcessor::new(false)),
            ],
            config: Default::default(),
        });

        let results = tracer_provider.force_flush();
        assert_eq!(results.len(), 2);
        // Checking only the length would also pass if errors were swallowed or reordered.
        assert!(
            results[0].is_ok(),
            "the succeeding processor should flush cleanly"
        );
        assert!(
            results[1].is_err(),
            "the failing processor's error should be reported"
        );
    }

    /// Checks which resource a provider ends up with for the combinations of
    /// `OTEL_RESOURCE_ATTRIBUTES` and user-supplied configuration exercised below.
    #[test]
    fn test_tracer_provider_default_resource() {
        let assert_resource = |provider: &super::TracerProvider,
                               resource_key: &'static str,
                               expect: Option<&'static str>| {
            assert_eq!(
                provider
                    .config()
                    .resource
                    .get(Key::from_static_str(resource_key))
                    .map(|v| v.to_string()),
                expect.map(|s| s.to_string())
            );
        };
        let assert_telemetry_resource = |provider: &super::TracerProvider| {
            assert_eq!(
                provider
                    .config()
                    .resource
                    .get(TELEMETRY_SDK_LANGUAGE.into()),
                Some(Value::from("rust"))
            );
            assert_eq!(
                provider.config().resource.get(TELEMETRY_SDK_NAME.into()),
                Some(Value::from("opentelemetry"))
            );
            assert_eq!(
                provider.config().resource.get(TELEMETRY_SDK_VERSION.into()),
                Some(Value::from(env!("CARGO_PKG_VERSION")))
            );
        };

        // No user resource and no env var: the default resource is used.
        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
            let default_config_provider = super::TracerProvider::builder().build();
            assert_resource(
                &default_config_provider,
                SERVICE_NAME,
                Some("unknown_service"),
            );
            assert_telemetry_resource(&default_config_provider);
        });

        // A user-supplied resource is used as given.
        let custom_config_provider = super::TracerProvider::builder()
            .with_config(Config {
                resource: Cow::Owned(Resource::new(vec![KeyValue::new(
                    SERVICE_NAME,
                    "test_service",
                )])),
                ..Default::default()
            })
            .build();
        assert_resource(&custom_config_provider, SERVICE_NAME, Some("test_service"));
        assert_eq!(custom_config_provider.config().resource.len(), 1);

        // With the env var set and no user resource, its `key=value` pairs are picked up.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("key1=value1, k2, k3=value2"),
            || {
                let env_resource_provider = super::TracerProvider::builder().build();
                assert_resource(
                    &env_resource_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(&env_resource_provider, "key1", Some("value1"));
                assert_resource(&env_resource_provider, "k3", Some("value2"));
                assert_telemetry_resource(&env_resource_provider);
                assert_eq!(env_resource_provider.config().resource.len(), 6);
            },
        );

        // With both the env var and a user resource merged onto the default one, the
        // user-supplied value wins for a key present in both.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("my-custom-key=env-val,k2=value2"),
            || {
                let user_provided_resource_config_provider = super::TracerProvider::builder()
                    .with_config(Config {
                        resource: Cow::Owned(Resource::default().merge(&mut Resource::new(vec![
                            KeyValue::new("my-custom-key", "my-custom-value"),
                            KeyValue::new("my-custom-key2", "my-custom-value2"),
                        ]))),
                        ..Default::default()
                    })
                    .build();
                assert_resource(
                    &user_provided_resource_config_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key",
                    Some("my-custom-value"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key2",
                    Some("my-custom-value2"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "k2",
                    Some("value2"),
                );
                assert_telemetry_resource(&user_provided_resource_config_provider);
                assert_eq!(
                    user_provided_resource_config_provider
                        .config()
                        .resource
                        .len(),
                    7
                );
            },
        );

        // An explicitly empty user resource stays empty.
        let no_service_name = super::TracerProvider::builder()
            .with_config(Config {
                resource: Cow::Owned(Resource::empty()),
                ..Default::default()
            })
            .build();

        assert_eq!(no_service_name.config().resource.len(), 0)
    }

    /// After shutdown, neither newly requested tracers nor previously created ones reach
    /// the span processor any more.
    #[test]
    fn test_shutdown_noops() {
        let processor = TestSpanProcessor::new(false);
        let assert_handle = processor.assert_info();
        let tracer_provider = super::TracerProvider::new(TracerProviderInner {
            processors: vec![Box::from(processor)],
            config: Default::default(),
        });

        let test_tracer_1 = tracer_provider.tracer("test1");
        let _ = test_tracer_1.start("test");

        assert!(assert_handle.started_span_count(1));

        let _ = test_tracer_1.start("test");

        assert!(assert_handle.started_span_count(2));

        let shutdown = |tracer_provider: super::TracerProvider| {
            // The result is ignored on purpose: this processor's flush always fails.
            let _ = tracer_provider.shutdown();
        };

        // Shutting down through a clone shuts down the shared provider.
        shutdown(tracer_provider.clone());

        // A tracer requested after shutdown is backed by the no-op provider ...
        let noop_tracer = tracer_provider.tracer("noop");
        // ... so starting a span does not reach the processor,
        let _ = noop_tracer.start("test");
        assert!(assert_handle.started_span_count(2));
        // and its provider reports itself as shut down.
        assert!(noop_tracer.provider().is_shutdown.load(Ordering::SeqCst));

        // A tracer created before shutdown no longer reaches the processor either.
        let _ = test_tracer_1.start("test");
        assert!(assert_handle.started_span_count(2));
    }
}