opentelemetry_sdk/
runtime.rs1use futures_util::{future::BoxFuture, stream::Stream};
10use std::{fmt::Debug, future::Future, time::Duration};
11use thiserror::Error;
12
13pub trait Runtime: Clone + Send + Sync + 'static {
19 type Interval: Stream + Send;
22
23 type Delay: Future + Send + Unpin;
26
27 fn interval(&self, duration: Duration) -> Self::Interval;
30
31 fn spawn(&self, future: BoxFuture<'static, ()>);
41
42 fn delay(&self, duration: Duration) -> Self::Delay;
44}
45
/// [`Runtime`] implementation that delegates to Tokio (`tokio::spawn` and
/// `tokio::time`).
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
#[derive(Debug, Clone)]
pub struct Tokio;

#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl Runtime for Tokio {
    type Delay = std::pin::Pin<Box<tokio::time::Sleep>>;
    type Interval = tokio_stream::wrappers::IntervalStream;

    /// Spawns `future` with `tokio::spawn` and discards the join handle.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // Fire-and-forget: nothing ever awaits the handle, so release it now.
        drop(tokio::spawn(future));
    }

    /// Builds the interval stream through the crate-internal
    /// `tokio_interval_stream` helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    /// Returns a boxed, pinned `tokio::time::sleep(duration)` future.
    ///
    /// The box is what satisfies the `Unpin` bound on `Runtime::Delay`.
    fn delay(&self, duration: Duration) -> Self::Delay {
        let timer = tokio::time::sleep(duration);
        Box::pin(timer)
    }
}
72
/// [`Runtime`] implementation that runs every spawned future on its own OS
/// thread, each driven by a freshly built current-thread Tokio runtime.
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
#[derive(Debug, Clone)]
pub struct TokioCurrentThread;

#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl Runtime for TokioCurrentThread {
    type Interval = tokio_stream::wrappers::IntervalStream;
    type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

    /// Builds the interval stream through the crate-internal
    /// `tokio_interval_stream` helper.
    fn interval(&self, duration: Duration) -> Self::Interval {
        crate::util::tokio_interval_stream(duration)
    }

    /// Runs `future` to completion on a newly started OS thread.
    ///
    /// Every call starts one thread, builds a current-thread Tokio runtime
    /// with all drivers enabled inside it, and blocks that thread on
    /// `future`. The thread's join handle is not kept.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the Tokio runtime cannot be built.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect(
                    "failed to create Tokio current thread runtime for OpenTelemetry batch processing",
                );
            rt.block_on(future);
        });
    }

    /// Returns a boxed, pinned `tokio::time::sleep(duration)` future.
    fn delay(&self, duration: Duration) -> Self::Delay {
        Box::pin(tokio::time::sleep(duration))
    }
}
109
/// [`Runtime`] implementation that delegates to `async-std`
/// (`async_std::task` and `async_std::stream`).
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
#[derive(Debug, Clone)]
pub struct AsyncStd;

#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl Runtime for AsyncStd {
    type Delay = BoxFuture<'static, ()>;
    type Interval = async_std::stream::Interval;

    /// Spawns `future` with `async_std::task::spawn` and discards the join
    /// handle.
    fn spawn(&self, future: BoxFuture<'static, ()>) {
        // Fire-and-forget: nothing ever awaits the handle, so release it now.
        drop(async_std::task::spawn(future));
    }

    /// Returns `async_std::stream::interval(duration)`.
    fn interval(&self, duration: Duration) -> Self::Interval {
        async_std::stream::interval(duration)
    }

    /// Returns a boxed `async_std::task::sleep(duration)` future.
    fn delay(&self, duration: Duration) -> Self::Delay {
        let nap = async_std::task::sleep(duration);
        Box::pin(nap)
    }
}
135
136pub trait RuntimeChannel: Runtime {
142 type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
144 type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;
146
147 fn batch_message_channel<T: Debug + Send>(
149 &self,
150 capacity: usize,
151 ) -> (Self::Sender<T>, Self::Receiver<T>);
152}
153
154#[derive(Debug, Error)]
156pub enum TrySendError {
157 #[error("cannot send message to batch processor as the channel is full")]
159 ChannelFull,
160 #[error("cannot send message to batch processor as the channel is closed")]
162 ChannelClosed,
163 #[error(transparent)]
165 Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
166}
167
168pub trait TrySend: Sync + Send {
170 type Message;
172
173 fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
177}
178
/// Adapts Tokio's bounded mpsc sender to [`TrySend`], translating Tokio's
/// error variants into [`TrySendError`].
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
    type Message = T;

    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
        use tokio::sync::mpsc::error::TrySendError as TokioTrySendError;

        // The path form resolves to Tokio's inherent `Sender::try_send`, so
        // this does not recurse into the trait method being defined here.
        // The rejected message carried by the Tokio error is dropped.
        match tokio::sync::mpsc::Sender::try_send(self, item) {
            Ok(()) => Ok(()),
            Err(TokioTrySendError::Full(_)) => Err(TrySendError::ChannelFull),
            Err(TokioTrySendError::Closed(_)) => Err(TrySendError::ChannelClosed),
        }
    }
}
190
/// Channel support for [`Tokio`], built on `tokio::sync::mpsc`.
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl RuntimeChannel for Tokio {
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;

    /// Creates a bounded Tokio mpsc channel and wraps the receiving half in a
    /// `ReceiverStream` so it can be consumed as a stream.
    // NOTE(review): `tokio::sync::mpsc::channel` is documented to panic when
    // `capacity` is 0 — confirm callers never pass 0.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        (tx, rx_stream)
    }
}
208
/// Channel support for [`TokioCurrentThread`], built on `tokio::sync::mpsc`.
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl RuntimeChannel for TokioCurrentThread {
    type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
    type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;

    /// Creates a bounded Tokio mpsc channel and wraps the receiving half in a
    /// `ReceiverStream` so it can be consumed as a stream.
    // NOTE(review): `tokio::sync::mpsc::channel` is documented to panic when
    // `capacity` is 0 — confirm callers never pass 0.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        let rx_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        (tx, rx_stream)
    }
}
226
/// Adapts the `async-std` channel sender to [`TrySend`], translating its
/// error variants into [`TrySendError`].
#[cfg(feature = "rt-async-std")]
impl<T: Send> TrySend for async_std::channel::Sender<T> {
    type Message = T;

    fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
        use async_std::channel::TrySendError as AsyncStdTrySendError;

        // The path form resolves to the sender's inherent `try_send`, so this
        // does not recurse into the trait method being defined here. The
        // rejected message carried by the channel error is dropped.
        match async_std::channel::Sender::try_send(self, item) {
            Ok(()) => Ok(()),
            Err(AsyncStdTrySendError::Full(_)) => Err(TrySendError::ChannelFull),
            Err(AsyncStdTrySendError::Closed(_)) => Err(TrySendError::ChannelClosed),
        }
    }
}
238
/// Channel support for [`AsyncStd`], built on `async_std::channel`.
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl RuntimeChannel for AsyncStd {
    type Sender<T: Debug + Send> = async_std::channel::Sender<T>;
    type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;

    /// Creates a bounded `async-std` channel; both halves are returned
    /// unchanged.
    // NOTE(review): the underlying bounded-channel constructor is believed to
    // panic when `capacity` is 0 — confirm against the async-std docs.
    fn batch_message_channel<T: Debug + Send>(
        &self,
        capacity: usize,
    ) -> (Self::Sender<T>, Self::Receiver<T>) {
        let (tx, rx) = async_std::channel::bounded(capacity);
        (tx, rx)
    }
}