1use std::cell::RefCell;
5use std::collections::HashMap;
6use std::convert::From;
7use std::sync::{
8 atomic::{AtomicU64 as StdAtomicU64, Ordering},
9 Arc, Mutex,
10};
11use std::time::{Duration, Instant as StdInstant};
12
13use crate::atomic64::{Atomic, AtomicF64, AtomicU64};
14use crate::desc::{Desc, Describer};
15use crate::errors::{Error, Result};
16use crate::metrics::{Collector, LocalMetric, Metric, Opts};
17use crate::proto;
18use crate::value::make_label_pairs;
19use crate::vec::{MetricVec, MetricVecBuilder};
20
21pub const DEFAULT_BUCKETS: &[f64; 11] = &[
26 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
27];
28
29pub const BUCKET_LABEL: &str = "le";
32
33#[inline]
34fn check_bucket_label(label: &str) -> Result<()> {
35 if label == BUCKET_LABEL {
36 return Err(Error::Msg(
37 "`le` is not allowed as label name in histograms".to_owned(),
38 ));
39 }
40
41 Ok(())
42}
43
44fn check_and_adjust_buckets(mut buckets: Vec<f64>) -> Result<Vec<f64>> {
45 if buckets.is_empty() {
46 buckets = Vec::from(DEFAULT_BUCKETS as &'static [f64]);
47 }
48
49 for (i, upper_bound) in buckets.iter().enumerate() {
50 if i < (buckets.len() - 1) && *upper_bound >= buckets[i + 1] {
51 return Err(Error::Msg(format!(
52 "histogram buckets must be in increasing \
53 order: {} >= {}",
54 upper_bound,
55 buckets[i + 1]
56 )));
57 }
58 }
59
60 let tail = *buckets.last().unwrap();
61 if tail.is_sign_positive() && tail.is_infinite() {
62 buckets.pop();
64 }
65
66 Ok(buckets)
67}
68
69#[derive(Clone, Debug)]
73pub struct HistogramOpts {
74 pub common_opts: Opts,
76
77 pub buckets: Vec<f64>,
83}
84
85impl HistogramOpts {
86 pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> HistogramOpts {
88 HistogramOpts {
89 common_opts: Opts::new(name, help),
90 buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
91 }
92 }
93
94 pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
96 self.common_opts.namespace = namespace.into();
97 self
98 }
99
100 pub fn subsystem<S: Into<String>>(mut self, subsystem: S) -> Self {
102 self.common_opts.subsystem = subsystem.into();
103 self
104 }
105
106 pub fn const_labels(mut self, const_labels: HashMap<String, String>) -> Self {
108 self.common_opts = self.common_opts.const_labels(const_labels);
109 self
110 }
111
112 pub fn const_label<S1: Into<String>, S2: Into<String>>(mut self, name: S1, value: S2) -> Self {
114 self.common_opts = self.common_opts.const_label(name, value);
115 self
116 }
117
118 pub fn variable_labels(mut self, variable_labels: Vec<String>) -> Self {
120 self.common_opts = self.common_opts.variable_labels(variable_labels);
121 self
122 }
123
124 pub fn variable_label<S: Into<String>>(mut self, name: S) -> Self {
126 self.common_opts = self.common_opts.variable_label(name);
127 self
128 }
129
130 pub fn fq_name(&self) -> String {
132 self.common_opts.fq_name()
133 }
134
135 pub fn buckets(mut self, buckets: Vec<f64>) -> Self {
137 self.buckets = buckets;
138 self
139 }
140}
141
142impl Describer for HistogramOpts {
143 fn describe(&self) -> Result<Desc> {
144 self.common_opts.describe()
145 }
146}
147
148impl From<Opts> for HistogramOpts {
149 fn from(opts: Opts) -> HistogramOpts {
150 HistogramOpts {
151 common_opts: opts,
152 buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
153 }
154 }
155}
156
157#[derive(Debug)]
161struct Shard {
162 sum: AtomicF64,
163 count: AtomicU64,
164 buckets: Vec<AtomicU64>,
165}
166
167impl Shard {
168 fn new(num_buckets: usize) -> Self {
169 let mut buckets = Vec::new();
170 for _ in 0..num_buckets {
171 buckets.push(AtomicU64::new(0));
172 }
173
174 Shard {
175 sum: AtomicF64::new(0.0),
176 count: AtomicU64::new(0),
177 buckets,
178 }
179 }
180}
181
182#[derive(Debug, Clone, Copy)]
186enum ShardIndex {
187 First,
189 Second,
191}
192
193impl ShardIndex {
194 fn inverse(self) -> ShardIndex {
196 match self {
197 ShardIndex::First => ShardIndex::Second,
198 ShardIndex::Second => ShardIndex::First,
199 }
200 }
201}
202
203impl From<u64> for ShardIndex {
204 fn from(index: u64) -> Self {
205 match index {
206 0 => ShardIndex::First,
207 1 => ShardIndex::Second,
208 _ => panic!(
209 "Invalid shard index {:?}. A histogram only has two shards.",
210 index
211 ),
212 }
213 }
214}
215
216impl From<ShardIndex> for usize {
217 fn from(index: ShardIndex) -> Self {
218 match index {
219 ShardIndex::First => 0,
220 ShardIndex::Second => 1,
221 }
222 }
223}
224
225#[derive(Debug)]
228struct ShardAndCount {
229 inner: StdAtomicU64,
230}
231
232impl ShardAndCount {
233 fn new() -> Self {
237 ShardAndCount {
238 inner: StdAtomicU64::new(0),
239 }
240 }
241
242 fn flip(&self, ordering: Ordering) -> (ShardIndex, u64) {
245 let n = self.inner.fetch_add(1 << 63, ordering);
246
247 ShardAndCount::split_shard_index_and_count(n)
248 }
249
250 fn get(&self) -> (ShardIndex, u64) {
253 let n = self.inner.load(Ordering::Relaxed);
254
255 ShardAndCount::split_shard_index_and_count(n)
256 }
257
258 fn inc_by(&self, delta: u64, ordering: Ordering) -> (ShardIndex, u64) {
261 let n = self.inner.fetch_add(delta, ordering);
262
263 ShardAndCount::split_shard_index_and_count(n)
264 }
265
266 fn inc(&self, ordering: Ordering) -> (ShardIndex, u64) {
269 self.inc_by(1, ordering)
270 }
271
272 fn split_shard_index_and_count(n: u64) -> (ShardIndex, u64) {
273 let shard = n >> 63;
274 let count = n & ((1 << 63) - 1);
275
276 (shard.into(), count)
277 }
278}
279
280#[derive(Debug)]
309pub struct HistogramCore {
310 desc: Desc,
311 label_pairs: Vec<proto::LabelPair>,
312
313 collect_lock: Mutex<()>,
318
319 shard_and_count: ShardAndCount,
322 shards: [Shard; 2],
325
326 upper_bounds: Vec<f64>,
327}
328
329impl HistogramCore {
330 pub fn new(opts: &HistogramOpts, label_values: &[&str]) -> Result<HistogramCore> {
331 let desc = opts.describe()?;
332
333 for name in &desc.variable_labels {
334 check_bucket_label(name)?;
335 }
336 for pair in &desc.const_label_pairs {
337 check_bucket_label(pair.get_name())?;
338 }
339
340 let label_pairs = make_label_pairs(&desc, label_values)?;
341
342 let buckets = check_and_adjust_buckets(opts.buckets.clone())?;
343
344 Ok(HistogramCore {
345 desc,
346 label_pairs,
347
348 collect_lock: Mutex::new(()),
349
350 shard_and_count: ShardAndCount::new(),
351 shards: [Shard::new(buckets.len()), Shard::new(buckets.len())],
352
353 upper_bounds: buckets,
354 })
355 }
356
357 pub fn observe(&self, v: f64) {
364 let (shard_index, _count) = self.shard_and_count.inc(Ordering::Acquire);
371
372 let shard: &Shard = &self.shards[usize::from(shard_index)];
373
374 let mut iter = self
376 .upper_bounds
377 .iter()
378 .enumerate()
379 .filter(|&(_, f)| v <= *f);
380 if let Some((i, _)) = iter.next() {
381 shard.buckets[i].inc_by(1);
382 }
383
384 shard.sum.inc_by(v);
385 shard.count.inc_by_with_ordering(1, Ordering::Release);
387 }
388
389 pub fn proto(&self) -> proto::Histogram {
396 let collect_guard = self.collect_lock.lock().expect("Lock poisoned");
397
398 let (cold_shard_index, overall_count) = self.shard_and_count.flip(Ordering::AcqRel);
402
403 let cold_shard = &self.shards[usize::from(cold_shard_index)];
404 let hot_shard = &self.shards[usize::from(cold_shard_index.inverse())];
405
406 while cold_shard
419 .count
420 .compare_exchange_weak(
421 overall_count,
422 0,
424 Ordering::Acquire,
425 Ordering::Acquire,
426 )
427 .is_err()
428 {}
429
430 let cold_shard_sum = cold_shard.sum.swap(0.0, Ordering::AcqRel);
435
436 let mut h = proto::Histogram::default();
437 h.set_sample_sum(cold_shard_sum);
438 h.set_sample_count(overall_count);
439
440 let mut cumulative_count = 0;
441 let mut buckets = Vec::with_capacity(self.upper_bounds.len());
442 for (i, upper_bound) in self.upper_bounds.iter().enumerate() {
443 let cold_bucket_count = cold_shard.buckets[i].swap(0, Ordering::AcqRel);
448 hot_shard.buckets[i].inc_by(cold_bucket_count);
449
450 cumulative_count += cold_bucket_count;
451 let mut b = proto::Bucket::default();
452 b.set_cumulative_count(cumulative_count);
453 b.set_upper_bound(*upper_bound);
454 buckets.push(b);
455 }
456 h.set_bucket(from_vec!(buckets));
457
458 hot_shard.count.inc_by(overall_count);
460 hot_shard.sum.inc_by(cold_shard_sum);
461
462 drop(collect_guard);
463
464 h
465 }
466
467 fn sample_sum(&self) -> f64 {
468 let _guard = self.collect_lock.lock().expect("Lock poisoned");
471
472 let (shard_index, _count) = self.shard_and_count.get();
473 self.shards[shard_index as usize].sum.get()
474 }
475
476 fn sample_count(&self) -> u64 {
477 self.shard_and_count.get().1
478 }
479}
480
481#[cfg(all(feature = "nightly", target_os = "linux"))]
483pub struct Timespec(libc::timespec);
484
485#[cfg(all(feature = "nightly", target_os = "linux"))]
486impl std::fmt::Debug for Timespec {
487 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
488 write!(
489 f,
490 "Timespec {{ tv_sec: {}, tv_nsec: {} }}",
491 self.0.tv_sec, self.0.tv_nsec
492 )
493 }
494}
495
496#[derive(Debug)]
497pub enum Instant {
498 Monotonic(StdInstant),
499 #[cfg(all(feature = "nightly", target_os = "linux"))]
500 MonotonicCoarse(Timespec),
501}
502
503impl Instant {
504 pub fn now() -> Instant {
505 Instant::Monotonic(StdInstant::now())
506 }
507
508 #[cfg(all(feature = "nightly", target_os = "linux"))]
509 pub fn now_coarse() -> Instant {
510 Instant::MonotonicCoarse(get_time_coarse())
511 }
512
513 #[cfg(all(feature = "nightly", not(target_os = "linux")))]
514 pub fn now_coarse() -> Instant {
515 Instant::Monotonic(StdInstant::now())
516 }
517
518 pub fn elapsed(&self) -> Duration {
519 match self {
520 Instant::Monotonic(i) => StdInstant::now().saturating_duration_since(*i),
522
523 #[cfg(all(feature = "nightly", target_os = "linux"))]
529 Instant::MonotonicCoarse(t) => {
530 let now = get_time_coarse();
531 let now_ms = now.0.tv_sec * MILLIS_PER_SEC + now.0.tv_nsec / NANOS_PER_MILLI;
532 let t_ms = t.0.tv_sec * MILLIS_PER_SEC + t.0.tv_nsec / NANOS_PER_MILLI;
533 let dur = now_ms - t_ms;
534 if dur >= 0 {
535 Duration::from_millis(dur as u64)
536 } else {
537 Duration::from_millis(0)
538 }
539 }
540 }
541 }
542
543 #[inline]
544 pub fn elapsed_sec(&self) -> f64 {
545 duration_to_seconds(self.elapsed())
546 }
547}
548
549#[cfg(all(feature = "nightly", target_os = "linux"))]
550use self::coarse::*;
551
552#[cfg(all(feature = "nightly", target_os = "linux"))]
553mod coarse {
554 use crate::histogram::Timespec;
555 pub use libc::timespec;
556 use libc::{clock_gettime, CLOCK_MONOTONIC_COARSE};
557
558 pub const NANOS_PER_MILLI: i64 = 1_000_000;
559 pub const MILLIS_PER_SEC: i64 = 1_000;
560
561 pub fn get_time_coarse() -> Timespec {
562 let mut t = Timespec(timespec {
563 tv_sec: 0,
564 tv_nsec: 0,
565 });
566 assert_eq!(
567 unsafe { clock_gettime(CLOCK_MONOTONIC_COARSE, &mut t.0) },
568 0
569 );
570 t
571 }
572}
573
574#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
580#[derive(Debug)]
581pub struct HistogramTimer {
582 histogram: Histogram,
584 observed: bool,
586 start: Instant,
588}
589
590impl HistogramTimer {
591 fn new(histogram: Histogram) -> Self {
592 Self {
593 histogram,
594 observed: false,
595 start: Instant::now(),
596 }
597 }
598
599 #[cfg(feature = "nightly")]
600 fn new_coarse(histogram: Histogram) -> Self {
601 HistogramTimer {
602 histogram,
603 observed: false,
604 start: Instant::now_coarse(),
605 }
606 }
607
608 pub fn observe_duration(self) {
613 self.stop_and_record();
614 }
615
616 pub fn stop_and_record(self) -> f64 {
621 let mut timer = self;
622 timer.observe(true)
623 }
624
625 pub fn stop_and_discard(self) -> f64 {
630 let mut timer = self;
631 timer.observe(false)
632 }
633
634 fn observe(&mut self, record: bool) -> f64 {
635 let v = self.start.elapsed_sec();
636 self.observed = true;
637 if record {
638 self.histogram.observe(v);
639 }
640 v
641 }
642}
643
644impl Drop for HistogramTimer {
645 fn drop(&mut self) {
646 if !self.observed {
647 self.observe(true);
648 }
649 }
650}
651
652#[derive(Clone, Debug)]
670pub struct Histogram {
671 core: Arc<HistogramCore>,
672}
673
674impl Histogram {
675 pub fn with_opts(opts: HistogramOpts) -> Result<Histogram> {
677 Histogram::with_opts_and_label_values(&opts, &[])
678 }
679
680 fn with_opts_and_label_values(
681 opts: &HistogramOpts,
682 label_values: &[&str],
683 ) -> Result<Histogram> {
684 let core = HistogramCore::new(opts, label_values)?;
685
686 Ok(Histogram {
687 core: Arc::new(core),
688 })
689 }
690}
691
692impl Histogram {
693 pub fn observe(&self, v: f64) {
695 self.core.observe(v)
696 }
697
698 pub fn start_timer(&self) -> HistogramTimer {
700 HistogramTimer::new(self.clone())
701 }
702
703 #[cfg(feature = "nightly")]
706 pub fn start_coarse_timer(&self) -> HistogramTimer {
707 HistogramTimer::new_coarse(self.clone())
708 }
709
710 pub fn observe_closure_duration<F, T>(&self, f: F) -> T
712 where
713 F: FnOnce() -> T,
714 {
715 let instant = Instant::now();
716 let res = f();
717 let elapsed = instant.elapsed_sec();
718 self.observe(elapsed);
719 res
720 }
721
722 #[cfg(feature = "nightly")]
724 pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
725 where
726 F: FnOnce() -> T,
727 {
728 let instant = Instant::now_coarse();
729 let res = f();
730 let elapsed = instant.elapsed_sec();
731 self.observe(elapsed);
732 res
733 }
734
735 pub fn local(&self) -> LocalHistogram {
737 LocalHistogram::new(self.clone())
738 }
739
740 pub fn get_sample_sum(&self) -> f64 {
742 self.core.sample_sum()
743 }
744
745 pub fn get_sample_count(&self) -> u64 {
747 self.core.sample_count()
748 }
749}
750
751impl Metric for Histogram {
752 fn metric(&self) -> proto::Metric {
753 let mut m = proto::Metric::default();
754 m.set_label(from_vec!(self.core.label_pairs.clone()));
755
756 let h = self.core.proto();
757 m.set_histogram(h);
758
759 m
760 }
761}
762
763impl Collector for Histogram {
764 fn desc(&self) -> Vec<&Desc> {
765 vec![&self.core.desc]
766 }
767
768 fn collect(&self) -> Vec<proto::MetricFamily> {
769 let mut m = proto::MetricFamily::default();
770 m.set_name(self.core.desc.fq_name.clone());
771 m.set_help(self.core.desc.help.clone());
772 m.set_field_type(proto::MetricType::HISTOGRAM);
773 m.set_metric(from_vec!(vec![self.metric()]));
774
775 vec![m]
776 }
777}
778
779#[derive(Clone, Debug)]
780pub struct HistogramVecBuilder {}
781
782impl MetricVecBuilder for HistogramVecBuilder {
783 type M = Histogram;
784 type P = HistogramOpts;
785
786 fn build(&self, opts: &HistogramOpts, vals: &[&str]) -> Result<Histogram> {
787 Histogram::with_opts_and_label_values(opts, vals)
788 }
789}
790
791pub type HistogramVec = MetricVec<HistogramVecBuilder>;
796
797impl HistogramVec {
798 pub fn new(opts: HistogramOpts, label_names: &[&str]) -> Result<HistogramVec> {
802 let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
803 let opts = opts.variable_labels(variable_names);
804 let metric_vec =
805 MetricVec::create(proto::MetricType::HISTOGRAM, HistogramVecBuilder {}, opts)?;
806
807 Ok(metric_vec as HistogramVec)
808 }
809
810 pub fn local(&self) -> LocalHistogramVec {
812 let vec = self.clone();
813 LocalHistogramVec::new(vec)
814 }
815}
816
817pub fn linear_buckets(start: f64, width: f64, count: usize) -> Result<Vec<f64>> {
825 if count < 1 {
826 return Err(Error::Msg(format!(
827 "LinearBuckets needs a positive count, count: {}",
828 count
829 )));
830 }
831 if width <= 0.0 {
832 return Err(Error::Msg(format!(
833 "LinearBuckets needs a width greater then 0, width: {}",
834 width
835 )));
836 }
837
838 let buckets: Vec<_> = (0..count)
839 .map(|step| start + width * (step as f64))
840 .collect();
841
842 Ok(buckets)
843}
844
845pub fn exponential_buckets(start: f64, factor: f64, count: usize) -> Result<Vec<f64>> {
854 if count < 1 {
855 return Err(Error::Msg(format!(
856 "exponential_buckets needs a positive count, count: {}",
857 count
858 )));
859 }
860 if start <= 0.0 {
861 return Err(Error::Msg(format!(
862 "exponential_buckets needs a positive start value, \
863 start: {}",
864 start
865 )));
866 }
867 if factor <= 1.0 {
868 return Err(Error::Msg(format!(
869 "exponential_buckets needs a factor greater than 1, \
870 factor: {}",
871 factor
872 )));
873 }
874
875 let mut next = start;
876 let mut buckets = Vec::with_capacity(count);
877 for _ in 0..count {
878 buckets.push(next);
879 next *= factor;
880 }
881
882 Ok(buckets)
883}
884
885#[inline]
887pub fn duration_to_seconds(d: Duration) -> f64 {
888 let nanos = f64::from(d.subsec_nanos()) / 1e9;
889 d.as_secs() as f64 + nanos
890}
891
892#[derive(Clone, Debug)]
893pub struct LocalHistogramCore {
894 histogram: Histogram,
895 counts: Vec<u64>,
896 count: u64,
897 sum: f64,
898}
899
900#[derive(Debug)]
902pub struct LocalHistogram {
903 core: RefCell<LocalHistogramCore>,
904}
905
906impl Clone for LocalHistogram {
907 fn clone(&self) -> LocalHistogram {
908 let core = self.core.clone();
909 let lh = LocalHistogram { core };
910 lh.clear();
911 lh
912 }
913}
914
915#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
917#[derive(Debug)]
918pub struct LocalHistogramTimer {
919 local: LocalHistogram,
921 observed: bool,
923 start: Instant,
925}
926
927impl LocalHistogramTimer {
928 fn new(histogram: LocalHistogram) -> Self {
929 Self {
930 local: histogram,
931 observed: false,
932 start: Instant::now(),
933 }
934 }
935
936 #[cfg(feature = "nightly")]
937 fn new_coarse(histogram: LocalHistogram) -> Self {
938 Self {
939 local: histogram,
940 observed: false,
941 start: Instant::now_coarse(),
942 }
943 }
944
945 pub fn observe_duration(self) {
950 self.stop_and_record();
951 }
952
953 pub fn stop_and_record(self) -> f64 {
958 let mut timer = self;
959 timer.observe(true)
960 }
961
962 pub fn stop_and_discard(self) -> f64 {
967 let mut timer = self;
968 timer.observe(false)
969 }
970
971 fn observe(&mut self, record: bool) -> f64 {
972 let v = self.start.elapsed_sec();
973 self.observed = true;
974 if record {
975 self.local.observe(v);
976 }
977 v
978 }
979}
980
981impl Drop for LocalHistogramTimer {
982 fn drop(&mut self) {
983 if !self.observed {
984 self.observe(true);
985 }
986 }
987}
988
989impl LocalHistogramCore {
990 fn new(histogram: Histogram) -> LocalHistogramCore {
991 let counts = vec![0; histogram.core.upper_bounds.len()];
992
993 LocalHistogramCore {
994 histogram,
995 counts,
996 count: 0,
997 sum: 0.0,
998 }
999 }
1000
1001 pub fn observe(&mut self, v: f64) {
1002 let mut iter = self
1004 .histogram
1005 .core
1006 .upper_bounds
1007 .iter()
1008 .enumerate()
1009 .filter(|&(_, f)| v <= *f);
1010 if let Some((i, _)) = iter.next() {
1011 self.counts[i] += 1;
1012 }
1013
1014 self.count += 1;
1015 self.sum += v;
1016 }
1017
1018 pub fn clear(&mut self) {
1019 for v in &mut self.counts {
1020 *v = 0
1021 }
1022
1023 self.count = 0;
1024 self.sum = 0.0;
1025 }
1026
1027 pub fn flush(&mut self) {
1028 if self.count == 0 {
1030 return;
1031 }
1032
1033 {
1034 let (shard_index, _count) = self
1041 .histogram
1042 .core
1043 .shard_and_count
1044 .inc_by(self.count, Ordering::Acquire);
1045 let shard = &self.histogram.core.shards[shard_index as usize];
1046
1047 for (i, v) in self.counts.iter().enumerate() {
1048 if *v > 0 {
1049 shard.buckets[i].inc_by(*v);
1050 }
1051 }
1052
1053 shard.sum.inc_by(self.sum);
1054 shard
1056 .count
1057 .inc_by_with_ordering(self.count, Ordering::Release);
1058 }
1059
1060 self.clear()
1061 }
1062
1063 fn sample_sum(&self) -> f64 {
1064 self.sum
1065 }
1066
1067 fn sample_count(&self) -> u64 {
1068 self.count
1069 }
1070}
1071
1072impl LocalHistogram {
1073 fn new(histogram: Histogram) -> LocalHistogram {
1074 let core = LocalHistogramCore::new(histogram);
1075 LocalHistogram {
1076 core: RefCell::new(core),
1077 }
1078 }
1079
1080 pub fn observe(&self, v: f64) {
1082 self.core.borrow_mut().observe(v);
1083 }
1084
1085 pub fn start_timer(&self) -> LocalHistogramTimer {
1087 LocalHistogramTimer::new(self.clone())
1088 }
1089
1090 #[cfg(feature = "nightly")]
1093 pub fn start_coarse_timer(&self) -> LocalHistogramTimer {
1094 LocalHistogramTimer::new_coarse(self.clone())
1095 }
1096
1097 pub fn observe_closure_duration<F, T>(&self, f: F) -> T
1099 where
1100 F: FnOnce() -> T,
1101 {
1102 let instant = Instant::now();
1103 let res = f();
1104 let elapsed = instant.elapsed_sec();
1105 self.observe(elapsed);
1106 res
1107 }
1108
1109 #[cfg(feature = "nightly")]
1111 pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
1112 where
1113 F: FnOnce() -> T,
1114 {
1115 let instant = Instant::now_coarse();
1116 let res = f();
1117 let elapsed = instant.elapsed_sec();
1118 self.observe(elapsed);
1119 res
1120 }
1121
1122 pub fn clear(&self) {
1124 self.core.borrow_mut().clear();
1125 }
1126
1127 pub fn flush(&self) {
1129 self.core.borrow_mut().flush();
1130 }
1131
1132 pub fn get_sample_sum(&self) -> f64 {
1134 self.core.borrow().sample_sum()
1135 }
1136
1137 pub fn get_sample_count(&self) -> u64 {
1139 self.core.borrow().sample_count()
1140 }
1141}
1142
1143impl LocalMetric for LocalHistogram {
1144 fn flush(&self) {
1146 LocalHistogram::flush(self);
1147 }
1148}
1149
1150impl Drop for LocalHistogram {
1151 fn drop(&mut self) {
1152 self.flush()
1153 }
1154}
1155
1156#[derive(Debug)]
1158pub struct LocalHistogramVec {
1159 vec: HistogramVec,
1160 local: HashMap<u64, LocalHistogram>,
1161}
1162
1163impl LocalHistogramVec {
1164 fn new(vec: HistogramVec) -> LocalHistogramVec {
1165 let local = HashMap::with_capacity(vec.v.children.read().len());
1166 LocalHistogramVec { vec, local }
1167 }
1168
1169 pub fn with_label_values<'a>(&'a mut self, vals: &[&str]) -> &'a LocalHistogram {
1172 let hash = self.vec.v.hash_label_values(vals).unwrap();
1173 let vec = &self.vec;
1174 self.local
1175 .entry(hash)
1176 .or_insert_with(|| vec.with_label_values(vals).local())
1177 }
1178
1179 pub fn remove_label_values(&mut self, vals: &[&str]) -> Result<()> {
1182 let hash = self.vec.v.hash_label_values(vals)?;
1183 self.local.remove(&hash);
1184 self.vec.v.delete_label_values(vals)
1185 }
1186
1187 pub fn flush(&self) {
1189 for h in self.local.values() {
1190 h.flush();
1191 }
1192 }
1193}
1194
1195impl LocalMetric for LocalHistogramVec {
1196 fn flush(&self) {
1198 LocalHistogramVec::flush(self)
1199 }
1200}
1201
1202impl Clone for LocalHistogramVec {
1203 fn clone(&self) -> LocalHistogramVec {
1204 LocalHistogramVec::new(self.vec.clone())
1205 }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210 use std::f64::{EPSILON, INFINITY};
1211 use std::thread;
1212 use std::time::Duration;
1213
1214 use super::*;
1215 use crate::metrics::{Collector, Metric};
1216
1217 #[test]
1218 fn test_histogram() {
1219 let opts = HistogramOpts::new("test1", "test help")
1220 .const_label("a", "1")
1221 .const_label("b", "2");
1222 let histogram = Histogram::with_opts(opts).unwrap();
1223 histogram.observe(1.0);
1224
1225 let timer = histogram.start_timer();
1226 thread::sleep(Duration::from_millis(100));
1227 timer.observe_duration();
1228
1229 let timer = histogram.start_timer();
1230 let handler = thread::spawn(move || {
1231 let _timer = timer;
1232 thread::sleep(Duration::from_millis(400));
1233 });
1234 assert!(handler.join().is_ok());
1235
1236 let mut mfs = histogram.collect();
1237 assert_eq!(mfs.len(), 1);
1238
1239 let mf = mfs.pop().unwrap();
1240 let m = mf.get_metric().get(0).unwrap();
1241 assert_eq!(m.get_label().len(), 2);
1242 let proto_histogram = m.get_histogram();
1243 assert_eq!(proto_histogram.get_sample_count(), 3);
1244 assert!(proto_histogram.get_sample_sum() >= 1.5);
1245 assert_eq!(proto_histogram.get_bucket().len(), DEFAULT_BUCKETS.len());
1246
1247 let buckets = vec![1.0, 2.0, 3.0];
1248 let opts = HistogramOpts::new("test2", "test help").buckets(buckets.clone());
1249 let histogram = Histogram::with_opts(opts).unwrap();
1250 let mut mfs = histogram.collect();
1251 assert_eq!(mfs.len(), 1);
1252
1253 let mf = mfs.pop().unwrap();
1254 let m = mf.get_metric().get(0).unwrap();
1255 assert_eq!(m.get_label().len(), 0);
1256 let proto_histogram = m.get_histogram();
1257 assert_eq!(proto_histogram.get_sample_count(), 0);
1258 assert!((proto_histogram.get_sample_sum() - 0.0) < EPSILON);
1259 assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
1260 }
1261
1262 #[test]
1263 #[cfg(feature = "nightly")]
1264 fn test_histogram_coarse_timer() {
1265 let opts = HistogramOpts::new("test1", "test help");
1266 let histogram = Histogram::with_opts(opts).unwrap();
1267
1268 let timer = histogram.start_coarse_timer();
1269 thread::sleep(Duration::from_millis(100));
1270 timer.observe_duration();
1271
1272 let timer = histogram.start_coarse_timer();
1273 let handler = thread::spawn(move || {
1274 let _timer = timer;
1275 thread::sleep(Duration::from_millis(400));
1276 });
1277 assert!(handler.join().is_ok());
1278
1279 histogram.observe_closure_duration(|| {
1280 thread::sleep(Duration::from_millis(400));
1281 });
1282
1283 let mut mfs = histogram.collect();
1284 assert_eq!(mfs.len(), 1);
1285
1286 let mf = mfs.pop().unwrap();
1287 let m = mf.get_metric().get(0).unwrap();
1288 let proto_histogram = m.get_histogram();
1289 assert_eq!(proto_histogram.get_sample_count(), 3);
1290 assert!((proto_histogram.get_sample_sum() - 0.0) > EPSILON);
1291 }
1292
1293 #[test]
1294 #[cfg(feature = "nightly")]
1295 fn test_instant_on_smp() {
1296 let zero = Duration::from_millis(0);
1297 for i in 0..100_000 {
1298 let now = Instant::now();
1299 let now_coarse = Instant::now_coarse();
1300 if i % 100 == 0 {
1301 thread::yield_now();
1302 }
1303 assert!(now.elapsed() >= zero);
1304 assert!(now_coarse.elapsed() >= zero);
1305 }
1306 }
1307
1308 #[test]
1309 fn test_buckets_invalidation() {
1310 let table = vec![
1311 (vec![], true, DEFAULT_BUCKETS.len()),
1312 (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, 2.0], true, 7),
1313 (vec![-2.0, -1.0, -0.5, 10.0, 0.5, 1.0, 2.0], false, 7),
1314 (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, INFINITY], true, 6),
1315 ];
1316
1317 for (buckets, is_ok, length) in table {
1318 let got = check_and_adjust_buckets(buckets);
1319 assert_eq!(got.is_ok(), is_ok);
1320 if is_ok {
1321 assert_eq!(got.unwrap().len(), length);
1322 }
1323 }
1324 }
1325
1326 #[test]
1327 fn test_buckets_functions() {
1328 let linear_table = vec![
1329 (
1330 -15.0,
1331 5.0,
1332 6,
1333 true,
1334 vec![-15.0, -10.0, -5.0, 0.0, 5.0, 10.0],
1335 ),
1336 (-15.0, 0.0, 6, false, vec![]),
1337 (-15.0, 5.0, 0, false, vec![]),
1338 ];
1339
1340 for (param1, param2, param3, is_ok, vec) in linear_table {
1341 let got = linear_buckets(param1, param2, param3);
1342 assert_eq!(got.is_ok(), is_ok);
1343 if got.is_ok() {
1344 assert_eq!(got.unwrap(), vec);
1345 }
1346 }
1347
1348 let exponential_table = vec![
1349 (100.0, 1.2, 3, true, vec![100.0, 120.0, 144.0]),
1350 (100.0, 0.5, 3, false, vec![]),
1351 (100.0, 1.2, 0, false, vec![]),
1352 ];
1353
1354 for (param1, param2, param3, is_ok, vec) in exponential_table {
1355 let got = exponential_buckets(param1, param2, param3);
1356 assert_eq!(got.is_ok(), is_ok);
1357 if got.is_ok() {
1358 assert_eq!(got.unwrap(), vec);
1359 }
1360 }
1361 }
1362
1363 #[test]
1364 fn test_duration_to_seconds() {
1365 let tbls = vec![(1000, 1.0), (1100, 1.1), (100_111, 100.111)];
1366 for (millis, seconds) in tbls {
1367 let d = Duration::from_millis(millis);
1368 let v = duration_to_seconds(d);
1369 assert!((v - seconds).abs() < EPSILON);
1370 }
1371 }
1372
1373 #[test]
1374 fn test_histogram_vec_with_label_values() {
1375 let vec = HistogramVec::new(
1376 HistogramOpts::new("test_histogram_vec", "test histogram vec help"),
1377 &["l1", "l2"],
1378 )
1379 .unwrap();
1380
1381 assert!(vec.remove_label_values(&["v1", "v2"]).is_err());
1382 vec.with_label_values(&["v1", "v2"]).observe(1.0);
1383 assert!(vec.remove_label_values(&["v1", "v2"]).is_ok());
1384
1385 assert!(vec.remove_label_values(&["v1"]).is_err());
1386 assert!(vec.remove_label_values(&["v1", "v3"]).is_err());
1387 }
1388
1389 #[test]
1390 fn test_histogram_vec_with_opts_buckets() {
1391 let labels = ["l1", "l2"];
1392 let buckets = vec![1.0, 2.0, 3.0];
1393 let vec = HistogramVec::new(
1394 HistogramOpts::new("test_histogram_vec", "test histogram vec help")
1395 .buckets(buckets.clone()),
1396 &labels,
1397 )
1398 .unwrap();
1399
1400 let histogram = vec.with_label_values(&["v1", "v2"]);
1401 histogram.observe(1.0);
1402
1403 let m = histogram.metric();
1404 assert_eq!(m.get_label().len(), labels.len());
1405
1406 let proto_histogram = m.get_histogram();
1407 assert_eq!(proto_histogram.get_sample_count(), 1);
1408 assert!((proto_histogram.get_sample_sum() - 1.0) < EPSILON);
1409 assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
1410 }
1411
1412 #[test]
1413 fn test_histogram_local() {
1414 let buckets = vec![1.0, 2.0, 3.0];
1415 let opts = HistogramOpts::new("test_histogram_local", "test histogram local help")
1416 .buckets(buckets.clone());
1417 let histogram = Histogram::with_opts(opts).unwrap();
1418 let local = histogram.local();
1419
1420 let check = |count, sum| {
1421 let m = histogram.metric();
1422 let proto_histogram = m.get_histogram();
1423 assert_eq!(proto_histogram.get_sample_count(), count);
1424 assert!((proto_histogram.get_sample_sum() - sum) < EPSILON);
1425 };
1426
1427 local.observe(1.0);
1428 local.observe(4.0);
1429 check(0, 0.0);
1430
1431 local.flush();
1432 check(2, 5.0);
1433
1434 local.observe(2.0);
1435 local.clear();
1436 check(2, 5.0);
1437
1438 local.observe(2.0);
1439 drop(local);
1440 check(3, 7.0);
1441 }
1442
1443 #[test]
1444 fn test_histogram_vec_local() {
1445 let vec = HistogramVec::new(
1446 HistogramOpts::new("test_histogram_vec_local", "test histogram vec help"),
1447 &["l1", "l2"],
1448 )
1449 .unwrap();
1450 let mut local_vec = vec.local();
1451
1452 vec.remove_label_values(&["v1", "v2"]).unwrap_err();
1453 local_vec.remove_label_values(&["v1", "v2"]).unwrap_err();
1454
1455 let check = |count, sum| {
1456 let ms = vec.collect()[0].take_metric();
1457 let proto_histogram = ms[0].get_histogram();
1458 assert_eq!(proto_histogram.get_sample_count(), count);
1459 assert!((proto_histogram.get_sample_sum() - sum) < EPSILON);
1460 };
1461
1462 {
1463 let h = local_vec.with_label_values(&["v1", "v2"]);
1465 h.observe(1.0);
1466 h.flush();
1467 check(1, 1.0);
1468 }
1469
1470 {
1471 local_vec.with_label_values(&["v1", "v2"]).observe(4.0);
1473 local_vec.flush();
1474 check(2, 5.0);
1475 }
1476 {
1477 local_vec.remove_label_values(&["v1", "v2"]).unwrap();
1479
1480 local_vec.with_label_values(&["v1", "v2"]).observe(2.0);
1482 drop(local_vec);
1483 check(1, 2.0);
1484 }
1485 }
1486
1487 #[test]
1491 fn atomic_observe_across_collects() {
1492 let done = Arc::new(std::sync::atomic::AtomicBool::default());
1493 let histogram =
1494 Histogram::with_opts(HistogramOpts::new("test_name", "test help").buckets(vec![1.0]))
1495 .unwrap();
1496
1497 let done_clone = done.clone();
1498 let histogram_clone = histogram.clone();
1499 let observing_thread = std::thread::spawn(move || loop {
1500 if done_clone.load(std::sync::atomic::Ordering::Relaxed) {
1501 break;
1502 }
1503
1504 for _ in 0..1_000_000 {
1505 histogram_clone.observe(1.0);
1506 }
1507 });
1508
1509 let mut sample_count = 0;
1510 let mut cumulative_count = 0;
1511 let mut sample_sum = 0;
1512 for _ in 0..1_000_000 {
1513 let metric = &histogram.collect()[0].take_metric()[0];
1514 let proto = metric.get_histogram();
1515
1516 sample_count = proto.get_sample_count();
1517 sample_sum = proto.get_sample_sum() as u64;
1518 cumulative_count = proto.get_bucket()[0].get_cumulative_count();
1520
1521 if sample_count != cumulative_count {
1522 break;
1523 }
1524
1525 if sample_count != sample_sum {
1530 break;
1531 }
1532 }
1533
1534 done.store(true, std::sync::atomic::Ordering::Relaxed);
1535 observing_thread.join().unwrap();
1536
1537 if sample_count != cumulative_count {
1538 panic!(
1539 "Histogram invariant violated: For a histogram with a single \
1540 bucket observing values below the bucket's upper bound only \
1541 the histogram's count should always be equal to the buckets's \
1542 cumulative count, got {:?} and {:?} instead.",
1543 sample_count, cumulative_count,
1544 );
1545 }
1546
1547 if sample_count != sample_sum {
1548 panic!(
1549 "Histogram invariant violated: For a histogram which is only \
1550 ever observing a value of `1.0` the sample count should equal \
1551 the sum, instead got: {:?} and {:?}",
1552 sample_count, sample_sum,
1553 )
1554 }
1555 }
1556
1557 #[test]
1558 fn test_error_on_inconsistent_label_cardinality() {
1559 let hist = Histogram::with_opts(
1560 histogram_opts!(
1561 "example_histogram",
1562 "Used as an example",
1563 vec![0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 5.0]
1564 )
1565 .variable_label("example_variable"),
1566 );
1567
1568 if let Err(Error::InconsistentCardinality { expect, got }) = hist {
1569 assert_eq!(1, expect);
1570 assert_eq!(0, got);
1571 } else {
1572 panic!("Expected InconsistentCardinality error.")
1573 }
1574 }
1575}