1use std::sync::atomic::{AtomicBool, Ordering};
2use std::vec;
3use std::{
4 collections::HashMap,
5 sync::{Mutex, RwLock},
6 time::SystemTime,
7};
8
9use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
10use crate::metrics::AttributeSet;
11use opentelemetry::KeyValue;
12use opentelemetry::{global, metrics::MetricsError};
13
14use super::{
15 aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
16 AtomicTracker, Number,
17};
18
19struct ValueMap<T: Number<T>> {
21 values: RwLock<HashMap<AttributeSet, T::AtomicTracker>>,
22 has_no_value_attribute_value: AtomicBool,
23 no_attribute_value: T::AtomicTracker,
24}
25
26impl<T: Number<T>> Default for ValueMap<T> {
27 fn default() -> Self {
28 ValueMap::new()
29 }
30}
31
32impl<T: Number<T>> ValueMap<T> {
33 fn new() -> Self {
34 ValueMap {
35 values: RwLock::new(HashMap::new()),
36 has_no_value_attribute_value: AtomicBool::new(false),
37 no_attribute_value: T::new_atomic_tracker(),
38 }
39 }
40}
41
42impl<T: Number<T>> ValueMap<T> {
43 fn measure(&self, measurement: T, attrs: AttributeSet) {
44 if attrs.is_empty() {
45 self.no_attribute_value.add(measurement);
46 self.has_no_value_attribute_value
47 .store(true, Ordering::Release);
48 } else if let Ok(values) = self.values.read() {
49 if let Some(value_to_update) = values.get(&attrs) {
50 value_to_update.add(measurement);
51 return;
52 } else {
53 drop(values);
54 if let Ok(mut values) = self.values.write() {
55 if let Some(value_to_update) = values.get(&attrs) {
58 value_to_update.add(measurement);
59 return;
60 } else if is_under_cardinality_limit(values.len()) {
61 let new_value = T::new_atomic_tracker();
62 new_value.add(measurement);
63 values.insert(attrs, new_value);
64 } else if let Some(overflow_value) =
65 values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET)
66 {
67 overflow_value.add(measurement);
68 return;
69 } else {
70 let new_value = T::new_atomic_tracker();
71 new_value.add(measurement);
72 values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value);
73 global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
74 }
75 }
76 }
77 }
78 }
79}
80
81pub(crate) struct Sum<T: Number<T>> {
83 value_map: ValueMap<T>,
84 monotonic: bool,
85 start: Mutex<SystemTime>,
86}
87
88impl<T: Number<T>> Sum<T> {
89 pub(crate) fn new(monotonic: bool) -> Self {
95 Sum {
96 value_map: ValueMap::new(),
97 monotonic,
98 start: Mutex::new(SystemTime::now()),
99 }
100 }
101
102 pub(crate) fn measure(&self, measurement: T, attrs: AttributeSet) {
103 self.value_map.measure(measurement, attrs)
104 }
105
106 pub(crate) fn delta(
107 &self,
108 dest: Option<&mut dyn Aggregation>,
109 ) -> (usize, Option<Box<dyn Aggregation>>) {
110 let t = SystemTime::now();
111
112 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
113 let mut new_agg = if s_data.is_none() {
114 Some(data::Sum {
115 data_points: vec![],
116 temporality: Temporality::Delta,
117 is_monotonic: self.monotonic,
118 })
119 } else {
120 None
121 };
122 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
123 s_data.temporality = Temporality::Delta;
124 s_data.is_monotonic = self.monotonic;
125 s_data.data_points.clear();
126
127 let mut values = match self.value_map.values.write() {
128 Ok(v) => v,
129 Err(_) => return (0, None),
130 };
131
132 let n = values.len() + 1;
133 if n > s_data.data_points.capacity() {
134 s_data
135 .data_points
136 .reserve_exact(n - s_data.data_points.capacity());
137 }
138
139 let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
140 if self
141 .value_map
142 .has_no_value_attribute_value
143 .swap(false, Ordering::AcqRel)
144 {
145 s_data.data_points.push(DataPoint {
146 attributes: vec![],
147 start_time: Some(prev_start),
148 time: Some(t),
149 value: self.value_map.no_attribute_value.get_and_reset_value(),
150 exemplars: vec![],
151 });
152 }
153
154 for (attrs, value) in values.drain() {
155 s_data.data_points.push(DataPoint {
156 attributes: attrs
157 .iter()
158 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
159 .collect(),
160 start_time: Some(prev_start),
161 time: Some(t),
162 value: value.get_value(),
163 exemplars: vec![],
164 });
165 }
166
167 if let Ok(mut start) = self.start.lock() {
169 *start = t;
170 }
171
172 (
173 s_data.data_points.len(),
174 new_agg.map(|a| Box::new(a) as Box<_>),
175 )
176 }
177
178 pub(crate) fn cumulative(
179 &self,
180 dest: Option<&mut dyn Aggregation>,
181 ) -> (usize, Option<Box<dyn Aggregation>>) {
182 let t = SystemTime::now();
183
184 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
185 let mut new_agg = if s_data.is_none() {
186 Some(data::Sum {
187 data_points: vec![],
188 temporality: Temporality::Cumulative,
189 is_monotonic: self.monotonic,
190 })
191 } else {
192 None
193 };
194 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
195 s_data.temporality = Temporality::Cumulative;
196 s_data.is_monotonic = self.monotonic;
197 s_data.data_points.clear();
198
199 let values = match self.value_map.values.write() {
200 Ok(v) => v,
201 Err(_) => return (0, None),
202 };
203
204 let n = values.len() + 1;
205 if n > s_data.data_points.capacity() {
206 s_data
207 .data_points
208 .reserve_exact(n - s_data.data_points.capacity());
209 }
210
211 let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
212
213 if self
214 .value_map
215 .has_no_value_attribute_value
216 .load(Ordering::Acquire)
217 {
218 s_data.data_points.push(DataPoint {
219 attributes: vec![],
220 start_time: Some(prev_start),
221 time: Some(t),
222 value: self.value_map.no_attribute_value.get_value(),
223 exemplars: vec![],
224 });
225 }
226
227 for (attrs, value) in values.iter() {
232 s_data.data_points.push(DataPoint {
233 attributes: attrs
234 .iter()
235 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
236 .collect(),
237 start_time: Some(prev_start),
238 time: Some(t),
239 value: value.get_value(),
240 exemplars: vec![],
241 });
242 }
243
244 (
245 s_data.data_points.len(),
246 new_agg.map(|a| Box::new(a) as Box<_>),
247 )
248 }
249}
250
251pub(crate) struct PrecomputedSum<T: Number<T>> {
253 value_map: ValueMap<T>,
254 monotonic: bool,
255 start: Mutex<SystemTime>,
256 reported: Mutex<HashMap<AttributeSet, T>>,
257}
258
259impl<T: Number<T>> PrecomputedSum<T> {
260 pub(crate) fn new(monotonic: bool) -> Self {
261 PrecomputedSum {
262 value_map: ValueMap::new(),
263 monotonic,
264 start: Mutex::new(SystemTime::now()),
265 reported: Mutex::new(Default::default()),
266 }
267 }
268
269 pub(crate) fn measure(&self, measurement: T, attrs: AttributeSet) {
270 self.value_map.measure(measurement, attrs)
271 }
272
273 pub(crate) fn delta(
274 &self,
275 dest: Option<&mut dyn Aggregation>,
276 ) -> (usize, Option<Box<dyn Aggregation>>) {
277 let t = SystemTime::now();
278 let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
279
280 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
281 let mut new_agg = if s_data.is_none() {
282 Some(data::Sum {
283 data_points: vec![],
284 temporality: Temporality::Delta,
285 is_monotonic: self.monotonic,
286 })
287 } else {
288 None
289 };
290 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
291 s_data.data_points.clear();
292 s_data.temporality = Temporality::Delta;
293 s_data.is_monotonic = self.monotonic;
294
295 let mut values = match self.value_map.values.write() {
296 Ok(v) => v,
297 Err(_) => return (0, None),
298 };
299
300 let n = values.len() + 1;
301 if n > s_data.data_points.capacity() {
302 s_data
303 .data_points
304 .reserve_exact(n - s_data.data_points.capacity());
305 }
306 let mut new_reported = HashMap::with_capacity(n);
307 let mut reported = match self.reported.lock() {
308 Ok(r) => r,
309 Err(_) => return (0, None),
310 };
311
312 if self
313 .value_map
314 .has_no_value_attribute_value
315 .swap(false, Ordering::AcqRel)
316 {
317 s_data.data_points.push(DataPoint {
318 attributes: vec![],
319 start_time: Some(prev_start),
320 time: Some(t),
321 value: self.value_map.no_attribute_value.get_and_reset_value(),
322 exemplars: vec![],
323 });
324 }
325
326 let default = T::default();
327 for (attrs, value) in values.drain() {
328 let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
329 if delta != default {
330 new_reported.insert(attrs.clone(), value.get_value());
331 }
332 s_data.data_points.push(DataPoint {
333 attributes: attrs
334 .iter()
335 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
336 .collect(),
337 start_time: Some(prev_start),
338 time: Some(t),
339 value: delta,
340 exemplars: vec![],
341 });
342 }
343
344 if let Ok(mut start) = self.start.lock() {
346 *start = t;
347 }
348
349 *reported = new_reported;
350 drop(reported); (
353 s_data.data_points.len(),
354 new_agg.map(|a| Box::new(a) as Box<_>),
355 )
356 }
357
358 pub(crate) fn cumulative(
359 &self,
360 dest: Option<&mut dyn Aggregation>,
361 ) -> (usize, Option<Box<dyn Aggregation>>) {
362 let t = SystemTime::now();
363 let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
364
365 let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
366 let mut new_agg = if s_data.is_none() {
367 Some(data::Sum {
368 data_points: vec![],
369 temporality: Temporality::Cumulative,
370 is_monotonic: self.monotonic,
371 })
372 } else {
373 None
374 };
375 let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
376 s_data.data_points.clear();
377 s_data.temporality = Temporality::Cumulative;
378 s_data.is_monotonic = self.monotonic;
379
380 let values = match self.value_map.values.write() {
381 Ok(v) => v,
382 Err(_) => return (0, None),
383 };
384
385 let n = values.len() + 1;
386 if n > s_data.data_points.capacity() {
387 s_data
388 .data_points
389 .reserve_exact(n - s_data.data_points.capacity());
390 }
391 let mut new_reported = HashMap::with_capacity(n);
392 let mut reported = match self.reported.lock() {
393 Ok(r) => r,
394 Err(_) => return (0, None),
395 };
396
397 if self
398 .value_map
399 .has_no_value_attribute_value
400 .load(Ordering::Acquire)
401 {
402 s_data.data_points.push(DataPoint {
403 attributes: vec![],
404 start_time: Some(prev_start),
405 time: Some(t),
406 value: self.value_map.no_attribute_value.get_value(),
407 exemplars: vec![],
408 });
409 }
410
411 let default = T::default();
412 for (attrs, value) in values.iter() {
413 let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
414 if delta != default {
415 new_reported.insert(attrs.clone(), value.get_value());
416 }
417 s_data.data_points.push(DataPoint {
418 attributes: attrs
419 .iter()
420 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
421 .collect(),
422 start_time: Some(prev_start),
423 time: Some(t),
424 value: delta,
425 exemplars: vec![],
426 });
427 }
428
429 *reported = new_reported;
430 drop(reported); (
433 s_data.data_points.len(),
434 new_agg.map(|a| Box::new(a) as Box<_>),
435 )
436 }
437}