1use std::cell::RefCell;
5use std::collections::HashMap;
6use std::convert::From;
7use std::sync::{
8 atomic::{AtomicU64 as StdAtomicU64, Ordering},
9 Arc, Mutex,
10};
11use std::time::{Duration, Instant as StdInstant};
12
13use crate::atomic64::{Atomic, AtomicF64, AtomicU64};
14use crate::desc::{Desc, Describer};
15use crate::errors::{Error, Result};
16use crate::metrics::{Collector, LocalMetric, Metric, Opts};
17use crate::proto;
18use crate::value::make_label_pairs;
19use crate::vec::{MetricVec, MetricVecBuilder};
20
21pub const DEFAULT_BUCKETS: &[f64; 11] = &[
26 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
27];
28
29pub const BUCKET_LABEL: &str = "le";
32
33#[inline]
34fn check_bucket_label(label: &str) -> Result<()> {
35 if label == BUCKET_LABEL {
36 return Err(Error::Msg(
37 "`le` is not allowed as label name in histograms".to_owned(),
38 ));
39 }
40
41 Ok(())
42}
43
44fn check_and_adjust_buckets(mut buckets: Vec<f64>) -> Result<Vec<f64>> {
45 if buckets.is_empty() {
46 buckets = Vec::from(DEFAULT_BUCKETS as &'static [f64]);
47 }
48
49 for (i, upper_bound) in buckets.iter().enumerate() {
50 if i < (buckets.len() - 1) && *upper_bound >= buckets[i + 1] {
51 return Err(Error::Msg(format!(
52 "histogram buckets must be in increasing \
53 order: {} >= {}",
54 upper_bound,
55 buckets[i + 1]
56 )));
57 }
58 }
59
60 let tail = *buckets.last().unwrap();
61 if tail.is_sign_positive() && tail.is_infinite() {
62 buckets.pop();
64 }
65
66 Ok(buckets)
67}
68
69#[derive(Clone, Debug)]
73pub struct HistogramOpts {
74 pub common_opts: Opts,
76
77 pub buckets: Vec<f64>,
83}
84
85impl HistogramOpts {
86 pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> HistogramOpts {
88 HistogramOpts {
89 common_opts: Opts::new(name, help),
90 buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
91 }
92 }
93
94 pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
96 self.common_opts.namespace = namespace.into();
97 self
98 }
99
100 pub fn subsystem<S: Into<String>>(mut self, subsystem: S) -> Self {
102 self.common_opts.subsystem = subsystem.into();
103 self
104 }
105
106 pub fn const_labels(mut self, const_labels: HashMap<String, String>) -> Self {
108 self.common_opts = self.common_opts.const_labels(const_labels);
109 self
110 }
111
112 pub fn const_label<S1: Into<String>, S2: Into<String>>(mut self, name: S1, value: S2) -> Self {
114 self.common_opts = self.common_opts.const_label(name, value);
115 self
116 }
117
118 pub fn variable_labels(mut self, variable_labels: Vec<String>) -> Self {
120 self.common_opts = self.common_opts.variable_labels(variable_labels);
121 self
122 }
123
124 pub fn variable_label<S: Into<String>>(mut self, name: S) -> Self {
126 self.common_opts = self.common_opts.variable_label(name);
127 self
128 }
129
130 pub fn fq_name(&self) -> String {
132 self.common_opts.fq_name()
133 }
134
135 pub fn buckets(mut self, buckets: Vec<f64>) -> Self {
137 self.buckets = buckets;
138 self
139 }
140}
141
142impl Describer for HistogramOpts {
143 fn describe(&self) -> Result<Desc> {
144 self.common_opts.describe()
145 }
146}
147
148impl From<Opts> for HistogramOpts {
149 fn from(opts: Opts) -> HistogramOpts {
150 HistogramOpts {
151 common_opts: opts,
152 buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
153 }
154 }
155}
156
157#[derive(Debug)]
161struct Shard {
162 sum: AtomicF64,
163 count: AtomicU64,
164 buckets: Vec<AtomicU64>,
165}
166
167impl Shard {
168 fn new(num_buckets: usize) -> Self {
169 let mut buckets = Vec::new();
170 for _ in 0..num_buckets {
171 buckets.push(AtomicU64::new(0));
172 }
173
174 Shard {
175 sum: AtomicF64::new(0.0),
176 count: AtomicU64::new(0),
177 buckets,
178 }
179 }
180}
181
182#[derive(Debug, Clone, Copy)]
186enum ShardIndex {
187 First,
189 Second,
191}
192
193impl ShardIndex {
194 fn inverse(self) -> ShardIndex {
196 match self {
197 ShardIndex::First => ShardIndex::Second,
198 ShardIndex::Second => ShardIndex::First,
199 }
200 }
201}
202
203impl From<u64> for ShardIndex {
204 fn from(index: u64) -> Self {
205 match index {
206 0 => ShardIndex::First,
207 1 => ShardIndex::Second,
208 _ => panic!(
209 "Invalid shard index {:?}. A histogram only has two shards.",
210 index
211 ),
212 }
213 }
214}
215
216impl From<ShardIndex> for usize {
217 fn from(index: ShardIndex) -> Self {
218 match index {
219 ShardIndex::First => 0,
220 ShardIndex::Second => 1,
221 }
222 }
223}
224
225#[derive(Debug)]
228struct ShardAndCount {
229 inner: StdAtomicU64,
230}
231
232impl ShardAndCount {
233 fn new() -> Self {
237 ShardAndCount {
238 inner: StdAtomicU64::new(0),
239 }
240 }
241
242 fn flip(&self, ordering: Ordering) -> (ShardIndex, u64) {
245 let n = self.inner.fetch_add(1 << 63, ordering);
246
247 ShardAndCount::split_shard_index_and_count(n)
248 }
249
250 fn get(&self) -> (ShardIndex, u64) {
253 let n = self.inner.load(Ordering::Relaxed);
254
255 ShardAndCount::split_shard_index_and_count(n)
256 }
257
258 fn inc_by(&self, delta: u64, ordering: Ordering) -> (ShardIndex, u64) {
261 let n = self.inner.fetch_add(delta, ordering);
262
263 ShardAndCount::split_shard_index_and_count(n)
264 }
265
266 fn inc(&self, ordering: Ordering) -> (ShardIndex, u64) {
269 self.inc_by(1, ordering)
270 }
271
272 fn split_shard_index_and_count(n: u64) -> (ShardIndex, u64) {
273 let shard = n >> 63;
274 let count = n & ((1 << 63) - 1);
275
276 (shard.into(), count)
277 }
278}
279
280#[derive(Debug)]
309pub struct HistogramCore {
310 desc: Desc,
311 label_pairs: Vec<proto::LabelPair>,
312
313 collect_lock: Mutex<()>,
318
319 shard_and_count: ShardAndCount,
322 shards: [Shard; 2],
325
326 upper_bounds: Vec<f64>,
327}
328
329impl HistogramCore {
330 pub fn new<V: AsRef<str>>(opts: &HistogramOpts, label_values: &[V]) -> Result<HistogramCore> {
331 let desc = opts.describe()?;
332
333 for name in &desc.variable_labels {
334 check_bucket_label(name)?;
335 }
336 for pair in &desc.const_label_pairs {
337 check_bucket_label(pair.name())?;
338 }
339
340 let label_pairs = make_label_pairs(&desc, label_values)?;
341
342 let buckets = check_and_adjust_buckets(opts.buckets.clone())?;
343
344 Ok(HistogramCore {
345 desc,
346 label_pairs,
347
348 collect_lock: Mutex::new(()),
349
350 shard_and_count: ShardAndCount::new(),
351 shards: [Shard::new(buckets.len()), Shard::new(buckets.len())],
352
353 upper_bounds: buckets,
354 })
355 }
356
357 pub fn observe(&self, v: f64) {
364 let (shard_index, _count) = self.shard_and_count.inc(Ordering::Acquire);
371
372 let shard: &Shard = &self.shards[usize::from(shard_index)];
373
374 let mut iter = self
376 .upper_bounds
377 .iter()
378 .enumerate()
379 .filter(|&(_, f)| v <= *f);
380 if let Some((i, _)) = iter.next() {
381 shard.buckets[i].inc_by(1);
382 }
383
384 shard.sum.inc_by(v);
385 shard.count.inc_by_with_ordering(1, Ordering::Release);
387 }
388
389 pub fn proto(&self) -> proto::Histogram {
396 let collect_guard = self.collect_lock.lock().expect("Lock poisoned");
397
398 let (cold_shard_index, overall_count) = self.shard_and_count.flip(Ordering::AcqRel);
402
403 let cold_shard = &self.shards[usize::from(cold_shard_index)];
404 let hot_shard = &self.shards[usize::from(cold_shard_index.inverse())];
405
406 while cold_shard
419 .count
420 .compare_exchange_weak(
421 overall_count,
422 0,
424 Ordering::Acquire,
425 Ordering::Acquire,
426 )
427 .is_err()
428 {}
429
430 let cold_shard_sum = cold_shard.sum.swap(0.0, Ordering::AcqRel);
435
436 let mut h = proto::Histogram::default();
437 h.set_sample_sum(cold_shard_sum);
438 h.set_sample_count(overall_count);
439
440 let mut cumulative_count = 0;
441 let mut buckets = Vec::with_capacity(self.upper_bounds.len());
442 for (i, upper_bound) in self.upper_bounds.iter().enumerate() {
443 let cold_bucket_count = cold_shard.buckets[i].swap(0, Ordering::AcqRel);
448 hot_shard.buckets[i].inc_by(cold_bucket_count);
449
450 cumulative_count += cold_bucket_count;
451 let mut b = proto::Bucket::default();
452 b.set_cumulative_count(cumulative_count);
453 b.set_upper_bound(*upper_bound);
454 buckets.push(b);
455 }
456 h.set_bucket(buckets);
457
458 hot_shard.count.inc_by(overall_count);
460 hot_shard.sum.inc_by(cold_shard_sum);
461
462 drop(collect_guard);
463
464 h
465 }
466
467 fn sample_sum(&self) -> f64 {
468 let _guard = self.collect_lock.lock().expect("Lock poisoned");
471
472 let (shard_index, _count) = self.shard_and_count.get();
473 self.shards[shard_index as usize].sum.get()
474 }
475
476 fn sample_count(&self) -> u64 {
477 self.shard_and_count.get().1
478 }
479}
480
481#[cfg(all(feature = "nightly", target_os = "linux"))]
483pub struct Timespec(libc::timespec);
484
485#[cfg(all(feature = "nightly", target_os = "linux"))]
486impl std::fmt::Debug for Timespec {
487 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
488 write!(
489 f,
490 "Timespec {{ tv_sec: {}, tv_nsec: {} }}",
491 self.0.tv_sec, self.0.tv_nsec
492 )
493 }
494}
495
496#[derive(Debug)]
497pub enum Instant {
498 Monotonic(StdInstant),
499 #[cfg(all(feature = "nightly", target_os = "linux"))]
500 MonotonicCoarse(Timespec),
501}
502
503impl Instant {
504 pub fn now() -> Instant {
505 Instant::Monotonic(StdInstant::now())
506 }
507
508 #[cfg(all(feature = "nightly", target_os = "linux"))]
509 pub fn now_coarse() -> Instant {
510 Instant::MonotonicCoarse(get_time_coarse())
511 }
512
513 #[cfg(all(feature = "nightly", not(target_os = "linux")))]
514 pub fn now_coarse() -> Instant {
515 Instant::Monotonic(StdInstant::now())
516 }
517
518 pub fn elapsed(&self) -> Duration {
519 match self {
520 Instant::Monotonic(i) => StdInstant::now().saturating_duration_since(*i),
522
523 #[cfg(all(feature = "nightly", target_os = "linux"))]
529 Instant::MonotonicCoarse(t) => {
530 let now = get_time_coarse();
531 let now_ms = now.0.tv_sec * MILLIS_PER_SEC + now.0.tv_nsec / NANOS_PER_MILLI;
532 let t_ms = t.0.tv_sec * MILLIS_PER_SEC + t.0.tv_nsec / NANOS_PER_MILLI;
533 let dur = now_ms - t_ms;
534 if dur >= 0 {
535 Duration::from_millis(dur as u64)
536 } else {
537 Duration::from_millis(0)
538 }
539 }
540 }
541 }
542
543 #[inline]
544 pub fn elapsed_sec(&self) -> f64 {
545 duration_to_seconds(self.elapsed())
546 }
547}
548
549#[cfg(all(feature = "nightly", target_os = "linux"))]
550use self::coarse::*;
551
552#[cfg(all(feature = "nightly", target_os = "linux"))]
553mod coarse {
554 use crate::histogram::Timespec;
555 pub use libc::timespec;
556 use libc::{clock_gettime, CLOCK_MONOTONIC_COARSE};
557
558 pub const NANOS_PER_MILLI: i64 = 1_000_000;
559 pub const MILLIS_PER_SEC: i64 = 1_000;
560
561 pub fn get_time_coarse() -> Timespec {
562 let mut t = Timespec(timespec {
563 tv_sec: 0,
564 tv_nsec: 0,
565 });
566 assert_eq!(
567 unsafe { clock_gettime(CLOCK_MONOTONIC_COARSE, &mut t.0) },
568 0
569 );
570 t
571 }
572}
573
574#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
580#[derive(Debug)]
581pub struct HistogramTimer {
582 histogram: Histogram,
584 observed: bool,
586 start: Instant,
588}
589
590impl HistogramTimer {
591 fn new(histogram: Histogram) -> Self {
592 Self {
593 histogram,
594 observed: false,
595 start: Instant::now(),
596 }
597 }
598
599 #[cfg(feature = "nightly")]
600 fn new_coarse(histogram: Histogram) -> Self {
601 HistogramTimer {
602 histogram,
603 observed: false,
604 start: Instant::now_coarse(),
605 }
606 }
607
608 pub fn observe_duration(self) {
613 self.stop_and_record();
614 }
615
616 pub fn stop_and_record(self) -> f64 {
621 let mut timer = self;
622 timer.observe(true)
623 }
624
625 pub fn stop_and_discard(self) -> f64 {
630 let mut timer = self;
631 timer.observe(false)
632 }
633
634 fn observe(&mut self, record: bool) -> f64 {
635 let v = self.start.elapsed_sec();
636 self.observed = true;
637 if record {
638 self.histogram.observe(v);
639 }
640 v
641 }
642}
643
644impl Drop for HistogramTimer {
645 fn drop(&mut self) {
646 if !self.observed {
647 self.observe(true);
648 }
649 }
650}
651
652#[derive(Clone, Debug)]
670pub struct Histogram {
671 core: Arc<HistogramCore>,
672}
673
674impl Histogram {
675 pub fn with_opts(opts: HistogramOpts) -> Result<Histogram> {
677 Histogram::with_opts_and_label_values::<&str>(&opts, &[])
678 }
679
680 fn with_opts_and_label_values<V: AsRef<str>>(
681 opts: &HistogramOpts,
682 label_values: &[V],
683 ) -> Result<Histogram> {
684 let core = HistogramCore::new(opts, label_values)?;
685
686 Ok(Histogram {
687 core: Arc::new(core),
688 })
689 }
690}
691
692impl Histogram {
693 pub fn observe(&self, v: f64) {
695 self.core.observe(v)
696 }
697
698 pub fn start_timer(&self) -> HistogramTimer {
700 HistogramTimer::new(self.clone())
701 }
702
703 #[cfg(feature = "nightly")]
706 pub fn start_coarse_timer(&self) -> HistogramTimer {
707 HistogramTimer::new_coarse(self.clone())
708 }
709
710 pub fn observe_closure_duration<F, T>(&self, f: F) -> T
712 where
713 F: FnOnce() -> T,
714 {
715 let instant = Instant::now();
716 let res = f();
717 let elapsed = instant.elapsed_sec();
718 self.observe(elapsed);
719 res
720 }
721
722 #[cfg(feature = "nightly")]
724 pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
725 where
726 F: FnOnce() -> T,
727 {
728 let instant = Instant::now_coarse();
729 let res = f();
730 let elapsed = instant.elapsed_sec();
731 self.observe(elapsed);
732 res
733 }
734
735 pub fn local(&self) -> LocalHistogram {
737 LocalHistogram::new(self.clone())
738 }
739
740 pub fn get_sample_sum(&self) -> f64 {
742 self.core.sample_sum()
743 }
744
745 pub fn get_sample_count(&self) -> u64 {
747 self.core.sample_count()
748 }
749}
750
751impl Metric for Histogram {
752 fn metric(&self) -> proto::Metric {
753 let mut m = proto::Metric::from_label(self.core.label_pairs.clone());
754
755 let h = self.core.proto();
756 m.set_histogram(h);
757
758 m
759 }
760}
761
762impl Collector for Histogram {
763 fn desc(&self) -> Vec<&Desc> {
764 vec![&self.core.desc]
765 }
766
767 fn collect(&self) -> Vec<proto::MetricFamily> {
768 let mut m = proto::MetricFamily::default();
769 m.set_name(self.core.desc.fq_name.clone());
770 m.set_help(self.core.desc.help.clone());
771 m.set_field_type(proto::MetricType::HISTOGRAM);
772 m.set_metric(vec![self.metric()]);
773
774 vec![m]
775 }
776}
777
778#[derive(Clone, Debug)]
779pub struct HistogramVecBuilder {}
780
781impl MetricVecBuilder for HistogramVecBuilder {
782 type M = Histogram;
783 type P = HistogramOpts;
784
785 fn build<V: AsRef<str>>(&self, opts: &HistogramOpts, vals: &[V]) -> Result<Histogram> {
786 Histogram::with_opts_and_label_values(opts, vals)
787 }
788}
789
790pub type HistogramVec = MetricVec<HistogramVecBuilder>;
795
796impl HistogramVec {
797 pub fn new(opts: HistogramOpts, label_names: &[&str]) -> Result<HistogramVec> {
801 let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
802 let opts = opts.variable_labels(variable_names);
803 let metric_vec =
804 MetricVec::create(proto::MetricType::HISTOGRAM, HistogramVecBuilder {}, opts)?;
805
806 Ok(metric_vec as HistogramVec)
807 }
808
809 pub fn local(&self) -> LocalHistogramVec {
811 let vec = self.clone();
812 LocalHistogramVec::new(vec)
813 }
814}
815
816pub fn linear_buckets(start: f64, width: f64, count: usize) -> Result<Vec<f64>> {
824 if count < 1 {
825 return Err(Error::Msg(format!(
826 "LinearBuckets needs a positive count, count: {}",
827 count
828 )));
829 }
830 if width <= 0.0 {
831 return Err(Error::Msg(format!(
832 "LinearBuckets needs a width greater then 0, width: {}",
833 width
834 )));
835 }
836
837 let buckets: Vec<_> = (0..count)
838 .map(|step| start + width * (step as f64))
839 .collect();
840
841 Ok(buckets)
842}
843
844pub fn exponential_buckets(start: f64, factor: f64, count: usize) -> Result<Vec<f64>> {
853 if count < 1 {
854 return Err(Error::Msg(format!(
855 "exponential_buckets needs a positive count, count: {}",
856 count
857 )));
858 }
859 if start <= 0.0 {
860 return Err(Error::Msg(format!(
861 "exponential_buckets needs a positive start value, \
862 start: {}",
863 start
864 )));
865 }
866 if factor <= 1.0 {
867 return Err(Error::Msg(format!(
868 "exponential_buckets needs a factor greater than 1, \
869 factor: {}",
870 factor
871 )));
872 }
873
874 let mut next = start;
875 let mut buckets = Vec::with_capacity(count);
876 for _ in 0..count {
877 buckets.push(next);
878 next *= factor;
879 }
880
881 Ok(buckets)
882}
883
884#[inline]
886pub fn duration_to_seconds(d: Duration) -> f64 {
887 let nanos = f64::from(d.subsec_nanos()) / 1e9;
888 d.as_secs() as f64 + nanos
889}
890
891#[derive(Clone, Debug)]
892pub struct LocalHistogramCore {
893 histogram: Histogram,
894 counts: Vec<u64>,
895 count: u64,
896 sum: f64,
897}
898
899#[derive(Debug)]
901pub struct LocalHistogram {
902 core: RefCell<LocalHistogramCore>,
903}
904
905impl Clone for LocalHistogram {
906 fn clone(&self) -> LocalHistogram {
907 let core = self.core.clone();
908 let lh = LocalHistogram { core };
909 lh.clear();
910 lh
911 }
912}
913
914#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
916#[derive(Debug)]
917pub struct LocalHistogramTimer {
918 local: LocalHistogram,
920 observed: bool,
922 start: Instant,
924}
925
926impl LocalHistogramTimer {
927 fn new(histogram: LocalHistogram) -> Self {
928 Self {
929 local: histogram,
930 observed: false,
931 start: Instant::now(),
932 }
933 }
934
935 #[cfg(feature = "nightly")]
936 fn new_coarse(histogram: LocalHistogram) -> Self {
937 Self {
938 local: histogram,
939 observed: false,
940 start: Instant::now_coarse(),
941 }
942 }
943
944 pub fn observe_duration(self) {
949 self.stop_and_record();
950 }
951
952 pub fn stop_and_record(self) -> f64 {
957 let mut timer = self;
958 timer.observe(true)
959 }
960
961 pub fn stop_and_discard(self) -> f64 {
966 let mut timer = self;
967 timer.observe(false)
968 }
969
970 fn observe(&mut self, record: bool) -> f64 {
971 let v = self.start.elapsed_sec();
972 self.observed = true;
973 if record {
974 self.local.observe(v);
975 }
976 v
977 }
978}
979
980impl Drop for LocalHistogramTimer {
981 fn drop(&mut self) {
982 if !self.observed {
983 self.observe(true);
984 }
985 }
986}
987
988impl LocalHistogramCore {
989 fn new(histogram: Histogram) -> LocalHistogramCore {
990 let counts = vec![0; histogram.core.upper_bounds.len()];
991
992 LocalHistogramCore {
993 histogram,
994 counts,
995 count: 0,
996 sum: 0.0,
997 }
998 }
999
1000 pub fn observe(&mut self, v: f64) {
1001 let mut iter = self
1003 .histogram
1004 .core
1005 .upper_bounds
1006 .iter()
1007 .enumerate()
1008 .filter(|&(_, f)| v <= *f);
1009 if let Some((i, _)) = iter.next() {
1010 self.counts[i] += 1;
1011 }
1012
1013 self.count += 1;
1014 self.sum += v;
1015 }
1016
1017 pub fn clear(&mut self) {
1018 for v in &mut self.counts {
1019 *v = 0
1020 }
1021
1022 self.count = 0;
1023 self.sum = 0.0;
1024 }
1025
1026 pub fn flush(&mut self) {
1027 if self.count == 0 {
1029 return;
1030 }
1031
1032 {
1033 let (shard_index, _count) = self
1040 .histogram
1041 .core
1042 .shard_and_count
1043 .inc_by(self.count, Ordering::Acquire);
1044 let shard = &self.histogram.core.shards[shard_index as usize];
1045
1046 for (i, v) in self.counts.iter().enumerate() {
1047 if *v > 0 {
1048 shard.buckets[i].inc_by(*v);
1049 }
1050 }
1051
1052 shard.sum.inc_by(self.sum);
1053 shard
1055 .count
1056 .inc_by_with_ordering(self.count, Ordering::Release);
1057 }
1058
1059 self.clear()
1060 }
1061
1062 fn sample_sum(&self) -> f64 {
1063 self.sum
1064 }
1065
1066 fn sample_count(&self) -> u64 {
1067 self.count
1068 }
1069}
1070
1071impl LocalHistogram {
1072 fn new(histogram: Histogram) -> LocalHistogram {
1073 let core = LocalHistogramCore::new(histogram);
1074 LocalHistogram {
1075 core: RefCell::new(core),
1076 }
1077 }
1078
1079 pub fn observe(&self, v: f64) {
1081 self.core.borrow_mut().observe(v);
1082 }
1083
1084 pub fn start_timer(&self) -> LocalHistogramTimer {
1086 LocalHistogramTimer::new(self.clone())
1087 }
1088
1089 #[cfg(feature = "nightly")]
1092 pub fn start_coarse_timer(&self) -> LocalHistogramTimer {
1093 LocalHistogramTimer::new_coarse(self.clone())
1094 }
1095
1096 pub fn observe_closure_duration<F, T>(&self, f: F) -> T
1098 where
1099 F: FnOnce() -> T,
1100 {
1101 let instant = Instant::now();
1102 let res = f();
1103 let elapsed = instant.elapsed_sec();
1104 self.observe(elapsed);
1105 res
1106 }
1107
1108 #[cfg(feature = "nightly")]
1110 pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
1111 where
1112 F: FnOnce() -> T,
1113 {
1114 let instant = Instant::now_coarse();
1115 let res = f();
1116 let elapsed = instant.elapsed_sec();
1117 self.observe(elapsed);
1118 res
1119 }
1120
1121 pub fn clear(&self) {
1123 self.core.borrow_mut().clear();
1124 }
1125
1126 pub fn flush(&self) {
1128 self.core.borrow_mut().flush();
1129 }
1130
1131 pub fn get_sample_sum(&self) -> f64 {
1133 self.core.borrow().sample_sum()
1134 }
1135
1136 pub fn get_sample_count(&self) -> u64 {
1138 self.core.borrow().sample_count()
1139 }
1140}
1141
1142impl LocalMetric for LocalHistogram {
1143 fn flush(&self) {
1145 LocalHistogram::flush(self);
1146 }
1147}
1148
1149impl Drop for LocalHistogram {
1150 fn drop(&mut self) {
1151 self.flush()
1152 }
1153}
1154
1155#[derive(Debug)]
1157pub struct LocalHistogramVec {
1158 vec: HistogramVec,
1159 local: HashMap<u64, LocalHistogram>,
1160}
1161
1162impl LocalHistogramVec {
1163 fn new(vec: HistogramVec) -> LocalHistogramVec {
1164 let local = HashMap::with_capacity(vec.v.children.read().len());
1165 LocalHistogramVec { vec, local }
1166 }
1167
1168 pub fn with_label_values<'a>(&'a mut self, vals: &[&str]) -> &'a LocalHistogram {
1171 let hash = self.vec.v.hash_label_values(vals).unwrap();
1172 let vec = &self.vec;
1173 self.local
1174 .entry(hash)
1175 .or_insert_with(|| vec.with_label_values(vals).local())
1176 }
1177
1178 pub fn remove_label_values(&mut self, vals: &[&str]) -> Result<()> {
1181 let hash = self.vec.v.hash_label_values(vals)?;
1182 self.local.remove(&hash);
1183 self.vec.v.delete_label_values(vals)
1184 }
1185
1186 pub fn flush(&self) {
1188 for h in self.local.values() {
1189 h.flush();
1190 }
1191 }
1192}
1193
1194impl LocalMetric for LocalHistogramVec {
1195 fn flush(&self) {
1197 LocalHistogramVec::flush(self)
1198 }
1199}
1200
1201impl Clone for LocalHistogramVec {
1202 fn clone(&self) -> LocalHistogramVec {
1203 LocalHistogramVec::new(self.vec.clone())
1204 }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209 use std::f64;
1210 use std::thread;
1211 use std::time::Duration;
1212
1213 use super::*;
1214 use crate::metrics::{Collector, Metric};
1215
1216 #[test]
1217 fn test_histogram() {
1218 let opts = HistogramOpts::new("test1", "test help")
1219 .const_label("a", "1")
1220 .const_label("b", "2");
1221 let histogram = Histogram::with_opts(opts).unwrap();
1222 histogram.observe(1.0);
1223
1224 let timer = histogram.start_timer();
1225 thread::sleep(Duration::from_millis(100));
1226 timer.observe_duration();
1227
1228 let timer = histogram.start_timer();
1229 let handler = thread::spawn(move || {
1230 let _timer = timer;
1231 thread::sleep(Duration::from_millis(400));
1232 });
1233 assert!(handler.join().is_ok());
1234
1235 let mut mfs = histogram.collect();
1236 assert_eq!(mfs.len(), 1);
1237
1238 let mf = mfs.pop().unwrap();
1239 let m = mf.get_metric().first().unwrap();
1240 assert_eq!(m.get_label().len(), 2);
1241 let proto_histogram = m.get_histogram();
1242 assert_eq!(proto_histogram.get_sample_count(), 3);
1243 assert!(proto_histogram.get_sample_sum() >= 1.5);
1244 assert_eq!(proto_histogram.get_bucket().len(), DEFAULT_BUCKETS.len());
1245
1246 let buckets = vec![1.0, 2.0, 3.0];
1247 let opts = HistogramOpts::new("test2", "test help").buckets(buckets.clone());
1248 let histogram = Histogram::with_opts(opts).unwrap();
1249 let mut mfs = histogram.collect();
1250 assert_eq!(mfs.len(), 1);
1251
1252 let mf = mfs.pop().unwrap();
1253 let m = mf.get_metric().first().unwrap();
1254 assert_eq!(m.get_label().len(), 0);
1255 let proto_histogram = m.get_histogram();
1256 assert_eq!(proto_histogram.get_sample_count(), 0);
1257 assert!((proto_histogram.get_sample_sum() - 0.0) < f64::EPSILON);
1258 assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
1259 }
1260
1261 #[test]
1262 #[cfg(feature = "nightly")]
1263 fn test_histogram_coarse_timer() {
1264 let opts = HistogramOpts::new("test1", "test help");
1265 let histogram = Histogram::with_opts(opts).unwrap();
1266
1267 let timer = histogram.start_coarse_timer();
1268 thread::sleep(Duration::from_millis(100));
1269 timer.observe_duration();
1270
1271 let timer = histogram.start_coarse_timer();
1272 let handler = thread::spawn(move || {
1273 let _timer = timer;
1274 thread::sleep(Duration::from_millis(400));
1275 });
1276 assert!(handler.join().is_ok());
1277
1278 histogram.observe_closure_duration(|| {
1279 thread::sleep(Duration::from_millis(400));
1280 });
1281
1282 let mut mfs = histogram.collect();
1283 assert_eq!(mfs.len(), 1);
1284
1285 let mf = mfs.pop().unwrap();
1286 let m = mf.get_metric().get(0).unwrap();
1287 let proto_histogram = m.get_histogram();
1288 assert_eq!(proto_histogram.get_sample_count(), 3);
1289 assert!((proto_histogram.get_sample_sum() - 0.0) > f64::EPSILON);
1290 }
1291
1292 #[test]
1293 #[cfg(feature = "nightly")]
1294 fn test_instant_on_smp() {
1295 let zero = Duration::from_millis(0);
1296 for i in 0..100_000 {
1297 let now = Instant::now();
1298 let now_coarse = Instant::now_coarse();
1299 if i % 100 == 0 {
1300 thread::yield_now();
1301 }
1302 assert!(now.elapsed() >= zero);
1303 assert!(now_coarse.elapsed() >= zero);
1304 }
1305 }
1306
1307 #[test]
1308 fn test_buckets_invalidation() {
1309 let table = vec![
1310 (vec![], true, DEFAULT_BUCKETS.len()),
1311 (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, 2.0], true, 7),
1312 (vec![-2.0, -1.0, -0.5, 10.0, 0.5, 1.0, 2.0], false, 7),
1313 (
1314 vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, f64::INFINITY],
1315 true,
1316 6,
1317 ),
1318 ];
1319
1320 for (buckets, is_ok, length) in table {
1321 let got = check_and_adjust_buckets(buckets);
1322 assert_eq!(got.is_ok(), is_ok);
1323 if is_ok {
1324 assert_eq!(got.unwrap().len(), length);
1325 }
1326 }
1327 }
1328
1329 #[test]
1330 fn test_buckets_functions() {
1331 let linear_table = vec![
1332 (
1333 -15.0,
1334 5.0,
1335 6,
1336 true,
1337 vec![-15.0, -10.0, -5.0, 0.0, 5.0, 10.0],
1338 ),
1339 (-15.0, 0.0, 6, false, vec![]),
1340 (-15.0, 5.0, 0, false, vec![]),
1341 ];
1342
1343 for (param1, param2, param3, is_ok, vec) in linear_table {
1344 let got = linear_buckets(param1, param2, param3);
1345 assert_eq!(got.is_ok(), is_ok);
1346 if got.is_ok() {
1347 assert_eq!(got.unwrap(), vec);
1348 }
1349 }
1350
1351 let exponential_table = vec![
1352 (100.0, 1.2, 3, true, vec![100.0, 120.0, 144.0]),
1353 (100.0, 0.5, 3, false, vec![]),
1354 (100.0, 1.2, 0, false, vec![]),
1355 ];
1356
1357 for (param1, param2, param3, is_ok, vec) in exponential_table {
1358 let got = exponential_buckets(param1, param2, param3);
1359 assert_eq!(got.is_ok(), is_ok);
1360 if got.is_ok() {
1361 assert_eq!(got.unwrap(), vec);
1362 }
1363 }
1364 }
1365
1366 #[test]
1367 fn test_duration_to_seconds() {
1368 let tbls = vec![(1000, 1.0), (1100, 1.1), (100_111, 100.111)];
1369 for (millis, seconds) in tbls {
1370 let d = Duration::from_millis(millis);
1371 let v = duration_to_seconds(d);
1372 assert!((v - seconds).abs() < f64::EPSILON);
1373 }
1374 }
1375
1376 #[test]
1377 fn test_histogram_vec_with_label_values() {
1378 let vec = HistogramVec::new(
1379 HistogramOpts::new("test_histogram_vec", "test histogram vec help"),
1380 &["l1", "l2"],
1381 )
1382 .unwrap();
1383
1384 assert!(vec.remove_label_values(&["v1", "v2"]).is_err());
1385 vec.with_label_values(&["v1", "v2"]).observe(1.0);
1386 assert!(vec.remove_label_values(&["v1", "v2"]).is_ok());
1387
1388 assert!(vec.remove_label_values(&["v1"]).is_err());
1389 assert!(vec.remove_label_values(&["v1", "v3"]).is_err());
1390 }
1391
1392 #[test]
1393 fn test_histogram_vec_with_owned_label_values() {
1394 let vec = HistogramVec::new(
1395 HistogramOpts::new("test_histogram_vec", "test histogram vec help"),
1396 &["l1", "l2"],
1397 )
1398 .unwrap();
1399
1400 let v1 = "v1".to_string();
1401 let v2 = "v2".to_string();
1402 let v3 = "v3".to_string();
1403
1404 assert!(vec.remove_label_values(&[v1.clone(), v2.clone()]).is_err());
1405 vec.with_label_values(&[v1.clone(), v2.clone()])
1406 .observe(1.0);
1407 assert!(vec.remove_label_values(&[v1.clone(), v2.clone()]).is_ok());
1408
1409 assert!(vec.remove_label_values(&[v1.clone()]).is_err());
1410 assert!(vec.remove_label_values(&[v1.clone(), v3.clone()]).is_err());
1411 }
1412
1413 #[test]
1414 fn test_histogram_vec_with_opts_buckets() {
1415 let labels = ["l1", "l2"];
1416 let buckets = vec![1.0, 2.0, 3.0];
1417 let vec = HistogramVec::new(
1418 HistogramOpts::new("test_histogram_vec", "test histogram vec help")
1419 .buckets(buckets.clone()),
1420 &labels,
1421 )
1422 .unwrap();
1423
1424 let histogram = vec.with_label_values(&["v1", "v2"]);
1425 histogram.observe(1.0);
1426
1427 let m = histogram.metric();
1428 assert_eq!(m.get_label().len(), labels.len());
1429
1430 let proto_histogram = m.get_histogram();
1431 assert_eq!(proto_histogram.get_sample_count(), 1);
1432 assert!((proto_histogram.get_sample_sum() - 1.0) < f64::EPSILON);
1433 assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
1434 }
1435
1436 #[test]
1437 fn test_histogram_local() {
1438 let buckets = vec![1.0, 2.0, 3.0];
1439 let opts = HistogramOpts::new("test_histogram_local", "test histogram local help")
1440 .buckets(buckets.clone());
1441 let histogram = Histogram::with_opts(opts).unwrap();
1442 let local = histogram.local();
1443
1444 let check = |count, sum| {
1445 let m = histogram.metric();
1446 let proto_histogram = m.get_histogram();
1447 assert_eq!(proto_histogram.get_sample_count(), count);
1448 assert!((proto_histogram.get_sample_sum() - sum) < f64::EPSILON);
1449 };
1450
1451 local.observe(1.0);
1452 local.observe(4.0);
1453 check(0, 0.0);
1454
1455 local.flush();
1456 check(2, 5.0);
1457
1458 local.observe(2.0);
1459 local.clear();
1460 check(2, 5.0);
1461
1462 local.observe(2.0);
1463 drop(local);
1464 check(3, 7.0);
1465 }
1466
1467 #[test]
1468 fn test_histogram_vec_local() {
1469 let vec = HistogramVec::new(
1470 HistogramOpts::new("test_histogram_vec_local", "test histogram vec help"),
1471 &["l1", "l2"],
1472 )
1473 .unwrap();
1474 let mut local_vec = vec.local();
1475
1476 vec.remove_label_values(&["v1", "v2"]).unwrap_err();
1477 local_vec.remove_label_values(&["v1", "v2"]).unwrap_err();
1478
1479 let check = |count, sum| {
1480 let ms = vec.collect()[0].take_metric();
1481 let proto_histogram = ms[0].get_histogram();
1482 assert_eq!(proto_histogram.get_sample_count(), count);
1483 assert!((proto_histogram.get_sample_sum() - sum) < f64::EPSILON);
1484 };
1485
1486 {
1487 let h = local_vec.with_label_values(&["v1", "v2"]);
1489 h.observe(1.0);
1490 h.flush();
1491 check(1, 1.0);
1492 }
1493
1494 {
1495 local_vec.with_label_values(&["v1", "v2"]).observe(4.0);
1497 local_vec.flush();
1498 check(2, 5.0);
1499 }
1500 {
1501 local_vec.remove_label_values(&["v1", "v2"]).unwrap();
1503
1504 local_vec.with_label_values(&["v1", "v2"]).observe(2.0);
1506 drop(local_vec);
1507 check(1, 2.0);
1508 }
1509 }
1510
1511 #[test]
1515 fn atomic_observe_across_collects() {
1516 let done = Arc::new(std::sync::atomic::AtomicBool::default());
1517 let histogram =
1518 Histogram::with_opts(HistogramOpts::new("test_name", "test help").buckets(vec![1.0]))
1519 .unwrap();
1520
1521 let done_clone = done.clone();
1522 let histogram_clone = histogram.clone();
1523 let observing_thread = std::thread::spawn(move || loop {
1524 if done_clone.load(std::sync::atomic::Ordering::Relaxed) {
1525 break;
1526 }
1527
1528 for _ in 0..1_000_000 {
1529 histogram_clone.observe(1.0);
1530 }
1531 });
1532
1533 let mut sample_count = 0;
1534 let mut cumulative_count = 0;
1535 let mut sample_sum = 0;
1536 for _ in 0..1_000_000 {
1537 let metric = &histogram.collect()[0].take_metric()[0];
1538 let proto = metric.get_histogram();
1539
1540 sample_count = proto.get_sample_count();
1541 sample_sum = proto.get_sample_sum() as u64;
1542 cumulative_count = proto.get_bucket()[0].cumulative_count();
1544
1545 if sample_count != cumulative_count {
1546 break;
1547 }
1548
1549 if sample_count != sample_sum {
1554 break;
1555 }
1556 }
1557
1558 done.store(true, std::sync::atomic::Ordering::Relaxed);
1559 observing_thread.join().unwrap();
1560
1561 if sample_count != cumulative_count {
1562 panic!(
1563 "Histogram invariant violated: For a histogram with a single \
1564 bucket observing values below the bucket's upper bound only \
1565 the histogram's count should always be equal to the buckets's \
1566 cumulative count, got {:?} and {:?} instead.",
1567 sample_count, cumulative_count,
1568 );
1569 }
1570
1571 if sample_count != sample_sum {
1572 panic!(
1573 "Histogram invariant violated: For a histogram which is only \
1574 ever observing a value of `1.0` the sample count should equal \
1575 the sum, instead got: {:?} and {:?}",
1576 sample_count, sample_sum,
1577 )
1578 }
1579 }
1580
1581 #[test]
1582 fn test_error_on_inconsistent_label_cardinality() {
1583 let hist = Histogram::with_opts(
1584 histogram_opts!(
1585 "example_histogram",
1586 "Used as an example",
1587 vec![0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 5.0]
1588 )
1589 .variable_label("example_variable"),
1590 );
1591
1592 if let Err(Error::InconsistentCardinality { expect, got }) = hist {
1593 assert_eq!(1, expect);
1594 assert_eq!(0, got);
1595 } else {
1596 panic!("Expected InconsistentCardinality error.")
1597 }
1598 }
1599}