1//! Provides an abstraction of several async runtimes
2//!
3//! This allows OpenTelemetry to work with any current or future runtime. There are currently
4//! builtin implementations for [Tokio] and [async-std].
5//!
6//! [Tokio]: https://crates.io/crates/tokio
7//! [async-std]: https://crates.io/crates/async-std
89use futures_util::{future::BoxFuture, stream::Stream};
10use std::{fmt::Debug, future::Future, time::Duration};
11use thiserror::Error;
1213/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
14/// OpenTelemetry to work with any current and hopefully future runtime implementation.
15///
16/// [Tokio]: https://crates.io/crates/tokio
17/// [async-std]: https://crates.io/crates/async-std
18pub trait Runtime: Clone + Send + Sync + 'static {
19/// A future stream, which returns items in a previously specified interval. The item type is
20 /// not important.
21type Interval: Stream + Send;
2223/// A future, which resolves after a previously specified amount of time. The output type is
24 /// not important.
25type Delay: Future + Send + Unpin;
2627/// Create a [futures_util::stream::Stream], which returns a new item every
28 /// [std::time::Duration].
29fn interval(&self, duration: Duration) -> Self::Interval;
3031/// Spawn a new task or thread, which executes the given future.
32 ///
33 /// # Note
34 ///
35 /// This is mainly used to run batch span processing in the background. Note, that the function
36 /// does not return a handle. OpenTelemetry will use a different way to wait for the future to
37 /// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
38 /// current thread. This means runtime implementations need to make sure they can still execute
39 /// the given future even if the main thread is blocked.
40fn spawn(&self, future: BoxFuture<'static, ()>);
4142/// Return a new future, which resolves after the specified [std::time::Duration].
43fn delay(&self, duration: Duration) -> Self::Delay;
44}
4546/// Runtime implementation, which works with Tokio's multi thread runtime.
47#[cfg(feature = "rt-tokio")]
48#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
49#[derive(Debug, Clone)]
50pub struct Tokio;
5152#[cfg(feature = "rt-tokio")]
53#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
54impl Runtime for Tokio {
55type Interval = tokio_stream::wrappers::IntervalStream;
56type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
5758fn interval(&self, duration: Duration) -> Self::Interval {
59crate::util::tokio_interval_stream(duration)
60 }
6162fn spawn(&self, future: BoxFuture<'static, ()>) {
63#[allow(clippy::let_underscore_future)]
64// we don't have to await on the returned future to execute
65let _ = tokio::spawn(future);
66 }
6768fn delay(&self, duration: Duration) -> Self::Delay {
69 Box::pin(tokio::time::sleep(duration))
70 }
71}
7273/// Runtime implementation, which works with Tokio's current thread runtime.
74#[cfg(feature = "rt-tokio-current-thread")]
75#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
76#[derive(Debug, Clone)]
77pub struct TokioCurrentThread;
7879#[cfg(feature = "rt-tokio-current-thread")]
80#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
81impl Runtime for TokioCurrentThread {
82type Interval = tokio_stream::wrappers::IntervalStream;
83type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
8485fn interval(&self, duration: Duration) -> Self::Interval {
86crate::util::tokio_interval_stream(duration)
87 }
8889fn spawn(&self, future: BoxFuture<'static, ()>) {
90// We cannot force push tracing in current thread tokio scheduler because we rely on
91 // BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
92 // shutdown function so that the runtime will not finish the blocked task and kill any
93 // remaining tasks. But there is only one thread to run task, so it's a deadlock
94 //
95 // Thus, we spawn the background task in a separate thread.
96std::thread::spawn(move || {
97let rt = tokio::runtime::Builder::new_current_thread()
98 .enable_all()
99 .build()
100 .expect("failed to create Tokio current thead runtime for OpenTelemetry batch processing");
101 rt.block_on(future);
102 });
103 }
104105fn delay(&self, duration: Duration) -> Self::Delay {
106 Box::pin(tokio::time::sleep(duration))
107 }
108}
109110/// Runtime implementation, which works with async-std.
111#[cfg(feature = "rt-async-std")]
112#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
113#[derive(Debug, Clone)]
114pub struct AsyncStd;
115116#[cfg(feature = "rt-async-std")]
117#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
118impl Runtime for AsyncStd {
119type Interval = async_std::stream::Interval;
120type Delay = BoxFuture<'static, ()>;
121122fn interval(&self, duration: Duration) -> Self::Interval {
123 async_std::stream::interval(duration)
124 }
125126fn spawn(&self, future: BoxFuture<'static, ()>) {
127#[allow(clippy::let_underscore_future)]
128let _ = async_std::task::spawn(future);
129 }
130131fn delay(&self, duration: Duration) -> Self::Delay {
132 Box::pin(async_std::task::sleep(duration))
133 }
134}
135136/// `RuntimeChannel` is an extension to [`Runtime`]. Currently, it provides a
137/// channel that is used by the [log] and [span] batch processors.
138///
139/// [log]: crate::logs::BatchLogProcessor
140/// [span]: crate::trace::BatchSpanProcessor
141pub trait RuntimeChannel: Runtime {
142/// A future stream to receive batch messages from channels.
143type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
144/// A batch messages sender that can be sent across threads safely.
145type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;
146147/// Return the sender and receiver used to send batch messages.
148fn batch_message_channel<T: Debug + Send>(
149&self,
150 capacity: usize,
151 ) -> (Self::Sender<T>, Self::Receiver<T>);
152}
153154/// Error returned by a [`TrySend`] implementation.
155#[derive(Debug, Error)]
156pub enum TrySendError {
157/// Send failed due to the channel being full.
158#[error("cannot send message to batch processor as the channel is full")]
159ChannelFull,
160/// Send failed due to the channel being closed.
161#[error("cannot send message to batch processor as the channel is closed")]
162ChannelClosed,
163/// Any other send error that isnt covered above.
164#[error(transparent)]
165Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
166}
167168/// TrySend is an abstraction of `Sender` that is capable of sending messages through a reference.
169pub trait TrySend: Sync + Send {
170/// The message that will be sent.
171type Message;
172173/// Try to send a message batch to a worker thread.
174 ///
175 /// A failure can be due to either a closed receiver, or a depleted buffer.
176fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
177}
178179#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
180impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
181type Message = T;
182183fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
184self.try_send(item).map_err(|err| match err {
185 tokio::sync::mpsc::error::TrySendError::Full(_) => TrySendError::ChannelFull,
186 tokio::sync::mpsc::error::TrySendError::Closed(_) => TrySendError::ChannelClosed,
187 })
188 }
189}
190191#[cfg(feature = "rt-tokio")]
192#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
193impl RuntimeChannel for Tokio {
194type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
195type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
196197fn batch_message_channel<T: Debug + Send>(
198&self,
199 capacity: usize,
200 ) -> (Self::Sender<T>, Self::Receiver<T>) {
201let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
202 (
203 sender,
204 tokio_stream::wrappers::ReceiverStream::new(receiver),
205 )
206 }
207}
208209#[cfg(feature = "rt-tokio-current-thread")]
210#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
211impl RuntimeChannel for TokioCurrentThread {
212type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
213type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
214215fn batch_message_channel<T: Debug + Send>(
216&self,
217 capacity: usize,
218 ) -> (Self::Sender<T>, Self::Receiver<T>) {
219let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
220 (
221 sender,
222 tokio_stream::wrappers::ReceiverStream::new(receiver),
223 )
224 }
225}
226227#[cfg(feature = "rt-async-std")]
228impl<T: Send> TrySend for async_std::channel::Sender<T> {
229type Message = T;
230231fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
232self.try_send(item).map_err(|err| match err {
233 async_std::channel::TrySendError::Full(_) => TrySendError::ChannelFull,
234 async_std::channel::TrySendError::Closed(_) => TrySendError::ChannelClosed,
235 })
236 }
237}
238239#[cfg(feature = "rt-async-std")]
240#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
241impl RuntimeChannel for AsyncStd {
242type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
243type Sender<T: Debug + Send> = async_std::channel::Sender<T>;
244245fn batch_message_channel<T: Debug + Send>(
246&self,
247 capacity: usize,
248 ) -> (Self::Sender<T>, Self::Receiver<T>) {
249 async_std::channel::bounded(capacity)
250 }
251}