opentelemetry_sdk/logs/
log_processor.rs

1use crate::{
2    export::logs::{ExportResult, LogData, LogExporter},
3    runtime::{RuntimeChannel, TrySend},
4    Resource,
5};
6use futures_channel::oneshot;
7use futures_util::{
8    future::{self, Either},
9    {pin_mut, stream, StreamExt as _},
10};
11#[cfg(feature = "logs_level_enabled")]
12use opentelemetry::logs::Severity;
13use opentelemetry::{
14    global,
15    logs::{LogError, LogResult},
16};
17use std::borrow::Cow;
18use std::sync::atomic::AtomicBool;
19use std::{cmp::min, env, sync::Mutex};
20use std::{
21    fmt::{self, Debug, Formatter},
22    str::FromStr,
23    sync::Arc,
24    time::Duration,
25};
26
27/// Delay interval between two consecutive exports.
28const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
29/// Default delay interval between two consecutive exports.
30const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: u64 = 1_000;
31/// Maximum allowed time to export data.
32const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
33/// Default maximum allowed time to export data.
34const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
35/// Maximum queue size.
36const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
37/// Default maximum queue size.
38const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
39/// Maximum batch size, must be less than or equal to OTEL_BLRP_MAX_QUEUE_SIZE.
40const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
41/// Default maximum batch size.
42const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
43
44/// The interface for plugging into a [`Logger`].
45///
46/// [`Logger`]: crate::logs::Logger
47pub trait LogProcessor: Send + Sync + Debug {
48    /// Called when a log record is ready to processed and exported.
49    ///
50    /// This method receives a mutable reference to `LogData`. If the processor
51    /// needs to handle the export asynchronously, it should clone the data to
52    /// ensure it can be safely processed without lifetime issues. Any changes
53    /// made to the log data in this method will be reflected in the next log
54    /// processor in the chain.
55    ///
56    /// # Parameters
57    /// - `data`: A mutable reference to `LogData` representing the log record.
58    fn emit(&self, data: &mut LogData);
59    /// Force the logs lying in the cache to be exported.
60    fn force_flush(&self) -> LogResult<()>;
61    /// Shuts down the processor.
62    /// After shutdown returns the log processor should stop processing any logs.
63    /// It's up to the implementation on when to drop the LogProcessor.
64    fn shutdown(&self) -> LogResult<()>;
65    #[cfg(feature = "logs_level_enabled")]
66    /// Check if logging is enabled
67    fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
68
69    /// Set the resource for the log processor.
70    fn set_resource(&self, _resource: &Resource) {}
71}
72
73/// A [LogProcessor] that passes logs to the configured `LogExporter`, as soon
74/// as they are emitted, without any batching. This is typically useful for
75/// debugging and testing. For scenarios requiring higher
76/// performance/throughput, consider using [BatchLogProcessor].
77#[derive(Debug)]
78pub struct SimpleLogProcessor {
79    exporter: Mutex<Box<dyn LogExporter>>,
80    is_shutdown: AtomicBool,
81}
82
83impl SimpleLogProcessor {
84    pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
85        SimpleLogProcessor {
86            exporter: Mutex::new(exporter),
87            is_shutdown: AtomicBool::new(false),
88        }
89    }
90}
91
92impl LogProcessor for SimpleLogProcessor {
93    fn emit(&self, data: &mut LogData) {
94        // noop after shutdown
95        if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
96            return;
97        }
98
99        let result = self
100            .exporter
101            .lock()
102            .map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
103            .and_then(|mut exporter| {
104                futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)]))
105            });
106        if let Err(err) = result {
107            global::handle_error(err);
108        }
109    }
110
111    fn force_flush(&self) -> LogResult<()> {
112        Ok(())
113    }
114
115    fn shutdown(&self) -> LogResult<()> {
116        self.is_shutdown
117            .store(true, std::sync::atomic::Ordering::Relaxed);
118        if let Ok(mut exporter) = self.exporter.lock() {
119            exporter.shutdown();
120            Ok(())
121        } else {
122            Err(LogError::Other(
123                "simple logprocessor mutex poison during shutdown".into(),
124            ))
125        }
126    }
127
128    fn set_resource(&self, resource: &Resource) {
129        if let Ok(mut exporter) = self.exporter.lock() {
130            exporter.set_resource(resource);
131        }
132    }
133
134    #[cfg(feature = "logs_level_enabled")]
135    fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
136        true
137    }
138}
139
140/// A [`LogProcessor`] that asynchronously buffers log records and reports
141/// them at a pre-configured interval.
142pub struct BatchLogProcessor<R: RuntimeChannel> {
143    message_sender: R::Sender<BatchMessage>,
144}
145
146impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
147    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
148        f.debug_struct("BatchLogProcessor")
149            .field("message_sender", &self.message_sender)
150            .finish()
151    }
152}
153
154impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
155    fn emit(&self, data: &mut LogData) {
156        let result = self
157            .message_sender
158            .try_send(BatchMessage::ExportLog(data.clone()));
159
160        if let Err(err) = result {
161            global::handle_error(LogError::Other(err.into()));
162        }
163    }
164
165    #[cfg(feature = "logs_level_enabled")]
166    fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
167        true
168    }
169
170    fn force_flush(&self) -> LogResult<()> {
171        let (res_sender, res_receiver) = oneshot::channel();
172        self.message_sender
173            .try_send(BatchMessage::Flush(Some(res_sender)))
174            .map_err(|err| LogError::Other(err.into()))?;
175
176        futures_executor::block_on(res_receiver)
177            .map_err(|err| LogError::Other(err.into()))
178            .and_then(std::convert::identity)
179    }
180
181    fn shutdown(&self) -> LogResult<()> {
182        let (res_sender, res_receiver) = oneshot::channel();
183        self.message_sender
184            .try_send(BatchMessage::Shutdown(res_sender))
185            .map_err(|err| LogError::Other(err.into()))?;
186
187        futures_executor::block_on(res_receiver)
188            .map_err(|err| LogError::Other(err.into()))
189            .and_then(std::convert::identity)
190    }
191
192    fn set_resource(&self, resource: &Resource) {
193        let resource = Arc::new(resource.clone());
194        let _ = self
195            .message_sender
196            .try_send(BatchMessage::SetResource(resource));
197    }
198}
199
200impl<R: RuntimeChannel> BatchLogProcessor<R> {
201    pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
202        let (message_sender, message_receiver) =
203            runtime.batch_message_channel(config.max_queue_size);
204        let ticker = runtime
205            .interval(config.scheduled_delay)
206            .map(|_| BatchMessage::Flush(None));
207        let timeout_runtime = runtime.clone();
208
209        // Spawn worker process via user-defined spawn function.
210        runtime.spawn(Box::pin(async move {
211            let mut logs = Vec::new();
212            let mut messages = Box::pin(stream::select(message_receiver, ticker));
213
214            while let Some(message) = messages.next().await {
215                match message {
216                    // Log has finished, add to buffer of pending logs.
217                    BatchMessage::ExportLog(log) => {
218                        logs.push(Cow::Owned(log));
219
220                        if logs.len() == config.max_export_batch_size {
221                            let result = export_with_timeout(
222                                config.max_export_timeout,
223                                exporter.as_mut(),
224                                &timeout_runtime,
225                                logs.split_off(0),
226                            )
227                            .await;
228
229                            if let Err(err) = result {
230                                global::handle_error(err);
231                            }
232                        }
233                    }
234                    // Log batch interval time reached or a force flush has been invoked, export current spans.
235                    BatchMessage::Flush(res_channel) => {
236                        let result = export_with_timeout(
237                            config.max_export_timeout,
238                            exporter.as_mut(),
239                            &timeout_runtime,
240                            logs.split_off(0),
241                        )
242                        .await;
243
244                        if let Some(channel) = res_channel {
245                            if let Err(result) = channel.send(result) {
246                                global::handle_error(LogError::from(format!(
247                                    "failed to send flush result: {:?}",
248                                    result
249                                )));
250                            }
251                        } else if let Err(err) = result {
252                            global::handle_error(err);
253                        }
254                    }
255                    // Stream has terminated or processor is shutdown, return to finish execution.
256                    BatchMessage::Shutdown(ch) => {
257                        let result = export_with_timeout(
258                            config.max_export_timeout,
259                            exporter.as_mut(),
260                            &timeout_runtime,
261                            logs.split_off(0),
262                        )
263                        .await;
264
265                        exporter.shutdown();
266
267                        if let Err(result) = ch.send(result) {
268                            global::handle_error(LogError::from(format!(
269                                "failed to send batch processor shutdown result: {:?}",
270                                result
271                            )));
272                        }
273
274                        break;
275                    }
276
277                    // propagate the resource
278                    BatchMessage::SetResource(resource) => {
279                        exporter.set_resource(&resource);
280                    }
281                }
282            }
283        }));
284
285        // Return batch processor with link to worker
286        BatchLogProcessor { message_sender }
287    }
288
289    /// Create a new batch processor builder
290    pub fn builder<E>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R>
291    where
292        E: LogExporter,
293    {
294        BatchLogProcessorBuilder {
295            exporter,
296            config: Default::default(),
297            runtime,
298        }
299    }
300}
301
302async fn export_with_timeout<'a, R, E>(
303    time_out: Duration,
304    exporter: &mut E,
305    runtime: &R,
306    batch: Vec<Cow<'a, LogData>>,
307) -> ExportResult
308where
309    R: RuntimeChannel,
310    E: LogExporter + ?Sized,
311{
312    if batch.is_empty() {
313        return Ok(());
314    }
315
316    let export = exporter.export(batch);
317    let timeout = runtime.delay(time_out);
318    pin_mut!(export);
319    pin_mut!(timeout);
320    match future::select(export, timeout).await {
321        Either::Left((export_res, _)) => export_res,
322        Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
323    }
324}
325
326/// Batch log processor configuration.
327/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
328#[derive(Debug)]
329pub struct BatchConfig {
330    /// The maximum queue size to buffer logs for delayed processing. If the
331    /// queue gets full it drops the logs. The default value of is 2048.
332    max_queue_size: usize,
333
334    /// The delay interval in milliseconds between two consecutive processing
335    /// of batches. The default value is 1 second.
336    scheduled_delay: Duration,
337
338    /// The maximum number of logs to process in a single batch. If there are
339    /// more than one batch worth of logs then it processes multiple batches
340    /// of logs one batch after the other without any delay. The default value
341    /// is 512.
342    max_export_batch_size: usize,
343
344    /// The maximum duration to export a batch of data.
345    max_export_timeout: Duration,
346}
347
348impl Default for BatchConfig {
349    fn default() -> Self {
350        BatchConfigBuilder::default().build()
351    }
352}
353
354/// A builder for creating [`BatchConfig`] instances.
355#[derive(Debug)]
356pub struct BatchConfigBuilder {
357    max_queue_size: usize,
358    scheduled_delay: Duration,
359    max_export_batch_size: usize,
360    max_export_timeout: Duration,
361}
362
363impl Default for BatchConfigBuilder {
364    /// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
365    /// The values are overriden by environment variables if set.
366    /// The supported environment variables are:
367    /// * `OTEL_BLRP_MAX_QUEUE_SIZE`
368    /// * `OTEL_BLRP_SCHEDULE_DELAY`
369    /// * `OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
370    /// * `OTEL_BLRP_EXPORT_TIMEOUT`
371    fn default() -> Self {
372        BatchConfigBuilder {
373            max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
374            scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT),
375            max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
376            max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT),
377        }
378        .init_from_env_vars()
379    }
380}
381
382impl BatchConfigBuilder {
383    /// Set max_queue_size for [`BatchConfigBuilder`].
384    /// It's the maximum queue size to buffer logs for delayed processing.
385    /// If the queue gets full it will drop the logs.
386    /// The default value of is 2048.
387    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
388        self.max_queue_size = max_queue_size;
389        self
390    }
391
392    /// Set scheduled_delay for [`BatchConfigBuilder`].
393    /// It's the delay interval in milliseconds between two consecutive processing of batches.
394    /// The default value is 1000 milliseconds.
395    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
396        self.scheduled_delay = scheduled_delay;
397        self
398    }
399
400    /// Set max_export_timeout for [`BatchConfigBuilder`].
401    /// It's the maximum duration to export a batch of data.
402    /// The default value is 30000 milliseconds.
403    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
404        self.max_export_timeout = max_export_timeout;
405        self
406    }
407
408    /// Set max_export_batch_size for [`BatchConfigBuilder`].
409    /// It's the maximum number of logs to process in a single batch. If there are
410    /// more than one batch worth of logs then it processes multiple batches
411    /// of logs one batch after the other without any delay.
412    /// The default value is 512.
413    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
414        self.max_export_batch_size = max_export_batch_size;
415        self
416    }
417
418    /// Builds a `BatchConfig` enforcing the following invariants:
419    /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
420    pub fn build(self) -> BatchConfig {
421        // max export batch size must be less or equal to max queue size.
422        // we set max export batch size to max queue size if it's larger than max queue size.
423        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
424
425        BatchConfig {
426            max_queue_size: self.max_queue_size,
427            scheduled_delay: self.scheduled_delay,
428            max_export_timeout: self.max_export_timeout,
429            max_export_batch_size,
430        }
431    }
432
433    fn init_from_env_vars(mut self) -> Self {
434        if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
435            .ok()
436            .and_then(|queue_size| usize::from_str(&queue_size).ok())
437        {
438            self.max_queue_size = max_queue_size;
439        }
440
441        if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
442            .ok()
443            .and_then(|batch_size| usize::from_str(&batch_size).ok())
444        {
445            self.max_export_batch_size = max_export_batch_size;
446        }
447
448        if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
449            .ok()
450            .and_then(|delay| u64::from_str(&delay).ok())
451        {
452            self.scheduled_delay = Duration::from_millis(scheduled_delay);
453        }
454
455        if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
456            .ok()
457            .and_then(|s| u64::from_str(&s).ok())
458        {
459            self.max_export_timeout = Duration::from_millis(max_export_timeout);
460        }
461
462        self
463    }
464}
465
466/// A builder for creating [`BatchLogProcessor`] instances.
467///
468#[derive(Debug)]
469pub struct BatchLogProcessorBuilder<E, R> {
470    exporter: E,
471    config: BatchConfig,
472    runtime: R,
473}
474
475impl<E, R> BatchLogProcessorBuilder<E, R>
476where
477    E: LogExporter + 'static,
478    R: RuntimeChannel,
479{
480    /// Set the BatchConfig for [`BatchLogProcessorBuilder`]
481    pub fn with_batch_config(self, config: BatchConfig) -> Self {
482        BatchLogProcessorBuilder { config, ..self }
483    }
484
485    /// Build a batch processor
486    pub fn build(self) -> BatchLogProcessor<R> {
487        BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime)
488    }
489}
490
491/// Messages sent between application thread and batch log processor's work thread.
492#[allow(clippy::large_enum_variant)]
493#[derive(Debug)]
494enum BatchMessage {
495    /// Export logs, usually called when the log is emitted.
496    ExportLog(LogData),
497    /// Flush the current buffer to the backend, it can be triggered by
498    /// pre configured interval or a call to `force_push` function.
499    Flush(Option<oneshot::Sender<ExportResult>>),
500    /// Shut down the worker thread, push all logs in buffer to the backend.
501    Shutdown(oneshot::Sender<ExportResult>),
502    /// Set the resource for the exporter.
503    SetResource(Arc<Resource>),
504}
505
506#[cfg(all(test, feature = "testing", feature = "logs"))]
507mod tests {
508    use super::{
509        BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
510        OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY,
511    };
512    use crate::testing::logs::InMemoryLogsExporterBuilder;
513    use crate::{
514        export::logs::{LogData, LogExporter},
515        logs::{
516            log_processor::{
517                OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
518                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
519            },
520            BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor,
521        },
522        runtime,
523        testing::logs::InMemoryLogsExporter,
524        Resource,
525    };
526    use async_trait::async_trait;
527    use opentelemetry::logs::AnyValue;
528    #[cfg(feature = "logs_level_enabled")]
529    use opentelemetry::logs::Severity;
530    use opentelemetry::logs::{Logger, LoggerProvider as _};
531    use opentelemetry::Key;
532    use opentelemetry::{logs::LogResult, KeyValue};
533    use std::borrow::Cow;
534    use std::sync::{Arc, Mutex};
535    use std::time::Duration;
536
537    #[derive(Debug, Clone)]
538    struct MockLogExporter {
539        resource: Arc<Mutex<Option<Resource>>>,
540    }
541
542    #[async_trait]
543    impl LogExporter for MockLogExporter {
544        async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
545            Ok(())
546        }
547
548        fn shutdown(&mut self) {}
549
550        fn set_resource(&mut self, resource: &Resource) {
551            self.resource
552                .lock()
553                .map(|mut res_opt| {
554                    res_opt.replace(resource.clone());
555                })
556                .expect("mock log exporter shouldn't error when setting resource");
557        }
558    }
559
560    // Implementation specific to the MockLogExporter, not part of the LogExporter trait
561    impl MockLogExporter {
562        fn get_resource(&self) -> Option<Resource> {
563            (*self.resource).lock().unwrap().clone()
564        }
565    }
566
567    #[test]
568    fn test_default_const_values() {
569        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
570        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
571        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
572        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
573        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
574        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
575        assert_eq!(
576            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
577            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
578        );
579        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
580    }
581
582    #[test]
583    fn test_default_batch_config_adheres_to_specification() {
584        // The following environment variables are expected to be unset so that their default values are used.
585        let env_vars = vec![
586            OTEL_BLRP_SCHEDULE_DELAY,
587            OTEL_BLRP_EXPORT_TIMEOUT,
588            OTEL_BLRP_MAX_QUEUE_SIZE,
589            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
590        ];
591
592        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
593
594        assert_eq!(
595            config.scheduled_delay,
596            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
597        );
598        assert_eq!(
599            config.max_export_timeout,
600            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
601        );
602        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
603        assert_eq!(
604            config.max_export_batch_size,
605            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
606        );
607    }
608
609    #[test]
610    fn test_batch_config_configurable_by_env_vars() {
611        let env_vars = vec![
612            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
613            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
614            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
615            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
616        ];
617
618        let config = temp_env::with_vars(env_vars, BatchConfig::default);
619
620        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
621        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
622        assert_eq!(config.max_queue_size, 4096);
623        assert_eq!(config.max_export_batch_size, 1024);
624    }
625
626    #[test]
627    fn test_batch_config_max_export_batch_size_validation() {
628        let env_vars = vec![
629            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
630            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
631        ];
632
633        let config = temp_env::with_vars(env_vars, BatchConfig::default);
634
635        assert_eq!(config.max_queue_size, 256);
636        assert_eq!(config.max_export_batch_size, 256);
637        assert_eq!(
638            config.scheduled_delay,
639            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
640        );
641        assert_eq!(
642            config.max_export_timeout,
643            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
644        );
645    }
646
647    #[test]
648    fn test_batch_config_with_fields() {
649        let batch = BatchConfigBuilder::default()
650            .with_max_export_batch_size(1)
651            .with_scheduled_delay(Duration::from_millis(2))
652            .with_max_export_timeout(Duration::from_millis(3))
653            .with_max_queue_size(4)
654            .build();
655
656        assert_eq!(batch.max_export_batch_size, 1);
657        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
658        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
659        assert_eq!(batch.max_queue_size, 4);
660    }
661
662    #[test]
663    fn test_build_batch_log_processor_builder() {
664        let mut env_vars = vec![
665            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
666            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
667            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
668        ];
669        temp_env::with_vars(env_vars.clone(), || {
670            let builder =
671                BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio);
672
673            assert_eq!(builder.config.max_export_batch_size, 500);
674            assert_eq!(
675                builder.config.scheduled_delay,
676                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
677            );
678            assert_eq!(
679                builder.config.max_queue_size,
680                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
681            );
682            assert_eq!(
683                builder.config.max_export_timeout,
684                Duration::from_millis(2046)
685            );
686        });
687
688        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
689
690        temp_env::with_vars(env_vars, || {
691            let builder =
692                BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio);
693            assert_eq!(builder.config.max_export_batch_size, 120);
694            assert_eq!(builder.config.max_queue_size, 120);
695        });
696    }
697
698    #[test]
699    fn test_build_batch_log_processor_builder_with_custom_config() {
700        let expected = BatchConfigBuilder::default()
701            .with_max_export_batch_size(1)
702            .with_scheduled_delay(Duration::from_millis(2))
703            .with_max_export_timeout(Duration::from_millis(3))
704            .with_max_queue_size(4)
705            .build();
706
707        let builder = BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio)
708            .with_batch_config(expected);
709
710        let actual = &builder.config;
711        assert_eq!(actual.max_export_batch_size, 1);
712        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
713        assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
714        assert_eq!(actual.max_queue_size, 4);
715    }
716
717    #[test]
718    fn test_set_resource_simple_processor() {
719        let exporter = MockLogExporter {
720            resource: Arc::new(Mutex::new(None)),
721        };
722        let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
723        let _ = LoggerProvider::builder()
724            .with_log_processor(processor)
725            .with_resource(Resource::new(vec![
726                KeyValue::new("k1", "v1"),
727                KeyValue::new("k2", "v3"),
728                KeyValue::new("k3", "v3"),
729                KeyValue::new("k4", "v4"),
730                KeyValue::new("k5", "v5"),
731            ]))
732            .build();
733        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
734    }
735
736    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
737    async fn test_set_resource_batch_processor() {
738        let exporter = MockLogExporter {
739            resource: Arc::new(Mutex::new(None)),
740        };
741        let processor = BatchLogProcessor::new(
742            Box::new(exporter.clone()),
743            BatchConfig::default(),
744            runtime::Tokio,
745        );
746        let provider = LoggerProvider::builder()
747            .with_log_processor(processor)
748            .with_resource(Resource::new(vec![
749                KeyValue::new("k1", "v1"),
750                KeyValue::new("k2", "v3"),
751                KeyValue::new("k3", "v3"),
752                KeyValue::new("k4", "v4"),
753                KeyValue::new("k5", "v5"),
754            ]))
755            .build();
756        tokio::time::sleep(Duration::from_secs(2)).await; // set resource in batch span processor is not blocking. Should we make it blocking?
757        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
758        let _ = provider.shutdown();
759    }
760
761    #[tokio::test(flavor = "multi_thread")]
762    async fn test_batch_shutdown() {
763        // assert we will receive an error
764        // setup
765        let exporter = InMemoryLogsExporterBuilder::default()
766            .keep_records_on_shutdown()
767            .build();
768        let processor = BatchLogProcessor::new(
769            Box::new(exporter.clone()),
770            BatchConfig::default(),
771            runtime::Tokio,
772        );
773        let mut log_data = LogData {
774            record: Default::default(),
775            instrumentation: Default::default(),
776        };
777        processor.emit(&mut log_data);
778        processor.force_flush().unwrap();
779        processor.shutdown().unwrap();
780        // todo: expect to see errors here. How should we assert this?
781        processor.emit(&mut log_data);
782        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
783    }
784
785    #[test]
786    fn test_simple_shutdown() {
787        let exporter = InMemoryLogsExporterBuilder::default()
788            .keep_records_on_shutdown()
789            .build();
790        let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
791
792        let mut log_data = LogData {
793            record: Default::default(),
794            instrumentation: Default::default(),
795        };
796
797        processor.emit(&mut log_data);
798
799        processor.shutdown().unwrap();
800
801        let is_shutdown = processor
802            .is_shutdown
803            .load(std::sync::atomic::Ordering::Relaxed);
804        assert!(is_shutdown);
805
806        processor.emit(&mut log_data);
807
808        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
809    }
810
811    #[derive(Debug)]
812    struct FirstProcessor {
813        pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
814    }
815
816    impl LogProcessor for FirstProcessor {
817        fn emit(&self, data: &mut LogData) {
818            // add attribute
819            data.record.attributes.get_or_insert(vec![]).push((
820                Key::from_static_str("processed_by"),
821                AnyValue::String("FirstProcessor".into()),
822            ));
823            // update body
824            data.record.body = Some("Updated by FirstProcessor".into());
825
826            self.logs.lock().unwrap().push(data.clone()); //clone as the LogProcessor is storing the data.
827        }
828
829        #[cfg(feature = "logs_level_enabled")]
830        fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
831            true
832        }
833
834        fn force_flush(&self) -> LogResult<()> {
835            Ok(())
836        }
837
838        fn shutdown(&self) -> LogResult<()> {
839            Ok(())
840        }
841    }
842
843    #[derive(Debug)]
844    struct SecondProcessor {
845        pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
846    }
847
848    impl LogProcessor for SecondProcessor {
849        fn emit(&self, data: &mut LogData) {
850            assert!(data.record.attributes.as_ref().map_or(false, |attrs| {
851                attrs.iter().any(|(key, value)| {
852                    key.as_str() == "processed_by"
853                        && value == &AnyValue::String("FirstProcessor".into())
854                })
855            }));
856            assert!(
857                data.record.body.clone().unwrap()
858                    == AnyValue::String("Updated by FirstProcessor".into())
859            );
860            self.logs.lock().unwrap().push(data.clone());
861        }
862
863        #[cfg(feature = "logs_level_enabled")]
864        fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
865            true
866        }
867
868        fn force_flush(&self) -> LogResult<()> {
869            Ok(())
870        }
871
872        fn shutdown(&self) -> LogResult<()> {
873            Ok(())
874        }
875    }
876    #[test]
877    fn test_log_data_modification_by_multiple_processors() {
878        let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
879        let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
880
881        let first_processor = FirstProcessor {
882            logs: Arc::clone(&first_processor_logs),
883        };
884        let second_processor = SecondProcessor {
885            logs: Arc::clone(&second_processor_logs),
886        };
887
888        let logger_provider = LoggerProvider::builder()
889            .with_log_processor(first_processor)
890            .with_log_processor(second_processor)
891            .build();
892
893        let logger = logger_provider.logger("test-logger");
894        let mut log_record = logger.create_log_record();
895        log_record.body = Some(AnyValue::String("Test log".into()));
896
897        logger.emit(log_record);
898
899        assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
900        assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
901
902        let first_log = &first_processor_logs.lock().unwrap()[0];
903        let second_log = &second_processor_logs.lock().unwrap()[0];
904
905        assert!(first_log.record.attributes.iter().any(|attrs| {
906            attrs.iter().any(|(key, value)| {
907                key.as_str() == "processed_by"
908                    && value == &AnyValue::String("FirstProcessor".into())
909            })
910        }));
911
912        assert!(second_log.record.attributes.iter().any(|attrs| {
913            attrs.iter().any(|(key, value)| {
914                key.as_str() == "processed_by"
915                    && value == &AnyValue::String("FirstProcessor".into())
916            })
917        }));
918        assert!(
919            first_log.record.body.clone().unwrap()
920                == AnyValue::String("Updated by FirstProcessor".into())
921        );
922        assert!(
923            second_log.record.body.clone().unwrap()
924                == AnyValue::String("Updated by FirstProcessor".into())
925        );
926    }
927}