1use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
2use crate::{
3 export::logs::{LogData, LogExporter},
4 runtime::RuntimeChannel,
5 Resource,
6};
7use opentelemetry::{
8 global,
9 logs::{LogError, LogResult},
10 trace::TraceContextExt,
11 Context, InstrumentationLibrary,
12};
13
14#[cfg(feature = "logs_level_enabled")]
15use opentelemetry::logs::Severity;
16
17use std::{
18 borrow::Cow,
19 sync::{atomic::Ordering, Arc},
20};
21use std::{sync::atomic::AtomicBool, time::SystemTime};
22
23use once_cell::sync::Lazy;
24
25static NOOP_LOGGER_PROVIDER: Lazy<LoggerProvider> = Lazy::new(|| LoggerProvider {
27 inner: Arc::new(LoggerProviderInner {
28 processors: Vec::new(),
29 resource: Resource::empty(),
30 }),
31 is_shutdown: Arc::new(AtomicBool::new(true)),
32});
33
/// Creates [`Logger`] instances that share one set of log processors and one
/// [`Resource`].
///
/// Both fields are `Arc`s, so cloning a provider yields a handle onto the same
/// processors, resource and shutdown flag rather than an independent copy.
#[derive(Debug, Clone)]
pub struct LoggerProvider {
    // Processors and resource shared by every clone of this provider.
    inner: Arc<LoggerProviderInner>,
    // Set to `true` by the first successful `shutdown` call; read by
    // `library_logger` to decide whether to hand out a no-op backed logger.
    is_shutdown: Arc<AtomicBool>,
}
40
/// Logger name substituted by `versioned_logger` when the caller passes an
/// empty name.
const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/logger";
/// Initial capacity of the attribute vector in records produced by
/// `Logger::create_log_record`.
const PREALLOCATED_ATTRIBUTE_CAPACITY: usize = 8;
47
48impl opentelemetry::logs::LoggerProvider for LoggerProvider {
49 type Logger = Logger;
50
51 fn versioned_logger(
53 &self,
54 name: impl Into<Cow<'static, str>>,
55 version: Option<Cow<'static, str>>,
56 schema_url: Option<Cow<'static, str>>,
57 attributes: Option<Vec<opentelemetry::KeyValue>>,
58 ) -> Logger {
59 let name = name.into();
60
61 let component_name = if name.is_empty() {
62 Cow::Borrowed(DEFAULT_COMPONENT_NAME)
63 } else {
64 name
65 };
66
67 let mut builder = self.logger_builder(component_name);
68
69 if let Some(v) = version {
70 builder = builder.with_version(v);
71 }
72 if let Some(s) = schema_url {
73 builder = builder.with_schema_url(s);
74 }
75 if let Some(a) = attributes {
76 builder = builder.with_attributes(a);
77 }
78
79 builder.build()
80 }
81
82 fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
83 if self.is_shutdown.load(Ordering::Relaxed) {
85 return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
86 }
87 Logger::new(library, self.clone())
88 }
89}
90
91impl LoggerProvider {
92 pub fn builder() -> Builder {
94 Builder::default()
95 }
96
97 pub(crate) fn log_processors(&self) -> &[Box<dyn LogProcessor>] {
98 &self.inner.processors
99 }
100
101 pub(crate) fn resource(&self) -> &Resource {
102 &self.inner.resource
103 }
104
105 pub fn force_flush(&self) -> Vec<LogResult<()>> {
107 self.log_processors()
108 .iter()
109 .map(|processor| processor.force_flush())
110 .collect()
111 }
112
113 pub fn shutdown(&self) -> LogResult<()> {
115 if self
116 .is_shutdown
117 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
118 .is_ok()
119 {
120 let mut errs = vec![];
123 for processor in &self.inner.processors {
124 if let Err(err) = processor.shutdown() {
125 errs.push(err);
126 }
127 }
128
129 if errs.is_empty() {
130 Ok(())
131 } else {
132 Err(LogError::Other(format!("{errs:?}").into()))
133 }
134 } else {
135 Err(LogError::Other("logger provider already shut down".into()))
136 }
137 }
138}
139
/// State shared, behind an `Arc`, by every clone of a `LoggerProvider`.
#[derive(Debug)]
struct LoggerProviderInner {
    // Processors that receive each emitted record, in registration order.
    processors: Vec<Box<dyn LogProcessor>>,
    // Resource passed to every processor when the provider is built.
    resource: Resource,
}
145
146impl Drop for LoggerProviderInner {
147 fn drop(&mut self) {
148 for processor in &mut self.processors {
149 if let Err(err) = processor.shutdown() {
150 global::handle_error(err);
151 }
152 }
153 }
154}
155
/// Collects processors and an optional resource for a `LoggerProvider`.
#[derive(Debug, Default)]
pub struct Builder {
    // Processors in the order they were added.
    processors: Vec<Box<dyn LogProcessor>>,
    // Resource to use; when `None`, `build` falls back to `Resource::default()`.
    resource: Option<Resource>,
}
162
163impl Builder {
164 pub fn with_simple_exporter<T: LogExporter + 'static>(self, exporter: T) -> Self {
166 let mut processors = self.processors;
167 processors.push(Box::new(SimpleLogProcessor::new(Box::new(exporter))));
168
169 Builder { processors, ..self }
170 }
171
172 pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel>(
174 self,
175 exporter: T,
176 runtime: R,
177 ) -> Self {
178 let batch = BatchLogProcessor::builder(exporter, runtime).build();
179 self.with_log_processor(batch)
180 }
181
182 pub fn with_log_processor<T: LogProcessor + 'static>(self, processor: T) -> Self {
184 let mut processors = self.processors;
185 processors.push(Box::new(processor));
186
187 Builder { processors, ..self }
188 }
189
190 pub fn with_resource(self, resource: Resource) -> Self {
192 Builder {
193 resource: Some(resource),
194 ..self
195 }
196 }
197
198 pub fn build(self) -> LoggerProvider {
200 let resource = self.resource.unwrap_or_default();
201
202 let logger_provider = LoggerProvider {
203 inner: Arc::new(LoggerProviderInner {
204 processors: self.processors,
205 resource,
206 }),
207 is_shutdown: Arc::new(AtomicBool::new(false)),
208 };
209
210 for processor in logger_provider.log_processors() {
212 processor.set_resource(logger_provider.resource());
213 }
214 logger_provider
215 }
216}
217
/// Emits log records to the processors of the provider it was created from.
#[derive(Debug)]
pub struct Logger {
    // Instrumentation scope attached to every record this logger emits.
    instrumentation_lib: Arc<InstrumentationLibrary>,
    // Provider whose processors receive the emitted records.
    provider: LoggerProvider,
}
226
227impl Logger {
228 pub(crate) fn new(
229 instrumentation_lib: Arc<InstrumentationLibrary>,
230 provider: LoggerProvider,
231 ) -> Self {
232 Logger {
233 instrumentation_lib,
234 provider,
235 }
236 }
237
238 pub fn provider(&self) -> &LoggerProvider {
240 &self.provider
241 }
242
243 pub fn instrumentation_library(&self) -> &InstrumentationLibrary {
245 &self.instrumentation_lib
246 }
247}
248
249impl opentelemetry::logs::Logger for Logger {
250 type LogRecord = LogRecord;
251
252 fn create_log_record(&self) -> Self::LogRecord {
253 LogRecord {
255 attributes: Some(Vec::with_capacity(PREALLOCATED_ATTRIBUTE_CAPACITY)),
256 ..Default::default()
257 }
258 }
259
260 fn emit(&self, record: Self::LogRecord) {
262 let provider = self.provider();
263 let processors = provider.log_processors();
264 let trace_context = Context::map_current(|cx| {
265 cx.has_active_span()
266 .then(|| TraceContext::from(cx.span().span_context()))
267 });
268 let mut log_record = record;
269 if let Some(ref trace_context) = trace_context {
270 log_record.trace_context = Some(trace_context.clone());
271 }
272 if log_record.observed_timestamp.is_none() {
273 log_record.observed_timestamp = Some(SystemTime::now());
274 }
275
276 let mut data = LogData {
277 record: log_record,
278 instrumentation: self.instrumentation_library().clone(),
279 };
280
281 for p in processors {
282 p.emit(&mut data);
283 }
284 }
285
286 #[cfg(feature = "logs_level_enabled")]
287 fn event_enabled(&self, level: Severity, target: &str) -> bool {
288 let provider = self.provider();
289
290 let mut enabled = false;
291 for processor in provider.log_processors() {
292 enabled = enabled
293 || processor.event_enabled(
294 level,
295 target,
296 self.instrumentation_library().name.as_ref(),
297 );
298 }
299 enabled
300 }
301}
302
303#[cfg(test)]
304mod tests {
305 use crate::resource::{
306 SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
307 };
308 use crate::Resource;
309
310 use super::*;
311 use opentelemetry::logs::{Logger, LoggerProvider as _};
312 use opentelemetry::{Key, KeyValue, Value};
313 use std::fmt::{Debug, Formatter};
314 use std::sync::atomic::AtomicU64;
315 use std::sync::Mutex;
316 use std::thread;
317
318 struct ShutdownTestLogProcessor {
319 is_shutdown: Arc<Mutex<bool>>,
320 counter: Arc<AtomicU64>,
321 }
322
323 impl Debug for ShutdownTestLogProcessor {
324 fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
325 todo!()
326 }
327 }
328
329 impl ShutdownTestLogProcessor {
330 pub(crate) fn new(counter: Arc<AtomicU64>) -> Self {
331 ShutdownTestLogProcessor {
332 is_shutdown: Arc::new(Mutex::new(false)),
333 counter,
334 }
335 }
336 }
337
338 impl LogProcessor for ShutdownTestLogProcessor {
339 fn emit(&self, _data: &mut LogData) {
340 self.is_shutdown
341 .lock()
342 .map(|is_shutdown| {
343 if !*is_shutdown {
344 self.counter
345 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
346 }
347 })
348 .expect("lock poisoned");
349 }
350
351 fn force_flush(&self) -> LogResult<()> {
352 Ok(())
353 }
354
355 fn shutdown(&self) -> LogResult<()> {
356 self.is_shutdown
357 .lock()
358 .map(|mut is_shutdown| *is_shutdown = true)
359 .expect("lock poisoned");
360 Ok(())
361 }
362
363 #[cfg(feature = "logs_level_enabled")]
364 fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
365 true
366 }
367 }
    // Checks which resource attributes a provider ends up with for each
    // combination of builder configuration and `OTEL_RESOURCE_ATTRIBUTES`.
    #[test]
    fn test_logger_provider_default_resource() {
        // Asserts that `resource_key` in the provider's resource, rendered as
        // a string, equals `expect` (`None` meaning the key is absent).
        let assert_resource = |provider: &super::LoggerProvider,
                               resource_key: &'static str,
                               expect: Option<&'static str>| {
            assert_eq!(
                provider
                    .resource()
                    .get(Key::from_static_str(resource_key))
                    .map(|v| v.to_string()),
                expect.map(|s| s.to_string())
            );
        };
        // Asserts that the three telemetry SDK attributes (language, name,
        // version) are present with the values expected for this crate.
        let assert_telemetry_resource = |provider: &super::LoggerProvider| {
            assert_eq!(
                provider.resource().get(TELEMETRY_SDK_LANGUAGE.into()),
                Some(Value::from("rust"))
            );
            assert_eq!(
                provider.resource().get(TELEMETRY_SDK_NAME.into()),
                Some(Value::from("opentelemetry"))
            );
            assert_eq!(
                provider.resource().get(TELEMETRY_SDK_VERSION.into()),
                Some(Value::from(env!("CARGO_PKG_VERSION")))
            );
        };

        // Case 1: no resource configured and the env var unset. The provider
        // is expected to carry the fallback service name and the telemetry
        // SDK attributes.
        temp_env::with_var_unset("OTEL_RESOURCE_ATTRIBUTES", || {
            let default_config_provider = super::LoggerProvider::builder().build();
            assert_resource(
                &default_config_provider,
                SERVICE_NAME,
                Some("unknown_service"),
            );
            assert_telemetry_resource(&default_config_provider);
        });

        // Case 2: an explicit resource is used as given; nothing else is
        // added, so exactly one attribute is expected.
        let custom_config_provider = super::LoggerProvider::builder()
            .with_resource(Resource::new(vec![KeyValue::new(
                SERVICE_NAME,
                "test_service",
            )]))
            .build();
        assert_resource(&custom_config_provider, SERVICE_NAME, Some("test_service"));
        assert_eq!(custom_config_provider.resource().len(), 1);

        // Case 3: no resource configured, env var set. `key1` and `k3` are
        // expected alongside the service name and the three telemetry SDK
        // attributes; the expected total of 6 leaves no room for the
        // value-less `k2` entry.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("key1=value1, k2, k3=value2"),
            || {
                let env_resource_provider = super::LoggerProvider::builder().build();
                assert_resource(
                    &env_resource_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(&env_resource_provider, "key1", Some("value1"));
                assert_resource(&env_resource_provider, "k3", Some("value2"));
                assert_telemetry_resource(&env_resource_provider);
                assert_eq!(env_resource_provider.resource().len(), 6);
            },
        );

        // Case 4: user attributes merged onto `Resource::default()` while the
        // env var is set. For `my-custom-key`, present in both, the
        // user-provided value is the one expected; `k2` from the env var is
        // expected to be kept.
        temp_env::with_var(
            "OTEL_RESOURCE_ATTRIBUTES",
            Some("my-custom-key=env-val,k2=value2"),
            || {
                let user_provided_resource_config_provider = super::LoggerProvider::builder()
                    .with_resource(Resource::default().merge(&mut Resource::new(vec![
                        KeyValue::new("my-custom-key", "my-custom-value"),
                        KeyValue::new("my-custom-key2", "my-custom-value2"),
                    ])))
                    .build();
                assert_resource(
                    &user_provided_resource_config_provider,
                    SERVICE_NAME,
                    Some("unknown_service"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key",
                    Some("my-custom-value"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "my-custom-key2",
                    Some("my-custom-value2"),
                );
                assert_resource(
                    &user_provided_resource_config_provider,
                    "k2",
                    Some("value2"),
                );
                assert_telemetry_resource(&user_provided_resource_config_provider);
                assert_eq!(user_provided_resource_config_provider.resource().len(), 7);
            },
        );

        // Case 5: an explicitly empty resource stays empty.
        let no_service_name = super::LoggerProvider::builder()
            .with_resource(Resource::empty())
            .build();
        assert_eq!(no_service_name.resource().len(), 0);
    }
477
478 #[test]
479 fn shutdown_test() {
480 let counter = Arc::new(AtomicU64::new(0));
481 let logger_provider = LoggerProvider::builder()
482 .with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
483 .build();
484
485 let logger1 = logger_provider.logger("test-logger1");
486 let logger2 = logger_provider.logger("test-logger2");
487 logger1.emit(logger1.create_log_record());
488 logger2.emit(logger1.create_log_record());
489
490 let logger3 = logger_provider.logger("test-logger3");
491 let handle = thread::spawn(move || {
492 logger3.emit(logger3.create_log_record());
493 });
494 handle.join().expect("thread panicked");
495
496 let _ = logger_provider.shutdown();
497 logger1.emit(logger1.create_log_record());
498
499 assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3);
500 }
501
502 #[test]
503 fn shutdown_idempotent_test() {
504 let counter = Arc::new(AtomicU64::new(0));
505 let logger_provider = LoggerProvider::builder()
506 .with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
507 .build();
508
509 let shutdown_res = logger_provider.shutdown();
510 assert!(shutdown_res.is_ok());
511
512 let shutdown_res = logger_provider.shutdown();
514 assert!(shutdown_res.is_err());
515
516 let shutdown_res = logger_provider.shutdown();
518 assert!(shutdown_res.is_err());
519 }
520
521 #[test]
522 fn global_shutdown_test() {
523 let shutdown_called = Arc::new(Mutex::new(false));
527 let flush_called = Arc::new(Mutex::new(false));
528 let logger_provider = LoggerProvider::builder()
529 .with_log_processor(LazyLogProcessor::new(
530 shutdown_called.clone(),
531 flush_called.clone(),
532 ))
533 .build();
534 let logger1 = logger_provider.logger("test-logger1");
536 let logger2 = logger_provider.logger("test-logger2");
537
538 logger1.emit(logger1.create_log_record());
540 logger2.emit(logger1.create_log_record());
541
542 let _ = logger_provider.shutdown();
545
546 assert!(*shutdown_called.lock().unwrap());
550
551 assert!(!*flush_called.lock().unwrap());
553 }
554
555 #[derive(Debug)]
556 pub(crate) struct LazyLogProcessor {
557 shutdown_called: Arc<Mutex<bool>>,
558 flush_called: Arc<Mutex<bool>>,
559 }
560
561 impl LazyLogProcessor {
562 pub(crate) fn new(
563 shutdown_called: Arc<Mutex<bool>>,
564 flush_called: Arc<Mutex<bool>>,
565 ) -> Self {
566 LazyLogProcessor {
567 shutdown_called,
568 flush_called,
569 }
570 }
571 }
572
573 impl LogProcessor for LazyLogProcessor {
574 fn emit(&self, _data: &mut LogData) {
575 }
577
578 fn force_flush(&self) -> LogResult<()> {
579 *self.flush_called.lock().unwrap() = true;
580 Ok(())
581 }
582
583 fn shutdown(&self) -> LogResult<()> {
584 *self.shutdown_called.lock().unwrap() = true;
585 Ok(())
586 }
587
588 #[cfg(feature = "logs_level_enabled")]
589 fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
590 true
591 }
592 }
593}