Skip to main content

mz_ore/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Metrics for materialize systems.
17//!
18//! The idea here is that each subsystem keeps its metrics in a scoped-to-it struct, which gets
19//! registered (once) to the server's (or a test's) prometheus registry.
20//!
21//! Instead of using prometheus's (very verbose) metrics definitions, we rely on type inference to
22//! reduce the verbosity a little bit. A typical subsystem will look like the following:
23//!
24//! ```rust
25//! # use mz_ore::metrics::{MetricsRegistry, IntCounter};
26//! # use mz_ore::metric;
27//! #[derive(Debug, Clone)] // Note that prometheus metrics can safely be cloned
28//! struct Metrics {
29//!     pub bytes_sent: IntCounter,
30//! }
31//!
32//! impl Metrics {
33//!     pub fn register_into(registry: &MetricsRegistry) -> Metrics {
34//!         Metrics {
35//!             bytes_sent: registry.register(metric!(
36//!                 name: "mz_pg_sent_bytes",
37//!                 help: "total number of bytes sent here",
38//!             )),
39//!         }
40//!     }
41//! }
42//! ```
43
44use std::collections::BTreeMap;
45use std::fmt;
46use std::fmt::{Debug, Formatter};
47use std::future::Future;
48use std::pin::Pin;
49use std::sync::{Arc, Mutex};
50use std::task::{Context, Poll};
51use std::time::{Duration, Instant};
52
53use derivative::Derivative;
54use pin_project::pin_project;
55use prometheus::core::{
56    Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec,
57    GenericGauge, GenericGaugeVec,
58};
59use prometheus::proto::MetricFamily;
60use prometheus::{HistogramOpts, Registry};
61
62mod delete_on_drop;
63mod rule;
64
65pub use delete_on_drop::*;
66pub use prometheus::Opts as PrometheusOpts;
67pub use rule::{NameLookup, ObjectName, Rule};
68
/// Define a metric for use in materialize.
///
/// Expands to a `MakeCollectorOpts` value, which is normally handed straight to
/// `MetricsRegistry::register` (or `MetricsRegistry::register_computed_gauge`). The concrete
/// collector type is chosen by type inference at the call site.
///
/// Accepted fields, which must appear in exactly this order:
///
/// * `name` (required): the metric name.
/// * `help` (required): the help text.
/// * `subsystem` (optional): forwarded to `PrometheusOpts::subsystem`.
/// * `const_labels` (optional): `{ key => value, ... }`. Each key and value is converted with
///   `to_string()`.
/// * `var_labels` (optional): `[ name, ... ]`. Each name is converted with `into()`.
/// * `buckets` (optional): histogram buckets, stored as `Some(..)` in the `buckets` field.
/// * `rules` (optional): `[ rule, ... ]`, collected into the `rules` field.
///
/// A trailing comma after the last field is accepted.
#[macro_export]
macro_rules! metric {
    (
        name: $name:expr,
        help: $help:expr
        $(, subsystem: $subsystem_name:expr)?
        $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })?
        $(, var_labels: [ $($vl_name:expr),* ])?
        $(, buckets: $bk_name:expr)?
        $(, rules: [ $($rule:expr),* $(,)? ])?
        $(,)?
    ) => {{
        // Constant labels: every key/value pair is turned into owned `String`s.
        let const_labels = (&[
            $($(
                ($cl_key.to_string(), $cl_value.to_string()),
            )*)?
        ]).into_iter().cloned().collect();
        // Variable label names; empty when `var_labels` is omitted.
        let var_labels = vec![
            $(
                $($vl_name.into(),)*
            )?];
        // `mut` is only needed when `buckets` is passed, hence the allow.
        #[allow(unused_mut)]
        let mut mk_opts = $crate::metrics::MakeCollectorOpts {
            opts: $crate::metrics::PrometheusOpts::new($name, $help)
                $(.subsystem( $subsystem_name ))?
                .const_labels(const_labels)
                .variable_labels(var_labels),
            buckets: None,
            rules: vec![ $($($rule),*)? ],
        };
        // Set buckets if passed
        $(mk_opts.buckets = Some($bk_name);)*
        mk_opts
    }}
}
105
/// Options for MakeCollector. This struct should be instantiated using the metric macro.
///
/// Passed to [`MakeCollector::make_collector`] (usually via [`MetricsRegistry::register`]) to
/// build a concrete collector.
#[derive(Debug, Clone)]
pub struct MakeCollectorOpts {
    /// Common Prometheus options. For vector collectors, `opts.variable_labels` supplies the
    /// label names.
    pub opts: PrometheusOpts,
    /// Buckets to be used with Histogram and HistogramVec. Must be set to create Histogram types
    /// and must not be set for other types.
    pub buckets: Option<Vec<f64>>,
    /// Enrichment rules attached to this metric. Applied by environmentd's public
    /// scrape endpoint to attach human-readable name labels (e.g. `cluster_name`)
    /// alongside the ID labels the metric already carries.
    ///
    /// [`MetricsRegistry`] records these under the metric's fully-qualified name when the metric
    /// is registered. The `make_collector` implementations in this module do not read them.
    pub rules: Vec<Rule>,
}
119
/// The materialize metrics registry.
///
/// Wraps a Prometheus [`Registry`] and adds two things: a list of postprocessors that
/// [`MetricsRegistry::gather`] runs over the gathered metrics, and the per-metric enrichment
/// rules recorded at registration time. The postprocessor list and the rule map are held in
/// `Arc`s, so clones of a registry share them.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct MetricsRegistry {
    /// The underlying Prometheus registry that collectors are registered with.
    inner: Registry,
    /// Postprocessors, run in registration order on every [`MetricsRegistry::gather`].
    /// Excluded from `Debug` output because the boxed closures are not `Debug`.
    #[derivative(Debug = "ignore")]
    postprocessors: Arc<Mutex<Vec<Box<dyn FnMut(&mut Vec<MetricFamily>) + Send + Sync>>>>,
    /// Enrichment rules declared per-metric via [`MakeCollectorOpts::rules`].
    /// Keyed by the fully-qualified Prometheus metric name (matches
    /// `MetricFamily::name()` at scrape time).
    rules_by_metric: Arc<Mutex<BTreeMap<String, Vec<Rule>>>>,
}
132
/// A wrapper for metrics to require delete on drop semantics
///
/// The wrapper behaves like regular metrics but only provides functions to create delete-on-drop
/// variants. This way, no metrics of this type can be leaked.
///
/// In situations where the delete-on-drop behavior is not desired or in legacy code, use the raw
/// variants of the metrics, as defined in [self::raw].
#[derive(Clone)]
pub struct DeleteOnDropWrapper<M> {
    /// The wrapped collector. Collection and `Debug` output are delegated to it.
    inner: M,
}
144
145impl<M: MakeCollector + Debug> Debug for DeleteOnDropWrapper<M> {
146    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147        self.inner.fmt(f)
148    }
149}
150
151impl<M: Collector> Collector for DeleteOnDropWrapper<M> {
152    fn desc(&self) -> Vec<&Desc> {
153        self.inner.desc()
154    }
155
156    fn collect(&self) -> Vec<MetricFamily> {
157        self.inner.collect()
158    }
159}
160
161impl<M: MakeCollector> MakeCollector for DeleteOnDropWrapper<M> {
162    fn make_collector(opts: MakeCollectorOpts) -> Self {
163        DeleteOnDropWrapper {
164            inner: M::make_collector(opts),
165        }
166    }
167}
168
169impl<M: MetricVecExt> DeleteOnDropWrapper<M> {
170    /// Returns a metric that deletes its labels from this metrics vector when dropped.
171    pub fn get_delete_on_drop_metric<L: PromLabelsExt>(
172        &self,
173        labels: L,
174    ) -> DeleteOnDropMetric<M, L> {
175        self.inner.get_delete_on_drop_metric(labels)
176    }
177}
178
/// The unsigned integer version of [`Gauge`]. Provides better performance if
/// metric values are all unsigned integers.
///
/// Unlike the aliases below, this is a plain Prometheus gauge, not a [`DeleteOnDropWrapper`].
pub type UIntGauge = GenericGauge<AtomicU64>;

/// Delete-on-drop shadow of Prometheus [prometheus::CounterVec].
pub type CounterVec = DeleteOnDropWrapper<prometheus::CounterVec>;
/// Delete-on-drop shadow of Prometheus [prometheus::Gauge].
pub type Gauge = DeleteOnDropWrapper<prometheus::Gauge>;
/// Delete-on-drop shadow of Prometheus [prometheus::GaugeVec].
pub type GaugeVec = DeleteOnDropWrapper<prometheus::GaugeVec>;
/// Delete-on-drop shadow of Prometheus [prometheus::HistogramVec].
pub type HistogramVec = DeleteOnDropWrapper<prometheus::HistogramVec>;
/// Delete-on-drop shadow of Prometheus [prometheus::IntCounterVec].
pub type IntCounterVec = DeleteOnDropWrapper<prometheus::IntCounterVec>;
/// Delete-on-drop shadow of Prometheus [prometheus::IntGaugeVec].
pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
/// Delete-on-drop shadow of Prometheus [raw::UIntGaugeVec].
pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
197
198use crate::assert_none;
199
200pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
201
/// Access to non-delete-on-drop vector types
///
/// These are the plain Prometheus collectors (plus an unsigned integer gauge vector), without
/// the delete-on-drop wrapper. Use them where delete-on-drop behavior is not desired, or in
/// legacy code.
pub mod raw {
    use prometheus::core::{AtomicU64, GenericGaugeVec};

    /// The unsigned integer version of [`GaugeVec`].
    /// Provides better performance if metric values are all unsigned integers.
    pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;

    // Unwrapped Prometheus types. `Gauge` is re-exported here too, although it is not a vector.
    pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
}
212
213impl MetricsRegistry {
214    /// Creates a new metrics registry.
215    pub fn new() -> Self {
216        MetricsRegistry {
217            inner: Registry::new(),
218            postprocessors: Arc::new(Mutex::new(vec![])),
219            rules_by_metric: Arc::new(Mutex::new(BTreeMap::new())),
220        }
221    }
222
223    /// Returns a snapshot of per-metric enrichment rules, keyed by the
224    /// fully-qualified Prometheus metric name.
225    pub fn rules_by_metric(&self) -> BTreeMap<String, Vec<Rule>> {
226        self.rules_by_metric.lock().expect("lock poisoned").clone()
227    }
228
229    /// Records the per-metric rules from `opts` under the metric's
230    /// fully-qualified name, if any are declared.
231    fn record_per_metric_rules(&self, opts: &MakeCollectorOpts) {
232        if opts.rules.is_empty() {
233            return;
234        }
235        let fq_name = opts.opts.fq_name();
236        self.rules_by_metric
237            .lock()
238            .expect("lock poisoned")
239            .entry(fq_name)
240            .or_default()
241            .extend(opts.rules.iter().cloned());
242    }
243
244    /// Register a metric defined with the [`metric`] macro.
245    pub fn register<M>(&self, opts: MakeCollectorOpts) -> M
246    where
247        M: MakeCollector,
248    {
249        self.record_per_metric_rules(&opts);
250        let collector = M::make_collector(opts);
251        self.inner.register(Box::new(collector.clone())).unwrap();
252        collector
253    }
254
255    /// Registers a gauge whose value is computed when observed.
256    pub fn register_computed_gauge<P>(
257        &self,
258        opts: MakeCollectorOpts,
259        f: impl Fn() -> P::T + Send + Sync + 'static,
260    ) -> ComputedGenericGauge<P>
261    where
262        P: Atomic + 'static,
263    {
264        self.record_per_metric_rules(&opts);
265        let gauge = ComputedGenericGauge {
266            gauge: GenericGauge::make_collector(opts),
267            f: Arc::new(f),
268        };
269        self.inner.register(Box::new(gauge.clone())).unwrap();
270        gauge
271    }
272
273    /// Register a pre-defined prometheus collector.
274    pub fn register_collector<C: 'static + prometheus::core::Collector>(&self, collector: C) {
275        self.inner
276            .register(Box::new(collector))
277            .expect("registering pre-defined metrics collector");
278    }
279
280    /// Registers a metric postprocessor.
281    ///
282    /// Postprocessors are invoked on every call to [`MetricsRegistry::gather`]
283    /// in the order that they are registered.
284    pub fn register_postprocessor<F>(&self, f: F)
285    where
286        F: FnMut(&mut Vec<MetricFamily>) + Send + Sync + 'static,
287    {
288        let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
289        postprocessors.push(Box::new(f));
290    }
291
292    /// Gather all the metrics from the metrics registry for reporting.
293    ///
294    /// This function invokes the postprocessors on all gathered metrics (see
295    /// [`MetricsRegistry::register_postprocessor`]) in the order the
296    /// postprocessors were registered.
297    ///
298    /// See also [`prometheus::Registry::gather`].
299    pub fn gather(&self) -> Vec<MetricFamily> {
300        let mut metrics = self.inner.gather();
301        let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
302        for postprocessor in &mut *postprocessors {
303            postprocessor(&mut metrics);
304        }
305        metrics
306    }
307}
308
/// A wrapper for creating prometheus metrics more conveniently.
///
/// Together with the [`metric`] macro, this trait is mainly used by [`MetricsRegistry`] and should
/// not normally be used outside the metric registration flow.
///
/// Implementors must be `Clone` because [`MetricsRegistry::register`] registers a clone of the
/// collector and returns the original to the caller.
pub trait MakeCollector: Collector + Clone + 'static {
    /// Creates a new collector from `opts`.
    ///
    /// The implementations in this module panic in two cases. The first is when `opts.buckets`
    /// does not match the collector kind (buckets are required for histograms and forbidden
    /// otherwise). The second is when Prometheus rejects the options.
    fn make_collector(opts: MakeCollectorOpts) -> Self;
}
317
318impl<T> MakeCollector for GenericCounter<T>
319where
320    T: Atomic + 'static,
321{
322    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
323        assert_none!(mk_opts.buckets);
324        Self::with_opts(mk_opts.opts).expect("defining a counter")
325    }
326}
327
328impl<T> MakeCollector for GenericCounterVec<T>
329where
330    T: Atomic + 'static,
331{
332    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
333        assert_none!(mk_opts.buckets);
334        let labels: Vec<String> = mk_opts.opts.variable_labels.clone();
335        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
336        Self::new(mk_opts.opts, label_refs.as_slice()).expect("defining a counter vec")
337    }
338}
339
340impl<T> MakeCollector for GenericGauge<T>
341where
342    T: Atomic + 'static,
343{
344    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
345        assert_none!(mk_opts.buckets);
346        Self::with_opts(mk_opts.opts).expect("defining a gauge")
347    }
348}
349
350impl<T> MakeCollector for GenericGaugeVec<T>
351where
352    T: Atomic + 'static,
353{
354    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
355        assert_none!(mk_opts.buckets);
356        let labels = mk_opts.opts.variable_labels.clone();
357        let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
358        Self::new(mk_opts.opts, labels).expect("defining a gauge vec")
359    }
360}
361
362impl MakeCollector for Histogram {
363    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
364        assert!(mk_opts.buckets.is_some());
365        Self::with_opts(HistogramOpts {
366            common_opts: mk_opts.opts,
367            buckets: mk_opts.buckets.unwrap(),
368        })
369        .expect("defining a histogram")
370    }
371}
372
373impl MakeCollector for raw::HistogramVec {
374    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
375        assert!(mk_opts.buckets.is_some());
376        let labels = mk_opts.opts.variable_labels.clone();
377        let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
378        Self::new(
379            HistogramOpts {
380                common_opts: mk_opts.opts,
381                buckets: mk_opts.buckets.unwrap(),
382            },
383            labels,
384        )
385        .expect("defining a histogram vec")
386    }
387}
388
/// A [`Gauge`] whose value is computed whenever it is observed.
///
/// The closure `f` runs on every collection, where its result is stored into `gauge` just before
/// the gauge is reported. It also runs on every call to [`ComputedGenericGauge::get`]. Clones
/// share the same closure through an `Arc`.
pub struct ComputedGenericGauge<P>
where
    P: Atomic,
{
    /// The gauge that carries the metric descriptor and the most recently computed value.
    gauge: GenericGauge<P>,
    /// Computes the gauge's current value.
    f: Arc<dyn Fn() -> P::T + Send + Sync>,
}
397
398impl<P> fmt::Debug for ComputedGenericGauge<P>
399where
400    P: Atomic + fmt::Debug,
401{
402    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
403        f.debug_struct("ComputedGenericGauge")
404            .field("gauge", &self.gauge)
405            .finish_non_exhaustive()
406    }
407}
408
409impl<P> Clone for ComputedGenericGauge<P>
410where
411    P: Atomic,
412{
413    fn clone(&self) -> ComputedGenericGauge<P> {
414        ComputedGenericGauge {
415            gauge: self.gauge.clone(),
416            f: Arc::clone(&self.f),
417        }
418    }
419}
420
421impl<T> Collector for ComputedGenericGauge<T>
422where
423    T: Atomic,
424{
425    fn desc(&self) -> Vec<&prometheus::core::Desc> {
426        self.gauge.desc()
427    }
428
429    fn collect(&self) -> Vec<MetricFamily> {
430        self.gauge.set((self.f)());
431        self.gauge.collect()
432    }
433}
434
435impl<P> ComputedGenericGauge<P>
436where
437    P: Atomic,
438{
439    /// Computes the current value of the gauge.
440    pub fn get(&self) -> P::T {
441        (self.f)()
442    }
443}
444
/// A [`ComputedGenericGauge`] for 64-bit floating point numbers.
/// Usually created with [`MetricsRegistry::register_computed_gauge`].
pub type ComputedGauge = ComputedGenericGauge<AtomicF64>;

/// A [`ComputedGenericGauge`] for 64-bit signed integers.
/// Usually created with [`MetricsRegistry::register_computed_gauge`].
pub type ComputedIntGauge = ComputedGenericGauge<AtomicI64>;

/// A [`ComputedGenericGauge`] for 64-bit unsigned integers.
/// Usually created with [`MetricsRegistry::register_computed_gauge`].
pub type ComputedUIntGauge = ComputedGenericGauge<AtomicU64>;
453
/// Exposes combinators that report metrics related to the execution of a [`Future`] to prometheus.
pub trait MetricsFutureExt<F> {
    /// Records the number of seconds it takes a [`Future`] to complete according to "the clock on
    /// the wall".
    ///
    /// More specifically, it records the instant at which the `Future` was first polled, and the
    /// instant at which the `Future` completes. Then reports the duration between those two
    /// instants to the provided metric.
    ///
    /// # Wall Time vs Execution Time
    ///
    /// There is also [`MetricsFutureExt::exec_time`], which measures how long a [`Future`] spent
    /// executing, instead of how long it took to complete. For example, a network request may have
    /// a wall time of 1 second, while its execution time may have only been 50ms. The 950ms
    /// delta would be how long the [`Future`] waited for a response from the network.
    ///
    /// # Uses
    ///
    /// Recording the wall time can be useful for monitoring latency, for example the latency of a
    /// SQL request.
    ///
    /// Note: You must call either [`observe`] to record the wall time to a [`Histogram`],
    /// [`inc_by`] to record to a [`Counter`], or [`set_at`] to write it into an `f64`. The
    /// following will not compile:
    ///
    /// ```compile_fail
    /// use mz_ore::metrics::MetricsFutureExt;
    ///
    /// # let _ = async {
    /// async { Ok(()) }
    ///     .wall_time()
    ///     .await;
    /// # };
    /// ```
    ///
    /// [`observe`]: WallTimeFuture::observe
    /// [`inc_by`]: WallTimeFuture::inc_by
    /// [`set_at`]: WallTimeFuture::set_at
    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric>;

    /// Records the total number of seconds for which a [`Future`] was executing.
    ///
    /// More specifically, every time the `Future` is polled it records how long that individual
    /// call took, and maintains a running sum until the `Future` completes. Then we report that
    /// duration to the provided metric.
    ///
    /// # Wall Time vs Execution Time
    ///
    /// There is also [`MetricsFutureExt::wall_time`], which measures how long a [`Future`] took to
    /// complete, instead of how long it spent executing. For example, a network request may have
    /// a wall time of 1 second, while its execution time may have only been 50ms. The 950ms
    /// delta would be how long the [`Future`] waited for a response from the network.
    ///
    /// # Uses
    ///
    /// Recording execution time can be useful if you want to monitor [`Future`]s that could be
    /// sensitive to CPU usage. For example, if you have a single logical control thread you'll
    /// want to make sure that thread never spends too long running a single `Future`. Reporting
    /// the execution time of `Future`s running on this thread can help ensure there is no
    /// unexpected blocking.
    ///
    /// Note: You must call either [`observe`] to record the execution time to a [`Histogram`] or
    /// [`inc_by`] to record to a [`Counter`]. The following will not compile:
    ///
    /// ```compile_fail
    /// use mz_ore::metrics::MetricsFutureExt;
    ///
    /// # let _ = async {
    /// async { Ok(()) }
    ///     .exec_time()
    ///     .await;
    /// # };
    /// ```
    ///
    /// [`observe`]: ExecTimeFuture::observe
    /// [`inc_by`]: ExecTimeFuture::inc_by
    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric>;
}
530
531impl<F: Future> MetricsFutureExt<F> for F {
532    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric> {
533        WallTimeFuture {
534            fut: self,
535            metric: UnspecifiedMetric(()),
536            start: None,
537            filter: None,
538        }
539    }
540
541    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric> {
542        ExecTimeFuture {
543            fut: self,
544            metric: UnspecifiedMetric(()),
545            running_duration: Duration::from_millis(0),
546            filter: None,
547        }
548    }
549}
550
/// Future returned by [`MetricsFutureExt::wall_time`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct WallTimeFuture<F, Metric> {
    /// The inner [`Future`] that we're recording the wall time for.
    #[pin]
    fut: F,
    /// Destination for the recorded duration. It is [`UnspecifiedMetric`] until a Histogram,
    /// Counter, or `&mut f64` target is chosen.
    metric: Metric,
    /// [`Instant`] at which the [`Future`] was first polled; `None` before the first poll.
    start: Option<Instant>,
    /// Optional filter that determines if we observe the wall time of this [`Future`].
    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
}
565
566impl<F: Debug, M: Debug> fmt::Debug for WallTimeFuture<F, M> {
567    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
568        f.debug_struct("WallTimeFuture")
569            .field("fut", &self.fut)
570            .field("metric", &self.metric)
571            .field("start", &self.start)
572            .field("filter", &self.filter.is_some())
573            .finish()
574    }
575}
576
577impl<F> WallTimeFuture<F, UnspecifiedMetric> {
578    /// Sets the recored metric to be a [`prometheus::Histogram`].
579    ///
580    /// ```text
581    /// my_future
582    ///     .wall_time()
583    ///     .observe(metrics.slow_queries_hist.with_label_values(&["select"]))
584    /// ```
585    pub fn observe(
586        self,
587        histogram: prometheus::Histogram,
588    ) -> WallTimeFuture<F, prometheus::Histogram> {
589        WallTimeFuture {
590            fut: self.fut,
591            metric: histogram,
592            start: self.start,
593            filter: self.filter,
594        }
595    }
596
597    /// Sets the recored metric to be a [`prometheus::Counter`].
598    ///
599    /// ```text
600    /// my_future
601    ///     .wall_time()
602    ///     .inc_by(metrics.slow_queries.with_label_values(&["select"]))
603    /// ```
604    pub fn inc_by(self, counter: prometheus::Counter) -> WallTimeFuture<F, prometheus::Counter> {
605        WallTimeFuture {
606            fut: self.fut,
607            metric: counter,
608            start: self.start,
609            filter: self.filter,
610        }
611    }
612
613    /// Sets the recorded duration in a specific f64.
614    pub fn set_at(self, place: &mut f64) -> WallTimeFuture<F, &mut f64> {
615        WallTimeFuture {
616            fut: self.fut,
617            metric: place,
618            start: self.start,
619            filter: self.filter,
620        }
621    }
622}
623
624impl<F, M> WallTimeFuture<F, M> {
625    /// Specifies a filter which much return `true` for the wall time to be recorded.
626    ///
627    /// This can be particularly useful if you have a high volume `Future` and you only want to
628    /// record ones that take a long time to complete.
629    pub fn with_filter(
630        mut self,
631        filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
632    ) -> Self {
633        self.filter = Some(Box::new(filter));
634        self
635    }
636}
637
638impl<F: Future, M: DurationMetric> Future for WallTimeFuture<F, M> {
639    type Output = F::Output;
640
641    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
642        let this = self.project();
643
644        if this.start.is_none() {
645            *this.start = Some(Instant::now());
646        }
647
648        let result = match this.fut.poll(cx) {
649            Poll::Ready(r) => r,
650            Poll::Pending => return Poll::Pending,
651        };
652        let duration = Instant::now().duration_since(this.start.expect("timer to be started"));
653
654        let pass = this
655            .filter
656            .as_mut()
657            .map(|filter| filter(duration))
658            .unwrap_or(true);
659        if pass {
660            this.metric.record(duration.as_secs_f64())
661        }
662
663        Poll::Ready(result)
664    }
665}
666
/// Future returned by [`MetricsFutureExt::exec_time`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct ExecTimeFuture<F, Metric> {
    /// The inner [`Future`] that we're recording the execution time for.
    #[pin]
    fut: F,
    /// Destination for the recorded duration. It is [`UnspecifiedMetric`] until a concrete
    /// metric is chosen.
    metric: Metric,
    /// Total [`Duration`] for which this [`Future`] has been executing, summed over every poll
    /// so far.
    running_duration: Duration,
    /// Optional filter that determines if we observe the execution time of this [`Future`].
    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
}
681
682impl<F: Debug, M: Debug> fmt::Debug for ExecTimeFuture<F, M> {
683    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
684        f.debug_struct("ExecTimeFuture")
685            .field("fut", &self.fut)
686            .field("metric", &self.metric)
687            .field("running_duration", &self.running_duration)
688            .field("filter", &self.filter.is_some())
689            .finish()
690    }
691}
692
693impl<F> ExecTimeFuture<F, UnspecifiedMetric> {
694    /// Sets the recored metric to be a [`prometheus::Histogram`].
695    ///
696    /// ```text
697    /// my_future
698    ///     .exec_time()
699    ///     .observe(metrics.slow_queries_hist.with_label_values(&["select"]))
700    /// ```
701    pub fn observe(
702        self,
703        histogram: prometheus::Histogram,
704    ) -> ExecTimeFuture<F, prometheus::Histogram> {
705        ExecTimeFuture {
706            fut: self.fut,
707            metric: histogram,
708            running_duration: self.running_duration,
709            filter: self.filter,
710        }
711    }
712
713    /// Sets the recored metric to be a [`prometheus::Counter`].
714    ///
715    /// ```text
716    /// my_future
717    ///     .exec_time()
718    ///     .inc_by(metrics.slow_queries.with_label_values(&["select"]))
719    /// ```
720    pub fn inc_by(self, counter: prometheus::Counter) -> ExecTimeFuture<F, prometheus::Counter> {
721        ExecTimeFuture {
722            fut: self.fut,
723            metric: counter,
724            running_duration: self.running_duration,
725            filter: self.filter,
726        }
727    }
728}
729
730impl<F, M> ExecTimeFuture<F, M> {
731    /// Specifies a filter which much return `true` for the execution time to be recorded.
732    pub fn with_filter(
733        mut self,
734        filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
735    ) -> Self {
736        self.filter = Some(Box::new(filter));
737        self
738    }
739}
740
741impl<F: Future, M: DurationMetric> Future for ExecTimeFuture<F, M> {
742    type Output = F::Output;
743
744    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
745        let this = self.project();
746
747        let start = Instant::now();
748        let result = this.fut.poll(cx);
749        let duration = Instant::now().duration_since(start);
750
751        *this.running_duration = this.running_duration.saturating_add(duration);
752
753        let result = match result {
754            Poll::Ready(result) => result,
755            Poll::Pending => return Poll::Pending,
756        };
757
758        let duration = *this.running_duration;
759        let pass = this
760            .filter
761            .as_mut()
762            .map(|filter| filter(duration))
763            .unwrap_or(true);
764        if pass {
765            this.metric.record(duration.as_secs_f64());
766        }
767
768        Poll::Ready(result)
769    }
770}
771
/// A type level flag used to ensure callers specify the kind of metric to record for
/// [`MetricsFutureExt`].
///
/// For example, `WallTimeFuture<F, M>` only implements [`Future`] for `M` that implements
/// `DurationMetric`, which [`UnspecifiedMetric`] does not. This forces users at build time to
/// call [`WallTimeFuture::observe`] or [`WallTimeFuture::inc_by`] (or another method that picks
/// a destination, such as [`WallTimeFuture::set_at`]).
///
/// The private `()` field prevents construction outside this module.
#[derive(Debug)]
pub struct UnspecifiedMetric(());
780
/// A trait that makes recording a duration generic over different prometheus metrics. This allows
/// us to de-dupe the implementation of [`Future`] for our wrapper Futures like [`WallTimeFuture`]
/// and [`ExecTimeFuture`] over different kinds of prometheus metrics.
trait DurationMetric {
    /// Records a duration, given in seconds.
    fn record(&mut self, seconds: f64);
}
787
788impl DurationMetric for prometheus::Histogram {
789    fn record(&mut self, seconds: f64) {
790        self.observe(seconds)
791    }
792}
793
794impl DurationMetric for prometheus::Counter {
795    fn record(&mut self, seconds: f64) {
796        self.inc_by(seconds)
797    }
798}
799
800// An implementation of `DurationMetric` that lets the user take the recorded
801// value and use it elsewhere.
802impl DurationMetric for &'_ mut f64 {
803    fn record(&mut self, seconds: f64) {
804        **self = seconds;
805    }
806}
807
808/// Register the Tokio runtime's metrics in our metrics registry.
809#[cfg(feature = "async")]
810pub fn register_runtime_metrics(
811    name: &'static str,
812    runtime_metrics: tokio::runtime::RuntimeMetrics,
813    registry: &MetricsRegistry,
814) {
815    macro_rules! register {
816        ($method:ident, $doc:literal) => {
817            let metrics = runtime_metrics.clone();
818            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
819                crate::metric!(
820                    name: concat!("mz_tokio_", stringify!($method)),
821                    help: $doc,
822                    const_labels: {"runtime" => name},
823                ),
824                move || <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method()),
825            );
826        };
827    }
828
829    macro_rules! register_per_worker {
830        ($method:ident, $doc:literal) => {
831            let metrics = runtime_metrics.clone();
832            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
833                crate::metric!(
834                    name: concat!("mz_tokio_", stringify!($method)),
835                    help: $doc,
836                    const_labels: {"runtime" => name},
837                ),
838                move || {
839                    (0..metrics.num_workers())
840                        .map(|i| <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method(i)))
841                        .sum::<u64>()
842                },
843            );
844        };
845    }
846
847    macro_rules! register_per_worker_duration_secs {
848        ($method:ident, $doc:literal) => {
849            let metrics = runtime_metrics.clone();
850            registry.register_computed_gauge::<prometheus::core::AtomicF64>(
851                crate::metric!(
852                    name: concat!("mz_tokio_", stringify!($method)),
853                    help: $doc,
854                    const_labels: {"runtime" => name},
855                ),
856                move || {
857                    (0..metrics.num_workers())
858                        .map(|i| metrics.$method(i).as_secs_f64())
859                        .sum::<f64>()
860                },
861            );
862        };
863    }
864
865    register!(
866        num_workers,
867        "The number of worker threads used by the runtime."
868    );
869    register!(
870        num_alive_tasks,
871        "The current number of alive tasks in the runtime."
872    );
873    register!(
874        global_queue_depth,
875        "The number of tasks currently scheduled in the runtime's global queue."
876    );
877    register_per_worker_duration_secs!(
878        worker_total_busy_duration,
879        "The amount of time the worker threads have been busy, in seconds."
880    );
881    register_per_worker!(
882        worker_park_count,
883        "The total number of times the worker threads have parked."
884    );
885    register_per_worker!(
886        worker_park_unpark_count,
887        "The total number of times the worker threads have parked and unparked."
888    );
889
890    #[cfg(tokio_unstable)]
891    {
892        register!(
893            num_blocking_threads,
894            "The number of additional threads spawned by the runtime."
895        );
896        register!(
897            num_idle_blocking_threads,
898            "The number of idle threads which have spawned by the runtime for spawn_blocking calls."
899        );
900        register_per_worker!(
901            worker_local_queue_depth,
902            "The number of tasks currently scheduled in the workers' local queues."
903        );
904        register!(
905            blocking_queue_depth,
906            "The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking."
907        );
908        register!(
909            spawned_tasks_count,
910            "The number of tasks spawned in this runtime since it was created."
911        );
912        register!(
913            remote_schedule_count,
914            "The number of tasks scheduled from outside of the runtime."
915        );
916        register!(
917            budget_forced_yield_count,
918            "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."
919        );
920        register_per_worker!(
921            worker_noop_count,
922            "The number of times the given worker thread unparked but performed no work before parking again."
923        );
924        register_per_worker!(
925            worker_steal_count,
926            "The number of tasks the given worker thread stole from another worker thread."
927        );
928        register_per_worker!(
929            worker_steal_operations,
930            "The number of times the given worker thread stole tasks from another worker thread."
931        );
932        register_per_worker!(
933            worker_poll_count,
934            "The number of tasks the given worker thread has polled."
935        );
936        register_per_worker!(
937            worker_local_schedule_count,
938            "The number of tasks scheduled from within the runtime on the given worker's local queue."
939        );
940        register_per_worker!(
941            worker_overflow_count,
942            "The number of times the given worker thread saturated its local queue."
943        );
944        register_per_worker_duration_secs!(
945            worker_mean_poll_time,
946            "The mean duration of task polls in seconds."
947        );
948    }
949}
950
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::{CounterVec, HistogramVec};

    use crate::stats::histogram_seconds_buckets;

    use super::{MetricsFutureExt, MetricsRegistry};

    /// Metrics used by the tests in this module.
    ///
    /// Every metric has a single `action` label so that each instrumented future reports
    /// under its own label value.
    struct Metrics {
        pub wall_time_hist: HistogramVec,
        pub wall_time_cnt: CounterVec,
        pub exec_time_hist: HistogramVec,
        pub exec_time_cnt: CounterVec,
    }

    impl Metrics {
        /// Registers every test metric into `registry` and returns handles to them.
        pub fn register_into(registry: &MetricsRegistry) -> Self {
            Self {
                wall_time_hist: registry.register(crate::metric!(
                    name: "wall_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                wall_time_cnt: registry.register(crate::metric!(
                    name: "wall_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
                exec_time_hist: registry.register(crate::metric!(
                    name: "exec_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                exec_time_cnt: registry.register(crate::metric!(
                    name: "exec_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
            }
        }
    }

    /// Exercises `MetricsFutureExt` end to end: wall time vs. execution time, recorded
    /// into histograms and counters, with and without a filter.
    #[crate::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    fn smoke_test_metrics_future_ext() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to start runtime");
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // Record the wall time and execution time of an async sleep. The sleep yields to
        // the runtime, so wall time is long while execution time stays short.
        let async_sleep_future = async {
            tokio::time::sleep(Duration::from_secs(2)).await;
        };
        runtime.block_on(
            async_sleep_future
                .wall_time()
                .observe(metrics.wall_time_hist.with_label_values(&["async_sleep_w"]))
                .exec_time()
                .observe(metrics.exec_time_hist.with_label_values(&["async_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.name() == "exec_time_hist")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "async_sleep_e");

        let exec_histogram = exec_metric[0].get_histogram();
        assert_eq!(exec_histogram.get_sample_count(), 1);
        // This future will normally complete very quickly, but it's hard to guarantee any particular
        // timing in an arbitrary test environment, so we don't assert on it here.

        let wall_family = reports
            .iter()
            .find(|m| m.name() == "wall_time_hist")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].value(), "async_sleep_w");

        let wall_histogram = wall_metric[0].get_histogram();
        assert_eq!(wall_histogram.get_sample_count(), 1);
        // The 13th bucket (index 12) has an upper bound of roughly 512ms. The async sleep
        // takes 2s of wall time, so its single observation lands in a later bucket and this
        // bucket's cumulative count must still be 0.
        assert_eq!(wall_histogram.get_bucket()[12].cumulative_count(), 0);

        // Start over with a fresh registry so the assertions below only see the
        // thread-sleep observations.
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // Record the wall time and execution time of a thread sleep. Blocking the executor
        // thread is deliberate: it makes the execution time as long as the wall time.
        let thread_sleep_future = async {
            std::thread::sleep(Duration::from_secs(1));
        };
        runtime.block_on(
            thread_sleep_future
                .wall_time()
                .with_filter(|duration| duration < Duration::from_millis(10))
                .inc_by(metrics.wall_time_cnt.with_label_values(&["thread_sleep_w"]))
                .exec_time()
                .inc_by(metrics.exec_time_cnt.with_label_values(&["thread_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.name() == "exec_time_cnt")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "thread_sleep_e");

        let exec_counter = exec_metric[0].get_counter();
        // Since we're synchronously sleeping the execution time will be long.
        assert!(exec_counter.value() >= 1.0);

        let wall_family = reports
            .iter()
            .find(|m| m.name() == "wall_time_cnt")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].value(), "thread_sleep_w");

        let wall_counter = wall_metric[0].get_counter();
        // The filter only admits wall times under 10ms. The 1s sleep exceeds that, so
        // nothing is recorded and the counter stays at 0.
        assert_eq!(wall_counter.value(), 0.0);
    }

    /// Registering a metric with rules records those rules, in order, under the metric's
    /// fully-qualified name.
    #[crate::test]
    fn register_metric_stores_rules() {
        let registry = MetricsRegistry::new();
        let cluster_rule = super::Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        let replica_rule = super::Rule::ReplicaNameLookup {
            cluster_id_label: "cluster_id".into(),
            replica_id_label: "replica_id".into(),
            output_label: "replica_name".into(),
        };
        let _: prometheus::IntCounter = registry.register(crate::metric!(
            name: "mz_test_register_metric_stores_rules",
            help: "test metric",
            rules: [
                cluster_rule.clone(),
                replica_rule.clone(),
            ],
        ));
        let by_metric = registry.rules_by_metric();
        let rules = by_metric
            .get("mz_test_register_metric_stores_rules")
            .expect("rules registered under fq name");
        assert_eq!(rules, &vec![cluster_rule, replica_rule]);
    }

    /// Registering a metric without rules leaves no entry in the rules map.
    #[crate::test]
    fn register_metric_without_rules_omits_entry() {
        let registry = MetricsRegistry::new();
        let _: prometheus::IntCounter = registry.register(crate::metric!(
            name: "mz_test_register_metric_without_rules",
            help: "test metric without rules",
        ));
        assert!(registry.rules_by_metric().is_empty());
    }
}