use std::{
fmt,
sync::{Mutex, Weak},
};
use opentelemetry::{
global,
metrics::{MetricsError, Result},
};
use super::{
data::{ResourceMetrics, Temporality},
instrument::InstrumentKind,
pipeline::Pipeline,
reader::{
AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector,
MetricProducer, MetricReader, SdkProducer, TemporalitySelector,
},
};
pub struct ManualReader {
inner: Box<Mutex<ManualReaderInner>>,
temporality_selector: Box<dyn TemporalitySelector>,
aggregation_selector: Box<dyn AggregationSelector>,
}
impl Default for ManualReader {
fn default() -> Self {
ManualReader::builder().build()
}
}
impl fmt::Debug for ManualReader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ManualReader")
}
}
#[derive(Debug)]
struct ManualReaderInner {
sdk_producer: Option<Weak<dyn SdkProducer>>,
is_shutdown: bool,
external_producers: Vec<Box<dyn MetricProducer>>,
}
impl ManualReader {
pub fn builder() -> ManualReaderBuilder {
ManualReaderBuilder::default()
}
pub(crate) fn new(
temporality_selector: Box<dyn TemporalitySelector>,
aggregation_selector: Box<dyn AggregationSelector>,
producers: Vec<Box<dyn MetricProducer>>,
) -> Self {
ManualReader {
inner: Box::new(Mutex::new(ManualReaderInner {
sdk_producer: None,
is_shutdown: false,
external_producers: producers,
})),
temporality_selector,
aggregation_selector,
}
}
}
impl TemporalitySelector for ManualReader {
fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.temporality_selector.temporality(kind)
}
}
impl AggregationSelector for ManualReader {
fn aggregation(&self, kind: InstrumentKind) -> super::aggregation::Aggregation {
self.aggregation_selector.aggregation(kind)
}
}
impl MetricReader for ManualReader {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
let _ = self.inner.lock().map(|mut inner| {
if inner.sdk_producer.is_none() {
inner.sdk_producer = Some(pipeline);
} else {
global::handle_error(MetricsError::Config(
"duplicate reader registration, did not register manual reader".into(),
))
}
});
}
fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
let inner = self.inner.lock()?;
match &inner.sdk_producer.as_ref().and_then(|w| w.upgrade()) {
Some(producer) => producer.produce(rm)?,
None => {
return Err(MetricsError::Other(
"reader is shut down or not registered".into(),
))
}
};
let mut errs = vec![];
for producer in &inner.external_producers {
match producer.produce() {
Ok(metrics) => rm.scope_metrics.push(metrics),
Err(err) => errs.push(err),
}
}
if errs.is_empty() {
Ok(())
} else {
Err(MetricsError::Other(format!("{:?}", errs)))
}
}
fn force_flush(&self) -> Result<()> {
Ok(())
}
fn shutdown(&self) -> Result<()> {
let mut inner = self.inner.lock()?;
inner.sdk_producer = None;
inner.is_shutdown = true;
inner.external_producers = Vec::new();
Ok(())
}
}
pub struct ManualReaderBuilder {
temporality_selector: Box<dyn TemporalitySelector>,
aggregation_selector: Box<dyn AggregationSelector>,
producers: Vec<Box<dyn MetricProducer>>,
}
impl fmt::Debug for ManualReaderBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ManualReaderBuilder")
}
}
impl Default for ManualReaderBuilder {
fn default() -> Self {
ManualReaderBuilder {
temporality_selector: Box::new(DefaultTemporalitySelector { _private: () }),
aggregation_selector: Box::new(DefaultAggregationSelector { _private: () }),
producers: vec![],
}
}
}
impl ManualReaderBuilder {
pub fn new() -> Self {
Default::default()
}
pub fn with_temporality_selector(
mut self,
temporality_selector: impl TemporalitySelector + 'static,
) -> Self {
self.temporality_selector = Box::new(temporality_selector);
self
}
pub fn with_aggregation_selector(
mut self,
aggregation_selector: impl AggregationSelector + 'static,
) -> Self {
self.aggregation_selector = Box::new(aggregation_selector);
self
}
pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
self.producers.push(Box::new(producer));
self
}
pub fn build(self) -> ManualReader {
ManualReader::new(
self.temporality_selector,
self.aggregation_selector,
self.producers,
)
}
}