opentelemetry_sdk/
runtime.rs

1//! Provides an abstraction of several async runtimes
2//!
3//! This  allows OpenTelemetry to work with any current or future runtime. There are currently
4//! builtin implementations for [Tokio] and [async-std].
5//!
6//! [Tokio]: https://crates.io/crates/tokio
7//! [async-std]: https://crates.io/crates/async-std
8
9use futures_util::{future::BoxFuture, stream::Stream};
10use std::{fmt::Debug, future::Future, time::Duration};
11use thiserror::Error;
12
13/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
14/// OpenTelemetry to work with any current and hopefully future runtime implementation.
15///
16/// [Tokio]: https://crates.io/crates/tokio
17/// [async-std]: https://crates.io/crates/async-std
18pub trait Runtime: Clone + Send + Sync + 'static {
19    /// A future stream, which returns items in a previously specified interval. The item type is
20    /// not important.
21    type Interval: Stream + Send;
22
23    /// A future, which resolves after a previously specified amount of time. The output type is
24    /// not important.
25    type Delay: Future + Send + Unpin;
26
27    /// Create a [futures_util::stream::Stream], which returns a new item every
28    /// [std::time::Duration].
29    fn interval(&self, duration: Duration) -> Self::Interval;
30
31    /// Spawn a new task or thread, which executes the given future.
32    ///
33    /// # Note
34    ///
35    /// This is mainly used to run batch span processing in the background. Note, that the function
36    /// does not return a handle. OpenTelemetry will use a different way to wait for the future to
37    /// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
38    /// current thread. This means runtime implementations need to make sure they can still execute
39    /// the given future even if the main thread is blocked.
40    fn spawn(&self, future: BoxFuture<'static, ()>);
41
42    /// Return a new future, which resolves after the specified [std::time::Duration].
43    fn delay(&self, duration: Duration) -> Self::Delay;
44}
45
46/// Runtime implementation, which works with Tokio's multi thread runtime.
47#[cfg(feature = "rt-tokio")]
48#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
49#[derive(Debug, Clone)]
50pub struct Tokio;
51
52#[cfg(feature = "rt-tokio")]
53#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
54impl Runtime for Tokio {
55    type Interval = tokio_stream::wrappers::IntervalStream;
56    type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
57
58    fn interval(&self, duration: Duration) -> Self::Interval {
59        crate::util::tokio_interval_stream(duration)
60    }
61
62    fn spawn(&self, future: BoxFuture<'static, ()>) {
63        #[allow(clippy::let_underscore_future)]
64        // we don't have to await on the returned future to execute
65        let _ = tokio::spawn(future);
66    }
67
68    fn delay(&self, duration: Duration) -> Self::Delay {
69        Box::pin(tokio::time::sleep(duration))
70    }
71}
72
73/// Runtime implementation, which works with Tokio's current thread runtime.
74#[cfg(feature = "rt-tokio-current-thread")]
75#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
76#[derive(Debug, Clone)]
77pub struct TokioCurrentThread;
78
79#[cfg(feature = "rt-tokio-current-thread")]
80#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
81impl Runtime for TokioCurrentThread {
82    type Interval = tokio_stream::wrappers::IntervalStream;
83    type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
84
85    fn interval(&self, duration: Duration) -> Self::Interval {
86        crate::util::tokio_interval_stream(duration)
87    }
88
89    fn spawn(&self, future: BoxFuture<'static, ()>) {
90        // We cannot force push tracing in current thread tokio scheduler because we rely on
91        // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
92        // shutdown function so that the runtime will not finish the blocked task and kill any
93        // remaining tasks. But there is only one thread to run task, so it's a deadlock
94        //
95        // Thus, we spawn the background task in a separate thread.
96        std::thread::spawn(move || {
97            let rt = tokio::runtime::Builder::new_current_thread()
98                .enable_all()
99                .build()
100                .expect("failed to create Tokio current thead runtime for OpenTelemetry batch processing");
101            rt.block_on(future);
102        });
103    }
104
105    fn delay(&self, duration: Duration) -> Self::Delay {
106        Box::pin(tokio::time::sleep(duration))
107    }
108}
109
110/// Runtime implementation, which works with async-std.
111#[cfg(feature = "rt-async-std")]
112#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
113#[derive(Debug, Clone)]
114pub struct AsyncStd;
115
116#[cfg(feature = "rt-async-std")]
117#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
118impl Runtime for AsyncStd {
119    type Interval = async_std::stream::Interval;
120    type Delay = BoxFuture<'static, ()>;
121
122    fn interval(&self, duration: Duration) -> Self::Interval {
123        async_std::stream::interval(duration)
124    }
125
126    fn spawn(&self, future: BoxFuture<'static, ()>) {
127        #[allow(clippy::let_underscore_future)]
128        let _ = async_std::task::spawn(future);
129    }
130
131    fn delay(&self, duration: Duration) -> Self::Delay {
132        Box::pin(async_std::task::sleep(duration))
133    }
134}
135
136/// `RuntimeChannel` is an extension to [`Runtime`]. Currently, it provides a
137/// channel that is used by the [log] and [span] batch processors.
138///
139/// [log]: crate::logs::BatchLogProcessor
140/// [span]: crate::trace::BatchSpanProcessor
141pub trait RuntimeChannel: Runtime {
142    /// A future stream to receive batch messages from channels.
143    type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
144    /// A batch messages sender that can be sent across threads safely.
145    type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;
146
147    /// Return the sender and receiver used to send batch messages.
148    fn batch_message_channel<T: Debug + Send>(
149        &self,
150        capacity: usize,
151    ) -> (Self::Sender<T>, Self::Receiver<T>);
152}
153
154/// Error returned by a [`TrySend`] implementation.
155#[derive(Debug, Error)]
156pub enum TrySendError {
157    /// Send failed due to the channel being full.
158    #[error("cannot send message to batch processor as the channel is full")]
159    ChannelFull,
160    /// Send failed due to the channel being closed.
161    #[error("cannot send message to batch processor as the channel is closed")]
162    ChannelClosed,
163    /// Any other send error that isnt covered above.
164    #[error(transparent)]
165    Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
166}
167
168/// TrySend is an abstraction of `Sender` that is capable of sending messages through a reference.
169pub trait TrySend: Sync + Send {
170    /// The message that will be sent.
171    type Message;
172
173    /// Try to send a message batch to a worker thread.
174    ///
175    /// A failure can be due to either a closed receiver, or a depleted buffer.
176    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
177}
178
179#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
180impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
181    type Message = T;
182
183    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
184        self.try_send(item).map_err(|err| match err {
185            tokio::sync::mpsc::error::TrySendError::Full(_) => TrySendError::ChannelFull,
186            tokio::sync::mpsc::error::TrySendError::Closed(_) => TrySendError::ChannelClosed,
187        })
188    }
189}
190
191#[cfg(feature = "rt-tokio")]
192#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
193impl RuntimeChannel for Tokio {
194    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
195    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
196
197    fn batch_message_channel<T: Debug + Send>(
198        &self,
199        capacity: usize,
200    ) -> (Self::Sender<T>, Self::Receiver<T>) {
201        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
202        (
203            sender,
204            tokio_stream::wrappers::ReceiverStream::new(receiver),
205        )
206    }
207}
208
209#[cfg(feature = "rt-tokio-current-thread")]
210#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
211impl RuntimeChannel for TokioCurrentThread {
212    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
213    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
214
215    fn batch_message_channel<T: Debug + Send>(
216        &self,
217        capacity: usize,
218    ) -> (Self::Sender<T>, Self::Receiver<T>) {
219        let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
220        (
221            sender,
222            tokio_stream::wrappers::ReceiverStream::new(receiver),
223        )
224    }
225}
226
227#[cfg(feature = "rt-async-std")]
228impl<T: Send> TrySend for async_std::channel::Sender<T> {
229    type Message = T;
230
231    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
232        self.try_send(item).map_err(|err| match err {
233            async_std::channel::TrySendError::Full(_) => TrySendError::ChannelFull,
234            async_std::channel::TrySendError::Closed(_) => TrySendError::ChannelClosed,
235        })
236    }
237}
238
239#[cfg(feature = "rt-async-std")]
240#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
241impl RuntimeChannel for AsyncStd {
242    type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
243    type Sender<T: Debug + Send> = async_std::channel::Sender<T>;
244
245    fn batch_message_channel<T: Debug + Send>(
246        &self,
247        capacity: usize,
248    ) -> (Self::Sender<T>, Self::Receiver<T>) {
249        async_std::channel::bounded(capacity)
250    }
251}