use crate::{
export::logs::{ExportResult, LogData, LogExporter},
runtime::{RuntimeChannel, TrySend},
};
use futures_channel::oneshot;
use futures_util::{
future::{self, Either},
{pin_mut, stream, StreamExt as _},
};
#[cfg(feature = "logs_level_enabled")]
use opentelemetry_api::logs::Severity;
use opentelemetry_api::{
global,
logs::{LogError, LogResult},
};
use std::thread;
use std::{
fmt::{self, Debug, Formatter},
time::Duration,
};
/// The interface for plugging log-record processing stages into the SDK.
///
/// Implementations receive every emitted record via [`emit`](Self::emit) and
/// forward it to a `LogExporter`, either immediately (`SimpleLogProcessor`)
/// or in batches (`BatchLogProcessor`).
pub trait LogProcessor: Send + Sync + Debug {
    /// Called when a log record is ready to be processed and exported.
    fn emit(&self, data: LogData);
    /// Force the processor to flush any buffered records to its exporter.
    fn force_flush(&self) -> LogResult<()>;
    /// Shut the processor down, exporting any remaining records and
    /// releasing resources.
    fn shutdown(&mut self) -> LogResult<()>;
    #[cfg(feature = "logs_level_enabled")]
    /// Cheap pre-filter: returns whether an event with this severity,
    /// target and name should be recorded at all.
    fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
}
/// A [`LogProcessor`] that hands each record to a dedicated background
/// thread for immediate, one-at-a-time export.
#[derive(Debug)]
pub struct SimpleLogProcessor {
    // Queue to the worker thread; `None` is the shutdown sentinel.
    sender: crossbeam_channel::Sender<Option<LogData>>,
    // Receives a single `()` once the worker has finished shutting down.
    shutdown: crossbeam_channel::Receiver<()>,
}
impl SimpleLogProcessor {
    /// Creates a processor that exports each record on a dedicated worker
    /// thread as soon as it is emitted.
    pub(crate) fn new(mut exporter: Box<dyn LogExporter>) -> Self {
        // Unbounded record queue; `None` is the shutdown sentinel.
        let (log_tx, log_rx) = crossbeam_channel::unbounded();
        // Zero-capacity channel used purely as a shutdown rendezvous.
        let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded(0);
        let spawn_result = thread::Builder::new()
            .name("opentelemetry-log-exporter".to_string())
            .spawn(move || {
                // Export one record at a time until the sentinel arrives
                // (or every sender has been dropped).
                while let Ok(Some(log)) = log_rx.recv() {
                    if let Err(err) = futures_executor::block_on(exporter.export(vec![log])) {
                        global::handle_error(err);
                    }
                }
                exporter.shutdown();
                if let Err(err) = shutdown_tx.send(()) {
                    global::handle_error(LogError::from(format!(
                        "could not send shutdown: {:?}",
                        err
                    )));
                }
            });
        // Previously a spawn failure was silently discarded (`let _ =`),
        // leaving a processor that drops every record with no diagnostic.
        // Report it through the global error handler instead.
        if let Err(err) = spawn_result {
            global::handle_error(LogError::from(format!(
                "failed to spawn log exporter thread: {:?}",
                err
            )));
        }
        SimpleLogProcessor {
            sender: log_tx,
            shutdown: shutdown_rx,
        }
    }
}
impl LogProcessor for SimpleLogProcessor {
fn emit(&self, data: LogData) {
if let Err(err) = self.sender.send(Some(data)) {
global::handle_error(LogError::from(format!("error processing log {:?}", err)));
}
}
fn force_flush(&self) -> LogResult<()> {
Ok(())
}
fn shutdown(&mut self) -> LogResult<()> {
if self.sender.send(None).is_ok() {
if let Err(err) = self.shutdown.recv() {
global::handle_error(LogError::from(format!(
"error shutting down log processor: {:?}",
err
)))
}
}
Ok(())
}
#[cfg(feature = "logs_level_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
}
/// A [`LogProcessor`] that buffers emitted records and exports them in
/// batches on a background task spawned onto the provided runtime.
pub struct BatchLogProcessor<R: RuntimeChannel<BatchMessage>> {
    // Channel into the background task; carries records, flush requests
    // and the shutdown request.
    message_sender: R::Sender,
}
// Manual `Debug` so the requirement falls on the field type rather than on
// `R` itself, as `#[derive(Debug)]` would demand.
impl<R: RuntimeChannel<BatchMessage>> Debug for BatchLogProcessor<R> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("BatchLogProcessor")
            .field("message_sender", &self.message_sender)
            .finish()
    }
}
impl<R: RuntimeChannel<BatchMessage>> LogProcessor for BatchLogProcessor<R> {
    fn emit(&self, data: LogData) {
        // Hand the record to the background task; if the channel is full
        // or disconnected the record is dropped and the error reported.
        if let Err(err) = self.message_sender.try_send(BatchMessage::ExportLog(data)) {
            global::handle_error(LogError::Other(err.into()));
        }
    }

    #[cfg(feature = "logs_level_enabled")]
    fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
        // The batch processor applies no severity filtering.
        true
    }

    fn force_flush(&self) -> LogResult<()> {
        // Ask the background task to flush and block until it replies
        // with the export outcome on the one-shot ack channel.
        let (ack_tx, ack_rx) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Flush(Some(ack_tx)))
            .map_err(|err| LogError::Other(err.into()))?;
        match futures_executor::block_on(ack_rx) {
            Ok(export_result) => export_result,
            Err(canceled) => Err(LogError::Other(canceled.into())),
        }
    }

    fn shutdown(&mut self) -> LogResult<()> {
        // Ask the background task to shut down and block until it reports
        // the outcome of the final export.
        let (ack_tx, ack_rx) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Shutdown(ack_tx))
            .map_err(|err| LogError::Other(err.into()))?;
        match futures_executor::block_on(ack_rx) {
            Ok(export_result) => export_result,
            Err(canceled) => Err(LogError::Other(canceled.into())),
        }
    }
}
impl<R: RuntimeChannel<BatchMessage>> BatchLogProcessor<R> {
    /// Spawns the background batching task on `runtime` and returns the
    /// processor handle that feeds it.
    pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
        // Bounded channel: `emit` drops records (with an error report) once
        // `max_queue_size` messages are pending.
        let (message_sender, message_receiver) =
            runtime.batch_message_channel(config.max_queue_size);
        // Periodic ticker that injects an un-acked flush every
        // `scheduled_delay`, so partial batches still get exported.
        let ticker = runtime
            .interval(config.scheduled_delay)
            .map(|_| BatchMessage::Flush(None));
        let timeout_runtime = runtime.clone();
        runtime.spawn(Box::pin(async move {
            let mut logs = Vec::new();
            // Merge incoming records/control messages with ticker events
            // into a single stream driving the loop below.
            let mut messages = Box::pin(stream::select(message_receiver, ticker));
            while let Some(message) = messages.next().await {
                match message {
                    // Buffer the record; export once the batch is full.
                    BatchMessage::ExportLog(log) => {
                        logs.push(log);
                        if logs.len() == config.max_export_batch_size {
                            let result = export_with_timeout(
                                config.max_export_timeout,
                                exporter.as_mut(),
                                &timeout_runtime,
                                // Take all buffered records, leaving `logs` empty.
                                logs.split_off(0),
                            )
                            .await;
                            if let Err(err) = result {
                                global::handle_error(err);
                            }
                        }
                    }
                    // Export whatever is buffered. A `Some` channel means an
                    // explicit `force_flush` is waiting for the result; `None`
                    // means this was a ticker-driven flush, so errors are only
                    // reported globally.
                    BatchMessage::Flush(res_channel) => {
                        let result = export_with_timeout(
                            config.max_export_timeout,
                            exporter.as_mut(),
                            &timeout_runtime,
                            logs.split_off(0),
                        )
                        .await;
                        if let Some(channel) = res_channel {
                            if let Err(result) = channel.send(result) {
                                global::handle_error(LogError::from(format!(
                                    "failed to send flush result: {:?}",
                                    result
                                )));
                            }
                        } else if let Err(err) = result {
                            global::handle_error(err);
                        }
                    }
                    // Final flush, shut the exporter down, report the export
                    // result to the caller, then terminate the task.
                    BatchMessage::Shutdown(ch) => {
                        let result = export_with_timeout(
                            config.max_export_timeout,
                            exporter.as_mut(),
                            &timeout_runtime,
                            logs.split_off(0),
                        )
                        .await;
                        exporter.shutdown();
                        if let Err(result) = ch.send(result) {
                            global::handle_error(LogError::from(format!(
                                "failed to send batch processor shutdown result: {:?}",
                                result
                            )));
                        }
                        break;
                    }
                }
            }
        }));
        BatchLogProcessor { message_sender }
    }

    /// Entry point for configuring a batch processor via the builder.
    pub fn builder<E>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R>
    where
        E: LogExporter,
    {
        BatchLogProcessorBuilder {
            exporter,
            config: BatchConfig::default(),
            runtime,
        }
    }
}
/// Runs `exporter.export(batch)` raced against a runtime-provided delay of
/// `time_out`; returns `ExportTimedOut` if the delay completes first.
/// An empty batch succeeds immediately without touching the exporter.
async fn export_with_timeout<R, E>(
    time_out: Duration,
    exporter: &mut E,
    runtime: &R,
    batch: Vec<LogData>,
) -> ExportResult
where
    R: RuntimeChannel<BatchMessage>,
    E: LogExporter + ?Sized,
{
    if batch.is_empty() {
        return Ok(());
    }
    let export_fut = exporter.export(batch);
    let timeout_fut = runtime.delay(time_out);
    pin_mut!(export_fut);
    pin_mut!(timeout_fut);
    // Whichever future finishes first decides the outcome.
    match future::select(export_fut, timeout_fut).await {
        Either::Left((export_result, _timeout_unfired)) => export_result,
        Either::Right((_elapsed, _export_pending)) => Err(LogError::ExportTimedOut(time_out)),
    }
}
/// Tuning knobs for [`BatchLogProcessor`].
#[derive(Debug)]
pub struct BatchConfig {
    // Capacity of the message channel; once full, newly emitted records
    // are dropped (with an error report) rather than queued.
    max_queue_size: usize,
    // Interval between ticker-driven flushes of partial batches.
    scheduled_delay: Duration,
    // A batch is exported as soon as it reaches this many records.
    max_export_batch_size: usize,
    // Maximum time a single export call may take before it is abandoned
    // with `ExportTimedOut`.
    max_export_timeout: Duration,
}
impl Default for BatchConfig {
    // Default batching parameters: queue of 2048 messages, flush every
    // second, batches of 512 records, 30 s export timeout.
    fn default() -> Self {
        BatchConfig {
            max_queue_size: 2_048,
            scheduled_delay: Duration::from_millis(1_000),
            max_export_batch_size: 512,
            max_export_timeout: Duration::from_millis(30_000),
        }
    }
}
/// Fluent builder for [`BatchLogProcessor`]; obtained via
/// `BatchLogProcessor::builder`.
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E, R> {
    exporter: E,
    config: BatchConfig,
    runtime: R,
}
impl<E, R> BatchLogProcessorBuilder<E, R>
where
    E: LogExporter + 'static,
    R: RuntimeChannel<BatchMessage>,
{
    /// Sets the maximum number of messages that may be queued.
    ///
    /// Also shrinks `max_export_batch_size` if it now exceeds the queue
    /// size, preserving the batch ≤ queue invariant that
    /// `with_max_export_batch_size` enforces. Previously calling this
    /// setter *after* `with_max_export_batch_size` could silently leave a
    /// batch size larger than the queue.
    pub fn with_max_queue_size(self, size: usize) -> Self {
        let mut config = self.config;
        config.max_queue_size = size;
        if config.max_export_batch_size > size {
            config.max_export_batch_size = size;
        }
        BatchLogProcessorBuilder { config, ..self }
    }

    /// Sets the interval between ticker-driven flushes of partial batches.
    pub fn with_scheduled_delay(self, delay: Duration) -> Self {
        let mut config = self.config;
        config.scheduled_delay = delay;
        BatchLogProcessorBuilder { config, ..self }
    }

    /// Sets the maximum time a single export call may take before it is
    /// abandoned with a timeout error.
    pub fn with_max_timeout(self, timeout: Duration) -> Self {
        let mut config = self.config;
        config.max_export_timeout = timeout;
        BatchLogProcessorBuilder { config, ..self }
    }

    /// Sets how many records trigger an export; clamped to the current
    /// queue size so a batch can never exceed the queue.
    pub fn with_max_export_batch_size(self, size: usize) -> Self {
        let mut config = self.config;
        if size > config.max_queue_size {
            config.max_export_batch_size = config.max_queue_size;
        } else {
            config.max_export_batch_size = size;
        }
        BatchLogProcessorBuilder { config, ..self }
    }

    /// Consumes the builder and spawns the configured batch processor.
    pub fn build(self) -> BatchLogProcessor<R> {
        BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime)
    }
}
/// Messages consumed by the batch processor's background task.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum BatchMessage {
    /// A log record to buffer (and export once the batch fills).
    ExportLog(LogData),
    /// Export everything buffered now. `Some` carries the ack channel of an
    /// explicit `force_flush`; `None` marks a ticker-driven flush.
    Flush(Option<oneshot::Sender<ExportResult>>),
    /// Final flush + exporter shutdown; the result is sent on the channel.
    Shutdown(oneshot::Sender<ExportResult>),
}