1use std::{collections::HashMap, f64::consts::LOG2_E, sync::Mutex, time::SystemTime};
2
3use once_cell::sync::Lazy;
4use opentelemetry::{metrics::MetricsError, KeyValue};
5
6use crate::{
7 metrics::data::{self, Aggregation, Temporality},
8 metrics::AttributeSet,
9};
10
11use super::Number;
12
/// Upper end of the scale range used by this module. Together with
/// `EXPO_MIN_SCALE` it bounds how many halvings `scale_change` will attempt,
/// and `SCALE_FACTORS` holds one entry for each scale from 0 up to this value.
pub(crate) const EXPO_MAX_SCALE: i8 = 20;
/// Lowest scale a data point may be downscaled to. `record` reports
/// "exponential histogram scale underflow" instead of going below it.
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
15
16#[derive(Debug, PartialEq)]
18struct ExpoHistogramDataPoint<T> {
19 count: usize,
20 min: T,
21 max: T,
22 sum: T,
23
24 max_size: i32,
25 record_min_max: bool,
26 record_sum: bool,
27
28 scale: i8,
29
30 pos_buckets: ExpoBuckets,
31 neg_buckets: ExpoBuckets,
32 zero_count: u64,
33}
34
35impl<T: Number<T>> ExpoHistogramDataPoint<T> {
36 fn new(max_size: i32, max_scale: i8, record_min_max: bool, record_sum: bool) -> Self {
37 ExpoHistogramDataPoint {
38 count: 0,
39 min: T::max(),
40 max: T::min(),
41 sum: T::default(),
42 max_size,
43 record_min_max,
44 record_sum,
45 scale: max_scale,
46 pos_buckets: ExpoBuckets::default(),
47 neg_buckets: ExpoBuckets::default(),
48 zero_count: 0,
49 }
50 }
51}
52
53impl<T: Number<T>> ExpoHistogramDataPoint<T> {
54 fn record(&mut self, v: T) {
58 self.count += 1;
59
60 if self.record_min_max {
61 if v < self.min {
62 self.min = v;
63 }
64 if v > self.max {
65 self.max = v;
66 }
67 }
68 if self.record_sum {
69 self.sum += v;
70 }
71
72 let abs_v = v.into_float().abs();
73
74 if abs_v == 0.0 {
75 self.zero_count += 1;
76 return;
77 }
78
79 let mut bin = self.get_bin(abs_v);
80
81 let v_is_negative = v < T::default();
82
83 let scale_delta = {
86 let bucket = if v_is_negative {
87 &self.neg_buckets
88 } else {
89 &self.pos_buckets
90 };
91
92 scale_change(
93 self.max_size,
94 bin,
95 bucket.start_bin,
96 bucket.counts.len() as i32,
97 )
98 };
99 if scale_delta > 0 {
100 if (self.scale - scale_delta as i8) < EXPO_MIN_SCALE {
101 opentelemetry::global::handle_error(MetricsError::Other(
104 "exponential histogram scale underflow".into(),
105 ));
106 return;
107 }
108 self.scale -= scale_delta as i8;
110 self.pos_buckets.downscale(scale_delta);
111 self.neg_buckets.downscale(scale_delta);
112
113 bin = self.get_bin(abs_v);
114 }
115
116 if v_is_negative {
117 self.neg_buckets.record(bin)
118 } else {
119 self.pos_buckets.record(bin)
120 }
121 }
122
123 fn get_bin(&self, v: f64) -> i32 {
125 let (frac, exp) = frexp(v);
126 if self.scale <= 0 {
127 let mut correction = 1;
129 if frac == 0.5 {
130 correction = 2;
133 }
134 return (exp - correction) >> -self.scale;
135 }
136 (exp << self.scale) + (frac.ln() * SCALE_FACTORS[self.scale as usize]) as i32 - 1
137 }
138}
139
140fn scale_change(max_size: i32, bin: i32, start_bin: i32, length: i32) -> u32 {
144 if length == 0 {
145 return 0;
147 }
148
149 let mut low = start_bin;
150 let mut high = bin;
151 if start_bin >= bin {
152 low = bin;
153 high = start_bin + length - 1;
154 }
155
156 let mut count = 0u32;
157 while high - low >= max_size {
158 low >>= 1;
159 high >>= 1;
160 count += 1;
161
162 if count > (EXPO_MAX_SCALE - EXPO_MIN_SCALE) as u32 {
163 return count;
164 }
165 }
166
167 count
168}
169
170static SCALE_FACTORS: Lazy<[f64; 21]> = Lazy::new(|| {
172 [
173 LOG2_E * 2f64.powi(0),
174 LOG2_E * 2f64.powi(1),
175 LOG2_E * 2f64.powi(2),
176 LOG2_E * 2f64.powi(3),
177 LOG2_E * 2f64.powi(4),
178 LOG2_E * 2f64.powi(5),
179 LOG2_E * 2f64.powi(6),
180 LOG2_E * 2f64.powi(7),
181 LOG2_E * 2f64.powi(8),
182 LOG2_E * 2f64.powi(9),
183 LOG2_E * 2f64.powi(10),
184 LOG2_E * 2f64.powi(11),
185 LOG2_E * 2f64.powi(12),
186 LOG2_E * 2f64.powi(13),
187 LOG2_E * 2f64.powi(14),
188 LOG2_E * 2f64.powi(15),
189 LOG2_E * 2f64.powi(16),
190 LOG2_E * 2f64.powi(17),
191 LOG2_E * 2f64.powi(18),
192 LOG2_E * 2f64.powi(19),
193 LOG2_E * 2f64.powi(20),
194 ]
195});
196
/// Splits `x` into a fraction and a power of two so that
/// `x == frac * 2^exp`, with `0.5 <= |frac| < 1` for finite non-zero input.
///
/// Zero, infinities and NaN are returned unchanged with an exponent of 0.
#[inline(always)]
fn frexp(x: f64) -> (f64, i32) {
    let bits = x.to_bits();

    match ((bits >> 52) & 0x7ff) as i32 {
        // Biased exponent 0 with a zero value: +0.0 or -0.0.
        0 if x == 0.0 => (x, 0),
        // Subnormal: multiply by 2^64 to normalize, then undo it in the exponent.
        0 => {
            let two_pow_64 = f64::from_bits(0x43f0000000000000);
            let (frac, exp) = frexp(x * two_pow_64);
            (frac, exp - 64)
        }
        // Infinity or NaN.
        0x7ff => (x, 0),
        biased => {
            // Keep sign and mantissa, force the exponent field to that of 0.5.
            let frac_bits = (bits & 0x800fffffffffffff) | 0x3fe0000000000000;
            (f64::from_bits(frac_bits), biased - 0x3fe)
        }
    }
}
223
/// A contiguous run of exponential-histogram bucket counts.
///
/// `counts[i]` is the number of values recorded for bin `start_bin + i`.
#[derive(Default, Debug, PartialEq)]
struct ExpoBuckets {
    /// Bin index of `counts[0]`.
    start_bin: i32,
    /// Per-bin counts, one entry for every bin between the lowest and highest
    /// recorded bin (inclusive).
    counts: Vec<u64>,
}

impl ExpoBuckets {
    /// Increments the count for `bin`, growing the range in either direction
    /// (zero-filling any gap) when `bin` lies outside it.
    fn record(&mut self, bin: i32) {
        if self.counts.is_empty() {
            self.counts.push(1);
            self.start_bin = bin;
            return;
        }

        if bin < self.start_bin {
            // Grow at the back, then rotate the new zeros to the front so the
            // existing allocation is reused instead of building a new vector.
            let shift = (self.start_bin - bin) as usize;
            let new_len = self.counts.len() + shift;
            self.counts.resize(new_len, 0);
            self.counts.rotate_right(shift);
            self.counts[0] = 1;
            self.start_bin = bin;
            return;
        }

        let idx = (bin - self.start_bin) as usize;
        if idx >= self.counts.len() {
            // `resize` zero-fills the gap whether or not spare capacity
            // exists, so no separate capacity check is needed.
            self.counts.resize(idx + 1, 0);
        }
        self.counts[idx] += 1;
    }

    /// Lowers the resolution by `delta` scale steps, merging every group of
    /// `2^delta` adjacent bins into one.
    fn downscale(&mut self, delta: u32) {
        // With at most one bucket (or no scale change) only the start index
        // needs adjusting.
        if self.counts.len() <= 1 || delta < 1 {
            self.start_bin >>= delta;
            return;
        }

        let steps = 1i32 << delta;
        // Position of the first bin inside its merged group; `rem_euclid`
        // keeps it non-negative for negative start bins.
        let offset = self.start_bin.rem_euclid(steps) as usize;
        let steps = steps as usize;

        for i in 1..self.counts.len() {
            let shifted = i + offset;
            let target = shifted / steps;
            let count = self.counts[i];
            if shifted % steps == 0 {
                // First bin of a new group: overwrite whatever was there.
                self.counts[target] = count;
            } else {
                self.counts[target] += count;
            }
        }

        let last_idx = (self.counts.len() - 1 + offset) / steps;
        // Drop the now-unused tail in place rather than copying the prefix.
        self.counts.truncate(last_idx + 1);
        self.start_bin >>= delta;
    }
}
308
309pub(crate) struct ExpoHistogram<T> {
315 record_sum: bool,
316 record_min_max: bool,
317 max_size: i32,
318 max_scale: i8,
319
320 values: Mutex<HashMap<AttributeSet, ExpoHistogramDataPoint<T>>>,
321
322 start: Mutex<SystemTime>,
323}
324
325impl<T: Number<T>> ExpoHistogram<T> {
326 pub(crate) fn new(
328 max_size: u32,
329 max_scale: i8,
330 record_min_max: bool,
331 record_sum: bool,
332 ) -> Self {
333 ExpoHistogram {
334 record_sum,
335 record_min_max,
336 max_size: max_size as i32,
337 max_scale,
338 values: Mutex::new(HashMap::default()),
339 start: Mutex::new(SystemTime::now()),
340 }
341 }
342
343 pub(crate) fn measure(&self, value: T, attrs: AttributeSet) {
344 let f_value = value.into_float();
345 if f_value.is_infinite() || f_value.is_nan() {
347 return;
348 }
349
350 if let Ok(mut values) = self.values.lock() {
351 let v = values.entry(attrs).or_insert_with(|| {
352 ExpoHistogramDataPoint::new(
353 self.max_size,
354 self.max_scale,
355 self.record_min_max,
356 self.record_sum,
357 )
358 });
359 v.record(value)
360 }
361 }
362
363 pub(crate) fn delta(
364 &self,
365 dest: Option<&mut dyn Aggregation>,
366 ) -> (usize, Option<Box<dyn Aggregation>>) {
367 let t = SystemTime::now();
368 let start = self
369 .start
370 .lock()
371 .map(|s| *s)
372 .unwrap_or_else(|_| SystemTime::now());
373
374 let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
375 let mut new_agg = if h.is_none() {
376 Some(data::ExponentialHistogram {
377 data_points: vec![],
378 temporality: Temporality::Delta,
379 })
380 } else {
381 None
382 };
383 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
384 h.temporality = Temporality::Delta;
385 h.data_points.clear();
386
387 let mut values = match self.values.lock() {
388 Ok(g) => g,
389 Err(_) => return (0, None),
390 };
391
392 let n = values.len();
393 if n > h.data_points.capacity() {
394 h.data_points.reserve_exact(n - h.data_points.capacity());
395 }
396
397 for (a, b) in values.drain() {
398 h.data_points.push(data::ExponentialHistogramDataPoint {
399 attributes: a
400 .iter()
401 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
402 .collect(),
403 start_time: start,
404 time: t,
405 count: b.count,
406 min: if self.record_min_max {
407 Some(b.min)
408 } else {
409 None
410 },
411 max: if self.record_min_max {
412 Some(b.max)
413 } else {
414 None
415 },
416 sum: if self.record_sum { b.sum } else { T::default() },
417 scale: b.scale,
418 zero_count: b.zero_count,
419 positive_bucket: data::ExponentialBucket {
420 offset: b.pos_buckets.start_bin,
421 counts: b.pos_buckets.counts.clone(),
422 },
423 negative_bucket: data::ExponentialBucket {
424 offset: b.neg_buckets.start_bin,
425 counts: b.neg_buckets.counts.clone(),
426 },
427 zero_threshold: 0.0,
428 exemplars: vec![],
429 });
430 }
431
432 if let Ok(mut start) = self.start.lock() {
434 *start = t;
435 }
436
437 (n, new_agg.map(|a| Box::new(a) as Box<_>))
438 }
439
440 pub(crate) fn cumulative(
441 &self,
442 dest: Option<&mut dyn Aggregation>,
443 ) -> (usize, Option<Box<dyn Aggregation>>) {
444 let t = SystemTime::now();
445 let start = self
446 .start
447 .lock()
448 .map(|s| *s)
449 .unwrap_or_else(|_| SystemTime::now());
450
451 let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
452 let mut new_agg = if h.is_none() {
453 Some(data::ExponentialHistogram {
454 data_points: vec![],
455 temporality: Temporality::Cumulative,
456 })
457 } else {
458 None
459 };
460 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
461 h.temporality = Temporality::Cumulative;
462
463 let values = match self.values.lock() {
464 Ok(g) => g,
465 Err(_) => return (0, None),
466 };
467 h.data_points.clear();
468
469 let n = values.len();
470 if n > h.data_points.capacity() {
471 h.data_points.reserve_exact(n - h.data_points.capacity());
472 }
473
474 for (a, b) in values.iter() {
479 h.data_points.push(data::ExponentialHistogramDataPoint {
480 attributes: a
481 .iter()
482 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
483 .collect(),
484 start_time: start,
485 time: t,
486 count: b.count,
487 min: if self.record_min_max {
488 Some(b.min)
489 } else {
490 None
491 },
492 max: if self.record_min_max {
493 Some(b.max)
494 } else {
495 None
496 },
497 sum: if self.record_sum { b.sum } else { T::default() },
498 scale: b.scale,
499 zero_count: b.zero_count,
500 positive_bucket: data::ExponentialBucket {
501 offset: b.pos_buckets.start_bin,
502 counts: b.pos_buckets.counts.clone(),
503 },
504 negative_bucket: data::ExponentialBucket {
505 offset: b.neg_buckets.start_bin,
506 counts: b.neg_buckets.counts.clone(),
507 },
508 zero_threshold: 0.0,
509 exemplars: vec![],
510 });
511 }
512
513 (n, new_agg.map(|a| Box::new(a) as Box<_>))
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use std::ops::Neg;
520
521 use opentelemetry::KeyValue;
522
523 use crate::metrics::internal::{self, AggregateBuilder};
524
525 use super::*;
526
// Runs the data-point recording and min/max/sum scenarios for the float and
// integer measurement types from a single test entry point.
#[test]
fn test_expo_histogram_data_point_record() {
    run_data_point_record::<f64>();
    run_data_point_record_f64();
    run_min_max_sum_f64();
    run_min_max_sum::<i64>();
    run_min_max_sum::<u64>();
    run_data_point_record::<i64>();
}
536
537 fn run_data_point_record<T: Number<T> + Neg<Output = T> + From<u32>>() {
538 struct TestCase<T> {
539 max_size: i32,
540 values: Vec<T>,
541 expected_buckets: ExpoBuckets,
542 expected_scale: i8,
543 }
544 let test_cases: Vec<TestCase<T>> = vec![
545 TestCase {
546 max_size: 4,
547 values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
548 expected_buckets: ExpoBuckets {
549 start_bin: -1,
550 counts: vec![1, 1, 1],
551 },
552 expected_scale: 0,
553 },
554 TestCase {
555 max_size: 4,
556 values: vec![4, 4, 4, 2, 16, 1]
557 .into_iter()
558 .map(Into::into)
559 .collect(),
560 expected_buckets: ExpoBuckets {
561 start_bin: -1,
562 counts: vec![1, 4, 1],
563 },
564 expected_scale: -1,
565 },
566 TestCase {
567 max_size: 2,
568 values: vec![1, 2, 4].into_iter().map(Into::into).collect(),
569 expected_buckets: ExpoBuckets {
570 start_bin: -1,
571 counts: vec![1, 2],
572 },
573 expected_scale: -1,
574 },
575 TestCase {
576 max_size: 2,
577 values: vec![1, 4, 2].into_iter().map(Into::into).collect(),
578 expected_buckets: ExpoBuckets {
579 start_bin: -1,
580 counts: vec![1, 2],
581 },
582 expected_scale: -1,
583 },
584 TestCase {
585 max_size: 2,
586 values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
587 expected_buckets: ExpoBuckets {
588 start_bin: -1,
589 counts: vec![1, 2],
590 },
591 expected_scale: -1,
592 },
593 TestCase {
594 max_size: 2,
595 values: vec![2, 1, 4].into_iter().map(Into::into).collect(),
596 expected_buckets: ExpoBuckets {
597 start_bin: -1,
598 counts: vec![1, 2],
599 },
600 expected_scale: -1,
601 },
602 TestCase {
603 max_size: 2,
604 values: vec![4, 1, 2].into_iter().map(Into::into).collect(),
605 expected_buckets: ExpoBuckets {
606 start_bin: -1,
607 counts: vec![1, 2],
608 },
609 expected_scale: -1,
610 },
611 TestCase {
612 max_size: 2,
613 values: vec![4, 2, 1].into_iter().map(Into::into).collect(),
614 expected_buckets: ExpoBuckets {
615 start_bin: -1,
616 counts: vec![1, 2],
617 },
618 expected_scale: -1,
619 },
620 ];
621
622 for test in test_cases {
623 let mut dp = ExpoHistogramDataPoint::<T>::new(test.max_size, 20, true, true);
624 for v in test.values {
625 dp.record(v);
626 dp.record(-v);
627 }
628
629 assert_eq!(test.expected_buckets, dp.pos_buckets, "positive buckets");
630 assert_eq!(test.expected_buckets, dp.neg_buckets, "negative buckets");
631 assert_eq!(test.expected_scale, dp.scale, "scale");
632 }
633 }
634
635 fn run_min_max_sum_f64() {
636 let alice = AttributeSet::from(&[KeyValue::new("user", "alice")][..]);
637 struct Expected {
638 min: f64,
639 max: f64,
640 sum: f64,
641 count: usize,
642 }
643 impl Expected {
644 fn new(min: f64, max: f64, sum: f64, count: usize) -> Self {
645 Expected {
646 min,
647 max,
648 sum,
649 count,
650 }
651 }
652 }
653 struct TestCase {
654 values: Vec<f64>,
655 expected: Expected,
656 }
657
658 let test_cases = vec![
659 TestCase {
660 values: vec![2.0, 4.0, 1.0],
661 expected: Expected::new(1.0, 4.0, 7.0, 3),
662 },
663 TestCase {
664 values: vec![2.0, 4.0, 1.0, f64::INFINITY],
665 expected: Expected::new(1.0, 4.0, 7.0, 3),
666 },
667 TestCase {
668 values: vec![2.0, 4.0, 1.0, -f64::INFINITY],
669 expected: Expected::new(1.0, 4.0, 7.0, 3),
670 },
671 TestCase {
672 values: vec![2.0, 4.0, 1.0, f64::NAN],
673 expected: Expected::new(1.0, 4.0, 7.0, 3),
674 },
675 TestCase {
676 values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0],
677 expected: Expected::new(1.0, 16.0, 31.0, 6),
678 },
679 ];
680
681 for test in test_cases {
682 let h = ExpoHistogram::new(4, 20, true, true);
683 for v in test.values {
684 h.measure(v, alice.clone());
685 }
686 let values = h.values.lock().unwrap();
687 let dp = values.get(&alice).unwrap();
688
689 assert_eq!(test.expected.max, dp.max);
690 assert_eq!(test.expected.min, dp.min);
691 assert_eq!(test.expected.sum, dp.sum);
692 assert_eq!(test.expected.count, dp.count);
693 }
694 }
695
696 fn run_min_max_sum<T: Number<T> + From<u32>>() {
697 let alice = AttributeSet::from(&[KeyValue::new("user", "alice")][..]);
698 struct Expected<T> {
699 min: T,
700 max: T,
701 sum: T,
702 count: usize,
703 }
704 impl<T: Number<T>> Expected<T> {
705 fn new(min: T, max: T, sum: T, count: usize) -> Self {
706 Expected {
707 min,
708 max,
709 sum,
710 count,
711 }
712 }
713 }
714 struct TestCase<T> {
715 values: Vec<T>,
716 expected: Expected<T>,
717 }
718 let test_cases: Vec<TestCase<T>> = vec![
719 TestCase {
720 values: vec![2, 4, 1].into_iter().map(Into::into).collect(),
721 expected: Expected::new(1.into(), 4.into(), 7.into(), 3),
722 },
723 TestCase {
724 values: vec![4, 4, 4, 2, 16, 1]
725 .into_iter()
726 .map(Into::into)
727 .collect(),
728 expected: Expected::new(1.into(), 16.into(), 31.into(), 6),
729 },
730 ];
731
732 for test in test_cases {
733 let h = ExpoHistogram::new(4, 20, true, true);
734 for v in test.values {
735 h.measure(v, alice.clone());
736 }
737 let values = h.values.lock().unwrap();
738 let dp = values.get(&alice).unwrap();
739
740 assert_eq!(test.expected.max, dp.max);
741 assert_eq!(test.expected.min, dp.min);
742 assert_eq!(test.expected.sum, dp.sum);
743 assert_eq!(test.expected.count, dp.count);
744 }
745 }
746
747 fn run_data_point_record_f64() {
748 struct TestCase {
749 max_size: i32,
750 values: Vec<f64>,
751 expected_buckets: ExpoBuckets,
752 expected_scale: i8,
753 }
754
755 let test_cases = vec![
756 TestCase {
757 max_size: 4,
758 values: vec![2.0, 2.0, 2.0, 1.0, 8.0, 0.5],
759 expected_buckets: ExpoBuckets {
760 start_bin: -1,
761 counts: vec![2, 3, 1],
762 },
763 expected_scale: -1,
764 },
765 TestCase {
766 max_size: 2,
767 values: vec![1.0, 0.5, 2.0],
768 expected_buckets: ExpoBuckets {
769 start_bin: -1,
770 counts: vec![2, 1],
771 },
772 expected_scale: -1,
773 },
774 TestCase {
775 max_size: 2,
776 values: vec![1.0, 2.0, 0.5],
777 expected_buckets: ExpoBuckets {
778 start_bin: -1,
779 counts: vec![2, 1],
780 },
781 expected_scale: -1,
782 },
783 TestCase {
784 max_size: 2,
785 values: vec![2.0, 0.5, 1.0],
786 expected_buckets: ExpoBuckets {
787 start_bin: -1,
788 counts: vec![2, 1],
789 },
790 expected_scale: -1,
791 },
792 TestCase {
793 max_size: 2,
794 values: vec![2.0, 1.0, 0.5],
795 expected_buckets: ExpoBuckets {
796 start_bin: -1,
797 counts: vec![2, 1],
798 },
799 expected_scale: -1,
800 },
801 TestCase {
802 max_size: 2,
803 values: vec![0.5, 1.0, 2.0],
804 expected_buckets: ExpoBuckets {
805 start_bin: -1,
806 counts: vec![2, 1],
807 },
808 expected_scale: -1,
809 },
810 TestCase {
811 max_size: 2,
812 values: vec![0.5, 2.0, 1.0],
813 expected_buckets: ExpoBuckets {
814 start_bin: -1,
815 counts: vec![2, 1],
816 },
817 expected_scale: -1,
818 },
819 ];
820 for test in test_cases {
821 let mut dp = ExpoHistogramDataPoint::new(test.max_size, 20, true, true);
822 for v in test.values {
823 dp.record(v);
824 dp.record(-v);
825 }
826
827 assert_eq!(test.expected_buckets, dp.pos_buckets);
828 assert_eq!(test.expected_buckets, dp.neg_buckets);
829 assert_eq!(test.expected_scale, dp.scale);
830 }
831 }
832
// Pins the bin chosen at scale 20 for the extreme representable inputs.
#[test]
fn data_point_record_limits() {
    // Records one value into a fresh data point and returns the first
    // positive bin it created.
    fn first_positive_bin<T: Number<T>>(value: T) -> i32 {
        let mut dp = ExpoHistogramDataPoint::new(4, 20, true, true);
        dp.record(value);
        dp.pos_buckets.start_bin
    }

    assert_eq!(
        first_positive_bin(f64::MAX),
        1073741823,
        "start bin does not match for large f64 values",
    );

    assert_eq!(
        first_positive_bin(f64::MIN_POSITIVE),
        -1071644673,
        "start bin does not match for small positive values",
    );

    assert_eq!(
        first_positive_bin(i64::MAX),
        66060287,
        "start bin does not match for max i64 values",
    );
}
862
// Table-driven check of `ExpoBuckets::downscale` for empty, aligned,
// unaligned and negative-start ranges.
#[test]
fn expo_bucket_downscale() {
    struct TestCase {
        name: &'static str,
        bucket: ExpoBuckets,
        scale: i8,
        want: ExpoBuckets,
    }

    fn buckets(start_bin: i32, counts: &[u64]) -> ExpoBuckets {
        ExpoBuckets {
            start_bin,
            counts: counts.to_vec(),
        }
    }

    fn case(
        name: &'static str,
        bucket: ExpoBuckets,
        scale: i8,
        want: ExpoBuckets,
    ) -> TestCase {
        TestCase {
            name,
            bucket,
            scale,
            want,
        }
    }

    let one_to_six: [u64; 6] = [1, 2, 3, 4, 5, 6];

    // Every case has a distinct name so that a failure message identifies
    // exactly one row of the table.
    let test_cases = vec![
        case("Empty bucket", buckets(0, &[]), 3, buckets(0, &[])),
        case("1 size bucket", buckets(50, &[7]), 4, buckets(3, &[7])),
        case("zero scale", buckets(50, &[7, 5]), 0, buckets(50, &[7, 5])),
        case(
            "aligned bucket scale 1",
            buckets(0, &one_to_six),
            1,
            buckets(0, &[3, 7, 11]),
        ),
        case(
            "aligned bucket scale 2",
            buckets(0, &one_to_six),
            2,
            buckets(0, &[10, 11]),
        ),
        case(
            "aligned bucket scale 3",
            buckets(0, &one_to_six),
            3,
            buckets(0, &[21]),
        ),
        case(
            "unaligned bucket scale 1",
            buckets(5, &one_to_six),
            1,
            buckets(2, &[1, 5, 9, 6]),
        ),
        case(
            "unaligned bucket scale 2",
            buckets(7, &one_to_six),
            2,
            buckets(1, &[1, 14, 6]),
        ),
        case(
            "unaligned bucket scale 3",
            buckets(3, &one_to_six),
            3,
            buckets(0, &[15, 6]),
        ),
        case(
            "unaligned bucket scale 1 with empty middle bin",
            buckets(1, &[1, 0, 1]),
            1,
            buckets(0, &[1, 1]),
        ),
        case(
            "negative start_bin",
            buckets(-1, &[1, 0, 3]),
            1,
            buckets(-1, &[1, 3]),
        ),
        case(
            "negative start_bin 2",
            buckets(-4, &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
            1,
            buckets(-2, &[3, 7, 11, 15, 19]),
        ),
    ];

    for mut test in test_cases {
        test.bucket.downscale(test.scale as u32);
        assert_eq!(test.want, test.bucket, "{}", test.name);
    }
}
1023
// Table-driven check of `ExpoBuckets::record` for a bin that is first, inside,
// before and after the existing range.
#[test]
fn expo_bucket_record() {
    struct TestCase {
        name: &'static str,
        bucket: ExpoBuckets,
        bin: i32,
        want: ExpoBuckets,
    }

    fn buckets(start_bin: i32, counts: &[u64]) -> ExpoBuckets {
        ExpoBuckets {
            start_bin,
            counts: counts.to_vec(),
        }
    }

    let one_to_six: [u64; 6] = [1, 2, 3, 4, 5, 6];

    let test_cases = vec![
        TestCase {
            name: "Empty bucket creates first count",
            bucket: buckets(0, &[]),
            bin: -5,
            want: buckets(-5, &[1]),
        },
        TestCase {
            name: "Bin is in the bucket",
            bucket: buckets(3, &one_to_six),
            bin: 5,
            want: buckets(3, &[1, 2, 4, 4, 5, 6]),
        },
        TestCase {
            name: "Bin is before the start of the bucket",
            bucket: buckets(1, &one_to_six),
            bin: -2,
            want: buckets(-2, &[1, 0, 0, 1, 2, 3, 4, 5, 6]),
        },
        TestCase {
            name: "Bin is after the end of the bucket",
            bucket: buckets(-2, &one_to_six),
            bin: 4,
            want: buckets(-2, &[1, 2, 3, 4, 5, 6, 1]),
        },
    ];

    for mut test in test_cases {
        test.bucket.record(test.bin);
        assert_eq!(test.want, test.bucket, "{}", test.name);
    }
}
1089
// Table-driven check of `scale_change`, including the guard that stops the
// search when no scale can satisfy `max_size`.
#[test]
fn scale_change_rescaling() {
    struct Args {
        bin: i32,
        start_bin: i32,
        length: i32,
        max_size: i32,
    }
    struct TestCase {
        name: &'static str,
        args: Args,
        want: u32,
    }

    fn case(
        name: &'static str,
        bin: i32,
        start_bin: i32,
        length: i32,
        max_size: i32,
        want: u32,
    ) -> TestCase {
        TestCase {
            name,
            args: Args {
                bin,
                start_bin,
                length,
                max_size,
            },
            want,
        }
    }

    // Every case has a distinct name so that a failure message identifies
    // exactly one row of the table.
    let test_cases = vec![
        case("if length is 0, no rescale is needed", 5, 0, 0, 4, 0),
        case(
            "if bin is between start, and the end, no rescale needed",
            5,
            -1,
            10,
            20,
            0,
        ),
        case(
            "if [bin,... end].len() > max_size, rescale needed",
            5,
            8,
            3,
            5,
            1,
        ),
        case(
            "if [start, ..., bin].len() > max_size, rescale needed",
            7,
            2,
            3,
            5,
            1,
        ),
        case(
            "if [start, ..., bin].len() > 2 * max_size, rescale twice",
            13,
            2,
            3,
            5,
            2,
        ),
        case(
            "It should not hang if it will never be able to rescale",
            1,
            -1,
            1,
            1,
            31,
        ),
    ];

    for test in test_cases {
        let got = scale_change(
            test.args.max_size,
            test.args.bin,
            test.args.start_bin,
            test.args.length,
        );
        assert_eq!(got, test.want, "incorrect scale change, {}", test.name);
    }
}
1181
// Recording the smallest positive normal `f64` three times must land all
// three values in one bucket without changing the scale.
#[test]
fn sub_normal() {
    let mut ehdp = ExpoHistogramDataPoint::new(4, 20, true, true);
    for _ in 0..3 {
        ehdp.record(f64::MIN_POSITIVE);
    }

    let want = ExpoHistogramDataPoint {
        count: 3,
        min: f64::MIN_POSITIVE,
        max: f64::MIN_POSITIVE,
        sum: 3.0 * f64::MIN_POSITIVE,

        max_size: 4,
        record_min_max: true,
        record_sum: true,

        scale: 20,
        pos_buckets: ExpoBuckets {
            start_bin: -1071644673,
            counts: vec![3],
        },
        // Nothing negative was recorded: start bin 0, no counts.
        neg_buckets: ExpoBuckets::default(),
        zero_count: 0,
    };

    assert_eq!(want, ehdp);
}
1212
// Runs the end-to-end aggregation scenarios once for each of the `i64`, `u64`
// and `f64` measurement types.
#[test]
fn hist_aggregations() {
    hist_aggregation::<i64>();
    hist_aggregation::<u64>();
    hist_aggregation::<f64>();
}
1219
1220 fn box_val<T>(
1221 (m, ca): (impl internal::Measure<T>, impl internal::ComputeAggregation),
1222 ) -> (
1223 Box<dyn internal::Measure<T>>,
1224 Box<dyn internal::ComputeAggregation>,
1225 ) {
1226 (Box::new(m), Box::new(ca))
1227 }
1228
1229 fn hist_aggregation<T: Number<T> + From<u32>>() {
1230 let max_size = 4;
1231 let max_scale = 20;
1232 let record_min_max = true;
1233 let record_sum = true;
1234
1235 #[allow(clippy::type_complexity)]
1236 struct TestCase<T> {
1237 name: &'static str,
1238 build: Box<
1239 dyn Fn() -> (
1240 Box<dyn internal::Measure<T>>,
1241 Box<dyn internal::ComputeAggregation>,
1242 ),
1243 >,
1244 input: Vec<Vec<T>>,
1245 want: data::ExponentialHistogram<T>,
1246 want_count: usize,
1247 }
1248 let test_cases: Vec<TestCase<T>> = vec![
1249 TestCase {
1250 name: "Delta Single",
1251 build: Box::new(move || {
1252 box_val(
1253 AggregateBuilder::new(Some(Temporality::Delta), None)
1254 .exponential_bucket_histogram(
1255 max_size,
1256 max_scale,
1257 record_min_max,
1258 record_sum,
1259 ),
1260 )
1261 }),
1262 input: vec![vec![4, 4, 4, 2, 16, 1]
1263 .into_iter()
1264 .map(Into::into)
1265 .collect()],
1266 want: data::ExponentialHistogram {
1267 temporality: Temporality::Delta,
1268 data_points: vec![data::ExponentialHistogramDataPoint {
1269 attributes: vec![],
1270 count: 6,
1271 min: Some(1.into()),
1272 max: Some(16.into()),
1273 sum: 31.into(),
1274 start_time: SystemTime::now(),
1275 time: SystemTime::now(),
1276 scale: -1,
1277 positive_bucket: data::ExponentialBucket {
1278 offset: -1,
1279 counts: vec![1, 4, 1],
1280 },
1281 negative_bucket: data::ExponentialBucket {
1282 offset: 0,
1283 counts: vec![],
1284 },
1285 exemplars: vec![],
1286 zero_threshold: 0.0,
1287 zero_count: 0,
1288 }],
1289 },
1290 want_count: 1,
1291 },
1292 TestCase {
1293 name: "Cumulative Single",
1294 build: Box::new(move || {
1295 box_val(
1296 internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
1297 .exponential_bucket_histogram(
1298 max_size,
1299 max_scale,
1300 record_min_max,
1301 record_sum,
1302 ),
1303 )
1304 }),
1305 input: vec![vec![4, 4, 4, 2, 16, 1]
1306 .into_iter()
1307 .map(Into::into)
1308 .collect()],
1309 want: data::ExponentialHistogram {
1310 temporality: Temporality::Cumulative,
1311 data_points: vec![data::ExponentialHistogramDataPoint {
1312 attributes: vec![],
1313 count: 6,
1314 min: Some(1.into()),
1315 max: Some(16.into()),
1316 sum: 31.into(),
1317 scale: -1,
1318 positive_bucket: data::ExponentialBucket {
1319 offset: -1,
1320 counts: vec![1, 4, 1],
1321 },
1322 start_time: SystemTime::now(),
1323 time: SystemTime::now(),
1324 negative_bucket: data::ExponentialBucket {
1325 offset: 0,
1326 counts: vec![],
1327 },
1328 exemplars: vec![],
1329 zero_threshold: 0.0,
1330 zero_count: 0,
1331 }],
1332 },
1333 want_count: 1,
1334 },
1335 TestCase {
1336 name: "Delta Multiple",
1337 build: Box::new(move || {
1338 box_val(
1339 internal::AggregateBuilder::new(Some(Temporality::Delta), None)
1340 .exponential_bucket_histogram(
1341 max_size,
1342 max_scale,
1343 record_min_max,
1344 record_sum,
1345 ),
1346 )
1347 }),
1348 input: vec![
1349 vec![2, 3, 8].into_iter().map(Into::into).collect(),
1350 vec![4, 4, 4, 2, 16, 1]
1351 .into_iter()
1352 .map(Into::into)
1353 .collect(),
1354 ],
1355 want: data::ExponentialHistogram {
1356 temporality: Temporality::Delta,
1357 data_points: vec![data::ExponentialHistogramDataPoint {
1358 attributes: vec![],
1359 count: 6,
1360 min: Some(1.into()),
1361 max: Some(16.into()),
1362 sum: 31.into(),
1363 scale: -1,
1364 positive_bucket: data::ExponentialBucket {
1365 offset: -1,
1366 counts: vec![1, 4, 1],
1367 },
1368 start_time: SystemTime::now(),
1369 time: SystemTime::now(),
1370 negative_bucket: data::ExponentialBucket {
1371 offset: 0,
1372 counts: vec![],
1373 },
1374 exemplars: vec![],
1375 zero_threshold: 0.0,
1376 zero_count: 0,
1377 }],
1378 },
1379 want_count: 1,
1380 },
1381 TestCase {
1382 name: "Cumulative Multiple ",
1383 build: Box::new(move || {
1384 box_val(
1385 internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
1386 .exponential_bucket_histogram(
1387 max_size,
1388 max_scale,
1389 record_min_max,
1390 record_sum,
1391 ),
1392 )
1393 }),
1394 input: vec![
1395 vec![2, 3, 8].into_iter().map(Into::into).collect(),
1396 vec![4, 4, 4, 2, 16, 1]
1397 .into_iter()
1398 .map(Into::into)
1399 .collect(),
1400 ],
1401 want: data::ExponentialHistogram {
1402 temporality: Temporality::Cumulative,
1403 data_points: vec![data::ExponentialHistogramDataPoint {
1404 count: 9,
1405 min: Some(1.into()),
1406 max: Some(16.into()),
1407 sum: 44.into(),
1408 scale: -1,
1409 positive_bucket: data::ExponentialBucket {
1410 offset: -1,
1411 counts: vec![1, 6, 2],
1412 },
1413 attributes: vec![],
1414 start_time: SystemTime::now(),
1415 time: SystemTime::now(),
1416 negative_bucket: data::ExponentialBucket {
1417 offset: 0,
1418 counts: vec![],
1419 },
1420 exemplars: vec![],
1421 zero_threshold: 0.0,
1422 zero_count: 0,
1423 }],
1424 },
1425 want_count: 1,
1426 },
1427 ];
1428
1429 for test in test_cases {
1430 let (in_fn, out_fn) = (test.build)();
1431
1432 let mut got: Box<dyn data::Aggregation> = Box::new(data::ExponentialHistogram::<T> {
1433 data_points: vec![],
1434 temporality: Temporality::Delta,
1435 });
1436 let mut count = 0;
1437 for n in test.input {
1438 for v in n {
1439 in_fn.call(v, AttributeSet::default())
1440 }
1441 count = out_fn.call(Some(got.as_mut())).0
1442 }
1443
1444 assert_aggregation_eq::<T>(Box::new(test.want), got, true, test.name);
1445 assert_eq!(test.want_count, count, "{}", test.name);
1446 }
1447 }
1448
1449 fn assert_aggregation_eq<T: Number<T> + PartialEq>(
1450 a: Box<dyn Aggregation>,
1451 b: Box<dyn Aggregation>,
1452 ignore_timestamp: bool,
1453 test_name: &'static str,
1454 ) {
1455 assert_eq!(
1456 a.as_any().type_id(),
1457 b.as_any().type_id(),
1458 "{} Aggregation types not equal",
1459 test_name
1460 );
1461
1462 if let Some(a) = a.as_any().downcast_ref::<data::Gauge<T>>() {
1463 let b = b.as_any().downcast_ref::<data::Gauge<T>>().unwrap();
1464 assert_eq!(
1465 a.data_points.len(),
1466 b.data_points.len(),
1467 "{} gauge counts",
1468 test_name
1469 );
1470 for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
1471 assert_data_points_eq(
1472 a,
1473 b,
1474 ignore_timestamp,
1475 "mismatching gauge data points",
1476 test_name,
1477 );
1478 }
1479 } else if let Some(a) = a.as_any().downcast_ref::<data::Sum<T>>() {
1480 let b = b.as_any().downcast_ref::<data::Sum<T>>().unwrap();
1481 assert_eq!(
1482 a.temporality, b.temporality,
1483 "{} mismatching sum temporality",
1484 test_name
1485 );
1486 assert_eq!(
1487 a.is_monotonic, b.is_monotonic,
1488 "{} mismatching sum monotonicity",
1489 test_name,
1490 );
1491 assert_eq!(
1492 a.data_points.len(),
1493 b.data_points.len(),
1494 "{} sum counts",
1495 test_name
1496 );
1497 for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
1498 assert_data_points_eq(
1499 a,
1500 b,
1501 ignore_timestamp,
1502 "mismatching sum data points",
1503 test_name,
1504 );
1505 }
1506 } else if let Some(a) = a.as_any().downcast_ref::<data::Histogram<T>>() {
1507 let b = b.as_any().downcast_ref::<data::Histogram<T>>().unwrap();
1508 assert_eq!(
1509 a.temporality, b.temporality,
1510 "{}: mismatching hist temporality",
1511 test_name
1512 );
1513 assert_eq!(
1514 a.data_points.len(),
1515 b.data_points.len(),
1516 "{} hist counts",
1517 test_name
1518 );
1519 for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
1520 assert_hist_data_points_eq(
1521 a,
1522 b,
1523 ignore_timestamp,
1524 "mismatching hist data points",
1525 test_name,
1526 );
1527 }
1528 } else if let Some(a) = a.as_any().downcast_ref::<data::ExponentialHistogram<T>>() {
1529 let b = b
1530 .as_any()
1531 .downcast_ref::<data::ExponentialHistogram<T>>()
1532 .unwrap();
1533 assert_eq!(
1534 a.temporality, b.temporality,
1535 "{} mismatching hist temporality",
1536 test_name
1537 );
1538 assert_eq!(
1539 a.data_points.len(),
1540 b.data_points.len(),
1541 "{} hist counts",
1542 test_name
1543 );
1544 for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
1545 assert_exponential_hist_data_points_eq(
1546 a,
1547 b,
1548 ignore_timestamp,
1549 "mismatching hist data points",
1550 test_name,
1551 );
1552 }
1553 } else {
1554 panic!("Aggregation of unknown types")
1555 }
1556 }
1557
1558 fn assert_data_points_eq<T: Number<T>>(
1559 a: &data::DataPoint<T>,
1560 b: &data::DataPoint<T>,
1561 ignore_timestamp: bool,
1562 message: &'static str,
1563 test_name: &'static str,
1564 ) {
1565 assert_eq!(
1566 a.attributes, b.attributes,
1567 "{}: {} attributes",
1568 test_name, message
1569 );
1570 assert_eq!(a.value, b.value, "{}: {} value", test_name, message);
1571
1572 if !ignore_timestamp {
1573 assert_eq!(
1574 a.start_time, b.start_time,
1575 "{}: {} start time",
1576 test_name, message
1577 );
1578 assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
1579 }
1580 }
1581
1582 fn assert_hist_data_points_eq<T: Number<T>>(
1583 a: &data::HistogramDataPoint<T>,
1584 b: &data::HistogramDataPoint<T>,
1585 ignore_timestamp: bool,
1586 message: &'static str,
1587 test_name: &'static str,
1588 ) {
1589 assert_eq!(
1590 a.attributes, b.attributes,
1591 "{}: {} attributes",
1592 test_name, message
1593 );
1594 assert_eq!(a.count, b.count, "{}: {} count", test_name, message);
1595 assert_eq!(a.bounds, b.bounds, "{}: {} bounds", test_name, message);
1596 assert_eq!(
1597 a.bucket_counts, b.bucket_counts,
1598 "{}: {} bucket counts",
1599 test_name, message
1600 );
1601 assert_eq!(a.min, b.min, "{}: {} min", test_name, message);
1602 assert_eq!(a.max, b.max, "{}: {} max", test_name, message);
1603 assert_eq!(a.sum, b.sum, "{}: {} sum", test_name, message);
1604
1605 if !ignore_timestamp {
1606 assert_eq!(
1607 a.start_time, b.start_time,
1608 "{}: {} start time",
1609 test_name, message
1610 );
1611 assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
1612 }
1613 }
1614
1615 fn assert_exponential_hist_data_points_eq<T: Number<T>>(
1616 a: &data::ExponentialHistogramDataPoint<T>,
1617 b: &data::ExponentialHistogramDataPoint<T>,
1618 ignore_timestamp: bool,
1619 message: &'static str,
1620 test_name: &'static str,
1621 ) {
1622 assert_eq!(
1623 a.attributes, b.attributes,
1624 "{}: {} attributes",
1625 test_name, message
1626 );
1627 assert_eq!(a.count, b.count, "{}: {} count", test_name, message);
1628 assert_eq!(a.min, b.min, "{}: {} min", test_name, message);
1629 assert_eq!(a.max, b.max, "{}: {} max", test_name, message);
1630 assert_eq!(a.sum, b.sum, "{}: {} sum", test_name, message);
1631
1632 assert_eq!(a.scale, b.scale, "{}: {} scale", test_name, message);
1633 assert_eq!(
1634 a.zero_count, b.zero_count,
1635 "{}: {} zeros",
1636 test_name, message
1637 );
1638
1639 assert_eq!(
1640 a.positive_bucket, b.positive_bucket,
1641 "{}: {} pos",
1642 test_name, message
1643 );
1644 assert_eq!(
1645 a.negative_bucket, b.negative_bucket,
1646 "{}: {} neg",
1647 test_name, message
1648 );
1649
1650 if !ignore_timestamp {
1651 assert_eq!(
1652 a.start_time, b.start_time,
1653 "{}: {} start time",
1654 test_name, message
1655 );
1656 assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
1657 }
1658 }
1659}