use crate::{
    export::logs::{ExportResult, LogData, LogExporter},
    runtime::{RuntimeChannel, TrySend},
    Resource,
};
use futures_channel::oneshot;
use futures_util::{
    future::{self, Either},
    {pin_mut, stream, StreamExt as _},
};
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{
    global,
    logs::{LogError, LogResult},
};
use std::borrow::Cow;
use std::sync::atomic::AtomicBool;
use std::{cmp::min, env, sync::Mutex};
use std::{
    fmt::{self, Debug, Formatter},
    str::FromStr,
    sync::Arc,
    time::Duration,
};

/// Environment variable for the delay interval between two consecutive exports.
const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports, in milliseconds.
const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: u64 = 1_000;
/// Environment variable for the maximum allowed time to export data.
const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data, in milliseconds.
const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Environment variable for the maximum queue size.
const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Environment variable for the maximum batch size; must be less than or equal to the maximum queue size.
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size.
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;

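/// The interface for plugging into the log emitting pipeline. Each emitted
/// log record is passed, mutably, to every registered processor, which
/// decides how and when to forward it to a [`LogExporter`].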
pub trait LogProcessor: Send + Sync + Debug {
    /// Called when a log record is ready to be processed and exported.
    fn emit(&self, data: &mut LogData);
    /// Force the processor to export any logs it has not yet exported.
    fn force_flush(&self) -> LogResult<()>;
    /// Shut down the processor.
    fn shutdown(&self) -> LogResult<()>;
    /// Check if logging is enabled for the given severity, target and name.
    #[cfg(feature = "logs_level_enabled")]
    fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;

    /// Set the resource for the processor, typically forwarded to its exporter.
    fn set_resource(&self, _resource: &Resource) {}
}

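/// A [`LogProcessor`] that synchronously exports each log record as soon as
/// it is emitted, blocking the caller on the export call. This keeps the
/// implementation simple at the cost of per-record export overhead.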
#[derive(Debug)]
pub struct SimpleLogProcessor {
    exporter: Mutex<Box<dyn LogExporter>>,
    is_shutdown: AtomicBool,
}

impl SimpleLogProcessor {
    pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
        SimpleLogProcessor {
            exporter: Mutex::new(exporter),
            is_shutdown: AtomicBool::new(false),
        }
    }
}

impl LogProcessor for SimpleLogProcessor {
    fn emit(&self, data: &mut LogData) {
        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
            return;
        }

        let result = self
            .exporter
            .lock()
            .map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
            .and_then(|mut exporter| {
                futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)]))
            });
        if let Err(err) = result {
            global::handle_error(err);
        }
    }

    fn force_flush(&self) -> LogResult<()> {
        Ok(())
    }

    fn shutdown(&self) -> LogResult<()> {
        self.is_shutdown
            .store(true, std::sync::atomic::Ordering::Relaxed);
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.shutdown();
            Ok(())
        } else {
            Err(LogError::Other(
                "simple logprocessor mutex poison during shutdown".into(),
            ))
        }
    }

    fn set_resource(&self, resource: &Resource) {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.set_resource(resource);
        }
    }

    #[cfg(feature = "logs_level_enabled")]
    fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
        true
    }
}

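/// A [`LogProcessor`] that buffers emitted log records and exports them in
/// batches from a worker task spawned on the provided async runtime. Queue
/// size, batch size, export interval, and export timeout are controlled by
/// [`BatchConfig`].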
pub struct BatchLogProcessor<R: RuntimeChannel> {
    message_sender: R::Sender<BatchMessage>,
}

impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("BatchLogProcessor")
            .field("message_sender", &self.message_sender)
            .finish()
    }
}

impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
    fn emit(&self, data: &mut LogData) {
        let result = self
            .message_sender
            .try_send(BatchMessage::ExportLog(data.clone()));

        if let Err(err) = result {
            global::handle_error(LogError::Other(err.into()));
        }
    }

    #[cfg(feature = "logs_level_enabled")]
    fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
        true
    }

    fn force_flush(&self) -> LogResult<()> {
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Flush(Some(res_sender)))
            .map_err(|err| LogError::Other(err.into()))?;

        futures_executor::block_on(res_receiver)
            .map_err(|err| LogError::Other(err.into()))
            .and_then(std::convert::identity)
    }

    fn shutdown(&self) -> LogResult<()> {
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Shutdown(res_sender))
            .map_err(|err| LogError::Other(err.into()))?;

        futures_executor::block_on(res_receiver)
            .map_err(|err| LogError::Other(err.into()))
            .and_then(std::convert::identity)
    }

    fn set_resource(&self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}

impl<R: RuntimeChannel> BatchLogProcessor<R> {
    pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
        let (message_sender, message_receiver) =
            runtime.batch_message_channel(config.max_queue_size);
        let ticker = runtime
            .interval(config.scheduled_delay)
            .map(|_| BatchMessage::Flush(None));
        let timeout_runtime = runtime.clone();

        // Spawn the worker task that owns the exporter and drains both the
        // message channel and the flush ticker.
        runtime.spawn(Box::pin(async move {
            let mut logs = Vec::new();
            let mut messages = Box::pin(stream::select(message_receiver, ticker));

            while let Some(message) = messages.next().await {
                match message {
                    // A record was emitted; buffer it and export once the batch is full.
                    BatchMessage::ExportLog(log) => {
                        logs.push(Cow::Owned(log));

                        if logs.len() == config.max_export_batch_size {
                            let result = export_with_timeout(
                                config.max_export_timeout,
                                exporter.as_mut(),
                                &timeout_runtime,
                                logs.split_off(0),
                            )
                            .await;

                            if let Err(err) = result {
                                global::handle_error(err);
                            }
                        }
                    }
                    // The interval ticked or an explicit flush was requested;
                    // export whatever is currently buffered.
                    BatchMessage::Flush(res_channel) => {
                        let result = export_with_timeout(
                            config.max_export_timeout,
                            exporter.as_mut(),
                            &timeout_runtime,
                            logs.split_off(0),
                        )
                        .await;

                        if let Some(channel) = res_channel {
                            if let Err(result) = channel.send(result) {
                                global::handle_error(LogError::from(format!(
                                    "failed to send flush result: {:?}",
                                    result
                                )));
                            }
                        } else if let Err(err) = result {
                            global::handle_error(err);
                        }
                    }
                    // Export any remaining logs, shut the exporter down, and stop the worker.
                    BatchMessage::Shutdown(ch) => {
                        let result = export_with_timeout(
                            config.max_export_timeout,
                            exporter.as_mut(),
                            &timeout_runtime,
                            logs.split_off(0),
                        )
                        .await;

                        exporter.shutdown();

                        if let Err(result) = ch.send(result) {
                            global::handle_error(LogError::from(format!(
                                "failed to send batch processor shutdown result: {:?}",
                                result
                            )));
                        }

                        break;
                    }

                    // Propagate the resource to the exporter.
                    BatchMessage::SetResource(resource) => {
                        exporter.set_resource(&resource);
                    }
                }
            }
        }));

        // Return the processor, which holds the sending side of the worker channel.
        BatchLogProcessor { message_sender }
    }

    /// Create a builder for a [`BatchLogProcessor`] using the given exporter and runtime.
    pub fn builder<E>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R>
    where
        E: LogExporter,
    {
        BatchLogProcessorBuilder {
            exporter,
            config: Default::default(),
            runtime,
        }
    }
}

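/// Export a batch of log records, racing the export future against a runtime
/// delay so a slow exporter cannot block the worker loop for longer than
/// `time_out`.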
async fn export_with_timeout<'a, R, E>(
    time_out: Duration,
    exporter: &mut E,
    runtime: &R,
    batch: Vec<Cow<'a, LogData>>,
) -> ExportResult
where
    R: RuntimeChannel,
    E: LogExporter + ?Sized,
{
    if batch.is_empty() {
        return Ok(());
    }

    let export = exporter.export(batch);
    let timeout = runtime.delay(time_out);
    pin_mut!(export);
    pin_mut!(timeout);
    match future::select(export, timeout).await {
        Either::Left((export_res, _)) => export_res,
        Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
    }
}

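/// Batching configuration for a [`BatchLogProcessor`]. Use
/// [`BatchConfigBuilder`] to construct a customized instance.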
#[derive(Debug)]
pub struct BatchConfig {
    /// Maximum number of log records kept in the buffer queue; when the queue
    /// is full, additional records are dropped.
    max_queue_size: usize,

    /// Delay interval between two consecutive exports of batches.
    scheduled_delay: Duration,

    /// Maximum number of log records exported in a single batch.
    max_export_batch_size: usize,

    /// Maximum duration allowed for a single export call.
    max_export_timeout: Duration,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfigBuilder::default().build()
    }
}

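/// Builder for [`BatchConfig`]. Values start from the `OTEL_BLRP_*` defaults
/// above, are overridden by the corresponding environment variables when set,
/// and can finally be adjusted through the `with_*` setters.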
#[derive(Debug)]
pub struct BatchConfigBuilder {
    max_queue_size: usize,
    scheduled_delay: Duration,
    max_export_batch_size: usize,
    max_export_timeout: Duration,
}

impl Default for BatchConfigBuilder {
    fn default() -> Self {
        BatchConfigBuilder {
            max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
            scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT),
            max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
            max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT),
        }
        .init_from_env_vars()
    }
}

impl BatchConfigBuilder {
    /// Set `max_queue_size`, the maximum number of log records buffered for
    /// delayed processing. The default is 2048.
    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
        self.max_queue_size = max_queue_size;
        self
    }

    /// Set `scheduled_delay`, the delay interval between two consecutive
    /// exports of batches. The default is 1000 milliseconds.
    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
        self.scheduled_delay = scheduled_delay;
        self
    }

    /// Set `max_export_timeout`, the maximum duration allowed for a single
    /// export call. The default is 30000 milliseconds.
    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
        self.max_export_timeout = max_export_timeout;
        self
    }

    /// Set `max_export_batch_size`, the maximum number of log records exported
    /// in a single batch. The default is 512.
    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
        self.max_export_batch_size = max_export_batch_size;
        self
    }

    /// Build a [`BatchConfig`] from the current settings.
    pub fn build(self) -> BatchConfig {
        // The max export batch size must be less than or equal to the max queue size.
        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);

        BatchConfig {
            max_queue_size: self.max_queue_size,
            scheduled_delay: self.scheduled_delay,
            max_export_timeout: self.max_export_timeout,
            max_export_batch_size,
        }
    }

    fn init_from_env_vars(mut self) -> Self {
        if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
            .ok()
            .and_then(|queue_size| usize::from_str(&queue_size).ok())
        {
            self.max_queue_size = max_queue_size;
        }

        if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
            .ok()
            .and_then(|batch_size| usize::from_str(&batch_size).ok())
        {
            self.max_export_batch_size = max_export_batch_size;
        }

        if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
            .ok()
            .and_then(|delay| u64::from_str(&delay).ok())
        {
            self.scheduled_delay = Duration::from_millis(scheduled_delay);
        }

        if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
            .ok()
            .and_then(|s| u64::from_str(&s).ok())
        {
            self.max_export_timeout = Duration::from_millis(max_export_timeout);
        }

        self
    }
}

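/// A builder that assembles a [`BatchLogProcessor`] from an exporter, an
/// async runtime, and an optional [`BatchConfig`].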
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E, R> {
    exporter: E,
    config: BatchConfig,
    runtime: R,
}

impl<E, R> BatchLogProcessorBuilder<E, R>
where
    E: LogExporter + 'static,
    R: RuntimeChannel,
{
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchLogProcessorBuilder { config, ..self }
    }

    pub fn build(self) -> BatchLogProcessor<R> {
        BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime)
    }
}

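/// Messages exchanged between the processor's public API and its background
/// worker task.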
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Export a single log record, sent when the record is emitted.
    ExportLog(LogData),
    /// Flush the current buffer, triggered by the interval ticker or by an
    /// explicit `force_flush` call (which supplies a result channel).
    Flush(Option<oneshot::Sender<ExportResult>>),
    /// Flush the buffer, shut down the exporter, and stop the worker task.
    Shutdown(oneshot::Sender<ExportResult>),
    /// Propagate the SDK resource to the exporter.
    SetResource(Arc<Resource>),
}

#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
    use super::{
        BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
        OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY,
    };
    use crate::testing::logs::InMemoryLogsExporterBuilder;
    use crate::{
        export::logs::{LogData, LogExporter},
        logs::{
            log_processor::{
                OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
            },
            BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor,
        },
        runtime,
        testing::logs::InMemoryLogsExporter,
        Resource,
    };
    use async_trait::async_trait;
    use opentelemetry::logs::AnyValue;
    #[cfg(feature = "logs_level_enabled")]
    use opentelemetry::logs::Severity;
    use opentelemetry::logs::{Logger, LoggerProvider as _};
    use opentelemetry::Key;
    use opentelemetry::{logs::LogResult, KeyValue};
    use std::borrow::Cow;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    #[derive(Debug, Clone)]
    struct MockLogExporter {
        resource: Arc<Mutex<Option<Resource>>>,
    }

    #[async_trait]
    impl LogExporter for MockLogExporter {
        async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
            Ok(())
        }

        fn shutdown(&mut self) {}

        fn set_resource(&mut self, resource: &Resource) {
            self.resource
                .lock()
                .map(|mut res_opt| {
                    res_opt.replace(resource.clone());
                })
                .expect("mock log exporter shouldn't error when setting resource");
        }
    }

    impl MockLogExporter {
        fn get_resource(&self) -> Option<Resource> {
            (*self.resource).lock().unwrap().clone()
        }
    }

    #[test]
    fn test_default_const_values() {
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
        assert_eq!(
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
        );
        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
    }

    #[test]
    fn test_default_batch_config_adheres_to_specification() {
        let env_vars = vec![
            OTEL_BLRP_SCHEDULE_DELAY,
            OTEL_BLRP_EXPORT_TIMEOUT,
            OTEL_BLRP_MAX_QUEUE_SIZE,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
        ];

        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
        );
        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(
            config.max_export_batch_size,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
        );
    }

    #[test]
    fn test_batch_config_configurable_by_env_vars() {
        let env_vars = vec![
            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
        assert_eq!(config.max_queue_size, 4096);
        assert_eq!(config.max_export_batch_size, 1024);
    }

    #[test]
    fn test_batch_config_max_export_batch_size_validation() {
        let env_vars = vec![
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.max_queue_size, 256);
        assert_eq!(config.max_export_batch_size, 256);
        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
        );
    }

    #[test]
    fn test_batch_config_with_fields() {
        let batch = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_export_timeout(Duration::from_millis(3))
            .with_max_queue_size(4)
            .build();

        assert_eq!(batch.max_export_batch_size, 1);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
        assert_eq!(batch.max_queue_size, 4);
    }

    #[test]
    fn test_build_batch_log_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder =
                BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio);

            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
            );
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder =
                BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio);
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    #[test]
    fn test_build_batch_log_processor_builder_with_custom_config() {
        let expected = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_export_timeout(Duration::from_millis(3))
            .with_max_queue_size(4)
            .build();

        let builder = BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio)
            .with_batch_config(expected);

        let actual = &builder.config;
        assert_eq!(actual.max_export_batch_size, 1);
        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
        assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
        assert_eq!(actual.max_queue_size, 4);
    }

    #[test]
    fn test_set_resource_simple_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
        let _ = LoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(Resource::new(vec![
                KeyValue::new("k1", "v1"),
                KeyValue::new("k2", "v3"),
                KeyValue::new("k3", "v3"),
                KeyValue::new("k4", "v4"),
                KeyValue::new("k5", "v5"),
            ]))
            .build();
        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_set_resource_batch_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor = BatchLogProcessor::new(
            Box::new(exporter.clone()),
            BatchConfig::default(),
            runtime::Tokio,
        );
        let provider = LoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(Resource::new(vec![
                KeyValue::new("k1", "v1"),
                KeyValue::new("k2", "v3"),
                KeyValue::new("k3", "v3"),
                KeyValue::new("k4", "v4"),
                KeyValue::new("k5", "v5"),
            ]))
            .build();
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
        let _ = provider.shutdown();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_shutdown() {
        let exporter = InMemoryLogsExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor = BatchLogProcessor::new(
            Box::new(exporter.clone()),
            BatchConfig::default(),
            runtime::Tokio,
        );
        let mut log_data = LogData {
            record: Default::default(),
            instrumentation: Default::default(),
        };
        processor.emit(&mut log_data);
        processor.force_flush().unwrap();
        processor.shutdown().unwrap();
        processor.emit(&mut log_data);
        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
    }

    #[test]
    fn test_simple_shutdown() {
        let exporter = InMemoryLogsExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));

        let mut log_data = LogData {
            record: Default::default(),
            instrumentation: Default::default(),
        };

        processor.emit(&mut log_data);

        processor.shutdown().unwrap();

        let is_shutdown = processor
            .is_shutdown
            .load(std::sync::atomic::Ordering::Relaxed);
        assert!(is_shutdown);

        processor.emit(&mut log_data);

        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
    }

    #[derive(Debug)]
    struct FirstProcessor {
        pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
    }

    impl LogProcessor for FirstProcessor {
        fn emit(&self, data: &mut LogData) {
            data.record.attributes.get_or_insert(vec![]).push((
                Key::from_static_str("processed_by"),
                AnyValue::String("FirstProcessor".into()),
            ));
            data.record.body = Some("Updated by FirstProcessor".into());

            self.logs.lock().unwrap().push(data.clone());
        }

        #[cfg(feature = "logs_level_enabled")]
        fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
            true
        }

        fn force_flush(&self) -> LogResult<()> {
            Ok(())
        }

        fn shutdown(&self) -> LogResult<()> {
            Ok(())
        }
    }

    #[derive(Debug)]
    struct SecondProcessor {
        pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
    }

    impl LogProcessor for SecondProcessor {
        fn emit(&self, data: &mut LogData) {
            assert!(data.record.attributes.as_ref().map_or(false, |attrs| {
                attrs.iter().any(|(key, value)| {
                    key.as_str() == "processed_by"
                        && value == &AnyValue::String("FirstProcessor".into())
                })
            }));
            assert!(
                data.record.body.clone().unwrap()
                    == AnyValue::String("Updated by FirstProcessor".into())
            );
            self.logs.lock().unwrap().push(data.clone());
        }

        #[cfg(feature = "logs_level_enabled")]
        fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
            true
        }

        fn force_flush(&self) -> LogResult<()> {
            Ok(())
        }

        fn shutdown(&self) -> LogResult<()> {
            Ok(())
        }
    }
    #[test]
    fn test_log_data_modification_by_multiple_processors() {
        let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
        let second_processor_logs = Arc::new(Mutex::new(Vec::new()));

        let first_processor = FirstProcessor {
            logs: Arc::clone(&first_processor_logs),
        };
        let second_processor = SecondProcessor {
            logs: Arc::clone(&second_processor_logs),
        };

        let logger_provider = LoggerProvider::builder()
            .with_log_processor(first_processor)
            .with_log_processor(second_processor)
            .build();

        let logger = logger_provider.logger("test-logger");
        let mut log_record = logger.create_log_record();
        log_record.body = Some(AnyValue::String("Test log".into()));

        logger.emit(log_record);

        assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
        assert_eq!(second_processor_logs.lock().unwrap().len(), 1);

        let first_log = &first_processor_logs.lock().unwrap()[0];
        let second_log = &second_processor_logs.lock().unwrap()[0];

        assert!(first_log.record.attributes.iter().any(|attrs| {
            attrs.iter().any(|(key, value)| {
                key.as_str() == "processed_by"
                    && value == &AnyValue::String("FirstProcessor".into())
            })
        }));

        assert!(second_log.record.attributes.iter().any(|attrs| {
            attrs.iter().any(|(key, value)| {
                key.as_str() == "processed_by"
                    && value == &AnyValue::String("FirstProcessor".into())
            })
        }));
        assert!(
            first_log.record.body.clone().unwrap()
                == AnyValue::String("Updated by FirstProcessor".into())
        );
        assert!(
            second_log.record.body.clone().unwrap()
                == AnyValue::String("Updated by FirstProcessor".into())
        );
    }
}