// mz_cluster_client/metrics.rs

use mz_ore::cast::CastLossy;
use mz_ore::metric;
use mz_ore::metrics::{
    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry,
};
use mz_ore::stats::SlidingMinMax;
use prometheus::core::{AtomicF64, AtomicU64};
/// Metrics tracked by controllers, describing the wallclock lag of dataflow
/// frontiers relative to wallclock time.
#[derive(Debug, Clone)]
pub struct ControllerMetrics {
    /// Gauge vector summarizing per-quantile wallclock lag
    /// (labels: `instance_id`, `replica_id`, `collection_id`, `quantile`).
    dataflow_wallclock_lag_seconds: GaugeVec,
    /// Counter vector accumulating the sum of all lag measurements
    /// (labels: `instance_id`, `replica_id`, `collection_id`).
    dataflow_wallclock_lag_seconds_sum: CounterVec,
    /// Counter vector accumulating the number of lag measurements taken
    /// (labels: `instance_id`, `replica_id`, `collection_id`).
    dataflow_wallclock_lag_seconds_count: IntCounterVec,
}
27
28impl ControllerMetrics {
29 pub fn new(metrics_registry: &MetricsRegistry) -> Self {
31 Self {
32 dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
36 name: "mz_dataflow_wallclock_lag_seconds",
37 help: "A summary of the second-by-second lag of the dataflow frontier relative \
38 to wallclock time, aggregated over the last minute.",
39 var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
40 )),
41 dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
42 name: "mz_dataflow_wallclock_lag_seconds_sum",
43 help: "The total sum of dataflow wallclock lag measurements.",
44 var_labels: ["instance_id", "replica_id", "collection_id"],
45 )),
46 dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
47 name: "mz_dataflow_wallclock_lag_seconds_count",
48 help: "The total count of dataflow wallclock lag measurements.",
49 var_labels: ["instance_id", "replica_id", "collection_id"],
50 )),
51 }
52 }
53
54 pub fn wallclock_lag_metrics(
57 &self,
58 collection_id: String,
59 instance_id: Option<String>,
60 replica_id: Option<String>,
61 ) -> WallclockLagMetrics {
62 let labels = vec![
63 instance_id.unwrap_or_default(),
64 replica_id.unwrap_or_default(),
65 collection_id,
66 ];
67
68 let labels_with_quantile = |quantile: &str| {
69 labels
70 .iter()
71 .cloned()
72 .chain([quantile.to_string()])
73 .collect()
74 };
75
76 let wallclock_lag_seconds_min = self
77 .dataflow_wallclock_lag_seconds
78 .get_delete_on_drop_metric(labels_with_quantile("0"));
79 let wallclock_lag_seconds_max = self
80 .dataflow_wallclock_lag_seconds
81 .get_delete_on_drop_metric(labels_with_quantile("1"));
82 let wallclock_lag_seconds_sum = self
83 .dataflow_wallclock_lag_seconds_sum
84 .get_delete_on_drop_metric(labels.clone());
85 let wallclock_lag_seconds_count = self
86 .dataflow_wallclock_lag_seconds_count
87 .get_delete_on_drop_metric(labels);
88 let wallclock_lag_minmax = SlidingMinMax::new(60);
89
90 WallclockLagMetrics {
91 wallclock_lag_seconds_min,
92 wallclock_lag_seconds_max,
93 wallclock_lag_seconds_sum,
94 wallclock_lag_seconds_count,
95 wallclock_lag_minmax,
96 }
97 }
98}
99
/// Wallclock-lag metrics scoped to a single collection (and optionally an
/// instance/replica), obtained via `ControllerMetrics::wallclock_lag_metrics`.
/// The `DeleteOnDrop*` wrappers remove the labeled series when this value is
/// dropped.
#[derive(Debug)]
pub struct WallclockLagMetrics {
    /// Gauge for the minimum observed lag (exported with `quantile="0"`).
    wallclock_lag_seconds_min: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    /// Gauge for the maximum observed lag (exported with `quantile="1"`).
    wallclock_lag_seconds_max: DeleteOnDropGauge<AtomicF64, Vec<String>>,
    /// Counter accumulating the sum of all observed lag values.
    wallclock_lag_seconds_sum: DeleteOnDropCounter<AtomicF64, Vec<String>>,
    /// Counter accumulating the number of observations made.
    wallclock_lag_seconds_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Sliding min/max window over recent lag samples; feeds the min/max
    /// gauges above.
    wallclock_lag_minmax: SlidingMinMax<f32>,
}
115
116impl WallclockLagMetrics {
117 pub fn observe(&mut self, lag_secs: u64) {
119 let lag_secs = f32::cast_lossy(lag_secs);
120
121 self.wallclock_lag_minmax.add_sample(lag_secs);
122
123 let (&min, &max) = self
124 .wallclock_lag_minmax
125 .get()
126 .expect("just added a sample");
127
128 self.wallclock_lag_seconds_min.set(min.into());
129 self.wallclock_lag_seconds_max.set(max.into());
130 self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
131 self.wallclock_lag_seconds_count.inc();
132 }
133}