opentelemetry_sdk/trace/
span_processor.rs

1//! # OpenTelemetry Span Processor Interface
2//!
3//! Span processor is an interface which allows hooks for span start and end method
4//! invocations. The span processors are invoked only when
5//! [`is_recording`] is true.
6//!
7//! Built-in span processors are responsible for batching and conversion of spans to
8//! exportable representation and passing batches to exporters.
9//!
10//! Span processors can be registered directly on SDK [`TracerProvider`] and they are
11//! invoked in the same order as they were registered.
12//!
//! All `Tracer` instances created by a `TracerProvider` share the same span processors.
//! Changes to this collection are reflected in all `Tracer` instances.
15//!
16//! The following diagram shows `SpanProcessor`'s relationship to other components
17//! in the SDK:
18//!
19//! ```ascii
20//!   +-----+--------------+   +-----------------------+   +-------------------+
21//!   |     |              |   |                       |   |                   |
22//!   |     |              |   | (Batch)SpanProcessor  |   |    SpanExporter   |
23//!   |     |              +---> (Simple)SpanProcessor +--->  (OTLPExporter)   |
24//!   |     |              |   |                       |   |                   |
25//!   | SDK | Tracer.span()|   +-----------------------+   +-------------------+
26//!   |     | Span.end()   |
27//!   |     |              |
28//!   |     |              |
29//!   |     |              |
30//!   |     |              |
31//!   +-----+--------------+
32//! ```
33//!
34//! [`is_recording`]: opentelemetry::trace::Span::is_recording()
35//! [`TracerProvider`]: opentelemetry::trace::TracerProvider
36
37use crate::export::trace::{ExportResult, SpanData, SpanExporter};
38use crate::resource::Resource;
39use crate::runtime::{RuntimeChannel, TrySend};
40use crate::trace::Span;
41use futures_channel::oneshot;
42use futures_util::{
43    future::{self, BoxFuture, Either},
44    select,
45    stream::{self, FusedStream, FuturesUnordered},
46    StreamExt as _,
47};
48use opentelemetry::global;
49use opentelemetry::{
50    trace::{TraceError, TraceResult},
51    Context,
52};
53use std::cmp::min;
54use std::sync::{Arc, Mutex};
55use std::{env, fmt, str::FromStr, time::Duration};
56
/// Environment variable naming the delay, in milliseconds, between two
/// consecutive exports.
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
/// Delay between two consecutive exports used when the variable is unset
/// (milliseconds).
const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000;
/// Environment variable naming the maximum queue size.
const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
/// Maximum queue size used when the variable is unset.
const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Environment variable naming the maximum batch size; the value must not
/// exceed `OTEL_BSP_MAX_QUEUE_SIZE`.
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Maximum batch size used when the variable is unset.
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Environment variable naming the maximum time, in milliseconds, allowed for
/// exporting data.
const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
/// Maximum export time used when the variable is unset (milliseconds).
const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Environment variable naming the maximum number of concurrent exports of the
/// batch span processor.
const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
/// Maximum number of concurrent exports used when the variable is unset.
const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
78
79/// `SpanProcessor` is an interface which allows hooks for span start and end
80/// method invocations. The span processors are invoked only when is_recording
81/// is true.
82pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
83    /// `on_start` is called when a `Span` is started.  This method is called
84    /// synchronously on the thread that started the span, therefore it should
85    /// not block or throw exceptions.
86    fn on_start(&self, span: &mut Span, cx: &Context);
87    /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
88    /// already set). This method is called synchronously within the `Span::end`
89    /// API, therefore it should not block or throw an exception.
90    fn on_end(&self, span: SpanData);
91    /// Force the spans lying in the cache to be exported.
92    fn force_flush(&self) -> TraceResult<()>;
93    /// Shuts down the processor. Called when SDK is shut down. This is an
94    /// opportunity for processors to do any cleanup required.
95    ///
96    /// Implementation should make sure shutdown can be called multiple times.
97    fn shutdown(&self) -> TraceResult<()>;
98    /// Set the resource for the log processor.
99    fn set_resource(&mut self, _resource: &Resource) {}
100}
101
102/// A [SpanProcessor] that passes finished spans to the configured
103/// `SpanExporter`, as soon as they are finished, without any batching. This is
104/// typically useful for debugging and testing. For scenarios requiring higher
105/// performance/throughput, consider using [BatchSpanProcessor].
106#[derive(Debug)]
107pub struct SimpleSpanProcessor {
108    exporter: Mutex<Box<dyn SpanExporter>>,
109}
110
111impl SimpleSpanProcessor {
112    pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
113        Self {
114            exporter: Mutex::new(exporter),
115        }
116    }
117}
118
119impl SpanProcessor for SimpleSpanProcessor {
120    fn on_start(&self, _span: &mut Span, _cx: &Context) {
121        // Ignored
122    }
123
124    fn on_end(&self, span: SpanData) {
125        if !span.span_context.is_sampled() {
126            return;
127        }
128
129        let result = self
130            .exporter
131            .lock()
132            .map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into()))
133            .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));
134
135        if let Err(err) = result {
136            global::handle_error(err);
137        }
138    }
139
140    fn force_flush(&self) -> TraceResult<()> {
141        // Nothing to flush for simple span processor.
142        Ok(())
143    }
144
145    fn shutdown(&self) -> TraceResult<()> {
146        if let Ok(mut exporter) = self.exporter.lock() {
147            exporter.shutdown();
148            Ok(())
149        } else {
150            Err(TraceError::Other(
151                "SimpleSpanProcessor mutex poison at shutdown".into(),
152            ))
153        }
154    }
155
156    fn set_resource(&mut self, resource: &Resource) {
157        if let Ok(mut exporter) = self.exporter.lock() {
158            exporter.set_resource(resource);
159        }
160    }
161}
162
163/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
164/// them at a preconfigured interval.
165///
166/// Batch span processors need to run a background task to collect and send
167/// spans. Different runtimes need different ways to handle the background task.
168///
169/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the
170/// underlying runtime can cause deadlocks (see tokio section).
171///
172/// ### Use with Tokio
173///
174/// Tokio currently offers two different schedulers. One is
175/// `current_thread_scheduler`, the other is `multiple_thread_scheduler`. Both
176/// of them default to use batch span processors to install span exporters.
177///
178/// Tokio's `current_thread_scheduler` can cause the program to hang forever if
179/// blocking work is scheduled with other tasks in the same runtime. To avoid
180/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate
181/// if you are using that runtime (e.g. users of actix-web), and blocking tasks
182/// will then be scheduled on a different thread.
183///
184/// # Examples
185///
186/// This processor can be configured with an [`executor`] of your choice to
187/// batch and upload spans asynchronously when they end. If you have added a
188/// library like [`tokio`] or [`async-std`], you can pass in their respective
189/// `spawn` and `interval` functions to have batching performed in those
190/// contexts.
191///
192/// ```
193/// # #[cfg(feature="tokio")]
194/// # {
195/// use opentelemetry::global;
196/// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace};
197/// use opentelemetry_sdk::trace::BatchConfigBuilder;
198/// use std::time::Duration;
199///
200/// #[tokio::main]
201/// async fn main() {
202///     // Configure your preferred exporter
203///     let exporter = NoopSpanExporter::new();
204///
205///     // Create a batch span processor using an exporter and a runtime
206///     let batch = trace::BatchSpanProcessor::builder(exporter, runtime::Tokio)
207///         .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build())
208///         .build();
209///
210///     // Then use the `with_batch_exporter` method to have the provider export spans in batches.
211///     let provider = trace::TracerProvider::builder()
212///         .with_span_processor(batch)
213///         .build();
214///
215///     let _ = global::set_tracer_provider(provider);
216/// }
217/// # }
218/// ```
219///
220/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html
221/// [`tokio`]: https://tokio.rs
222/// [`async-std`]: https://async.rs
223pub struct BatchSpanProcessor<R: RuntimeChannel> {
224    message_sender: R::Sender<BatchMessage>,
225}
226
227impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
228    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229        f.debug_struct("BatchSpanProcessor")
230            .field("message_sender", &self.message_sender)
231            .finish()
232    }
233}
234
235impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
236    fn on_start(&self, _span: &mut Span, _cx: &Context) {
237        // Ignored
238    }
239
240    fn on_end(&self, span: SpanData) {
241        if !span.span_context.is_sampled() {
242            return;
243        }
244
245        let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
246
247        if let Err(err) = result {
248            global::handle_error(TraceError::Other(err.into()));
249        }
250    }
251
252    fn force_flush(&self) -> TraceResult<()> {
253        let (res_sender, res_receiver) = oneshot::channel();
254        self.message_sender
255            .try_send(BatchMessage::Flush(Some(res_sender)))
256            .map_err(|err| TraceError::Other(err.into()))?;
257
258        futures_executor::block_on(res_receiver)
259            .map_err(|err| TraceError::Other(err.into()))
260            .and_then(|identity| identity)
261    }
262
263    fn shutdown(&self) -> TraceResult<()> {
264        let (res_sender, res_receiver) = oneshot::channel();
265        self.message_sender
266            .try_send(BatchMessage::Shutdown(res_sender))
267            .map_err(|err| TraceError::Other(err.into()))?;
268
269        futures_executor::block_on(res_receiver)
270            .map_err(|err| TraceError::Other(err.into()))
271            .and_then(|identity| identity)
272    }
273
274    fn set_resource(&mut self, resource: &Resource) {
275        let resource = Arc::new(resource.clone());
276        let _ = self
277            .message_sender
278            .try_send(BatchMessage::SetResource(resource));
279    }
280}
281
282/// Messages sent between application thread and batch span processor's work thread.
283// In this enum the size difference is not a concern because:
284// 1. If we wrap SpanData into a pointer, it will add overhead when processing.
285// 2. Most of the messages will be ExportSpan.
286#[allow(clippy::large_enum_variant)]
287#[derive(Debug)]
288enum BatchMessage {
289    /// Export spans, usually called when span ends
290    ExportSpan(SpanData),
291    /// Flush the current buffer to the backend, it can be triggered by
292    /// pre configured interval or a call to `force_push` function.
293    Flush(Option<oneshot::Sender<ExportResult>>),
294    /// Shut down the worker thread, push all spans in buffer to the backend.
295    Shutdown(oneshot::Sender<ExportResult>),
296    /// Set the resource for the exporter.
297    SetResource(Arc<Resource>),
298}
299
300struct BatchSpanProcessorInternal<R> {
301    spans: Vec<SpanData>,
302    export_tasks: FuturesUnordered<BoxFuture<'static, ExportResult>>,
303    runtime: R,
304    exporter: Box<dyn SpanExporter>,
305    config: BatchConfig,
306}
307
308impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
309    async fn flush(&mut self, res_channel: Option<oneshot::Sender<ExportResult>>) {
310        let export_task = self.export();
311        let task = Box::pin(async move {
312            let result = export_task.await;
313
314            if let Some(channel) = res_channel {
315                if let Err(result) = channel.send(result) {
316                    global::handle_error(TraceError::from(format!(
317                        "failed to send flush result: {:?}",
318                        result
319                    )));
320                }
321            } else if let Err(err) = result {
322                global::handle_error(err);
323            }
324
325            Ok(())
326        });
327
328        if self.config.max_concurrent_exports == 1 {
329            let _ = task.await;
330        } else {
331            self.export_tasks.push(task);
332            while self.export_tasks.next().await.is_some() {}
333        }
334    }
335
336    /// Process a single message
337    ///
338    /// A return value of false indicates shutdown
339    async fn process_message(&mut self, message: BatchMessage) -> bool {
340        match message {
341            // Span has finished, add to buffer of pending spans.
342            BatchMessage::ExportSpan(span) => {
343                self.spans.push(span);
344
345                if self.spans.len() == self.config.max_export_batch_size {
346                    // If concurrent exports are saturated, wait for one to complete.
347                    if !self.export_tasks.is_empty()
348                        && self.export_tasks.len() == self.config.max_concurrent_exports
349                    {
350                        self.export_tasks.next().await;
351                    }
352
353                    let export_task = self.export();
354                    let task = async move {
355                        if let Err(err) = export_task.await {
356                            global::handle_error(err);
357                        }
358
359                        Ok(())
360                    };
361                    // Special case when not using concurrent exports
362                    if self.config.max_concurrent_exports == 1 {
363                        let _ = task.await;
364                    } else {
365                        self.export_tasks.push(Box::pin(task));
366                    }
367                }
368            }
369            // Span batch interval time reached or a force flush has been invoked, export
370            // current spans.
371            //
372            // This is a hint to ensure that any tasks associated with Spans for which the
373            // SpanProcessor had already received events prior to the call to ForceFlush
374            // SHOULD be completed as soon as possible, preferably before returning from
375            // this method.
376            //
377            // In particular, if any SpanProcessor has any associated exporter, it SHOULD
378            // try to call the exporter's Export with all spans for which this was not
379            // already done and then invoke ForceFlush on it. The built-in SpanProcessors
380            // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST
381            // prioritize honoring the timeout over finishing all calls. It MAY skip or
382            // abort some or all Export or ForceFlush calls it has made to achieve this
383            // goal.
384            //
385            // NB: `force_flush` is not currently implemented on exporters; the equivalent
386            // would be waiting for exporter tasks to complete. In the case of
387            // channel-coupled exporters, they will need a `force_flush` implementation to
388            // properly block.
389            BatchMessage::Flush(res_channel) => {
390                self.flush(res_channel).await;
391            }
392            // Stream has terminated or processor is shutdown, return to finish execution.
393            BatchMessage::Shutdown(ch) => {
394                self.flush(Some(ch)).await;
395                self.exporter.shutdown();
396                return false;
397            }
398            // propagate the resource
399            BatchMessage::SetResource(resource) => {
400                self.exporter.set_resource(&resource);
401            }
402        }
403        true
404    }
405
406    fn export(&mut self) -> BoxFuture<'static, ExportResult> {
407        // Batch size check for flush / shutdown. Those methods may be called
408        // when there's no work to do.
409        if self.spans.is_empty() {
410            return Box::pin(future::ready(Ok(())));
411        }
412
413        let export = self.exporter.export(self.spans.split_off(0));
414        let timeout = self.runtime.delay(self.config.max_export_timeout);
415        let time_out = self.config.max_export_timeout;
416
417        Box::pin(async move {
418            match future::select(export, timeout).await {
419                Either::Left((export_res, _)) => export_res,
420                Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)),
421            }
422        })
423    }
424
425    async fn run(mut self, mut messages: impl FusedStream<Item = BatchMessage> + Unpin) {
426        loop {
427            select! {
428                // FuturesUnordered implements Fuse intelligently such that it
429                // will become eligible again once new tasks are added to it.
430                _ = self.export_tasks.next() => {
431                    // An export task completed; do we need to do anything with it?
432                },
433                message = messages.next() => {
434                    match message {
435                        Some(message) => {
436                            if !self.process_message(message).await {
437                                break;
438                            }
439                        },
440                        None => break,
441                    }
442                },
443            }
444        }
445    }
446}
447
448impl<R: RuntimeChannel> BatchSpanProcessor<R> {
449    pub(crate) fn new(exporter: Box<dyn SpanExporter>, config: BatchConfig, runtime: R) -> Self {
450        let (message_sender, message_receiver) =
451            runtime.batch_message_channel(config.max_queue_size);
452        let ticker = runtime
453            .interval(config.scheduled_delay)
454            .map(|_| BatchMessage::Flush(None));
455        let timeout_runtime = runtime.clone();
456
457        let messages = Box::pin(stream::select(message_receiver, ticker));
458        let processor = BatchSpanProcessorInternal {
459            spans: Vec::new(),
460            export_tasks: FuturesUnordered::new(),
461            runtime: timeout_runtime,
462            config,
463            exporter,
464        };
465
466        // Spawn worker process via user-defined spawn function.
467        runtime.spawn(Box::pin(processor.run(messages)));
468
469        // Return batch processor with link to worker
470        BatchSpanProcessor { message_sender }
471    }
472
473    /// Create a new batch processor builder
474    pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
475    where
476        E: SpanExporter,
477    {
478        BatchSpanProcessorBuilder {
479            exporter,
480            config: Default::default(),
481            runtime,
482        }
483    }
484}
485
/// Configuration of the batch span processor.
/// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`].
#[derive(Debug)]
pub struct BatchConfig {
    /// Maximum number of spans buffered for delayed processing. Spans are
    /// dropped once the queue is full. The default value is 2048.
    max_queue_size: usize,

    /// Delay between two consecutive processing runs of batches. The default
    /// value is 5 seconds.
    scheduled_delay: Duration,

    /// Maximum number of spans processed in a single batch. When more than one
    /// batch worth of spans is available, the batches are processed one after
    /// the other without any delay. The default value is 512.
    max_export_batch_size: usize,

    /// Maximum duration allowed for exporting one batch of data.
    max_export_timeout: Duration,

    /// Maximum number of concurrent exports.
    ///
    /// Limits the number of spawned tasks for exports and thus memory consumed
    /// by an exporter. A value of 1 will cause exports to be performed
    /// synchronously on the BatchSpanProcessor task.
    max_concurrent_exports: usize,
}
514
515impl Default for BatchConfig {
516    fn default() -> Self {
517        BatchConfigBuilder::default().build()
518    }
519}
520
/// Builder used to assemble [`BatchConfig`] instances.
#[derive(Debug)]
pub struct BatchConfigBuilder {
    /// Pending value for `BatchConfig::max_queue_size`.
    max_queue_size: usize,
    /// Pending value for `BatchConfig::scheduled_delay`.
    scheduled_delay: Duration,
    /// Pending value for `BatchConfig::max_export_batch_size`.
    max_export_batch_size: usize,
    /// Pending value for `BatchConfig::max_export_timeout`.
    max_export_timeout: Duration,
    /// Pending value for `BatchConfig::max_concurrent_exports`.
    max_concurrent_exports: usize,
}
530
531impl Default for BatchConfigBuilder {
532    /// Create a new [`BatchConfigBuilder`] initialized with default batch config values as per the specs.
533    /// The values are overriden by environment variables if set.
534    /// The supported environment variables are:
535    /// * `OTEL_BSP_MAX_QUEUE_SIZE`
536    /// * `OTEL_BSP_SCHEDULE_DELAY`
537    /// * `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
538    /// * `OTEL_BSP_EXPORT_TIMEOUT`
539    /// * `OTEL_BSP_MAX_CONCURRENT_EXPORTS`
540    fn default() -> Self {
541        BatchConfigBuilder {
542            max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
543            scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT),
544            max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
545            max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT),
546            max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
547        }
548        .init_from_env_vars()
549    }
550}
551
552impl BatchConfigBuilder {
553    /// Set max_queue_size for [`BatchConfigBuilder`].
554    /// It's the maximum queue size to buffer spans for delayed processing.
555    /// If the queue gets full it will drops the spans.
556    /// The default value of is 2048.
557    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
558        self.max_queue_size = max_queue_size;
559        self
560    }
561
562    /// Set max_export_batch_size for [`BatchConfigBuilder`].
563    /// It's the maximum number of spans to process in a single batch. If there are
564    /// more than one batch worth of spans then it processes multiple batches
565    /// of spans one batch after the other without any delay. The default value
566    /// is 512.
567    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
568        self.max_export_batch_size = max_export_batch_size;
569        self
570    }
571
572    /// Set max_concurrent_exports for [`BatchConfigBuilder`].
573    /// It's the maximum number of concurrent exports.
574    /// Limits the number of spawned tasks for exports and thus memory consumed by an exporter.
575    /// The default value is 1.
576    /// IF the max_concurrent_exports value is default value, it will cause exports to be performed
577    /// synchronously on the BatchSpanProcessor task.
578    pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
579        self.max_concurrent_exports = max_concurrent_exports;
580        self
581    }
582
583    /// Set scheduled_delay_duration for [`BatchConfigBuilder`].
584    /// It's the delay interval in milliseconds between two consecutive processing of batches.
585    /// The default value is 5000 milliseconds.
586    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
587        self.scheduled_delay = scheduled_delay;
588        self
589    }
590
591    /// Set max_export_timeout for [`BatchConfigBuilder`].
592    /// It's the maximum duration to export a batch of data.
593    /// The The default value is 30000 milliseconds.
594    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
595        self.max_export_timeout = max_export_timeout;
596        self
597    }
598
599    /// Builds a `BatchConfig` enforcing the following invariants:
600    /// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
601    pub fn build(self) -> BatchConfig {
602        // max export batch size must be less or equal to max queue size.
603        // we set max export batch size to max queue size if it's larger than max queue size.
604        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
605
606        BatchConfig {
607            max_queue_size: self.max_queue_size,
608            scheduled_delay: self.scheduled_delay,
609            max_export_timeout: self.max_export_timeout,
610            max_concurrent_exports: self.max_concurrent_exports,
611            max_export_batch_size,
612        }
613    }
614
615    fn init_from_env_vars(mut self) -> Self {
616        if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
617            .ok()
618            .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
619        {
620            self.max_concurrent_exports = max_concurrent_exports;
621        }
622
623        if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
624            .ok()
625            .and_then(|queue_size| usize::from_str(&queue_size).ok())
626        {
627            self.max_queue_size = max_queue_size;
628        }
629
630        if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
631            .ok()
632            .and_then(|delay| u64::from_str(&delay).ok())
633        {
634            self.scheduled_delay = Duration::from_millis(scheduled_delay);
635        }
636
637        if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
638            .ok()
639            .and_then(|batch_size| usize::from_str(&batch_size).ok())
640        {
641            self.max_export_batch_size = max_export_batch_size;
642        }
643
644        // max export batch size must be less or equal to max queue size.
645        // we set max export batch size to max queue size if it's larger than max queue size.
646        if self.max_export_batch_size > self.max_queue_size {
647            self.max_export_batch_size = self.max_queue_size;
648        }
649
650        if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
651            .ok()
652            .and_then(|timeout| u64::from_str(&timeout).ok())
653        {
654            self.max_export_timeout = Duration::from_millis(max_export_timeout);
655        }
656
657        self
658    }
659}
660
661/// A builder for creating [`BatchSpanProcessor`] instances.
662///
663#[derive(Debug)]
664pub struct BatchSpanProcessorBuilder<E, R> {
665    exporter: E,
666    config: BatchConfig,
667    runtime: R,
668}
669
670impl<E, R> BatchSpanProcessorBuilder<E, R>
671where
672    E: SpanExporter + 'static,
673    R: RuntimeChannel,
674{
675    /// Set the BatchConfig for [BatchSpanProcessorBuilder]
676    pub fn with_batch_config(self, config: BatchConfig) -> Self {
677        BatchSpanProcessorBuilder { config, ..self }
678    }
679
680    /// Build a batch processor
681    pub fn build(self) -> BatchSpanProcessor<R> {
682        BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
683    }
684}
685
686#[cfg(all(test, feature = "testing", feature = "trace"))]
687mod tests {
688    // cargo test trace::span_processor::tests:: --features=testing
689    use super::{
690        BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
691        OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
692        OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
693    };
694    use crate::export::trace::{ExportResult, SpanData, SpanExporter};
695    use crate::runtime;
696    use crate::testing::trace::{
697        new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder,
698    };
699    use crate::trace::span_processor::{
700        OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
701        OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
702    };
703    use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
704    use async_trait::async_trait;
705    use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
706    use std::fmt::Debug;
707    use std::future::Future;
708    use std::time::Duration;
709
710    #[test]
711    fn simple_span_processor_on_end_calls_export() {
712        let exporter = InMemorySpanExporterBuilder::new().build();
713        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
714        let span_data = new_test_export_span_data();
715        processor.on_end(span_data.clone());
716        assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
717        let _result = processor.shutdown();
718    }
719
720    #[test]
721    fn simple_span_processor_on_end_skips_export_if_not_sampled() {
722        let exporter = InMemorySpanExporterBuilder::new().build();
723        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
724        let unsampled = SpanData {
725            span_context: SpanContext::empty_context(),
726            parent_span_id: SpanId::INVALID,
727            span_kind: SpanKind::Internal,
728            name: "opentelemetry".into(),
729            start_time: opentelemetry::time::now(),
730            end_time: opentelemetry::time::now(),
731            attributes: Vec::new(),
732            dropped_attributes_count: 0,
733            events: SpanEvents::default(),
734            links: SpanLinks::default(),
735            status: Status::Unset,
736            instrumentation_lib: Default::default(),
737        };
738        processor.on_end(unsampled);
739        assert!(exporter.get_finished_spans().unwrap().is_empty());
740    }
741
742    #[test]
743    fn simple_span_processor_shutdown_calls_shutdown() {
744        let exporter = InMemorySpanExporterBuilder::new().build();
745        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
746        let span_data = new_test_export_span_data();
747        processor.on_end(span_data.clone());
748        assert!(!exporter.get_finished_spans().unwrap().is_empty());
749        let _result = processor.shutdown();
750        // Assume shutdown is called by ensuring spans are empty in the exporter
751        assert!(exporter.get_finished_spans().unwrap().is_empty());
752    }
753
/// Pins the names of the `OTEL_BSP_*` environment variables and the default
/// values associated with them.
#[test]
fn test_default_const_values() {
    // Each name constant must spell out its own environment variable.
    for (constant, expected_name) in [
        (OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE"),
        (OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY"),
        (
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BSP_MAX_EXPORT_BATCH_SIZE",
        ),
        (OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT"),
    ] {
        assert_eq!(constant, expected_name);
    }

    // Default values.
    assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
    assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT, 5000);
    assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
    assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, 30000);
}
768
/// With every `OTEL_BSP_*` variable unset, `BatchConfig::default()` must
/// report the `*_DEFAULT` constant for each of its fields.
#[test]
fn test_default_batch_config_adheres_to_specification() {
    // Unset the variables for the duration of the call so the ambient
    // environment cannot influence the result.
    let defaults = temp_env::with_vars_unset(
        vec![
            OTEL_BSP_SCHEDULE_DELAY,
            OTEL_BSP_EXPORT_TIMEOUT,
            OTEL_BSP_MAX_QUEUE_SIZE,
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
            OTEL_BSP_MAX_CONCURRENT_EXPORTS,
        ],
        BatchConfig::default,
    );

    let expected_delay = Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
    let expected_timeout = Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);

    assert_eq!(
        defaults.max_concurrent_exports,
        OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
    );
    assert_eq!(defaults.scheduled_delay, expected_delay);
    assert_eq!(defaults.max_export_timeout, expected_timeout);
    assert_eq!(defaults.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
    assert_eq!(
        defaults.max_export_batch_size,
        OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
    );
}
799
/// Values supplied through the `OTEL_BSP_*` environment variables must show up
/// in the `BatchConfig` produced by `BatchConfig::default()`.
#[test]
fn test_batch_config_configurable_by_env_vars() {
    let from_env = temp_env::with_vars(
        vec![
            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ],
        BatchConfig::default,
    );

    // Both durations are given in milliseconds: 2000 ms and 60000 ms.
    assert_eq!(from_env.scheduled_delay, Duration::from_secs(2));
    assert_eq!(from_env.max_export_timeout, Duration::from_secs(60));
    assert_eq!(from_env.max_queue_size, 4096);
    assert_eq!(from_env.max_export_batch_size, 1024);
}
816
/// A requested export batch size larger than the queue size must be reduced to
/// the queue size, while the settings left untouched keep their defaults.
#[test]
fn test_batch_config_max_export_batch_size_validation() {
    // Batch size (1024) deliberately exceeds queue size (256).
    let clamped = temp_env::with_vars(
        vec![
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ],
        BatchConfig::default,
    );

    assert_eq!(clamped.max_queue_size, 256);
    assert_eq!(clamped.max_export_batch_size, 256);

    let default_delay = Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT);
    let default_timeout = Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT);
    assert_eq!(clamped.scheduled_delay, default_delay);
    assert_eq!(clamped.max_export_timeout, default_timeout);
}
837
/// Every `with_*` setter on `BatchConfigBuilder` must be reflected in the
/// `BatchConfig` returned by `build()`.
#[test]
fn test_batch_config_with_fields() {
    let ten_millis = Duration::from_millis(10);

    let built = BatchConfigBuilder::default()
        .with_max_export_batch_size(10)
        .with_scheduled_delay(ten_millis)
        .with_max_export_timeout(ten_millis)
        .with_max_concurrent_exports(10)
        .with_max_queue_size(10)
        .build();

    assert_eq!(built.max_export_batch_size, 10);
    assert_eq!(built.scheduled_delay, ten_millis);
    assert_eq!(built.max_export_timeout, ten_millis);
    assert_eq!(built.max_concurrent_exports, 10);
    assert_eq!(built.max_queue_size, 10);
}
853
/// Checks the configuration `BatchSpanProcessor::builder` derives from the
/// `OTEL_BSP_*` environment variables, including an unparsable value and a
/// batch size that exceeds the queue size.
#[test]
fn test_build_batch_span_processor_builder() {
    let base_vars = vec![
        (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")),
        (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")),
        (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
    ];
    // Same variables, plus a queue size smaller than the requested batch size.
    let mut vars_with_small_queue = base_vars.clone();
    vars_with_small_queue.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

    temp_env::with_vars(base_vars, || {
        let builder = BatchSpanProcessor::builder(
            InMemorySpanExporterBuilder::new().build(),
            runtime::Tokio,
        );
        let config = &builder.config;

        // The queue size stays at its default here, and the requested batch
        // size of 500 is kept as is.
        assert_eq!(config.max_export_batch_size, 500);
        // The schedule delay is not a number, so the default is expected.
        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(config.max_export_timeout, Duration::from_millis(2046));
    });

    temp_env::with_vars(vars_with_small_queue, || {
        let builder = BatchSpanProcessor::builder(
            InMemorySpanExporterBuilder::new().build(),
            runtime::Tokio,
        );
        let config = &builder.config;

        // Export batch size cannot exceed max queue size: 500 becomes 120.
        assert_eq!(config.max_export_batch_size, 120);
        assert_eq!(config.max_queue_size, 120);
    });
}
893
894    #[tokio::test]
895    async fn test_batch_span_processor() {
896        let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
897        let config = BatchConfig {
898            scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush
899            ..Default::default()
900        };
901        let processor =
902            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
903        let handle = tokio::spawn(async move {
904            loop {
905                if let Some(span) = export_receiver.recv().await {
906                    assert_eq!(span.span_context, new_test_export_span_data().span_context);
907                    break;
908                }
909            }
910        });
911        tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
912        processor.on_end(new_test_export_span_data());
913        let flush_res = processor.force_flush();
914        assert!(flush_res.is_ok());
915        let _shutdown_result = processor.shutdown();
916
917        assert!(
918            tokio::time::timeout(Duration::from_secs(5), handle)
919                .await
920                .is_ok(),
921            "timed out in 5 seconds. force_flush may not export any data when called"
922        );
923    }
924
/// Test-only exporter whose `export` call completes only after an artificial
/// delay.
struct BlockingExporter<D> {
    /// Duration handed to `delay_fn` on every export.
    delay_for: Duration,
    /// Produces the future that is awaited before an export completes.
    delay_fn: D,
}

impl<D, DS> Debug for BlockingExporter<D>
where
    D: Fn(Duration) -> DS + 'static + Send + Sync,
    DS: Future<Output = ()> + Send + Sync + 'static,
{
    /// Writes a fixed description; neither field is included in the output.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "blocking exporter for testing")
    }
}
939
940    #[async_trait]
941    impl<D, DS> SpanExporter for BlockingExporter<D>
942    where
943        D: Fn(Duration) -> DS + 'static + Send + Sync,
944        DS: Future<Output = ()> + Send + Sync + 'static,
945    {
946        fn export(
947            &mut self,
948            _batch: Vec<SpanData>,
949        ) -> futures_util::future::BoxFuture<'static, ExportResult> {
950            use futures_util::FutureExt;
951            Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(())))
952        }
953    }
954
/// Runs the tokio timeout scenario in which the export is expected to time
/// out.
#[test]
fn test_timeout_tokio_timeout() {
    // With `true`, `timeout_test_tokio` makes the exporter delay for 60 ms
    // while the export timeout is 5 ms; with `false` the two values are
    // swapped. The scenario also sleeps for one second before ending its span.
    let scenario = timeout_test_tokio(true);
    let multi_thread_rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    multi_thread_rt.block_on(scenario);
}
966
/// Runs the tokio timeout scenario in which the export is expected to finish
/// within the configured export timeout.
#[test]
fn test_timeout_tokio_not_timeout() {
    let scenario = timeout_test_tokio(false);
    let multi_thread_rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    multi_thread_rt.block_on(scenario);
}
975
/// Runs the async-std timeout scenario in which the export is expected to
/// time out. Only built with the `rt-async-std` feature.
#[cfg(feature = "rt-async-std")]
#[test]
fn test_timeout_async_std_timeout() {
    let scenario = timeout_test_std_async(true);
    async_std::task::block_on(scenario);
}
981
/// Runs the async-std timeout scenario in which the export is expected to
/// finish within the configured export timeout. Only built with the
/// `rt-async-std` feature.
#[cfg(feature = "rt-async-std")]
#[test]
fn test_timeout_async_std_not_timeout() {
    let scenario = timeout_test_std_async(false);
    async_std::task::block_on(scenario);
}
987
/// Shared scenario for the async-std timeout tests.
///
/// When `time_out` is true the exporter delays for 60 ms against a 5 ms export
/// timeout, so `force_flush` is expected to fail. Otherwise the exporter
/// delays for 5 ms against a 60 ms timeout and `force_flush` is expected to
/// succeed. `shutdown` must succeed in both cases.
#[cfg(feature = "rt-async-std")]
async fn timeout_test_std_async(time_out: bool) {
    let (export_timeout_ms, exporter_delay_ms): (u64, u64) =
        if time_out { (5, 60) } else { (60, 5) };

    let config = BatchConfig {
        max_export_timeout: Duration::from_millis(export_timeout_ms),
        // A 24 hour tick means the span can only be exported via force_flush.
        scheduled_delay: Duration::from_secs(60 * 60 * 24),
        ..Default::default()
    };
    let slow_exporter = BlockingExporter {
        delay_for: Duration::from_millis(exporter_delay_ms),
        delay_fn: async_std::task::sleep,
    };
    let processor = BatchSpanProcessor::new(Box::new(slow_exporter), config, runtime::AsyncStd);

    processor.on_end(new_test_export_span_data());

    // The flush fails exactly when a timeout was provoked.
    let flush_res = processor.force_flush();
    assert_eq!(flush_res.is_err(), time_out);

    let shutdown_res = processor.shutdown();
    assert!(shutdown_res.is_ok());
}
1012
1013    // If the time_out is true, then the result suppose to ended with timeout.
1014    // otherwise the exporter should be able to export within time out duration.
1015    async fn timeout_test_tokio(time_out: bool) {
1016        let config = BatchConfig {
1017            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
1018            scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush,
1019            ..Default::default()
1020        };
1021        let exporter = BlockingExporter {
1022            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
1023            delay_fn: tokio::time::sleep,
1024        };
1025        let processor =
1026            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
1027        tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
1028        processor.on_end(new_test_export_span_data());
1029        let flush_res = processor.force_flush();
1030        if time_out {
1031            assert!(flush_res.is_err());
1032        } else {
1033            assert!(flush_res.is_ok());
1034        }
1035        let shutdown_res = processor.shutdown();
1036        assert!(shutdown_res.is_ok());
1037    }
1038}