// opentelemetry_sdk/metrics/internal/sum.rs

use std::sync::atomic::{AtomicBool, Ordering};
use std::vec;
use std::{
    collections::HashMap,
    sync::{Mutex, RwLock},
    time::SystemTime,
};

use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use crate::metrics::AttributeSet;
use opentelemetry::KeyValue;
use opentelemetry::{global, metrics::MetricsError};

use super::{
    aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
    AtomicTracker, Number,
};

/// The storage for sums.
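///
/// Measurements with an empty attribute set are recorded in a dedicated
/// atomic tracker (`no_attribute_value`) so that path avoids the map lookup
/// and locking entirely. All other attribute sets are stored in a
/// read/write-locked map capped by the stream cardinality limit; once the
/// limit is reached, further series are folded into the overflow attribute
/// set.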
struct ValueMap<T: Number<T>> {
    values: RwLock<HashMap<AttributeSet, T::AtomicTracker>>,
    has_no_value_attribute_value: AtomicBool,
    no_attribute_value: T::AtomicTracker,
}

impl<T: Number<T>> Default for ValueMap<T> {
    fn default() -> Self {
        ValueMap::new()
    }
}

impl<T: Number<T>> ValueMap<T> {
    fn new() -> Self {
        ValueMap {
            values: RwLock::new(HashMap::new()),
            has_no_value_attribute_value: AtomicBool::new(false),
            no_attribute_value: T::new_atomic_tracker(),
        }
    }
}

impl<T: Number<T>> ValueMap<T> {
    fn measure(&self, measurement: T, attrs: AttributeSet) {
        if attrs.is_empty() {
            // Dedicated fast path for the empty attribute set.
            self.no_attribute_value.add(measurement);
            self.has_no_value_attribute_value
                .store(true, Ordering::Release);
        } else if let Ok(values) = self.values.read() {
            if let Some(value_to_update) = values.get(&attrs) {
                // Common case: the attribute set is already tracked, so an
                // atomic add under the shared read lock is enough.
                value_to_update.add(measurement);
                return;
            } else {
                drop(values);
                if let Ok(mut values) = self.values.write() {
                    // Recheck after acquiring write lock, in case another
                    // thread has added the value.
                    if let Some(value_to_update) = values.get(&attrs) {
                        value_to_update.add(measurement);
                        return;
                    } else if is_under_cardinality_limit(values.len()) {
                        let new_value = T::new_atomic_tracker();
                        new_value.add(measurement);
                        values.insert(attrs, new_value);
                    } else if let Some(overflow_value) =
                        values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET)
                    {
                        // Over the cardinality limit: fold the measurement
                        // into the existing overflow series.
                        overflow_value.add(measurement);
                        return;
                    } else {
                        // First overflow for this stream: create the overflow
                        // series and log a warning once.
                        let new_value = T::new_atomic_tracker();
                        new_value.add(measurement);
                        values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value);
                        global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
                    }
                }
            }
        }
    }
}

/// Summarizes a set of measurements made as their arithmetic sum.
pub(crate) struct Sum<T: Number<T>> {
    value_map: ValueMap<T>,
    monotonic: bool,
    start: Mutex<SystemTime>,
}

impl<T: Number<T>> Sum<T> {
    /// Returns an aggregator that summarizes a set of measurements as their
    /// arithmetic sum.
    ///
    /// Each sum is scoped by attributes and the aggregation cycle the measurements
    /// were made in.
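    ///
    /// A minimal usage sketch (marked `ignore`: this type is crate-private,
    /// and the `AttributeSet` conversion from a `KeyValue` slice is assumed
    /// here for illustration):
    ///
    /// ```ignore
    /// use opentelemetry::KeyValue;
    ///
    /// let sum = Sum::<u64>::new(/* monotonic */ true);
    /// let attrs = AttributeSet::from(&[KeyValue::new("host", "a")][..]);
    /// sum.measure(5, attrs.clone());
    /// sum.measure(3, attrs);
    ///
    /// // Delta collection drains the accumulated values into data points.
    /// let (count, _agg) = sum.delta(None);
    /// assert_eq!(count, 1);
    /// ```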
    pub(crate) fn new(monotonic: bool) -> Self {
        Sum {
            value_map: ValueMap::new(),
            monotonic,
            start: Mutex::new(SystemTime::now()),
        }
    }

    pub(crate) fn measure(&self, measurement: T, attrs: AttributeSet) {
        self.value_map.measure(measurement, attrs)
    }

    pub(crate) fn delta(
        &self,
        dest: Option<&mut dyn Aggregation>,
    ) -> (usize, Option<Box<dyn Aggregation>>) {
        let t = SystemTime::now();

        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
        let mut new_agg = if s_data.is_none() {
            Some(data::Sum {
                data_points: vec![],
                temporality: Temporality::Delta,
                is_monotonic: self.monotonic,
            })
        } else {
            None
        };
        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
        s_data.temporality = Temporality::Delta;
        s_data.is_monotonic = self.monotonic;
        s_data.data_points.clear();

        let mut values = match self.value_map.values.write() {
            Ok(v) => v,
            Err(_) => return (0, None),
        };

        // Every tracked attribute set plus the potential no-attribute point.
        let n = values.len() + 1;
        if n > s_data.data_points.capacity() {
            s_data
                .data_points
                .reserve_exact(n - s_data.data_points.capacity());
        }

        let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
        // `swap` reads and clears the flag so the next delta cycle starts fresh.
        if self
            .value_map
            .has_no_value_attribute_value
            .swap(false, Ordering::AcqRel)
        {
            s_data.data_points.push(DataPoint {
                attributes: vec![],
                start_time: Some(prev_start),
                time: Some(t),
                value: self.value_map.no_attribute_value.get_and_reset_value(),
                exemplars: vec![],
            });
        }

        // Draining the map resets the per-attribute values for the next cycle.
        for (attrs, value) in values.drain() {
            s_data.data_points.push(DataPoint {
                attributes: attrs
                    .iter()
                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
                    .collect(),
                start_time: Some(prev_start),
                time: Some(t),
                value: value.get_value(),
                exemplars: vec![],
            });
        }

        // The delta collection cycle resets.
        if let Ok(mut start) = self.start.lock() {
            *start = t;
        }

        (
            s_data.data_points.len(),
            new_agg.map(|a| Box::new(a) as Box<_>),
        )
    }

    pub(crate) fn cumulative(
        &self,
        dest: Option<&mut dyn Aggregation>,
    ) -> (usize, Option<Box<dyn Aggregation>>) {
        let t = SystemTime::now();

        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
        let mut new_agg = if s_data.is_none() {
            Some(data::Sum {
                data_points: vec![],
                temporality: Temporality::Cumulative,
                is_monotonic: self.monotonic,
            })
        } else {
            None
        };
        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
        s_data.temporality = Temporality::Cumulative;
        s_data.is_monotonic = self.monotonic;
        s_data.data_points.clear();

        let values = match self.value_map.values.write() {
            Ok(v) => v,
            Err(_) => return (0, None),
        };

        let n = values.len() + 1;
        if n > s_data.data_points.capacity() {
            s_data
                .data_points
                .reserve_exact(n - s_data.data_points.capacity());
        }

        let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

        // Unlike `delta`, cumulative collection only reads the flag and the
        // value; nothing is reset.
        if self
            .value_map
            .has_no_value_attribute_value
            .load(Ordering::Acquire)
        {
            s_data.data_points.push(DataPoint {
                attributes: vec![],
                start_time: Some(prev_start),
                time: Some(t),
                value: self.value_map.no_attribute_value.get_value(),
                exemplars: vec![],
            });
        }

        // TODO: This will use an unbounded amount of memory if there is an
        // unbounded number of attribute sets being aggregated. Attribute sets
        // that become "stale" need to be forgotten so this will not overload
        // the system.
        for (attrs, value) in values.iter() {
            s_data.data_points.push(DataPoint {
                attributes: attrs
                    .iter()
                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
                    .collect(),
                start_time: Some(prev_start),
                time: Some(t),
                value: value.get_value(),
                exemplars: vec![],
            });
        }

        (
            s_data.data_points.len(),
            new_agg.map(|a| Box::new(a) as Box<_>),
        )
    }
}

/// Summarizes a set of pre-computed sums as their arithmetic sum.
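///
/// Typically used for observable instruments, where each observation reports
/// the full (pre-computed) value; delta collection converts consecutive
/// reports into differences via the `reported` map.
///
/// A minimal sketch (marked `ignore`: crate-private type, and the
/// `AttributeSet` conversion from a `KeyValue` slice is assumed):
///
/// ```ignore
/// use opentelemetry::KeyValue;
///
/// let sum = PrecomputedSum::<u64>::new(true);
/// let attrs = AttributeSet::from(&[KeyValue::new("host", "a")][..]);
///
/// sum.measure(10, attrs.clone()); // first observation
/// let _ = sum.delta(None);        // exports 10
///
/// sum.measure(25, attrs);         // next observation of the same counter
/// let _ = sum.delta(None);        // exports 25 - 10 = 15
/// ```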
pub(crate) struct PrecomputedSum<T: Number<T>> {
    value_map: ValueMap<T>,
    monotonic: bool,
    start: Mutex<SystemTime>,
    reported: Mutex<HashMap<AttributeSet, T>>,
}

impl<T: Number<T>> PrecomputedSum<T> {
    pub(crate) fn new(monotonic: bool) -> Self {
        PrecomputedSum {
            value_map: ValueMap::new(),
            monotonic,
            start: Mutex::new(SystemTime::now()),
            reported: Mutex::new(Default::default()),
        }
    }

    pub(crate) fn measure(&self, measurement: T, attrs: AttributeSet) {
        self.value_map.measure(measurement, attrs)
    }

    pub(crate) fn delta(
        &self,
        dest: Option<&mut dyn Aggregation>,
    ) -> (usize, Option<Box<dyn Aggregation>>) {
        let t = SystemTime::now();
        let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
        let mut new_agg = if s_data.is_none() {
            Some(data::Sum {
                data_points: vec![],
                temporality: Temporality::Delta,
                is_monotonic: self.monotonic,
            })
        } else {
            None
        };
        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
        s_data.data_points.clear();
        s_data.temporality = Temporality::Delta;
        s_data.is_monotonic = self.monotonic;

        let mut values = match self.value_map.values.write() {
            Ok(v) => v,
            Err(_) => return (0, None),
        };

        let n = values.len() + 1;
        if n > s_data.data_points.capacity() {
            s_data
                .data_points
                .reserve_exact(n - s_data.data_points.capacity());
        }
        let mut new_reported = HashMap::with_capacity(n);
        let mut reported = match self.reported.lock() {
            Ok(r) => r,
            Err(_) => return (0, None),
        };

        if self
            .value_map
            .has_no_value_attribute_value
            .swap(false, Ordering::AcqRel)
        {
            s_data.data_points.push(DataPoint {
                attributes: vec![],
                start_time: Some(prev_start),
                time: Some(t),
                value: self.value_map.no_attribute_value.get_and_reset_value(),
                exemplars: vec![],
            });
        }

        // Emit the difference between the currently observed value and the
        // value reported in the previous delta cycle.
        let default = T::default();
        for (attrs, value) in values.drain() {
            let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
            if delta != default {
                new_reported.insert(attrs.clone(), value.get_value());
            }
            s_data.data_points.push(DataPoint {
                attributes: attrs
                    .iter()
                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
                    .collect(),
                start_time: Some(prev_start),
                time: Some(t),
                value: delta,
                exemplars: vec![],
            });
        }

        // The delta collection cycle resets.
        if let Ok(mut start) = self.start.lock() {
            *start = t;
        }

        *reported = new_reported;
        drop(reported); // drop before values guard is dropped

        (
            s_data.data_points.len(),
            new_agg.map(|a| Box::new(a) as Box<_>),
        )
    }

    pub(crate) fn cumulative(
        &self,
        dest: Option<&mut dyn Aggregation>,
    ) -> (usize, Option<Box<dyn Aggregation>>) {
        let t = SystemTime::now();
        let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

        let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
        let mut new_agg = if s_data.is_none() {
            Some(data::Sum {
                data_points: vec![],
                temporality: Temporality::Cumulative,
                is_monotonic: self.monotonic,
            })
        } else {
            None
        };
        let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
        s_data.data_points.clear();
        s_data.temporality = Temporality::Cumulative;
        s_data.is_monotonic = self.monotonic;

        let values = match self.value_map.values.write() {
            Ok(v) => v,
            Err(_) => return (0, None),
        };

        let n = values.len() + 1;
        if n > s_data.data_points.capacity() {
            s_data
                .data_points
                .reserve_exact(n - s_data.data_points.capacity());
        }
        let mut new_reported = HashMap::with_capacity(n);
        let mut reported = match self.reported.lock() {
            Ok(r) => r,
            Err(_) => return (0, None),
        };

        if self
            .value_map
            .has_no_value_attribute_value
            .load(Ordering::Acquire)
        {
            s_data.data_points.push(DataPoint {
                attributes: vec![],
                start_time: Some(prev_start),
                time: Some(t),
                value: self.value_map.no_attribute_value.get_value(),
                exemplars: vec![],
            });
        }

        // The exported value is the difference between the tracker's running
        // total and the total reported at the previous collection, i.e. the
        // sum of observations recorded since then.
        let default = T::default();
        for (attrs, value) in values.iter() {
            let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
            if delta != default {
                new_reported.insert(attrs.clone(), value.get_value());
            }
            s_data.data_points.push(DataPoint {
                attributes: attrs
                    .iter()
                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
                    .collect(),
                start_time: Some(prev_start),
                time: Some(t),
                value: delta,
                exemplars: vec![],
            });
        }

        *reported = new_reported;
        drop(reported); // drop before values guard is dropped

        (
            s_data.data_points.len(),
            new_agg.map(|a| Box::new(a) as Box<_>),
        )
    }
}
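
// A minimal, hedged sketch of how these aggregators can be exercised. The
// `AttributeSet::from(&[KeyValue])` conversion is assumed to be available in
// this crate; the assertions simply restate the behaviour of the code above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sum_delta_drains_accumulated_values() {
        let sum = Sum::<u64>::new(true);
        let attrs = AttributeSet::from(&[KeyValue::new("host", "a")][..]);
        sum.measure(5, attrs.clone());
        sum.measure(3, attrs);

        // One data point for the single attribute set; 5 + 3 = 8 is exported.
        let (count, agg) = sum.delta(None);
        assert_eq!(count, 1);
        assert!(agg.is_some());

        // Delta collection drained the map, so the next cycle starts empty.
        let (count, _) = sum.delta(None);
        assert_eq!(count, 0);
    }

    #[test]
    fn precomputed_sum_delta_reports_differences() {
        let sum = PrecomputedSum::<u64>::new(true);
        let attrs = AttributeSet::from(&[KeyValue::new("host", "a")][..]);

        sum.measure(10, attrs.clone());
        let (count, _) = sum.delta(None); // exports 10
        assert_eq!(count, 1);

        sum.measure(25, attrs);
        let (count, _) = sum.delta(None); // exports 25 - 10 = 15
        assert_eq!(count, 1);
    }
}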