opentelemetry_sdk/logs/
log_emitter.rs

1use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
2use crate::{
3    export::logs::{LogData, LogExporter},
4    runtime::RuntimeChannel,
5    Resource,
6};
7use opentelemetry::{
8    global,
9    logs::{LogError, LogResult},
10    trace::TraceContextExt,
11    Context, InstrumentationLibrary,
12};
13
14#[cfg(feature = "logs_level_enabled")]
15use opentelemetry::logs::Severity;
16
17use std::{
18    borrow::Cow,
19    sync::{atomic::Ordering, Arc},
20};
21use std::{sync::atomic::AtomicBool, time::SystemTime};
22
23use once_cell::sync::Lazy;
24
25// a no nop logger provider used as placeholder when the provider is shutdown
26static NOOP_LOGGER_PROVIDER: Lazy<LoggerProvider> = Lazy::new(|| LoggerProvider {
27    inner: Arc::new(LoggerProviderInner {
28        processors: Vec::new(),
29        resource: Resource::empty(),
30    }),
31    is_shutdown: Arc::new(AtomicBool::new(true)),
32});
33
34#[derive(Debug, Clone)]
35/// Creator for `Logger` instances.
36pub struct LoggerProvider {
37    inner: Arc<LoggerProviderInner>,
38    is_shutdown: Arc<AtomicBool>,
39}
40
41/// Default logger name if empty string is provided.
42const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/logger";
43// According to a Go-specific study mentioned on https://go.dev/blog/slog,
44// up to 5 attributes is the most common case. We have chosen 8 as the default
45// capacity for attributes to avoid reallocation in common scenarios.
46const PREALLOCATED_ATTRIBUTE_CAPACITY: usize = 8;
47
48impl opentelemetry::logs::LoggerProvider for LoggerProvider {
49    type Logger = Logger;
50
51    /// Create a new versioned `Logger` instance.
52    fn versioned_logger(
53        &self,
54        name: impl Into<Cow<'static, str>>,
55        version: Option<Cow<'static, str>>,
56        schema_url: Option<Cow<'static, str>>,
57        attributes: Option<Vec<opentelemetry::KeyValue>>,
58    ) -> Logger {
59        let name = name.into();
60
61        let component_name = if name.is_empty() {
62            Cow::Borrowed(DEFAULT_COMPONENT_NAME)
63        } else {
64            name
65        };
66
67        let mut builder = self.logger_builder(component_name);
68
69        if let Some(v) = version {
70            builder = builder.with_version(v);
71        }
72        if let Some(s) = schema_url {
73            builder = builder.with_schema_url(s);
74        }
75        if let Some(a) = attributes {
76            builder = builder.with_attributes(a);
77        }
78
79        builder.build()
80    }
81
82    fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
83        // If the provider is shutdown, new logger will refer a no-op logger provider.
84        if self.is_shutdown.load(Ordering::Relaxed) {
85            return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
86        }
87        Logger::new(library, self.clone())
88    }
89}
90
91impl LoggerProvider {
92    /// Create a new `LoggerProvider` builder.
93    pub fn builder() -> Builder {
94        Builder::default()
95    }
96
97    pub(crate) fn log_processors(&self) -> &[Box<dyn LogProcessor>] {
98        &self.inner.processors
99    }
100
101    pub(crate) fn resource(&self) -> &Resource {
102        &self.inner.resource
103    }
104
105    /// Force flush all remaining logs in log processors and return results.
106    pub fn force_flush(&self) -> Vec<LogResult<()>> {
107        self.log_processors()
108            .iter()
109            .map(|processor| processor.force_flush())
110            .collect()
111    }
112
113    /// Shuts down this `LoggerProvider`
114    pub fn shutdown(&self) -> LogResult<()> {
115        if self
116            .is_shutdown
117            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
118            .is_ok()
119        {
120            // propagate the shutdown signal to processors
121            // it's up to the processor to properly block new logs after shutdown
122            let mut errs = vec![];
123            for processor in &self.inner.processors {
124                if let Err(err) = processor.shutdown() {
125                    errs.push(err);
126                }
127            }
128
129            if errs.is_empty() {
130                Ok(())
131            } else {
132                Err(LogError::Other(format!("{errs:?}").into()))
133            }
134        } else {
135            Err(LogError::Other("logger provider already shut down".into()))
136        }
137    }
138}
139
140#[derive(Debug)]
141struct LoggerProviderInner {
142    processors: Vec<Box<dyn LogProcessor>>,
143    resource: Resource,
144}
145
146impl Drop for LoggerProviderInner {
147    fn drop(&mut self) {
148        for processor in &mut self.processors {
149            if let Err(err) = processor.shutdown() {
150                global::handle_error(err);
151            }
152        }
153    }
154}
155
156#[derive(Debug, Default)]
157/// Builder for provider attributes.
158pub struct Builder {
159    processors: Vec<Box<dyn LogProcessor>>,
160    resource: Option<Resource>,
161}
162
163impl Builder {
164    /// The `LogExporter` that this provider should use.
165    pub fn with_simple_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {
166        let mut processors = self.processors;
167        processors.push(Box::new(SimpleLogProcessor::new(Box::new(exporter))));
168
169        Builder { processors, ..self }
170    }
171
172    /// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use.
173    pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel>(
174        self,
175        exporter: T,
176        runtime: R,
177    ) -> Self {
178        let batch = BatchLogProcessor::builder(exporter, runtime).build();
179        self.with_log_processor(batch)
180    }
181
182    /// The `LogProcessor` that this provider should use.
183    pub fn with_log_processor<T: LogProcessor + 'static>(self, processor: T) -> Self {
184        let mut processors = self.processors;
185        processors.push(Box::new(processor));
186
187        Builder { processors, ..self }
188    }
189
190    /// The `Resource` to be associated with this Provider.
191    pub fn with_resource(self, resource: Resource) -> Self {
192        Builder {
193            resource: Some(resource),
194            ..self
195        }
196    }
197
198    /// Create a new provider from this configuration.
199    pub fn build(self) -> LoggerProvider {
200        let resource = self.resource.unwrap_or_default();
201
202        let logger_provider = LoggerProvider {
203            inner: Arc::new(LoggerProviderInner {
204                processors: self.processors,
205                resource,
206            }),
207            is_shutdown: Arc::new(AtomicBool::new(false)),
208        };
209
210        // invoke set_resource on all the processors
211        for processor in logger_provider.log_processors() {
212            processor.set_resource(logger_provider.resource());
213        }
214        logger_provider
215    }
216}
217
218#[derive(Debug)]
219/// The object for emitting [`LogRecord`]s.
220///
221/// [`LogRecord`]: opentelemetry::logs::LogRecord
222pub struct Logger {
223    instrumentation_lib: Arc<InstrumentationLibrary>,
224    provider: LoggerProvider,
225}
226
227impl Logger {
228    pub(crate) fn new(
229        instrumentation_lib: Arc<InstrumentationLibrary>,
230        provider: LoggerProvider,
231    ) -> Self {
232        Logger {
233            instrumentation_lib,
234            provider,
235        }
236    }
237
238    /// LoggerProvider associated with this logger.
239    pub fn provider(&self) -> &LoggerProvider {
240        &self.provider
241    }
242
243    /// Instrumentation library information of this logger.
244    pub fn instrumentation_library(&self) -> &InstrumentationLibrary {
245        &self.instrumentation_lib
246    }
247}
248
249impl opentelemetry::logs::Logger for Logger {
250    type LogRecord = LogRecord;
251
252    fn create_log_record(&self) -> Self::LogRecord {
253        // Reserve attributes memory for perf optimization. This may change in future.
254        LogRecord {
255            attributes: Some(Vec::with_capacity(PREALLOCATED_ATTRIBUTE_CAPACITY)),
256            ..Default::default()
257        }
258    }
259
260    /// Emit a `LogRecord`.
261    fn emit(&self, record: Self::LogRecord) {
262        let provider = self.provider();
263        let processors = provider.log_processors();
264        let trace_context = Context::map_current(|cx| {
265            cx.has_active_span()
266                .then(|| TraceContext::from(cx.span().span_context()))
267        });
268        let mut log_record = record;
269        if let Some(ref trace_context) = trace_context {
270            log_record.trace_context = Some(trace_context.clone());
271        }
272        if log_record.observed_timestamp.is_none() {
273            log_record.observed_timestamp = Some(SystemTime::now());
274        }
275
276        let mut data = LogData {
277            record: log_record,
278            instrumentation: self.instrumentation_library().clone(),
279        };
280
281        for p in processors {
282            p.emit(&mut data);
283        }
284    }
285
286    #[cfg(feature = "logs_level_enabled")]
287    fn event_enabled(&self, level: Severity, target: &str) -> bool {
288        let provider = self.provider();
289
290        let mut enabled = false;
291        for processor in provider.log_processors() {
292            enabled = enabled
293                || processor.event_enabled(
294                    level,
295                    target,
296                    self.instrumentation_library().name.as_ref(),
297                );
298        }
299        enabled
300    }
301}
302
303#[cfg(test)]
304mod tests {
305    use crate::resource::{
306        SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
307    };
308    use crate::Resource;
309
310    use super::*;
311    use opentelemetry::logs::{Logger, LoggerProvider as _};
312    use opentelemetry::{Key, KeyValue, Value};
313    use std::fmt::{Debug, Formatter};
314    use std::sync::atomic::AtomicU64;
315    use std::sync::Mutex;
316    use std::thread;
317
318    struct ShutdownTestLogProcessor {
319        is_shutdown: Arc<Mutex<bool>>,
320        counter: Arc<AtomicU64>,
321    }
322
323    impl Debug for ShutdownTestLogProcessor {
324        fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
325            todo!()
326        }
327    }
328
329    impl ShutdownTestLogProcessor {
330        pub(crate) fn new(counter: Arc<AtomicU64>) -> Self {
331            ShutdownTestLogProcessor {
332                is_shutdown: Arc::new(Mutex::new(false)),
333                counter,
334            }
335        }
336    }
337
338    impl LogProcessor for ShutdownTestLogProcessor {
339        fn emit(&self, _data: &mut LogData) {
340            self.is_shutdown
341                .lock()
342                .map(|is_shutdown| {
343                    if !*is_shutdown {
344                        self.counter
345                            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
346                    }
347                })
348                .expect("lock poisoned");
349        }
350
351        fn force_flush(&self) -> LogResult<()> {
352            Ok(())
353        }
354
355        fn shutdown(&self) -> LogResult<()> {
356            self.is_shutdown
357                .lock()
358                .map(|mut is_shutdown| *is_shutdown = true)
359                .expect("lock poisoned");
360            Ok(())
361        }
362
363        #[cfg(feature = "logs_level_enabled")]
364        fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
365            true
366        }
367    }
368    #[test]
369    fn test_logger_provider_default_resource() {
370        let assert_resource = |provider: &super::LoggerProvider,
371                               resource_key: &'static str,
372                               expect: Option<&'static str>| {
373            assert_eq!(
374                provider
375                    .resource()
376                    .get(Key::from_static_str(resource_key))
377                    .map(|v| v.to_string()),
378                expect.map(|s| s.to_string())
379            );
380        };
381        let assert_telemetry_resource = |provider: &super::LoggerProvider| {
382            assert_eq!(
383                provider.resource().get(TELEMETRY_SDK_LANGUAGE.into()),
384                Some(Value::from("rust"))
385            );
386            assert_eq!(
387                provider.resource().get(TELEMETRY_SDK_NAME.into()),
388                Some(Value::from("opentelemetry"))
389            );
390            assert_eq!(
391                provider.resource().get(TELEMETRY_SDK_VERSION.into()),
392                Some(Value::from(env!("CARGO_PKG_VERSION")))
393            );
394        };
395
396        // If users didn't provide a resource and there isn't a env var set. Use default one.
397        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
398            let default_config_provider = super::LoggerProvider::builder().build();
399            assert_resource(
400                &default_config_provider,
401                SERVICE_NAME,
402                Some("unknown_service"),
403            );
404            assert_telemetry_resource(&default_config_provider);
405        });
406
407        // If user provided a resource, use that.
408        let custom_config_provider = super::LoggerProvider::builder()
409            .with_resource(Resource::new(vec![KeyValue::new(
410                SERVICE_NAME,
411                "test_service",
412            )]))
413            .build();
414        assert_resource(&custom_config_provider, SERVICE_NAME, Some("test_service"));
415        assert_eq!(custom_config_provider.resource().len(), 1);
416
417        // If `OTEL_RESOURCE_ATTRIBUTES` is set, read them automatically
418        temp_env::with_var(
419            "OTEL_RESOURCE_ATTRIBUTES",
420            Some("key1=value1, k2, k3=value2"),
421            || {
422                let env_resource_provider = super::LoggerProvider::builder().build();
423                assert_resource(
424                    &env_resource_provider,
425                    SERVICE_NAME,
426                    Some("unknown_service"),
427                );
428                assert_resource(&env_resource_provider, "key1", Some("value1"));
429                assert_resource(&env_resource_provider, "k3", Some("value2"));
430                assert_telemetry_resource(&env_resource_provider);
431                assert_eq!(env_resource_provider.resource().len(), 6);
432            },
433        );
434
435        // When `OTEL_RESOURCE_ATTRIBUTES` is set and also user provided config
436        temp_env::with_var(
437            "OTEL_RESOURCE_ATTRIBUTES",
438            Some("my-custom-key=env-val,k2=value2"),
439            || {
440                let user_provided_resource_config_provider = super::LoggerProvider::builder()
441                    .with_resource(Resource::default().merge(&mut Resource::new(vec![
442                        KeyValue::new("my-custom-key", "my-custom-value"),
443                        KeyValue::new("my-custom-key2", "my-custom-value2"),
444                    ])))
445                    .build();
446                assert_resource(
447                    &user_provided_resource_config_provider,
448                    SERVICE_NAME,
449                    Some("unknown_service"),
450                );
451                assert_resource(
452                    &user_provided_resource_config_provider,
453                    "my-custom-key",
454                    Some("my-custom-value"),
455                );
456                assert_resource(
457                    &user_provided_resource_config_provider,
458                    "my-custom-key2",
459                    Some("my-custom-value2"),
460                );
461                assert_resource(
462                    &user_provided_resource_config_provider,
463                    "k2",
464                    Some("value2"),
465                );
466                assert_telemetry_resource(&user_provided_resource_config_provider);
467                assert_eq!(user_provided_resource_config_provider.resource().len(), 7);
468            },
469        );
470
471        // If user provided a resource, it takes priority during collision.
472        let no_service_name = super::LoggerProvider::builder()
473            .with_resource(Resource::empty())
474            .build();
475        assert_eq!(no_service_name.resource().len(), 0);
476    }
477
478    #[test]
479    fn shutdown_test() {
480        let counter = Arc::new(AtomicU64::new(0));
481        let logger_provider = LoggerProvider::builder()
482            .with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
483            .build();
484
485        let logger1 = logger_provider.logger("test-logger1");
486        let logger2 = logger_provider.logger("test-logger2");
487        logger1.emit(logger1.create_log_record());
488        logger2.emit(logger1.create_log_record());
489
490        let logger3 = logger_provider.logger("test-logger3");
491        let handle = thread::spawn(move || {
492            logger3.emit(logger3.create_log_record());
493        });
494        handle.join().expect("thread panicked");
495
496        let _ = logger_provider.shutdown();
497        logger1.emit(logger1.create_log_record());
498
499        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3);
500    }
501
502    #[test]
503    fn shutdown_idempotent_test() {
504        let counter = Arc::new(AtomicU64::new(0));
505        let logger_provider = LoggerProvider::builder()
506            .with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
507            .build();
508
509        let shutdown_res = logger_provider.shutdown();
510        assert!(shutdown_res.is_ok());
511
512        // Subsequent shutdowns should return an error.
513        let shutdown_res = logger_provider.shutdown();
514        assert!(shutdown_res.is_err());
515
516        // Subsequent shutdowns should return an error.
517        let shutdown_res = logger_provider.shutdown();
518        assert!(shutdown_res.is_err());
519    }
520
521    #[test]
522    fn global_shutdown_test() {
523        // cargo test global_shutdown_test --features=testing
524
525        // Arrange
526        let shutdown_called = Arc::new(Mutex::new(false));
527        let flush_called = Arc::new(Mutex::new(false));
528        let logger_provider = LoggerProvider::builder()
529            .with_log_processor(LazyLogProcessor::new(
530                shutdown_called.clone(),
531                flush_called.clone(),
532            ))
533            .build();
534        //set_logger_provider(logger_provider);
535        let logger1 = logger_provider.logger("test-logger1");
536        let logger2 = logger_provider.logger("test-logger2");
537
538        // Acts
539        logger1.emit(logger1.create_log_record());
540        logger2.emit(logger1.create_log_record());
541
542        // explicitly calling shutdown on logger_provider. This will
543        // indeed do the shutdown, even if there are loggers still alive.
544        let _ = logger_provider.shutdown();
545
546        // Assert
547
548        // shutdown is called.
549        assert!(*shutdown_called.lock().unwrap());
550
551        // flush is never called by the sdk.
552        assert!(!*flush_called.lock().unwrap());
553    }
554
555    #[derive(Debug)]
556    pub(crate) struct LazyLogProcessor {
557        shutdown_called: Arc<Mutex<bool>>,
558        flush_called: Arc<Mutex<bool>>,
559    }
560
561    impl LazyLogProcessor {
562        pub(crate) fn new(
563            shutdown_called: Arc<Mutex<bool>>,
564            flush_called: Arc<Mutex<bool>>,
565        ) -> Self {
566            LazyLogProcessor {
567                shutdown_called,
568                flush_called,
569            }
570        }
571    }
572
573    impl LogProcessor for LazyLogProcessor {
574        fn emit(&self, _data: &mut LogData) {
575            // nothing to do.
576        }
577
578        fn force_flush(&self) -> LogResult<()> {
579            *self.flush_called.lock().unwrap() = true;
580            Ok(())
581        }
582
583        fn shutdown(&self) -> LogResult<()> {
584            *self.shutdown_called.lock().unwrap() = true;
585            Ok(())
586        }
587
588        #[cfg(feature = "logs_level_enabled")]
589        fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
590            true
591        }
592    }
593}