Skip to main content

mz_ore/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Metrics for materialize systems.
17//!
18//! The idea here is that each subsystem keeps its metrics in a scoped-to-it struct, which gets
19//! registered (once) to the server's (or a test's) prometheus registry.
20//!
21//! Instead of using prometheus's (very verbose) metrics definitions, we rely on type inference to
22//! reduce the verbosity a little bit. A typical subsystem will look like the following:
23//!
24//! ```rust
25//! # use mz_ore::metrics::{MetricsRegistry, IntCounter};
26//! # use mz_ore::metric;
27//! #[derive(Debug, Clone)] // Note that prometheus metrics can safely be cloned
28//! struct Metrics {
29//!     pub bytes_sent: IntCounter,
30//! }
31//!
32//! impl Metrics {
33//!     pub fn register_into(registry: &MetricsRegistry) -> Metrics {
34//!         Metrics {
35//!             bytes_sent: registry.register(metric!(
36//!                 name: "mz_pg_sent_bytes",
37//!                 help: "total number of bytes sent here",
38//!             )),
39//!         }
40//!     }
41//! }
42//! ```
43
44use std::fmt;
45use std::fmt::{Debug, Formatter};
46use std::future::Future;
47use std::pin::Pin;
48use std::sync::{Arc, Mutex};
49use std::task::{Context, Poll};
50use std::time::{Duration, Instant};
51
52use derivative::Derivative;
53use pin_project::pin_project;
54use prometheus::core::{
55    Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec,
56    GenericGauge, GenericGaugeVec,
57};
58use prometheus::proto::MetricFamily;
59use prometheus::{HistogramOpts, Registry};
60
61mod delete_on_drop;
62
63pub use delete_on_drop::*;
64pub use prometheus::Opts as PrometheusOpts;
65
66/// Define a metric for use in materialize.
67#[macro_export]
68macro_rules! metric {
69    (
70        name: $name:expr,
71        help: $help:expr
72        $(, subsystem: $subsystem_name:expr)?
73        $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })?
74        $(, var_labels: [ $($vl_name:expr),* ])?
75        $(, buckets: $bk_name:expr)?
76        $(, visibility: $visibility:expr)?
77        $(, tags: [ $($tag:expr),* $(,)? ])?
78        $(,)?
79    ) => {{
80        let const_labels = (&[
81            $($(
82                ($cl_key.to_string(), $cl_value.to_string()),
83            )*)?
84        ]).into_iter().cloned().collect();
85        let var_labels = vec![
86            $(
87                $($vl_name.into(),)*
88            )?];
89        #[allow(unused_mut)]
90        let mut mk_opts = $crate::metrics::MakeCollectorOpts {
91            opts: $crate::metrics::PrometheusOpts::new($name, $help)
92                $(.subsystem( $subsystem_name ))?
93                .const_labels(const_labels)
94                .variable_labels(var_labels),
95            buckets: None,
96        };
97        // Set buckets if passed
98        $(mk_opts.buckets = Some($bk_name);)*
99        // `visibility` is documentation metadata for the metrics catalog
100        // (`bin/gen-metrics-catalog`).
101        // It has no runtime effect; we only type-check it here so a bad value is
102        // a compile error rather than silently ignored.
103        $(let _: $crate::metrics::MetricVisibility = $visibility;)?
104        // `tags` is documentation metadata for the metrics catalog.
105        // It has no runtime effect.
106        $($(let _: $crate::metrics::MetricTag = $tag;)*)?
107        mk_opts
108    }}
109}
110
111/// Options for MakeCollector. This struct should be instantiated using the metric macro.
112#[derive(Debug, Clone)]
113pub struct MakeCollectorOpts {
114    /// Common Prometheus options
115    pub opts: PrometheusOpts,
116    /// Buckets to be used with Histogram and HistogramVec. Must be set to create Histogram types
117    /// and must not be set for other types.
118    pub buckets: Option<Vec<f64>>,
119}
120
121/// This is documentation metadata: it is set via the optional `visibility:`
122/// field of [`metric!`] and consumed by the metrics catalog
123/// (`bin/gen-metrics-catalog`), which reads it from the source tree to produce
124/// the user-facing metrics reference. It has no effect on the metric at
125/// runtime.
126#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize)]
127#[serde(rename_all = "snake_case")]
128pub enum MetricVisibility {
129    /// A metric we use for internal development.
130    #[default]
131    Internal,
132    /// A metric we want customers to build dashboards and
133    /// alerts on. We do not guarantee stability for this group
134    /// of metrics.
135    Public,
136}
137
138/// A tag categorizing a metric in the user-facing metrics catalog.
139///
140/// It is set via the optional `tags:` field of [`metric!`] and
141/// consumed by the metrics catalog (`bin/gen-metrics-catalog`).
142/// It has no effect on the metric at runtime.
143///
144/// A metric may carry many tags, or none (the default). A tag names the
145/// grouping the metric is presented under in user-facing documentation.
146#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
147#[serde(rename_all = "kebab-case")]
148pub enum MetricTag {
149    /// SQL Control plane metrics (client connections, availability, catalog).
150    Environment,
151    /// Metrics for compute objects (indexes, materialized views).
152    Compute,
153    /// Metrics for sources.
154    Source,
155    /// Metrics for sinks.
156    Sink,
157}
158
159/// The materialize metrics registry.
160#[derive(Clone, Derivative)]
161#[derivative(Debug)]
162pub struct MetricsRegistry {
163    inner: Registry,
164    #[derivative(Debug = "ignore")]
165    postprocessors: Arc<Mutex<Vec<Box<dyn FnMut(&mut Vec<MetricFamily>) + Send + Sync>>>>,
166}
167
168/// A wrapper for metrics to require delete on drop semantics
169///
170/// The wrapper behaves like regular metrics but only provides functions to create delete-on-drop
171/// variants. This way, no metrics of this type can be leaked.
172///
173/// In situations where the delete-on-drop behavior is not desired or in legacy code, use the raw
174/// variants of the metrics, as defined in [self::raw].
175#[derive(Clone)]
176pub struct DeleteOnDropWrapper<M> {
177    inner: M,
178}
179
180impl<M: MakeCollector + Debug> Debug for DeleteOnDropWrapper<M> {
181    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
182        self.inner.fmt(f)
183    }
184}
185
186impl<M: Collector> Collector for DeleteOnDropWrapper<M> {
187    fn desc(&self) -> Vec<&Desc> {
188        self.inner.desc()
189    }
190
191    fn collect(&self) -> Vec<MetricFamily> {
192        self.inner.collect()
193    }
194}
195
196impl<M: MakeCollector> MakeCollector for DeleteOnDropWrapper<M> {
197    fn make_collector(opts: MakeCollectorOpts) -> Self {
198        DeleteOnDropWrapper {
199            inner: M::make_collector(opts),
200        }
201    }
202}
203
204impl<M: MetricVecExt> DeleteOnDropWrapper<M> {
205    /// Returns a metric that deletes its labels from this metrics vector when dropped.
206    pub fn get_delete_on_drop_metric<L: PromLabelsExt>(
207        &self,
208        labels: L,
209    ) -> DeleteOnDropMetric<M, L> {
210        self.inner.get_delete_on_drop_metric(labels)
211    }
212}
213
214/// The unsigned integer version of [`Gauge`]. Provides better performance if
215/// metric values are all unsigned integers.
216pub type UIntGauge = GenericGauge<AtomicU64>;
217
218/// Delete-on-drop shadow of Prometheus [prometheus::CounterVec].
219pub type CounterVec = DeleteOnDropWrapper<prometheus::CounterVec>;
220/// Delete-on-drop shadow of Prometheus [prometheus::Gauge].
221pub type Gauge = DeleteOnDropWrapper<prometheus::Gauge>;
222/// Delete-on-drop shadow of Prometheus [prometheus::GaugeVec].
223pub type GaugeVec = DeleteOnDropWrapper<prometheus::GaugeVec>;
224/// Delete-on-drop shadow of Prometheus [prometheus::HistogramVec].
225pub type HistogramVec = DeleteOnDropWrapper<prometheus::HistogramVec>;
226/// Delete-on-drop shadow of Prometheus [prometheus::IntCounterVec].
227pub type IntCounterVec = DeleteOnDropWrapper<prometheus::IntCounterVec>;
228/// Delete-on-drop shadow of Prometheus [prometheus::IntGaugeVec].
229pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
230/// Delete-on-drop shadow of Prometheus [raw::UIntGaugeVec].
231pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
232
233use crate::assert_none;
234
235pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
236
237/// Access to non-delete-on-drop vector types
238pub mod raw {
239    use prometheus::core::{AtomicU64, GenericGaugeVec};
240
241    /// The unsigned integer version of [`GaugeVec`].
242    /// Provides better performance if metric values are all unsigned integers.
243    pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
244
245    pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
246}
247
248impl MetricsRegistry {
249    /// Creates a new metrics registry.
250    pub fn new() -> Self {
251        MetricsRegistry {
252            inner: Registry::new(),
253            postprocessors: Arc::new(Mutex::new(vec![])),
254        }
255    }
256
257    /// Register a metric defined with the [`metric`] macro.
258    pub fn register<M>(&self, opts: MakeCollectorOpts) -> M
259    where
260        M: MakeCollector,
261    {
262        let collector = M::make_collector(opts);
263        self.inner.register(Box::new(collector.clone())).unwrap();
264        collector
265    }
266
267    /// Registers a gauge whose value is computed when observed.
268    pub fn register_computed_gauge<P>(
269        &self,
270        opts: MakeCollectorOpts,
271        f: impl Fn() -> P::T + Send + Sync + 'static,
272    ) -> ComputedGenericGauge<P>
273    where
274        P: Atomic + 'static,
275    {
276        let gauge = ComputedGenericGauge {
277            gauge: GenericGauge::make_collector(opts),
278            f: Arc::new(f),
279        };
280        self.inner.register(Box::new(gauge.clone())).unwrap();
281        gauge
282    }
283
284    /// Register a pre-defined prometheus collector.
285    pub fn register_collector<C: 'static + prometheus::core::Collector>(&self, collector: C) {
286        self.inner
287            .register(Box::new(collector))
288            .expect("registering pre-defined metrics collector");
289    }
290
291    /// Registers a metric postprocessor.
292    ///
293    /// Postprocessors are invoked on every call to [`MetricsRegistry::gather`]
294    /// in the order that they are registered.
295    pub fn register_postprocessor<F>(&self, f: F)
296    where
297        F: FnMut(&mut Vec<MetricFamily>) + Send + Sync + 'static,
298    {
299        let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
300        postprocessors.push(Box::new(f));
301    }
302
303    /// Gather all the metrics from the metrics registry for reporting.
304    ///
305    /// This function invokes the postprocessors on all gathered metrics (see
306    /// [`MetricsRegistry::register_postprocessor`]) in the order the
307    /// postprocessors were registered.
308    ///
309    /// See also [`prometheus::Registry::gather`].
310    pub fn gather(&self) -> Vec<MetricFamily> {
311        let mut metrics = self.inner.gather();
312        let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
313        for postprocessor in &mut *postprocessors {
314            postprocessor(&mut metrics);
315        }
316        metrics
317    }
318}
319
320/// A wrapper for creating prometheus metrics more conveniently.
321///
322/// Together with the [`metric`] macro, this trait is mainly used by [`MetricsRegistry`] and should
323/// not normally be used outside the metric registration flow.
324pub trait MakeCollector: Collector + Clone + 'static {
325    /// Creates a new collector.
326    fn make_collector(opts: MakeCollectorOpts) -> Self;
327}
328
329impl<T> MakeCollector for GenericCounter<T>
330where
331    T: Atomic + 'static,
332{
333    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
334        assert_none!(mk_opts.buckets);
335        Self::with_opts(mk_opts.opts).expect("defining a counter")
336    }
337}
338
339impl<T> MakeCollector for GenericCounterVec<T>
340where
341    T: Atomic + 'static,
342{
343    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
344        assert_none!(mk_opts.buckets);
345        let labels: Vec<String> = mk_opts.opts.variable_labels.clone();
346        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
347        Self::new(mk_opts.opts, label_refs.as_slice()).expect("defining a counter vec")
348    }
349}
350
351impl<T> MakeCollector for GenericGauge<T>
352where
353    T: Atomic + 'static,
354{
355    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
356        assert_none!(mk_opts.buckets);
357        Self::with_opts(mk_opts.opts).expect("defining a gauge")
358    }
359}
360
361impl<T> MakeCollector for GenericGaugeVec<T>
362where
363    T: Atomic + 'static,
364{
365    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
366        assert_none!(mk_opts.buckets);
367        let labels = mk_opts.opts.variable_labels.clone();
368        let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
369        Self::new(mk_opts.opts, labels).expect("defining a gauge vec")
370    }
371}
372
373impl MakeCollector for Histogram {
374    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
375        assert!(mk_opts.buckets.is_some());
376        Self::with_opts(HistogramOpts {
377            common_opts: mk_opts.opts,
378            buckets: mk_opts.buckets.unwrap(),
379        })
380        .expect("defining a histogram")
381    }
382}
383
384impl MakeCollector for raw::HistogramVec {
385    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
386        assert!(mk_opts.buckets.is_some());
387        let labels = mk_opts.opts.variable_labels.clone();
388        let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
389        Self::new(
390            HistogramOpts {
391                common_opts: mk_opts.opts,
392                buckets: mk_opts.buckets.unwrap(),
393            },
394            labels,
395        )
396        .expect("defining a histogram vec")
397    }
398}
399
400/// A [`Gauge`] whose value is computed whenever it is observed.
401pub struct ComputedGenericGauge<P>
402where
403    P: Atomic,
404{
405    gauge: GenericGauge<P>,
406    f: Arc<dyn Fn() -> P::T + Send + Sync>,
407}
408
409impl<P> fmt::Debug for ComputedGenericGauge<P>
410where
411    P: Atomic + fmt::Debug,
412{
413    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
414        f.debug_struct("ComputedGenericGauge")
415            .field("gauge", &self.gauge)
416            .finish_non_exhaustive()
417    }
418}
419
420impl<P> Clone for ComputedGenericGauge<P>
421where
422    P: Atomic,
423{
424    fn clone(&self) -> ComputedGenericGauge<P> {
425        ComputedGenericGauge {
426            gauge: self.gauge.clone(),
427            f: Arc::clone(&self.f),
428        }
429    }
430}
431
432impl<T> Collector for ComputedGenericGauge<T>
433where
434    T: Atomic,
435{
436    fn desc(&self) -> Vec<&prometheus::core::Desc> {
437        self.gauge.desc()
438    }
439
440    fn collect(&self) -> Vec<MetricFamily> {
441        self.gauge.set((self.f)());
442        self.gauge.collect()
443    }
444}
445
446impl<P> ComputedGenericGauge<P>
447where
448    P: Atomic,
449{
450    /// Computes the current value of the gauge.
451    pub fn get(&self) -> P::T {
452        (self.f)()
453    }
454}
455
456/// A [`ComputedGenericGauge`] for 64-bit floating point numbers.
457pub type ComputedGauge = ComputedGenericGauge<AtomicF64>;
458
459/// A [`ComputedGenericGauge`] for 64-bit signed integers.
460pub type ComputedIntGauge = ComputedGenericGauge<AtomicI64>;
461
462/// A [`ComputedGenericGauge`] for 64-bit unsigned integers.
463pub type ComputedUIntGauge = ComputedGenericGauge<AtomicU64>;
464
465/// Exposes combinators that report metrics related to the execution of a [`Future`] to prometheus.
466pub trait MetricsFutureExt<F> {
467    /// Records the number of seconds it takes a [`Future`] to complete according to "the clock on
468    /// the wall".
469    ///
470    /// More specifically, it records the instant at which the `Future` was first polled, and the
471    /// instant at which the `Future` completes. Then reports the duration between those two
472    /// instances to the provided metric.
473    ///
474    /// # Wall Time vs Execution Time
475    ///
476    /// There is also [`MetricsFutureExt::exec_time`], which measures how long a [`Future`] spent
477    /// executing, instead of how long it took to complete. For example, a network request may have
478    /// a wall time of 1 second, meanwhile it's execution time may have only been 50ms. The 950ms
479    /// delta would be how long the [`Future`] waited for a response from the network.
480    ///
481    /// # Uses
482    ///
483    /// Recording the wall time can be useful for monitoring latency, for example the latency of a
484    /// SQL request.
485    ///
486    /// Note: You must call either [`observe`] to record the execution time to a [`Histogram`] or
487    /// [`inc_by`] to record to a [`Counter`]. The following will not compile:
488    ///
489    /// ```compile_fail
490    /// use mz_ore::metrics::MetricsFutureExt;
491    ///
492    /// # let _ = async {
493    /// async { Ok(()) }
494    ///     .wall_time()
495    ///     .await;
496    /// # };
497    /// ```
498    ///
499    /// [`observe`]: WallTimeFuture::observe
500    /// [`inc_by`]: WallTimeFuture::inc_by
501    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric>;
502
503    /// Records the total number of seconds for which a [`Future`] was executing.
504    ///
505    /// More specifically, every time the `Future` is polled it records how long that individual
506    /// call took, and maintains a running sum until the `Future` completes. Then we report that
507    /// duration to the provided metric.
508    ///
509    /// # Wall Time vs Execution Time
510    ///
511    /// There is also [`MetricsFutureExt::wall_time`], which measures how long a [`Future`] took to
512    /// complete, instead of how long it spent executing. For example, a network request may have
513    /// a wall time of 1 second, meanwhile it's execution time may have only been 50ms. The 950ms
514    /// delta would be how long the [`Future`] waited for a response from the network.
515    ///
516    /// # Uses
517    ///
518    /// Recording execution time can be useful if you want to monitor [`Future`]s that could be
519    /// sensitive to CPU usage. For example, if you have a single logical control thread you'll
520    /// want to make sure that thread never spends too long running a single `Future`. Reporting
521    /// the execution time of `Future`s running on this thread can help ensure there is no
522    /// unexpected blocking.
523    ///
524    /// Note: You must call either [`observe`] to record the execution time to a [`Histogram`] or
525    /// [`inc_by`] to record to a [`Counter`]. The following will not compile:
526    ///
527    /// ```compile_fail
528    /// use mz_ore::metrics::MetricsFutureExt;
529    ///
530    /// # let _ = async {
531    /// async { Ok(()) }
532    ///     .exec_time()
533    ///     .await;
534    /// # };
535    /// ```
536    ///
537    /// [`observe`]: ExecTimeFuture::observe
538    /// [`inc_by`]: ExecTimeFuture::inc_by
539    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric>;
540}
541
542impl<F: Future> MetricsFutureExt<F> for F {
543    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric> {
544        WallTimeFuture {
545            fut: self,
546            metric: UnspecifiedMetric(()),
547            start: None,
548            filter: None,
549        }
550    }
551
552    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric> {
553        ExecTimeFuture {
554            fut: self,
555            metric: UnspecifiedMetric(()),
556            running_duration: Duration::from_millis(0),
557            filter: None,
558        }
559    }
560}
561
562/// Future returned by [`MetricsFutureExt::wall_time`].
563#[must_use = "futures do nothing unless you `.await` or poll them"]
564#[pin_project]
565pub struct WallTimeFuture<F, Metric> {
566    /// The inner [`Future`] that we're recording the wall time for.
567    #[pin]
568    fut: F,
569    /// Prometheus metric that we'll report to.
570    metric: Metric,
571    /// [`Instant`] at which the [`Future`] was first polled.
572    start: Option<Instant>,
573    /// Optional filter that determines if we observe the wall time of this [`Future`].
574    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
575}
576
577impl<F: Debug, M: Debug> fmt::Debug for WallTimeFuture<F, M> {
578    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
579        f.debug_struct("WallTimeFuture")
580            .field("fut", &self.fut)
581            .field("metric", &self.metric)
582            .field("start", &self.start)
583            .field("filter", &self.filter.is_some())
584            .finish()
585    }
586}
587
588impl<F> WallTimeFuture<F, UnspecifiedMetric> {
589    /// Sets the recored metric to be a [`prometheus::Histogram`].
590    ///
591    /// ```text
592    /// my_future
593    ///     .wall_time()
594    ///     .observe(metrics.slow_queries_hist.with_label_values(&["select"]))
595    /// ```
596    pub fn observe(
597        self,
598        histogram: prometheus::Histogram,
599    ) -> WallTimeFuture<F, prometheus::Histogram> {
600        WallTimeFuture {
601            fut: self.fut,
602            metric: histogram,
603            start: self.start,
604            filter: self.filter,
605        }
606    }
607
608    /// Sets the recored metric to be a [`prometheus::Counter`].
609    ///
610    /// ```text
611    /// my_future
612    ///     .wall_time()
613    ///     .inc_by(metrics.slow_queries.with_label_values(&["select"]))
614    /// ```
615    pub fn inc_by(self, counter: prometheus::Counter) -> WallTimeFuture<F, prometheus::Counter> {
616        WallTimeFuture {
617            fut: self.fut,
618            metric: counter,
619            start: self.start,
620            filter: self.filter,
621        }
622    }
623
624    /// Sets the recorded duration in a specific f64.
625    pub fn set_at(self, place: &mut f64) -> WallTimeFuture<F, &mut f64> {
626        WallTimeFuture {
627            fut: self.fut,
628            metric: place,
629            start: self.start,
630            filter: self.filter,
631        }
632    }
633}
634
635impl<F, M> WallTimeFuture<F, M> {
636    /// Specifies a filter which much return `true` for the wall time to be recorded.
637    ///
638    /// This can be particularly useful if you have a high volume `Future` and you only want to
639    /// record ones that take a long time to complete.
640    pub fn with_filter(
641        mut self,
642        filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
643    ) -> Self {
644        self.filter = Some(Box::new(filter));
645        self
646    }
647}
648
649impl<F: Future, M: DurationMetric> Future for WallTimeFuture<F, M> {
650    type Output = F::Output;
651
652    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
653        let this = self.project();
654
655        if this.start.is_none() {
656            *this.start = Some(Instant::now());
657        }
658
659        let result = match this.fut.poll(cx) {
660            Poll::Ready(r) => r,
661            Poll::Pending => return Poll::Pending,
662        };
663        let duration = Instant::now().duration_since(this.start.expect("timer to be started"));
664
665        let pass = this
666            .filter
667            .as_mut()
668            .map(|filter| filter(duration))
669            .unwrap_or(true);
670        if pass {
671            this.metric.record(duration.as_secs_f64())
672        }
673
674        Poll::Ready(result)
675    }
676}
677
678/// Future returned by [`MetricsFutureExt::exec_time`].
679#[must_use = "futures do nothing unless you `.await` or poll them"]
680#[pin_project]
681pub struct ExecTimeFuture<F, Metric> {
682    /// The inner [`Future`] that we're recording the wall time for.
683    #[pin]
684    fut: F,
685    /// Prometheus metric that we'll report to.
686    metric: Metric,
687    /// Total [`Duration`] for which this [`Future`] has been executing.
688    running_duration: Duration,
689    /// Optional filter that determines if we observe the execution time of this [`Future`].
690    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
691}
692
693impl<F: Debug, M: Debug> fmt::Debug for ExecTimeFuture<F, M> {
694    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
695        f.debug_struct("ExecTimeFuture")
696            .field("fut", &self.fut)
697            .field("metric", &self.metric)
698            .field("running_duration", &self.running_duration)
699            .field("filter", &self.filter.is_some())
700            .finish()
701    }
702}
703
704impl<F> ExecTimeFuture<F, UnspecifiedMetric> {
705    /// Sets the recored metric to be a [`prometheus::Histogram`].
706    ///
707    /// ```text
708    /// my_future
709    ///     .exec_time()
710    ///     .observe(metrics.slow_queries_hist.with_label_values(&["select"]))
711    /// ```
712    pub fn observe(
713        self,
714        histogram: prometheus::Histogram,
715    ) -> ExecTimeFuture<F, prometheus::Histogram> {
716        ExecTimeFuture {
717            fut: self.fut,
718            metric: histogram,
719            running_duration: self.running_duration,
720            filter: self.filter,
721        }
722    }
723
724    /// Sets the recored metric to be a [`prometheus::Counter`].
725    ///
726    /// ```text
727    /// my_future
728    ///     .exec_time()
729    ///     .inc_by(metrics.slow_queries.with_label_values(&["select"]))
730    /// ```
731    pub fn inc_by(self, counter: prometheus::Counter) -> ExecTimeFuture<F, prometheus::Counter> {
732        ExecTimeFuture {
733            fut: self.fut,
734            metric: counter,
735            running_duration: self.running_duration,
736            filter: self.filter,
737        }
738    }
739}
740
741impl<F, M> ExecTimeFuture<F, M> {
742    /// Specifies a filter which much return `true` for the execution time to be recorded.
743    pub fn with_filter(
744        mut self,
745        filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
746    ) -> Self {
747        self.filter = Some(Box::new(filter));
748        self
749    }
750}
751
752impl<F: Future, M: DurationMetric> Future for ExecTimeFuture<F, M> {
753    type Output = F::Output;
754
755    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
756        let this = self.project();
757
758        let start = Instant::now();
759        let result = this.fut.poll(cx);
760        let duration = Instant::now().duration_since(start);
761
762        *this.running_duration = this.running_duration.saturating_add(duration);
763
764        let result = match result {
765            Poll::Ready(result) => result,
766            Poll::Pending => return Poll::Pending,
767        };
768
769        let duration = *this.running_duration;
770        let pass = this
771            .filter
772            .as_mut()
773            .map(|filter| filter(duration))
774            .unwrap_or(true);
775        if pass {
776            this.metric.record(duration.as_secs_f64());
777        }
778
779        Poll::Ready(result)
780    }
781}
782
783/// A type level flag used to ensure callers specify the kind of metric to record for
784/// [`MetricsFutureExt`].
785///
786/// For example, `WallTimeFuture<F, M>` only implements [`Future`] for `M` that implements
787/// `DurationMetric` which [`UnspecifiedMetric`] does not. This forces users at build time to
788/// call [`WallTimeFuture::observe`] or [`WallTimeFuture::inc_by`].
789#[derive(Debug)]
790pub struct UnspecifiedMetric(());
791
792/// A trait makes recording a duration generic over different prometheus metrics. This allows us to
793/// de-dupe the implemenation of [`Future`] for our wrapper Futures like [`WallTimeFuture`] and
794/// [`ExecTimeFuture`] over different kinds of prometheus metrics.
795trait DurationMetric {
796    fn record(&mut self, seconds: f64);
797}
798
799impl DurationMetric for prometheus::Histogram {
800    fn record(&mut self, seconds: f64) {
801        self.observe(seconds)
802    }
803}
804
805impl DurationMetric for prometheus::Counter {
806    fn record(&mut self, seconds: f64) {
807        self.inc_by(seconds)
808    }
809}
810
811// An implementation of `DurationMetric` that lets the user take the recorded
812// value and use it elsewhere.
813impl DurationMetric for &'_ mut f64 {
814    fn record(&mut self, seconds: f64) {
815        **self = seconds;
816    }
817}
818
819/// Register the Tokio runtime's metrics in our metrics registry.
820#[cfg(feature = "async")]
821pub fn register_runtime_metrics(
822    name: &'static str,
823    runtime_metrics: tokio::runtime::RuntimeMetrics,
824    registry: &MetricsRegistry,
825) {
826    macro_rules! register {
827        ($method:ident, $doc:literal) => {
828            let metrics = runtime_metrics.clone();
829            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
830                crate::metric!(
831                    name: concat!("mz_tokio_", stringify!($method)),
832                    help: $doc,
833                    const_labels: {"runtime" => name},
834                ),
835                move || <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method()),
836            );
837        };
838    }
839
840    macro_rules! register_per_worker {
841        ($method:ident, $doc:literal) => {
842            let metrics = runtime_metrics.clone();
843            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
844                crate::metric!(
845                    name: concat!("mz_tokio_", stringify!($method)),
846                    help: $doc,
847                    const_labels: {"runtime" => name},
848                ),
849                move || {
850                    (0..metrics.num_workers())
851                        .map(|i| <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method(i)))
852                        .sum::<u64>()
853                },
854            );
855        };
856    }
857
858    macro_rules! register_per_worker_duration_secs {
859        ($method:ident, $doc:literal) => {
860            let metrics = runtime_metrics.clone();
861            registry.register_computed_gauge::<prometheus::core::AtomicF64>(
862                crate::metric!(
863                    name: concat!("mz_tokio_", stringify!($method)),
864                    help: $doc,
865                    const_labels: {"runtime" => name},
866                ),
867                move || {
868                    (0..metrics.num_workers())
869                        .map(|i| metrics.$method(i).as_secs_f64())
870                        .sum::<f64>()
871                },
872            );
873        };
874    }
875
876    register!(
877        num_workers,
878        "The number of worker threads used by the runtime."
879    );
880    register!(
881        num_alive_tasks,
882        "The current number of alive tasks in the runtime."
883    );
884    register!(
885        global_queue_depth,
886        "The number of tasks currently scheduled in the runtime's global queue."
887    );
888    register_per_worker_duration_secs!(
889        worker_total_busy_duration,
890        "The amount of time the worker threads have been busy, in seconds."
891    );
892    register_per_worker!(
893        worker_park_count,
894        "The total number of times the worker threads have parked."
895    );
896    register_per_worker!(
897        worker_park_unpark_count,
898        "The total number of times the worker threads have parked and unparked."
899    );
900
901    #[cfg(tokio_unstable)]
902    {
903        register!(
904            num_blocking_threads,
905            "The number of additional threads spawned by the runtime."
906        );
907        register!(
908            num_idle_blocking_threads,
909            "The number of idle threads which have spawned by the runtime for spawn_blocking calls."
910        );
911        register_per_worker!(
912            worker_local_queue_depth,
913            "The number of tasks currently scheduled in the workers' local queues."
914        );
915        register!(
916            blocking_queue_depth,
917            "The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking."
918        );
919        register!(
920            spawned_tasks_count,
921            "The number of tasks spawned in this runtime since it was created."
922        );
923        register!(
924            remote_schedule_count,
925            "The number of tasks scheduled from outside of the runtime."
926        );
927        register!(
928            budget_forced_yield_count,
929            "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."
930        );
931        register_per_worker!(
932            worker_noop_count,
933            "The number of times the given worker thread unparked but performed no work before parking again."
934        );
935        register_per_worker!(
936            worker_steal_count,
937            "The number of tasks the given worker thread stole from another worker thread."
938        );
939        register_per_worker!(
940            worker_steal_operations,
941            "The number of times the given worker thread stole tasks from another worker thread."
942        );
943        register_per_worker!(
944            worker_poll_count,
945            "The number of tasks the given worker thread has polled."
946        );
947        register_per_worker!(
948            worker_local_schedule_count,
949            "The number of tasks scheduled from within the runtime on the given worker's local queue."
950        );
951        register_per_worker!(
952            worker_overflow_count,
953            "The number of times the given worker thread saturated its local queue."
954        );
955        register_per_worker_duration_secs!(
956            worker_mean_poll_time,
957            "The mean duration of task polls in seconds."
958        );
959    }
960}
961
/// Describes every Tokio runtime metric that [`register_runtime_metrics`] registers.
///
/// Each entry is a `(name, help, labels, source)` tuple:
/// - `name` and `help` are the metric family's name and help text;
/// - `labels` are the family's label keys, sorted and de-duplicated;
/// - `source` is the path of the file that defines these metrics (`file!()`).
#[cfg(feature = "async")]
pub fn describe_runtime_metrics() -> Vec<(String, String, Vec<String>, &'static str)> {
    // Only names, help text and label keys are inspected, never metric values, so a
    // throwaway current-thread runtime is all that is needed to enumerate the metrics.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("building a current-thread runtime");
    let registry = MetricsRegistry::new();
    register_runtime_metrics("describe", runtime.handle().metrics(), &registry);

    let mut descriptions = Vec::new();
    for family in registry.gather() {
        // All series of one family carry the same label keys, so looking at the first
        // series is enough. A family without any series contributes no keys.
        let mut label_keys: Vec<String> = Vec::new();
        if let Some(first_series) = family.get_metric().first() {
            label_keys.extend(
                first_series
                    .get_label()
                    .iter()
                    .map(|pair| pair.name().to_owned()),
            );
        }
        label_keys.sort();
        label_keys.dedup();

        descriptions.push((
            family.name().to_owned(),
            family.help().to_owned(),
            label_keys,
            file!(),
        ));
    }
    descriptions
}
990
#[cfg(test)]
mod tests {
    //! Smoke tests for the wall-time / execution-time future combinators.

    use std::time::Duration;

    use prometheus::{CounterVec, HistogramVec};

    use crate::stats::histogram_seconds_buckets;

    use super::{MetricsFutureExt, MetricsRegistry};

    /// Histograms and counters used by `smoke_test_metrics_future_ext`, each keyed by a
    /// single `action` label.
    struct Metrics {
        pub wall_time_hist: HistogramVec,
        pub wall_time_cnt: CounterVec,
        pub exec_time_hist: HistogramVec,
        pub exec_time_cnt: CounterVec,
    }

    impl Metrics {
        /// Registers a fresh set of the test metrics into `registry`.
        pub fn register_into(registry: &MetricsRegistry) -> Self {
            Self {
                wall_time_hist: registry.register(metric!(
                    name: "wall_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                wall_time_cnt: registry.register(metric!(
                    name: "wall_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
                exec_time_hist: registry.register(metric!(
                    name: "exec_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                exec_time_cnt: registry.register(metric!(
                    name: "exec_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
            }
        }
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    fn smoke_test_metrics_future_ext() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to start runtime");
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // Record the walltime and execution time of an async sleep.
        // (`tokio::time::Duration` is a re-export of `std::time::Duration`.)
        let async_sleep_future = async {
            tokio::time::sleep(Duration::from_secs(2)).await;
        };
        runtime.block_on(
            async_sleep_future
                .wall_time()
                .observe(metrics.wall_time_hist.with_label_values(&["async_sleep_w"]))
                .exec_time()
                .observe(metrics.exec_time_hist.with_label_values(&["async_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.name() == "exec_time_hist")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "async_sleep_e");

        let exec_histogram = exec_metric[0].get_histogram();
        assert_eq!(exec_histogram.get_sample_count(), 1);
        // This future will normally complete very quickly, but it's hard to guarantee any particular
        // timing in an arbitrary test environment, so we don't assert on it here.

        let wall_family = reports
            .iter()
            .find(|m| m.name() == "wall_time_hist")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].value(), "async_sleep_w");

        let wall_histogram = wall_metric[0].get_histogram();
        assert_eq!(wall_histogram.get_sample_count(), 1);
        // The 13th bucket's upper bound (~512ms) is well below the 2s the async sleep takes,
        // so the single wall-time observation must not be counted in that bucket.
        assert_eq!(wall_histogram.get_bucket()[12].cumulative_count(), 0);

        // Use a fresh registry so the second scenario's metrics are gathered in isolation.
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // Record the walltime and execution time of a thread sleep. Blocking inside the
        // future keeps it executing for the whole sleep.
        let thread_sleep_future = async {
            std::thread::sleep(Duration::from_secs(1));
        };
        runtime.block_on(
            thread_sleep_future
                .wall_time()
                .with_filter(|duration| duration < Duration::from_millis(10))
                .inc_by(metrics.wall_time_cnt.with_label_values(&["thread_sleep_w"]))
                .exec_time()
                .inc_by(metrics.exec_time_cnt.with_label_values(&["thread_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.name() == "exec_time_cnt")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "thread_sleep_e");

        let exec_counter = exec_metric[0].get_counter();
        // Since we're synchronously sleeping the execution time will be long.
        assert!(exec_counter.value() >= 1.0);

        let wall_family = reports
            .iter()
            .find(|m| m.name() == "wall_time_cnt")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        // The series exists (it was created by `with_label_values`) even though the filter
        // suppressed every increment.
        assert_eq!(wall_metric[0].get_label()[0].value(), "thread_sleep_w");

        let wall_counter = wall_metric[0].get_counter();
        // We filtered wall time to < 10ms, so our wall time metric should be filtered out.
        assert_eq!(wall_counter.value(), 0.0);
    }
}