1pub(crate) mod aggregation;
43pub mod data;
44pub mod exporter;
45pub(crate) mod instrument;
46pub(crate) mod internal;
47pub(crate) mod manual_reader;
48pub(crate) mod meter;
49mod meter_provider;
50pub(crate) mod periodic_reader;
51pub(crate) mod pipeline;
52pub mod reader;
53pub(crate) mod view;
54
55pub use aggregation::*;
56pub use instrument::*;
57pub use manual_reader::*;
58pub use meter::*;
59pub use meter_provider::*;
60pub use periodic_reader::*;
61pub use pipeline::Pipeline;
62pub use view::*;
63
64use std::collections::hash_map::DefaultHasher;
65use std::collections::HashSet;
66use std::hash::{Hash, Hasher};
67
68use opentelemetry::{Key, KeyValue, Value};
69
70#[derive(Clone, Default, Debug, PartialEq, Eq)]
75pub struct AttributeSet(Vec<KeyValue>, u64);
76
77impl From<&[KeyValue]> for AttributeSet {
78 fn from(values: &[KeyValue]) -> Self {
79 let mut seen_keys = HashSet::with_capacity(values.len());
80 let vec = values
81 .iter()
82 .rev()
83 .filter_map(|kv| {
84 if seen_keys.insert(kv.key.clone()) {
85 Some(kv.clone())
86 } else {
87 None
88 }
89 })
90 .collect::<Vec<_>>();
91
92 AttributeSet::new(vec)
93 }
94}
95
96fn calculate_hash(values: &[KeyValue]) -> u64 {
97 let mut hasher = DefaultHasher::new();
98 values.iter().fold(&mut hasher, |mut hasher, item| {
99 item.hash(&mut hasher);
100 hasher
101 });
102 hasher.finish()
103}
104
105impl AttributeSet {
106 fn new(mut values: Vec<KeyValue>) -> Self {
107 values.sort_unstable();
108 let hash = calculate_hash(&values);
109 AttributeSet(values, hash)
110 }
111
112 pub fn is_empty(&self) -> bool {
114 self.0.is_empty()
115 }
116
117 pub fn retain<F>(&mut self, f: F)
119 where
120 F: Fn(&KeyValue) -> bool,
121 {
122 self.0.retain(|kv| f(kv));
123
124 self.1 = calculate_hash(&self.0);
126 }
127
128 pub fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
130 self.0.iter().map(|kv| (&kv.key, &kv.value))
131 }
132}
133
134impl Hash for AttributeSet {
135 fn hash<H: Hasher>(&self, state: &mut H) {
136 state.write_u64(self.1)
137 }
138}
139
140#[cfg(all(test, feature = "testing"))]
141mod tests {
142 use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
143 use super::*;
144 use crate::metrics::data::{ResourceMetrics, Temporality};
145 use crate::metrics::reader::TemporalitySelector;
146 use crate::testing::metrics::InMemoryMetricsExporterBuilder;
147 use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
148 use opentelemetry::metrics::{Counter, Meter, UpDownCounter};
149 use opentelemetry::{metrics::MeterProvider as _, KeyValue};
150 use rand::{rngs, Rng, SeedableRng};
151 use std::borrow::Cow;
152 use std::sync::{Arc, Mutex};
153 use std::thread;
154 use std::time::Duration;
155
156 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
163 async fn counter_overflow_delta() {
164 let mut test_context = TestContext::new(Temporality::Delta);
166 let counter = test_context.u64_counter("test", "my_counter", None);
167
168 for v in 0..2000 {
171 counter.add(100, &[KeyValue::new("A", v.to_string())]);
172 }
173
174 counter.add(100, &[KeyValue::new("A", "foo")]);
176 counter.add(100, &[KeyValue::new("A", "another")]);
177 counter.add(100, &[KeyValue::new("A", "yet_another")]);
178 test_context.flush_metrics();
179
180 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
181
182 assert_eq!(sum.data_points.len(), 2001);
184
185 let data_point =
186 find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true")
187 .expect("overflow point expected");
188 assert_eq!(data_point.value, 300);
189 }
190
191 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
192 async fn counter_aggregation_cumulative() {
193 counter_aggregation_helper(Temporality::Cumulative);
196 }
197
198 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
199 async fn counter_aggregation_delta() {
200 counter_aggregation_helper(Temporality::Delta);
203 }
204
205 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
206 async fn histogram_aggregation_cumulative() {
207 histogram_aggregation_helper(Temporality::Cumulative);
210 }
211
212 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
213 async fn histogram_aggregation_delta() {
214 histogram_aggregation_helper(Temporality::Delta);
217 }
218
219 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
220 async fn updown_counter_aggregation_cumulative() {
221 updown_counter_aggregation_helper(Temporality::Cumulative);
224 }
225
226 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
227 async fn updown_counter_aggregation_delta() {
228 updown_counter_aggregation_helper(Temporality::Delta);
231 }
232
233 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
234 async fn gauge_aggregation() {
235 gauge_aggregation_helper(Temporality::Delta);
240 gauge_aggregation_helper(Temporality::Cumulative);
241 }
242
243 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
244 async fn observable_counter_aggregation_cumulative_non_zero_increment() {
245 observable_counter_aggregation_helper(Temporality::Cumulative, 100, 10, 4);
248 }
249
250 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
251 async fn observable_counter_aggregation_delta_non_zero_increment() {
252 observable_counter_aggregation_helper(Temporality::Delta, 100, 10, 4);
255 }
256
257 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
258 async fn observable_counter_aggregation_cumulative_zero_increment() {
259 observable_counter_aggregation_helper(Temporality::Cumulative, 100, 0, 4);
262 }
263
264 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
265 #[ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517"]
266 async fn observable_counter_aggregation_delta_zero_increment() {
267 observable_counter_aggregation_helper(Temporality::Delta, 100, 0, 4);
270 }
271
272 fn observable_counter_aggregation_helper(
273 temporality: Temporality,
274 start: u64,
275 increment: u64,
276 length: u64,
277 ) {
278 let mut test_context = TestContext::new(temporality);
280 let values: Vec<u64> = (0..length).map(|i| start + i * increment).collect();
282 println!("Testing with observable values: {:?}", values);
283 let values = Arc::new(values);
284 let values_clone = values.clone();
285 let i = Arc::new(Mutex::new(0));
286 let _observable_counter = test_context
287 .meter()
288 .u64_observable_counter("my_observable_counter")
289 .with_unit("my_unit")
290 .with_callback(move |observer| {
291 let mut index = i.lock().unwrap();
292 if *index < values.len() {
293 observer.observe(values[*index], &[KeyValue::new("key1", "value1")]);
294 *index += 1;
295 }
296 })
297 .init();
298
299 for (iter, v) in values_clone.iter().enumerate() {
300 test_context.flush_metrics();
301 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_observable_counter", None);
302 assert_eq!(sum.data_points.len(), 1);
303 assert!(sum.is_monotonic, "Counter should produce monotonic.");
304 if let Temporality::Cumulative = temporality {
305 assert_eq!(
306 sum.temporality,
307 Temporality::Cumulative,
308 "Should produce cumulative"
309 );
310 } else {
311 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
312 }
313
314 let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
316 .expect("datapoint with key1=value1 expected");
317 if let Temporality::Cumulative = temporality {
318 assert_eq!(data_point.value, *v);
320 } else {
321 if iter == 0 {
324 assert_eq!(data_point.value, start);
325 } else {
326 assert_eq!(data_point.value, increment);
327 }
328 }
329
330 test_context.reset_metrics();
331 }
332 }
333
334 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
335 async fn counter_duplicate_instrument_merge() {
336 let exporter = InMemoryMetricsExporter::default();
338 let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
339 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
340
341 let meter = meter_provider.meter("test");
343 let counter = meter
344 .u64_counter("my_counter")
345 .with_unit("my_unit")
346 .with_description("my_description")
347 .init();
348
349 let counter_duplicated = meter
350 .u64_counter("my_counter")
351 .with_unit("my_unit")
352 .with_description("my_description")
353 .init();
354
355 let attribute = vec![KeyValue::new("key1", "value1")];
356 counter.add(10, &attribute);
357 counter_duplicated.add(5, &attribute);
358
359 meter_provider.force_flush().unwrap();
360
361 let resource_metrics = exporter
363 .get_finished_metrics()
364 .expect("metrics are expected to be exported.");
365 assert!(
366 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
367 "There should be single metric merging duplicate instruments"
368 );
369 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
370 assert_eq!(metric.name, "my_counter");
371 assert_eq!(metric.unit, "my_unit");
372 let sum = metric
373 .data
374 .as_any()
375 .downcast_ref::<data::Sum<u64>>()
376 .expect("Sum aggregation expected for Counter instruments by default");
377
378 assert_eq!(sum.data_points.len(), 1);
380
381 let datapoint = &sum.data_points[0];
382 assert_eq!(datapoint.value, 15);
383 }
384
385 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
386 async fn counter_duplicate_instrument_different_meter_no_merge() {
387 let exporter = InMemoryMetricsExporter::default();
389 let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
390 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
391
392 let meter1 = meter_provider.meter("test.meter1");
394 let meter2 = meter_provider.meter("test.meter2");
395 let counter1 = meter1
396 .u64_counter("my_counter")
397 .with_unit("my_unit")
398 .with_description("my_description")
399 .init();
400
401 let counter2 = meter2
402 .u64_counter("my_counter")
403 .with_unit("my_unit")
404 .with_description("my_description")
405 .init();
406
407 let attribute = vec![KeyValue::new("key1", "value1")];
408 counter1.add(10, &attribute);
409 counter2.add(5, &attribute);
410
411 meter_provider.force_flush().unwrap();
412
413 let resource_metrics = exporter
415 .get_finished_metrics()
416 .expect("metrics are expected to be exported.");
417 assert!(
418 resource_metrics[0].scope_metrics.len() == 2,
419 "There should be 2 separate scope"
420 );
421 assert!(
422 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
423 "There should be single metric for the scope"
424 );
425 assert!(
426 resource_metrics[0].scope_metrics[1].metrics.len() == 1,
427 "There should be single metric for the scope"
428 );
429
430 let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
431 let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");
432
433 if let Some(scope1) = scope1 {
434 let metric1 = &scope1.metrics[0];
435 assert_eq!(metric1.name, "my_counter");
436 assert_eq!(metric1.unit, "my_unit");
437 assert_eq!(metric1.description, "my_description");
438 let sum1 = metric1
439 .data
440 .as_any()
441 .downcast_ref::<data::Sum<u64>>()
442 .expect("Sum aggregation expected for Counter instruments by default");
443
444 assert_eq!(sum1.data_points.len(), 1);
446
447 let datapoint1 = &sum1.data_points[0];
448 assert_eq!(datapoint1.value, 10);
449 } else {
450 panic!("No MetricScope found for 'test.meter1'");
451 }
452
453 if let Some(scope2) = scope2 {
454 let metric2 = &scope2.metrics[0];
455 assert_eq!(metric2.name, "my_counter");
456 assert_eq!(metric2.unit, "my_unit");
457 assert_eq!(metric2.description, "my_description");
458 let sum2 = metric2
459 .data
460 .as_any()
461 .downcast_ref::<data::Sum<u64>>()
462 .expect("Sum aggregation expected for Counter instruments by default");
463
464 assert_eq!(sum2.data_points.len(), 1);
466
467 let datapoint2 = &sum2.data_points[0];
468 assert_eq!(datapoint2.value, 5);
469 } else {
470 panic!("No MetricScope found for 'test.meter2'");
471 }
472 }
473
474 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
475 async fn instrumentation_scope_identity_test() {
476 let exporter = InMemoryMetricsExporter::default();
478 let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
479 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
480
481 let meter1 = meter_provider.versioned_meter(
485 "test.meter",
486 Some("v0.1.0"),
487 Some("schema_url"),
488 Some(vec![KeyValue::new("key", "value1")]),
489 );
490 let meter2 = meter_provider.versioned_meter(
491 "test.meter",
492 Some("v0.1.0"),
493 Some("schema_url"),
494 Some(vec![KeyValue::new("key", "value2")]),
495 );
496 let counter1 = meter1
497 .u64_counter("my_counter")
498 .with_unit("my_unit")
499 .with_description("my_description")
500 .init();
501
502 let counter2 = meter2
503 .u64_counter("my_counter")
504 .with_unit("my_unit")
505 .with_description("my_description")
506 .init();
507
508 let attribute = vec![KeyValue::new("key1", "value1")];
509 counter1.add(10, &attribute);
510 counter2.add(5, &attribute);
511
512 meter_provider.force_flush().unwrap();
513
514 let resource_metrics = exporter
516 .get_finished_metrics()
517 .expect("metrics are expected to be exported.");
518 println!("resource_metrics: {:?}", resource_metrics);
519 assert!(
520 resource_metrics[0].scope_metrics.len() == 1,
521 "There should be a single scope as the meters are identical"
522 );
523 assert!(
524 resource_metrics[0].scope_metrics[0].metrics.len() == 1,
525 "There should be single metric for the scope as instruments are identical"
526 );
527
528 let scope = &resource_metrics[0].scope_metrics[0].scope;
529 assert_eq!(scope.name, "test.meter");
530 assert_eq!(scope.version, Some(Cow::Borrowed("v0.1.0")));
531 assert_eq!(scope.schema_url, Some(Cow::Borrowed("schema_url")));
532
533 assert_eq!(scope.attributes, vec![KeyValue::new("key", "value1")]);
536
537 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
538 assert_eq!(metric.name, "my_counter");
539 assert_eq!(metric.unit, "my_unit");
540 assert_eq!(metric.description, "my_description");
541 let sum = metric
542 .data
543 .as_any()
544 .downcast_ref::<data::Sum<u64>>()
545 .expect("Sum aggregation expected for Counter instruments by default");
546
547 assert_eq!(sum.data_points.len(), 1);
549
550 let datapoint = &sum.data_points[0];
551 assert_eq!(datapoint.value, 15);
552 }
553
554 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
555 async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
556 let exporter = InMemoryMetricsExporter::default();
561 let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
562 let criteria = Instrument::new().name("test_histogram");
563 let stream_invalid_aggregation = Stream::new()
564 .aggregation(Aggregation::ExplicitBucketHistogram {
565 boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5], record_min_max: false,
567 })
568 .name("test_histogram_renamed")
569 .unit("test_unit_renamed");
570
571 let view =
572 new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
573 let meter_provider = SdkMeterProvider::builder()
574 .with_reader(reader)
575 .with_view(view)
576 .build();
577
578 let meter = meter_provider.meter("test");
580 let histogram = meter
581 .f64_histogram("test_histogram")
582 .with_unit("test_unit")
583 .init();
584
585 histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
586 meter_provider.force_flush().unwrap();
587
588 let resource_metrics = exporter
590 .get_finished_metrics()
591 .expect("metrics are expected to be exported.");
592 assert!(!resource_metrics.is_empty());
593 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
594 assert_eq!(
595 metric.name, "test_histogram",
596 "View rename should be ignored and original name retained."
597 );
598 assert_eq!(
599 metric.unit, "test_unit",
600 "View rename of unit should be ignored and original unit retained."
601 );
602 }
603
604 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
605 async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
606 let exporter = InMemoryMetricsExporter::default();
610 let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
611 let criteria = Instrument::new().name("my_observable_counter");
612 let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);
614
615 let view =
616 new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
617 let meter_provider = SdkMeterProvider::builder()
618 .with_reader(reader)
619 .with_view(view)
620 .build();
621
622 let meter = meter_provider.meter("test");
624 let _observable_counter = meter
625 .u64_observable_counter("my_observable_counter")
626 .with_callback(|observer| {
627 observer.observe(
628 100,
629 &[
630 KeyValue::new("statusCode", "200"),
631 KeyValue::new("verb", "get"),
632 ],
633 );
634
635 observer.observe(
636 100,
637 &[
638 KeyValue::new("statusCode", "200"),
639 KeyValue::new("verb", "post"),
640 ],
641 );
642
643 observer.observe(
644 100,
645 &[
646 KeyValue::new("statusCode", "500"),
647 KeyValue::new("verb", "get"),
648 ],
649 );
650 })
651 .init();
652
653 meter_provider.force_flush().unwrap();
654
655 let resource_metrics = exporter
657 .get_finished_metrics()
658 .expect("metrics are expected to be exported.");
659 assert!(!resource_metrics.is_empty());
660 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
661 assert_eq!(metric.name, "my_observable_counter",);
662
663 let sum = metric
664 .data
665 .as_any()
666 .downcast_ref::<data::Sum<u64>>()
667 .expect("Sum aggregation expected for ObservableCounter instruments by default");
668
669 assert_eq!(sum.data_points.len(), 1);
673
674 let data_point = &sum.data_points[0];
676 assert_eq!(data_point.value, 300);
677 }
678
679 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
680 async fn spatial_aggregation_when_view_drops_attributes_counter() {
681 let exporter = InMemoryMetricsExporter::default();
685 let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
686 let criteria = Instrument::new().name("my_counter");
687 let stream_invalid_aggregation = Stream::new().allowed_attribute_keys(vec![]);
689
690 let view =
691 new_view(criteria, stream_invalid_aggregation).expect("Expected to create a new view");
692 let meter_provider = SdkMeterProvider::builder()
693 .with_reader(reader)
694 .with_view(view)
695 .build();
696
697 let meter = meter_provider.meter("test");
699 let counter = meter.u64_counter("my_counter").init();
700
701 counter.add(
704 10,
705 [
706 KeyValue::new("statusCode", "200"),
707 KeyValue::new("verb", "Get"),
708 ]
709 .as_ref(),
710 );
711
712 counter.add(
713 10,
714 [
715 KeyValue::new("statusCode", "500"),
716 KeyValue::new("verb", "Get"),
717 ]
718 .as_ref(),
719 );
720
721 counter.add(
722 10,
723 [
724 KeyValue::new("statusCode", "200"),
725 KeyValue::new("verb", "Post"),
726 ]
727 .as_ref(),
728 );
729
730 meter_provider.force_flush().unwrap();
731
732 let resource_metrics = exporter
734 .get_finished_metrics()
735 .expect("metrics are expected to be exported.");
736 assert!(!resource_metrics.is_empty());
737 let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
738 assert_eq!(metric.name, "my_counter",);
739
740 let sum = metric
741 .data
742 .as_any()
743 .downcast_ref::<data::Sum<u64>>()
744 .expect("Sum aggregation expected for Counter instruments by default");
745
746 assert_eq!(sum.data_points.len(), 1);
750 let data_point = &sum.data_points[0];
752 assert_eq!(data_point.value, 30);
753 }
754
755 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
756 async fn counter_aggregation_attribute_order() {
757 let mut test_context = TestContext::new(Temporality::Delta);
762 let counter = test_context.u64_counter("test", "my_counter", None);
763
764 counter.add(
768 1,
769 &[
770 KeyValue::new("A", "a"),
771 KeyValue::new("B", "b"),
772 KeyValue::new("C", "c"),
773 ],
774 );
775 counter.add(
776 1,
777 &[
778 KeyValue::new("A", "a"),
779 KeyValue::new("C", "c"),
780 KeyValue::new("B", "b"),
781 ],
782 );
783 counter.add(
784 1,
785 &[
786 KeyValue::new("B", "b"),
787 KeyValue::new("A", "a"),
788 KeyValue::new("C", "c"),
789 ],
790 );
791 counter.add(
792 1,
793 &[
794 KeyValue::new("B", "b"),
795 KeyValue::new("C", "c"),
796 KeyValue::new("A", "a"),
797 ],
798 );
799 counter.add(
800 1,
801 &[
802 KeyValue::new("C", "c"),
803 KeyValue::new("B", "b"),
804 KeyValue::new("A", "a"),
805 ],
806 );
807 counter.add(
808 1,
809 &[
810 KeyValue::new("C", "c"),
811 KeyValue::new("A", "a"),
812 KeyValue::new("B", "b"),
813 ],
814 );
815 test_context.flush_metrics();
816
817 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
818
819 assert_eq!(sum.data_points.len(), 1);
821
822 let data_point1 = &sum.data_points[0];
824 assert_eq!(data_point1.value, 6);
825 }
826
827 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
828 async fn no_attr_cumulative_counter() {
829 let mut test_context = TestContext::new(Temporality::Cumulative);
830 let counter = test_context.u64_counter("test", "my_counter", None);
831
832 counter.add(50, &[]);
833 test_context.flush_metrics();
834
835 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
836
837 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
838 assert!(sum.is_monotonic, "Should produce monotonic.");
839 assert_eq!(
840 sum.temporality,
841 Temporality::Cumulative,
842 "Should produce cumulative"
843 );
844
845 let data_point = &sum.data_points[0];
846 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
847 assert_eq!(data_point.value, 50, "Unexpected data point value");
848 }
849
850 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
851 async fn no_attr_delta_counter() {
852 let mut test_context = TestContext::new(Temporality::Delta);
853 let counter = test_context.u64_counter("test", "my_counter", None);
854
855 counter.add(50, &[]);
856 test_context.flush_metrics();
857
858 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
859
860 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
861 assert!(sum.is_monotonic, "Should produce monotonic.");
862 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
863
864 let data_point = &sum.data_points[0];
865 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
866 assert_eq!(data_point.value, 50, "Unexpected data point value");
867 }
868
869 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
870 async fn no_attr_cumulative_up_down_counter() {
871 let mut test_context = TestContext::new(Temporality::Cumulative);
872 let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
873
874 counter.add(50, &[]);
875 test_context.flush_metrics();
876
877 let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", Some("my_unit"));
878
879 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
880 assert!(!sum.is_monotonic, "Should not produce monotonic.");
881 assert_eq!(
882 sum.temporality,
883 Temporality::Cumulative,
884 "Should produce cumulative"
885 );
886
887 let data_point = &sum.data_points[0];
888 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
889 assert_eq!(data_point.value, 50, "Unexpected data point value");
890 }
891
892 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
893 async fn no_attr_delta_up_down_counter() {
894 let mut test_context = TestContext::new(Temporality::Delta);
895 let counter = test_context.i64_up_down_counter("test", "my_counter", Some("my_unit"));
896
897 counter.add(50, &[]);
898 test_context.flush_metrics();
899
900 let sum = test_context.get_aggregation::<data::Sum<i64>>("my_counter", Some("my_unit"));
901
902 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
903 assert!(!sum.is_monotonic, "Should not produce monotonic.");
904 assert_eq!(sum.temporality, Temporality::Delta, "Should produce Delta");
905
906 let data_point = &sum.data_points[0];
907 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
908 assert_eq!(data_point.value, 50, "Unexpected data point value");
909 }
910
911 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
912 async fn no_attr_cumulative_counter_value_added_after_export() {
913 let mut test_context = TestContext::new(Temporality::Cumulative);
914 let counter = test_context.u64_counter("test", "my_counter", None);
915
916 counter.add(50, &[]);
917 test_context.flush_metrics();
918 let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
919 test_context.reset_metrics();
920
921 counter.add(5, &[]);
922 test_context.flush_metrics();
923 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
924
925 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
926 assert!(sum.is_monotonic, "Should produce monotonic.");
927 assert_eq!(
928 sum.temporality,
929 Temporality::Cumulative,
930 "Should produce cumulative"
931 );
932
933 let data_point = &sum.data_points[0];
934 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
935 assert_eq!(data_point.value, 55, "Unexpected data point value");
936 }
937
938 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
939 async fn no_attr_delta_counter_value_reset_after_export() {
940 let mut test_context = TestContext::new(Temporality::Delta);
941 let counter = test_context.u64_counter("test", "my_counter", None);
942
943 counter.add(50, &[]);
944 test_context.flush_metrics();
945 let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
946 test_context.reset_metrics();
947
948 counter.add(5, &[]);
949 test_context.flush_metrics();
950 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
951
952 assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
953 assert!(sum.is_monotonic, "Should produce monotonic.");
954 assert_eq!(
955 sum.temporality,
956 Temporality::Delta,
957 "Should produce cumulative"
958 );
959
960 let data_point = &sum.data_points[0];
961 assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
962 assert_eq!(data_point.value, 5, "Unexpected data point value");
963 }
964
965 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
966 async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
967 let mut test_context = TestContext::new(Temporality::Delta);
968 let counter = test_context.u64_counter("test", "my_counter", None);
969
970 counter.add(50, &[]);
971 test_context.flush_metrics();
972 let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
973 test_context.reset_metrics();
974
975 counter.add(50, &[KeyValue::new("a", "b")]);
976 test_context.flush_metrics();
977 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
978
979 let no_attr_data_point = sum.data_points.iter().find(|x| x.attributes.is_empty());
980
981 assert!(
982 no_attr_data_point.is_none(),
983 "Expected no data points with no attributes"
984 );
985 }
986
987 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
988 #[ignore = "Known bug: https://github.com/open-telemetry/opentelemetry-rust/issues/1598"]
989 async fn delta_memory_efficiency_test() {
990 let mut test_context = TestContext::new(Temporality::Delta);
995 let counter = test_context.u64_counter("test", "my_counter", None);
996
997 counter.add(1, &[KeyValue::new("key1", "value1")]);
999 counter.add(1, &[KeyValue::new("key1", "value1")]);
1000 counter.add(1, &[KeyValue::new("key1", "value1")]);
1001 counter.add(1, &[KeyValue::new("key1", "value1")]);
1002 counter.add(1, &[KeyValue::new("key1", "value1")]);
1003
1004 counter.add(1, &[KeyValue::new("key1", "value2")]);
1005 counter.add(1, &[KeyValue::new("key1", "value2")]);
1006 counter.add(1, &[KeyValue::new("key1", "value2")]);
1007 test_context.flush_metrics();
1008
1009 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
1010
1011 assert_eq!(sum.data_points.len(), 2);
1013
1014 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1016 .expect("datapoint with key1=value1 expected");
1017 assert_eq!(data_point1.value, 5);
1018
1019 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1021 .expect("datapoint with key1=value2 expected");
1022 assert_eq!(data_point1.value, 3);
1023
1024 test_context.exporter.reset();
1025 test_context.flush_metrics();
1028
1029 let resource_metrics = test_context
1030 .exporter
1031 .get_finished_metrics()
1032 .expect("metrics are expected to be exported.");
1033 println!("resource_metrics: {:?}", resource_metrics);
1034 assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
1035 }
1036
1037 #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1038 async fn counter_multithreaded() {
1039 counter_multithreaded_aggregation_helper(Temporality::Delta);
1043 counter_multithreaded_aggregation_helper(Temporality::Cumulative);
1044 }
1045
1046 fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
1047 let mut test_context = TestContext::new(temporality);
1049 let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));
1050
1051 for i in 0..10 {
1052 thread::scope(|s| {
1053 s.spawn(|| {
1054 counter.add(1, &[KeyValue::new("key1", "value1")]);
1055 counter.add(1, &[KeyValue::new("key1", "value1")]);
1056 counter.add(1, &[KeyValue::new("key1", "value1")]);
1057
1058 if i % 2 == 0 {
1060 test_context.flush_metrics();
1061 thread::sleep(Duration::from_millis(i)); }
1063
1064 counter.add(1, &[KeyValue::new("key1", "value1")]);
1065 counter.add(1, &[KeyValue::new("key1", "value1")]);
1066 });
1067 });
1068 }
1069
1070 test_context.flush_metrics();
1071
1072 let sums =
1075 test_context.get_from_multiple_aggregations::<data::Sum<u64>>("my_counter", None, 6);
1076
1077 let values = sums
1078 .iter()
1079 .map(|sum| {
1080 assert_eq!(sum.data_points.len(), 1); assert!(sum.is_monotonic, "Counter should produce monotonic.");
1082 assert_eq!(sum.temporality, temporality);
1083
1084 let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1086 .expect("datapoint with key1=value1 expected");
1087
1088 data_point.value
1089 })
1090 .collect::<Vec<_>>();
1091
1092 let total_sum: u64 = if temporality == Temporality::Delta {
1093 values.iter().sum()
1094 } else {
1095 *values.last().unwrap()
1096 };
1097
1098 assert_eq!(total_sum, 50); }
1100
1101 fn histogram_aggregation_helper(temporality: Temporality) {
1102 let mut test_context = TestContext::new(temporality);
1104 let histogram = test_context.meter().u64_histogram("my_histogram").init();
1105
1106 let mut rand = rngs::SmallRng::from_entropy();
1108 let values_kv1 = (0..50)
1109 .map(|_| rand.gen_range(0..100))
1110 .collect::<Vec<u64>>();
1111 for value in values_kv1.iter() {
1112 histogram.record(*value, &[KeyValue::new("key1", "value1")]);
1113 }
1114
1115 let values_kv2 = (0..30)
1116 .map(|_| rand.gen_range(0..100))
1117 .collect::<Vec<u64>>();
1118 for value in values_kv2.iter() {
1119 histogram.record(*value, &[KeyValue::new("key1", "value2")]);
1120 }
1121
1122 test_context.flush_metrics();
1123
1124 let histogram_data =
1126 test_context.get_aggregation::<data::Histogram<u64>>("my_histogram", None);
1127 assert_eq!(histogram_data.data_points.len(), 2);
1129 if let Temporality::Cumulative = temporality {
1130 assert_eq!(
1131 histogram_data.temporality,
1132 Temporality::Cumulative,
1133 "Should produce cumulative"
1134 );
1135 } else {
1136 assert_eq!(
1137 histogram_data.temporality,
1138 Temporality::Delta,
1139 "Should produce delta"
1140 );
1141 }
1142
1143 let data_point1 =
1145 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
1146 .expect("datapoint with key1=value1 expected");
1147 assert_eq!(data_point1.count, values_kv1.len() as u64);
1148 assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
1149 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
1150 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
1151
1152 let data_point2 =
1153 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
1154 .expect("datapoint with key1=value2 expected");
1155 assert_eq!(data_point2.count, values_kv2.len() as u64);
1156 assert_eq!(data_point2.sum, values_kv2.iter().sum::<u64>());
1157 assert_eq!(data_point2.min.unwrap(), *values_kv2.iter().min().unwrap());
1158 assert_eq!(data_point2.max.unwrap(), *values_kv2.iter().max().unwrap());
1159
1160 test_context.reset_metrics();
1162 for value in values_kv1.iter() {
1163 histogram.record(*value, &[KeyValue::new("key1", "value1")]);
1164 }
1165
1166 for value in values_kv2.iter() {
1167 histogram.record(*value, &[KeyValue::new("key1", "value2")]);
1168 }
1169
1170 test_context.flush_metrics();
1171
1172 let histogram_data =
1173 test_context.get_aggregation::<data::Histogram<u64>>("my_histogram", None);
1174 assert_eq!(histogram_data.data_points.len(), 2);
1175 let data_point1 =
1176 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1")
1177 .expect("datapoint with key1=value1 expected");
1178 if temporality == Temporality::Cumulative {
1179 assert_eq!(data_point1.count, 2 * (values_kv1.len() as u64));
1180 assert_eq!(data_point1.sum, 2 * (values_kv1.iter().sum::<u64>()));
1181 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
1182 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
1183 } else {
1184 assert_eq!(data_point1.count, values_kv1.len() as u64);
1185 assert_eq!(data_point1.sum, values_kv1.iter().sum::<u64>());
1186 assert_eq!(data_point1.min.unwrap(), *values_kv1.iter().min().unwrap());
1187 assert_eq!(data_point1.max.unwrap(), *values_kv1.iter().max().unwrap());
1188 }
1189
1190 let data_point1 =
1191 find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value2")
1192 .expect("datapoint with key1=value1 expected");
1193 if temporality == Temporality::Cumulative {
1194 assert_eq!(data_point1.count, 2 * (values_kv2.len() as u64));
1195 assert_eq!(data_point1.sum, 2 * (values_kv2.iter().sum::<u64>()));
1196 assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
1197 assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
1198 } else {
1199 assert_eq!(data_point1.count, values_kv2.len() as u64);
1200 assert_eq!(data_point1.sum, values_kv2.iter().sum::<u64>());
1201 assert_eq!(data_point1.min.unwrap(), *values_kv2.iter().min().unwrap());
1202 assert_eq!(data_point1.max.unwrap(), *values_kv2.iter().max().unwrap());
1203 }
1204 }
1205
1206 fn gauge_aggregation_helper(temporality: Temporality) {
1207 let mut test_context = TestContext::new(temporality);
1209 let gauge = test_context.meter().i64_gauge("my_gauge").init();
1210
1211 gauge.record(1, &[KeyValue::new("key1", "value1")]);
1213 gauge.record(2, &[KeyValue::new("key1", "value1")]);
1214 gauge.record(1, &[KeyValue::new("key1", "value1")]);
1215 gauge.record(3, &[KeyValue::new("key1", "value1")]);
1216 gauge.record(4, &[KeyValue::new("key1", "value1")]);
1217
1218 gauge.record(11, &[KeyValue::new("key1", "value2")]);
1219 gauge.record(13, &[KeyValue::new("key1", "value2")]);
1220 gauge.record(6, &[KeyValue::new("key1", "value2")]);
1221
1222 test_context.flush_metrics();
1223
1224 let gauge_data_point = test_context.get_aggregation::<data::Gauge<i64>>("my_gauge", None);
1226 assert_eq!(gauge_data_point.data_points.len(), 2);
1228
1229 let data_point1 =
1231 find_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value1")
1232 .expect("datapoint with key1=value1 expected");
1233 assert_eq!(data_point1.value, 4);
1234
1235 let data_point1 =
1236 find_datapoint_with_key_value(&gauge_data_point.data_points, "key1", "value2")
1237 .expect("datapoint with key1=value2 expected");
1238 assert_eq!(data_point1.value, 6);
1239
1240 test_context.reset_metrics();
1242 gauge.record(1, &[KeyValue::new("key1", "value1")]);
1243 gauge.record(2, &[KeyValue::new("key1", "value1")]);
1244 gauge.record(11, &[KeyValue::new("key1", "value1")]);
1245 gauge.record(3, &[KeyValue::new("key1", "value1")]);
1246 gauge.record(41, &[KeyValue::new("key1", "value1")]);
1247
1248 gauge.record(34, &[KeyValue::new("key1", "value2")]);
1249 gauge.record(12, &[KeyValue::new("key1", "value2")]);
1250 gauge.record(54, &[KeyValue::new("key1", "value2")]);
1251
1252 test_context.flush_metrics();
1253
1254 let sum = test_context.get_aggregation::<data::Gauge<i64>>("my_gauge", None);
1255 assert_eq!(sum.data_points.len(), 2);
1256 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1257 .expect("datapoint with key1=value1 expected");
1258 assert_eq!(data_point1.value, 41);
1259
1260 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1261 .expect("datapoint with key1=value2 expected");
1262 assert_eq!(data_point1.value, 54);
1263 }
1264
1265 fn counter_aggregation_helper(temporality: Temporality) {
1266 let mut test_context = TestContext::new(temporality);
1268 let counter = test_context.u64_counter("test", "my_counter", None);
1269
1270 counter.add(1, &[KeyValue::new("key1", "value1")]);
1272 counter.add(1, &[KeyValue::new("key1", "value1")]);
1273 counter.add(1, &[KeyValue::new("key1", "value1")]);
1274 counter.add(1, &[KeyValue::new("key1", "value1")]);
1275 counter.add(1, &[KeyValue::new("key1", "value1")]);
1276
1277 counter.add(1, &[KeyValue::new("key1", "value2")]);
1278 counter.add(1, &[KeyValue::new("key1", "value2")]);
1279 counter.add(1, &[KeyValue::new("key1", "value2")]);
1280
1281 test_context.flush_metrics();
1282
1283 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
1285 assert_eq!(sum.data_points.len(), 2);
1287 assert!(sum.is_monotonic, "Counter should produce monotonic.");
1288 if let Temporality::Cumulative = temporality {
1289 assert_eq!(
1290 sum.temporality,
1291 Temporality::Cumulative,
1292 "Should produce cumulative"
1293 );
1294 } else {
1295 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
1296 }
1297
1298 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1300 .expect("datapoint with key1=value1 expected");
1301 assert_eq!(data_point1.value, 5);
1302
1303 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1304 .expect("datapoint with key1=value2 expected");
1305 assert_eq!(data_point1.value, 3);
1306
1307 test_context.reset_metrics();
1309 counter.add(1, &[KeyValue::new("key1", "value1")]);
1310 counter.add(1, &[KeyValue::new("key1", "value1")]);
1311 counter.add(1, &[KeyValue::new("key1", "value1")]);
1312 counter.add(1, &[KeyValue::new("key1", "value1")]);
1313 counter.add(1, &[KeyValue::new("key1", "value1")]);
1314
1315 counter.add(1, &[KeyValue::new("key1", "value2")]);
1316 counter.add(1, &[KeyValue::new("key1", "value2")]);
1317 counter.add(1, &[KeyValue::new("key1", "value2")]);
1318
1319 test_context.flush_metrics();
1320
1321 let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
1322 assert_eq!(sum.data_points.len(), 2);
1323 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1324 .expect("datapoint with key1=value1 expected");
1325 if temporality == Temporality::Cumulative {
1326 assert_eq!(data_point1.value, 10);
1327 } else {
1328 assert_eq!(data_point1.value, 5);
1329 }
1330
1331 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1332 .expect("datapoint with key1=value2 expected");
1333 if temporality == Temporality::Cumulative {
1334 assert_eq!(data_point1.value, 6);
1335 } else {
1336 assert_eq!(data_point1.value, 3);
1337 }
1338 }
1339
1340 fn updown_counter_aggregation_helper(temporality: Temporality) {
1341 let mut test_context = TestContext::new(temporality);
1343 let counter = test_context.i64_up_down_counter("test", "my_updown_counter", None);
1344
1345 counter.add(10, &[KeyValue::new("key1", "value1")]);
1347 counter.add(-1, &[KeyValue::new("key1", "value1")]);
1348 counter.add(-5, &[KeyValue::new("key1", "value1")]);
1349 counter.add(0, &[KeyValue::new("key1", "value1")]);
1350 counter.add(1, &[KeyValue::new("key1", "value1")]);
1351
1352 counter.add(10, &[KeyValue::new("key1", "value2")]);
1353 counter.add(0, &[KeyValue::new("key1", "value2")]);
1354 counter.add(-3, &[KeyValue::new("key1", "value2")]);
1355
1356 test_context.flush_metrics();
1357
1358 let sum = test_context.get_aggregation::<data::Sum<i64>>("my_updown_counter", None);
1360 assert_eq!(sum.data_points.len(), 2);
1362 assert!(
1363 !sum.is_monotonic,
1364 "UpDownCounter should produce non-monotonic."
1365 );
1366 if let Temporality::Cumulative = temporality {
1367 assert_eq!(
1368 sum.temporality,
1369 Temporality::Cumulative,
1370 "Should produce cumulative"
1371 );
1372 } else {
1373 assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
1374 }
1375
1376 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1378 .expect("datapoint with key1=value1 expected");
1379 assert_eq!(data_point1.value, 5);
1380
1381 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1382 .expect("datapoint with key1=value2 expected");
1383 assert_eq!(data_point1.value, 7);
1384
1385 test_context.reset_metrics();
1387 counter.add(10, &[KeyValue::new("key1", "value1")]);
1388 counter.add(-1, &[KeyValue::new("key1", "value1")]);
1389 counter.add(-5, &[KeyValue::new("key1", "value1")]);
1390 counter.add(0, &[KeyValue::new("key1", "value1")]);
1391 counter.add(1, &[KeyValue::new("key1", "value1")]);
1392
1393 counter.add(10, &[KeyValue::new("key1", "value2")]);
1394 counter.add(0, &[KeyValue::new("key1", "value2")]);
1395 counter.add(-3, &[KeyValue::new("key1", "value2")]);
1396
1397 test_context.flush_metrics();
1398
1399 let sum = test_context.get_aggregation::<data::Sum<i64>>("my_updown_counter", None);
1400 assert_eq!(sum.data_points.len(), 2);
1401 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1402 .expect("datapoint with key1=value1 expected");
1403 if temporality == Temporality::Cumulative {
1404 assert_eq!(data_point1.value, 10);
1405 } else {
1406 assert_eq!(data_point1.value, 5);
1407 }
1408
1409 let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value2")
1410 .expect("datapoint with key1=value2 expected");
1411 if temporality == Temporality::Cumulative {
1412 assert_eq!(data_point1.value, 14);
1413 } else {
1414 assert_eq!(data_point1.value, 7);
1415 }
1416 }
1417
1418 fn find_datapoint_with_key_value<'a, T>(
1419 data_points: &'a [DataPoint<T>],
1420 key: &str,
1421 value: &str,
1422 ) -> Option<&'a DataPoint<T>> {
1423 data_points.iter().find(|&datapoint| {
1424 datapoint
1425 .attributes
1426 .iter()
1427 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
1428 })
1429 }
1430
1431 fn find_histogram_datapoint_with_key_value<'a, T>(
1432 data_points: &'a [HistogramDataPoint<T>],
1433 key: &str,
1434 value: &str,
1435 ) -> Option<&'a HistogramDataPoint<T>> {
1436 data_points.iter().find(|&datapoint| {
1437 datapoint
1438 .attributes
1439 .iter()
1440 .any(|kv| kv.key.as_str() == key && kv.value.as_str() == value)
1441 })
1442 }
1443
1444 fn find_scope_metric<'a>(
1445 metrics: &'a [ScopeMetrics],
1446 name: &'a str,
1447 ) -> Option<&'a ScopeMetrics> {
1448 metrics
1449 .iter()
1450 .find(|&scope_metric| scope_metric.scope.name == name)
1451 }
1452
1453 struct TestContext {
1454 exporter: InMemoryMetricsExporter,
1455 meter_provider: SdkMeterProvider,
1456
1457 resource_metrics: Vec<ResourceMetrics>,
1459 }
1460
1461 impl TestContext {
1462 fn new(temporality: Temporality) -> Self {
1463 struct TestTemporalitySelector(Temporality);
1464 impl TemporalitySelector for TestTemporalitySelector {
1465 fn temporality(&self, _kind: InstrumentKind) -> Temporality {
1466 self.0
1467 }
1468 }
1469
1470 let mut exporter = InMemoryMetricsExporterBuilder::new();
1471 exporter = exporter.with_temporality_selector(TestTemporalitySelector(temporality));
1472
1473 let exporter = exporter.build();
1474 let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
1475 let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
1476
1477 TestContext {
1478 exporter,
1479 meter_provider,
1480 resource_metrics: vec![],
1481 }
1482 }
1483
1484 fn u64_counter(
1485 &self,
1486 meter_name: &'static str,
1487 counter_name: &'static str,
1488 unit: Option<&'static str>,
1489 ) -> Counter<u64> {
1490 let meter = self.meter_provider.meter(meter_name);
1491 let mut counter_builder = meter.u64_counter(counter_name);
1492 if let Some(unit_name) = unit {
1493 counter_builder = counter_builder.with_unit(unit_name);
1494 }
1495 counter_builder.init()
1496 }
1497
1498 fn i64_up_down_counter(
1499 &self,
1500 meter_name: &'static str,
1501 counter_name: &'static str,
1502 unit: Option<&'static str>,
1503 ) -> UpDownCounter<i64> {
1504 let meter = self.meter_provider.meter(meter_name);
1505 let mut updown_counter_builder = meter.i64_up_down_counter(counter_name);
1506 if let Some(unit_name) = unit {
1507 updown_counter_builder = updown_counter_builder.with_unit(unit_name);
1508 }
1509 updown_counter_builder.init()
1510 }
1511
1512 fn meter(&self) -> Meter {
1513 self.meter_provider.meter("test")
1514 }
1515
1516 fn flush_metrics(&self) {
1517 self.meter_provider.force_flush().unwrap();
1518 }
1519
1520 fn reset_metrics(&self) {
1521 self.exporter.reset();
1522 }
1523
1524 fn get_aggregation<T: data::Aggregation>(
1525 &mut self,
1526 counter_name: &str,
1527 unit_name: Option<&str>,
1528 ) -> &T {
1529 self.resource_metrics = self
1530 .exporter
1531 .get_finished_metrics()
1532 .expect("metrics expected to be exported");
1533
1534 assert!(
1535 !self.resource_metrics.is_empty(),
1536 "no metrics were exported"
1537 );
1538
1539 assert!(
1540 self.resource_metrics.len() == 1,
1541 "Expected single resource metrics."
1542 );
1543 let resource_metric = self
1544 .resource_metrics
1545 .first()
1546 .expect("This should contain exactly one resource metric, as validated above.");
1547
1548 assert!(
1549 !resource_metric.scope_metrics.is_empty(),
1550 "No scope metrics in latest export"
1551 );
1552 assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
1553
1554 let metric = &resource_metric.scope_metrics[0].metrics[0];
1555 assert_eq!(metric.name, counter_name);
1556 if let Some(expected_unit) = unit_name {
1557 assert_eq!(metric.unit, expected_unit);
1558 }
1559
1560 metric
1561 .data
1562 .as_any()
1563 .downcast_ref::<T>()
1564 .expect("Failed to cast aggregation to expected type")
1565 }
1566
1567 fn get_from_multiple_aggregations<T: data::Aggregation>(
1568 &mut self,
1569 counter_name: &str,
1570 unit_name: Option<&str>,
1571 invocation_count: usize,
1572 ) -> Vec<&T> {
1573 self.resource_metrics = self
1574 .exporter
1575 .get_finished_metrics()
1576 .expect("metrics expected to be exported");
1577
1578 assert!(
1579 !self.resource_metrics.is_empty(),
1580 "no metrics were exported"
1581 );
1582
1583 assert_eq!(
1584 self.resource_metrics.len(),
1585 invocation_count,
1586 "Expected collect to be called {} times",
1587 invocation_count
1588 );
1589
1590 let result = self
1591 .resource_metrics
1592 .iter()
1593 .map(|resource_metric| {
1594 assert!(
1595 !resource_metric.scope_metrics.is_empty(),
1596 "An export with no scope metrics occurred"
1597 );
1598
1599 assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
1600
1601 let metric = &resource_metric.scope_metrics[0].metrics[0];
1602 assert_eq!(metric.name, counter_name);
1603
1604 if let Some(expected_unit) = unit_name {
1605 assert_eq!(metric.unit, expected_unit);
1606 }
1607
1608 let aggregation = metric
1609 .data
1610 .as_any()
1611 .downcast_ref::<T>()
1612 .expect("Failed to cast aggregation to expected type");
1613 aggregation
1614 })
1615 .collect::<Vec<_>>();
1616
1617 result
1618 }
1619 }
1620}