prometheus/
histogram.rs

1// Copyright 2014 The Prometheus Authors
2// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
3
4use std::cell::RefCell;
5use std::collections::HashMap;
6use std::convert::From;
7use std::sync::{
8    atomic::{AtomicU64 as StdAtomicU64, Ordering},
9    Arc, Mutex,
10};
11use std::time::{Duration, Instant as StdInstant};
12
13use crate::atomic64::{Atomic, AtomicF64, AtomicU64};
14use crate::desc::{Desc, Describer};
15use crate::errors::{Error, Result};
16use crate::metrics::{Collector, LocalMetric, Metric, Opts};
17use crate::proto;
18use crate::value::make_label_pairs;
19use crate::vec::{MetricVec, MetricVecBuilder};
20
21/// The default [`Histogram`] buckets. The default buckets are
22/// tailored to broadly measure the response time (in seconds) of a
23/// network service. Most likely, however, you will be required to define
24/// buckets customized to your use case.
25pub const DEFAULT_BUCKETS: &[f64; 11] = &[
26    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
27];
28
29/// Used for the label that defines the upper bound of a
30/// bucket of a histogram ("le" -> "less or equal").
31pub const BUCKET_LABEL: &str = "le";
32
33#[inline]
34fn check_bucket_label(label: &str) -> Result<()> {
35    if label == BUCKET_LABEL {
36        return Err(Error::Msg(
37            "`le` is not allowed as label name in histograms".to_owned(),
38        ));
39    }
40
41    Ok(())
42}
43
44fn check_and_adjust_buckets(mut buckets: Vec<f64>) -> Result<Vec<f64>> {
45    if buckets.is_empty() {
46        buckets = Vec::from(DEFAULT_BUCKETS as &'static [f64]);
47    }
48
49    for (i, upper_bound) in buckets.iter().enumerate() {
50        if i < (buckets.len() - 1) && *upper_bound >= buckets[i + 1] {
51            return Err(Error::Msg(format!(
52                "histogram buckets must be in increasing \
53                 order: {} >= {}",
54                upper_bound,
55                buckets[i + 1]
56            )));
57        }
58    }
59
60    let tail = *buckets.last().unwrap();
61    if tail.is_sign_positive() && tail.is_infinite() {
62        // The +Inf bucket is implicit. Remove it here.
63        buckets.pop();
64    }
65
66    Ok(buckets)
67}
68
69/// A struct that bundles the options for creating a [`Histogram`] metric. It is
70/// mandatory to set Name and Help to a non-empty string. All other fields are
71/// optional and can safely be left at their zero value.
72#[derive(Clone, Debug)]
73pub struct HistogramOpts {
74    /// A container holding various options.
75    pub common_opts: Opts,
76
77    /// Defines the buckets into which observations are counted. Each
78    /// element in the slice is the upper inclusive bound of a bucket. The
79    /// values must be sorted in strictly increasing order. There is no need
80    /// to add a highest bucket with +Inf bound, it will be added
81    /// implicitly. The default value is DefBuckets.
82    pub buckets: Vec<f64>,
83}
84
85impl HistogramOpts {
86    /// Create a [`HistogramOpts`] with the `name` and `help` arguments.
87    pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> HistogramOpts {
88        HistogramOpts {
89            common_opts: Opts::new(name, help),
90            buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
91        }
92    }
93
94    /// `namespace` sets the namespace.
95    pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
96        self.common_opts.namespace = namespace.into();
97        self
98    }
99
100    /// `subsystem` sets the sub system.
101    pub fn subsystem<S: Into<String>>(mut self, subsystem: S) -> Self {
102        self.common_opts.subsystem = subsystem.into();
103        self
104    }
105
106    /// `const_labels` sets the const labels.
107    pub fn const_labels(mut self, const_labels: HashMap<String, String>) -> Self {
108        self.common_opts = self.common_opts.const_labels(const_labels);
109        self
110    }
111
112    /// `const_label` adds a const label.
113    pub fn const_label<S1: Into<String>, S2: Into<String>>(mut self, name: S1, value: S2) -> Self {
114        self.common_opts = self.common_opts.const_label(name, value);
115        self
116    }
117
118    /// `variable_labels` sets the variable labels.
119    pub fn variable_labels(mut self, variable_labels: Vec<String>) -> Self {
120        self.common_opts = self.common_opts.variable_labels(variable_labels);
121        self
122    }
123
124    /// `variable_label` adds a variable label.
125    pub fn variable_label<S: Into<String>>(mut self, name: S) -> Self {
126        self.common_opts = self.common_opts.variable_label(name);
127        self
128    }
129
130    /// `fq_name` returns the fq_name.
131    pub fn fq_name(&self) -> String {
132        self.common_opts.fq_name()
133    }
134
135    /// `buckets` set the buckets.
136    pub fn buckets(mut self, buckets: Vec<f64>) -> Self {
137        self.buckets = buckets;
138        self
139    }
140}
141
142impl Describer for HistogramOpts {
143    fn describe(&self) -> Result<Desc> {
144        self.common_opts.describe()
145    }
146}
147
148impl From<Opts> for HistogramOpts {
149    fn from(opts: Opts) -> HistogramOpts {
150        HistogramOpts {
151            common_opts: opts,
152            buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
153        }
154    }
155}
156
157/// Representation of a hot or cold shard.
158///
159/// See [`HistogramCore`] for details.
160#[derive(Debug)]
161struct Shard {
162    sum: AtomicF64,
163    count: AtomicU64,
164    buckets: Vec<AtomicU64>,
165}
166
167impl Shard {
168    fn new(num_buckets: usize) -> Self {
169        let mut buckets = Vec::new();
170        for _ in 0..num_buckets {
171            buckets.push(AtomicU64::new(0));
172        }
173
174        Shard {
175            sum: AtomicF64::new(0.0),
176            count: AtomicU64::new(0),
177            buckets,
178        }
179    }
180}
181
182/// Index into an array of [`Shard`]s.
183///
184/// Used in conjunction with [`ShardAndCount`] below.
185#[derive(Debug, Clone, Copy)]
186enum ShardIndex {
187    /// First index. Corresponds to 0.
188    First,
189    /// Second index. Corresponds to 1.
190    Second,
191}
192
193impl ShardIndex {
194    /// Inverse the given [`ShardIndex`].
195    fn inverse(self) -> ShardIndex {
196        match self {
197            ShardIndex::First => ShardIndex::Second,
198            ShardIndex::Second => ShardIndex::First,
199        }
200    }
201}
202
203impl From<u64> for ShardIndex {
204    fn from(index: u64) -> Self {
205        match index {
206            0 => ShardIndex::First,
207            1 => ShardIndex::Second,
208            _ => panic!(
209                "Invalid shard index {:?}. A histogram only has two shards.",
210                index
211            ),
212        }
213    }
214}
215
216impl From<ShardIndex> for usize {
217    fn from(index: ShardIndex) -> Self {
218        match index {
219            ShardIndex::First => 0,
220            ShardIndex::Second => 1,
221        }
222    }
223}
224
225/// An atomic u64 with the most significant used as a [`ShardIndex`] and the
226/// remaining 63 bits used to count [`Histogram`] observations.
227#[derive(Debug)]
228struct ShardAndCount {
229    inner: StdAtomicU64,
230}
231
232impl ShardAndCount {
233    /// Return a new [`ShardAndCount`] with both the most significant bit
234    /// i.e. the `ShardIndex` and the remaining 63 bit i.e. the observation
235    /// count set to 0.
236    fn new() -> Self {
237        ShardAndCount {
238            inner: StdAtomicU64::new(0),
239        }
240    }
241
242    /// Flip the most significant bit i.e. the [`ShardIndex`] leaving the
243    /// remaining 63 bits unchanged.
244    fn flip(&self, ordering: Ordering) -> (ShardIndex, u64) {
245        let n = self.inner.fetch_add(1 << 63, ordering);
246
247        ShardAndCount::split_shard_index_and_count(n)
248    }
249
250    /// Get the most significant bit i.e. the [`ShardIndex`] as well as the
251    /// remaining 63 bits i.e. the observation count.
252    fn get(&self) -> (ShardIndex, u64) {
253        let n = self.inner.load(Ordering::Relaxed);
254
255        ShardAndCount::split_shard_index_and_count(n)
256    }
257
258    /// Increment the observation count leaving the most significant bit i.e.
259    /// the [`ShardIndex`] untouched.
260    fn inc_by(&self, delta: u64, ordering: Ordering) -> (ShardIndex, u64) {
261        let n = self.inner.fetch_add(delta, ordering);
262
263        ShardAndCount::split_shard_index_and_count(n)
264    }
265
266    /// Increment the observation count by one leaving the most significant bit
267    /// i.e. the [`ShardIndex`] untouched.
268    fn inc(&self, ordering: Ordering) -> (ShardIndex, u64) {
269        self.inc_by(1, ordering)
270    }
271
272    fn split_shard_index_and_count(n: u64) -> (ShardIndex, u64) {
273        let shard = n >> 63;
274        let count = n & ((1 << 63) - 1);
275
276        (shard.into(), count)
277    }
278}
279
280/// Core datastructure of a Prometheus histogram
281///
282/// # Atomicity across collects
283///
284/// A histogram supports two main execution paths:
285///
286/// 1. `observe` which increases the overall observation counter, updates the
287/// observation sum and increases a single bucket counter.
288///
289/// 2. `proto` (aka. collecting the metric, from now on referred to as the
290/// collect operation) which snapshots the state of the histogram and exposes it
291/// as a Protobuf struct.
292///
293/// If an observe and a collect operation interleave, the latter could be
294/// exposing a snapshot of the histogram that does not uphold all histogram
295/// invariants. For example for the invariant that the overall observation
296/// counter should equal the sum of all bucket counters: Say that an `observe`
297/// increases the overall counter but before updating a specific bucket counter
298/// a collect operation snapshots the histogram.
299///
300/// The below implementation of `HistogramCore` prevents such race conditions by
301/// using two shards, one hot shard for `observe` operations to record their
302/// observation and one cold shard for collect operations to collect a
303/// consistent snapshot of the histogram.
304///
305/// `observe` operations hit the hot shard and record their observation. Collect
306/// operations switch hot and cold, wait for all `observe` calls to finish on
307/// the previously hot now cold shard and then expose the consistent snapshot.
308#[derive(Debug)]
309pub struct HistogramCore {
310    desc: Desc,
311    label_pairs: Vec<proto::LabelPair>,
312
313    /// Mutual exclusion to serialize collect operations. No two collect
314    /// operations should operate on this datastructure at the same time. (See
315    /// struct documentation for details.) `observe` operations can operate in
316    /// parallel without holding this lock.
317    collect_lock: Mutex<()>,
318
319    /// An atomic u64 where the first bit determines the currently hot shard and
320    /// the remaining 63 bits determine the overall count.
321    shard_and_count: ShardAndCount,
322    /// The two shards where `shard_and_count` determines which one is the hot
323    /// and which one the cold at any given point in time.
324    shards: [Shard; 2],
325
326    upper_bounds: Vec<f64>,
327}
328
329impl HistogramCore {
330    pub fn new(opts: &HistogramOpts, label_values: &[&str]) -> Result<HistogramCore> {
331        let desc = opts.describe()?;
332
333        for name in &desc.variable_labels {
334            check_bucket_label(name)?;
335        }
336        for pair in &desc.const_label_pairs {
337            check_bucket_label(pair.get_name())?;
338        }
339
340        let label_pairs = make_label_pairs(&desc, label_values)?;
341
342        let buckets = check_and_adjust_buckets(opts.buckets.clone())?;
343
344        Ok(HistogramCore {
345            desc,
346            label_pairs,
347
348            collect_lock: Mutex::new(()),
349
350            shard_and_count: ShardAndCount::new(),
351            shards: [Shard::new(buckets.len()), Shard::new(buckets.len())],
352
353            upper_bounds: buckets,
354        })
355    }
356
357    /// Record a given observation (f64) in the histogram.
358    //
359    // First increase the overall observation counter and thus learn which shard
360    // is the current hot shard. Subsequently on the hot shard update the
361    // corresponding bucket count, adjust the shard's sum and finally increase
362    // the shard's count.
363    pub fn observe(&self, v: f64) {
364        // The collect code path uses `self.shard_and_count` and
365        // `self.shards[x].count` to ensure not to collect data from a shard
366        // while observe calls are still operating on it.
367        //
368        // To ensure the above, this `inc` needs to use `Acquire` ordering to
369        // force anything below this line to stay below it.
370        let (shard_index, _count) = self.shard_and_count.inc(Ordering::Acquire);
371
372        let shard: &Shard = &self.shards[usize::from(shard_index)];
373
374        // Try find the bucket.
375        let mut iter = self
376            .upper_bounds
377            .iter()
378            .enumerate()
379            .filter(|&(_, f)| v <= *f);
380        if let Some((i, _)) = iter.next() {
381            shard.buckets[i].inc_by(1);
382        }
383
384        shard.sum.inc_by(v);
385        // Use `Release` ordering to ensure all operations above stay above.
386        shard.count.inc_by_with_ordering(1, Ordering::Release);
387    }
388
389    /// Make a snapshot of the current histogram state exposed as a Protobuf
390    /// struct.
391    //
392    // Acquire the collect lock, switch the hot and the cold shard, wait for all
393    // remaining `observe` calls to finish on the previously hot now cold shard,
394    // snapshot the data, update the now hot shard and reset the cold shard.
395    pub fn proto(&self) -> proto::Histogram {
396        let collect_guard = self.collect_lock.lock().expect("Lock poisoned");
397
398        // `flip` needs to use AcqRel ordering to ensure the lock operation
399        // above stays above and the histogram operations (especially the shard
400        // resets) below stay below.
401        let (cold_shard_index, overall_count) = self.shard_and_count.flip(Ordering::AcqRel);
402
403        let cold_shard = &self.shards[usize::from(cold_shard_index)];
404        let hot_shard = &self.shards[usize::from(cold_shard_index.inverse())];
405
406        // Wait for all currently active `observe` calls on the now cold shard
407        // to finish. The above call to `flip` redirects all future `observe`
408        // calls to the other previously cold, now hot, shard. Thus once the
409        // cold shard counter equals the value of the global counter when the
410        // shards were flipped, all in-progress `observe` calls are done. With
411        // all of them done, the cold shard is now in a consistent state.
412        //
413        // `observe` uses `Release` ordering. `compare_exchange` needs to use
414        // `Acquire` ordering to ensure that (1) one sees all the previous
415        // `observe` stores to the counter and (2) to ensure the below shard
416        // modifications happen after this point, thus the shard is not modified
417        // by any `observe` operations.
418        while cold_shard
419            .count
420            .compare_exchange_weak(
421                overall_count,
422                // While at it, reset cold shard count on success.
423                0,
424                Ordering::Acquire,
425                Ordering::Acquire,
426            )
427            .is_err()
428        {}
429
430        // Get cold shard sum and reset to 0.
431        //
432        // Use `Acquire` for load and `Release` for store to ensure not to
433        // interfere with previous or upcoming collect calls.
434        let cold_shard_sum = cold_shard.sum.swap(0.0, Ordering::AcqRel);
435
436        let mut h = proto::Histogram::default();
437        h.set_sample_sum(cold_shard_sum);
438        h.set_sample_count(overall_count);
439
440        let mut cumulative_count = 0;
441        let mut buckets = Vec::with_capacity(self.upper_bounds.len());
442        for (i, upper_bound) in self.upper_bounds.iter().enumerate() {
443            // Reset the cold shard and update the hot shard.
444            //
445            // Use `Acquire` for load and `Release` for store to ensure not to
446            // interfere with previous or upcoming collect calls.
447            let cold_bucket_count = cold_shard.buckets[i].swap(0, Ordering::AcqRel);
448            hot_shard.buckets[i].inc_by(cold_bucket_count);
449
450            cumulative_count += cold_bucket_count;
451            let mut b = proto::Bucket::default();
452            b.set_cumulative_count(cumulative_count);
453            b.set_upper_bound(*upper_bound);
454            buckets.push(b);
455        }
456        h.set_bucket(from_vec!(buckets));
457
458        // Update the hot shard.
459        hot_shard.count.inc_by(overall_count);
460        hot_shard.sum.inc_by(cold_shard_sum);
461
462        drop(collect_guard);
463
464        h
465    }
466
467    fn sample_sum(&self) -> f64 {
468        // Make sure to not overlap with any collect calls, as they might flip
469        // the hot and cold shards.
470        let _guard = self.collect_lock.lock().expect("Lock poisoned");
471
472        let (shard_index, _count) = self.shard_and_count.get();
473        self.shards[shard_index as usize].sum.get()
474    }
475
476    fn sample_count(&self) -> u64 {
477        self.shard_and_count.get().1
478    }
479}
480
481// We have to wrap libc::timespec in order to implement std::fmt::Debug.
482#[cfg(all(feature = "nightly", target_os = "linux"))]
483pub struct Timespec(libc::timespec);
484
485#[cfg(all(feature = "nightly", target_os = "linux"))]
486impl std::fmt::Debug for Timespec {
487    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
488        write!(
489            f,
490            "Timespec {{ tv_sec: {}, tv_nsec: {} }}",
491            self.0.tv_sec, self.0.tv_nsec
492        )
493    }
494}
495
496#[derive(Debug)]
497pub enum Instant {
498    Monotonic(StdInstant),
499    #[cfg(all(feature = "nightly", target_os = "linux"))]
500    MonotonicCoarse(Timespec),
501}
502
503impl Instant {
504    pub fn now() -> Instant {
505        Instant::Monotonic(StdInstant::now())
506    }
507
508    #[cfg(all(feature = "nightly", target_os = "linux"))]
509    pub fn now_coarse() -> Instant {
510        Instant::MonotonicCoarse(get_time_coarse())
511    }
512
513    #[cfg(all(feature = "nightly", not(target_os = "linux")))]
514    pub fn now_coarse() -> Instant {
515        Instant::Monotonic(StdInstant::now())
516    }
517
518    pub fn elapsed(&self) -> Duration {
519        match self {
520            // We use `saturating_duration_since` to avoid panics caused by non-monotonic clocks.
521            Instant::Monotonic(i) => StdInstant::now().saturating_duration_since(*i),
522
523            // It is different from `Instant::Monotonic`, the resolution here is millisecond.
524            // The processors in an SMP system do not start all at exactly the same time
525            // and therefore the timer registers are typically running at an offset.
526            // Use millisecond resolution for ignoring the error.
527            // See more: https://linux.die.net/man/2/clock_gettime
528            #[cfg(all(feature = "nightly", target_os = "linux"))]
529            Instant::MonotonicCoarse(t) => {
530                let now = get_time_coarse();
531                let now_ms = now.0.tv_sec * MILLIS_PER_SEC + now.0.tv_nsec / NANOS_PER_MILLI;
532                let t_ms = t.0.tv_sec * MILLIS_PER_SEC + t.0.tv_nsec / NANOS_PER_MILLI;
533                let dur = now_ms - t_ms;
534                if dur >= 0 {
535                    Duration::from_millis(dur as u64)
536                } else {
537                    Duration::from_millis(0)
538                }
539            }
540        }
541    }
542
543    #[inline]
544    pub fn elapsed_sec(&self) -> f64 {
545        duration_to_seconds(self.elapsed())
546    }
547}
548
549#[cfg(all(feature = "nightly", target_os = "linux"))]
550use self::coarse::*;
551
552#[cfg(all(feature = "nightly", target_os = "linux"))]
553mod coarse {
554    use crate::histogram::Timespec;
555    pub use libc::timespec;
556    use libc::{clock_gettime, CLOCK_MONOTONIC_COARSE};
557
558    pub const NANOS_PER_MILLI: i64 = 1_000_000;
559    pub const MILLIS_PER_SEC: i64 = 1_000;
560
561    pub fn get_time_coarse() -> Timespec {
562        let mut t = Timespec(timespec {
563            tv_sec: 0,
564            tv_nsec: 0,
565        });
566        assert_eq!(
567            unsafe { clock_gettime(CLOCK_MONOTONIC_COARSE, &mut t.0) },
568            0
569        );
570        t
571    }
572}
573
574/// Timer to measure and record the duration of an event.
575///
576/// This timer can be stopped and observed at most once, either automatically (when it
577/// goes out of scope) or manually.
578/// Alternatively, it can be manually stopped and discarded in order to not record its value.
579#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
580#[derive(Debug)]
581pub struct HistogramTimer {
582    /// A histogram for automatic recording of observations.
583    histogram: Histogram,
584    /// Whether the timer has already been observed once.
585    observed: bool,
586    /// Starting instant for the timer.
587    start: Instant,
588}
589
590impl HistogramTimer {
591    fn new(histogram: Histogram) -> Self {
592        Self {
593            histogram,
594            observed: false,
595            start: Instant::now(),
596        }
597    }
598
599    #[cfg(feature = "nightly")]
600    fn new_coarse(histogram: Histogram) -> Self {
601        HistogramTimer {
602            histogram,
603            observed: false,
604            start: Instant::now_coarse(),
605        }
606    }
607
608    /// Observe and record timer duration (in seconds).
609    ///
610    /// It observes the floating-point number of seconds elapsed since the timer
611    /// started, and it records that value to the attached histogram.
612    pub fn observe_duration(self) {
613        self.stop_and_record();
614    }
615
616    /// Observe, record and return timer duration (in seconds).
617    ///
618    /// It observes and returns a floating-point number for seconds elapsed since
619    /// the timer started, recording that value to the attached histogram.
620    pub fn stop_and_record(self) -> f64 {
621        let mut timer = self;
622        timer.observe(true)
623    }
624
625    /// Observe and return timer duration (in seconds).
626    ///
627    /// It returns a floating-point number of seconds elapsed since the timer started,
628    /// without recording to any histogram.
629    pub fn stop_and_discard(self) -> f64 {
630        let mut timer = self;
631        timer.observe(false)
632    }
633
634    fn observe(&mut self, record: bool) -> f64 {
635        let v = self.start.elapsed_sec();
636        self.observed = true;
637        if record {
638            self.histogram.observe(v);
639        }
640        v
641    }
642}
643
644impl Drop for HistogramTimer {
645    fn drop(&mut self) {
646        if !self.observed {
647            self.observe(true);
648        }
649    }
650}
651
652/// A [`Metric`] counts individual observations from an event or sample stream
653/// in configurable buckets. Similar to a [`Summary`](crate::proto::Summary),
654/// it also provides a sum of observations and an observation count.
655///
656/// On the Prometheus server, quantiles can be calculated from a [`Histogram`] using
657/// the [`histogram_quantile`][1] function in the query language.
658///
659/// Note that Histograms, in contrast to Summaries, can be aggregated with the
660/// Prometheus query language (see [the prometheus documentation][2] for
661/// detailed procedures). However, Histograms require the user to pre-define
662/// suitable buckets, (see [`linear_buckets`] and [`exponential_buckets`] for
663/// some helper provided here) and they are in general less accurate. The
664/// Observe method of a [`Histogram`] has a very low performance overhead in
665/// comparison with the Observe method of a Summary.
666///
667/// [1]: https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile
668/// [2]: https://prometheus.io/docs/practices/histograms/
669#[derive(Clone, Debug)]
670pub struct Histogram {
671    core: Arc<HistogramCore>,
672}
673
674impl Histogram {
675    /// `with_opts` creates a [`Histogram`] with the `opts` options.
676    pub fn with_opts(opts: HistogramOpts) -> Result<Histogram> {
677        Histogram::with_opts_and_label_values(&opts, &[])
678    }
679
680    fn with_opts_and_label_values(
681        opts: &HistogramOpts,
682        label_values: &[&str],
683    ) -> Result<Histogram> {
684        let core = HistogramCore::new(opts, label_values)?;
685
686        Ok(Histogram {
687            core: Arc::new(core),
688        })
689    }
690}
691
692impl Histogram {
693    /// Add a single observation to the [`Histogram`].
694    pub fn observe(&self, v: f64) {
695        self.core.observe(v)
696    }
697
698    /// Return a [`HistogramTimer`] to track a duration.
699    pub fn start_timer(&self) -> HistogramTimer {
700        HistogramTimer::new(self.clone())
701    }
702
703    /// Return a [`HistogramTimer`] to track a duration.
704    /// It is faster but less precise.
705    #[cfg(feature = "nightly")]
706    pub fn start_coarse_timer(&self) -> HistogramTimer {
707        HistogramTimer::new_coarse(self.clone())
708    }
709
710    /// Observe execution time of a closure, in second.
711    pub fn observe_closure_duration<F, T>(&self, f: F) -> T
712    where
713        F: FnOnce() -> T,
714    {
715        let instant = Instant::now();
716        let res = f();
717        let elapsed = instant.elapsed_sec();
718        self.observe(elapsed);
719        res
720    }
721
722    /// Observe execution time of a closure, in second.
723    #[cfg(feature = "nightly")]
724    pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
725    where
726        F: FnOnce() -> T,
727    {
728        let instant = Instant::now_coarse();
729        let res = f();
730        let elapsed = instant.elapsed_sec();
731        self.observe(elapsed);
732        res
733    }
734
735    /// Return a [`LocalHistogram`] for single thread usage.
736    pub fn local(&self) -> LocalHistogram {
737        LocalHistogram::new(self.clone())
738    }
739
740    /// Return accumulated sum of all samples.
741    pub fn get_sample_sum(&self) -> f64 {
742        self.core.sample_sum()
743    }
744
745    /// Return count of all samples.
746    pub fn get_sample_count(&self) -> u64 {
747        self.core.sample_count()
748    }
749}
750
751impl Metric for Histogram {
752    fn metric(&self) -> proto::Metric {
753        let mut m = proto::Metric::default();
754        m.set_label(from_vec!(self.core.label_pairs.clone()));
755
756        let h = self.core.proto();
757        m.set_histogram(h);
758
759        m
760    }
761}
762
763impl Collector for Histogram {
764    fn desc(&self) -> Vec<&Desc> {
765        vec![&self.core.desc]
766    }
767
768    fn collect(&self) -> Vec<proto::MetricFamily> {
769        let mut m = proto::MetricFamily::default();
770        m.set_name(self.core.desc.fq_name.clone());
771        m.set_help(self.core.desc.help.clone());
772        m.set_field_type(proto::MetricType::HISTOGRAM);
773        m.set_metric(from_vec!(vec![self.metric()]));
774
775        vec![m]
776    }
777}
778
779#[derive(Clone, Debug)]
780pub struct HistogramVecBuilder {}
781
782impl MetricVecBuilder for HistogramVecBuilder {
783    type M = Histogram;
784    type P = HistogramOpts;
785
786    fn build(&self, opts: &HistogramOpts, vals: &[&str]) -> Result<Histogram> {
787        Histogram::with_opts_and_label_values(opts, vals)
788    }
789}
790
791/// A [`Collector`] that bundles a set of Histograms that all share the
792/// same [`Desc`], but have different values for their variable labels. This is used
793/// if you want to count the same thing partitioned by various dimensions
794/// (e.g. HTTP request latencies, partitioned by status code and method).
795pub type HistogramVec = MetricVec<HistogramVecBuilder>;
796
797impl HistogramVec {
798    /// Create a new [`HistogramVec`] based on the provided
799    /// [`HistogramOpts`] and partitioned by the given label names. At least
800    /// one label name must be provided.
801    pub fn new(opts: HistogramOpts, label_names: &[&str]) -> Result<HistogramVec> {
802        let variable_names = label_names.iter().map(|s| (*s).to_owned()).collect();
803        let opts = opts.variable_labels(variable_names);
804        let metric_vec =
805            MetricVec::create(proto::MetricType::HISTOGRAM, HistogramVecBuilder {}, opts)?;
806
807        Ok(metric_vec as HistogramVec)
808    }
809
810    /// Return a `LocalHistogramVec` for single thread usage.
811    pub fn local(&self) -> LocalHistogramVec {
812        let vec = self.clone();
813        LocalHistogramVec::new(vec)
814    }
815}
816
817/// Create `count` buckets, each `width` wide, where the lowest
818/// bucket has an upper bound of `start`. The final +Inf bucket is not counted
819/// and not included in the returned slice. The returned slice is meant to be
820/// used for the Buckets field of [`HistogramOpts`].
821///
822/// The function returns an error if `count` is zero or `width` is zero or
823/// negative.
824pub fn linear_buckets(start: f64, width: f64, count: usize) -> Result<Vec<f64>> {
825    if count < 1 {
826        return Err(Error::Msg(format!(
827            "LinearBuckets needs a positive count, count: {}",
828            count
829        )));
830    }
831    if width <= 0.0 {
832        return Err(Error::Msg(format!(
833            "LinearBuckets needs a width greater then 0, width: {}",
834            width
835        )));
836    }
837
838    let buckets: Vec<_> = (0..count)
839        .map(|step| start + width * (step as f64))
840        .collect();
841
842    Ok(buckets)
843}
844
845/// Create `count` buckets, where the lowest bucket has an
846/// upper bound of `start` and each following bucket's upper bound is `factor`
847/// times the previous bucket's upper bound. The final +Inf bucket is not counted
848/// and not included in the returned slice. The returned slice is meant to be
849/// used for the Buckets field of [`HistogramOpts`].
850///
851/// The function returns an error if `count` is zero, if `start` is zero or
852/// negative, or if `factor` is less than or equal 1.
853pub fn exponential_buckets(start: f64, factor: f64, count: usize) -> Result<Vec<f64>> {
854    if count < 1 {
855        return Err(Error::Msg(format!(
856            "exponential_buckets needs a positive count, count: {}",
857            count
858        )));
859    }
860    if start <= 0.0 {
861        return Err(Error::Msg(format!(
862            "exponential_buckets needs a positive start value, \
863             start: {}",
864            start
865        )));
866    }
867    if factor <= 1.0 {
868        return Err(Error::Msg(format!(
869            "exponential_buckets needs a factor greater than 1, \
870             factor: {}",
871            factor
872        )));
873    }
874
875    let mut next = start;
876    let mut buckets = Vec::with_capacity(count);
877    for _ in 0..count {
878        buckets.push(next);
879        next *= factor;
880    }
881
882    Ok(buckets)
883}
884
885/// `duration_to_seconds` converts Duration to seconds.
886#[inline]
887pub fn duration_to_seconds(d: Duration) -> f64 {
888    let nanos = f64::from(d.subsec_nanos()) / 1e9;
889    d.as_secs() as f64 + nanos
890}
891
892#[derive(Clone, Debug)]
893pub struct LocalHistogramCore {
894    histogram: Histogram,
895    counts: Vec<u64>,
896    count: u64,
897    sum: f64,
898}
899
900/// An unsync [`Histogram`].
901#[derive(Debug)]
902pub struct LocalHistogram {
903    core: RefCell<LocalHistogramCore>,
904}
905
906impl Clone for LocalHistogram {
907    fn clone(&self) -> LocalHistogram {
908        let core = self.core.clone();
909        let lh = LocalHistogram { core };
910        lh.clear();
911        lh
912    }
913}
914
915/// An unsync [`HistogramTimer`].
916#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
917#[derive(Debug)]
918pub struct LocalHistogramTimer {
919    /// A local histogram for automatic recording of observations.
920    local: LocalHistogram,
921    /// Whether the timer has already been observed once.
922    observed: bool,
923    /// Starting instant for the timer.
924    start: Instant,
925}
926
927impl LocalHistogramTimer {
928    fn new(histogram: LocalHistogram) -> Self {
929        Self {
930            local: histogram,
931            observed: false,
932            start: Instant::now(),
933        }
934    }
935
936    #[cfg(feature = "nightly")]
937    fn new_coarse(histogram: LocalHistogram) -> Self {
938        Self {
939            local: histogram,
940            observed: false,
941            start: Instant::now_coarse(),
942        }
943    }
944
945    /// Observe and record timer duration (in seconds).
946    ///
947    /// It observes the floating-point number of seconds elapsed since the timer
948    /// started, and it records that value to the attached histogram.
949    pub fn observe_duration(self) {
950        self.stop_and_record();
951    }
952
953    /// Observe, record and return timer duration (in seconds).
954    ///
955    /// It observes and returns a floating-point number for seconds elapsed since
956    /// the timer started, recording that value to the attached histogram.
957    pub fn stop_and_record(self) -> f64 {
958        let mut timer = self;
959        timer.observe(true)
960    }
961
962    /// Observe and return timer duration (in seconds).
963    ///
964    /// It returns a floating-point number of seconds elapsed since the timer started,
965    /// without recording to any histogram.
966    pub fn stop_and_discard(self) -> f64 {
967        let mut timer = self;
968        timer.observe(false)
969    }
970
971    fn observe(&mut self, record: bool) -> f64 {
972        let v = self.start.elapsed_sec();
973        self.observed = true;
974        if record {
975            self.local.observe(v);
976        }
977        v
978    }
979}
980
981impl Drop for LocalHistogramTimer {
982    fn drop(&mut self) {
983        if !self.observed {
984            self.observe(true);
985        }
986    }
987}
988
989impl LocalHistogramCore {
990    fn new(histogram: Histogram) -> LocalHistogramCore {
991        let counts = vec![0; histogram.core.upper_bounds.len()];
992
993        LocalHistogramCore {
994            histogram,
995            counts,
996            count: 0,
997            sum: 0.0,
998        }
999    }
1000
1001    pub fn observe(&mut self, v: f64) {
1002        // Try find the bucket.
1003        let mut iter = self
1004            .histogram
1005            .core
1006            .upper_bounds
1007            .iter()
1008            .enumerate()
1009            .filter(|&(_, f)| v <= *f);
1010        if let Some((i, _)) = iter.next() {
1011            self.counts[i] += 1;
1012        }
1013
1014        self.count += 1;
1015        self.sum += v;
1016    }
1017
1018    pub fn clear(&mut self) {
1019        for v in &mut self.counts {
1020            *v = 0
1021        }
1022
1023        self.count = 0;
1024        self.sum = 0.0;
1025    }
1026
1027    pub fn flush(&mut self) {
1028        // No cached metric, return.
1029        if self.count == 0 {
1030            return;
1031        }
1032
1033        {
1034            // The collect code path uses `self.shard_and_count` and
1035            // `self.shards[x].count` to ensure not to collect data from a shard
1036            // while observe calls are still operating on it.
1037            //
1038            // To ensure the above, this `inc` needs to use `Acquire` ordering
1039            // to force anything below this line to stay below it.
1040            let (shard_index, _count) = self
1041                .histogram
1042                .core
1043                .shard_and_count
1044                .inc_by(self.count, Ordering::Acquire);
1045            let shard = &self.histogram.core.shards[shard_index as usize];
1046
1047            for (i, v) in self.counts.iter().enumerate() {
1048                if *v > 0 {
1049                    shard.buckets[i].inc_by(*v);
1050                }
1051            }
1052
1053            shard.sum.inc_by(self.sum);
1054            // Use `Release` ordering to ensure all operations above stay above.
1055            shard
1056                .count
1057                .inc_by_with_ordering(self.count, Ordering::Release);
1058        }
1059
1060        self.clear()
1061    }
1062
1063    fn sample_sum(&self) -> f64 {
1064        self.sum
1065    }
1066
1067    fn sample_count(&self) -> u64 {
1068        self.count
1069    }
1070}
1071
1072impl LocalHistogram {
1073    fn new(histogram: Histogram) -> LocalHistogram {
1074        let core = LocalHistogramCore::new(histogram);
1075        LocalHistogram {
1076            core: RefCell::new(core),
1077        }
1078    }
1079
1080    /// Add a single observation to the [`Histogram`].
1081    pub fn observe(&self, v: f64) {
1082        self.core.borrow_mut().observe(v);
1083    }
1084
1085    /// Return a `LocalHistogramTimer` to track a duration.
1086    pub fn start_timer(&self) -> LocalHistogramTimer {
1087        LocalHistogramTimer::new(self.clone())
1088    }
1089
1090    /// Return a `LocalHistogramTimer` to track a duration.
1091    /// It is faster but less precise.
1092    #[cfg(feature = "nightly")]
1093    pub fn start_coarse_timer(&self) -> LocalHistogramTimer {
1094        LocalHistogramTimer::new_coarse(self.clone())
1095    }
1096
1097    /// Observe execution time of a closure, in second.
1098    pub fn observe_closure_duration<F, T>(&self, f: F) -> T
1099    where
1100        F: FnOnce() -> T,
1101    {
1102        let instant = Instant::now();
1103        let res = f();
1104        let elapsed = instant.elapsed_sec();
1105        self.observe(elapsed);
1106        res
1107    }
1108
1109    /// Observe execution time of a closure, in second.
1110    #[cfg(feature = "nightly")]
1111    pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
1112    where
1113        F: FnOnce() -> T,
1114    {
1115        let instant = Instant::now_coarse();
1116        let res = f();
1117        let elapsed = instant.elapsed_sec();
1118        self.observe(elapsed);
1119        res
1120    }
1121
1122    /// Clear the local metric.
1123    pub fn clear(&self) {
1124        self.core.borrow_mut().clear();
1125    }
1126
1127    /// Flush the local metrics to the [`Histogram`] metric.
1128    pub fn flush(&self) {
1129        self.core.borrow_mut().flush();
1130    }
1131
1132    /// Return accumulated sum of local samples.
1133    pub fn get_sample_sum(&self) -> f64 {
1134        self.core.borrow().sample_sum()
1135    }
1136
1137    /// Return count of local samples.
1138    pub fn get_sample_count(&self) -> u64 {
1139        self.core.borrow().sample_count()
1140    }
1141}
1142
1143impl LocalMetric for LocalHistogram {
1144    /// Flush the local metrics to the [`Histogram`] metric.
1145    fn flush(&self) {
1146        LocalHistogram::flush(self);
1147    }
1148}
1149
1150impl Drop for LocalHistogram {
1151    fn drop(&mut self) {
1152        self.flush()
1153    }
1154}
1155
1156/// An unsync [`HistogramVec`].
1157#[derive(Debug)]
1158pub struct LocalHistogramVec {
1159    vec: HistogramVec,
1160    local: HashMap<u64, LocalHistogram>,
1161}
1162
1163impl LocalHistogramVec {
1164    fn new(vec: HistogramVec) -> LocalHistogramVec {
1165        let local = HashMap::with_capacity(vec.v.children.read().len());
1166        LocalHistogramVec { vec, local }
1167    }
1168
1169    /// Get a [`LocalHistogram`] by label values.
1170    /// See more [`MetricVec::with_label_values`].
1171    pub fn with_label_values<'a>(&'a mut self, vals: &[&str]) -> &'a LocalHistogram {
1172        let hash = self.vec.v.hash_label_values(vals).unwrap();
1173        let vec = &self.vec;
1174        self.local
1175            .entry(hash)
1176            .or_insert_with(|| vec.with_label_values(vals).local())
1177    }
1178
1179    /// Remove a [`LocalHistogram`] by label values.
1180    /// See more [`MetricVec::remove_label_values`].
1181    pub fn remove_label_values(&mut self, vals: &[&str]) -> Result<()> {
1182        let hash = self.vec.v.hash_label_values(vals)?;
1183        self.local.remove(&hash);
1184        self.vec.v.delete_label_values(vals)
1185    }
1186
1187    /// Flush the local metrics to the [`HistogramVec`] metric.
1188    pub fn flush(&self) {
1189        for h in self.local.values() {
1190            h.flush();
1191        }
1192    }
1193}
1194
1195impl LocalMetric for LocalHistogramVec {
1196    /// Flush the local metrics to the [`HistogramVec`] metric.
1197    fn flush(&self) {
1198        LocalHistogramVec::flush(self)
1199    }
1200}
1201
1202impl Clone for LocalHistogramVec {
1203    fn clone(&self) -> LocalHistogramVec {
1204        LocalHistogramVec::new(self.vec.clone())
1205    }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use std::f64::{EPSILON, INFINITY};
1211    use std::thread;
1212    use std::time::Duration;
1213
1214    use super::*;
1215    use crate::metrics::{Collector, Metric};
1216
1217    #[test]
1218    fn test_histogram() {
1219        let opts = HistogramOpts::new("test1", "test help")
1220            .const_label("a", "1")
1221            .const_label("b", "2");
1222        let histogram = Histogram::with_opts(opts).unwrap();
1223        histogram.observe(1.0);
1224
1225        let timer = histogram.start_timer();
1226        thread::sleep(Duration::from_millis(100));
1227        timer.observe_duration();
1228
1229        let timer = histogram.start_timer();
1230        let handler = thread::spawn(move || {
1231            let _timer = timer;
1232            thread::sleep(Duration::from_millis(400));
1233        });
1234        assert!(handler.join().is_ok());
1235
1236        let mut mfs = histogram.collect();
1237        assert_eq!(mfs.len(), 1);
1238
1239        let mf = mfs.pop().unwrap();
1240        let m = mf.get_metric().get(0).unwrap();
1241        assert_eq!(m.get_label().len(), 2);
1242        let proto_histogram = m.get_histogram();
1243        assert_eq!(proto_histogram.get_sample_count(), 3);
1244        assert!(proto_histogram.get_sample_sum() >= 1.5);
1245        assert_eq!(proto_histogram.get_bucket().len(), DEFAULT_BUCKETS.len());
1246
1247        let buckets = vec![1.0, 2.0, 3.0];
1248        let opts = HistogramOpts::new("test2", "test help").buckets(buckets.clone());
1249        let histogram = Histogram::with_opts(opts).unwrap();
1250        let mut mfs = histogram.collect();
1251        assert_eq!(mfs.len(), 1);
1252
1253        let mf = mfs.pop().unwrap();
1254        let m = mf.get_metric().get(0).unwrap();
1255        assert_eq!(m.get_label().len(), 0);
1256        let proto_histogram = m.get_histogram();
1257        assert_eq!(proto_histogram.get_sample_count(), 0);
1258        assert!((proto_histogram.get_sample_sum() - 0.0) < EPSILON);
1259        assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
1260    }
1261
1262    #[test]
1263    #[cfg(feature = "nightly")]
1264    fn test_histogram_coarse_timer() {
1265        let opts = HistogramOpts::new("test1", "test help");
1266        let histogram = Histogram::with_opts(opts).unwrap();
1267
1268        let timer = histogram.start_coarse_timer();
1269        thread::sleep(Duration::from_millis(100));
1270        timer.observe_duration();
1271
1272        let timer = histogram.start_coarse_timer();
1273        let handler = thread::spawn(move || {
1274            let _timer = timer;
1275            thread::sleep(Duration::from_millis(400));
1276        });
1277        assert!(handler.join().is_ok());
1278
1279        histogram.observe_closure_duration(|| {
1280            thread::sleep(Duration::from_millis(400));
1281        });
1282
1283        let mut mfs = histogram.collect();
1284        assert_eq!(mfs.len(), 1);
1285
1286        let mf = mfs.pop().unwrap();
1287        let m = mf.get_metric().get(0).unwrap();
1288        let proto_histogram = m.get_histogram();
1289        assert_eq!(proto_histogram.get_sample_count(), 3);
1290        assert!((proto_histogram.get_sample_sum() - 0.0) > EPSILON);
1291    }
1292
1293    #[test]
1294    #[cfg(feature = "nightly")]
1295    fn test_instant_on_smp() {
1296        let zero = Duration::from_millis(0);
1297        for i in 0..100_000 {
1298            let now = Instant::now();
1299            let now_coarse = Instant::now_coarse();
1300            if i % 100 == 0 {
1301                thread::yield_now();
1302            }
1303            assert!(now.elapsed() >= zero);
1304            assert!(now_coarse.elapsed() >= zero);
1305        }
1306    }
1307
1308    #[test]
1309    fn test_buckets_invalidation() {
1310        let table = vec![
1311            (vec![], true, DEFAULT_BUCKETS.len()),
1312            (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, 2.0], true, 7),
1313            (vec![-2.0, -1.0, -0.5, 10.0, 0.5, 1.0, 2.0], false, 7),
1314            (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, INFINITY], true, 6),
1315        ];
1316
1317        for (buckets, is_ok, length) in table {
1318            let got = check_and_adjust_buckets(buckets);
1319            assert_eq!(got.is_ok(), is_ok);
1320            if is_ok {
1321                assert_eq!(got.unwrap().len(), length);
1322            }
1323        }
1324    }
1325
1326    #[test]
1327    fn test_buckets_functions() {
1328        let linear_table = vec![
1329            (
1330                -15.0,
1331                5.0,
1332                6,
1333                true,
1334                vec![-15.0, -10.0, -5.0, 0.0, 5.0, 10.0],
1335            ),
1336            (-15.0, 0.0, 6, false, vec![]),
1337            (-15.0, 5.0, 0, false, vec![]),
1338        ];
1339
1340        for (param1, param2, param3, is_ok, vec) in linear_table {
1341            let got = linear_buckets(param1, param2, param3);
1342            assert_eq!(got.is_ok(), is_ok);
1343            if got.is_ok() {
1344                assert_eq!(got.unwrap(), vec);
1345            }
1346        }
1347
1348        let exponential_table = vec![
1349            (100.0, 1.2, 3, true, vec![100.0, 120.0, 144.0]),
1350            (100.0, 0.5, 3, false, vec![]),
1351            (100.0, 1.2, 0, false, vec![]),
1352        ];
1353
1354        for (param1, param2, param3, is_ok, vec) in exponential_table {
1355            let got = exponential_buckets(param1, param2, param3);
1356            assert_eq!(got.is_ok(), is_ok);
1357            if got.is_ok() {
1358                assert_eq!(got.unwrap(), vec);
1359            }
1360        }
1361    }
1362
1363    #[test]
1364    fn test_duration_to_seconds() {
1365        let tbls = vec![(1000, 1.0), (1100, 1.1), (100_111, 100.111)];
1366        for (millis, seconds) in tbls {
1367            let d = Duration::from_millis(millis);
1368            let v = duration_to_seconds(d);
1369            assert!((v - seconds).abs() < EPSILON);
1370        }
1371    }
1372
1373    #[test]
1374    fn test_histogram_vec_with_label_values() {
1375        let vec = HistogramVec::new(
1376            HistogramOpts::new("test_histogram_vec", "test histogram vec help"),
1377            &["l1", "l2"],
1378        )
1379        .unwrap();
1380
1381        assert!(vec.remove_label_values(&["v1", "v2"]).is_err());
1382        vec.with_label_values(&["v1", "v2"]).observe(1.0);
1383        assert!(vec.remove_label_values(&["v1", "v2"]).is_ok());
1384
1385        assert!(vec.remove_label_values(&["v1"]).is_err());
1386        assert!(vec.remove_label_values(&["v1", "v3"]).is_err());
1387    }
1388
1389    #[test]
1390    fn test_histogram_vec_with_opts_buckets() {
1391        let labels = ["l1", "l2"];
1392        let buckets = vec![1.0, 2.0, 3.0];
1393        let vec = HistogramVec::new(
1394            HistogramOpts::new("test_histogram_vec", "test histogram vec help")
1395                .buckets(buckets.clone()),
1396            &labels,
1397        )
1398        .unwrap();
1399
1400        let histogram = vec.with_label_values(&["v1", "v2"]);
1401        histogram.observe(1.0);
1402
1403        let m = histogram.metric();
1404        assert_eq!(m.get_label().len(), labels.len());
1405
1406        let proto_histogram = m.get_histogram();
1407        assert_eq!(proto_histogram.get_sample_count(), 1);
1408        assert!((proto_histogram.get_sample_sum() - 1.0) < EPSILON);
1409        assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
1410    }
1411
1412    #[test]
1413    fn test_histogram_local() {
1414        let buckets = vec![1.0, 2.0, 3.0];
1415        let opts = HistogramOpts::new("test_histogram_local", "test histogram local help")
1416            .buckets(buckets.clone());
1417        let histogram = Histogram::with_opts(opts).unwrap();
1418        let local = histogram.local();
1419
1420        let check = |count, sum| {
1421            let m = histogram.metric();
1422            let proto_histogram = m.get_histogram();
1423            assert_eq!(proto_histogram.get_sample_count(), count);
1424            assert!((proto_histogram.get_sample_sum() - sum) < EPSILON);
1425        };
1426
1427        local.observe(1.0);
1428        local.observe(4.0);
1429        check(0, 0.0);
1430
1431        local.flush();
1432        check(2, 5.0);
1433
1434        local.observe(2.0);
1435        local.clear();
1436        check(2, 5.0);
1437
1438        local.observe(2.0);
1439        drop(local);
1440        check(3, 7.0);
1441    }
1442
1443    #[test]
1444    fn test_histogram_vec_local() {
1445        let vec = HistogramVec::new(
1446            HistogramOpts::new("test_histogram_vec_local", "test histogram vec help"),
1447            &["l1", "l2"],
1448        )
1449        .unwrap();
1450        let mut local_vec = vec.local();
1451
1452        vec.remove_label_values(&["v1", "v2"]).unwrap_err();
1453        local_vec.remove_label_values(&["v1", "v2"]).unwrap_err();
1454
1455        let check = |count, sum| {
1456            let ms = vec.collect()[0].take_metric();
1457            let proto_histogram = ms[0].get_histogram();
1458            assert_eq!(proto_histogram.get_sample_count(), count);
1459            assert!((proto_histogram.get_sample_sum() - sum) < EPSILON);
1460        };
1461
1462        {
1463            // Flush LocalHistogram
1464            let h = local_vec.with_label_values(&["v1", "v2"]);
1465            h.observe(1.0);
1466            h.flush();
1467            check(1, 1.0);
1468        }
1469
1470        {
1471            // Flush LocalHistogramVec
1472            local_vec.with_label_values(&["v1", "v2"]).observe(4.0);
1473            local_vec.flush();
1474            check(2, 5.0);
1475        }
1476        {
1477            // Reset ["v1", "v2"]
1478            local_vec.remove_label_values(&["v1", "v2"]).unwrap();
1479
1480            // Flush on drop
1481            local_vec.with_label_values(&["v1", "v2"]).observe(2.0);
1482            drop(local_vec);
1483            check(1, 2.0);
1484        }
1485    }
1486
1487    /// Ensure that when an observe and a collect operation interleave, the
1488    /// latter does not expose a snapshot of the histogram that does not uphold
1489    /// all histogram invariants.
1490    #[test]
1491    fn atomic_observe_across_collects() {
1492        let done = Arc::new(std::sync::atomic::AtomicBool::default());
1493        let histogram =
1494            Histogram::with_opts(HistogramOpts::new("test_name", "test help").buckets(vec![1.0]))
1495                .unwrap();
1496
1497        let done_clone = done.clone();
1498        let histogram_clone = histogram.clone();
1499        let observing_thread = std::thread::spawn(move || loop {
1500            if done_clone.load(std::sync::atomic::Ordering::Relaxed) {
1501                break;
1502            }
1503
1504            for _ in 0..1_000_000 {
1505                histogram_clone.observe(1.0);
1506            }
1507        });
1508
1509        let mut sample_count = 0;
1510        let mut cumulative_count = 0;
1511        let mut sample_sum = 0;
1512        for _ in 0..1_000_000 {
1513            let metric = &histogram.collect()[0].take_metric()[0];
1514            let proto = metric.get_histogram();
1515
1516            sample_count = proto.get_sample_count();
1517            sample_sum = proto.get_sample_sum() as u64;
1518            // There is only one bucket thus the `[0]`.
1519            cumulative_count = proto.get_bucket()[0].get_cumulative_count();
1520
1521            if sample_count != cumulative_count {
1522                break;
1523            }
1524
1525            // Observation value is always `1.0` thus count and sum should
1526            // always equal. The number of `observe` calls is limited to
1527            // 1_000_000, thus the sum is limited to 1_000_000. A float 64 is
1528            // able to represent the sum accurately up to 9_007_199_254_740_992.
1529            if sample_count != sample_sum {
1530                break;
1531            }
1532        }
1533
1534        done.store(true, std::sync::atomic::Ordering::Relaxed);
1535        observing_thread.join().unwrap();
1536
1537        if sample_count != cumulative_count {
1538            panic!(
1539                "Histogram invariant violated: For a histogram with a single \
1540                 bucket observing values below the bucket's upper bound only \
1541                 the histogram's count should always be equal to the buckets's \
1542                 cumulative count, got {:?} and {:?} instead.",
1543                sample_count, cumulative_count,
1544            );
1545        }
1546
1547        if sample_count != sample_sum {
1548            panic!(
1549                "Histogram invariant violated: For a histogram which is only \
1550                 ever observing a value of `1.0` the sample count should equal \
1551                 the sum, instead got: {:?} and {:?}",
1552                sample_count, sample_sum,
1553            )
1554        }
1555    }
1556
1557    #[test]
1558    fn test_error_on_inconsistent_label_cardinality() {
1559        let hist = Histogram::with_opts(
1560            histogram_opts!(
1561                "example_histogram",
1562                "Used as an example",
1563                vec![0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 5.0]
1564            )
1565            .variable_label("example_variable"),
1566        );
1567
1568        if let Err(Error::InconsistentCardinality { expect, got }) = hist {
1569            assert_eq!(1, expect);
1570            assert_eq!(0, got);
1571        } else {
1572            panic!("Expected InconsistentCardinality error.")
1573        }
1574    }
1575}