mz_ore/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Metrics for materialize systems.
17//!
18//! The idea here is that each subsystem keeps its metrics in a scoped-to-it struct, which gets
19//! registered (once) to the server's (or a test's) prometheus registry.
20//!
21//! Instead of using prometheus's (very verbose) metrics definitions, we rely on type inference to
22//! reduce the verbosity a little bit. A typical subsystem will look like the following:
23//!
24//! ```rust
25//! # use mz_ore::metrics::{MetricsRegistry, IntCounter};
26//! # use mz_ore::metric;
27//! #[derive(Debug, Clone)] // Note that prometheus metrics can safely be cloned
28//! struct Metrics {
29//!     pub bytes_sent: IntCounter,
30//! }
31//!
32//! impl Metrics {
33//!     pub fn register_into(registry: &MetricsRegistry) -> Metrics {
34//!         Metrics {
35//!             bytes_sent: registry.register(metric!(
36//!                 name: "mz_pg_sent_bytes",
37//!                 help: "total number of bytes sent here",
38//!             )),
39//!         }
40//!     }
41//! }
42//! ```
43
44use std::fmt;
45use std::fmt::{Debug, Formatter};
46use std::future::Future;
47use std::pin::Pin;
48use std::sync::{Arc, Mutex};
49use std::task::{Context, Poll};
50use std::time::{Duration, Instant};
51
52use derivative::Derivative;
53use pin_project::pin_project;
54use prometheus::core::{
55    Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec,
56    GenericGauge, GenericGaugeVec,
57};
58use prometheus::proto::MetricFamily;
59use prometheus::{HistogramOpts, Registry};
60
61mod delete_on_drop;
62
63pub use delete_on_drop::*;
64pub use prometheus::Opts as PrometheusOpts;
65
/// Define a metric for use in materialize.
///
/// Expands to a [`MakeCollectorOpts`](crate::metrics::MakeCollectorOpts) value intended to be
/// passed to [`MetricsRegistry::register`](crate::metrics::MetricsRegistry::register), which
/// chooses the concrete collector type through type inference.
///
/// Accepted keys, in this order (a trailing comma is allowed):
///
/// * `name` (required): the metric name.
/// * `help` (required): the help text.
/// * `subsystem` (optional): forwarded to `Opts::subsystem`.
/// * `const_labels` (optional): `{ key => value, ... }`; keys and values are converted with
///   `to_string()`.
/// * `var_labels` (optional): `[label, ...]`; each label is converted with `into()`.
/// * `buckets` (optional): histogram buckets. Required for histogram collectors and rejected
///   by every other collector type.
#[macro_export]
macro_rules! metric {
    (
        name: $name:expr,
        help: $help:expr
        $(, subsystem: $subsystem_name:expr)?
        $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })?
        $(, var_labels: [ $($vl_name:expr),* ])?
        $(, buckets: $bk_name:expr)?
        $(,)?
    ) => {{
        let const_labels = (&[
            $($(
                ($cl_key.to_string(), $cl_value.to_string()),
            )*)?
        ]).into_iter().cloned().collect();
        let var_labels = vec![
            $(
                $($vl_name.into(),)*
            )?];
        #[allow(unused_mut)]
        let mut mk_opts = $crate::metrics::MakeCollectorOpts {
            opts: $crate::metrics::PrometheusOpts::new($name, $help)
                $(.subsystem( $subsystem_name ))?
                .const_labels(const_labels)
                .variable_labels(var_labels),
            buckets: None,
        };
        // `buckets` is matched as an optional (`?`) repetition, so transcribe it as one too.
        $(mk_opts.buckets = Some($bk_name);)?
        mk_opts
    }}
}
100
101/// Options for MakeCollector. This struct should be instantiated using the metric macro.
102#[derive(Debug, Clone)]
103pub struct MakeCollectorOpts {
104    /// Common Prometheus options
105    pub opts: PrometheusOpts,
106    /// Buckets to be used with Histogram and HistogramVec. Must be set to create Histogram types
107    /// and must not be set for other types.
108    pub buckets: Option<Vec<f64>>,
109}
110
111/// The materialize metrics registry.
112#[derive(Clone, Derivative)]
113#[derivative(Debug)]
114pub struct MetricsRegistry {
115    inner: Registry,
116    #[derivative(Debug = "ignore")]
117    postprocessors: Arc<Mutex<Vec<Box<dyn FnMut(&mut Vec<MetricFamily>) + Send + Sync>>>>,
118}
119
120/// A wrapper for metrics to require delete on drop semantics
121///
122/// The wrapper behaves like regular metrics but only provides functions to create delete-on-drop
123/// variants. This way, no metrics of this type can be leaked.
124///
125/// In situations where the delete-on-drop behavior is not desired or in legacy code, use the raw
126/// variants of the metrics, as defined in [self::raw].
127#[derive(Clone)]
128pub struct DeleteOnDropWrapper<M> {
129    inner: M,
130}
131
132impl<M: MakeCollector + Debug> Debug for DeleteOnDropWrapper<M> {
133    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
134        self.inner.fmt(f)
135    }
136}
137
138impl<M: Collector> Collector for DeleteOnDropWrapper<M> {
139    fn desc(&self) -> Vec<&Desc> {
140        self.inner.desc()
141    }
142
143    fn collect(&self) -> Vec<MetricFamily> {
144        self.inner.collect()
145    }
146}
147
148impl<M: MakeCollector> MakeCollector for DeleteOnDropWrapper<M> {
149    fn make_collector(opts: MakeCollectorOpts) -> Self {
150        DeleteOnDropWrapper {
151            inner: M::make_collector(opts),
152        }
153    }
154}
155
156impl<M: MetricVecExt> DeleteOnDropWrapper<M> {
157    /// Returns a metric that deletes its labels from this metrics vector when dropped.
158    pub fn get_delete_on_drop_metric<L: PromLabelsExt>(
159        &self,
160        labels: L,
161    ) -> DeleteOnDropMetric<M, L> {
162        self.inner.get_delete_on_drop_metric(labels)
163    }
164}
165
166/// The unsigned integer version of [`Gauge`]. Provides better performance if
167/// metric values are all unsigned integers.
168pub type UIntGauge = GenericGauge<AtomicU64>;
169
170/// Delete-on-drop shadow of Prometheus [prometheus::CounterVec].
171pub type CounterVec = DeleteOnDropWrapper<prometheus::CounterVec>;
172/// Delete-on-drop shadow of Prometheus [prometheus::Gauge].
173pub type Gauge = DeleteOnDropWrapper<prometheus::Gauge>;
174/// Delete-on-drop shadow of Prometheus [prometheus::GaugeVec].
175pub type GaugeVec = DeleteOnDropWrapper<prometheus::GaugeVec>;
176/// Delete-on-drop shadow of Prometheus [prometheus::HistogramVec].
177pub type HistogramVec = DeleteOnDropWrapper<prometheus::HistogramVec>;
178/// Delete-on-drop shadow of Prometheus [prometheus::IntCounterVec].
179pub type IntCounterVec = DeleteOnDropWrapper<prometheus::IntCounterVec>;
180/// Delete-on-drop shadow of Prometheus [prometheus::IntGaugeVec].
181pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
182/// Delete-on-drop shadow of Prometheus [raw::UIntGaugeVec].
183pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
184
185use crate::assert_none;
186
187pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
188
189/// Access to non-delete-on-drop vector types
190pub mod raw {
191    use prometheus::core::{AtomicU64, GenericGaugeVec};
192
193    /// The unsigned integer version of [`GaugeVec`].
194    /// Provides better performance if metric values are all unsigned integers.
195    pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
196
197    pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
198}
199
200impl MetricsRegistry {
201    /// Creates a new metrics registry.
202    pub fn new() -> Self {
203        MetricsRegistry {
204            inner: Registry::new(),
205            postprocessors: Arc::new(Mutex::new(vec![])),
206        }
207    }
208
209    /// Register a metric defined with the [`metric`] macro.
210    pub fn register<M>(&self, opts: MakeCollectorOpts) -> M
211    where
212        M: MakeCollector,
213    {
214        let collector = M::make_collector(opts);
215        self.inner.register(Box::new(collector.clone())).unwrap();
216        collector
217    }
218
219    /// Registers a gauge whose value is computed when observed.
220    pub fn register_computed_gauge<P>(
221        &self,
222        opts: MakeCollectorOpts,
223        f: impl Fn() -> P::T + Send + Sync + 'static,
224    ) -> ComputedGenericGauge<P>
225    where
226        P: Atomic + 'static,
227    {
228        let gauge = ComputedGenericGauge {
229            gauge: GenericGauge::make_collector(opts),
230            f: Arc::new(f),
231        };
232        self.inner.register(Box::new(gauge.clone())).unwrap();
233        gauge
234    }
235
236    /// Register a pre-defined prometheus collector.
237    pub fn register_collector<C: 'static + prometheus::core::Collector>(&self, collector: C) {
238        self.inner
239            .register(Box::new(collector))
240            .expect("registering pre-defined metrics collector");
241    }
242
243    /// Registers a metric postprocessor.
244    ///
245    /// Postprocessors are invoked on every call to [`MetricsRegistry::gather`]
246    /// in the order that they are registered.
247    pub fn register_postprocessor<F>(&self, f: F)
248    where
249        F: FnMut(&mut Vec<MetricFamily>) + Send + Sync + 'static,
250    {
251        let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
252        postprocessors.push(Box::new(f));
253    }
254
255    /// Gather all the metrics from the metrics registry for reporting.
256    ///
257    /// This function invokes the postprocessors on all gathered metrics (see
258    /// [`MetricsRegistry::register_postprocessor`]) in the order the
259    /// postprocessors were registered.
260    ///
261    /// See also [`prometheus::Registry::gather`].
262    pub fn gather(&self) -> Vec<MetricFamily> {
263        let mut metrics = self.inner.gather();
264        let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
265        for postprocessor in &mut *postprocessors {
266            postprocessor(&mut metrics);
267        }
268        metrics
269    }
270}
271
272/// A wrapper for creating prometheus metrics more conveniently.
273///
274/// Together with the [`metric`] macro, this trait is mainly used by [`MetricsRegistry`] and should
275/// not normally be used outside the metric registration flow.
276pub trait MakeCollector: Collector + Clone + 'static {
277    /// Creates a new collector.
278    fn make_collector(opts: MakeCollectorOpts) -> Self;
279}
280
281impl<T> MakeCollector for GenericCounter<T>
282where
283    T: Atomic + 'static,
284{
285    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
286        assert_none!(mk_opts.buckets);
287        Self::with_opts(mk_opts.opts).expect("defining a counter")
288    }
289}
290
291impl<T> MakeCollector for GenericCounterVec<T>
292where
293    T: Atomic + 'static,
294{
295    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
296        assert_none!(mk_opts.buckets);
297        let labels: Vec<String> = mk_opts.opts.variable_labels.clone();
298        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
299        Self::new(mk_opts.opts, label_refs.as_slice()).expect("defining a counter vec")
300    }
301}
302
303impl<T> MakeCollector for GenericGauge<T>
304where
305    T: Atomic + 'static,
306{
307    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
308        assert_none!(mk_opts.buckets);
309        Self::with_opts(mk_opts.opts).expect("defining a gauge")
310    }
311}
312
313impl<T> MakeCollector for GenericGaugeVec<T>
314where
315    T: Atomic + 'static,
316{
317    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
318        assert_none!(mk_opts.buckets);
319        let labels = mk_opts.opts.variable_labels.clone();
320        let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
321        Self::new(mk_opts.opts, labels).expect("defining a gauge vec")
322    }
323}
324
325impl MakeCollector for Histogram {
326    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
327        assert!(mk_opts.buckets.is_some());
328        Self::with_opts(HistogramOpts {
329            common_opts: mk_opts.opts,
330            buckets: mk_opts.buckets.unwrap(),
331        })
332        .expect("defining a histogram")
333    }
334}
335
336impl MakeCollector for raw::HistogramVec {
337    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
338        assert!(mk_opts.buckets.is_some());
339        let labels = mk_opts.opts.variable_labels.clone();
340        let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
341        Self::new(
342            HistogramOpts {
343                common_opts: mk_opts.opts,
344                buckets: mk_opts.buckets.unwrap(),
345            },
346            labels,
347        )
348        .expect("defining a histogram vec")
349    }
350}
351
352/// A [`Gauge`] whose value is computed whenever it is observed.
353pub struct ComputedGenericGauge<P>
354where
355    P: Atomic,
356{
357    gauge: GenericGauge<P>,
358    f: Arc<dyn Fn() -> P::T + Send + Sync>,
359}
360
361impl<P> fmt::Debug for ComputedGenericGauge<P>
362where
363    P: Atomic + fmt::Debug,
364{
365    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
366        f.debug_struct("ComputedGenericGauge")
367            .field("gauge", &self.gauge)
368            .finish_non_exhaustive()
369    }
370}
371
372impl<P> Clone for ComputedGenericGauge<P>
373where
374    P: Atomic,
375{
376    fn clone(&self) -> ComputedGenericGauge<P> {
377        ComputedGenericGauge {
378            gauge: self.gauge.clone(),
379            f: Arc::clone(&self.f),
380        }
381    }
382}
383
384impl<T> Collector for ComputedGenericGauge<T>
385where
386    T: Atomic,
387{
388    fn desc(&self) -> Vec<&prometheus::core::Desc> {
389        self.gauge.desc()
390    }
391
392    fn collect(&self) -> Vec<MetricFamily> {
393        self.gauge.set((self.f)());
394        self.gauge.collect()
395    }
396}
397
398impl<P> ComputedGenericGauge<P>
399where
400    P: Atomic,
401{
402    /// Computes the current value of the gauge.
403    pub fn get(&self) -> P::T {
404        (self.f)()
405    }
406}
407
408/// A [`ComputedGenericGauge`] for 64-bit floating point numbers.
409pub type ComputedGauge = ComputedGenericGauge<AtomicF64>;
410
411/// A [`ComputedGenericGauge`] for 64-bit signed integers.
412pub type ComputedIntGauge = ComputedGenericGauge<AtomicI64>;
413
414/// A [`ComputedGenericGauge`] for 64-bit unsigned integers.
415pub type ComputedUIntGauge = ComputedGenericGauge<AtomicU64>;
416
417/// Exposes combinators that report metrics related to the execution of a [`Future`] to prometheus.
418pub trait MetricsFutureExt<F> {
419    /// Records the number of seconds it takes a [`Future`] to complete according to "the clock on
420    /// the wall".
421    ///
422    /// More specifically, it records the instant at which the `Future` was first polled, and the
423    /// instant at which the `Future` completes. Then reports the duration between those two
424    /// instances to the provided metric.
425    ///
426    /// # Wall Time vs Execution Time
427    ///
428    /// There is also [`MetricsFutureExt::exec_time`], which measures how long a [`Future`] spent
429    /// executing, instead of how long it took to complete. For example, a network request may have
430    /// a wall time of 1 second, meanwhile it's execution time may have only been 50ms. The 950ms
431    /// delta would be how long the [`Future`] waited for a response from the network.
432    ///
433    /// # Uses
434    ///
435    /// Recording the wall time can be useful for monitoring latency, for example the latency of a
436    /// SQL request.
437    ///
438    /// Note: You must call either [`observe`] to record the execution time to a [`Histogram`] or
439    /// [`inc_by`] to record to a [`Counter`]. The following will not compile:
440    ///
441    /// ```compile_fail
442    /// use mz_ore::metrics::MetricsFutureExt;
443    ///
444    /// # let _ = async {
445    /// async { Ok(()) }
446    ///     .wall_time()
447    ///     .await;
448    /// # };
449    /// ```
450    ///
451    /// [`observe`]: WallTimeFuture::observe
452    /// [`inc_by`]: WallTimeFuture::inc_by
453    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric>;
454
455    /// Records the total number of seconds for which a [`Future`] was executing.
456    ///
457    /// More specifically, every time the `Future` is polled it records how long that individual
458    /// call took, and maintains a running sum until the `Future` completes. Then we report that
459    /// duration to the provided metric.
460    ///
461    /// # Wall Time vs Execution Time
462    ///
463    /// There is also [`MetricsFutureExt::wall_time`], which measures how long a [`Future`] took to
464    /// complete, instead of how long it spent executing. For example, a network request may have
465    /// a wall time of 1 second, meanwhile it's execution time may have only been 50ms. The 950ms
466    /// delta would be how long the [`Future`] waited for a response from the network.
467    ///
468    /// # Uses
469    ///
470    /// Recording execution time can be useful if you want to monitor [`Future`]s that could be
471    /// sensitive to CPU usage. For example, if you have a single logical control thread you'll
472    /// want to make sure that thread never spends too long running a single `Future`. Reporting
473    /// the execution time of `Future`s running on this thread can help ensure there is no
474    /// unexpected blocking.
475    ///
476    /// Note: You must call either [`observe`] to record the execution time to a [`Histogram`] or
477    /// [`inc_by`] to record to a [`Counter`]. The following will not compile:
478    ///
479    /// ```compile_fail
480    /// use mz_ore::metrics::MetricsFutureExt;
481    ///
482    /// # let _ = async {
483    /// async { Ok(()) }
484    ///     .exec_time()
485    ///     .await;
486    /// # };
487    /// ```
488    ///
489    /// [`observe`]: ExecTimeFuture::observe
490    /// [`inc_by`]: ExecTimeFuture::inc_by
491    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric>;
492}
493
494impl<F: Future> MetricsFutureExt<F> for F {
495    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric> {
496        WallTimeFuture {
497            fut: self,
498            metric: UnspecifiedMetric(()),
499            start: None,
500            filter: None,
501        }
502    }
503
504    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric> {
505        ExecTimeFuture {
506            fut: self,
507            metric: UnspecifiedMetric(()),
508            running_duration: Duration::from_millis(0),
509            filter: None,
510        }
511    }
512}
513
514/// Future returned by [`MetricsFutureExt::wall_time`].
515#[must_use = "futures do nothing unless you `.await` or poll them"]
516#[pin_project]
517pub struct WallTimeFuture<F, Metric> {
518    /// The inner [`Future`] that we're recording the wall time for.
519    #[pin]
520    fut: F,
521    /// Prometheus metric that we'll report to.
522    metric: Metric,
523    /// [`Instant`] at which the [`Future`] was first polled.
524    start: Option<Instant>,
525    /// Optional filter that determines if we observe the wall time of this [`Future`].
526    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
527}
528
529impl<F: Debug, M: Debug> fmt::Debug for WallTimeFuture<F, M> {
530    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
531        f.debug_struct("WallTimeFuture")
532            .field("fut", &self.fut)
533            .field("metric", &self.metric)
534            .field("start", &self.start)
535            .field("filter", &self.filter.is_some())
536            .finish()
537    }
538}
539
540impl<F> WallTimeFuture<F, UnspecifiedMetric> {
541    /// Sets the recored metric to be a [`prometheus::Histogram`].
542    ///
543    /// ```text
544    /// my_future
545    ///     .wall_time()
546    ///     .observe(metrics.slow_queries_hist.with_label_values(&["select"]))
547    /// ```
548    pub fn observe(
549        self,
550        histogram: prometheus::Histogram,
551    ) -> WallTimeFuture<F, prometheus::Histogram> {
552        WallTimeFuture {
553            fut: self.fut,
554            metric: histogram,
555            start: self.start,
556            filter: self.filter,
557        }
558    }
559
560    /// Sets the recored metric to be a [`prometheus::Counter`].
561    ///
562    /// ```text
563    /// my_future
564    ///     .wall_time()
565    ///     .inc_by(metrics.slow_queries.with_label_values(&["select"]))
566    /// ```
567    pub fn inc_by(self, counter: prometheus::Counter) -> WallTimeFuture<F, prometheus::Counter> {
568        WallTimeFuture {
569            fut: self.fut,
570            metric: counter,
571            start: self.start,
572            filter: self.filter,
573        }
574    }
575
576    /// Sets the recorded duration in a specific f64.
577    pub fn set_at(self, place: &mut f64) -> WallTimeFuture<F, &mut f64> {
578        WallTimeFuture {
579            fut: self.fut,
580            metric: place,
581            start: self.start,
582            filter: self.filter,
583        }
584    }
585}
586
587impl<F, M> WallTimeFuture<F, M> {
588    /// Specifies a filter which much return `true` for the wall time to be recorded.
589    ///
590    /// This can be particularly useful if you have a high volume `Future` and you only want to
591    /// record ones that take a long time to complete.
592    pub fn with_filter(
593        mut self,
594        filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
595    ) -> Self {
596        self.filter = Some(Box::new(filter));
597        self
598    }
599}
600
601impl<F: Future, M: DurationMetric> Future for WallTimeFuture<F, M> {
602    type Output = F::Output;
603
604    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
605        let this = self.project();
606
607        if this.start.is_none() {
608            *this.start = Some(Instant::now());
609        }
610
611        let result = match this.fut.poll(cx) {
612            Poll::Ready(r) => r,
613            Poll::Pending => return Poll::Pending,
614        };
615        let duration = Instant::now().duration_since(this.start.expect("timer to be started"));
616
617        let pass = this
618            .filter
619            .as_mut()
620            .map(|filter| filter(duration))
621            .unwrap_or(true);
622        if pass {
623            this.metric.record(duration.as_secs_f64())
624        }
625
626        Poll::Ready(result)
627    }
628}
629
630/// Future returned by [`MetricsFutureExt::exec_time`].
631#[must_use = "futures do nothing unless you `.await` or poll them"]
632#[pin_project]
633pub struct ExecTimeFuture<F, Metric> {
634    /// The inner [`Future`] that we're recording the wall time for.
635    #[pin]
636    fut: F,
637    /// Prometheus metric that we'll report to.
638    metric: Metric,
639    /// Total [`Duration`] for which this [`Future`] has been executing.
640    running_duration: Duration,
641    /// Optional filter that determines if we observe the execution time of this [`Future`].
642    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
643}
644
645impl<F: Debug, M: Debug> fmt::Debug for ExecTimeFuture<F, M> {
646    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
647        f.debug_struct("ExecTimeFuture")
648            .field("fut", &self.fut)
649            .field("metric", &self.metric)
650            .field("running_duration", &self.running_duration)
651            .field("filter", &self.filter.is_some())
652            .finish()
653    }
654}
655
656impl<F> ExecTimeFuture<F, UnspecifiedMetric> {
657    /// Sets the recored metric to be a [`prometheus::Histogram`].
658    ///
659    /// ```text
660    /// my_future
661    ///     .exec_time()
662    ///     .observe(metrics.slow_queries_hist.with_label_values(&["select"]))
663    /// ```
664    pub fn observe(
665        self,
666        histogram: prometheus::Histogram,
667    ) -> ExecTimeFuture<F, prometheus::Histogram> {
668        ExecTimeFuture {
669            fut: self.fut,
670            metric: histogram,
671            running_duration: self.running_duration,
672            filter: self.filter,
673        }
674    }
675
676    /// Sets the recored metric to be a [`prometheus::Counter`].
677    ///
678    /// ```text
679    /// my_future
680    ///     .exec_time()
681    ///     .inc_by(metrics.slow_queries.with_label_values(&["select"]))
682    /// ```
683    pub fn inc_by(self, counter: prometheus::Counter) -> ExecTimeFuture<F, prometheus::Counter> {
684        ExecTimeFuture {
685            fut: self.fut,
686            metric: counter,
687            running_duration: self.running_duration,
688            filter: self.filter,
689        }
690    }
691}
692
693impl<F, M> ExecTimeFuture<F, M> {
694    /// Specifies a filter which much return `true` for the execution time to be recorded.
695    pub fn with_filter(
696        mut self,
697        filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
698    ) -> Self {
699        self.filter = Some(Box::new(filter));
700        self
701    }
702}
703
704impl<F: Future, M: DurationMetric> Future for ExecTimeFuture<F, M> {
705    type Output = F::Output;
706
707    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
708        let this = self.project();
709
710        let start = Instant::now();
711        let result = this.fut.poll(cx);
712        let duration = Instant::now().duration_since(start);
713
714        *this.running_duration = this.running_duration.saturating_add(duration);
715
716        let result = match result {
717            Poll::Ready(result) => result,
718            Poll::Pending => return Poll::Pending,
719        };
720
721        let duration = *this.running_duration;
722        let pass = this
723            .filter
724            .as_mut()
725            .map(|filter| filter(duration))
726            .unwrap_or(true);
727        if pass {
728            this.metric.record(duration.as_secs_f64());
729        }
730
731        Poll::Ready(result)
732    }
733}
734
/// Type-level placeholder for "no metric chosen yet" in the [`MetricsFutureExt`] wrappers.
///
/// `WallTimeFuture<F, M>` and `ExecTimeFuture<F, M>` implement [`Future`] only when `M`
/// implements `DurationMetric`, which this type deliberately does not. Awaiting a wrapper
/// therefore fails to compile until a metric has been chosen, e.g. via
/// [`WallTimeFuture::observe`] or [`WallTimeFuture::inc_by`].
#[derive(Debug)]
pub struct UnspecifiedMetric(());
743
744/// A trait makes recording a duration generic over different prometheus metrics. This allows us to
745/// de-dupe the implemenation of [`Future`] for our wrapper Futures like [`WallTimeFuture`] and
746/// [`ExecTimeFuture`] over different kinds of prometheus metrics.
747trait DurationMetric {
748    fn record(&mut self, seconds: f64);
749}
750
751impl DurationMetric for prometheus::Histogram {
752    fn record(&mut self, seconds: f64) {
753        self.observe(seconds)
754    }
755}
756
757impl DurationMetric for prometheus::Counter {
758    fn record(&mut self, seconds: f64) {
759        self.inc_by(seconds)
760    }
761}
762
763// An implementation of `DurationMetric` that lets the user take the recorded
764// value and use it elsewhere.
765impl DurationMetric for &'_ mut f64 {
766    fn record(&mut self, seconds: f64) {
767        **self = seconds;
768    }
769}
770
/// Register the Tokio runtime's metrics in our metrics registry.
///
/// Each metric becomes a computed `u64` gauge named `mz_tokio_<method>`, labeled with
/// `runtime = name`, whose value is read from `runtime_metrics` each time it is collected.
#[cfg(feature = "async")]
pub fn register_runtime_metrics(
    name: &'static str,
    runtime_metrics: tokio::runtime::RuntimeMetrics,
    registry: &MetricsRegistry,
) {
    // Registers one computed gauge per `method => help` pair, in the order listed.
    macro_rules! register {
        ($($method:ident => $doc:literal),+ $(,)?) => {
            $({
                // Each gauge's closure owns its own handle to the runtime metrics.
                let metrics = runtime_metrics.clone();
                registry.register_computed_gauge::<prometheus::core::AtomicU64>(
                    crate::metric!(
                        name: concat!("mz_tokio_", stringify!($method)),
                        help: $doc,
                        const_labels: {"runtime" => name},
                    ),
                    move || <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method()),
                );
            })+
        };
    }

    register!(
        num_workers => "The number of worker threads used by the runtime.",
        num_alive_tasks => "The current number of alive tasks in the runtime.",
        global_queue_depth => "The number of tasks currently scheduled in the runtime's global queue.",
    );
    // These methods only exist when tokio is built with `--cfg tokio_unstable`.
    #[cfg(tokio_unstable)]
    {
        register!(
            num_blocking_threads => "The number of additional threads spawned by the runtime.",
            num_idle_blocking_threads => "The number of idle threads which have spawned by the runtime for spawn_blocking calls.",
            spawned_tasks_count => "The number of tasks spawned in this runtime since it was created.",
            remote_schedule_count => "The number of tasks scheduled from outside of the runtime.",
            budget_forced_yield_count => "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets.",
            blocking_queue_depth => "The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking.",
        );
    }
}
832
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricFamily;
    use prometheus::{CounterVec, HistogramVec};

    use crate::stats::histogram_seconds_buckets;

    use super::{MetricsFutureExt, MetricsRegistry};

    /// Histograms and counters the smoke test records into, all keyed by an `action` label.
    struct Metrics {
        pub wall_time_hist: HistogramVec,
        pub wall_time_cnt: CounterVec,
        pub exec_time_hist: HistogramVec,
        pub exec_time_cnt: CounterVec,
    }

    impl Metrics {
        pub fn register_into(registry: &MetricsRegistry) -> Self {
            Self {
                wall_time_hist: registry.register(metric!(
                    name: "wall_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                wall_time_cnt: registry.register(metric!(
                    name: "wall_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
                exec_time_hist: registry.register(metric!(
                    name: "exec_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                exec_time_cnt: registry.register(metric!(
                    name: "exec_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
            }
        }
    }

    /// Returns the gathered metric family called `name`, panicking if it was not reported.
    fn family<'a>(reports: &'a [MetricFamily], name: &str) -> &'a MetricFamily {
        reports
            .iter()
            .find(|family| family.get_name() == name)
            .expect("metric not found")
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    fn smoke_test_metrics_future_ext() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to start runtime");

        // Case 1: an async sleep waits ~1s of wall time but barely executes.
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);
        runtime.block_on(
            async {
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
            .wall_time()
            .observe(metrics.wall_time_hist.with_label_values(&["async_sleep_w"]))
            .exec_time()
            .observe(metrics.exec_time_hist.with_label_values(&["async_sleep_e"])),
        );
        let reports = registry.gather();

        let exec_metric = family(&reports, "exec_time_hist").get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].get_value(), "async_sleep_e");
        let exec_histogram = exec_metric[0].get_histogram();
        assert_eq!(exec_histogram.get_sample_count(), 1);
        // Bucket index 3 (~1ms) already holds the sample: polling the sleep takes far less time
        // than the 1 second it waits.
        assert_eq!(exec_histogram.get_bucket()[3].get_cumulative_count(), 1);

        let wall_metric = family(&reports, "wall_time_hist").get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].get_value(), "async_sleep_w");
        let wall_histogram = wall_metric[0].get_histogram();
        assert_eq!(wall_histogram.get_sample_count(), 1);
        // Bucket index 12 (~512ms) must still be empty: the wall time covers the whole 1 second
        // sleep.
        assert_eq!(wall_histogram.get_bucket()[12].get_cumulative_count(), 0);

        // Case 2: a blocking thread sleep executes for ~1s. A fresh registry keeps this case's
        // samples separate from case 1.
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);
        runtime.block_on(
            async {
                std::thread::sleep(std::time::Duration::from_secs(1));
            }
            .wall_time()
            .with_filter(|duration| duration < Duration::from_millis(10))
            .inc_by(metrics.wall_time_cnt.with_label_values(&["thread_sleep_w"]))
            .exec_time()
            .inc_by(metrics.exec_time_cnt.with_label_values(&["thread_sleep_e"])),
        );
        let reports = registry.gather();

        let exec_metric = family(&reports, "exec_time_cnt").get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].get_value(), "thread_sleep_e");
        // The sleep blocks inside `poll`, so all of it counts as execution time.
        assert!(exec_metric[0].get_counter().get_value() >= 1.0);

        let wall_metric = family(&reports, "wall_time_cnt").get_metric();
        assert_eq!(wall_metric.len(), 1);
        // The filter only admits wall times under 10ms, so nothing was added to the counter.
        assert_eq!(wall_metric[0].get_counter().get_value(), 0.0);
    }
}