1use std::cell::RefCell;
5use std::collections::HashMap;
6use std::convert::From;
7use std::sync::{
8    atomic::{AtomicU64 as StdAtomicU64, Ordering},
9    Arc, Mutex,
10};
11use std::time::{Duration, Instant as StdInstant};
12
13use crate::atomic64::{Atomic, AtomicF64, AtomicU64};
14use crate::desc::{Desc, Describer};
15use crate::errors::{Error, Result};
16use crate::metrics::{Collector, LocalMetric, Metric, Opts};
17use crate::proto;
18use crate::value::make_label_pairs;
19use crate::vec::{MetricVec, MetricVecBuilder};
20
21pub const DEFAULT_BUCKETS: &[f64; 11] = &[
26    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
27];
28
29pub const BUCKET_LABEL: &str = "le";
32
33#[inline]
34fn check_bucket_label(label: &str) -> Result<()> {
35    if label == BUCKET_LABEL {
36        return Err(Error::Msg(
37            "`le` is not allowed as label name in histograms".to_owned(),
38        ));
39    }
40
41    Ok(())
42}
43
44fn check_and_adjust_buckets(mut buckets: Vec<f64>) -> Result<Vec<f64>> {
45    if buckets.is_empty() {
46        buckets = Vec::from(DEFAULT_BUCKETS as &'static [f64]);
47    }
48
49    for (i, upper_bound) in buckets.iter().enumerate() {
50        if i < (buckets.len() - 1) && *upper_bound >= buckets[i + 1] {
51            return Err(Error::Msg(format!(
52                "histogram buckets must be in increasing \
53                 order: {} >= {}",
54                upper_bound,
55                buckets[i + 1]
56            )));
57        }
58    }
59
60    let tail = *buckets.last().unwrap();
61    if tail.is_sign_positive() && tail.is_infinite() {
62        buckets.pop();
64    }
65
66    Ok(buckets)
67}
68
69#[derive(Clone, Debug)]
73pub struct HistogramOpts {
74    pub common_opts: Opts,
76
77    pub buckets: Vec<f64>,
83}
84
85impl HistogramOpts {
86    pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> HistogramOpts {
88        HistogramOpts {
89            common_opts: Opts::new(name, help),
90            buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
91        }
92    }
93
94    pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
96        self.common_opts.namespace = namespace.into();
97        self
98    }
99
100    pub fn subsystem<S: Into<String>>(mut self, subsystem: S) -> Self {
102        self.common_opts.subsystem = subsystem.into();
103        self
104    }
105
106    pub fn const_labels(mut self, const_labels: HashMap<String, String>) -> Self {
108        self.common_opts = self.common_opts.const_labels(const_labels);
109        self
110    }
111
112    pub fn const_label<S1: Into<String>, S2: Into<String>>(mut self, name: S1, value: S2) -> Self {
114        self.common_opts = self.common_opts.const_label(name, value);
115        self
116    }
117
118    pub fn variable_labels(mut self, variable_labels: Vec<String>) -> Self {
120        self.common_opts = self.common_opts.variable_labels(variable_labels);
121        self
122    }
123
124    pub fn variable_label<S: Into<String>>(mut self, name: S) -> Self {
126        self.common_opts = self.common_opts.variable_label(name);
127        self
128    }
129
130    pub fn fq_name(&self) -> String {
132        self.common_opts.fq_name()
133    }
134
135    pub fn buckets(mut self, buckets: Vec<f64>) -> Self {
137        self.buckets = buckets;
138        self
139    }
140}
141
142impl Describer for HistogramOpts {
143    fn describe(&self) -> Result<Desc> {
144        self.common_opts.describe()
145    }
146}
147
148impl From<Opts> for HistogramOpts {
149    fn from(opts: Opts) -> HistogramOpts {
150        HistogramOpts {
151            common_opts: opts,
152            buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
153        }
154    }
155}
156
157#[derive(Debug)]
161struct Shard {
162    sum: AtomicF64,
163    count: AtomicU64,
164    buckets: Vec<AtomicU64>,
165}
166
167impl Shard {
168    fn new(num_buckets: usize) -> Self {
169        let mut buckets = Vec::new();
170        for _ in 0..num_buckets {
171            buckets.push(AtomicU64::new(0));
172        }
173
174        Shard {
175            sum: AtomicF64::new(0.0),
176            count: AtomicU64::new(0),
177            buckets,
178        }
179    }
180}
181
182#[derive(Debug, Clone, Copy)]
186enum ShardIndex {
187    First,
189    Second,
191}
192
193impl ShardIndex {
194    fn inverse(self) -> ShardIndex {
196        match self {
197            ShardIndex::First => ShardIndex::Second,
198            ShardIndex::Second => ShardIndex::First,
199        }
200    }
201}
202
203impl From<u64> for ShardIndex {
204    fn from(index: u64) -> Self {
205        match index {
206            0 => ShardIndex::First,
207            1 => ShardIndex::Second,
208            _ => panic!(
209                "Invalid shard index {:?}. A histogram only has two shards.",
210                index
211            ),
212        }
213    }
214}
215
216impl From<ShardIndex> for usize {
217    fn from(index: ShardIndex) -> Self {
218        match index {
219            ShardIndex::First => 0,
220            ShardIndex::Second => 1,
221        }
222    }
223}
224
225#[derive(Debug)]
228struct ShardAndCount {
229    inner: StdAtomicU64,
230}
231
232impl ShardAndCount {
233    fn new() -> Self {
237        ShardAndCount {
238            inner: StdAtomicU64::new(0),
239        }
240    }
241
242    fn flip(&self, ordering: Ordering) -> (ShardIndex, u64) {
245        let n = self.inner.fetch_add(1 << 63, ordering);
246
247        ShardAndCount::split_shard_index_and_count(n)
248    }
249
250    fn get(&self) -> (ShardIndex, u64) {
253        let n = self.inner.load(Ordering::Relaxed);
254
255        ShardAndCount::split_shard_index_and_count(n)
256    }
257
258    fn inc_by(&self, delta: u64, ordering: Ordering) -> (ShardIndex, u64) {
261        let n = self.inner.fetch_add(delta, ordering);
262
263        ShardAndCount::split_shard_index_and_count(n)
264    }
265
266    fn inc(&self, ordering: Ordering) -> (ShardIndex, u64) {
269        self.inc_by(1, ordering)
270    }
271
272    fn split_shard_index_and_count(n: u64) -> (ShardIndex, u64) {
273        let shard = n >> 63;
274        let count = n & ((1 << 63) - 1);
275
276        (shard.into(), count)
277    }
278}
279
280#[derive(Debug)]
309pub struct HistogramCore {
310    desc: Desc,
311    label_pairs: Vec<proto::LabelPair>,
312
313    collect_lock: Mutex<()>,
318
319    shard_and_count: ShardAndCount,
322    shards: [Shard; 2],
325
326    upper_bounds: Vec<f64>,
327}
328
329impl HistogramCore {
330    pub fn new<V: AsRef<str>>(opts: &HistogramOpts, label_values: &[V]) -> Result<HistogramCore> {
331        let desc = opts.describe()?;
332
333        for name in &desc.variable_labels {
334            check_bucket_label(name)?;
335        }
336        for pair in &desc.const_label_pairs {
337            check_bucket_label(pair.name())?;
338        }
339
340        let label_pairs = make_label_pairs(&desc, label_values)?;
341
342        let buckets = check_and_adjust_buckets(opts.buckets.clone())?;
343
344        Ok(HistogramCore {
345            desc,
346            label_pairs,
347
348            collect_lock: Mutex::new(()),
349
350            shard_and_count: ShardAndCount::new(),
351            shards: [Shard::new(buckets.len()), Shard::new(buckets.len())],
352
353            upper_bounds: buckets,
354        })
355    }
356
357    pub fn observe(&self, v: f64) {
364        let (shard_index, _count) = self.shard_and_count.inc(Ordering::Acquire);
371
372        let shard: &Shard = &self.shards[usize::from(shard_index)];
373
374        let mut iter = self
376            .upper_bounds
377            .iter()
378            .enumerate()
379            .filter(|&(_, f)| v <= *f);
380        if let Some((i, _)) = iter.next() {
381            shard.buckets[i].inc_by(1);
382        }
383
384        shard.sum.inc_by(v);
385        shard.count.inc_by_with_ordering(1, Ordering::Release);
387    }
388
389    pub fn proto(&self) -> proto::Histogram {
396        let collect_guard = self.collect_lock.lock().expect("Lock poisoned");
397
398        let (cold_shard_index, overall_count) = self.shard_and_count.flip(Ordering::AcqRel);
402
403        let cold_shard = &self.shards[usize::from(cold_shard_index)];
404        let hot_shard = &self.shards[usize::from(cold_shard_index.inverse())];
405
406        while cold_shard
419            .count
420            .compare_exchange_weak(
421                overall_count,
422                0,
424                Ordering::Acquire,
425                Ordering::Acquire,
426            )
427            .is_err()
428        {}
429
430        let cold_shard_sum = cold_shard.sum.swap(0.0, Ordering::AcqRel);
435
436        let mut h = proto::Histogram::default();
437        h.set_sample_sum(cold_shard_sum);
438        h.set_sample_count(overall_count);
439
440        let mut cumulative_count = 0;
441        let mut buckets = Vec::with_capacity(self.upper_bounds.len());
442        for (i, upper_bound) in self.upper_bounds.iter().enumerate() {
443            let cold_bucket_count = cold_shard.buckets[i].swap(0, Ordering::AcqRel);
448            hot_shard.buckets[i].inc_by(cold_bucket_count);
449
450            cumulative_count += cold_bucket_count;
451            let mut b = proto::Bucket::default();
452            b.set_cumulative_count(cumulative_count);
453            b.set_upper_bound(*upper_bound);
454            buckets.push(b);
455        }
456        h.set_bucket(buckets);
457
458        hot_shard.count.inc_by(overall_count);
460        hot_shard.sum.inc_by(cold_shard_sum);
461
462        drop(collect_guard);
463
464        h
465    }
466
467    fn sample_sum(&self) -> f64 {
468        let _guard = self.collect_lock.lock().expect("Lock poisoned");
471
472        let (shard_index, _count) = self.shard_and_count.get();
473        self.shards[shard_index as usize].sum.get()
474    }
475
476    fn sample_count(&self) -> u64 {
477        self.shard_and_count.get().1
478    }
479}
480
481#[cfg(all(feature = "nightly", target_os = "linux"))]
483pub struct Timespec(libc::timespec);
484
485#[cfg(all(feature = "nightly", target_os = "linux"))]
486impl std::fmt::Debug for Timespec {
487    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
488        write!(
489            f,
490            "Timespec {{ tv_sec: {}, tv_nsec: {} }}",
491            self.0.tv_sec, self.0.tv_nsec
492        )
493    }
494}
495
496#[derive(Debug)]
497pub enum Instant {
498    Monotonic(StdInstant),
499    #[cfg(all(feature = "nightly", target_os = "linux"))]
500    MonotonicCoarse(Timespec),
501}
502
503impl Instant {
504    pub fn now() -> Instant {
505        Instant::Monotonic(StdInstant::now())
506    }
507
508    #[cfg(all(feature = "nightly", target_os = "linux"))]
509    pub fn now_coarse() -> Instant {
510        Instant::MonotonicCoarse(get_time_coarse())
511    }
512
513    #[cfg(all(feature = "nightly", not(target_os = "linux")))]
514    pub fn now_coarse() -> Instant {
515        Instant::Monotonic(StdInstant::now())
516    }
517
518    pub fn elapsed(&self) -> Duration {
519        match self {
520            Instant::Monotonic(i) => StdInstant::now().saturating_duration_since(*i),
522
523            #[cfg(all(feature = "nightly", target_os = "linux"))]
529            Instant::MonotonicCoarse(t) => {
530                let now = get_time_coarse();
531                let now_ms = now.0.tv_sec * MILLIS_PER_SEC + now.0.tv_nsec / NANOS_PER_MILLI;
532                let t_ms = t.0.tv_sec * MILLIS_PER_SEC + t.0.tv_nsec / NANOS_PER_MILLI;
533                let dur = now_ms - t_ms;
534                if dur >= 0 {
535                    Duration::from_millis(dur as u64)
536                } else {
537                    Duration::from_millis(0)
538                }
539            }
540        }
541    }
542
543    #[inline]
544    pub fn elapsed_sec(&self) -> f64 {
545        duration_to_seconds(self.elapsed())
546    }
547}
548
549#[cfg(all(feature = "nightly", target_os = "linux"))]
550use self::coarse::*;
551
552#[cfg(all(feature = "nightly", target_os = "linux"))]
553mod coarse {
554    use crate::histogram::Timespec;
555    pub use libc::timespec;
556    use libc::{clock_gettime, CLOCK_MONOTONIC_COARSE};
557
558    pub const NANOS_PER_MILLI: i64 = 1_000_000;
559    pub const MILLIS_PER_SEC: i64 = 1_000;
560
561    pub fn get_time_coarse() -> Timespec {
562        let mut t = Timespec(timespec {
563            tv_sec: 0,
564            tv_nsec: 0,
565        });
566        assert_eq!(
567            unsafe { clock_gettime(CLOCK_MONOTONIC_COARSE, &mut t.0) },
568            0
569        );
570        t
571    }
572}
573
574#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
580#[derive(Debug)]
581pub struct HistogramTimer {
582    histogram: Histogram,
584    observed: bool,
586    start: Instant,
588}
589
590impl HistogramTimer {
591    fn new(histogram: Histogram) -> Self {
592        Self {
593            histogram,
594            observed: false,
595            start: Instant::now(),
596        }
597    }
598
599    #[cfg(feature = "nightly")]
600    fn new_coarse(histogram: Histogram) -> Self {
601        HistogramTimer {
602            histogram,
603            observed: false,
604            start: Instant::now_coarse(),
605        }
606    }
607
608    pub fn observe_duration(self) {
613        self.stop_and_record();
614    }
615
616    pub fn stop_and_record(self) -> f64 {
621        let mut timer = self;
622        timer.observe(true)
623    }
624
625    pub fn stop_and_discard(self) -> f64 {
630        let mut timer = self;
631        timer.observe(false)
632    }
633
634    fn observe(&mut self, record: bool) -> f64 {
635        let v = self.start.elapsed_sec();
636        self.observed = true;
637        if record {
638            self.histogram.observe(v);
639        }
640        v
641    }
642}
643
644impl Drop for HistogramTimer {
645    fn drop(&mut self) {
646        if !self.observed {
647            self.observe(true);
648        }
649    }
650}
651
652#[derive(Clone, Debug)]
670pub struct Histogram {
671    core: Arc<HistogramCore>,
672}
673
674impl Histogram {
675    pub fn with_opts(opts: HistogramOpts) -> Result<Histogram> {
677        Histogram::with_opts_and_label_values::<&str>(&opts, &[])
678    }
679
680    fn with_opts_and_label_values<V: AsRef<str>>(
681        opts: &HistogramOpts,
682        label_values: &[V],
683    ) -> Result<Histogram> {
684        let core = HistogramCore::new(opts, label_values)?;
685
686        Ok(Histogram {
687            core: Arc::new(core),
688        })
689    }
690}
691
692impl Histogram {
693    pub fn observe(&self, v: f64) {
695        self.core.observe(v)
696    }
697
698    pub fn start_timer(&self) -> HistogramTimer {
700        HistogramTimer::new(self.clone())
701    }
702
703    #[cfg(feature = "nightly")]
706    pub fn start_coarse_timer(&self) -> HistogramTimer {
707        HistogramTimer::new_coarse(self.clone())
708    }
709
710    pub fn observe_closure_duration<F, T>(&self, f: F) -> T
712    where
713        F: FnOnce() -> T,
714    {
715        let instant = Instant::now();
716        let res = f();
717        let elapsed = instant.elapsed_sec();
718        self.observe(elapsed);
719        res
720    }
721
722    #[cfg(feature = "nightly")]
724    pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
725    where
726        F: FnOnce() -> T,
727    {
728        let instant = Instant::now_coarse();
729        let res = f();
730        let elapsed = instant.elapsed_sec();
731        self.observe(elapsed);
732        res
733    }
734
735    pub fn local(&self) -> LocalHistogram {
737        LocalHistogram::new(self.clone())
738    }
739
740    pub fn get_sample_sum(&self) -> f64 {
742        self.core.sample_sum()
743    }
744
745    pub fn get_sample_count(&self) -> u64 {
747        self.core.sample_count()
748    }
749}
750
751impl Metric for Histogram {
752    fn metric(&self) -> proto::Metric {
753        let mut m = proto::Metric::from_label(self.core.label_pairs.clone());
754
755        let h = self.core.proto();
756        m.set_histogram(h);
757
758        m
759    }
760}
761
762impl Collector for Histogram {
763    fn desc(&self) -> Vec<&Desc> {
764        vec![&self.core.desc]
765    }
766
767    fn collect(&self) -> Vec<proto::MetricFamily> {
768        let mut m = proto::MetricFamily::default();
769        m.set_name(self.core.desc.fq_name.clone());
770        m.set_help(self.core.desc.help.clone());
771        m.set_field_type(proto::MetricType::HISTOGRAM);
772        m.set_metric(vec![self.metric()]);
773
774        vec![m]
775    }
776}
777
778#[derive(Clone, Debug)]
779pub struct HistogramVecBuilder {}
780
781impl MetricVecBuilder for HistogramVecBuilder {
782    type M = Histogram;
783    type P = HistogramOpts;
784
785    fn build<V: AsRef<str>>(&self, opts: &HistogramOpts, vals: &[V]) -> Result<Histogram> {
786        Histogram::with_opts_and_label_values(opts, vals)
787    }
788}
789
790pub type HistogramVec = MetricVec<HistogramVecBuilder>;
795
796impl HistogramVec {
797    pub fn new(opts: HistogramOpts, label_names: &[&str]) -> Result<HistogramVec> {
801        let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
802        let opts = opts.variable_labels(variable_names);
803        let metric_vec =
804            MetricVec::create(proto::MetricType::HISTOGRAM, HistogramVecBuilder {}, opts)?;
805
806        Ok(metric_vec as HistogramVec)
807    }
808
809    pub fn local(&self) -> LocalHistogramVec {
811        let vec = self.clone();
812        LocalHistogramVec::new(vec)
813    }
814}
815
816pub fn linear_buckets(start: f64, width: f64, count: usize) -> Result<Vec<f64>> {
824    if count < 1 {
825        return Err(Error::Msg(format!(
826            "LinearBuckets needs a positive count, count: {}",
827            count
828        )));
829    }
830    if width <= 0.0 {
831        return Err(Error::Msg(format!(
832            "LinearBuckets needs a width greater then 0, width: {}",
833            width
834        )));
835    }
836
837    let buckets: Vec<_> = (0..count)
838        .map(|step| start + width * (step as f64))
839        .collect();
840
841    Ok(buckets)
842}
843
844pub fn exponential_buckets(start: f64, factor: f64, count: usize) -> Result<Vec<f64>> {
853    if count < 1 {
854        return Err(Error::Msg(format!(
855            "exponential_buckets needs a positive count, count: {}",
856            count
857        )));
858    }
859    if start <= 0.0 {
860        return Err(Error::Msg(format!(
861            "exponential_buckets needs a positive start value, \
862             start: {}",
863            start
864        )));
865    }
866    if factor <= 1.0 {
867        return Err(Error::Msg(format!(
868            "exponential_buckets needs a factor greater than 1, \
869             factor: {}",
870            factor
871        )));
872    }
873
874    let mut next = start;
875    let mut buckets = Vec::with_capacity(count);
876    for _ in 0..count {
877        buckets.push(next);
878        next *= factor;
879    }
880
881    Ok(buckets)
882}
883
884#[inline]
886pub fn duration_to_seconds(d: Duration) -> f64 {
887    let nanos = f64::from(d.subsec_nanos()) / 1e9;
888    d.as_secs() as f64 + nanos
889}
890
891#[derive(Clone, Debug)]
892pub struct LocalHistogramCore {
893    histogram: Histogram,
894    counts: Vec<u64>,
895    count: u64,
896    sum: f64,
897}
898
899#[derive(Debug)]
901pub struct LocalHistogram {
902    core: RefCell<LocalHistogramCore>,
903}
904
905impl Clone for LocalHistogram {
906    fn clone(&self) -> LocalHistogram {
907        let core = self.core.clone();
908        let lh = LocalHistogram { core };
909        lh.clear();
910        lh
911    }
912}
913
914#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
916#[derive(Debug)]
917pub struct LocalHistogramTimer {
918    local: LocalHistogram,
920    observed: bool,
922    start: Instant,
924}
925
926impl LocalHistogramTimer {
927    fn new(histogram: LocalHistogram) -> Self {
928        Self {
929            local: histogram,
930            observed: false,
931            start: Instant::now(),
932        }
933    }
934
935    #[cfg(feature = "nightly")]
936    fn new_coarse(histogram: LocalHistogram) -> Self {
937        Self {
938            local: histogram,
939            observed: false,
940            start: Instant::now_coarse(),
941        }
942    }
943
944    pub fn observe_duration(self) {
949        self.stop_and_record();
950    }
951
952    pub fn stop_and_record(self) -> f64 {
957        let mut timer = self;
958        timer.observe(true)
959    }
960
961    pub fn stop_and_discard(self) -> f64 {
966        let mut timer = self;
967        timer.observe(false)
968    }
969
970    fn observe(&mut self, record: bool) -> f64 {
971        let v = self.start.elapsed_sec();
972        self.observed = true;
973        if record {
974            self.local.observe(v);
975        }
976        v
977    }
978}
979
980impl Drop for LocalHistogramTimer {
981    fn drop(&mut self) {
982        if !self.observed {
983            self.observe(true);
984        }
985    }
986}
987
988impl LocalHistogramCore {
989    fn new(histogram: Histogram) -> LocalHistogramCore {
990        let counts = vec![0; histogram.core.upper_bounds.len()];
991
992        LocalHistogramCore {
993            histogram,
994            counts,
995            count: 0,
996            sum: 0.0,
997        }
998    }
999
1000    pub fn observe(&mut self, v: f64) {
1001        let mut iter = self
1003            .histogram
1004            .core
1005            .upper_bounds
1006            .iter()
1007            .enumerate()
1008            .filter(|&(_, f)| v <= *f);
1009        if let Some((i, _)) = iter.next() {
1010            self.counts[i] += 1;
1011        }
1012
1013        self.count += 1;
1014        self.sum += v;
1015    }
1016
1017    pub fn clear(&mut self) {
1018        for v in &mut self.counts {
1019            *v = 0
1020        }
1021
1022        self.count = 0;
1023        self.sum = 0.0;
1024    }
1025
1026    pub fn flush(&mut self) {
1027        if self.count == 0 {
1029            return;
1030        }
1031
1032        {
1033            let (shard_index, _count) = self
1040                .histogram
1041                .core
1042                .shard_and_count
1043                .inc_by(self.count, Ordering::Acquire);
1044            let shard = &self.histogram.core.shards[shard_index as usize];
1045
1046            for (i, v) in self.counts.iter().enumerate() {
1047                if *v > 0 {
1048                    shard.buckets[i].inc_by(*v);
1049                }
1050            }
1051
1052            shard.sum.inc_by(self.sum);
1053            shard
1055                .count
1056                .inc_by_with_ordering(self.count, Ordering::Release);
1057        }
1058
1059        self.clear()
1060    }
1061
1062    fn sample_sum(&self) -> f64 {
1063        self.sum
1064    }
1065
1066    fn sample_count(&self) -> u64 {
1067        self.count
1068    }
1069}
1070
1071impl LocalHistogram {
1072    fn new(histogram: Histogram) -> LocalHistogram {
1073        let core = LocalHistogramCore::new(histogram);
1074        LocalHistogram {
1075            core: RefCell::new(core),
1076        }
1077    }
1078
1079    pub fn observe(&self, v: f64) {
1081        self.core.borrow_mut().observe(v);
1082    }
1083
1084    pub fn start_timer(&self) -> LocalHistogramTimer {
1086        LocalHistogramTimer::new(self.clone())
1087    }
1088
1089    #[cfg(feature = "nightly")]
1092    pub fn start_coarse_timer(&self) -> LocalHistogramTimer {
1093        LocalHistogramTimer::new_coarse(self.clone())
1094    }
1095
1096    pub fn observe_closure_duration<F, T>(&self, f: F) -> T
1098    where
1099        F: FnOnce() -> T,
1100    {
1101        let instant = Instant::now();
1102        let res = f();
1103        let elapsed = instant.elapsed_sec();
1104        self.observe(elapsed);
1105        res
1106    }
1107
1108    #[cfg(feature = "nightly")]
1110    pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
1111    where
1112        F: FnOnce() -> T,
1113    {
1114        let instant = Instant::now_coarse();
1115        let res = f();
1116        let elapsed = instant.elapsed_sec();
1117        self.observe(elapsed);
1118        res
1119    }
1120
1121    pub fn clear(&self) {
1123        self.core.borrow_mut().clear();
1124    }
1125
1126    pub fn flush(&self) {
1128        self.core.borrow_mut().flush();
1129    }
1130
1131    pub fn get_sample_sum(&self) -> f64 {
1133        self.core.borrow().sample_sum()
1134    }
1135
1136    pub fn get_sample_count(&self) -> u64 {
1138        self.core.borrow().sample_count()
1139    }
1140}
1141
1142impl LocalMetric for LocalHistogram {
1143    fn flush(&self) {
1145        LocalHistogram::flush(self);
1146    }
1147}
1148
1149impl Drop for LocalHistogram {
1150    fn drop(&mut self) {
1151        self.flush()
1152    }
1153}
1154
1155#[derive(Debug)]
1157pub struct LocalHistogramVec {
1158    vec: HistogramVec,
1159    local: HashMap<u64, LocalHistogram>,
1160}
1161
1162impl LocalHistogramVec {
1163    fn new(vec: HistogramVec) -> LocalHistogramVec {
1164        let local = HashMap::with_capacity(vec.v.children.read().len());
1165        LocalHistogramVec { vec, local }
1166    }
1167
1168    pub fn with_label_values<'a>(&'a mut self, vals: &[&str]) -> &'a LocalHistogram {
1171        let hash = self.vec.v.hash_label_values(vals).unwrap();
1172        let vec = &self.vec;
1173        self.local
1174            .entry(hash)
1175            .or_insert_with(|| vec.with_label_values(vals).local())
1176    }
1177
1178    pub fn remove_label_values(&mut self, vals: &[&str]) -> Result<()> {
1181        let hash = self.vec.v.hash_label_values(vals)?;
1182        self.local.remove(&hash);
1183        self.vec.v.delete_label_values(vals)
1184    }
1185
1186    pub fn flush(&self) {
1188        for h in self.local.values() {
1189            h.flush();
1190        }
1191    }
1192}
1193
1194impl LocalMetric for LocalHistogramVec {
1195    fn flush(&self) {
1197        LocalHistogramVec::flush(self)
1198    }
1199}
1200
1201impl Clone for LocalHistogramVec {
1202    fn clone(&self) -> LocalHistogramVec {
1203        LocalHistogramVec::new(self.vec.clone())
1204    }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209    use std::f64;
1210    use std::thread;
1211    use std::time::Duration;
1212
1213    use super::*;
1214    use crate::metrics::{Collector, Metric};
1215
1216    #[test]
1217    fn test_histogram() {
1218        let opts = HistogramOpts::new("test1", "test help")
1219            .const_label("a", "1")
1220            .const_label("b", "2");
1221        let histogram = Histogram::with_opts(opts).unwrap();
1222        histogram.observe(1.0);
1223
1224        let timer = histogram.start_timer();
1225        thread::sleep(Duration::from_millis(100));
1226        timer.observe_duration();
1227
1228        let timer = histogram.start_timer();
1229        let handler = thread::spawn(move || {
1230            let _timer = timer;
1231            thread::sleep(Duration::from_millis(400));
1232        });
1233        assert!(handler.join().is_ok());
1234
1235        let mut mfs = histogram.collect();
1236        assert_eq!(mfs.len(), 1);
1237
1238        let mf = mfs.pop().unwrap();
1239        let m = mf.get_metric().first().unwrap();
1240        assert_eq!(m.get_label().len(), 2);
1241        let proto_histogram = m.get_histogram();
1242        assert_eq!(proto_histogram.get_sample_count(), 3);
1243        assert!(proto_histogram.get_sample_sum() >= 1.5);
1244        assert_eq!(proto_histogram.get_bucket().len(), DEFAULT_BUCKETS.len());
1245
1246        let buckets = vec![1.0, 2.0, 3.0];
1247        let opts = HistogramOpts::new("test2", "test help").buckets(buckets.clone());
1248        let histogram = Histogram::with_opts(opts).unwrap();
1249        let mut mfs = histogram.collect();
1250        assert_eq!(mfs.len(), 1);
1251
1252        let mf = mfs.pop().unwrap();
1253        let m = mf.get_metric().first().unwrap();
1254        assert_eq!(m.get_label().len(), 0);
1255        let proto_histogram = m.get_histogram();
1256        assert_eq!(proto_histogram.get_sample_count(), 0);
1257        assert!((proto_histogram.get_sample_sum() - 0.0) < f64::EPSILON);
1258        assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
1259    }
1260
1261    #[test]
1262    #[cfg(feature = "nightly")]
1263    fn test_histogram_coarse_timer() {
1264        let opts = HistogramOpts::new("test1", "test help");
1265        let histogram = Histogram::with_opts(opts).unwrap();
1266
1267        let timer = histogram.start_coarse_timer();
1268        thread::sleep(Duration::from_millis(100));
1269        timer.observe_duration();
1270
1271        let timer = histogram.start_coarse_timer();
1272        let handler = thread::spawn(move || {
1273            let _timer = timer;
1274            thread::sleep(Duration::from_millis(400));
1275        });
1276        assert!(handler.join().is_ok());
1277
1278        histogram.observe_closure_duration(|| {
1279            thread::sleep(Duration::from_millis(400));
1280        });
1281
1282        let mut mfs = histogram.collect();
1283        assert_eq!(mfs.len(), 1);
1284
1285        let mf = mfs.pop().unwrap();
1286        let m = mf.get_metric().get(0).unwrap();
1287        let proto_histogram = m.get_histogram();
1288        assert_eq!(proto_histogram.get_sample_count(), 3);
1289        assert!((proto_histogram.get_sample_sum() - 0.0) > f64::EPSILON);
1290    }
1291
1292    #[test]
1293    #[cfg(feature = "nightly")]
1294    fn test_instant_on_smp() {
1295        let zero = Duration::from_millis(0);
1296        for i in 0..100_000 {
1297            let now = Instant::now();
1298            let now_coarse = Instant::now_coarse();
1299            if i % 100 == 0 {
1300                thread::yield_now();
1301            }
1302            assert!(now.elapsed() >= zero);
1303            assert!(now_coarse.elapsed() >= zero);
1304        }
1305    }
1306
1307    #[test]
1308    fn test_buckets_invalidation() {
1309        let table = vec![
1310            (vec![], true, DEFAULT_BUCKETS.len()),
1311            (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, 2.0], true, 7),
1312            (vec![-2.0, -1.0, -0.5, 10.0, 0.5, 1.0, 2.0], false, 7),
1313            (
1314                vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, f64::INFINITY],
1315                true,
1316                6,
1317            ),
1318        ];
1319
1320        for (buckets, is_ok, length) in table {
1321            let got = check_and_adjust_buckets(buckets);
1322            assert_eq!(got.is_ok(), is_ok);
1323            if is_ok {
1324                assert_eq!(got.unwrap().len(), length);
1325            }
1326        }
1327    }
1328
1329    #[test]
1330    fn test_buckets_functions() {
1331        let linear_table = vec![
1332            (
1333                -15.0,
1334                5.0,
1335                6,
1336                true,
1337                vec![-15.0, -10.0, -5.0, 0.0, 5.0, 10.0],
1338            ),
1339            (-15.0, 0.0, 6, false, vec![]),
1340            (-15.0, 5.0, 0, false, vec![]),
1341        ];
1342
1343        for (param1, param2, param3, is_ok, vec) in linear_table {
1344            let got = linear_buckets(param1, param2, param3);
1345            assert_eq!(got.is_ok(), is_ok);
1346            if got.is_ok() {
1347                assert_eq!(got.unwrap(), vec);
1348            }
1349        }
1350
1351        let exponential_table = vec![
1352            (100.0, 1.2, 3, true, vec![100.0, 120.0, 144.0]),
1353            (100.0, 0.5, 3, false, vec![]),
1354            (100.0, 1.2, 0, false, vec![]),
1355        ];
1356
1357        for (param1, param2, param3, is_ok, vec) in exponential_table {
1358            let got = exponential_buckets(param1, param2, param3);
1359            assert_eq!(got.is_ok(), is_ok);
1360            if got.is_ok() {
1361                assert_eq!(got.unwrap(), vec);
1362            }
1363        }
1364    }
1365
1366    #[test]
1367    fn test_duration_to_seconds() {
1368        let tbls = vec![(1000, 1.0), (1100, 1.1), (100_111, 100.111)];
1369        for (millis, seconds) in tbls {
1370            let d = Duration::from_millis(millis);
1371            let v = duration_to_seconds(d);
1372            assert!((v - seconds).abs() < f64::EPSILON);
1373        }
1374    }
1375
1376    #[test]
1377    fn test_histogram_vec_with_label_values() {
1378        let vec = HistogramVec::new(
1379            HistogramOpts::new("test_histogram_vec", "test histogram vec help"),
1380            &["l1", "l2"],
1381        )
1382        .unwrap();
1383
1384        assert!(vec.remove_label_values(&["v1", "v2"]).is_err());
1385        vec.with_label_values(&["v1", "v2"]).observe(1.0);
1386        assert!(vec.remove_label_values(&["v1", "v2"]).is_ok());
1387
1388        assert!(vec.remove_label_values(&["v1"]).is_err());
1389        assert!(vec.remove_label_values(&["v1", "v3"]).is_err());
1390    }
1391
1392    #[test]
1393    fn test_histogram_vec_with_owned_label_values() {
1394        let vec = HistogramVec::new(
1395            HistogramOpts::new("test_histogram_vec", "test histogram vec help"),
1396            &["l1", "l2"],
1397        )
1398        .unwrap();
1399
1400        let v1 = "v1".to_string();
1401        let v2 = "v2".to_string();
1402        let v3 = "v3".to_string();
1403
1404        assert!(vec.remove_label_values(&[v1.clone(), v2.clone()]).is_err());
1405        vec.with_label_values(&[v1.clone(), v2.clone()])
1406            .observe(1.0);
1407        assert!(vec.remove_label_values(&[v1.clone(), v2.clone()]).is_ok());
1408
1409        assert!(vec.remove_label_values(&[v1.clone()]).is_err());
1410        assert!(vec.remove_label_values(&[v1.clone(), v3.clone()]).is_err());
1411    }
1412
1413    #[test]
1414    fn test_histogram_vec_with_opts_buckets() {
1415        let labels = ["l1", "l2"];
1416        let buckets = vec![1.0, 2.0, 3.0];
1417        let vec = HistogramVec::new(
1418            HistogramOpts::new("test_histogram_vec", "test histogram vec help")
1419                .buckets(buckets.clone()),
1420            &labels,
1421        )
1422        .unwrap();
1423
1424        let histogram = vec.with_label_values(&["v1", "v2"]);
1425        histogram.observe(1.0);
1426
1427        let m = histogram.metric();
1428        assert_eq!(m.get_label().len(), labels.len());
1429
1430        let proto_histogram = m.get_histogram();
1431        assert_eq!(proto_histogram.get_sample_count(), 1);
1432        assert!((proto_histogram.get_sample_sum() - 1.0) < f64::EPSILON);
1433        assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
1434    }
1435
1436    #[test]
1437    fn test_histogram_local() {
1438        let buckets = vec![1.0, 2.0, 3.0];
1439        let opts = HistogramOpts::new("test_histogram_local", "test histogram local help")
1440            .buckets(buckets.clone());
1441        let histogram = Histogram::with_opts(opts).unwrap();
1442        let local = histogram.local();
1443
1444        let check = |count, sum| {
1445            let m = histogram.metric();
1446            let proto_histogram = m.get_histogram();
1447            assert_eq!(proto_histogram.get_sample_count(), count);
1448            assert!((proto_histogram.get_sample_sum() - sum) < f64::EPSILON);
1449        };
1450
1451        local.observe(1.0);
1452        local.observe(4.0);
1453        check(0, 0.0);
1454
1455        local.flush();
1456        check(2, 5.0);
1457
1458        local.observe(2.0);
1459        local.clear();
1460        check(2, 5.0);
1461
1462        local.observe(2.0);
1463        drop(local);
1464        check(3, 7.0);
1465    }
1466
1467    #[test]
1468    fn test_histogram_vec_local() {
1469        let vec = HistogramVec::new(
1470            HistogramOpts::new("test_histogram_vec_local", "test histogram vec help"),
1471            &["l1", "l2"],
1472        )
1473        .unwrap();
1474        let mut local_vec = vec.local();
1475
1476        vec.remove_label_values(&["v1", "v2"]).unwrap_err();
1477        local_vec.remove_label_values(&["v1", "v2"]).unwrap_err();
1478
1479        let check = |count, sum| {
1480            let ms = vec.collect()[0].take_metric();
1481            let proto_histogram = ms[0].get_histogram();
1482            assert_eq!(proto_histogram.get_sample_count(), count);
1483            assert!((proto_histogram.get_sample_sum() - sum) < f64::EPSILON);
1484        };
1485
1486        {
1487            let h = local_vec.with_label_values(&["v1", "v2"]);
1489            h.observe(1.0);
1490            h.flush();
1491            check(1, 1.0);
1492        }
1493
1494        {
1495            local_vec.with_label_values(&["v1", "v2"]).observe(4.0);
1497            local_vec.flush();
1498            check(2, 5.0);
1499        }
1500        {
1501            local_vec.remove_label_values(&["v1", "v2"]).unwrap();
1503
1504            local_vec.with_label_values(&["v1", "v2"]).observe(2.0);
1506            drop(local_vec);
1507            check(1, 2.0);
1508        }
1509    }
1510
1511    #[test]
1515    fn atomic_observe_across_collects() {
1516        let done = Arc::new(std::sync::atomic::AtomicBool::default());
1517        let histogram =
1518            Histogram::with_opts(HistogramOpts::new("test_name", "test help").buckets(vec![1.0]))
1519                .unwrap();
1520
1521        let done_clone = done.clone();
1522        let histogram_clone = histogram.clone();
1523        let observing_thread = std::thread::spawn(move || loop {
1524            if done_clone.load(std::sync::atomic::Ordering::Relaxed) {
1525                break;
1526            }
1527
1528            for _ in 0..1_000_000 {
1529                histogram_clone.observe(1.0);
1530            }
1531        });
1532
1533        let mut sample_count = 0;
1534        let mut cumulative_count = 0;
1535        let mut sample_sum = 0;
1536        for _ in 0..1_000_000 {
1537            let metric = &histogram.collect()[0].take_metric()[0];
1538            let proto = metric.get_histogram();
1539
1540            sample_count = proto.get_sample_count();
1541            sample_sum = proto.get_sample_sum() as u64;
1542            cumulative_count = proto.get_bucket()[0].cumulative_count();
1544
1545            if sample_count != cumulative_count {
1546                break;
1547            }
1548
1549            if sample_count != sample_sum {
1554                break;
1555            }
1556        }
1557
1558        done.store(true, std::sync::atomic::Ordering::Relaxed);
1559        observing_thread.join().unwrap();
1560
1561        if sample_count != cumulative_count {
1562            panic!(
1563                "Histogram invariant violated: For a histogram with a single \
1564                 bucket observing values below the bucket's upper bound only \
1565                 the histogram's count should always be equal to the buckets's \
1566                 cumulative count, got {:?} and {:?} instead.",
1567                sample_count, cumulative_count,
1568            );
1569        }
1570
1571        if sample_count != sample_sum {
1572            panic!(
1573                "Histogram invariant violated: For a histogram which is only \
1574                 ever observing a value of `1.0` the sample count should equal \
1575                 the sum, instead got: {:?} and {:?}",
1576                sample_count, sample_sum,
1577            )
1578        }
1579    }
1580
1581    #[test]
1582    fn test_error_on_inconsistent_label_cardinality() {
1583        let hist = Histogram::with_opts(
1584            histogram_opts!(
1585                "example_histogram",
1586                "Used as an example",
1587                vec![0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 5.0]
1588            )
1589            .variable_label("example_variable"),
1590        );
1591
1592        if let Err(Error::InconsistentCardinality { expect, got }) = hist {
1593            assert_eq!(1, expect);
1594            assert_eq!(0, got);
1595        } else {
1596            panic!("Expected InconsistentCardinality error.")
1597        }
1598    }
1599}