opentelemetry_sdk/metrics/internal/
histogram.rs

1use std::{collections::HashMap, sync::Mutex, time::SystemTime};
2
3use crate::metrics::data::{self, Aggregation, Temporality};
4use crate::{metrics::data::HistogramDataPoint, metrics::AttributeSet};
5use opentelemetry::KeyValue;
6use opentelemetry::{global, metrics::MetricsError};
7
8use super::{
9    aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
10    Number,
11};
12
#[derive(Default)]
struct Buckets<T> {
    /// Per-bin measurement counts; sized to `bounds.len() + 1` by callers
    /// (see `HistValues::measure`).
    counts: Vec<u64>,
    /// Total number of measurements recorded across all bins.
    count: u64,
    /// Running sum of recorded values (only exported when sums are recorded).
    total: T,
    /// Smallest recorded value; callers seed this with the first measurement
    /// so it is never a spurious zero.
    min: T,
    /// Largest recorded value; seeded like `min`.
    max: T,
}
21
22impl<T: Number<T>> Buckets<T> {
23    /// returns buckets with `n` bins.
24    fn new(n: usize) -> Buckets<T> {
25        Buckets {
26            counts: vec![0; n],
27            ..Default::default()
28        }
29    }
30
31    fn sum(&mut self, value: T) {
32        self.total += value;
33    }
34
35    fn bin(&mut self, idx: usize, value: T) {
36        self.counts[idx] += 1;
37        self.count += 1;
38        if value < self.min {
39            self.min = value;
40        } else if value > self.max {
41            self.max = value
42        }
43    }
44}
45
/// Summarizes a set of measurements with explicitly defined buckets.
struct HistValues<T> {
    /// Whether the per-attribute-set sum of measurements is tracked.
    record_sum: bool,
    /// Sorted, NaN-free upper bucket boundaries (normalized in `new`).
    bounds: Vec<f64>,
    /// Per-attribute-set bucket state, guarded for concurrent `measure` calls.
    values: Mutex<HashMap<AttributeSet, Buckets<T>>>,
}
52
53impl<T: Number<T>> HistValues<T> {
54    fn new(mut bounds: Vec<f64>, record_sum: bool) -> Self {
55        bounds.retain(|v| !v.is_nan());
56        bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
57
58        HistValues {
59            record_sum,
60            bounds,
61            values: Mutex::new(Default::default()),
62        }
63    }
64}
65
66impl<T: Number<T>> HistValues<T> {
67    fn measure(&self, measurement: T, attrs: AttributeSet) {
68        let f = measurement.into_float();
69
70        // This search will return an index in the range `[0, bounds.len()]`, where
71        // it will return `bounds.len()` if value is greater than the last element
72        // of `bounds`. This aligns with the buckets in that the length of buckets
73        // is `bounds.len()+1`, with the last bucket representing:
74        // `(bounds[bounds.len()-1], +∞)`.
75        let idx = self.bounds.partition_point(|&x| x < f);
76
77        let mut values = match self.values.lock() {
78            Ok(guard) => guard,
79            Err(_) => return,
80        };
81        let size = values.len();
82
83        let b = if let Some(b) = values.get_mut(&attrs) {
84            b
85        } else {
86            // N+1 buckets. For example:
87            //
88            //   bounds = [0, 5, 10]
89            //
90            // Then,
91            //
92            //   buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
93            let mut b = Buckets::new(self.bounds.len() + 1);
94            // Ensure min and max are recorded values (not zero), for new buckets.
95            (b.min, b.max) = (measurement, measurement);
96
97            if is_under_cardinality_limit(size) {
98                values.entry(attrs).or_insert(b)
99            } else {
100                global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
101                values
102                    .entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
103                    .or_insert(b)
104            }
105        };
106
107        b.bin(idx, measurement);
108        if self.record_sum {
109            b.sum(measurement)
110        }
111    }
112}
113
/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T> {
    /// Per-attribute-set bucketed values shared by collection and recording.
    hist_values: HistValues<T>,
    /// Whether min/max are exported in data points.
    record_min_max: bool,
    /// Start time of the current collection cycle; reset by `delta`.
    start: Mutex<SystemTime>,
}
121
122impl<T: Number<T>> Histogram<T> {
123    pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
124        Histogram {
125            hist_values: HistValues::new(boundaries, record_sum),
126            record_min_max,
127            start: Mutex::new(SystemTime::now()),
128        }
129    }
130
131    pub(crate) fn measure(&self, measurement: T, attrs: AttributeSet) {
132        self.hist_values.measure(measurement, attrs)
133    }
134
135    pub(crate) fn delta(
136        &self,
137        dest: Option<&mut dyn Aggregation>,
138    ) -> (usize, Option<Box<dyn Aggregation>>) {
139        let mut values = match self.hist_values.values.lock() {
140            Ok(guard) if !guard.is_empty() => guard,
141            _ => return (0, None),
142        };
143        let t = SystemTime::now();
144        let start = self
145            .start
146            .lock()
147            .map(|s| *s)
148            .unwrap_or_else(|_| SystemTime::now());
149        let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
150        let mut new_agg = if h.is_none() {
151            Some(data::Histogram {
152                data_points: vec![],
153                temporality: Temporality::Delta,
154            })
155        } else {
156            None
157        };
158        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
159        h.temporality = Temporality::Delta;
160        h.data_points.clear();
161
162        let n = values.len();
163        if n > h.data_points.capacity() {
164            h.data_points.reserve_exact(n - h.data_points.capacity());
165        }
166
167        for (a, b) in values.drain() {
168            h.data_points.push(HistogramDataPoint {
169                attributes: a
170                    .iter()
171                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
172                    .collect(),
173                start_time: start,
174                time: t,
175                count: b.count,
176                bounds: self.hist_values.bounds.clone(),
177                bucket_counts: b.counts.clone(),
178                sum: if self.hist_values.record_sum {
179                    b.total
180                } else {
181                    T::default()
182                },
183                min: if self.record_min_max {
184                    Some(b.min)
185                } else {
186                    None
187                },
188                max: if self.record_min_max {
189                    Some(b.max)
190                } else {
191                    None
192                },
193                exemplars: vec![],
194            });
195        }
196
197        // The delta collection cycle resets.
198        if let Ok(mut start) = self.start.lock() {
199            *start = t;
200        }
201
202        (n, new_agg.map(|a| Box::new(a) as Box<_>))
203    }
204
205    pub(crate) fn cumulative(
206        &self,
207        dest: Option<&mut dyn Aggregation>,
208    ) -> (usize, Option<Box<dyn Aggregation>>) {
209        let values = match self.hist_values.values.lock() {
210            Ok(guard) if !guard.is_empty() => guard,
211            _ => return (0, None),
212        };
213        let t = SystemTime::now();
214        let start = self
215            .start
216            .lock()
217            .map(|s| *s)
218            .unwrap_or_else(|_| SystemTime::now());
219        let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
220        let mut new_agg = if h.is_none() {
221            Some(data::Histogram {
222                data_points: vec![],
223                temporality: Temporality::Cumulative,
224            })
225        } else {
226            None
227        };
228        let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
229        h.temporality = Temporality::Cumulative;
230        h.data_points.clear();
231
232        let n = values.len();
233        if n > h.data_points.capacity() {
234            h.data_points.reserve_exact(n - h.data_points.capacity());
235        }
236
237        // TODO: This will use an unbounded amount of memory if there
238        // are unbounded number of attribute sets being aggregated. Attribute
239        // sets that become "stale" need to be forgotten so this will not
240        // overload the system.
241        for (a, b) in values.iter() {
242            h.data_points.push(HistogramDataPoint {
243                attributes: a
244                    .iter()
245                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
246                    .collect(),
247                start_time: start,
248                time: t,
249                count: b.count,
250                bounds: self.hist_values.bounds.clone(),
251                bucket_counts: b.counts.clone(),
252                sum: if self.hist_values.record_sum {
253                    b.total
254                } else {
255                    T::default()
256                },
257                min: if self.record_min_max {
258                    Some(b.min)
259                } else {
260                    None
261                },
262                max: if self.record_min_max {
263                    Some(b.max)
264                } else {
265                    None
266                },
267                exemplars: vec![],
268            });
269        }
270
271        (n, new_agg.map(|a| Box::new(a) as Box<_>))
272    }
273}