use crate::{
export::logs::{ExportResult, LogData, LogExporter},
runtime::{RuntimeChannel, TrySend},
Resource,
};
use futures_channel::oneshot;
use futures_util::{
future::{self, Either},
{pin_mut, stream, StreamExt as _},
};
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{
global,
logs::{LogError, LogResult},
};
use std::borrow::Cow;
use std::sync::atomic::AtomicBool;
use std::{cmp::min, env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
str::FromStr,
sync::Arc,
time::Duration,
};
/// Environment variable for the delay interval, in milliseconds, between two consecutive exports.
const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
/// Default delay interval between two consecutive exports, in milliseconds.
const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: u64 = 1_000;
/// Environment variable for the maximum allowed time, in milliseconds, to export data.
const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
/// Default maximum allowed time to export data, in milliseconds.
const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
/// Environment variable for the maximum queue size.
const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
/// Default maximum queue size.
const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
/// Environment variable for the maximum batch size; must be less than or equal to the maximum queue size.
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size.
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// The interface for hooking log record processing into a [`LoggerProvider`](crate::logs::LoggerProvider).
pub trait LogProcessor: Send + Sync + Debug {
    /// Called when a log record is ready to be processed and exported.
    fn emit(&self, data: &mut LogData);
    /// Forces any buffered log records to be exported immediately.
    fn force_flush(&self) -> LogResult<()>;
    /// Shuts down the processor; after this returns, the processor should stop accepting log records.
    fn shutdown(&self) -> LogResult<()>;
    /// Check whether a log record with the given severity, target and name would be processed.
    #[cfg(feature = "logs_level_enabled")]
    fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
    /// Set the resource associated with the owning [`LoggerProvider`](crate::logs::LoggerProvider).
    fn set_resource(&self, _resource: &Resource) {}
}
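/// A [`LogProcessor`] that forwards each emitted log record to the configured
/// [`LogExporter`] synchronously, without batching. This is mainly useful for
/// debugging and testing; for throughput-sensitive scenarios prefer
/// [`BatchLogProcessor`].
///
/// A minimal usage sketch, mirroring the tests in this module (`exporter` here
/// stands for any [`LogExporter`] implementation):
///
/// ```ignore
/// let provider = LoggerProvider::builder()
///     .with_log_processor(SimpleLogProcessor::new(Box::new(exporter)))
///     .build();
/// ```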
#[derive(Debug)]
pub struct SimpleLogProcessor {
exporter: Mutex<Box<dyn LogExporter>>,
is_shutdown: AtomicBool,
}
impl SimpleLogProcessor {
pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
SimpleLogProcessor {
exporter: Mutex::new(exporter),
is_shutdown: AtomicBool::new(false),
}
}
}
impl LogProcessor for SimpleLogProcessor {
fn emit(&self, data: &mut LogData) {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
let result = self
.exporter
.lock()
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
.and_then(|mut exporter| {
futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)]))
});
if let Err(err) = result {
global::handle_error(err);
}
}
fn force_flush(&self) -> LogResult<()> {
Ok(())
}
fn shutdown(&self) -> LogResult<()> {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(LogError::Other(
"simple logprocessor mutex poison during shutdown".into(),
))
}
}
fn set_resource(&self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
}
}
#[cfg(feature = "logs_level_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
}
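/// A [`LogProcessor`] that buffers log records on a background task and
/// exports them in batches, either when a batch fills up or on the periodic
/// flush tick configured through [`BatchConfig`].
///
/// A hedged construction sketch, assuming a Tokio runtime (as in the tests
/// below) and `exporter` standing for any [`LogExporter`] implementation:
///
/// ```ignore
/// let processor = BatchLogProcessor::builder(exporter, runtime::Tokio)
///     .with_batch_config(BatchConfigBuilder::default().build())
///     .build();
/// let provider = LoggerProvider::builder()
///     .with_log_processor(processor)
///     .build();
/// ```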
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,
}
impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("BatchLogProcessor")
.field("message_sender", &self.message_sender)
.finish()
}
}
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
fn emit(&self, data: &mut LogData) {
let result = self
.message_sender
.try_send(BatchMessage::ExportLog(data.clone()));
if let Err(err) = result {
global::handle_error(LogError::Other(err.into()));
}
}
#[cfg(feature = "logs_level_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
fn force_flush(&self) -> LogResult<()> {
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Flush(Some(res_sender)))
.map_err(|err| LogError::Other(err.into()))?;
futures_executor::block_on(res_receiver)
.map_err(|err| LogError::Other(err.into()))
.and_then(std::convert::identity)
}
fn shutdown(&self) -> LogResult<()> {
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
.map_err(|err| LogError::Other(err.into()))?;
futures_executor::block_on(res_receiver)
.map_err(|err| LogError::Other(err.into()))
.and_then(std::convert::identity)
}
fn set_resource(&self, resource: &Resource) {
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
}
}
impl<R: RuntimeChannel> BatchLogProcessor<R> {
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
let ticker = runtime
.interval(config.scheduled_delay)
.map(|_| BatchMessage::Flush(None));
let timeout_runtime = runtime.clone();
runtime.spawn(Box::pin(async move {
let mut logs = Vec::new();
let mut messages = Box::pin(stream::select(message_receiver, ticker));
while let Some(message) = messages.next().await {
match message {
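                    // A log record arrived: buffer it and export once the batch is full.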
BatchMessage::ExportLog(log) => {
logs.push(Cow::Owned(log));
if logs.len() == config.max_export_batch_size {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
)
.await;
if let Err(err) = result {
global::handle_error(err);
}
}
}
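                    // A flush was requested, either by the periodic ticker or by force_flush.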
BatchMessage::Flush(res_channel) => {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
)
.await;
if let Some(channel) = res_channel {
if let Err(result) = channel.send(result) {
global::handle_error(LogError::from(format!(
"failed to send flush result: {:?}",
result
)));
}
} else if let Err(err) = result {
global::handle_error(err);
}
}
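                    // Drain the buffer, shut the exporter down, report the result, and exit the loop.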
BatchMessage::Shutdown(ch) => {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
)
.await;
exporter.shutdown();
if let Err(result) = ch.send(result) {
global::handle_error(LogError::from(format!(
"failed to send batch processor shutdown result: {:?}",
result
)));
}
break;
}
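                    // Propagate the provider's resource to the exporter.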
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
}
}
}
}));
BatchLogProcessor { message_sender }
}
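    /// Create a [`BatchLogProcessorBuilder`] that will configure a [`BatchLogProcessor`] with the given exporter and runtime.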
pub fn builder<E>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R>
where
E: LogExporter,
{
BatchLogProcessorBuilder {
exporter,
config: Default::default(),
runtime,
}
}
}
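/// Export a batch of log records, racing the exporter's future against a
/// `time_out` delay from the runtime; if the delay completes first the export
/// is reported as [`LogError::ExportTimedOut`].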
async fn export_with_timeout<'a, R, E>(
time_out: Duration,
exporter: &mut E,
runtime: &R,
batch: Vec<Cow<'a, LogData>>,
) -> ExportResult
where
R: RuntimeChannel,
E: LogExporter + ?Sized,
{
if batch.is_empty() {
return Ok(());
}
let export = exporter.export(batch);
let timeout = runtime.delay(time_out);
pin_mut!(export);
pin_mut!(timeout);
match future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
}
}
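/// Batch log processor configuration; use [`BatchConfigBuilder`] to construct
/// a customized instance.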
#[derive(Debug)]
pub struct BatchConfig {
    /// Maximum number of log records that can be queued; records emitted while the queue is full are dropped.
    max_queue_size: usize,
    /// Delay between two consecutive batch exports.
    scheduled_delay: Duration,
    /// Maximum number of log records included in a single export batch.
    max_export_batch_size: usize,
    /// Maximum duration allowed for a single export call.
    max_export_timeout: Duration,
}
impl Default for BatchConfig {
fn default() -> Self {
BatchConfigBuilder::default().build()
}
}
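/// A builder for [`BatchConfig`]. Its defaults are taken from the
/// `OTEL_BLRP_*` environment variables when they are set and parse, and
/// otherwise fall back to the constants defined at the top of this module.
///
/// A short sketch using only the setters defined below:
///
/// ```ignore
/// let config = BatchConfigBuilder::default()
///     .with_max_queue_size(4_096)
///     .with_scheduled_delay(Duration::from_millis(500))
///     .with_max_export_timeout(Duration::from_millis(10_000))
///     .with_max_export_batch_size(512)
///     .build();
/// ```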
#[derive(Debug)]
pub struct BatchConfigBuilder {
max_queue_size: usize,
scheduled_delay: Duration,
max_export_batch_size: usize,
max_export_timeout: Duration,
}
impl Default for BatchConfigBuilder {
fn default() -> Self {
BatchConfigBuilder {
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT),
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT),
}
.init_from_env_vars()
}
}
impl BatchConfigBuilder {
    /// Set the maximum queue size; log records emitted while the queue is full are dropped.
    /// The default value is 2048.
    pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
        self.max_queue_size = max_queue_size;
        self
    }
    /// Set the delay between two consecutive batch exports.
    /// The default value is 1000 milliseconds.
    pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
        self.scheduled_delay = scheduled_delay;
        self
    }
    /// Set the maximum duration allowed for a single export call.
    /// The default value is 30000 milliseconds.
    pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
        self.max_export_timeout = max_export_timeout;
        self
    }
    /// Set the maximum number of log records per export batch; capped at the maximum queue size.
    /// The default value is 512.
    pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
        self.max_export_batch_size = max_export_batch_size;
        self
    }
    /// Build a [`BatchConfig`], clamping the batch size to the queue size.
    pub fn build(self) -> BatchConfig {
let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
BatchConfig {
max_queue_size: self.max_queue_size,
scheduled_delay: self.scheduled_delay,
max_export_timeout: self.max_export_timeout,
max_export_batch_size,
}
}
fn init_from_env_vars(mut self) -> Self {
if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
.ok()
.and_then(|queue_size| usize::from_str(&queue_size).ok())
{
self.max_queue_size = max_queue_size;
}
if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
.ok()
.and_then(|batch_size| usize::from_str(&batch_size).ok())
{
self.max_export_batch_size = max_export_batch_size;
}
if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
.ok()
.and_then(|delay| u64::from_str(&delay).ok())
{
self.scheduled_delay = Duration::from_millis(scheduled_delay);
}
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
.ok()
.and_then(|s| u64::from_str(&s).ok())
{
self.max_export_timeout = Duration::from_millis(max_export_timeout);
}
self
}
}
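/// A builder for [`BatchLogProcessor`], obtained via [`BatchLogProcessor::builder`].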
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E, R> {
exporter: E,
config: BatchConfig,
runtime: R,
}
impl<E, R> BatchLogProcessorBuilder<E, R>
where
E: LogExporter + 'static,
R: RuntimeChannel,
{
    /// Set the [`BatchConfig`] to use for the resulting processor.
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchLogProcessorBuilder { config, ..self }
    }
    /// Build a [`BatchLogProcessor`], spawning its background worker on the configured runtime.
    pub fn build(self) -> BatchLogProcessor<R> {
        BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime)
    }
}
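/// Messages exchanged between the [`BatchLogProcessor`] handle and its background worker task.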
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
ExportLog(LogData),
Flush(Option<oneshot::Sender<ExportResult>>),
Shutdown(oneshot::Sender<ExportResult>),
SetResource(Arc<Resource>),
}
#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
use super::{
BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY,
};
use crate::testing::logs::InMemoryLogsExporterBuilder;
use crate::{
export::logs::{LogData, LogExporter},
logs::{
log_processor::{
OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
},
BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor,
},
runtime,
testing::logs::InMemoryLogsExporter,
Resource,
};
use async_trait::async_trait;
use opentelemetry::logs::AnyValue;
#[cfg(feature = "logs_level_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::logs::{Logger, LoggerProvider as _};
use opentelemetry::Key;
use opentelemetry::{logs::LogResult, KeyValue};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(Debug, Clone)]
struct MockLogExporter {
resource: Arc<Mutex<Option<Resource>>>,
}
#[async_trait]
impl LogExporter for MockLogExporter {
async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
Ok(())
}
fn shutdown(&mut self) {}
fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_opt| {
res_opt.replace(resource.clone());
})
.expect("mock log exporter shouldn't error when setting resource");
}
}
impl MockLogExporter {
fn get_resource(&self) -> Option<Resource> {
(*self.resource).lock().unwrap().clone()
}
}
#[test]
fn test_default_const_values() {
assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
assert_eq!(
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
"OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
);
assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
}
#[test]
fn test_default_batch_config_adheres_to_specification() {
let env_vars = vec![
OTEL_BLRP_SCHEDULE_DELAY,
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
];
let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
assert_eq!(
config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
config.max_export_timeout,
Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
);
assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
assert_eq!(
config.max_export_batch_size,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
);
}
#[test]
fn test_batch_config_configurable_by_env_vars() {
let env_vars = vec![
(OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];
let config = temp_env::with_vars(env_vars, BatchConfig::default);
assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
assert_eq!(config.max_queue_size, 4096);
assert_eq!(config.max_export_batch_size, 1024);
}
#[test]
fn test_batch_config_max_export_batch_size_validation() {
let env_vars = vec![
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];
let config = temp_env::with_vars(env_vars, BatchConfig::default);
assert_eq!(config.max_queue_size, 256);
assert_eq!(config.max_export_batch_size, 256);
assert_eq!(
config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
config.max_export_timeout,
Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
);
}
#[test]
fn test_batch_config_with_fields() {
let batch = BatchConfigBuilder::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_export_timeout(Duration::from_millis(3))
.with_max_queue_size(4)
.build();
assert_eq!(batch.max_export_batch_size, 1);
assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
assert_eq!(batch.max_queue_size, 4);
}
#[test]
fn test_build_batch_log_processor_builder() {
let mut env_vars = vec![
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder =
BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
builder.config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
builder.config.max_queue_size,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
);
assert_eq!(
builder.config.max_export_timeout,
Duration::from_millis(2046)
);
});
env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
temp_env::with_vars(env_vars, || {
let builder =
BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
}
#[test]
fn test_build_batch_log_processor_builder_with_custom_config() {
let expected = BatchConfigBuilder::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_export_timeout(Duration::from_millis(3))
.with_max_queue_size(4)
.build();
let builder = BatchLogProcessor::builder(InMemoryLogsExporter::default(), runtime::Tokio)
.with_batch_config(expected);
let actual = &builder.config;
assert_eq!(actual.max_export_batch_size, 1);
assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
assert_eq!(actual.max_queue_size, 4);
}
#[test]
fn test_set_resource_simple_processor() {
let exporter = MockLogExporter {
resource: Arc::new(Mutex::new(None)),
};
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let _ = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(Resource::new(vec![
KeyValue::new("k1", "v1"),
KeyValue::new("k2", "v3"),
KeyValue::new("k3", "v3"),
KeyValue::new("k4", "v4"),
KeyValue::new("k5", "v5"),
]))
.build();
assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_set_resource_batch_processor() {
let exporter = MockLogExporter {
resource: Arc::new(Mutex::new(None)),
};
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
let provider = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(Resource::new(vec![
KeyValue::new("k1", "v1"),
KeyValue::new("k2", "v3"),
KeyValue::new("k3", "v3"),
KeyValue::new("k4", "v4"),
KeyValue::new("k5", "v5"),
]))
.build();
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
let _ = provider.shutdown();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_shutdown() {
let exporter = InMemoryLogsExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
let mut log_data = LogData {
record: Default::default(),
instrumentation: Default::default(),
};
processor.emit(&mut log_data);
processor.force_flush().unwrap();
processor.shutdown().unwrap();
processor.emit(&mut log_data);
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}
#[test]
fn test_simple_shutdown() {
let exporter = InMemoryLogsExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut log_data = LogData {
record: Default::default(),
instrumentation: Default::default(),
};
processor.emit(&mut log_data);
processor.shutdown().unwrap();
let is_shutdown = processor
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed);
assert!(is_shutdown);
processor.emit(&mut log_data);
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}
#[derive(Debug)]
struct FirstProcessor {
pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
}
impl LogProcessor for FirstProcessor {
fn emit(&self, data: &mut LogData) {
data.record.attributes.get_or_insert(vec![]).push((
Key::from_static_str("processed_by"),
AnyValue::String("FirstProcessor".into()),
));
data.record.body = Some("Updated by FirstProcessor".into());
            self.logs.lock().unwrap().push(data.clone());
        }
#[cfg(feature = "logs_level_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
fn force_flush(&self) -> LogResult<()> {
Ok(())
}
fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}
#[derive(Debug)]
struct SecondProcessor {
pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
}
impl LogProcessor for SecondProcessor {
fn emit(&self, data: &mut LogData) {
assert!(data.record.attributes.as_ref().map_or(false, |attrs| {
attrs.iter().any(|(key, value)| {
key.as_str() == "processed_by"
&& value == &AnyValue::String("FirstProcessor".into())
})
}));
assert!(
data.record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
self.logs.lock().unwrap().push(data.clone());
}
#[cfg(feature = "logs_level_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
fn force_flush(&self) -> LogResult<()> {
Ok(())
}
fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}
#[test]
fn test_log_data_modification_by_multiple_processors() {
let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
let first_processor = FirstProcessor {
logs: Arc::clone(&first_processor_logs),
};
let second_processor = SecondProcessor {
logs: Arc::clone(&second_processor_logs),
};
let logger_provider = LoggerProvider::builder()
.with_log_processor(first_processor)
.with_log_processor(second_processor)
.build();
let logger = logger_provider.logger("test-logger");
let mut log_record = logger.create_log_record();
log_record.body = Some(AnyValue::String("Test log".into()));
logger.emit(log_record);
assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
let first_log = &first_processor_logs.lock().unwrap()[0];
let second_log = &second_processor_logs.lock().unwrap()[0];
assert!(first_log.record.attributes.iter().any(|attrs| {
attrs.iter().any(|(key, value)| {
key.as_str() == "processed_by"
&& value == &AnyValue::String("FirstProcessor".into())
})
}));
assert!(second_log.record.attributes.iter().any(|attrs| {
attrs.iter().any(|(key, value)| {
key.as_str() == "processed_by"
&& value == &AnyValue::String("FirstProcessor".into())
})
}));
assert!(
first_log.record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
assert!(
second_log.record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
}
}