Skip to main content

mz_ore/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Metrics for materialize systems.
17//!
//! The idea here is that each subsystem keeps its metrics in a struct scoped to that subsystem,
//! which gets registered (once) to the server's (or a test's) prometheus registry.
20//!
21//! Instead of using prometheus's (very verbose) metrics definitions, we rely on type inference to
22//! reduce the verbosity a little bit. A typical subsystem will look like the following:
23//!
24//! ```rust
25//! # use mz_ore::metrics::{MetricsRegistry, IntCounter};
26//! # use mz_ore::metric;
27//! #[derive(Debug, Clone)] // Note that prometheus metrics can safely be cloned
28//! struct Metrics {
29//!     pub bytes_sent: IntCounter,
30//! }
31//!
32//! impl Metrics {
33//!     pub fn register_into(registry: &MetricsRegistry) -> Metrics {
34//!         Metrics {
35//!             bytes_sent: registry.register(metric!(
36//!                 name: "mz_pg_sent_bytes",
37//!                 help: "total number of bytes sent here",
38//!             )),
39//!         }
40//!     }
41//! }
42//! ```
43
44use std::fmt;
45use std::fmt::{Debug, Formatter};
46use std::future::Future;
47use std::pin::Pin;
48use std::sync::{Arc, Mutex};
49use std::task::{Context, Poll};
50use std::time::{Duration, Instant};
51
52use derivative::Derivative;
53use pin_project::pin_project;
54use prometheus::core::{
55    Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec,
56    GenericGauge, GenericGaugeVec,
57};
58use prometheus::proto::MetricFamily;
59use prometheus::{HistogramOpts, Registry};
60
61mod delete_on_drop;
62
63pub use delete_on_drop::*;
64pub use prometheus::Opts as PrometheusOpts;
65
/// Define a metric for use in materialize.
///
/// Expands to a [`MakeCollectorOpts`](crate::metrics::MakeCollectorOpts), which is usually passed
/// straight to [`MetricsRegistry::register`](crate::metrics::MetricsRegistry::register); the
/// concrete collector type is then chosen by type inference.
///
/// Fields, in the order they must appear:
///
/// * `name` (required): the metric name.
/// * `help` (required): the metric's help text.
/// * `subsystem`: an optional Prometheus subsystem.
/// * `const_labels`: `{ key => value, ... }` pairs; both sides are converted with `to_string()`.
/// * `var_labels`: `[name, ...]` variable label names; each is converted with `into()`.
/// * `buckets`: histogram buckets. Must be passed for histogram types and omitted for every
///   other type.
/// * `visibility`: a [`MetricVisibility`](crate::metrics::MetricVisibility). This is
///   documentation metadata only; it is type-checked but has no runtime effect.
#[macro_export]
macro_rules! metric {
    (
        name: $name:expr,
        help: $help:expr
        $(, subsystem: $subsystem_name:expr)?
        $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })?
        $(, var_labels: [ $($vl_name:expr),* ])?
        $(, buckets: $bk_name:expr)?
        $(, visibility: $visibility:expr)?
        $(,)?
    ) => {{
        let const_labels = (&[
            $($(
                ($cl_key.to_string(), $cl_value.to_string()),
            )*)?
        ]).into_iter().cloned().collect();
        let var_labels = vec![
            $(
                $($vl_name.into(),)*
            )?];
        // `mut` is only exercised when `buckets` is passed.
        #[allow(unused_mut)]
        let mut mk_opts = $crate::metrics::MakeCollectorOpts {
            opts: $crate::metrics::PrometheusOpts::new($name, $help)
                $(.subsystem( $subsystem_name ))?
                .const_labels(const_labels)
                .variable_labels(var_labels),
            buckets: None,
        };
        // Set buckets if passed. `buckets` is an optional (`?`) fragment, so it is expanded with
        // `?` as well, like the other optional fields.
        $(mk_opts.buckets = Some($bk_name);)?
        // `visibility` is documentation metadata for the metrics catalog
        // (`bin/gen-metrics-catalog`).
        // It has no runtime effect; we only type-check it here so a bad value is
        // a compile error rather than silently ignored.
        $(let _: $crate::metrics::MetricVisibility = $visibility;)?
        mk_opts
    }}
}
106
/// Options for [`MakeCollector`]. This struct should be instantiated using the [`metric!`]
/// macro rather than by hand.
#[derive(Debug, Clone)]
pub struct MakeCollectorOpts {
    /// Common Prometheus options: name, help text, optional subsystem, and the constant and
    /// variable labels.
    pub opts: PrometheusOpts,
    /// Buckets to be used with Histogram and HistogramVec. Must be set to create Histogram types
    /// and must not be set for other types.
    pub buckets: Option<Vec<f64>>,
}
116
117/// This is documentation metadata: it is set via the optional `visibility:`
118/// field of [`metric!`] and consumed by the metrics catalog
119/// (`bin/gen-metrics-catalog`), which reads it from the source tree to produce
120/// the user-facing metrics reference. It has no effect on the metric at
121/// runtime.
122#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize)]
123#[serde(rename_all = "snake_case")]
124pub enum MetricVisibility {
125    /// A metric we use for internal development.
126    #[default]
127    Internal,
128    /// A metric we want customers to build dashboards and
129    /// alerts on. We do not guarantee stability for this group
130    /// of metrics.
131    Public,
132}
133
/// The materialize metrics registry.
///
/// Wraps a Prometheus [`Registry`] together with a list of postprocessors that
/// [`MetricsRegistry::gather`] applies to the gathered metric families. The postprocessor list
/// is behind an `Arc`, so clones of the registry share it.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct MetricsRegistry {
    /// The underlying Prometheus registry that collectors are registered into.
    inner: Registry,
    /// Postprocessors run, in registration order, over the output of every gather.
    /// Excluded from `Debug` output because boxed closures do not implement `Debug`.
    #[derivative(Debug = "ignore")]
    postprocessors: Arc<Mutex<Vec<Box<dyn FnMut(&mut Vec<MetricFamily>) + Send + Sync>>>>,
}
142
143/// A wrapper for metrics to require delete on drop semantics
144///
145/// The wrapper behaves like regular metrics but only provides functions to create delete-on-drop
146/// variants. This way, no metrics of this type can be leaked.
147///
148/// In situations where the delete-on-drop behavior is not desired or in legacy code, use the raw
149/// variants of the metrics, as defined in [self::raw].
150#[derive(Clone)]
151pub struct DeleteOnDropWrapper<M> {
152    inner: M,
153}
154
155impl<M: MakeCollector + Debug> Debug for DeleteOnDropWrapper<M> {
156    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
157        self.inner.fmt(f)
158    }
159}
160
161impl<M: Collector> Collector for DeleteOnDropWrapper<M> {
162    fn desc(&self) -> Vec<&Desc> {
163        self.inner.desc()
164    }
165
166    fn collect(&self) -> Vec<MetricFamily> {
167        self.inner.collect()
168    }
169}
170
171impl<M: MakeCollector> MakeCollector for DeleteOnDropWrapper<M> {
172    fn make_collector(opts: MakeCollectorOpts) -> Self {
173        DeleteOnDropWrapper {
174            inner: M::make_collector(opts),
175        }
176    }
177}
178
179impl<M: MetricVecExt> DeleteOnDropWrapper<M> {
180    /// Returns a metric that deletes its labels from this metrics vector when dropped.
181    pub fn get_delete_on_drop_metric<L: PromLabelsExt>(
182        &self,
183        labels: L,
184    ) -> DeleteOnDropMetric<M, L> {
185        self.inner.get_delete_on_drop_metric(labels)
186    }
187}
188
/// The unsigned integer version of [`Gauge`]. Provides better performance if
/// metric values are all unsigned integers.
///
/// Unlike the aliases that follow, this is the plain Prometheus type and is not wrapped in a
/// [`DeleteOnDropWrapper`].
pub type UIntGauge = GenericGauge<AtomicU64>;

// The aliases below wrap the corresponding Prometheus types in `DeleteOnDropWrapper`. The
// unwrapped types are available from the `raw` module.

/// Delete-on-drop shadow of Prometheus [prometheus::CounterVec].
pub type CounterVec = DeleteOnDropWrapper<prometheus::CounterVec>;
/// Delete-on-drop shadow of Prometheus [prometheus::Gauge].
pub type Gauge = DeleteOnDropWrapper<prometheus::Gauge>;
/// Delete-on-drop shadow of Prometheus [prometheus::GaugeVec].
pub type GaugeVec = DeleteOnDropWrapper<prometheus::GaugeVec>;
/// Delete-on-drop shadow of Prometheus [prometheus::HistogramVec].
pub type HistogramVec = DeleteOnDropWrapper<prometheus::HistogramVec>;
/// Delete-on-drop shadow of Prometheus [prometheus::IntCounterVec].
pub type IntCounterVec = DeleteOnDropWrapper<prometheus::IntCounterVec>;
/// Delete-on-drop shadow of Prometheus [prometheus::IntGaugeVec].
pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
/// Delete-on-drop shadow of Prometheus [raw::UIntGaugeVec].
pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
207
208use crate::assert_none;
209
210pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
211
212/// Access to non-delete-on-drop vector types
213pub mod raw {
214    use prometheus::core::{AtomicU64, GenericGaugeVec};
215
216    /// The unsigned integer version of [`GaugeVec`].
217    /// Provides better performance if metric values are all unsigned integers.
218    pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
219
220    pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
221}
222
223impl MetricsRegistry {
224    /// Creates a new metrics registry.
225    pub fn new() -> Self {
226        MetricsRegistry {
227            inner: Registry::new(),
228            postprocessors: Arc::new(Mutex::new(vec![])),
229        }
230    }
231
232    /// Register a metric defined with the [`metric`] macro.
233    pub fn register<M>(&self, opts: MakeCollectorOpts) -> M
234    where
235        M: MakeCollector,
236    {
237        let collector = M::make_collector(opts);
238        self.inner.register(Box::new(collector.clone())).unwrap();
239        collector
240    }
241
242    /// Registers a gauge whose value is computed when observed.
243    pub fn register_computed_gauge<P>(
244        &self,
245        opts: MakeCollectorOpts,
246        f: impl Fn() -> P::T + Send + Sync + 'static,
247    ) -> ComputedGenericGauge<P>
248    where
249        P: Atomic + 'static,
250    {
251        let gauge = ComputedGenericGauge {
252            gauge: GenericGauge::make_collector(opts),
253            f: Arc::new(f),
254        };
255        self.inner.register(Box::new(gauge.clone())).unwrap();
256        gauge
257    }
258
259    /// Register a pre-defined prometheus collector.
260    pub fn register_collector<C: 'static + prometheus::core::Collector>(&self, collector: C) {
261        self.inner
262            .register(Box::new(collector))
263            .expect("registering pre-defined metrics collector");
264    }
265
266    /// Registers a metric postprocessor.
267    ///
268    /// Postprocessors are invoked on every call to [`MetricsRegistry::gather`]
269    /// in the order that they are registered.
270    pub fn register_postprocessor<F>(&self, f: F)
271    where
272        F: FnMut(&mut Vec<MetricFamily>) + Send + Sync + 'static,
273    {
274        let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
275        postprocessors.push(Box::new(f));
276    }
277
278    /// Gather all the metrics from the metrics registry for reporting.
279    ///
280    /// This function invokes the postprocessors on all gathered metrics (see
281    /// [`MetricsRegistry::register_postprocessor`]) in the order the
282    /// postprocessors were registered.
283    ///
284    /// See also [`prometheus::Registry::gather`].
285    pub fn gather(&self) -> Vec<MetricFamily> {
286        let mut metrics = self.inner.gather();
287        let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
288        for postprocessor in &mut *postprocessors {
289            postprocessor(&mut metrics);
290        }
291        metrics
292    }
293}
294
/// A wrapper for creating prometheus metrics more conveniently.
///
/// Together with the [`metric`] macro, this trait is mainly used by [`MetricsRegistry`] and should
/// not normally be used outside the metric registration flow.
///
/// The `Clone` bound exists because [`MetricsRegistry::register`] registers a clone of the
/// collector and returns the original. The implementations in this module panic, rather than
/// return an error, when the options are unsuitable (for example, `buckets` set for a
/// non-histogram type, or missing for a histogram).
pub trait MakeCollector: Collector + Clone + 'static {
    /// Creates a new collector from options produced by the [`metric`] macro.
    fn make_collector(opts: MakeCollectorOpts) -> Self;
}
303
304impl<T> MakeCollector for GenericCounter<T>
305where
306    T: Atomic + 'static,
307{
308    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
309        assert_none!(mk_opts.buckets);
310        Self::with_opts(mk_opts.opts).expect("defining a counter")
311    }
312}
313
314impl<T> MakeCollector for GenericCounterVec<T>
315where
316    T: Atomic + 'static,
317{
318    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
319        assert_none!(mk_opts.buckets);
320        let labels: Vec<String> = mk_opts.opts.variable_labels.clone();
321        let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
322        Self::new(mk_opts.opts, label_refs.as_slice()).expect("defining a counter vec")
323    }
324}
325
326impl<T> MakeCollector for GenericGauge<T>
327where
328    T: Atomic + 'static,
329{
330    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
331        assert_none!(mk_opts.buckets);
332        Self::with_opts(mk_opts.opts).expect("defining a gauge")
333    }
334}
335
336impl<T> MakeCollector for GenericGaugeVec<T>
337where
338    T: Atomic + 'static,
339{
340    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
341        assert_none!(mk_opts.buckets);
342        let labels = mk_opts.opts.variable_labels.clone();
343        let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
344        Self::new(mk_opts.opts, labels).expect("defining a gauge vec")
345    }
346}
347
348impl MakeCollector for Histogram {
349    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
350        assert!(mk_opts.buckets.is_some());
351        Self::with_opts(HistogramOpts {
352            common_opts: mk_opts.opts,
353            buckets: mk_opts.buckets.unwrap(),
354        })
355        .expect("defining a histogram")
356    }
357}
358
359impl MakeCollector for raw::HistogramVec {
360    fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
361        assert!(mk_opts.buckets.is_some());
362        let labels = mk_opts.opts.variable_labels.clone();
363        let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
364        Self::new(
365            HistogramOpts {
366                common_opts: mk_opts.opts,
367                buckets: mk_opts.buckets.unwrap(),
368            },
369            labels,
370        )
371        .expect("defining a histogram vec")
372    }
373}
374
375/// A [`Gauge`] whose value is computed whenever it is observed.
376pub struct ComputedGenericGauge<P>
377where
378    P: Atomic,
379{
380    gauge: GenericGauge<P>,
381    f: Arc<dyn Fn() -> P::T + Send + Sync>,
382}
383
384impl<P> fmt::Debug for ComputedGenericGauge<P>
385where
386    P: Atomic + fmt::Debug,
387{
388    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
389        f.debug_struct("ComputedGenericGauge")
390            .field("gauge", &self.gauge)
391            .finish_non_exhaustive()
392    }
393}
394
395impl<P> Clone for ComputedGenericGauge<P>
396where
397    P: Atomic,
398{
399    fn clone(&self) -> ComputedGenericGauge<P> {
400        ComputedGenericGauge {
401            gauge: self.gauge.clone(),
402            f: Arc::clone(&self.f),
403        }
404    }
405}
406
407impl<T> Collector for ComputedGenericGauge<T>
408where
409    T: Atomic,
410{
411    fn desc(&self) -> Vec<&prometheus::core::Desc> {
412        self.gauge.desc()
413    }
414
415    fn collect(&self) -> Vec<MetricFamily> {
416        self.gauge.set((self.f)());
417        self.gauge.collect()
418    }
419}
420
421impl<P> ComputedGenericGauge<P>
422where
423    P: Atomic,
424{
425    /// Computes the current value of the gauge.
426    pub fn get(&self) -> P::T {
427        (self.f)()
428    }
429}
430
431/// A [`ComputedGenericGauge`] for 64-bit floating point numbers.
432pub type ComputedGauge = ComputedGenericGauge<AtomicF64>;
433
434/// A [`ComputedGenericGauge`] for 64-bit signed integers.
435pub type ComputedIntGauge = ComputedGenericGauge<AtomicI64>;
436
437/// A [`ComputedGenericGauge`] for 64-bit unsigned integers.
438pub type ComputedUIntGauge = ComputedGenericGauge<AtomicU64>;
439
440/// Exposes combinators that report metrics related to the execution of a [`Future`] to prometheus.
441pub trait MetricsFutureExt<F> {
442    /// Records the number of seconds it takes a [`Future`] to complete according to "the clock on
443    /// the wall".
444    ///
445    /// More specifically, it records the instant at which the `Future` was first polled, and the
446    /// instant at which the `Future` completes. Then reports the duration between those two
447    /// instances to the provided metric.
448    ///
449    /// # Wall Time vs Execution Time
450    ///
451    /// There is also [`MetricsFutureExt::exec_time`], which measures how long a [`Future`] spent
452    /// executing, instead of how long it took to complete. For example, a network request may have
453    /// a wall time of 1 second, meanwhile it's execution time may have only been 50ms. The 950ms
454    /// delta would be how long the [`Future`] waited for a response from the network.
455    ///
456    /// # Uses
457    ///
458    /// Recording the wall time can be useful for monitoring latency, for example the latency of a
459    /// SQL request.
460    ///
461    /// Note: You must call either [`observe`] to record the execution time to a [`Histogram`] or
462    /// [`inc_by`] to record to a [`Counter`]. The following will not compile:
463    ///
464    /// ```compile_fail
465    /// use mz_ore::metrics::MetricsFutureExt;
466    ///
467    /// # let _ = async {
468    /// async { Ok(()) }
469    ///     .wall_time()
470    ///     .await;
471    /// # };
472    /// ```
473    ///
474    /// [`observe`]: WallTimeFuture::observe
475    /// [`inc_by`]: WallTimeFuture::inc_by
476    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric>;
477
478    /// Records the total number of seconds for which a [`Future`] was executing.
479    ///
480    /// More specifically, every time the `Future` is polled it records how long that individual
481    /// call took, and maintains a running sum until the `Future` completes. Then we report that
482    /// duration to the provided metric.
483    ///
484    /// # Wall Time vs Execution Time
485    ///
486    /// There is also [`MetricsFutureExt::wall_time`], which measures how long a [`Future`] took to
487    /// complete, instead of how long it spent executing. For example, a network request may have
488    /// a wall time of 1 second, meanwhile it's execution time may have only been 50ms. The 950ms
489    /// delta would be how long the [`Future`] waited for a response from the network.
490    ///
491    /// # Uses
492    ///
493    /// Recording execution time can be useful if you want to monitor [`Future`]s that could be
494    /// sensitive to CPU usage. For example, if you have a single logical control thread you'll
495    /// want to make sure that thread never spends too long running a single `Future`. Reporting
496    /// the execution time of `Future`s running on this thread can help ensure there is no
497    /// unexpected blocking.
498    ///
499    /// Note: You must call either [`observe`] to record the execution time to a [`Histogram`] or
500    /// [`inc_by`] to record to a [`Counter`]. The following will not compile:
501    ///
502    /// ```compile_fail
503    /// use mz_ore::metrics::MetricsFutureExt;
504    ///
505    /// # let _ = async {
506    /// async { Ok(()) }
507    ///     .exec_time()
508    ///     .await;
509    /// # };
510    /// ```
511    ///
512    /// [`observe`]: ExecTimeFuture::observe
513    /// [`inc_by`]: ExecTimeFuture::inc_by
514    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric>;
515}
516
517impl<F: Future> MetricsFutureExt<F> for F {
518    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric> {
519        WallTimeFuture {
520            fut: self,
521            metric: UnspecifiedMetric(()),
522            start: None,
523            filter: None,
524        }
525    }
526
527    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric> {
528        ExecTimeFuture {
529            fut: self,
530            metric: UnspecifiedMetric(()),
531            running_duration: Duration::from_millis(0),
532            filter: None,
533        }
534    }
535}
536
537/// Future returned by [`MetricsFutureExt::wall_time`].
538#[must_use = "futures do nothing unless you `.await` or poll them"]
539#[pin_project]
540pub struct WallTimeFuture<F, Metric> {
541    /// The inner [`Future`] that we're recording the wall time for.
542    #[pin]
543    fut: F,
544    /// Prometheus metric that we'll report to.
545    metric: Metric,
546    /// [`Instant`] at which the [`Future`] was first polled.
547    start: Option<Instant>,
548    /// Optional filter that determines if we observe the wall time of this [`Future`].
549    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
550}
551
552impl<F: Debug, M: Debug> fmt::Debug for WallTimeFuture<F, M> {
553    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
554        f.debug_struct("WallTimeFuture")
555            .field("fut", &self.fut)
556            .field("metric", &self.metric)
557            .field("start", &self.start)
558            .field("filter", &self.filter.is_some())
559            .finish()
560    }
561}
562
563impl<F> WallTimeFuture<F, UnspecifiedMetric> {
564    /// Sets the recored metric to be a [`prometheus::Histogram`].
565    ///
566    /// ```text
567    /// my_future
568    ///     .wall_time()
569    ///     .observe(metrics.slow_queries_hist.with_label_values(&["select"]))
570    /// ```
571    pub fn observe(
572        self,
573        histogram: prometheus::Histogram,
574    ) -> WallTimeFuture<F, prometheus::Histogram> {
575        WallTimeFuture {
576            fut: self.fut,
577            metric: histogram,
578            start: self.start,
579            filter: self.filter,
580        }
581    }
582
583    /// Sets the recored metric to be a [`prometheus::Counter`].
584    ///
585    /// ```text
586    /// my_future
587    ///     .wall_time()
588    ///     .inc_by(metrics.slow_queries.with_label_values(&["select"]))
589    /// ```
590    pub fn inc_by(self, counter: prometheus::Counter) -> WallTimeFuture<F, prometheus::Counter> {
591        WallTimeFuture {
592            fut: self.fut,
593            metric: counter,
594            start: self.start,
595            filter: self.filter,
596        }
597    }
598
599    /// Sets the recorded duration in a specific f64.
600    pub fn set_at(self, place: &mut f64) -> WallTimeFuture<F, &mut f64> {
601        WallTimeFuture {
602            fut: self.fut,
603            metric: place,
604            start: self.start,
605            filter: self.filter,
606        }
607    }
608}
609
610impl<F, M> WallTimeFuture<F, M> {
611    /// Specifies a filter which much return `true` for the wall time to be recorded.
612    ///
613    /// This can be particularly useful if you have a high volume `Future` and you only want to
614    /// record ones that take a long time to complete.
615    pub fn with_filter(
616        mut self,
617        filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
618    ) -> Self {
619        self.filter = Some(Box::new(filter));
620        self
621    }
622}
623
624impl<F: Future, M: DurationMetric> Future for WallTimeFuture<F, M> {
625    type Output = F::Output;
626
627    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
628        let this = self.project();
629
630        if this.start.is_none() {
631            *this.start = Some(Instant::now());
632        }
633
634        let result = match this.fut.poll(cx) {
635            Poll::Ready(r) => r,
636            Poll::Pending => return Poll::Pending,
637        };
638        let duration = Instant::now().duration_since(this.start.expect("timer to be started"));
639
640        let pass = this
641            .filter
642            .as_mut()
643            .map(|filter| filter(duration))
644            .unwrap_or(true);
645        if pass {
646            this.metric.record(duration.as_secs_f64())
647        }
648
649        Poll::Ready(result)
650    }
651}
652
653/// Future returned by [`MetricsFutureExt::exec_time`].
654#[must_use = "futures do nothing unless you `.await` or poll them"]
655#[pin_project]
656pub struct ExecTimeFuture<F, Metric> {
657    /// The inner [`Future`] that we're recording the wall time for.
658    #[pin]
659    fut: F,
660    /// Prometheus metric that we'll report to.
661    metric: Metric,
662    /// Total [`Duration`] for which this [`Future`] has been executing.
663    running_duration: Duration,
664    /// Optional filter that determines if we observe the execution time of this [`Future`].
665    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
666}
667
668impl<F: Debug, M: Debug> fmt::Debug for ExecTimeFuture<F, M> {
669    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
670        f.debug_struct("ExecTimeFuture")
671            .field("fut", &self.fut)
672            .field("metric", &self.metric)
673            .field("running_duration", &self.running_duration)
674            .field("filter", &self.filter.is_some())
675            .finish()
676    }
677}
678
679impl<F> ExecTimeFuture<F, UnspecifiedMetric> {
680    /// Sets the recored metric to be a [`prometheus::Histogram`].
681    ///
682    /// ```text
683    /// my_future
684    ///     .exec_time()
685    ///     .observe(metrics.slow_queries_hist.with_label_values(&["select"]))
686    /// ```
687    pub fn observe(
688        self,
689        histogram: prometheus::Histogram,
690    ) -> ExecTimeFuture<F, prometheus::Histogram> {
691        ExecTimeFuture {
692            fut: self.fut,
693            metric: histogram,
694            running_duration: self.running_duration,
695            filter: self.filter,
696        }
697    }
698
699    /// Sets the recored metric to be a [`prometheus::Counter`].
700    ///
701    /// ```text
702    /// my_future
703    ///     .exec_time()
704    ///     .inc_by(metrics.slow_queries.with_label_values(&["select"]))
705    /// ```
706    pub fn inc_by(self, counter: prometheus::Counter) -> ExecTimeFuture<F, prometheus::Counter> {
707        ExecTimeFuture {
708            fut: self.fut,
709            metric: counter,
710            running_duration: self.running_duration,
711            filter: self.filter,
712        }
713    }
714}
715
716impl<F, M> ExecTimeFuture<F, M> {
717    /// Specifies a filter which much return `true` for the execution time to be recorded.
718    pub fn with_filter(
719        mut self,
720        filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
721    ) -> Self {
722        self.filter = Some(Box::new(filter));
723        self
724    }
725}
726
727impl<F: Future, M: DurationMetric> Future for ExecTimeFuture<F, M> {
728    type Output = F::Output;
729
730    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
731        let this = self.project();
732
733        let start = Instant::now();
734        let result = this.fut.poll(cx);
735        let duration = Instant::now().duration_since(start);
736
737        *this.running_duration = this.running_duration.saturating_add(duration);
738
739        let result = match result {
740            Poll::Ready(result) => result,
741            Poll::Pending => return Poll::Pending,
742        };
743
744        let duration = *this.running_duration;
745        let pass = this
746            .filter
747            .as_mut()
748            .map(|filter| filter(duration))
749            .unwrap_or(true);
750        if pass {
751            this.metric.record(duration.as_secs_f64());
752        }
753
754        Poll::Ready(result)
755    }
756}
757
/// A type level flag used to ensure callers specify the kind of metric to record for
/// [`MetricsFutureExt`].
///
/// For example, `WallTimeFuture<F, M>` only implements [`Future`] for `M` that implements
/// `DurationMetric` which [`UnspecifiedMetric`] does not. This forces users at build time to
/// call [`WallTimeFuture::observe`], [`WallTimeFuture::inc_by`], or [`WallTimeFuture::set_at`].
// The `()` field is private, so code outside this module cannot construct the flag itself.
#[derive(Debug)]
pub struct UnspecifiedMetric(());
766
767/// A trait makes recording a duration generic over different prometheus metrics. This allows us to
768/// de-dupe the implemenation of [`Future`] for our wrapper Futures like [`WallTimeFuture`] and
769/// [`ExecTimeFuture`] over different kinds of prometheus metrics.
770trait DurationMetric {
771    fn record(&mut self, seconds: f64);
772}
773
774impl DurationMetric for prometheus::Histogram {
775    fn record(&mut self, seconds: f64) {
776        self.observe(seconds)
777    }
778}
779
780impl DurationMetric for prometheus::Counter {
781    fn record(&mut self, seconds: f64) {
782        self.inc_by(seconds)
783    }
784}
785
786// An implementation of `DurationMetric` that lets the user take the recorded
787// value and use it elsewhere.
788impl DurationMetric for &'_ mut f64 {
789    fn record(&mut self, seconds: f64) {
790        **self = seconds;
791    }
792}
793
794/// Register the Tokio runtime's metrics in our metrics registry.
795#[cfg(feature = "async")]
796pub fn register_runtime_metrics(
797    name: &'static str,
798    runtime_metrics: tokio::runtime::RuntimeMetrics,
799    registry: &MetricsRegistry,
800) {
801    macro_rules! register {
802        ($method:ident, $doc:literal) => {
803            let metrics = runtime_metrics.clone();
804            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
805                crate::metric!(
806                    name: concat!("mz_tokio_", stringify!($method)),
807                    help: $doc,
808                    const_labels: {"runtime" => name},
809                ),
810                move || <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method()),
811            );
812        };
813    }
814
815    macro_rules! register_per_worker {
816        ($method:ident, $doc:literal) => {
817            let metrics = runtime_metrics.clone();
818            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
819                crate::metric!(
820                    name: concat!("mz_tokio_", stringify!($method)),
821                    help: $doc,
822                    const_labels: {"runtime" => name},
823                ),
824                move || {
825                    (0..metrics.num_workers())
826                        .map(|i| <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method(i)))
827                        .sum::<u64>()
828                },
829            );
830        };
831    }
832
833    macro_rules! register_per_worker_duration_secs {
834        ($method:ident, $doc:literal) => {
835            let metrics = runtime_metrics.clone();
836            registry.register_computed_gauge::<prometheus::core::AtomicF64>(
837                crate::metric!(
838                    name: concat!("mz_tokio_", stringify!($method)),
839                    help: $doc,
840                    const_labels: {"runtime" => name},
841                ),
842                move || {
843                    (0..metrics.num_workers())
844                        .map(|i| metrics.$method(i).as_secs_f64())
845                        .sum::<f64>()
846                },
847            );
848        };
849    }
850
851    register!(
852        num_workers,
853        "The number of worker threads used by the runtime."
854    );
855    register!(
856        num_alive_tasks,
857        "The current number of alive tasks in the runtime."
858    );
859    register!(
860        global_queue_depth,
861        "The number of tasks currently scheduled in the runtime's global queue."
862    );
863    register_per_worker_duration_secs!(
864        worker_total_busy_duration,
865        "The amount of time the worker threads have been busy, in seconds."
866    );
867    register_per_worker!(
868        worker_park_count,
869        "The total number of times the worker threads have parked."
870    );
871    register_per_worker!(
872        worker_park_unpark_count,
873        "The total number of times the worker threads have parked and unparked."
874    );
875
876    #[cfg(tokio_unstable)]
877    {
878        register!(
879            num_blocking_threads,
880            "The number of additional threads spawned by the runtime."
881        );
882        register!(
883            num_idle_blocking_threads,
884            "The number of idle threads which have spawned by the runtime for spawn_blocking calls."
885        );
886        register_per_worker!(
887            worker_local_queue_depth,
888            "The number of tasks currently scheduled in the workers' local queues."
889        );
890        register!(
891            blocking_queue_depth,
892            "The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking."
893        );
894        register!(
895            spawned_tasks_count,
896            "The number of tasks spawned in this runtime since it was created."
897        );
898        register!(
899            remote_schedule_count,
900            "The number of tasks scheduled from outside of the runtime."
901        );
902        register!(
903            budget_forced_yield_count,
904            "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."
905        );
906        register_per_worker!(
907            worker_noop_count,
908            "The number of times the given worker thread unparked but performed no work before parking again."
909        );
910        register_per_worker!(
911            worker_steal_count,
912            "The number of tasks the given worker thread stole from another worker thread."
913        );
914        register_per_worker!(
915            worker_steal_operations,
916            "The number of times the given worker thread stole tasks from another worker thread."
917        );
918        register_per_worker!(
919            worker_poll_count,
920            "The number of tasks the given worker thread has polled."
921        );
922        register_per_worker!(
923            worker_local_schedule_count,
924            "The number of tasks scheduled from within the runtime on the given worker's local queue."
925        );
926        register_per_worker!(
927            worker_overflow_count,
928            "The number of times the given worker thread saturated its local queue."
929        );
930        register_per_worker_duration_secs!(
931            worker_mean_poll_time,
932            "The mean duration of task polls in seconds."
933        );
934    }
935}
936
/// Returns the `(name, help, source)` of every Tokio runtime metric registered
/// by [`register_runtime_metrics`].
///
/// `source` is this source file's path (as produced by `file!()`). The returned set
/// depends on whether the crate was built with `--cfg tokio_unstable`, because
/// [`register_runtime_metrics`] only registers some metrics under that cfg.
///
/// This may be called from synchronous code or from within an async context: the
/// temporary runtime it creates is shut down without blocking.
#[cfg(feature = "async")]
pub fn describe_runtime_metrics() -> Vec<(String, String, &'static str)> {
    // A throwaway current-thread runtime is enough to enumerate the metrics; only
    // their names and help text are returned. (Gathering may evaluate the computed
    // gauges, but any values produced are discarded.)
    let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("building a current-thread runtime");
    let registry = MetricsRegistry::new();
    register_runtime_metrics("describe", runtime.handle().metrics(), &registry);
    let descriptions: Vec<_> = registry
        .gather()
        .into_iter()
        .map(|mf| (mf.name().to_owned(), mf.help().to_owned(), file!()))
        .collect();
    // Dropping a `Runtime` normally blocks on its blocking pool, which panics when
    // it happens inside an async context (e.g. if this function is called from an
    // async task). Nothing was spawned on this runtime, so shutting it down
    // without waiting is equivalent and safe in every context.
    runtime.shutdown_background();
    descriptions
}
954
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::proto::MetricFamily;
    use prometheus::{CounterVec, HistogramVec};

    use crate::stats::histogram_seconds_buckets;

    use super::{MetricsFutureExt, MetricsRegistry};

    struct Metrics {
        pub wall_time_hist: HistogramVec,
        pub wall_time_cnt: CounterVec,
        pub exec_time_hist: HistogramVec,
        pub exec_time_cnt: CounterVec,
    }

    impl Metrics {
        pub fn register_into(registry: &MetricsRegistry) -> Self {
            Self {
                wall_time_hist: registry.register(metric!(
                    name: "wall_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                wall_time_cnt: registry.register(metric!(
                    name: "wall_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
                exec_time_hist: registry.register(metric!(
                    name: "exec_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                exec_time_cnt: registry.register(metric!(
                    name: "exec_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
            }
        }
    }

    /// Returns the metric family called `name` from the result of a `gather()`.
    ///
    /// Panics, failing the test, if no family with that name was gathered.
    fn find_family<'a>(reports: &'a [MetricFamily], name: &str) -> &'a MetricFamily {
        reports
            .iter()
            .find(|family| family.name() == name)
            .unwrap_or_else(|| panic!("metric {name} not found"))
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    fn smoke_test_metrics_future_ext() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to start runtime");
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // Record the wall time and execution time of an async sleep.
        let async_sleep_future = async {
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        };
        runtime.block_on(
            async_sleep_future
                .wall_time()
                .observe(metrics.wall_time_hist.with_label_values(&["async_sleep_w"]))
                .exec_time()
                .observe(metrics.exec_time_hist.with_label_values(&["async_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = find_family(&reports, "exec_time_hist");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "async_sleep_e");

        let exec_histogram = exec_metric[0].get_histogram();
        assert_eq!(exec_histogram.get_sample_count(), 1);
        // This future will normally complete very quickly, but it's hard to guarantee any particular
        // timing in an arbitrary test environment, so we don't assert on it here.

        let wall_family = find_family(&reports, "wall_time_hist");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].value(), "async_sleep_w");

        let wall_histogram = wall_metric[0].get_histogram();
        assert_eq!(wall_histogram.get_sample_count(), 1);
        // Bucket index 12 (the 13th bucket) is the 512ms bucket. The async sleep takes
        // 2s of wall-clock time, so its single observation must land in a higher bucket,
        // leaving this bucket's cumulative count at zero.
        assert_eq!(wall_histogram.get_bucket()[12].cumulative_count(), 0);

        // Use a fresh registry so the gathered reports only contain the thread-sleep
        // observations.
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // Record the wall time and execution time of a (blocking) thread sleep.
        let thread_sleep_future = async {
            std::thread::sleep(std::time::Duration::from_secs(1));
        };
        runtime.block_on(
            thread_sleep_future
                .wall_time()
                .with_filter(|duration| duration < Duration::from_millis(10))
                .inc_by(metrics.wall_time_cnt.with_label_values(&["thread_sleep_w"]))
                .exec_time()
                .inc_by(metrics.exec_time_cnt.with_label_values(&["thread_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = find_family(&reports, "exec_time_cnt");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "thread_sleep_e");

        let exec_counter = exec_metric[0].get_counter();
        // The blocking sleep runs while the future is being polled, so the full
        // second is counted as execution time.
        assert!(exec_counter.value() >= 1.0);

        let wall_family = find_family(&reports, "wall_time_cnt");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);

        let wall_counter = wall_metric[0].get_counter();
        // The wall-time filter only admits durations under 10ms. The 1s sleep exceeds
        // that, so nothing should have been added to the wall-time counter.
        assert_eq!(wall_counter.value(), 0.0);
    }
}