// opentelemetry_sdk/trace/span_processor_with_async_runtime.rs

1use crate::error::{OTelSdkError, OTelSdkResult};
2use crate::resource::Resource;
3use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
4use crate::trace::BatchConfig;
5use crate::trace::Span;
6use crate::trace::SpanProcessor;
7use crate::trace::{SpanData, SpanExporter};
8use futures_channel::oneshot;
9use futures_util::{
10    future::{self, BoxFuture, Either},
11    pin_mut, select,
12    stream::{self, FusedStream, FuturesUnordered},
13    StreamExt as _,
14};
15use opentelemetry::Context;
16use opentelemetry::{otel_debug, otel_error, otel_warn};
17use std::fmt;
18use std::sync::{
19    atomic::{AtomicUsize, Ordering},
20    Arc,
21};
22use std::time::Duration;
23use tokio::sync::RwLock;
24
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
/// Batch span processors need to run a background task to collect and send
/// spans. Different runtimes need different ways to handle the background task.
///
/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the
/// underlying runtime can cause deadlocks (see tokio section).
///
/// ### Use with Tokio
///
/// Tokio currently offers two different schedulers. One is
/// `current_thread_scheduler`, the other is `multiple_thread_scheduler`. Both
/// of them default to use batch span processors to install span exporters.
///
/// Tokio's `current_thread_scheduler` can cause the program to hang forever if
/// blocking work is scheduled with other tasks in the same runtime. To avoid
/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate
/// if you are using that runtime (e.g. users of actix-web), and blocking tasks
/// will then be scheduled on a different thread.
///
/// # Examples
///
/// This processor can be configured with an [`executor`] of your choice to
/// batch and upload spans asynchronously when they end. If you have added a
/// library like [`tokio`], you can pass in their respective
/// `spawn` and `interval` functions to have batching performed in those
/// contexts.
///
/// ```
/// # #[cfg(feature="tokio")]
/// # {
/// use opentelemetry::global;
/// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace};
/// use opentelemetry_sdk::trace::BatchConfigBuilder;
/// use std::time::Duration;
/// use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
///
/// #[tokio::main]
/// async fn main() {
///     // Configure your preferred exporter
///     let exporter = NoopSpanExporter::new();
///
///     // Create a batch span processor using an exporter and a runtime
///     let batch = BatchSpanProcessor::builder(exporter, runtime::Tokio)
///         .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build())
///         .build();
///
///     // Then use the `with_batch_exporter` method to have the provider export spans in batches.
///     let provider = trace::SdkTracerProvider::builder()
///         .with_span_processor(batch)
///         .build();
///
///     let _ = global::set_tracer_provider(provider);
/// }
/// # }
/// ```
///
/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html
/// [`tokio`]: https://tokio.rs
pub struct BatchSpanProcessor<R: RuntimeChannel> {
    // Channel to the background worker task spawned in `BatchSpanProcessor::new`;
    // all public operations are expressed as `BatchMessage`s sent over it.
    message_sender: R::Sender<BatchMessage>,

    // Track dropped spans (incremented whenever `try_send` fails in `on_end`)
    dropped_spans_count: AtomicUsize,

    // Track the maximum queue size that was configured for this processor
    // (reported alongside the drop count at shutdown time)
    max_queue_size: usize,
}
94
95impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        f.debug_struct("BatchSpanProcessor")
98            .field("message_sender", &self.message_sender)
99            .finish()
100    }
101}
102
impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
    // No work is done at span start; this processor only batches finished spans.
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored
    }

    // Forward a finished, sampled span to the worker task. Non-blocking: if the
    // bounded channel is full the span is dropped and counted.
    fn on_end(&self, span: SpanData) {
        if !span.span_context.is_sampled() {
            return;
        }

        let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

        // If the queue is full, and we can't buffer a span
        if result.is_err() {
            // Increment the number of dropped spans. If this is the first time we've had to drop,
            // emit a warning.
            if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
                otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
                    message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped.");
            }
        }
    }

    // Ask the worker to export everything buffered so far, then block the
    // calling thread until the worker reports the export result.
    // NOTE(review): blocking here on a current-thread async runtime can
    // deadlock if the worker task shares this thread — see the tokio notes on
    // the type-level docs.
    fn force_flush(&self) -> OTelSdkResult {
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Flush(Some(res_sender)))
            .map_err(|err| {
                OTelSdkError::InternalFailure(format!("Failed to send flush message: {err}"))
            })?;

        // Outer `?` unwraps the channel receive; the remaining inner
        // `OTelSdkResult` (the export outcome) is the return value.
        futures_executor::block_on(res_receiver).map_err(|err| {
            OTelSdkError::InternalFailure(format!("Flush response channel error: {err}"))
        })?
    }

    // Flush remaining spans, shut the exporter down, and stop the worker.
    // NOTE(review): `_timeout` is currently unused — the call blocks until the
    // worker responds, however long that takes; confirm whether honoring the
    // timeout is planned.
    fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
        let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
        let max_queue_size = self.max_queue_size;
        if dropped_spans > 0 {
            otel_warn!(
                name: "BatchSpanProcessor.Shutdown",
                dropped_spans = dropped_spans,
                max_queue_size = max_queue_size,
                message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
            );
        }

        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Shutdown(res_sender))
            .map_err(|err| {
                OTelSdkError::InternalFailure(format!("Failed to send shutdown message: {err}"))
            })?;

        // Same pattern as `force_flush`: outer `?` for the channel, inner
        // result is the worker's shutdown outcome.
        futures_executor::block_on(res_receiver).map_err(|err| {
            OTelSdkError::InternalFailure(format!("Shutdown response channel error: {err}"))
        })?
    }

    // Propagate a new resource to the exporter via the worker. Best-effort:
    // a send failure (worker gone) is deliberately ignored.
    fn set_resource(&mut self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}
170
/// Messages sent between application thread and batch span processor's work thread.
// In this enum the size difference is not a concern because:
// 1. If we wrap SpanData into a pointer, it will add overhead when processing.
// 2. Most of the messages will be ExportSpan.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Export spans, usually called when span ends
    ExportSpan(SpanData),
    /// Flush the current buffer to the backend, it can be triggered by
    /// pre configured interval or a call to `force_flush` function.
    /// The optional sender, when present, receives the export result.
    Flush(Option<oneshot::Sender<OTelSdkResult>>),
    /// Shut down the worker thread, push all spans in buffer to the backend.
    Shutdown(oneshot::Sender<OTelSdkResult>),
    /// Set the resource for the exporter.
    SetResource(Arc<Resource>),
}
188
/// Worker-side state owned by the background task spawned in
/// [`BatchSpanProcessor::new`]; never touched by application threads directly.
struct BatchSpanProcessorInternal<E, R> {
    // Buffer of finished spans awaiting the next batch export.
    spans: Vec<SpanData>,
    // In-flight export futures, used when `max_concurrent_exports > 1`.
    export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
    // Runtime handle used for timeout delays.
    runtime: R,
    config: BatchConfig,
    // TODO: Redesign the `SpanExporter` trait to use immutable references (`&self`)
    // for all methods. This would allow us to remove the `RwLock` and just use `Arc<E>`,
    // similar to how `crate::logs::LogExporter` is implemented.
    exporter: Arc<RwLock<E>>,
}
199
impl<E: SpanExporter + 'static, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
    /// Export everything currently buffered, then (for concurrent mode) drain
    /// all in-flight export tasks so a flush means "nothing is pending".
    /// If `res_channel` is provided, the export result is reported through it.
    async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
        // `split_off(0)` takes the whole buffer, leaving `self.spans` empty.
        let export_result = Self::export(
            self.spans.split_off(0),
            self.exporter.clone(),
            self.runtime.clone(),
            self.config.max_export_timeout,
        )
        .await;
        let task = Box::pin(async move {
            if let Some(channel) = res_channel {
                // If a response channel is provided, attempt to send the export result through it.
                if let Err(result) = channel.send(export_result) {
                    otel_debug!(
                        name: "BatchSpanProcessor.Flush.SendResultError",
                        reason = format!("{:?}", result)
                    );
                }
            } else if let Err(err) = export_result {
                // If no channel is provided and the export operation encountered an error,
                // log the error directly here.
                // TODO: Consider returning the status instead of logging it.
                otel_error!(
                    name: "BatchSpanProcessor.Flush.ExportError",
                    reason = format!("{:?}", err),
                    message = "Failed during the export process"
                );
            }

            Ok(())
        });

        if self.config.max_concurrent_exports == 1 {
            // Serial mode: run the reporting task inline.
            let _ = task.await;
        } else {
            // Concurrent mode: queue the task, then wait for ALL outstanding
            // export tasks (including earlier ones) to complete.
            self.export_tasks.push(task);
            while self.export_tasks.next().await.is_some() {}
        }
    }

    /// Process a single message
    ///
    /// A return value of false indicates shutdown
    async fn process_message(&mut self, message: BatchMessage) -> bool {
        match message {
            // Span has finished, add to buffer of pending spans.
            BatchMessage::ExportSpan(span) => {
                self.spans.push(span);

                if self.spans.len() == self.config.max_export_batch_size {
                    // If concurrent exports are saturated, wait for one to complete.
                    if !self.export_tasks.is_empty()
                        && self.export_tasks.len() == self.config.max_concurrent_exports
                    {
                        self.export_tasks.next().await;
                    }

                    // Take the full batch and capture everything the export
                    // future needs, so it can be 'static.
                    let batch = self.spans.split_off(0);
                    let exporter = self.exporter.clone();
                    let runtime = self.runtime.clone();
                    let max_export_timeout = self.config.max_export_timeout;

                    let task = async move {
                        if let Err(err) =
                            Self::export(batch, exporter, runtime, max_export_timeout).await
                        {
                            otel_error!(
                                name: "BatchSpanProcessor.Export.Error",
                                reason = format!("{}", err)
                            );
                        }

                        Ok(())
                    };

                    // Special case when not using concurrent exports
                    if self.config.max_concurrent_exports == 1 {
                        let _ = task.await;
                    } else {
                        self.export_tasks.push(Box::pin(task));
                    }
                }
            }
            // Span batch interval time reached or a force flush has been invoked, export
            // current spans.
            //
            // This is a hint to ensure that any tasks associated with Spans for which the
            // SpanProcessor had already received events prior to the call to ForceFlush
            // SHOULD be completed as soon as possible, preferably before returning from
            // this method.
            //
            // In particular, if any SpanProcessor has any associated exporter, it SHOULD
            // try to call the exporter's Export with all spans for which this was not
            // already done and then invoke ForceFlush on it. The built-in SpanProcessors
            // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST
            // prioritize honoring the timeout over finishing all calls. It MAY skip or
            // abort some or all Export or ForceFlush calls it has made to achieve this
            // goal.
            //
            // NB: `force_flush` is not currently implemented on exporters; the equivalent
            // would be waiting for exporter tasks to complete. In the case of
            // channel-coupled exporters, they will need a `force_flush` implementation to
            // properly block.
            BatchMessage::Flush(res_channel) => {
                self.flush(res_channel).await;
            }
            // Stream has terminated or processor is shutdown, return to finish execution.
            BatchMessage::Shutdown(ch) => {
                // Flush first so buffered spans are not lost, then shut the
                // exporter down (its error, if any, is ignored here).
                self.flush(Some(ch)).await;
                let _ = self.exporter.write().await.shutdown();
                return false;
            }
            // propagate the resource
            BatchMessage::SetResource(resource) => {
                self.exporter.write().await.set_resource(&resource);
            }
        }
        true
    }

    /// Export one batch through the shared exporter, racing it against a
    /// runtime-provided delay; the loser is dropped.
    async fn export(
        batch: Vec<SpanData>,
        exporter: Arc<RwLock<E>>,
        runtime: R,
        max_export_timeout: Duration,
    ) -> OTelSdkResult {
        // Batch size check for flush / shutdown. Those methods may be called
        // when there's no work to do.
        if batch.is_empty() {
            return Ok(());
        }

        // Read lock only: export takes `&self` on the exporter.
        let exporter_guard = exporter.read().await;
        let export = exporter_guard.export(batch);
        let timeout = runtime.delay(max_export_timeout);

        pin_mut!(export);
        pin_mut!(timeout);

        match future::select(export, timeout).await {
            Either::Left((export_res, _)) => export_res,
            Either::Right((_, _)) => Err(OTelSdkError::Timeout(max_export_timeout)),
        }
    }

    /// Main worker loop: multiplex between completing export tasks and the
    /// merged message/ticker stream until shutdown or stream end.
    async fn run(mut self, mut messages: impl FusedStream<Item = BatchMessage> + Unpin) {
        loop {
            select! {
                // FuturesUnordered implements Fuse intelligently such that it
                // will become eligible again once new tasks are added to it.
                _ = self.export_tasks.next() => {
                    // An export task completed; do we need to do anything with it?
                },
                message = messages.next() => {
                    match message {
                        Some(message) => {
                            if !self.process_message(message).await {
                                break;
                            }
                        },
                        None => break,
                    }
                },
            }
        }
    }
}
367
impl<R: RuntimeChannel> BatchSpanProcessor<R> {
    /// Create the processor and spawn its background worker on `runtime`.
    /// The worker owns the exporter; this handle only keeps the sender side
    /// of the bounded message channel (capacity = `config.max_queue_size`).
    pub(crate) fn new<E>(exporter: E, config: BatchConfig, runtime: R) -> Self
    where
        E: SpanExporter + Send + Sync + 'static,
    {
        let (message_sender, message_receiver) =
            runtime.batch_message_channel(config.max_queue_size);

        let max_queue_size = config.max_queue_size;

        let inner_runtime = runtime.clone();
        // Spawn worker process via user-defined spawn function.
        runtime.spawn(async move {
            // Timer will take a reference to the current runtime, so its important we do this within the
            // runtime.spawn()
            let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
                .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
                .map(|_| BatchMessage::Flush(None));
            let timeout_runtime = inner_runtime.clone();

            // Merge application messages with periodic flush ticks into one stream.
            let messages = Box::pin(stream::select(message_receiver, ticker));
            let processor = BatchSpanProcessorInternal {
                spans: Vec::new(),
                export_tasks: FuturesUnordered::new(),
                runtime: timeout_runtime,
                config,
                exporter: Arc::new(RwLock::new(exporter)),
            };

            processor.run(messages).await
        });

        // Return batch processor with link to worker
        BatchSpanProcessor {
            message_sender,
            dropped_spans_count: AtomicUsize::new(0),
            max_queue_size,
        }
    }

    /// Create a new batch processor builder
    pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
    where
        E: SpanExporter,
    {
        BatchSpanProcessorBuilder {
            exporter,
            config: Default::default(),
            runtime,
        }
    }
}
420
/// A builder for creating [`BatchSpanProcessor`] instances.
///
/// Obtain one via [`BatchSpanProcessor::builder`]; the batch configuration
/// defaults to [`BatchConfig::default`] unless overridden.
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder<E, R> {
    exporter: E,
    config: BatchConfig,
    runtime: R,
}
429
430impl<E, R> BatchSpanProcessorBuilder<E, R>
431where
432    E: SpanExporter + 'static,
433    R: RuntimeChannel,
434{
435    /// Set the BatchConfig for [BatchSpanProcessorBuilder]
436    pub fn with_batch_config(self, config: BatchConfig) -> Self {
437        BatchSpanProcessorBuilder { config, ..self }
438    }
439
440    /// Build a batch processor
441    pub fn build(self) -> BatchSpanProcessor<R> {
442        BatchSpanProcessor::new(self.exporter, self.config, self.runtime)
443    }
444}
445
#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
    // cargo test trace::span_processor::tests:: --features=testing
    use super::{BatchSpanProcessor, SpanProcessor};
    use crate::error::OTelSdkResult;
    use crate::runtime;
    use crate::testing::trace::{new_test_export_span_data, new_tokio_test_exporter};
    use crate::trace::span_processor::{
        OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE,
        OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
    };
    use crate::trace::{BatchConfig, BatchConfigBuilder, InMemorySpanExporterBuilder};
    use crate::trace::{SpanData, SpanExporter};
    use futures_util::Future;
    use std::fmt::Debug;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    // Exporter whose `export` simply awaits a caller-supplied delay function —
    // used to exercise the export-timeout race.
    struct BlockingExporter<D> {
        delay_for: Duration,
        delay_fn: D,
    }

    impl<D, DS> Debug for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("blocking exporter for testing")
        }
    }

    impl<D, DS> SpanExporter for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        async fn export(&self, _batch: Vec<SpanData>) -> OTelSdkResult {
            (self.delay_fn)(self.delay_for).await;
            Ok(())
        }
    }

    /// Exporter that records whether two exports overlap in time.
    struct TrackingExporter {
        /// Artificial delay to keep each export alive for a while.
        delay: Duration,
        /// Current number of in-flight exports.
        active: Arc<AtomicUsize>,
        /// Set to true the first time we see overlap.
        concurrent_seen: Arc<AtomicBool>,
    }

    impl Debug for TrackingExporter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("tracking exporter")
        }
    }

    impl SpanExporter for TrackingExporter {
        async fn export(&self, _batch: Vec<SpanData>) -> crate::error::OTelSdkResult {
            // Increment in-flight counter and note any overlap.
            let inflight = self.active.fetch_add(1, Ordering::SeqCst) + 1;
            if inflight > 1 {
                self.concurrent_seen.store(true, Ordering::SeqCst);
            }

            // Keep the export "busy" for a bit.
            tokio::time::sleep(self.delay).await;

            // Decrement counter.
            self.active.fetch_sub(1, Ordering::SeqCst);
            Ok(())
        }
    }

    // Verifies env-var driven configuration, including that an unparsable
    // schedule delay falls back to the default and that the export batch size
    // is clamped to the max queue size.
    #[test]
    fn test_build_batch_span_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")),
            (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder = BatchSpanProcessor::builder(
                InMemorySpanExporterBuilder::new().build(),
                runtime::Tokio,
            );
            // export batch size cannot exceed max queue size
            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                OTEL_BSP_SCHEDULE_DELAY_DEFAULT
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
            );
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder = BatchSpanProcessor::builder(
                InMemorySpanExporterBuilder::new().build(),
                runtime::Tokio,
            );
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    // End-to-end: a span ended before `force_flush` must reach the exporter.
    #[tokio::test]
    async fn test_batch_span_processor() {
        let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
        let config = BatchConfigBuilder::default()
            .with_scheduled_delay(Duration::from_secs(60 * 60 * 24)) // set the tick to 24 hours so we know the span must be exported via force_flush
            .build();
        let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread);
        let handle = tokio::spawn(async move {
            loop {
                if let Some(span) = export_receiver.recv().await {
                    assert_eq!(span.span_context, new_test_export_span_data().span_context);
                    break;
                }
            }
        });
        tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        assert!(flush_res.is_ok());
        let _shutdown_result = processor.shutdown();

        assert!(
            tokio::time::timeout(Duration::from_secs(5), handle)
                .await
                .is_ok(),
            "timed out in 5 seconds. force_flush may not export any data when called"
        );
    }

    // If `time_out` is `true`, then the export should fail with a timeout.
    // Else, the exporter should be able to export within the timeout duration.
    async fn timeout_test_tokio(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush,
            ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: tokio::time::sleep,
        };
        let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread);
        tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_timeout_tokio_timeout() {
        // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s.
        // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s.
        // Either way, the test should be finished within 5s.
        timeout_test_tokio(true).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_timeout_tokio_not_timeout() {
        timeout_test_tokio(false).await;
    }

    // With max_concurrent_exports=2 and batch size 1, rapid span completions
    // should produce at least two exports whose execution windows overlap.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_exports_expected() {
        // Shared state for the exporter.
        let active = Arc::new(AtomicUsize::new(0));
        let concurrent_seen = Arc::new(AtomicBool::new(false));

        let exporter = TrackingExporter {
            delay: Duration::from_millis(50),
            active: active.clone(),
            concurrent_seen: concurrent_seen.clone(),
        };

        // Intentionally tiny batch-size so every span forces an export.
        let config = BatchConfig {
            max_export_batch_size: 1,
            max_queue_size: 16,
            scheduled_delay: Duration::from_secs(3600), // effectively disabled
            max_export_timeout: Duration::from_secs(5),
            max_concurrent_exports: 2, // what we want to verify
        };

        // Spawn the processor.
        let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);

        // Finish three spans in rapid succession.
        processor.on_end(new_test_export_span_data());
        processor.on_end(new_test_export_span_data());
        processor.on_end(new_test_export_span_data());

        // Wait until everything has been exported.
        processor.force_flush().expect("force flush failed");
        processor.shutdown().expect("shutdown failed");

        // Expect at least one period with >1 export in flight.
        assert!(
            concurrent_seen.load(Ordering::SeqCst),
            "exports never overlapped, processor is still serialising them"
        );
    }

    // With max_concurrent_exports=1 the processor must serialise exports:
    // no two export calls may ever be in flight at the same time.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_exports_serial_when_max_concurrent_exports_1() {
        let active = Arc::new(AtomicUsize::new(0));
        let concurrent_seen = Arc::new(AtomicBool::new(false));

        let exporter = TrackingExporter {
            delay: Duration::from_millis(50),
            active: active.clone(),
            concurrent_seen: concurrent_seen.clone(),
        };

        let config = BatchConfig {
            max_export_batch_size: 1,
            max_queue_size: 16,
            scheduled_delay: Duration::from_secs(3600),
            max_export_timeout: Duration::from_secs(5),
            max_concurrent_exports: 1, // what we want to verify
        };

        let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);

        // Finish several spans quickly.
        processor.on_end(new_test_export_span_data());
        processor.on_end(new_test_export_span_data());
        processor.on_end(new_test_export_span_data());

        processor.force_flush().expect("force flush failed");
        processor.shutdown().expect("shutdown failed");

        // There must never have been more than one export in flight.
        assert!(
            !concurrent_seen.load(Ordering::SeqCst),
            "exports overlapped even though max_concurrent_exports was 1"
        );
    }
}