use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::resource::Resource;
use crate::runtime::{RuntimeChannel, TrySend};
use crate::trace::Span;
use futures_channel::oneshot;
use futures_util::{
    future::{self, BoxFuture, Either},
    select,
    stream::{self, FusedStream, FuturesUnordered},
    StreamExt as _,
};
use opentelemetry::global;
use opentelemetry::{
    trace::{TraceError, TraceResult},
    Context,
};
use std::cmp::min;
use std::sync::{Arc, Mutex};
use std::{env, fmt, str::FromStr, time::Duration};

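// Environment variables (and their defaults) that configure the batch span
// processor, following the OpenTelemetry SDK specification for the batch span
// processor. They are read once, in `BatchConfigBuilder::default()`, so values
// set explicitly through the builder's `with_*` methods override them.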
const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY";
const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000;
const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE";
const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;

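/// An interface for hooking into the span lifecycle. Implementations are
/// called synchronously when a span starts and ends, and are responsible for
/// forwarding finished spans to a [`SpanExporter`].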
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
    /// Called when a span is started. The span is mutable, so processors may
    /// enrich it before it is recorded.
    fn on_start(&self, span: &mut Span, cx: &Context);
    /// Called when a sampled span ends.
    fn on_end(&self, span: SpanData);
    /// Export all spans that have ended but have not yet been exported.
    fn force_flush(&self) -> TraceResult<()>;
    /// Flush any remaining spans and shut the processor down.
    fn shutdown(&self) -> TraceResult<()>;
    /// Set the resource to associate with exported spans.
    fn set_resource(&mut self, _resource: &Resource) {}
}

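/// A [`SpanProcessor`] that exports each finished span synchronously, one at a
/// time, blocking the calling thread until the export completes. Suited to
/// tests and simple setups rather than high-throughput production use.
///
/// A minimal usage sketch; the stdout exporter crate and the
/// `with_simple_exporter` helper on the tracer provider builder are assumed to
/// be available:
///
/// ```ignore
/// let exporter = opentelemetry_stdout::SpanExporter::default();
/// let provider = opentelemetry_sdk::trace::TracerProvider::builder()
///     .with_simple_exporter(exporter)
///     .build();
/// ```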
#[derive(Debug)]
pub struct SimpleSpanProcessor {
    exporter: Mutex<Box<dyn SpanExporter>>,
}

impl SimpleSpanProcessor {
    pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
        Self {
            exporter: Mutex::new(exporter),
        }
    }
}

impl SpanProcessor for SimpleSpanProcessor {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored
    }

    fn on_end(&self, span: SpanData) {
        if !span.span_context.is_sampled() {
            return;
        }

        let result = self
            .exporter
            .lock()
            .map_err(|_| TraceError::Other("SimpleSpanProcessor mutex poison".into()))
            .and_then(|mut exporter| futures_executor::block_on(exporter.export(vec![span])));

        if let Err(err) = result {
            global::handle_error(err);
        }
    }

    fn force_flush(&self) -> TraceResult<()> {
        // Nothing to flush: spans are exported synchronously in `on_end`.
        Ok(())
    }

    fn shutdown(&self) -> TraceResult<()> {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.shutdown();
            Ok(())
        } else {
            Err(TraceError::Other(
                "SimpleSpanProcessor mutex poison at shutdown".into(),
            ))
        }
    }

    fn set_resource(&mut self, resource: &Resource) {
        if let Ok(mut exporter) = self.exporter.lock() {
            exporter.set_resource(resource);
        }
    }
}

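/// A [`SpanProcessor`] that buffers finished spans and exports them in batches
/// from a background task spawned on the provided async runtime. `on_end` only
/// enqueues the span on a bounded channel, so it never blocks the thread that
/// finished the span.
///
/// A minimal construction sketch, assuming the `rt-tokio` feature (so that
/// `runtime::Tokio` is available) and some `exporter` implementing
/// [`SpanExporter`]:
///
/// ```ignore
/// let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
/// // Typically installed on the tracer provider, e.g. via a
/// // `with_span_processor(processor)` call on its builder.
/// ```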
pub struct BatchSpanProcessor<R: RuntimeChannel> {
    message_sender: R::Sender<BatchMessage>,
}

impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BatchSpanProcessor")
            .field("message_sender", &self.message_sender)
            .finish()
    }
}

impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored
    }

    fn on_end(&self, span: SpanData) {
        if !span.span_context.is_sampled() {
            return;
        }

        let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

        if let Err(err) = result {
            global::handle_error(TraceError::Other(err.into()));
        }
    }

    fn force_flush(&self) -> TraceResult<()> {
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Flush(Some(res_sender)))
            .map_err(|err| TraceError::Other(err.into()))?;

        futures_executor::block_on(res_receiver)
            .map_err(|err| TraceError::Other(err.into()))
            .and_then(|identity| identity)
    }

    fn shutdown(&self) -> TraceResult<()> {
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Shutdown(res_sender))
            .map_err(|err| TraceError::Other(err.into()))?;

        futures_executor::block_on(res_receiver)
            .map_err(|err| TraceError::Other(err.into()))
            .and_then(|identity| identity)
    }

    fn set_resource(&mut self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}

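/// Messages exchanged between the public `BatchSpanProcessor` handle and the
/// background worker task that owns the exporter.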
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Add a span to the next batch.
    ExportSpan(SpanData),
    /// Flush the current buffer, optionally reporting the result back.
    Flush(Option<oneshot::Sender<ExportResult>>),
    /// Flush the current buffer, then shut the exporter down.
    Shutdown(oneshot::Sender<ExportResult>),
    /// Associate a new resource with the exporter.
    SetResource(Arc<Resource>),
}

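/// State owned by the background task: the span buffer, any in-flight export
/// futures, the runtime used for timers, the exporter itself, and the batch
/// configuration.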
struct BatchSpanProcessorInternal<R> {
    spans: Vec<SpanData>,
    export_tasks: FuturesUnordered<BoxFuture<'static, ExportResult>>,
    runtime: R,
    exporter: Box<dyn SpanExporter>,
    config: BatchConfig,
}

impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
    async fn flush(&mut self, res_channel: Option<oneshot::Sender<ExportResult>>) {
        let export_task = self.export();
        let task = Box::pin(async move {
            let result = export_task.await;

            // If a response channel was provided, report the result there;
            // otherwise surface errors through the global error handler.
            if let Some(channel) = res_channel {
                if let Err(result) = channel.send(result) {
                    global::handle_error(TraceError::from(format!(
                        "failed to send flush result: {:?}",
                        result
                    )));
                }
            } else if let Err(err) = result {
                global::handle_error(err);
            }

            Ok(())
        });

        if self.config.max_concurrent_exports == 1 {
            let _ = task.await;
        } else {
            self.export_tasks.push(task);
            while self.export_tasks.next().await.is_some() {}
        }
    }

    /// Process a single message, returning `false` when the worker should stop.
    async fn process_message(&mut self, message: BatchMessage) -> bool {
        match message {
            // Buffer the span; export a batch once the buffer is full.
            BatchMessage::ExportSpan(span) => {
                self.spans.push(span);

                if self.spans.len() == self.config.max_export_batch_size {
                    // If the concurrent export limit has been reached, wait for
                    // one in-flight export to finish before starting another.
                    if !self.export_tasks.is_empty()
                        && self.export_tasks.len() == self.config.max_concurrent_exports
                    {
                        self.export_tasks.next().await;
                    }

                    let export_task = self.export();
                    let task = async move {
                        if let Err(err) = export_task.await {
                            global::handle_error(err);
                        }

                        Ok(())
                    };
                    // With a concurrency limit of 1, run the export inline so
                    // batches stay strictly ordered.
                    if self.config.max_concurrent_exports == 1 {
                        let _ = task.await;
                    } else {
                        self.export_tasks.push(Box::pin(task));
                    }
                }
            }
            // Flush on a timer tick or an explicit `force_flush` call.
            BatchMessage::Flush(res_channel) => {
                self.flush(res_channel).await;
            }
            // Flush any remaining spans, shut the exporter down, and stop.
            BatchMessage::Shutdown(ch) => {
                self.flush(Some(ch)).await;
                self.exporter.shutdown();
                return false;
            }
            BatchMessage::SetResource(resource) => {
                self.exporter.set_resource(&resource);
            }
        }
        true
    }

    fn export(&mut self) -> BoxFuture<'static, ExportResult> {
        // Nothing to export: resolve immediately rather than calling the
        // exporter with an empty batch.
        if self.spans.is_empty() {
            return Box::pin(future::ready(Ok(())));
        }

        let export = self.exporter.export(self.spans.split_off(0));
        let timeout = self.runtime.delay(self.config.max_export_timeout);
        let time_out = self.config.max_export_timeout;

        Box::pin(async move {
            match future::select(export, timeout).await {
                Either::Left((export_res, _)) => export_res,
                Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)),
            }
        })
    }

    async fn run(mut self, mut messages: impl FusedStream<Item = BatchMessage> + Unpin) {
        loop {
            select! {
                // Drive any in-flight export tasks to completion.
                _ = self.export_tasks.next() => {
                },
                // Handle the next control message; exit when the channel
                // closes or a shutdown has been processed.
                message = messages.next() => {
                    match message {
                        Some(message) => {
                            if !self.process_message(message).await {
                                break;
                            }
                        },
                        None => break,
                    }
                },
            }
        }
    }
}

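// Construction wires a bounded message channel and a periodic `Flush(None)`
// ticker into a single stream that drives the background worker.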
impl<R: RuntimeChannel> BatchSpanProcessor<R> {
    pub(crate) fn new(exporter: Box<dyn SpanExporter>, config: BatchConfig, runtime: R) -> Self {
        let (message_sender, message_receiver) =
            runtime.batch_message_channel(config.max_queue_size);
        let ticker = runtime
            .interval(config.scheduled_delay)
            .map(|_| BatchMessage::Flush(None));
        let timeout_runtime = runtime.clone();

        let messages = Box::pin(stream::select(message_receiver, ticker));
        let processor = BatchSpanProcessorInternal {
            spans: Vec::new(),
            export_tasks: FuturesUnordered::new(),
            runtime: timeout_runtime,
            config,
            exporter,
        };

        // Spawn the worker task on the provided runtime.
        runtime.spawn(Box::pin(processor.run(messages)));

        // Return the handle used to communicate with the worker.
        BatchSpanProcessor { message_sender }
    }

    /// Create a builder for a `BatchSpanProcessor` from an exporter and a runtime.
    pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
    where
        E: SpanExporter,
    {
        BatchSpanProcessorBuilder {
            exporter,
            config: Default::default(),
            runtime,
        }
    }
}

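/// Batching configuration for a [`BatchSpanProcessor`]. Values are normally
/// built via [`BatchConfigBuilder`], which seeds itself from the `OTEL_BSP_*`
/// environment variables and then applies any programmatic overrides.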
#[derive(Debug)]
pub struct BatchConfig {
    /// Maximum number of spans that can be buffered; the channel is bounded at
    /// this size, so spans beyond it are dropped.
    max_queue_size: usize,

    /// Delay between two consecutive exports of the buffered spans.
    scheduled_delay: Duration,

    /// Maximum number of spans included in a single export batch.
    max_export_batch_size: usize,

    /// Maximum time an export may take before it fails with a timeout error.
    max_export_timeout: Duration,

    /// Maximum number of export operations that may run concurrently.
    max_concurrent_exports: usize,
}

impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfigBuilder::default().build()
    }
}

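/// Builder for [`BatchConfig`]. The default value reads the `OTEL_BSP_*`
/// environment variables; explicit setter calls override them.
///
/// A small sketch of typical use (the values are illustrative only):
///
/// ```ignore
/// let config = BatchConfigBuilder::default()
///     .with_max_queue_size(4_096)
///     .with_max_export_batch_size(512)
///     .with_scheduled_delay(Duration::from_secs(5))
///     .build();
/// ```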
#[derive(Debug)]
pub struct BatchConfigBuilder {
    max_queue_size: usize,
    scheduled_delay: Duration,
    max_export_batch_size: usize,
    max_export_timeout: Duration,
    max_concurrent_exports: usize,
}

impl Default for BatchConfigBuilder {
    fn default() -> Self {
        BatchConfigBuilder {
            max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
            scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT),
            max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
            max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT),
            max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
        }
        .init_from_env_vars()
    }
}

impl BatchConfigBuilder {
    /// Set the maximum number of spans that can be buffered.
    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
        self.max_queue_size = max_queue_size;
        self
    }

    /// Set the maximum number of spans exported in a single batch.
    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
        self.max_export_batch_size = max_export_batch_size;
        self
    }

    /// Set the maximum number of concurrent export operations.
    pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
        self.max_concurrent_exports = max_concurrent_exports;
        self
    }

    /// Set the delay between two consecutive exports.
    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
        self.scheduled_delay = scheduled_delay;
        self
    }

    /// Set the maximum duration allowed for a single export.
    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
        self.max_export_timeout = max_export_timeout;
        self
    }

    /// Build a [`BatchConfig`], capping the batch size at the queue size.
    pub fn build(self) -> BatchConfig {
        let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);

        BatchConfig {
            max_queue_size: self.max_queue_size,
            scheduled_delay: self.scheduled_delay,
            max_export_timeout: self.max_export_timeout,
            max_concurrent_exports: self.max_concurrent_exports,
            max_export_batch_size,
        }
    }

    fn init_from_env_vars(mut self) -> Self {
        if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
            .ok()
            .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok())
        {
            self.max_concurrent_exports = max_concurrent_exports;
        }

        if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
            .ok()
            .and_then(|queue_size| usize::from_str(&queue_size).ok())
        {
            self.max_queue_size = max_queue_size;
        }

        if let Some(scheduled_delay) = env::var(OTEL_BSP_SCHEDULE_DELAY)
            .ok()
            .and_then(|delay| u64::from_str(&delay).ok())
        {
            self.scheduled_delay = Duration::from_millis(scheduled_delay);
        }

        if let Some(max_export_batch_size) = env::var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE)
            .ok()
            .and_then(|batch_size| usize::from_str(&batch_size).ok())
        {
            self.max_export_batch_size = max_export_batch_size;
        }

        // The batch size must never exceed the queue size.
        if self.max_export_batch_size > self.max_queue_size {
            self.max_export_batch_size = self.max_queue_size;
        }

        if let Some(max_export_timeout) = env::var(OTEL_BSP_EXPORT_TIMEOUT)
            .ok()
            .and_then(|timeout| u64::from_str(&timeout).ok())
        {
            self.max_export_timeout = Duration::from_millis(max_export_timeout);
        }

        self
    }
}

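/// Builder for [`BatchSpanProcessor`], created via
/// [`BatchSpanProcessor::builder`].
///
/// A sketch of overriding the batch configuration before building; `exporter`
/// and the `rt-tokio` feature providing `runtime::Tokio` are assumed:
///
/// ```ignore
/// let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio)
///     .with_batch_config(
///         BatchConfigBuilder::default()
///             .with_max_queue_size(1_024)
///             .build(),
///     )
///     .build();
/// ```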
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder<E, R> {
    exporter: E,
    config: BatchConfig,
    runtime: R,
}

impl<E, R> BatchSpanProcessorBuilder<E, R>
where
    E: SpanExporter + 'static,
    R: RuntimeChannel,
{
    /// Replace the default [`BatchConfig`] with the given one.
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchSpanProcessorBuilder { config, ..self }
    }

    /// Build the [`BatchSpanProcessor`], spawning its background task.
    pub fn build(self) -> BatchSpanProcessor<R> {
        BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
    }
}

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
    use super::{
        BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT,
        OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
        OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
    };
    use crate::export::trace::{ExportResult, SpanData, SpanExporter};
    use crate::runtime;
    use crate::testing::trace::{
        new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder,
    };
    use crate::trace::span_processor::{
        OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS,
        OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
    };
    use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks};
    use async_trait::async_trait;
    use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
    use std::fmt::Debug;
    use std::future::Future;
    use std::time::Duration;

    #[test]
    fn simple_span_processor_on_end_calls_export() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
        let span_data = new_test_export_span_data();
        processor.on_end(span_data.clone());
        assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
        let _result = processor.shutdown();
    }

    #[test]
    fn simple_span_processor_on_end_skips_export_if_not_sampled() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
        let unsampled = SpanData {
            span_context: SpanContext::empty_context(),
            parent_span_id: SpanId::INVALID,
            span_kind: SpanKind::Internal,
            name: "opentelemetry".into(),
            start_time: opentelemetry::time::now(),
            end_time: opentelemetry::time::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: SpanEvents::default(),
            links: SpanLinks::default(),
            status: Status::Unset,
            instrumentation_lib: Default::default(),
        };
        processor.on_end(unsampled);
        assert!(exporter.get_finished_spans().unwrap().is_empty());
    }

    #[test]
    fn simple_span_processor_shutdown_calls_shutdown() {
        let exporter = InMemorySpanExporterBuilder::new().build();
        let processor = SimpleSpanProcessor::new(Box::new(exporter.clone()));
        let span_data = new_test_export_span_data();
        processor.on_end(span_data.clone());
        assert!(!exporter.get_finished_spans().unwrap().is_empty());
        let _result = processor.shutdown();
        // Shutting down the in-memory exporter clears its stored spans.
        assert!(exporter.get_finished_spans().unwrap().is_empty());
    }

    #[test]
    fn test_default_const_values() {
        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE, "OTEL_BSP_MAX_QUEUE_SIZE");
        assert_eq!(OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, 2048);
        assert_eq!(OTEL_BSP_SCHEDULE_DELAY, "OTEL_BSP_SCHEDULE_DELAY");
        assert_eq!(OTEL_BSP_SCHEDULE_DELAY_DEFAULT, 5000);
        assert_eq!(
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
        );
        assert_eq!(OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT, "OTEL_BSP_EXPORT_TIMEOUT");
        assert_eq!(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, 30000);
    }

    #[test]
    fn test_default_batch_config_adheres_to_specification() {
        let env_vars = vec![
            OTEL_BSP_SCHEDULE_DELAY,
            OTEL_BSP_EXPORT_TIMEOUT,
            OTEL_BSP_MAX_QUEUE_SIZE,
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
            OTEL_BSP_MAX_CONCURRENT_EXPORTS,
        ];

        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

        assert_eq!(
            config.max_concurrent_exports,
            OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT
        );
        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT)
        );
        assert_eq!(config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(
            config.max_export_batch_size,
            OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT
        );
    }

    #[test]
    fn test_batch_config_configurable_by_env_vars() {
        let env_vars = vec![
            (OTEL_BSP_SCHEDULE_DELAY, Some("2000")),
            (OTEL_BSP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
        assert_eq!(config.max_queue_size, 4096);
        assert_eq!(config.max_export_batch_size, 1024);
    }

    #[test]
    fn test_batch_config_max_export_batch_size_validation() {
        let env_vars = vec![
            (OTEL_BSP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        // The export batch size is capped at the queue size.
        assert_eq!(config.max_queue_size, 256);
        assert_eq!(config.max_export_batch_size, 256);
        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT)
        );
    }

    #[test]
    fn test_batch_config_with_fields() {
        let batch = BatchConfigBuilder::default()
            .with_max_export_batch_size(10)
            .with_scheduled_delay(Duration::from_millis(10))
            .with_max_export_timeout(Duration::from_millis(10))
            .with_max_concurrent_exports(10)
            .with_max_queue_size(10)
            .build();
        assert_eq!(batch.max_export_batch_size, 10);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
        assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
        assert_eq!(batch.max_concurrent_exports, 10);
        assert_eq!(batch.max_queue_size, 10);
    }

    #[test]
    fn test_build_batch_span_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            // An unparsable value falls back to the default.
            (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")),
            (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder = BatchSpanProcessor::builder(
                InMemorySpanExporterBuilder::new().build(),
                runtime::Tokio,
            );
            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
            );
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder = BatchSpanProcessor::builder(
                InMemorySpanExporterBuilder::new().build(),
                runtime::Tokio,
            );
            // The batch size (500) is capped at the smaller queue size (120).
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    #[tokio::test]
    async fn test_batch_span_processor() {
        let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
        let config = BatchConfig {
            // Set the tick to 24 hours so the span can only be exported via an
            // explicit force_flush.
            scheduled_delay: Duration::from_secs(60 * 60 * 24),
            ..Default::default()
        };
        let processor =
            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
        let handle = tokio::spawn(async move {
            loop {
                if let Some(span) = export_receiver.recv().await {
                    assert_eq!(span.span_context, new_test_export_span_data().span_context);
                    break;
                }
            }
        });
        tokio::time::sleep(Duration::from_secs(1)).await;
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        assert!(flush_res.is_ok());
        let _shutdown_result = processor.shutdown();

        assert!(
            tokio::time::timeout(Duration::from_secs(5), handle)
                .await
                .is_ok(),
            "timed out in 5 seconds. force_flush may not export any data when called"
        );
    }

    struct BlockingExporter<D> {
        delay_for: Duration,
        delay_fn: D,
    }

    impl<D, DS> Debug for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("blocking exporter for testing")
        }
    }

    #[async_trait]
    impl<D, DS> SpanExporter for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn export(
            &mut self,
            _batch: Vec<SpanData>,
        ) -> futures_util::future::BoxFuture<'static, ExportResult> {
            use futures_util::FutureExt;
            Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(())))
        }
    }

    #[test]
    fn test_timeout_tokio_timeout() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(timeout_test_tokio(true));
    }

    #[test]
    fn test_timeout_tokio_not_timeout() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(timeout_test_tokio(false));
    }

    #[test]
    #[cfg(feature = "rt-async-std")]
    fn test_timeout_async_std_timeout() {
        async_std::task::block_on(timeout_test_std_async(true));
    }

    #[test]
    #[cfg(feature = "rt-async-std")]
    fn test_timeout_async_std_not_timeout() {
        async_std::task::block_on(timeout_test_std_async(false));
    }

    #[cfg(feature = "rt-async-std")]
    async fn timeout_test_std_async(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            // Use a 24 hour tick so the export can only be triggered by force_flush.
            scheduled_delay: Duration::from_secs(60 * 60 * 24),
            ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: async_std::task::sleep,
        };
        let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }

    async fn timeout_test_tokio(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            // Use a 24 hour tick so the export can only be triggered by force_flush.
            scheduled_delay: Duration::from_secs(60 * 60 * 24),
            ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: tokio::time::sleep,
        };
        let processor =
            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
        tokio::time::sleep(Duration::from_secs(1)).await;
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }
}