mz_cluster_client/metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics shared by both compute and storage.

use std::time::Duration;

use mz_ore::metric;
use mz_ore::metrics::{
    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry,
};
use mz_ore::stats::SlidingMinMax;
use prometheus::core::{AtomicF64, AtomicU64};

/// Controller metrics.
#[derive(Debug, Clone)]
pub struct ControllerMetrics {
    /// Gauge tracking the minimum and maximum dataflow wallclock lag, by `quantile` label.
    dataflow_wallclock_lag_seconds: GaugeVec,
    /// Counter tracking the total sum of dataflow wallclock lag measurements.
    dataflow_wallclock_lag_seconds_sum: CounterVec,
    /// Counter tracking the total count of dataflow wallclock lag measurements.
    dataflow_wallclock_lag_seconds_count: IntCounterVec,
}

impl ControllerMetrics {
    /// Create a metrics instance registered into the given registry.
    pub fn new(metrics_registry: &MetricsRegistry) -> Self {
        Self {
            // The next three metrics imitate a summary metric type. The `prometheus` crate lacks
            // support for summaries, so we roll our own. Note that we also only expose the 0- and
            // the 1-quantile, i.e., the minimum and maximum lag values.
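            //
            // For a single replica and collection, these registrations yield time series along
            // the lines of (label values here are illustrative only):
            //
            //   mz_dataflow_wallclock_lag_seconds{instance_id="u1",replica_id="r1",collection_id="u2",quantile="0"}
            //   mz_dataflow_wallclock_lag_seconds{instance_id="u1",replica_id="r1",collection_id="u2",quantile="1"}
            //   mz_dataflow_wallclock_lag_seconds_sum{instance_id="u1",replica_id="r1",collection_id="u2"}
            //   mz_dataflow_wallclock_lag_seconds_count{instance_id="u1",replica_id="r1",collection_id="u2"}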
            dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
                name: "mz_dataflow_wallclock_lag_seconds",
                help: "A summary of the second-by-second lag of the dataflow frontier relative \
                       to wallclock time, aggregated over the last minute.",
                var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
            )),
            dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
                name: "mz_dataflow_wallclock_lag_seconds_sum",
                help: "The total sum of dataflow wallclock lag measurements.",
                var_labels: ["instance_id", "replica_id", "collection_id"],
            )),
            dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
                name: "mz_dataflow_wallclock_lag_seconds_count",
                help: "The total count of dataflow wallclock lag measurements.",
                var_labels: ["instance_id", "replica_id", "collection_id"],
            )),
        }
    }

    /// Return an object that tracks wallclock lag metrics for the given collection on the given
    /// cluster and replica.
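    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest); the ID strings are illustrative and
    /// `MetricsRegistry::new()` is assumed to be available from `mz_ore`:
    ///
    /// ```ignore
    /// let registry = MetricsRegistry::new();
    /// let controller_metrics = ControllerMetrics::new(&registry);
    /// let mut lag_metrics = controller_metrics.wallclock_lag_metrics(
    ///     "u2".to_string(),
    ///     Some("u1".to_string()),
    ///     Some("r1".to_string()),
    /// );
    ///
    /// // Report the current frontier lag, e.g. once per second.
    /// lag_metrics.observe(Duration::from_secs(3));
    /// ```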
    pub fn wallclock_lag_metrics(
        &self,
        collection_id: String,
        instance_id: Option<String>,
        replica_id: Option<String>,
    ) -> WallclockLagMetrics {
        let labels = vec![
            instance_id.unwrap_or_default(),
            replica_id.unwrap_or_default(),
            collection_id,
        ];

        let labels_with_quantile = |quantile: &str| {
            labels
                .iter()
                .cloned()
                .chain([quantile.to_string()])
                .collect()
        };

        let wallclock_lag_seconds_min = self
            .dataflow_wallclock_lag_seconds
            .get_delete_on_drop_metric(labels_with_quantile("0"));
        let wallclock_lag_seconds_max = self
            .dataflow_wallclock_lag_seconds
            .get_delete_on_drop_metric(labels_with_quantile("1"));
        let wallclock_lag_seconds_sum = self
            .dataflow_wallclock_lag_seconds_sum
            .get_delete_on_drop_metric(labels.clone());
        let wallclock_lag_seconds_count = self
            .dataflow_wallclock_lag_seconds_count
            .get_delete_on_drop_metric(labels);
        let wallclock_lag_minmax = SlidingMinMax::new(60);

        WallclockLagMetrics {
            wallclock_lag_seconds_min,
            wallclock_lag_seconds_max,
            wallclock_lag_seconds_sum,
            wallclock_lag_seconds_count,
            wallclock_lag_minmax,
        }
    }
}

/// Metrics tracking frontier wallclock lag for a collection.
#[derive(Debug)]
pub struct WallclockLagMetrics {
    /// Gauge tracking minimum dataflow wallclock lag.
    wallclock_lag_seconds_min: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    /// Gauge tracking maximum dataflow wallclock lag.
    wallclock_lag_seconds_max: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    /// Counter tracking the total sum of dataflow wallclock lag.
    wallclock_lag_seconds_sum: DeleteOnDropCounter<AtomicF64, Vec<String>>,
    /// Counter tracking the total count of dataflow wallclock lag measurements.
    wallclock_lag_seconds_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,

    /// State maintaining minimum and maximum wallclock lag.
    wallclock_lag_minmax: SlidingMinMax<f32>,
}

impl WallclockLagMetrics {
    /// Observe a new wallclock lag measurement.
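    ///
    /// The sliding min/max window holds 60 samples, so when this is called roughly once per
    /// second (as the "second-by-second" wording of the metric help text suggests), the reported
    /// minimum and maximum cover approximately the last minute of measurements.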
    pub fn observe(&mut self, lag: Duration) {
        let lag_secs = lag.as_secs_f32();

        self.wallclock_lag_minmax.add_sample(lag_secs);

        let (&min, &max) = self
            .wallclock_lag_minmax
            .get()
            .expect("just added a sample");

        self.wallclock_lag_seconds_min.set(min.into());
        self.wallclock_lag_seconds_max.set(max.into());
        self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
        self.wallclock_lag_seconds_count.inc();
    }
}
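
// A minimal sketch of how this module might be exercised in a unit test. It assumes
// `MetricsRegistry::new()` is available and that the delete-on-drop wrappers deref to the
// underlying Prometheus metrics (so `get()` is callable); treat it as illustrative rather than
// as part of the module's API.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    #[test]
    fn observe_tracks_min_max_sum_and_count() {
        let registry = MetricsRegistry::new();
        let metrics = ControllerMetrics::new(&registry);
        // The ID strings are illustrative placeholders.
        let mut lag_metrics = metrics.wallclock_lag_metrics(
            "u2".to_string(),
            Some("u1".to_string()),
            Some("r1".to_string()),
        );

        lag_metrics.observe(Duration::from_secs(2));
        lag_metrics.observe(Duration::from_secs(5));

        // Min/max come from the sliding window; sum/count accumulate across all observations.
        assert_eq!(lag_metrics.wallclock_lag_seconds_min.get(), 2.0);
        assert_eq!(lag_metrics.wallclock_lag_seconds_max.get(), 5.0);
        assert_eq!(lag_metrics.wallclock_lag_seconds_sum.get(), 7.0);
        assert_eq!(lag_metrics.wallclock_lag_seconds_count.get(), 2);
    }
}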