use std::{
env, fmt, mem,
sync::{Arc, Mutex, Weak},
time::Duration,
};
use futures_channel::{mpsc, oneshot};
use futures_util::{
future::{self, Either},
pin_mut,
stream::{self, FusedStream},
StreamExt,
};
use opentelemetry::{
global,
metrics::{MetricsError, Result},
};
use crate::runtime::Runtime;
use crate::{
metrics::{
exporter::PushMetricsExporter,
reader::{MetricProducer, SdkProducer},
},
Resource,
};
use super::{
aggregation::Aggregation,
data::{ResourceMetrics, Temporality},
instrument::InstrumentKind,
reader::{AggregationSelector, MetricReader, TemporalitySelector},
Pipeline,
};
/// Fallback export timeout used when the environment does not supply a usable one.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Fallback export interval used when the environment does not supply a usable one.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
/// Environment variable read for the export interval; its value is parsed as milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Environment variable read for the export timeout; its value is parsed as milliseconds.
const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";
/// Configuration for a [`PeriodicReader`].
///
/// Obtained from [`PeriodicReader::builder`] and consumed by `build`.
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E, RT> {
/// Period passed to the runtime's interval stream that triggers periodic exports.
interval: Duration,
/// Longest a single export may run before it is reported as timed out.
timeout: Duration,
/// Exporter that receives the collected metrics.
exporter: E,
/// Extra metric producers whose output is appended to every collection.
producers: Vec<Box<dyn MetricProducer>>,
/// Runtime that provides the interval stream and timeout delay and spawns the worker.
runtime: RT,
}
impl<E, RT> PeriodicReaderBuilder<E, RT>
where
    E: PushMetricsExporter,
    RT: Runtime,
{
    /// Creates a builder whose interval and timeout are seeded from the
    /// `OTEL_METRIC_EXPORT_INTERVAL` and `OTEL_METRIC_EXPORT_TIMEOUT`
    /// environment variables, each interpreted as whole milliseconds.
    ///
    /// A variable that is unset, is not a valid unsigned integer, or is zero
    /// yields the built-in default instead (60 s interval, 30 s timeout).
    fn new(exporter: E, runtime: RT) -> Self {
        // Zero is rejected for the same reason `with_interval` and
        // `with_timeout` reject it: a zero period is not a usable ticker
        // interval (e.g. `tokio::time::interval` panics on it), and a zero
        // timeout would fail every export that is not immediately ready.
        fn millis_from_env(name: &str, fallback: Duration) -> Duration {
            env::var(name)
                .ok()
                .and_then(|raw| raw.parse::<u64>().ok())
                .map(Duration::from_millis)
                .filter(|parsed| !parsed.is_zero())
                .unwrap_or(fallback)
        }

        PeriodicReaderBuilder {
            interval: millis_from_env(METRIC_EXPORT_INTERVAL_NAME, DEFAULT_INTERVAL),
            timeout: millis_from_env(METRIC_EXPORT_TIMEOUT_NAME, DEFAULT_TIMEOUT),
            producers: vec![],
            exporter,
            runtime,
        }
    }

    /// Sets the time between periodic exports.
    ///
    /// A zero `interval` is ignored and the previously configured value is kept.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        if !interval.is_zero() {
            self.interval = interval;
        }
        self
    }

    /// Sets how long a single export may run before it is reported as timed out.
    ///
    /// A zero `timeout` is ignored and the previously configured value is kept.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        if !timeout.is_zero() {
            self.timeout = timeout;
        }
        self
    }

    /// Adds an external producer whose metrics are appended to every collection.
    pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
        self.producers.push(Box::new(producer));
        self
    }

    /// Builds the [`PeriodicReader`].
    ///
    /// No background task is started here. The start-up closure is stored in
    /// the reader and only runs when the reader is registered with a pipeline.
    pub fn build(self) -> PeriodicReader {
        // Bounded queue carrying flush/shutdown requests to the worker.
        let (message_sender, message_receiver) = mpsc::channel(256);

        // Deferred start-up: merges explicit requests with interval ticks and
        // spawns the worker loop on the configured runtime.
        let worker = move |reader: &PeriodicReader| {
            let ticker = self
                .runtime
                .interval(self.interval)
                // NOTE(review): presumably skips a first tick that the
                // runtime's interval stream yields immediately — confirm.
                .skip(1)
                .map(|_| Message::Export);
            let messages = Box::pin(stream::select(message_receiver, ticker));

            let runtime = self.runtime.clone();
            self.runtime.spawn(Box::pin(
                PeriodicReaderWorker {
                    reader: reader.clone(),
                    timeout: self.timeout,
                    runtime,
                    rm: ResourceMetrics {
                        resource: Resource::empty(),
                        scope_metrics: Vec::new(),
                    },
                }
                .run(messages),
            ));
        };

        PeriodicReader {
            exporter: Arc::new(self.exporter),
            inner: Arc::new(Mutex::new(PeriodicReaderInner {
                message_sender,
                is_shutdown: false,
                external_producers: self.producers,
                sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
            })),
        }
    }
}
/// A metric reader that hands collected metrics to a push exporter, driven by
/// messages processed on a background worker.
///
/// Cloning is cheap: clones share the same exporter and the same inner state.
#[derive(Clone)]
pub struct PeriodicReader {
/// Exporter that receives collected metrics; shared by every clone.
exporter: Arc<dyn PushMetricsExporter>,
/// Mutable reader state, shared by every clone and guarded by a mutex.
inner: Arc<Mutex<PeriodicReaderInner>>,
}
impl PeriodicReader {
    /// Starts configuring a [`PeriodicReader`] that pushes to `exporter` and
    /// schedules its work on `runtime`.
    ///
    /// The returned builder carries whatever interval and timeout
    /// `PeriodicReaderBuilder::new` selects; adjust them with the builder's
    /// `with_*` methods before calling `build`.
    pub fn builder<E: PushMetricsExporter, RT: Runtime>(
        exporter: E,
        runtime: RT,
    ) -> PeriodicReaderBuilder<E, RT> {
        PeriodicReaderBuilder::<E, RT>::new(exporter, runtime)
    }
}
impl fmt::Debug for PeriodicReader {
    /// Emits only the type name; the exporter and the shared state are not
    /// included in the debug output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("PeriodicReader")
    }
}
/// State shared by all clones of a [`PeriodicReader`], kept behind its mutex.
struct PeriodicReaderInner {
/// Sending half of the channel used to hand flush/shutdown requests to the worker.
message_sender: mpsc::Sender<Message>,
/// Set once a shutdown request has completed; later calls are then rejected.
is_shutdown: bool,
/// Extra producers whose metrics are appended after the SDK producer's.
external_producers: Vec<Box<dyn MetricProducer>>,
/// Either the not-yet-run worker start-up closure or the registered SDK producer.
sdk_producer_or_worker: ProducerOrWorker,
}
/// Requests processed by the background worker.
#[derive(Debug)]
enum Message {
/// Collect and export once; a failure is passed to the global error handler.
Export,
/// Collect and export once, then send the outcome back through the channel.
Flush(oneshot::Sender<Result<()>>),
/// Collect and export a final time, shut the exporter down, send the
/// collect/export outcome back, and stop the worker loop.
Shutdown(oneshot::Sender<Result<()>>),
}
/// Registration state of a [`PeriodicReader`].
enum ProducerOrWorker {
/// Registered: weak handle to the SDK producer that `collect` upgrades on each call.
Producer(Weak<dyn SdkProducer>),
/// Not yet registered: closure that starts the background worker, taken and
/// invoked by `register_pipeline`.
Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
}
/// Background task state: processes [`Message`]s for one [`PeriodicReader`].
struct PeriodicReaderWorker<RT: Runtime> {
/// Reader used to collect metrics and to reach its exporter.
reader: PeriodicReader,
/// Delay raced against each export; if it finishes first the export is reported as timed out.
timeout: Duration,
/// Runtime that supplies the timeout delay.
runtime: RT,
/// Metrics buffer passed to every collect and export call.
rm: ResourceMetrics,
}
impl<RT: Runtime> PeriodicReaderWorker<RT> {
    /// Collects metrics into the worker's buffer and exports them, giving up
    /// once the configured timeout elapses.
    ///
    /// # Errors
    ///
    /// Returns the collection error, the exporter's error, or an
    /// `"export timed out"` error when the timeout delay completes first.
    async fn collect_and_export(&mut self) -> Result<()> {
        self.reader.collect(&mut self.rm)?;

        let export = self.reader.exporter.export(&mut self.rm);
        let timeout = self.runtime.delay(self.timeout);
        pin_mut!(export);
        pin_mut!(timeout);

        // Race the export against the timeout; whichever completes first wins.
        match future::select(export, timeout).await {
            Either::Left((res, _)) => res,
            Either::Right(_) => Err(MetricsError::Other("export timed out".into())),
        }
    }

    /// Handles one message and returns whether the worker should keep running.
    ///
    /// Only [`Message::Shutdown`] returns `false`.
    async fn process_message(&mut self, message: Message) -> bool {
        match message {
            Message::Export => {
                // Nobody is waiting on a periodic export, so report failures globally.
                if let Err(err) = self.collect_and_export().await {
                    global::handle_error(err);
                }
            }
            Message::Flush(ch) => {
                let res = self.collect_and_export().await;
                if ch.send(res).is_err() {
                    global::handle_error(MetricsError::Other("flush channel closed".into()));
                }
            }
            Message::Shutdown(ch) => {
                let res = self.collect_and_export().await;
                // The caller receives the final collect/export outcome; an
                // exporter shutdown failure is reported globally instead of
                // being silently dropped.
                if let Err(err) = self.reader.exporter.shutdown() {
                    global::handle_error(err);
                }
                if ch.send(res).is_err() {
                    global::handle_error(MetricsError::Other("shutdown channel closed".into()));
                }
                return false;
            }
        }

        true
    }

    /// Processes messages until the stream ends or a shutdown request is handled.
    async fn run(mut self, mut messages: impl FusedStream<Item = Message> + Unpin) {
        while let Some(message) = messages.next().await {
            if !self.process_message(message).await {
                break;
            }
        }
    }
}
impl AggregationSelector for PeriodicReader {
fn aggregation(&self, kind: InstrumentKind) -> Aggregation {
self.exporter.aggregation(kind)
}
}
impl TemporalitySelector for PeriodicReader {
fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.exporter.temporality(kind)
}
}
impl MetricReader for PeriodicReader {
    /// Binds this reader to the SDK `pipeline` and starts the background worker.
    ///
    /// Only the first registration takes effect. A second call reports an
    /// error to the global error handler and leaves the reader unchanged, as
    /// does a poisoned state lock.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        let mut inner = match self.inner.lock() {
            Ok(guard) => guard,
            Err(err) => {
                // This method cannot return an error, so surface the poisoned
                // lock instead of failing silently.
                global::handle_error(MetricsError::from(err));
                return;
            }
        };

        let worker = match &mut inner.sdk_producer_or_worker {
            ProducerOrWorker::Producer(_) => {
                global::handle_error(MetricsError::Other(
                    "duplicate meter registration, did not register periodic reader".into(),
                ));
                return;
            }
            // Move the start-up closure out; the no-op left behind is
            // overwritten on the next line.
            ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})),
        };

        inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline);

        // Release the lock before starting the worker: the worker locks the
        // same state when it collects.
        drop(inner);
        worker(self);
    }

    /// Gathers metrics from the SDK producer and every external producer into `rm`.
    ///
    /// # Errors
    ///
    /// Fails if the state lock is poisoned, the reader is shut down, the
    /// reader is not registered (or its pipeline is gone), or the SDK
    /// producer fails. External producer failures do not stop the loop; they
    /// are gathered and returned together once every producer has run.
    fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
        let inner = self.inner.lock()?;
        if inner.is_shutdown {
            return Err(MetricsError::Other("reader is shut down".into()));
        }

        let sdk_producer = match &inner.sdk_producer_or_worker {
            ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
            ProducerOrWorker::Worker(_) => None,
        };
        sdk_producer
            .ok_or_else(|| MetricsError::Other("reader is not registered".into()))?
            .produce(rm)?;

        let mut errs = vec![];
        for producer in &inner.external_producers {
            match producer.produce() {
                Ok(metrics) => rm.scope_metrics.push(metrics),
                Err(err) => errs.push(err),
            }
        }

        if errs.is_empty() {
            Ok(())
        } else {
            Err(MetricsError::Other(format!("{:?}", errs)))
        }
    }

    /// Asks the worker to collect and export now, blocking until it reports back.
    ///
    /// # Errors
    ///
    /// Fails if the reader is shut down, the request cannot be queued, the
    /// worker drops the reply channel, or the collect/export itself fails.
    fn force_flush(&self) -> Result<()> {
        let mut inner = self.inner.lock()?;
        if inner.is_shutdown {
            return Err(MetricsError::Other("reader is shut down".into()));
        }

        let (sender, receiver) = oneshot::channel();
        inner
            .message_sender
            .try_send(Message::Flush(sender))
            .map_err(|e| MetricsError::Other(e.to_string()))?;

        // Do not hold the lock while blocking: the worker needs it to collect.
        drop(inner);

        futures_executor::block_on(receiver)
            .map_err(|err| MetricsError::Other(err.to_string()))?
    }

    /// Asks the worker for a final collect/export and exporter shutdown,
    /// blocking until it reports back, then marks the reader as shut down.
    ///
    /// # Errors
    ///
    /// Fails if the reader is already shut down, the request cannot be
    /// queued, the worker drops the reply channel, or the final
    /// collect/export fails.
    fn shutdown(&self) -> Result<()> {
        let mut inner = self.inner.lock()?;
        if inner.is_shutdown {
            return Err(MetricsError::Other("reader is already shut down".into()));
        }

        let (sender, receiver) = oneshot::channel();
        inner
            .message_sender
            .try_send(Message::Shutdown(sender))
            .map_err(|e| MetricsError::Other(e.to_string()))?;

        // Do not hold the lock while blocking: the worker needs it to collect.
        drop(inner);

        let shutdown_result = futures_executor::block_on(receiver)
            .map_err(|err| MetricsError::Other(err.to_string()))?;

        // The flag is set only after the worker has finished; setting it
        // earlier would make the worker's final collect fail.
        let mut inner = self.inner.lock()?;
        inner.is_shutdown = true;

        shutdown_result
    }
}
#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::PeriodicReader;
    use crate::{
        metrics::{data::ResourceMetrics, reader::MetricReader, SdkMeterProvider},
        runtime,
        testing::metrics::InMemoryMetricsExporter,
        Resource,
    };
    use opentelemetry::metrics::MeterProvider;
    use std::sync::mpsc;

    /// A reader registered with a meter provider must run instrument
    /// callbacks when a flush is forced.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn registration_triggers_collection() {
        // Arrange: a reader with a 1 ms interval and a channel that records
        // every callback invocation.
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio)
            .with_interval(std::time::Duration::from_millis(1))
            .build();
        let (collected_tx, collected_rx) = mpsc::channel();

        // Act: register the reader, add an observable counter, force a flush.
        let provider = SdkMeterProvider::builder().with_reader(reader).build();
        let _counter = provider
            .meter("test")
            .u64_observable_counter("testcounter")
            .with_callback(move |_| {
                collected_tx
                    .send(())
                    .expect("channel should still be open");
            })
            .init();
        let _ = provider.force_flush();

        // Assert: the callback fired at least once.
        collected_rx
            .try_recv()
            .expect("message should be available in channel, indicating a collection occurred");
    }

    /// Collecting from a reader that was never registered must fail.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn unregistered_collect() {
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let mut metrics = ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: Vec::new(),
        };

        reader
            .collect(&mut metrics)
            .expect_err("error expected when reader is not registered");
    }
}