1use crate::error::{OTelSdkError, OTelSdkResult};
2use crate::resource::Resource;
3use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
4use crate::trace::BatchConfig;
5use crate::trace::Span;
6use crate::trace::SpanProcessor;
7use crate::trace::{SpanData, SpanExporter};
8use futures_channel::oneshot;
9use futures_util::{
10 future::{self, BoxFuture, Either},
11 pin_mut, select,
12 stream::{self, FusedStream, FuturesUnordered},
13 StreamExt as _,
14};
15use opentelemetry::Context;
16use opentelemetry::{otel_debug, otel_error, otel_warn};
17use std::fmt;
18use std::sync::{
19 atomic::{AtomicUsize, Ordering},
20 Arc,
21};
22use std::time::Duration;
23use tokio::sync::RwLock;
24
/// A [`SpanProcessor`] that buffers finished spans and exports them in
/// batches from a background task spawned on the provided async runtime.
///
/// Spans and control messages are forwarded to the worker over a bounded
/// channel; when the channel is full or closed the span is dropped and a
/// counter is incremented (see `on_end` / `shutdown_with_timeout`).
pub struct BatchSpanProcessor<R: RuntimeChannel> {
    // Channel to the background worker spawned in `BatchSpanProcessor::new`.
    message_sender: R::Sender<BatchMessage>,

    // Total spans dropped because `try_send` failed (queue full or closed).
    // A warning is logged on the first drop, and the final count at shutdown.
    dropped_spans_count: AtomicUsize,

    // Configured queue capacity, retained only for the shutdown warning log.
    max_queue_size: usize,
}
94
95impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 f.debug_struct("BatchSpanProcessor")
98 .field("message_sender", &self.message_sender)
99 .finish()
100 }
101}
102
103impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
104 fn on_start(&self, _span: &mut Span, _cx: &Context) {
105 }
107
108 fn on_end(&self, span: SpanData) {
109 if !span.span_context.is_sampled() {
110 return;
111 }
112
113 let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
114
115 if result.is_err() {
117 if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
120 otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
121 message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped.");
122 }
123 }
124 }
125
126 fn force_flush(&self) -> OTelSdkResult {
127 let (res_sender, res_receiver) = oneshot::channel();
128 self.message_sender
129 .try_send(BatchMessage::Flush(Some(res_sender)))
130 .map_err(|err| {
131 OTelSdkError::InternalFailure(format!("Failed to send flush message: {err}"))
132 })?;
133
134 futures_executor::block_on(res_receiver).map_err(|err| {
135 OTelSdkError::InternalFailure(format!("Flush response channel error: {err}"))
136 })?
137 }
138
139 fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
140 let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
141 let max_queue_size = self.max_queue_size;
142 if dropped_spans > 0 {
143 otel_warn!(
144 name: "BatchSpanProcessor.Shutdown",
145 dropped_spans = dropped_spans,
146 max_queue_size = max_queue_size,
147 message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
148 );
149 }
150
151 let (res_sender, res_receiver) = oneshot::channel();
152 self.message_sender
153 .try_send(BatchMessage::Shutdown(res_sender))
154 .map_err(|err| {
155 OTelSdkError::InternalFailure(format!("Failed to send shutdown message: {err}"))
156 })?;
157
158 futures_executor::block_on(res_receiver).map_err(|err| {
159 OTelSdkError::InternalFailure(format!("Shutdown response channel error: {err}"))
160 })?
161 }
162
163 fn set_resource(&mut self, resource: &Resource) {
164 let resource = Arc::new(resource.clone());
165 let _ = self
166 .message_sender
167 .try_send(BatchMessage::SetResource(resource));
168 }
169}
170
/// Messages exchanged between the public processor handle and its worker.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// A finished span to buffer for the next export batch.
    ExportSpan(SpanData),
    /// Export the current buffer now. `Some` carries a channel for the result
    /// (explicit `force_flush`); `None` is a timer-driven flush.
    Flush(Option<oneshot::Sender<OTelSdkResult>>),
    /// Flush remaining spans, shut the exporter down, and stop the worker.
    Shutdown(oneshot::Sender<OTelSdkResult>),
    /// Propagate a new `Resource` to the exporter.
    SetResource(Arc<Resource>),
}
188
/// State owned by the background worker task.
struct BatchSpanProcessorInternal<E, R> {
    // Spans buffered since the last export.
    spans: Vec<SpanData>,
    // In-flight export tasks, used when `max_concurrent_exports > 1`.
    export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
    // Runtime handle, used to create export-timeout delay futures.
    runtime: R,
    config: BatchConfig,
    // Exporter behind an async RwLock: exports take the read lock (so they
    // can overlap); shutdown/set_resource take the write lock.
    exporter: Arc<RwLock<E>>,
}
199
impl<E: SpanExporter + 'static, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
    /// Exports everything currently buffered, optionally reporting the result
    /// over `res_channel` (present for explicit flush/shutdown requests,
    /// absent for timer-driven flushes, where failures are only logged).
    async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
        // Drain the buffer and export it, honoring the export timeout. Note
        // the export itself is awaited here, before the reporting task below.
        let export_result = Self::export(
            self.spans.split_off(0),
            self.exporter.clone(),
            self.runtime.clone(),
            self.config.max_export_timeout,
        )
        .await;
        // Reporting task: deliver the result to the requester, or log it for
        // timer-driven flushes.
        let task = Box::pin(async move {
            if let Some(channel) = res_channel {
                if let Err(result) = channel.send(export_result) {
                    // Receiver was dropped (e.g. caller gave up waiting).
                    otel_debug!(
                        name: "BatchSpanProcessor.Flush.SendResultError",
                        reason = format!("{:?}", result)
                    );
                }
            } else if let Err(err) = export_result {
                otel_error!(
                    name: "BatchSpanProcessor.Flush.ExportError",
                    reason = format!("{:?}", err),
                    message = "Failed during the export process"
                );
            }

            Ok(())
        });

        if self.config.max_concurrent_exports == 1 {
            let _ = task.await;
        } else {
            // Queue the reporting task, then drain ALL outstanding export
            // tasks so a flush leaves nothing in flight.
            self.export_tasks.push(task);
            while self.export_tasks.next().await.is_some() {}
        }
    }

    /// Handles one worker message; returns `false` when the worker loop
    /// should stop (i.e. after a shutdown message).
    async fn process_message(&mut self, message: BatchMessage) -> bool {
        match message {
            BatchMessage::ExportSpan(span) => {
                self.spans.push(span);

                if self.spans.len() == self.config.max_export_batch_size {
                    // At the concurrency limit: wait for one in-flight export
                    // to finish before starting another.
                    if !self.export_tasks.is_empty()
                        && self.export_tasks.len() == self.config.max_concurrent_exports
                    {
                        self.export_tasks.next().await;
                    }

                    let batch = self.spans.split_off(0);
                    let exporter = self.exporter.clone();
                    let runtime = self.runtime.clone();
                    let max_export_timeout = self.config.max_export_timeout;

                    let task = async move {
                        if let Err(err) =
                            Self::export(batch, exporter, runtime, max_export_timeout).await
                        {
                            otel_error!(
                                name: "BatchSpanProcessor.Export.Error",
                                reason = format!("{}", err)
                            );
                        }

                        Ok(())
                    };

                    // With a limit of 1, export inline to preserve strict
                    // ordering; otherwise let it run concurrently.
                    if self.config.max_concurrent_exports == 1 {
                        let _ = task.await;
                    } else {
                        self.export_tasks.push(Box::pin(task));
                    }
                }
            }
            BatchMessage::Flush(res_channel) => {
                self.flush(res_channel).await;
            }
            BatchMessage::Shutdown(ch) => {
                // Final flush (reporting over `ch`), then shut the exporter
                // down under the write lock and stop the loop.
                self.flush(Some(ch)).await;
                let _ = self.exporter.write().await.shutdown();
                return false;
            }
            BatchMessage::SetResource(resource) => {
                self.exporter.write().await.set_resource(&resource);
            }
        }
        true
    }

    /// Exports one batch, racing the exporter against a runtime delay so a
    /// stuck exporter cannot block the worker past `max_export_timeout`.
    async fn export(
        batch: Vec<SpanData>,
        exporter: Arc<RwLock<E>>,
        runtime: R,
        max_export_timeout: Duration,
    ) -> OTelSdkResult {
        // Empty batch: trivially successful, skip taking the lock.
        if batch.is_empty() {
            return Ok(());
        }

        // Read lock: exports may overlap each other but not shutdown or
        // set_resource, which take the write lock.
        let exporter_guard = exporter.read().await;
        let export = exporter_guard.export(batch);
        let timeout = runtime.delay(max_export_timeout);

        pin_mut!(export);
        pin_mut!(timeout);

        match future::select(export, timeout).await {
            Either::Left((export_res, _)) => export_res,
            Either::Right((_, _)) => Err(OTelSdkError::Timeout(max_export_timeout)),
        }
    }

    /// Worker loop: keeps in-flight export tasks progressing while processing
    /// incoming messages, until shutdown or until the message stream ends.
    async fn run(mut self, mut messages: impl FusedStream<Item = BatchMessage> + Unpin) {
        loop {
            select! {
                // Poll outstanding export tasks so they make progress between
                // messages; their results are handled inside the tasks.
                _ = self.export_tasks.next() => {
                },
                message = messages.next() => {
                    match message {
                        Some(message) => {
                            if !self.process_message(message).await {
                                break;
                            }
                        },
                        None => break,
                    }
                },
            }
        }
    }
}
367
368impl<R: RuntimeChannel> BatchSpanProcessor<R> {
369 pub(crate) fn new<E>(exporter: E, config: BatchConfig, runtime: R) -> Self
370 where
371 E: SpanExporter + Send + Sync + 'static,
372 {
373 let (message_sender, message_receiver) =
374 runtime.batch_message_channel(config.max_queue_size);
375
376 let max_queue_size = config.max_queue_size;
377
378 let inner_runtime = runtime.clone();
379 runtime.spawn(async move {
381 let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
384 .skip(1) .map(|_| BatchMessage::Flush(None));
386 let timeout_runtime = inner_runtime.clone();
387
388 let messages = Box::pin(stream::select(message_receiver, ticker));
389 let processor = BatchSpanProcessorInternal {
390 spans: Vec::new(),
391 export_tasks: FuturesUnordered::new(),
392 runtime: timeout_runtime,
393 config,
394 exporter: Arc::new(RwLock::new(exporter)),
395 };
396
397 processor.run(messages).await
398 });
399
400 BatchSpanProcessor {
402 message_sender,
403 dropped_spans_count: AtomicUsize::new(0),
404 max_queue_size,
405 }
406 }
407
408 pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
410 where
411 E: SpanExporter,
412 {
413 BatchSpanProcessorBuilder {
414 exporter,
415 config: Default::default(),
416 runtime,
417 }
418 }
419}
420
/// Builder for [`BatchSpanProcessor`]; lets callers override the default
/// [`BatchConfig`] before building.
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder<E, R> {
    exporter: E,
    config: BatchConfig,
    runtime: R,
}
429
430impl<E, R> BatchSpanProcessorBuilder<E, R>
431where
432 E: SpanExporter + 'static,
433 R: RuntimeChannel,
434{
435 pub fn with_batch_config(self, config: BatchConfig) -> Self {
437 BatchSpanProcessorBuilder { config, ..self }
438 }
439
440 pub fn build(self) -> BatchSpanProcessor<R> {
442 BatchSpanProcessor::new(self.exporter, self.config, self.runtime)
443 }
444}
445
#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
    use super::{BatchSpanProcessor, SpanProcessor};
    use crate::error::OTelSdkResult;
    use crate::runtime;
    use crate::testing::trace::{new_test_export_span_data, new_tokio_test_exporter};
    use crate::trace::span_processor::{
        OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE,
        OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
    };
    use crate::trace::{BatchConfig, BatchConfigBuilder, InMemorySpanExporterBuilder};
    use crate::trace::{SpanData, SpanExporter};
    use futures_util::Future;
    use std::fmt::Debug;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Test exporter whose `export` waits on a caller-supplied sleep function
    /// for a fixed duration before succeeding.
    struct BlockingExporter<D> {
        delay_for: Duration,
        delay_fn: D,
    }

    impl<D, DS> Debug for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("blocking exporter for testing")
        }
    }

    impl<D, DS> SpanExporter for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        async fn export(&self, _batch: Vec<SpanData>) -> OTelSdkResult {
            let sleep = (self.delay_fn)(self.delay_for);
            sleep.await;
            Ok(())
        }
    }

    /// Test exporter that records whether two exports ever overlapped in
    /// time, to verify the processor's concurrency behavior.
    struct TrackingExporter {
        delay: Duration,
        active: Arc<AtomicUsize>,
        concurrent_seen: Arc<AtomicBool>,
    }

    impl Debug for TrackingExporter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("tracking exporter")
        }
    }

    impl SpanExporter for TrackingExporter {
        async fn export(&self, _batch: Vec<SpanData>) -> crate::error::OTelSdkResult {
            // Count this export as in-flight; any count above one means two
            // exports overlapped.
            let now_running = self.active.fetch_add(1, Ordering::SeqCst) + 1;
            if now_running > 1 {
                self.concurrent_seen.store(true, Ordering::SeqCst);
            }

            tokio::time::sleep(self.delay).await;

            self.active.fetch_sub(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[test]
    fn test_build_batch_span_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")),
            (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder = BatchSpanProcessor::builder(
                InMemorySpanExporterBuilder::new().build(),
                runtime::Tokio,
            );
            // Batch size and timeout come from the env; the unparsable delay
            // falls back to its default, as does the unset queue size.
            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                OTEL_BSP_SCHEDULE_DELAY_DEFAULT
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
            );
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        // A queue size smaller than the configured batch size clamps the
        // batch size down to the queue size.
        env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));
        temp_env::with_vars(env_vars, || {
            let builder = BatchSpanProcessor::builder(
                InMemorySpanExporterBuilder::new().build(),
                runtime::Tokio,
            );
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    #[tokio::test]
    async fn test_batch_span_processor() {
        let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
        // A huge scheduled delay ensures only force_flush triggers exports.
        let config = BatchConfigBuilder::default()
            .with_scheduled_delay(Duration::from_secs(60 * 60 * 24))
            .build();
        let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread);
        let handle = tokio::spawn(async move {
            loop {
                match export_receiver.recv().await {
                    Some(exported) => {
                        assert_eq!(
                            exported.span_context,
                            new_test_export_span_data().span_context
                        );
                        break;
                    }
                    None => continue,
                }
            }
        });
        tokio::time::sleep(Duration::from_secs(1)).await;
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        assert!(flush_res.is_ok());
        let _shutdown_result = processor.shutdown();

        assert!(
            tokio::time::timeout(Duration::from_secs(5), handle)
                .await
                .is_ok(),
            "timed out in 5 seconds. force_flush may not export any data when called"
        );
    }

    /// Shared driver: when `time_out` is true the exporter sleeps longer
    /// than the export timeout so the flush must fail; otherwise it must
    /// succeed.
    async fn timeout_test_tokio(time_out: bool) {
        let (export_timeout, exporter_delay) = if time_out {
            (Duration::from_millis(5), Duration::from_millis(60))
        } else {
            (Duration::from_millis(60), Duration::from_millis(5))
        };
        let config = BatchConfig {
            max_export_timeout: export_timeout,
            scheduled_delay: Duration::from_secs(60 * 60 * 24),
            ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: exporter_delay,
            delay_fn: tokio::time::sleep,
        };
        let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread);
        tokio::time::sleep(Duration::from_secs(1)).await;
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        assert_eq!(flush_res.is_err(), time_out);
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_timeout_tokio_timeout() {
        timeout_test_tokio(true).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_timeout_tokio_not_timeout() {
        timeout_test_tokio(false).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_concurrent_exports_expected() {
        let in_flight = Arc::new(AtomicUsize::new(0));
        let overlap_seen = Arc::new(AtomicBool::new(false));

        let exporter = TrackingExporter {
            delay: Duration::from_millis(50),
            active: in_flight.clone(),
            concurrent_seen: overlap_seen.clone(),
        };

        // One-span batches plus a concurrency limit of two should cause at
        // least two exports to overlap in time.
        let config = BatchConfig {
            max_export_batch_size: 1,
            max_queue_size: 16,
            scheduled_delay: Duration::from_secs(3600),
            max_export_timeout: Duration::from_secs(5),
            max_concurrent_exports: 2,
        };

        let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);

        for _ in 0..3 {
            processor.on_end(new_test_export_span_data());
        }

        processor.force_flush().expect("force flush failed");
        processor.shutdown().expect("shutdown failed");

        assert!(
            overlap_seen.load(Ordering::SeqCst),
            "exports never overlapped, processor is still serialising them"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_exports_serial_when_max_concurrent_exports_1() {
        let in_flight = Arc::new(AtomicUsize::new(0));
        let overlap_seen = Arc::new(AtomicBool::new(false));

        let exporter = TrackingExporter {
            delay: Duration::from_millis(50),
            active: in_flight.clone(),
            concurrent_seen: overlap_seen.clone(),
        };

        // With a concurrency limit of one, exports must never overlap.
        let config = BatchConfig {
            max_export_batch_size: 1,
            max_queue_size: 16,
            scheduled_delay: Duration::from_secs(3600),
            max_export_timeout: Duration::from_secs(5),
            max_concurrent_exports: 1,
        };

        let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);

        for _ in 0..3 {
            processor.on_end(new_test_export_span_data());
        }

        processor.force_flush().expect("force flush failed");
        processor.shutdown().expect("shutdown failed");

        assert!(
            !overlap_seen.load(Ordering::SeqCst),
            "exports overlapped even though max_concurrent_exports was 1"
        );
    }
}