use std::{marker, sync::Arc};

use once_cell::sync::Lazy;
use opentelemetry::KeyValue;

use crate::{
    metrics::data::{Aggregation, Gauge, Temporality},
    metrics::AttributeSet,
};

use super::{
    exponential_histogram::ExpoHistogram,
    histogram::Histogram,
    last_value::LastValue,
    sum::{PrecomputedSum, Sum},
    Number,
};

const STREAM_CARDINALITY_LIMIT: u32 = 2000;
pub(crate) static STREAM_OVERFLOW_ATTRIBUTE_SET: Lazy<AttributeSet> = Lazy::new(|| {
    let key_values: [KeyValue; 1] = [KeyValue::new("otel.metric.overflow", "true")];
    AttributeSet::from(&key_values[..])
});

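/// Checks whether the given stream size is still below the cardinality limit
/// for metric streams.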
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
    size < STREAM_CARDINALITY_LIMIT as usize
}

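/// Receives measurements to be aggregated.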
pub(crate) trait Measure<T>: Send + Sync + 'static {
    fn call(&self, measurement: T, attrs: AttributeSet);
}

impl<F, T> Measure<T> for F
where
    F: Fn(T, AttributeSet) + Send + Sync + 'static,
{
    fn call(&self, measurement: T, attrs: AttributeSet) {
        self(measurement, attrs)
    }
}

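/// Stores the aggregate of measurements into an [`Aggregation`] and reports the
/// number of aggregate data points produced.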
pub(crate) trait ComputeAggregation: Send + Sync + 'static {
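    /// Computes the current aggregation and stores it in `dest`.
    ///
    /// If `dest` is `None` (no reusable aggregation of the right type exists),
    /// the returned option is expected to carry a freshly allocated aggregation
    /// alongside the number of aggregate data points.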
    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>);
}

impl<T> ComputeAggregation for T
where
    T: Fn(Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>)
        + Send
        + Sync
        + 'static,
{
    fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
        self(dest)
    }
}

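/// Builds aggregate functions.
///
/// A minimal usage sketch (a delta-temporality monotonic sum over `u64`
/// measurements, mirroring the tests below):
///
/// ```ignore
/// let builder = AggregateBuilder::<u64>::new(Some(Temporality::Delta), None);
/// let (measure, collect) = builder.sum(true);
/// measure.call(5, AttributeSet::from(&[KeyValue::new("key", "value")][..]));
/// let (count, new_agg) = collect.call(None);
/// ```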
pub(crate) struct AggregateBuilder<T> {
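    /// The temporality used for the returned aggregate functions.
    ///
    /// If this is not provided, cumulative is used by default (the last-value
    /// aggregate function does not consult it at all).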
    temporality: Option<Temporality>,

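    /// The attribute filter the aggregate function will apply to the input
    /// measurements.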
    filter: Option<Filter>,

    _marker: marker::PhantomData<T>,
}

type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;

impl<T: Number<T>> AggregateBuilder<T> {
    pub(crate) fn new(temporality: Option<Temporality>, filter: Option<Filter>) -> Self {
        AggregateBuilder {
            temporality,
            filter,
            _marker: marker::PhantomData,
        }
    }

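    /// Wraps the given measure with the configured attribute filter, if any.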
    fn filter(&self, f: impl Measure<T>) -> impl Measure<T> {
        let filter = self.filter.clone();
        move |n, mut attrs: AttributeSet| {
            if let Some(filter) = &filter {
                attrs.retain(filter.as_ref());
            }
            f.call(n, attrs)
        }
    }

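    /// Builds a last-value aggregate function input and output.
    ///
    /// The configured temporality is not consulted; the output is reported as a
    /// [`Gauge`].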
    pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
        let lv_filter = Arc::new(LastValue::new());
        let lv_agg = Arc::clone(&lv_filter);

        (
            self.filter(move |n, a| lv_filter.measure(n, a)),
            move |dest: Option<&mut dyn Aggregation>| {
                let g = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
                let mut new_agg = if g.is_none() {
                    Some(Gauge {
                        data_points: vec![],
                    })
                } else {
                    None
                };
                let g = g.unwrap_or_else(|| new_agg.as_mut().expect("present if g is none"));

                lv_agg.compute_aggregation(&mut g.data_points);

                (g.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
            },
        )
    }

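    /// Builds a precomputed sum aggregate function input and output.
    ///
    /// Delta temporality is used when configured; otherwise cumulative.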
    pub(crate) fn precomputed_sum(
        &self,
        monotonic: bool,
    ) -> (impl Measure<T>, impl ComputeAggregation) {
        let s = Arc::new(PrecomputedSum::new(monotonic));
        let agg_sum = Arc::clone(&s);
        let t = self.temporality;

        (
            self.filter(move |n, a| s.measure(n, a)),
            move |dest: Option<&mut dyn Aggregation>| match t {
                Some(Temporality::Delta) => agg_sum.delta(dest),
                _ => agg_sum.cumulative(dest),
            },
        )
    }

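    /// Builds a sum aggregate function input and output.
    ///
    /// Delta temporality is used when configured; otherwise cumulative.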
    pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
        let s = Arc::new(Sum::new(monotonic));
        let agg_sum = Arc::clone(&s);
        let t = self.temporality;

        (
            self.filter(move |n, a| s.measure(n, a)),
            move |dest: Option<&mut dyn Aggregation>| match t {
                Some(Temporality::Delta) => agg_sum.delta(dest),
                _ => agg_sum.cumulative(dest),
            },
        )
    }

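    /// Builds a histogram aggregate function with explicit bucket boundaries.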
    pub(crate) fn explicit_bucket_histogram(
        &self,
        boundaries: Vec<f64>,
        record_min_max: bool,
        record_sum: bool,
    ) -> (impl Measure<T>, impl ComputeAggregation) {
        let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
        let agg_h = Arc::clone(&h);
        let t = self.temporality;

        (
            self.filter(move |n, a| h.measure(n, a)),
            move |dest: Option<&mut dyn Aggregation>| match t {
                Some(Temporality::Delta) => agg_h.delta(dest),
                _ => agg_h.cumulative(dest),
            },
        )
    }

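    /// Builds an exponential-bucket histogram aggregate function.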
    pub(crate) fn exponential_bucket_histogram(
        &self,
        max_size: u32,
        max_scale: i8,
        record_min_max: bool,
        record_sum: bool,
    ) -> (impl Measure<T>, impl ComputeAggregation) {
        let h = Arc::new(ExpoHistogram::new(
            max_size,
            max_scale,
            record_min_max,
            record_sum,
        ));
        let agg_h = Arc::clone(&h);
        let t = self.temporality;

        (
            self.filter(move |n, a| h.measure(n, a)),
            move |dest: Option<&mut dyn Aggregation>| match t {
                Some(Temporality::Delta) => agg_h.delta(dest),
                _ => agg_h.cumulative(dest),
            },
        )
    }
}

#[cfg(test)]
mod tests {
    use crate::metrics::data::{
        DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
        Histogram, HistogramDataPoint, Sum,
    };
    use std::{time::SystemTime, vec};

    use super::*;

    #[test]
    fn last_value_aggregation() {
        let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
        let mut a = Gauge {
            data_points: vec![DataPoint {
                attributes: vec![KeyValue::new("a", 1)],
                start_time: Some(SystemTime::now()),
                time: Some(SystemTime::now()),
                value: 1u64,
                exemplars: vec![],
            }],
        };
        let new_attributes = [KeyValue::new("b", 2)];
        measure.call(2, AttributeSet::from(&new_attributes[..]));

        let (count, new_agg) = agg.call(Some(&mut a));

        assert_eq!(count, 1);
        assert!(new_agg.is_none());
        assert_eq!(a.data_points.len(), 1);
        assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
        assert_eq!(a.data_points[0].value, 2);
    }

    #[test]
    fn precomputed_sum_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let (measure, agg) =
                AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
            let mut a = Sum {
                data_points: vec![
                    DataPoint {
                        attributes: vec![KeyValue::new("a1", 1)],
                        start_time: Some(SystemTime::now()),
                        time: Some(SystemTime::now()),
                        value: 1u64,
                        exemplars: vec![],
                    },
                    DataPoint {
                        attributes: vec![KeyValue::new("a2", 1)],
                        start_time: Some(SystemTime::now()),
                        time: Some(SystemTime::now()),
                        value: 2u64,
                        exemplars: vec![],
                    },
                ],
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
                is_monotonic: false,
            };
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, AttributeSet::from(&new_attributes[..]));

            let (count, new_agg) = agg.call(Some(&mut a));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert!(a.is_monotonic);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].value, 3);
        }
    }

    #[test]
    fn sum_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
            let mut a = Sum {
                data_points: vec![
                    DataPoint {
                        attributes: vec![KeyValue::new("a1", 1)],
                        start_time: Some(SystemTime::now()),
                        time: Some(SystemTime::now()),
                        value: 1u64,
                        exemplars: vec![],
                    },
                    DataPoint {
                        attributes: vec![KeyValue::new("a2", 1)],
                        start_time: Some(SystemTime::now()),
                        time: Some(SystemTime::now()),
                        value: 2u64,
                        exemplars: vec![],
                    },
                ],
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
                is_monotonic: false,
            };
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, AttributeSet::from(&new_attributes[..]));

            let (count, new_agg) = agg.call(Some(&mut a));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert!(a.is_monotonic);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].value, 3);
        }
    }

    #[test]
    fn explicit_bucket_histogram_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
                .explicit_bucket_histogram(vec![1.0], true, true);
            let mut a = Histogram {
                data_points: vec![HistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    start_time: SystemTime::now(),
                    time: SystemTime::now(),
                    count: 2,
                    bounds: vec![1.0, 2.0],
                    bucket_counts: vec![0, 1, 1],
                    min: None,
                    max: None,
                    sum: 3u64,
                    exemplars: vec![],
                }],
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
            };
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, AttributeSet::from(&new_attributes[..]));

            let (count, new_agg) = agg.call(Some(&mut a));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].count, 1);
            assert_eq!(a.data_points[0].bounds, vec![1.0]);
            assert_eq!(a.data_points[0].bucket_counts, vec![0, 1]);
            assert_eq!(a.data_points[0].min, Some(3));
            assert_eq!(a.data_points[0].max, Some(3));
            assert_eq!(a.data_points[0].sum, 3);
        }
    }

    #[test]
    fn exponential_histogram_aggregation() {
        for temporality in [Temporality::Delta, Temporality::Cumulative] {
            let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
                .exponential_bucket_histogram(4, 20, true, true);
            let mut a = ExponentialHistogram {
                data_points: vec![ExponentialHistogramDataPoint {
                    attributes: vec![KeyValue::new("a1", 1)],
                    start_time: SystemTime::now(),
                    time: SystemTime::now(),
                    count: 2,
                    min: None,
                    max: None,
                    sum: 3u64,
                    scale: 10,
                    zero_count: 1,
                    positive_bucket: ExponentialBucket {
                        offset: 1,
                        counts: vec![1],
                    },
                    negative_bucket: ExponentialBucket {
                        offset: 1,
                        counts: vec![1],
                    },
                    zero_threshold: 1.0,
                    exemplars: vec![],
                }],
                temporality: if temporality == Temporality::Delta {
                    Temporality::Cumulative
                } else {
                    Temporality::Delta
                },
            };
            let new_attributes = [KeyValue::new("b", 2)];
            measure.call(3, AttributeSet::from(&new_attributes[..]));

            let (count, new_agg) = agg.call(Some(&mut a));

            assert_eq!(count, 1);
            assert!(new_agg.is_none());
            assert_eq!(a.temporality, temporality);
            assert_eq!(a.data_points.len(), 1);
            assert_eq!(a.data_points[0].attributes, new_attributes.to_vec());
            assert_eq!(a.data_points[0].count, 1);
            assert_eq!(a.data_points[0].min, Some(3));
            assert_eq!(a.data_points[0].max, Some(3));
            assert_eq!(a.data_points[0].sum, 3);
            assert_eq!(a.data_points[0].zero_count, 0);
            assert_eq!(a.data_points[0].zero_threshold, 0.0);
            assert_eq!(a.data_points[0].positive_bucket.offset, 1661953);
            assert_eq!(a.data_points[0].positive_bucket.counts, vec![1]);
            assert_eq!(a.data_points[0].negative_bucket.offset, 0);
            assert!(a.data_points[0].negative_bucket.counts.is_empty());
        }
    }
}