// opentelemetry_sdk/metrics/internal/histogram.rs

use std::{collections::HashMap, sync::Mutex, time::SystemTime};
2
3use crate::metrics::data::{self, Aggregation, Temporality};
4use crate::{metrics::data::HistogramDataPoint, metrics::AttributeSet};
5use opentelemetry::KeyValue;
6use opentelemetry::{global, metrics::MetricsError};
7
8use super::{
9 aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
10 Number,
11};
12
/// Accumulated histogram state for a single attribute set.
#[derive(Default)]
struct Buckets<T> {
    // Per-bucket observation counters; length is boundary count + 1
    // (the extra slot is the implicit +inf bucket).
    counts: Vec<u64>,
    // Total number of measurements recorded into this stream.
    count: u64,
    // Running sum of measurements (only meaningful when sums are recorded).
    total: T,
    // Smallest measurement seen; seeded with the first measurement by the caller.
    min: T,
    // Largest measurement seen; seeded with the first measurement by the caller.
    max: T,
}
21
22impl<T: Number<T>> Buckets<T> {
23 fn new(n: usize) -> Buckets<T> {
25 Buckets {
26 counts: vec![0; n],
27 ..Default::default()
28 }
29 }
30
31 fn sum(&mut self, value: T) {
32 self.total += value;
33 }
34
35 fn bin(&mut self, idx: usize, value: T) {
36 self.counts[idx] += 1;
37 self.count += 1;
38 if value < self.min {
39 self.min = value;
40 } else if value > self.max {
41 self.max = value
42 }
43 }
44}
45
/// Shared measurement storage for a histogram aggregator.
struct HistValues<T> {
    // Whether a per-stream sum of measurements is accumulated.
    record_sum: bool,
    // Sorted, NaN-free upper boundaries of the histogram buckets.
    bounds: Vec<f64>,
    // Per-attribute-set bucket state, guarded for concurrent `measure` calls.
    values: Mutex<HashMap<AttributeSet, Buckets<T>>>,
}
52
53impl<T: Number<T>> HistValues<T> {
54 fn new(mut bounds: Vec<f64>, record_sum: bool) -> Self {
55 bounds.retain(|v| !v.is_nan());
56 bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
57
58 HistValues {
59 record_sum,
60 bounds,
61 values: Mutex::new(Default::default()),
62 }
63 }
64}
65
impl<T: Number<T>> HistValues<T> {
    /// Record a single measurement under the given attribute set.
    fn measure(&self, measurement: T, attrs: AttributeSet) {
        let f = measurement.into_float();

        // Index of the first boundary >= f; equals bounds.len() when f exceeds
        // every boundary (i.e. the implicit +inf bucket).
        let idx = self.bounds.partition_point(|&x| x < f);

        // A poisoned lock means another thread panicked mid-update; drop this
        // measurement rather than propagate the panic.
        let mut values = match self.values.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };
        let size = values.len();

        // Two-step lookup (get_mut first, entry only on miss) avoids consuming
        // `attrs` when the stream already exists.
        let b = if let Some(b) = values.get_mut(&attrs) {
            b
        } else {
            let mut b = Buckets::new(self.bounds.len() + 1);
            // Seed min/max with the first observed value for this stream.
            (b.min, b.max) = (measurement, measurement);

            if is_under_cardinality_limit(size) {
                values.entry(attrs).or_insert(b)
            } else {
                global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
                // Over the cardinality limit: fold this measurement into the
                // shared overflow stream instead of adding a new entry.
                values
                    .entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
                    .or_insert(b)
            }
        };

        b.bin(idx, measurement);
        if self.record_sum {
            b.sum(measurement)
        }
    }
}
113
/// Summarizes a set of measurements as a histogram with explicit buckets.
pub(crate) struct Histogram<T> {
    // Measurement storage shared by delta and cumulative collection.
    hist_values: HistValues<T>,
    // Whether produced data points carry min/max values.
    record_min_max: bool,
    // Start time of the current collection interval.
    start: Mutex<SystemTime>,
}
121
122impl<T: Number<T>> Histogram<T> {
123 pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
124 Histogram {
125 hist_values: HistValues::new(boundaries, record_sum),
126 record_min_max,
127 start: Mutex::new(SystemTime::now()),
128 }
129 }
130
131 pub(crate) fn measure(&self, measurement: T, attrs: AttributeSet) {
132 self.hist_values.measure(measurement, attrs)
133 }
134
135 pub(crate) fn delta(
136 &self,
137 dest: Option<&mut dyn Aggregation>,
138 ) -> (usize, Option<Box<dyn Aggregation>>) {
139 let mut values = match self.hist_values.values.lock() {
140 Ok(guard) if !guard.is_empty() => guard,
141 _ => return (0, None),
142 };
143 let t = SystemTime::now();
144 let start = self
145 .start
146 .lock()
147 .map(|s| *s)
148 .unwrap_or_else(|_| SystemTime::now());
149 let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
150 let mut new_agg = if h.is_none() {
151 Some(data::Histogram {
152 data_points: vec![],
153 temporality: Temporality::Delta,
154 })
155 } else {
156 None
157 };
158 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
159 h.temporality = Temporality::Delta;
160 h.data_points.clear();
161
162 let n = values.len();
163 if n > h.data_points.capacity() {
164 h.data_points.reserve_exact(n - h.data_points.capacity());
165 }
166
167 for (a, b) in values.drain() {
168 h.data_points.push(HistogramDataPoint {
169 attributes: a
170 .iter()
171 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
172 .collect(),
173 start_time: start,
174 time: t,
175 count: b.count,
176 bounds: self.hist_values.bounds.clone(),
177 bucket_counts: b.counts.clone(),
178 sum: if self.hist_values.record_sum {
179 b.total
180 } else {
181 T::default()
182 },
183 min: if self.record_min_max {
184 Some(b.min)
185 } else {
186 None
187 },
188 max: if self.record_min_max {
189 Some(b.max)
190 } else {
191 None
192 },
193 exemplars: vec![],
194 });
195 }
196
197 if let Ok(mut start) = self.start.lock() {
199 *start = t;
200 }
201
202 (n, new_agg.map(|a| Box::new(a) as Box<_>))
203 }
204
205 pub(crate) fn cumulative(
206 &self,
207 dest: Option<&mut dyn Aggregation>,
208 ) -> (usize, Option<Box<dyn Aggregation>>) {
209 let values = match self.hist_values.values.lock() {
210 Ok(guard) if !guard.is_empty() => guard,
211 _ => return (0, None),
212 };
213 let t = SystemTime::now();
214 let start = self
215 .start
216 .lock()
217 .map(|s| *s)
218 .unwrap_or_else(|_| SystemTime::now());
219 let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
220 let mut new_agg = if h.is_none() {
221 Some(data::Histogram {
222 data_points: vec![],
223 temporality: Temporality::Cumulative,
224 })
225 } else {
226 None
227 };
228 let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
229 h.temporality = Temporality::Cumulative;
230 h.data_points.clear();
231
232 let n = values.len();
233 if n > h.data_points.capacity() {
234 h.data_points.reserve_exact(n - h.data_points.capacity());
235 }
236
237 for (a, b) in values.iter() {
242 h.data_points.push(HistogramDataPoint {
243 attributes: a
244 .iter()
245 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
246 .collect(),
247 start_time: start,
248 time: t,
249 count: b.count,
250 bounds: self.hist_values.bounds.clone(),
251 bucket_counts: b.counts.clone(),
252 sum: if self.hist_values.record_sum {
253 b.total
254 } else {
255 T::default()
256 },
257 min: if self.record_min_max {
258 Some(b.min)
259 } else {
260 None
261 },
262 max: if self.record_min_max {
263 Some(b.max)
264 } else {
265 None
266 },
267 exemplars: vec![],
268 });
269 }
270
271 (n, new_agg.map(|a| Box::new(a) as Box<_>))
272 }
273}