1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Metrics shared by both compute and storage.
1112use std::time::Duration;
1314use mz_ore::metric;
15use mz_ore::metrics::{
16 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry,
17};
18use mz_ore::stats::SlidingMinMax;
19use prometheus::core::{AtomicF64, AtomicU64};
2021/// Controller metrics.
22#[derive(Debug, Clone)]
23pub struct ControllerMetrics {
24 dataflow_wallclock_lag_seconds: GaugeVec,
25 dataflow_wallclock_lag_seconds_sum: CounterVec,
26 dataflow_wallclock_lag_seconds_count: IntCounterVec,
27}
2829impl ControllerMetrics {
30/// Create a metrics instance registered into the given registry.
31pub fn new(metrics_registry: &MetricsRegistry) -> Self {
32Self {
33// The next three metrics immitate a summary metric type. The `prometheus` crate lacks
34 // support for summaries, so we roll our own. Note that we also only expose the 0- and
35 // the 1-quantile, i.e., minimum and maximum lag values.
36dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
37 name: "mz_dataflow_wallclock_lag_seconds",
38 help: "A summary of the second-by-second lag of the dataflow frontier relative \
39 to wallclock time, aggregated over the last minute.",
40 var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
41 )),
42 dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
43 name: "mz_dataflow_wallclock_lag_seconds_sum",
44 help: "The total sum of dataflow wallclock lag measurements.",
45 var_labels: ["instance_id", "replica_id", "collection_id"],
46 )),
47 dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
48 name: "mz_dataflow_wallclock_lag_seconds_count",
49 help: "The total count of dataflow wallclock lag measurements.",
50 var_labels: ["instance_id", "replica_id", "collection_id"],
51 )),
52 }
53 }
5455/// Return an object that tracks wallclock lag metrics for the given collection on the given
56 /// cluster and replica.
57pub fn wallclock_lag_metrics(
58&self,
59 collection_id: String,
60 instance_id: Option<String>,
61 replica_id: Option<String>,
62 ) -> WallclockLagMetrics {
63let labels = vec![
64 instance_id.unwrap_or_default(),
65 replica_id.unwrap_or_default(),
66 collection_id,
67 ];
6869let labels_with_quantile = |quantile: &str| {
70 labels
71 .iter()
72 .cloned()
73 .chain([quantile.to_string()])
74 .collect()
75 };
7677let wallclock_lag_seconds_min = self
78.dataflow_wallclock_lag_seconds
79 .get_delete_on_drop_metric(labels_with_quantile("0"));
80let wallclock_lag_seconds_max = self
81.dataflow_wallclock_lag_seconds
82 .get_delete_on_drop_metric(labels_with_quantile("1"));
83let wallclock_lag_seconds_sum = self
84.dataflow_wallclock_lag_seconds_sum
85 .get_delete_on_drop_metric(labels.clone());
86let wallclock_lag_seconds_count = self
87.dataflow_wallclock_lag_seconds_count
88 .get_delete_on_drop_metric(labels);
89let wallclock_lag_minmax = SlidingMinMax::new(60);
9091 WallclockLagMetrics {
92 wallclock_lag_seconds_min,
93 wallclock_lag_seconds_max,
94 wallclock_lag_seconds_sum,
95 wallclock_lag_seconds_count,
96 wallclock_lag_minmax,
97 }
98 }
99}
100101/// Metrics tracking frontier wallclock lag for a collection.
102#[derive(Debug)]
103pub struct WallclockLagMetrics {
104/// Gauge tracking minimum dataflow wallclock lag.
105wallclock_lag_seconds_min: DeleteOnDropGauge<AtomicF64, Vec<String>>,
106/// Gauge tracking maximum dataflow wallclock lag.
107wallclock_lag_seconds_max: DeleteOnDropGauge<AtomicF64, Vec<String>>,
108/// Counter tracking the total sum of dataflow wallclock lag.
109wallclock_lag_seconds_sum: DeleteOnDropCounter<AtomicF64, Vec<String>>,
110/// Counter tracking the total count of dataflow wallclock lag measurements.
111wallclock_lag_seconds_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
112113/// State maintaining minimum and maximum wallclock lag.
114wallclock_lag_minmax: SlidingMinMax<f32>,
115}
116117impl WallclockLagMetrics {
118/// Observe a new wallclock lag measurement.
119pub fn observe(&mut self, lag: Duration) {
120let lag_secs = lag.as_secs_f32();
121122self.wallclock_lag_minmax.add_sample(lag_secs);
123124let (&min, &max) = self
125.wallclock_lag_minmax
126 .get()
127 .expect("just added a sample");
128129self.wallclock_lag_seconds_min.set(min.into());
130self.wallclock_lag_seconds_max.set(max.into());
131self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
132self.wallclock_lag_seconds_count.inc();
133 }
134}