Skip to main content

mz_cluster_client/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics shared by both compute and storage.
11
12use mz_ore::cast::CastLossy;
13use mz_ore::metric;
14use mz_ore::metrics::{
15    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricVisibility,
16    MetricsRegistry,
17};
18use mz_ore::stats::SlidingMinMax;
19use prometheus::core::{AtomicF64, AtomicU64};
20
21/// Controller metrics.
22#[derive(Debug, Clone)]
23pub struct ControllerMetrics {
24    dataflow_wallclock_lag_seconds: GaugeVec,
25    dataflow_wallclock_lag_seconds_sum: CounterVec,
26    dataflow_wallclock_lag_seconds_count: IntCounterVec,
27}
28
29impl ControllerMetrics {
30    /// Create a metrics instance registered into the given registry.
31    pub fn new(metrics_registry: &MetricsRegistry) -> Self {
32        Self {
33            // The next three metrics immitate a summary metric type. The `prometheus` crate lacks
34            // support for summaries, so we roll our own. Note that we also only expose the 0- and
35            // the 1-quantile, i.e., minimum and maximum lag values.
36            dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
37                name: "mz_dataflow_wallclock_lag_seconds",
38                help: "A summary of the second-by-second lag of the dataflow frontier relative \
39                       to wallclock time, aggregated over the last minute.",
40                var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
41                visibility: MetricVisibility::Public,
42            )),
43            dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
44                name: "mz_dataflow_wallclock_lag_seconds_sum",
45                help: "The total sum of dataflow wallclock lag measurements.",
46                var_labels: ["instance_id", "replica_id", "collection_id"],
47            )),
48            dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
49                name: "mz_dataflow_wallclock_lag_seconds_count",
50                help: "The total count of dataflow wallclock lag measurements.",
51                var_labels: ["instance_id", "replica_id", "collection_id"],
52            )),
53        }
54    }
55
56    /// Return an object that tracks wallclock lag metrics for the given collection on the given
57    /// cluster and replica.
58    pub fn wallclock_lag_metrics(
59        &self,
60        collection_id: String,
61        instance_id: Option<String>,
62        replica_id: Option<String>,
63    ) -> WallclockLagMetrics {
64        let labels = vec![
65            instance_id.unwrap_or_default(),
66            replica_id.unwrap_or_default(),
67            collection_id,
68        ];
69
70        let labels_with_quantile = |quantile: &str| {
71            labels
72                .iter()
73                .cloned()
74                .chain([quantile.to_string()])
75                .collect()
76        };
77
78        let wallclock_lag_seconds_min = self
79            .dataflow_wallclock_lag_seconds
80            .get_delete_on_drop_metric(labels_with_quantile("0"));
81        let wallclock_lag_seconds_max = self
82            .dataflow_wallclock_lag_seconds
83            .get_delete_on_drop_metric(labels_with_quantile("1"));
84        let wallclock_lag_seconds_sum = self
85            .dataflow_wallclock_lag_seconds_sum
86            .get_delete_on_drop_metric(labels.clone());
87        let wallclock_lag_seconds_count = self
88            .dataflow_wallclock_lag_seconds_count
89            .get_delete_on_drop_metric(labels);
90        let wallclock_lag_minmax = SlidingMinMax::new(60);
91
92        WallclockLagMetrics {
93            wallclock_lag_seconds_min,
94            wallclock_lag_seconds_max,
95            wallclock_lag_seconds_sum,
96            wallclock_lag_seconds_count,
97            wallclock_lag_minmax,
98        }
99    }
100}
101
102/// Metrics tracking frontier wallclock lag for a collection.
103#[derive(Debug)]
104pub struct WallclockLagMetrics {
105    /// Gauge tracking minimum dataflow wallclock lag.
106    wallclock_lag_seconds_min: DeleteOnDropGauge<AtomicF64, Vec<String>>,
107    /// Gauge tracking maximum dataflow wallclock lag.
108    wallclock_lag_seconds_max: DeleteOnDropGauge<AtomicF64, Vec<String>>,
109    /// Counter tracking the total sum of dataflow wallclock lag.
110    wallclock_lag_seconds_sum: DeleteOnDropCounter<AtomicF64, Vec<String>>,
111    /// Counter tracking the total count of dataflow wallclock lag measurements.
112    wallclock_lag_seconds_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
113
114    /// State maintaining minimum and maximum wallclock lag.
115    wallclock_lag_minmax: SlidingMinMax<f32>,
116}
117
118impl WallclockLagMetrics {
119    /// Observe a new wallclock lag measurement.
120    pub fn observe(&mut self, lag_secs: u64) {
121        let lag_secs = f32::cast_lossy(lag_secs);
122
123        self.wallclock_lag_minmax.add_sample(lag_secs);
124
125        let (&min, &max) = self
126            .wallclock_lag_minmax
127            .get()
128            .expect("just added a sample");
129
130        self.wallclock_lag_seconds_min.set(min.into());
131        self.wallclock_lag_seconds_max.set(max.into());
132        self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
133        self.wallclock_lag_seconds_count.inc();
134    }
135}