opentelemetry_sdk/metrics/mod.rs

//! The crust of the OpenTelemetry metrics SDK.
//!
//! ## Configuration
//!
//! The metrics SDK configuration is stored with each [SdkMeterProvider].
//! Configuration for [Resource]s, [View]s, and [ManualReader] or
//! [PeriodicReader] instances can be specified.
//!
//! ### Example
//!
//! ```
//! use opentelemetry::global;
//! use opentelemetry::KeyValue;
//! use opentelemetry_sdk::{metrics::SdkMeterProvider, Resource};
//!
//! // Generate SDK configuration, resource, views, etc.
//! let resource = Resource::default(); // default attributes about the current process
//!
//! // Create a meter provider with the desired config
//! let meter_provider = SdkMeterProvider::builder().with_resource(resource).build();
//! global::set_meter_provider(meter_provider.clone());
//!
//! // Use the meter provider to create meter instances
//! let meter = global::meter("my_app");
//!
//! // Create instruments scoped to the meter
//! let counter = meter
//!     .u64_counter("power_consumption")
//!     .with_unit("kWh")
//!     .init();
//!
//! // Use instruments to record measurements
//! counter.add(10, &[KeyValue::new("rate", "standard")]);
//!
//! // Shut down the provider at the end of the application to ensure any
//! // metrics not yet exported are flushed.
//! meter_provider.shutdown().unwrap();
//! ```
//!
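//! ### Example: readers and views
//!
//! A minimal sketch of wiring a [ManualReader] and a [View] into the provider;
//! the instrument name and bucket boundaries here are illustrative only.
//!
//! ```
//! use opentelemetry_sdk::metrics::{
//!     new_view, Aggregation, Instrument, ManualReader, SdkMeterProvider, Stream,
//! };
//!
//! // A manual reader collects metrics only when explicitly asked to.
//! let reader = ManualReader::builder().build();
//!
//! // A view matches instruments by criteria and rewrites the resulting
//! // stream; here it overrides the histogram bucket boundaries.
//! let view = new_view(
//!     Instrument::new().name("my_histogram"),
//!     Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
//!         boundaries: vec![0.0, 5.0, 10.0],
//!         record_min_max: true,
//!     }),
//! )
//! .expect("view should be valid");
//!
//! let meter_provider = SdkMeterProvider::builder()
//!     .with_reader(reader)
//!     .with_view(view)
//!     .build();
//! ```
//!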
//! [Resource]: crate::Resource

pub(crate) mod aggregation;
pub mod data;
pub mod exporter;
pub(crate) mod instrument;
pub(crate) mod internal;
pub(crate) mod manual_reader;
pub(crate) mod meter;
mod meter_provider;
pub(crate) mod periodic_reader;
pub(crate) mod pipeline;
pub mod reader;
pub(crate) mod view;

pub use aggregation::*;
pub use instrument::*;
pub use manual_reader::*;
pub use meter::*;
pub use meter_provider::*;
pub use periodic_reader::*;
pub use pipeline::Pipeline;
pub use view::*;

use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

use opentelemetry::{Key, KeyValue, Value};

/// A unique set of attributes that can be used as instrument identifiers.
///
/// This type implements [Hash], [PartialEq], and [Eq], so it can be used as a
/// `HashMap` key and in other de-duplication contexts.
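///
/// # Example
///
/// A small sketch of the de-duplication behavior (the keys and values here
/// are illustrative only):
///
/// ```
/// use opentelemetry::KeyValue;
/// use opentelemetry_sdk::metrics::AttributeSet;
///
/// // Duplicate keys are collapsed, keeping the last value supplied for each
/// // key, and the order of attributes does not affect equality.
/// let set_a = AttributeSet::from([KeyValue::new("k", "v1"), KeyValue::new("k", "v2")].as_slice());
/// let set_b = AttributeSet::from([KeyValue::new("k", "v2")].as_slice());
/// assert_eq!(set_a, set_b);
/// ```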
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct AttributeSet(Vec<KeyValue>, u64);

impl From<&[KeyValue]> for AttributeSet {
    fn from(values: &[KeyValue]) -> Self {
        let mut seen_keys = HashSet::with_capacity(values.len());
        // Iterate in reverse so that, when a key appears more than once, the
        // last value supplied for that key is the one retained.
        let vec = values
            .iter()
            .rev()
            .filter_map(|kv| {
                if seen_keys.insert(kv.key.clone()) {
                    Some(kv.clone())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        AttributeSet::new(vec)
    }
}

/// Hashes the attribute list in order; callers must sort `values` first (as
/// `AttributeSet::new` does) if an order-insensitive hash is required.
fn calculate_hash(values: &[KeyValue]) -> u64 {
    let mut hasher = DefaultHasher::new();
    values.iter().fold(&mut hasher, |mut hasher, item| {
        item.hash(&mut hasher);
        hasher
    });
    hasher.finish()
}

impl AttributeSet {
    fn new(mut values: Vec<KeyValue>) -> Self {
        // Sort so that logically-equal sets hash and compare identically
        // regardless of the order in which attributes were recorded.
        values.sort_unstable();
        let hash = calculate_hash(&values);
        AttributeSet(values, hash)
    }

    /// Returns `true` if the set contains no elements.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Retains only the attributes specified by the predicate.
    pub fn retain<F>(&mut self, f: F)
    where
        F: Fn(&KeyValue) -> bool,
    {
        self.0.retain(|kv| f(kv));

        // Recalculate the hash as the elements have changed.
        self.1 = calculate_hash(&self.0);
    }

    /// Iterate over the key-value pairs in the set.
    pub fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
        self.0.iter().map(|kv| (&kv.key, &kv.value))
    }
}

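// Note: `Hash` writes only the fingerprint pre-computed in `AttributeSet::new`
// and `retain`, while the derived `PartialEq`/`Eq` still compare the full
// sorted attribute vector, so any (unlikely) u64 fingerprint collision is
// resolved by the equality check in hash-based collections.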
impl Hash for AttributeSet {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // Use the pre-computed hash.
        state.write_u64(self.1)
    }
}

#[cfg(all(test, feature = "testing"))]
mod tests {
    use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
    use super::*;
    use crate::metrics::data::{ResourceMetrics, Temporality};
    use crate::metrics::reader::TemporalitySelector;
    use crate::testing::metrics::InMemoryMetricsExporterBuilder;
    use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
    use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
    use opentelemetry::{metrics::MeterProvider as _, KeyValue};
    use rand::{rngs, Rng, SeedableRng};
    use std::borrow::Cow;
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;

    // Run all tests in this mod:
    // cargo test metrics::tests --features=testing
    // Note for all tests from this point onwards in this mod: the
    // "multi_thread" tokio flavor must be used, otherwise `flush` won't be
    // able to make progress!
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_overflow_delta() {
        // Arrange
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Act
        // Record measurements with A:0, A:1, ..., A:1999, which just fits within the 2000 limit.
        for v in 0..2000 {
            counter.add(100, &[KeyValue::new("A", v.to_string())]);
        }

        // All of the below will now go into overflow.
        counter.add(100, &[KeyValue::new("A", "foo")]);
        counter.add(100, &[KeyValue::new("A", "another")]);
        counter.add(100, &[KeyValue::new("A", "yet_another")]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

        // Expecting 2001 metric points. (2000 + 1 overflow)
        assert_eq!(sum.data_points.len(), 2001);

        let data_point =
            find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
                .expect("overflow point expected");
        assert_eq!(data_point.value, 300);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_cumulative() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_cumulative --features=testing -- --nocapture
        counter_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_delta() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_delta --features=testing -- --nocapture
        counter_aggregation_helper(Temporality::Delta);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_cumulative() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_aggregation_cumulative --features=testing -- --nocapture
        histogram_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_delta() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_aggregation_delta --features=testing -- --nocapture
        histogram_aggregation_helper(Temporality::Delta);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn updown_counter_aggregation_cumulative() {
        // Run this test with stdout enabled to see output.
        // cargo test updown_counter_aggregation_cumulative --features=testing -- --nocapture
        updown_counter_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn updown_counter_aggregation_delta() {
        // Run this test with stdout enabled to see output.
        // cargo test updown_counter_aggregation_delta --features=testing -- --nocapture
        updown_counter_aggregation_helper(Temporality::Delta);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn gauge_aggregation() {
        // Run this test with stdout enabled to see output.
        // cargo test gauge_aggregation --features=testing -- --nocapture

        // Gauge should use last value aggregation regardless of the aggregation temporality used.
        gauge_aggregation_helper(Temporality::Delta);
        gauge_aggregation_helper(Temporality::Cumulative);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_non_zero_increment() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_delta_non_zero_increment() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn observable_counter_aggregation_cumulative_zero_increment() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"]
    async fn observable_counter_aggregation_delta_zero_increment() {
        // Run this test with stdout enabled to see output.
        // cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
        observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4);
    }

    fn observable_counter_aggregation_helper(
        temporality: Temporality,
        start: u64,
        increment: u64,
        length: u64,
    ) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        // The observable counter reports values[0], values[1], ..., values[length - 1], one per flush.
        let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
        println!("Testing with observable values: {:?}", values);
        let values = Arc::new(values);
        let values_clone = values.clone();
        let i = Arc::new(Mutex::new(0));
        let _observable_counter = test_context
            .meter()
            .u64_observable_counter("my_observable_counter")
            .with_unit("my_unit")
            .with_callback(move |observer| {
                let mut index = i.lock().unwrap();
                if *index < values.len() {
                    observer.observe(values[*index], &[KeyValue::new("key1", "value1")]);
                    *index += 1;
                }
            })
            .init();

        for (iter, v) in values_clone.iter().enumerate() {
            test_context.flush_metrics();
            let sum = test_context.get_aggregation::<data::Sum<u64>>("my_observable_counter", None);
            assert_eq!(sum.data_points.len(), 1);
            assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
            if let Temporality::Cumulative = temporality {
                assert_eq!(
                    sum.temporality,
                    Temporality::Cumulative,
                    "Should produce cumulative"
                );
            } else {
                assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
            }

            // find and validate key1=value1 datapoint
            let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                .expect("datapoint with key1=value1 expected");
            if let Temporality::Cumulative = temporality {
                // A cumulative counter should report the observed value as is.
                assert_eq!(data_point.value, *v);
            } else {
                // A delta counter should report the increment value,
                // except for the first observation, which is the start value.
                if iter == 0 {
                    assert_eq!(data_point.value, start);
                } else {
                    assert_eq!(data_point.value, increment);
                }
            }

            test_context.reset_metrics();
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_duplicate_instrument_merge() {
        // Arrange
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

        // Act
        let meter = meter_provider.meter("test");
        let counter = meter
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .init();

        let counter_duplicated = meter
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .init();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter.add(10, &attribute);
        counter_duplicated.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be a single metric merging the duplicate instruments"
        );
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");
        assert_eq!(metric.unit, "my_unit");
        let sum = metric
            .data
            .as_any()
            .downcast_ref::<data::Sum<u64>>()
            .expect("Sum aggregation expected for Counter instruments by default");

        // Expecting 1 time-series.
        assert_eq!(sum.data_points.len(), 1);

        let datapoint = &sum.data_points[0];
        assert_eq!(datapoint.value, 15);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_duplicate_instrument_different_meter_no_merge() {
        // Arrange
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

        // Act
        let meter1 = meter_provider.meter("test.meter1");
        let meter2 = meter_provider.meter("test.meter2");
        let counter1 = meter1
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .init();

        let counter2 = meter2
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .init();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter1.add(10, &attribute);
        counter2.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(
            resource_metrics[0].scope_metrics.len() == 2,
            "There should be 2 separate scopes"
        );
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be a single metric for the scope"
        );
        assert!(
            resource_metrics[0].scope_metrics[1].metrics.len() == 1,
            "There should be a single metric for the scope"
        );

        let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
        let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");

        if let Some(scope1) = scope1 {
            let metric1 = &scope1.metrics[0];
            assert_eq!(metric1.name, "my_counter");
            assert_eq!(metric1.unit, "my_unit");
            assert_eq!(metric1.description, "my_description");
            let sum1 = metric1
                .data
                .as_any()
                .downcast_ref::<data::Sum<u64>>()
                .expect("Sum aggregation expected for Counter instruments by default");

            // Expecting 1 time-series.
            assert_eq!(sum1.data_points.len(), 1);

            let datapoint1 = &sum1.data_points[0];
            assert_eq!(datapoint1.value, 10);
        } else {
            panic!("No MetricScope found for 'test.meter1'");
        }

        if let Some(scope2) = scope2 {
            let metric2 = &scope2.metrics[0];
            assert_eq!(metric2.name, "my_counter");
            assert_eq!(metric2.unit, "my_unit");
            assert_eq!(metric2.description, "my_description");
            let sum2 = metric2
                .data
                .as_any()
                .downcast_ref::<data::Sum<u64>>()
                .expect("Sum aggregation expected for Counter instruments by default");

            // Expecting 1 time-series.
            assert_eq!(sum2.data_points.len(), 1);

            let datapoint2 = &sum2.data_points[0];
            assert_eq!(datapoint2.value, 5);
        } else {
            panic!("No MetricScope found for 'test.meter2'");
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn instrumentation_scope_identity_test() {
        // Arrange
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

        // Act
        // Meters are identical except for scope attributes, but scope attributes are not an identifying property.
        // Hence there should be a single metric stream output for this test.
        let meter1 = meter_provider.versioned_meter(
            "test.meter",
            Some("v0.1.0"),
            Some("schema_url"),
            Some(vec![KeyValue::new("key", "value1")]),
        );
        let meter2 = meter_provider.versioned_meter(
            "test.meter",
            Some("v0.1.0"),
            Some("schema_url"),
            Some(vec![KeyValue::new("key", "value2")]),
        );
        let counter1 = meter1
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .init();

        let counter2 = meter2
            .u64_counter("my_counter")
            .with_unit("my_unit")
            .with_description("my_description")
            .init();

        let attribute = vec![KeyValue::new("key1", "value1")];
        counter1.add(10, &attribute);
        counter2.add(5, &attribute);

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        println!("resource_metrics: {:?}", resource_metrics);
        assert!(
            resource_metrics[0].scope_metrics.len() == 1,
            "There should be a single scope as the meters are identical"
        );
        assert!(
            resource_metrics[0].scope_metrics[0].metrics.len() == 1,
            "There should be a single metric for the scope as the instruments are identical"
        );

        let scope = &resource_metrics[0].scope_metrics[0].scope;
        assert_eq!(scope.name, "test.meter");
        assert_eq!(scope.version, Some(Cow::Borrowed("v0.1.0")));
        assert_eq!(scope.schema_url, Some(Cow::Borrowed("schema_url")));

        // This validates current behavior, but it is not guaranteed to be the case in the future,
        // as this is a user error and the SDK reserves the right to change this behavior.
        assert_eq!(scope.attributes, vec![KeyValue::new("key", "value1")]);

        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");
        assert_eq!(metric.unit, "my_unit");
        assert_eq!(metric.description, "my_description");
        let sum = metric
            .data
            .as_any()
            .downcast_ref::<data::Sum<u64>>()
            .expect("Sum aggregation expected for Counter instruments by default");

        // Expecting 1 time-series.
        assert_eq!(sum.data_points.len(), 1);

        let datapoint = &sum.data_points[0];
        assert_eq!(datapoint.value, 15);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
        // Run this test with stdout enabled to see output.
        // cargo test histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist --features=testing -- --nocapture

        // Arrange
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let criteria = Instrument::new().name("test_histogram");
        let stream_invalid_aggregation = Stream::new()
            .aggregation(Aggregation::ExplicitBucketHistogram {
                boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5], // invalid: boundaries must be sorted in increasing order
                record_min_max: false,
            })
            .name("test_histogram_renamed")
            .unit("test_unit_renamed");

        let view =
            new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
        let meter_provider = SdkMeterProvider::builder()
            .with_reader(reader)
            .with_view(view)
            .build();

        // Act
        let meter = meter_provider.meter("test");
        let histogram = meter
            .f64_histogram("test_histogram")
            .with_unit("test_unit")
            .init();

        histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(
            metric.name, "test_histogram",
            "View rename should be ignored and the original name retained."
        );
        assert_eq!(
            metric.unit, "test_unit",
            "View rename of the unit should be ignored and the original unit retained."
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
        // cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=testing

        // Arrange
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let criteria = Instrument::new().name("my_observable_counter");
        // View drops all attributes.
        let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);

        let view =
            new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
        let meter_provider = SdkMeterProvider::builder()
            .with_reader(reader)
            .with_view(view)
            .build();

        // Act
        let meter = meter_provider.meter("test");
        let _observable_counter = meter
            .u64_observable_counter("my_observable_counter")
            .with_callback(|observer| {
                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "200"),
                        KeyValue::new("verb", "get"),
                    ],
                );

                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "200"),
                        KeyValue::new("verb", "post"),
                    ],
                );

                observer.observe(
                    100,
                    &[
                        KeyValue::new("statusCode", "500"),
                        KeyValue::new("verb", "get"),
                    ],
                );
            })
            .init();

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_observable_counter");

        let sum = metric
            .data
            .as_any()
            .downcast_ref::<data::Sum<u64>>()
            .expect("Sum aggregation expected for ObservableCounter instruments by default");

        // Expecting 1 time-series only, as the view drops all attributes,
        // resulting in a single time-series.
        // This is failing today due to the lack of support for spatial aggregation.
        assert_eq!(sum.data_points.len(), 1);

        // find and validate the single datapoint
        let data_point = &sum.data_points[0];
        assert_eq!(data_point.value, 300);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn spatial_aggregation_when_view_drops_attributes_counter() {
        // cargo test spatial_aggregation_when_view_drops_attributes_counter --features=testing

        // Arrange
        let exporter = InMemoryMetricsExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let criteria = Instrument::new().name("my_counter");
        // View drops all attributes.
        let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);

        let view =
            new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
        let meter_provider = SdkMeterProvider::builder()
            .with_reader(reader)
            .with_view(view)
            .build();

        // Act
        let meter = meter_provider.meter("test");
        let counter = meter.u64_counter("my_counter").init();

        // Normally, this would generate 3 time-series, but since the view
        // drops all attributes, we expect only 1 time-series.
        counter.add(
            10,
            [
                KeyValue::new("statusCode", "200"),
                KeyValue::new("verb", "Get"),
            ]
            .as_ref(),
        );

        counter.add(
            10,
            [
                KeyValue::new("statusCode", "500"),
                KeyValue::new("verb", "Get"),
            ]
            .as_ref(),
        );

        counter.add(
            10,
            [
                KeyValue::new("statusCode", "200"),
                KeyValue::new("verb", "Post"),
            ]
            .as_ref(),
        );

        meter_provider.force_flush().unwrap();

        // Assert
        let resource_metrics = exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        assert!(!resource_metrics.is_empty());
        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
        assert_eq!(metric.name, "my_counter");

        let sum = metric
            .data
            .as_any()
            .downcast_ref::<data::Sum<u64>>()
            .expect("Sum aggregation expected for Counter instruments by default");

        // Expecting 1 time-series only, as the view drops all attributes,
        // resulting in a single time-series.
        // This is failing today due to the lack of support for spatial aggregation.
        assert_eq!(sum.data_points.len(), 1);
        // find and validate the single datapoint
        let data_point = &sum.data_points[0];
        assert_eq!(data_point.value, 30);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_aggregation_attribute_order() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_aggregation_attribute_order --features=testing -- --nocapture

        // Arrange
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Act
        // Add the same set of attributes in different orders. (They are
        // expected to be treated as the same attributes.)
        counter.add(
            1,
            &[
                KeyValue::new("A", "a"),
                KeyValue::new("B", "b"),
                KeyValue::new("C", "c"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("A", "a"),
                KeyValue::new("C", "c"),
                KeyValue::new("B", "b"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("B", "b"),
                KeyValue::new("A", "a"),
                KeyValue::new("C", "c"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("B", "b"),
                KeyValue::new("C", "c"),
                KeyValue::new("A", "a"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("C", "c"),
                KeyValue::new("B", "b"),
                KeyValue::new("A", "a"),
            ],
        );
        counter.add(
            1,
            &[
                KeyValue::new("C", "c"),
                KeyValue::new("A", "a"),
                KeyValue::new("B", "b"),
            ],
        );
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

        // Expecting 1 time-series.
        assert_eq!(sum.data_points.len(), 1);

        // validate the sole datapoint
        let data_point1 = &sum.data_points[0];
        assert_eq!(data_point1.value, 6);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_cumulative_counter() {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic sums.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_delta_counter() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic sums.");
        assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_cumulative_up_down_counter() {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));

        counter.add(50, &[]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", Some("my_unit"));

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(!sum.is_monotonic, "Should not produce monotonic sums.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_delta_up_down_counter() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));

        counter.add(50, &[]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", Some("my_unit"));

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(!sum.is_monotonic, "Should not produce monotonic sums.");
        assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 50, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_cumulative_counter_value_added_after_export() {
        let mut test_context = TestContext::new(Temporality::Cumulative);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();
        let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
        test_context.reset_metrics();

        counter.add(5, &[]);
        test_context.flush_metrics();
        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic sums.");
        assert_eq!(
            sum.temporality,
            Temporality::Cumulative,
            "Should produce cumulative"
        );

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 55, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn no_attr_delta_counter_value_reset_after_export() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();
        let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
        test_context.reset_metrics();

        counter.add(5, &[]);
        test_context.flush_metrics();
        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

        assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
        assert!(sum.is_monotonic, "Should produce monotonic sums.");
        assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");

        let data_point = &sum.data_points[0];
        assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
        assert_eq!(data_point.value, 5, "Unexpected data point value");
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        counter.add(50, &[]);
        test_context.flush_metrics();
        let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
        test_context.reset_metrics();

        counter.add(50, &[KeyValue::new("a", "b")]);
        test_context.flush_metrics();
        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

        let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());

        assert!(
            no_attr_data_point.is_none(),
            "Expected no data points with no attributes"
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[ignore = "Known bug: https://github.com/open-telemetry/opentelemetry-rust/issues/1598"]
    async fn delta_memory_efficiency_test() {
        // Run this test with stdout enabled to see output.
        // cargo test delta_memory_efficiency_test --features=testing -- --nocapture

        // Arrange
        let mut test_context = TestContext::new(Temporality::Delta);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Act
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);
        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

        // Expecting 2 time-series.
        assert_eq!(sum.data_points.len(), 2);

        // find and validate key1=value1 datapoint
        let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 5);

        // find and validate key1=value2 datapoint
        let data_point2 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 3);

        test_context.exporter.reset();
        // Flush again and validate that nothing is flushed,
        // as the temporality is delta.
        test_context.flush_metrics();

        let resource_metrics = test_context
            .exporter
            .get_finished_metrics()
            .expect("metrics are expected to be exported.");
        println!("resource_metrics: {:?}", resource_metrics);
        assert!(
            resource_metrics.is_empty(),
            "No metrics should be exported as no new measurements were recorded since the last collect."
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn counter_multithreaded() {
        // Run this test with stdout enabled to see output.
        // cargo test counter_multithreaded --features=testing -- --nocapture

        counter_multithreaded_aggregation_helper(Temporality::Delta);
        counter_multithreaded_aggregation_helper(Temporality::Cumulative);
    }

    fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

        for i in 0..10 {
            thread::scope(|s| {
                s.spawn(|| {
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);

                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
                    if i % 2 == 0 {
                        test_context.flush_metrics();
                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some duration for better testing
                    }

                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                    counter.add(1, &[KeyValue::new("key1", "value1")]);
                });
            });
        }

        test_context.flush_metrics();

        // Assert
        // We invoke `test_context.flush_metrics()` six times.
        let sums =
            test_context.get_from_multiple_aggregations::<data::Sum<u64>>("my_counter", None, 6);

        let values = sums
            .iter()
            .map(|sum| {
                assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series.
                assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
                assert_eq!(sum.temporality, temporality);

                // find and validate key1=value1 datapoint
                let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
                    .expect("datapoint with key1=value1 expected");

                data_point.value
            })
            .collect::<Vec<_>>();

        let total_sum: u64 = if temporality == Temporality::Delta {
            values.iter().sum()
        } else {
            *values.last().unwrap()
        };

        assert_eq!(total_sum, 50); // Each of the 10 update threads records measurements summing up to 5.
    }

    fn histogram_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let histogram = test_context.meter().u64_histogram("my_histogram").init();

        // Act
        let mut rand = rngs::SmallRng::from_entropy();
        let values_kv1 = (0..50)
            .map(|_| rand.gen_range(0..100))
            .collect::<Vec<u64>>();
        for value in values_kv1.iter() {
            histogram.record(*value, &[KeyValue::new("key1", "value1")]);
        }

        let values_kv2 = (0..30)
            .map(|_| rand.gen_range(0..100))
            .collect::<Vec<u64>>();
        for value in values_kv2.iter() {
            histogram.record(*value, &[KeyValue::new("key1", "value2")]);
        }

        test_context.flush_metrics();

        // Assert
        let histogram_data =
            test_context.get_aggregation::<data::Histogram<u64>>("my_histogram", None);
        // Expecting 2 time-series.
        assert_eq!(histogram_data.data_points.len(), 2);
        if let Temporality::Cumulative = temporality {
            assert_eq!(
                histogram_data.temporality,
                Temporality::Cumulative,
                "Should produce cumulative"
            );
        } else {
            assert_eq!(
                histogram_data.temporality,
                Temporality::Delta,
                "Should produce delta"
            );
        }

        // find and validate the key1=value1 and key1=value2 datapoints
        let data_point1 =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
                .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.count, values_kv1.len() as u64);
        assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
        assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
        assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());

        let data_point2 =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
                .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.count, values_kv2.len() as u64);
        assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
        assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
        assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());

        // Reset and report more measurements
        test_context.reset_metrics();
        for value in values_kv1.iter() {
            histogram.record(*value, &[KeyValue::new("key1", "value1")]);
        }

        for value in values_kv2.iter() {
            histogram.record(*value, &[KeyValue::new("key1", "value2")]);
        }

        test_context.flush_metrics();

        let histogram_data =
            test_context.get_aggregation::<data::Histogram<u64>>("my_histogram", None);
        assert_eq!(histogram_data.data_points.len(), 2);
        let data_point1 =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
                .expect("datapoint with key1=value1 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64));
            assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::<u64>()));
            assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
            assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
        } else {
            assert_eq!(data_point1.count, values_kv1.len() as u64);
            assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
            assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
            assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
        }

        let data_point2 =
            find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
                .expect("datapoint with key1=value2 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point2.count, 2 * (values_kv2.len() as u64));
            assert_eq!(data_point2.sum, 2 * (values_kv2.iter().sum::<u64>()));
            assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
            assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
        } else {
            assert_eq!(data_point2.count, values_kv2.len() as u64);
            assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
            assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
            assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
        }
    }

    fn gauge_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let gauge = test_context.meter().i64_gauge("my_gauge").init();

        // Act
        gauge.record(1, &[KeyValue::new("key1", "value1")]);
        gauge.record(2, &[KeyValue::new("key1", "value1")]);
        gauge.record(1, &[KeyValue::new("key1", "value1")]);
        gauge.record(3, &[KeyValue::new("key1", "value1")]);
        gauge.record(4, &[KeyValue::new("key1", "value1")]);

        gauge.record(11, &[KeyValue::new("key1", "value2")]);
        gauge.record(13, &[KeyValue::new("key1", "value2")]);
        gauge.record(6, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        // Assert
        let gauge_data = test_context.get_aggregation::<data::Gauge<i64>>("my_gauge", None);
        // Expecting 2 time-series.
        assert_eq!(gauge_data.data_points.len(), 2);

        // find and validate the key1=value1 and key1=value2 datapoints
        let data_point1 = find_datapoint_with_key_value(&gauge_data.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 4);

        let data_point2 = find_datapoint_with_key_value(&gauge_data.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 6);

        // Reset and report more measurements
        test_context.reset_metrics();
        gauge.record(1, &[KeyValue::new("key1", "value1")]);
        gauge.record(2, &[KeyValue::new("key1", "value1")]);
        gauge.record(11, &[KeyValue::new("key1", "value1")]);
        gauge.record(3, &[KeyValue::new("key1", "value1")]);
        gauge.record(41, &[KeyValue::new("key1", "value1")]);

        gauge.record(34, &[KeyValue::new("key1", "value2")]);
        gauge.record(12, &[KeyValue::new("key1", "value2")]);
        gauge.record(54, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        let gauge_data = test_context.get_aggregation::<data::Gauge<i64>>("my_gauge", None);
        assert_eq!(gauge_data.data_points.len(), 2);
        let data_point1 = find_datapoint_with_key_value(&gauge_data.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 41);

        let data_point2 = find_datapoint_with_key_value(&gauge_data.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 54);
    }

    fn counter_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.u64_counter("test", "my_counter", None);

        // Act
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        // Assert
        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
        // Expecting 2 time-series.
        assert_eq!(sum.data_points.len(), 2);
        assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
        if let Temporality::Cumulative = temporality {
            assert_eq!(
                sum.temporality,
                Temporality::Cumulative,
                "Should produce cumulative"
            );
        } else {
            assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
        }

        // find and validate the key1=value1 and key1=value2 datapoints
        let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 5);

        let data_point2 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 3);

        // Reset and report more measurements
        test_context.reset_metrics();
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);
        counter.add(1, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
        assert_eq!(sum.data_points.len(), 2);
        let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point1.value, 10);
        } else {
            assert_eq!(data_point1.value, 5);
        }

        let data_point2 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point2.value, 6);
        } else {
            assert_eq!(data_point2.value, 3);
        }
    }
1339
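    // Same flow as counter_aggregation_helper, but with negative and zero
    // increments so the resulting Sum must be reported as non-monotonic.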
    fn updown_counter_aggregation_helper(temporality: Temporality) {
        // Arrange
        let mut test_context = TestContext::new(temporality);
        let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);

        // Act
        counter.add(10, &[KeyValue::new("key1", "value1")]);
        counter.add(-1, &[KeyValue::new("key1", "value1")]);
        counter.add(-5, &[KeyValue::new("key1", "value1")]);
        counter.add(0, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(10, &[KeyValue::new("key1", "value2")]);
        counter.add(0, &[KeyValue::new("key1", "value2")]);
        counter.add(-3, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        // Assert
        let sum = test_context.get_aggregation::<data::Sum<i64>>("my_updown_counter", None);
        // Expecting 2 time-series.
        assert_eq!(sum.data_points.len(), 2);
        assert!(
            !sum.is_monotonic,
            "UpDownCounter should produce a non-monotonic sum."
        );
        if let Temporality::Cumulative = temporality {
            assert_eq!(
                sum.temporality,
                Temporality::Cumulative,
                "Should produce cumulative"
            );
        } else {
            assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
        }

        // Find and validate the key1=value1 and key1=value2 datapoints.
        let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
        assert_eq!(data_point1.value, 5);

        let data_point2 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        assert_eq!(data_point2.value, 7);

        // Reset and report more measurements
        test_context.reset_metrics();
        counter.add(10, &[KeyValue::new("key1", "value1")]);
        counter.add(-1, &[KeyValue::new("key1", "value1")]);
        counter.add(-5, &[KeyValue::new("key1", "value1")]);
        counter.add(0, &[KeyValue::new("key1", "value1")]);
        counter.add(1, &[KeyValue::new("key1", "value1")]);

        counter.add(10, &[KeyValue::new("key1", "value2")]);
        counter.add(0, &[KeyValue::new("key1", "value2")]);
        counter.add(-3, &[KeyValue::new("key1", "value2")]);

        test_context.flush_metrics();

        let sum = test_context.get_aggregation::<data::Sum<i64>>("my_updown_counter", None);
        assert_eq!(sum.data_points.len(), 2);
        let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
            .expect("datapoint with key1=value1 expected");
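        // As with the counter: Cumulative accumulates across exports
        // (5 + 5 = 10 for value1, 7 + 7 = 14 for value2), Delta resets.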
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point1.value, 10);
        } else {
            assert_eq!(data_point1.value, 5);
        }

        let data_point2 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
            .expect("datapoint with key1=value2 expected");
        if temporality == Temporality::Cumulative {
            assert_eq!(data_point2.value, 14);
        } else {
            assert_eq!(data_point2.value, 7);
        }
    }

    fn find_datapoint_with_key_value<'a, T>(
        data_points: &'a [DataPoint<T>],
        key: &str,
        value: &str,
    ) -> Option<&'a DataPoint<T>> {
        data_points.iter().find(|&datapoint| {
            datapoint
                .attributes
                .iter()
                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
        })
    }

    fn find_histogram_datapoint_with_key_value<'a, T>(
        data_points: &'a [HistogramDataPoint<T>],
        key: &str,
        value: &str,
    ) -> Option<&'a HistogramDataPoint<T>> {
        data_points.iter().find(|&datapoint| {
            datapoint
                .attributes
                .iter()
                .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
        })
    }
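    // The two finders above are identical except for the datapoint type. One
    // possible (hypothetical, not part of this crate) way to deduplicate them
    // would be a small local trait implemented for both point types:
    //
    //     trait HasAttributes {
    //         fn attributes(&self) -> &[KeyValue];
    //     }
    //
    // with a single generic finder over `P: HasAttributes`. They are kept
    // separate here for simplicity.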

    fn find_scope_metric<'a>(
        metrics: &'a [ScopeMetrics],
        name: &'a str,
    ) -> Option<&'a ScopeMetrics> {
        metrics
            .iter()
            .find(|&scope_metric| scope_metric.scope.name == name)
    }

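    // Small test harness: an in-memory exporter behind a PeriodicReader, so
    // tests can record measurements, force a flush, and inspect the exported
    // aggregations synchronously. Typical use (sketch, mirroring the helpers
    // above):
    //
    //     let mut test_context = TestContext::new(Temporality::Delta);
    //     let counter = test_context.u64_counter("test", "my_counter", None);
    //     counter.add(1, &[KeyValue::new("key1", "value1")]);
    //     test_context.flush_metrics();
    //     let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);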
    struct TestContext {
        exporter: InMemoryMetricsExporter,
        meter_provider: SdkMeterProvider,

        // Stored on the context so that `get_aggregation` can hand out
        // references that borrow from `self` rather than from a temporary.
        resource_metrics: Vec<ResourceMetrics>,
    }

    impl TestContext {
        fn new(temporality: Temporality) -> Self {
            struct TestTemporalitySelector(Temporality);
            impl TemporalitySelector for TestTemporalitySelector {
                fn temporality(&self, _kind: InstrumentKind) -> Temporality {
                    self.0
                }
            }

            let exporter = InMemoryMetricsExporterBuilder::new()
                .with_temporality_selector(TestTemporalitySelector(temporality))
                .build();
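            // A PeriodicReader is used here, but the tests never wait on its
            // timer: collection is driven explicitly via force_flush (see
            // flush_metrics below).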
            let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
            let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

            TestContext {
                exporter,
                meter_provider,
                resource_metrics: vec![],
            }
        }

        fn u64_counter(
            &self,
            meter_name: &'static str,
            counter_name: &'static str,
            unit: Option<&'static str>,
        ) -> Counter<u64> {
            let meter = self.meter_provider.meter(meter_name);
            let mut counter_builder = meter.u64_counter(counter_name);
            if let Some(unit_name) = unit {
                counter_builder = counter_builder.with_unit(unit_name);
            }
            counter_builder.init()
        }

        fn i64_up_down_counter(
            &self,
            meter_name: &'static str,
            counter_name: &'static str,
            unit: Option<&'static str>,
        ) -> UpDownCounter<i64> {
            let meter = self.meter_provider.meter(meter_name);
            let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
            if let Some(unit_name) = unit {
                updown_counter_builder = updown_counter_builder.with_unit(unit_name);
            }
            updown_counter_builder.init()
        }

        fn meter(&self) -> Meter {
            self.meter_provider.meter("test")
        }

        fn flush_metrics(&self) {
            self.meter_provider.force_flush().unwrap();
        }

        fn reset_metrics(&self) {
            self.exporter.reset();
        }

        fn get_aggregation<T: data::Aggregation>(
            &mut self,
            counter_name: &str,
            unit_name: Option<&str>,
        ) -> &T {
            self.resource_metrics = self
                .exporter
                .get_finished_metrics()
                .expect("metrics expected to be exported");

            assert_eq!(
                self.resource_metrics.len(),
                1,
                "Expected exactly one ResourceMetrics to be exported."
            );
            let resource_metric = self
                .resource_metrics
                .first()
                .expect("This should contain exactly one resource metric, as validated above.");

            assert!(
                !resource_metric.scope_metrics.is_empty(),
                "No scope metrics in latest export"
            );
            assert!(
                !resource_metric.scope_metrics[0].metrics.is_empty(),
                "No metrics in the first scope of the latest export"
            );

            let metric = &resource_metric.scope_metrics[0].metrics[0];
            assert_eq!(metric.name, counter_name);
            if let Some(expected_unit) = unit_name {
                assert_eq!(metric.unit, expected_unit);
            }

            metric
                .data
                .as_any()
                .downcast_ref::<T>()
                .expect("Failed to cast aggregation to expected type")
        }

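        // Like get_aggregation, but for tests that flush more than once:
        // returns one aggregation per collect/export cycle.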
        fn get_from_multiple_aggregations<T: data::Aggregation>(
            &mut self,
            counter_name: &str,
            unit_name: Option<&str>,
            invocation_count: usize,
        ) -> Vec<&T> {
            self.resource_metrics = self
                .exporter
                .get_finished_metrics()
                .expect("metrics expected to be exported");

            assert!(
                !self.resource_metrics.is_empty(),
                "no metrics were exported"
            );

            assert_eq!(
                self.resource_metrics.len(),
                invocation_count,
                "Expected collect to be called {} times",
                invocation_count
            );

            self.resource_metrics
                .iter()
                .map(|resource_metric| {
                    assert!(
                        !resource_metric.scope_metrics.is_empty(),
                        "An export with no scope metrics occurred"
                    );
                    assert!(
                        !resource_metric.scope_metrics[0].metrics.is_empty(),
                        "An export with no metrics occurred"
                    );

                    let metric = &resource_metric.scope_metrics[0].metrics[0];
                    assert_eq!(metric.name, counter_name);

                    if let Some(expected_unit) = unit_name {
                        assert_eq!(metric.unit, expected_unit);
                    }

                    metric
                        .data
                        .as_any()
                        .downcast_ref::<T>()
                        .expect("Failed to cast aggregation to expected type")
                })
                .collect()
        }
    }
}