opentelemetry_sdk/metrics/internal/
aggregate.rs

1use std::{marker, sync::Arc};
2
3use once_cell::sync::Lazy;
4use opentelemetry::KeyValue;
5
6use crate::{
7    metrics::data::{Aggregation, Gauge, Temporality},
8    metrics::AttributeSet,
9};
10
11use super::{
12    exponential_histogram::ExpoHistogram,
13    histogram::Histogram,
14    last_value::LastValue,
15    sum::{PrecomputedSum, Sum},
16    Number,
17};
18
19const STREAM_CARDINALITY_LIMIT: u32 = 2000;
20pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy<AttributeSet> = Lazy::new(|| {
21    let key_values: [KeyValue; 1] = [KeyValue::new("otel.metric.overflow", "true")];
22    AttributeSet::from(&key_values[..])
23});
24
25/// Checks whether aggregator has hit cardinality limit for metric streams
26pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
27    size < STREAM_CARDINALITY_LIMIT as usize
28}
29
30/// Receives measurements to be aggregated.
31pub(crate) trait Measure<T>: Send + Sync + 'static {
32    fn call(&self, measurement: T, attrs: AttributeSet);
33}
34
35impl<F, T> Measure<T> for F
36where
37    F: Fn(T, AttributeSet) + Send + Sync + 'static,
38{
39    fn call(&self, measurement: T, attrs: AttributeSet) {
40        self(measurement, attrs)
41    }
42}
43
44/// Stores the aggregate of measurements into the aggregation and returns the number
45/// of aggregate data-points output.
46pub(crate) trait ComputeAggregation: Send + Sync + 'static {
47    /// Compute the new aggregation and store in `dest`.
48    ///
49    /// If no initial aggregation exists, `dest` will be `None`, in which case the
50    /// returned option is expected to contain a new aggregation with the data from
51    /// the current collection cycle.
52    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>);
53}
54
55impl<T> ComputeAggregation for T
56where
57    T: Fn(Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>)
58        + Send
59        + Sync
60        + 'static,
61{
62    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
63        self(dest)
64    }
65}
66
67/// Builds aggregate functions
68pub(crate) struct AggregateBuilder<T> {
69    /// The temporality used for the returned aggregate functions.
70    ///
71    /// If this is not provided, a default of cumulative will be used (except for the
72    /// last-value aggregate function where delta is the only appropriate
73    /// temporality).
74    temporality: Option<Temporality>,
75
76    /// The attribute filter the aggregate function will use on the input of
77    /// measurements.
78    filter: Option<Filter>,
79
80    _marker: marker::PhantomData<T>,
81}
82
/// Shared, thread-safe predicate over a single [`KeyValue`]; `AggregateBuilder`
/// passes it to `AttributeSet::retain` to filter measurement attributes.
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
84
85impl<T: Number<T>> AggregateBuilder<T> {
86    pub(crate) fn new(temporality: Option<Temporality>, filter: Option<Filter>) -> Self {
87        AggregateBuilder {
88            temporality,
89            filter,
90            _marker: marker::PhantomData,
91        }
92    }
93
94    /// Wraps the passed in measure with an attribute filtering function.
95    fn filter(&self, f: impl Measure<T>) -> impl Measure<T> {
96        let filter = self.filter.clone();
97        move |n, mut attrs: AttributeSet| {
98            if let Some(filter) = &filter {
99                attrs.retain(filter.as_ref());
100            }
101            f.call(n, attrs)
102        }
103    }
104
105    /// Builds a last-value aggregate function input and output.
106    ///
107    /// [Builder::temporality] is ignored and delta is always used.
108    pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
109        // Delta temporality is the only temporality that makes semantic sense for
110        // a last-value aggregate.
111        let lv_filter = Arc::new(LastValue::new());
112        let lv_agg = Arc::clone(&lv_filter);
113
114        (
115            self.filter(move |n, a| lv_filter.measure(n, a)),
116            move |dest: Option<&mut dyn Aggregation>| {
117                let g = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
118                let mut new_agg = if g.is_none() {
119                    Some(Gauge {
120                        data_points: vec![],
121                    })
122                } else {
123                    None
124                };
125                let g = g.unwrap_or_else(|| new_agg.as_mut().expect("present if g is none"));
126
127                lv_agg.compute_aggregation(&mut g.data_points);
128
129                (g.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
130            },
131        )
132    }
133
134    /// Builds a precomputed sum aggregate function input and output.
135    pub(crate) fn precomputed_sum(
136        &self,
137        monotonic: bool,
138    ) -> (impl Measure<T>, impl ComputeAggregation) {
139        let s = Arc::new(PrecomputedSum::new(monotonic));
140        let agg_sum = Arc::clone(&s);
141        let t = self.temporality;
142
143        (
144            self.filter(move |n, a| s.measure(n, a)),
145            move |dest: Option<&mut dyn Aggregation>| match t {
146                Some(Temporality::Delta) => agg_sum.delta(dest),
147                _ => agg_sum.cumulative(dest),
148            },
149        )
150    }
151
152    /// Builds a sum aggregate function input and output.
153    pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
154        let s = Arc::new(Sum::new(monotonic));
155        let agg_sum = Arc::clone(&s);
156        let t = self.temporality;
157
158        (
159            self.filter(move |n, a| s.measure(n, a)),
160            move |dest: Option<&mut dyn Aggregation>| match t {
161                Some(Temporality::Delta) => agg_sum.delta(dest),
162                _ => agg_sum.cumulative(dest),
163            },
164        )
165    }
166
167    /// Builds a histogram aggregate function input and output.
168    pub(crate) fn explicit_bucket_histogram(
169        &self,
170        boundaries: Vec<f64>,
171        record_min_max: bool,
172        record_sum: bool,
173    ) -> (impl Measure<T>, impl ComputeAggregation) {
174        let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
175        let agg_h = Arc::clone(&h);
176        let t = self.temporality;
177
178        (
179            self.filter(move |n, a| h.measure(n, a)),
180            move |dest: Option<&mut dyn Aggregation>| match t {
181                Some(Temporality::Delta) => agg_h.delta(dest),
182                _ => agg_h.cumulative(dest),
183            },
184        )
185    }
186
187    /// Builds an exponential histogram aggregate function input and output.
188    pub(crate) fn exponential_bucket_histogram(
189        &self,
190        max_size: u32,
191        max_scale: i8,
192        record_min_max: bool,
193        record_sum: bool,
194    ) -> (impl Measure<T>, impl ComputeAggregation) {
195        let h = Arc::new(ExpoHistogram::new(
196            max_size,
197            max_scale,
198            record_min_max,
199            record_sum,
200        ));
201        let agg_h = Arc::clone(&h);
202        let t = self.temporality;
203
204        (
205            self.filter(move |n, a| h.measure(n, a)),
206            move |dest: Option<&mut dyn Aggregation>| match t {
207                Some(Temporality::Delta) => agg_h.delta(dest),
208                _ => agg_h.cumulative(dest),
209            },
210        )
211    }
212}
213
#[cfg(test)]
mod tests {
    use crate::metrics::data::{
        DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
        Histogram, HistogramDataPoint, Sum,
    };
    use std::{time::SystemTime, vec};

    use super::*;

    /// Temporalities each temporality-aware aggregate is exercised with.
    const TEMPORALITIES: [Temporality; 2] = [Temporality::Delta, Temporality::Cumulative];

    /// Returns the temporality that is *not* `temporality`, so a destination
    /// can be seeded with a value the aggregate is expected to overwrite.
    fn flipped(temporality: Temporality) -> Temporality {
        if temporality == Temporality::Delta {
            Temporality::Cumulative
        } else {
            Temporality::Delta
        }
    }

    /// Builds a pre-existing data point (attribute `key = 1`) that the
    /// aggregate under test is expected to replace.
    fn stale_point(key: &'static str, value: u64) -> DataPoint<u64> {
        DataPoint {
            attributes: vec![KeyValue::new(key, 1)],
            start_time: Some(SystemTime::now()),
            time: Some(SystemTime::now()),
            value,
            exemplars: vec![],
        }
    }

    /// Shared body of the sum tests: seeds a destination `Sum` with stale
    /// contents, records one measurement and checks that the destination is
    /// fully replaced by it.
    fn assert_sum_is_replaced(
        temporality: Temporality,
        measure: impl Measure<u64>,
        agg: impl ComputeAggregation,
    ) {
        let mut sum = Sum {
            data_points: vec![stale_point("a1", 1), stale_point("a2", 2)],
            temporality: flipped(temporality),
            is_monotonic: false,
        };
        let fresh_attrs = [KeyValue::new("b", 2)];
        measure.call(3, AttributeSet::from(&fresh_attrs[..]));

        let (count, new_agg) = agg.call(Some(&mut sum));

        assert_eq!(count, 1);
        assert!(new_agg.is_none());
        assert_eq!(sum.temporality, temporality);
        assert!(sum.is_monotonic);
        assert_eq!(sum.data_points.len(), 1);
        let point = &sum.data_points[0];
        assert_eq!(point.attributes, fresh_attrs.to_vec());
        assert_eq!(point.value, 3);
    }

    #[test]
    fn last_value_aggregation() {
        let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
        let mut gauge = Gauge {
            data_points: vec![stale_point("a", 1)],
        };
        let fresh_attrs = [KeyValue::new("b", 2)];
        measure.call(2, AttributeSet::from(&fresh_attrs[..]));

        let (count, new_agg) = agg.call(Some(&mut gauge));

        assert_eq!(count, 1);
        assert!(new_agg.is_none());
        assert_eq!(gauge.data_points.len(), 1);
        let point = &gauge.data_points[0];
        assert_eq!(point.attributes, fresh_attrs.to_vec());
        assert_eq!(point.value, 2);
    }

    #[test]
    fn precomputed_sum_aggregation() {
        for temporality in TEMPORALITIES {
            let (measure, agg) =
                AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
            assert_sum_is_replaced(temporality, measure, agg);
        }
    }

    #[test]
    fn sum_aggregation() {
        for temporality in TEMPORALITIES {
            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
            assert_sum_is_replaced(temporality, measure, agg);
        }
    }

    #[test]
    fn explicit_bucket_histogram_aggregation() {
        for temporality in TEMPORALITIES {
            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
                .explicit_bucket_histogram(vec![1.0], true, true);
            // Stale destination whose contents must all be overwritten.
            let mut histogram = Histogram {
                data_points: vec![HistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    start_time: SystemTime::now(),
                    time: SystemTime::now(),
                    count: 2,
                    bounds: vec![1.0, 2.0],
                    bucket_counts: vec![0, 1, 1],
                    min: None,
                    max: None,
                    sum: 3u64,
                    exemplars: vec![],
                }],
                temporality: flipped(temporality),
            };
            let fresh_attrs = [KeyValue::new("b", 2)];
            measure.call(3, AttributeSet::from(&fresh_attrs[..]));

            let (count, new_agg) = agg.call(Some(&mut histogram));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(histogram.temporality, temporality);
            assert_eq!(histogram.data_points.len(), 1);
            let point = &histogram.data_points[0];
            assert_eq!(point.attributes, fresh_attrs.to_vec());
            assert_eq!(point.count, 1);
            assert_eq!(point.bounds, vec![1.0]);
            assert_eq!(point.bucket_counts, vec![0, 1]);
            assert_eq!(point.min, Some(3));
            assert_eq!(point.max, Some(3));
            assert_eq!(point.sum, 3);
        }
    }

    #[test]
    fn exponential_histogram_aggregation() {
        for temporality in TEMPORALITIES {
            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
                .exponential_bucket_histogram(4, 20, true, true);
            // Stale destination whose contents must all be overwritten.
            let mut histogram = ExponentialHistogram {
                data_points: vec![ExponentialHistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    start_time: SystemTime::now(),
                    time: SystemTime::now(),
                    count: 2,
                    min: None,
                    max: None,
                    sum: 3u64,
                    scale: 10,
                    zero_count: 1,
                    positive_bucket: ExponentialBucket {
                        offset: 1,
                        counts: vec![1],
                    },
                    negative_bucket: ExponentialBucket {
                        offset: 1,
                        counts: vec![1],
                    },
                    zero_threshold: 1.0,
                    exemplars: vec![],
                }],
                temporality: flipped(temporality),
            };
            let fresh_attrs = [KeyValue::new("b", 2)];
            measure.call(3, AttributeSet::from(&fresh_attrs[..]));

            let (count, new_agg) = agg.call(Some(&mut histogram));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(histogram.temporality, temporality);
            assert_eq!(histogram.data_points.len(), 1);
            let point = &histogram.data_points[0];
            assert_eq!(point.attributes, fresh_attrs.to_vec());
            assert_eq!(point.count, 1);
            assert_eq!(point.min, Some(3));
            assert_eq!(point.max, Some(3));
            assert_eq!(point.sum, 3);
            assert_eq!(point.zero_count, 0);
            assert_eq!(point.zero_threshold, 0.0);
            assert_eq!(point.positive_bucket.offset, 1661953);
            assert_eq!(point.positive_bucket.counts, vec![1]);
            assert_eq!(point.negative_bucket.offset, 0);
            assert!(point.negative_bucket.counts.is_empty());
        }
    }
}