use std::sync::Arc;
use opentelemetry_api::KeyValue;
use crate::{attributes::AttributeSet, metrics::data::Aggregation};
use super::{aggregator::PrecomputeAggregator, Aggregator, Number};
/// Wraps `agg` in an aggregator that applies `filter` to each measurement's
/// attributes before the measurement is aggregated.
///
/// When `agg.as_precompute_aggregator()` yields a precompute aggregator, that
/// aggregator is wrapped in a [`PrecomputeFilter`]; otherwise `agg` itself is
/// wrapped in a plain [`Filter`].
pub(crate) fn new_filter<T: Number<T>>(
    agg: Arc<dyn Aggregator<T>>,
    filter: Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>,
) -> Arc<dyn Aggregator<T>> {
    match agg.as_precompute_aggregator() {
        Some(precompute) => Arc::new(PrecomputeFilter {
            agg: precompute,
            filter,
        }),
        None => Arc::new(Filter { agg, filter }),
    }
}
/// Aggregator wrapper that runs a measurement's attributes through `filter`
/// before handing the measurement to the wrapped aggregator.
struct Filter<T> {
/// Predicate passed to `AttributeSet::retain` for every measurement.
filter: Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>,
/// Wrapped aggregator that receives the measurement together with the
/// filtered attribute set.
agg: Arc<dyn Aggregator<T>>,
}
impl<T: Number<T>> Aggregator<T> for Filter<T> {
    /// Filters `attrs` in place with the stored predicate, then forwards the
    /// measurement and the remaining attributes to the wrapped aggregator.
    fn aggregate(&self, measurement: T, mut attrs: AttributeSet) {
        // Borrow the predicate out of its `Arc`; same reference `as_ref` yields.
        let predicate = &*self.filter;
        attrs.retain(predicate);

        self.agg.aggregate(measurement, attrs);
    }

    /// Returns whatever aggregation the wrapped aggregator reports; this
    /// wrapper keeps no aggregation state of its own.
    fn aggregation(&self) -> Option<Box<dyn Aggregation>> {
        let inner = &self.agg;
        inner.aggregation()
    }
}
/// Aggregator wrapper for precompute aggregators: filters a measurement's
/// attributes and routes it to `aggregate` or `aggregate_filtered` on the
/// wrapped aggregator depending on whether the filter changed the set.
struct PrecomputeFilter<T: Number<T>> {
/// Predicate passed to `AttributeSet::retain` for every measurement.
filter: Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>,
/// Wrapped precompute aggregator that receives the measurement together
/// with the filtered attribute set.
agg: Arc<dyn PrecomputeAggregator<T>>,
}
impl<T: Number<T>> Aggregator<T> for PrecomputeFilter<T> {
    /// Filters `attrs` in place with the stored predicate and forwards the
    /// measurement to the wrapped precompute aggregator.
    ///
    /// If filtering left the attribute count unchanged, the measurement goes
    /// through `aggregate`; if the count changed, it goes through
    /// `aggregate_filtered` instead.
    fn aggregate(&self, measurement: T, mut attrs: AttributeSet) {
        let original_len = attrs.len();
        attrs.retain(&*self.filter);

        // A different length after `retain` means the filter altered the set.
        let was_filtered = attrs.len() != original_len;
        if was_filtered {
            self.agg.aggregate_filtered(measurement, attrs)
        } else {
            self.agg.aggregate(measurement, attrs)
        }
    }

    /// Returns whatever aggregation the wrapped aggregator reports; this
    /// wrapper keeps no aggregation state of its own.
    fn aggregation(&self) -> Option<Box<dyn Aggregation>> {
        let inner = &self.agg;
        inner.aggregation()
    }
}