mz_cluster_client/
metrics.rs1use mz_ore::cast::CastLossy;
13use mz_ore::metric;
14use mz_ore::metrics::{
15 CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricVisibility,
16 MetricsRegistry,
17};
18use mz_ore::stats::SlidingMinMax;
19use prometheus::core::{AtomicF64, AtomicU64};
20
21#[derive(Debug, Clone)]
23pub struct ControllerMetrics {
24 dataflow_wallclock_lag_seconds: GaugeVec,
25 dataflow_wallclock_lag_seconds_sum: CounterVec,
26 dataflow_wallclock_lag_seconds_count: IntCounterVec,
27}
28
29impl ControllerMetrics {
30 pub fn new(metrics_registry: &MetricsRegistry) -> Self {
32 Self {
33 dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
37 name: "mz_dataflow_wallclock_lag_seconds",
38 help: "A summary of the second-by-second lag of the dataflow frontier relative \
39 to wallclock time, aggregated over the last minute.",
40 var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
41 visibility: MetricVisibility::Public,
42 )),
43 dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
44 name: "mz_dataflow_wallclock_lag_seconds_sum",
45 help: "The total sum of dataflow wallclock lag measurements.",
46 var_labels: ["instance_id", "replica_id", "collection_id"],
47 )),
48 dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
49 name: "mz_dataflow_wallclock_lag_seconds_count",
50 help: "The total count of dataflow wallclock lag measurements.",
51 var_labels: ["instance_id", "replica_id", "collection_id"],
52 )),
53 }
54 }
55
56 pub fn wallclock_lag_metrics(
59 &self,
60 collection_id: String,
61 instance_id: Option<String>,
62 replica_id: Option<String>,
63 ) -> WallclockLagMetrics {
64 let labels = vec![
65 instance_id.unwrap_or_default(),
66 replica_id.unwrap_or_default(),
67 collection_id,
68 ];
69
70 let labels_with_quantile = |quantile: &str| {
71 labels
72 .iter()
73 .cloned()
74 .chain([quantile.to_string()])
75 .collect()
76 };
77
78 let wallclock_lag_seconds_min = self
79 .dataflow_wallclock_lag_seconds
80 .get_delete_on_drop_metric(labels_with_quantile("0"));
81 let wallclock_lag_seconds_max = self
82 .dataflow_wallclock_lag_seconds
83 .get_delete_on_drop_metric(labels_with_quantile("1"));
84 let wallclock_lag_seconds_sum = self
85 .dataflow_wallclock_lag_seconds_sum
86 .get_delete_on_drop_metric(labels.clone());
87 let wallclock_lag_seconds_count = self
88 .dataflow_wallclock_lag_seconds_count
89 .get_delete_on_drop_metric(labels);
90 let wallclock_lag_minmax = SlidingMinMax::new(60);
91
92 WallclockLagMetrics {
93 wallclock_lag_seconds_min,
94 wallclock_lag_seconds_max,
95 wallclock_lag_seconds_sum,
96 wallclock_lag_seconds_count,
97 wallclock_lag_minmax,
98 }
99 }
100}
101
102#[derive(Debug)]
104pub struct WallclockLagMetrics {
105 wallclock_lag_seconds_min: DeleteOnDropGauge<AtomicF64, Vec<String>>,
107 wallclock_lag_seconds_max: DeleteOnDropGauge<AtomicF64, Vec<String>>,
109 wallclock_lag_seconds_sum: DeleteOnDropCounter<AtomicF64, Vec<String>>,
111 wallclock_lag_seconds_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
113
114 wallclock_lag_minmax: SlidingMinMax<f32>,
116}
117
118impl WallclockLagMetrics {
119 pub fn observe(&mut self, lag_secs: u64) {
121 let lag_secs = f32::cast_lossy(lag_secs);
122
123 self.wallclock_lag_minmax.add_sample(lag_secs);
124
125 let (&min, &max) = self
126 .wallclock_lag_minmax
127 .get()
128 .expect("just added a sample");
129
130 self.wallclock_lag_seconds_min.set(min.into());
131 self.wallclock_lag_seconds_max.set(max.into());
132 self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
133 self.wallclock_lag_seconds_count.inc();
134 }
135}