mz_cluster_client/metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics shared by both compute and storage.

use mz_ore::cast::CastLossy;
use mz_ore::metric;
use mz_ore::metrics::{
    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry,
};
use mz_ore::stats::SlidingMinMax;
use prometheus::core::{AtomicF64, AtomicU64};

/// Controller metrics.
#[derive(Debug, Clone)]
pub struct ControllerMetrics {
    dataflow_wallclock_lag_seconds: GaugeVec,
    dataflow_wallclock_lag_seconds_sum: CounterVec,
    dataflow_wallclock_lag_seconds_count: IntCounterVec,
}

impl ControllerMetrics {
    /// Create a metrics instance registered into the given registry.
    pub fn new(metrics_registry: &MetricsRegistry) -> Self {
        Self {
            // The next three metrics imitate a summary metric type. The `prometheus` crate lacks
            // support for summaries, so we roll our own (see the sample series sketched after
            // this method). Note that we also only expose the 0- and the 1-quantile, i.e.,
            // minimum and maximum lag values.
            dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
                name: "mz_dataflow_wallclock_lag_seconds",
                help: "A summary of the second-by-second lag of the dataflow frontier relative \
                       to wallclock time, aggregated over the last minute.",
                var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
            )),
            dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
                name: "mz_dataflow_wallclock_lag_seconds_sum",
                help: "The total sum of dataflow wallclock lag measurements.",
                var_labels: ["instance_id", "replica_id", "collection_id"],
            )),
            dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
                name: "mz_dataflow_wallclock_lag_seconds_count",
                help: "The total count of dataflow wallclock lag measurements.",
                var_labels: ["instance_id", "replica_id", "collection_id"],
            )),
        }
    }
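
    // For illustration: with the hand-rolled summary above, a scrape of the registry exposes
    // series shaped roughly like the following. The label values and numbers here are made up,
    // not taken from a real deployment:
    //
    //   mz_dataflow_wallclock_lag_seconds{instance_id="u1",replica_id="r1",collection_id="u2",quantile="0"} 2
    //   mz_dataflow_wallclock_lag_seconds{instance_id="u1",replica_id="r1",collection_id="u2",quantile="1"} 7
    //   mz_dataflow_wallclock_lag_seconds_sum{instance_id="u1",replica_id="r1",collection_id="u2"} 254
    //   mz_dataflow_wallclock_lag_seconds_count{instance_id="u1",replica_id="r1",collection_id="u2"} 60
    //
    // The `quantile="0"`/`quantile="1"` gauges report the minimum and maximum lag over the
    // sliding window, while the `_sum`/`_count` counters allow computing an average lag,
    // mirroring what a native Prometheus summary would provide.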

    /// Return an object that tracks wallclock lag metrics for the given collection on the given
    /// cluster and replica.
    pub fn wallclock_lag_metrics(
        &self,
        collection_id: String,
        instance_id: Option<String>,
        replica_id: Option<String>,
    ) -> WallclockLagMetrics {
        let labels = vec![
            instance_id.unwrap_or_default(),
            replica_id.unwrap_or_default(),
            collection_id,
        ];

        let labels_with_quantile = |quantile: &str| {
            labels
                .iter()
                .cloned()
                .chain([quantile.to_string()])
                .collect()
        };

        let wallclock_lag_seconds_min = self
            .dataflow_wallclock_lag_seconds
            .get_delete_on_drop_metric(labels_with_quantile("0"));
        let wallclock_lag_seconds_max = self
            .dataflow_wallclock_lag_seconds
            .get_delete_on_drop_metric(labels_with_quantile("1"));
        let wallclock_lag_seconds_sum = self
            .dataflow_wallclock_lag_seconds_sum
            .get_delete_on_drop_metric(labels.clone());
        let wallclock_lag_seconds_count = self
            .dataflow_wallclock_lag_seconds_count
            .get_delete_on_drop_metric(labels);
        // Keep the 60 most recent samples, i.e., one minute's worth of once-per-second
        // measurements.
        let wallclock_lag_minmax = SlidingMinMax::new(60);

        WallclockLagMetrics {
            wallclock_lag_seconds_min,
            wallclock_lag_seconds_max,
            wallclock_lag_seconds_sum,
            wallclock_lag_seconds_count,
            wallclock_lag_minmax,
        }
    }
}

/// Metrics tracking frontier wallclock lag for a collection.
#[derive(Debug)]
pub struct WallclockLagMetrics {
    /// Gauge tracking minimum dataflow wallclock lag.
    wallclock_lag_seconds_min: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    /// Gauge tracking maximum dataflow wallclock lag.
    wallclock_lag_seconds_max: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    /// Counter tracking the total sum of dataflow wallclock lag.
    wallclock_lag_seconds_sum: DeleteOnDropCounter<AtomicF64, Vec<String>>,
    /// Counter tracking the total count of dataflow wallclock lag measurements.
    wallclock_lag_seconds_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    /// State maintaining minimum and maximum wallclock lag.
    wallclock_lag_minmax: SlidingMinMax<f32>,
}

impl WallclockLagMetrics {
    /// Observe a new wallclock lag measurement.
    pub fn observe(&mut self, lag_secs: u64) {
        let lag_secs = f32::cast_lossy(lag_secs);

        self.wallclock_lag_minmax.add_sample(lag_secs);

        let (&min, &max) = self
            .wallclock_lag_minmax
            .get()
            .expect("just added a sample");

        self.wallclock_lag_seconds_min.set(min.into());
        self.wallclock_lag_seconds_max.set(max.into());
        self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
        self.wallclock_lag_seconds_count.inc();
    }
}
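
// A minimal smoke test sketching how the pieces above fit together. The label values are
// made up, and the use of `MetricsRegistry::new()` as the registry constructor is an
// assumption of this sketch rather than something prescribed by this module.
#[cfg(test)]
mod tests {
    use super::*;

    use mz_ore::metrics::MetricsRegistry;

    #[test]
    fn observe_updates_wallclock_lag_metrics() {
        let registry = MetricsRegistry::new();
        let metrics = ControllerMetrics::new(&registry);

        // Hypothetical collection/instance/replica IDs; any strings work as label values.
        let mut lag_metrics = metrics.wallclock_lag_metrics(
            "u1".to_string(),
            Some("u2".to_string()),
            Some("r1".to_string()),
        );

        // Simulate a few once-per-second lag measurements; each call refreshes the min/max
        // gauges and bumps the sum/count counters.
        for lag_secs in [2, 7, 4] {
            lag_metrics.observe(lag_secs);
        }
    }
}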