opentelemetry_sdk/metrics/internal/
last_value.rs

1use std::{
2    collections::{hash_map::Entry, HashMap},
3    sync::Mutex,
4    time::SystemTime,
5};
6
7use crate::{metrics::data::DataPoint, metrics::AttributeSet};
8use opentelemetry::{global, metrics::MetricsError, KeyValue};
9
10use super::{
11    aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
12    Number,
13};
14
/// A single measurement paired with the moment it was recorded.
struct DataPointValue<T> {
    /// Wall-clock time at which the measurement was captured.
    timestamp: SystemTime,
    /// The measured value itself.
    value: T,
}
20
21/// Summarizes a set of measurements as the last one made.
22#[derive(Default)]
23pub(crate) struct LastValue<T> {
24    values: Mutex<HashMap<AttributeSet, DataPointValue<T>>>,
25}
26
27impl<T: Number<T>> LastValue<T> {
28    pub(crate) fn new() -> Self {
29        Self::default()
30    }
31
32    pub(crate) fn measure(&self, measurement: T, attrs: AttributeSet) {
33        let d: DataPointValue<T> = DataPointValue {
34            timestamp: SystemTime::now(),
35            value: measurement,
36        };
37        if let Ok(mut values) = self.values.lock() {
38            let size = values.len();
39            match values.entry(attrs) {
40                Entry::Occupied(mut occupied_entry) => {
41                    occupied_entry.insert(d);
42                }
43                Entry::Vacant(vacant_entry) => {
44                    if is_under_cardinality_limit(size) {
45                        vacant_entry.insert(d);
46                    } else {
47                        values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), d);
48                        global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
49                    }
50                }
51            }
52        }
53    }
54
55    pub(crate) fn compute_aggregation(&self, dest: &mut Vec<DataPoint<T>>) {
56        dest.clear();
57        let mut values = match self.values.lock() {
58            Ok(guard) if !guard.is_empty() => guard,
59            _ => return,
60        };
61
62        let n = values.len();
63        if n > dest.capacity() {
64            dest.reserve_exact(n - dest.capacity());
65        }
66
67        for (attrs, value) in values.drain() {
68            dest.push(DataPoint {
69                attributes: attrs
70                    .iter()
71                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
72                    .collect(),
73                time: Some(value.timestamp),
74                value: value.value,
75                start_time: None,
76                exemplars: vec![],
77            });
78        }
79    }
80}