use futures_util::{future::BoxFuture, stream::Stream};
use std::{fmt::Debug, future::Future, time::Duration};
use thiserror::Error;
pub trait Runtime: Clone + Send + Sync + 'static {
type Interval: Stream + Send;
type Delay: Future + Send + Unpin;
fn interval(&self, duration: Duration) -> Self::Interval;
fn spawn(&self, future: BoxFuture<'static, ()>);
fn delay(&self, duration: Duration) -> Self::Delay;
}
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
#[derive(Debug, Clone)]
pub struct Tokio;
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl Runtime for Tokio {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
}
fn spawn(&self, future: BoxFuture<'static, ()>) {
#[allow(clippy::let_underscore_future)]
let _ = tokio::spawn(future);
}
fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
}
}
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
#[derive(Debug, Clone)]
pub struct TokioCurrentThread;
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl Runtime for TokioCurrentThread {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
}
fn spawn(&self, future: BoxFuture<'static, ()>) {
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create Tokio current thead runtime for OpenTelemetry batch processing");
rt.block_on(future);
});
}
fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
}
}
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
#[derive(Debug, Clone)]
pub struct AsyncStd;
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl Runtime for AsyncStd {
type Interval = async_std::stream::Interval;
type Delay = BoxFuture<'static, ()>;
fn interval(&self, duration: Duration) -> Self::Interval {
async_std::stream::interval(duration)
}
fn spawn(&self, future: BoxFuture<'static, ()>) {
#[allow(clippy::let_underscore_future)]
let _ = async_std::task::spawn(future);
}
fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(async_std::task::sleep(duration))
}
}
pub trait RuntimeChannel: Runtime {
type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
type Sender<T: Debug + Send>: TrySend<Message = T> + Debug;
fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>);
}
#[derive(Debug, Error)]
pub enum TrySendError {
#[error("cannot send message to batch processor as the channel is full")]
ChannelFull,
#[error("cannot send message to batch processor as the channel is closed")]
ChannelClosed,
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
pub trait TrySend: Sync + Send {
type Message;
fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
}
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
type Message = T;
fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
self.try_send(item).map_err(|err| match err {
tokio::sync::mpsc::error::TrySendError::Full(_) => TrySendError::ChannelFull,
tokio::sync::mpsc::error::TrySendError::Closed(_) => TrySendError::ChannelClosed,
})
}
}
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
impl RuntimeChannel for Tokio {
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
tokio_stream::wrappers::ReceiverStream::new(receiver),
)
}
}
#[cfg(feature = "rt-tokio-current-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
impl RuntimeChannel for TokioCurrentThread {
type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(capacity);
(
sender,
tokio_stream::wrappers::ReceiverStream::new(receiver),
)
}
}
#[cfg(feature = "rt-async-std")]
impl<T: Send> TrySend for async_std::channel::Sender<T> {
type Message = T;
fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
self.try_send(item).map_err(|err| match err {
async_std::channel::TrySendError::Full(_) => TrySendError::ChannelFull,
async_std::channel::TrySendError::Closed(_) => TrySendError::ChannelClosed,
})
}
}
#[cfg(feature = "rt-async-std")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
impl RuntimeChannel for AsyncStd {
type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
type Sender<T: Debug + Send> = async_std::channel::Sender<T>;
fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
async_std::channel::bounded(capacity)
}
}